run lint with pyupgrade at py39-plus

This commit is contained in:
pacrob
2025-01-25 15:31:51 -07:00
committed by Paul Robinson
parent 20580b9a4e
commit 8787613e91
44 changed files with 221 additions and 240 deletions

View File

@ -4,17 +4,15 @@ from ast import (
from collections import (
defaultdict,
)
from collections.abc import (
Iterable,
Sequence,
)
import logging
import random
from typing import (
Any,
DefaultDict,
Dict,
Iterable,
List,
Sequence,
Set,
Tuple,
)
import trio
@ -60,7 +58,7 @@ logger = logging.getLogger("libp2p.pubsub.gossipsub")
class GossipSub(IPubsubRouter, Service):
protocols: List[TProtocol]
protocols: list[TProtocol]
pubsub: Pubsub
degree: int
@ -69,11 +67,11 @@ class GossipSub(IPubsubRouter, Service):
time_to_live: int
mesh: Dict[str, Set[ID]]
fanout: Dict[str, Set[ID]]
mesh: dict[str, set[ID]]
fanout: dict[str, set[ID]]
# The protocol peer supports
peer_protocol: Dict[ID, TProtocol]
peer_protocol: dict[ID, TProtocol]
# TODO: Add `time_since_last_publish`
# Create topic --> time since last publish map.
@ -128,7 +126,7 @@ class GossipSub(IPubsubRouter, Service):
# Interface functions
def get_protocols(self) -> List[TProtocol]:
def get_protocols(self) -> list[TProtocol]:
"""
:return: the list of protocols supported by the router
"""
@ -237,13 +235,13 @@ class GossipSub(IPubsubRouter, Service):
:param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to.
"""
send_to: Set[ID] = set()
send_to: set[ID] = set()
for topic in topic_ids:
if topic not in self.pubsub.peer_topics:
continue
# floodsub peers
floodsub_peers: Set[ID] = {
floodsub_peers: set[ID] = {
peer_id
for peer_id in self.pubsub.peer_topics[topic]
if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
@ -251,7 +249,7 @@ class GossipSub(IPubsubRouter, Service):
send_to.update(floodsub_peers)
# gossipsub peers
gossipsub_peers: Set[ID] = set()
gossipsub_peers: set[ID] = set()
if topic in self.mesh:
gossipsub_peers = self.mesh[topic]
else:
@ -259,7 +257,7 @@ class GossipSub(IPubsubRouter, Service):
# pick `self.degree` number of peers who have subscribed to the topic
# and add them as our `fanout` peers.
topic_in_fanout: bool = topic in self.fanout
fanout_peers: Set[ID] = self.fanout[topic] if topic_in_fanout else set()
fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
fanout_size = len(fanout_peers)
if not topic_in_fanout or (
topic_in_fanout and fanout_size < self.degree
@ -292,7 +290,7 @@ class GossipSub(IPubsubRouter, Service):
self.mesh[topic] = set()
topic_in_fanout: bool = topic in self.fanout
fanout_peers: Set[ID] = self.fanout[topic] if topic_in_fanout else set()
fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
fanout_size = len(fanout_peers)
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
# There are less than D peers (let this number be x)
@ -333,13 +331,13 @@ class GossipSub(IPubsubRouter, Service):
async def _emit_control_msgs(
self,
peers_to_graft: Dict[ID, List[str]],
peers_to_prune: Dict[ID, List[str]],
peers_to_gossip: Dict[ID, Dict[str, List[str]]],
peers_to_graft: dict[ID, list[str]],
peers_to_prune: dict[ID, list[str]],
peers_to_gossip: dict[ID, dict[str, list[str]]],
) -> None:
graft_msgs: List[rpc_pb2.ControlGraft] = []
prune_msgs: List[rpc_pb2.ControlPrune] = []
ihave_msgs: List[rpc_pb2.ControlIHave] = []
graft_msgs: list[rpc_pb2.ControlGraft] = []
prune_msgs: list[rpc_pb2.ControlPrune] = []
ihave_msgs: list[rpc_pb2.ControlIHave] = []
# Starting with GRAFT messages
for peer, topics in peers_to_graft.items():
for topic in topics:
@ -428,9 +426,9 @@ class GossipSub(IPubsubRouter, Service):
def mesh_heartbeat(
self,
) -> Tuple[DefaultDict[ID, List[str]], DefaultDict[ID, List[str]]]:
peers_to_graft: DefaultDict[ID, List[str]] = defaultdict(list)
peers_to_prune: DefaultDict[ID, List[str]] = defaultdict(list)
) -> tuple[DefaultDict[ID, list[str]], DefaultDict[ID, list[str]]]:
peers_to_graft: DefaultDict[ID, list[str]] = defaultdict(list)
peers_to_prune: DefaultDict[ID, list[str]] = defaultdict(list)
for topic in self.mesh:
# Skip if no peers have subscribed to the topic
if topic not in self.pubsub.peer_topics:
@ -493,8 +491,8 @@ class GossipSub(IPubsubRouter, Service):
# Add the peers to fanout[topic]
self.fanout[topic].update(selected_peers)
def gossip_heartbeat(self) -> DefaultDict[ID, Dict[str, List[str]]]:
peers_to_gossip: DefaultDict[ID, Dict[str, List[str]]] = defaultdict(dict)
def gossip_heartbeat(self) -> DefaultDict[ID, dict[str, list[str]]]:
peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] = defaultdict(dict)
for topic in self.mesh:
msg_ids = self.mcache.window(topic)
if msg_ids:
@ -534,7 +532,7 @@ class GossipSub(IPubsubRouter, Service):
@staticmethod
def select_from_minus(
num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]
) -> List[Any]:
) -> list[Any]:
"""
Select at most num_to_select subset of elements from the set
(pool - minus) randomly.
@ -546,7 +544,7 @@ class GossipSub(IPubsubRouter, Service):
# Create selection pool, which is selection_pool = pool - minus
if minus:
# Create a new selection pool by removing elements of minus
selection_pool: List[Any] = [x for x in pool if x not in minus]
selection_pool: list[Any] = [x for x in pool if x not in minus]
else:
# Don't create a new selection_pool if we are not subbing anything
selection_pool = list(pool)
@ -558,13 +556,13 @@ class GossipSub(IPubsubRouter, Service):
return selection_pool
# Random selection
selection: List[Any] = random.sample(selection_pool, num_to_select)
selection: list[Any] = random.sample(selection_pool, num_to_select)
return selection
def _get_in_topic_gossipsub_peers_from_minus(
self, topic: str, num_to_select: int, minus: Iterable[ID]
) -> List[ID]:
) -> list[ID]:
gossipsub_peers_in_topic = {
peer_id
for peer_id in self.pubsub.peer_topics[topic]
@ -587,7 +585,7 @@ class GossipSub(IPubsubRouter, Service):
# Add all unknown message ids (ids that appear in ihave_msg but not in
# seen_seqnos) to list of messages we want to request
# FIXME: Update type of message ID
msg_ids_wanted: List[Any] = [
msg_ids_wanted: list[Any] = [
msg_id
for msg_id in ihave_msg.messageIDs
if literal_eval(msg_id) not in seen_seqnos_and_peers
@ -606,8 +604,8 @@ class GossipSub(IPubsubRouter, Service):
"""
# FIXME: Update type of message ID
# FIXME: Find a better way to parse the msg ids
msg_ids: List[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs]
msgs_to_forward: List[rpc_pb2.Message] = []
msg_ids: list[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs]
msgs_to_forward: list[rpc_pb2.Message] = []
for msg_id_iwant in msg_ids:
# Check if the wanted message ID is present in mcache
msg: rpc_pb2.Message = self.mcache.get(msg_id_iwant)
@ -674,9 +672,9 @@ class GossipSub(IPubsubRouter, Service):
def pack_control_msgs(
self,
ihave_msgs: List[rpc_pb2.ControlIHave],
graft_msgs: List[rpc_pb2.ControlGraft],
prune_msgs: List[rpc_pb2.ControlPrune],
ihave_msgs: list[rpc_pb2.ControlIHave],
graft_msgs: list[rpc_pb2.ControlGraft],
prune_msgs: list[rpc_pb2.ControlPrune],
) -> rpc_pb2.ControlMessage:
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
if ihave_msgs: