diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 29e23ebb..f0b12195 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -205,6 +205,7 @@ class GossipSub(IPubsubRouter): stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 + # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. try: await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) except StreamClosed: @@ -238,6 +239,8 @@ class GossipSub(IPubsubRouter): if topic in self.mesh: in_topic_gossipsub_peers = self.mesh[topic] else: + # It could be the case that we publish to a topic that we have not subscribe + # and the topic is not yet added to our `fanout`. if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): # If no peers in fanout, choose some peers from gossipsub peers in topic. self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index d05f024d..ca175e53 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -197,7 +197,7 @@ async def test_dense(num_hosts, pubsubs_gsub, hosts): # publish from the randomly chosen host await pubsubs_gsub[origin_idx].publish("foobar", msg_content) - await asyncio.sleep(1) + await asyncio.sleep(2) # Assert that all blocking queues receive the message for queue in queues: msg = await queue.get() @@ -377,6 +377,12 @@ async def test_gossip_propagation(hosts, pubsubs_gsub): async def test_mesh_heartbeat( num_hosts, initial_mesh_peer_count, pubsubs_gsub, hosts, monkeypatch ): + # It's difficult to set up the initial peer subscription condition. + # Ideally I would like to have initial mesh peer count that's below ``GossipSubDegree`` + # so I can test if `mesh_heartbeat` return correct peers to GRAFT. + # The problem is that I can not set it up so that we have peers subscribe to the topic + # but not being part of our mesh peers (as these peers are the peers to GRAFT). + # So I monkeypatch the peer subscriptions and our mesh peers. total_peer_count = 14 topic = "TEST_MESH_HEARTBEAT" @@ -386,20 +392,24 @@ async def test_mesh_heartbeat( monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids) peer_topics = {topic: fake_peer_ids} + # Monkeypatch the peer subscriptions monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics) mesh_peer_indices = random.sample(range(total_peer_count), initial_mesh_peer_count) mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices] router_mesh = {topic: list(mesh_peers)} + # Monkeypatch our mesh peers monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh) peers_to_graft, peers_to_prune = pubsubs_gsub[0].router.mesh_heartbeat() if initial_mesh_peer_count > GOSSIPSUB_PARAMS.degree: + # If number of initial mesh peers is more than `GossipSubDegree`, we should PRUNE mesh peers assert len(peers_to_graft) == 0 assert len(peers_to_prune) == initial_mesh_peer_count - GOSSIPSUB_PARAMS.degree for peer in peers_to_prune: assert peer in mesh_peers elif initial_mesh_peer_count < GOSSIPSUB_PARAMS.degree: + # If number of initial mesh peers is less than `GossipSubDegree`, we should GRAFT more peers assert len(peers_to_prune) == 0 assert len(peers_to_graft) == GOSSIPSUB_PARAMS.degree - initial_mesh_peer_count for peer in peers_to_graft: @@ -416,6 +426,9 @@ async def test_mesh_heartbeat( async def test_gossip_heartbeat( num_hosts, initial_peer_count, pubsubs_gsub, hosts, monkeypatch ): + # The problem is that I can not set it up so that we have peers subscribe to the topic + # but not being part of our mesh peers (as these peers are the peers to GRAFT). + # So I monkeypatch the peer subscriptions and our mesh peers. total_peer_count = 28 topic_mesh = "TEST_GOSSIP_HEARTBEAT_1" topic_fanout = "TEST_GOSSIP_HEARTBEAT_2" @@ -426,21 +439,25 @@ async def test_gossip_heartbeat( monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids) topic_mesh_peer_count = 14 + # Split into mesh peers and fanout peers peer_topics = { topic_mesh: fake_peer_ids[:topic_mesh_peer_count], topic_fanout: fake_peer_ids[topic_mesh_peer_count:], } + # Monkeypatch the peer subscriptions monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics) mesh_peer_indices = random.sample(range(topic_mesh_peer_count), initial_peer_count) mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices] router_mesh = {topic_mesh: list(mesh_peers)} + # Monkeypatch our mesh peers monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh) fanout_peer_indices = random.sample( range(topic_mesh_peer_count, total_peer_count), initial_peer_count ) fanout_peers = [fake_peer_ids[i] for i in fanout_peer_indices] router_fanout = {topic_fanout: list(fanout_peers)} + # Monkeypatch our fanout peers monkeypatch.setattr(pubsubs_gsub[0].router, "fanout", router_fanout) def window(topic): @@ -451,18 +468,24 @@ async def test_gossip_heartbeat( else: return [] + # Monkeypatch the memory cache messages monkeypatch.setattr(pubsubs_gsub[0].router.mcache, "window", window) peers_to_gossip = pubsubs_gsub[0].router.gossip_heartbeat() + # If our mesh peer count is less than `GossipSubDegree`, we should gossip to up to + # `GossipSubDegree` peers (exclude mesh peers). if topic_mesh_peer_count - initial_peer_count < GOSSIPSUB_PARAMS.degree: + # The same goes for fanout so it's two times the number of peers to gossip. assert len(peers_to_gossip) == 2 * (topic_mesh_peer_count - initial_peer_count) elif topic_mesh_peer_count - initial_peer_count >= GOSSIPSUB_PARAMS.degree: assert len(peers_to_gossip) == 2 * (GOSSIPSUB_PARAMS.degree) for peer in peers_to_gossip: if peer in peer_topics[topic_mesh]: + # Check that the peer to gossip to is not in our mesh peers assert peer not in mesh_peers assert topic_mesh in peers_to_gossip[peer] elif peer in peer_topics[topic_fanout]: + # Check that the peer to gossip to is not in our fanout peers assert peer not in fanout_peers assert topic_fanout in peers_to_gossip[peer]