ft. modernise py-libp2p (#618)

* fix pyproject.toml , add ruff

* rm lock

* make progress

* add poetry lock ignore

* fix type issues

* fix tcp type errors

* fix text example - type error - wrong args

* add setuptools to dev

* test ci

* fix docs build

* fix type issues for new_swarm & new_host

* fix types in gossipsub

* fix type issues in noise

* wip: factories

* revert factories

* fix more type issues

* more type fixes

* fix: add null checks for noise protocol initialization and key handling

* corrected argument-errors in peerId and Multiaddr in peer tests

* fix: Noice - remove redundant type casts in BaseNoiseMsgReadWriter

* fix: update test_notify.py to use SwarmFactory.create_batch_and_listen, fix type hints, and comment out ClosedStream assertions

* Fix type checks for pubsub module

Signed-off-by: sukhman <sukhmansinghsaluja@gmail.com>

* Fix type checks for pubsub module-tests

Signed-off-by: sukhman <sukhmansinghsaluja@gmail.com>

* noise: add checks for uninitialized protocol and key states in PatternXX

Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>

* pubsub: add None checks for optional fields in FloodSub and Pubsub

Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>

* Fix type hints and improve testing

Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>

* remove redundant checks

Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>

* fix build issues

* add optional to trio service

* fix types

* fix type errors

* Fix type errors

Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>

* fixed more-type checks in crypto and peer_data files

* wip: factories

* replaced union with optional

* fix: type-error in interp-utils and peerinfo

* replace pyright with pyrefly

* add pyrefly.toml

* wip: fix multiselect issues

* try typecheck

* base check

* mcache test fixes , typecheck ci update

* fix ci

* will this work

* minor fix

* use poetry

* fix wokflow

* use cache,fix err

* fix pyrefly.toml

* fix pyrefly.toml

* fix cache in ci

* deploy commit

* add main baseline

* update to v5

* improve typecheck ci (#14)

* fix typo

* remove holepunching code (#16)

* fix gossipsub typeerrors (#17)

* fix: ensure initiator user includes remote peer id in handshake (#15)

* fix ci (#19)

* typefix: custom_types | core/peerinfo/test_peer_info | io/abc | pubsub/floodsub | protocol_muxer/multiselect (#18)

* fix: Typefixes in PeerInfo  (#21)

* fix minor type issue (#22)

* fix type errors in pubsub (#24)

* fix: Minor typefixes in tests (#23)

* Fix failing tests for type-fixed test/pubsub (#8)

* move pyrefly & ruff to pyproject.toml & rm .project-template (#28)

* move the async_context file to tests/core

* move crypto test to crypto folder

* fix: some typefixes (#25)

* fix type errors

* fix type issues

* fix: update gRPC API usage in autonat_pb2_grpc.py (#31)

* md: typecheck ci

* rm comments

* clean up : from review suggestions

* use | None over Optional as per new python standards

* drop supporto for py3.9

* newsfragments

---------

Signed-off-by: sukhman <sukhmansinghsaluja@gmail.com>
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
Co-authored-by: acul71 <luca.pisani@birdo.net>
Co-authored-by: kaneki003 <sakshamchauhan707@gmail.com>
Co-authored-by: sukhman <sukhmansinghsaluja@gmail.com>
Co-authored-by: varun-r-mallya <varunrmallya@gmail.com>
Co-authored-by: varunrmallya <100590632+varun-r-mallya@users.noreply.github.com>
Co-authored-by: lla-dane <abhinavagarwalla6@gmail.com>
Co-authored-by: Collins <ArtemisfowlX@protonmail.com>
Co-authored-by: Abhinav Agarwalla <120122716+lla-dane@users.noreply.github.com>
Co-authored-by: guha-rahul <52607971+guha-rahul@users.noreply.github.com>
Co-authored-by: Sukhman Singh <63765293+sukhman-sukh@users.noreply.github.com>
Co-authored-by: acul71 <34693171+acul71@users.noreply.github.com>
Co-authored-by: pacrob <5199899+pacrob@users.noreply.github.com>
This commit is contained in:
Arush Kurundodi
2025-06-09 23:09:59 +05:30
committed by GitHub
parent d020bbc066
commit bdadec7519
111 changed files with 1537 additions and 1401 deletions

View File

@ -152,12 +152,12 @@ def get_default_muxer_options() -> TMuxerOptions:
def new_swarm(
key_pair: Optional[KeyPair] = None,
muxer_opt: Optional[TMuxerOptions] = None,
sec_opt: Optional[TSecurityOptions] = None,
peerstore_opt: Optional[IPeerStore] = None,
muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None,
listen_addrs: Optional[Sequence[multiaddr.Multiaddr]] = None,
key_pair: KeyPair | None = None,
muxer_opt: TMuxerOptions | None = None,
sec_opt: TSecurityOptions | None = None,
peerstore_opt: IPeerStore | None = None,
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
) -> INetworkService:
"""
Create a swarm instance based on the parameters.
@ -236,13 +236,13 @@ def new_swarm(
def new_host(
key_pair: Optional[KeyPair] = None,
muxer_opt: Optional[TMuxerOptions] = None,
sec_opt: Optional[TSecurityOptions] = None,
peerstore_opt: Optional[IPeerStore] = None,
disc_opt: Optional[IPeerRouting] = None,
muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None,
listen_addrs: Sequence[multiaddr.Multiaddr] = None,
key_pair: KeyPair | None = None,
muxer_opt: TMuxerOptions | None = None,
sec_opt: TSecurityOptions | None = None,
peerstore_opt: IPeerStore | None = None,
disc_opt: IPeerRouting | None = None,
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
) -> IHost:
"""
Create a new libp2p host based on the given parameters.

View File

@ -8,6 +8,7 @@ from collections.abc import (
KeysView,
Sequence,
)
from contextlib import AbstractAsyncContextManager
from types import (
TracebackType,
)
@ -15,7 +16,6 @@ from typing import (
TYPE_CHECKING,
Any,
AsyncContextManager,
Optional,
)
from multiaddr import (
@ -160,7 +160,11 @@ class IMuxedConn(ABC):
event_started: trio.Event
@abstractmethod
def __init__(self, conn: ISecureConn, peer_id: ID) -> None:
def __init__(
self,
conn: ISecureConn,
peer_id: ID,
) -> None:
"""
Initialize a new multiplexed connection.
@ -260,9 +264,9 @@ class IMuxedStream(ReadWriteCloser, AsyncContextManager["IMuxedStream"]):
async def __aexit__(
self,
exc_type: Optional[type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exit the async context manager and close the stream."""
await self.close()
@ -287,7 +291,7 @@ class INetStream(ReadWriteCloser):
muxed_conn: IMuxedConn
@abstractmethod
def get_protocol(self) -> TProtocol:
def get_protocol(self) -> TProtocol | None:
"""
Retrieve the protocol identifier for the stream.
@ -916,7 +920,7 @@ class INetwork(ABC):
"""
@abstractmethod
async def listen(self, *multiaddrs: Sequence[Multiaddr]) -> bool:
async def listen(self, *multiaddrs: Multiaddr) -> bool:
"""
Start listening on one or more multiaddresses.
@ -1174,7 +1178,9 @@ class IHost(ABC):
"""
@abstractmethod
def run(self, listen_addrs: Sequence[Multiaddr]) -> AsyncContextManager[None]:
def run(
self, listen_addrs: Sequence[Multiaddr]
) -> AbstractAsyncContextManager[None]:
"""
Run the host and start listening on the specified multiaddresses.
@ -1564,7 +1570,7 @@ class IMultiselectMuxer(ABC):
and its corresponding handler for communication.
"""
handlers: dict[TProtocol, StreamHandlerFn]
handlers: dict[TProtocol | None, StreamHandlerFn | None]
@abstractmethod
def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None:
@ -1580,7 +1586,7 @@ class IMultiselectMuxer(ABC):
"""
def get_protocols(self) -> tuple[TProtocol, ...]:
def get_protocols(self) -> tuple[TProtocol | None, ...]:
"""
Retrieve the protocols for which handlers have been registered.
@ -1595,7 +1601,7 @@ class IMultiselectMuxer(ABC):
@abstractmethod
async def negotiate(
self, communicator: IMultiselectCommunicator
) -> tuple[TProtocol, StreamHandlerFn]:
) -> tuple[TProtocol | None, StreamHandlerFn | None]:
"""
Negotiate a protocol selection with a multiselect client.
@ -1672,7 +1678,7 @@ class IPeerRouting(ABC):
"""
@abstractmethod
async def find_peer(self, peer_id: ID) -> PeerInfo:
async def find_peer(self, peer_id: ID) -> PeerInfo | None:
"""
Search for a peer with the specified peer ID.
@ -1840,6 +1846,11 @@ class IPubsubRouter(ABC):
"""
mesh: dict[str, set[ID]]
fanout: dict[str, set[ID]]
peer_protocol: dict[ID, TProtocol]
degree: int
@abstractmethod
def get_protocols(self) -> list[TProtocol]:
"""
@ -1865,7 +1876,7 @@ class IPubsubRouter(ABC):
"""
@abstractmethod
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
def add_peer(self, peer_id: ID, protocol_id: TProtocol | None) -> None:
"""
Notify the router that a new peer has connected.

View File

@ -116,15 +116,15 @@ def initialize_pair(
EncryptionParameters(
cipher_type,
hash_type,
first_half[0:iv_size],
first_half[iv_size + cipher_key_size :],
first_half[iv_size : iv_size + cipher_key_size],
bytes(first_half[0:iv_size]),
bytes(first_half[iv_size + cipher_key_size :]),
bytes(first_half[iv_size : iv_size + cipher_key_size]),
),
EncryptionParameters(
cipher_type,
hash_type,
second_half[0:iv_size],
second_half[iv_size + cipher_key_size :],
second_half[iv_size : iv_size + cipher_key_size],
bytes(second_half[0:iv_size]),
bytes(second_half[iv_size + cipher_key_size :]),
bytes(second_half[iv_size : iv_size + cipher_key_size]),
),
)

View File

@ -9,29 +9,40 @@ from libp2p.crypto.keys import (
if sys.platform != "win32":
from fastecdsa import (
curve as curve_types,
keys,
point,
)
from fastecdsa import curve as curve_types
from fastecdsa.encoding.sec1 import (
SEC1Encoder,
)
else:
from coincurve import PrivateKey as CPrivateKey
from coincurve import PublicKey as CPublicKey
from coincurve import (
PrivateKey as CPrivateKey,
PublicKey as CPublicKey,
)
def infer_local_type(curve: str) -> object:
"""
Convert a str representation of some elliptic curve to a
representation understood by the backend of this module.
"""
if curve != "P-256":
raise NotImplementedError("Only P-256 curve is supported")
if sys.platform != "win32":
if sys.platform != "win32":
def infer_local_type(curve: str) -> curve_types.Curve:
"""
Convert a str representation of some elliptic curve to a
representation understood by the backend of this module.
"""
if curve != "P-256":
raise NotImplementedError("Only P-256 curve is supported")
return curve_types.P256
return "P-256" # coincurve only supports P-256
else:
def infer_local_type(curve: str) -> str:
"""
Convert a str representation of some elliptic curve to a
representation understood by the backend of this module.
"""
if curve != "P-256":
raise NotImplementedError("Only P-256 curve is supported")
return "P-256" # coincurve only supports P-256
if sys.platform != "win32":
@ -68,7 +79,10 @@ if sys.platform != "win32":
return cls(private_key_impl, curve_type)
def to_bytes(self) -> bytes:
return keys.export_key(self.impl, self.curve)
key_str = keys.export_key(self.impl, self.curve)
if key_str is None:
raise Exception("Key not found")
return key_str.encode()
def get_type(self) -> KeyType:
return KeyType.ECC_P256

View File

@ -4,8 +4,10 @@ from Crypto.Hash import (
from nacl.exceptions import (
BadSignatureError,
)
from nacl.public import PrivateKey as PrivateKeyImpl
from nacl.public import PublicKey as PublicKeyImpl
from nacl.public import (
PrivateKey as PrivateKeyImpl,
PublicKey as PublicKeyImpl,
)
from nacl.signing import (
SigningKey,
VerifyKey,
@ -48,7 +50,7 @@ class Ed25519PrivateKey(PrivateKey):
self.impl = impl
@classmethod
def new(cls, seed: bytes = None) -> "Ed25519PrivateKey":
def new(cls, seed: bytes | None = None) -> "Ed25519PrivateKey":
if not seed:
seed = utils.random()
@ -75,7 +77,7 @@ class Ed25519PrivateKey(PrivateKey):
return Ed25519PublicKey(self.impl.public_key)
def create_new_key_pair(seed: bytes = None) -> KeyPair:
def create_new_key_pair(seed: bytes | None = None) -> KeyPair:
private_key = Ed25519PrivateKey.new(seed)
public_key = private_key.get_public_key()
return KeyPair(private_key, public_key)

View File

@ -1,6 +1,6 @@
from collections.abc import Callable
import sys
from typing import (
Callable,
cast,
)

View File

@ -81,12 +81,10 @@ class PrivateKey(Key):
"""A ``PrivateKey`` represents a cryptographic private key."""
@abstractmethod
def sign(self, data: bytes) -> bytes:
...
def sign(self, data: bytes) -> bytes: ...
@abstractmethod
def get_public_key(self) -> PublicKey:
...
def get_public_key(self) -> PublicKey: ...
def _serialize_to_protobuf(self) -> crypto_pb2.PrivateKey:
"""Return the protobuf representation of this ``Key``."""

View File

@ -37,7 +37,7 @@ class Secp256k1PrivateKey(PrivateKey):
self.impl = impl
@classmethod
def new(cls, secret: bytes = None) -> "Secp256k1PrivateKey":
def new(cls, secret: bytes | None = None) -> "Secp256k1PrivateKey":
private_key_impl = coincurve.PrivateKey(secret)
return cls(private_key_impl)
@ -65,7 +65,7 @@ class Secp256k1PrivateKey(PrivateKey):
return Secp256k1PublicKey(public_key_impl)
def create_new_key_pair(secret: bytes = None) -> KeyPair:
def create_new_key_pair(secret: bytes | None = None) -> KeyPair:
"""
Returns a new Secp256k1 keypair derived from the provided ``secret``, a
sequence of bytes corresponding to some integer between 0 and the group

View File

@ -1,13 +1,9 @@
from collections.abc import (
Awaitable,
Callable,
Mapping,
)
from typing import (
TYPE_CHECKING,
Callable,
NewType,
Union,
)
from typing import TYPE_CHECKING, NewType, Union, cast
if TYPE_CHECKING:
from libp2p.abc import (
@ -16,15 +12,9 @@ if TYPE_CHECKING:
ISecureTransport,
)
else:
class INetStream:
pass
class IMuxedConn:
pass
class ISecureTransport:
pass
IMuxedConn = cast(type, object)
INetStream = cast(type, object)
ISecureTransport = cast(type, object)
from libp2p.io.abc import (
@ -38,10 +28,10 @@ from libp2p.pubsub.pb import (
)
TProtocol = NewType("TProtocol", str)
StreamHandlerFn = Callable[["INetStream"], Awaitable[None]]
StreamHandlerFn = Callable[[INetStream], Awaitable[None]]
THandler = Callable[[ReadWriteCloser], Awaitable[None]]
TSecurityOptions = Mapping[TProtocol, "ISecureTransport"]
TMuxerClass = type["IMuxedConn"]
TSecurityOptions = Mapping[TProtocol, ISecureTransport]
TMuxerClass = type[IMuxedConn]
TMuxerOptions = Mapping[TProtocol, TMuxerClass]
SyncValidatorFn = Callable[[ID, rpc_pb2.Message], bool]
AsyncValidatorFn = Callable[[ID, rpc_pb2.Message], Awaitable[bool]]

View File

@ -1,7 +1,4 @@
import logging
from typing import (
Union,
)
from libp2p.custom_types import (
TProtocol,
@ -94,7 +91,7 @@ class AutoNATService:
finally:
await stream.close()
async def _handle_request(self, request: Union[bytes, Message]) -> Message:
async def _handle_request(self, request: bytes | Message) -> Message:
"""
Process an AutoNAT protocol request.

View File

@ -84,26 +84,23 @@ class AutoNAT:
request: Any,
target: str,
options: tuple[Any, ...] = (),
channel_credentials: Optional[Any] = None,
call_credentials: Optional[Any] = None,
channel_credentials: Any | None = None,
call_credentials: Any | None = None,
insecure: bool = False,
compression: Optional[Any] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = None,
metadata: Optional[list[tuple[str, str]]] = None,
compression: Any | None = None,
wait_for_ready: bool | None = None,
timeout: float | None = None,
metadata: list[tuple[str, str]] | None = None,
) -> Any:
return grpc.experimental.unary_unary(
request,
target,
channel = grpc.secure_channel(target, channel_credentials) if channel_credentials else grpc.insecure_channel(target)
return channel.unary_unary(
"/autonat.pb.AutoNAT/Dial",
autonat__pb2.Message.SerializeToString,
autonat__pb2.Message.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
request_serializer=autonat__pb2.Message.SerializeToString,
response_deserializer=autonat__pb2.Message.FromString,
_registered_method=True,
)(
request,
timeout=timeout,
metadata=metadata,
wait_for_ready=wait_for_ready,
)

View File

@ -3,6 +3,7 @@ from collections.abc import (
Sequence,
)
from contextlib import (
AbstractAsyncContextManager,
asynccontextmanager,
)
import logging
@ -88,14 +89,14 @@ class BasicHost(IHost):
def __init__(
self,
network: INetworkService,
default_protocols: "OrderedDict[TProtocol, StreamHandlerFn]" = None,
default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None,
) -> None:
self._network = network
self._network.set_stream_handler(self._swarm_stream_handler)
self.peerstore = self._network.peerstore
# Protocol muxing
default_protocols = default_protocols or get_default_protocols(self)
self.multiselect = Multiselect(default_protocols)
self.multiselect = Multiselect(dict(default_protocols.items()))
self.multiselect_client = MultiselectClient()
def get_id(self) -> ID:
@ -147,19 +148,23 @@ class BasicHost(IHost):
"""
return list(self._network.connections.keys())
@asynccontextmanager
async def run(
def run(
self, listen_addrs: Sequence[multiaddr.Multiaddr]
) -> AsyncIterator[None]:
) -> AbstractAsyncContextManager[None]:
"""
Run the host instance and listen to ``listen_addrs``.
:param listen_addrs: a sequence of multiaddrs that we want to listen to
"""
network = self.get_network()
async with background_trio_service(network):
await network.listen(*listen_addrs)
yield
@asynccontextmanager
async def _run() -> AsyncIterator[None]:
network = self.get_network()
async with background_trio_service(network):
await network.listen(*listen_addrs)
yield
return _run()
def set_stream_handler(
self, protocol_id: TProtocol, stream_handler: StreamHandlerFn
@ -258,6 +263,15 @@ class BasicHost(IHost):
await net_stream.reset()
return
net_stream.set_protocol(protocol)
if handler is None:
logger.debug(
"no handler for protocol %s, closing stream from peer %s",
protocol,
net_stream.muxed_conn.peer_id,
)
await net_stream.reset()
return
await handler(net_stream)
def get_live_peers(self) -> list[ID]:
@ -277,7 +291,7 @@ class BasicHost(IHost):
"""
return peer_id in self._network.connections
def get_peer_connection_info(self, peer_id: ID) -> Optional[INetConn]:
def get_peer_connection_info(self, peer_id: ID) -> INetConn | None:
"""
Get connection information for a specific peer if connected.

View File

@ -9,13 +9,13 @@ from libp2p.abc import (
IHost,
)
from libp2p.host.ping import (
ID as PingID,
handle_ping,
)
from libp2p.host.ping import ID as PingID
from libp2p.identity.identify.identify import (
ID as IdentifyID,
identify_handler_for,
)
from libp2p.identity.identify.identify import ID as IdentifyID
if TYPE_CHECKING:
from libp2p.custom_types import (

View File

@ -1,7 +1,4 @@
import logging
from typing import (
Optional,
)
from multiaddr import (
Multiaddr,
@ -40,8 +37,8 @@ def _multiaddr_to_bytes(maddr: Multiaddr) -> bytes:
def _remote_address_to_multiaddr(
remote_address: Optional[tuple[str, int]]
) -> Optional[Multiaddr]:
remote_address: tuple[str, int] | None,
) -> Multiaddr | None:
"""Convert a (host, port) tuple to a Multiaddr."""
if remote_address is None:
return None
@ -58,7 +55,7 @@ def _remote_address_to_multiaddr(
def _mk_identify_protobuf(
host: IHost, observed_multiaddr: Optional[Multiaddr]
host: IHost, observed_multiaddr: Multiaddr | None
) -> Identify:
public_key = host.get_public_key()
laddrs = host.get_addrs()
@ -81,15 +78,14 @@ def identify_handler_for(host: IHost) -> StreamHandlerFn:
peer_id = (
stream.muxed_conn.peer_id
) # remote peer_id is in class Mplex (mplex.py )
observed_multiaddr: Multiaddr | None = None
# Get the remote address
try:
remote_address = stream.get_remote_address()
# Convert to multiaddr
if remote_address:
observed_multiaddr = _remote_address_to_multiaddr(remote_address)
else:
observed_multiaddr = None
logger.debug(
"Connection from remote peer %s, address: %s, multiaddr: %s",
peer_id,

View File

@ -1,7 +1,4 @@
import logging
from typing import (
Optional,
)
from multiaddr import (
Multiaddr,
@ -135,7 +132,7 @@ async def _update_peerstore_from_identify(
async def push_identify_to_peer(
host: IHost, peer_id: ID, observed_multiaddr: Optional[Multiaddr] = None
host: IHost, peer_id: ID, observed_multiaddr: Multiaddr | None = None
) -> bool:
"""
Push an identify message to a specific peer.
@ -172,8 +169,8 @@ async def push_identify_to_peer(
async def push_identify_to_peers(
host: IHost,
peer_ids: Optional[set[ID]] = None,
observed_multiaddr: Optional[Multiaddr] = None,
peer_ids: set[ID] | None = None,
observed_multiaddr: Multiaddr | None = None,
) -> None:
"""
Push an identify message to multiple peers in parallel.

View File

@ -2,27 +2,22 @@ from abc import (
ABC,
abstractmethod,
)
from typing import (
Optional,
)
from typing import Any
class Closer(ABC):
@abstractmethod
async def close(self) -> None:
...
async def close(self) -> None: ...
class Reader(ABC):
@abstractmethod
async def read(self, n: int = None) -> bytes:
...
async def read(self, n: int | None = None) -> bytes: ...
class Writer(ABC):
@abstractmethod
async def write(self, data: bytes) -> None:
...
async def write(self, data: bytes) -> None: ...
class WriteCloser(Writer, Closer):
@ -39,7 +34,7 @@ class ReadWriter(Reader, Writer):
class ReadWriteCloser(Reader, Writer, Closer):
@abstractmethod
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
"""
Return the remote address of the connected peer.
@ -50,14 +45,12 @@ class ReadWriteCloser(Reader, Writer, Closer):
class MsgReader(ABC):
@abstractmethod
async def read_msg(self) -> bytes:
...
async def read_msg(self) -> bytes: ...
class MsgWriter(ABC):
@abstractmethod
async def write_msg(self, msg: bytes) -> None:
...
async def write_msg(self, msg: bytes) -> None: ...
class MsgReadWriteCloser(MsgReader, MsgWriter, Closer):
@ -66,19 +59,26 @@ class MsgReadWriteCloser(MsgReader, MsgWriter, Closer):
class Encrypter(ABC):
@abstractmethod
def encrypt(self, data: bytes) -> bytes:
...
def encrypt(self, data: bytes) -> bytes: ...
@abstractmethod
def decrypt(self, data: bytes) -> bytes:
...
def decrypt(self, data: bytes) -> bytes: ...
class EncryptedMsgReadWriter(MsgReadWriteCloser, Encrypter):
"""Read/write message with encryption/decryption."""
def get_remote_address(self) -> Optional[tuple[str, int]]:
conn: Any | None
def __init__(self, conn: Any | None = None):
self.conn = conn
def get_remote_address(self) -> tuple[str, int] | None:
"""Get remote address if supported by the underlying connection."""
if hasattr(self, "conn") and hasattr(self.conn, "get_remote_address"):
if (
self.conn is not None
and hasattr(self, "conn")
and hasattr(self.conn, "get_remote_address")
):
return self.conn.get_remote_address()
return None

View File

@ -5,6 +5,7 @@ from that repo: "a simple package to r/w length-delimited slices."
NOTE: currently missing the capability to indicate lengths by "varint" method.
"""
from abc import (
abstractmethod,
)
@ -60,12 +61,10 @@ class BaseMsgReadWriter(MsgReadWriteCloser):
return await read_exactly(self.read_write_closer, length)
@abstractmethod
async def next_msg_len(self) -> int:
...
async def next_msg_len(self) -> int: ...
@abstractmethod
def encode_msg(self, msg: bytes) -> bytes:
...
def encode_msg(self, msg: bytes) -> bytes: ...
async def close(self) -> None:
await self.read_write_closer.close()

View File

@ -1,7 +1,4 @@
import logging
from typing import (
Optional,
)
import trio
@ -34,7 +31,7 @@ class TrioTCPStream(ReadWriteCloser):
except (trio.ClosedResourceError, trio.BrokenResourceError) as error:
raise IOException from error
async def read(self, n: int = None) -> bytes:
async def read(self, n: int | None = None) -> bytes:
async with self.read_lock:
if n is not None and n == 0:
return b""
@ -46,7 +43,7 @@ class TrioTCPStream(ReadWriteCloser):
async def close(self) -> None:
await self.stream.aclose()
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
"""Return the remote address as (host, port) tuple."""
try:
return self.stream.socket.getpeername()

View File

@ -14,12 +14,14 @@ async def read_exactly(
"""
NOTE: relying on exceptions to break out on erroneous conditions, like EOF
"""
data = await reader.read(n)
buffer = bytearray()
buffer.extend(await reader.read(n))
for _ in range(retry_count):
if len(data) < n:
remaining = n - len(data)
data += await reader.read(remaining)
if len(buffer) < n:
remaining = n - len(buffer)
buffer.extend(await reader.read(remaining))
else:
return data
raise IncompleteReadError({"requested_count": n, "received_count": len(data)})
return bytes(buffer)
raise IncompleteReadError({"requested_count": n, "received_count": len(buffer)})

View File

@ -1,7 +1,3 @@
from typing import (
Optional,
)
from libp2p.abc import (
IRawConnection,
)
@ -32,7 +28,7 @@ class RawConnection(IRawConnection):
except IOException as error:
raise RawConnError from error
async def read(self, n: int = None) -> bytes:
async def read(self, n: int | None = None) -> bytes:
"""
Read up to ``n`` bytes from the underlying stream. This call is
delegated directly to the underlying ``self.reader``.
@ -47,6 +43,6 @@ class RawConnection(IRawConnection):
async def close(self) -> None:
await self.stream.close()
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
"""Delegate to the underlying stream's get_remote_address method."""
return self.stream.get_remote_address()

View File

@ -22,7 +22,7 @@ if TYPE_CHECKING:
"""
Reference: https://github.com/libp2p/go-libp2p-swarm/blob/04c86bbdafd390651cb2ee14e334f7caeedad722/swarm_conn.go # noqa: E501
Reference: https://github.com/libp2p/go-libp2p-swarm/blob/04c86bbdafd390651cb2ee14e334f7caeedad722/swarm_conn.go
"""
@ -32,7 +32,11 @@ class SwarmConn(INetConn):
streams: set[NetStream]
event_closed: trio.Event
def __init__(self, muxed_conn: IMuxedConn, swarm: "Swarm") -> None:
def __init__(
self,
muxed_conn: IMuxedConn,
swarm: "Swarm",
) -> None:
self.muxed_conn = muxed_conn
self.swarm = swarm
self.streams = set()
@ -40,7 +44,7 @@ class SwarmConn(INetConn):
self.event_started = trio.Event()
if hasattr(muxed_conn, "on_close"):
logging.debug(f"Setting on_close for peer {muxed_conn.peer_id}")
muxed_conn.on_close = self._on_muxed_conn_closed
setattr(muxed_conn, "on_close", self._on_muxed_conn_closed)
else:
logging.error(
f"muxed_conn for peer {muxed_conn.peer_id} has no on_close attribute"

View File

@ -1,7 +1,3 @@
from typing import (
Optional,
)
from libp2p.abc import (
IMuxedStream,
INetStream,
@ -28,14 +24,14 @@ from .exceptions import (
# - Reference: https://github.com/libp2p/go-libp2p-swarm/blob/99831444e78c8f23c9335c17d8f7c700ba25ca14/swarm_stream.go # noqa: E501
class NetStream(INetStream):
muxed_stream: IMuxedStream
protocol_id: Optional[TProtocol]
protocol_id: TProtocol | None
def __init__(self, muxed_stream: IMuxedStream) -> None:
self.muxed_stream = muxed_stream
self.muxed_conn = muxed_stream.muxed_conn
self.protocol_id = None
def get_protocol(self) -> TProtocol:
def get_protocol(self) -> TProtocol | None:
"""
:return: protocol id that stream runs on
"""
@ -47,7 +43,7 @@ class NetStream(INetStream):
"""
self.protocol_id = protocol_id
async def read(self, n: int = None) -> bytes:
async def read(self, n: int | None = None) -> bytes:
"""
Read from stream.
@ -79,7 +75,7 @@ class NetStream(INetStream):
async def reset(self) -> None:
await self.muxed_stream.reset()
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
"""Delegate to the underlying muxed stream."""
return self.muxed_stream.get_remote_address()

View File

@ -1,7 +1,4 @@
import logging
from typing import (
Optional,
)
from multiaddr import (
Multiaddr,
@ -75,7 +72,7 @@ class Swarm(Service, INetworkService):
connections: dict[ID, INetConn]
listeners: dict[str, IListener]
common_stream_handler: StreamHandlerFn
listener_nursery: Optional[trio.Nursery]
listener_nursery: trio.Nursery | None
event_listener_nursery_created: trio.Event
notifees: list[INotifee]
@ -340,7 +337,9 @@ class Swarm(Service, INetworkService):
if hasattr(self, "transport") and self.transport is not None:
# Check if transport has close method before calling it
if hasattr(self.transport, "close"):
await self.transport.close()
await self.transport.close() # type: ignore
# Ignoring the type above since `transport` may not have a close method
# and we have already checked it with hasattr
logger.debug("swarm successfully closed")
@ -360,7 +359,11 @@ class Swarm(Service, INetworkService):
and start to monitor the connection for its new streams and
disconnection.
"""
swarm_conn = SwarmConn(muxed_conn, self)
swarm_conn = SwarmConn(
muxed_conn,
self,
)
self.manager.run_task(muxed_conn.start)
await muxed_conn.event_started.wait()
self.manager.run_task(swarm_conn.start)

View File

@ -1,7 +1,4 @@
import hashlib
from typing import (
Union,
)
import base58
import multihash
@ -24,7 +21,7 @@ if ENABLE_INLINING:
_digest: bytes
def __init__(self) -> None:
self._digest = bytearray()
self._digest = b""
def update(self, input: bytes) -> None:
self._digest += input
@ -39,8 +36,8 @@ if ENABLE_INLINING:
class ID:
_bytes: bytes
_xor_id: int = None
_b58_str: str = None
_xor_id: int | None = None
_b58_str: str | None = None
def __init__(self, peer_id_bytes: bytes) -> None:
self._bytes = peer_id_bytes
@ -93,7 +90,7 @@ class ID:
return cls(mh_digest.encode())
def sha256_digest(data: Union[str, bytes]) -> bytes:
def sha256_digest(data: str | bytes) -> bytes:
if isinstance(data, str):
data = data.encode("utf8")
return hashlib.sha256(data).digest()

View File

@ -1,9 +1,7 @@
from collections.abc import (
Sequence,
)
from typing import (
Any,
)
from typing import Any
from multiaddr import (
Multiaddr,
@ -19,8 +17,8 @@ from libp2p.crypto.keys import (
class PeerData(IPeerData):
pubkey: PublicKey
privkey: PrivateKey
pubkey: PublicKey | None
privkey: PrivateKey | None
metadata: dict[Any, Any]
protocols: list[str]
addrs: list[Multiaddr]

View File

@ -32,21 +32,31 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
if not addr:
raise InvalidAddrError("`addr` should not be `None`")
parts = addr.split()
parts: list[multiaddr.Multiaddr] = addr.split()
if not parts:
raise InvalidAddrError(
f"`parts`={parts} should at least have a protocol `P_P2P`"
)
p2p_part = parts[-1]
last_protocol_code = p2p_part.protocols()[0].code
if last_protocol_code != multiaddr.protocols.P_P2P:
p2p_protocols = p2p_part.protocols()
if not p2p_protocols:
raise InvalidAddrError("The last part of the address has no protocols")
last_protocol = p2p_protocols[0]
if last_protocol is None:
raise InvalidAddrError("The last protocol is None")
last_protocol_code = last_protocol.code
if last_protocol_code != multiaddr.multiaddr.protocols.P_P2P:
raise InvalidAddrError(
f"The last protocol should be `P_P2P` instead of `{last_protocol_code}`"
)
# make sure the /p2p value parses as a peer.ID
peer_id_str: str = p2p_part.value_for_protocol(multiaddr.protocols.P_P2P)
peer_id_str = p2p_part.value_for_protocol(multiaddr.multiaddr.protocols.P_P2P)
if peer_id_str is None:
raise InvalidAddrError("Missing value for /p2p protocol in multiaddr")
peer_id: ID = ID.from_base58(peer_id_str)
# we might have received just an / p2p part, which means there's no addr.

View File

@ -23,16 +23,20 @@ class Multiselect(IMultiselectMuxer):
communication.
"""
handlers: dict[TProtocol, StreamHandlerFn]
handlers: dict[TProtocol | None, StreamHandlerFn | None]
def __init__(
self, default_handlers: dict[TProtocol, StreamHandlerFn] = None
self,
default_handlers: None
| (dict[TProtocol | None, StreamHandlerFn | None]) = None,
) -> None:
if not default_handlers:
default_handlers = {}
self.handlers = default_handlers
def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None:
def add_handler(
self, protocol: TProtocol | None, handler: StreamHandlerFn | None
) -> None:
"""
Store the handler with the given protocol.
@ -41,9 +45,10 @@ class Multiselect(IMultiselectMuxer):
"""
self.handlers[protocol] = handler
# FIXME: Make TProtocol Optional[TProtocol] to keep types consistent
async def negotiate(
self, communicator: IMultiselectCommunicator
) -> tuple[TProtocol, StreamHandlerFn]:
) -> tuple[TProtocol, StreamHandlerFn | None]:
"""
Negotiate performs protocol selection.
@ -60,7 +65,7 @@ class Multiselect(IMultiselectMuxer):
raise MultiselectError() from error
if command == "ls":
supported_protocols = list(self.handlers.keys())
supported_protocols = [p for p in self.handlers.keys() if p is not None]
response = "\n".join(supported_protocols) + "\n"
try:
@ -82,6 +87,8 @@ class Multiselect(IMultiselectMuxer):
except MultiselectCommunicatorError as error:
raise MultiselectError() from error
raise MultiselectError("Negotiation failed: no matching protocol")
async def handshake(self, communicator: IMultiselectCommunicator) -> None:
"""
Perform handshake to agree on multiselect protocol.

View File

@ -22,6 +22,9 @@ from libp2p.utils import (
encode_varint_prefixed,
)
from .exceptions import (
PubsubRouterError,
)
from .pb import (
rpc_pb2,
)
@ -37,7 +40,7 @@ logger = logging.getLogger("libp2p.pubsub.floodsub")
class FloodSub(IPubsubRouter):
protocols: list[TProtocol]
pubsub: Pubsub
pubsub: Pubsub | None
def __init__(self, protocols: Sequence[TProtocol]) -> None:
self.protocols = list(protocols)
@ -58,7 +61,7 @@ class FloodSub(IPubsubRouter):
"""
self.pubsub = pubsub
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
def add_peer(self, peer_id: ID, protocol_id: TProtocol | None) -> None:
"""
Notifies the router that a new peer has been connected.
@ -108,17 +111,22 @@ class FloodSub(IPubsubRouter):
logger.debug("publishing message %s", pubsub_msg)
if self.pubsub is None:
raise PubsubRouterError("pubsub not attached to this instance")
else:
pubsub = self.pubsub
for peer_id in peers_gen:
if peer_id not in self.pubsub.peers:
if peer_id not in pubsub.peers:
continue
stream = self.pubsub.peers[peer_id]
stream = pubsub.peers[peer_id]
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
self.pubsub._handle_dead_peer(peer_id)
pubsub._handle_dead_peer(peer_id)
async def join(self, topic: str) -> None:
"""
@ -150,12 +158,16 @@ class FloodSub(IPubsubRouter):
:param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to.
"""
if self.pubsub is None:
raise PubsubRouterError("pubsub not attached to this instance")
else:
pubsub = self.pubsub
for topic in topic_ids:
if topic not in self.pubsub.peer_topics:
if topic not in pubsub.peer_topics:
continue
for peer_id in self.pubsub.peer_topics[topic]:
for peer_id in pubsub.peer_topics[topic]:
if peer_id in (msg_forwarder, origin):
continue
if peer_id not in self.pubsub.peers:
if peer_id not in pubsub.peers:
continue
yield peer_id

View File

@ -67,7 +67,7 @@ logger = logging.getLogger("libp2p.pubsub.gossipsub")
class GossipSub(IPubsubRouter, Service):
protocols: list[TProtocol]
pubsub: Pubsub
pubsub: Pubsub | None
degree: int
degree_high: int
@ -98,7 +98,7 @@ class GossipSub(IPubsubRouter, Service):
degree: int,
degree_low: int,
degree_high: int,
direct_peers: Sequence[PeerInfo] = None,
direct_peers: Sequence[PeerInfo] | None = None,
time_to_live: int = 60,
gossip_window: int = 3,
gossip_history: int = 5,
@ -141,8 +141,6 @@ class GossipSub(IPubsubRouter, Service):
self.time_since_last_publish = {}
async def run(self) -> None:
if self.pubsub is None:
raise NoPubsubAttached
self.manager.run_daemon_task(self.heartbeat)
if len(self.direct_peers) > 0:
self.manager.run_daemon_task(self.direct_connect_heartbeat)
@ -173,7 +171,7 @@ class GossipSub(IPubsubRouter, Service):
logger.debug("attached to pusub")
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
def add_peer(self, peer_id: ID, protocol_id: TProtocol | None) -> None:
"""
Notifies the router that a new peer has been connected.
@ -182,6 +180,9 @@ class GossipSub(IPubsubRouter, Service):
"""
logger.debug("adding peer %s with protocol %s", peer_id, protocol_id)
if protocol_id is None:
raise ValueError("Protocol cannot be None")
if protocol_id not in (PROTOCOL_ID, floodsub.PROTOCOL_ID):
# We should never enter here. Becuase the `protocol_id` is registered by
# your pubsub instance in multistream-select, but it is not the protocol
@ -243,6 +244,8 @@ class GossipSub(IPubsubRouter, Service):
logger.debug("publishing message %s", pubsub_msg)
for peer_id in peers_gen:
if self.pubsub is None:
raise NoPubsubAttached
if peer_id not in self.pubsub.peers:
continue
stream = self.pubsub.peers[peer_id]
@ -269,6 +272,8 @@ class GossipSub(IPubsubRouter, Service):
"""
send_to: set[ID] = set()
for topic in topic_ids:
if self.pubsub is None:
raise NoPubsubAttached
if topic not in self.pubsub.peer_topics:
continue
@ -318,6 +323,9 @@ class GossipSub(IPubsubRouter, Service):
:param topic: topic to join
"""
if self.pubsub is None:
raise NoPubsubAttached
logger.debug("joining topic %s", topic)
if topic in self.mesh:
@ -468,6 +476,8 @@ class GossipSub(IPubsubRouter, Service):
await trio.sleep(self.direct_connect_initial_delay)
while True:
for direct_peer in self.direct_peers:
if self.pubsub is None:
raise NoPubsubAttached
if direct_peer not in self.pubsub.peers:
try:
await self.pubsub.host.connect(self.direct_peers[direct_peer])
@ -485,6 +495,8 @@ class GossipSub(IPubsubRouter, Service):
peers_to_graft: DefaultDict[ID, list[str]] = defaultdict(list)
peers_to_prune: DefaultDict[ID, list[str]] = defaultdict(list)
for topic in self.mesh:
if self.pubsub is None:
raise NoPubsubAttached
# Skip if no peers have subscribed to the topic
if topic not in self.pubsub.peer_topics:
continue
@ -520,7 +532,8 @@ class GossipSub(IPubsubRouter, Service):
# Note: the comments here are the exact pseudocode from the spec
for topic in list(self.fanout):
if (
topic not in self.pubsub.peer_topics
self.pubsub is not None
and topic not in self.pubsub.peer_topics
and self.time_since_last_publish.get(topic, 0) + self.time_to_live
< int(time.time())
):
@ -529,11 +542,14 @@ class GossipSub(IPubsubRouter, Service):
else:
# Check if fanout peers are still in the topic and remove the ones that are not # noqa: E501
# ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501
in_topic_fanout_peers = [
peer
for peer in self.fanout[topic]
if peer in self.pubsub.peer_topics[topic]
]
in_topic_fanout_peers: list[ID] = []
if self.pubsub is not None:
in_topic_fanout_peers = [
peer
for peer in self.fanout[topic]
if peer in self.pubsub.peer_topics[topic]
]
self.fanout[topic] = set(in_topic_fanout_peers)
num_fanout_peers_in_topic = len(self.fanout[topic])
@ -553,6 +569,8 @@ class GossipSub(IPubsubRouter, Service):
for topic in self.mesh:
msg_ids = self.mcache.window(topic)
if msg_ids:
if self.pubsub is None:
raise NoPubsubAttached
# Get all pubsub peers in a topic and only add them if they are
# gossipsub peers too
if topic in self.pubsub.peer_topics:
@ -572,6 +590,8 @@ class GossipSub(IPubsubRouter, Service):
for topic in self.fanout:
msg_ids = self.mcache.window(topic)
if msg_ids:
if self.pubsub is None:
raise NoPubsubAttached
# Get all pubsub peers in topic and only add if they are
# gossipsub peers also
if topic in self.pubsub.peer_topics:
@ -620,6 +640,8 @@ class GossipSub(IPubsubRouter, Service):
def _get_in_topic_gossipsub_peers_from_minus(
self, topic: str, num_to_select: int, minus: Iterable[ID]
) -> list[ID]:
if self.pubsub is None:
raise NoPubsubAttached
gossipsub_peers_in_topic = {
peer_id
for peer_id in self.pubsub.peer_topics[topic]
@ -633,6 +655,8 @@ class GossipSub(IPubsubRouter, Service):
self, ihave_msg: rpc_pb2.ControlIHave, sender_peer_id: ID
) -> None:
"""Checks the seen set and requests unknown messages with an IWANT message."""
if self.pubsub is None:
raise NoPubsubAttached
# Get list of all seen (seqnos, from) from the (seqno, from) tuples in
# seen_messages cache
seen_seqnos_and_peers = [
@ -665,7 +689,7 @@ class GossipSub(IPubsubRouter, Service):
msgs_to_forward: list[rpc_pb2.Message] = []
for msg_id_iwant in msg_ids:
# Check if the wanted message ID is present in mcache
msg: rpc_pb2.Message = self.mcache.get(msg_id_iwant)
msg: rpc_pb2.Message | None = self.mcache.get(msg_id_iwant)
# Cache hit
if msg:
@ -683,6 +707,8 @@ class GossipSub(IPubsubRouter, Service):
# 2) Serialize that packet
rpc_msg: bytes = packet.SerializeToString()
if self.pubsub is None:
raise NoPubsubAttached
# 3) Get the stream to this peer
if sender_peer_id not in self.pubsub.peers:
@ -737,9 +763,9 @@ class GossipSub(IPubsubRouter, Service):
def pack_control_msgs(
self,
ihave_msgs: list[rpc_pb2.ControlIHave],
graft_msgs: list[rpc_pb2.ControlGraft],
prune_msgs: list[rpc_pb2.ControlPrune],
ihave_msgs: list[rpc_pb2.ControlIHave] | None,
graft_msgs: list[rpc_pb2.ControlGraft] | None,
prune_msgs: list[rpc_pb2.ControlPrune] | None,
) -> rpc_pb2.ControlMessage:
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
if ihave_msgs:
@ -771,7 +797,7 @@ class GossipSub(IPubsubRouter, Service):
await self.emit_control_message(control_msg, to_peer)
async def emit_graft(self, topic: str, to_peer: ID) -> None:
async def emit_graft(self, topic: str, id: ID) -> None:
"""Emit graft message, sent to to_peer, for topic."""
graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft()
graft_msg.topicID = topic
@ -779,9 +805,9 @@ class GossipSub(IPubsubRouter, Service):
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
control_msg.graft.extend([graft_msg])
await self.emit_control_message(control_msg, to_peer)
await self.emit_control_message(control_msg, id)
async def emit_prune(self, topic: str, to_peer: ID) -> None:
async def emit_prune(self, topic: str, id: ID) -> None:
"""Emit graft message, sent to to_peer, for topic."""
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
prune_msg.topicID = topic
@ -789,11 +815,13 @@ class GossipSub(IPubsubRouter, Service):
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
control_msg.prune.extend([prune_msg])
await self.emit_control_message(control_msg, to_peer)
await self.emit_control_message(control_msg, id)
async def emit_control_message(
self, control_msg: rpc_pb2.ControlMessage, to_peer: ID
) -> None:
if self.pubsub is None:
raise NoPubsubAttached
# Add control message to packet
packet: rpc_pb2.RPC = rpc_pb2.RPC()
packet.control.CopyFrom(control_msg)

View File

@ -1,9 +1,6 @@
from collections.abc import (
Sequence,
)
from typing import (
Optional,
)
from .pb import (
rpc_pb2,
@ -66,7 +63,7 @@ class MessageCache:
self.history[0].append(CacheEntry(mid, msg.topicIDs))
def get(self, mid: tuple[bytes, bytes]) -> Optional[rpc_pb2.Message]:
def get(self, mid: tuple[bytes, bytes]) -> rpc_pb2.Message | None:
"""
Get a message from the mcache.

View File

@ -4,6 +4,7 @@ from __future__ import (
import base64
from collections.abc import (
Callable,
KeysView,
)
import functools
@ -11,7 +12,6 @@ import hashlib
import logging
import time
from typing import (
Callable,
NamedTuple,
cast,
)
@ -53,6 +53,9 @@ from libp2p.network.stream.exceptions import (
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerdata import (
PeerDataError,
)
from libp2p.tools.async_service import (
Service,
)
@ -120,7 +123,7 @@ class Pubsub(Service, IPubsub):
# Indicate if we should enforce signature verification
strict_signing: bool
sign_key: PrivateKey
sign_key: PrivateKey | None
# Set of blacklisted peer IDs
blacklisted_peers: set[ID]
@ -132,7 +135,7 @@ class Pubsub(Service, IPubsub):
self,
host: IHost,
router: IPubsubRouter,
cache_size: int = None,
cache_size: int | None = None,
seen_ttl: int = 120,
sweep_interval: int = 60,
strict_signing: bool = True,
@ -634,6 +637,9 @@ class Pubsub(Service, IPubsub):
if self.strict_signing:
priv_key = self.sign_key
if priv_key is None:
raise PeerDataError("private key not found")
signature = priv_key.sign(
PUBSUB_SIGNING_PREFIX.encode() + msg.SerializeToString()
)

View File

@ -1,7 +1,3 @@
from typing import (
Optional,
)
from libp2p.abc import (
ISecureConn,
)
@ -49,5 +45,5 @@ class BaseSession(ISecureConn):
def get_remote_peer(self) -> ID:
return self.remote_peer
def get_remote_public_key(self) -> Optional[PublicKey]:
def get_remote_public_key(self) -> PublicKey:
return self.remote_permanent_pubkey

View File

@ -1,7 +1,7 @@
import secrets
from typing import (
from collections.abc import (
Callable,
)
import secrets
from libp2p.abc import (
ISecureTransport,

View File

@ -1,7 +1,3 @@
from typing import (
Optional,
)
from libp2p.abc import (
IRawConnection,
ISecureConn,
@ -87,13 +83,13 @@ class InsecureSession(BaseSession):
async def write(self, data: bytes) -> None:
await self.conn.write(data)
async def read(self, n: int = None) -> bytes:
async def read(self, n: int | None = None) -> bytes:
return await self.conn.read(n)
async def close(self) -> None:
await self.conn.close()
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
"""
Delegate to the underlying connection's get_remote_address method.
"""
@ -105,7 +101,7 @@ async def run_handshake(
local_private_key: PrivateKey,
conn: IRawConnection,
is_initiator: bool,
remote_peer_id: ID,
remote_peer_id: ID | None,
) -> ISecureConn:
"""Raise `HandshakeFailure` when handshake failed."""
msg = make_exchange_message(local_private_key.get_public_key())
@ -124,6 +120,15 @@ async def run_handshake(
remote_msg.ParseFromString(remote_msg_bytes)
received_peer_id = ID(remote_msg.id)
# Verify that `remote_peer_id` isn't `None`
# That is the only condition that `remote_peer_id` would not need to be checked
# against the `recieved_peer_id` gotten from the outbound/recieved `msg`.
# The check against `received_peer_id` happens in the next if-block
if is_initiator and remote_peer_id is None:
raise HandshakeFailure(
"remote peer ID cannot be None if `is_initiator` is set to `True`"
)
# Verify if the receive `ID` matches the one we originally initialize the session.
# We only need to check it when we are the initiator, because only in that condition
# we possibly knows the `ID` of the remote.

View File

@ -1,5 +1,4 @@
from typing import (
Optional,
cast,
)
@ -10,7 +9,6 @@ from libp2p.abc import (
)
from libp2p.io.abc import (
EncryptedMsgReadWriter,
MsgReadWriteCloser,
ReadWriteCloser,
)
from libp2p.io.msgio import (
@ -40,7 +38,7 @@ class BaseNoiseMsgReadWriter(EncryptedMsgReadWriter):
implemented by the subclasses.
"""
read_writer: MsgReadWriteCloser
read_writer: NoisePacketReadWriter
noise_state: NoiseState
# FIXME: This prefix is added in msg#3 in Go. Check whether it's a desired behavior.
@ -50,12 +48,12 @@ class BaseNoiseMsgReadWriter(EncryptedMsgReadWriter):
self.read_writer = NoisePacketReadWriter(cast(ReadWriteCloser, conn))
self.noise_state = noise_state
async def write_msg(self, data: bytes, prefix_encoded: bool = False) -> None:
data_encrypted = self.encrypt(data)
async def write_msg(self, msg: bytes, prefix_encoded: bool = False) -> None:
data_encrypted = self.encrypt(msg)
if prefix_encoded:
await self.read_writer.write_msg(self.prefix + data_encrypted)
else:
await self.read_writer.write_msg(data_encrypted)
# Manually add the prefix if needed
data_encrypted = self.prefix + data_encrypted
await self.read_writer.write_msg(data_encrypted)
async def read_msg(self, prefix_encoded: bool = False) -> bytes:
noise_msg_encrypted = await self.read_writer.read_msg()
@ -67,10 +65,11 @@ class BaseNoiseMsgReadWriter(EncryptedMsgReadWriter):
async def close(self) -> None:
await self.read_writer.close()
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
# Delegate to the underlying connection if possible
if hasattr(self.read_writer, "read_write_closer") and hasattr(
self.read_writer.read_write_closer, "get_remote_address"
self.read_writer.read_write_closer,
"get_remote_address",
):
return self.read_writer.read_write_closer.get_remote_address()
return None
@ -78,7 +77,7 @@ class BaseNoiseMsgReadWriter(EncryptedMsgReadWriter):
class NoiseHandshakeReadWriter(BaseNoiseMsgReadWriter):
def encrypt(self, data: bytes) -> bytes:
return self.noise_state.write_message(data)
return bytes(self.noise_state.write_message(data))
def decrypt(self, data: bytes) -> bytes:
return bytes(self.noise_state.read_message(data))

View File

@ -19,7 +19,7 @@ SIGNED_DATA_PREFIX = "noise-libp2p-static-key:"
class NoiseHandshakePayload:
id_pubkey: PublicKey
id_sig: bytes
early_data: bytes = None
early_data: bytes | None = None
def serialize(self) -> bytes:
msg = noise_pb.NoiseHandshakePayload(

View File

@ -7,8 +7,10 @@ from cryptography.hazmat.primitives import (
serialization,
)
from noise.backends.default.keypairs import KeyPair as NoiseKeyPair
from noise.connection import Keypair as NoiseKeypairEnum
from noise.connection import NoiseConnection as NoiseState
from noise.connection import (
Keypair as NoiseKeypairEnum,
NoiseConnection as NoiseState,
)
from libp2p.abc import (
IRawConnection,
@ -47,14 +49,12 @@ from .messages import (
class IPattern(ABC):
@abstractmethod
async def handshake_inbound(self, conn: IRawConnection) -> ISecureConn:
...
async def handshake_inbound(self, conn: IRawConnection) -> ISecureConn: ...
@abstractmethod
async def handshake_outbound(
self, conn: IRawConnection, remote_peer: ID
) -> ISecureConn:
...
) -> ISecureConn: ...
class BasePattern(IPattern):
@ -62,13 +62,15 @@ class BasePattern(IPattern):
noise_static_key: PrivateKey
local_peer: ID
libp2p_privkey: PrivateKey
early_data: bytes
early_data: bytes | None
def create_noise_state(self) -> NoiseState:
noise_state = NoiseState.from_name(self.protocol_name)
noise_state.set_keypair_from_private_bytes(
NoiseKeypairEnum.STATIC, self.noise_static_key.to_bytes()
)
if noise_state.noise_protocol is None:
raise NoiseStateError("noise_protocol is not initialized")
return noise_state
def make_handshake_payload(self) -> NoiseHandshakePayload:
@ -84,7 +86,7 @@ class PatternXX(BasePattern):
local_peer: ID,
libp2p_privkey: PrivateKey,
noise_static_key: PrivateKey,
early_data: bytes = None,
early_data: bytes | None = None,
) -> None:
self.protocol_name = b"Noise_XX_25519_ChaChaPoly_SHA256"
self.local_peer = local_peer
@ -96,7 +98,12 @@ class PatternXX(BasePattern):
noise_state = self.create_noise_state()
noise_state.set_as_responder()
noise_state.start_handshake()
if noise_state.noise_protocol is None:
raise NoiseStateError("noise_protocol is not initialized")
handshake_state = noise_state.noise_protocol.handshake_state
if handshake_state is None:
raise NoiseStateError("Handshake state is not initialized")
read_writer = NoiseHandshakeReadWriter(conn, noise_state)
# Consume msg#1.
@ -145,7 +152,11 @@ class PatternXX(BasePattern):
read_writer = NoiseHandshakeReadWriter(conn, noise_state)
noise_state.set_as_initiator()
noise_state.start_handshake()
if noise_state.noise_protocol is None:
raise NoiseStateError("noise_protocol is not initialized")
handshake_state = noise_state.noise_protocol.handshake_state
if handshake_state is None:
raise NoiseStateError("Handshake state is not initialized")
# Send msg#1, which is *not* encrypted.
msg_1 = b""
@ -195,6 +206,8 @@ class PatternXX(BasePattern):
@staticmethod
def _get_pubkey_from_noise_keypair(key_pair: NoiseKeyPair) -> PublicKey:
# Use `Ed25519PublicKey` since 25519 is used in our pattern.
if key_pair.public is None:
raise NoiseStateError("public key is not initialized")
raw_bytes = key_pair.public.public_bytes(
serialization.Encoding.Raw, serialization.PublicFormat.Raw
)

View File

@ -26,7 +26,7 @@ class Transport(ISecureTransport):
libp2p_privkey: PrivateKey
noise_privkey: PrivateKey
local_peer: ID
early_data: bytes
early_data: bytes | None
with_noise_pipes: bool
# NOTE: Implementations that support Noise Pipes must decide whether to use
@ -37,8 +37,8 @@ class Transport(ISecureTransport):
def __init__(
self,
libp2p_keypair: KeyPair,
noise_privkey: PrivateKey = None,
early_data: bytes = None,
noise_privkey: PrivateKey,
early_data: bytes | None = None,
with_noise_pipes: bool = False,
) -> None:
self.libp2p_privkey = libp2p_keypair.private_key

View File

@ -2,9 +2,6 @@ from dataclasses import (
dataclass,
)
import itertools
from typing import (
Optional,
)
import multihash
@ -14,14 +11,10 @@ from libp2p.abc import (
)
from libp2p.crypto.authenticated_encryption import (
EncryptionParameters as AuthenticatedEncryptionParameters,
)
from libp2p.crypto.authenticated_encryption import (
InvalidMACException,
)
from libp2p.crypto.authenticated_encryption import (
MacAndCipher as Encrypter,
initialize_pair as initialize_pair_for_encryption,
)
from libp2p.crypto.authenticated_encryption import MacAndCipher as Encrypter
from libp2p.crypto.ecc import (
ECCPublicKey,
)
@ -91,6 +84,8 @@ class SecioPacketReadWriter(FixedSizeLenMsgReadWriter):
class SecioMsgReadWriter(EncryptedMsgReadWriter):
read_writer: SecioPacketReadWriter
local_encrypter: Encrypter
remote_encrypter: Encrypter
def __init__(
self,
@ -213,7 +208,8 @@ async def _response_to_msg(read_writer: SecioPacketReadWriter, msg: bytes) -> by
def _mk_multihash_sha256(data: bytes) -> bytes:
return multihash.digest(data, "sha2-256")
mh = multihash.digest(data, "sha2-256")
return mh.encode()
def _mk_score(public_key: PublicKey, nonce: bytes) -> bytes:
@ -270,7 +266,7 @@ def _select_encryption_parameters(
async def _establish_session_parameters(
local_peer: PeerID,
local_private_key: PrivateKey,
remote_peer: Optional[PeerID],
remote_peer: PeerID | None,
conn: SecioPacketReadWriter,
nonce: bytes,
) -> tuple[SessionParameters, bytes]:
@ -399,7 +395,7 @@ async def create_secure_session(
local_peer: PeerID,
local_private_key: PrivateKey,
conn: IRawConnection,
remote_peer: PeerID = None,
remote_peer: PeerID | None = None,
) -> ISecureConn:
"""
Attempt the initial `secio` handshake with the remote peer.

View File

@ -1,7 +1,4 @@
import io
from typing import (
Optional,
)
from libp2p.crypto.keys import (
PrivateKey,
@ -44,7 +41,7 @@ class SecureSession(BaseSession):
self._reset_internal_buffer()
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
"""Delegate to the underlying connection's get_remote_address method."""
return self.conn.get_remote_address()
@ -53,7 +50,7 @@ class SecureSession(BaseSession):
self.low_watermark = 0
self.high_watermark = 0
def _drain(self, n: int) -> bytes:
def _drain(self, n: int | None) -> bytes:
if self.low_watermark == self.high_watermark:
return b""
@ -75,7 +72,7 @@ class SecureSession(BaseSession):
self.low_watermark = 0
self.high_watermark = len(msg)
async def read(self, n: int = None) -> bytes:
async def read(self, n: int | None = None) -> bytes:
if n == 0:
return b""
@ -85,6 +82,9 @@ class SecureSession(BaseSession):
msg = await self.conn.read_msg()
if n is None:
return msg
if n < len(msg):
self._fill(msg)
return self._drain(n)

View File

@ -1,7 +1,4 @@
import logging
from typing import (
Optional,
)
import trio
@ -168,7 +165,7 @@ class Mplex(IMuxedConn):
raise MplexUnavailable
async def send_message(
self, flag: HeaderTags, data: Optional[bytes], stream_id: StreamID
self, flag: HeaderTags, data: bytes | None, stream_id: StreamID
) -> int:
"""
Send a message over the connection.
@ -366,6 +363,6 @@ class Mplex(IMuxedConn):
self.event_closed.set()
await self.new_stream_send_channel.aclose()
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
"""Delegate to the underlying Mplex connection's secured_conn."""
return self.secured_conn.get_remote_address()

View File

@ -3,7 +3,6 @@ from types import (
)
from typing import (
TYPE_CHECKING,
Optional,
)
import trio
@ -40,9 +39,12 @@ class MplexStream(IMuxedStream):
name: str
stream_id: StreamID
muxed_conn: "Mplex"
read_deadline: int
write_deadline: int
# NOTE: All methods used here are part of `Mplex` which is a derived
# class of IMuxedConn. Ignoring this type assignment should not pose
# any risk.
muxed_conn: "Mplex" # type: ignore[assignment]
read_deadline: int | None
write_deadline: int | None
# TODO: Add lock for read/write to avoid interleaving receiving messages?
close_lock: trio.Lock
@ -92,7 +94,7 @@ class MplexStream(IMuxedStream):
self._buf = self._buf[len(payload) :]
return bytes(payload)
def _read_return_when_blocked(self) -> bytes:
def _read_return_when_blocked(self) -> bytearray:
buf = bytearray()
while True:
try:
@ -102,7 +104,7 @@ class MplexStream(IMuxedStream):
break
return buf
async def read(self, n: int = None) -> bytes:
async def read(self, n: int | None = None) -> bytes:
"""
Read up to n bytes. Read possibly returns fewer than `n` bytes, if
there are not enough bytes in the Mplex buffer. If `n is None`, read
@ -257,7 +259,7 @@ class MplexStream(IMuxedStream):
self.write_deadline = ttl
return True
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
"""Delegate to the parent Mplex connection."""
return self.muxed_conn.get_remote_address()
@ -267,9 +269,9 @@ class MplexStream(IMuxedStream):
async def __aexit__(
self,
exc_type: Optional[type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exit the async context manager and close the stream."""
await self.close()

View File

@ -95,7 +95,7 @@ class MuxerMultistream:
if protocol == PROTOCOL_ID:
async with trio.open_nursery():
def on_close() -> None:
async def on_close() -> None:
pass
return Yamux(

View File

@ -3,8 +3,10 @@ Yamux stream multiplexer implementation for py-libp2p.
This is the preferred multiplexing protocol due to its performance and feature set.
Mplex is also available for legacy compatibility but may be deprecated in the future.
"""
from collections.abc import (
Awaitable,
Callable,
)
import inspect
import logging
@ -13,8 +15,7 @@ from types import (
TracebackType,
)
from typing import (
Callable,
Optional,
Any,
)
import trio
@ -83,9 +84,9 @@ class YamuxStream(IMuxedStream):
async def __aexit__(
self,
exc_type: Optional[type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exit the async context manager and close the stream."""
await self.close()
@ -126,7 +127,7 @@ class YamuxStream(IMuxedStream):
if self.send_window < DEFAULT_WINDOW_SIZE // 2:
await self.send_window_update()
async def send_window_update(self, increment: Optional[int] = None) -> None:
async def send_window_update(self, increment: int | None = None) -> None:
"""Send a window update to peer."""
if increment is None:
increment = DEFAULT_WINDOW_SIZE - self.recv_window
@ -141,7 +142,7 @@ class YamuxStream(IMuxedStream):
)
await self.conn.secured_conn.write(header)
async def read(self, n: int = -1) -> bytes:
async def read(self, n: int | None = -1) -> bytes:
# Handle None value for n by converting it to -1
if n is None:
n = -1
@ -161,8 +162,7 @@ class YamuxStream(IMuxedStream):
if buffer and len(buffer) > 0:
# Wait for closure even if data is available
logging.debug(
f"Stream {self.stream_id}:"
f"Waiting for FIN before returning data"
f"Stream {self.stream_id}:Waiting for FIN before returning data"
)
await self.conn.stream_events[self.stream_id].wait()
self.conn.stream_events[self.stream_id] = trio.Event()
@ -240,7 +240,7 @@ class YamuxStream(IMuxedStream):
"""
raise NotImplementedError("Yamux does not support setting read deadlines")
def get_remote_address(self) -> Optional[tuple[str, int]]:
def get_remote_address(self) -> tuple[str, int] | None:
"""
Returns the remote address of the underlying connection.
"""
@ -268,8 +268,8 @@ class Yamux(IMuxedConn):
self,
secured_conn: ISecureConn,
peer_id: ID,
is_initiator: Optional[bool] = None,
on_close: Optional[Callable[[], Awaitable[None]]] = None,
is_initiator: bool | None = None,
on_close: Callable[[], Awaitable[Any]] | None = None,
) -> None:
self.secured_conn = secured_conn
self.peer_id = peer_id
@ -283,7 +283,7 @@ class Yamux(IMuxedConn):
self.is_initiator_value = (
is_initiator if is_initiator is not None else secured_conn.is_initiator
)
self.next_stream_id = 1 if self.is_initiator_value else 2
self.next_stream_id: int = 1 if self.is_initiator_value else 2
self.streams: dict[int, YamuxStream] = {}
self.streams_lock = trio.Lock()
self.new_stream_send_channel: MemorySendChannel[YamuxStream]
@ -297,7 +297,7 @@ class Yamux(IMuxedConn):
self.event_started = trio.Event()
self.stream_buffers: dict[int, bytearray] = {}
self.stream_events: dict[int, trio.Event] = {}
self._nursery: Optional[Nursery] = None
self._nursery: Nursery | None = None
async def start(self) -> None:
logging.debug(f"Starting Yamux for {self.peer_id}")
@ -465,8 +465,14 @@ class Yamux(IMuxedConn):
# Wait for data if stream is still open
logging.debug(f"Waiting for data on stream {self.peer_id}:{stream_id}")
await self.stream_events[stream_id].wait()
self.stream_events[stream_id] = trio.Event()
try:
await self.stream_events[stream_id].wait()
self.stream_events[stream_id] = trio.Event()
except KeyError:
raise MuxedStreamEOF("Stream was removed")
# This line should never be reached, but satisfies the type checker
raise MuxedStreamEOF("Unexpected end of read_stream")
async def handle_incoming(self) -> None:
while not self.event_shutting_down.is_set():
@ -474,8 +480,7 @@ class Yamux(IMuxedConn):
header = await self.secured_conn.read(HEADER_SIZE)
if not header or len(header) < HEADER_SIZE:
logging.debug(
f"Connection closed or"
f"incomplete header for peer {self.peer_id}"
f"Connection closed orincomplete header for peer {self.peer_id}"
)
self.event_shutting_down.set()
await self._cleanup_on_error()
@ -544,8 +549,7 @@ class Yamux(IMuxedConn):
)
elif error_code == GO_AWAY_PROTOCOL_ERROR:
logging.error(
f"Received GO_AWAY for peer"
f"{self.peer_id}: Protocol error"
f"Received GO_AWAY for peer{self.peer_id}: Protocol error"
)
elif error_code == GO_AWAY_INTERNAL_ERROR:
logging.error(

View File

@ -1,12 +1,10 @@
# Copied from https://github.com/ethereum/async-service
import os
from typing import (
Any,
)
from typing import Any
def get_task_name(value: Any, explicit_name: str = None) -> str:
def get_task_name(value: Any, explicit_name: str | None = None) -> str:
# inline import to ensure `_utils` is always importable from the rest of
# the module.
from .abc import ( # noqa: F401

View File

@ -28,33 +28,27 @@ class TaskAPI(Hashable):
parent: Optional["TaskWithChildrenAPI"]
@abstractmethod
async def run(self) -> None:
...
async def run(self) -> None: ...
@abstractmethod
async def cancel(self) -> None:
...
async def cancel(self) -> None: ...
@property
@abstractmethod
def is_done(self) -> bool:
...
def is_done(self) -> bool: ...
@abstractmethod
async def wait_done(self) -> None:
...
async def wait_done(self) -> None: ...
class TaskWithChildrenAPI(TaskAPI):
children: set[TaskAPI]
@abstractmethod
def add_child(self, child: TaskAPI) -> None:
...
def add_child(self, child: TaskAPI) -> None: ...
@abstractmethod
def discard_child(self, child: TaskAPI) -> None:
...
def discard_child(self, child: TaskAPI) -> None: ...
class ServiceAPI(ABC):
@ -212,7 +206,11 @@ class InternalManagerAPI(ManagerAPI):
@trio_typing.takes_callable_and_args
@abstractmethod
def run_task(
self, async_fn: AsyncFn, *args: Any, daemon: bool = False, name: str = None
self,
async_fn: AsyncFn,
*args: Any,
daemon: bool = False,
name: str | None = None,
) -> None:
"""
Run a task in the background. If the function throws an exception it
@ -225,7 +223,9 @@ class InternalManagerAPI(ManagerAPI):
@trio_typing.takes_callable_and_args
@abstractmethod
def run_daemon_task(self, async_fn: AsyncFn, *args: Any, name: str = None) -> None:
def run_daemon_task(
self, async_fn: AsyncFn, *args: Any, name: str | None = None
) -> None:
"""
Run a daemon task in the background.
@ -235,7 +235,7 @@ class InternalManagerAPI(ManagerAPI):
@abstractmethod
def run_child_service(
self, service: ServiceAPI, daemon: bool = False, name: str = None
self, service: ServiceAPI, daemon: bool = False, name: str | None = None
) -> "ManagerAPI":
"""
Run a service in the background. If the function throws an exception it
@ -248,7 +248,7 @@ class InternalManagerAPI(ManagerAPI):
@abstractmethod
def run_daemon_child_service(
self, service: ServiceAPI, name: str = None
self, service: ServiceAPI, name: str | None = None
) -> "ManagerAPI":
"""
Run a daemon service in the background.

View File

@ -9,6 +9,7 @@ from collections import (
)
from collections.abc import (
Awaitable,
Callable,
Iterable,
Sequence,
)
@ -16,8 +17,6 @@ import logging
import sys
from typing import (
Any,
Callable,
Optional,
TypeVar,
cast,
)
@ -98,7 +97,7 @@ def as_service(service_fn: LogicFnType) -> type[ServiceAPI]:
class BaseTask(TaskAPI):
def __init__(
self, name: str, daemon: bool, parent: Optional[TaskWithChildrenAPI]
self, name: str, daemon: bool, parent: TaskWithChildrenAPI | None
) -> None:
# meta
self.name = name
@ -125,7 +124,7 @@ class BaseTask(TaskAPI):
class BaseTaskWithChildren(BaseTask, TaskWithChildrenAPI):
def __init__(
self, name: str, daemon: bool, parent: Optional[TaskWithChildrenAPI]
self, name: str, daemon: bool, parent: TaskWithChildrenAPI | None
) -> None:
super().__init__(name, daemon, parent)
self.children = set()
@ -142,26 +141,20 @@ T = TypeVar("T", bound="BaseFunctionTask")
class BaseFunctionTask(BaseTaskWithChildren):
@classmethod
def iterate_tasks(cls: type[T], *tasks: TaskAPI) -> Iterable[T]:
def iterate_tasks(cls, *tasks: TaskAPI) -> Iterable["BaseFunctionTask"]:
"""Iterate over all tasks of this class type and their children recursively."""
for task in tasks:
if isinstance(task, cls):
if isinstance(task, BaseFunctionTask):
yield task
else:
continue
yield from cls.iterate_tasks(
*(
child_task
for child_task in task.children
if isinstance(child_task, cls)
)
)
if isinstance(task, TaskWithChildrenAPI):
yield from cls.iterate_tasks(*task.children)
def __init__(
self,
name: str,
daemon: bool,
parent: Optional[TaskWithChildrenAPI],
parent: TaskWithChildrenAPI | None,
async_fn: AsyncFn,
async_fn_args: Sequence[Any],
) -> None:
@ -259,12 +252,15 @@ class BaseManager(InternalManagerAPI):
# Wait API
#
def run_daemon_task(
self, async_fn: Callable[..., Awaitable[Any]], *args: Any, name: str = None
self,
async_fn: Callable[..., Awaitable[Any]],
*args: Any,
name: str | None = None,
) -> None:
self.run_task(async_fn, *args, daemon=True, name=name)
def run_daemon_child_service(
self, service: ServiceAPI, name: str = None
self, service: ServiceAPI, name: str | None = None
) -> ManagerAPI:
return self.run_child_service(service, daemon=True, name=name)
@ -286,8 +282,7 @@ class BaseManager(InternalManagerAPI):
# Task Management
#
@abstractmethod
def _schedule_task(self, task: TaskAPI) -> None:
...
def _schedule_task(self, task: TaskAPI) -> None: ...
def _common_run_task(self, task: TaskAPI) -> None:
if not self.is_running:
@ -307,7 +302,7 @@ class BaseManager(InternalManagerAPI):
self._schedule_task(task)
def _add_child_task(
self, parent: Optional[TaskWithChildrenAPI], task: TaskAPI
self, parent: TaskWithChildrenAPI | None, task: TaskAPI
) -> None:
if parent is None:
all_children = self._root_tasks

View File

@ -6,7 +6,9 @@ from __future__ import (
from collections.abc import (
AsyncIterator,
Awaitable,
Callable,
Coroutine,
Iterable,
Sequence,
)
from contextlib import (
@ -16,7 +18,6 @@ import functools
import sys
from typing import (
Any,
Callable,
Optional,
TypeVar,
cast,
@ -59,6 +60,16 @@ from .typing import (
class FunctionTask(BaseFunctionTask):
_trio_task: trio.lowlevel.Task | None = None
@classmethod
def iterate_tasks(cls, *tasks: TaskAPI) -> Iterable[FunctionTask]:
"""Iterate over all FunctionTask instances and their children recursively."""
for task in tasks:
if isinstance(task, FunctionTask):
yield task
if isinstance(task, TaskWithChildrenAPI):
yield from cls.iterate_tasks(*task.children)
def __init__(
self,
name: str,
@ -75,7 +86,7 @@ class FunctionTask(BaseFunctionTask):
# Each task gets its own `CancelScope` which is how we can manually
# control cancellation order of the task DAG
self._cancel_scope = trio.CancelScope()
self._cancel_scope = trio.CancelScope() # type: ignore[call-arg]
#
# Trio specific API
@ -309,7 +320,7 @@ class TrioManager(BaseManager):
async_fn: Callable[..., Awaitable[Any]],
*args: Any,
daemon: bool = False,
name: str = None,
name: str | None = None,
) -> None:
task = FunctionTask(
name=get_task_name(async_fn, name),
@ -322,7 +333,7 @@ class TrioManager(BaseManager):
self._common_run_task(task)
def run_child_service(
self, service: ServiceAPI, daemon: bool = False, name: str = None
self, service: ServiceAPI, daemon: bool = False, name: str | None = None
) -> ManagerAPI:
task = ChildServiceTask(
name=get_task_name(service, name),
@ -416,7 +427,12 @@ def external_api(func: TFunc) -> TFunc:
async with trio.open_nursery() as nursery:
# mypy's type hints for start_soon break with this invocation.
nursery.start_soon(
_wait_api_fn, self, func, args, kwargs, send_channel # type: ignore
_wait_api_fn, # type: ignore
self,
func,
args,
kwargs,
send_channel,
)
nursery.start_soon(_wait_finished, self, func, send_channel)
result, err = await receive_channel.receive()

View File

@ -2,13 +2,13 @@
from collections.abc import (
Awaitable,
Callable,
)
from types import (
TracebackType,
)
from typing import (
Any,
Callable,
)
EXC_INFO = tuple[type[BaseException], BaseException, TracebackType]

View File

@ -32,7 +32,7 @@ class GossipsubParams(NamedTuple):
degree: int = 10
degree_low: int = 9
degree_high: int = 11
direct_peers: Sequence[PeerInfo] = None
direct_peers: Sequence[PeerInfo] = []
time_to_live: int = 30
gossip_window: int = 3
gossip_history: int = 5

View File

@ -1,10 +1,8 @@
from collections.abc import (
Awaitable,
)
import logging
from typing import (
Callable,
)
import logging
import trio
@ -63,12 +61,12 @@ async def connect_swarm(swarm_0: Swarm, swarm_1: Swarm) -> None:
logging.debug(
"Swarm connection verification failed on attempt"
+ f" {attempt+1}, retrying..."
+ f" {attempt + 1}, retrying..."
)
except Exception as e:
last_error = e
logging.debug(f"Swarm connection attempt {attempt+1} failed: {e}")
logging.debug(f"Swarm connection attempt {attempt + 1} failed: {e}")
await trio.sleep(retry_delay)
# If we got here, all retries failed
@ -115,12 +113,12 @@ async def connect(node1: IHost, node2: IHost) -> None:
return
logging.debug(
f"Connection verification failed on attempt {attempt+1}, retrying..."
f"Connection verification failed on attempt {attempt + 1}, retrying..."
)
except Exception as e:
last_error = e
logging.debug(f"Connection attempt {attempt+1} failed: {e}")
logging.debug(f"Connection attempt {attempt + 1} failed: {e}")
await trio.sleep(retry_delay)
# If we got here, all retries failed

View File

@ -1,11 +1,9 @@
from collections.abc import (
Awaitable,
Callable,
Sequence,
)
import logging
from typing import (
Callable,
)
from multiaddr import (
Multiaddr,
@ -44,7 +42,7 @@ class TCPListener(IListener):
self.handler = handler_function
# TODO: Get rid of `nursery`?
async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> None:
async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool:
"""
Put listener in listening mode and wait for incoming connections.
@ -56,7 +54,7 @@ class TCPListener(IListener):
handler: Callable[[trio.SocketStream], Awaitable[None]],
port: int,
host: str,
task_status: TaskStatus[Sequence[trio.SocketListener]] = None,
task_status: TaskStatus[Sequence[trio.SocketListener]],
) -> None:
"""Just a proxy function to add logging here."""
logger.debug("serve_tcp %s %s", host, port)
@ -67,18 +65,53 @@ class TCPListener(IListener):
remote_port: int = 0
try:
tcp_stream = TrioTCPStream(stream)
remote_host, remote_port = tcp_stream.get_remote_address()
remote_tuple = tcp_stream.get_remote_address()
if remote_tuple is not None:
remote_host, remote_port = remote_tuple
await self.handler(tcp_stream)
except Exception:
logger.debug(f"Connection from {remote_host}:{remote_port} failed.")
listeners = await nursery.start(
tcp_port_str = maddr.value_for_protocol("tcp")
if tcp_port_str is None:
logger.error(f"Cannot listen: TCP port is missing in multiaddress {maddr}")
return False
try:
tcp_port = int(tcp_port_str)
except ValueError:
logger.error(
f"Cannot listen: Invalid TCP port '{tcp_port_str}' "
f"in multiaddress {maddr}"
)
return False
ip4_host_str = maddr.value_for_protocol("ip4")
# For trio.serve_tcp, ip4_host_str (as host argument) can be None,
# which typically means listen on all available interfaces.
started_listeners = await nursery.start(
serve_tcp,
handler,
int(maddr.value_for_protocol("tcp")),
maddr.value_for_protocol("ip4"),
tcp_port,
ip4_host_str,
)
self.listeners.extend(listeners)
if started_listeners is None:
# This implies that task_status.started() was not called within serve_tcp,
# likely because trio.serve_tcp itself failed to start (e.g., port in use).
logger.error(
f"Failed to start TCP listener for {maddr}: "
f"`nursery.start` returned None. "
"This might be due to issues like the port already "
"being in use or invalid host."
)
return False
self.listeners.extend(started_listeners)
return True
def get_addrs(self) -> tuple[Multiaddr, ...]:
"""
@ -105,15 +138,42 @@ class TCP(ITransport):
:return: `RawConnection` if successful
:raise OpenConnectionError: raised when failed to open connection
"""
self.host = maddr.value_for_protocol("ip4")
self.port = int(maddr.value_for_protocol("tcp"))
host_str = maddr.value_for_protocol("ip4")
port_str = maddr.value_for_protocol("tcp")
if host_str is None:
raise OpenConnectionError(
f"Failed to dial {maddr}: IP address not found in multiaddr."
)
if port_str is None:
raise OpenConnectionError(
f"Failed to dial {maddr}: TCP port not found in multiaddr."
)
try:
stream = await trio.open_tcp_stream(self.host, self.port)
except OSError as error:
raise OpenConnectionError from error
read_write_closer = TrioTCPStream(stream)
port_int = int(port_str)
except ValueError:
raise OpenConnectionError(
f"Failed to dial {maddr}: Invalid TCP port '{port_str}'."
)
try:
# trio.open_tcp_stream requires host to be str or bytes, not None.
stream = await trio.open_tcp_stream(host_str, port_int)
except OSError as error:
# OSError is common for network issues like "Connection refused"
# or "Host unreachable".
raise OpenConnectionError(
f"Failed to open TCP stream to {maddr}: {error}"
) from error
except Exception as error:
# Catch other potential errors from trio.open_tcp_stream and wrap them.
raise OpenConnectionError(
f"An unexpected error occurred when dialing {maddr}: {error}"
) from error
read_write_closer = TrioTCPStream(stream)
return RawConnection(read_write_closer, True)
def create_listener(self, handler_function: THandler) -> TCPListener:

View File

@ -13,15 +13,13 @@ import sys
import threading
from typing import (
Any,
Optional,
Union,
)
# Create a log queue
log_queue: "queue.Queue[Any]" = queue.Queue()
# Store the current listener to stop it on exit
_current_listener: Optional[logging.handlers.QueueListener] = None
_current_listener: logging.handlers.QueueListener | None = None
# Event to track when the listener is ready
_listener_ready = threading.Event()
@ -135,7 +133,7 @@ def setup_logging() -> None:
formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
# Configure handlers
handlers: list[Union[logging.StreamHandler[Any], logging.FileHandler]] = []
handlers: list[logging.StreamHandler[Any] | logging.FileHandler] = []
# Console handler
console_handler = logging.StreamHandler(sys.stderr)