diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 14394c71..4b160203 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -35,7 +35,7 @@ class FloodSub(IPubsubRouter): :param peer_id: id of peer to remove """ - def handle_rpc(self, rpc): + async def handle_rpc(self, rpc, sender_peer_id): """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed @@ -64,6 +64,11 @@ class FloodSub(IPubsubRouter): for message in packet.publish: decoded_from_id = message.from_id.decode('utf-8') if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): + id_in_seen_msgs = (message.seqno, message.from_id) + + if id_in_seen_msgs not in self.pubsub.seen_messages: + self.pubsub.seen_messages[id_in_seen_msgs] = 1 + await self.pubsub.handle_talk(message) # Deliver to self and peers @@ -81,7 +86,7 @@ class FloodSub(IPubsubRouter): # Publish the packet await stream.write(new_packet.SerializeToString()) - def join(self, topic): + async def join(self, topic): """ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the @@ -89,7 +94,7 @@ class FloodSub(IPubsubRouter): :param topic: topic to join """ - def leave(self, topic): + async def leave(self, topic): """ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement. diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py new file mode 100644 index 00000000..8524ca6c --- /dev/null +++ b/libp2p/pubsub/gossipsub.py @@ -0,0 +1,543 @@ +import random +import asyncio + +from ast import literal_eval +from .pb import rpc_pb2 +from .pubsub_router_interface import IPubsubRouter +from .mcache import MessageCache + + +class GossipSub(IPubsubRouter): + # pylint: disable=no-member + # pylint: disable=too-many-instance-attributes + # pylint: disable=too-many-public-methods + + def __init__(self, protocols, degree, degree_low, degree_high, time_to_live, gossip_window=3, + gossip_history=5, heartbeat_interval=120): + # pylint: disable=too-many-arguments + self.protocols = protocols + self.pubsub = None + + # Store target degree, upper degree bound, and lower degree bound + self.degree = degree + self.degree_high = degree_high + self.degree_low = degree_low + + # Store time to live (for topics in fanout) + self.time_to_live = time_to_live + + # Create topic --> list of peers mappings + self.mesh = {} + self.fanout = {} + + # Create topic --> time since last publish map + self.time_since_last_publish = {} + + self.peers_gossipsub = [] + self.peers_floodsub = [] + + # Create message cache + self.mcache = MessageCache(gossip_window, gossip_history) + + # Create heartbeat timer + self.heartbeat_interval = heartbeat_interval + + # Interface functions + + def get_protocols(self): + """ + :return: the list of protocols supported by the router + """ + return self.protocols + + def attach(self, pubsub): + """ + Attach is invoked by the PubSub constructor to attach the router to a + freshly initialized PubSub instance. 
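+        Note: attaching also schedules the router's heartbeat loop with
+        asyncio.ensure_future, so attach must run under an active event loop.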
+        :param pubsub: pubsub instance to attach to
+        """
+        self.pubsub = pubsub
+
+        # Start heartbeat now that we have a pubsub instance
+        # TODO: Start after delay
+        asyncio.ensure_future(self.heartbeat())
+
+    def add_peer(self, peer_id, protocol_id):
+        """
+        Notifies the router that a new peer has been connected
+        :param peer_id: id of peer to add
+        :param protocol_id: protocol the peer speaks, used to classify it
+        """
+
+        # Add peer to the correct peer list
+        peer_type = GossipSub.get_peer_type(protocol_id)
+        peer_id_str = str(peer_id)
+        if peer_type == "gossip":
+            self.peers_gossipsub.append(peer_id_str)
+        elif peer_type == "flood":
+            self.peers_floodsub.append(peer_id_str)
+
+    def remove_peer(self, peer_id):
+        """
+        Notifies the router that a peer has been disconnected
+        :param peer_id: id of peer to remove
+        """
+        peer_id_str = str(peer_id)
+        if peer_id_str in self.peers_gossipsub:
+            self.peers_gossipsub.remove(peer_id_str)
+        elif peer_id_str in self.peers_floodsub:
+            self.peers_floodsub.remove(peer_id_str)
+
+    async def handle_rpc(self, rpc, sender_peer_id):
+        """
+        Invoked to process control messages in the RPC envelope.
+        It is invoked after subscriptions and payload messages have been processed
+        :param rpc: rpc message
+        :param sender_peer_id: id of the peer the rpc was received from
+        """
+        control_message = rpc.control
+
+        # Relay each rpc control message to the appropriate handler
+        if control_message.ihave:
+            for ihave in control_message.ihave:
+                await self.handle_ihave(ihave, sender_peer_id)
+        if control_message.iwant:
+            for iwant in control_message.iwant:
+                await self.handle_iwant(iwant, sender_peer_id)
+        if control_message.graft:
+            for graft in control_message.graft:
+                await self.handle_graft(graft, sender_peer_id)
+        if control_message.prune:
+            for prune in control_message.prune:
+                await self.handle_prune(prune, sender_peer_id)
+
+    async def publish(self, sender_peer_id, rpc_message):
+        # pylint: disable=too-many-locals
+        """
+        Invoked to forward a new message that has been validated.
+        :param sender_peer_id: id of the peer the message was received from
+        :param rpc_message: serialized RPC packet containing the message
+        """
+
+        packet = rpc_pb2.RPC()
+        packet.ParseFromString(rpc_message)
+        msg_sender = str(sender_peer_id)
+
+        # Note: handle_talk checks if self is subscribed to topics in the message
+        for message in packet.publish:
+            # Add RPC message to cache
+            self.mcache.put(message)
+
+            decoded_from_id = message.from_id.decode('utf-8')
+            new_packet = rpc_pb2.RPC()
+            new_packet.publish.extend([message])
+            new_packet_serialized = new_packet.SerializeToString()
+
+            # Deliver to self if self was the origin and the message is not yet seen
+            if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()):
+                id_in_seen_msgs = (message.seqno, message.from_id)
+
+                if id_in_seen_msgs not in self.pubsub.seen_messages:
+                    self.pubsub.seen_messages[id_in_seen_msgs] = 1
+
+                    await self.pubsub.handle_talk(message)
+
+            # Deliver to peers
+            for topic in message.topicIDs:
+                # If topic has floodsub peers, deliver to them
+                # TODO: This filtering pass can be done more efficiently
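+                # Sketch of the delivery rule implemented below:
+                #   floodsub peers in topic -> always receive the message
+                #   mesh[topic]             -> used if we are subscribed to topic
+                #   fanout[topic]           -> used if we are not subscribed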
+                floodsub_peers_in_topic = []
+                if topic in self.pubsub.peer_topics:
+                    for peer in self.pubsub.peer_topics[topic]:
+                        if str(peer) in self.peers_floodsub:
+                            floodsub_peers_in_topic.append(peer)
+
+                await self.deliver_messages_to_peers(floodsub_peers_in_topic, msg_sender,
+                                                     decoded_from_id, new_packet_serialized)
+
+                # If we are subscribed to the topic, send to mesh; otherwise send to fanout
+                if topic in self.pubsub.my_topics and topic in self.mesh:
+                    await self.deliver_messages_to_peers(self.mesh[topic], msg_sender,
+                                                         decoded_from_id, new_packet_serialized)
+                else:
+                    # Send to fanout peers
+                    if topic not in self.fanout:
+                        # If no peers in fanout, choose some from the gossipsub peers in topic
+                        gossipsub_peers_in_topic = \
+                            [peer for peer in self.pubsub.peer_topics.get(topic, [])
+                             if peer in self.peers_gossipsub]
+
+                        selected = \
+                            GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, [])
+                        self.fanout[topic] = selected
+
+                    # TODO: Is topic DEFINITELY supposed to be in fanout if we are not subscribed?
+                    # There could be short periods between heartbeats where it is not, so we
+                    # should check that this path gets hit appropriately
+
+                    await self.deliver_messages_to_peers(self.fanout[topic], msg_sender,
+                                                         decoded_from_id, new_packet_serialized)
+
+    async def join(self, topic):
+        # Note: the comments here are the near-exact algorithm description from the spec
+        """
+        Join notifies the router that we want to receive and
+        forward messages in a topic. It is invoked after the
+        subscription announcement
+        :param topic: topic to join
+        """
+        # Create mesh[topic] if it does not yet exist
+        if topic not in self.mesh:
+            self.mesh[topic] = []
+
+        if topic in self.fanout and len(self.fanout[topic]) == self.degree:
+            # If the router already has D peers in the fanout of a topic
+            # TODO: Do we remove all peers from fanout[topic]?
+
+            # Add them to mesh[topic], and notify them with a
+            # GRAFT(topic) control message.
+            for peer in self.fanout[topic]:
+                self.mesh[topic].append(peer)
+                await self.emit_graft(topic, peer)
+        else:
+            # Otherwise, if there are fewer than D peers (let this number be x)
+            # in the fanout for a topic (or the topic is not in the fanout),
+            fanout_size = 0
+            if topic in self.fanout:
+                fanout_size = len(self.fanout[topic])
+                # then it still adds them as above (if there are any)
+                for peer in self.fanout[topic]:
+                    self.mesh[topic].append(peer)
+                    await self.emit_graft(topic, peer)
+
+            # Guard on peer_topics, not peers_gossipsub: peers_gossipsub holds
+            # peer ids rather than topics, and peer_topics[topic] is indexed below
+            if topic in self.pubsub.peer_topics:
+                # TODO: Should we have self.fanout[topic] here or [] (as the minus variable)?
+                # Select the remaining number of peers (D-x) from peers.gossipsub[topic]
+                gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
+                                            if peer in self.peers_gossipsub]
+                selected_peers = \
+                    GossipSub.select_from_minus(self.degree - fanout_size,
+                                                gossipsub_peers_in_topic,
+                                                self.fanout[topic] if topic in self.fanout else [])
+
+                # And likewise add them to mesh[topic] and notify them with a
+                # GRAFT(topic) control message.
+                for peer in selected_peers:
+                    self.mesh[topic].append(peer)
+                    await self.emit_graft(topic, peer)
+
+            # TODO: Do we remove all peers from fanout[topic]?
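+        Note: only mesh state is dropped here; any fanout entry for the topic
+        is expired separately by fanout_heartbeat.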
+        :param topic: topic to leave
+        """
+        if topic not in self.mesh:
+            return
+
+        # Notify the peers in mesh[topic] with a PRUNE(topic) message
+        for peer in self.mesh[topic]:
+            await self.emit_prune(topic, peer)
+
+        # Forget mesh[topic]
+        self.mesh.pop(topic, None)
+
+    # Interface Helper Functions
+    @staticmethod
+    def get_peer_type(protocol_id):
+        # TODO: Do this in a better, more efficient way
+        if "gossipsub" in protocol_id:
+            return "gossip"
+        if "floodsub" in protocol_id:
+            return "flood"
+        return "unknown"
+
+    async def deliver_messages_to_peers(self, peers, msg_sender, origin_id, serialized_packet):
+        for peer_id_in_topic in peers:
+            # Forward to all peers that are neither the
+            # message sender nor the message origin
+            if peer_id_in_topic not in (msg_sender, origin_id):
+                stream = self.pubsub.peers[peer_id_in_topic]
+
+                # Publish the packet
+                await stream.write(serialized_packet)
+
+    # Heartbeat
+    async def heartbeat(self):
+        """
+        Call the individual heartbeats.
+        Note: the heartbeats are awaited in sequence because each heartbeat
+        depends on the state changes made by the preceding one
+        """
+        while True:
+            await self.mesh_heartbeat()
+            await self.fanout_heartbeat()
+            await self.gossip_heartbeat()
+
+            await asyncio.sleep(self.heartbeat_interval)
+
+    async def mesh_heartbeat(self):
+        # Note: the comments here are the exact pseudocode from the spec
+        for topic in self.mesh:
+            num_mesh_peers_in_topic = len(self.mesh[topic])
+            if num_mesh_peers_in_topic < self.degree_low:
+                # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]
+                selected_peers = \
+                    GossipSub.select_from_minus(self.degree - num_mesh_peers_in_topic,
+                                                self.peers_gossipsub, self.mesh[topic])
+
+                for peer in selected_peers:
+                    # Add peer to mesh[topic]
+                    self.mesh[topic].append(peer)
+
+                    # Emit GRAFT(topic) control message to peer
+                    await self.emit_graft(topic, peer)
+
+            if num_mesh_peers_in_topic > self.degree_high:
+                # Select |mesh[topic]| - D peers from mesh[topic]
+                selected_peers = \
+                    GossipSub.select_from_minus(num_mesh_peers_in_topic - self.degree,
+                                                self.mesh[topic], [])
+                for peer in selected_peers:
+                    # Remove peer from mesh[topic]
+                    self.mesh[topic].remove(peer)
+
+                    # Emit PRUNE(topic) control message to peer
+                    await self.emit_prune(topic, peer)
+
+    async def fanout_heartbeat(self):
+        # Note: the comments here are the exact pseudocode from the spec
+        # Iterate over a copy, since topics may be removed during iteration
+        for topic in list(self.fanout):
+            # If time since last published > ttl
+            # TODO: time_since_last_publish never gets set anywhere yet, hence the default
+            if self.time_since_last_publish.get(topic, 0) > self.time_to_live:
+                # Remove topic from fanout
+                del self.fanout[topic]
+                self.time_since_last_publish.pop(topic, None)
+            else:
+                num_fanout_peers_in_topic = len(self.fanout[topic])
+
+                # If |fanout[topic]| < D
+                if num_fanout_peers_in_topic < self.degree:
+                    # Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic]
+                    gossipsub_peers_in_topic = \
+                        [peer for peer in self.pubsub.peer_topics.get(topic, [])
+                         if peer in self.peers_gossipsub]
+                    selected_peers = \
+                        GossipSub.select_from_minus(self.degree - num_fanout_peers_in_topic,
+                                                    gossipsub_peers_in_topic, self.fanout[topic])
+
+                    # Add the peers to fanout[topic]
+                    self.fanout[topic].extend(selected_peers)
+
+    async def gossip_heartbeat(self):
+        # pylint: disable=too-many-nested-blocks
+        for topic in self.mesh:
+            msg_ids = self.mcache.window(topic)
+            if msg_ids:
+                # TODO: Make more efficient, possibly using a generator?
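+                # select_from_minus samples without replacement, e.g. (assuming
+                # plain string peer ids) select_from_minus(2, ["a", "b", "c"], ["b"])
+                # returns ["a", "c"] in random order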
+                # Get all pubsub peers in a topic and only add them if they are gossipsub peers too
+                if topic in self.pubsub.peer_topics:
+                    gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
+                                                if peer in self.peers_gossipsub]
+
+                    # Select D peers from peers.gossipsub[topic]
+                    peers_to_emit_ihave_to = \
+                        GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, [])
+
+                    # Stringify the message ids once, outside the peer loop
+                    msg_ids = [str(msg) for msg in msg_ids]
+                    for peer in peers_to_emit_ihave_to:
+                        # Only gossip to peers in neither mesh[topic] nor fanout[topic]
+                        if (topic not in self.mesh or peer not in self.mesh[topic]) \
+                                and (topic not in self.fanout or peer not in self.fanout[topic]):
+                            await self.emit_ihave(topic, msg_ids, peer)
+
+        # Do the same for fanout, for all topics not already hit in mesh
+        for topic in self.fanout:
+            if topic not in self.mesh:
+                msg_ids = self.mcache.window(topic)
+                if msg_ids:
+                    # TODO: Make more efficient, possibly using a generator?
+                    # Get all pubsub peers in topic and only add if they are gossipsub peers also
+                    if topic in self.pubsub.peer_topics:
+                        gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
+                                                    if peer in self.peers_gossipsub]
+
+                        # Select D peers from peers.gossipsub[topic]
+                        peers_to_emit_ihave_to = \
+                            GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, [])
+
+                        msg_ids = [str(msg) for msg in msg_ids]
+                        for peer in peers_to_emit_ihave_to:
+                            # topic is guaranteed not in mesh in this branch,
+                            # so only the fanout membership check is needed
+                            if peer not in self.fanout[topic]:
+                                await self.emit_ihave(topic, msg_ids, peer)
+
+        self.mcache.shift()
+
+    @staticmethod
+    def select_from_minus(num_to_select, pool, minus):
+        """
+        Select at most num_to_select elements from the set (pool - minus) at random.
+        :param num_to_select: number of elements to randomly select
+        :param pool: list of items to select from (excluding elements in minus)
+        :param minus: elements to be excluded from the selection pool
+        :return: list of selected elements
+        """
+        # Create the selection pool, which is selection_pool = pool - minus
+        if minus:
+            # Create a new selection pool by removing elements of minus
+            selection_pool = [x for x in pool if x not in minus]
+        else:
+            # Don't create a new selection_pool if we are not subtracting anything
+            selection_pool = pool
+
+        # If num_to_select > size(selection_pool), return the entire selection_pool,
+        # i.e. the largest subset with fewer than num_to_select elements
+        if num_to_select > len(selection_pool):
+            return selection_pool
+
+        # Random selection
+        selection = random.sample(selection_pool, num_to_select)
+
+        return selection
+
+    # RPC handlers
+
+    async def handle_ihave(self, ihave_msg, sender_peer_id):
+        """
+        Checks the seen set and requests unknown messages with an IWANT message.
+        """
+        from_id_str = sender_peer_id
+
+        # All seen (seqno, from_id) tuples from the seen_messages cache
+        seen_seqnos_and_peers = list(self.pubsub.seen_messages.keys())
+
+        # Request every message id that appears in ihave_msg but not in seen_messages
+        msg_ids_wanted = [msg_id for msg_id in ihave_msg.messageIDs
+                          if literal_eval(msg_id) not in seen_seqnos_and_peers]
+
+        # Request messages with an IWANT message
+        if msg_ids_wanted:
+            await self.emit_iwant(msg_ids_wanted, from_id_str)
+
+    async def handle_iwant(self, iwant_msg, sender_peer_id):
+        """
+        Forwards all requested messages that are present in mcache to the requesting peer.
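+        Note: each requested id is a stringified (seqno, from_id) tuple, which
+        is parsed back into a tuple with ast.literal_eval below.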
+        """
+        from_id_str = sender_peer_id
+
+        msg_ids = [literal_eval(msg) for msg in iwant_msg.messageIDs]
+        msgs_to_forward = []
+        for msg_id_iwant in msg_ids:
+            # Check if the wanted message ID is present in mcache
+            msg = self.mcache.get(msg_id_iwant)
+
+            # Cache hit
+            if msg:
+                # Add message to list of messages to forward to the requesting peer
+                msgs_to_forward.append(msg)
+
+        # Forward the messages to the requesting peer. This must not go through
+        # publish(), because publish() would forward them to all peers in the
+        # topics contained in the messages. Instead we:
+        # 1) Package the messages into a single packet
+        packet = rpc_pb2.RPC()
+        packet.publish.extend(msgs_to_forward)
+
+        # 2) Serialize that packet
+        rpc_msg = packet.SerializeToString()
+
+        # 3) Get the stream to this peer
+        # TODO: Should we pass in from_id or from_id_str here?
+        peer_stream = self.pubsub.peers[from_id_str]
+
+        # 4) And write the packet to the stream
+        await peer_stream.write(rpc_msg)
+
+    async def handle_graft(self, graft_msg, sender_peer_id):
+        topic = graft_msg.topicID
+
+        from_id_str = sender_peer_id
+
+        # Add peer to mesh for topic
+        if topic in self.mesh:
+            self.mesh[topic].append(from_id_str)
+        else:
+            self.mesh[topic] = [from_id_str]
+
+    async def handle_prune(self, prune_msg, sender_peer_id):
+        topic = prune_msg.topicID
+
+        from_id_str = sender_peer_id
+
+        # Remove peer from mesh for topic, if peer is in the topic's mesh
+        if topic in self.mesh and from_id_str in self.mesh[topic]:
+            self.mesh[topic].remove(from_id_str)
+
+    # RPC emitters
+
+    async def emit_ihave(self, topic, msg_ids, to_peer):
+        """
+        Emit an IHAVE message, sent to to_peer, for topic and msg_ids
+        """
+
+        ihave_msg = rpc_pb2.ControlIHave()
+        ihave_msg.messageIDs.extend(msg_ids)
+        ihave_msg.topicID = topic
+
+        control_msg = rpc_pb2.ControlMessage()
+        control_msg.ihave.extend([ihave_msg])
+
+        await self.emit_control_message(control_msg, to_peer)
+
+    async def emit_iwant(self, msg_ids, to_peer):
+        """
+        Emit an IWANT message, sent to to_peer, for msg_ids
+        """
+
+        iwant_msg = rpc_pb2.ControlIWant()
+        iwant_msg.messageIDs.extend(msg_ids)
+
+        control_msg = rpc_pb2.ControlMessage()
+        control_msg.iwant.extend([iwant_msg])
+
+        await self.emit_control_message(control_msg, to_peer)
+
+    async def emit_graft(self, topic, to_peer):
+        """
+        Emit a GRAFT message, sent to to_peer, for topic
+        """
+
+        graft_msg = rpc_pb2.ControlGraft()
+        graft_msg.topicID = topic
+
+        control_msg = rpc_pb2.ControlMessage()
+        control_msg.graft.extend([graft_msg])
+
+        await self.emit_control_message(control_msg, to_peer)
+
+    async def emit_prune(self, topic, to_peer):
+        """
+        Emit a PRUNE message, sent to to_peer, for topic
+        """
+
+        prune_msg = rpc_pb2.ControlPrune()
+        prune_msg.topicID = topic
+
+        control_msg = rpc_pb2.ControlMessage()
+        control_msg.prune.extend([prune_msg])
+
+        await self.emit_control_message(control_msg, to_peer)
+
+    async def emit_control_message(self, control_msg, to_peer):
+        # Add control message to packet
+        packet = rpc_pb2.RPC()
+        packet.control.CopyFrom(control_msg)
+
+        rpc_msg = packet.SerializeToString()
+
+        # Get stream for peer from pubsub
+        peer_stream = self.pubsub.peers[to_peer]
+
+        # Write rpc to stream
+        await peer_stream.write(rpc_msg)
diff --git a/libp2p/pubsub/mcache.py b/libp2p/pubsub/mcache.py
new file mode 100644
index 00000000..071945a5
--- /dev/null
+++ b/libp2p/pubsub/mcache.py
@@ -0,0 +1,92 @@
+class MessageCache:
+
+    class CacheEntry:
+        # pylint: disable=too-few-public-methods
+        """
+        A logical representation of an entry in the mcache's _history_.
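+        Entries only index the history; the full message is kept in msgs, keyed by mid.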
+        """
+        def __init__(self, mid, topics):
+            """
+            Constructor.
+            :param mid: (seqno, from_id) of the msg
+            :param topics: list of topics this message was sent on
+            """
+            self.mid = mid
+            self.topics = topics
+
+    def __init__(self, window_size, history_size):
+        """
+        Constructor.
+        :param window_size: Size of the window desired.
+        :param history_size: Size of the history desired.
+        :return: the MessageCache
+        """
+        self.window_size = window_size
+        self.history_size = history_size
+
+        # (seqno, from_id) -> rpc message
+        self.msgs = dict()
+
+        # max length of history_size. each item is a list of CacheEntry.
+        # messages lost upon shift().
+        self.history = []
+
+        for _ in range(history_size):
+            self.history.append([])
+
+    def put(self, msg):
+        """
+        Put a message into the mcache.
+        :param msg: The rpc message to put in. Should contain seqno and from_id
+        """
+        mid = (msg.seqno, msg.from_id)
+        self.msgs[mid] = msg
+
+        # history[0] is always a list, so no need to re-initialize it here
+        self.history[0].append(self.CacheEntry(mid, msg.topicIDs))
+
+    def get(self, mid):
+        """
+        Get a message from the mcache.
+        :param mid: (seqno, from_id) of the message to get.
+        :return: The rpc message associated with this mid
+        """
+        if mid in self.msgs:
+            return self.msgs[mid]
+
+        return None
+
+    def window(self, topic):
+        """
+        Get the window for this topic.
+        :param topic: Topic whose message ids we desire.
+        :return: List of mids in the current window.
+        """
+        mids = []
+
+        for entries_list in self.history[: self.window_size]:
+            for entry in entries_list:
+                for entry_topic in entry.topics:
+                    if entry_topic == topic:
+                        mids.append(entry.mid)
+
+        return mids
+
+    def shift(self):
+        """
+        Shift the window over by 1 position, dropping the last element of the history.
+        """
+        last_entries = self.history[len(self.history) - 1]
+
+        for entry in last_entries:
+            del self.msgs[entry.mid]
+
+        # Drop the oldest slot and push a fresh one onto the front
+        self.history.pop()
+        self.history.insert(0, [])
diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py
index 2109a9ca..e0f6580f 100644
--- a/libp2p/pubsub/pubsub.py
+++ b/libp2p/pubsub/pubsub.py
@@ -1,5 +1,6 @@
 # pylint: disable=no-name-in-module
 import asyncio
+
 from lru import LRU
 from .pb import rpc_pb2
@@ -34,8 +35,7 @@ class Pubsub():
         for protocol in self.protocols:
             self.host.set_stream_handler(protocol, self.stream_handler)
 
-        # TODO: determine if these need to be asyncio queues, or if could possibly
-        # be ordinary blocking queues
+        # Use asyncio queues so consumers yield control while waiting
         self.incoming_msgs_from_peers = asyncio.Queue()
         self.outgoing_messages = asyncio.Queue()
 
@@ -44,9 +44,10 @@ class Pubsub():
             self.cache_size = 128
         else:
             self.cache_size = cache_size
+
         self.seen_messages = LRU(self.cache_size)
 
-        # Map of topics we are subscribed to to handler functions
+        # Map of topics we are subscribed to, to the blocking queues
         # for when the given topic receives a message
         self.my_topics = {}
 
@@ -96,6 +97,7 @@ class Pubsub():
             if id_in_seen_msgs not in self.seen_messages:
                 should_publish = True
                 self.seen_messages[id_in_seen_msgs] = 1
+
                 await self.handle_talk(message)
 
         if rpc_incoming.subscriptions:
@@ -112,6 +114,10 @@ class Pubsub():
         # relay message to peers with router
         await self.router.publish(peer_id, incoming)
 
+        if rpc_incoming.control:
+            # Pass the rpc to the router so it can perform its own control logic
+            await self.router.handle_rpc(rpc_incoming, peer_id)
+
         # Force context switch
         await asyncio.sleep(0)
 
@@ -180,8 +186,9 @@ class Pubsub():
             # Add peer to topic
self.peer_topics[sub_message.topicid].append(origin_id) else: - # TODO: Remove peer from topic - pass + if sub_message.topicid in self.peer_topics: + if origin_id in self.peer_topics[sub_message.topicid]: + self.peer_topics[sub_message.topicid].remove(origin_id) async def handle_talk(self, publish_message): """ @@ -217,7 +224,7 @@ class Pubsub(): await self.message_all_peers(packet.SerializeToString()) # Tell router we are joining this topic - self.router.join(topic_id) + await self.router.join(topic_id) # Return the asyncio queue for messages on this topic return self.my_topics[topic_id] @@ -243,7 +250,7 @@ class Pubsub(): await self.message_all_peers(packet.SerializeToString()) # Tell router we are leaving this topic - self.router.leave(topic_id) + await self.router.leave(topic_id) async def message_all_peers(self, rpc_msg): """ diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index ec5132e8..e581570c 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -31,10 +31,13 @@ class IPubsubRouter(ABC): """ @abstractmethod - def handle_rpc(self, rpc): + def handle_rpc(self, rpc, sender_peer_id): """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed + TODO: Check if this interface is ok. It's not the exact same as the go code, but the go + code is really confusing with the msg origin, they specify `rpc.from` even when the rpc + shouldn't have a from :param rpc: rpc message """ diff --git a/setup.py b/setup.py index 85420608..e244a8b3 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,8 @@ setuptools.setup( "rpcudp", "grpcio", "grpcio-tools", - "lru-dict>=1.1.6" + "lru-dict>=1.1.6", + "aio_timers" ], packages=["libp2p"], zip_safe=False, diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 06c1cbd4..2c67185c 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -613,4 +613,4 @@ async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj(): } ] } - await perform_test_from_obj(test_obj) \ No newline at end of file + await perform_test_from_obj(test_obj) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py new file mode 100644 index 00000000..0c94faee --- /dev/null +++ b/tests/pubsub/test_gossipsub.py @@ -0,0 +1,271 @@ +import asyncio +import pytest +import random + +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.pb import rpc_pb2 +from libp2p.pubsub.pubsub import Pubsub +from utils import message_id_generator, generate_RPC_packet, \ + create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \ + connect +from tests.utils import cleanup + +SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] + + +@pytest.mark.asyncio +async def test_dense(): + # Create libp2p hosts + next_msg_id_func = message_id_generator(0) + + num_hosts = 10 + num_msgs = 5 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 0.5) + + # All pubsub subscribe to foobar + queues = [] + for pubsub in pubsubs: + q = await pubsub.subscribe("foobar") + + # Add each blocking queue to an array of blocking queues + queues.append(q) + + # Sparsely connect libp2p hosts in random way + await dense_connect(libp2p_hosts) + + # 
Wait 2 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(2) + + for i in range(num_msgs): + msg_content = "foo " + str(i) + + # randomly pick a message origin + origin_idx = random.randint(0, num_hosts - 1) + origin_host = libp2p_hosts[origin_idx] + host_id = str(origin_host.get_id()) + + # Generate message packet + packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) + + # publish from the randomly chosen host + await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + + await asyncio.sleep(0.5) + # Assert that all blocking queues receive the message + items = [] + for queue in queues: + msg = await queue.get() + assert msg.data == packet.publish[0].data + items.append(msg.data) + await cleanup() + +@pytest.mark.asyncio +async def test_fanout(): + # Create libp2p hosts + next_msg_id_func = message_id_generator(0) + + num_hosts = 10 + num_msgs = 5 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 0.5) + + # All pubsub subscribe to foobar + queues = [] + for i in range(1, len(pubsubs)): + q = await pubsubs[i].subscribe("foobar") + + # Add each blocking queue to an array of blocking queues + queues.append(q) + + # Sparsely connect libp2p hosts in random way + await dense_connect(libp2p_hosts) + + # Wait 2 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(2) + + # Send messages with origin not subscribed + for i in range(num_msgs): + msg_content = "foo " + str(i) + + # Pick the message origin to the node that is not subscribed to 'foobar' + origin_idx = 0 + origin_host = libp2p_hosts[origin_idx] + host_id = str(origin_host.get_id()) + + # Generate message packet + packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) + + # publish from the randomly chosen host + await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + + await asyncio.sleep(0.5) + # Assert that all blocking queues receive the message + for queue in queues: + msg = await queue.get() + assert msg.SerializeToString() == packet.publish[0].SerializeToString() + + # Subscribe message origin + queues.append(await pubsubs[0].subscribe("foobar")) + + # Send messages again + for i in range(num_msgs): + msg_content = "foo " + str(i) + + # Pick the message origin to the node that is not subscribed to 'foobar' + origin_idx = 0 + origin_host = libp2p_hosts[origin_idx] + host_id = str(origin_host.get_id()) + + # Generate message packet + packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) + + # publish from the randomly chosen host + await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + + await asyncio.sleep(0.5) + # Assert that all blocking queues receive the message + for queue in queues: + msg = await queue.get() + assert msg.SerializeToString() == packet.publish[0].SerializeToString() + + await cleanup() + +@pytest.mark.asyncio +async def test_fanout_maintenance(): + # Create libp2p hosts + next_msg_id_func = message_id_generator(0) + + num_hosts = 10 + num_msgs = 5 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 0.5) + + # All pubsub subscribe to foobar + queues = [] + for i in range(1, len(pubsubs)): + q = await 
pubsubs[i].subscribe("foobar") + + # Add each blocking queue to an array of blocking queues + queues.append(q) + + # Sparsely connect libp2p hosts in random way + await dense_connect(libp2p_hosts) + + # Wait 2 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(2) + + # Send messages with origin not subscribed + for i in range(num_msgs): + msg_content = "foo " + str(i) + + # Pick the message origin to the node that is not subscribed to 'foobar' + origin_idx = 0 + origin_host = libp2p_hosts[origin_idx] + host_id = str(origin_host.get_id()) + + # Generate message packet + packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) + + # publish from the randomly chosen host + await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + + await asyncio.sleep(0.5) + # Assert that all blocking queues receive the message + for queue in queues: + msg = await queue.get() + assert msg.SerializeToString() == packet.publish[0].SerializeToString() + + for sub in pubsubs: + await sub.unsubscribe('foobar') + + queues = [] + + await asyncio.sleep(2) + + # Resub and repeat + for i in range(1, len(pubsubs)): + q = await pubsubs[i].subscribe("foobar") + + # Add each blocking queue to an array of blocking queues + queues.append(q) + + await asyncio.sleep(2) + + # Check messages can still be sent + for i in range(num_msgs): + msg_content = "foo " + str(i) + + # Pick the message origin to the node that is not subscribed to 'foobar' + origin_idx = 0 + origin_host = libp2p_hosts[origin_idx] + host_id = str(origin_host.get_id()) + + # Generate message packet + packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) + + # publish from the randomly chosen host + await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + + await asyncio.sleep(0.5) + # Assert that all blocking queues receive the message + for queue in queues: + msg = await queue.get() + assert msg.SerializeToString() == packet.publish[0].SerializeToString() + + await cleanup() + +@pytest.mark.asyncio +async def test_gossip_propagation(): + # Create libp2p hosts + next_msg_id_func = message_id_generator(0) + + num_hosts = 2 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 1, 0, 2, 30, 50, 100, 0.5) + node1, node2 = libp2p_hosts[0], libp2p_hosts[1] + sub1, sub2 = pubsubs[0], pubsubs[1] + gsub1, gsub2 = gossipsubs[0], gossipsubs[1] + + node1_queue = await sub1.subscribe('foo') + + # node 1 publish to topic + msg_content = 'foo_msg' + node1_id = str(node1.get_id()) + + # Generate message packet + packet = generate_RPC_packet(node1_id, ["foo"], msg_content, next_msg_id_func()) + + # publish from the randomly chosen host + await gsub1.publish(node1_id, packet.SerializeToString()) + + # now node 2 subscribes + node2_queue = await sub2.subscribe('foo') + + await connect(node2, node1) + + # wait for gossip heartbeat + await asyncio.sleep(2) + + # should be able to read message + msg = await node2_queue.get() + assert msg.SerializeToString() == packet.publish[0].SerializeToString() + + await cleanup() diff --git a/tests/pubsub/test_gossipsub_backward_compatibility.py b/tests/pubsub/test_gossipsub_backward_compatibility.py new file mode 100644 index 00000000..468e25fd --- /dev/null +++ b/tests/pubsub/test_gossipsub_backward_compatibility.py @@ -0,0 +1,519 @@ +import asyncio +import multiaddr +import pytest + +from 
libp2p import new_node +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.pb import rpc_pb2 +from libp2p.pubsub.pubsub import Pubsub +from utils import message_id_generator, generate_RPC_packet +from tests.utils import cleanup + +# pylint: disable=too-many-locals + +async def connect(node1, node2): + """ + Connect node1 to node2 + """ + addr = node2.get_addrs()[0] + info = info_from_p2p_addr(addr) + await node1.connect(info) + +@pytest.mark.asyncio +async def test_init(): + node = await new_node(transport_opt=["/ip4/127.1/tcp/0"]) + + await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.1/tcp/0")) + + supported_protocols = ["/gossipsub/1.0.0"] + + gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30) + pubsub = Pubsub(node, gossipsub, "a") + + # Did it work? + assert gossipsub and pubsub + + await cleanup() + +async def perform_test_from_obj(obj): + """ + Perform a floodsub test from a test obj. + test obj are composed as follows: + + { + "supported_protocols": ["supported/protocol/1.0.0",...], + "adj_list": { + "node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...], + "node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...], + ... + }, + "topic_map": { + "topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...] + }, + "messages": [ + { + "topics": ["topic1_for_message", "topic2_for_message", ...], + "data": "some contents of the message (newlines are not supported)", + "node_id": "message sender node id" + }, + ... + ] + } + NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A + or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior + is undefined (even if it may work) + """ + + # Step 1) Create graph + adj_list = obj["adj_list"] + node_map = {} + gossipsub_map = {} + pubsub_map = {} + + supported_protocols = obj["supported_protocols"] + + tasks_connect = [] + for start_node_id in adj_list: + # Create node if node does not yet exist + if start_node_id not in node_map: + node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + node_map[start_node_id] = node + + gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30) + gossipsub_map[start_node_id] = gossipsub + pubsub = Pubsub(node, gossipsub, start_node_id) + pubsub_map[start_node_id] = pubsub + + # For each neighbor of start_node, create if does not yet exist, + # then connect start_node to neighbor + for neighbor_id in adj_list[start_node_id]: + # Create neighbor if neighbor does not yet exist + if neighbor_id not in node_map: + neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + node_map[neighbor_id] = neighbor_node + + gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30) + gossipsub_map[neighbor_id] = gossipsub + pubsub = Pubsub(neighbor_node, gossipsub, neighbor_id) + pubsub_map[neighbor_id] = pubsub + + # Connect node and neighbor + tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id]))) + tasks_connect.append(asyncio.sleep(2)) + await asyncio.gather(*tasks_connect) + + # Allow time for graph creation before continuing + # await asyncio.sleep(0.25) + + # Step 2) Subscribe to topics + queues_map = {} + topic_map = obj["topic_map"] + + tasks_topic = [] + tasks_topic_data = [] + for topic in 
topic_map: + for node_id in topic_map[topic]: + """ + # Subscribe node to topic + q = await pubsub_map[node_id].subscribe(topic) + + # Create topic-queue map for node_id if one does not yet exist + if node_id not in queues_map: + queues_map[node_id] = {} + + # Store queue in topic-queue map for node + queues_map[node_id][topic] = q + """ + tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic))) + tasks_topic_data.append((node_id, topic)) + tasks_topic.append(asyncio.sleep(2)) + + # Gather is like Promise.all + responses = await asyncio.gather(*tasks_topic, return_exceptions=True) + for i in range(len(responses) - 1): + q = responses[i] + node_id, topic = tasks_topic_data[i] + if node_id not in queues_map: + queues_map[node_id] = {} + + # Store queue in topic-queue map for node + queues_map[node_id][topic] = q + + # Allow time for subscribing before continuing + # await asyncio.sleep(0.01) + + # Step 3) Publish messages + topics_in_msgs_ordered = [] + messages = obj["messages"] + tasks_publish = [] + next_msg_id_func = message_id_generator(0) + + for msg in messages: + topics = msg["topics"] + + data = msg["data"] + node_id = msg["node_id"] + + # Get actual id for sender node (not the id from the test obj) + actual_node_id = str(node_map[node_id].get_id()) + + # Create correctly formatted message + msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func()) + + # Publish message + tasks_publish.append(asyncio.ensure_future(gossipsub_map[node_id].publish(\ + actual_node_id, msg_talk.SerializeToString()))) + + # For each topic in topics, add topic, msg_talk tuple to ordered test list + # TODO: Update message sender to be correct message sender before + # adding msg_talk to this list + for topic in topics: + topics_in_msgs_ordered.append((topic, msg_talk)) + + # Allow time for publishing before continuing + # await asyncio.sleep(0.4) + tasks_publish.append(asyncio.sleep(2)) + await asyncio.gather(*tasks_publish) + + # Step 4) Check that all messages were received correctly. + # TODO: Check message sender too + for i in range(len(topics_in_msgs_ordered)): + topic, actual_msg = topics_in_msgs_ordered[i] + + # Look at each node in each topic + for node_id in topic_map[topic]: + # Get message from subscription queue + msg_on_node = await queues_map[node_id][topic].get() + assert actual_msg.publish[0].SerializeToString() == msg_on_node.SerializeToString() + + # Success, terminate pending tasks. 
+ await cleanup() + +@pytest.mark.asyncio +async def test_simple_two_nodes_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "A": ["B"] + }, + "topic_map": { + "topic1": ["B"] + }, + "messages": [ + { + "topics": ["topic1"], + "data": "foo", + "node_id": "A" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_three_nodes_two_topics_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "A": ["B"], + "B": ["C"] + }, + "topic_map": { + "topic1": ["B", "C"], + "topic2": ["B", "C"] + }, + "messages": [ + { + "topics": ["topic1"], + "data": "foo", + "node_id": "A" + }, + { + "topics": ["topic2"], + "data": "Alex is tall", + "node_id": "A" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "A": ["B"] + }, + "topic_map": { + "topic1": ["B"] + }, + "messages": [ + { + "topics": ["topic1"], + "data": "Alex is tall", + "node_id": "B" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_two_nodes_one_topic_two_msgs_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "A": ["B"] + }, + "topic_map": { + "topic1": ["B"] + }, + "messages": [ + { + "topics": ["topic1"], + "data": "Alex is tall", + "node_id": "B" + }, + { + "topics": ["topic1"], + "data": "foo", + "node_id": "A" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_seven_nodes_tree_one_topics_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"] + }, + "topic_map": { + "astrophysics": ["2", "3", "4", "5", "6", "7"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_seven_nodes_tree_three_topics_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"] + }, + "topic_map": { + "astrophysics": ["2", "3", "4", "5", "6", "7"], + "space": ["2", "3", "4", "5", "6", "7"], + "onions": ["2", "3", "4", "5", "6", "7"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["space"], + "data": "foobar", + "node_id": "1" + }, + { + "topics": ["onions"], + "data": "I am allergic", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_seven_nodes_tree_three_topics_diff_origin_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], + "space": ["1", "2", "3", "4", "5", "6", "7"], + "onions": ["1", "2", "3", "4", "5", "6", "7"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["space"], + "data": "foobar", + "node_id": "4" + }, + { + "topics": ["onions"], + "data": "I am allergic", + "node_id": "7" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + 
"adj_list": { + "1": ["2", "3"], + "2": ["3"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3"], + "school": ["1", "2", "3"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3", "4"], + "2": ["1", "3", "4"], + "3": ["1", "2", "4"], + "4": ["1", "2", "3"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4"], + "school": ["1", "2", "3", "4"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar2", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar3", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic3", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2"], + "2": ["3"], + "3": ["4"], + "4": ["5"], + "5": ["1"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4", "5"], + "school": ["1", "2", "3", "4", "5"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar2", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar3", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic3", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) diff --git a/tests/pubsub/test_mcache.py b/tests/pubsub/test_mcache.py new file mode 100644 index 00000000..5446f74c --- /dev/null +++ b/tests/pubsub/test_mcache.py @@ -0,0 +1,129 @@ +import pytest +from libp2p.pubsub.mcache import MessageCache + + +class Msg: + + def __init__(self, topicIDs, seqno, from_id): + self.topicIDs = topicIDs + self.seqno = seqno, + self.from_id = from_id + +@pytest.mark.asyncio +async def test_mcache(): + # Ported from: + # https://github.com/libp2p/go-libp2p-pubsub + # /blob/51b7501433411b5096cac2b4994a36a68515fc03/mcache_test.go + mcache = MessageCache(3, 5) + msgs = [] + + for i in range(60): + msgs.append(Msg(["test"], i, "test")) + + for i in range(10): + mcache.put(msgs[i]) + + for i in range(10): + msg = msgs[i] + mid = (msg.seqno, msg.from_id) + get_msg = mcache.get(mid) + + # successful read + assert get_msg == msg + + gids = mcache.window('test') + + assert len(gids) == 10 + + for i in range(10): + msg = msgs[i] + mid = (msg.seqno, msg.from_id) + + assert mid == gids[i] + + mcache.shift() + + for i in range(10, 20): + mcache.put(msgs[i]) + + for i in range(20): + msg = msgs[i] + mid = (msg.seqno, msg.from_id) + get_msg = mcache.get(mid) 
+ + assert get_msg == msg + + gids = mcache.window('test') + + assert len(gids) == 20 + + for i in range(10): + msg = msgs[i] + mid = (msg.seqno, msg.from_id) + + assert mid == gids[10 + i] + + for i in range(10, 20): + msg = msgs[i] + mid = (msg.seqno, msg.from_id) + + assert mid == gids[i - 10] + + mcache.shift() + + for i in range(20, 30): + mcache.put(msgs[i]) + + mcache.shift() + + for i in range(30, 40): + mcache.put(msgs[i]) + + mcache.shift() + + for i in range(40, 50): + mcache.put(msgs[i]) + + mcache.shift() + + for i in range(50, 60): + mcache.put(msgs[i]) + + assert len(mcache.msgs) == 50 + + for i in range(10): + msg = msgs[i] + mid = (msg.seqno, msg.from_id) + get_msg = mcache.get(mid) + + # Should be evicted from cache + assert not get_msg + + for i in range(10, 60): + msg = msgs[i] + mid = (msg.seqno, msg.from_id) + get_msg = mcache.get(mid) + + assert get_msg == msg + + gids = mcache.window('test') + + assert len(gids) == 30 + + for i in range(10): + msg = msgs[50 + i] + mid = (msg.seqno, msg.from_id) + + assert mid == gids[i] + + for i in range(10, 20): + msg = msgs[30 + i] + mid = (msg.seqno, msg.from_id) + + assert mid == gids[i] + + for i in range(20, 30): + msg = msgs[10 + i] + mid = (msg.seqno, msg.from_id) + + assert mid == gids[i] diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index 056baac0..66916613 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -1,6 +1,13 @@ +import asyncio +import multiaddr import uuid +import random import struct +from libp2p import new_node from libp2p.pubsub.pb import rpc_pb2 +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.gossipsub import GossipSub def message_id_generator(start_val): @@ -42,3 +49,76 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id): packet.publish.extend([message]) return packet + +async def connect(node1, node2): + """ + Connect node1 to node2 + """ + addr = node2.get_addrs()[0] + info = info_from_p2p_addr(addr) + await node1.connect(info) + +async def create_libp2p_hosts(num_hosts): + """ + Create libp2p hosts + :param num_hosts: number of hosts to create + """ + hosts = [] + tasks_create = [] + for i in range(0, num_hosts): + # Create node + tasks_create.append(asyncio.ensure_future(new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]))) + hosts = await asyncio.gather(*tasks_create) + + tasks_listen = [] + for node in hosts: + # Start listener + tasks_listen.append(asyncio.ensure_future(node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")))) + await asyncio.gather(*tasks_listen) + + return hosts + +def create_pubsub_and_gossipsub_instances(libp2p_hosts, supported_protocols, degree, degree_low, \ + degree_high, time_to_live, gossip_window, gossip_history, heartbeat_interval): + pubsubs = [] + gossipsubs = [] + for node in libp2p_hosts: + gossipsub = GossipSub(supported_protocols, degree, + degree_low, degree_high, time_to_live, + gossip_window, gossip_history, + heartbeat_interval) + pubsub = Pubsub(node, gossipsub, "a") + pubsubs.append(pubsub) + gossipsubs.append(gossipsub) + + return pubsubs, gossipsubs + +async def sparse_connect(hosts): + await connect_some(hosts, 3) + + +async def dense_connect(hosts): + await connect_some(hosts, 10) + + +async def connect_some(hosts, degree): + for i, host in enumerate(hosts): + for j, host2 in enumerate(hosts): + if i != j and i < j: + await connect(host, host2) + + # TODO: USE THE CODE BELOW + # for i, host in enumerate(hosts): + # j = 0 + # while j < 
degree:
+    #         n = random.randint(0, len(hosts) - 1)
+
+    #         # Retry without counting the attempt when a host picks itself
+    #         if n == i:
+    #             continue
+
+    #         neighbor = hosts[n]
+
+    #         await connect(host, neighbor)
+
+    #         j += 1
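+
+    # Note: the commented loop above can pick the same neighbor more than once;
+    # tracking already-chosen indices in a set would avoid duplicate connects.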