mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Mplex: only close the send of new stream channel
This commit is contained in:
@ -137,7 +137,7 @@ class Mplex(IMuxedConn):
|
|||||||
"""accepts a muxed stream opened by the other end."""
|
"""accepts a muxed stream opened by the other end."""
|
||||||
try:
|
try:
|
||||||
return await self.new_stream_receive_channel.receive()
|
return await self.new_stream_receive_channel.receive()
|
||||||
except (trio.ClosedResourceError, trio.EndOfChannel):
|
except trio.EndOfChannel:
|
||||||
raise MplexUnavailable
|
raise MplexUnavailable
|
||||||
|
|
||||||
async def send_message(
|
async def send_message(
|
||||||
@ -254,7 +254,7 @@ class Mplex(IMuxedConn):
|
|||||||
mplex_stream = await self._initialize_stream(stream_id, message.decode())
|
mplex_stream = await self._initialize_stream(stream_id, message.decode())
|
||||||
try:
|
try:
|
||||||
await self.new_stream_send_channel.send(mplex_stream)
|
await self.new_stream_send_channel.send(mplex_stream)
|
||||||
except (trio.BrokenResourceError, trio.ClosedResourceError):
|
except trio.ClosedResourceError:
|
||||||
raise MplexUnavailable
|
raise MplexUnavailable
|
||||||
|
|
||||||
async def _handle_message(self, stream_id: StreamID, message: bytes) -> None:
|
async def _handle_message(self, stream_id: StreamID, message: bytes) -> None:
|
||||||
@ -336,6 +336,4 @@ class Mplex(IMuxedConn):
|
|||||||
send_channel = self.streams_msg_channels[stream_id]
|
send_channel = self.streams_msg_channels[stream_id]
|
||||||
await send_channel.aclose()
|
await send_channel.aclose()
|
||||||
self.event_closed.set()
|
self.event_closed.set()
|
||||||
# FIXME: It's enough to just close one side.
|
|
||||||
await self.new_stream_send_channel.aclose()
|
await self.new_stream_send_channel.aclose()
|
||||||
await self.new_stream_receive_channel.aclose()
|
|
||||||
|
|||||||
Reference in New Issue
Block a user