mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-11 23:51:07 +00:00
Remove unnecessary check and fix test
This commit is contained in:
@ -216,6 +216,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
:param origin: peer id of the peer the message originate from.
|
:param origin: peer id of the peer the message originate from.
|
||||||
:return: a generator of the peer ids who we send data to.
|
:return: a generator of the peer ids who we send data to.
|
||||||
"""
|
"""
|
||||||
|
send_to: List[ID] = []
|
||||||
for topic in topic_ids:
|
for topic in topic_ids:
|
||||||
if topic not in self.pubsub.peer_topics:
|
if topic not in self.pubsub.peer_topics:
|
||||||
continue
|
continue
|
||||||
@ -240,14 +241,19 @@ class GossipSub(IPubsubRouter):
|
|||||||
topic_in_fanout: bool = topic in self.fanout
|
topic_in_fanout: bool = topic in self.fanout
|
||||||
fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else []
|
fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else []
|
||||||
fanout_size = len(fanout_peers)
|
fanout_size = len(fanout_peers)
|
||||||
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
|
if not topic_in_fanout or (
|
||||||
# Combine fanout peers with selected peers
|
topic_in_fanout and fanout_size < self.degree
|
||||||
self.fanout[topic] += self._get_in_topic_gossipsub_peers_from_minus(
|
):
|
||||||
topic, self.degree - fanout_size, fanout_peers
|
if topic in self.pubsub.peer_topics:
|
||||||
)
|
# Combine fanout peers with selected peers
|
||||||
|
fanout_peers += self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
|
topic, self.degree - fanout_size, fanout_peers
|
||||||
|
)
|
||||||
|
self.fanout[topic] = fanout_peers
|
||||||
gossipsub_peers = fanout_peers
|
gossipsub_peers = fanout_peers
|
||||||
|
send_to.extend(floodsub_peers + gossipsub_peers)
|
||||||
# Excludes `msg_forwarder` and `origin`
|
# Excludes `msg_forwarder` and `origin`
|
||||||
yield from set(floodsub_peers + gossipsub_peers).difference([msg_forwarder, origin])
|
yield from set(send_to).difference([msg_forwarder, origin])
|
||||||
|
|
||||||
async def join(self, topic: str) -> None:
|
async def join(self, topic: str) -> None:
|
||||||
"""
|
"""
|
||||||
@ -496,21 +502,20 @@ class GossipSub(IPubsubRouter):
|
|||||||
# TODO: Refactor and Dedup. This section is the roughly the same as the above.
|
# TODO: Refactor and Dedup. This section is the roughly the same as the above.
|
||||||
# Do the same for fanout, for all topics not already hit in mesh
|
# Do the same for fanout, for all topics not already hit in mesh
|
||||||
for topic in self.fanout:
|
for topic in self.fanout:
|
||||||
if topic not in self.mesh:
|
msg_ids = self.mcache.window(topic)
|
||||||
msg_ids = self.mcache.window(topic)
|
if msg_ids:
|
||||||
if msg_ids:
|
# Get all pubsub peers in topic and only add if they are gossipsub peers also
|
||||||
# Get all pubsub peers in topic and only add if they are gossipsub peers also
|
if topic in self.pubsub.peer_topics:
|
||||||
if topic in self.pubsub.peer_topics:
|
# Select D peers from peers.gossipsub[topic]
|
||||||
# Select D peers from peers.gossipsub[topic]
|
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
topic, self.degree, self.fanout[topic]
|
||||||
topic, self.degree, self.fanout[topic]
|
)
|
||||||
)
|
msg_id_strs = [str(msg) for msg in msg_ids]
|
||||||
msg_id_strs = [str(msg) for msg in msg_ids]
|
for peer in peers_to_emit_ihave_to:
|
||||||
for peer in peers_to_emit_ihave_to:
|
if peer not in peers_to_gossip:
|
||||||
if peer not in peers_to_gossip:
|
peers_to_gossip[peer] = {topic: msg_id_strs}
|
||||||
peers_to_gossip[peer] = {topic: msg_id_strs}
|
else:
|
||||||
else:
|
peers_to_gossip[peer][topic] = msg_id_strs
|
||||||
peers_to_gossip[peer][topic] = msg_id_strs
|
|
||||||
return peers_to_gossip
|
return peers_to_gossip
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
Reference in New Issue
Block a user