From 95926b7376d1476fe39d4449abaf37e1d3399e21 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 6 Sep 2019 01:08:42 +0800 Subject: [PATCH] Temp for mplex_stream --- libp2p/stream_muxer/mplex/exceptions.py | 12 ++++++--- libp2p/stream_muxer/mplex/mplex_stream.py | 31 ++++++++++++++++++----- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/libp2p/stream_muxer/mplex/exceptions.py b/libp2p/stream_muxer/mplex/exceptions.py index bd4ceb56..11663e2e 100644 --- a/libp2p/stream_muxer/mplex/exceptions.py +++ b/libp2p/stream_muxer/mplex/exceptions.py @@ -5,9 +5,13 @@ class MplexError(BaseLibp2pError): pass +class MplexStreamReset(MplexError): + pass + + +class MplexStreamEOF(MplexError, EOFError): + pass + + class MplexShutdown(MplexError): pass - - -class StreamNotFound(MplexError): - pass diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index d257297f..4f2e76ce 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING from libp2p.stream_muxer.abc import IMuxedStream from .constants import HeaderTags +from .exceptions import MplexStreamReset, MplexStreamEOF from .datastructures import StreamID if TYPE_CHECKING: @@ -53,6 +54,26 @@ class MplexStream(IMuxedStream): def is_initiator(self) -> bool: return self.stream_id.is_initiator + async def _wait_for_data(self) -> None: + print("!@# _wait_for_data: 0") + done, pending = await asyncio.wait( + [ + self.event_reset.wait(), + self.event_remote_closed.wait(), + self.incoming_data.get(), + ], + return_when=asyncio.FIRST_COMPLETED, + ) + print("!@# _wait_for_data: 1") + if self.event_reset.is_set(): + raise MplexStreamReset + if self.event_remote_closed.is_set(): + while not self.incoming_data.empty(): + self._buf.extend(await self.incoming_data.get()) + raise MplexStreamEOF + data = tuple(done)[0].result() + self._buf.extend(data) + async def read(self, n: int = -1) -> bytes: """ Read up to n bytes. Read possibly returns fewer than `n` bytes, @@ -66,19 +87,17 @@ class MplexStream(IMuxedStream): raise ValueError( f"the number of bytes to read `n` must be positive or -1 to indicate read until EOF" ) - # If the buffer is empty at first, blocking wait for data. - if len(self._buf) == 0: - self._buf.extend(await self.incoming_data.get()) # FIXME: If `n == -1`, we should blocking read until EOF, instead of returning when # no message is available. # If `n >= 0`, read up to `n` bytes. # Else, read until no message is available. while len(self._buf) < n or n == -1: - if self.incoming_data.empty(): + # new_bytes = await self.incoming_data.get() + try: + await self._wait_for_data() + except MplexStreamEOF: break - new_bytes = await self.incoming_data.get() - self._buf.extend(new_bytes) payload: bytearray if n == -1: payload = self._buf