Use new type hinting for trio channel

This commit is contained in:
mhchia
2020-01-26 23:56:19 +08:00
parent 42bc4d5d06
commit e3a1dd62e4
2 changed files with 5 additions and 18 deletions

View File

@ -66,10 +66,7 @@ class Mplex(IMuxedConn):
self.streams = {}
self.streams_lock = trio.Lock()
self.streams_msg_channels = {}
channels: Tuple[
"trio.MemorySendChannel[IMuxedStream]",
"trio.MemoryReceiveChannel[IMuxedStream]",
] = trio.open_memory_channel(math.inf)
channels = trio.open_memory_channel[IMuxedStream](math.inf)
self.new_stream_send_channel, self.new_stream_receive_channel = channels
self.event_shutting_down = trio.Event()
self.event_closed = trio.Event()
@ -114,9 +111,7 @@ class Mplex(IMuxedConn):
async def _initialize_stream(self, stream_id: StreamID, name: str) -> MplexStream:
# Use an unbounded buffer, to avoid `handle_incoming` being blocked when doing
# `send_channel.send`.
channels: Tuple[
"trio.MemorySendChannel[bytes]", "trio.MemoryReceiveChannel[bytes]"
] = trio.open_memory_channel(math.inf)
channels = trio.open_memory_channel[bytes](math.inf)
stream = MplexStream(name, stream_id, self, channels[1])
async with self.streams_lock:
self.streams[stream_id] = stream