run lint and fix errors, except mypy

This commit is contained in:
pacrob
2024-02-19 15:56:20 -07:00
parent 42605c0288
commit 94483714a3
171 changed files with 4809 additions and 2290 deletions

View File

@ -1,4 +1,7 @@
from abc import ABC, abstractmethod
from abc import (
ABC,
abstractmethod,
)
from typing import (
TYPE_CHECKING,
AsyncContextManager,
@ -8,13 +11,23 @@ from typing import (
Tuple,
)
from async_service import ServiceAPI
from async_service import (
ServiceAPI,
)
from libp2p.peer.id import ID
from libp2p.typing import TProtocol
from libp2p.peer.id import (
ID,
)
from libp2p.typing import (
TProtocol,
)
from .pb import rpc_pb2
from .typing import ValidatorFn
from .pb import (
rpc_pb2,
)
from .typing import (
ValidatorFn,
)
if TYPE_CHECKING:
from .pubsub import Pubsub # noqa: F401
@ -69,9 +82,9 @@ class IPubsubRouter(ABC):
"""
Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed
TODO: Check if this interface is ok. It's not the exact same as the go code, but the go
code is really confusing with the msg origin, they specify `rpc.from` even when the rpc
shouldn't have a from
TODO: Check if this interface is ok. It's not the exact same as the go code, but
the go code is really confusing with the msg origin, they specify `rpc.from`
even when the rpc shouldn't have a from
:param rpc: rpc message
"""

View File

@ -1,4 +1,6 @@
from libp2p.exceptions import BaseLibp2pError
from libp2p.exceptions import (
BaseLibp2pError,
)
class PubsubRouterError(BaseLibp2pError):

View File

@ -1,16 +1,34 @@
import logging
from typing import Iterable, List, Sequence
from typing import (
Iterable,
List,
Sequence,
)
import trio
from libp2p.network.stream.exceptions import StreamClosed
from libp2p.peer.id import ID
from libp2p.typing import TProtocol
from libp2p.utils import encode_varint_prefixed
from libp2p.network.stream.exceptions import (
StreamClosed,
)
from libp2p.peer.id import (
ID,
)
from libp2p.typing import (
TProtocol,
)
from libp2p.utils import (
encode_varint_prefixed,
)
from .abc import IPubsubRouter
from .pb import rpc_pb2
from .pubsub import Pubsub
from .abc import (
IPubsubRouter,
)
from .pb import (
rpc_pb2,
)
from .pubsub import (
Pubsub,
)
PROTOCOL_ID = TProtocol("/floodsub/1.0.0")
@ -18,7 +36,6 @@ logger = logging.getLogger("libp2p.pubsub.floodsub")
class FloodSub(IPubsubRouter):
protocols: List[TProtocol]
pubsub: Pubsub
@ -80,7 +97,6 @@ class FloodSub(IPubsubRouter):
:param msg_forwarder: peer ID of the peer who forwards the message to us
:param pubsub_msg: pubsub message in protobuf.
"""
peers_gen = set(
self._get_peers_to_send(
pubsub_msg.topicIDs,

View File

@ -1,23 +1,58 @@
from ast import literal_eval
from collections import defaultdict
from ast import (
literal_eval,
)
from collections import (
defaultdict,
)
import logging
import random
from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Set, Tuple
from typing import (
Any,
DefaultDict,
Dict,
Iterable,
List,
Sequence,
Set,
Tuple,
)
from async_service import Service
from async_service import (
Service,
)
import trio
from libp2p.network.stream.exceptions import StreamClosed
from libp2p.peer.id import ID
from libp2p.pubsub import floodsub
from libp2p.typing import TProtocol
from libp2p.utils import encode_varint_prefixed
from libp2p.network.stream.exceptions import (
StreamClosed,
)
from libp2p.peer.id import (
ID,
)
from libp2p.pubsub import (
floodsub,
)
from libp2p.typing import (
TProtocol,
)
from libp2p.utils import (
encode_varint_prefixed,
)
from .abc import IPubsubRouter
from .exceptions import NoPubsubAttached
from .mcache import MessageCache
from .pb import rpc_pb2
from .pubsub import Pubsub
from .abc import (
IPubsubRouter,
)
from .exceptions import (
NoPubsubAttached,
)
from .mcache import (
MessageCache,
)
from .pb import (
rpc_pb2,
)
from .pubsub import (
Pubsub,
)
PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
@ -120,10 +155,10 @@ class GossipSub(IPubsubRouter, Service):
logger.debug("adding peer %s with protocol %s", peer_id, protocol_id)
if protocol_id not in (PROTOCOL_ID, floodsub.PROTOCOL_ID):
# We should never enter here. Becuase the `protocol_id` is registered by your pubsub
# instance in multistream-select, but it is not the protocol that gossipsub supports.
# In this case, probably we registered gossipsub to a wrong `protocol_id`
# in multistream-select, or wrong versions.
# We should never enter here. Becuase the `protocol_id` is registered by
# your pubsub instance in multistream-select, but it is not the protocol
# that gossipsub supports. In this case, probably we registered gossipsub
# to a wrong `protocol_id` in multistream-select, or wrong versions.
raise ValueError(f"Protocol={protocol_id} is not supported.")
self.peer_protocol[peer_id] = protocol_id
@ -208,11 +243,11 @@ class GossipSub(IPubsubRouter, Service):
continue
# floodsub peers
floodsub_peers: Set[ID] = set(
floodsub_peers: Set[ID] = {
peer_id
for peer_id in self.pubsub.peer_topics[topic]
if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
)
}
send_to.update(floodsub_peers)
# gossipsub peers
@ -220,9 +255,9 @@ class GossipSub(IPubsubRouter, Service):
if topic in self.mesh:
gossipsub_peers = self.mesh[topic]
else:
# When we publish to a topic that we have not subscribe to, we randomly pick
# `self.degree` number of peers who have subscribed to the topic and add them
# as our `fanout` peers.
# When we publish to a topic that we have not subscribe to, we randomly
# pick `self.degree` number of peers who have subscribed to the topic
# and add them as our `fanout` peers.
topic_in_fanout: bool = topic in self.fanout
fanout_peers: Set[ID] = self.fanout[topic] if topic_in_fanout else set()
fanout_size = len(fanout_peers)
@ -270,7 +305,7 @@ class GossipSub(IPubsubRouter, Service):
# Combine fanout peers with selected peers
fanout_peers.update(selected_peers)
# Add fanout peers to mesh and notifies them with a GRAFT(topic) control message.
# Add fanout peers to mesh and notifies them with a GRAFT(topic) control message
for peer in fanout_peers:
self.mesh[topic].add(peer)
await self.emit_graft(topic, peer)
@ -369,10 +404,10 @@ class GossipSub(IPubsubRouter, Service):
"""
Call individual heartbeats.
Note: the heartbeats are called with awaits because each heartbeat depends on the
state changes in the preceding heartbeat
Note: the heartbeats are called with awaits because each heartbeat depends on
the state changes in the preceding heartbeat
"""
# Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501
# Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # noqa: E501
await trio.sleep(self.heartbeat_initial_delay)
while True:
# Maintain mesh and keep track of which peers to send GRAFT or PRUNE to
@ -381,7 +416,8 @@ class GossipSub(IPubsubRouter, Service):
self.fanout_heartbeat()
# Get the peers to send IHAVE to
peers_to_gossip = self.gossip_heartbeat()
# Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and send it
# Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and
# send it
await self._emit_control_msgs(
peers_to_graft, peers_to_prune, peers_to_gossip
)
@ -391,7 +427,7 @@ class GossipSub(IPubsubRouter, Service):
await trio.sleep(self.heartbeat_interval)
def mesh_heartbeat(
self
self,
) -> Tuple[DefaultDict[ID, List[str]], DefaultDict[ID, List[str]]]:
peers_to_graft: DefaultDict[ID, List[str]] = defaultdict(list)
peers_to_prune: DefaultDict[ID, List[str]] = defaultdict(list)
@ -402,7 +438,7 @@ class GossipSub(IPubsubRouter, Service):
num_mesh_peers_in_topic = len(self.mesh[topic])
if num_mesh_peers_in_topic < self.degree_low:
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
)
@ -436,7 +472,7 @@ class GossipSub(IPubsubRouter, Service):
# Remove topic from fanout
del self.fanout[topic]
else:
# Check if fanout peers are still in the topic and remove the ones that are not
# Check if fanout peers are still in the topic and remove the ones that are not # noqa: E501
# ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501
in_topic_fanout_peers = [
peer
@ -448,7 +484,7 @@ class GossipSub(IPubsubRouter, Service):
# If |fanout[topic]| < D
if num_fanout_peers_in_topic < self.degree:
# Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic]
# Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic] # noqa: E501
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree - num_fanout_peers_in_topic,
@ -462,11 +498,14 @@ class GossipSub(IPubsubRouter, Service):
for topic in self.mesh:
msg_ids = self.mcache.window(topic)
if msg_ids:
# Get all pubsub peers in a topic and only add them if they are gossipsub peers too
# Get all pubsub peers in a topic and only add them if they are
# gossipsub peers too
if topic in self.pubsub.peer_topics:
# Select D peers from peers.gossipsub[topic]
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree, self.mesh[topic]
peers_to_emit_ihave_to = (
self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree, self.mesh[topic]
)
)
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
@ -478,11 +517,14 @@ class GossipSub(IPubsubRouter, Service):
for topic in self.fanout:
msg_ids = self.mcache.window(topic)
if msg_ids:
# Get all pubsub peers in topic and only add if they are gossipsub peers also
# Get all pubsub peers in topic and only add if they are
# gossipsub peers also
if topic in self.pubsub.peer_topics:
# Select D peers from peers.gossipsub[topic]
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree, self.fanout[topic]
peers_to_emit_ihave_to = (
self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree, self.fanout[topic]
)
)
msg_id_strs = [str(msg) for msg in msg_ids]
for peer in peers_to_emit_ihave_to:
@ -494,7 +536,8 @@ class GossipSub(IPubsubRouter, Service):
num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]
) -> List[Any]:
"""
Select at most num_to_select subset of elements from the set (pool - minus) randomly.
Select at most num_to_select subset of elements from the set
(pool - minus) randomly.
:param num_to_select: number of elements to randomly select
:param pool: list of items to select from (excluding elements in minus)
:param minus: elements to be excluded from selection pool
@ -508,8 +551,9 @@ class GossipSub(IPubsubRouter, Service):
# Don't create a new selection_pool if we are not subbing anything
selection_pool = list(pool)
# If num_to_select > size(selection_pool), then return selection_pool (which has the most
# possible elements s.t. the number of elements is less than num_to_select)
# If num_to_select > size(selection_pool), then return selection_pool (which has
# the most possible elements s.t. the number of elements is less than
# num_to_select)
if num_to_select >= len(selection_pool):
return selection_pool
@ -521,11 +565,11 @@ class GossipSub(IPubsubRouter, Service):
def _get_in_topic_gossipsub_peers_from_minus(
self, topic: str, num_to_select: int, minus: Iterable[ID]
) -> List[ID]:
gossipsub_peers_in_topic = set(
gossipsub_peers_in_topic = {
peer_id
for peer_id in self.pubsub.peer_topics[topic]
if self.peer_protocol[peer_id] == PROTOCOL_ID
)
}
return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)
# RPC handlers
@ -533,15 +577,15 @@ class GossipSub(IPubsubRouter, Service):
async def handle_ihave(
self, ihave_msg: rpc_pb2.ControlIHave, sender_peer_id: ID
) -> None:
"""Checks the seen set and requests unknown messages with an IWANT
message."""
# Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache
"""Checks the seen set and requests unknown messages with an IWANT message."""
# Get list of all seen (seqnos, from) from the (seqno, from) tuples in
# seen_messages cache
seen_seqnos_and_peers = [
seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys()
]
# Add all unknown message ids (ids that appear in ihave_msg but not in seen_seqnos) to list
# of messages we want to request
# Add all unknown message ids (ids that appear in ihave_msg but not in
# seen_seqnos) to list of messages we want to request
# FIXME: Update type of message ID
msg_ids_wanted: List[Any] = [
msg_id
@ -556,8 +600,10 @@ class GossipSub(IPubsubRouter, Service):
async def handle_iwant(
self, iwant_msg: rpc_pb2.ControlIWant, sender_peer_id: ID
) -> None:
"""Forwards all request messages that are present in mcache to the
requesting peer."""
"""
Forwards all request messages that are present in mcache to the
requesting peer.
"""
# FIXME: Update type of message ID
# FIXME: Find a better way to parse the msg ids
msg_ids: List[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs]
@ -572,8 +618,8 @@ class GossipSub(IPubsubRouter, Service):
msgs_to_forward.append(msg)
# Forward messages to requesting peer
# Should this just be publishing? No
# because then the message will forwarded to peers in the topics contained in the messages.
# Should this just be publishing? No, because then the message will forwarded to
# peers in the topics contained in the messages.
# We should
# 1) Package these messages into a single packet
packet: rpc_pb2.RPC = rpc_pb2.RPC()
@ -643,7 +689,6 @@ class GossipSub(IPubsubRouter, Service):
async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None:
"""Emit ihave message, sent to to_peer, for topic and msg_ids."""
ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave()
ihave_msg.messageIDs.extend(msg_ids)
ihave_msg.topicID = topic
@ -655,7 +700,6 @@ class GossipSub(IPubsubRouter, Service):
async def emit_iwant(self, msg_ids: Any, to_peer: ID) -> None:
"""Emit iwant message, sent to to_peer, for msg_ids."""
iwant_msg: rpc_pb2.ControlIWant = rpc_pb2.ControlIWant()
iwant_msg.messageIDs.extend(msg_ids)
@ -666,7 +710,6 @@ class GossipSub(IPubsubRouter, Service):
async def emit_graft(self, topic: str, to_peer: ID) -> None:
"""Emit graft message, sent to to_peer, for topic."""
graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft()
graft_msg.topicID = topic
@ -677,7 +720,6 @@ class GossipSub(IPubsubRouter, Service):
async def emit_prune(self, topic: str, to_peer: ID) -> None:
"""Emit graft message, sent to to_peer, for topic."""
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
prune_msg.topicID = topic

View File

@ -1,10 +1,17 @@
from typing import Dict, List, Optional, Sequence, Tuple
from typing import (
Dict,
List,
Optional,
Sequence,
Tuple,
)
from .pb import rpc_pb2
from .pb import (
rpc_pb2,
)
class CacheEntry:
mid: Tuple[bytes, bytes]
topics: List[str]
@ -24,7 +31,6 @@ class CacheEntry:
class MessageCache:
window_size: int
history_size: int
@ -91,8 +97,9 @@ class MessageCache:
return mids
def shift(self) -> None:
"""Shift the window over by 1 position, dropping the last element of
the history."""
"""
Shift the window over by 1 position, dropping the last element of the history.
"""
last_entries: List[CacheEntry] = self.history[len(self.history) - 1]
for entry in last_entries:

File diff suppressed because it is too large Load Diff

View File

@ -27,197 +27,352 @@ from typing_extensions import (
Literal as typing_extensions___Literal,
)
class RPC(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
class SubOpts(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
subscribe = ... # type: bool
topicid = ... # type: typing___Text
subscribe = ... # type: bool
topicid = ... # type: typing___Text
def __init__(self,
def __init__(
self,
*,
subscribe : typing___Optional[bool] = None,
topicid : typing___Optional[typing___Text] = None,
) -> None: ...
subscribe: typing___Optional[bool] = None,
topicid: typing___Optional[typing___Text] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> RPC.SubOpts: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def MergeFrom(
self, other_msg: google___protobuf___message___Message
) -> None: ...
def CopyFrom(
self, other_msg: google___protobuf___message___Message
) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"subscribe",u"topicid"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"subscribe",u"topicid"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["subscribe", "topicid"]
) -> bool: ...
def ClearField(
self, field_name: typing_extensions___Literal["subscribe", "topicid"]
) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"subscribe",b"subscribe",u"topicid",b"topicid"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"subscribe",b"subscribe",u"topicid",b"topicid"]) -> None: ...
def HasField(
self,
field_name: typing_extensions___Literal[
"subscribe", b"subscribe", "topicid", b"topicid"
],
) -> bool: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"subscribe", b"subscribe", "topicid", b"topicid"
],
) -> None: ...
@property
def subscriptions(self) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[RPC.SubOpts]: ...
def subscriptions(
self,
) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[
RPC.SubOpts
]: ...
@property
def publish(self) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[Message]: ...
def publish(
self,
) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[
Message
]: ...
@property
def control(self) -> ControlMessage: ...
def __init__(self,
def __init__(
self,
*,
subscriptions : typing___Optional[typing___Iterable[RPC.SubOpts]] = None,
publish : typing___Optional[typing___Iterable[Message]] = None,
control : typing___Optional[ControlMessage] = None,
) -> None: ...
subscriptions: typing___Optional[typing___Iterable[RPC.SubOpts]] = None,
publish: typing___Optional[typing___Iterable[Message]] = None,
control: typing___Optional[ControlMessage] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> RPC: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"control"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"control",u"publish",u"subscriptions"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["control"]
) -> bool: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"control", "publish", "subscriptions"
],
) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"control",b"control"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"control",b"control",u"publish",b"publish",u"subscriptions",b"subscriptions"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["control", b"control"]
) -> bool: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"control",
b"control",
"publish",
b"publish",
"subscriptions",
b"subscriptions",
],
) -> None: ...
class Message(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
from_id = ... # type: bytes
data = ... # type: bytes
seqno = ... # type: bytes
topicIDs = ... # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[typing___Text]
signature = ... # type: bytes
key = ... # type: bytes
from_id = ... # type: bytes
data = ... # type: bytes
seqno = ... # type: bytes
topicIDs = (
...
) # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[typing___Text]
signature = ... # type: bytes
key = ... # type: bytes
def __init__(self,
def __init__(
self,
*,
from_id : typing___Optional[bytes] = None,
data : typing___Optional[bytes] = None,
seqno : typing___Optional[bytes] = None,
topicIDs : typing___Optional[typing___Iterable[typing___Text]] = None,
signature : typing___Optional[bytes] = None,
key : typing___Optional[bytes] = None,
) -> None: ...
from_id: typing___Optional[bytes] = None,
data: typing___Optional[bytes] = None,
seqno: typing___Optional[bytes] = None,
topicIDs: typing___Optional[typing___Iterable[typing___Text]] = None,
signature: typing___Optional[bytes] = None,
key: typing___Optional[bytes] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> Message: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"data",u"from_id",u"key",u"seqno",u"signature"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"data",u"from_id",u"key",u"seqno",u"signature",u"topicIDs"]) -> None: ...
def HasField(
self,
field_name: typing_extensions___Literal[
"data", "from_id", "key", "seqno", "signature"
],
) -> bool: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"data", "from_id", "key", "seqno", "signature", "topicIDs"
],
) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"data",b"data",u"from_id",b"from_id",u"key",b"key",u"seqno",b"seqno",u"signature",b"signature"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"data",b"data",u"from_id",b"from_id",u"key",b"key",u"seqno",b"seqno",u"signature",b"signature",u"topicIDs",b"topicIDs"]) -> None: ...
def HasField(
self,
field_name: typing_extensions___Literal[
"data",
b"data",
"from_id",
b"from_id",
"key",
b"key",
"seqno",
b"seqno",
"signature",
b"signature",
],
) -> bool: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"data",
b"data",
"from_id",
b"from_id",
"key",
b"key",
"seqno",
b"seqno",
"signature",
b"signature",
"topicIDs",
b"topicIDs",
],
) -> None: ...
class ControlMessage(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
@property
def ihave(self) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[ControlIHave]: ...
def ihave(
self,
) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[
ControlIHave
]: ...
@property
def iwant(self) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[ControlIWant]: ...
def iwant(
self,
) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[
ControlIWant
]: ...
@property
def graft(self) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[ControlGraft]: ...
def graft(
self,
) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[
ControlGraft
]: ...
@property
def prune(self) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[ControlPrune]: ...
def __init__(self,
def prune(
self,
) -> google___protobuf___internal___containers___RepeatedCompositeFieldContainer[
ControlPrune
]: ...
def __init__(
self,
*,
ihave : typing___Optional[typing___Iterable[ControlIHave]] = None,
iwant : typing___Optional[typing___Iterable[ControlIWant]] = None,
graft : typing___Optional[typing___Iterable[ControlGraft]] = None,
prune : typing___Optional[typing___Iterable[ControlPrune]] = None,
) -> None: ...
ihave: typing___Optional[typing___Iterable[ControlIHave]] = None,
iwant: typing___Optional[typing___Iterable[ControlIWant]] = None,
graft: typing___Optional[typing___Iterable[ControlGraft]] = None,
prune: typing___Optional[typing___Iterable[ControlPrune]] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> ControlMessage: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
if sys.version_info >= (3,):
def ClearField(self, field_name: typing_extensions___Literal[u"graft",u"ihave",u"iwant",u"prune"]) -> None: ...
def ClearField(
self,
field_name: typing_extensions___Literal["graft", "ihave", "iwant", "prune"],
) -> None: ...
else:
def ClearField(self, field_name: typing_extensions___Literal[u"graft",b"graft",u"ihave",b"ihave",u"iwant",b"iwant",u"prune",b"prune"]) -> None: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"graft",
b"graft",
"ihave",
b"ihave",
"iwant",
b"iwant",
"prune",
b"prune",
],
) -> None: ...
class ControlIHave(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
topicID = ... # type: typing___Text
messageIDs = ... # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[typing___Text]
topicID = ... # type: typing___Text
messageIDs = (
...
) # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[typing___Text]
def __init__(self,
def __init__(
self,
*,
topicID : typing___Optional[typing___Text] = None,
messageIDs : typing___Optional[typing___Iterable[typing___Text]] = None,
) -> None: ...
topicID: typing___Optional[typing___Text] = None,
messageIDs: typing___Optional[typing___Iterable[typing___Text]] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> ControlIHave: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"topicID"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"messageIDs",u"topicID"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["topicID"]
) -> bool: ...
def ClearField(
self, field_name: typing_extensions___Literal["messageIDs", "topicID"]
) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"topicID",b"topicID"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"messageIDs",b"messageIDs",u"topicID",b"topicID"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["topicID", b"topicID"]
) -> bool: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"messageIDs", b"messageIDs", "topicID", b"topicID"
],
) -> None: ...
class ControlIWant(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
messageIDs = ... # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[typing___Text]
messageIDs = (
...
) # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[typing___Text]
def __init__(self,
def __init__(
self,
*,
messageIDs : typing___Optional[typing___Iterable[typing___Text]] = None,
) -> None: ...
messageIDs: typing___Optional[typing___Iterable[typing___Text]] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> ControlIWant: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
if sys.version_info >= (3,):
def ClearField(self, field_name: typing_extensions___Literal[u"messageIDs"]) -> None: ...
def ClearField(
self, field_name: typing_extensions___Literal["messageIDs"]
) -> None: ...
else:
def ClearField(self, field_name: typing_extensions___Literal[u"messageIDs",b"messageIDs"]) -> None: ...
def ClearField(
self, field_name: typing_extensions___Literal["messageIDs", b"messageIDs"]
) -> None: ...
class ControlGraft(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
topicID = ... # type: typing___Text
topicID = ... # type: typing___Text
def __init__(self,
def __init__(
self,
*,
topicID : typing___Optional[typing___Text] = None,
) -> None: ...
topicID: typing___Optional[typing___Text] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> ControlGraft: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"topicID"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"topicID"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["topicID"]
) -> bool: ...
def ClearField(
self, field_name: typing_extensions___Literal["topicID"]
) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"topicID",b"topicID"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"topicID",b"topicID"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["topicID", b"topicID"]
) -> bool: ...
def ClearField(
self, field_name: typing_extensions___Literal["topicID", b"topicID"]
) -> None: ...
class ControlPrune(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
topicID = ... # type: typing___Text
topicID = ... # type: typing___Text
def __init__(self,
def __init__(
self,
*,
topicID : typing___Optional[typing___Text] = None,
) -> None: ...
topicID: typing___Optional[typing___Text] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> ControlPrune: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"topicID"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"topicID"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["topicID"]
) -> bool: ...
def ClearField(
self, field_name: typing_extensions___Literal["topicID"]
) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"topicID",b"topicID"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"topicID",b"topicID"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["topicID", b"topicID"]
) -> bool: ...
def ClearField(
self, field_name: typing_extensions___Literal["topicID", b"topicID"]
) -> None: ...
class TopicDescriptor(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
class AuthOpts(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
class AuthMode(int):
DESCRIPTOR: google___protobuf___descriptor___EnumDescriptor = ...
@classmethod
@ -229,7 +384,11 @@ class TopicDescriptor(google___protobuf___message___Message):
@classmethod
def values(cls) -> typing___List[TopicDescriptor.AuthOpts.AuthMode]: ...
@classmethod
def items(cls) -> typing___List[typing___Tuple[str, TopicDescriptor.AuthOpts.AuthMode]]: ...
def items(
cls,
) -> typing___List[
typing___Tuple[str, TopicDescriptor.AuthOpts.AuthMode]
]: ...
NONE = typing___cast(TopicDescriptor.AuthOpts.AuthMode, 0)
KEY = typing___cast(TopicDescriptor.AuthOpts.AuthMode, 1)
WOT = typing___cast(TopicDescriptor.AuthOpts.AuthMode, 2)
@ -237,27 +396,46 @@ class TopicDescriptor(google___protobuf___message___Message):
KEY = typing___cast(TopicDescriptor.AuthOpts.AuthMode, 1)
WOT = typing___cast(TopicDescriptor.AuthOpts.AuthMode, 2)
mode = ... # type: TopicDescriptor.AuthOpts.AuthMode
keys = ... # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[bytes]
mode = ... # type: TopicDescriptor.AuthOpts.AuthMode
keys = (
...
) # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[bytes]
def __init__(self,
def __init__(
self,
*,
mode : typing___Optional[TopicDescriptor.AuthOpts.AuthMode] = None,
keys : typing___Optional[typing___Iterable[bytes]] = None,
) -> None: ...
mode: typing___Optional[TopicDescriptor.AuthOpts.AuthMode] = None,
keys: typing___Optional[typing___Iterable[bytes]] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> TopicDescriptor.AuthOpts: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def MergeFrom(
self, other_msg: google___protobuf___message___Message
) -> None: ...
def CopyFrom(
self, other_msg: google___protobuf___message___Message
) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"mode"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"keys",u"mode"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["mode"]
) -> bool: ...
def ClearField(
self, field_name: typing_extensions___Literal["keys", "mode"]
) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"mode",b"mode"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"keys",b"keys",u"mode",b"mode"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["mode", b"mode"]
) -> bool: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"keys", b"keys", "mode", b"mode"
],
) -> None: ...
class EncOpts(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
class EncMode(int):
DESCRIPTOR: google___protobuf___descriptor___EnumDescriptor = ...
@classmethod
@ -269,7 +447,11 @@ class TopicDescriptor(google___protobuf___message___Message):
@classmethod
def values(cls) -> typing___List[TopicDescriptor.EncOpts.EncMode]: ...
@classmethod
def items(cls) -> typing___List[typing___Tuple[str, TopicDescriptor.EncOpts.EncMode]]: ...
def items(
cls,
) -> typing___List[
typing___Tuple[str, TopicDescriptor.EncOpts.EncMode]
]: ...
NONE = typing___cast(TopicDescriptor.EncOpts.EncMode, 0)
SHAREDKEY = typing___cast(TopicDescriptor.EncOpts.EncMode, 1)
WOT = typing___cast(TopicDescriptor.EncOpts.EncMode, 2)
@ -277,46 +459,77 @@ class TopicDescriptor(google___protobuf___message___Message):
SHAREDKEY = typing___cast(TopicDescriptor.EncOpts.EncMode, 1)
WOT = typing___cast(TopicDescriptor.EncOpts.EncMode, 2)
mode = ... # type: TopicDescriptor.EncOpts.EncMode
keyHashes = ... # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[bytes]
mode = ... # type: TopicDescriptor.EncOpts.EncMode
keyHashes = (
...
) # type: google___protobuf___internal___containers___RepeatedScalarFieldContainer[bytes]
def __init__(self,
def __init__(
self,
*,
mode : typing___Optional[TopicDescriptor.EncOpts.EncMode] = None,
keyHashes : typing___Optional[typing___Iterable[bytes]] = None,
) -> None: ...
mode: typing___Optional[TopicDescriptor.EncOpts.EncMode] = None,
keyHashes: typing___Optional[typing___Iterable[bytes]] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> TopicDescriptor.EncOpts: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def MergeFrom(
self, other_msg: google___protobuf___message___Message
) -> None: ...
def CopyFrom(
self, other_msg: google___protobuf___message___Message
) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"mode"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"keyHashes",u"mode"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["mode"]
) -> bool: ...
def ClearField(
self, field_name: typing_extensions___Literal["keyHashes", "mode"]
) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"mode",b"mode"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"keyHashes",b"keyHashes",u"mode",b"mode"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["mode", b"mode"]
) -> bool: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"keyHashes", b"keyHashes", "mode", b"mode"
],
) -> None: ...
name = ... # type: typing___Text
name = ... # type: typing___Text
@property
def auth(self) -> TopicDescriptor.AuthOpts: ...
@property
def enc(self) -> TopicDescriptor.EncOpts: ...
def __init__(self,
def __init__(
self,
*,
name : typing___Optional[typing___Text] = None,
auth : typing___Optional[TopicDescriptor.AuthOpts] = None,
enc : typing___Optional[TopicDescriptor.EncOpts] = None,
) -> None: ...
name: typing___Optional[typing___Text] = None,
auth: typing___Optional[TopicDescriptor.AuthOpts] = None,
enc: typing___Optional[TopicDescriptor.EncOpts] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> TopicDescriptor: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"auth",u"enc",u"name"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"auth",u"enc",u"name"]) -> None: ...
def HasField(
self, field_name: typing_extensions___Literal["auth", "enc", "name"]
) -> bool: ...
def ClearField(
self, field_name: typing_extensions___Literal["auth", "enc", "name"]
) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"auth",b"auth",u"enc",b"enc",u"name",b"name"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"auth",b"auth",u"enc",b"enc",u"name",b"name"]) -> None: ...
def HasField(
self,
field_name: typing_extensions___Literal[
"auth", b"auth", "enc", b"enc", "name", b"name"
],
) -> bool: ...
def ClearField(
self,
field_name: typing_extensions___Literal[
"auth", b"auth", "enc", b"enc", "name", b"name"
],
) -> None: ...

View File

@ -15,33 +15,78 @@ from typing import (
cast,
)
from async_service import Service
from async_service import (
Service,
)
import base58
from lru import LRU
from lru import (
LRU,
)
import trio
from libp2p.crypto.keys import PrivateKey
from libp2p.exceptions import ParseError, ValidationError
from libp2p.host.host_interface import IHost
from libp2p.io.exceptions import IncompleteReadError
from libp2p.network.exceptions import SwarmException
from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.id import ID
from libp2p.typing import TProtocol
from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes
from libp2p.crypto.keys import (
PrivateKey,
)
from libp2p.exceptions import (
ParseError,
ValidationError,
)
from libp2p.host.host_interface import (
IHost,
)
from libp2p.io.exceptions import (
IncompleteReadError,
)
from libp2p.network.exceptions import (
SwarmException,
)
from libp2p.network.stream.exceptions import (
StreamClosed,
StreamEOF,
StreamReset,
)
from libp2p.network.stream.net_stream_interface import (
INetStream,
)
from libp2p.peer.id import (
ID,
)
from libp2p.typing import (
TProtocol,
)
from libp2p.utils import (
encode_varint_prefixed,
read_varint_prefixed_bytes,
)
from .abc import IPubsub, ISubscriptionAPI
from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee
from .subscription import TrioSubscriptionAPI
from .typing import AsyncValidatorFn, SyncValidatorFn, ValidatorFn
from .validators import PUBSUB_SIGNING_PREFIX, signature_validator
from .abc import (
IPubsub,
ISubscriptionAPI,
)
from .pb import (
rpc_pb2,
)
from .pubsub_notifee import (
PubsubNotifee,
)
from .subscription import (
TrioSubscriptionAPI,
)
from .typing import (
AsyncValidatorFn,
SyncValidatorFn,
ValidatorFn,
)
from .validators import (
PUBSUB_SIGNING_PREFIX,
signature_validator,
)
if TYPE_CHECKING:
from .abc import IPubsubRouter # noqa: F401
from typing import Any # noqa: F401
from .abc import IPubsubRouter # noqa: F401
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/40e1c94708658b155f30cf99e4574f384756d83c/topic.go#L97 # noqa: E501
SUBSCRIPTION_CHANNEL_SIZE = 32
@ -64,7 +109,6 @@ class TopicValidator(NamedTuple):
class Pubsub(Service, IPubsub):
host: IHost
router: "IPubsubRouter"
@ -186,8 +230,10 @@ class Pubsub(Service, IPubsub):
return self.subscribed_topics_receive.keys()
def get_hello_packet(self) -> rpc_pb2.RPC:
"""Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics."""
"""
Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics.
"""
packet = rpc_pb2.RPC()
for topic_id in self.topic_ids:
packet.subscriptions.extend(
@ -254,7 +300,7 @@ class Pubsub(Service, IPubsub):
:param topic: the topic to register validator under
:param validator: the validator used to validate messages published to the topic
:param is_async_validator: indicate if the validator is an asynchronous validator
"""
""" # noqa: E501
self.topic_validators[topic] = TopicValidator(validator, is_async_validator)
def remove_topic_validator(self, topic: str) -> None:
@ -341,9 +387,11 @@ class Pubsub(Service, IPubsub):
logger.debug("removed dead peer %s", peer_id)
async def handle_peer_queue(self) -> None:
"""Continuously read from peer queue and each time a new peer is found,
"""
Continuously read from peer queue and each time a new peer is found,
open a stream to the peer using a supported pubsub protocol pubsub
protocols we support."""
protocols we support.
"""
async with self.peer_receive_channel:
self.event_handle_peer_queue_started.set()
async for peer_id in self.peer_receive_channel:
@ -351,9 +399,10 @@ class Pubsub(Service, IPubsub):
self.manager.run_task(self._handle_new_peer, peer_id)
async def handle_dead_peer_queue(self) -> None:
"""Continuously read from dead peer channel and close the stream
between that peer and remove peer info from pubsub and pubsub
router."""
"""
Continuously read from dead peer channel and close the stream
between that peer and remove peer info from pubsub and pubsub router.
"""
async with self.dead_peer_receive_channel:
self.event_handle_dead_peer_queue_started.set()
async for peer_id in self.dead_peer_receive_channel:
@ -373,7 +422,7 @@ class Pubsub(Service, IPubsub):
"""
if sub_message.subscribe:
if sub_message.topicid not in self.peer_topics:
self.peer_topics[sub_message.topicid] = set([origin_id])
self.peer_topics[sub_message.topicid] = {origin_id}
elif origin_id not in self.peer_topics[sub_message.topicid]:
# Add peer to topic
self.peer_topics[sub_message.topicid].add(origin_id)
@ -388,7 +437,6 @@ class Pubsub(Service, IPubsub):
:param publish_message: RPC.Message format
"""
# Check if this message has any topics that we are subscribed to
for topic in publish_message.topicIDs:
if topic in self.topic_ids:
@ -409,7 +457,6 @@ class Pubsub(Service, IPubsub):
:param topic_id: topic_id to subscribe to
"""
logger.debug("subscribing to topic %s", topic_id)
# Already subscribed
@ -448,7 +495,6 @@ class Pubsub(Service, IPubsub):
:param topic_id: topic_id to unsubscribe from
"""
logger.debug("unsubscribing from topic %s", topic_id)
# Return if we already unsubscribed from the topic
@ -479,7 +525,6 @@ class Pubsub(Service, IPubsub):
:param raw_msg: raw contents of the message to broadcast
"""
# Broadcast message
for stream in self.peers.values():
# Write message to stream
@ -571,7 +616,7 @@ class Pubsub(Service, IPubsub):
# TODO: Check if the `from` is in the blacklist. If yes, reject.
# If the message is processed before, return(i.e., don't further process the message).
# If the message is processed before, return(i.e., don't further process the message) # noqa: E501
if self._is_msg_seen(msg):
return
@ -588,7 +633,7 @@ class Pubsub(Service, IPubsub):
await self.validate_msg(msg_forwarder, msg)
except ValidationError:
logger.debug(
"Topic validation failed: sender %s sent data %s under topic IDs: %s %s:%s",
"Topic validation failed: sender %s sent data %s under topic IDs: %s %s:%s", # noqa: E501
msg_forwarder,
msg.data.hex(),
msg.topicIDs,
@ -612,8 +657,8 @@ class Pubsub(Service, IPubsub):
def _mark_msg_seen(self, msg: rpc_pb2.Message) -> None:
msg_id = self._msg_id_constructor(msg)
# FIXME: Mapping `msg_id` to `1` is quite awkward. Should investigate if there is a
# more appropriate way.
# FIXME: Mapping `msg_id` to `1` is quite awkward. Should investigate if there
# is a more appropriate way.
self.seen_messages[msg_id] = 1
def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool:

View File

@ -1,19 +1,30 @@
from typing import TYPE_CHECKING
from typing import (
TYPE_CHECKING,
)
from multiaddr import Multiaddr
from multiaddr import (
Multiaddr,
)
import trio
from libp2p.network.connection.net_connection_interface import INetConn
from libp2p.network.network_interface import INetwork
from libp2p.network.notifee_interface import INotifee
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.network.connection.net_connection_interface import (
INetConn,
)
from libp2p.network.network_interface import (
INetwork,
)
from libp2p.network.notifee_interface import (
INotifee,
)
from libp2p.network.stream.net_stream_interface import (
INetStream,
)
if TYPE_CHECKING:
from libp2p.peer.id import ID # noqa: F401
class PubsubNotifee(INotifee):
initiator_peers_queue: "trio.MemorySendChannel[ID]"
dead_peers_queue: "trio.MemorySendChannel[ID]"

View File

@ -1,11 +1,23 @@
from types import TracebackType
from typing import AsyncIterator, Optional, Type
from types import (
TracebackType,
)
from typing import (
AsyncIterator,
Optional,
Type,
)
import trio
from .abc import ISubscriptionAPI
from .pb import rpc_pb2
from .typing import UnsubscribeFn
from .abc import (
ISubscriptionAPI,
)
from .pb import (
rpc_pb2,
)
from .typing import (
UnsubscribeFn,
)
class BaseSubscriptionAPI(ISubscriptionAPI):
@ -32,11 +44,11 @@ class TrioSubscriptionAPI(BaseSubscriptionAPI):
unsubscribe_fn: UnsubscribeFn,
) -> None:
self.receive_channel = receive_channel
# Ignore type here since mypy complains: https://github.com/python/mypy/issues/2427
# Ignore type here since mypy complains: https://github.com/python/mypy/issues/2427 # noqa: E501
self.unsubscribe_fn = unsubscribe_fn # type: ignore
async def unsubscribe(self) -> None:
# Ignore type here since mypy complains: https://github.com/python/mypy/issues/2427
# Ignore type here since mypy complains: https://github.com/python/mypy/issues/2427 # noqa: E501
await self.unsubscribe_fn() # type: ignore
def __aiter__(self) -> AsyncIterator[rpc_pb2.Message]:

View File

@ -1,8 +1,16 @@
from typing import Awaitable, Callable, Union
from typing import (
Awaitable,
Callable,
Union,
)
from libp2p.peer.id import ID
from libp2p.peer.id import (
ID,
)
from .pb import rpc_pb2
from .pb import (
rpc_pb2,
)
SyncValidatorFn = Callable[[ID, rpc_pb2.Message], bool]
AsyncValidatorFn = Callable[[ID, rpc_pb2.Message], Awaitable[bool]]

View File

@ -1,9 +1,15 @@
import logging
from libp2p.crypto.serialization import deserialize_public_key
from libp2p.peer.id import ID
from libp2p.crypto.serialization import (
deserialize_public_key,
)
from libp2p.peer.id import (
ID,
)
from .pb import rpc_pb2
from .pb import (
rpc_pb2,
)
logger = logging.getLogger("libp2p.pubsub")