mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
fix: allow accept stream to wait indefinitely
This commit is contained in:
@ -246,10 +246,6 @@ class Swarm(Service, INetworkService):
|
|||||||
logger.debug("attempting to open a stream to peer %s", peer_id)
|
logger.debug("attempting to open a stream to peer %s", peer_id)
|
||||||
|
|
||||||
swarm_conn = await self.dial_peer(peer_id)
|
swarm_conn = await self.dial_peer(peer_id)
|
||||||
dd = "Yes" if swarm_conn is None else "No"
|
|
||||||
|
|
||||||
print(f"Is swarm conn None: {dd}")
|
|
||||||
|
|
||||||
net_stream = await swarm_conn.new_stream()
|
net_stream = await swarm_conn.new_stream()
|
||||||
logger.debug("successfully opened a stream to peer %s", peer_id)
|
logger.debug("successfully opened a stream to peer %s", peer_id)
|
||||||
return net_stream
|
return net_stream
|
||||||
@ -283,18 +279,24 @@ class Swarm(Service, INetworkService):
|
|||||||
async def conn_handler(
|
async def conn_handler(
|
||||||
read_write_closer: ReadWriteCloser, maddr: Multiaddr = maddr
|
read_write_closer: ReadWriteCloser, maddr: Multiaddr = maddr
|
||||||
) -> None:
|
) -> None:
|
||||||
raw_conn = RawConnection(read_write_closer, False)
|
|
||||||
|
|
||||||
# No need to upgrade QUIC Connection
|
# No need to upgrade QUIC Connection
|
||||||
if isinstance(self.transport, QUICTransport):
|
if isinstance(self.transport, QUICTransport):
|
||||||
quic_conn = cast(QUICConnection, raw_conn)
|
try:
|
||||||
await self.add_conn(quic_conn)
|
quic_conn = cast(QUICConnection, read_write_closer)
|
||||||
# NOTE: This is a intentional barrier to prevent from the handler
|
await self.add_conn(quic_conn)
|
||||||
# exiting and closing the connection.
|
peer_id = quic_conn.peer_id
|
||||||
await self.manager.wait_finished()
|
logger.debug(
|
||||||
print("Connection Connected")
|
f"successfully opened connection to peer {peer_id}"
|
||||||
|
)
|
||||||
|
# NOTE: This is a intentional barrier to prevent from the
|
||||||
|
# handler exiting and closing the connection.
|
||||||
|
await self.manager.wait_finished()
|
||||||
|
except Exception:
|
||||||
|
await read_write_closer.close()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
raw_conn = RawConnection(read_write_closer, False)
|
||||||
|
|
||||||
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first
|
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first
|
||||||
# secure the conn and then mux the conn
|
# secure the conn and then mux the conn
|
||||||
try:
|
try:
|
||||||
@ -410,9 +412,10 @@ class Swarm(Service, INetworkService):
|
|||||||
muxed_conn,
|
muxed_conn,
|
||||||
self,
|
self,
|
||||||
)
|
)
|
||||||
print("add_conn called")
|
logger.debug("Swarm::add_conn | starting muxed connection")
|
||||||
self.manager.run_task(muxed_conn.start)
|
self.manager.run_task(muxed_conn.start)
|
||||||
await muxed_conn.event_started.wait()
|
await muxed_conn.event_started.wait()
|
||||||
|
logger.debug("Swarm::add_conn | starting swarm connection")
|
||||||
self.manager.run_task(swarm_conn.start)
|
self.manager.run_task(swarm_conn.start)
|
||||||
await swarm_conn.event_started.wait()
|
await swarm_conn.event_started.wait()
|
||||||
# Store muxed_conn with peer id
|
# Store muxed_conn with peer id
|
||||||
|
|||||||
@ -728,51 +728,47 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
|||||||
|
|
||||||
async def accept_stream(self, timeout: float | None = None) -> QUICStream:
|
async def accept_stream(self, timeout: float | None = None) -> QUICStream:
|
||||||
"""
|
"""
|
||||||
Accept an incoming stream with timeout support.
|
Accept incoming stream.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
timeout: Optional timeout for accepting streams
|
timeout: Optional timeout. If None, waits indefinitely.
|
||||||
|
|
||||||
Returns:
|
|
||||||
Accepted incoming stream
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
QUICStreamTimeoutError: Accept timeout exceeded
|
|
||||||
QUICConnectionClosedError: Connection is closed
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if self._closed:
|
if self._closed:
|
||||||
raise QUICConnectionClosedError("Connection is closed")
|
raise QUICConnectionClosedError("Connection is closed")
|
||||||
|
|
||||||
timeout = timeout or self.STREAM_ACCEPT_TIMEOUT
|
if timeout is not None:
|
||||||
|
with trio.move_on_after(timeout):
|
||||||
with trio.move_on_after(timeout):
|
return await self._accept_stream_impl()
|
||||||
while True:
|
# Timeout occurred
|
||||||
if self._closed:
|
if self._closed_event.is_set() or self._closed:
|
||||||
raise MuxedConnUnavailable("QUIC connection is closed")
|
raise MuxedConnUnavailable("QUIC connection closed during timeout")
|
||||||
|
else:
|
||||||
async with self._accept_queue_lock:
|
raise QUICStreamTimeoutError(
|
||||||
if self._stream_accept_queue:
|
f"Stream accept timed out after {timeout}s"
|
||||||
stream = self._stream_accept_queue.pop(0)
|
)
|
||||||
logger.debug(f"Accepted inbound stream {stream.stream_id}")
|
|
||||||
return stream
|
|
||||||
|
|
||||||
if self._closed:
|
|
||||||
raise MuxedConnUnavailable(
|
|
||||||
"Connection closed while accepting stream"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Wait for new streams
|
|
||||||
await self._stream_accept_event.wait()
|
|
||||||
|
|
||||||
logger.error(
|
|
||||||
"Timeout occured while accepting stream for local peer "
|
|
||||||
f"{self._local_peer_id.to_string()} on QUIC connection"
|
|
||||||
)
|
|
||||||
if self._closed_event.is_set() or self._closed:
|
|
||||||
raise MuxedConnUnavailable("QUIC connection closed during timeout")
|
|
||||||
else:
|
else:
|
||||||
raise QUICStreamTimeoutError(f"Stream accept timed out after {timeout}s")
|
# No timeout - wait indefinitely
|
||||||
|
return await self._accept_stream_impl()
|
||||||
|
|
||||||
|
async def _accept_stream_impl(self) -> QUICStream:
|
||||||
|
while True:
|
||||||
|
if self._closed:
|
||||||
|
raise MuxedConnUnavailable("QUIC connection is closed")
|
||||||
|
|
||||||
|
async with self._accept_queue_lock:
|
||||||
|
if self._stream_accept_queue:
|
||||||
|
stream = self._stream_accept_queue.pop(0)
|
||||||
|
logger.debug(f"Accepted inbound stream {stream.stream_id}")
|
||||||
|
return stream
|
||||||
|
|
||||||
|
if self._closed:
|
||||||
|
raise MuxedConnUnavailable("Connection closed while accepting stream")
|
||||||
|
|
||||||
|
# Wait for new streams indefinitely
|
||||||
|
await self._stream_accept_event.wait()
|
||||||
|
|
||||||
|
raise QUICConnectionError("Error occurred while waiting to accept stream")
|
||||||
|
|
||||||
def set_stream_handler(self, handler_function: TQUICStreamHandlerFn) -> None:
|
def set_stream_handler(self, handler_function: TQUICStreamHandlerFn) -> None:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -744,10 +744,6 @@ class QUICListener(IListener):
|
|||||||
f"Started background tasks for connection {dest_cid.hex()}"
|
f"Started background tasks for connection {dest_cid.hex()}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._transport._swarm:
|
|
||||||
await self._transport._swarm.add_conn(connection)
|
|
||||||
logger.debug(f"Successfully added connection {dest_cid.hex()} to swarm")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
logger.debug(f"Invoking user callback {dest_cid.hex()}")
|
logger.debug(f"Invoking user callback {dest_cid.hex()}")
|
||||||
await self._handler(connection)
|
await self._handler(connection)
|
||||||
|
|||||||
@ -625,7 +625,7 @@ class QUICStream(IMuxedStream):
|
|||||||
exc_tb: TracebackType | None,
|
exc_tb: TracebackType | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Exit the async context manager and close the stream."""
|
"""Exit the async context manager and close the stream."""
|
||||||
print("Exiting the context and closing the stream")
|
logger.debug("Exiting the context and closing the stream")
|
||||||
await self.close()
|
await self.close()
|
||||||
|
|
||||||
def set_deadline(self, ttl: int) -> bool:
|
def set_deadline(self, ttl: int) -> bool:
|
||||||
|
|||||||
Reference in New Issue
Block a user