diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 7bb40cee..18676b3d 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -248,7 +248,7 @@ class Swarm(INetwork): # TODO: Should be changed to close multisple connections, # if we have several connections per peer in the future. connection = self.connections[peer_id] - # NOTE: `connection.close` will perform `del self.connections[peer_id]` + # NOTE: `connection.close` will delete `peer_id` from `self.connections` # and `notify_disconnected` for us. await connection.close() @@ -270,11 +270,9 @@ class Swarm(INetwork): """Simply remove the connection from Swarm's records, without closing the connection.""" peer_id = swarm_conn.muxed_conn.peer_id - if peer_id not in self.connections: - return # TODO: Should be changed to remove the exact connection, # if we have several connections per peer in the future. - del self.connections[peer_id] + self.connections.pop(peer_id, None) # Notifee diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 93faebdd..e9d0f355 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -144,8 +144,7 @@ class GossipSub(IPubsubRouter): elif peer_id in self.peers_floodsub: self.peers_floodsub.remove(peer_id) - if peer_id in self.peers_to_protocol: - del self.peers_to_protocol[peer_id] + self.peers_to_protocol.pop(peer_id, None) async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: """ @@ -274,8 +273,7 @@ class GossipSub(IPubsubRouter): self.mesh[topic].append(peer) await self.emit_graft(topic, peer) - if topic_in_fanout: - del self.fanout[topic] + self.fanout.pop(topic, None) async def leave(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec @@ -294,7 +292,7 @@ class GossipSub(IPubsubRouter): await self.emit_prune(topic, peer) # Forget mesh[topic] - del self.mesh[topic] + self.mesh.pop(topic, None) # Heartbeat async def heartbeat(self) -> None: @@ -355,8 +353,8 @@ class GossipSub(IPubsubRouter): # TODO: there's no way time_since_last_publish gets set anywhere yet if self.time_since_last_publish[topic] > self.time_to_live: # Remove topic from fanout - del self.fanout[topic] - del self.time_since_last_publish[topic] + self.fanout.pop(topic, None) + self.time_since_last_publish.pop(topic, None) else: num_fanout_peers_in_topic = len(self.fanout[topic]) diff --git a/libp2p/pubsub/mcache.py b/libp2p/pubsub/mcache.py index b17f8679..c8489123 100644 --- a/libp2p/pubsub/mcache.py +++ b/libp2p/pubsub/mcache.py @@ -96,8 +96,7 @@ class MessageCache: last_entries: List[CacheEntry] = self.history[len(self.history) - 1] for entry in last_entries: - if entry.mid in self.msgs: - del self.msgs[entry.mid] + self.msgs.pop(entry.mid) i: int = len(self.history) - 2 diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 3834eb4b..ba1994ea 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -232,8 +232,7 @@ class Pubsub: :param topic: the topic to remove validator from """ - if topic in self.topic_validators: - del self.topic_validators[topic] + self.topic_validators.pop(topic, None) def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]: """ @@ -283,7 +282,7 @@ class Pubsub: await stream.write(encode_varint_prefixed(hello.SerializeToString())) except StreamClosed: logger.debug("Fail to add new peer %s: stream closed", peer_id) - del self.peers[peer_id] + self.peers.pop(peer_id, None) return # TODO: Check EOF of this stream. # TODO: Check if the peer in black list. @@ -291,7 +290,7 @@ class Pubsub: self.router.add_peer(peer_id, stream.get_protocol()) except Exception as error: logger.debug("fail to add new peer %s, error %s", peer_id, error) - del self.peers[peer_id] + self.peers.pop(peer_id, None) return logger.debug("added new peer %s", peer_id) @@ -299,7 +298,7 @@ class Pubsub: def _handle_dead_peer(self, peer_id: ID) -> None: if peer_id not in self.peers: return - del self.peers[peer_id] + self.peers.pop(peer_id, None) for topic in self.peer_topics: if peer_id in self.peer_topics[topic]: @@ -411,7 +410,7 @@ class Pubsub: if topic_id not in self.my_topics: return # Remove topic_id from map if present - del self.my_topics[topic_id] + self.my_topics.pop(topic_id, None) # Create unsubscribe message packet: rpc_pb2.RPC = rpc_pb2.RPC() diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 52c957c1..0507a524 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -50,8 +50,7 @@ class SecurityMultistream(ABC): :param transport: the corresponding transportation to the ``protocol``. """ # If protocol is already added before, remove it and add it again. - if protocol in self.transports: - del self.transports[protocol] + self.transports.pop(protocol, None) self.transports[protocol] = transport # Note: None is added as the handler for the given protocol since # we only care about selecting the protocol, not any handler function diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 1a43c7cb..f70cae20 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -297,8 +297,7 @@ class Mplex(IMuxedConn): # the entry of this stream, to avoid others from accessing it. if is_local_closed: async with self.streams_lock: - if stream_id in self.streams: - del self.streams[stream_id] + self.streams.pop(stream_id, None) async def _handle_reset(self, stream_id: StreamID) -> None: async with self.streams_lock: @@ -316,8 +315,7 @@ class Mplex(IMuxedConn): if not stream.event_local_closed.is_set(): stream.event_local_closed.set() async with self.streams_lock: - if stream_id in self.streams: - del self.streams[stream_id] + self.streams.pop(stream_id, None) async def _cleanup(self) -> None: if not self.event_shutting_down.is_set(): diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index f080d3cf..7630c964 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -180,8 +180,7 @@ class MplexStream(IMuxedStream): if _is_remote_closed: # Both sides are closed, we can safely remove the buffer from the dict. async with self.muxed_conn.streams_lock: - if self.stream_id in self.muxed_conn.streams: - del self.muxed_conn.streams[self.stream_id] + self.muxed_conn.streams.pop(self.stream_id, None) async def reset(self) -> None: """closes both ends of the stream tells this remote side to hang up.""" @@ -208,11 +207,8 @@ class MplexStream(IMuxedStream): self.event_remote_closed.set() async with self.muxed_conn.streams_lock: - if ( - self.muxed_conn.streams is not None - and self.stream_id in self.muxed_conn.streams - ): - del self.muxed_conn.streams[self.stream_id] + if self.muxed_conn.streams is not None: + self.muxed_conn.streams.pop(self.stream_id, None) # TODO deadline not in use def set_deadline(self, ttl: int) -> bool: diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index f82cd19d..d83869f0 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -44,8 +44,7 @@ class MuxerMultistream: :param transport: the corresponding transportation to the ``protocol``. """ # If protocol is already added before, remove it and add it again. - if protocol in self.transports: - del self.transports[protocol] + self.transports.pop(protocol, None) self.transports[protocol] = transport self.multiselect.add_handler(protocol, None)