mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
docs: add documentation and examples for new NetStream state management
This commit is contained in:
262
examples/doc-examples/example_net_stream.py
Normal file
262
examples/doc-examples/example_net_stream.py
Normal file
@ -0,0 +1,262 @@
|
||||
"""
|
||||
Enhanced NetStream Example for py-libp2p with State Management
|
||||
|
||||
This example demonstrates the new NetStream features including:
|
||||
- State tracking and transitions
|
||||
- Proper error handling and validation
|
||||
- Resource cleanup and event notifications
|
||||
- Thread-safe operations with Trio locks
|
||||
|
||||
Based on the standard echo demo but enhanced to show NetStream state management.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import random
|
||||
import secrets
|
||||
|
||||
import multiaddr
|
||||
import trio
|
||||
|
||||
from libp2p import (
|
||||
new_host,
|
||||
)
|
||||
from libp2p.crypto.secp256k1 import (
|
||||
create_new_key_pair,
|
||||
)
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
)
|
||||
from libp2p.network.stream.exceptions import (
|
||||
StreamClosed,
|
||||
StreamEOF,
|
||||
StreamReset,
|
||||
)
|
||||
from libp2p.network.stream.net_stream import (
|
||||
NetStream,
|
||||
StreamState,
|
||||
)
|
||||
from libp2p.peer.peerinfo import (
|
||||
info_from_p2p_addr,
|
||||
)
|
||||
|
||||
PROTOCOL_ID = TProtocol("/echo/1.0.0")
|
||||
|
||||
|
||||
async def enhanced_echo_handler(stream: NetStream) -> None:
|
||||
"""
|
||||
Enhanced echo handler that demonstrates NetStream state management.
|
||||
"""
|
||||
print(f"New connection established: {stream}")
|
||||
print(f"Initial stream state: {await stream.state}")
|
||||
|
||||
try:
|
||||
# Verify stream is in expected initial state
|
||||
assert await stream.state == StreamState.OPEN
|
||||
assert await stream.is_readable()
|
||||
assert await stream.is_writable()
|
||||
print("✓ Stream initialized in OPEN state")
|
||||
|
||||
# Read incoming data with proper state checking
|
||||
print("Waiting for client data...")
|
||||
|
||||
while await stream.is_readable():
|
||||
try:
|
||||
# Read data from client
|
||||
data = await stream.read(1024)
|
||||
if not data:
|
||||
print("Received empty data, client may have closed")
|
||||
break
|
||||
|
||||
print(f"Received: {data.decode('utf-8').strip()}")
|
||||
|
||||
# Check if we can still write before echoing
|
||||
if await stream.is_writable():
|
||||
await stream.write(data)
|
||||
print(f"Echoed: {data.decode('utf-8').strip()}")
|
||||
else:
|
||||
print("Cannot echo - stream not writable")
|
||||
break
|
||||
|
||||
except StreamEOF:
|
||||
print("Client closed their write side (EOF)")
|
||||
break
|
||||
except StreamReset:
|
||||
print("Stream was reset by client")
|
||||
return
|
||||
except StreamClosed as e:
|
||||
print(f"Stream operation failed: {e}")
|
||||
break
|
||||
|
||||
# Demonstrate graceful closure
|
||||
current_state = await stream.state
|
||||
print(f"Current state before close: {current_state}")
|
||||
|
||||
if current_state not in [StreamState.CLOSE_BOTH, StreamState.RESET]:
|
||||
await stream.close()
|
||||
print("Server closed write side")
|
||||
|
||||
final_state = await stream.state
|
||||
print(f"Final stream state: {final_state}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Handler error: {e}")
|
||||
# Reset stream on unexpected errors
|
||||
if await stream.state not in [StreamState.RESET, StreamState.CLOSE_BOTH]:
|
||||
await stream.reset()
|
||||
print("Stream reset due to error")
|
||||
|
||||
|
||||
async def enhanced_client_demo(stream: NetStream) -> None:
|
||||
"""
|
||||
Enhanced client that demonstrates various NetStream state scenarios.
|
||||
"""
|
||||
print(f"Client stream established: {stream}")
|
||||
print(f"Initial state: {await stream.state}")
|
||||
|
||||
try:
|
||||
# Verify initial state
|
||||
assert await stream.state == StreamState.OPEN
|
||||
print("✓ Client stream in OPEN state")
|
||||
|
||||
# Scenario 1: Normal communication
|
||||
message = b"Hello from enhanced NetStream client!\n"
|
||||
|
||||
if await stream.is_writable():
|
||||
await stream.write(message)
|
||||
print(f"Sent: {message.decode('utf-8').strip()}")
|
||||
else:
|
||||
print("Cannot write - stream not writable")
|
||||
return
|
||||
|
||||
# Close write side to signal EOF to server
|
||||
await stream.close()
|
||||
print("Client closed write side")
|
||||
|
||||
# Verify state transition
|
||||
state_after_close = await stream.state
|
||||
print(f"State after close: {state_after_close}")
|
||||
assert state_after_close == StreamState.CLOSE_WRITE
|
||||
assert await stream.is_readable() # Should still be readable
|
||||
assert not await stream.is_writable() # Should not be writable
|
||||
|
||||
# Try to write (should fail)
|
||||
try:
|
||||
await stream.write(b"This should fail")
|
||||
print("ERROR: Write succeeded when it should have failed!")
|
||||
except StreamClosed as e:
|
||||
print(f"✓ Expected error when writing to closed stream: {e}")
|
||||
|
||||
# Read the echo response
|
||||
if await stream.is_readable():
|
||||
try:
|
||||
response = await stream.read()
|
||||
print(f"Received echo: {response.decode('utf-8').strip()}")
|
||||
except StreamEOF:
|
||||
print("Server closed their write side")
|
||||
except StreamReset:
|
||||
print("Stream was reset")
|
||||
|
||||
# Check final state
|
||||
final_state = await stream.state
|
||||
print(f"Final client state: {final_state}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Client error: {e}")
|
||||
# Reset on error
|
||||
await stream.reset()
|
||||
print("Client reset stream due to error")
|
||||
|
||||
|
||||
async def run_enhanced_demo(
|
||||
port: int, destination: str, seed: int | None = None
|
||||
) -> None:
|
||||
"""
|
||||
Run enhanced echo demo with NetStream state management.
|
||||
"""
|
||||
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||
|
||||
# Generate or use provided key
|
||||
if seed:
|
||||
random.seed(seed)
|
||||
secret_number = random.getrandbits(32 * 8)
|
||||
secret = secret_number.to_bytes(length=32, byteorder="big")
|
||||
else:
|
||||
secret = secrets.token_bytes(32)
|
||||
|
||||
host = new_host(key_pair=create_new_key_pair(secret))
|
||||
|
||||
async with host.run(listen_addrs=[listen_addr]):
|
||||
print(f"Host ID: {host.get_id().to_string()}")
|
||||
print("=" * 60)
|
||||
|
||||
if not destination: # Server mode
|
||||
print("🖥️ ENHANCED ECHO SERVER MODE")
|
||||
print("=" * 60)
|
||||
|
||||
# Set the enhanced stream handler
|
||||
host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler)
|
||||
|
||||
print(
|
||||
"Run client from another console:\n"
|
||||
f"python3 example_net_stream.py "
|
||||
f"-d {host.get_addrs()[0]}\n"
|
||||
)
|
||||
print("Waiting for connections...")
|
||||
print("Press Ctrl+C to stop server")
|
||||
await trio.sleep_forever()
|
||||
|
||||
else: # Client mode
|
||||
print("📱 ENHANCED ECHO CLIENT MODE")
|
||||
print("=" * 60)
|
||||
|
||||
# Connect to server
|
||||
maddr = multiaddr.Multiaddr(destination)
|
||||
info = info_from_p2p_addr(maddr)
|
||||
await host.connect(info)
|
||||
print(f"Connected to server: {info.peer_id.pretty()}")
|
||||
|
||||
# Create stream and run enhanced demo
|
||||
stream = await host.new_stream(info.peer_id, [PROTOCOL_ID])
|
||||
await enhanced_client_demo(stream)
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("CLIENT DEMO COMPLETE")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
example_maddr = (
|
||||
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
|
||||
)
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter
|
||||
)
|
||||
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--destination",
|
||||
type=str,
|
||||
help=f"destination multiaddr string, e.g. {example_maddr}",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-s",
|
||||
"--seed",
|
||||
type=int,
|
||||
help="seed for deterministic peer ID generation",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--demo-states", action="store_true", help="run state transition demo only"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
trio.run(run_enhanced_demo, args.port, args.destination, args.seed)
|
||||
except KeyboardInterrupt:
|
||||
print("\n👋 Demo interrupted by user")
|
||||
except Exception as e:
|
||||
print(f"❌ Demo failed: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -39,7 +39,67 @@ class StreamState(Enum):
|
||||
|
||||
|
||||
class NetStream(INetStream):
|
||||
"""Class representing NetStream Handler"""
|
||||
"""
|
||||
Summary
|
||||
_______
|
||||
A Network stream implementation.
|
||||
|
||||
NetStream wraps a muxed stream and provides proper state tracking, resource cleanup,
|
||||
and event notification capabilities.
|
||||
|
||||
State Machine
|
||||
_____________
|
||||
|
||||
.. code:: markdown
|
||||
|
||||
[CREATED] → OPEN → CLOSE_READ → CLOSE_BOTH → [CLEANUP]
|
||||
↓ ↗ ↗
|
||||
CLOSE_WRITE → ← ↗
|
||||
↓ ↗
|
||||
RESET → → → → → → → →
|
||||
|
||||
State Transitions
|
||||
_________________
|
||||
- OPEN → CLOSE_READ: EOF encountered during read()
|
||||
- OPEN → CLOSE_WRITE: Explicit close() call
|
||||
- OPEN → RESET: reset() call or critical stream error
|
||||
- CLOSE_READ → CLOSE_BOTH: Explicit close() call
|
||||
- CLOSE_WRITE → CLOSE_BOTH: EOF encountered during read()
|
||||
- Any state → RESET: reset() call
|
||||
|
||||
Terminal States (trigger cleanup)
|
||||
_________________________________
|
||||
- CLOSE_BOTH: Stream fully closed, triggers resource cleanup
|
||||
- RESET: Stream reset/terminated, triggers resource cleanup
|
||||
|
||||
Operation Validity by State
|
||||
___________________________
|
||||
OPEN: read() ✓ write() ✓ close() ✓ reset() ✓
|
||||
CLOSE_READ: read() ✗ write() ✓ close() ✓ reset() ✓
|
||||
CLOSE_WRITE: read() ✓ write() ✗ close() ✓ reset() ✓
|
||||
CLOSE_BOTH: read() ✗ write() ✗ close() ✓ reset() ✓
|
||||
RESET: read() ✗ write() ✗ close() ✓ reset() ✓
|
||||
|
||||
Cleanup Process (triggered by CLOSE_BOTH or RESET)
|
||||
__________________________________________________
|
||||
1. Remove stream from SwarmConn
|
||||
2. Notify all listeners with ClosedStream event
|
||||
3. Decrement reference counter
|
||||
4. Background cleanup via nursery (if provided)
|
||||
|
||||
Thread Safety
|
||||
_____________
|
||||
All state operations are protected by trio.Lock() for safe concurrent access.
|
||||
State checks and modifications are atomic operations.
|
||||
|
||||
Example: See :file:`examples/doc-examples/example_net_stream.py`
|
||||
|
||||
:param muxed_stream (IMuxedStream): The underlying muxed stream
|
||||
:param nursery (Optional[trio.Nursery]): Nursery for background cleanup tasks
|
||||
:raises StreamClosed: When attempting invalid operations on closed streams
|
||||
:raises StreamEOF: When EOF is encountered during read operations
|
||||
:raises StreamReset: When the underlying stream has been reset
|
||||
"""
|
||||
|
||||
muxed_stream: IMuxedStream
|
||||
protocol_id: Optional[TProtocol]
|
||||
@ -87,7 +147,10 @@ class NetStream(INetStream):
|
||||
Read from stream.
|
||||
|
||||
:param n: number of bytes to read
|
||||
:return: bytes of input
|
||||
:raises StreamClosed: If `NetStream` is closed for reading
|
||||
:raises StreamReset: If `NetStream` is reset
|
||||
:raises StreamEOF: If trying to read after reaching end of file
|
||||
:return: Bytes read from the stream
|
||||
"""
|
||||
async with self._state_lock:
|
||||
if self.__stream_state in [
|
||||
@ -126,6 +189,8 @@ class NetStream(INetStream):
|
||||
Write to stream.
|
||||
|
||||
:param data: bytes to write
|
||||
:raises StreamClosed: If `NetStream` is closed for writing or reset
|
||||
:raises StreamClosed: If `StreamError` occurred while writing
|
||||
"""
|
||||
async with self._state_lock:
|
||||
if self.__stream_state in [
|
||||
@ -218,21 +283,24 @@ class NetStream(INetStream):
|
||||
"""Delegate to the underlying muxed stream."""
|
||||
return self.muxed_stream.get_remote_address()
|
||||
|
||||
def is_closed(self) -> bool:
|
||||
async def is_closed(self) -> bool:
|
||||
"""Check if stream is closed."""
|
||||
return self.__stream_state in [StreamState.CLOSE_BOTH, StreamState.RESET]
|
||||
current_state = await self.state
|
||||
return current_state in [StreamState.CLOSE_BOTH, StreamState.RESET]
|
||||
|
||||
def is_readable(self) -> bool:
|
||||
async def is_readable(self) -> bool:
|
||||
"""Check if stream is readable."""
|
||||
return self.__stream_state not in [
|
||||
current_state = await self.state
|
||||
return current_state not in [
|
||||
StreamState.CLOSE_READ,
|
||||
StreamState.CLOSE_BOTH,
|
||||
StreamState.RESET,
|
||||
]
|
||||
|
||||
def is_writable(self) -> bool:
|
||||
async def is_writable(self) -> bool:
|
||||
"""Check if stream is writable."""
|
||||
return self.__stream_state not in [
|
||||
current_state = await self.state
|
||||
return current_state not in [
|
||||
StreamState.CLOSE_WRITE,
|
||||
StreamState.CLOSE_BOTH,
|
||||
StreamState.RESET,
|
||||
|
||||
1
newsfragments/300.breaking.rst
Normal file
1
newsfragments/300.breaking.rst
Normal file
@ -0,0 +1 @@
|
||||
The `NetStream.state` property is now async and requires `await`. Update any direct state access to use `await stream.state`.
|
||||
1
newsfragments/300.bugfix.rst
Normal file
1
newsfragments/300.bugfix.rst
Normal file
@ -0,0 +1 @@
|
||||
Added proper state management and resource cleanup to `NetStream`, fixing memory leaks and improved error handling.
|
||||
Reference in New Issue
Block a user