diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 7394736e..2d66da82 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -16,6 +16,7 @@ class PubsubNotifee(INotifee): initiator_peers_queue: "trio.MemorySendChannel[ID]" dead_peers_queue: "trio.MemorySendChannel[ID]" + dead_peers_queue_lock: trio.Lock def __init__( self, @@ -29,7 +30,9 @@ class PubsubNotifee(INotifee): can process dead peers after we disconnect from each other """ self.initiator_peers_queue = initiator_peers_queue + self.initiator_peers_queue_lock: trio.Lock() self.dead_peers_queue = dead_peers_queue + self.dead_peers_queue_lock: trio.Lock() async def opened_stream(self, network: INetwork, stream: INetStream) -> None: pass @@ -46,12 +49,17 @@ class PubsubNotifee(INotifee): :param network: network the connection was opened on :param conn: connection that was opened """ - try: - await self.initiator_peers_queue.send(conn.muxed_conn.peer_id) - except trio.BrokenResourceError: - # Raised when the receive channel is closed. - # TODO: Do something with loggers? - ... + async with self.initiator_peers_queue_lock: + try: + await self.initiator_peers_queue.send(conn.muxed_conn.peer_id) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError, + trio.BusyResourceError, + ): + # Raised when the receive channel is closed. + # TODO: Do something with loggers? + ... async def disconnected(self, network: INetwork, conn: INetConn) -> None: """ @@ -61,7 +69,17 @@ class PubsubNotifee(INotifee): :param network: network the connection was opened on :param conn: connection that was opened """ - await self.dead_peers_queue.send(conn.muxed_conn.peer_id) + async with self.dead_peers_queue_lock: + try: + await self.dead_peers_queue.send(conn.muxed_conn.peer_id) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError, + trio.BusyResourceError, + ): + # Raised when the receive channel is closed. + # TODO: Do something with loggers? + ... async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None: pass