diff --git a/libp2p/network/connection/swarm_connection.py b/libp2p/network/connection/swarm_connection.py index f6e60784..baa9df50 100644 --- a/libp2p/network/connection/swarm_connection.py +++ b/libp2p/network/connection/swarm_connection.py @@ -52,7 +52,7 @@ class SwarmConn(INetConn): # before we cancel the stream handler tasks. await trio.sleep(0.1) - self._notify_disconnected() + await self._notify_disconnected() async def _handle_new_streams(self) -> None: self.event_started.set() @@ -67,7 +67,7 @@ class SwarmConn(INetConn): nursery.start_soon(self._handle_muxed_stream, stream) async def _handle_muxed_stream(self, muxed_stream: IMuxedStream) -> None: - net_stream = self._add_stream(muxed_stream) + net_stream = await self._add_stream(muxed_stream) try: # Ignore type here since mypy complains: https://github.com/python/mypy/issues/2427 await self.swarm.common_stream_handler(net_stream) # type: ignore @@ -75,21 +75,21 @@ class SwarmConn(INetConn): # As long as `common_stream_handler`, remove the stream. self.remove_stream(net_stream) - def _add_stream(self, muxed_stream: IMuxedStream) -> NetStream: + async def _add_stream(self, muxed_stream: IMuxedStream) -> NetStream: net_stream = NetStream(muxed_stream) self.streams.add(net_stream) - self.swarm.notify_opened_stream(net_stream) + await self.swarm.notify_opened_stream(net_stream) return net_stream - def _notify_disconnected(self) -> None: - self.swarm.notify_disconnected(self) + async def _notify_disconnected(self) -> None: + await self.swarm.notify_disconnected(self) async def start(self) -> None: await self._handle_new_streams() async def new_stream(self) -> NetStream: muxed_stream = await self.muxed_conn.open_stream() - return self._add_stream(muxed_stream) + return await self._add_stream(muxed_stream) def get_streams(self) -> Tuple[NetStream, ...]: return tuple(self.streams) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 45180a98..57ce1358 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -274,7 +274,7 @@ class Swarm(Service, INetworkService): await listener.listen(maddr, self.listener_nursery) # Call notifiers since event occurred - self.notify_listen(maddr) + await self.notify_listen(maddr) return True except IOError: @@ -310,7 +310,7 @@ class Swarm(Service, INetworkService): # Store muxed_conn with peer id self.connections[muxed_conn.peer_id] = swarm_conn # Call notifiers since event occurred - self.notify_connected(swarm_conn) + await self.notify_connected(swarm_conn) return swarm_conn def remove_conn(self, swarm_conn: SwarmConn) -> None: @@ -330,22 +330,28 @@ class Swarm(Service, INetworkService): """ self.notifees.append(notifee) - def notify_opened_stream(self, stream: INetStream) -> None: - for notifee in self.notifees: - self.manager.run_task(notifee.opened_stream, self, stream) + async def notify_opened_stream(self, stream: INetStream) -> None: + async with trio.open_nursery() as nursery: + for notifee in self.notifees: + nursery.start_soon(notifee.opened_stream, self, stream) - # TODO: `notify_closed_stream` + async def notify_connected(self, conn: INetConn) -> None: + async with trio.open_nursery() as nursery: + for notifee in self.notifees: + nursery.start_soon(notifee.connected, self, conn) - def notify_connected(self, conn: INetConn) -> None: - for notifee in self.notifees: - self.manager.run_task(notifee.connected, self, conn) + async def notify_disconnected(self, conn: INetConn) -> None: + async with trio.open_nursery() as nursery: + for notifee in self.notifees: + nursery.start_soon(notifee.disconnected, self, conn) - def notify_disconnected(self, conn: INetConn) -> None: - for notifee in self.notifees: - self.manager.run_task(notifee.disconnected, self, conn) + async def notify_listen(self, multiaddr: Multiaddr) -> None: + async with trio.open_nursery() as nursery: + for notifee in self.notifees: + nursery.start_soon(notifee.listen, self, multiaddr) - def notify_listen(self, multiaddr: Multiaddr) -> None: - for notifee in self.notifees: - self.manager.run_task(notifee.listen, self, multiaddr) + async def notify_closed_stream(self, stream: INetStream) -> None: + raise NotImplementedError - # TODO: `notify_listen_close` + async def notify_listen_close(self, multiaddr: Multiaddr) -> None: + raise NotImplementedError