mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Implement closed_stream event handling and enable related tests (#834)
* Implement closed_stream event handling and enable related tests * Fix linting issues and ensure all tests pass * Add logging for exception in SwarmConn and create newsfragment for closed_stream feature
This commit is contained in:
committed by
GitHub
parent
b01596ad92
commit
aa7276c863
@ -23,7 +23,8 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Reference: https://github.com/libp2p/go-libp2p-swarm/blob/04c86bbdafd390651cb2ee14e334f7caeedad722/swarm_conn.go
|
Reference: https://github.com/libp2p/go-libp2p-swarm/blob/
|
||||||
|
04c86bbdafd390651cb2ee14e334f7caeedad722/swarm_conn.go
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
@ -43,6 +44,21 @@ class SwarmConn(INetConn):
|
|||||||
self.streams = set()
|
self.streams = set()
|
||||||
self.event_closed = trio.Event()
|
self.event_closed = trio.Event()
|
||||||
self.event_started = trio.Event()
|
self.event_started = trio.Event()
|
||||||
|
# Provide back-references/hooks expected by NetStream
|
||||||
|
try:
|
||||||
|
setattr(self.muxed_conn, "swarm", self.swarm)
|
||||||
|
|
||||||
|
# NetStream expects an awaitable remove_stream hook
|
||||||
|
async def _remove_stream_hook(stream: NetStream) -> None:
|
||||||
|
self.remove_stream(stream)
|
||||||
|
|
||||||
|
setattr(self.muxed_conn, "remove_stream", _remove_stream_hook)
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(
|
||||||
|
f"Failed to set optional conveniences on muxed_conn "
|
||||||
|
f"for peer {muxed_conn.peer_id}: {e}"
|
||||||
|
)
|
||||||
|
# optional conveniences
|
||||||
if hasattr(muxed_conn, "on_close"):
|
if hasattr(muxed_conn, "on_close"):
|
||||||
logging.debug(f"Setting on_close for peer {muxed_conn.peer_id}")
|
logging.debug(f"Setting on_close for peer {muxed_conn.peer_id}")
|
||||||
setattr(muxed_conn, "on_close", self._on_muxed_conn_closed)
|
setattr(muxed_conn, "on_close", self._on_muxed_conn_closed)
|
||||||
|
|||||||
@ -1,3 +1,7 @@
|
|||||||
|
from collections.abc import (
|
||||||
|
Awaitable,
|
||||||
|
Callable,
|
||||||
|
)
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from multiaddr import (
|
from multiaddr import (
|
||||||
@ -411,7 +415,15 @@ class Swarm(Service, INetworkService):
|
|||||||
nursery.start_soon(notifee.listen, self, multiaddr)
|
nursery.start_soon(notifee.listen, self, multiaddr)
|
||||||
|
|
||||||
async def notify_closed_stream(self, stream: INetStream) -> None:
|
async def notify_closed_stream(self, stream: INetStream) -> None:
|
||||||
raise NotImplementedError
|
async with trio.open_nursery() as nursery:
|
||||||
|
for notifee in self.notifees:
|
||||||
|
nursery.start_soon(notifee.closed_stream, self, stream)
|
||||||
|
|
||||||
async def notify_listen_close(self, multiaddr: Multiaddr) -> None:
|
async def notify_listen_close(self, multiaddr: Multiaddr) -> None:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
# Generic notifier used by NetStream._notify_closed
|
||||||
|
async def notify_all(self, notifier: Callable[[INotifee], Awaitable[None]]) -> None:
|
||||||
|
async with trio.open_nursery() as nursery:
|
||||||
|
for notifee in self.notifees:
|
||||||
|
nursery.start_soon(notifier, notifee)
|
||||||
|
|||||||
6
newsfragments/826.feature.rst
Normal file
6
newsfragments/826.feature.rst
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
Implement closed_stream notification in MyNotifee
|
||||||
|
|
||||||
|
- Add notify_closed_stream method to swarm notification system for proper stream lifecycle management
|
||||||
|
- Integrate remove_stream hook in SwarmConn to enable stream closure notifications
|
||||||
|
- Add comprehensive tests for closed_stream functionality in test_notify.py
|
||||||
|
- Enable stream lifecycle integration for proper cleanup and resource management
|
||||||
@ -44,8 +44,11 @@ class MyNotifee(INotifee):
|
|||||||
self.events.append(Event.OpenedStream)
|
self.events.append(Event.OpenedStream)
|
||||||
|
|
||||||
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
|
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||||
# TODO: It is not implemented yet.
|
if network is None:
|
||||||
pass
|
raise ValueError("network parameter cannot be None")
|
||||||
|
if stream is None:
|
||||||
|
raise ValueError("stream parameter cannot be None")
|
||||||
|
self.events.append(Event.ClosedStream)
|
||||||
|
|
||||||
async def connected(self, network: INetwork, conn: INetConn) -> None:
|
async def connected(self, network: INetwork, conn: INetConn) -> None:
|
||||||
self.events.append(Event.Connected)
|
self.events.append(Event.Connected)
|
||||||
@ -103,28 +106,20 @@ async def test_notify(security_protocol):
|
|||||||
# Wait for events
|
# Wait for events
|
||||||
assert await wait_for_event(events_0_0, Event.Connected, 1.0)
|
assert await wait_for_event(events_0_0, Event.Connected, 1.0)
|
||||||
assert await wait_for_event(events_0_0, Event.OpenedStream, 1.0)
|
assert await wait_for_event(events_0_0, Event.OpenedStream, 1.0)
|
||||||
# assert await wait_for_event(
|
assert await wait_for_event(events_0_0, Event.ClosedStream, 1.0)
|
||||||
# events_0_0, Event.ClosedStream, 1.0
|
|
||||||
# ) # Not implemented
|
|
||||||
assert await wait_for_event(events_0_0, Event.Disconnected, 1.0)
|
assert await wait_for_event(events_0_0, Event.Disconnected, 1.0)
|
||||||
|
|
||||||
assert await wait_for_event(events_0_1, Event.Connected, 1.0)
|
assert await wait_for_event(events_0_1, Event.Connected, 1.0)
|
||||||
assert await wait_for_event(events_0_1, Event.OpenedStream, 1.0)
|
assert await wait_for_event(events_0_1, Event.OpenedStream, 1.0)
|
||||||
# assert await wait_for_event(
|
assert await wait_for_event(events_0_1, Event.ClosedStream, 1.0)
|
||||||
# events_0_1, Event.ClosedStream, 1.0
|
|
||||||
# ) # Not implemented
|
|
||||||
assert await wait_for_event(events_0_1, Event.Disconnected, 1.0)
|
assert await wait_for_event(events_0_1, Event.Disconnected, 1.0)
|
||||||
|
|
||||||
assert await wait_for_event(events_1_0, Event.Connected, 1.0)
|
assert await wait_for_event(events_1_0, Event.Connected, 1.0)
|
||||||
assert await wait_for_event(events_1_0, Event.OpenedStream, 1.0)
|
assert await wait_for_event(events_1_0, Event.OpenedStream, 1.0)
|
||||||
# assert await wait_for_event(
|
assert await wait_for_event(events_1_0, Event.ClosedStream, 1.0)
|
||||||
# events_1_0, Event.ClosedStream, 1.0
|
|
||||||
# ) # Not implemented
|
|
||||||
assert await wait_for_event(events_1_0, Event.Disconnected, 1.0)
|
assert await wait_for_event(events_1_0, Event.Disconnected, 1.0)
|
||||||
|
|
||||||
assert await wait_for_event(events_1_1, Event.Connected, 1.0)
|
assert await wait_for_event(events_1_1, Event.Connected, 1.0)
|
||||||
assert await wait_for_event(events_1_1, Event.OpenedStream, 1.0)
|
assert await wait_for_event(events_1_1, Event.OpenedStream, 1.0)
|
||||||
# assert await wait_for_event(
|
assert await wait_for_event(events_1_1, Event.ClosedStream, 1.0)
|
||||||
# events_1_1, Event.ClosedStream, 1.0
|
|
||||||
# ) # Not implemented
|
|
||||||
assert await wait_for_event(events_1_1, Event.Disconnected, 1.0)
|
assert await wait_for_event(events_1_1, Event.Disconnected, 1.0)
|
||||||
|
|||||||
Reference in New Issue
Block a user