mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
refactor(gossipsub.py): Add helper function to fanout and gossipsub (#678)
* fanout and gossibsub helper * add newsfragment * remove dub fanout check
This commit is contained in:
@ -528,82 +528,99 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
peers_to_prune[peer].append(topic)
|
peers_to_prune[peer].append(topic)
|
||||||
return peers_to_graft, peers_to_prune
|
return peers_to_graft, peers_to_prune
|
||||||
|
|
||||||
def fanout_heartbeat(self) -> None:
|
def _handle_topic_heartbeat(
|
||||||
# Note: the comments here are the exact pseudocode from the spec
|
self,
|
||||||
for topic in list(self.fanout):
|
topic: str,
|
||||||
if (
|
current_peers: set[ID],
|
||||||
self.pubsub is not None
|
is_fanout: bool = False,
|
||||||
and topic not in self.pubsub.peer_topics
|
peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] | None = None,
|
||||||
and self.time_since_last_publish.get(topic, 0) + self.time_to_live
|
) -> tuple[set[ID], bool]:
|
||||||
< int(time.time())
|
"""
|
||||||
|
Helper method to handle heartbeat for a single topic,
|
||||||
|
supporting both fanout and gossip.
|
||||||
|
|
||||||
|
:param topic: The topic to handle
|
||||||
|
:param current_peers: Current set of peers in the topic
|
||||||
|
:param is_fanout: Whether this is a fanout topic (affects expiration check)
|
||||||
|
:param peers_to_gossip: Optional dictionary to store peers to gossip to
|
||||||
|
:return: Tuple of (updated_peers, should_remove_topic)
|
||||||
|
"""
|
||||||
|
if self.pubsub is None:
|
||||||
|
raise NoPubsubAttached
|
||||||
|
|
||||||
|
# Skip if no peers have subscribed to the topic
|
||||||
|
if topic not in self.pubsub.peer_topics:
|
||||||
|
return current_peers, False
|
||||||
|
|
||||||
|
# For fanout topics, check if we should remove the topic
|
||||||
|
if is_fanout:
|
||||||
|
if self.time_since_last_publish.get(topic, 0) + self.time_to_live < int(
|
||||||
|
time.time()
|
||||||
):
|
):
|
||||||
# Remove topic from fanout
|
return set(), True
|
||||||
|
|
||||||
|
# Check if peers are still in the topic and remove the ones that are not
|
||||||
|
in_topic_peers: set[ID] = {
|
||||||
|
peer for peer in current_peers if peer in self.pubsub.peer_topics[topic]
|
||||||
|
}
|
||||||
|
|
||||||
|
# If we need more peers to reach target degree
|
||||||
|
if len(in_topic_peers) < self.degree:
|
||||||
|
# Select additional peers from peers.gossipsub[topic]
|
||||||
|
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
|
topic,
|
||||||
|
self.degree - len(in_topic_peers),
|
||||||
|
in_topic_peers,
|
||||||
|
)
|
||||||
|
# Add the selected peers
|
||||||
|
in_topic_peers.update(selected_peers)
|
||||||
|
|
||||||
|
# Handle gossip if requested
|
||||||
|
if peers_to_gossip is not None:
|
||||||
|
msg_ids = self.mcache.window(topic)
|
||||||
|
if msg_ids:
|
||||||
|
# Select D peers from peers.gossipsub[topic] excluding current peers
|
||||||
|
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
|
topic, self.degree, current_peers
|
||||||
|
)
|
||||||
|
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
|
||||||
|
for peer in peers_to_emit_ihave_to:
|
||||||
|
peers_to_gossip[peer][topic] = msg_id_strs
|
||||||
|
|
||||||
|
return in_topic_peers, False
|
||||||
|
|
||||||
|
def fanout_heartbeat(self) -> None:
|
||||||
|
"""
|
||||||
|
Maintain fanout topics by:
|
||||||
|
1. Removing expired topics
|
||||||
|
2. Removing peers that are no longer in the topic
|
||||||
|
3. Adding new peers if needed to maintain the target degree
|
||||||
|
"""
|
||||||
|
for topic in list(self.fanout):
|
||||||
|
updated_peers, should_remove = self._handle_topic_heartbeat(
|
||||||
|
topic, self.fanout[topic], is_fanout=True
|
||||||
|
)
|
||||||
|
if should_remove:
|
||||||
del self.fanout[topic]
|
del self.fanout[topic]
|
||||||
else:
|
else:
|
||||||
# Check if fanout peers are still in the topic and remove the ones that are not # noqa: E501
|
self.fanout[topic] = updated_peers
|
||||||
# ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501
|
|
||||||
|
|
||||||
in_topic_fanout_peers: list[ID] = []
|
|
||||||
if self.pubsub is not None:
|
|
||||||
in_topic_fanout_peers = [
|
|
||||||
peer
|
|
||||||
for peer in self.fanout[topic]
|
|
||||||
if peer in self.pubsub.peer_topics[topic]
|
|
||||||
]
|
|
||||||
self.fanout[topic] = set(in_topic_fanout_peers)
|
|
||||||
num_fanout_peers_in_topic = len(self.fanout[topic])
|
|
||||||
|
|
||||||
# If |fanout[topic]| < D
|
|
||||||
if num_fanout_peers_in_topic < self.degree:
|
|
||||||
# Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic] # noqa: E501
|
|
||||||
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
|
||||||
topic,
|
|
||||||
self.degree - num_fanout_peers_in_topic,
|
|
||||||
self.fanout[topic],
|
|
||||||
)
|
|
||||||
# Add the peers to fanout[topic]
|
|
||||||
self.fanout[topic].update(selected_peers)
|
|
||||||
|
|
||||||
def gossip_heartbeat(self) -> DefaultDict[ID, dict[str, list[str]]]:
|
def gossip_heartbeat(self) -> DefaultDict[ID, dict[str, list[str]]]:
|
||||||
peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] = defaultdict(dict)
|
peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] = defaultdict(dict)
|
||||||
|
|
||||||
|
# Handle mesh topics
|
||||||
for topic in self.mesh:
|
for topic in self.mesh:
|
||||||
msg_ids = self.mcache.window(topic)
|
self._handle_topic_heartbeat(
|
||||||
if msg_ids:
|
topic, self.mesh[topic], peers_to_gossip=peers_to_gossip
|
||||||
if self.pubsub is None:
|
)
|
||||||
raise NoPubsubAttached
|
|
||||||
# Get all pubsub peers in a topic and only add them if they are
|
|
||||||
# gossipsub peers too
|
|
||||||
if topic in self.pubsub.peer_topics:
|
|
||||||
# Select D peers from peers.gossipsub[topic]
|
|
||||||
peers_to_emit_ihave_to = (
|
|
||||||
self._get_in_topic_gossipsub_peers_from_minus(
|
|
||||||
topic, self.degree, self.mesh[topic]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
|
# Handle fanout topics that aren't in mesh
|
||||||
for peer in peers_to_emit_ihave_to:
|
|
||||||
peers_to_gossip[peer][topic] = msg_id_strs
|
|
||||||
|
|
||||||
# TODO: Refactor and Dedup. This section is the roughly the same as the above.
|
|
||||||
# Do the same for fanout, for all topics not already hit in mesh
|
|
||||||
for topic in self.fanout:
|
for topic in self.fanout:
|
||||||
msg_ids = self.mcache.window(topic)
|
if topic not in self.mesh:
|
||||||
if msg_ids:
|
self._handle_topic_heartbeat(
|
||||||
if self.pubsub is None:
|
topic, self.fanout[topic], peers_to_gossip=peers_to_gossip
|
||||||
raise NoPubsubAttached
|
)
|
||||||
# Get all pubsub peers in topic and only add if they are
|
|
||||||
# gossipsub peers also
|
|
||||||
if topic in self.pubsub.peer_topics:
|
|
||||||
# Select D peers from peers.gossipsub[topic]
|
|
||||||
peers_to_emit_ihave_to = (
|
|
||||||
self._get_in_topic_gossipsub_peers_from_minus(
|
|
||||||
topic, self.degree, self.fanout[topic]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
msg_id_strs = [str(msg) for msg in msg_ids]
|
|
||||||
for peer in peers_to_emit_ihave_to:
|
|
||||||
peers_to_gossip[peer][topic] = msg_id_strs
|
|
||||||
return peers_to_gossip
|
return peers_to_gossip
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
1
newsfragments/678.misc.rst
Normal file
1
newsfragments/678.misc.rst
Normal file
@ -0,0 +1 @@
|
|||||||
|
Refactored gossipsub heartbeat logic to use a single helper method `_handle_topic_heartbeat` that handles both fanout and gossip heartbeats.
|
||||||
Reference in New Issue
Block a user