From 5bc4d01eea22f9526ec5784c794efcefd6afff5f Mon Sep 17 00:00:00 2001 From: Akash Mondal Date: Fri, 6 Jun 2025 06:27:23 +0000 Subject: [PATCH 1/4] fix: add connection states for net stream Other changes: 1. Add operation validation based on states 2. Gracefully handle exceptions and cleanup --- libp2p/network/stream/net_stream.py | 189 +++++++++++++++++++++++++--- 1 file changed, 174 insertions(+), 15 deletions(-) diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 300f0fa4..5b1e2668 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -1,3 +1,12 @@ +from enum import ( + Enum, +) +from typing import ( + Optional, +) + +import trio + from libp2p.abc import ( IMuxedStream, INetStream, @@ -19,18 +28,42 @@ from .exceptions import ( ) -# TODO: Handle exceptions from `muxed_stream` -# TODO: Add stream state -# - Reference: https://github.com/libp2p/go-libp2p-swarm/blob/99831444e78c8f23c9335c17d8f7c700ba25ca14/swarm_stream.go # noqa: E501 -class NetStream(INetStream): - muxed_stream: IMuxedStream - protocol_id: TProtocol | None +class StreamState(Enum): + """NetStream States""" + + OPEN = "open" + CLOSE_READ = "close_read" + CLOSE_WRITE = "close_write" + CLOSE_BOTH = "close_both" + RESET = "reset" + + +class NetStream(INetStream): + """Class representing NetStream Handler""" + + muxed_stream: IMuxedStream + protocol_id: Optional[TProtocol] + __stream_state: StreamState + + def __init__( + self, muxed_stream: IMuxedStream, nursery: Optional[trio.Nursery] = None + ) -> None: + super().__init__() - def __init__(self, muxed_stream: IMuxedStream) -> None: self.muxed_stream = muxed_stream self.muxed_conn = muxed_stream.muxed_conn self.protocol_id = None + # For background tasks + self._nursery = nursery + + # State management + self.__stream_state = StreamState.OPEN + self._state_lock = trio.Lock() + + # For notification handling + self._notify_lock = trio.Lock() + def get_protocol(self) -> TProtocol | None: """ :return: protocol id that stream runs on @@ -43,42 +76,168 @@ class NetStream(INetStream): """ self.protocol_id = protocol_id - async def read(self, n: int | None = None) -> bytes: + @property + async def state(self) -> StreamState: + """Get current stream state.""" + async with self._state_lock: + return self.__stream_state + + async def read(self, n: Optional[int] = None) -> bytes: """ Read from stream. :param n: number of bytes to read :return: bytes of input """ + async with self._state_lock: + if self.__stream_state in [ + StreamState.CLOSE_READ, + StreamState.CLOSE_BOTH, + ]: + raise StreamClosed("Stream is closed for reading") + + if self.__stream_state == StreamState.RESET: + raise StreamReset("Stream is reset, cannot be used to read") + try: - return await self.muxed_stream.read(n) + data = await self.muxed_stream.read(n) + return data except MuxedStreamEOF as error: + async with self._state_lock: + if self.__stream_state == StreamState.CLOSE_WRITE: + self.__stream_state = StreamState.CLOSE_BOTH + await self._remove() + elif self.__stream_state == StreamState.OPEN: + self.__stream_state = StreamState.CLOSE_READ raise StreamEOF() from error except MuxedStreamReset as error: + async with self._state_lock: + if self.__stream_state in [ + StreamState.OPEN, + StreamState.CLOSE_READ, + StreamState.CLOSE_WRITE, + ]: + self.__stream_state = StreamState.RESET + await self._remove() raise StreamReset() from error async def write(self, data: bytes) -> None: """ Write to stream. - :return: number of bytes written + :param data: bytes to write """ + async with self._state_lock: + if self.__stream_state in [ + StreamState.CLOSE_WRITE, + StreamState.CLOSE_BOTH, + StreamState.RESET, + ]: + raise StreamClosed("Stream is closed for writing") + try: await self.muxed_stream.write(data) except (MuxedStreamClosed, MuxedStreamError) as error: + async with self._state_lock: + if self.__stream_state == StreamState.OPEN: + self.__stream_state = StreamState.CLOSE_WRITE + elif self.__stream_state == StreamState.CLOSE_READ: + self.__stream_state = StreamState.CLOSE_BOTH + await self._remove() raise StreamClosed() from error async def close(self) -> None: - """Close stream.""" + """Close stream for writing.""" + async with self._state_lock: + if self.__stream_state in [ + StreamState.CLOSE_BOTH, + StreamState.RESET, + StreamState.CLOSE_WRITE, + ]: + return + await self.muxed_stream.close() + async with self._state_lock: + if self.__stream_state == StreamState.CLOSE_READ: + self.__stream_state = StreamState.CLOSE_BOTH + await self._remove() + elif self.__stream_state == StreamState.OPEN: + self.__stream_state = StreamState.CLOSE_WRITE + async def reset(self) -> None: + """Reset stream, closing both ends.""" + async with self._state_lock: + if self.__stream_state == StreamState.RESET: + return + await self.muxed_stream.reset() - def get_remote_address(self) -> tuple[str, int] | None: + async with self._state_lock: + if self.__stream_state in [ + StreamState.OPEN, + StreamState.CLOSE_READ, + StreamState.CLOSE_WRITE, + ]: + self.__stream_state = StreamState.RESET + await self._remove() + + async def _remove(self) -> None: + """ + Remove stream from connection and notify listeners. + This is called when the stream is fully closed or reset. + """ + if hasattr(self.muxed_conn, "remove_stream"): + remove_stream = getattr(self.muxed_conn, "remove_stream") + await remove_stream(self) + + # Notify in background using Trio nursery if available + if self._nursery: + self._nursery.start_soon(self._notify_closed) + else: + await self._notify_closed() + + async def _notify_closed(self) -> None: + """ + Notify all listeners that the stream has been closed. + This runs in a separate task to avoid blocking the main flow. + """ + async with self._notify_lock: + if hasattr(self.muxed_conn, "swarm"): + swarm = getattr(self.muxed_conn, "swarm") + + if hasattr(swarm, "notify_all"): + await swarm.notify_all( + lambda notifiee: notifiee.closed_stream(swarm, self) + ) + + if hasattr(swarm, "refs") and hasattr(swarm.refs, "done"): + swarm.refs.done() + + def get_remote_address(self) -> Optional[tuple[str, int]]: """Delegate to the underlying muxed stream.""" return self.muxed_stream.get_remote_address() - # TODO: `remove`: Called by close and write when the stream is in specific states. - # It notifies `ClosedStream` after `SwarmConn.remove_stream` is called. - # Reference: https://github.com/libp2p/go-libp2p-swarm/blob/99831444e78c8f23c9335c17d8f7c700ba25ca14/swarm_stream.go # noqa: E501 + def is_closed(self) -> bool: + """Check if stream is closed.""" + return self.__stream_state in [StreamState.CLOSE_BOTH, StreamState.RESET] + + def is_readable(self) -> bool: + """Check if stream is readable.""" + return self.__stream_state not in [ + StreamState.CLOSE_READ, + StreamState.CLOSE_BOTH, + StreamState.RESET, + ] + + def is_writable(self) -> bool: + """Check if stream is writable.""" + return self.__stream_state not in [ + StreamState.CLOSE_WRITE, + StreamState.CLOSE_BOTH, + StreamState.RESET, + ] + + def __str__(self) -> str: + """String representation of the stream.""" + return f"" From f7757fa726932359af7a354bfdd1f08321831830 Mon Sep 17 00:00:00 2001 From: Akash Mondal Date: Mon, 9 Jun 2025 04:43:53 +0000 Subject: [PATCH 2/4] docs: add documentation and examples for new NetStream state management --- examples/doc-examples/example_net_stream.py | 262 ++++++++++++++++++++ libp2p/network/stream/net_stream.py | 84 ++++++- newsfragments/300.breaking.rst | 1 + newsfragments/300.bugfix.rst | 1 + 4 files changed, 340 insertions(+), 8 deletions(-) create mode 100644 examples/doc-examples/example_net_stream.py create mode 100644 newsfragments/300.breaking.rst create mode 100644 newsfragments/300.bugfix.rst diff --git a/examples/doc-examples/example_net_stream.py b/examples/doc-examples/example_net_stream.py new file mode 100644 index 00000000..62dff05e --- /dev/null +++ b/examples/doc-examples/example_net_stream.py @@ -0,0 +1,262 @@ +""" +Enhanced NetStream Example for py-libp2p with State Management + +This example demonstrates the new NetStream features including: +- State tracking and transitions +- Proper error handling and validation +- Resource cleanup and event notifications +- Thread-safe operations with Trio locks + +Based on the standard echo demo but enhanced to show NetStream state management. +""" + +import argparse +import random +import secrets + +import multiaddr +import trio + +from libp2p import ( + new_host, +) +from libp2p.crypto.secp256k1 import ( + create_new_key_pair, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.network.stream.exceptions import ( + StreamClosed, + StreamEOF, + StreamReset, +) +from libp2p.network.stream.net_stream import ( + NetStream, + StreamState, +) +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) + +PROTOCOL_ID = TProtocol("/echo/1.0.0") + + +async def enhanced_echo_handler(stream: NetStream) -> None: + """ + Enhanced echo handler that demonstrates NetStream state management. + """ + print(f"New connection established: {stream}") + print(f"Initial stream state: {await stream.state}") + + try: + # Verify stream is in expected initial state + assert await stream.state == StreamState.OPEN + assert await stream.is_readable() + assert await stream.is_writable() + print("āœ“ Stream initialized in OPEN state") + + # Read incoming data with proper state checking + print("Waiting for client data...") + + while await stream.is_readable(): + try: + # Read data from client + data = await stream.read(1024) + if not data: + print("Received empty data, client may have closed") + break + + print(f"Received: {data.decode('utf-8').strip()}") + + # Check if we can still write before echoing + if await stream.is_writable(): + await stream.write(data) + print(f"Echoed: {data.decode('utf-8').strip()}") + else: + print("Cannot echo - stream not writable") + break + + except StreamEOF: + print("Client closed their write side (EOF)") + break + except StreamReset: + print("Stream was reset by client") + return + except StreamClosed as e: + print(f"Stream operation failed: {e}") + break + + # Demonstrate graceful closure + current_state = await stream.state + print(f"Current state before close: {current_state}") + + if current_state not in [StreamState.CLOSE_BOTH, StreamState.RESET]: + await stream.close() + print("Server closed write side") + + final_state = await stream.state + print(f"Final stream state: {final_state}") + + except Exception as e: + print(f"Handler error: {e}") + # Reset stream on unexpected errors + if await stream.state not in [StreamState.RESET, StreamState.CLOSE_BOTH]: + await stream.reset() + print("Stream reset due to error") + + +async def enhanced_client_demo(stream: NetStream) -> None: + """ + Enhanced client that demonstrates various NetStream state scenarios. + """ + print(f"Client stream established: {stream}") + print(f"Initial state: {await stream.state}") + + try: + # Verify initial state + assert await stream.state == StreamState.OPEN + print("āœ“ Client stream in OPEN state") + + # Scenario 1: Normal communication + message = b"Hello from enhanced NetStream client!\n" + + if await stream.is_writable(): + await stream.write(message) + print(f"Sent: {message.decode('utf-8').strip()}") + else: + print("Cannot write - stream not writable") + return + + # Close write side to signal EOF to server + await stream.close() + print("Client closed write side") + + # Verify state transition + state_after_close = await stream.state + print(f"State after close: {state_after_close}") + assert state_after_close == StreamState.CLOSE_WRITE + assert await stream.is_readable() # Should still be readable + assert not await stream.is_writable() # Should not be writable + + # Try to write (should fail) + try: + await stream.write(b"This should fail") + print("ERROR: Write succeeded when it should have failed!") + except StreamClosed as e: + print(f"āœ“ Expected error when writing to closed stream: {e}") + + # Read the echo response + if await stream.is_readable(): + try: + response = await stream.read() + print(f"Received echo: {response.decode('utf-8').strip()}") + except StreamEOF: + print("Server closed their write side") + except StreamReset: + print("Stream was reset") + + # Check final state + final_state = await stream.state + print(f"Final client state: {final_state}") + + except Exception as e: + print(f"Client error: {e}") + # Reset on error + await stream.reset() + print("Client reset stream due to error") + + +async def run_enhanced_demo( + port: int, destination: str, seed: int | None = None +) -> None: + """ + Run enhanced echo demo with NetStream state management. + """ + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + # Generate or use provided key + if seed: + random.seed(seed) + secret_number = random.getrandbits(32 * 8) + secret = secret_number.to_bytes(length=32, byteorder="big") + else: + secret = secrets.token_bytes(32) + + host = new_host(key_pair=create_new_key_pair(secret)) + + async with host.run(listen_addrs=[listen_addr]): + print(f"Host ID: {host.get_id().to_string()}") + print("=" * 60) + + if not destination: # Server mode + print("šŸ–„ļø ENHANCED ECHO SERVER MODE") + print("=" * 60) + + # Set the enhanced stream handler + host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler) + + print( + "Run client from another console:\n" + f"python3 example_net_stream.py " + f"-d {host.get_addrs()[0]}\n" + ) + print("Waiting for connections...") + print("Press Ctrl+C to stop server") + await trio.sleep_forever() + + else: # Client mode + print("šŸ“± ENHANCED ECHO CLIENT MODE") + print("=" * 60) + + # Connect to server + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + await host.connect(info) + print(f"Connected to server: {info.peer_id.pretty()}") + + # Create stream and run enhanced demo + stream = await host.new_stream(info.peer_id, [PROTOCOL_ID]) + await enhanced_client_demo(stream) + + print("\n" + "=" * 60) + print("CLIENT DEMO COMPLETE") + + +def main() -> None: + example_maddr = ( + "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" + ) + + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument("-p", "--port", default=0, type=int, help="source port number") + parser.add_argument( + "-d", + "--destination", + type=str, + help=f"destination multiaddr string, e.g. {example_maddr}", + ) + parser.add_argument( + "-s", + "--seed", + type=int, + help="seed for deterministic peer ID generation", + ) + parser.add_argument( + "--demo-states", action="store_true", help="run state transition demo only" + ) + + args = parser.parse_args() + + try: + trio.run(run_enhanced_demo, args.port, args.destination, args.seed) + except KeyboardInterrupt: + print("\nšŸ‘‹ Demo interrupted by user") + except Exception as e: + print(f"āŒ Demo failed: {e}") + + +if __name__ == "__main__": + main() diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 5b1e2668..073abfb4 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -39,7 +39,67 @@ class StreamState(Enum): class NetStream(INetStream): - """Class representing NetStream Handler""" + """ + Summary + _______ + A Network stream implementation. + + NetStream wraps a muxed stream and provides proper state tracking, resource cleanup, + and event notification capabilities. + + State Machine + _____________ + + .. code:: markdown + + [CREATED] → OPEN → CLOSE_READ → CLOSE_BOTH → [CLEANUP] + ↓ ↗ ↗ + CLOSE_WRITE → ← ↗ + ↓ ↗ + RESET → → → → → → → → + + State Transitions + _________________ + - OPEN → CLOSE_READ: EOF encountered during read() + - OPEN → CLOSE_WRITE: Explicit close() call + - OPEN → RESET: reset() call or critical stream error + - CLOSE_READ → CLOSE_BOTH: Explicit close() call + - CLOSE_WRITE → CLOSE_BOTH: EOF encountered during read() + - Any state → RESET: reset() call + + Terminal States (trigger cleanup) + _________________________________ + - CLOSE_BOTH: Stream fully closed, triggers resource cleanup + - RESET: Stream reset/terminated, triggers resource cleanup + + Operation Validity by State + ___________________________ + OPEN: read() āœ“ write() āœ“ close() āœ“ reset() āœ“ + CLOSE_READ: read() āœ— write() āœ“ close() āœ“ reset() āœ“ + CLOSE_WRITE: read() āœ“ write() āœ— close() āœ“ reset() āœ“ + CLOSE_BOTH: read() āœ— write() āœ— close() āœ“ reset() āœ“ + RESET: read() āœ— write() āœ— close() āœ“ reset() āœ“ + + Cleanup Process (triggered by CLOSE_BOTH or RESET) + __________________________________________________ + 1. Remove stream from SwarmConn + 2. Notify all listeners with ClosedStream event + 3. Decrement reference counter + 4. Background cleanup via nursery (if provided) + + Thread Safety + _____________ + All state operations are protected by trio.Lock() for safe concurrent access. + State checks and modifications are atomic operations. + + Example: See :file:`examples/doc-examples/example_net_stream.py` + + :param muxed_stream (IMuxedStream): The underlying muxed stream + :param nursery (Optional[trio.Nursery]): Nursery for background cleanup tasks + :raises StreamClosed: When attempting invalid operations on closed streams + :raises StreamEOF: When EOF is encountered during read operations + :raises StreamReset: When the underlying stream has been reset + """ muxed_stream: IMuxedStream protocol_id: Optional[TProtocol] @@ -87,7 +147,10 @@ class NetStream(INetStream): Read from stream. :param n: number of bytes to read - :return: bytes of input + :raises StreamClosed: If `NetStream` is closed for reading + :raises StreamReset: If `NetStream` is reset + :raises StreamEOF: If trying to read after reaching end of file + :return: Bytes read from the stream """ async with self._state_lock: if self.__stream_state in [ @@ -126,6 +189,8 @@ class NetStream(INetStream): Write to stream. :param data: bytes to write + :raises StreamClosed: If `NetStream` is closed for writing or reset + :raises StreamClosed: If `StreamError` occurred while writing """ async with self._state_lock: if self.__stream_state in [ @@ -218,21 +283,24 @@ class NetStream(INetStream): """Delegate to the underlying muxed stream.""" return self.muxed_stream.get_remote_address() - def is_closed(self) -> bool: + async def is_closed(self) -> bool: """Check if stream is closed.""" - return self.__stream_state in [StreamState.CLOSE_BOTH, StreamState.RESET] + current_state = await self.state + return current_state in [StreamState.CLOSE_BOTH, StreamState.RESET] - def is_readable(self) -> bool: + async def is_readable(self) -> bool: """Check if stream is readable.""" - return self.__stream_state not in [ + current_state = await self.state + return current_state not in [ StreamState.CLOSE_READ, StreamState.CLOSE_BOTH, StreamState.RESET, ] - def is_writable(self) -> bool: + async def is_writable(self) -> bool: """Check if stream is writable.""" - return self.__stream_state not in [ + current_state = await self.state + return current_state not in [ StreamState.CLOSE_WRITE, StreamState.CLOSE_BOTH, StreamState.RESET, diff --git a/newsfragments/300.breaking.rst b/newsfragments/300.breaking.rst new file mode 100644 index 00000000..b1d1cfe3 --- /dev/null +++ b/newsfragments/300.breaking.rst @@ -0,0 +1 @@ +The `NetStream.state` property is now async and requires `await`. Update any direct state access to use `await stream.state`. diff --git a/newsfragments/300.bugfix.rst b/newsfragments/300.bugfix.rst new file mode 100644 index 00000000..9f947490 --- /dev/null +++ b/newsfragments/300.bugfix.rst @@ -0,0 +1 @@ +Added proper state management and resource cleanup to `NetStream`, fixing memory leaks and improved error handling. From 47ae20d29c0d4871144d6279c073ce848a9fc181 Mon Sep 17 00:00:00 2001 From: Akash Mondal Date: Mon, 9 Jun 2025 04:45:08 +0000 Subject: [PATCH 3/4] fix: run pytests parallely in CI and makefile --- tox.ini | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tox.ini b/tox.ini index 5ebb00ce..44f74bab 100644 --- a/tox.ini +++ b/tox.ini @@ -19,10 +19,10 @@ max_issue_threshold=1 [testenv] usedevelop=True commands= - core: pytest {posargs:tests/core} - interop: pytest {posargs:tests/interop} + core: pytest -n auto {posargs:tests/core} + interop: pytest -n auto {posargs:tests/interop} docs: make check-docs-ci - demos: pytest {posargs:tests/core/examples/test_examples.py} + demos: pytest -n auto {posargs:tests/core/examples/test_examples.py} basepython= docs: python windows-wheel: python From 13d730ae5c76f2efe2729624512068deef5323b9 Mon Sep 17 00:00:00 2001 From: Akash Mondal Date: Mon, 9 Jun 2025 19:10:15 +0000 Subject: [PATCH 4/4] fix: improve types according to new typecheck --- examples/doc-examples/example_net_stream.py | 5 +++-- libp2p/network/stream/net_stream.py | 11 ++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/examples/doc-examples/example_net_stream.py b/examples/doc-examples/example_net_stream.py index 62dff05e..d8842bea 100644 --- a/examples/doc-examples/example_net_stream.py +++ b/examples/doc-examples/example_net_stream.py @@ -193,7 +193,7 @@ async def run_enhanced_demo( print("šŸ–„ļø ENHANCED ECHO SERVER MODE") print("=" * 60) - # Set the enhanced stream handler + # type: ignore: Stream is type of NetStream host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler) print( @@ -217,7 +217,8 @@ async def run_enhanced_demo( # Create stream and run enhanced demo stream = await host.new_stream(info.peer_id, [PROTOCOL_ID]) - await enhanced_client_demo(stream) + if isinstance(stream, NetStream): + await enhanced_client_demo(stream) print("\n" + "=" * 60) print("CLIENT DEMO COMPLETE") diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 073abfb4..b54fdda4 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -1,9 +1,6 @@ from enum import ( Enum, ) -from typing import ( - Optional, -) import trio @@ -102,11 +99,11 @@ class NetStream(INetStream): """ muxed_stream: IMuxedStream - protocol_id: Optional[TProtocol] + protocol_id: TProtocol | None __stream_state: StreamState def __init__( - self, muxed_stream: IMuxedStream, nursery: Optional[trio.Nursery] = None + self, muxed_stream: IMuxedStream, nursery: trio.Nursery | None = None ) -> None: super().__init__() @@ -142,7 +139,7 @@ class NetStream(INetStream): async with self._state_lock: return self.__stream_state - async def read(self, n: Optional[int] = None) -> bytes: + async def read(self, n: int | None = None) -> bytes: """ Read from stream. @@ -279,7 +276,7 @@ class NetStream(INetStream): if hasattr(swarm, "refs") and hasattr(swarm.refs, "done"): swarm.refs.done() - def get_remote_address(self) -> Optional[tuple[str, int]]: + def get_remote_address(self) -> tuple[str, int] | None: """Delegate to the underlying muxed stream.""" return self.muxed_stream.get_remote_address()