diff --git a/libp2p/transport/websocket/connection.py b/libp2p/transport/websocket/connection.py index 0322d3fc..372d8d03 100644 --- a/libp2p/transport/websocket/connection.py +++ b/libp2p/transport/websocket/connection.py @@ -13,12 +13,17 @@ class P2PWebSocketConnection(ReadWriteCloser): """ Wraps a WebSocketConnection to provide the raw stream interface that libp2p protocols expect. - + Implements production-ready buffer management and flow control as recommended in the libp2p WebSocket specification. """ - def __init__(self, ws_connection: Any, ws_context: Any = None, max_buffered_amount: int = 4 * 1024 * 1024) -> None: + def __init__( + self, + ws_connection: Any, + ws_context: Any = None, + max_buffered_amount: int = 4 * 1024 * 1024, + ) -> None: self._ws_connection = ws_connection self._ws_context = ws_context self._read_buffer = b"" @@ -31,23 +36,24 @@ class P2PWebSocketConnection(ReadWriteCloser): """Write data with flow control and buffer management""" if self._closed: raise IOException("Connection is closed") - + async with self._write_lock: try: logger.debug(f"WebSocket writing {len(data)} bytes") - + # Check buffer amount for flow control - if hasattr(self._ws_connection, 'bufferedAmount'): + if hasattr(self._ws_connection, "bufferedAmount"): buffered = self._ws_connection.bufferedAmount if buffered > self._max_buffered_amount: logger.warning(f"WebSocket buffer full: {buffered} bytes") - # In production, you might want to wait or implement backpressure + # In production, you might want to + # wait or implement backpressure # For now, we'll continue but log the warning - + # Send as a binary WebSocket message await self._ws_connection.send_message(data) logger.debug(f"WebSocket wrote {len(data)} bytes successfully") - + except Exception as e: logger.error(f"WebSocket write failed: {e}") self._closed = True @@ -147,7 +153,7 @@ class P2PWebSocketConnection(ReadWriteCloser): """Close the WebSocket connection with proper cleanup""" if self._closed: return - + self._closed = True try: # Close the WebSocket connection @@ -157,7 +163,7 @@ class P2PWebSocketConnection(ReadWriteCloser): await self._ws_context.__aexit__(None, None, None) except Exception as e: logger.error(f"Error closing WebSocket connection: {e}") - + def is_closed(self) -> bool: """Check if the connection is closed""" return self._closed diff --git a/libp2p/transport/websocket/transport.py b/libp2p/transport/websocket/transport.py index 0d35f231..a8329bbc 100644 --- a/libp2p/transport/websocket/transport.py +++ b/libp2p/transport/websocket/transport.py @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) class WebsocketTransport(ITransport): """ Libp2p WebSocket transport: dial and listen on /ip4/.../tcp/.../ws - + Implements production-ready WebSocket transport with: - Flow control and buffer management - Connection limits and rate limiting @@ -25,7 +25,9 @@ class WebsocketTransport(ITransport): - Support for both WS and WSS protocols """ - def __init__(self, upgrader: TransportUpgrader, max_buffered_amount: int = 4 * 1024 * 1024): + def __init__( + self, upgrader: TransportUpgrader, max_buffered_amount: int = 4 * 1024 * 1024 + ): self._upgrader = upgrader self._max_buffered_amount = max_buffered_amount self._connection_count = 0 @@ -57,21 +59,21 @@ class WebsocketTransport(ITransport): # Check connection limits if self._connection_count >= self._max_connections: - raise OpenConnectionError(f"Maximum connections reached: {self._max_connections}") + raise OpenConnectionError( + f"Maximum connections reached: {self._max_connections}" + ) # Use the context manager but don't exit it immediately # The connection will be closed when the RawConnection is closed ws_context = open_websocket_url(ws_url) ws = await ws_context.__aenter__() conn = P2PWebSocketConnection( - ws, - ws_context, - max_buffered_amount=self._max_buffered_amount + ws, ws_context, max_buffered_amount=self._max_buffered_amount ) # type: ignore[attr-defined] - + self._connection_count += 1 - logger.debug(f"WebSocket connection established. Total connections: {self._connection_count}") - + logger.debug(f"Total connections: {self._connection_count}") + return RawConnection(conn, initiator=True) except Exception as e: logger.error(f"Failed to dial WebSocket {maddr}: {e}") diff --git a/tests/interop/test_js_ws_ping.py b/tests/interop/test_js_ws_ping.py index 4be54990..fee251d4 100644 --- a/tests/interop/test_js_ws_ping.py +++ b/tests/interop/test_js_ws_ping.py @@ -16,8 +16,6 @@ from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo from libp2p.peer.peerstore import PeerStore from libp2p.security.insecure.transport import InsecureTransport -from libp2p.security.noise.transport import Transport as NoiseTransport -from libp2p.crypto.ed25519 import create_new_key_pair as create_ed25519_key_pair from libp2p.stream_muxer.yamux.yamux import Yamux from libp2p.transport.upgrader import TransportUpgrader from libp2p.transport.websocket.transport import WebsocketTransport