From 4358a4bc890c54b4d80b1147bdae43c6bb637f46 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 16 Aug 2019 14:12:10 +0800 Subject: [PATCH] Negotiate multiselect version for Muxer `MuxerMultistream` is introduced to negotiate `Multiselect` version before negotiating Multiplexer's version. This is required by multistream 1.x --- libp2p/__init__.py | 22 ++++---- libp2p/network/swarm.py | 4 +- libp2p/pubsub/gossipsub.py | 2 - libp2p/security/secure_conn_interface.py | 2 - libp2p/security/security_multistream.py | 5 ++ libp2p/stream_muxer/muxer_multistream.py | 71 ++++++++++++++++++++++++ libp2p/transport/tcp/tcp.py | 3 +- libp2p/transport/upgrader.py | 25 +++++---- 8 files changed, 105 insertions(+), 29 deletions(-) create mode 100644 libp2p/stream_muxer/muxer_multistream.py diff --git a/libp2p/__init__.py b/libp2p/__init__.py index a69f6f61..54904999 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -15,6 +15,8 @@ from libp2p.routing.interfaces import IPeerRouting from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter from libp2p.security.insecure.transport import InsecureTransport from libp2p.security.secure_transport_interface import ISecureTransport +from libp2p.stream_muxer.mplex.mplex import Mplex +from libp2p.stream_muxer.muxer_multistream import MuxerClassType from libp2p.transport.tcp.tcp import TCP from libp2p.transport.upgrader import TransportUpgrader from libp2p.typing import TProtocol @@ -72,7 +74,7 @@ def initialize_default_swarm( key_pair: KeyPair, id_opt: ID = None, transport_opt: Sequence[str] = None, - muxer_opt: Sequence[str] = None, + muxer_opt: Mapping[TProtocol, MuxerClassType] = None, sec_opt: Mapping[TProtocol, ISecureTransport] = None, peerstore_opt: IPeerStore = None, disc_opt: IPeerRouting = None, @@ -91,23 +93,21 @@ def initialize_default_swarm( if not id_opt: id_opt = generate_peer_id_from_rsa_identity(key_pair) - # TODO parse transport_opt to determine transport - transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"] + # TODO: Parse `transport_opt` to determine transport transport = TCP() # TODO TransportUpgrader is not doing anything really - # TODO parse muxer and sec to pass into TransportUpgrader - muxer = muxer_opt or ["mplex/6.7.0"] + muxer_transports_by_protocol = muxer_opt or {TProtocol("/mplex/6.7.0"): Mplex} security_transports_by_protocol = sec_opt or { - TProtocol("insecure/1.0.0"): InsecureTransport(key_pair) + TProtocol("/plaintext/1.0.0"): InsecureTransport(key_pair) } - upgrader = TransportUpgrader(security_transports_by_protocol, muxer) + upgrader = TransportUpgrader( + security_transports_by_protocol, muxer_transports_by_protocol + ) peerstore = peerstore_opt or PeerStore() # TODO: Initialize discovery if not presented - swarm_opt = Swarm(id_opt, peerstore, upgrader, transport, disc_opt) - - return swarm_opt + return Swarm(id_opt, peerstore, upgrader, transport, disc_opt) async def new_node( @@ -115,7 +115,7 @@ async def new_node( swarm_opt: INetwork = None, id_opt: ID = None, transport_opt: Sequence[str] = None, - muxer_opt: Sequence[str] = None, + muxer_opt: Mapping[TProtocol, MuxerClassType] = None, sec_opt: Mapping[TProtocol, ISecureTransport] = None, peerstore_opt: IPeerStore = None, disc_opt: IPeerRouting = None, diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 0f0b1d95..9dd1c152 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -111,7 +111,7 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) - muxed_conn = self.upgrader.upgrade_connection( + muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id ) @@ -204,7 +204,7 @@ class Swarm(INetwork): secured_conn = await self.upgrader.upgrade_security( raw_conn, peer_id, False ) - muxed_conn = self.upgrader.upgrade_connection( + muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id ) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 598e16be..8b3a62cb 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -456,8 +456,6 @@ class GossipSub(IPubsubRouter): """ Checks the seen set and requests unknown messages with an IWANT message. """ - # from_id_bytes = ihave_msg.from_id - # Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache seen_seqnos_and_peers = [ seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys() diff --git a/libp2p/security/secure_conn_interface.py b/libp2p/security/secure_conn_interface.py index 8a399233..ab69a6d0 100644 --- a/libp2p/security/secure_conn_interface.py +++ b/libp2p/security/secure_conn_interface.py @@ -14,8 +14,6 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa class AbstractSecureConn(ABC): - conn: IRawConnection - @abstractmethod def get_local_peer(self) -> ID: pass diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 05c21018..6e736c16 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -20,6 +20,11 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa class SecurityMultistream(ABC): + """ + SSMuxer is a multistream stream security transport multiplexer. + Go implementation: github.com/libp2p/go-conn-security-multistream/ssms.go + """ + transports: Dict[TProtocol, ISecureTransport] multiselect: Multiselect multiselect_client: MultiselectClient diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py new file mode 100644 index 00000000..4d4ce468 --- /dev/null +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -0,0 +1,71 @@ +from typing import Dict, List, Mapping, Type + +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.network.typing import GenericProtocolHandlerFn +from libp2p.peer.id import ID +from libp2p.protocol_muxer.multiselect import Multiselect +from libp2p.protocol_muxer.multiselect_client import MultiselectClient +from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator +from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.typing import TProtocol + +from .abc import IMuxedConn + +MuxerClassType = Type[IMuxedConn] + +# FIXME: add negotiate timeout to `MuxerMultistream` +DEFAULT_NEGOTIATE_TIMEOUT = 60 + + +class MuxerMultistream: + """ + MuxerMultistream is a multistream stream muxed transport multiplexer. + go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go + """ + + transports: Dict[TProtocol, MuxerClassType] + multiselect: Multiselect + multiselect_client: MultiselectClient + order_preference: List[TProtocol] + + def __init__( + self, muxer_transports_by_protocol: Mapping[TProtocol, MuxerClassType] + ) -> None: + self.transports = {} + self.multiselect = Multiselect() + self.multiselect_client = MultiselectClient() + self.order_preference = [] + for protocol, transport in muxer_transports_by_protocol.items(): + self.add_transport(protocol, transport) + + def add_transport(self, protocol: TProtocol, transport: MuxerClassType) -> None: + self.transports[protocol] = transport + self.multiselect.add_handler(protocol, None) + self.order_preference.append(protocol) + + async def select_transport(self, conn: IRawConnection) -> MuxerClassType: + """ + Select a transport that both us and the node on the + other end of conn support and agree on + :param conn: conn to choose a transport over + :param initiator: true if we are the initiator, false otherwise + :return: selected secure transport + """ + protocol: TProtocol + communicator = RawConnectionCommunicator(conn) + if conn.initiator: + protocol = await self.multiselect_client.select_one_of( + self.order_preference, communicator + ) + else: + protocol, _ = await self.multiselect.negotiate(communicator) + return self.transports[protocol] + + async def new_conn( + self, + conn: ISecureConn, + generic_protocol_handler: GenericProtocolHandlerFn, + peer_id: ID, + ) -> IMuxedConn: + transport_class = await self.select_transport(conn) + return transport_class(conn, generic_protocol_handler, peer_id) diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 4378e1cf..ae1e7764 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -67,13 +67,14 @@ class TCP(ITransport): dial a transport to peer listening on multiaddr :param maddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) - :return: True if successful + :return: `RawConnection` if successful """ host = maddr.value_for_protocol("ip4") port = maddr.value_for_protocol("tcp") reader, writer = await asyncio.open_connection(host, int(port)) + # TODO: Change this `sending peer id` process to `/plaintext/2.0.0` # First: send our peer ID so receiver knows it writer.write(self_id.to_base58().encode()) await writer.drain() diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index f727767c..31306bab 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -1,4 +1,4 @@ -from typing import Mapping, Sequence +from typing import Mapping from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.typing import GenericProtocolHandlerFn @@ -6,7 +6,8 @@ from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.security_multistream import SecurityMultistream -from libp2p.stream_muxer.mplex.mplex import Mplex +from libp2p.stream_muxer.abc import IMuxedConn +from libp2p.stream_muxer.muxer_multistream import MuxerClassType, MuxerMultistream from libp2p.typing import TProtocol from .listener_interface import IListener @@ -15,20 +16,21 @@ from .transport_interface import ITransport class TransportUpgrader: security_multistream: SecurityMultistream - muxer: Sequence[str] + muxer_multistream: MuxerMultistream def __init__( self, secure_transports_by_protocol: Mapping[TProtocol, ISecureTransport], - muxerOpt: Sequence[str], + muxer_transports_by_protocol: Mapping[TProtocol, MuxerClassType], ): self.security_multistream = SecurityMultistream(secure_transports_by_protocol) - self.muxer = muxerOpt + self.muxer_multistream = MuxerMultistream(muxer_transports_by_protocol) def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None: """ Upgrade multiaddr listeners to libp2p-transport listeners """ + # TODO: Figure out what to do with this function. pass async def upgrade_security( @@ -42,14 +44,15 @@ class TransportUpgrader: return await self.security_multistream.secure_inbound(raw_conn) - @staticmethod - def upgrade_connection( + async def upgrade_connection( + self, conn: ISecureConn, generic_protocol_handler: GenericProtocolHandlerFn, peer_id: ID, - ) -> Mplex: + ) -> IMuxedConn: """ - Upgrade raw connection to muxed connection + Upgrade secured connection to be a muxed connection """ - # TODO do exchange to determine multiplexer - return Mplex(conn, generic_protocol_handler, peer_id) + return await self.muxer_multistream.new_conn( + conn, generic_protocol_handler, peer_id + )