From 5efdf4c7036cacce8a2c49fbc67534c721801845 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 3 Dec 2019 15:48:23 +0800 Subject: [PATCH] Group messages for peer in heartbeat --- libp2p/pubsub/gossipsub.py | 134 +++++++++++++++++++++++++++++++++---- 1 file changed, 120 insertions(+), 14 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index cf48f53a..29e23ebb 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -2,7 +2,7 @@ from ast import literal_eval import asyncio import logging import random -from typing import Any, Dict, Iterable, List, Sequence, Set +from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple from libp2p.network.stream.exceptions import StreamClosed from libp2p.peer.id import ID @@ -305,6 +305,75 @@ class GossipSub(IPubsubRouter): # Forget mesh[topic] del self.mesh[topic] + async def _emit_control_msgs( + self, + peers_to_graft: Dict[ID, List[str]], + peers_to_prune: Dict[ID, List[str]], + peers_to_gossip: Dict[ID, Dict[str, List[str]]], + ) -> None: + # Starting with GRAFT messages + for peer, topics in peers_to_graft.items(): + graft_msgs: List[rpc_pb2.ControlGraft] = [] + for topic in topics: + graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft() + graft_msg.topicID = topic + graft_msgs.append(graft_msg) + + # If there are also PRUNE messages to send to this peer + if peer in peers_to_prune: + prune_msgs: List[rpc_pb2.ControlPrune] = [] + for topic in peers_to_prune[peer]: + prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() + prune_msg.topicID = topic + prune_msgs.append(prune_msg) + del peers_to_prune[peer] + + # If there are also IHAVE messages to send to this peer + if peer in peers_to_gossip: + ihave_msgs: List[rpc_pb2.ControlIHave] = [] + for topic in peers_to_gossip[peer]: + ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave() + ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) + ihave_msg.topicID = topic + ihave_msgs.append(ihave_msg) + del peers_to_gossip[peer] + + control_msg = self.pack_control_msgs(ihave_msgs, graft_msgs, prune_msgs) + await self.emit_control_message(control_msg, peer) + + # Next with PRUNE messages + for peer, topics in peers_to_prune.items(): + prune_msgs = [] + for topic in topics: + prune_msg = rpc_pb2.ControlPrune() + prune_msg.topicID = topic + prune_msgs.append(prune_msg) + + # If there are also IHAVE messages to send to this peer + if peer in peers_to_gossip: + ihave_msgs = [] + for topic in peers_to_gossip[peer]: + ihave_msg = rpc_pb2.ControlIHave() + ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) + ihave_msg.topicID = topic + ihave_msgs.append(ihave_msg) + del peers_to_gossip[peer] + + control_msg = self.pack_control_msgs(ihave_msgs, None, prune_msgs) + await self.emit_control_message(control_msg, peer) + + # Fianlly IHAVE messages + for peer in peers_to_gossip: + ihave_msgs = [] + for topic in peers_to_gossip[peer]: + ihave_msg = rpc_pb2.ControlIHave() + ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) + ihave_msg.topicID = topic + ihave_msgs.append(ihave_msg) + + control_msg = self.pack_control_msgs(ihave_msgs, None, None) + await self.emit_control_message(control_msg, peer) + # Heartbeat async def heartbeat(self) -> None: """ @@ -316,15 +385,24 @@ class GossipSub(IPubsubRouter): # Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501 await asyncio.sleep(self.heartbeat_initial_delay) while True: + # Maintain mesh and keep track of which peers to send GRAFT or PRUNE to + peers_to_graft, peers_to_prune = self.mesh_heartbeat() + # Maintain fanout + self.fanout_heartbeat() + # Get the peers to send IHAVE to + peers_to_gossip = self.gossip_heartbeat() + # Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and send it + await self._emit_control_msgs( + peers_to_graft, peers_to_prune, peers_to_gossip + ) - await self.mesh_heartbeat() - await self.fanout_heartbeat() - await self.gossip_heartbeat() + self.mcache.shift() await asyncio.sleep(self.heartbeat_interval) - async def mesh_heartbeat(self) -> None: - # Note: the comments here are the exact pseudocode from the spec + def mesh_heartbeat(self) -> Tuple[Dict[ID, List[str]], Dict[ID, List[str]]]: + peers_to_graft: Dict[ID, List[str]] = {} + peers_to_prune: Dict[ID, List[str]] = {} for topic in self.mesh: # Skip if no peers have subscribed to the topic if topic not in self.pubsub.peer_topics: @@ -342,7 +420,10 @@ class GossipSub(IPubsubRouter): self.mesh[topic].append(peer) # Emit GRAFT(topic) control message to peer - await self.emit_graft(topic, peer) + if peer not in peers_to_graft: + peers_to_graft[peer] = [topic] + else: + peers_to_graft[peer].append(topic) if num_mesh_peers_in_topic > self.degree_high: # Select |mesh[topic]| - D peers from mesh[topic] @@ -354,9 +435,13 @@ class GossipSub(IPubsubRouter): self.mesh[topic].remove(peer) # Emit PRUNE(topic) control message to peer - await self.emit_prune(topic, peer) + if peer not in peers_to_prune: + peers_to_prune[peer] = [topic] + else: + peers_to_prune[peer].append(topic) + return peers_to_graft, peers_to_prune - async def fanout_heartbeat(self) -> None: + def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in self.fanout: # Delete topic entry if it's not in `pubsub.peer_topics` @@ -388,7 +473,8 @@ class GossipSub(IPubsubRouter): # Add the peers to fanout[topic] self.fanout[topic].extend(selected_peers) - async def gossip_heartbeat(self) -> None: + def gossip_heartbeat(self) -> Dict[ID, Dict[str, List[str]]]: + peers_to_gossip: Dict[ID, Dict[str, List[str]]] = {} for topic in self.mesh: msg_ids = self.mcache.window(topic) if msg_ids: @@ -401,7 +487,10 @@ class GossipSub(IPubsubRouter): msg_id_strs = [str(msg_id) for msg_id in msg_ids] for peer in peers_to_emit_ihave_to: - await self.emit_ihave(topic, msg_id_strs, peer) + if peer not in peers_to_gossip: + peers_to_gossip[peer] = {topic: msg_id_strs} + else: + peers_to_gossip[peer][topic] = msg_id_strs # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh @@ -417,9 +506,11 @@ class GossipSub(IPubsubRouter): ) msg_id_strs = [str(msg) for msg in msg_ids] for peer in peers_to_emit_ihave_to: - await self.emit_ihave(topic, msg_id_strs, peer) - - self.mcache.shift() + if peer not in peers_to_gossip: + peers_to_gossip[peer] = {topic: msg_id_strs} + else: + peers_to_gossip[peer][topic] = msg_id_strs + return peers_to_gossip @staticmethod def select_from_minus( @@ -554,6 +645,21 @@ class GossipSub(IPubsubRouter): # RPC emitters + def pack_control_msgs( + self, + ihave_msgs: List[rpc_pb2.ControlIHave], + graft_msgs: List[rpc_pb2.ControlGraft], + prune_msgs: List[rpc_pb2.ControlPrune], + ) -> rpc_pb2.ControlMessage: + control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() + if ihave_msgs: + control_msg.ihave.extend(ihave_msgs) + if graft_msgs: + control_msg.graft.extend(graft_msgs) + if prune_msgs: + control_msg.prune.extend(prune_msgs) + return control_msg + async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None: """Emit ihave message, sent to to_peer, for topic and msg_ids."""