From 2d3bfc8184e892ef51967c60b73402cac0ccb873 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Fri, 6 Dec 2019 23:42:31 +0800 Subject: [PATCH] Apply PR feedback: use defaultdict and init control message --- libp2p/pubsub/gossipsub.py | 64 ++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 37 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 208d2878..b8b0e624 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -1,8 +1,9 @@ from ast import literal_eval import asyncio +from collections import defaultdict import logging import random -from typing import Any, Dict, Iterable, List, Sequence, Tuple +from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Tuple from libp2p.network.stream.exceptions import StreamClosed from libp2p.peer.id import ID @@ -319,24 +320,24 @@ class GossipSub(IPubsubRouter): # Starting with GRAFT messages for peer, topics in peers_to_graft.items(): for topic in topics: - graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft() - graft_msg.topicID = topic + graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft(topicID=topic) graft_msgs.append(graft_msg) # If there are also PRUNE messages to send to this peer if peer in peers_to_prune: for topic in peers_to_prune[peer]: - prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() - prune_msg.topicID = topic + prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune( + topicID=topic + ) prune_msgs.append(prune_msg) del peers_to_prune[peer] # If there are also IHAVE messages to send to this peer if peer in peers_to_gossip: for topic in peers_to_gossip[peer]: - ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave() - ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) - ihave_msg.topicID = topic + ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave( + messageIDs=peers_to_gossip[peer][topic], topicID=topic + ) ihave_msgs.append(ihave_msg) del peers_to_gossip[peer] @@ -347,17 +348,16 @@ class GossipSub(IPubsubRouter): for peer, topics in peers_to_prune.items(): prune_msgs = [] for topic in topics: - prune_msg = rpc_pb2.ControlPrune() - prune_msg.topicID = topic + prune_msg = rpc_pb2.ControlPrune(topicID=topic) prune_msgs.append(prune_msg) # If there are also IHAVE messages to send to this peer if peer in peers_to_gossip: ihave_msgs = [] for topic in peers_to_gossip[peer]: - ihave_msg = rpc_pb2.ControlIHave() - ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) - ihave_msg.topicID = topic + ihave_msg = rpc_pb2.ControlIHave( + messageIDs=peers_to_gossip[peer][topic], topicID=topic + ) ihave_msgs.append(ihave_msg) del peers_to_gossip[peer] @@ -368,9 +368,9 @@ class GossipSub(IPubsubRouter): for peer in peers_to_gossip: ihave_msgs = [] for topic in peers_to_gossip[peer]: - ihave_msg = rpc_pb2.ControlIHave() - ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic]) - ihave_msg.topicID = topic + ihave_msg = rpc_pb2.ControlIHave( + messageIDs=peers_to_gossip[peer][topic], topicID=topic + ) ihave_msgs.append(ihave_msg) control_msg = self.pack_control_msgs(ihave_msgs, None, None) @@ -402,9 +402,11 @@ class GossipSub(IPubsubRouter): await asyncio.sleep(self.heartbeat_interval) - def mesh_heartbeat(self) -> Tuple[Dict[ID, List[str]], Dict[ID, List[str]]]: - peers_to_graft: Dict[ID, List[str]] = {} - peers_to_prune: Dict[ID, List[str]] = {} + def mesh_heartbeat( + self + ) -> Tuple[DefaultDict[ID, List[str]], DefaultDict[ID, List[str]]]: + peers_to_graft: DefaultDict[ID, List[str]] = defaultdict(list) + peers_to_prune: DefaultDict[ID, List[str]] = defaultdict(list) for topic in self.mesh: # Skip if no peers have subscribed to the topic if topic not in self.pubsub.peer_topics: @@ -422,10 +424,7 @@ class GossipSub(IPubsubRouter): self.mesh[topic].append(peer) # Emit GRAFT(topic) control message to peer - if peer not in peers_to_graft: - peers_to_graft[peer] = [topic] - else: - peers_to_graft[peer].append(topic) + peers_to_graft[peer].append(topic) if num_mesh_peers_in_topic > self.degree_high: # Select |mesh[topic]| - D peers from mesh[topic] @@ -437,10 +436,7 @@ class GossipSub(IPubsubRouter): self.mesh[topic].remove(peer) # Emit PRUNE(topic) control message to peer - if peer not in peers_to_prune: - peers_to_prune[peer] = [topic] - else: - peers_to_prune[peer].append(topic) + peers_to_prune[peer].append(topic) return peers_to_graft, peers_to_prune def fanout_heartbeat(self) -> None: @@ -478,8 +474,8 @@ class GossipSub(IPubsubRouter): # Add the peers to fanout[topic] self.fanout[topic].extend(selected_peers) - def gossip_heartbeat(self) -> Dict[ID, Dict[str, List[str]]]: - peers_to_gossip: Dict[ID, Dict[str, List[str]]] = {} + def gossip_heartbeat(self) -> DefaultDict[ID, Dict[str, List[str]]]: + peers_to_gossip: DefaultDict[ID, Dict[str, List[str]]] = defaultdict(dict) for topic in self.mesh: msg_ids = self.mcache.window(topic) if msg_ids: @@ -492,10 +488,7 @@ class GossipSub(IPubsubRouter): msg_id_strs = [str(msg_id) for msg_id in msg_ids] for peer in peers_to_emit_ihave_to: - if peer not in peers_to_gossip: - peers_to_gossip[peer] = {topic: msg_id_strs} - else: - peers_to_gossip[peer][topic] = msg_id_strs + peers_to_gossip[peer][topic] = msg_id_strs # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh @@ -510,10 +503,7 @@ class GossipSub(IPubsubRouter): ) msg_id_strs = [str(msg) for msg in msg_ids] for peer in peers_to_emit_ihave_to: - if peer not in peers_to_gossip: - peers_to_gossip[peer] = {topic: msg_id_strs} - else: - peers_to_gossip[peer][topic] = msg_id_strs + peers_to_gossip[peer][topic] = msg_id_strs return peers_to_gossip @staticmethod