From 4c9a930f84c999908f75d89aab09b9cbfb476680 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Fri, 2 Aug 2019 18:28:04 +0800 Subject: [PATCH] stream_muxer done --- libp2p/stream_muxer/mplex/mplex.py | 5 +- libp2p/stream_muxer/mplex/mplex_stream.py | 47 +++++++++++++------ libp2p/stream_muxer/mplex/utils.py | 4 +- .../muxed_connection_interface.py | 18 +++++++ libp2p/stream_muxer/muxed_stream_interface.py | 12 ++--- 5 files changed, 62 insertions(+), 24 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 233a9e66..fe414c7f 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -1,11 +1,12 @@ import asyncio +from typing import TYPE_CHECKING, Tuple, Dict + +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn from .constants import HeaderTags from .utils import encode_uvarint, decode_uvarint_from_stream from .mplex_stream import MplexStream -from ..muxed_connection_interface import IMuxedConn -from typing import TYPE_CHECKING, Tuple, Dict if TYPE_CHECKING: from multiaddr import Multiaddr diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index a2502a88..5e64ac72 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -1,16 +1,31 @@ import asyncio - +from typing import TYPE_CHECKING from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream from .constants import HeaderTags +if TYPE_CHECKING: + from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn + + class MplexStream(IMuxedStream): """ reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, stream_id, initiator: bool, mplex_conn): + stream_id: int + initiator: bool + mplex_conn: "IMuxedConn" + read_deadline: float + write_deadline: float + local_closed: bool + remote_closed: bool + stream_lock: asyncio.Lock + + def __init__( + self, stream_id: int, initiator: bool, mplex_conn: "IMuxedConn" + ) -> None: """ create new MuxedStream in muxer :param stream_id: stream stream id @@ -26,14 +41,14 @@ class MplexStream(IMuxedStream): self.remote_closed = False self.stream_lock = asyncio.Lock() - async def read(self): + async def read(self) -> bytes: """ read messages associated with stream from buffer til end of file :return: bytes of input """ return await self.mplex_conn.read_buffer(self.stream_id) - async def write(self, data): + async def write(self, data: bytes) -> int: """ write to stream :return: number of bytes written @@ -45,7 +60,7 @@ class MplexStream(IMuxedStream): ) return await self.mplex_conn.send_message(flag, data, self.stream_id) - async def close(self): + async def close(self) -> bool: """ Closing a stream closes it for writing and closes the remote end for reading but allows writing in the other direction. @@ -56,7 +71,7 @@ class MplexStream(IMuxedStream): flag = HeaderTags.CloseInitiator if self.initiator else HeaderTags.CloseReceiver await self.mplex_conn.send_message(flag, None, self.stream_id) - remote_lock = "" + remote_lock = False async with self.stream_lock: if self.local_closed: return True @@ -64,12 +79,14 @@ class MplexStream(IMuxedStream): remote_lock = self.remote_closed if remote_lock: - async with self.mplex_conn.conn_lock: - self.mplex_conn.buffers.pop(self.stream_id) + # FIXME: mplex_conn has no conn_lock! + async with self.mplex_conn.conn_lock: # type: ignore + # FIXME: Don't access to buffers directly + self.mplex_conn.buffers.pop(self.stream_id) # type: ignore return True - async def reset(self): + async def reset(self) -> bool: """ closes both ends of the stream tells this remote side to hang up @@ -92,13 +109,15 @@ class MplexStream(IMuxedStream): self.local_closed = True self.remote_closed = True - async with self.mplex_conn.conn_lock: - self.mplex_conn.buffers.pop(self.stream_id, None) + # FIXME: mplex_conn has no conn_lock! + async with self.mplex_conn.conn_lock: # type: ignore + # FIXME: Don't access to buffers directly + self.mplex_conn.buffers.pop(self.stream_id, None) # type: ignore return True # TODO deadline not in use - def set_deadline(self, ttl): + def set_deadline(self, ttl: float) -> bool: """ set deadline for muxed stream :return: True if successful @@ -107,7 +126,7 @@ class MplexStream(IMuxedStream): self.write_deadline = ttl return True - def set_read_deadline(self, ttl): + def set_read_deadline(self, ttl: float) -> bool: """ set read deadline for muxed stream :return: True if successful @@ -115,7 +134,7 @@ class MplexStream(IMuxedStream): self.read_deadline = ttl return True - def set_write_deadline(self, ttl): + def set_write_deadline(self, ttl: float) -> bool: """ set write deadline for muxed stream :return: True if successful diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index 07f62fb7..0de1f7d4 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -4,7 +4,7 @@ import struct from typing import Tuple -def encode_uvarint(number: int) -> bytearray: +def encode_uvarint(number: int) -> bytes: """Pack `number` into varint bytes""" buf = b"" while True: @@ -18,7 +18,7 @@ def encode_uvarint(number: int) -> bytearray: return buf -def decode_uvarint(buff: bytearray, index: int) -> Tuple[int, int]: +def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]: shift = 0 result = 0 while True: diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py index 9236fbca..ca23d9db 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/muxed_connection_interface.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: from libp2p.network.swarm import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream + from libp2p.stream_muxer.mplex.constants import HeaderTags class IMuxedConn(ABC): @@ -46,6 +47,14 @@ class IMuxedConn(ABC): :return: true if successful """ + @abstractmethod + async def read_buffer(self, stream_id: int) -> bytes: + """ + Read a message from stream_id's buffer, check raw connection for new messages + :param stream_id: stream id of stream to read from + :return: message read + """ + @abstractmethod async def open_stream( self, protocol_id: str, multi_addr: "Multiaddr" @@ -62,3 +71,12 @@ class IMuxedConn(ABC): """ accepts a muxed stream opened by the other end """ + + @abstractmethod + async def send_message(self, flag: HeaderTags, data: bytes, stream_id: int) -> int: + """ + sends a message over the connection + :param header: header to use + :param data: data to send in the message + :param stream_id: stream the message is in + """ diff --git a/libp2p/stream_muxer/muxed_stream_interface.py b/libp2p/stream_muxer/muxed_stream_interface.py index 00ce5867..a4c602c2 100644 --- a/libp2p/stream_muxer/muxed_stream_interface.py +++ b/libp2p/stream_muxer/muxed_stream_interface.py @@ -8,36 +8,36 @@ class IMuxedStream(ABC): mplex_conn: IMuxedConn @abstractmethod - def read(self): + async def read(self) -> bytes: """ reads from the underlying muxed_conn :return: bytes of input """ @abstractmethod - def write(self, _bytes): + async def write(self, data: bytes) -> int: """ writes to the underlying muxed_conn :return: number of bytes written """ @abstractmethod - def close(self): + async def close(self) -> bool: """ close the underlying muxed_conn :return: true if successful """ @abstractmethod - def reset(self): + async def reset(self) -> bool: """ closes both ends of the stream tells this remote side to hang up - :return: error/exception + :return: true if successful """ @abstractmethod - def set_deadline(self, ttl): + def set_deadline(self, ttl: float) -> bool: """ set deadline for muxed stream :return: a new stream