mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Run black over repo
This commit is contained in:
@ -1,13 +1,6 @@
|
||||
from typing import (
|
||||
Iterable,
|
||||
List,
|
||||
Sequence,
|
||||
)
|
||||
from typing import Iterable, List, Sequence
|
||||
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
id_b58_decode,
|
||||
)
|
||||
from libp2p.peer.id import ID, id_b58_decode
|
||||
|
||||
from .pb import rpc_pb2
|
||||
from .pubsub import Pubsub
|
||||
@ -78,9 +71,7 @@ class FloodSub(IPubsubRouter):
|
||||
msg_forwarder=msg_forwarder,
|
||||
origin=ID(pubsub_msg.from_id),
|
||||
)
|
||||
rpc_msg = rpc_pb2.RPC(
|
||||
publish=[pubsub_msg],
|
||||
)
|
||||
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
|
||||
for peer_id in peers_gen:
|
||||
stream = self.pubsub.peers[str(peer_id)]
|
||||
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
|
||||
@ -103,10 +94,8 @@ class FloodSub(IPubsubRouter):
|
||||
"""
|
||||
|
||||
def _get_peers_to_send(
|
||||
self,
|
||||
topic_ids: Iterable[str],
|
||||
msg_forwarder: ID,
|
||||
origin: ID) -> Iterable[ID]:
|
||||
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
|
||||
) -> Iterable[ID]:
|
||||
"""
|
||||
Get the eligible peers to send the data to.
|
||||
:param msg_forwarder: peer ID of the peer who forwards the message to us.
|
||||
|
||||
@ -1,19 +1,9 @@
|
||||
from ast import literal_eval
|
||||
import asyncio
|
||||
import random
|
||||
from typing import (
|
||||
Any,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Set,
|
||||
Sequence,
|
||||
)
|
||||
from typing import Any, Dict, Iterable, List, Set, Sequence
|
||||
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
id_b58_decode,
|
||||
)
|
||||
from libp2p.peer.id import ID, id_b58_decode
|
||||
|
||||
from .mcache import MessageCache
|
||||
from .pb import rpc_pb2
|
||||
@ -45,24 +35,26 @@ class GossipSub(IPubsubRouter):
|
||||
|
||||
time_since_last_publish: Dict[str, int]
|
||||
|
||||
#FIXME: Should be changed to List[ID]
|
||||
# FIXME: Should be changed to List[ID]
|
||||
peers_gossipsub: List[str]
|
||||
#FIXME: Should be changed to List[ID]
|
||||
# FIXME: Should be changed to List[ID]
|
||||
peers_floodsub: List[str]
|
||||
|
||||
mcache: MessageCache
|
||||
|
||||
heartbeat_interval: int
|
||||
|
||||
def __init__(self,
|
||||
protocols: Sequence[str],
|
||||
degree: int,
|
||||
degree_low: int,
|
||||
degree_high: int,
|
||||
time_to_live: int,
|
||||
gossip_window: int = 3,
|
||||
gossip_history: int = 5,
|
||||
heartbeat_interval: int = 120) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
protocols: Sequence[str],
|
||||
degree: int,
|
||||
degree_low: int,
|
||||
degree_high: int,
|
||||
time_to_live: int,
|
||||
gossip_window: int = 3,
|
||||
gossip_history: int = 5,
|
||||
heartbeat_interval: int = 120,
|
||||
) -> None:
|
||||
# pylint: disable=too-many-arguments
|
||||
self.protocols = list(protocols)
|
||||
self.pubsub = None
|
||||
@ -181,9 +173,7 @@ class GossipSub(IPubsubRouter):
|
||||
msg_forwarder=msg_forwarder,
|
||||
origin=ID(pubsub_msg.from_id),
|
||||
)
|
||||
rpc_msg = rpc_pb2.RPC(
|
||||
publish=[pubsub_msg],
|
||||
)
|
||||
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
|
||||
for peer_id in peers_gen:
|
||||
stream = self.pubsub.peers[str(peer_id)]
|
||||
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
|
||||
@ -192,10 +182,8 @@ class GossipSub(IPubsubRouter):
|
||||
await stream.write(rpc_msg.SerializeToString())
|
||||
|
||||
def _get_peers_to_send(
|
||||
self,
|
||||
topic_ids: Iterable[str],
|
||||
msg_forwarder: ID,
|
||||
origin: ID) -> Iterable[ID]:
|
||||
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
|
||||
) -> Iterable[ID]:
|
||||
"""
|
||||
Get the eligible peers to send the data to.
|
||||
:param msg_forwarder: the peer id of the peer who forwards the message to me.
|
||||
@ -231,9 +219,7 @@ class GossipSub(IPubsubRouter):
|
||||
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
|
||||
# If no peers in fanout, choose some peers from gossipsub peers in topic.
|
||||
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
|
||||
topic,
|
||||
self.degree,
|
||||
[],
|
||||
topic, self.degree, []
|
||||
)
|
||||
in_topic_gossipsub_peers = self.fanout[topic]
|
||||
for peer_id_str in in_topic_gossipsub_peers:
|
||||
@ -264,9 +250,7 @@ class GossipSub(IPubsubRouter):
|
||||
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
|
||||
if topic in self.pubsub.peer_topics:
|
||||
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
||||
topic,
|
||||
self.degree - fanout_size,
|
||||
fanout_peers,
|
||||
topic, self.degree - fanout_size, fanout_peers
|
||||
)
|
||||
# Combine fanout peers with selected peers
|
||||
fanout_peers += selected_peers
|
||||
@ -308,11 +292,13 @@ class GossipSub(IPubsubRouter):
|
||||
|
||||
# FIXME: type of `peers` should be changed to `List[ID]`
|
||||
# FIXME: type of `msg_sender` and `origin_id` should be changed to `ID`
|
||||
async def deliver_messages_to_peers(self,
|
||||
peers: List[str],
|
||||
msg_sender: str,
|
||||
origin_id: str,
|
||||
serialized_packet: bytes) -> None:
|
||||
async def deliver_messages_to_peers(
|
||||
self,
|
||||
peers: List[str],
|
||||
msg_sender: str,
|
||||
origin_id: str,
|
||||
serialized_packet: bytes,
|
||||
) -> None:
|
||||
for peer_id_in_topic in peers:
|
||||
# Forward to all peers that are not the
|
||||
# message sender and are not the message origin
|
||||
@ -349,16 +335,12 @@ class GossipSub(IPubsubRouter):
|
||||
if num_mesh_peers_in_topic < self.degree_low:
|
||||
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]
|
||||
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
||||
topic,
|
||||
self.degree - num_mesh_peers_in_topic,
|
||||
self.mesh[topic],
|
||||
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
|
||||
)
|
||||
|
||||
# FIXME: Should be changed to `List[ID]`
|
||||
fanout_peers_not_in_mesh: List[str] = [
|
||||
peer
|
||||
for peer in selected_peers
|
||||
if peer not in self.mesh[topic]
|
||||
peer for peer in selected_peers if peer not in self.mesh[topic]
|
||||
]
|
||||
for peer in fanout_peers_not_in_mesh:
|
||||
# Add peer to mesh[topic]
|
||||
@ -371,9 +353,7 @@ class GossipSub(IPubsubRouter):
|
||||
# Select |mesh[topic]| - D peers from mesh[topic]
|
||||
# FIXME: Should be changed to `List[ID]`
|
||||
selected_peers = GossipSub.select_from_minus(
|
||||
num_mesh_peers_in_topic - self.degree,
|
||||
self.mesh[topic],
|
||||
[],
|
||||
num_mesh_peers_in_topic - self.degree, self.mesh[topic], []
|
||||
)
|
||||
for peer in selected_peers:
|
||||
# Remove peer from mesh[topic]
|
||||
@ -415,15 +395,16 @@ class GossipSub(IPubsubRouter):
|
||||
if topic in self.pubsub.peer_topics:
|
||||
# Select D peers from peers.gossipsub[topic]
|
||||
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
||||
topic,
|
||||
self.degree,
|
||||
[],
|
||||
topic, self.degree, []
|
||||
)
|
||||
|
||||
for peer in peers_to_emit_ihave_to:
|
||||
# TODO: this line is a monster, can hopefully be simplified
|
||||
if (topic not in self.mesh or (peer not in self.mesh[topic]))\
|
||||
and (topic not in self.fanout or (peer not in self.fanout[topic])):
|
||||
if (
|
||||
topic not in self.mesh or (peer not in self.mesh[topic])
|
||||
) and (
|
||||
topic not in self.fanout or (peer not in self.fanout[topic])
|
||||
):
|
||||
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
|
||||
await self.emit_ihave(topic, msg_id_strs, peer)
|
||||
|
||||
@ -438,12 +419,13 @@ class GossipSub(IPubsubRouter):
|
||||
if topic in self.pubsub.peer_topics:
|
||||
# Select D peers from peers.gossipsub[topic]
|
||||
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
||||
topic,
|
||||
self.degree,
|
||||
[],
|
||||
topic, self.degree, []
|
||||
)
|
||||
for peer in peers_to_emit_ihave_to:
|
||||
if peer not in self.mesh[topic] and peer not in self.fanout[topic]:
|
||||
if (
|
||||
peer not in self.mesh[topic]
|
||||
and peer not in self.fanout[topic]
|
||||
):
|
||||
|
||||
msg_id_strs = [str(msg) for msg in msg_ids]
|
||||
await self.emit_ihave(topic, msg_id_strs, peer)
|
||||
@ -451,9 +433,9 @@ class GossipSub(IPubsubRouter):
|
||||
self.mcache.shift()
|
||||
|
||||
@staticmethod
|
||||
def select_from_minus(num_to_select: int,
|
||||
pool: Sequence[Any],
|
||||
minus: Sequence[Any]) -> List[Any]:
|
||||
def select_from_minus(
|
||||
num_to_select: int, pool: Sequence[Any], minus: Sequence[Any]
|
||||
) -> List[Any]:
|
||||
"""
|
||||
Select at most num_to_select subset of elements from the set (pool - minus) randomly.
|
||||
:param num_to_select: number of elements to randomly select
|
||||
@ -482,24 +464,22 @@ class GossipSub(IPubsubRouter):
|
||||
# FIXME: type of `minus` should be changed to type `Sequence[ID]`
|
||||
# FIXME: return type should be changed to type `List[ID]`
|
||||
def _get_in_topic_gossipsub_peers_from_minus(
|
||||
self,
|
||||
topic: str,
|
||||
num_to_select: int,
|
||||
minus: Sequence[str]) -> List[str]:
|
||||
self, topic: str, num_to_select: int, minus: Sequence[str]
|
||||
) -> List[str]:
|
||||
gossipsub_peers_in_topic = [
|
||||
peer_str
|
||||
for peer_str in self.pubsub.peer_topics[topic]
|
||||
if peer_str in self.peers_gossipsub
|
||||
]
|
||||
return self.select_from_minus(
|
||||
num_to_select,
|
||||
gossipsub_peers_in_topic,
|
||||
list(minus),
|
||||
num_to_select, gossipsub_peers_in_topic, list(minus)
|
||||
)
|
||||
|
||||
# RPC handlers
|
||||
|
||||
async def handle_ihave(self, ihave_msg: rpc_pb2.Message, sender_peer_id: str) -> None:
|
||||
async def handle_ihave(
|
||||
self, ihave_msg: rpc_pb2.Message, sender_peer_id: str
|
||||
) -> None:
|
||||
"""
|
||||
Checks the seen set and requests unknown messages with an IWANT message.
|
||||
"""
|
||||
@ -509,8 +489,7 @@ class GossipSub(IPubsubRouter):
|
||||
|
||||
# Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache
|
||||
seen_seqnos_and_peers = [
|
||||
seqno_and_from
|
||||
for seqno_and_from in self.pubsub.seen_messages.keys()
|
||||
seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys()
|
||||
]
|
||||
|
||||
# Add all unknown message ids (ids that appear in ihave_msg but not in seen_seqnos) to list
|
||||
@ -526,7 +505,9 @@ class GossipSub(IPubsubRouter):
|
||||
if msg_ids_wanted:
|
||||
await self.emit_iwant(msg_ids_wanted, from_id_str)
|
||||
|
||||
async def handle_iwant(self, iwant_msg: rpc_pb2.Message, sender_peer_id: str) -> None:
|
||||
async def handle_iwant(
|
||||
self, iwant_msg: rpc_pb2.Message, sender_peer_id: str
|
||||
) -> None:
|
||||
"""
|
||||
Forwards all request messages that are present in mcache to the requesting peer.
|
||||
"""
|
||||
@ -564,7 +545,9 @@ class GossipSub(IPubsubRouter):
|
||||
# 4) And write the packet to the stream
|
||||
await peer_stream.write(rpc_msg)
|
||||
|
||||
async def handle_graft(self, graft_msg: rpc_pb2.Message, sender_peer_id: str) -> None:
|
||||
async def handle_graft(
|
||||
self, graft_msg: rpc_pb2.Message, sender_peer_id: str
|
||||
) -> None:
|
||||
topic: str = graft_msg.topicID
|
||||
|
||||
from_id_str = sender_peer_id
|
||||
@ -577,7 +560,9 @@ class GossipSub(IPubsubRouter):
|
||||
# Respond with PRUNE if not subscribed to the topic
|
||||
await self.emit_prune(topic, sender_peer_id)
|
||||
|
||||
async def handle_prune(self, prune_msg: rpc_pb2.Message, sender_peer_id: str) -> None:
|
||||
async def handle_prune(
|
||||
self, prune_msg: rpc_pb2.Message, sender_peer_id: str
|
||||
) -> None:
|
||||
topic: str = prune_msg.topicID
|
||||
|
||||
from_id_str = sender_peer_id
|
||||
@ -641,7 +626,9 @@ class GossipSub(IPubsubRouter):
|
||||
|
||||
await self.emit_control_message(control_msg, to_peer)
|
||||
|
||||
async def emit_control_message(self, control_msg: rpc_pb2.ControlMessage, to_peer: str) -> None:
|
||||
async def emit_control_message(
|
||||
self, control_msg: rpc_pb2.ControlMessage, to_peer: str
|
||||
) -> None:
|
||||
# Add control message to packet
|
||||
packet: rpc_pb2.RPC = rpc_pb2.RPC()
|
||||
packet.control.CopyFrom(control_msg)
|
||||
|
||||
@ -1,10 +1,4 @@
|
||||
from typing import (
|
||||
Dict,
|
||||
List,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
)
|
||||
from typing import Dict, List, Optional, Sequence, Tuple
|
||||
|
||||
from .pb import rpc_pb2
|
||||
|
||||
@ -18,6 +12,7 @@ class CacheEntry:
|
||||
"""
|
||||
A logical representation of an entry in the mcache's _history_.
|
||||
"""
|
||||
|
||||
def __init__(self, mid: Tuple[bytes, bytes], topics: Sequence[str]) -> None:
|
||||
"""
|
||||
Constructor.
|
||||
@ -30,7 +25,6 @@ class CacheEntry:
|
||||
|
||||
class MessageCache:
|
||||
|
||||
|
||||
window_size: int
|
||||
history_size: int
|
||||
|
||||
@ -53,10 +47,7 @@ class MessageCache:
|
||||
|
||||
# max length of history_size. each item is a list of CacheEntry.
|
||||
# messages lost upon shift().
|
||||
self.history = [
|
||||
[]
|
||||
for _ in range(history_size)
|
||||
]
|
||||
self.history = [[] for _ in range(history_size)]
|
||||
|
||||
def put(self, msg: rpc_pb2.Message) -> None:
|
||||
"""
|
||||
|
||||
@ -1,13 +1,7 @@
|
||||
# pylint: disable=no-name-in-module
|
||||
import asyncio
|
||||
import time
|
||||
from typing import (
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
Tuple,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from typing import Any, Dict, List, Tuple, TYPE_CHECKING
|
||||
|
||||
from lru import LRU
|
||||
|
||||
@ -34,18 +28,18 @@ class Pubsub:
|
||||
host: IHost
|
||||
my_id: ID
|
||||
|
||||
router: 'IPubsubRouter'
|
||||
router: "IPubsubRouter"
|
||||
|
||||
peer_queue: 'asyncio.Queue[ID]'
|
||||
peer_queue: "asyncio.Queue[ID]"
|
||||
|
||||
protocols: List[str]
|
||||
|
||||
incoming_msgs_from_peers: 'asyncio.Queue[rpc_pb2.Message]'
|
||||
outgoing_messages: 'asyncio.Queue[rpc_pb2.Message]'
|
||||
incoming_msgs_from_peers: "asyncio.Queue[rpc_pb2.Message]"
|
||||
outgoing_messages: "asyncio.Queue[rpc_pb2.Message]"
|
||||
|
||||
seen_messages: LRU
|
||||
|
||||
my_topics: Dict[str, 'asyncio.Queue[rpc_pb2.Message]']
|
||||
my_topics: Dict[str, "asyncio.Queue[rpc_pb2.Message]"]
|
||||
|
||||
# FIXME: Should be changed to `Dict[str, List[ID]]`
|
||||
peer_topics: Dict[str, List[str]]
|
||||
@ -55,11 +49,9 @@ class Pubsub:
|
||||
# NOTE: Be sure it is increased atomically everytime.
|
||||
counter: int # uint64
|
||||
|
||||
def __init__(self,
|
||||
host: IHost,
|
||||
router: 'IPubsubRouter',
|
||||
my_id: ID,
|
||||
cache_size: int = None) -> None:
|
||||
def __init__(
|
||||
self, host: IHost, router: "IPubsubRouter", my_id: ID, cache_size: int = None
|
||||
) -> None:
|
||||
"""
|
||||
Construct a new Pubsub object, which is responsible for handling all
|
||||
Pubsub-related messages and relaying messages as appropriate to the
|
||||
@ -120,12 +112,9 @@ class Pubsub:
|
||||
"""
|
||||
packet = rpc_pb2.RPC()
|
||||
for topic_id in self.my_topics:
|
||||
packet.subscriptions.extend([
|
||||
rpc_pb2.RPC.SubOpts(
|
||||
subscribe=True,
|
||||
topicid=topic_id,
|
||||
)
|
||||
])
|
||||
packet.subscriptions.extend(
|
||||
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
|
||||
)
|
||||
return packet.SerializeToString()
|
||||
|
||||
async def continuously_read_stream(self, stream: INetStream) -> None:
|
||||
@ -262,7 +251,7 @@ class Pubsub:
|
||||
# for each topic
|
||||
await self.my_topics[topic].put(publish_message)
|
||||
|
||||
async def subscribe(self, topic_id: str) -> 'asyncio.Queue[rpc_pb2.Message]':
|
||||
async def subscribe(self, topic_id: str) -> "asyncio.Queue[rpc_pb2.Message]":
|
||||
"""
|
||||
Subscribe ourself to a topic
|
||||
:param topic_id: topic_id to subscribe to
|
||||
@ -277,10 +266,9 @@ class Pubsub:
|
||||
|
||||
# Create subscribe message
|
||||
packet: rpc_pb2.RPC = rpc_pb2.RPC()
|
||||
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
|
||||
subscribe=True,
|
||||
topicid=topic_id.encode('utf-8')
|
||||
)])
|
||||
packet.subscriptions.extend(
|
||||
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id.encode("utf-8"))]
|
||||
)
|
||||
|
||||
# Send out subscribe message to all peers
|
||||
await self.message_all_peers(packet.SerializeToString())
|
||||
@ -305,10 +293,9 @@ class Pubsub:
|
||||
|
||||
# Create unsubscribe message
|
||||
packet: rpc_pb2.RPC = rpc_pb2.RPC()
|
||||
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
|
||||
subscribe=False,
|
||||
topicid=topic_id.encode('utf-8')
|
||||
)])
|
||||
packet.subscriptions.extend(
|
||||
[rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id.encode("utf-8"))]
|
||||
)
|
||||
|
||||
# Send out unsubscribe message to all peers
|
||||
await self.message_all_peers(packet.SerializeToString())
|
||||
@ -371,7 +358,7 @@ class Pubsub:
|
||||
Make the next message sequence id.
|
||||
"""
|
||||
self.counter += 1
|
||||
return self.counter.to_bytes(8, 'big')
|
||||
return self.counter.to_bytes(8, "big")
|
||||
|
||||
def _is_msg_seen(self, msg: rpc_pb2.Message) -> bool:
|
||||
msg_id = get_msg_id(msg)
|
||||
|
||||
@ -1,6 +1,4 @@
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
@ -18,9 +16,9 @@ if TYPE_CHECKING:
|
||||
class PubsubNotifee(INotifee):
|
||||
# pylint: disable=too-many-instance-attributes, cell-var-from-loop, unsubscriptable-object
|
||||
|
||||
initiator_peers_queue: 'asyncio.Queue[ID]'
|
||||
initiator_peers_queue: "asyncio.Queue[ID]"
|
||||
|
||||
def __init__(self, initiator_peers_queue: 'asyncio.Queue[ID]') -> None:
|
||||
def __init__(self, initiator_peers_queue: "asyncio.Queue[ID]") -> None:
|
||||
"""
|
||||
:param initiator_peers_queue: queue to add new peers to so that pubsub
|
||||
can process new peers after we connect to them
|
||||
|
||||
@ -1,8 +1,5 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import (
|
||||
List,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from typing import List, TYPE_CHECKING
|
||||
|
||||
from libp2p.peer.id import ID
|
||||
|
||||
@ -11,8 +8,8 @@ from .pb import rpc_pb2
|
||||
if TYPE_CHECKING:
|
||||
from .pubsub import Pubsub
|
||||
|
||||
class IPubsubRouter(ABC):
|
||||
|
||||
class IPubsubRouter(ABC):
|
||||
@abstractmethod
|
||||
def get_protocols(self) -> List[str]:
|
||||
"""
|
||||
@ -20,7 +17,7 @@ class IPubsubRouter(ABC):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def attach(self, pubsub: 'Pubsub') -> None:
|
||||
def attach(self, pubsub: "Pubsub") -> None:
|
||||
"""
|
||||
Attach is invoked by the PubSub constructor to attach the router to a
|
||||
freshly initialized PubSub instance.
|
||||
|
||||
Reference in New Issue
Block a user