From e59ac6a250ee44d24139c693ed1dd897f753d4ad Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sat, 30 Nov 2019 17:12:37 +0800 Subject: [PATCH 01/25] Cleanup TODOs in pubsub --- libp2p/pubsub/pubsub.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index fab8024b..8a85a894 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -78,7 +78,6 @@ class Pubsub: topic_validators: Dict[str, TopicValidator] - # TODO: Be sure it is increased atomically everytime. counter: int # uint64 _tasks: List["asyncio.Future[Any]"] @@ -300,7 +299,6 @@ class Pubsub: logger.debug("Fail to add new peer %s: stream closed", peer_id) del self.peers[peer_id] return - # TODO: Check EOF of this stream. # TODO: Check if the peer in black list. try: self.router.add_peer(peer_id, stream.get_protocol()) @@ -328,7 +326,6 @@ class Pubsub: """ Continuously read from peer queue and each time a new peer is found, open a stream to the peer using a supported pubsub protocol - TODO: Handle failure for when the peer does not support any of the pubsub protocols we support """ while True: From 0a52a053758213a682e0d059289e1a1b6027b046 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sat, 30 Nov 2019 20:02:11 +0800 Subject: [PATCH 02/25] Del entry if no more peers subscribe to the topic --- libp2p/pubsub/pubsub.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 8a85a894..a3827f9f 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -316,7 +316,11 @@ class Pubsub: for topic in self.peer_topics: if peer_id in self.peer_topics[topic]: - self.peer_topics[topic].remove(peer_id) + # Delete the entry if no other peers left + if len(self.peer_topics[topic]) == 1: + del self.peer_topics[topic] + else: + self.peer_topics[topic].remove(peer_id) self.router.remove_peer(peer_id) @@ -361,7 +365,11 @@ class Pubsub: else: if sub_message.topicid in self.peer_topics: if origin_id in self.peer_topics[sub_message.topicid]: - self.peer_topics[sub_message.topicid].remove(origin_id) + # Delete the entry if no other peers left + if len(self.peer_topics[sub_message.topicid]) == 1: + del self.peer_topics[sub_message.topicid] + else: + self.peer_topics[sub_message.topicid].remove(origin_id) # FIXME(mhchia): Change the function name? async def handle_talk(self, publish_message: rpc_pb2.Message) -> None: From 50fd0acf41f929eb50c0f3b2ba672e9ece876227 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sat, 30 Nov 2019 20:19:17 +0800 Subject: [PATCH 03/25] Cleanup outdated TODOs in gossipsub --- libp2p/pubsub/gossipsub.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index d09db76e..ea2594e0 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -106,7 +106,8 @@ class GossipSub(IPubsubRouter): logger.debug("attached to pusub") # Start heartbeat now that we have a pubsub instance - # TODO: Start after delay + # Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501 + await asyncio.sleep(0.1) asyncio.ensure_future(self.heartbeat()) def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: @@ -127,8 +128,7 @@ class GossipSub(IPubsubRouter): # instance in multistream-select, but it is not the protocol that gossipsub supports. # In this case, probably we registered gossipsub to a wrong `protocol_id` # in multistream-select, or wrong versions. - # TODO: Better handling - raise Exception(f"protocol is not supported: protocol_id={protocol_id}") + raise Exception(f"This should not happen. Protocol={protocol_id} is not supported.") self.peers_to_protocol[peer_id] = protocol_id def remove_peer(self, peer_id: ID) -> None: @@ -187,7 +187,6 @@ class GossipSub(IPubsubRouter): stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. try: await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) except StreamClosed: @@ -218,15 +217,9 @@ class GossipSub(IPubsubRouter): # gossipsub peers in_topic_gossipsub_peers: List[ID] = None - # TODO: Do we need to check `topic in self.pubsub.my_topics`? if topic in self.mesh: in_topic_gossipsub_peers = self.mesh[topic] else: - # TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not - # subscribed? - # I assume there could be short periods between heartbeats where topic may not - # be but we should check that this path gets hit appropriately - if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): # If no peers in fanout, choose some peers from gossipsub peers in topic. self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( @@ -373,7 +366,6 @@ class GossipSub(IPubsubRouter): for topic in self.mesh: msg_ids = self.mcache.window(topic) if msg_ids: - # TODO: Make more efficient, possibly using a generator? # Get all pubsub peers in a topic and only add them if they are gossipsub peers too if topic in self.pubsub.peer_topics: # Select D peers from peers.gossipsub[topic] @@ -397,7 +389,6 @@ class GossipSub(IPubsubRouter): if topic not in self.mesh: msg_ids = self.mcache.window(topic) if msg_ids: - # TODO: Make more efficient, possibly using a generator? # Get all pubsub peers in topic and only add if they are gossipsub peers also if topic in self.pubsub.peer_topics: # Select D peers from peers.gossipsub[topic] @@ -409,7 +400,6 @@ class GossipSub(IPubsubRouter): peer not in self.mesh[topic] and peer not in self.fanout[topic] ): - msg_id_strs = [str(msg) for msg in msg_ids] await self.emit_ihave(topic, msg_id_strs, peer) From 0672f5ae6dca04a07731fb4bedbd423619067bb2 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 2 Dec 2019 16:38:48 +0800 Subject: [PATCH 04/25] Fix: move heartbeat delay to `heartbeat` --- libp2p/pubsub/gossipsub.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index ea2594e0..6d88f874 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -106,8 +106,6 @@ class GossipSub(IPubsubRouter): logger.debug("attached to pusub") # Start heartbeat now that we have a pubsub instance - # Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501 - await asyncio.sleep(0.1) asyncio.ensure_future(self.heartbeat()) def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: @@ -295,6 +293,8 @@ class GossipSub(IPubsubRouter): Note: the heartbeats are called with awaits because each heartbeat depends on the state changes in the preceding heartbeat """ + # Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501 + await asyncio.sleep(0.1) while True: await self.mesh_heartbeat() From c2d88962c7751d67b13c4d658395e498239dac03 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 2 Dec 2019 16:55:16 +0800 Subject: [PATCH 05/25] Add gossipsub `heartbeat_initial_delay` --- libp2p/pubsub/gossipsub.py | 5 ++++- libp2p/tools/constants.py | 1 + libp2p/tools/factories.py | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 6d88f874..7fbcfdfb 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -43,6 +43,7 @@ class GossipSub(IPubsubRouter): mcache: MessageCache + heartbeat_initial_delay: float heartbeat_interval: int def __init__( @@ -54,6 +55,7 @@ class GossipSub(IPubsubRouter): time_to_live: int, gossip_window: int = 3, gossip_history: int = 5, + heartbeat_initial_delay: int = 0.1, heartbeat_interval: int = 120, ) -> None: self.protocols = list(protocols) @@ -84,6 +86,7 @@ class GossipSub(IPubsubRouter): self.mcache = MessageCache(gossip_window, gossip_history) # Create heartbeat timer + self.heartbeat_initial_delay = heartbeat_initial_delay self.heartbeat_interval = heartbeat_interval # Interface functions @@ -294,7 +297,7 @@ class GossipSub(IPubsubRouter): state changes in the preceding heartbeat """ # Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501 - await asyncio.sleep(0.1) + await asyncio.sleep(self.heartbeat_initial_delay) while True: await self.mesh_heartbeat() diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py index 34dade46..2f132bb0 100644 --- a/libp2p/tools/constants.py +++ b/libp2p/tools/constants.py @@ -24,6 +24,7 @@ class GossipsubParams(NamedTuple): time_to_live: int = 30 gossip_window: int = 3 gossip_history: int = 5 + heartbeat_initial_delay: int = 0.1 heartbeat_interval: float = 0.5 diff --git a/libp2p/tools/factories.py b/libp2p/tools/factories.py index b189cfa1..1b98eaa6 100644 --- a/libp2p/tools/factories.py +++ b/libp2p/tools/factories.py @@ -142,6 +142,7 @@ class GossipsubFactory(factory.Factory): time_to_live = GOSSIPSUB_PARAMS.time_to_live gossip_window = GOSSIPSUB_PARAMS.gossip_window gossip_history = GOSSIPSUB_PARAMS.gossip_history + heartbeat_initial_delay = GOSSIPSUB_PARAMS.heartbeat_initial_delay heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval From 357341e0d82b3ab0d3c0e4c3ffe11fe2f758ccd2 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 2 Dec 2019 22:40:35 +0800 Subject: [PATCH 06/25] Remove unneccessary filter and check in gossipsub --- libp2p/pubsub/gossipsub.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 7fbcfdfb..78e55040 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -320,10 +320,7 @@ class GossipSub(IPubsubRouter): topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic] ) - fanout_peers_not_in_mesh: List[ID] = [ - peer for peer in selected_peers if peer not in self.mesh[topic] - ] - for peer in fanout_peers_not_in_mesh: + for peer in selected_peers: # Add peer to mesh[topic] self.mesh[topic].append(peer) @@ -377,12 +374,7 @@ class GossipSub(IPubsubRouter): ) for peer in peers_to_emit_ihave_to: - # TODO: this line is a monster, can hopefully be simplified - if ( - topic not in self.mesh or (peer not in self.mesh[topic]) - ) and ( - topic not in self.fanout or (peer not in self.fanout[topic]) - ): + if peer not in self.mesh[topic]: msg_id_strs = [str(msg_id) for msg_id in msg_ids] await self.emit_ihave(topic, msg_id_strs, peer) @@ -399,10 +391,7 @@ class GossipSub(IPubsubRouter): topic, self.degree, [] ) for peer in peers_to_emit_ihave_to: - if ( - peer not in self.mesh[topic] - and peer not in self.fanout[topic] - ): + if peer not in self.fanout[topic]: msg_id_strs = [str(msg) for msg in msg_ids] await self.emit_ihave(topic, msg_id_strs, peer) From a7e0c5d7378a9bfd28ac3f1431ab3cb851b31b6d Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 2 Dec 2019 22:41:49 +0800 Subject: [PATCH 07/25] Add missing cleanup in gossipsub `remove_peer` --- libp2p/pubsub/gossipsub.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 78e55040..39397003 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -145,6 +145,21 @@ class GossipSub(IPubsubRouter): elif peer_id in self.peers_floodsub: self.peers_floodsub.remove(peer_id) + for topic in self.mesh: + if peer_id in self.mesh[topic]: + # Delete the entry if no other peers left + if len(self.mesh[topic]) == 1: + del self.mesh[topic] + else: + self.mesh[topic].remove(peer_id) + for topic in self.fanout: + if peer_id in self.fanout[topic]: + # Delete the entry if no other peers left + if len(self.fanout[topic]) == 1: + del self.fanout[topic] + else: + self.fanout[topic].remove(peer_id) + self.peers_to_protocol.pop(peer_id, None) async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: From 920cf646efe8cb979b55c96979008a08f368da4e Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 2 Dec 2019 22:49:27 +0800 Subject: [PATCH 08/25] Fix lint and add check in fanout heartbeat --- libp2p/pubsub/gossipsub.py | 19 +++++++++++++++---- libp2p/pubsub/pubsub.py | 8 +++----- libp2p/tools/constants.py | 2 +- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 39397003..831e6d3e 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -55,7 +55,7 @@ class GossipSub(IPubsubRouter): time_to_live: int, gossip_window: int = 3, gossip_history: int = 5, - heartbeat_initial_delay: int = 0.1, + heartbeat_initial_delay: float = 0.1, heartbeat_interval: int = 120, ) -> None: self.protocols = list(protocols) @@ -129,7 +129,9 @@ class GossipSub(IPubsubRouter): # instance in multistream-select, but it is not the protocol that gossipsub supports. # In this case, probably we registered gossipsub to a wrong `protocol_id` # in multistream-select, or wrong versions. - raise Exception(f"This should not happen. Protocol={protocol_id} is not supported.") + raise Exception( + f"This should not happen. Protocol={protocol_id} is not supported." + ) self.peers_to_protocol[peer_id] = protocol_id def remove_peer(self, peer_id: ID) -> None: @@ -357,13 +359,22 @@ class GossipSub(IPubsubRouter): async def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in self.fanout: - # If time since last published > ttl + # Delete topic entry if it's not in `pubsub.peer_topics` + # or if it's time-since-last-published > ttl # TODO: there's no way time_since_last_publish gets set anywhere yet - if self.time_since_last_publish[topic] > self.time_to_live: + if ( + topic not in self.pubsub.peer_topics + or self.time_since_last_publish[topic] > self.time_to_live + ): # Remove topic from fanout del self.fanout[topic] del self.time_since_last_publish[topic] else: + # Check whether our peers are still in the topic + # ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501 + for peer in self.fanout[topic]: + if peer not in self.pubsub.peer_topics[topic]: + self.fanout[topic].remove(peer) num_fanout_peers_in_topic = len(self.fanout[topic]) # If |fanout[topic]| < D diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index a3827f9f..45e7ffca 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -327,11 +327,9 @@ class Pubsub: logger.debug("removed dead peer %s", peer_id) async def handle_peer_queue(self) -> None: - """ - Continuously read from peer queue and each time a new peer is found, - open a stream to the peer using a supported pubsub protocol - pubsub protocols we support - """ + """Continuously read from peer queue and each time a new peer is found, + open a stream to the peer using a supported pubsub protocol pubsub + protocols we support.""" while True: peer_id: ID = await self.peer_queue.get() # Add Peer diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py index 2f132bb0..8c22d151 100644 --- a/libp2p/tools/constants.py +++ b/libp2p/tools/constants.py @@ -24,7 +24,7 @@ class GossipsubParams(NamedTuple): time_to_live: int = 30 gossip_window: int = 3 gossip_history: int = 5 - heartbeat_initial_delay: int = 0.1 + heartbeat_initial_delay: float = 0.1 heartbeat_interval: float = 0.5 From ab1500c7084ba377e76fa4bd97a19ad03a6922ff Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 15:03:06 +0800 Subject: [PATCH 09/25] Remove unneccessary check in gossip heartbeat --- libp2p/pubsub/gossipsub.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 831e6d3e..cf48f53a 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -396,13 +396,12 @@ class GossipSub(IPubsubRouter): if topic in self.pubsub.peer_topics: # Select D peers from peers.gossipsub[topic] peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, [] + topic, self.degree, self.mesh[topic] ) + msg_id_strs = [str(msg_id) for msg_id in msg_ids] for peer in peers_to_emit_ihave_to: - if peer not in self.mesh[topic]: - msg_id_strs = [str(msg_id) for msg_id in msg_ids] - await self.emit_ihave(topic, msg_id_strs, peer) + await self.emit_ihave(topic, msg_id_strs, peer) # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh @@ -414,12 +413,11 @@ class GossipSub(IPubsubRouter): if topic in self.pubsub.peer_topics: # Select D peers from peers.gossipsub[topic] peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, [] + topic, self.degree, self.fanout[topic] ) + msg_id_strs = [str(msg) for msg in msg_ids] for peer in peers_to_emit_ihave_to: - if peer not in self.fanout[topic]: - msg_id_strs = [str(msg) for msg in msg_ids] - await self.emit_ihave(topic, msg_id_strs, peer) + await self.emit_ihave(topic, msg_id_strs, peer) self.mcache.shift() From 5efdf4c7036cacce8a2c49fbc67534c721801845 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 15:48:23 +0800 Subject: [PATCH 10/25] Group messages for peer in heartbeat --- libp2p/pubsub/gossipsub.py | 134 +++++++++++++++++++++++++++++++++---- 1 file changed, 120 insertions(+), 14 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index cf48f53a..29e23ebb 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -2,7 +2,7 @@ from ast import literal_eval import asyncio import logging import random -from typing import Any, Dict, Iterable, List, Sequence, Set +from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple from libp2p.network.stream.exceptions import StreamClosed from libp2p.peer.id import ID @@ -305,6 +305,75 @@ class GossipSub(IPubsubRouter): # Forget mesh[topic] del self.mesh[topic] + async def _emit_control_msgs( + self, + peers_to_graft: Dict[ID, List[str]], + peers_to_prune: Dict[ID, List[str]], + peers_to_gossip: Dict[ID, Dict[str, List[str]]], + ) -> None: + # Starting with GRAFT messages + for peer, topics in peers_to_graft.items(): + graft_msgs: List[rpc_pb2.ControlGraft] = [] + for topic in topics: + graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft() + graft_msg.topicID = topic + graft_msgs.append(graft_msg) + + # If there are also PRUNE messages to send to this peer + if peer in peers_to_prune: + prune_msgs: List[rpc_pb2.ControlPrune] = [] + for topic in peers_to_prune[peer]: + prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() + prune_msg.topicID = topic + prune_msgs.append(prune_msg) + del peers_to_prune[peer] + + # If there are also IHAVE messages to send to this peer + if peer in peers_to_gossip: + ihave_msgs: List[rpc_pb2.ControlIHave] = [] + for topic in peers_to_gossip[peer]: + ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave() + ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) + ihave_msg.topicID = topic + ihave_msgs.append(ihave_msg) + del peers_to_gossip[peer] + + control_msg = self.pack_control_msgs(ihave_msgs, graft_msgs, prune_msgs) + await self.emit_control_message(control_msg, peer) + + # Next with PRUNE messages + for peer, topics in peers_to_prune.items(): + prune_msgs = [] + for topic in topics: + prune_msg = rpc_pb2.ControlPrune() + prune_msg.topicID = topic + prune_msgs.append(prune_msg) + + # If there are also IHAVE messages to send to this peer + if peer in peers_to_gossip: + ihave_msgs = [] + for topic in peers_to_gossip[peer]: + ihave_msg = rpc_pb2.ControlIHave() + ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) + ihave_msg.topicID = topic + ihave_msgs.append(ihave_msg) + del peers_to_gossip[peer] + + control_msg = self.pack_control_msgs(ihave_msgs, None, prune_msgs) + await self.emit_control_message(control_msg, peer) + + # Fianlly IHAVE messages + for peer in peers_to_gossip: + ihave_msgs = [] + for topic in peers_to_gossip[peer]: + ihave_msg = rpc_pb2.ControlIHave() + ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) + ihave_msg.topicID = topic + ihave_msgs.append(ihave_msg) + + control_msg = self.pack_control_msgs(ihave_msgs, None, None) + await self.emit_control_message(control_msg, peer) + # Heartbeat async def heartbeat(self) -> None: """ @@ -316,15 +385,24 @@ class GossipSub(IPubsubRouter): # Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501 await asyncio.sleep(self.heartbeat_initial_delay) while True: + # Maintain mesh and keep track of which peers to send GRAFT or PRUNE to + peers_to_graft, peers_to_prune = self.mesh_heartbeat() + # Maintain fanout + self.fanout_heartbeat() + # Get the peers to send IHAVE to + peers_to_gossip = self.gossip_heartbeat() + # Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and send it + await self._emit_control_msgs( + peers_to_graft, peers_to_prune, peers_to_gossip + ) - await self.mesh_heartbeat() - await self.fanout_heartbeat() - await self.gossip_heartbeat() + self.mcache.shift() await asyncio.sleep(self.heartbeat_interval) - async def mesh_heartbeat(self) -> None: - # Note: the comments here are the exact pseudocode from the spec + def mesh_heartbeat(self) -> Tuple[Dict[ID, List[str]], Dict[ID, List[str]]]: + peers_to_graft: Dict[ID, List[str]] = {} + peers_to_prune: Dict[ID, List[str]] = {} for topic in self.mesh: # Skip if no peers have subscribed to the topic if topic not in self.pubsub.peer_topics: @@ -342,7 +420,10 @@ class GossipSub(IPubsubRouter): self.mesh[topic].append(peer) # Emit GRAFT(topic) control message to peer - await self.emit_graft(topic, peer) + if peer not in peers_to_graft: + peers_to_graft[peer] = [topic] + else: + peers_to_graft[peer].append(topic) if num_mesh_peers_in_topic > self.degree_high: # Select |mesh[topic]| - D peers from mesh[topic] @@ -354,9 +435,13 @@ class GossipSub(IPubsubRouter): self.mesh[topic].remove(peer) # Emit PRUNE(topic) control message to peer - await self.emit_prune(topic, peer) + if peer not in peers_to_prune: + peers_to_prune[peer] = [topic] + else: + peers_to_prune[peer].append(topic) + return peers_to_graft, peers_to_prune - async def fanout_heartbeat(self) -> None: + def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in self.fanout: # Delete topic entry if it's not in `pubsub.peer_topics` @@ -388,7 +473,8 @@ class GossipSub(IPubsubRouter): # Add the peers to fanout[topic] self.fanout[topic].extend(selected_peers) - async def gossip_heartbeat(self) -> None: + def gossip_heartbeat(self) -> Dict[ID, Dict[str, List[str]]]: + peers_to_gossip: Dict[ID, Dict[str, List[str]]] = {} for topic in self.mesh: msg_ids = self.mcache.window(topic) if msg_ids: @@ -401,7 +487,10 @@ class GossipSub(IPubsubRouter): msg_id_strs = [str(msg_id) for msg_id in msg_ids] for peer in peers_to_emit_ihave_to: - await self.emit_ihave(topic, msg_id_strs, peer) + if peer not in peers_to_gossip: + peers_to_gossip[peer] = {topic: msg_id_strs} + else: + peers_to_gossip[peer][topic] = msg_id_strs # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh @@ -417,9 +506,11 @@ class GossipSub(IPubsubRouter): ) msg_id_strs = [str(msg) for msg in msg_ids] for peer in peers_to_emit_ihave_to: - await self.emit_ihave(topic, msg_id_strs, peer) - - self.mcache.shift() + if peer not in peers_to_gossip: + peers_to_gossip[peer] = {topic: msg_id_strs} + else: + peers_to_gossip[peer][topic] = msg_id_strs + return peers_to_gossip @staticmethod def select_from_minus( @@ -554,6 +645,21 @@ class GossipSub(IPubsubRouter): # RPC emitters + def pack_control_msgs( + self, + ihave_msgs: List[rpc_pb2.ControlIHave], + graft_msgs: List[rpc_pb2.ControlGraft], + prune_msgs: List[rpc_pb2.ControlPrune], + ) -> rpc_pb2.ControlMessage: + control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() + if ihave_msgs: + control_msg.ihave.extend(ihave_msgs) + if graft_msgs: + control_msg.graft.extend(graft_msgs) + if prune_msgs: + control_msg.prune.extend(prune_msgs) + return control_msg + async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None: """Emit ihave message, sent to to_peer, for topic and msg_ids.""" From 8dec0b111d861cc64202d19fada86eead3dde045 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 15:49:45 +0800 Subject: [PATCH 11/25] Add test for mesh heartbeat --- tests/pubsub/test_gossipsub.py | 42 +++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 2121f8fb..3ff602d1 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -3,7 +3,8 @@ import random import pytest -from libp2p.tools.constants import GossipsubParams +from libp2p.peer.id import ID +from libp2p.tools.constants import GOSSIPSUB_PARAMS, GossipsubParams from libp2p.tools.pubsub.utils import dense_connect, one_to_all_connect from libp2p.tools.utils import connect @@ -366,3 +367,42 @@ async def test_gossip_propagation(hosts, pubsubs_gsub): # should be able to read message msg = await queue_1.get() assert msg.data == msg_content + + +@pytest.mark.parametrize( + "num_hosts, gossipsub_params", ((1, GossipsubParams(heartbeat_initial_delay=100)),) +) +@pytest.mark.parametrize("initial_mesh_peer_count", (7, 10, 13)) +@pytest.mark.asyncio +async def test_mesh_heartbeat( + num_hosts, initial_mesh_peer_count, pubsubs_gsub, hosts, monkeypatch +): + total_peer_count = 14 + topic = "TEST_MESH_HEARTBEAT" + + fake_peer_ids = [ + ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count) + ] + monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids) + + peer_topics = {topic: fake_peer_ids} + monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics) + + mesh_peer_indices = random.sample(range(total_peer_count), initial_mesh_peer_count) + mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices] + router_mesh = {topic: list(mesh_peers)} + monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh) + + peers_to_graft, peers_to_prune = pubsubs_gsub[0].router.mesh_heartbeat() + if initial_mesh_peer_count > GOSSIPSUB_PARAMS.degree: + assert len(peers_to_graft) == 0 + assert len(peers_to_prune) == initial_mesh_peer_count - GOSSIPSUB_PARAMS.degree + for peer in peers_to_prune: + assert peer in mesh_peers + elif initial_mesh_peer_count < GOSSIPSUB_PARAMS.degree: + assert len(peers_to_prune) == 0 + assert len(peers_to_graft) == GOSSIPSUB_PARAMS.degree - initial_mesh_peer_count + for peer in peers_to_graft: + assert peer not in mesh_peers + else: + assert len(peers_to_prune) == 0 and len(peers_to_graft) == 0 From b405fd76e95dee070c556f3921bb2473fe1b765f Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 15:49:58 +0800 Subject: [PATCH 12/25] Add test for gossip heartbeat --- tests/pubsub/test_gossipsub.py | 60 ++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 3ff602d1..a8a89a6c 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -406,3 +406,63 @@ async def test_mesh_heartbeat( assert peer not in mesh_peers else: assert len(peers_to_prune) == 0 and len(peers_to_graft) == 0 + + +@pytest.mark.parametrize( + "num_hosts, gossipsub_params", ((1, GossipsubParams(heartbeat_initial_delay=100)),) +) +@pytest.mark.parametrize("initial_peer_count", (1, 4, 7)) +@pytest.mark.asyncio +async def test_gossip_heartbeat( + num_hosts, initial_peer_count, pubsubs_gsub, hosts, monkeypatch +): + total_peer_count = 28 + topic_mesh = "TEST_GOSSIP_HEARTBEAT_1" + topic_fanout = "TEST_GOSSIP_HEARTBEAT_2" + + fake_peer_ids = [ + ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count) + ] + monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids) + + topic_mesh_peer_count = 14 + peer_topics = { + topic_mesh: fake_peer_ids[:topic_mesh_peer_count], + topic_fanout: fake_peer_ids[topic_mesh_peer_count:], + } + monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics) + + mesh_peer_indices = random.sample(range(topic_mesh_peer_count), initial_peer_count) + mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices] + router_mesh = {topic_mesh: list(mesh_peers)} + monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh) + fanout_peer_indices = random.sample( + range(topic_mesh_peer_count, total_peer_count), initial_peer_count + ) + fanout_peers = [fake_peer_ids[i] for i in fanout_peer_indices] + router_fanout = {topic_fanout: list(fanout_peers)} + monkeypatch.setattr(pubsubs_gsub[0].router, "fanout", router_fanout) + + def window(topic): + if topic == topic_mesh: + return [topic_mesh] + elif topic == topic_fanout: + return [topic_fanout] + else: + return [] + + monkeypatch.setattr(pubsubs_gsub[0].router.mcache, "window", window) + + peers_to_gossip = pubsubs_gsub[0].router.gossip_heartbeat() + if topic_mesh_peer_count - initial_peer_count < GOSSIPSUB_PARAMS.degree: + assert len(peers_to_gossip) == 2 * (topic_mesh_peer_count - initial_peer_count) + elif topic_mesh_peer_count - initial_peer_count >= GOSSIPSUB_PARAMS.degree: + assert len(peers_to_gossip) == 2 * (GOSSIPSUB_PARAMS.degree) + + for peer in peers_to_gossip: + if peer in peer_topics[topic_mesh]: + assert peer not in mesh_peers + assert topic_mesh in peers_to_gossip[peer] + elif peer in peer_topics[topic_fanout]: + assert peer not in fanout_peers + assert topic_fanout in peers_to_gossip[peer] From 60bd4694a4cdb12822ca1e2769635b2adea81449 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 18:03:45 +0800 Subject: [PATCH 13/25] Extend wait time for test to pass --- tests/pubsub/test_gossipsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index a8a89a6c..d05f024d 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -197,7 +197,7 @@ async def test_dense(num_hosts, pubsubs_gsub, hosts): # publish from the randomly chosen host await pubsubs_gsub[origin_idx].publish("foobar", msg_content) - await asyncio.sleep(0.5) + await asyncio.sleep(1) # Assert that all blocking queues receive the message for queue in queues: msg = await queue.get() From ea6cd30a16269537e8f93f96b7e6ce35777c0b00 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 18:45:33 +0800 Subject: [PATCH 14/25] Add back some comment and TODO. Add comment to tests --- libp2p/pubsub/gossipsub.py | 3 +++ tests/pubsub/test_gossipsub.py | 25 ++++++++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 29e23ebb..f0b12195 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -205,6 +205,7 @@ class GossipSub(IPubsubRouter): stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 + # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. try: await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) except StreamClosed: @@ -238,6 +239,8 @@ class GossipSub(IPubsubRouter): if topic in self.mesh: in_topic_gossipsub_peers = self.mesh[topic] else: + # It could be the case that we publish to a topic that we have not subscribe + # and the topic is not yet added to our `fanout`. if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): # If no peers in fanout, choose some peers from gossipsub peers in topic. self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index d05f024d..ca175e53 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -197,7 +197,7 @@ async def test_dense(num_hosts, pubsubs_gsub, hosts): # publish from the randomly chosen host await pubsubs_gsub[origin_idx].publish("foobar", msg_content) - await asyncio.sleep(1) + await asyncio.sleep(2) # Assert that all blocking queues receive the message for queue in queues: msg = await queue.get() @@ -377,6 +377,12 @@ async def test_gossip_propagation(hosts, pubsubs_gsub): async def test_mesh_heartbeat( num_hosts, initial_mesh_peer_count, pubsubs_gsub, hosts, monkeypatch ): + # It's difficult to set up the initial peer subscription condition. + # Ideally I would like to have initial mesh peer count that's below ``GossipSubDegree`` + # so I can test if `mesh_heartbeat` return correct peers to GRAFT. + # The problem is that I can not set it up so that we have peers subscribe to the topic + # but not being part of our mesh peers (as these peers are the peers to GRAFT). + # So I monkeypatch the peer subscriptions and our mesh peers. total_peer_count = 14 topic = "TEST_MESH_HEARTBEAT" @@ -386,20 +392,24 @@ async def test_mesh_heartbeat( monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids) peer_topics = {topic: fake_peer_ids} + # Monkeypatch the peer subscriptions monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics) mesh_peer_indices = random.sample(range(total_peer_count), initial_mesh_peer_count) mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices] router_mesh = {topic: list(mesh_peers)} + # Monkeypatch our mesh peers monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh) peers_to_graft, peers_to_prune = pubsubs_gsub[0].router.mesh_heartbeat() if initial_mesh_peer_count > GOSSIPSUB_PARAMS.degree: + # If number of initial mesh peers is more than `GossipSubDegree`, we should PRUNE mesh peers assert len(peers_to_graft) == 0 assert len(peers_to_prune) == initial_mesh_peer_count - GOSSIPSUB_PARAMS.degree for peer in peers_to_prune: assert peer in mesh_peers elif initial_mesh_peer_count < GOSSIPSUB_PARAMS.degree: + # If number of initial mesh peers is less than `GossipSubDegree`, we should GRAFT more peers assert len(peers_to_prune) == 0 assert len(peers_to_graft) == GOSSIPSUB_PARAMS.degree - initial_mesh_peer_count for peer in peers_to_graft: @@ -416,6 +426,9 @@ async def test_mesh_heartbeat( async def test_gossip_heartbeat( num_hosts, initial_peer_count, pubsubs_gsub, hosts, monkeypatch ): + # The problem is that I can not set it up so that we have peers subscribe to the topic + # but not being part of our mesh peers (as these peers are the peers to GRAFT). + # So I monkeypatch the peer subscriptions and our mesh peers. total_peer_count = 28 topic_mesh = "TEST_GOSSIP_HEARTBEAT_1" topic_fanout = "TEST_GOSSIP_HEARTBEAT_2" @@ -426,21 +439,25 @@ async def test_gossip_heartbeat( monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids) topic_mesh_peer_count = 14 + # Split into mesh peers and fanout peers peer_topics = { topic_mesh: fake_peer_ids[:topic_mesh_peer_count], topic_fanout: fake_peer_ids[topic_mesh_peer_count:], } + # Monkeypatch the peer subscriptions monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics) mesh_peer_indices = random.sample(range(topic_mesh_peer_count), initial_peer_count) mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices] router_mesh = {topic_mesh: list(mesh_peers)} + # Monkeypatch our mesh peers monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh) fanout_peer_indices = random.sample( range(topic_mesh_peer_count, total_peer_count), initial_peer_count ) fanout_peers = [fake_peer_ids[i] for i in fanout_peer_indices] router_fanout = {topic_fanout: list(fanout_peers)} + # Monkeypatch our fanout peers monkeypatch.setattr(pubsubs_gsub[0].router, "fanout", router_fanout) def window(topic): @@ -451,18 +468,24 @@ async def test_gossip_heartbeat( else: return [] + # Monkeypatch the memory cache messages monkeypatch.setattr(pubsubs_gsub[0].router.mcache, "window", window) peers_to_gossip = pubsubs_gsub[0].router.gossip_heartbeat() + # If our mesh peer count is less than `GossipSubDegree`, we should gossip to up to + # `GossipSubDegree` peers (exclude mesh peers). if topic_mesh_peer_count - initial_peer_count < GOSSIPSUB_PARAMS.degree: + # The same goes for fanout so it's two times the number of peers to gossip. assert len(peers_to_gossip) == 2 * (topic_mesh_peer_count - initial_peer_count) elif topic_mesh_peer_count - initial_peer_count >= GOSSIPSUB_PARAMS.degree: assert len(peers_to_gossip) == 2 * (GOSSIPSUB_PARAMS.degree) for peer in peers_to_gossip: if peer in peer_topics[topic_mesh]: + # Check that the peer to gossip to is not in our mesh peers assert peer not in mesh_peers assert topic_mesh in peers_to_gossip[peer] elif peer in peer_topics[topic_fanout]: + # Check that the peer to gossip to is not in our fanout peers assert peer not in fanout_peers assert topic_fanout in peers_to_gossip[peer] From bb15c817b1000eff7d5e23934ab412df523d2b3a Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 22:14:45 +0800 Subject: [PATCH 15/25] Fix var access before assignment --- libp2p/pubsub/gossipsub.py | 6 +++--- tests/pubsub/test_gossipsub.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index f0b12195..3b6e7366 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -314,9 +314,11 @@ class GossipSub(IPubsubRouter): peers_to_prune: Dict[ID, List[str]], peers_to_gossip: Dict[ID, Dict[str, List[str]]], ) -> None: + graft_msgs: List[rpc_pb2.ControlGraft] = [] + prune_msgs: List[rpc_pb2.ControlPrune] = [] + ihave_msgs: List[rpc_pb2.ControlIHave] = [] # Starting with GRAFT messages for peer, topics in peers_to_graft.items(): - graft_msgs: List[rpc_pb2.ControlGraft] = [] for topic in topics: graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft() graft_msg.topicID = topic @@ -324,7 +326,6 @@ class GossipSub(IPubsubRouter): # If there are also PRUNE messages to send to this peer if peer in peers_to_prune: - prune_msgs: List[rpc_pb2.ControlPrune] = [] for topic in peers_to_prune[peer]: prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() prune_msg.topicID = topic @@ -333,7 +334,6 @@ class GossipSub(IPubsubRouter): # If there are also IHAVE messages to send to this peer if peer in peers_to_gossip: - ihave_msgs: List[rpc_pb2.ControlIHave] = [] for topic in peers_to_gossip[peer]: ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave() ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index ca175e53..183ff6be 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -197,7 +197,7 @@ async def test_dense(num_hosts, pubsubs_gsub, hosts): # publish from the randomly chosen host await pubsubs_gsub[origin_idx].publish("foobar", msg_content) - await asyncio.sleep(2) + await asyncio.sleep(0.5) # Assert that all blocking queues receive the message for queue in queues: msg = await queue.get() From a9abf1e3dd5540fd59f29822f91c2ec07eb563e8 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 22:37:49 +0800 Subject: [PATCH 16/25] Fix list deletion and add list remove check --- libp2p/pubsub/gossipsub.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 3b6e7366..65253e25 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -460,9 +460,12 @@ class GossipSub(IPubsubRouter): else: # Check whether our peers are still in the topic # ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501 - for peer in self.fanout[topic]: - if peer not in self.pubsub.peer_topics[topic]: - self.fanout[topic].remove(peer) + in_topic_fanout_peers = [ + peer + for peer in self.fanout[topic] + if peer in self.pubsub.peer_topics[topic] + ] + self.fanout[topic] = in_topic_fanout_peers num_fanout_peers_in_topic = len(self.fanout[topic]) # If |fanout[topic]| < D @@ -644,7 +647,10 @@ class GossipSub(IPubsubRouter): # Remove peer from mesh for topic, if peer is in topic if topic in self.mesh and sender_peer_id in self.mesh[topic]: - self.mesh[topic].remove(sender_peer_id) + if len(self.mesh[topic]) == 1: + del self.mesh[topic] + else: + self.mesh[topic].remove(sender_peer_id) # RPC emitters From c08b2375e133cf5af31bec932ac835791a0618ea Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 23:10:47 +0800 Subject: [PATCH 17/25] Fix: should not remove topic if no peers --- libp2p/pubsub/gossipsub.py | 15 +++------------ libp2p/pubsub/pubsub.py | 10 ++-------- 2 files changed, 5 insertions(+), 20 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 65253e25..a6cee2f7 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -150,17 +150,11 @@ class GossipSub(IPubsubRouter): for topic in self.mesh: if peer_id in self.mesh[topic]: # Delete the entry if no other peers left - if len(self.mesh[topic]) == 1: - del self.mesh[topic] - else: - self.mesh[topic].remove(peer_id) + self.mesh[topic].remove(peer_id) for topic in self.fanout: if peer_id in self.fanout[topic]: # Delete the entry if no other peers left - if len(self.fanout[topic]) == 1: - del self.fanout[topic] - else: - self.fanout[topic].remove(peer_id) + self.fanout[topic].remove(peer_id) self.peers_to_protocol.pop(peer_id, None) @@ -647,10 +641,7 @@ class GossipSub(IPubsubRouter): # Remove peer from mesh for topic, if peer is in topic if topic in self.mesh and sender_peer_id in self.mesh[topic]: - if len(self.mesh[topic]) == 1: - del self.mesh[topic] - else: - self.mesh[topic].remove(sender_peer_id) + self.mesh[topic].remove(sender_peer_id) # RPC emitters diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 45e7ffca..25ab81e6 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -317,10 +317,7 @@ class Pubsub: for topic in self.peer_topics: if peer_id in self.peer_topics[topic]: # Delete the entry if no other peers left - if len(self.peer_topics[topic]) == 1: - del self.peer_topics[topic] - else: - self.peer_topics[topic].remove(peer_id) + self.peer_topics[topic].remove(peer_id) self.router.remove_peer(peer_id) @@ -364,10 +361,7 @@ class Pubsub: if sub_message.topicid in self.peer_topics: if origin_id in self.peer_topics[sub_message.topicid]: # Delete the entry if no other peers left - if len(self.peer_topics[sub_message.topicid]) == 1: - del self.peer_topics[sub_message.topicid] - else: - self.peer_topics[sub_message.topicid].remove(origin_id) + self.peer_topics[sub_message.topicid].remove(origin_id) # FIXME(mhchia): Change the function name? async def handle_talk(self, publish_message: rpc_pb2.Message) -> None: From 8e591229fddb3f02989530527877fdd9c02ae8e0 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 23:10:56 +0800 Subject: [PATCH 18/25] Update the sleep time in `test_handle_prune` --- tests/pubsub/test_gossipsub.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 183ff6be..5b70e19a 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -149,7 +149,7 @@ async def test_handle_prune(pubsubs_gsub, hosts): await connect(hosts[index_alice], hosts[index_bob]) # Wait 3 seconds for heartbeat to allow mesh to connect - await asyncio.sleep(3) + await asyncio.sleep(1) # Check that they are each other's mesh peer assert id_alice in gossipsubs[index_bob].mesh[topic] @@ -158,15 +158,17 @@ async def test_handle_prune(pubsubs_gsub, hosts): # alice emit prune message to bob, alice should be removed # from bob's mesh peer await gossipsubs[index_alice].emit_prune(topic, id_bob) + # `emit_prune` does not remove bob from alice's mesh peers + assert id_bob in gossipsubs[index_alice].mesh[topic] # FIXME: This test currently works because the heartbeat interval # is increased to 3 seconds, so alice won't get add back into # bob's mesh peer during heartbeat. - await asyncio.sleep(1) + # Wait for bob to `handle_prune` + await asyncio.sleep(0.1) # Check that alice is no longer bob's mesh peer assert id_alice not in gossipsubs[index_bob].mesh[topic] - assert id_bob in gossipsubs[index_alice].mesh[topic] @pytest.mark.parametrize("num_hosts", (10,)) From e6813da5f5a9815288b9ef76eda923deec71f245 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 5 Dec 2019 14:35:34 +0800 Subject: [PATCH 19/25] Refactor `_get_peers_to_send` --- libp2p/pubsub/gossipsub.py | 51 +++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index a6cee2f7..7f80ddd5 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -2,7 +2,7 @@ from ast import literal_eval import asyncio import logging import random -from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple +from typing import Any, Dict, Iterable, List, Sequence, Tuple from libp2p.network.stream.exceptions import StreamClosed from libp2p.peer.id import ID @@ -216,38 +216,40 @@ class GossipSub(IPubsubRouter): :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ - send_to: Set[ID] = set() for topic in topic_ids: if topic not in self.pubsub.peer_topics: continue # floodsub peers - for peer_id in self.pubsub.peer_topics[topic]: - # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go. - # This will improve the efficiency when searching for a peer's protocol id. - if peer_id in self.peers_floodsub: - send_to.add(peer_id) + # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go. + # This will improve the efficiency when searching for a peer's protocol id. + floodsub_peers: List[ID] = [ + peer_id + for peer_id in self.pubsub.peer_topics[topic] + if peer_id in self.peers_floodsub + ] # gossipsub peers - in_topic_gossipsub_peers: List[ID] = None + gossipsub_peers: List[ID] = [] if topic in self.mesh: - in_topic_gossipsub_peers = self.mesh[topic] + gossipsub_peers = self.mesh[topic] else: - # It could be the case that we publish to a topic that we have not subscribe - # and the topic is not yet added to our `fanout`. - if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): - # If no peers in fanout, choose some peers from gossipsub peers in topic. - self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, [] + # When we publish to a topic that we have not subscribe to, we randomly pick + # `self.degree` number of peers who have subscribe to the topic and add them + # as our `fanout` peers. + topic_in_fanout: bool = topic in self.fanout + fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else [] + fanout_size = len(fanout_peers) + if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree): + # Combine fanout peers with selected peers + self.fanout[topic] += self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree - fanout_size, fanout_peers ) - in_topic_gossipsub_peers = self.fanout[topic] - for peer_id in in_topic_gossipsub_peers: - send_to.add(peer_id) + gossipsub_peers = fanout_peers # Excludes `msg_forwarder` and `origin` - yield from send_to.difference([msg_forwarder, origin]) + yield from set(floodsub_peers + gossipsub_peers).difference([msg_forwarder, origin]) async def join(self, topic: str) -> None: - # Note: the comments here are the near-exact algorithm description from the spec """ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement. @@ -277,9 +279,8 @@ class GossipSub(IPubsubRouter): # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message. for peer in fanout_peers: - if peer not in self.mesh[topic]: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) + self.mesh[topic].append(peer) + await self.emit_graft(topic, peer) self.fanout.pop(topic, None) @@ -300,7 +301,7 @@ class GossipSub(IPubsubRouter): await self.emit_prune(topic, peer) # Forget mesh[topic] - del self.mesh[topic] + self.mesh.pop(topic, None) async def _emit_control_msgs( self, @@ -452,7 +453,7 @@ class GossipSub(IPubsubRouter): del self.fanout[topic] del self.time_since_last_publish[topic] else: - # Check whether our peers are still in the topic + # Check if fanout peers are still in the topic and remove the ones that are not # ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501 in_topic_fanout_peers = [ peer From fae3798ca9828a674298c6f7b74eb73c8fa97040 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 5 Dec 2019 14:40:49 +0800 Subject: [PATCH 20/25] Apply PR feedback: correct the comment in test --- tests/pubsub/test_gossipsub.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 5b70e19a..e1d562e8 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -161,9 +161,8 @@ async def test_handle_prune(pubsubs_gsub, hosts): # `emit_prune` does not remove bob from alice's mesh peers assert id_bob in gossipsubs[index_alice].mesh[topic] - # FIXME: This test currently works because the heartbeat interval - # is increased to 3 seconds, so alice won't get add back into - # bob's mesh peer during heartbeat. + # NOTE: We increase `heartbeat_interval` to 3 seconds so that bob will not + # add alice back to his mesh after heartbeat. # Wait for bob to `handle_prune` await asyncio.sleep(0.1) From 67f02c512a81cb2bb55f428d67781ad8a188f1fd Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 5 Dec 2019 15:10:04 +0800 Subject: [PATCH 21/25] Remove unnecessary check and fix test --- libp2p/pubsub/gossipsub.py | 47 +++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 7f80ddd5..7dcac133 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -216,6 +216,7 @@ class GossipSub(IPubsubRouter): :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ + send_to: List[ID] = [] for topic in topic_ids: if topic not in self.pubsub.peer_topics: continue @@ -240,14 +241,19 @@ class GossipSub(IPubsubRouter): topic_in_fanout: bool = topic in self.fanout fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else [] fanout_size = len(fanout_peers) - if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree): - # Combine fanout peers with selected peers - self.fanout[topic] += self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree - fanout_size, fanout_peers - ) + if not topic_in_fanout or ( + topic_in_fanout and fanout_size < self.degree + ): + if topic in self.pubsub.peer_topics: + # Combine fanout peers with selected peers + fanout_peers += self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree - fanout_size, fanout_peers + ) + self.fanout[topic] = fanout_peers gossipsub_peers = fanout_peers + send_to.extend(floodsub_peers + gossipsub_peers) # Excludes `msg_forwarder` and `origin` - yield from set(floodsub_peers + gossipsub_peers).difference([msg_forwarder, origin]) + yield from set(send_to).difference([msg_forwarder, origin]) async def join(self, topic: str) -> None: """ @@ -496,21 +502,20 @@ class GossipSub(IPubsubRouter): # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh for topic in self.fanout: - if topic not in self.mesh: - msg_ids = self.mcache.window(topic) - if msg_ids: - # Get all pubsub peers in topic and only add if they are gossipsub peers also - if topic in self.pubsub.peer_topics: - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, self.fanout[topic] - ) - msg_id_strs = [str(msg) for msg in msg_ids] - for peer in peers_to_emit_ihave_to: - if peer not in peers_to_gossip: - peers_to_gossip[peer] = {topic: msg_id_strs} - else: - peers_to_gossip[peer][topic] = msg_id_strs + msg_ids = self.mcache.window(topic) + if msg_ids: + # Get all pubsub peers in topic and only add if they are gossipsub peers also + if topic in self.pubsub.peer_topics: + # Select D peers from peers.gossipsub[topic] + peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree, self.fanout[topic] + ) + msg_id_strs = [str(msg) for msg in msg_ids] + for peer in peers_to_emit_ihave_to: + if peer not in peers_to_gossip: + peers_to_gossip[peer] = {topic: msg_id_strs} + else: + peers_to_gossip[peer][topic] = msg_id_strs return peers_to_gossip @staticmethod From b4900d53da39667fb740882af0db145f0231c710 Mon Sep 17 00:00:00 2001 From: NIC Lin Date: Thu, 5 Dec 2019 15:21:09 +0800 Subject: [PATCH 22/25] Apply suggestions from code review Co-Authored-By: Chih Cheng Liang --- libp2p/pubsub/gossipsub.py | 2 +- tests/pubsub/test_gossipsub.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 7dcac133..af5ea771 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -130,7 +130,7 @@ class GossipSub(IPubsubRouter): # In this case, probably we registered gossipsub to a wrong `protocol_id` # in multistream-select, or wrong versions. raise Exception( - f"This should not happen. Protocol={protocol_id} is not supported." + f"Unreachable: Protocol={protocol_id} is not supported." ) self.peers_to_protocol[peer_id] = protocol_id diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index e1d562e8..030dba43 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -148,7 +148,7 @@ async def test_handle_prune(pubsubs_gsub, hosts): await connect(hosts[index_alice], hosts[index_bob]) - # Wait 3 seconds for heartbeat to allow mesh to connect + # Wait for heartbeat to allow mesh to connect await asyncio.sleep(1) # Check that they are each other's mesh peer From db0017ddbb64c49b20ae3afb220f810f9333c79d Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 5 Dec 2019 17:33:07 +0800 Subject: [PATCH 23/25] Fix lint after applying suggestion --- libp2p/pubsub/gossipsub.py | 4 +--- tests/pubsub/test_gossipsub.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index af5ea771..208d2878 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -129,9 +129,7 @@ class GossipSub(IPubsubRouter): # instance in multistream-select, but it is not the protocol that gossipsub supports. # In this case, probably we registered gossipsub to a wrong `protocol_id` # in multistream-select, or wrong versions. - raise Exception( - f"Unreachable: Protocol={protocol_id} is not supported." - ) + raise Exception(f"Unreachable: Protocol={protocol_id} is not supported.") self.peers_to_protocol[peer_id] = protocol_id def remove_peer(self, peer_id: ID) -> None: diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 030dba43..19ec07c5 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -148,7 +148,7 @@ async def test_handle_prune(pubsubs_gsub, hosts): await connect(hosts[index_alice], hosts[index_bob]) - # Wait for heartbeat to allow mesh to connect + # Wait for heartbeat to allow mesh to connect await asyncio.sleep(1) # Check that they are each other's mesh peer From 2d3bfc8184e892ef51967c60b73402cac0ccb873 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Fri, 6 Dec 2019 23:42:31 +0800 Subject: [PATCH 24/25] Apply PR feedback: use defaultdict and init control message --- libp2p/pubsub/gossipsub.py | 64 ++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 37 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 208d2878..b8b0e624 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -1,8 +1,9 @@ from ast import literal_eval import asyncio +from collections import defaultdict import logging import random -from typing import Any, Dict, Iterable, List, Sequence, Tuple +from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Tuple from libp2p.network.stream.exceptions import StreamClosed from libp2p.peer.id import ID @@ -319,24 +320,24 @@ class GossipSub(IPubsubRouter): # Starting with GRAFT messages for peer, topics in peers_to_graft.items(): for topic in topics: - graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft() - graft_msg.topicID = topic + graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft(topicID=topic) graft_msgs.append(graft_msg) # If there are also PRUNE messages to send to this peer if peer in peers_to_prune: for topic in peers_to_prune[peer]: - prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() - prune_msg.topicID = topic + prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune( + topicID=topic + ) prune_msgs.append(prune_msg) del peers_to_prune[peer] # If there are also IHAVE messages to send to this peer if peer in peers_to_gossip: for topic in peers_to_gossip[peer]: - ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave() - ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) - ihave_msg.topicID = topic + ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave( + messageIDs=peers_to_gossip[peer][topic], topicID=topic + ) ihave_msgs.append(ihave_msg) del peers_to_gossip[peer] @@ -347,17 +348,16 @@ class GossipSub(IPubsubRouter): for peer, topics in peers_to_prune.items(): prune_msgs = [] for topic in topics: - prune_msg = rpc_pb2.ControlPrune() - prune_msg.topicID = topic + prune_msg = rpc_pb2.ControlPrune(topicID=topic) prune_msgs.append(prune_msg) # If there are also IHAVE messages to send to this peer if peer in peers_to_gossip: ihave_msgs = [] for topic in peers_to_gossip[peer]: - ihave_msg = rpc_pb2.ControlIHave() - ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) - ihave_msg.topicID = topic + ihave_msg = rpc_pb2.ControlIHave( + messageIDs=peers_to_gossip[peer][topic], topicID=topic + ) ihave_msgs.append(ihave_msg) del peers_to_gossip[peer] @@ -368,9 +368,9 @@ class GossipSub(IPubsubRouter): for peer in peers_to_gossip: ihave_msgs = [] for topic in peers_to_gossip[peer]: - ihave_msg = rpc_pb2.ControlIHave() - ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) - ihave_msg.topicID = topic + ihave_msg = rpc_pb2.ControlIHave( + messageIDs=peers_to_gossip[peer][topic], topicID=topic + ) ihave_msgs.append(ihave_msg) control_msg = self.pack_control_msgs(ihave_msgs, None, None) @@ -402,9 +402,11 @@ class GossipSub(IPubsubRouter): await asyncio.sleep(self.heartbeat_interval) - def mesh_heartbeat(self) -> Tuple[Dict[ID, List[str]], Dict[ID, List[str]]]: - peers_to_graft: Dict[ID, List[str]] = {} - peers_to_prune: Dict[ID, List[str]] = {} + def mesh_heartbeat( + self + ) -> Tuple[DefaultDict[ID, List[str]], DefaultDict[ID, List[str]]]: + peers_to_graft: DefaultDict[ID, List[str]] = defaultdict(list) + peers_to_prune: DefaultDict[ID, List[str]] = defaultdict(list) for topic in self.mesh: # Skip if no peers have subscribed to the topic if topic not in self.pubsub.peer_topics: @@ -422,10 +424,7 @@ class GossipSub(IPubsubRouter): self.mesh[topic].append(peer) # Emit GRAFT(topic) control message to peer - if peer not in peers_to_graft: - peers_to_graft[peer] = [topic] - else: - peers_to_graft[peer].append(topic) + peers_to_graft[peer].append(topic) if num_mesh_peers_in_topic > self.degree_high: # Select |mesh[topic]| - D peers from mesh[topic] @@ -437,10 +436,7 @@ class GossipSub(IPubsubRouter): self.mesh[topic].remove(peer) # Emit PRUNE(topic) control message to peer - if peer not in peers_to_prune: - peers_to_prune[peer] = [topic] - else: - peers_to_prune[peer].append(topic) + peers_to_prune[peer].append(topic) return peers_to_graft, peers_to_prune def fanout_heartbeat(self) -> None: @@ -478,8 +474,8 @@ class GossipSub(IPubsubRouter): # Add the peers to fanout[topic] self.fanout[topic].extend(selected_peers) - def gossip_heartbeat(self) -> Dict[ID, Dict[str, List[str]]]: - peers_to_gossip: Dict[ID, Dict[str, List[str]]] = {} + def gossip_heartbeat(self) -> DefaultDict[ID, Dict[str, List[str]]]: + peers_to_gossip: DefaultDict[ID, Dict[str, List[str]]] = defaultdict(dict) for topic in self.mesh: msg_ids = self.mcache.window(topic) if msg_ids: @@ -492,10 +488,7 @@ class GossipSub(IPubsubRouter): msg_id_strs = [str(msg_id) for msg_id in msg_ids] for peer in peers_to_emit_ihave_to: - if peer not in peers_to_gossip: - peers_to_gossip[peer] = {topic: msg_id_strs} - else: - peers_to_gossip[peer][topic] = msg_id_strs + peers_to_gossip[peer][topic] = msg_id_strs # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh @@ -510,10 +503,7 @@ class GossipSub(IPubsubRouter): ) msg_id_strs = [str(msg) for msg in msg_ids] for peer in peers_to_emit_ihave_to: - if peer not in peers_to_gossip: - peers_to_gossip[peer] = {topic: msg_id_strs} - else: - peers_to_gossip[peer][topic] = msg_id_strs + peers_to_gossip[peer][topic] = msg_id_strs return peers_to_gossip @staticmethod From a675da52ee4063dd659edd36d0c3ce665f219625 Mon Sep 17 00:00:00 2001 From: NIC Lin Date: Sat, 7 Dec 2019 15:46:42 +0800 Subject: [PATCH 25/25] Update libp2p/pubsub/gossipsub.py Co-Authored-By: Kevin Mai-Husan Chia --- libp2p/pubsub/gossipsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index b8b0e624..045ef397 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -235,7 +235,7 @@ class GossipSub(IPubsubRouter): gossipsub_peers = self.mesh[topic] else: # When we publish to a topic that we have not subscribe to, we randomly pick - # `self.degree` number of peers who have subscribe to the topic and add them + # `self.degree` number of peers who have subscribed to the topic and add them # as our `fanout` peers. topic_in_fanout: bool = topic in self.fanout fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else []