added params for peer exchange and back off

This commit is contained in:
Mystical
2025-06-01 12:19:09 +05:30
parent c48618825d
commit b78468ca32
3 changed files with 92 additions and 34 deletions

View File

@ -92,6 +92,10 @@ class GossipSub(IPubsubRouter, Service):
direct_connect_initial_delay: float
direct_connect_interval: int
do_px: bool
back_off: int
unsubscribe_back_off: int
def __init__(
self,
protocols: Sequence[TProtocol],
@ -106,6 +110,9 @@ class GossipSub(IPubsubRouter, Service):
heartbeat_interval: int = 120,
direct_connect_initial_delay: float = 0.1,
direct_connect_interval: int = 300,
do_px: bool = False,
back_off: int = 60,
unsubscribe_back_off: int = 10,
) -> None:
self.protocols = list(protocols)
self.pubsub = None
@ -140,6 +147,10 @@ class GossipSub(IPubsubRouter, Service):
self.direct_connect_initial_delay = direct_connect_initial_delay
self.time_since_last_publish = {}
self.do_px = do_px
self.back_off = back_off
self.unsubscribe_back_off = unsubscribe_back_off
async def run(self) -> None:
self.manager.run_daemon_task(self.heartbeat)
if len(self.direct_peers) > 0:
@ -369,7 +380,7 @@ class GossipSub(IPubsubRouter, Service):
return
# Notify the peers in mesh[topic] with a PRUNE(topic) message
for peer in self.mesh[topic]:
await self.emit_prune(topic, peer)
await self.emit_prune(topic, peer, do_px=self.do_px, is_unsubscribe=True)
# Forget mesh[topic]
self.mesh.pop(topic, None)
@ -751,6 +762,9 @@ class GossipSub(IPubsubRouter, Service):
) -> None:
topic: str = graft_msg.topicID
# TODO: complete the remaining logic
self.do_px
# Add peer to mesh for topic
if topic in self.mesh:
for direct_peer in self.direct_peers:
@ -758,14 +772,18 @@ class GossipSub(IPubsubRouter, Service):
logger.warning(
"GRAFT: ignoring request from direct peer %s", sender_peer_id
)
await self.emit_prune(topic, sender_peer_id)
await self.emit_prune(
topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False
)
return
if sender_peer_id not in self.mesh[topic]:
self.mesh[topic].add(sender_peer_id)
else:
# Respond with PRUNE if not subscribed to the topic
await self.emit_prune(topic, sender_peer_id)
await self.emit_prune(
topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False
)
async def handle_prune(
self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
@ -824,11 +842,23 @@ class GossipSub(IPubsubRouter, Service):
await self.emit_control_message(control_msg, id)
async def emit_prune(
self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
) -> None:
async def emit_prune(self, topic: str, id: ID) -> None:
"""Emit graft message, sent to to_peer, for topic."""
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
prune_msg.topicID = topic
back_off_duration = self.back_off
if is_unsubscribe:
back_off_duration = self.unsubscribe_back_off
prune_msg.backoff = back_off_duration
# TODO: add peers once peerstore changes are complete
# prune_msg.peers =
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
control_msg.prune.extend([prune_msg])