Merge pull request #373 from NIC619/refactor_and_cleanup_gossipsub

Refactor and cleanup gossipsub
This commit is contained in:
NIC Lin
2019-12-07 16:28:55 +08:00
committed by GitHub
5 changed files with 322 additions and 93 deletions

View File

@ -1,8 +1,9 @@
from ast import literal_eval from ast import literal_eval
import asyncio import asyncio
from collections import defaultdict
import logging import logging
import random import random
from typing import Any, Dict, Iterable, List, Sequence, Set from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Tuple
from libp2p.network.stream.exceptions import StreamClosed from libp2p.network.stream.exceptions import StreamClosed
from libp2p.peer.id import ID from libp2p.peer.id import ID
@ -43,6 +44,7 @@ class GossipSub(IPubsubRouter):
mcache: MessageCache mcache: MessageCache
heartbeat_initial_delay: float
heartbeat_interval: int heartbeat_interval: int
def __init__( def __init__(
@ -54,6 +56,7 @@ class GossipSub(IPubsubRouter):
time_to_live: int, time_to_live: int,
gossip_window: int = 3, gossip_window: int = 3,
gossip_history: int = 5, gossip_history: int = 5,
heartbeat_initial_delay: float = 0.1,
heartbeat_interval: int = 120, heartbeat_interval: int = 120,
) -> None: ) -> None:
self.protocols = list(protocols) self.protocols = list(protocols)
@ -84,6 +87,7 @@ class GossipSub(IPubsubRouter):
self.mcache = MessageCache(gossip_window, gossip_history) self.mcache = MessageCache(gossip_window, gossip_history)
# Create heartbeat timer # Create heartbeat timer
self.heartbeat_initial_delay = heartbeat_initial_delay
self.heartbeat_interval = heartbeat_interval self.heartbeat_interval = heartbeat_interval
# Interface functions # Interface functions
@ -106,7 +110,6 @@ class GossipSub(IPubsubRouter):
logger.debug("attached to pusub") logger.debug("attached to pusub")
# Start heartbeat now that we have a pubsub instance # Start heartbeat now that we have a pubsub instance
# TODO: Start after delay
asyncio.ensure_future(self.heartbeat()) asyncio.ensure_future(self.heartbeat())
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
@ -127,8 +130,7 @@ class GossipSub(IPubsubRouter):
# instance in multistream-select, but it is not the protocol that gossipsub supports. # instance in multistream-select, but it is not the protocol that gossipsub supports.
# In this case, probably we registered gossipsub to a wrong `protocol_id` # In this case, probably we registered gossipsub to a wrong `protocol_id`
# in multistream-select, or wrong versions. # in multistream-select, or wrong versions.
# TODO: Better handling raise Exception(f"Unreachable: Protocol={protocol_id} is not supported.")
raise Exception(f"protocol is not supported: protocol_id={protocol_id}")
self.peers_to_protocol[peer_id] = protocol_id self.peers_to_protocol[peer_id] = protocol_id
def remove_peer(self, peer_id: ID) -> None: def remove_peer(self, peer_id: ID) -> None:
@ -144,6 +146,15 @@ class GossipSub(IPubsubRouter):
elif peer_id in self.peers_floodsub: elif peer_id in self.peers_floodsub:
self.peers_floodsub.remove(peer_id) self.peers_floodsub.remove(peer_id)
for topic in self.mesh:
if peer_id in self.mesh[topic]:
# Delete the entry if no other peers left
self.mesh[topic].remove(peer_id)
for topic in self.fanout:
if peer_id in self.fanout[topic]:
# Delete the entry if no other peers left
self.fanout[topic].remove(peer_id)
self.peers_to_protocol.pop(peer_id, None) self.peers_to_protocol.pop(peer_id, None)
async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
@ -204,42 +215,46 @@ class GossipSub(IPubsubRouter):
:param origin: peer id of the peer the message originate from. :param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to. :return: a generator of the peer ids who we send data to.
""" """
send_to: Set[ID] = set() send_to: List[ID] = []
for topic in topic_ids: for topic in topic_ids:
if topic not in self.pubsub.peer_topics: if topic not in self.pubsub.peer_topics:
continue continue
# floodsub peers # floodsub peers
for peer_id in self.pubsub.peer_topics[topic]: # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
# FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go. # This will improve the efficiency when searching for a peer's protocol id.
# This will improve the efficiency when searching for a peer's protocol id. floodsub_peers: List[ID] = [
if peer_id in self.peers_floodsub: peer_id
send_to.add(peer_id) for peer_id in self.pubsub.peer_topics[topic]
if peer_id in self.peers_floodsub
]
# gossipsub peers # gossipsub peers
in_topic_gossipsub_peers: List[ID] = None gossipsub_peers: List[ID] = []
# TODO: Do we need to check `topic in self.pubsub.my_topics`?
if topic in self.mesh: if topic in self.mesh:
in_topic_gossipsub_peers = self.mesh[topic] gossipsub_peers = self.mesh[topic]
else: else:
# TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not # When we publish to a topic that we have not subscribe to, we randomly pick
# subscribed? # `self.degree` number of peers who have subscribed to the topic and add them
# I assume there could be short periods between heartbeats where topic may not # as our `fanout` peers.
# be but we should check that this path gets hit appropriately topic_in_fanout: bool = topic in self.fanout
fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else []
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): fanout_size = len(fanout_peers)
# If no peers in fanout, choose some peers from gossipsub peers in topic. if not topic_in_fanout or (
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( topic_in_fanout and fanout_size < self.degree
topic, self.degree, [] ):
) if topic in self.pubsub.peer_topics:
in_topic_gossipsub_peers = self.fanout[topic] # Combine fanout peers with selected peers
for peer_id in in_topic_gossipsub_peers: fanout_peers += self._get_in_topic_gossipsub_peers_from_minus(
send_to.add(peer_id) topic, self.degree - fanout_size, fanout_peers
)
self.fanout[topic] = fanout_peers
gossipsub_peers = fanout_peers
send_to.extend(floodsub_peers + gossipsub_peers)
# Excludes `msg_forwarder` and `origin` # Excludes `msg_forwarder` and `origin`
yield from send_to.difference([msg_forwarder, origin]) yield from set(send_to).difference([msg_forwarder, origin])
async def join(self, topic: str) -> None: async def join(self, topic: str) -> None:
# Note: the comments here are the near-exact algorithm description from the spec
""" """
Join notifies the router that we want to receive and forward messages Join notifies the router that we want to receive and forward messages
in a topic. It is invoked after the subscription announcement. in a topic. It is invoked after the subscription announcement.
@ -269,9 +284,8 @@ class GossipSub(IPubsubRouter):
# Add fanout peers to mesh and notifies them with a GRAFT(topic) control message. # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message.
for peer in fanout_peers: for peer in fanout_peers:
if peer not in self.mesh[topic]: self.mesh[topic].append(peer)
self.mesh[topic].append(peer) await self.emit_graft(topic, peer)
await self.emit_graft(topic, peer)
self.fanout.pop(topic, None) self.fanout.pop(topic, None)
@ -292,7 +306,75 @@ class GossipSub(IPubsubRouter):
await self.emit_prune(topic, peer) await self.emit_prune(topic, peer)
# Forget mesh[topic] # Forget mesh[topic]
del self.mesh[topic] self.mesh.pop(topic, None)
async def _emit_control_msgs(
self,
peers_to_graft: Dict[ID, List[str]],
peers_to_prune: Dict[ID, List[str]],
peers_to_gossip: Dict[ID, Dict[str, List[str]]],
) -> None:
graft_msgs: List[rpc_pb2.ControlGraft] = []
prune_msgs: List[rpc_pb2.ControlPrune] = []
ihave_msgs: List[rpc_pb2.ControlIHave] = []
# Starting with GRAFT messages
for peer, topics in peers_to_graft.items():
for topic in topics:
graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft(topicID=topic)
graft_msgs.append(graft_msg)
# If there are also PRUNE messages to send to this peer
if peer in peers_to_prune:
for topic in peers_to_prune[peer]:
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune(
topicID=topic
)
prune_msgs.append(prune_msg)
del peers_to_prune[peer]
# If there are also IHAVE messages to send to this peer
if peer in peers_to_gossip:
for topic in peers_to_gossip[peer]:
ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave(
messageIDs=peers_to_gossip[peer][topic], topicID=topic
)
ihave_msgs.append(ihave_msg)
del peers_to_gossip[peer]
control_msg = self.pack_control_msgs(ihave_msgs, graft_msgs, prune_msgs)
await self.emit_control_message(control_msg, peer)
# Next with PRUNE messages
for peer, topics in peers_to_prune.items():
prune_msgs = []
for topic in topics:
prune_msg = rpc_pb2.ControlPrune(topicID=topic)
prune_msgs.append(prune_msg)
# If there are also IHAVE messages to send to this peer
if peer in peers_to_gossip:
ihave_msgs = []
for topic in peers_to_gossip[peer]:
ihave_msg = rpc_pb2.ControlIHave(
messageIDs=peers_to_gossip[peer][topic], topicID=topic
)
ihave_msgs.append(ihave_msg)
del peers_to_gossip[peer]
control_msg = self.pack_control_msgs(ihave_msgs, None, prune_msgs)
await self.emit_control_message(control_msg, peer)
# Fianlly IHAVE messages
for peer in peers_to_gossip:
ihave_msgs = []
for topic in peers_to_gossip[peer]:
ihave_msg = rpc_pb2.ControlIHave(
messageIDs=peers_to_gossip[peer][topic], topicID=topic
)
ihave_msgs.append(ihave_msg)
control_msg = self.pack_control_msgs(ihave_msgs, None, None)
await self.emit_control_message(control_msg, peer)
# Heartbeat # Heartbeat
async def heartbeat(self) -> None: async def heartbeat(self) -> None:
@ -302,16 +384,29 @@ class GossipSub(IPubsubRouter):
Note: the heartbeats are called with awaits because each heartbeat depends on the Note: the heartbeats are called with awaits because each heartbeat depends on the
state changes in the preceding heartbeat state changes in the preceding heartbeat
""" """
# Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501
await asyncio.sleep(self.heartbeat_initial_delay)
while True: while True:
# Maintain mesh and keep track of which peers to send GRAFT or PRUNE to
peers_to_graft, peers_to_prune = self.mesh_heartbeat()
# Maintain fanout
self.fanout_heartbeat()
# Get the peers to send IHAVE to
peers_to_gossip = self.gossip_heartbeat()
# Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and send it
await self._emit_control_msgs(
peers_to_graft, peers_to_prune, peers_to_gossip
)
await self.mesh_heartbeat() self.mcache.shift()
await self.fanout_heartbeat()
await self.gossip_heartbeat()
await asyncio.sleep(self.heartbeat_interval) await asyncio.sleep(self.heartbeat_interval)
async def mesh_heartbeat(self) -> None: def mesh_heartbeat(
# Note: the comments here are the exact pseudocode from the spec self
) -> Tuple[DefaultDict[ID, List[str]], DefaultDict[ID, List[str]]]:
peers_to_graft: DefaultDict[ID, List[str]] = defaultdict(list)
peers_to_prune: DefaultDict[ID, List[str]] = defaultdict(list)
for topic in self.mesh: for topic in self.mesh:
# Skip if no peers have subscribed to the topic # Skip if no peers have subscribed to the topic
if topic not in self.pubsub.peer_topics: if topic not in self.pubsub.peer_topics:
@ -324,15 +419,12 @@ class GossipSub(IPubsubRouter):
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic] topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
) )
fanout_peers_not_in_mesh: List[ID] = [ for peer in selected_peers:
peer for peer in selected_peers if peer not in self.mesh[topic]
]
for peer in fanout_peers_not_in_mesh:
# Add peer to mesh[topic] # Add peer to mesh[topic]
self.mesh[topic].append(peer) self.mesh[topic].append(peer)
# Emit GRAFT(topic) control message to peer # Emit GRAFT(topic) control message to peer
await self.emit_graft(topic, peer) peers_to_graft[peer].append(topic)
if num_mesh_peers_in_topic > self.degree_high: if num_mesh_peers_in_topic > self.degree_high:
# Select |mesh[topic]| - D peers from mesh[topic] # Select |mesh[topic]| - D peers from mesh[topic]
@ -344,18 +436,31 @@ class GossipSub(IPubsubRouter):
self.mesh[topic].remove(peer) self.mesh[topic].remove(peer)
# Emit PRUNE(topic) control message to peer # Emit PRUNE(topic) control message to peer
await self.emit_prune(topic, peer) peers_to_prune[peer].append(topic)
return peers_to_graft, peers_to_prune
async def fanout_heartbeat(self) -> None: def fanout_heartbeat(self) -> None:
# Note: the comments here are the exact pseudocode from the spec # Note: the comments here are the exact pseudocode from the spec
for topic in self.fanout: for topic in self.fanout:
# If time since last published > ttl # Delete topic entry if it's not in `pubsub.peer_topics`
# or if it's time-since-last-published > ttl
# TODO: there's no way time_since_last_publish gets set anywhere yet # TODO: there's no way time_since_last_publish gets set anywhere yet
if self.time_since_last_publish[topic] > self.time_to_live: if (
topic not in self.pubsub.peer_topics
or self.time_since_last_publish[topic] > self.time_to_live
):
# Remove topic from fanout # Remove topic from fanout
del self.fanout[topic] del self.fanout[topic]
del self.time_since_last_publish[topic] del self.time_since_last_publish[topic]
else: else:
# Check if fanout peers are still in the topic and remove the ones that are not
# ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501
in_topic_fanout_peers = [
peer
for peer in self.fanout[topic]
if peer in self.pubsub.peer_topics[topic]
]
self.fanout[topic] = in_topic_fanout_peers
num_fanout_peers_in_topic = len(self.fanout[topic]) num_fanout_peers_in_topic = len(self.fanout[topic])
# If |fanout[topic]| < D # If |fanout[topic]| < D
@ -369,51 +474,37 @@ class GossipSub(IPubsubRouter):
# Add the peers to fanout[topic] # Add the peers to fanout[topic]
self.fanout[topic].extend(selected_peers) self.fanout[topic].extend(selected_peers)
async def gossip_heartbeat(self) -> None: def gossip_heartbeat(self) -> DefaultDict[ID, Dict[str, List[str]]]:
peers_to_gossip: DefaultDict[ID, Dict[str, List[str]]] = defaultdict(dict)
for topic in self.mesh: for topic in self.mesh:
msg_ids = self.mcache.window(topic) msg_ids = self.mcache.window(topic)
if msg_ids: if msg_ids:
# TODO: Make more efficient, possibly using a generator?
# Get all pubsub peers in a topic and only add them if they are gossipsub peers too # Get all pubsub peers in a topic and only add them if they are gossipsub peers too
if topic in self.pubsub.peer_topics: if topic in self.pubsub.peer_topics:
# Select D peers from peers.gossipsub[topic] # Select D peers from peers.gossipsub[topic]
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree, [] topic, self.degree, self.mesh[topic]
) )
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
for peer in peers_to_emit_ihave_to: for peer in peers_to_emit_ihave_to:
# TODO: this line is a monster, can hopefully be simplified peers_to_gossip[peer][topic] = msg_id_strs
if (
topic not in self.mesh or (peer not in self.mesh[topic])
) and (
topic not in self.fanout or (peer not in self.fanout[topic])
):
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
await self.emit_ihave(topic, msg_id_strs, peer)
# TODO: Refactor and Dedup. This section is the roughly the same as the above. # TODO: Refactor and Dedup. This section is the roughly the same as the above.
# Do the same for fanout, for all topics not already hit in mesh # Do the same for fanout, for all topics not already hit in mesh
for topic in self.fanout: for topic in self.fanout:
if topic not in self.mesh: msg_ids = self.mcache.window(topic)
msg_ids = self.mcache.window(topic) if msg_ids:
if msg_ids: # Get all pubsub peers in topic and only add if they are gossipsub peers also
# TODO: Make more efficient, possibly using a generator? if topic in self.pubsub.peer_topics:
# Get all pubsub peers in topic and only add if they are gossipsub peers also # Select D peers from peers.gossipsub[topic]
if topic in self.pubsub.peer_topics: peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
# Select D peers from peers.gossipsub[topic] topic, self.degree, self.fanout[topic]
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( )
topic, self.degree, [] msg_id_strs = [str(msg) for msg in msg_ids]
) for peer in peers_to_emit_ihave_to:
for peer in peers_to_emit_ihave_to: peers_to_gossip[peer][topic] = msg_id_strs
if ( return peers_to_gossip
peer not in self.mesh[topic]
and peer not in self.fanout[topic]
):
msg_id_strs = [str(msg) for msg in msg_ids]
await self.emit_ihave(topic, msg_id_strs, peer)
self.mcache.shift()
@staticmethod @staticmethod
def select_from_minus( def select_from_minus(
@ -548,6 +639,21 @@ class GossipSub(IPubsubRouter):
# RPC emitters # RPC emitters
def pack_control_msgs(
self,
ihave_msgs: List[rpc_pb2.ControlIHave],
graft_msgs: List[rpc_pb2.ControlGraft],
prune_msgs: List[rpc_pb2.ControlPrune],
) -> rpc_pb2.ControlMessage:
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
if ihave_msgs:
control_msg.ihave.extend(ihave_msgs)
if graft_msgs:
control_msg.graft.extend(graft_msgs)
if prune_msgs:
control_msg.prune.extend(prune_msgs)
return control_msg
async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None: async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None:
"""Emit ihave message, sent to to_peer, for topic and msg_ids.""" """Emit ihave message, sent to to_peer, for topic and msg_ids."""

View File

@ -78,7 +78,6 @@ class Pubsub:
topic_validators: Dict[str, TopicValidator] topic_validators: Dict[str, TopicValidator]
# TODO: Be sure it is increased atomically everytime.
counter: int # uint64 counter: int # uint64
_tasks: List["asyncio.Future[Any]"] _tasks: List["asyncio.Future[Any]"]
@ -300,7 +299,6 @@ class Pubsub:
logger.debug("Fail to add new peer %s: stream closed", peer_id) logger.debug("Fail to add new peer %s: stream closed", peer_id)
del self.peers[peer_id] del self.peers[peer_id]
return return
# TODO: Check EOF of this stream.
# TODO: Check if the peer in black list. # TODO: Check if the peer in black list.
try: try:
self.router.add_peer(peer_id, stream.get_protocol()) self.router.add_peer(peer_id, stream.get_protocol())
@ -318,6 +316,7 @@ class Pubsub:
for topic in self.peer_topics: for topic in self.peer_topics:
if peer_id in self.peer_topics[topic]: if peer_id in self.peer_topics[topic]:
# Delete the entry if no other peers left
self.peer_topics[topic].remove(peer_id) self.peer_topics[topic].remove(peer_id)
self.router.remove_peer(peer_id) self.router.remove_peer(peer_id)
@ -325,12 +324,9 @@ class Pubsub:
logger.debug("removed dead peer %s", peer_id) logger.debug("removed dead peer %s", peer_id)
async def handle_peer_queue(self) -> None: async def handle_peer_queue(self) -> None:
""" """Continuously read from peer queue and each time a new peer is found,
Continuously read from peer queue and each time a new peer is found, open a stream to the peer using a supported pubsub protocol pubsub
open a stream to the peer using a supported pubsub protocol protocols we support."""
TODO: Handle failure for when the peer does not support any of the
pubsub protocols we support
"""
while True: while True:
peer_id: ID = await self.peer_queue.get() peer_id: ID = await self.peer_queue.get()
# Add Peer # Add Peer
@ -364,6 +360,7 @@ class Pubsub:
else: else:
if sub_message.topicid in self.peer_topics: if sub_message.topicid in self.peer_topics:
if origin_id in self.peer_topics[sub_message.topicid]: if origin_id in self.peer_topics[sub_message.topicid]:
# Delete the entry if no other peers left
self.peer_topics[sub_message.topicid].remove(origin_id) self.peer_topics[sub_message.topicid].remove(origin_id)
# FIXME(mhchia): Change the function name? # FIXME(mhchia): Change the function name?

View File

@ -24,6 +24,7 @@ class GossipsubParams(NamedTuple):
time_to_live: int = 30 time_to_live: int = 30
gossip_window: int = 3 gossip_window: int = 3
gossip_history: int = 5 gossip_history: int = 5
heartbeat_initial_delay: float = 0.1
heartbeat_interval: float = 0.5 heartbeat_interval: float = 0.5

View File

@ -142,6 +142,7 @@ class GossipsubFactory(factory.Factory):
time_to_live = GOSSIPSUB_PARAMS.time_to_live time_to_live = GOSSIPSUB_PARAMS.time_to_live
gossip_window = GOSSIPSUB_PARAMS.gossip_window gossip_window = GOSSIPSUB_PARAMS.gossip_window
gossip_history = GOSSIPSUB_PARAMS.gossip_history gossip_history = GOSSIPSUB_PARAMS.gossip_history
heartbeat_initial_delay = GOSSIPSUB_PARAMS.heartbeat_initial_delay
heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval

View File

@ -3,7 +3,8 @@ import random
import pytest import pytest
from libp2p.tools.constants import GossipsubParams from libp2p.peer.id import ID
from libp2p.tools.constants import GOSSIPSUB_PARAMS, GossipsubParams
from libp2p.tools.pubsub.utils import dense_connect, one_to_all_connect from libp2p.tools.pubsub.utils import dense_connect, one_to_all_connect
from libp2p.tools.utils import connect from libp2p.tools.utils import connect
@ -147,8 +148,8 @@ async def test_handle_prune(pubsubs_gsub, hosts):
await connect(hosts[index_alice], hosts[index_bob]) await connect(hosts[index_alice], hosts[index_bob])
# Wait 3 seconds for heartbeat to allow mesh to connect # Wait for heartbeat to allow mesh to connect
await asyncio.sleep(3) await asyncio.sleep(1)
# Check that they are each other's mesh peer # Check that they are each other's mesh peer
assert id_alice in gossipsubs[index_bob].mesh[topic] assert id_alice in gossipsubs[index_bob].mesh[topic]
@ -157,15 +158,16 @@ async def test_handle_prune(pubsubs_gsub, hosts):
# alice emit prune message to bob, alice should be removed # alice emit prune message to bob, alice should be removed
# from bob's mesh peer # from bob's mesh peer
await gossipsubs[index_alice].emit_prune(topic, id_bob) await gossipsubs[index_alice].emit_prune(topic, id_bob)
# `emit_prune` does not remove bob from alice's mesh peers
assert id_bob in gossipsubs[index_alice].mesh[topic]
# FIXME: This test currently works because the heartbeat interval # NOTE: We increase `heartbeat_interval` to 3 seconds so that bob will not
# is increased to 3 seconds, so alice won't get add back into # add alice back to his mesh after heartbeat.
# bob's mesh peer during heartbeat. # Wait for bob to `handle_prune`
await asyncio.sleep(1) await asyncio.sleep(0.1)
# Check that alice is no longer bob's mesh peer # Check that alice is no longer bob's mesh peer
assert id_alice not in gossipsubs[index_bob].mesh[topic] assert id_alice not in gossipsubs[index_bob].mesh[topic]
assert id_bob in gossipsubs[index_alice].mesh[topic]
@pytest.mark.parametrize("num_hosts", (10,)) @pytest.mark.parametrize("num_hosts", (10,))
@ -366,3 +368,125 @@ async def test_gossip_propagation(hosts, pubsubs_gsub):
# should be able to read message # should be able to read message
msg = await queue_1.get() msg = await queue_1.get()
assert msg.data == msg_content assert msg.data == msg_content
@pytest.mark.parametrize(
"num_hosts, gossipsub_params", ((1, GossipsubParams(heartbeat_initial_delay=100)),)
)
@pytest.mark.parametrize("initial_mesh_peer_count", (7, 10, 13))
@pytest.mark.asyncio
async def test_mesh_heartbeat(
num_hosts, initial_mesh_peer_count, pubsubs_gsub, hosts, monkeypatch
):
# It's difficult to set up the initial peer subscription condition.
# Ideally I would like to have initial mesh peer count that's below ``GossipSubDegree``
# so I can test if `mesh_heartbeat` return correct peers to GRAFT.
# The problem is that I can not set it up so that we have peers subscribe to the topic
# but not being part of our mesh peers (as these peers are the peers to GRAFT).
# So I monkeypatch the peer subscriptions and our mesh peers.
total_peer_count = 14
topic = "TEST_MESH_HEARTBEAT"
fake_peer_ids = [
ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count)
]
monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids)
peer_topics = {topic: fake_peer_ids}
# Monkeypatch the peer subscriptions
monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics)
mesh_peer_indices = random.sample(range(total_peer_count), initial_mesh_peer_count)
mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices]
router_mesh = {topic: list(mesh_peers)}
# Monkeypatch our mesh peers
monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh)
peers_to_graft, peers_to_prune = pubsubs_gsub[0].router.mesh_heartbeat()
if initial_mesh_peer_count > GOSSIPSUB_PARAMS.degree:
# If number of initial mesh peers is more than `GossipSubDegree`, we should PRUNE mesh peers
assert len(peers_to_graft) == 0
assert len(peers_to_prune) == initial_mesh_peer_count - GOSSIPSUB_PARAMS.degree
for peer in peers_to_prune:
assert peer in mesh_peers
elif initial_mesh_peer_count < GOSSIPSUB_PARAMS.degree:
# If number of initial mesh peers is less than `GossipSubDegree`, we should GRAFT more peers
assert len(peers_to_prune) == 0
assert len(peers_to_graft) == GOSSIPSUB_PARAMS.degree - initial_mesh_peer_count
for peer in peers_to_graft:
assert peer not in mesh_peers
else:
assert len(peers_to_prune) == 0 and len(peers_to_graft) == 0
@pytest.mark.parametrize(
"num_hosts, gossipsub_params", ((1, GossipsubParams(heartbeat_initial_delay=100)),)
)
@pytest.mark.parametrize("initial_peer_count", (1, 4, 7))
@pytest.mark.asyncio
async def test_gossip_heartbeat(
num_hosts, initial_peer_count, pubsubs_gsub, hosts, monkeypatch
):
# The problem is that I can not set it up so that we have peers subscribe to the topic
# but not being part of our mesh peers (as these peers are the peers to GRAFT).
# So I monkeypatch the peer subscriptions and our mesh peers.
total_peer_count = 28
topic_mesh = "TEST_GOSSIP_HEARTBEAT_1"
topic_fanout = "TEST_GOSSIP_HEARTBEAT_2"
fake_peer_ids = [
ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count)
]
monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids)
topic_mesh_peer_count = 14
# Split into mesh peers and fanout peers
peer_topics = {
topic_mesh: fake_peer_ids[:topic_mesh_peer_count],
topic_fanout: fake_peer_ids[topic_mesh_peer_count:],
}
# Monkeypatch the peer subscriptions
monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics)
mesh_peer_indices = random.sample(range(topic_mesh_peer_count), initial_peer_count)
mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices]
router_mesh = {topic_mesh: list(mesh_peers)}
# Monkeypatch our mesh peers
monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh)
fanout_peer_indices = random.sample(
range(topic_mesh_peer_count, total_peer_count), initial_peer_count
)
fanout_peers = [fake_peer_ids[i] for i in fanout_peer_indices]
router_fanout = {topic_fanout: list(fanout_peers)}
# Monkeypatch our fanout peers
monkeypatch.setattr(pubsubs_gsub[0].router, "fanout", router_fanout)
def window(topic):
if topic == topic_mesh:
return [topic_mesh]
elif topic == topic_fanout:
return [topic_fanout]
else:
return []
# Monkeypatch the memory cache messages
monkeypatch.setattr(pubsubs_gsub[0].router.mcache, "window", window)
peers_to_gossip = pubsubs_gsub[0].router.gossip_heartbeat()
# If our mesh peer count is less than `GossipSubDegree`, we should gossip to up to
# `GossipSubDegree` peers (exclude mesh peers).
if topic_mesh_peer_count - initial_peer_count < GOSSIPSUB_PARAMS.degree:
# The same goes for fanout so it's two times the number of peers to gossip.
assert len(peers_to_gossip) == 2 * (topic_mesh_peer_count - initial_peer_count)
elif topic_mesh_peer_count - initial_peer_count >= GOSSIPSUB_PARAMS.degree:
assert len(peers_to_gossip) == 2 * (GOSSIPSUB_PARAMS.degree)
for peer in peers_to_gossip:
if peer in peer_topics[topic_mesh]:
# Check that the peer to gossip to is not in our mesh peers
assert peer not in mesh_peers
assert topic_mesh in peers_to_gossip[peer]
elif peer in peer_topics[topic_fanout]:
# Check that the peer to gossip to is not in our fanout peers
assert peer not in fanout_peers
assert topic_fanout in peers_to_gossip[peer]