From 67f02c512a81cb2bb55f428d67781ad8a188f1fd Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 5 Dec 2019 15:10:04 +0800 Subject: [PATCH] Remove unnecessary check and fix test --- libp2p/pubsub/gossipsub.py | 47 +++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 7f80ddd5..7dcac133 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -216,6 +216,7 @@ class GossipSub(IPubsubRouter): :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ + send_to: List[ID] = [] for topic in topic_ids: if topic not in self.pubsub.peer_topics: continue @@ -240,14 +241,19 @@ class GossipSub(IPubsubRouter): topic_in_fanout: bool = topic in self.fanout fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else [] fanout_size = len(fanout_peers) - if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree): - # Combine fanout peers with selected peers - self.fanout[topic] += self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree - fanout_size, fanout_peers - ) + if not topic_in_fanout or ( + topic_in_fanout and fanout_size < self.degree + ): + if topic in self.pubsub.peer_topics: + # Combine fanout peers with selected peers + fanout_peers += self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree - fanout_size, fanout_peers + ) + self.fanout[topic] = fanout_peers gossipsub_peers = fanout_peers + send_to.extend(floodsub_peers + gossipsub_peers) # Excludes `msg_forwarder` and `origin` - yield from set(floodsub_peers + gossipsub_peers).difference([msg_forwarder, origin]) + yield from set(send_to).difference([msg_forwarder, origin]) async def join(self, topic: str) -> None: """ @@ -496,21 +502,20 @@ class GossipSub(IPubsubRouter): # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh for topic in self.fanout: - if topic not in self.mesh: - msg_ids = self.mcache.window(topic) - if msg_ids: - # Get all pubsub peers in topic and only add if they are gossipsub peers also - if topic in self.pubsub.peer_topics: - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, self.fanout[topic] - ) - msg_id_strs = [str(msg) for msg in msg_ids] - for peer in peers_to_emit_ihave_to: - if peer not in peers_to_gossip: - peers_to_gossip[peer] = {topic: msg_id_strs} - else: - peers_to_gossip[peer][topic] = msg_id_strs + msg_ids = self.mcache.window(topic) + if msg_ids: + # Get all pubsub peers in topic and only add if they are gossipsub peers also + if topic in self.pubsub.peer_topics: + # Select D peers from peers.gossipsub[topic] + peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree, self.fanout[topic] + ) + msg_id_strs = [str(msg) for msg in msg_ids] + for peer in peers_to_emit_ihave_to: + if peer not in peers_to_gossip: + peers_to_gossip[peer] = {topic: msg_id_strs} + else: + peers_to_gossip[peer][topic] = msg_id_strs return peers_to_gossip @staticmethod