From 86d4ce1da8494ff099df43b314ea01c94ce9b2f7 Mon Sep 17 00:00:00 2001 From: mhchia Date: Thu, 15 Aug 2019 23:31:26 +0800 Subject: [PATCH] Add `delim_encode` and `delim_read` - Add `StreamCommunicator` and `RawConnectionCommunicator`, read/write messages with delim codec, with `IMuxedStream` and `IRawConnection` respectively. - Use it in `Multiselect` and `MultiselectClient`. --- libp2p/network/swarm.py | 7 +- libp2p/protocol_muxer/multiselect.py | 11 ++-- libp2p/protocol_muxer/multiselect_client.py | 19 ++---- .../multiselect_client_interface.py | 8 ++- .../multiselect_communicator.py | 64 +++++++++++-------- .../multiselect_communicator_interface.py | 2 +- .../multiselect_muxer_interface.py | 6 +- libp2p/security/secure_conn_interface.py | 2 + libp2p/security/security_multistream.py | 8 ++- libp2p/stream_muxer/mplex/utils.py | 6 +- libp2p/typing.py | 5 +- 11 files changed, 74 insertions(+), 64 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 3353f186..0f0b1d95 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -7,6 +7,7 @@ from libp2p.peer.id import ID from libp2p.peer.peerstore_interface import IPeerStore from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient +from libp2p.protocol_muxer.multiselect_communicator import StreamCommunicator from libp2p.routing.interfaces import IPeerRouting from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.transport.listener_interface import IListener @@ -148,7 +149,7 @@ class Swarm(INetwork): # Perform protocol muxing to determine protocol to use selected_protocol = await self.multiselect_client.select_one_of( - list(protocol_ids), muxed_stream + list(protocol_ids), StreamCommunicator(muxed_stream) ) # Create a net stream with the selected protocol @@ -264,7 +265,9 @@ def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn: async def generic_protocol_handler(muxed_stream: IMuxedStream) -> None: # Perform protocol muxing to determine protocol to use - protocol, handler = await multiselect.negotiate(muxed_stream) + protocol, handler = await multiselect.negotiate( + StreamCommunicator(muxed_stream) + ) net_stream = NetStream(muxed_stream) net_stream.set_protocol(protocol) diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index a6b93ce7..c854415f 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -1,8 +1,7 @@ from typing import Dict, Tuple -from libp2p.typing import NegotiableTransport, StreamHandlerFn, TProtocol +from libp2p.typing import StreamHandlerFn, TProtocol -from .multiselect_communicator import MultiselectCommunicator from .multiselect_communicator_interface import IMultiselectCommunicator from .multiselect_muxer_interface import IMultiselectMuxer @@ -31,7 +30,7 @@ class Multiselect(IMultiselectMuxer): self.handlers[protocol] = handler async def negotiate( - self, stream: NegotiableTransport + self, communicator: IMultiselectCommunicator ) -> Tuple[TProtocol, StreamHandlerFn]: """ Negotiate performs protocol selection @@ -39,8 +38,6 @@ class Multiselect(IMultiselectMuxer): :return: selected protocol name, handler function :raise Exception: negotiation failed exception """ - # Create a communicator to handle all communication across the stream - communicator = MultiselectCommunicator(stream) # Perform handshake to ensure multiselect protocol IDs match await self.handshake(communicator) @@ -48,7 +45,7 @@ class Multiselect(IMultiselectMuxer): # Read and respond to commands until a valid protocol ID is sent while True: # Read message - command = await communicator.read_stream_until_eof() + command = await communicator.read() # Command is ls or a protocol if command == "ls": @@ -78,7 +75,7 @@ class Multiselect(IMultiselectMuxer): await communicator.write(MULTISELECT_PROTOCOL_ID) # Read in the protocol ID from other party - handshake_contents = await communicator.read_stream_until_eof() + handshake_contents = await communicator.read() # Confirm that the protocols are the same if not validate_handshake(handshake_contents): diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 8a5b7f11..062aedc7 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -1,10 +1,8 @@ from typing import Sequence -from libp2p.stream_muxer.abc import IMuxedStream -from libp2p.typing import NegotiableTransport, TProtocol +from libp2p.typing import TProtocol from .multiselect_client_interface import IMultiselectClient -from .multiselect_communicator import MultiselectCommunicator from .multiselect_communicator_interface import IMultiselectCommunicator MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0" @@ -31,7 +29,7 @@ class MultiselectClient(IMultiselectClient): await communicator.write(MULTISELECT_PROTOCOL_ID) # Read in the protocol ID from other party - handshake_contents = await communicator.read_stream_until_eof() + handshake_contents = await communicator.read() # Confirm that the protocols are the same if not validate_handshake(handshake_contents): @@ -40,7 +38,7 @@ class MultiselectClient(IMultiselectClient): # Handshake succeeded if this point is reached async def select_protocol_or_fail( - self, protocol: TProtocol, stream: IMuxedStream + self, protocol: TProtocol, communicator: IMultiselectCommunicator ) -> TProtocol: """ Send message to multiselect selecting protocol @@ -49,9 +47,6 @@ class MultiselectClient(IMultiselectClient): :param stream: stream to communicate with multiselect over :return: selected protocol """ - # Create a communicator to handle all communication across the stream - communicator = MultiselectCommunicator(stream) - # Perform handshake to ensure multiselect protocol IDs match await self.handshake(communicator) @@ -61,7 +56,7 @@ class MultiselectClient(IMultiselectClient): return selected_protocol async def select_one_of( - self, protocols: Sequence[TProtocol], stream: NegotiableTransport + self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator ) -> TProtocol: """ For each protocol, send message to multiselect selecting protocol @@ -71,10 +66,6 @@ class MultiselectClient(IMultiselectClient): :param stream: stream to communicate with multiselect over :return: selected protocol """ - - # Create a communicator to handle all communication across the stream - communicator = MultiselectCommunicator(stream) - # Perform handshake to ensure multiselect protocol IDs match await self.handshake(communicator) @@ -105,7 +96,7 @@ class MultiselectClient(IMultiselectClient): await communicator.write(protocol) # Get what counterparty says in response - response = await communicator.read_stream_until_eof() + response = await communicator.read() # Return protocol if response is equal to protocol or raise error if response == protocol: diff --git a/libp2p/protocol_muxer/multiselect_client_interface.py b/libp2p/protocol_muxer/multiselect_client_interface.py index df5c3a88..c54c41ab 100644 --- a/libp2p/protocol_muxer/multiselect_client_interface.py +++ b/libp2p/protocol_muxer/multiselect_client_interface.py @@ -1,7 +1,9 @@ from abc import ABC, abstractmethod from typing import Sequence -from libp2p.stream_muxer.abc import IMuxedStream +from libp2p.protocol_muxer.multiselect_communicator_interface import ( + IMultiselectCommunicator, +) from libp2p.typing import TProtocol @@ -13,7 +15,7 @@ class IMultiselectClient(ABC): @abstractmethod async def select_protocol_or_fail( - self, protocol: TProtocol, stream: IMuxedStream + self, protocol: TProtocol, communicator: IMultiselectCommunicator ) -> TProtocol: """ Send message to multiselect selecting protocol @@ -25,7 +27,7 @@ class IMultiselectClient(ABC): @abstractmethod async def select_one_of( - self, protocols: Sequence[TProtocol], stream: IMuxedStream + self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator ) -> TProtocol: """ For each protocol, send message to multiselect selecting protocol diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index 36513fcd..783c3802 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -1,35 +1,47 @@ -from libp2p.typing import NegotiableTransport +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.stream_muxer.abc import IMuxedStream +from libp2p.stream_muxer.mplex.utils import decode_uvarint_from_stream, encode_uvarint +from libp2p.typing import StreamReader from .multiselect_communicator_interface import IMultiselectCommunicator -class MultiselectCommunicator(IMultiselectCommunicator): - """ - Communicator helper class that ensures both the client - and multistream module will follow the same multistream protocol, - which is necessary for them to work - """ +def delim_encode(msg_str: str) -> bytes: + msg_bytes = msg_str.encode() + varint_len_msg = encode_uvarint(len(msg_bytes) + 1) + return varint_len_msg + msg_bytes + b"\n" - reader_writer: NegotiableTransport - def __init__(self, reader_writer: NegotiableTransport) -> None: - """ - MultistreamCommunicator expects a reader_writer object that has - an async read and an async write function (this could be a stream, - raw connection, or other object implementing those functions) - """ - self.reader_writer = reader_writer +async def delim_read(reader: StreamReader, timeout: int = 10) -> str: + len_msg = await decode_uvarint_from_stream(reader, timeout) + msg_bytes = await reader.read(len_msg) + return msg_bytes.decode().rstrip() + + +class RawConnectionCommunicator(IMultiselectCommunicator): + conn: IRawConnection + + def __init__(self, conn: IRawConnection) -> None: + self.conn = conn async def write(self, msg_str: str) -> None: - """ - Write message to reader_writer - :param msg_str: message to write - """ - await self.reader_writer.write(msg_str.encode()) + msg_bytes = delim_encode(msg_str) + self.conn.writer.write(msg_bytes) + await self.conn.writer.drain() - async def read_stream_until_eof(self) -> str: - """ - Reads message from reader_writer until EOF - """ - read_str = (await self.reader_writer.read()).decode() - return read_str + async def read(self) -> str: + return await delim_read(self.conn.reader) + + +class StreamCommunicator(IMultiselectCommunicator): + stream: IMuxedStream + + def __init__(self, stream: IMuxedStream) -> None: + self.stream = stream + + async def write(self, msg_str: str) -> None: + msg_bytes = delim_encode(msg_str) + await self.stream.write(msg_bytes) + + async def read(self) -> str: + return await delim_read(self.stream) diff --git a/libp2p/protocol_muxer/multiselect_communicator_interface.py b/libp2p/protocol_muxer/multiselect_communicator_interface.py index 78a05a0c..d72142bb 100644 --- a/libp2p/protocol_muxer/multiselect_communicator_interface.py +++ b/libp2p/protocol_muxer/multiselect_communicator_interface.py @@ -16,7 +16,7 @@ class IMultiselectCommunicator(ABC): """ @abstractmethod - async def read_stream_until_eof(self) -> str: + async def read(self) -> str: """ Reads message from stream until EOF """ diff --git a/libp2p/protocol_muxer/multiselect_muxer_interface.py b/libp2p/protocol_muxer/multiselect_muxer_interface.py index f2e1e893..acfbdbdc 100644 --- a/libp2p/protocol_muxer/multiselect_muxer_interface.py +++ b/libp2p/protocol_muxer/multiselect_muxer_interface.py @@ -1,7 +1,9 @@ from abc import ABC, abstractmethod from typing import Dict, Tuple -from libp2p.typing import NegotiableTransport, StreamHandlerFn, TProtocol +from libp2p.typing import StreamHandlerFn, TProtocol + +from .multiselect_communicator_interface import IMultiselectCommunicator class IMultiselectMuxer(ABC): @@ -23,7 +25,7 @@ class IMultiselectMuxer(ABC): @abstractmethod async def negotiate( - self, stream: NegotiableTransport + self, communicator: IMultiselectCommunicator ) -> Tuple[TProtocol, StreamHandlerFn]: """ Negotiate performs protocol selection diff --git a/libp2p/security/secure_conn_interface.py b/libp2p/security/secure_conn_interface.py index ab69a6d0..8a399233 100644 --- a/libp2p/security/secure_conn_interface.py +++ b/libp2p/security/secure_conn_interface.py @@ -14,6 +14,8 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa class AbstractSecureConn(ABC): + conn: IRawConnection + @abstractmethod def get_local_peer(self) -> ID: pass diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index fcb85a73..a422ccf4 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -7,6 +7,7 @@ from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport +from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator from libp2p.typing import TProtocol @@ -74,14 +75,15 @@ class SecurityMultistream(ABC): # instead of stream? In go repo, they pass in a raw conn # (https://raw.githubusercontent.com/libp2p/go-conn-security-multistream/master/ssms.go) - protocol = None + protocol: TProtocol + communicator = RawConnectionCommunicator(conn) if initiator: # Select protocol if initiator protocol = await self.multiselect_client.select_one_of( - list(self.transports.keys()), conn + list(self.transports.keys()), communicator ) else: # Select protocol if non-initiator - protocol, _ = await self.multiselect.negotiate(conn) + protocol, _ = await self.multiselect.negotiate(communicator) # Return transport from protocol return self.transports[protocol] diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index dba64652..44326ad2 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -2,6 +2,8 @@ import asyncio import struct from typing import Tuple +from libp2p.typing import StreamReader + def encode_uvarint(number: int) -> bytes: """Pack `number` into varint bytes""" @@ -31,9 +33,7 @@ def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]: return result, index + 1 -async def decode_uvarint_from_stream( - reader: asyncio.StreamReader, timeout: float -) -> int: +async def decode_uvarint_from_stream(reader: StreamReader, timeout: float) -> int: shift = 0 result = 0 while True: diff --git a/libp2p/typing.py b/libp2p/typing.py index 08631dcf..98107464 100644 --- a/libp2p/typing.py +++ b/libp2p/typing.py @@ -1,7 +1,6 @@ +import asyncio from typing import TYPE_CHECKING, Awaitable, Callable, NewType, Union -from libp2p.network.connection.raw_connection_interface import IRawConnection - if TYPE_CHECKING: from libp2p.network.stream.net_stream_interface import INetStream # noqa: F401 from libp2p.stream_muxer.abc import IMuxedStream # noqa: F401 @@ -10,4 +9,4 @@ TProtocol = NewType("TProtocol", str) StreamHandlerFn = Callable[["INetStream"], Awaitable[None]] -NegotiableTransport = Union["IMuxedStream", IRawConnection] +StreamReader = Union["IMuxedStream", asyncio.StreamReader]