From 4358a4bc890c54b4d80b1147bdae43c6bb637f46 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 16 Aug 2019 14:12:10 +0800 Subject: [PATCH 1/4] Negotiate multiselect version for Muxer `MuxerMultistream` is introduced to negotiate `Multiselect` version before negotiating Multiplexer's version. This is required by multistream 1.x --- libp2p/__init__.py | 22 ++++---- libp2p/network/swarm.py | 4 +- libp2p/pubsub/gossipsub.py | 2 - libp2p/security/secure_conn_interface.py | 2 - libp2p/security/security_multistream.py | 5 ++ libp2p/stream_muxer/muxer_multistream.py | 71 ++++++++++++++++++++++++ libp2p/transport/tcp/tcp.py | 3 +- libp2p/transport/upgrader.py | 25 +++++---- 8 files changed, 105 insertions(+), 29 deletions(-) create mode 100644 libp2p/stream_muxer/muxer_multistream.py diff --git a/libp2p/__init__.py b/libp2p/__init__.py index a69f6f61..54904999 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -15,6 +15,8 @@ from libp2p.routing.interfaces import IPeerRouting from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter from libp2p.security.insecure.transport import InsecureTransport from libp2p.security.secure_transport_interface import ISecureTransport +from libp2p.stream_muxer.mplex.mplex import Mplex +from libp2p.stream_muxer.muxer_multistream import MuxerClassType from libp2p.transport.tcp.tcp import TCP from libp2p.transport.upgrader import TransportUpgrader from libp2p.typing import TProtocol @@ -72,7 +74,7 @@ def initialize_default_swarm( key_pair: KeyPair, id_opt: ID = None, transport_opt: Sequence[str] = None, - muxer_opt: Sequence[str] = None, + muxer_opt: Mapping[TProtocol, MuxerClassType] = None, sec_opt: Mapping[TProtocol, ISecureTransport] = None, peerstore_opt: IPeerStore = None, disc_opt: IPeerRouting = None, @@ -91,23 +93,21 @@ def initialize_default_swarm( if not id_opt: id_opt = generate_peer_id_from_rsa_identity(key_pair) - # TODO parse transport_opt to determine transport - transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"] + # TODO: Parse `transport_opt` to determine transport transport = TCP() # TODO TransportUpgrader is not doing anything really - # TODO parse muxer and sec to pass into TransportUpgrader - muxer = muxer_opt or ["mplex/6.7.0"] + muxer_transports_by_protocol = muxer_opt or {TProtocol("/mplex/6.7.0"): Mplex} security_transports_by_protocol = sec_opt or { - TProtocol("insecure/1.0.0"): InsecureTransport(key_pair) + TProtocol("/plaintext/1.0.0"): InsecureTransport(key_pair) } - upgrader = TransportUpgrader(security_transports_by_protocol, muxer) + upgrader = TransportUpgrader( + security_transports_by_protocol, muxer_transports_by_protocol + ) peerstore = peerstore_opt or PeerStore() # TODO: Initialize discovery if not presented - swarm_opt = Swarm(id_opt, peerstore, upgrader, transport, disc_opt) - - return swarm_opt + return Swarm(id_opt, peerstore, upgrader, transport, disc_opt) async def new_node( @@ -115,7 +115,7 @@ async def new_node( swarm_opt: INetwork = None, id_opt: ID = None, transport_opt: Sequence[str] = None, - muxer_opt: Sequence[str] = None, + muxer_opt: Mapping[TProtocol, MuxerClassType] = None, sec_opt: Mapping[TProtocol, ISecureTransport] = None, peerstore_opt: IPeerStore = None, disc_opt: IPeerRouting = None, diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 0f0b1d95..9dd1c152 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -111,7 +111,7 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) - muxed_conn = self.upgrader.upgrade_connection( + muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id ) @@ -204,7 +204,7 @@ class Swarm(INetwork): secured_conn = await self.upgrader.upgrade_security( raw_conn, peer_id, False ) - muxed_conn = self.upgrader.upgrade_connection( + muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id ) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 598e16be..8b3a62cb 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -456,8 +456,6 @@ class GossipSub(IPubsubRouter): """ Checks the seen set and requests unknown messages with an IWANT message. """ - # from_id_bytes = ihave_msg.from_id - # Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache seen_seqnos_and_peers = [ seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys() diff --git a/libp2p/security/secure_conn_interface.py b/libp2p/security/secure_conn_interface.py index 8a399233..ab69a6d0 100644 --- a/libp2p/security/secure_conn_interface.py +++ b/libp2p/security/secure_conn_interface.py @@ -14,8 +14,6 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa class AbstractSecureConn(ABC): - conn: IRawConnection - @abstractmethod def get_local_peer(self) -> ID: pass diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 05c21018..6e736c16 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -20,6 +20,11 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa class SecurityMultistream(ABC): + """ + SSMuxer is a multistream stream security transport multiplexer. + Go implementation: github.com/libp2p/go-conn-security-multistream/ssms.go + """ + transports: Dict[TProtocol, ISecureTransport] multiselect: Multiselect multiselect_client: MultiselectClient diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py new file mode 100644 index 00000000..4d4ce468 --- /dev/null +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -0,0 +1,71 @@ +from typing import Dict, List, Mapping, Type + +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.network.typing import GenericProtocolHandlerFn +from libp2p.peer.id import ID +from libp2p.protocol_muxer.multiselect import Multiselect +from libp2p.protocol_muxer.multiselect_client import MultiselectClient +from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator +from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.typing import TProtocol + +from .abc import IMuxedConn + +MuxerClassType = Type[IMuxedConn] + +# FIXME: add negotiate timeout to `MuxerMultistream` +DEFAULT_NEGOTIATE_TIMEOUT = 60 + + +class MuxerMultistream: + """ + MuxerMultistream is a multistream stream muxed transport multiplexer. + go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go + """ + + transports: Dict[TProtocol, MuxerClassType] + multiselect: Multiselect + multiselect_client: MultiselectClient + order_preference: List[TProtocol] + + def __init__( + self, muxer_transports_by_protocol: Mapping[TProtocol, MuxerClassType] + ) -> None: + self.transports = {} + self.multiselect = Multiselect() + self.multiselect_client = MultiselectClient() + self.order_preference = [] + for protocol, transport in muxer_transports_by_protocol.items(): + self.add_transport(protocol, transport) + + def add_transport(self, protocol: TProtocol, transport: MuxerClassType) -> None: + self.transports[protocol] = transport + self.multiselect.add_handler(protocol, None) + self.order_preference.append(protocol) + + async def select_transport(self, conn: IRawConnection) -> MuxerClassType: + """ + Select a transport that both us and the node on the + other end of conn support and agree on + :param conn: conn to choose a transport over + :param initiator: true if we are the initiator, false otherwise + :return: selected secure transport + """ + protocol: TProtocol + communicator = RawConnectionCommunicator(conn) + if conn.initiator: + protocol = await self.multiselect_client.select_one_of( + self.order_preference, communicator + ) + else: + protocol, _ = await self.multiselect.negotiate(communicator) + return self.transports[protocol] + + async def new_conn( + self, + conn: ISecureConn, + generic_protocol_handler: GenericProtocolHandlerFn, + peer_id: ID, + ) -> IMuxedConn: + transport_class = await self.select_transport(conn) + return transport_class(conn, generic_protocol_handler, peer_id) diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 4378e1cf..ae1e7764 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -67,13 +67,14 @@ class TCP(ITransport): dial a transport to peer listening on multiaddr :param maddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) - :return: True if successful + :return: `RawConnection` if successful """ host = maddr.value_for_protocol("ip4") port = maddr.value_for_protocol("tcp") reader, writer = await asyncio.open_connection(host, int(port)) + # TODO: Change this `sending peer id` process to `/plaintext/2.0.0` # First: send our peer ID so receiver knows it writer.write(self_id.to_base58().encode()) await writer.drain() diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index f727767c..31306bab 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -1,4 +1,4 @@ -from typing import Mapping, Sequence +from typing import Mapping from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.typing import GenericProtocolHandlerFn @@ -6,7 +6,8 @@ from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.security_multistream import SecurityMultistream -from libp2p.stream_muxer.mplex.mplex import Mplex +from libp2p.stream_muxer.abc import IMuxedConn +from libp2p.stream_muxer.muxer_multistream import MuxerClassType, MuxerMultistream from libp2p.typing import TProtocol from .listener_interface import IListener @@ -15,20 +16,21 @@ from .transport_interface import ITransport class TransportUpgrader: security_multistream: SecurityMultistream - muxer: Sequence[str] + muxer_multistream: MuxerMultistream def __init__( self, secure_transports_by_protocol: Mapping[TProtocol, ISecureTransport], - muxerOpt: Sequence[str], + muxer_transports_by_protocol: Mapping[TProtocol, MuxerClassType], ): self.security_multistream = SecurityMultistream(secure_transports_by_protocol) - self.muxer = muxerOpt + self.muxer_multistream = MuxerMultistream(muxer_transports_by_protocol) def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None: """ Upgrade multiaddr listeners to libp2p-transport listeners """ + # TODO: Figure out what to do with this function. pass async def upgrade_security( @@ -42,14 +44,15 @@ class TransportUpgrader: return await self.security_multistream.secure_inbound(raw_conn) - @staticmethod - def upgrade_connection( + async def upgrade_connection( + self, conn: ISecureConn, generic_protocol_handler: GenericProtocolHandlerFn, peer_id: ID, - ) -> Mplex: + ) -> IMuxedConn: """ - Upgrade raw connection to muxed connection + Upgrade secured connection to be a muxed connection """ - # TODO do exchange to determine multiplexer - return Mplex(conn, generic_protocol_handler, peer_id) + return await self.muxer_multistream.new_conn( + conn, generic_protocol_handler, peer_id + ) From d7d8440b2c331015d2f63b5e103f51f3d1f93a4b Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 19 Aug 2019 23:18:00 +0800 Subject: [PATCH 2/4] PR feedback: nitpicks --- libp2p/stream_muxer/muxer_multistream.py | 3 +-- libp2p/transport/upgrader.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index 4d4ce468..c247e530 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -48,8 +48,7 @@ class MuxerMultistream: Select a transport that both us and the node on the other end of conn support and agree on :param conn: conn to choose a transport over - :param initiator: true if we are the initiator, false otherwise - :return: selected secure transport + :return: selected muxer transport """ protocol: TProtocol communicator = RawConnectionCommunicator(conn) diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 31306bab..b0373ec7 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -37,7 +37,7 @@ class TransportUpgrader: self, raw_conn: IRawConnection, peer_id: ID, initiator: bool ) -> ISecureConn: """ - Upgrade conn to be a secured connection + Upgrade conn to a secured connection """ if initiator: return await self.security_multistream.secure_outbound(raw_conn, peer_id) @@ -51,7 +51,7 @@ class TransportUpgrader: peer_id: ID, ) -> IMuxedConn: """ - Upgrade secured connection to be a muxed connection + Upgrade secured connection to a muxed connection """ return await self.muxer_multistream.new_conn( conn, generic_protocol_handler, peer_id From 8596f7390f1fdd51498f7777cc094b4ac86acbaf Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 20 Aug 2019 11:02:21 +0800 Subject: [PATCH 3/4] PR feedback: set protocol_id to constants --- libp2p/__init__.py | 9 ++++----- libp2p/security/insecure/transport.py | 3 +++ libp2p/stream_muxer/mplex/mplex.py | 3 +++ 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 54904999..21f1d9cb 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -13,9 +13,9 @@ from libp2p.peer.peerstore import PeerStore from libp2p.peer.peerstore_interface import IPeerStore from libp2p.routing.interfaces import IPeerRouting from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter -from libp2p.security.insecure.transport import InsecureTransport +from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport from libp2p.security.secure_transport_interface import ISecureTransport -from libp2p.stream_muxer.mplex.mplex import Mplex +from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex from libp2p.stream_muxer.muxer_multistream import MuxerClassType from libp2p.transport.tcp.tcp import TCP from libp2p.transport.upgrader import TransportUpgrader @@ -96,10 +96,9 @@ def initialize_default_swarm( # TODO: Parse `transport_opt` to determine transport transport = TCP() - # TODO TransportUpgrader is not doing anything really - muxer_transports_by_protocol = muxer_opt or {TProtocol("/mplex/6.7.0"): Mplex} + muxer_transports_by_protocol = muxer_opt or {MPLEX_PROTOCOL_ID: Mplex} security_transports_by_protocol = sec_opt or { - TProtocol("/plaintext/1.0.0"): InsecureTransport(key_pair) + PLAINTEXT_PROTOCOL_ID: InsecureTransport(key_pair) } upgrader = TransportUpgrader( security_transports_by_protocol, muxer_transports_by_protocol diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 2d8a5a83..97c9676d 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -3,6 +3,9 @@ from libp2p.peer.id import ID from libp2p.security.base_session import BaseSession from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.typing import TProtocol + +PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/1.0.0") class InsecureSession(BaseSession): diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index f154a162..aa64b69d 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -8,12 +8,15 @@ from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream +from libp2p.typing import TProtocol from .constants import HeaderTags from .exceptions import StreamNotFound from .mplex_stream import MplexStream from .utils import decode_uvarint_from_stream, encode_uvarint +MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0") + class Mplex(IMuxedConn): """ From 550c23f9f9f991ab942133edddf7646d327ae46f Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 20 Aug 2019 15:27:07 +0800 Subject: [PATCH 4/4] PR feedback - Use the order in `MuxerMultistream` as the precedence in multiselect --- libp2p/security/security_multistream.py | 19 +++++++++++++++---- libp2p/stream_muxer/muxer_multistream.py | 23 ++++++++++++++++------- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 6e736c16..ec24b46e 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -1,5 +1,6 @@ from abc import ABC -from typing import Dict, Mapping +from collections import OrderedDict +from typing import Mapping from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID @@ -25,14 +26,15 @@ class SecurityMultistream(ABC): Go implementation: github.com/libp2p/go-conn-security-multistream/ssms.go """ - transports: Dict[TProtocol, ISecureTransport] + # NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2. + transports: "OrderedDict[TProtocol, ISecureTransport]" multiselect: Multiselect multiselect_client: MultiselectClient def __init__( self, secure_transports_by_protocol: Mapping[TProtocol, ISecureTransport] ) -> None: - self.transports = {} + self.transports = OrderedDict() self.multiselect = Multiselect() self.multiselect_client = MultiselectClient() @@ -40,8 +42,17 @@ class SecurityMultistream(ABC): self.add_transport(protocol, transport) def add_transport(self, protocol: TProtocol, transport: ISecureTransport) -> None: + """ + Add a protocol and its corresponding transport to multistream-select(multiselect). + The order that a protocol is added is exactly the precedence it is negotiated in + multiselect. + :param protocol: the protocol name, which is negotiated in multiselect. + :param transport: the corresponding transportation to the ``protocol``. + """ + # If protocol is already added before, remove it and add it again. + if protocol in self.transports: + del self.transports[protocol] self.transports[protocol] = transport - # Note: None is added as the handler for the given protocol since # we only care about selecting the protocol, not any handler function self.multiselect.add_handler(protocol, None) diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index c247e530..703c4e2d 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -1,4 +1,5 @@ -from typing import Dict, List, Mapping, Type +from collections import OrderedDict +from typing import Mapping, Type from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.typing import GenericProtocolHandlerFn @@ -23,25 +24,33 @@ class MuxerMultistream: go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go """ - transports: Dict[TProtocol, MuxerClassType] + # NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2. + transports: "OrderedDict[TProtocol, MuxerClassType]" multiselect: Multiselect multiselect_client: MultiselectClient - order_preference: List[TProtocol] def __init__( self, muxer_transports_by_protocol: Mapping[TProtocol, MuxerClassType] ) -> None: - self.transports = {} + self.transports = OrderedDict() self.multiselect = Multiselect() self.multiselect_client = MultiselectClient() - self.order_preference = [] for protocol, transport in muxer_transports_by_protocol.items(): self.add_transport(protocol, transport) def add_transport(self, protocol: TProtocol, transport: MuxerClassType) -> None: + """ + Add a protocol and its corresponding transport to multistream-select(multiselect). + The order that a protocol is added is exactly the precedence it is negotiated in + multiselect. + :param protocol: the protocol name, which is negotiated in multiselect. + :param transport: the corresponding transportation to the ``protocol``. + """ + # If protocol is already added before, remove it and add it again. + if protocol in self.transports: + del self.transports[protocol] self.transports[protocol] = transport self.multiselect.add_handler(protocol, None) - self.order_preference.append(protocol) async def select_transport(self, conn: IRawConnection) -> MuxerClassType: """ @@ -54,7 +63,7 @@ class MuxerMultistream: communicator = RawConnectionCommunicator(conn) if conn.initiator: protocol = await self.multiselect_client.select_one_of( - self.order_preference, communicator + tuple(self.transports.keys()), communicator ) else: protocol, _ = await self.multiselect.negotiate(communicator)