mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
added complete back_off implementation
This commit is contained in:
@ -93,7 +93,8 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
direct_connect_interval: int
|
direct_connect_interval: int
|
||||||
|
|
||||||
do_px: bool
|
do_px: bool
|
||||||
back_off: int
|
back_off: dict[str, dict[ID, int]]
|
||||||
|
prune_back_off: int
|
||||||
unsubscribe_back_off: int
|
unsubscribe_back_off: int
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@ -111,7 +112,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
direct_connect_initial_delay: float = 0.1,
|
direct_connect_initial_delay: float = 0.1,
|
||||||
direct_connect_interval: int = 300,
|
direct_connect_interval: int = 300,
|
||||||
do_px: bool = False,
|
do_px: bool = False,
|
||||||
back_off: int = 60,
|
prune_back_off: int = 60,
|
||||||
unsubscribe_back_off: int = 10,
|
unsubscribe_back_off: int = 10,
|
||||||
) -> None:
|
) -> None:
|
||||||
self.protocols = list(protocols)
|
self.protocols = list(protocols)
|
||||||
@ -148,7 +149,8 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
self.time_since_last_publish = {}
|
self.time_since_last_publish = {}
|
||||||
|
|
||||||
self.do_px = do_px
|
self.do_px = do_px
|
||||||
self.back_off = back_off
|
self.back_off = dict()
|
||||||
|
self.prune_back_off = prune_back_off
|
||||||
self.unsubscribe_back_off = unsubscribe_back_off
|
self.unsubscribe_back_off = unsubscribe_back_off
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
@ -345,15 +347,21 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
self.mesh[topic] = set()
|
self.mesh[topic] = set()
|
||||||
|
|
||||||
topic_in_fanout: bool = topic in self.fanout
|
topic_in_fanout: bool = topic in self.fanout
|
||||||
fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
|
fanout_peers: set[ID] = set()
|
||||||
|
|
||||||
|
for peer in self.fanout[topic]:
|
||||||
|
if self._check_back_off(peer, topic):
|
||||||
|
continue
|
||||||
|
fanout_peers.add(peer)
|
||||||
|
|
||||||
fanout_size = len(fanout_peers)
|
fanout_size = len(fanout_peers)
|
||||||
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
|
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
|
||||||
# There are less than D peers (let this number be x)
|
# There are less than D peers (let this number be x)
|
||||||
# in the fanout for a topic (or the topic is not in the fanout).
|
# in the fanout for a topic (or the topic is not in the fanout).
|
||||||
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
|
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
|
||||||
if topic in self.pubsub.peer_topics:
|
if self.pubsub is not None and topic in self.pubsub.peer_topics:
|
||||||
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic, self.degree - fanout_size, fanout_peers
|
topic, self.degree - fanout_size, fanout_peers, True
|
||||||
)
|
)
|
||||||
# Combine fanout peers with selected peers
|
# Combine fanout peers with selected peers
|
||||||
fanout_peers.update(selected_peers)
|
fanout_peers.update(selected_peers)
|
||||||
@ -380,7 +388,8 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
return
|
return
|
||||||
# Notify the peers in mesh[topic] with a PRUNE(topic) message
|
# Notify the peers in mesh[topic] with a PRUNE(topic) message
|
||||||
for peer in self.mesh[topic]:
|
for peer in self.mesh[topic]:
|
||||||
await self.emit_prune(topic, peer, do_px=self.do_px, is_unsubscribe=True)
|
await self.emit_prune(topic, peer, self.do_px, True)
|
||||||
|
self._add_back_off(peer, topic, True)
|
||||||
|
|
||||||
# Forget mesh[topic]
|
# Forget mesh[topic]
|
||||||
self.mesh.pop(topic, None)
|
self.mesh.pop(topic, None)
|
||||||
@ -516,7 +525,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
if num_mesh_peers_in_topic < self.degree_low:
|
if num_mesh_peers_in_topic < self.degree_low:
|
||||||
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501
|
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501
|
||||||
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
|
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True
|
||||||
)
|
)
|
||||||
|
|
||||||
for peer in selected_peers:
|
for peer in selected_peers:
|
||||||
@ -579,9 +588,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
if len(in_topic_peers) < self.degree:
|
if len(in_topic_peers) < self.degree:
|
||||||
# Select additional peers from peers.gossipsub[topic]
|
# Select additional peers from peers.gossipsub[topic]
|
||||||
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic,
|
topic, self.degree - len(in_topic_peers), in_topic_peers, True
|
||||||
self.degree - len(in_topic_peers),
|
|
||||||
in_topic_peers,
|
|
||||||
)
|
)
|
||||||
# Add the selected peers
|
# Add the selected peers
|
||||||
in_topic_peers.update(selected_peers)
|
in_topic_peers.update(selected_peers)
|
||||||
@ -592,7 +599,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
if msg_ids:
|
if msg_ids:
|
||||||
# Select D peers from peers.gossipsub[topic] excluding current peers
|
# Select D peers from peers.gossipsub[topic] excluding current peers
|
||||||
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic, self.degree, current_peers
|
topic, self.degree, current_peers, True
|
||||||
)
|
)
|
||||||
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
|
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
|
||||||
for peer in peers_to_emit_ihave_to:
|
for peer in peers_to_emit_ihave_to:
|
||||||
@ -666,7 +673,11 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
return selection
|
return selection
|
||||||
|
|
||||||
def _get_in_topic_gossipsub_peers_from_minus(
|
def _get_in_topic_gossipsub_peers_from_minus(
|
||||||
self, topic: str, num_to_select: int, minus: Iterable[ID]
|
self,
|
||||||
|
topic: str,
|
||||||
|
num_to_select: int,
|
||||||
|
minus: Iterable[ID],
|
||||||
|
backoff_check: bool = False,
|
||||||
) -> list[ID]:
|
) -> list[ID]:
|
||||||
if self.pubsub is None:
|
if self.pubsub is None:
|
||||||
raise NoPubsubAttached
|
raise NoPubsubAttached
|
||||||
@ -675,8 +686,57 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
for peer_id in self.pubsub.peer_topics[topic]
|
for peer_id in self.pubsub.peer_topics[topic]
|
||||||
if self.peer_protocol[peer_id] == PROTOCOL_ID
|
if self.peer_protocol[peer_id] == PROTOCOL_ID
|
||||||
}
|
}
|
||||||
|
if backoff_check:
|
||||||
|
# filter out peers that are in back off for this topic
|
||||||
|
gossipsub_peers_in_topic = {
|
||||||
|
peer_id
|
||||||
|
for peer_id in gossipsub_peers_in_topic
|
||||||
|
if self._check_back_off(peer_id, topic) is False
|
||||||
|
}
|
||||||
return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)
|
return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)
|
||||||
|
|
||||||
|
def _add_back_off(
|
||||||
|
self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Add back off for a peer in a topic.
|
||||||
|
:param peer: peer to add back off for
|
||||||
|
:param topic: topic to add back off for
|
||||||
|
:param is_unsubscribe: whether this is an unsubscribe operation
|
||||||
|
:param backoff_duration: duration of back off in seconds, if 0, use default
|
||||||
|
"""
|
||||||
|
if topic not in self.back_off:
|
||||||
|
self.back_off[topic] = dict()
|
||||||
|
|
||||||
|
backoff_till = int(time.time())
|
||||||
|
if backoff_duration > 0:
|
||||||
|
backoff_till += backoff_duration
|
||||||
|
else:
|
||||||
|
if is_unsubscribe:
|
||||||
|
backoff_till += self.unsubscribe_back_off
|
||||||
|
else:
|
||||||
|
backoff_till += self.prune_back_off
|
||||||
|
|
||||||
|
if peer not in self.back_off[topic]:
|
||||||
|
self.back_off[topic][peer] = backoff_till
|
||||||
|
else:
|
||||||
|
self.back_off[topic][peer] = max(self.back_off[topic][peer], backoff_till)
|
||||||
|
|
||||||
|
def _check_back_off(self, peer: ID, topic: str) -> bool:
|
||||||
|
"""
|
||||||
|
Check if a peer is in back off for a topic and cleanup expired back off entries.
|
||||||
|
:param peer: peer to check
|
||||||
|
:param topic: topic to check
|
||||||
|
:return: True if the peer is in back off, False otherwise
|
||||||
|
"""
|
||||||
|
if topic not in self.back_off:
|
||||||
|
return False
|
||||||
|
if self.back_off[topic].get(peer, 0) > int(time.time()):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
del self.back_off[topic][peer]
|
||||||
|
return False
|
||||||
|
|
||||||
# RPC handlers
|
# RPC handlers
|
||||||
|
|
||||||
async def handle_ihave(
|
async def handle_ihave(
|
||||||
@ -762,9 +822,6 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
) -> None:
|
) -> None:
|
||||||
topic: str = graft_msg.topicID
|
topic: str = graft_msg.topicID
|
||||||
|
|
||||||
# TODO: complete the remaining logic
|
|
||||||
self.do_px
|
|
||||||
|
|
||||||
# Add peer to mesh for topic
|
# Add peer to mesh for topic
|
||||||
if topic in self.mesh:
|
if topic in self.mesh:
|
||||||
for direct_peer in self.direct_peers:
|
for direct_peer in self.direct_peers:
|
||||||
@ -772,26 +829,38 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
logger.warning(
|
logger.warning(
|
||||||
"GRAFT: ignoring request from direct peer %s", sender_peer_id
|
"GRAFT: ignoring request from direct peer %s", sender_peer_id
|
||||||
)
|
)
|
||||||
await self.emit_prune(
|
await self.emit_prune(topic, sender_peer_id, False, False)
|
||||||
topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if self._check_back_off(sender_peer_id, topic):
|
||||||
|
logger.warning(
|
||||||
|
"GRAFT: ignoring request from %s, back off until %d",
|
||||||
|
sender_peer_id,
|
||||||
|
self.back_off[topic][sender_peer_id],
|
||||||
|
)
|
||||||
|
self._add_back_off(sender_peer_id, topic, False)
|
||||||
|
await self.emit_prune(topic, sender_peer_id, False, False)
|
||||||
|
return
|
||||||
|
|
||||||
if sender_peer_id not in self.mesh[topic]:
|
if sender_peer_id not in self.mesh[topic]:
|
||||||
self.mesh[topic].add(sender_peer_id)
|
self.mesh[topic].add(sender_peer_id)
|
||||||
else:
|
else:
|
||||||
# Respond with PRUNE if not subscribed to the topic
|
# Respond with PRUNE if not subscribed to the topic
|
||||||
await self.emit_prune(
|
await self.emit_prune(topic, sender_peer_id, self.do_px, False)
|
||||||
topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False
|
|
||||||
)
|
|
||||||
|
|
||||||
async def handle_prune(
|
async def handle_prune(
|
||||||
self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
|
self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
|
||||||
) -> None:
|
) -> None:
|
||||||
topic: str = prune_msg.topicID
|
topic: str = prune_msg.topicID
|
||||||
|
backoff_till: int = prune_msg.backoff
|
||||||
|
|
||||||
# Remove peer from mesh for topic
|
# Remove peer from mesh for topic
|
||||||
if topic in self.mesh:
|
if topic in self.mesh:
|
||||||
|
if backoff_till > 0:
|
||||||
|
self._add_back_off(sender_peer_id, topic, False, backoff_till)
|
||||||
|
else:
|
||||||
|
self._add_back_off(sender_peer_id, topic, False)
|
||||||
|
|
||||||
self.mesh[topic].discard(sender_peer_id)
|
self.mesh[topic].discard(sender_peer_id)
|
||||||
|
|
||||||
# RPC emitters
|
# RPC emitters
|
||||||
@ -845,12 +914,11 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
async def emit_prune(
|
async def emit_prune(
|
||||||
self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
|
self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
|
||||||
) -> None:
|
) -> None:
|
||||||
async def emit_prune(self, topic: str, id: ID) -> None:
|
|
||||||
"""Emit graft message, sent to to_peer, for topic."""
|
"""Emit graft message, sent to to_peer, for topic."""
|
||||||
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
|
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
|
||||||
prune_msg.topicID = topic
|
prune_msg.topicID = topic
|
||||||
|
|
||||||
back_off_duration = self.back_off
|
back_off_duration = self.prune_back_off
|
||||||
if is_unsubscribe:
|
if is_unsubscribe:
|
||||||
back_off_duration = self.unsubscribe_back_off
|
back_off_duration = self.unsubscribe_back_off
|
||||||
|
|
||||||
@ -862,7 +930,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
|
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
|
||||||
control_msg.prune.extend([prune_msg])
|
control_msg.prune.extend([prune_msg])
|
||||||
|
|
||||||
await self.emit_control_message(control_msg, id)
|
await self.emit_control_message(control_msg, to_peer)
|
||||||
|
|
||||||
async def emit_control_message(
|
async def emit_control_message(
|
||||||
self, control_msg: rpc_pb2.ControlMessage, to_peer: ID
|
self, control_msg: rpc_pb2.ControlMessage, to_peer: ID
|
||||||
|
|||||||
@ -134,7 +134,7 @@ async def test_handle_graft(monkeypatch):
|
|||||||
# check if it is called in `handle_graft`
|
# check if it is called in `handle_graft`
|
||||||
event_emit_prune = trio.Event()
|
event_emit_prune = trio.Event()
|
||||||
|
|
||||||
async def emit_prune(topic, sender_peer_id):
|
async def emit_prune(topic, sender_peer_id, do_px, is_unsubscribe):
|
||||||
event_emit_prune.set()
|
event_emit_prune.set()
|
||||||
await trio.lowlevel.checkpoint()
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
@ -193,7 +193,7 @@ async def test_handle_prune():
|
|||||||
|
|
||||||
# alice emit prune message to bob, alice should be removed
|
# alice emit prune message to bob, alice should be removed
|
||||||
# from bob's mesh peer
|
# from bob's mesh peer
|
||||||
await gossipsubs[index_alice].emit_prune(topic, id_bob)
|
await gossipsubs[index_alice].emit_prune(topic, id_bob, False, False)
|
||||||
# `emit_prune` does not remove bob from alice's mesh peers
|
# `emit_prune` does not remove bob from alice's mesh peers
|
||||||
assert id_bob in gossipsubs[index_alice].mesh[topic]
|
assert id_bob in gossipsubs[index_alice].mesh[topic]
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user