From e51d376d5eb2568c4f7228e8a1e3c935bc64480b Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 19 Dec 2019 14:44:49 +0800 Subject: [PATCH] Combine `peers_gossipsub` and `peers_floodsub` --- libp2p/pubsub/gossipsub.py | 34 +++++++++------------------------- tests/pubsub/test_gossipsub.py | 11 +++++++---- 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 7b0759cf..8097f705 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -35,13 +35,11 @@ class GossipSub(IPubsubRouter): mesh: Dict[str, Set[ID]] fanout: Dict[str, Set[ID]] - peers_to_protocol: Dict[ID, str] + # The protocol peer supports + peer_protocol: Dict[ID, TProtocol] time_since_last_publish: Dict[str, int] - peers_gossipsub: Set[ID] - peers_floodsub: Set[ID] - mcache: MessageCache heartbeat_initial_delay: float @@ -75,14 +73,11 @@ class GossipSub(IPubsubRouter): self.fanout = {} # Create peer --> protocol mapping - self.peers_to_protocol = {} + self.peer_protocol = {} # Create topic --> time since last publish map self.time_since_last_publish = {} - self.peers_gossipsub = set() - self.peers_floodsub = set() - # Create message cache self.mcache = MessageCache(gossip_window, gossip_history) @@ -121,17 +116,13 @@ class GossipSub(IPubsubRouter): """ logger.debug("adding peer %s with protocol %s", peer_id, protocol_id) - if protocol_id == PROTOCOL_ID: - self.peers_gossipsub.add(peer_id) - elif protocol_id == floodsub.PROTOCOL_ID: - self.peers_floodsub.add(peer_id) - else: + if protocol_id not in (PROTOCOL_ID, floodsub.PROTOCOL_ID): # We should never enter here. Becuase the `protocol_id` is registered by your pubsub # instance in multistream-select, but it is not the protocol that gossipsub supports. # In this case, probably we registered gossipsub to a wrong `protocol_id` # in multistream-select, or wrong versions. raise Exception(f"Unreachable: Protocol={protocol_id} is not supported.") - self.peers_to_protocol[peer_id] = protocol_id + self.peer_protocol[peer_id] = protocol_id def remove_peer(self, peer_id: ID) -> None: """ @@ -141,15 +132,12 @@ class GossipSub(IPubsubRouter): """ logger.debug("removing peer %s", peer_id) - self.peers_gossipsub.discard(peer_id) - self.peers_floodsub.discard(peer_id) - for topic in self.mesh: self.mesh[topic].discard(peer_id) for topic in self.fanout: self.fanout[topic].discard(peer_id) - self.peers_to_protocol.pop(peer_id, None) + self.peer_protocol.pop(peer_id, None) async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: """ @@ -217,12 +205,10 @@ class GossipSub(IPubsubRouter): continue # floodsub peers - # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go. - # This will improve the efficiency when searching for a peer's protocol id. floodsub_peers: Set[ID] = set( peer_id for peer_id in self.pubsub.peer_topics[topic] - if peer_id in self.peers_floodsub + if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID ) send_to.update(floodsub_peers) @@ -540,11 +526,9 @@ class GossipSub(IPubsubRouter): gossipsub_peers_in_topic = set( peer_id for peer_id in self.pubsub.peer_topics[topic] - if peer_id in self.peers_gossipsub - ) - return self.select_from_minus( - num_to_select, gossipsub_peers_in_topic, minus + if self.peer_protocol[peer_id] == PROTOCOL_ID ) + return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus) # RPC handlers diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index fe6bf3b8..1bc34260 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -4,6 +4,7 @@ import random import pytest from libp2p.peer.id import ID +from libp2p.pubsub.gossipsub import PROTOCOL_ID from libp2p.tools.constants import GOSSIPSUB_PARAMS, GossipsubParams from libp2p.tools.pubsub.utils import dense_connect, one_to_all_connect from libp2p.tools.utils import connect @@ -108,7 +109,7 @@ async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch): monkeypatch.setattr(gossipsubs[index_bob], "emit_prune", emit_prune) # Check that alice is bob's peer but not his mesh peer - assert id_alice in gossipsubs[index_bob].peers_gossipsub + assert gossipsubs[index_bob].peer_protocol[id_alice] == PROTOCOL_ID assert topic not in gossipsubs[index_bob].mesh await gossipsubs[index_alice].emit_graft(topic, id_bob) @@ -120,7 +121,7 @@ async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch): # Check that bob is alice's peer but not her mesh peer assert topic in gossipsubs[index_alice].mesh assert id_bob not in gossipsubs[index_alice].mesh[topic] - assert id_bob in gossipsubs[index_alice].peers_gossipsub + assert gossipsubs[index_alice].peer_protocol[id_bob] == PROTOCOL_ID await gossipsubs[index_bob].emit_graft(topic, id_alice) @@ -390,7 +391,8 @@ async def test_mesh_heartbeat( fake_peer_ids = [ ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count) ] - monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", set(fake_peer_ids)) + peer_protocol = {peer_id: PROTOCOL_ID for peer_id in fake_peer_ids} + monkeypatch.setattr(pubsubs_gsub[0].router, "peer_protocol", peer_protocol) peer_topics = {topic: set(fake_peer_ids)} # Monkeypatch the peer subscriptions @@ -437,7 +439,8 @@ async def test_gossip_heartbeat( fake_peer_ids = [ ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count) ] - monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", set(fake_peer_ids)) + peer_protocol = {peer_id: PROTOCOL_ID for peer_id in fake_peer_ids} + monkeypatch.setattr(pubsubs_gsub[0].router, "peer_protocol", peer_protocol) topic_mesh_peer_count = 14 # Split into mesh peers and fanout peers