diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 01576ae7..31ab606f 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -204,8 +204,9 @@ class GossipSub(IPubsubRouter): # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message. for peer in fanout_peers: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) + if peer not in self.mesh[topic]: + self.mesh[topic].append(peer) + await self.emit_graft(topic, peer) if topic_in_fanout: del self.fanout[topic] @@ -281,7 +282,12 @@ class GossipSub(IPubsubRouter): self.mesh[topic] ) - for peer in selected_peers: + fanout_peers_not_in_mesh = [ + peer + for peer in selected_peers + if peer not in self.mesh[topic] + ] + for peer in fanout_peers_not_in_mesh: # Add peer to mesh[topic] self.mesh[topic].append(peer) @@ -460,9 +466,11 @@ class GossipSub(IPubsubRouter): # Add peer to mesh for topic if topic in self.mesh: - self.mesh[topic].append(from_id_str) + if from_id_str not in self.mesh[topic]: + self.mesh[topic].append(from_id_str) else: - self.mesh[topic] = [from_id_str] + # Respond with PRUNE if not subscribed to the topic + await self.emit_prune(topic, sender_peer_id) async def handle_prune(self, prune_msg, sender_peer_id): topic = prune_msg.topicID diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index a8f58d36..bb47135f 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -80,8 +80,8 @@ async def test_leave(): # Create pubsub, gossipsub instances _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 0.5) + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 0.5) gossipsub = gossipsubs[0] topic = "test_leave" @@ -98,6 +98,110 @@ async def test_leave(): await cleanup() +@pytest.mark.asyncio +async def test_handle_graft(event_loop, monkeypatch): + num_hosts = 2 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 0.5) + + index_alice = 0 + id_alice = str(libp2p_hosts[index_alice].get_id()) + index_bob = 1 + id_bob = str(libp2p_hosts[index_bob].get_id()) + await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob]) + + # Wait 2 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(2) + + topic = "test_handle_graft" + # Only lice subscribe to the topic + await gossipsubs[index_alice].join(topic) + + # Monkey patch bob's `emit_prune` function so we can + # check if it is called in `handle_graft` + event_emit_prune = asyncio.Event() + async def emit_prune(topic, sender_peer_id): + event_emit_prune.set() + + monkeypatch.setattr(gossipsubs[index_bob], 'emit_prune', emit_prune) + + # Check that alice is bob's peer but not his mesh peer + assert id_alice in gossipsubs[index_bob].peers_gossipsub + assert topic not in gossipsubs[index_bob].mesh + + await gossipsubs[index_alice].emit_graft(topic, id_bob) + + # Check that `emit_prune` is called + await asyncio.wait_for( + event_emit_prune.wait(), + timeout=1, + loop=event_loop, + ) + assert event_emit_prune.is_set() + + # Check that bob is alice's peer but not her mesh peer + assert topic in gossipsubs[index_alice].mesh + assert id_bob not in gossipsubs[index_alice].mesh[topic] + assert id_bob in gossipsubs[index_alice].peers_gossipsub + + await gossipsubs[index_bob].emit_graft(topic, id_alice) + + await asyncio.sleep(1) + + # Check that bob is now alice's mesh peer + assert id_bob in gossipsubs[index_alice].mesh[topic] + + await cleanup() + + +@pytest.mark.asyncio +async def test_handle_prune(): + num_hosts = 2 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 3) + + index_alice = 0 + id_alice = str(libp2p_hosts[index_alice].get_id()) + index_bob = 1 + id_bob = str(libp2p_hosts[index_bob].get_id()) + + topic = "test_handle_prune" + for pubsub in pubsubs: + await pubsub.subscribe(topic) + + await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob]) + + # Wait 3 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(3) + + # Check that they are each other's mesh peer + assert id_alice in gossipsubs[index_bob].mesh[topic] + assert id_bob in gossipsubs[index_alice].mesh[topic] + + # alice emit prune message to bob, alice should be removed + # from bob's mesh peer + await gossipsubs[index_alice].emit_prune(topic, id_bob) + + # FIXME: This test currently works because the heartbeat interval + # is increased to 3 seconds, so alice won't get add back into + # bob's mesh peer during heartbeat. + await asyncio.sleep(1) + + # Check that alice is no longer bob's mesh peer + assert id_alice not in gossipsubs[index_bob].mesh[topic] + assert id_bob in gossipsubs[index_alice].mesh[topic] + + await cleanup() + + @pytest.mark.asyncio async def test_dense(): # Create libp2p hosts