From e5eb01d22b38a146dd4c9060192ce90217c84602 Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 9 Sep 2019 22:48:49 +0800 Subject: [PATCH] Fix stream read --- libp2p/stream_muxer/mplex/mplex_stream.py | 28 ++++++++++++++++++----- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 19a2637c..8c219659 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -1,5 +1,5 @@ import asyncio -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING from libp2p.stream_muxer.abc import IMuxedStream @@ -8,7 +8,6 @@ from .datastructures import StreamID from .exceptions import MplexStreamClosed, MplexStreamEOF, MplexStreamReset if TYPE_CHECKING: - from typing import Any # noqa: F401 from libp2p.stream_muxer.mplex.mplex import Mplex @@ -66,16 +65,33 @@ class MplexStream(IMuxedStream): ) for fut in pending: fut.cancel() + if self.event_reset.is_set(): raise MplexStreamReset - done_task = cast("asyncio.Task[Any]", tuple(done)[0]) - # TODO: `_coro` is not in `asyncio.Task`'s typeshed. - if done_task._coro.__qualname__ == "Queue.get": # type: ignore + + if len(done) != 1: + raise Exception(f"Should be exactly 1 job in {done}.") + done_task = tuple(done)[0] + # NOTE: Ignore type check because the typeshed for `asyncio.Task` does not + # have the field `_coro`. + coro_qualname = done_task._coro.__qualname__ # type: ignore + # If `qualname == "Queue.get"` then there is incoming data. We can add it to the buffer. + if coro_qualname == "Queue.get": data = done_task.result() self._buf.extend(data) return + if self.event_remote_closed.is_set(): raise MplexStreamEOF + + # If the task is not `Queue.get`, then it must be `Event.wait`. + # However, it is abnormal that `Event.wait` is unblocked without any of the event + # (remote_closed and reset) is set. Then it is highly possible that the task + # is cancelled. + raise Exception( + "Should not enter here. " + f"It is highly possible that `done_task` is cancelled. `done_task`={done_task}" + ) # TODO: Handle timeout when deadline is used. async def _read_until_eof(self) -> bytes: @@ -107,7 +123,7 @@ class MplexStream(IMuxedStream): return await self._read_until_eof() if len(self._buf) == 0 and self.incoming_data.empty(): await self._wait_for_data() - # Either `buf` is not empty or `incoming_data` is not empty now. + # Now we are sure we have something to read. # Try to put enough incoming data into `self._buf`. while len(self._buf) < n: try: