diff --git a/examples/doc-examples/example_net_stream.py b/examples/doc-examples/example_net_stream.py new file mode 100644 index 00000000..62dff05e --- /dev/null +++ b/examples/doc-examples/example_net_stream.py @@ -0,0 +1,262 @@ +""" +Enhanced NetStream Example for py-libp2p with State Management + +This example demonstrates the new NetStream features including: +- State tracking and transitions +- Proper error handling and validation +- Resource cleanup and event notifications +- Thread-safe operations with Trio locks + +Based on the standard echo demo but enhanced to show NetStream state management. +""" + +import argparse +import random +import secrets + +import multiaddr +import trio + +from libp2p import ( + new_host, +) +from libp2p.crypto.secp256k1 import ( + create_new_key_pair, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.network.stream.exceptions import ( + StreamClosed, + StreamEOF, + StreamReset, +) +from libp2p.network.stream.net_stream import ( + NetStream, + StreamState, +) +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) + +PROTOCOL_ID = TProtocol("/echo/1.0.0") + + +async def enhanced_echo_handler(stream: NetStream) -> None: + """ + Enhanced echo handler that demonstrates NetStream state management. + """ + print(f"New connection established: {stream}") + print(f"Initial stream state: {await stream.state}") + + try: + # Verify stream is in expected initial state + assert await stream.state == StreamState.OPEN + assert await stream.is_readable() + assert await stream.is_writable() + print("āœ“ Stream initialized in OPEN state") + + # Read incoming data with proper state checking + print("Waiting for client data...") + + while await stream.is_readable(): + try: + # Read data from client + data = await stream.read(1024) + if not data: + print("Received empty data, client may have closed") + break + + print(f"Received: {data.decode('utf-8').strip()}") + + # Check if we can still write before echoing + if await stream.is_writable(): + await stream.write(data) + print(f"Echoed: {data.decode('utf-8').strip()}") + else: + print("Cannot echo - stream not writable") + break + + except StreamEOF: + print("Client closed their write side (EOF)") + break + except StreamReset: + print("Stream was reset by client") + return + except StreamClosed as e: + print(f"Stream operation failed: {e}") + break + + # Demonstrate graceful closure + current_state = await stream.state + print(f"Current state before close: {current_state}") + + if current_state not in [StreamState.CLOSE_BOTH, StreamState.RESET]: + await stream.close() + print("Server closed write side") + + final_state = await stream.state + print(f"Final stream state: {final_state}") + + except Exception as e: + print(f"Handler error: {e}") + # Reset stream on unexpected errors + if await stream.state not in [StreamState.RESET, StreamState.CLOSE_BOTH]: + await stream.reset() + print("Stream reset due to error") + + +async def enhanced_client_demo(stream: NetStream) -> None: + """ + Enhanced client that demonstrates various NetStream state scenarios. + """ + print(f"Client stream established: {stream}") + print(f"Initial state: {await stream.state}") + + try: + # Verify initial state + assert await stream.state == StreamState.OPEN + print("āœ“ Client stream in OPEN state") + + # Scenario 1: Normal communication + message = b"Hello from enhanced NetStream client!\n" + + if await stream.is_writable(): + await stream.write(message) + print(f"Sent: {message.decode('utf-8').strip()}") + else: + print("Cannot write - stream not writable") + return + + # Close write side to signal EOF to server + await stream.close() + print("Client closed write side") + + # Verify state transition + state_after_close = await stream.state + print(f"State after close: {state_after_close}") + assert state_after_close == StreamState.CLOSE_WRITE + assert await stream.is_readable() # Should still be readable + assert not await stream.is_writable() # Should not be writable + + # Try to write (should fail) + try: + await stream.write(b"This should fail") + print("ERROR: Write succeeded when it should have failed!") + except StreamClosed as e: + print(f"āœ“ Expected error when writing to closed stream: {e}") + + # Read the echo response + if await stream.is_readable(): + try: + response = await stream.read() + print(f"Received echo: {response.decode('utf-8').strip()}") + except StreamEOF: + print("Server closed their write side") + except StreamReset: + print("Stream was reset") + + # Check final state + final_state = await stream.state + print(f"Final client state: {final_state}") + + except Exception as e: + print(f"Client error: {e}") + # Reset on error + await stream.reset() + print("Client reset stream due to error") + + +async def run_enhanced_demo( + port: int, destination: str, seed: int | None = None +) -> None: + """ + Run enhanced echo demo with NetStream state management. + """ + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + # Generate or use provided key + if seed: + random.seed(seed) + secret_number = random.getrandbits(32 * 8) + secret = secret_number.to_bytes(length=32, byteorder="big") + else: + secret = secrets.token_bytes(32) + + host = new_host(key_pair=create_new_key_pair(secret)) + + async with host.run(listen_addrs=[listen_addr]): + print(f"Host ID: {host.get_id().to_string()}") + print("=" * 60) + + if not destination: # Server mode + print("šŸ–„ļø ENHANCED ECHO SERVER MODE") + print("=" * 60) + + # Set the enhanced stream handler + host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler) + + print( + "Run client from another console:\n" + f"python3 example_net_stream.py " + f"-d {host.get_addrs()[0]}\n" + ) + print("Waiting for connections...") + print("Press Ctrl+C to stop server") + await trio.sleep_forever() + + else: # Client mode + print("šŸ“± ENHANCED ECHO CLIENT MODE") + print("=" * 60) + + # Connect to server + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + await host.connect(info) + print(f"Connected to server: {info.peer_id.pretty()}") + + # Create stream and run enhanced demo + stream = await host.new_stream(info.peer_id, [PROTOCOL_ID]) + await enhanced_client_demo(stream) + + print("\n" + "=" * 60) + print("CLIENT DEMO COMPLETE") + + +def main() -> None: + example_maddr = ( + "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" + ) + + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument("-p", "--port", default=0, type=int, help="source port number") + parser.add_argument( + "-d", + "--destination", + type=str, + help=f"destination multiaddr string, e.g. {example_maddr}", + ) + parser.add_argument( + "-s", + "--seed", + type=int, + help="seed for deterministic peer ID generation", + ) + parser.add_argument( + "--demo-states", action="store_true", help="run state transition demo only" + ) + + args = parser.parse_args() + + try: + trio.run(run_enhanced_demo, args.port, args.destination, args.seed) + except KeyboardInterrupt: + print("\nšŸ‘‹ Demo interrupted by user") + except Exception as e: + print(f"āŒ Demo failed: {e}") + + +if __name__ == "__main__": + main() diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 5b1e2668..073abfb4 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -39,7 +39,67 @@ class StreamState(Enum): class NetStream(INetStream): - """Class representing NetStream Handler""" + """ + Summary + _______ + A Network stream implementation. + + NetStream wraps a muxed stream and provides proper state tracking, resource cleanup, + and event notification capabilities. + + State Machine + _____________ + + .. code:: markdown + + [CREATED] → OPEN → CLOSE_READ → CLOSE_BOTH → [CLEANUP] + ↓ ↗ ↗ + CLOSE_WRITE → ← ↗ + ↓ ↗ + RESET → → → → → → → → + + State Transitions + _________________ + - OPEN → CLOSE_READ: EOF encountered during read() + - OPEN → CLOSE_WRITE: Explicit close() call + - OPEN → RESET: reset() call or critical stream error + - CLOSE_READ → CLOSE_BOTH: Explicit close() call + - CLOSE_WRITE → CLOSE_BOTH: EOF encountered during read() + - Any state → RESET: reset() call + + Terminal States (trigger cleanup) + _________________________________ + - CLOSE_BOTH: Stream fully closed, triggers resource cleanup + - RESET: Stream reset/terminated, triggers resource cleanup + + Operation Validity by State + ___________________________ + OPEN: read() āœ“ write() āœ“ close() āœ“ reset() āœ“ + CLOSE_READ: read() āœ— write() āœ“ close() āœ“ reset() āœ“ + CLOSE_WRITE: read() āœ“ write() āœ— close() āœ“ reset() āœ“ + CLOSE_BOTH: read() āœ— write() āœ— close() āœ“ reset() āœ“ + RESET: read() āœ— write() āœ— close() āœ“ reset() āœ“ + + Cleanup Process (triggered by CLOSE_BOTH or RESET) + __________________________________________________ + 1. Remove stream from SwarmConn + 2. Notify all listeners with ClosedStream event + 3. Decrement reference counter + 4. Background cleanup via nursery (if provided) + + Thread Safety + _____________ + All state operations are protected by trio.Lock() for safe concurrent access. + State checks and modifications are atomic operations. + + Example: See :file:`examples/doc-examples/example_net_stream.py` + + :param muxed_stream (IMuxedStream): The underlying muxed stream + :param nursery (Optional[trio.Nursery]): Nursery for background cleanup tasks + :raises StreamClosed: When attempting invalid operations on closed streams + :raises StreamEOF: When EOF is encountered during read operations + :raises StreamReset: When the underlying stream has been reset + """ muxed_stream: IMuxedStream protocol_id: Optional[TProtocol] @@ -87,7 +147,10 @@ class NetStream(INetStream): Read from stream. :param n: number of bytes to read - :return: bytes of input + :raises StreamClosed: If `NetStream` is closed for reading + :raises StreamReset: If `NetStream` is reset + :raises StreamEOF: If trying to read after reaching end of file + :return: Bytes read from the stream """ async with self._state_lock: if self.__stream_state in [ @@ -126,6 +189,8 @@ class NetStream(INetStream): Write to stream. :param data: bytes to write + :raises StreamClosed: If `NetStream` is closed for writing or reset + :raises StreamClosed: If `StreamError` occurred while writing """ async with self._state_lock: if self.__stream_state in [ @@ -218,21 +283,24 @@ class NetStream(INetStream): """Delegate to the underlying muxed stream.""" return self.muxed_stream.get_remote_address() - def is_closed(self) -> bool: + async def is_closed(self) -> bool: """Check if stream is closed.""" - return self.__stream_state in [StreamState.CLOSE_BOTH, StreamState.RESET] + current_state = await self.state + return current_state in [StreamState.CLOSE_BOTH, StreamState.RESET] - def is_readable(self) -> bool: + async def is_readable(self) -> bool: """Check if stream is readable.""" - return self.__stream_state not in [ + current_state = await self.state + return current_state not in [ StreamState.CLOSE_READ, StreamState.CLOSE_BOTH, StreamState.RESET, ] - def is_writable(self) -> bool: + async def is_writable(self) -> bool: """Check if stream is writable.""" - return self.__stream_state not in [ + current_state = await self.state + return current_state not in [ StreamState.CLOSE_WRITE, StreamState.CLOSE_BOTH, StreamState.RESET, diff --git a/newsfragments/300.breaking.rst b/newsfragments/300.breaking.rst new file mode 100644 index 00000000..b1d1cfe3 --- /dev/null +++ b/newsfragments/300.breaking.rst @@ -0,0 +1 @@ +The `NetStream.state` property is now async and requires `await`. Update any direct state access to use `await stream.state`. diff --git a/newsfragments/300.bugfix.rst b/newsfragments/300.bugfix.rst new file mode 100644 index 00000000..9f947490 --- /dev/null +++ b/newsfragments/300.bugfix.rst @@ -0,0 +1 @@ +Added proper state management and resource cleanup to `NetStream`, fixing memory leaks and improved error handling.