2 Commits

Author SHA1 Message Date
4b331d96a7 fix: pre-commit issues 2025-05-31 12:47:46 +01:00
1d9849cb43 feat: achieve ping interop py-libp2p - rust-libp2p
WORKING: Connection, handshake, Yamux setup, initial ping/pong. ISSUE: Frame parser corruption after 2-3 frames (boundary sync)
2025-05-31 12:25:12 +01:00
29 changed files with 484 additions and 1718 deletions

View File

@ -1,9 +1,16 @@
import argparse
import logging
import os
import struct
from cryptography.hazmat.primitives.asymmetric import (
x25519,
)
import multiaddr
import trio
from libp2p import (
generate_new_rsa_identity,
new_host,
)
from libp2p.custom_types import (
@ -12,109 +19,496 @@ from libp2p.custom_types import (
from libp2p.network.stream.net_stream import (
INetStream,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
from libp2p.security.noise.transport import Transport as NoiseTransport
from libp2p.stream_muxer.yamux.yamux import (
Yamux,
YamuxStream,
)
from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID
# Configure detailed logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"),
],
)
# Protocol constants - must match rust-libp2p exactly
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
PING_LENGTH = 32
RESP_TIMEOUT = 60
RESP_TIMEOUT = 30
MAX_FRAME_SIZE = 1024 * 1024 # 1MB max frame size
class InteropYamux(Yamux):
"""Enhanced Yamux with proper rust-libp2p interoperability"""
def __init__(self, *args, **kwargs):
logging.info("InteropYamux.__init__ called")
super().__init__(*args, **kwargs)
self.frame_count = 0
self.debug_frames = True
async def _read_exact_bytes(self, n):
"""Read exactly n bytes from the connection with proper error handling"""
if n == 0:
return b""
if n > MAX_FRAME_SIZE:
logging.error(f"Requested read size {n} exceeds maximum {MAX_FRAME_SIZE}")
return None
data = b""
while len(data) < n:
try:
remaining = n - len(data)
chunk = await self.secured_conn.read(remaining)
except (trio.ClosedResourceError, trio.BrokenResourceError):
logging.debug(
f"Connection closed while reading {n}"
f"bytes (got {len(data)}) for peer {self.peer_id}"
)
return None
except Exception as e:
logging.error(f"Error reading {n} bytes: {e}")
return None
if not chunk:
logging.debug(
f"Connection closed while reading {n}"
f"bytes (got {len(data)}) for peer {self.peer_id}"
)
return None
data += chunk
return data
async def handle_incoming(self):
"""Enhanced incoming frame handler with better error recovery"""
logging.info(f"Starting Yamux for {self.peer_id}")
consecutive_errors = 0
max_consecutive_errors = 3
while not self.event_shutting_down.is_set():
try:
# Read frame header (12 bytes)
header_data = await self._read_exact_bytes(12)
if header_data is None:
logging.debug(
f"Connection closed or incomplete"
f"header for peer {self.peer_id}"
)
break
# Quick sanity check for protocol data leakage
if (
b"/ipfs" in header_data
or b"multi" in header_data
or b"noise" in header_data
):
logging.error(
f"Protocol data in header position: {header_data.hex()}"
)
break
try:
# Unpack header: version, type, flags, stream_id, length
version, msg_type, flags, stream_id, length = struct.unpack(
">BBHII", header_data
)
# Validate header values strictly
if version != 0:
logging.error(f"Invalid yamux version {version}, expected 0")
break
if msg_type not in [0, 1, 2, 3]:
logging.error(f"Invalid message type {msg_type}, expected 0-3")
break
if length > MAX_FRAME_SIZE:
logging.error(f"Frame too large: {length} > {MAX_FRAME_SIZE}")
break
# Additional validation for ping frames
if msg_type == 2 and length != 4:
logging.error(
f"Invalid ping frame length: {length}, expected 4"
)
break
# Log frame details
logging.debug(
f"Received header for peer {self.peer_id}"
f": type={msg_type}, flags={flags},"
f"stream_id={stream_id}, length={length}"
)
consecutive_errors = 0 # Reset error counter on successful parse
except struct.error as e:
consecutive_errors += 1
logging.error(
f"Header parse error #{consecutive_errors}"
f": {e}, data: {header_data.hex()}"
)
if consecutive_errors >= max_consecutive_errors:
logging.error("Too many consecutive header parse errors")
break
continue
# Read payload if present
payload = b""
if length > 0:
payload = await self._read_exact_bytes(length)
if payload is None:
logging.debug(
f"Failed to read payload of"
f"{length} bytes for peer {self.peer_id}"
)
break
if len(payload) != length:
logging.error(
f"Payload length mismatch:"
f"got {len(payload)}, expected {length}"
)
break
# Process frame by type
if msg_type == 0: # Data frame
await self._handle_data_frame(stream_id, flags, payload)
elif msg_type == 1: # Window update
await self._handle_window_update(stream_id, payload)
elif msg_type == 2: # Ping frame
await self._handle_ping_frame(stream_id, flags, payload)
elif msg_type == 3: # GoAway frame
await self._handle_goaway_frame(payload)
break
except (trio.ClosedResourceError, trio.BrokenResourceError):
logging.debug(
f"Connection closed during frame processing for peer {self.peer_id}"
)
break
except Exception as e:
consecutive_errors += 1
logging.error(f"Frame processing error #{consecutive_errors}: {e}")
if consecutive_errors >= max_consecutive_errors:
logging.error("Too many consecutive frame processing errors")
break
await self._cleanup_on_error()
async def _handle_data_frame(self, stream_id, flags, payload):
"""Handle data frames with proper stream lifecycle"""
if stream_id == 0:
logging.warning("Received data frame for stream 0 (control stream)")
return
# Handle SYN flag - new stream creation
if flags & 0x1: # SYN flag
if stream_id in self.streams:
logging.warning(f"SYN received for existing stream {stream_id}")
else:
logging.debug(
f"Creating new stream {stream_id} for peer {self.peer_id}"
)
stream = YamuxStream(self, stream_id, is_outbound=False)
async with self.streams_lock:
self.streams[stream_id] = stream
# Send the new stream to the handler
await self.new_stream_send_channel.send(stream)
logging.debug(f"Sent stream {stream_id} to handler")
# Add data to stream buffer if stream exists
if stream_id in self.streams:
stream = self.streams[stream_id]
if payload:
# Add to stream's receive buffer
async with self.streams_lock:
if not hasattr(stream, "_receive_buffer"):
stream._receive_buffer = bytearray()
if not hasattr(stream, "_receive_event"):
stream._receive_event = trio.Event()
stream._receive_buffer.extend(payload)
stream._receive_event.set()
# Handle stream closure flags
if flags & 0x2: # FIN flag
stream.recv_closed = True
logging.debug(f"Stream {stream_id} received FIN")
if flags & 0x4: # RST flag
stream.reset_received = True
logging.debug(f"Stream {stream_id} received RST")
else:
if payload:
logging.warning(f"Received data for unknown stream {stream_id}")
async def _handle_window_update(self, stream_id, payload):
"""Handle window update frames"""
if len(payload) != 4:
logging.warning(f"Invalid window update payload length: {len(payload)}")
return
delta = struct.unpack(">I", payload)[0]
logging.debug(f"Window update: stream={stream_id}, delta={delta}")
async with self.streams_lock:
if stream_id in self.streams:
if not hasattr(self.streams[stream_id], "send_window"):
self.streams[stream_id].send_window = 256 * 1024 # Default window
self.streams[stream_id].send_window += delta
async def _handle_ping_frame(self, stream_id, flags, payload):
"""Handle ping/pong frames with proper validation"""
if len(payload) != 4:
logging.warning(f"Invalid ping payload length: {len(payload)} (expected 4)")
return
ping_value = struct.unpack(">I", payload)[0]
if flags & 0x1: # SYN flag - ping request
logging.debug(
f"Received ping request with value {ping_value} for peer {self.peer_id}"
)
# Send pong response (ACK flag = 0x2)
try:
pong_header = struct.pack(
">BBHII", 0, 2, 0x2, 0, 4
) # Version=0, Type=2, Flags=ACK, StreamID=0, Length=4
pong_payload = struct.pack(">I", ping_value)
await self.secured_conn.write(pong_header + pong_payload)
logging.debug(f"Sent pong response with value {ping_value}")
except Exception as e:
logging.error(f"Failed to send pong response: {e}")
else:
# Pong response
logging.debug(f"Received pong response with value {ping_value}")
async def _handle_goaway_frame(self, payload):
"""Handle GoAway frames"""
if len(payload) != 4:
logging.warning(f"Invalid GoAway payload length: {len(payload)}")
return
code = struct.unpack(">I", payload)[0]
logging.info(f"Received GoAway frame with code {code}")
self.event_shutting_down.set()
async def handle_ping(stream: INetStream) -> None:
while True:
try:
peer_id = stream.muxed_conn.peer_id
logging.info(f"Handling ping stream from {peer_id}")
try:
with trio.fail_after(RESP_TIMEOUT):
# Read initial protocol negotiation
initial_data = await stream.read(1024)
logging.debug(
f"Received initial stream data from {peer_id}"
f": {initial_data.hex()} (length={len(initial_data)})"
)
if initial_data == b"/ipfs/ping/1.0.0\n":
logging.debug(
f"Confirmed /ipfs/ping/1.0.0 protocol negotiation from {peer_id}"
)
else:
logging.warning(f"Unexpected initial data: {initial_data!r}")
# Read ping payload
payload = await stream.read(PING_LENGTH)
peer_id = stream.muxed_conn.peer_id
if payload is not None:
print(f"received ping from {peer_id}")
await stream.write(payload)
print(f"responded with pong to {peer_id}")
if not payload:
logging.info(f"Stream closed by {peer_id}")
return
if len(payload) != PING_LENGTH:
logging.warning(
f"Unexpected payload length"
f" {len(payload)} from {peer_id}: {payload.hex()}"
)
return
logging.info(
f"Received ping from {peer_id}:"
f" {payload[:8].hex()}... (length={len(payload)})"
)
await stream.write(payload)
logging.info(f"Sent pong to {peer_id}: {payload[:8].hex()}...")
except trio.TooSlowError:
logging.warning(f"Ping timeout with {peer_id}")
except trio.BrokenResourceError:
logging.info(f"Connection broken with {peer_id}")
except Exception as e:
logging.error(f"Error handling ping from {peer_id}: {e}")
finally:
try:
await stream.close()
logging.debug(f"Closed ping stream with {peer_id}")
except Exception:
await stream.reset()
break
pass
async def send_ping(stream: INetStream) -> None:
peer_id = stream.muxed_conn.peer_id
try:
payload = b"\x01" * PING_LENGTH
print(f"sending ping to {stream.muxed_conn.peer_id}")
await stream.write(payload)
payload = os.urandom(PING_LENGTH)
logging.info(f"Sending ping to {peer_id}: {payload[:8].hex()}...")
with trio.fail_after(RESP_TIMEOUT):
await stream.write(payload)
logging.debug(f"Ping sent to {peer_id}")
response = await stream.read(PING_LENGTH)
if not response:
logging.error(f"No pong response from {peer_id}")
return
if len(response) != PING_LENGTH:
logging.warning(
f"Pong length mismatch: got {len(response)}, expected {PING_LENGTH}"
)
if response == payload:
print(f"received pong from {stream.muxed_conn.peer_id}")
logging.info(f"Ping successful! Pong matches from {peer_id}")
else:
logging.warning(f"Pong mismatch from {peer_id}")
except trio.TooSlowError:
logging.error(f"Ping timeout to {peer_id}")
except Exception as e:
print(f"error occurred : {e}")
logging.error(f"Error sending ping to {peer_id}: {e}")
finally:
try:
await stream.close()
except Exception:
pass
def create_noise_keypair():
try:
x25519_private_key = x25519.X25519PrivateKey.generate()
class NoisePrivateKey:
def __init__(self, key):
self._key = key
def to_bytes(self):
return self._key.private_bytes_raw()
def public_key(self):
return NoisePublicKey(self._key.public_key())
def get_public_key(self):
return NoisePublicKey(self._key.public_key())
class NoisePublicKey:
def __init__(self, key):
self._key = key
def to_bytes(self):
return self._key.public_bytes_raw()
return NoisePrivateKey(x25519_private_key)
except Exception as e:
logging.error(f"Failed to create Noise keypair: {e}")
return None
def info_from_p2p_addr(addr):
"""Extract peer info from multiaddr - you'll need to implement this"""
# This is a placeholder - you need to implement the actual parsing
# based on your libp2p implementation
async def run(port: int, destination: str) -> None:
localhost_ip = "127.0.0.1"
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
host = new_host(listen_addrs=[listen_addr])
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
try:
key_pair = generate_new_rsa_identity()
logging.debug("Generated RSA keypair")
noise_privkey = create_noise_keypair()
logging.debug("Generated Noise keypair")
except Exception as e:
logging.error(f"Key generation failed: {e}")
raise
noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey)
logging.debug(f"Noise transport initialized: {noise_transport}")
sec_opt = {TProtocol("/noise"): noise_transport}
muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): InteropYamux}
logging.info(f"Using muxer: {muxer_opt}")
host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt)
peer_id = host.get_id().pretty()
logging.info(f"Host peer ID: {peer_id}")
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery():
if not destination:
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
print(
"Run this from the same folder in another console:\n\n"
f"ping-demo -p {int(port) + 1} "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id().pretty()}\n"
)
print("Waiting for incoming connection...")
logging.info(f"Server listening on {listen_addr}")
logging.info(f"Full address: {listen_addr}/p2p/{peer_id}")
logging.info("Waiting for connections...")
await trio.sleep_forever()
else:
maddr = multiaddr.Multiaddr(destination)
info = info_from_p2p_addr(maddr)
await host.connect(info)
stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID])
nursery.start_soon(send_ping, stream)
logging.info(f"Connecting to {info.peer_id}")
return
try:
with trio.fail_after(30):
await host.connect(info)
logging.info(f"Connected to {info.peer_id}")
await trio.sleep_forever()
await trio.sleep(2.0)
logging.info(f"Opening ping stream to {info.peer_id}")
stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID])
logging.info(f"Opened ping stream to {info.peer_id}")
await trio.sleep(0.5)
await send_ping(stream)
logging.info("Ping completed successfully")
logging.info("Keeping connection alive for 5 seconds...")
await trio.sleep(5.0)
except trio.TooSlowError:
logging.error(f"Connection timeout to {info.peer_id}")
raise
except Exception as e:
logging.error(f"Connection failed to {info.peer_id}: {e}")
import traceback
traceback.print_exc()
raise
def main() -> None:
description = """
This program demonstrates a simple p2p ping application using libp2p.
To use it, first run 'python ping.py -p <PORT>', where <PORT> is the port number.
Then, run another instance with 'python ping.py -p <ANOTHER_PORT> -d <DESTINATION>',
where <DESTINATION> is the multiaddress of the previous listener host.
"""
example_maddr = (
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"-p", "--port", default=8000, type=int, help="source port number"
def main():
parser = argparse.ArgumentParser(
description="libp2p ping with Rust interoperability"
)
parser.add_argument(
"-d",
"--destination",
type=str,
help=f"destination multiaddr string, e.g. {example_maddr}",
"-p", "--port", default=8000, type=int, help="Port to listen on"
)
parser.add_argument("-d", "--destination", type=str, help="Destination multiaddr")
args = parser.parse_args()
if not args.port:
raise RuntimeError("failed to determine local port")
try:
trio.run(run, *(args.port, args.destination))
trio.run(run, args.port, args.destination)
except KeyboardInterrupt:
pass
logging.info("Terminated by user")
except Exception as e:
logging.error(f"Fatal error: {e}")
raise
if __name__ == "__main__":

View File

@ -1,6 +1,5 @@
from collections.abc import (
Mapping,
Sequence,
)
from importlib.metadata import version as __version
from typing import (
@ -10,8 +9,6 @@ from typing import (
cast,
)
import multiaddr
from libp2p.abc import (
IHost,
IMuxedConn,
@ -157,7 +154,6 @@ def new_swarm(
sec_opt: Optional[TSecurityOptions] = None,
peerstore_opt: Optional[IPeerStore] = None,
muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None,
listen_addrs: Optional[Sequence[multiaddr.Multiaddr]] = None,
) -> INetworkService:
"""
Create a swarm instance based on the parameters.
@ -167,7 +163,6 @@ def new_swarm(
:param sec_opt: optional choice of security upgrade
:param peerstore_opt: optional peerstore
:param muxer_preference: optional explicit muxer preference
:param listen_addrs: optional list of multiaddrs to listen on
:return: return a default swarm instance
Note: Yamux (/yamux/1.0.0) is the preferred stream multiplexer
@ -180,16 +175,8 @@ def new_swarm(
id_opt = generate_peer_id_from(key_pair)
if listen_addrs is None:
transport = TCP()
else:
addr = listen_addrs[0]
if addr.__contains__("tcp"):
transport = TCP()
elif addr.__contains__("quic"):
raise ValueError("QUIC not yet supported")
else:
raise ValueError(f"Unknown transport in listen_addrs: {listen_addrs}")
# TODO: Parse `listen_addrs` to determine transport
transport = TCP()
# Generate X25519 keypair for Noise
noise_key_pair = create_new_x25519_key_pair()
@ -242,7 +229,6 @@ def new_host(
peerstore_opt: Optional[IPeerStore] = None,
disc_opt: Optional[IPeerRouting] = None,
muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None,
listen_addrs: Sequence[multiaddr.Multiaddr] = None,
) -> IHost:
"""
Create a new libp2p host based on the given parameters.
@ -253,7 +239,6 @@ def new_host(
:param peerstore_opt: optional peerstore
:param disc_opt: optional discovery
:param muxer_preference: optional explicit muxer preference
:param listen_addrs: optional list of multiaddrs to listen on
:return: return a host instance
"""
swarm = new_swarm(
@ -262,7 +247,6 @@ def new_host(
sec_opt=sec_opt,
peerstore_opt=peerstore_opt,
muxer_preference=muxer_preference,
listen_addrs=listen_addrs,
)
if disc_opt is not None:

View File

@ -8,14 +8,10 @@ from collections.abc import (
KeysView,
Sequence,
)
from types import (
TracebackType,
)
from typing import (
TYPE_CHECKING,
Any,
AsyncContextManager,
Optional,
)
from multiaddr import (
@ -219,7 +215,7 @@ class IMuxedConn(ABC):
"""
class IMuxedStream(ReadWriteCloser, AsyncContextManager["IMuxedStream"]):
class IMuxedStream(ReadWriteCloser):
"""
Interface for a multiplexed stream.
@ -253,20 +249,6 @@ class IMuxedStream(ReadWriteCloser, AsyncContextManager["IMuxedStream"]):
otherwise False.
"""
@abstractmethod
async def __aenter__(self) -> "IMuxedStream":
"""Enter the async context manager."""
return self
async def __aexit__(
self,
exc_type: Optional[type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Exit the async context manager and close the stream."""
await self.close()
# -------------------------- net_stream interface.py --------------------------

View File

@ -195,29 +195,6 @@ class BasicHost(IHost):
net_stream.set_protocol(selected_protocol)
return net_stream
async def send_command(self, peer_id: ID, command: str) -> list[str]:
"""
Send a multistream-select command to the specified peer and return
the response.
:param peer_id: peer_id that host is connecting
:param command: supported multistream-select command (e.g., "ls")
:raise StreamFailure: If the stream cannot be opened or negotiation fails
:return: list of strings representing the response from peer.
"""
new_stream = await self._network.new_stream(peer_id)
try:
response = await self.multiselect_client.query_multistream_command(
MultiselectCommunicator(new_stream), command
)
except MultiselectClientError as error:
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
await new_stream.reset()
raise StreamFailure(f"failed to open a stream to peer {peer_id}") from error
return response
async def connect(self, peer_info: PeerInfo) -> None:
"""
Ensure there is a connection between this host and the peer

View File

@ -60,14 +60,8 @@ class Multiselect(IMultiselectMuxer):
raise MultiselectError() from error
if command == "ls":
supported_protocols = list(self.handlers.keys())
response = "\n".join(supported_protocols) + "\n"
try:
await communicator.write(response)
except MultiselectCommunicatorError as error:
raise MultiselectError() from error
# TODO: handle ls command
pass
else:
protocol = TProtocol(command)
if protocol in self.handlers:

View File

@ -70,36 +70,6 @@ class MultiselectClient(IMultiselectClient):
raise MultiselectClientError("protocols not supported")
async def query_multistream_command(
self, communicator: IMultiselectCommunicator, command: str
) -> list[str]:
"""
Send a multistream-select command over the given communicator and return
parsed response.
:param communicator: communicator to use to communicate with counterparty
:param command: supported multistream-select command (e.g., "ls")
:raise MultiselectClientError: If the communicator fails to process data.
:return: list of strings representing the response from peer.
"""
await self.handshake(communicator)
if command == "ls":
try:
await communicator.write("ls")
except MultiselectCommunicatorError as error:
raise MultiselectClientError() from error
else:
raise ValueError("Command not supported")
try:
response = await communicator.read()
response_list = response.strip().splitlines()
except MultiselectCommunicatorError as error:
raise MultiselectClientError() from error
return response_list
async def try_select(
self, communicator: IMultiselectCommunicator, protocol: TProtocol
) -> TProtocol:

View File

@ -10,7 +10,6 @@ from collections.abc import (
)
import logging
import random
import time
from typing import (
Any,
DefaultDict,
@ -81,7 +80,8 @@ class GossipSub(IPubsubRouter, Service):
# The protocol peer supports
peer_protocol: dict[ID, TProtocol]
time_since_last_publish: dict[str, int]
# TODO: Add `time_since_last_publish`
# Create topic --> time since last publish map.
mcache: MessageCache
@ -138,7 +138,6 @@ class GossipSub(IPubsubRouter, Service):
self.direct_peers[direct_peer.peer_id] = direct_peer
self.direct_connect_interval = direct_connect_interval
self.direct_connect_initial_delay = direct_connect_initial_delay
self.time_since_last_publish = {}
async def run(self) -> None:
if self.pubsub is None:
@ -254,8 +253,6 @@ class GossipSub(IPubsubRouter, Service):
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
self.pubsub._handle_dead_peer(peer_id)
for topic in pubsub_msg.topicIDs:
self.time_since_last_publish[topic] = int(time.time())
def _get_peers_to_send(
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
@ -345,7 +342,6 @@ class GossipSub(IPubsubRouter, Service):
await self.emit_graft(topic, peer)
self.fanout.pop(topic, None)
self.time_since_last_publish.pop(topic, None)
async def leave(self, topic: str) -> None:
# Note: the comments here are the near-exact algorithm description from the spec
@ -518,12 +514,10 @@ class GossipSub(IPubsubRouter, Service):
def fanout_heartbeat(self) -> None:
# Note: the comments here are the exact pseudocode from the spec
for topic in list(self.fanout):
if (
topic not in self.pubsub.peer_topics
and self.time_since_last_publish.get(topic, 0) + self.time_to_live
< int(time.time())
):
for topic in self.fanout:
# Delete topic entry if it's not in `pubsub.peer_topics`
# or (TODO) if it's time-since-last-published > ttl
if topic not in self.pubsub.peer_topics:
# Remove topic from fanout
del self.fanout[topic]
else:

View File

@ -122,9 +122,6 @@ class Pubsub(Service, IPubsub):
strict_signing: bool
sign_key: PrivateKey
# Set of blacklisted peer IDs
blacklisted_peers: set[ID]
event_handle_peer_queue_started: trio.Event
event_handle_dead_peer_queue_started: trio.Event
@ -204,9 +201,6 @@ class Pubsub(Service, IPubsub):
self.counter = int(time.time())
# Set of blacklisted peer IDs
self.blacklisted_peers = set()
self.event_handle_peer_queue_started = trio.Event()
self.event_handle_dead_peer_queue_started = trio.Event()
@ -326,82 +320,6 @@ class Pubsub(Service, IPubsub):
if topic in self.topic_validators
)
def add_to_blacklist(self, peer_id: ID) -> None:
"""
Add a peer to the blacklist.
When a peer is blacklisted:
- Any existing connection to that peer is immediately closed and removed
- The peer is removed from all topic subscription mappings
- Future connection attempts from this peer will be rejected
- Messages forwarded by or originating from this peer will be dropped
- The peer will not be able to participate in pubsub communication
:param peer_id: the peer ID to blacklist
"""
self.blacklisted_peers.add(peer_id)
logger.debug("Added peer %s to blacklist", peer_id)
self.manager.run_task(self._teardown_if_connected, peer_id)
async def _teardown_if_connected(self, peer_id: ID) -> None:
"""Close their stream and remove them if connected"""
stream = self.peers.get(peer_id)
if stream is not None:
try:
await stream.reset()
except Exception:
pass
del self.peers[peer_id]
# Also remove from any subscription maps:
for _topic, peerset in self.peer_topics.items():
if peer_id in peerset:
peerset.discard(peer_id)
def remove_from_blacklist(self, peer_id: ID) -> None:
"""
Remove a peer from the blacklist.
Once removed from the blacklist:
- The peer can establish new connections to this node
- Messages from this peer will be processed normally
- The peer can participate in topic subscriptions and message forwarding
:param peer_id: the peer ID to remove from blacklist
"""
self.blacklisted_peers.discard(peer_id)
logger.debug("Removed peer %s from blacklist", peer_id)
def is_peer_blacklisted(self, peer_id: ID) -> bool:
"""
Check if a peer is blacklisted.
:param peer_id: the peer ID to check
:return: True if peer is blacklisted, False otherwise
"""
return peer_id in self.blacklisted_peers
def clear_blacklist(self) -> None:
"""
Clear all peers from the blacklist.
This removes all blacklist restrictions, allowing previously blacklisted
peers to:
- Establish new connections
- Send and forward messages
- Participate in topic subscriptions
"""
self.blacklisted_peers.clear()
logger.debug("Cleared all peers from blacklist")
def get_blacklisted_peers(self) -> set[ID]:
"""
Get a copy of the current blacklisted peers.
Returns a snapshot of all currently blacklisted peer IDs. These peers
are completely isolated from pubsub communication - their connections
are rejected and their messages are dropped.
:return: a set containing all blacklisted peer IDs
"""
return self.blacklisted_peers.copy()
async def stream_handler(self, stream: INetStream) -> None:
"""
Stream handler for pubsub. Gets invoked whenever a new stream is
@ -428,10 +346,6 @@ class Pubsub(Service, IPubsub):
await self.event_handle_dead_peer_queue_started.wait()
async def _handle_new_peer(self, peer_id: ID) -> None:
if self.is_peer_blacklisted(peer_id):
logger.debug("Rejecting blacklisted peer %s", peer_id)
return
try:
stream: INetStream = await self.host.new_stream(peer_id, self.protocols)
except SwarmException as error:
@ -445,6 +359,7 @@ class Pubsub(Service, IPubsub):
except StreamClosed:
logger.debug("Fail to add new peer %s: stream closed", peer_id)
return
# TODO: Check if the peer in black list.
try:
self.router.add_peer(peer_id, stream.get_protocol())
except Exception as error:
@ -694,20 +609,9 @@ class Pubsub(Service, IPubsub):
"""
logger.debug("attempting to publish message %s", msg)
# Check if the message forwarder (source) is in the blacklist. If yes, reject.
if self.is_peer_blacklisted(msg_forwarder):
logger.debug(
"Rejecting message from blacklisted source peer %s", msg_forwarder
)
return
# TODO: Check if the `source` is in the blacklist. If yes, reject.
# Check if the message originator (from) is in the blacklist. If yes, reject.
msg_from_peer = ID(msg.from_id)
if self.is_peer_blacklisted(msg_from_peer):
logger.debug(
"Rejecting message from blacklisted originator peer %s", msg_from_peer
)
return
# TODO: Check if the `from` is in the blacklist. If yes, reject.
# If the message is processed before, return(i.e., don't further process the message) # noqa: E501
if self._is_msg_seen(msg):

View File

@ -1,6 +1,3 @@
from types import (
TracebackType,
)
from typing import (
TYPE_CHECKING,
Optional,
@ -260,16 +257,3 @@ class MplexStream(IMuxedStream):
def get_remote_address(self) -> Optional[tuple[str, int]]:
"""Delegate to the parent Mplex connection."""
return self.muxed_conn.get_remote_address()
async def __aenter__(self) -> "MplexStream":
"""Enter the async context manager."""
return self
async def __aexit__(
self,
exc_type: Optional[type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Exit the async context manager and close the stream."""
await self.close()

View File

@ -9,9 +9,6 @@ from collections.abc import (
import inspect
import logging
import struct
from types import (
TracebackType,
)
from typing import (
Callable,
Optional,
@ -77,19 +74,6 @@ class YamuxStream(IMuxedStream):
self.recv_window = DEFAULT_WINDOW_SIZE
self.window_lock = trio.Lock()
async def __aenter__(self) -> "YamuxStream":
"""Enter the async context manager."""
return self
async def __aexit__(
self,
exc_type: Optional[type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Exit the async context manager and close the stream."""
await self.close()
async def write(self, data: bytes) -> None:
if self.send_closed:
raise MuxedStreamError("Stream is closed for sending")

View File

@ -1 +0,0 @@
Allow passing `listen_addrs` to `new_swarm` to customize swarm listening behavior.

View File

@ -1,2 +0,0 @@
Feature: Support for sending `ls` command over `multistream-select` to list supported protocols from remote peer.
This allows inspecting which protocol handlers a peer supports at runtime.

View File

@ -1 +0,0 @@
implement AsyncContextManager for IMuxedStream to support async with

View File

@ -1 +0,0 @@
feat: add method to compute time since last message published by a peer and remove fanout peers based on ttl.

View File

@ -1 +0,0 @@
implement blacklist management for `pubsub.Pubsub` with methods to get, add, remove, check, and clear blacklisted peer IDs.

View File

@ -209,8 +209,8 @@ async def ping_demo(host_a, host_b):
async def pubsub_demo(host_a, host_b):
gossipsub_a = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 1, 1)
gossipsub_b = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 1, 1)
gossipsub_a = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 0.1, 1)
gossipsub_b = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 0.1, 1)
pubsub_a = Pubsub(host_a, gossipsub_a)
pubsub_b = Pubsub(host_b, gossipsub_b)
message_a_to_b = "Hello from A to B"

View File

@ -7,18 +7,12 @@ from trio.testing import (
wait_all_tasks_blocked,
)
from libp2p import (
new_swarm,
)
from libp2p.network.exceptions import (
SwarmException,
)
from libp2p.tools.utils import (
connect_swarm,
)
from libp2p.transport.tcp.tcp import (
TCP,
)
from tests.utils.factories import (
SwarmFactory,
)
@ -162,20 +156,3 @@ async def test_swarm_multiaddr(security_protocol):
swarms[0].peerstore.add_addrs(swarms[1].get_peer_id(), addrs + addrs, 10000)
await swarms[0].dial_peer(swarms[1].get_peer_id())
def test_new_swarm_defaults_to_tcp():
swarm = new_swarm()
assert isinstance(swarm.transport, TCP)
def test_new_swarm_tcp_multiaddr_supported():
addr = Multiaddr("/ip4/127.0.0.1/tcp/9999")
swarm = new_swarm(listen_addrs=[addr])
assert isinstance(swarm.transport, TCP)
def test_new_swarm_quic_multiaddr_raises():
addr = Multiaddr("/ip4/127.0.0.1/udp/9999/quic")
with pytest.raises(ValueError, match="QUIC not yet supported"):
new_swarm(listen_addrs=[addr])

View File

@ -116,35 +116,3 @@ async def test_multiple_protocol_fails(security_protocol):
await perform_simple_test(
"", protocols_for_client, protocols_for_listener, security_protocol
)
@pytest.mark.trio
async def test_multistream_command(security_protocol):
    """The multistream `ls` command lists exactly the listener's protocols."""
    supported_protocols = [PROTOCOL_ECHO, PROTOCOL_FOO, PROTOCOL_POTATO, PROTOCOL_ROCK]
    async with HostFactory.create_batch_and_listen(
        2, security_protocol=security_protocol
    ) as hosts:
        listener, dialer = hosts[1], hosts[0]
        for proto in supported_protocols:
            listener.set_stream_handler(
                proto, create_echo_stream_handler(ACK_PREFIX)
            )
        # Make sure the dialer can actually reach the listener.
        dialer.get_peerstore().add_addrs(listener.get_id(), listener.get_addrs(), 10)
        # Ask the listener to enumerate its protocols via the `ls` command.
        response = await dialer.send_command(listener.get_id(), "ls")
        # Every registered protocol must appear in the listing...
        for proto in supported_protocols:
            assert proto in response
        # ...and unregistered ones must not.
        assert "/does/not/exist" not in response
        assert "/foo/bar/1.2.3" not in response
        # An unsupported command is rejected with ValueError.
        with pytest.raises(ValueError, match="Command not supported"):
            await dialer.send_command(listener.get_id(), "random")

View File

@ -22,14 +22,13 @@ from tests.utils.pubsub.utils import (
@pytest.mark.trio
async def test_join():
async with PubsubFactory.create_batch_with_gossipsub(
4, degree=4, degree_low=3, degree_high=5, heartbeat_interval=1, time_to_live=1
4, degree=4, degree_low=3, degree_high=5
) as pubsubs_gsub:
gossipsubs = [pubsub.router for pubsub in pubsubs_gsub]
hosts = [pubsub.host for pubsub in pubsubs_gsub]
hosts_indices = list(range(len(pubsubs_gsub)))
topic = "test_join"
to_drop_topic = "test_drop_topic"
central_node_index = 0
# Remove index of central host from the indices
hosts_indices.remove(central_node_index)
@ -43,31 +42,23 @@ async def test_join():
# Connect central host to all other hosts
await one_to_all_connect(hosts, central_node_index)
# Wait 1 seconds for heartbeat to allow mesh to connect
await trio.sleep(1)
# Wait 2 seconds for heartbeat to allow mesh to connect
await trio.sleep(2)
# Central node publish to the topic so that this topic
# is added to central node's fanout
# publish from the randomly chosen host
await pubsubs_gsub[central_node_index].publish(topic, b"data")
await pubsubs_gsub[central_node_index].publish(to_drop_topic, b"data")
await trio.sleep(0.5)
# Check that the gossipsub of central node has fanout for the topics
assert topic, to_drop_topic in gossipsubs[central_node_index].fanout
# Check that the gossipsub of central node does not have a mesh for the topics
assert topic, to_drop_topic not in gossipsubs[central_node_index].mesh
# Check that the gossipsub of central node
# has a time_since_last_publish for the topics
assert topic in gossipsubs[central_node_index].time_since_last_publish
assert to_drop_topic in gossipsubs[central_node_index].time_since_last_publish
await trio.sleep(1)
# Check that after ttl the to_drop_topic is no more in fanout of central node
assert to_drop_topic not in gossipsubs[central_node_index].fanout
# Check that the gossipsub of central node has fanout for the topic
assert topic in gossipsubs[central_node_index].fanout
# Check that the gossipsub of central node does not have a mesh for the topic
assert topic not in gossipsubs[central_node_index].mesh
# Central node subscribes the topic
await pubsubs_gsub[central_node_index].subscribe(topic)
await trio.sleep(1)
await trio.sleep(2)
# Check that the gossipsub of central node no longer has fanout for the topic
assert topic not in gossipsubs[central_node_index].fanout

View File

@ -702,369 +702,3 @@ async def test_strict_signing_failed_validation(monkeypatch):
await pubsubs_fsub[0].push_msg(pubsubs_fsub[0].my_id, msg)
await trio.sleep(0.01)
assert event.is_set()
@pytest.mark.trio
async def test_blacklist_basic_operations():
    """Exercise add, remove, check, and clear on the pubsub blacklist."""
    async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub:
        pubsub = pubsubs_fsub[0]
        # Three distinct peer IDs to play with.
        peer1, peer2, peer3 = IDFactory(), IDFactory(), IDFactory()
        # A fresh pubsub starts with an empty blacklist.
        assert len(pubsub.get_blacklisted_peers()) == 0
        for peer in (peer1, peer2, peer3):
            assert not pubsub.is_peer_blacklisted(peer)
        # Blacklist two of the three peers.
        pubsub.add_to_blacklist(peer1)
        pubsub.add_to_blacklist(peer2)
        assert len(pubsub.get_blacklisted_peers()) == 2
        assert pubsub.is_peer_blacklisted(peer1)
        assert pubsub.is_peer_blacklisted(peer2)
        assert not pubsub.is_peer_blacklisted(peer3)
        # Removing one peer leaves the other untouched.
        pubsub.remove_from_blacklist(peer1)
        assert len(pubsub.get_blacklisted_peers()) == 1
        assert not pubsub.is_peer_blacklisted(peer1)
        assert pubsub.is_peer_blacklisted(peer2)
        assert not pubsub.is_peer_blacklisted(peer3)
        # clear_blacklist wipes every entry at once.
        pubsub.add_to_blacklist(peer3)
        assert len(pubsub.get_blacklisted_peers()) == 2
        pubsub.clear_blacklist()
        assert len(pubsub.get_blacklisted_peers()) == 0
        for peer in (peer1, peer2, peer3):
            assert not pubsub.is_peer_blacklisted(peer)
        # Adding the same peer twice must not create duplicates.
        pubsub.add_to_blacklist(peer1)
        pubsub.add_to_blacklist(peer1)
        assert len(pubsub.get_blacklisted_peers()) == 1
        # Removing a peer that is not blacklisted is a harmless no-op.
        pubsub.remove_from_blacklist(peer2)
        assert len(pubsub.get_blacklisted_peers()) == 1
@pytest.mark.trio
async def test_blacklist_blocks_new_peer_connections(monkeypatch):
    """A blacklisted peer must be rejected before any stream is opened."""
    async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub:
        pubsub = pubsubs_fsub[0]
        blacklisted_peer = IDFactory()
        pubsub.add_to_blacklist(blacklisted_peer)

        new_stream_called = False

        async def mock_new_stream(*args, **kwargs):
            # Only runs if pubsub (wrongly) tries to open a stream.
            nonlocal new_stream_called
            new_stream_called = True
            from unittest.mock import (
                AsyncMock,
                Mock,
            )

            mock_stream = Mock()
            mock_stream.write = AsyncMock()
            mock_stream.reset = AsyncMock()
            mock_stream.get_protocol = Mock(return_value="test_protocol")
            return mock_stream

        router_add_peer_called = False

        def mock_add_peer(*args, **kwargs):
            nonlocal router_add_peer_called
            router_add_peer_called = True

        with monkeypatch.context() as m:
            m.setattr(pubsub.host, "new_stream", mock_new_stream)
            m.setattr(pubsub.router, "add_peer", mock_add_peer)
            # Simulate the blacklisted peer connecting.
            await pubsub._handle_new_peer(blacklisted_peer)
            # Neither the stream setup nor router registration may run.
            assert not new_stream_called, (
                "new_stream should be not be called to get hello packet"
            )
            assert not router_add_peer_called, (
                "Router.add_peer should not be called for blacklisted peer"
            )
            assert blacklisted_peer not in pubsub.peers, (
                "Blacklisted peer should not be in peers dict"
            )
@pytest.mark.trio
async def test_blacklist_blocks_messages_from_blacklisted_originator():
    """A message whose origin (`from` field) is blacklisted is dropped."""
    async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
        pubsub = pubsubs_fsub[0]
        # Reuse the second instance's ID as the blacklisted originator.
        blacklisted_originator = pubsubs_fsub[1].my_id
        pubsub.add_to_blacklist(blacklisted_originator)
        msg = make_pubsub_msg(
            origin_id=blacklisted_originator,
            topic_ids=[TESTING_TOPIC],
            data=TESTING_DATA,
            seqno=b"\x00" * 8,
        )
        await pubsub.subscribe(TESTING_TOPIC)

        # Spy on router.publish; it must never fire for this message.
        publish_was_called = False

        async def spy_router_publish(*args, **kwargs):
            nonlocal publish_was_called
            publish_was_called = True
            await trio.lowlevel.checkpoint()

        original_router_publish = pubsub.router.publish
        pubsub.router.publish = spy_router_publish
        try:
            await pubsub.push_msg(blacklisted_originator, msg)
            assert not publish_was_called, (
                "Router.publish should not be called for blacklisted originator"
            )
            assert not pubsub._is_msg_seen(msg), (
                "Message from blacklisted originator should not be marked as seen"
            )
        finally:
            pubsub.router.publish = original_router_publish
@pytest.mark.trio
async def test_blacklist_allows_non_blacklisted_peers():
    """Blacklisting one peer must not affect traffic from other peers."""
    async with PubsubFactory.create_batch_with_floodsub(3) as pubsubs_fsub:
        pubsub = pubsubs_fsub[0]
        allowed_peer = pubsubs_fsub[1].my_id
        blacklisted_peer = pubsubs_fsub[2].my_id
        # Only one of the two remote peers is blacklisted.
        pubsub.add_to_blacklist(blacklisted_peer)
        msg_from_allowed = make_pubsub_msg(
            origin_id=allowed_peer,
            topic_ids=[TESTING_TOPIC],
            data=b"allowed_data",
            seqno=b"\x00" * 8,
        )
        msg_from_blacklisted = make_pubsub_msg(
            origin_id=blacklisted_peer,
            topic_ids=[TESTING_TOPIC],
            data=b"blacklisted_data",
            seqno=b"\x11" * 8,
        )
        sub = await pubsub.subscribe(TESTING_TOPIC)

        # Record every router.publish invocation.
        publish_calls = []

        async def spy_router_publish(*args, **kwargs):
            publish_calls.append(args)
            await trio.lowlevel.checkpoint()

        original_router_publish = pubsub.router.publish
        pubsub.router.publish = spy_router_publish
        try:
            # Both messages arrive via the allowed forwarder; only the one
            # originated by the blacklisted peer should be dropped.
            await pubsub.push_msg(allowed_peer, msg_from_allowed)
            await pubsub.push_msg(allowed_peer, msg_from_blacklisted)
            assert len(publish_calls) == 1, (
                "Only one message should be processed"
            )
            assert pubsub._is_msg_seen(msg_from_allowed), (
                "Allowed message should be marked as seen"
            )
            assert not pubsub._is_msg_seen(msg_from_blacklisted), (
                "Blacklisted message should not be marked as seen"
            )
            # The local subscription sees only the allowed payload.
            received_msg = await sub.get()
            assert received_msg.data == b"allowed_data"
        finally:
            pubsub.router.publish = original_router_publish
@pytest.mark.trio
async def test_blacklist_integration_with_existing_functionality():
    """Blacklisting composes correctly with the seen-message cache."""
    async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
        pubsub = pubsubs_fsub[0]
        other_peer = pubsubs_fsub[1].my_id
        pubsub.add_to_blacklist(other_peer)
        msg = make_pubsub_msg(
            origin_id=other_peer,
            topic_ids=[TESTING_TOPIC],
            data=TESTING_DATA,
            seqno=b"\x00" * 8,
        )
        # While blacklisted: the message is rejected and never cached.
        await pubsub.push_msg(other_peer, msg)
        assert not pubsub._is_msg_seen(msg)
        # After un-blacklisting: the same message goes through normally.
        pubsub.remove_from_blacklist(other_peer)
        await pubsub.subscribe(TESTING_TOPIC)
        await pubsub.push_msg(other_peer, msg)
        assert pubsub._is_msg_seen(msg)

        # A replay must now be dropped by the seen cache, not the blacklist.
        publish_was_called = False

        async def spy_router_publish(*args, **kwargs):
            nonlocal publish_was_called
            publish_was_called = True
            await trio.lowlevel.checkpoint()

        original_router_publish = pubsub.router.publish
        pubsub.router.publish = spy_router_publish
        try:
            await pubsub.push_msg(other_peer, msg)
            assert not publish_was_called, (
                "Duplicate message should be rejected by seen cache"
            )
        finally:
            pubsub.router.publish = original_router_publish
@pytest.mark.trio
async def test_blacklist_blocks_messages_from_blacklisted_source():
    """A message relayed by a blacklisted forwarder is rejected."""
    async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
        pubsub = pubsubs_fsub[0]
        blacklisted_forwarder = pubsubs_fsub[1].my_id
        pubsub.add_to_blacklist(blacklisted_forwarder)
        # NOTE(review): the originator below is the same peer as the
        # forwarder, so this test alone cannot distinguish source-based
        # from originator-based rejection — a third peer as originator
        # would isolate the forwarder check. Confirm intent.
        msg = make_pubsub_msg(
            origin_id=pubsubs_fsub[1].my_id,
            topic_ids=[TESTING_TOPIC],
            data=TESTING_DATA,
            seqno=b"\x00" * 8,
        )
        # Subscribe so the message would be delivered if accepted.
        await pubsub.subscribe(TESTING_TOPIC)

        publish_was_called = False

        async def spy_router_publish(*args, **kwargs):
            nonlocal publish_was_called
            publish_was_called = True
            await trio.lowlevel.checkpoint()

        original_router_publish = pubsub.router.publish
        pubsub.router.publish = spy_router_publish
        try:
            await pubsub.push_msg(blacklisted_forwarder, msg)
            assert not publish_was_called, (
                "Router.publish should not be called for blacklisted forwarder"
            )
            assert not pubsub._is_msg_seen(msg), (
                "Message from blacklisted forwarder should not be marked as seen"
            )
        finally:
            pubsub.router.publish = original_router_publish
@pytest.mark.trio
async def test_blacklist_tears_down_existing_connection():
    """
    Blacklisting an already-connected peer must reset its stream and drop
    it from both `pubsub.peers` and `pubsub.peer_topics`.
    """
    async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
        pubsub0, pubsub1 = pubsubs_fsub
        # Connect the two hosts and give handle_peer_queue time to run.
        await connect(pubsub0.host, pubsub1.host)
        await trio.sleep(0.1)
        assert pubsub1.my_id in pubsub0.peers
        # Record a subscription from peer1 so it shows up under
        # pubsub0.peer_topics[TESTING_TOPIC].
        sub_msg = rpc_pb2.RPC.SubOpts(subscribe=True, topicid=TESTING_TOPIC)
        pubsub0.handle_subscription(pubsub1.my_id, sub_msg)
        assert TESTING_TOPIC in pubsub0.peer_topics
        assert pubsub1.my_id in pubsub0.peer_topics[TESTING_TOPIC]
        # Blacklist the connected peer, then let the asynchronous teardown
        # task (_teardown_if_connected) run.
        pubsub0.add_to_blacklist(pubsub1.my_id)
        await trio.sleep(0.1)
        # The peer must be gone from the peers dict...
        assert pubsub1.my_id not in pubsub0.peers
        # ...and from the topic mapping. It's also fine if the entire
        # topic entry was pruned because no peers remain.
        if TESTING_TOPIC in pubsub0.peer_topics:
            assert pubsub1.my_id not in pubsub0.peer_topics[TESTING_TOPIC]
        else:
            assert TESTING_TOPIC not in pubsub0.peer_topics

View File

@ -1,166 +0,0 @@
# libp2p Interoperability Tests
This directory contains interoperability tests between py-libp2p and rust-libp2p implementations, focusing on the ping protocol to verify core compatibility.
## Overview
The tests verify the following libp2p components work correctly between implementations:
- **Transport Layer**: TCP connection establishment
- **Security Layer**: Noise encryption protocol
- **Stream Multiplexing**: Yamux multiplexer compatibility
- **Protocol Negotiation**: Multistream-select protocol selection
- **Application Protocol**: Ping protocol (`/ipfs/ping/1.0.0`)
## Test Structure
```
├── py_node/
│ └── ping.py # Python libp2p ping client/server
├── rust_node/
│ ├── src/main.rs # Rust libp2p ping client/server
│ └── Cargo.toml
└── scripts/
├── run_py_to_rust_test.ps1 # Test: Python client → Rust server
└── run_rust_to_py_test.ps1 # Test: Rust client → Python server
```
## Prerequisites
### Python Environment
```bash
# Install py-libp2p and dependencies
pip install .
```
### Rust Environment
```bash
# Ensure Rust is installed
rustc --version
cargo --version
# Dependencies are defined in rust_node/Cargo.toml
```
## Running Tests
### Test 1: Rust Client → Python Server
This test starts a Python server and connects with a Rust client:
```powershell
# Run the automated test
.\scripts\run_rust_to_py_test.ps1
# Or with custom parameters
.\scripts\run_rust_to_py_test.ps1 -Port 9000 -PingCount 10
```
**Manual steps:**
1. Start Python server: `python py_node/ping.py server --port 8000`
2. Note the Peer ID from server output
3. Run Rust client: `cargo run --manifest-path rust_node/Cargo.toml -- /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID>`
### Test 2: Python Client → Rust Server
This test starts a Rust server and connects with a Python client:
```powershell
# Run the automated test (requires manual intervention)
.\scripts\run_py_to_rust_test.ps1
# Follow the on-screen instructions to complete the test
```
**Manual steps:**
1. Start Rust server: `cargo run --manifest-path rust_node/Cargo.toml`
2. Note the Peer ID and port from server output
3. Run Python client: `python py_node/ping.py client /ip4/127.0.0.1/tcp/<PORT>/p2p/<PEER_ID> --count 5`
## Expected Behavior
### Successful Test Output
**Python Server Logs:**
```
[INFO] Starting py-libp2p ping server...
[INFO] Peer ID: QmYourPeerIdHere
[INFO] Listening: /ip4/0.0.0.0/tcp/8000
[INFO] New ping stream opened by 12D3KooW...
[PING 1] Received ping from 12D3KooW...: 32 bytes
[PING 1] Echoed ping back to 12D3KooW...
```
**Rust Client Logs:**
```
Local peer ID: 12D3KooW...
Listening on "/ip4/0.0.0.0/tcp/54321"
Dialed /ip4/127.0.0.1/tcp/8000/p2p/QmYourPeerIdHere
Behaviour(Event { peer: QmYourPeerIdHere, result: Ok(Pong) })
```
### Performance Metrics
The tests measure:
- **Connection Establishment Time**: Time to establish secure connection
- **Round-Trip Time (RTT)**: Latency for ping/pong exchanges
- **Success Rate**: Percentage of successful ping attempts
- **Protocol Negotiation**: Successful selection of `/ipfs/ping/1.0.0`
## Troubleshooting
### Common Issues
1. **Protocol Mismatch**: Ensure both implementations use the same protocol ID
- Python: `/ipfs/ping/1.0.0`
- Rust: `/ipfs/ping/1.0.0` (default ping protocol)
2. **Connection Timeout**:
- Check firewall settings
- Verify correct IP addresses and ports
- Ensure both peers are running
3. **Noise Encryption Errors**:
- Verify cryptography library versions
- Check that both implementations support the same Noise variants
4. **Yamux Multiplexing Issues**:
- Confirm Yamux protocol versions match
- Check stream handling implementation
### Debug Logging
Enable detailed logging:
**Python:**
```bash
# Logs are automatically written to ping_debug.log
tail -f ping_debug.log
```
**Rust:**
```bash
# Set environment variable for detailed logs
$env:RUST_LOG="debug"
cargo run --manifest-path rust_node/Cargo.toml
```
## Interoperability Checklist
- [ ] TCP transport connection establishment
- [ ] Noise encryption handshake
- [ ] Yamux stream multiplexing
- [ ] Multistream protocol negotiation
- [ ] Ping protocol payload exchange (32 bytes)
- [ ] Proper connection cleanup
- [ ] Error handling and timeouts
- [ ] Performance metrics collection
## Contributing
When adding new tests:
1. Follow the existing pattern for client/server implementations
2. Add appropriate error handling and logging
3. Update this README with new test procedures
4. Ensure tests clean up resources properly

View File

@ -1,427 +0,0 @@
import argparse
import logging
from cryptography.hazmat.primitives.asymmetric import (
x25519,
)
import multiaddr
import trio
from libp2p import (
generate_new_rsa_identity,
new_host,
)
from libp2p.custom_types import (
TProtocol,
)
from libp2p.network.stream.net_stream import (
INetStream,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.security.noise.transport import Transport as NoiseTransport
from libp2p.stream_muxer.yamux.yamux import (
Yamux,
)
from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID
# Configure detailed logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"),
],
)
# Standard libp2p ping protocol - this is what rust-libp2p uses by default
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
PING_LENGTH = 32
RESP_TIMEOUT = 60
async def handle_ping(stream: INetStream) -> None:
    """Handle incoming ping requests from rust-libp2p clients.

    Reads PING_LENGTH-byte payloads in a loop and echoes each one back,
    as the libp2p ping protocol requires, until the remote closes the
    stream or an error occurs. The stream is always closed on exit.
    """
    peer_id = stream.muxed_conn.peer_id
    print(f"[INFO] New ping stream opened by {peer_id}")
    logging.info(f"Ping handler called for peer {peer_id}")
    ping_count = 0
    try:
        while True:
            try:
                print(f"[INFO] Waiting for ping data from {peer_id}...")
                logging.debug(f"Stream state: {stream}")
                data = await stream.read(PING_LENGTH)
                # `not data` covers both None and b"": the remote closed
                # the stream. (The original follow-up `len(data) == 0`
                # check was unreachable dead code and has been removed.)
                if not data:
                    print(
                        f"[INFO] No data received,"
                        f" connection likely closed by {peer_id}"
                    )
                    logging.debug("No data received, stream closed")
                    break
                ping_count += 1
                print(
                    f"[PING {ping_count}] Received ping from {peer_id}:"
                    f" {len(data)} bytes"
                )
                logging.debug(f"Ping data: {data.hex()}")
                # Echo the payload back — that is the entire ping protocol.
                await stream.write(data)
                print(f"[PING {ping_count}] Echoed ping back to {peer_id}")
            except Exception as e:
                print(f"[ERROR] Error in ping loop with {peer_id}: {e}")
                logging.exception("Ping loop error")
                break
    except Exception as e:
        print(f"[ERROR] Error handling ping from {peer_id}: {e}")
        logging.exception("Ping handler error")
    finally:
        # Best-effort close; failures here are only logged.
        try:
            print(f"[INFO] Closing ping stream with {peer_id}")
            await stream.close()
        except Exception as e:
            logging.debug(f"Error closing stream: {e}")
    print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)")
async def send_ping_sequence(stream: INetStream, count: int = 5) -> None:
    """Send `count` pings and print per-ping RTTs plus summary statistics.

    Each ping is a random 32-byte payload per the libp2p ping spec; a
    response must echo the payload exactly to count as a success. Timeouts
    and per-ping errors are reported but do not abort the sequence.
    """
    peer_id = stream.muxed_conn.peer_id
    print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)")
    import os
    import time

    rtts = []
    for i in range(1, count + 1):
        try:
            # Random 32-byte payload as required by the ping protocol spec.
            payload = os.urandom(PING_LENGTH)
            print(f"[PING {i}/{count}] Sending ping to {peer_id}")
            logging.debug(f"Sending payload: {payload.hex()}")
            start_time = time.time()
            await stream.write(payload)
            with trio.fail_after(RESP_TIMEOUT):
                response = await stream.read(PING_LENGTH)
            end_time = time.time()
            rtt = (end_time - start_time) * 1000
            if (
                response
                and len(response) >= PING_LENGTH
                and response[:PING_LENGTH] == payload
            ):
                rtts.append(rtt)
                print(f"[PING {i}] Successful! RTT: {rtt:.2f}ms")
            else:
                print(f"[ERROR] Ping {i} failed: response mismatch or incomplete")
                if response:
                    logging.debug(f"Expected: {payload.hex()}")
                    logging.debug(f"Received: {response.hex()}")
            # Pace the pings one second apart, except after the last one.
            if i < count:
                await trio.sleep(1)
        except trio.TooSlowError:
            print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s")
        except Exception as e:
            print(f"[ERROR] Ping {i} failed: {e}")
            logging.exception(f"Ping {i} error")
    # Summary statistics over the successful pings.
    if rtts:
        avg_rtt = sum(rtts) / len(rtts)
        min_rtt = min(rtts)
        max_rtt = max(rtts)
        success_count = len(rtts)
        loss_rate = ((count - success_count) / count) * 100
        # Plain string: the original used an f-string with no placeholders
        # (flake8 F541); output is byte-identical.
        print("\n[STATS] Ping Statistics:")
        print(
            f" Packets: Sent={count}, Received={success_count},"
            f" Lost={count - success_count}"
        )
        print(f" Loss rate: {loss_rate:.1f}%")
        print(
            f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms,"
            f" max={max_rtt:.2f}ms"
        )
    else:
        print(f"\n[STATS] All pings failed ({count} attempts)")
def create_noise_keypair():
    """Build a minimal Noise keypair wrapper around a fresh X25519 key.

    Returns an object exposing to_bytes() / public_key() /
    get_public_key(), or None if key generation fails.
    """
    try:
        raw_key = x25519.X25519PrivateKey.generate()

        class NoisePublicKey:
            def __init__(self, key):
                self._key = key

            def to_bytes(self):
                return self._key.public_bytes_raw()

        class NoisePrivateKey:
            def __init__(self, key):
                self._key = key

            def to_bytes(self):
                return self._key.private_bytes_raw()

            def public_key(self):
                return NoisePublicKey(self._key.public_key())

            def get_public_key(self):
                return NoisePublicKey(self._key.public_key())

        return NoisePrivateKey(raw_key)
    except Exception as e:
        logging.error(f"Failed to create Noise keypair: {e}")
        return None
async def run_server(port: int) -> None:
    """Run a ping server accepting rust-libp2p / py-libp2p clients on `port`."""
    listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
    key_pair = generate_new_rsa_identity()
    logging.debug("Generated RSA keypair")
    noise_privkey = create_noise_keypair()
    if not noise_privkey:
        print("[ERROR] Failed to create Noise keypair")
        return
    logging.debug("Generated Noise keypair")
    noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey)
    logging.debug(f"Noise transport initialized: {noise_transport}")
    # Noise for security, Yamux for stream multiplexing.
    sec_opt = {TProtocol("/noise"): noise_transport}
    muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux}
    logging.info(f"Using muxer: {muxer_opt}")
    host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt)
    print("[INFO] Starting py-libp2p ping server...")
    async with host.run(listen_addrs=[listen_addr]):
        print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}")
        host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
        # Register alternative ping protocol IDs for wider compatibility.
        alt_protocols = [
            TProtocol("/ping/1.0.0"),
            TProtocol("/libp2p/ping/1.0.0"),
        ]
        for alt_proto in alt_protocols:
            print(f"[INFO] Also registering handler for: {alt_proto}")
            host.set_stream_handler(alt_proto, handle_ping)
        peer_id = host.get_id()
        # Startup banner with connection instructions.
        print("[INFO] Server started successfully!")
        print(f"[INFO] Peer ID: {peer_id}")
        print(f"[INFO] Listening: /ip4/0.0.0.0/tcp/{port}")
        print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}")
        print("[INFO] Security: Noise encryption")
        print("[INFO] Muxer: Yamux stream multiplexing")
        print("\n[INFO] Registered protocols:")
        print(f" - {PING_PROTOCOL_ID}")
        for proto in alt_protocols:
            print(f" - {proto}")
        print("\n[TEST] Test with rust-libp2p:")
        print(f" cargo run -- /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}")
        print("\n[TEST] Test with py-libp2p:")
        print(f" python ping.py client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}")
        print("\n[INFO] Waiting for connections...")
        print("Press Ctrl+C to exit")
        # Serve until interrupted.
        await trio.sleep_forever()
async def run_client(destination: str, count: int = 5) -> int:
    """Run a ping client against `destination` (a p2p multiaddr).

    Returns a process exit code: 0 on success, 1 on any failure.
    (The original annotated `-> None` even though it returns 0/1, and
    `main` uses that value via `trio.run`; annotation fixed to `int`.)
    """
    listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0")
    key_pair = generate_new_rsa_identity()
    logging.debug("Generated RSA keypair")
    noise_privkey = create_noise_keypair()
    if not noise_privkey:
        print("[ERROR] Failed to create Noise keypair")
        return 1
    logging.debug("Generated Noise keypair")
    noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey)
    logging.debug(f"Noise transport initialized: {noise_transport}")
    # Noise for security, Yamux for stream multiplexing.
    sec_opt = {TProtocol("/noise"): noise_transport}
    muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux}
    logging.info(f"Using muxer: {muxer_opt}")
    host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt)
    print("[INFO] Starting py-libp2p ping client...")
    async with host.run(listen_addrs=[listen_addr]):
        print(f"[INFO] Our Peer ID: {host.get_id()}")
        print(f"[INFO] Target: {destination}")
        print("[INFO] Security: Noise encryption")
        print("[INFO] Muxer: Yamux stream multiplexing")
        try:
            maddr = multiaddr.Multiaddr(destination)
            info = info_from_p2p_addr(maddr)
            target_peer_id = info.peer_id
            print(f"[INFO] Target Peer ID: {target_peer_id}")
            print("[INFO] Connecting to peer...")
            await host.connect(info)
            print("[INFO] Connection established!")
            # Try protocols in order of preference, starting with the
            # standard libp2p ping protocol used by rust-libp2p.
            protocols_to_try = [
                PING_PROTOCOL_ID,  # /ipfs/ping/1.0.0 - standard protocol
                TProtocol("/ping/1.0.0"),  # Alternative
                TProtocol("/libp2p/ping/1.0.0"),  # Another alternative
            ]
            stream = None
            for proto in protocols_to_try:
                try:
                    print(f"[INFO] Trying to open stream with protocol: {proto}")
                    stream = await host.new_stream(target_peer_id, [proto])
                    print(f"[INFO] Stream opened with protocol: {proto}")
                    break
                except Exception as e:
                    print(f"[ERROR] Failed to open stream with {proto}: {e}")
                    logging.debug(f"Protocol {proto} failed: {e}")
                    continue
            if not stream:
                print("[ERROR] Failed to open stream with any ping protocol")
                print("[ERROR] Ensure the target peer supports one of these protocols:")
                for proto in protocols_to_try:
                    print(f"[ERROR] - {proto}")
                return 1
            await send_ping_sequence(stream, count)
            await stream.close()
            print("[INFO] Stream closed successfully")
        except Exception as e:
            print(f"[ERROR] Client error: {e}")
            logging.exception("Client error")
            import traceback

            traceback.print_exc()
            return 1
    print("\n[INFO] Client stopped")
    return 0
def main() -> int:
    """Parse arguments and run server or client mode.

    Returns a process exit code (0 success, 1 failure) which is passed to
    `exit()` by the module entry point. (The original annotated `-> None`
    despite returning codes; annotation fixed to `int`.)
    """
    description = """
    py-libp2p ping tool for interoperability testing with rust-libp2p.
    Uses Noise encryption and Yamux multiplexing for compatibility.
    Server mode: Listens for ping requests from rust-libp2p or py-libp2p clients.
    Client mode: Sends ping requests to rust-libp2p or py-libp2p servers.
    The tool implements the standard libp2p ping protocol (/ipfs/ping/1.0.0)
    which exchanges 32-byte random payloads and measures round-trip time.
    """
    example_maddr = (
        "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
    )
    parser = argparse.ArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
    Examples:
    python ping.py server # Start server on port 8000
    python ping.py server --port 9000 # Start server on port 9000
    python ping.py client {example_maddr}
    python ping.py client {example_maddr} --count 10
    Protocols supported:
    - /ipfs/ping/1.0.0 (primary, rust-libp2p default)
    - /ping/1.0.0 (alternative)
    - /libp2p/ping/1.0.0 (alternative)
    """,
    )
    subparsers = parser.add_subparsers(dest="mode", help="Operation mode")
    server_parser = subparsers.add_parser("server", help="Run as ping server")
    server_parser.add_argument(
        "--port", "-p", type=int, default=8000, help="Port to listen on (default: 8000)"
    )
    client_parser = subparsers.add_parser("client", help="Run as ping client")
    client_parser.add_argument("destination", help="Target peer multiaddr")
    client_parser.add_argument(
        "--count",
        "-c",
        type=int,
        default=5,
        help="Number of pings to send (default: 5)",
    )
    args = parser.parse_args()
    if not args.mode:
        parser.print_help()
        return 1
    try:
        if args.mode == "server":
            trio.run(run_server, args.port)
        elif args.mode == "client":
            # run_client returns an exit code; propagate it.
            return trio.run(run_client, args.destination, args.count)
    except KeyboardInterrupt:
        print("\n[INFO] Goodbye!")
        return 0
    except Exception as e:
        print(f"[ERROR] Fatal error: {e}")
        logging.exception("Fatal error")
        import traceback

        traceback.print_exc()
        return 1
    return 0


if __name__ == "__main__":
    exit(main())

View File

@ -1,18 +0,0 @@
[package]
name = "ping-example"
version = "0.1.0"
edition.workspace = true
publish = false
license = "MIT"
[package.metadata.release]
release = false
[dependencies]
futures = { workspace = true }
libp2p = { path = "../../libp2p", features = ["noise", "ping", "tcp", "tokio", "yamux", "rsa"] }
tokio = { workspace = true, features = ["full"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
[lints]
workspace = true

View File

@ -1,30 +0,0 @@
## Description
The ping example showcases how to create a network of nodes that establish connections, negotiate the ping protocol, and ping each other.
## Usage
To run the example, follow these steps:
1. In a first terminal window, run the following command:
```sh
cargo run
```
This command starts a node and prints the `PeerId` and the listening addresses, such as `Listening on "/ip4/0.0.0.0/tcp/24915"`.
2. In a second terminal window, start a new instance of the example with the following command:
```sh
cargo run -- /ip4/127.0.0.1/tcp/24915
```
Replace `/ip4/127.0.0.1/tcp/24915` with the listen address of the first node obtained from the first terminal window.
3. The two nodes will establish a connection, negotiate the ping protocol, and begin pinging each other.
## Conclusion
The ping example demonstrates the basic usage of **libp2p** to create a simple p2p network and implement a ping protocol.
By running multiple nodes and observing the ping behavior, users can gain insights into how **libp2p** facilitates communication and interaction between peers.

View File

@ -1,68 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
#![doc = include_str!("../README.md")]
use std::{error::Error, time::Duration};
use futures::prelude::*;
use libp2p::{noise, ping, swarm::SwarmEvent, tcp, yamux, Multiaddr};
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Install a tracing subscriber driven by RUST_LOG; try_init() errors if a
    // subscriber is already set, which we deliberately ignore.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .try_init();

    // Build a swarm with a freshly generated identity: TCP transport secured
    // by Noise and multiplexed with Yamux, running only the ping behaviour.
    let mut swarm = libp2p::SwarmBuilder::with_new_identity()
        .with_tokio()
        .with_tcp(
            tcp::Config::default(),
            noise::Config::new,
            yamux::Config::default,
        )?
        .with_behaviour(|_| ping::Behaviour::default())?
        // Effectively disable the idle-connection timeout so the connection
        // stays up between pings.
        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)))
        .build();

    // Print the peer ID
    println!("Local peer ID: {}", swarm.local_peer_id());

    // Tell the swarm to listen on all interfaces and a random, OS-assigned
    // port.
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

    // Dial the peer identified by the multi-address given as the first
    // command-line argument (args().nth(1) skips the program name), if any.
    if let Some(addr) = std::env::args().nth(1) {
        let remote: Multiaddr = addr.parse()?;
        swarm.dial(remote)?;
        println!("Dialed {addr}")
    }

    // Drive the swarm forever, reporting listen addresses and ping events.
    loop {
        match swarm.select_next_some().await {
            SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"),
            SwarmEvent::Behaviour(event) => println!("{event:?}"),
            _ => {}
        }
    }
}

View File

@ -1,44 +0,0 @@
# scripts/run_py_to_rust_test.ps1
# Test script for py-libp2p client connecting to rust-libp2p server.
# Starts the rust node in the background, then prompts the operator to run the
# Python client manually (the rust server's peer ID is not parsed automatically).

param(
    # Number of pings the Python client should send (interpolated into the
    # suggested command line below).
    [int]$PingCount = 5,
    # NOTE(review): currently unused by this script — confirm before removing.
    [int]$TimeoutSeconds = 30
)

Write-Host "=== py-libp2p to rust-libp2p Interop Test ===" -ForegroundColor Cyan
Write-Host "Starting rust-libp2p server..." -ForegroundColor Yellow

# Start rust server in background (hidden window; -PassThru returns the
# process object so we can kill it in the finally block).
$rustProcess = Start-Process -FilePath "cargo" -ArgumentList "run" -WorkingDirectory "rust_node" -PassThru -WindowStyle Hidden

# Wait a moment for server to start
Start-Sleep -Seconds 3

try {
    # Get the rust server's listening address from its output
    # For now, we'll assume it's listening on a predictable port
    # In a real scenario, you'd parse the server output to get the actual address
    Write-Host "Waiting for rust server to start..." -ForegroundColor Yellow
    Start-Sleep -Seconds 5

    # Try to find the server's peer ID and port from netstat or process output
    # For this test, we'll need to manually check the rust server output
    Write-Host "Please check the rust server output for its Peer ID and port" -ForegroundColor Red
    Write-Host "Then run the Python client manually with:" -ForegroundColor Yellow
    Write-Host "python py_node/ping.py client /ip4/127.0.0.1/tcp/<PORT>/p2p/<PEER_ID> --count $PingCount" -ForegroundColor Green

    # Keep the server running until the operator presses a key.
    Write-Host "Press any key to stop the test..." -ForegroundColor Cyan
    $null = $Host.UI.RawUI.ReadKey("NoEcho,IncludeKeyDown")
} finally {
    # Clean up: stop the rust server even if the operator aborted the script.
    Write-Host "Stopping rust server..." -ForegroundColor Yellow
    if ($rustProcess -and !$rustProcess.HasExited) {
        $rustProcess.Kill()
        $rustProcess.WaitForExit(5000)
    }
    Write-Host "Test completed." -ForegroundColor Green
}

View File

@ -1,78 +0,0 @@
# scripts/run_rust_to_py_test.ps1
# Test script for rust-libp2p client connecting to py-libp2p server.
# Starts the Python server, scrapes its Peer ID from redirected stdout, then
# runs the rust client against the resulting multiaddr.

param(
    # TCP port the Python server listens on.
    [int]$Port = 8000,
    # NOTE(review): $PingCount and $TimeoutSeconds are currently unused —
    # the rust client pings until it exits on its own. Confirm before removing.
    [int]$PingCount = 5,
    [int]$TimeoutSeconds = 30
)

Write-Host "=== rust-libp2p to py-libp2p Interop Test ===" -ForegroundColor Cyan
Write-Host "Starting py-libp2p server on port $Port..." -ForegroundColor Yellow

# Start Python server in background; stdout/stderr are redirected to files so
# the Peer ID can be scraped below.
$pyProcess = Start-Process -FilePath "python" -ArgumentList "py_node/ping.py", "server", "--port", $Port -PassThru -RedirectStandardOutput "py_server_output.txt" -RedirectStandardError "py_server_error.txt"

# Wait for server to start
Start-Sleep -Seconds 5

try {
    # Poll the redirected output for up to $maxWaitTime seconds looking for
    # the "Peer ID: <base58>" line the server prints on startup.
    $maxWaitTime = 10
    $waited = 0
    $peerID = $null
    while ($waited -lt $maxWaitTime -and !$peerID) {
        if (Test-Path "py_server_output.txt") {
            $output = Get-Content "py_server_output.txt" -Raw
            if ($output -match "Peer ID: ([\w\d]+)") {
                $peerID = $matches[1]
                break
            }
        }
        Start-Sleep -Seconds 1
        $waited++
    }

    if (!$peerID) {
        # Could not find the Peer ID: dump whatever the server printed to help
        # diagnose, then bail out (the finally block still cleans up).
        Write-Host "Could not extract Peer ID from Python server output" -ForegroundColor Red
        Write-Host "Server output:" -ForegroundColor Yellow
        if (Test-Path "py_server_output.txt") {
            Get-Content "py_server_output.txt"
        }
        if (Test-Path "py_server_error.txt") {
            Write-Host "Server errors:" -ForegroundColor Red
            Get-Content "py_server_error.txt"
        }
        return
    }

    # Full p2p multiaddr the rust client will dial.
    $multiaddr = "/ip4/127.0.0.1/tcp/$Port/p2p/$peerID"
    Write-Host "Python server started with Peer ID: $peerID" -ForegroundColor Green
    Write-Host "Full address: $multiaddr" -ForegroundColor Green
    Write-Host "Starting rust client..." -ForegroundColor Yellow

    # Run rust client synchronously and report its exit status.
    $rustResult = Start-Process -FilePath "cargo" -ArgumentList "run", "--", $multiaddr -WorkingDirectory "rust_node" -Wait -PassThru -NoNewWindow
    if ($rustResult.ExitCode -eq 0) {
        Write-Host "Rust client completed successfully!" -ForegroundColor Green
    } else {
        Write-Host "Rust client failed with exit code: $($rustResult.ExitCode)" -ForegroundColor Red
    }
} finally {
    # Clean up: stop the Python server and remove the scratch output files.
    Write-Host "Stopping Python server..." -ForegroundColor Yellow
    if ($pyProcess -and !$pyProcess.HasExited) {
        $pyProcess.Kill()
        $pyProcess.WaitForExit(5000)
    }
    # Clean up output files
    if (Test-Path "py_server_output.txt") { Remove-Item "py_server_output.txt" }
    if (Test-Path "py_server_error.txt") { Remove-Item "py_server_error.txt" }
    Write-Host "Test completed." -ForegroundColor Green
}

View File

@ -1,127 +0,0 @@
import pytest
import trio
from libp2p.stream_muxer.exceptions import (
MuxedStreamClosed,
MuxedStreamError,
)
from libp2p.stream_muxer.mplex.datastructures import (
StreamID,
)
from libp2p.stream_muxer.mplex.mplex_stream import (
MplexStream,
)
from libp2p.stream_muxer.yamux.yamux import (
YamuxStream,
)
class DummySecuredConn:
    """Minimal stand-in for a secured connection; YamuxStream only calls write()."""

    async def write(self, data):
        # Discard all bytes — the tests never inspect what was written.
        pass
class MockMuxedConn:
    """Fake muxed connection exposing only the attributes the stream classes touch."""

    def __init__(self):
        # Lifecycle events the stream implementations may wait on.
        self.event_started = trio.Event()
        self.event_shutting_down = trio.Event()
        self.event_closed = trio.Event()
        # Stream bookkeeping guarded by a lock, mirroring the real connection.
        self.streams = {}
        self.streams_lock = trio.Lock()
        self.secured_conn = DummySecuredConn()  # For YamuxStream

    async def send_message(self, flag, data, stream_id):
        """Swallow outbound frames; the tests do not assert on them."""

    def get_remote_address(self):
        """The mock has no remote endpoint."""
        return None
@pytest.mark.trio
async def test_mplex_stream_async_context_manager():
    """Entering an MplexStream yields the stream; exiting closes the local side."""
    mock_conn = MockMuxedConn()
    _, receive_channel = trio.open_memory_channel(8)
    stream = MplexStream(
        name="test_stream",
        stream_id=StreamID(1, True),  # Use real StreamID
        muxed_conn=mock_conn,
        incoming_data_channel=receive_channel,
    )
    async with stream as entered:
        # The context manager hands back the very same stream object.
        assert entered is stream
        # While the context is open, no terminal event may be set yet.
        for event in (
            stream.event_local_closed,
            stream.event_remote_closed,
            stream.event_reset,
        ):
            assert not event.is_set()
    # Leaving the context closes our side of the stream.
    assert stream.event_local_closed.is_set()
@pytest.mark.trio
async def test_yamux_stream_async_context_manager():
    """Exiting a YamuxStream context closes the send side only."""
    stream = YamuxStream(stream_id=1, conn=MockMuxedConn(), is_initiator=True)
    async with stream as entered:
        assert entered is stream
        # A freshly-opened stream has nothing closed in either direction.
        assert not (stream.closed or stream.send_closed or stream.recv_closed)
    # __aexit__ half-closes the stream: only the send side is shut.
    assert stream.send_closed
@pytest.mark.trio
async def test_mplex_stream_async_context_manager_with_error():
    """The MplexStream is still closed on exit when the context body raises."""
    mock_conn = MockMuxedConn()
    stream = MplexStream(
        name="test_stream",
        stream_id=StreamID(1, True),
        muxed_conn=mock_conn,
        incoming_data_channel=trio.open_memory_channel(8)[1],
    )
    with pytest.raises(ValueError):
        async with stream as entered:
            assert entered is stream
            # Nothing is closed while the body is still running.
            for event in (
                stream.event_local_closed,
                stream.event_remote_closed,
                stream.event_reset,
            ):
                assert not event.is_set()
            raise ValueError("Test error")
    # __aexit__ must have closed the local side despite the exception.
    assert stream.event_local_closed.is_set()
@pytest.mark.trio
async def test_yamux_stream_async_context_manager_with_error():
    """YamuxStream closes its send side on exit even when the body raises."""
    stream = YamuxStream(stream_id=1, conn=MockMuxedConn(), is_initiator=True)
    with pytest.raises(ValueError):
        async with stream as entered:
            assert entered is stream
            # Fresh stream: no direction closed yet.
            assert not (stream.closed or stream.send_closed or stream.recv_closed)
            raise ValueError("Test error")
    # The exception must not prevent the half-close on exit.
    assert stream.send_closed
@pytest.mark.trio
async def test_mplex_stream_async_context_manager_write_after_close():
    """Writing to an MplexStream after its context exits raises MuxedStreamClosed."""
    mock_conn = MockMuxedConn()
    stream = MplexStream(
        name="test_stream",
        stream_id=StreamID(1, True),
        muxed_conn=mock_conn,
        incoming_data_channel=trio.open_memory_channel(8)[1],
    )
    async with stream as entered:
        assert entered is stream
    # The stream is now locally closed, so writes must be rejected.
    with pytest.raises(MuxedStreamClosed):
        await stream.write(b"test data")
@pytest.mark.trio
async def test_yamux_stream_async_context_manager_write_after_close():
    """Writing to a YamuxStream whose send side is closed raises MuxedStreamError."""
    stream = YamuxStream(stream_id=1, conn=MockMuxedConn(), is_initiator=True)
    async with stream as entered:
        assert entered is stream
    # Send side was closed on exit; further writes must fail.
    with pytest.raises(MuxedStreamError):
        await stream.write(b"test data")

View File

@ -32,25 +32,18 @@ class BaseInteractiveProcess(AbstractInterativeProcess):
async def wait_until_ready(self) -> None:
patterns_occurred = {pat: False for pat in self.patterns}
buffers = {pat: bytearray() for pat in self.patterns}
async def read_from_daemon_and_check() -> None:
async for data in self.proc.stdout:
# TODO: It takes O(n^2), which is quite bad.
# But it should succeed in a few seconds.
self.bytes_read.extend(data)
for pat, occurred in patterns_occurred.items():
if occurred:
continue
# Check if pattern is in new data or spans across chunks
buf = buffers[pat]
buf.extend(data)
if pat in buf:
if pat in self.bytes_read:
patterns_occurred[pat] = True
else:
keep = min(len(pat) - 1, len(buf))
buffers[pat] = buf[-keep:] if keep > 0 else bytearray()
if all(patterns_occurred.values()):
if all([value for value in patterns_occurred.values()]):
return
with trio.fail_after(TIMEOUT_DURATION):