From 29fbb9e40a40b2c5c9dce67cbc42682e64883297 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Fri, 2 Aug 2019 16:32:33 +0800 Subject: [PATCH 01/15] add typing to transport --- libp2p/transport/listener_interface.py | 14 ++- libp2p/transport/tcp/tcp.py | 126 +++++++++++++----------- libp2p/transport/transport_interface.py | 15 ++- libp2p/transport/typing.py | 6 ++ libp2p/transport/upgrader.py | 35 ++++++- 5 files changed, 125 insertions(+), 71 deletions(-) create mode 100644 libp2p/transport/typing.py diff --git a/libp2p/transport/listener_interface.py b/libp2p/transport/listener_interface.py index 5f84ef6c..8e2d86b4 100644 --- a/libp2p/transport/listener_interface.py +++ b/libp2p/transport/listener_interface.py @@ -1,9 +1,15 @@ from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import List + from multiaddr import Multiaddr + class IListener(ABC): @abstractmethod - def listen(self, maddr): + async def listen(self, maddr: "Multiaddr") -> bool: """ put listener in listening mode and wait for incoming connections :param maddr: multiaddr of peer @@ -11,18 +17,16 @@ class IListener(ABC): """ @abstractmethod - def get_addrs(self): + def get_addrs(self) -> "List[Multiaddr]": """ retrieve list of addresses the listener is listening on :return: return list of addrs """ @abstractmethod - def close(self, options=None): + def close(self) -> bool: """ close the listener such that no more connections can be open on this transport instance - :param options: optional object potential with timeout - a timeout value in ms that fires and destroy all connections :return: return True if successful """ diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index c163beaa..918f7120 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -1,68 +1,79 @@ import asyncio -import multiaddr +from multiaddr import Multiaddr from libp2p.network.connection.raw_connection import RawConnection -from ..listener_interface import IListener -from ..transport_interface import ITransport +from libp2p.transport.listener_interface import IListener +from libp2p.transport.transport_interface import ITransport + +from typing import TYPE_CHECKING, List, Callable + +if TYPE_CHECKING: + from socket import socket + from libp2p.transport.typing import THandler + from libp2p.network.connection.raw_connection_interface import IRawConnection + + +class TCPListener(IListener): + multiaddrs: List[Multiaddr] + server = None + handler = None + + def __init__(self, handler_function: "THandler" = None) -> None: + self.multiaddrs = [] + self.server = None + self.handler = handler_function + + async def listen(self, maddr: Multiaddr) -> bool: + """ + put listener in listening mode and wait for incoming connections + :param maddr: maddr of peer + :return: return True if successful + """ + self.server = await asyncio.start_server( + self.handler, + maddr.value_for_protocol("ip4"), + maddr.value_for_protocol("tcp"), + ) + socket = self.server.sockets[0] + self.multiaddrs.append(_multiaddr_from_socket(socket)) + + return True + + def get_addrs(self) -> List[Multiaddr]: + """ + retrieve list of addresses the listener is listening on + :return: return list of addrs + """ + # TODO check if server is listening + return self.multiaddrs + + def close(self) -> bool: + """ + close the listener such that no more connections + can be open on this transport instance + :return: return True if successful + """ + if self.server is None: + return False + self.server.close() + _loop = asyncio.get_event_loop() + _loop.run_until_complete(self.server.wait_closed()) + _loop.close() + self.server = None + return True class TCP(ITransport): - def __init__(self): - self.listener = self.Listener() + def __init__(self) -> None: + self.listener = TCPListener() - class Listener(IListener): - def __init__(self, handler_function=None): - self.multiaddrs = [] - self.server = None - self.handler = handler_function - - async def listen(self, maddr): - """ - put listener in listening mode and wait for incoming connections - :param maddr: maddr of peer - :return: return True if successful - """ - self.server = await asyncio.start_server( - self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp") - ) - socket = self.server.sockets[0] - self.multiaddrs.append(_multiaddr_from_socket(socket)) - - return True - - def get_addrs(self): - """ - retrieve list of addresses the listener is listening on - :return: return list of addrs - """ - # TODO check if server is listening - return self.multiaddrs - - def close(self, options=None): - """ - close the listener such that no more connections - can be open on this transport instance - :param options: optional object potential with timeout - a timeout value in ms that fires and destroy all connections - :return: return True if successful - """ - if self.server is None: - return False - self.server.close() - _loop = asyncio.get_event_loop() - _loop.run_until_complete(self.server.wait_closed()) - _loop.close() - self.server = None - return True - - async def dial(self, maddr, self_id, options=None): + async def dial(self, maddr: Multiaddr, self_id: ID) -> "IRawConnection": """ dial a transport to peer listening on multiaddr :param maddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) - :param options: optional object :return: True if successful """ host = maddr.value_for_protocol("ip4") @@ -81,18 +92,17 @@ class TCP(ITransport): if ack != expected_ack_str: raise Exception("Receiver did not receive peer id") - return RawConnection(host, port, reader, writer, True) + return RawConnection(host, str(port), reader, writer, True) - def create_listener(self, handler_function, options=None): + def create_listener(self, handler_function: "THandler") -> TCPListener: """ create listener on transport - :param options: optional object with properties the listener must have :param handler_function: a function called when a new connection is received that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py """ - return self.Listener(handler_function) + return TCPListener(handler_function) -def _multiaddr_from_socket(socket): - return multiaddr.Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname()) +def _multiaddr_from_socket(socket: "socket") -> Multiaddr: + return Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname()) diff --git a/libp2p/transport/transport_interface.py b/libp2p/transport/transport_interface.py index b067e5b5..b79d2461 100644 --- a/libp2p/transport/transport_interface.py +++ b/libp2p/transport/transport_interface.py @@ -1,22 +1,29 @@ from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from multiaddr import Multiaddr + from libp2p.peer.id import ID + from libp2p.network.connection.raw_connection_interface import IRawConnection + from .listener_interface import IListener + from .typing import THandler + class ITransport(ABC): @abstractmethod - def dial(self, maddr, self_id, options=None): + async def dial(self, maddr: "Multiaddr", self_id: "ID") -> "IRawConnection": """ dial a transport to peer listening on multiaddr :param multiaddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) - :param options: optional object :return: list of multiaddrs """ @abstractmethod - def create_listener(self, handler_function, options=None): + def create_listener(self, handler_function: "THandler") -> "IListener": """ create listener on transport - :param options: optional object with properties the listener must have :param handler_function: a function called when a new conntion is received that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py diff --git a/libp2p/transport/typing.py b/libp2p/transport/typing.py new file mode 100644 index 00000000..9fd36b9d --- /dev/null +++ b/libp2p/transport/typing.py @@ -0,0 +1,6 @@ +from asyncio import StreamReader, StreamWriter +from typing import NewType, Callable + + +THandler = Callable[[StreamReader, StreamWriter], None] + diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index f14d7fcd..3cbdad4b 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -1,9 +1,28 @@ from libp2p.security.security_multistream import SecurityMultistream from libp2p.stream_muxer.mplex.mplex import Mplex +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + + from typing import Dict, Sequence + from libp2p.network.connection.raw_connection_interface import IRawConnection + from libp2p.network.swarm import GenericProtocolHandlerFn + from libp2p.peer.id import ID + from libp2p.security.secure_conn_interface import ISecureConn + from libp2p.security.secure_transport_interface import ISecureTransport + from libp2p.security.security_multistream import TProtocol + from .transport_interface import ITransport + from .listener_interface import IListener + class TransportUpgrader: - def __init__(self, secOpt, muxerOpt): + security_multistream: SecurityMultistream + muxer: "Sequence[str]" + + def __init__( + self, secOpt: "Dict[TProtocol, ISecureTransport]", muxerOpt: "Sequence[str]" + ) -> None: # Store security option self.security_multistream = SecurityMultistream() for key in secOpt: @@ -12,12 +31,15 @@ class TransportUpgrader: # Store muxer option self.muxer = muxerOpt - def upgrade_listener(self, transport, listeners): + def upgrade_listener(self, transport: "ITransport", listeners: "IListener") -> None: """ Upgrade multiaddr listeners to libp2p-transport listeners """ + pass - async def upgrade_security(self, raw_conn, peer_id, initiator): + async def upgrade_security( + self, raw_conn: "IRawConnection", peer_id: "ID", initiator: bool + ) -> "ISecureConn": """ Upgrade conn to be a secured connection """ @@ -26,7 +48,12 @@ class TransportUpgrader: return await self.security_multistream.secure_inbound(raw_conn) - def upgrade_connection(self, conn, generic_protocol_handler, peer_id): + @staticmethod + def upgrade_connection( + conn: "IRawConnection", + generic_protocol_handler: "GenericProtocolHandlerFn", + peer_id: "ID", + ) -> "Mplex": """ Upgrade raw connection to muxed connection """ From 36b7e8ded96cd4aaab7b0666382dc304d93019f3 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Fri, 2 Aug 2019 17:14:43 +0800 Subject: [PATCH 02/15] Refactor HeaderTags --- libp2p/stream_muxer/mplex/constants.py | 12 +++++++++++- libp2p/stream_muxer/mplex/mplex.py | 13 +++++++------ libp2p/stream_muxer/mplex/mplex_stream.py | 19 +++++++++++++------ libp2p/stream_muxer/mplex/utils.py | 14 -------------- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/libp2p/stream_muxer/mplex/constants.py b/libp2p/stream_muxer/mplex/constants.py index a0537b2e..8989e763 100644 --- a/libp2p/stream_muxer/mplex/constants.py +++ b/libp2p/stream_muxer/mplex/constants.py @@ -1 +1,11 @@ -HEADER_TAGS = {"NEW_STREAM": 0, "MESSAGE": 2, "CLOSE": 4, "RESET": 6} +from enum import Enum + + +class HeaderTags(Enum): + NewStream = 0 + MessageReceiver = 1 + MessageInitiator = 2 + CloseReceiver = 3 + CloseInitiator = 4 + ResetReceiver = 5 + ResetInitiator = 6 diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index f00588b0..08970ff2 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -1,8 +1,9 @@ import asyncio -from ..muxed_connection_interface import IMuxedConn +from .constants import HeaderTags +from .utils import encode_uvarint, decode_uvarint_from_stream from .mplex_stream import MplexStream -from .utils import decode_uvarint_from_stream, encode_uvarint, get_flag +from ..muxed_connection_interface import IMuxedConn class Mplex(IMuxedConn): @@ -78,7 +79,7 @@ class Mplex(IMuxedConn): stream_id = self.raw_conn.next_stream_id() stream = MplexStream(stream_id, multi_addr, self) self.buffers[stream_id] = asyncio.Queue() - await self.send_message(get_flag(self.initiator, "NEW_STREAM"), None, stream_id) + await self.send_message(HeaderTags.NewStream, None, stream_id) return stream async def accept_stream(self): @@ -90,7 +91,7 @@ class Mplex(IMuxedConn): stream = MplexStream(stream_id, False, self) asyncio.ensure_future(self.generic_protocol_handler(stream)) - async def send_message(self, flag, data, stream_id): + async def send_message(self, flag: HeaderTags, data, stream_id): """ sends a message over the connection :param header: header to use @@ -99,7 +100,7 @@ class Mplex(IMuxedConn): :return: True if success """ # << by 3, then or with flag - header = (stream_id << 3) | flag + header = (stream_id << 3) | flag.value header = encode_uvarint(header) if data is None: @@ -135,7 +136,7 @@ class Mplex(IMuxedConn): self.buffers[stream_id] = asyncio.Queue() await self.stream_queue.put(stream_id) - if flag is get_flag(True, "NEW_STREAM"): + if flag == HeaderTags.NewStream.value: # new stream detected on connection await self.accept_stream() diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index a1a25b70..e452fbda 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -2,7 +2,7 @@ import asyncio from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream -from .utils import get_flag +from .constants import HeaderTags class MplexStream(IMuxedStream): @@ -38,9 +38,12 @@ class MplexStream(IMuxedStream): write to stream :return: number of bytes written """ - return await self.mplex_conn.send_message( - get_flag(self.initiator, "MESSAGE"), data, self.stream_id + flag = ( + HeaderTags.MessageInitiator + if self.initiator + else HeaderTags.MessageReceiver ) + return await self.mplex_conn.send_message(flag, data, self.stream_id) async def close(self): """ @@ -50,7 +53,8 @@ class MplexStream(IMuxedStream): """ # TODO error handling with timeout # TODO understand better how mutexes are used from go repo - await self.mplex_conn.send_message(get_flag(self.initiator, "CLOSE"), None, self.stream_id) + flag = HeaderTags.CloseInitiator if self.initiator else HeaderTags.CloseReceiver + await self.mplex_conn.send_message(flag, None, self.stream_id) remote_lock = "" async with self.stream_lock: @@ -78,9 +82,12 @@ class MplexStream(IMuxedStream): return True if not self.remote_closed: - await self.mplex_conn.send_message( - get_flag(self.initiator, "RESET"), None, self.stream_id + flag = ( + HeaderTags.ResetInitiator + if self.initiator + else HeaderTags.ResetInitiator ) + await self.mplex_conn.send_message(flag, None, self.stream_id) self.local_closed = True self.remote_closed = True diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index 70dcd128..7bd57729 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -1,8 +1,6 @@ import asyncio import struct -from .constants import HEADER_TAGS - def encode_uvarint(number): """Pack `number` into varint bytes""" @@ -44,15 +42,3 @@ async def decode_uvarint_from_stream(reader, timeout): break return result - - -def get_flag(initiator, action): - """ - get header flag based on action for mplex - :param action: action type in str - :return: int flag - """ - if initiator or HEADER_TAGS[action] == 0: - return HEADER_TAGS[action] - - return HEADER_TAGS[action] - 1 From 239a5c88fb30d9bcc4e0d6a1d6729b1659dcb9bb Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Fri, 2 Aug 2019 17:53:51 +0800 Subject: [PATCH 03/15] add typing to mplex --- .../connection/raw_connection_interface.py | 16 +++++++ libp2p/stream_muxer/mplex/mplex.py | 47 ++++++++++++++----- libp2p/stream_muxer/mplex/mplex_stream.py | 2 +- 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/libp2p/network/connection/raw_connection_interface.py b/libp2p/network/connection/raw_connection_interface.py index 1e35514b..088eaec1 100644 --- a/libp2p/network/connection/raw_connection_interface.py +++ b/libp2p/network/connection/raw_connection_interface.py @@ -1,3 +1,4 @@ +import asyncio from abc import ABC, abstractmethod @@ -6,6 +7,13 @@ class IRawConnection(ABC): A Raw Connection provides a Reader and a Writer """ + initiator: bool + + # TODO: reader and writer shouldn't be exposed. + # Need better API for the consumers + reader: asyncio.StreamReader + writer: asyncio.StreamWriter + @abstractmethod async def write(self, data: bytes) -> None: pass @@ -13,3 +21,11 @@ class IRawConnection(ABC): @abstractmethod async def read(self) -> bytes: pass + + @abstractmethod + def close(self) -> None: + pass + + @abstractmethod + def next_stream_id(self) -> int: + pass diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 08970ff2..e75d8eea 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -5,13 +5,36 @@ from .utils import encode_uvarint, decode_uvarint_from_stream from .mplex_stream import MplexStream from ..muxed_connection_interface import IMuxedConn +from typing import TYPE_CHECKING, Tuple, Dict + +if TYPE_CHECKING: + from multiaddr import Multiaddr + from libp2p.security.secure_conn_interface import ISecureConn + from libp2p.network.connection.raw_connection_interface import IRawConnection + from libp2p.network.swarm import GenericProtocolHandlerFn + from libp2p.peer.id import ID + from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream + class Mplex(IMuxedConn): """ reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go """ - def __init__(self, secured_conn, generic_protocol_handler, peer_id): + secured_conn: "ISecureConn" + raw_conn: "IRawConnection" + initiator: bool + generic_protocol_handler = None + peer_id: "ID" + buffers: Dict[int, asyncio.Queue[bytes]] + stream_queue: asyncio.Queue[int] + + def __init__( + self, + secured_conn: "ISecureConn", + generic_protocol_handler: "GenericProtocolHandlerFn", + peer_id: "ID", + ) -> None: """ create a new muxed connection :param conn: an instance of raw connection @@ -39,19 +62,20 @@ class Mplex(IMuxedConn): # Kick off reading asyncio.ensure_future(self.handle_incoming()) - def close(self): + def close(self) -> None: """ close the stream muxer and underlying raw connection """ self.raw_conn.close() - def is_closed(self): + def is_closed(self) -> bool: """ check connection is fully closed :return: true if successful """ + raise NotImplementedError() - async def read_buffer(self, stream_id): + async def read_buffer(self, stream_id: int) -> bytes: """ Read a message from stream_id's buffer, check raw connection for new messages :param stream_id: stream id of stream to read from @@ -69,7 +93,9 @@ class Mplex(IMuxedConn): # Stream not created yet return None - async def open_stream(self, protocol_id, multi_addr): + async def open_stream( + self, protocol_id: str, multi_addr: "Multiaddr" + ) -> "IMuxedStream": """ creates a new muxed_stream :param protocol_id: protocol_id of stream @@ -82,7 +108,7 @@ class Mplex(IMuxedConn): await self.send_message(HeaderTags.NewStream, None, stream_id) return stream - async def accept_stream(self): + async def accept_stream(self) -> None: """ accepts a muxed stream opened by the other end :return: the accepted stream @@ -91,13 +117,12 @@ class Mplex(IMuxedConn): stream = MplexStream(stream_id, False, self) asyncio.ensure_future(self.generic_protocol_handler(stream)) - async def send_message(self, flag: HeaderTags, data, stream_id): + async def send_message(self, flag: HeaderTags, data: bytes, stream_id: int) -> int: """ sends a message over the connection :param header: header to use :param data: data to send in the message :param stream_id: stream the message is in - :return: True if success """ # << by 3, then or with flag header = (stream_id << 3) | flag.value @@ -112,7 +137,7 @@ class Mplex(IMuxedConn): return await self.write_to_stream(_bytes) - async def write_to_stream(self, _bytes): + async def write_to_stream(self, _bytes: bytearray) -> int: """ writes a byte array to a raw connection :param _bytes: byte array to write @@ -122,7 +147,7 @@ class Mplex(IMuxedConn): await self.raw_conn.writer.drain() return len(_bytes) - async def handle_incoming(self): + async def handle_incoming(self) -> None: """ Read a message off of the raw connection and add it to the corresponding message buffer """ @@ -146,7 +171,7 @@ class Mplex(IMuxedConn): # Force context switch await asyncio.sleep(0) - async def read_message(self): + async def read_message(self) -> Tuple[int, int, bytes]: """ Read a single message off of the raw connection :return: stream_id, flag, message contents diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index e452fbda..a2502a88 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -10,7 +10,7 @@ class MplexStream(IMuxedStream): reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, stream_id, initiator, mplex_conn): + def __init__(self, stream_id, initiator: bool, mplex_conn): """ create new MuxedStream in muxer :param stream_id: stream stream id From b64ed9fd6f575530cef5bb1f6a6b19e123b42aa0 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Fri, 2 Aug 2019 17:57:30 +0800 Subject: [PATCH 04/15] typed mplex.utils --- libp2p/stream_muxer/mplex/utils.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index 7bd57729..07f62fb7 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -1,8 +1,10 @@ import asyncio import struct +from typing import Tuple -def encode_uvarint(number): + +def encode_uvarint(number: int) -> bytearray: """Pack `number` into varint bytes""" buf = b"" while True: @@ -16,7 +18,7 @@ def encode_uvarint(number): return buf -def decode_uvarint(buff, index): +def decode_uvarint(buff: bytearray, index: int) -> Tuple[int, int]: shift = 0 result = 0 while True: @@ -30,7 +32,9 @@ def decode_uvarint(buff, index): return result, index + 1 -async def decode_uvarint_from_stream(reader, timeout): +async def decode_uvarint_from_stream( + reader: asyncio.StreamReader, timeout: float +) -> int: shift = 0 result = 0 while True: From dadac423f2035f9e26cce85f2d1e0bdad49e66d1 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Fri, 2 Aug 2019 18:03:34 +0800 Subject: [PATCH 05/15] typed muxed_connection_interface.py --- libp2p/stream_muxer/mplex/mplex.py | 3 +- .../muxed_connection_interface.py | 30 +++++++++++++------ 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index e75d8eea..233a9e66 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -42,7 +42,7 @@ class Mplex(IMuxedConn): for new muxed streams :param peer_id: peer_id of peer the connection is to """ - super(Mplex, self).__init__(secured_conn, generic_protocol_handler, peer_id) + super().__init__(secured_conn, generic_protocol_handler, peer_id) self.secured_conn = secured_conn self.raw_conn = secured_conn.get_conn() @@ -111,7 +111,6 @@ class Mplex(IMuxedConn): async def accept_stream(self) -> None: """ accepts a muxed stream opened by the other end - :return: the accepted stream """ stream_id = await self.stream_queue.get() stream = MplexStream(stream_id, False, self) diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py index b7bd4e6b..9236fbca 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/muxed_connection_interface.py @@ -1,6 +1,13 @@ from abc import ABC, abstractmethod -from libp2p.peer.id import ID +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from multiaddr import Multiaddr + from libp2p.security.secure_conn_interface import ISecureConn + from libp2p.network.swarm import GenericProtocolHandlerFn + from libp2p.peer.id import ID + from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream class IMuxedConn(ABC): @@ -9,10 +16,15 @@ class IMuxedConn(ABC): """ initiator: bool - peer_id: ID + peer_id: "ID" @abstractmethod - def __init__(self, conn, generic_protocol_handler, peer_id): + def __init__( + self, + conn: "ISecureConn", + generic_protocol_handler: "GenericProtocolHandlerFn", + peer_id: "ID", + ) -> None: """ create a new muxed connection :param conn: an instance of secured connection @@ -22,21 +34,22 @@ class IMuxedConn(ABC): """ @abstractmethod - def close(self): + def close(self) -> None: """ close connection - :return: true if successful """ @abstractmethod - def is_closed(self): + def is_closed(self) -> bool: """ check connection is fully closed :return: true if successful """ @abstractmethod - def open_stream(self, protocol_id, multi_addr): + async def open_stream( + self, protocol_id: str, multi_addr: "Multiaddr" + ) -> "IMuxedStream": """ creates a new muxed_stream :param protocol_id: protocol_id of stream @@ -45,8 +58,7 @@ class IMuxedConn(ABC): """ @abstractmethod - def accept_stream(self): + async def accept_stream(self) -> None: """ accepts a muxed stream opened by the other end - :return: the accepted stream """ From 4c9a930f84c999908f75d89aab09b9cbfb476680 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Fri, 2 Aug 2019 18:28:04 +0800 Subject: [PATCH 06/15] stream_muxer done --- libp2p/stream_muxer/mplex/mplex.py | 5 +- libp2p/stream_muxer/mplex/mplex_stream.py | 47 +++++++++++++------ libp2p/stream_muxer/mplex/utils.py | 4 +- .../muxed_connection_interface.py | 18 +++++++ libp2p/stream_muxer/muxed_stream_interface.py | 12 ++--- 5 files changed, 62 insertions(+), 24 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 233a9e66..fe414c7f 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -1,11 +1,12 @@ import asyncio +from typing import TYPE_CHECKING, Tuple, Dict + +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn from .constants import HeaderTags from .utils import encode_uvarint, decode_uvarint_from_stream from .mplex_stream import MplexStream -from ..muxed_connection_interface import IMuxedConn -from typing import TYPE_CHECKING, Tuple, Dict if TYPE_CHECKING: from multiaddr import Multiaddr diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index a2502a88..5e64ac72 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -1,16 +1,31 @@ import asyncio - +from typing import TYPE_CHECKING from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream from .constants import HeaderTags +if TYPE_CHECKING: + from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn + + class MplexStream(IMuxedStream): """ reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, stream_id, initiator: bool, mplex_conn): + stream_id: int + initiator: bool + mplex_conn: "IMuxedConn" + read_deadline: float + write_deadline: float + local_closed: bool + remote_closed: bool + stream_lock: asyncio.Lock + + def __init__( + self, stream_id: int, initiator: bool, mplex_conn: "IMuxedConn" + ) -> None: """ create new MuxedStream in muxer :param stream_id: stream stream id @@ -26,14 +41,14 @@ class MplexStream(IMuxedStream): self.remote_closed = False self.stream_lock = asyncio.Lock() - async def read(self): + async def read(self) -> bytes: """ read messages associated with stream from buffer til end of file :return: bytes of input """ return await self.mplex_conn.read_buffer(self.stream_id) - async def write(self, data): + async def write(self, data: bytes) -> int: """ write to stream :return: number of bytes written @@ -45,7 +60,7 @@ class MplexStream(IMuxedStream): ) return await self.mplex_conn.send_message(flag, data, self.stream_id) - async def close(self): + async def close(self) -> bool: """ Closing a stream closes it for writing and closes the remote end for reading but allows writing in the other direction. @@ -56,7 +71,7 @@ class MplexStream(IMuxedStream): flag = HeaderTags.CloseInitiator if self.initiator else HeaderTags.CloseReceiver await self.mplex_conn.send_message(flag, None, self.stream_id) - remote_lock = "" + remote_lock = False async with self.stream_lock: if self.local_closed: return True @@ -64,12 +79,14 @@ class MplexStream(IMuxedStream): remote_lock = self.remote_closed if remote_lock: - async with self.mplex_conn.conn_lock: - self.mplex_conn.buffers.pop(self.stream_id) + # FIXME: mplex_conn has no conn_lock! + async with self.mplex_conn.conn_lock: # type: ignore + # FIXME: Don't access to buffers directly + self.mplex_conn.buffers.pop(self.stream_id) # type: ignore return True - async def reset(self): + async def reset(self) -> bool: """ closes both ends of the stream tells this remote side to hang up @@ -92,13 +109,15 @@ class MplexStream(IMuxedStream): self.local_closed = True self.remote_closed = True - async with self.mplex_conn.conn_lock: - self.mplex_conn.buffers.pop(self.stream_id, None) + # FIXME: mplex_conn has no conn_lock! + async with self.mplex_conn.conn_lock: # type: ignore + # FIXME: Don't access to buffers directly + self.mplex_conn.buffers.pop(self.stream_id, None) # type: ignore return True # TODO deadline not in use - def set_deadline(self, ttl): + def set_deadline(self, ttl: float) -> bool: """ set deadline for muxed stream :return: True if successful @@ -107,7 +126,7 @@ class MplexStream(IMuxedStream): self.write_deadline = ttl return True - def set_read_deadline(self, ttl): + def set_read_deadline(self, ttl: float) -> bool: """ set read deadline for muxed stream :return: True if successful @@ -115,7 +134,7 @@ class MplexStream(IMuxedStream): self.read_deadline = ttl return True - def set_write_deadline(self, ttl): + def set_write_deadline(self, ttl: float) -> bool: """ set write deadline for muxed stream :return: True if successful diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index 07f62fb7..0de1f7d4 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -4,7 +4,7 @@ import struct from typing import Tuple -def encode_uvarint(number: int) -> bytearray: +def encode_uvarint(number: int) -> bytes: """Pack `number` into varint bytes""" buf = b"" while True: @@ -18,7 +18,7 @@ def encode_uvarint(number: int) -> bytearray: return buf -def decode_uvarint(buff: bytearray, index: int) -> Tuple[int, int]: +def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]: shift = 0 result = 0 while True: diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py index 9236fbca..ca23d9db 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/muxed_connection_interface.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: from libp2p.network.swarm import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream + from libp2p.stream_muxer.mplex.constants import HeaderTags class IMuxedConn(ABC): @@ -46,6 +47,14 @@ class IMuxedConn(ABC): :return: true if successful """ + @abstractmethod + async def read_buffer(self, stream_id: int) -> bytes: + """ + Read a message from stream_id's buffer, check raw connection for new messages + :param stream_id: stream id of stream to read from + :return: message read + """ + @abstractmethod async def open_stream( self, protocol_id: str, multi_addr: "Multiaddr" @@ -62,3 +71,12 @@ class IMuxedConn(ABC): """ accepts a muxed stream opened by the other end """ + + @abstractmethod + async def send_message(self, flag: HeaderTags, data: bytes, stream_id: int) -> int: + """ + sends a message over the connection + :param header: header to use + :param data: data to send in the message + :param stream_id: stream the message is in + """ diff --git a/libp2p/stream_muxer/muxed_stream_interface.py b/libp2p/stream_muxer/muxed_stream_interface.py index 00ce5867..a4c602c2 100644 --- a/libp2p/stream_muxer/muxed_stream_interface.py +++ b/libp2p/stream_muxer/muxed_stream_interface.py @@ -8,36 +8,36 @@ class IMuxedStream(ABC): mplex_conn: IMuxedConn @abstractmethod - def read(self): + async def read(self) -> bytes: """ reads from the underlying muxed_conn :return: bytes of input """ @abstractmethod - def write(self, _bytes): + async def write(self, data: bytes) -> int: """ writes to the underlying muxed_conn :return: number of bytes written """ @abstractmethod - def close(self): + async def close(self) -> bool: """ close the underlying muxed_conn :return: true if successful """ @abstractmethod - def reset(self): + async def reset(self) -> bool: """ closes both ends of the stream tells this remote side to hang up - :return: error/exception + :return: true if successful """ @abstractmethod - def set_deadline(self, ttl): + def set_deadline(self, ttl: float) -> bool: """ set deadline for muxed stream :return: a new stream From 87ef2e4618f551faf1fb42ec055cb8309431441b Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Mon, 5 Aug 2019 10:20:30 +0800 Subject: [PATCH 07/15] remove if TYPE_CHECKING as much as possible --- libp2p/stream_muxer/mplex/mplex.py | 33 +++++++-------- libp2p/stream_muxer/mplex/mplex_stream.py | 13 ++---- .../muxed_connection_interface.py | 27 ++++++------ libp2p/transport/listener_interface.py | 11 ++--- libp2p/transport/tcp/tcp.py | 19 ++++----- libp2p/transport/transport_interface.py | 17 ++++---- libp2p/transport/upgrader.py | 42 +++++++++---------- 7 files changed, 69 insertions(+), 93 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index fe414c7f..6e68fdc9 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -1,40 +1,37 @@ import asyncio -from typing import TYPE_CHECKING, Tuple, Dict +from typing import Tuple, Dict +from multiaddr import Multiaddr +from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.network.swarm import GenericProtocolHandlerFn +from libp2p.peer.id import ID from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream from .constants import HeaderTags from .utils import encode_uvarint, decode_uvarint_from_stream from .mplex_stream import MplexStream -if TYPE_CHECKING: - from multiaddr import Multiaddr - from libp2p.security.secure_conn_interface import ISecureConn - from libp2p.network.connection.raw_connection_interface import IRawConnection - from libp2p.network.swarm import GenericProtocolHandlerFn - from libp2p.peer.id import ID - from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream - - class Mplex(IMuxedConn): """ reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go """ - secured_conn: "ISecureConn" - raw_conn: "IRawConnection" + secured_conn: ISecureConn + raw_conn: IRawConnection initiator: bool generic_protocol_handler = None - peer_id: "ID" + peer_id: ID buffers: Dict[int, asyncio.Queue[bytes]] stream_queue: asyncio.Queue[int] def __init__( self, - secured_conn: "ISecureConn", - generic_protocol_handler: "GenericProtocolHandlerFn", - peer_id: "ID", + secured_conn: ISecureConn, + generic_protocol_handler: GenericProtocolHandlerFn, + peer_id: ID, ) -> None: """ create a new muxed connection @@ -95,8 +92,8 @@ class Mplex(IMuxedConn): return None async def open_stream( - self, protocol_id: str, multi_addr: "Multiaddr" - ) -> "IMuxedStream": + self, protocol_id: str, multi_addr: Multiaddr + ) -> IMuxedStream: """ creates a new muxed_stream :param protocol_id: protocol_id of stream diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 5e64ac72..d90a5932 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -1,14 +1,11 @@ import asyncio -from typing import TYPE_CHECKING from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn + from .constants import HeaderTags -if TYPE_CHECKING: - from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn - - class MplexStream(IMuxedStream): """ reference: https://github.com/libp2p/go-mplex/blob/master/stream.go @@ -16,16 +13,14 @@ class MplexStream(IMuxedStream): stream_id: int initiator: bool - mplex_conn: "IMuxedConn" + mplex_conn: IMuxedConn read_deadline: float write_deadline: float local_closed: bool remote_closed: bool stream_lock: asyncio.Lock - def __init__( - self, stream_id: int, initiator: bool, mplex_conn: "IMuxedConn" - ) -> None: + def __init__(self, stream_id: int, initiator: bool, mplex_conn: IMuxedConn) -> None: """ create new MuxedStream in muxer :param stream_id: stream stream id diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py index ca23d9db..dfaa58b6 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/muxed_connection_interface.py @@ -1,14 +1,11 @@ from abc import ABC, abstractmethod -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from multiaddr import Multiaddr - from libp2p.security.secure_conn_interface import ISecureConn - from libp2p.network.swarm import GenericProtocolHandlerFn - from libp2p.peer.id import ID - from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream - from libp2p.stream_muxer.mplex.constants import HeaderTags +from multiaddr import Multiaddr +from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.network.swarm import GenericProtocolHandlerFn +from libp2p.peer.id import ID +from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.stream_muxer.mplex.constants import HeaderTags class IMuxedConn(ABC): @@ -17,14 +14,14 @@ class IMuxedConn(ABC): """ initiator: bool - peer_id: "ID" + peer_id: ID @abstractmethod def __init__( self, - conn: "ISecureConn", - generic_protocol_handler: "GenericProtocolHandlerFn", - peer_id: "ID", + conn: ISecureConn, + generic_protocol_handler: GenericProtocolHandlerFn, + peer_id: ID, ) -> None: """ create a new muxed connection @@ -57,8 +54,8 @@ class IMuxedConn(ABC): @abstractmethod async def open_stream( - self, protocol_id: str, multi_addr: "Multiaddr" - ) -> "IMuxedStream": + self, protocol_id: str, multi_addr: Multiaddr + ) -> IMuxedStream: """ creates a new muxed_stream :param protocol_id: protocol_id of stream diff --git a/libp2p/transport/listener_interface.py b/libp2p/transport/listener_interface.py index 8e2d86b4..051d289b 100644 --- a/libp2p/transport/listener_interface.py +++ b/libp2p/transport/listener_interface.py @@ -1,15 +1,12 @@ from abc import ABC, abstractmethod -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from typing import List - from multiaddr import Multiaddr +from typing import List +from multiaddr import Multiaddr class IListener(ABC): @abstractmethod - async def listen(self, maddr: "Multiaddr") -> bool: + async def listen(self, maddr: Multiaddr) -> bool: """ put listener in listening mode and wait for incoming connections :param maddr: multiaddr of peer @@ -17,7 +14,7 @@ class IListener(ABC): """ @abstractmethod - def get_addrs(self) -> "List[Multiaddr]": + def get_addrs(self) -> List[Multiaddr]: """ retrieve list of addresses the listener is listening on :return: return list of addrs diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 918f7120..b3d2fc8d 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -1,26 +1,23 @@ import asyncio +from typing import List, Callable from multiaddr import Multiaddr +from socket import socket +from libp2p.transport.typing import THandler +from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.connection.raw_connection import RawConnection from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport -from typing import TYPE_CHECKING, List, Callable - -if TYPE_CHECKING: - from socket import socket - from libp2p.transport.typing import THandler - from libp2p.network.connection.raw_connection_interface import IRawConnection - class TCPListener(IListener): multiaddrs: List[Multiaddr] server = None handler = None - def __init__(self, handler_function: "THandler" = None) -> None: + def __init__(self, handler_function: THandler = None) -> None: self.multiaddrs = [] self.server = None self.handler = handler_function @@ -69,7 +66,7 @@ class TCP(ITransport): def __init__(self) -> None: self.listener = TCPListener() - async def dial(self, maddr: Multiaddr, self_id: ID) -> "IRawConnection": + async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: """ dial a transport to peer listening on multiaddr :param maddr: multiaddr of peer @@ -94,7 +91,7 @@ class TCP(ITransport): return RawConnection(host, str(port), reader, writer, True) - def create_listener(self, handler_function: "THandler") -> TCPListener: + def create_listener(self, handler_function: THandler) -> TCPListener: """ create listener on transport :param handler_function: a function called when a new connection is received @@ -104,5 +101,5 @@ class TCP(ITransport): return TCPListener(handler_function) -def _multiaddr_from_socket(socket: "socket") -> Multiaddr: +def _multiaddr_from_socket(socket: socket) -> Multiaddr: return Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname()) diff --git a/libp2p/transport/transport_interface.py b/libp2p/transport/transport_interface.py index b79d2461..38ea365f 100644 --- a/libp2p/transport/transport_interface.py +++ b/libp2p/transport/transport_interface.py @@ -1,18 +1,15 @@ from abc import ABC, abstractmethod -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from multiaddr import Multiaddr - from libp2p.peer.id import ID - from libp2p.network.connection.raw_connection_interface import IRawConnection - from .listener_interface import IListener - from .typing import THandler +from multiaddr import Multiaddr +from libp2p.peer.id import ID +from libp2p.network.connection.raw_connection_interface import IRawConnection +from .listener_interface import IListener +from .typing import THandler class ITransport(ABC): @abstractmethod - async def dial(self, maddr: "Multiaddr", self_id: "ID") -> "IRawConnection": + async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: """ dial a transport to peer listening on multiaddr :param multiaddr: multiaddr of peer @@ -21,7 +18,7 @@ class ITransport(ABC): """ @abstractmethod - def create_listener(self, handler_function: "THandler") -> "IListener": + def create_listener(self, handler_function: THandler) -> IListener: """ create listener on transport :param handler_function: a function called when a new conntion is received diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 3cbdad4b..56b4c619 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -1,27 +1,23 @@ -from libp2p.security.security_multistream import SecurityMultistream +from typing import Dict, Sequence + from libp2p.stream_muxer.mplex.mplex import Mplex +from libp2p.security.security_multistream import SecurityMultistream, TProtocol +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.network.swarm import GenericProtocolHandlerFn +from libp2p.peer.id import ID +from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.security.secure_transport_interface import ISecureTransport -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - - from typing import Dict, Sequence - from libp2p.network.connection.raw_connection_interface import IRawConnection - from libp2p.network.swarm import GenericProtocolHandlerFn - from libp2p.peer.id import ID - from libp2p.security.secure_conn_interface import ISecureConn - from libp2p.security.secure_transport_interface import ISecureTransport - from libp2p.security.security_multistream import TProtocol - from .transport_interface import ITransport - from .listener_interface import IListener +from .transport_interface import ITransport +from .listener_interface import IListener class TransportUpgrader: security_multistream: SecurityMultistream - muxer: "Sequence[str]" + muxer: Sequence[str] def __init__( - self, secOpt: "Dict[TProtocol, ISecureTransport]", muxerOpt: "Sequence[str]" + self, secOpt: Dict[TProtocol, ISecureTransport], muxerOpt: Sequence[str] ) -> None: # Store security option self.security_multistream = SecurityMultistream() @@ -31,15 +27,15 @@ class TransportUpgrader: # Store muxer option self.muxer = muxerOpt - def upgrade_listener(self, transport: "ITransport", listeners: "IListener") -> None: + def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None: """ Upgrade multiaddr listeners to libp2p-transport listeners """ pass async def upgrade_security( - self, raw_conn: "IRawConnection", peer_id: "ID", initiator: bool - ) -> "ISecureConn": + self, raw_conn: IRawConnection, peer_id: ID, initiator: bool + ) -> ISecureConn: """ Upgrade conn to be a secured connection """ @@ -50,10 +46,10 @@ class TransportUpgrader: @staticmethod def upgrade_connection( - conn: "IRawConnection", - generic_protocol_handler: "GenericProtocolHandlerFn", - peer_id: "ID", - ) -> "Mplex": + conn: IRawConnection, + generic_protocol_handler: GenericProtocolHandlerFn, + peer_id: ID, + ) -> Mplex: """ Upgrade raw connection to muxed connection """ From 29091266fce18b6bea7e4b2f4bca10e039a59935 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Mon, 5 Aug 2019 10:35:56 +0800 Subject: [PATCH 08/15] add still needed TYPE_CHECK --- libp2p/network/swarm.py | 4 +--- libp2p/network/typing.py | 4 ++++ libp2p/stream_muxer/mplex/mplex.py | 6 +++--- libp2p/stream_muxer/muxed_connection_interface.py | 11 ++++++++--- libp2p/stream_muxer/muxed_stream_interface.py | 7 +++++-- libp2p/transport/upgrader.py | 2 +- 6 files changed, 22 insertions(+), 12 deletions(-) create mode 100644 libp2p/network/typing.py diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index ce80b0ec..eccca255 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -19,6 +19,7 @@ from .network_interface import INetwork from .notifee_interface import INotifee from .stream.net_stream import NetStream from .stream.net_stream_interface import INetStream +from .typing import GenericProtocolHandlerFn StreamHandlerFn = Callable[[INetStream], Awaitable[None]] @@ -248,9 +249,6 @@ class Swarm(INetwork): # TODO: `disconnect`? -GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]] - - def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn: """ Create a generic protocol handler from the given swarm. We use swarm diff --git a/libp2p/network/typing.py b/libp2p/network/typing.py new file mode 100644 index 00000000..55b577e3 --- /dev/null +++ b/libp2p/network/typing.py @@ -0,0 +1,4 @@ +from typing import Awaitable, Callable +from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream + +GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]] diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 6e68fdc9..c359ee3b 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -4,7 +4,7 @@ from typing import Tuple, Dict from multiaddr import Multiaddr from libp2p.security.secure_conn_interface import ISecureConn from libp2p.network.connection.raw_connection_interface import IRawConnection -from libp2p.network.swarm import GenericProtocolHandlerFn +from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream @@ -24,8 +24,8 @@ class Mplex(IMuxedConn): initiator: bool generic_protocol_handler = None peer_id: ID - buffers: Dict[int, asyncio.Queue[bytes]] - stream_queue: asyncio.Queue[int] + buffers: Dict[int, "asyncio.Queue[bytes]"] + stream_queue: "asyncio.Queue[int]" def __init__( self, diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py index dfaa58b6..9c54007c 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/muxed_connection_interface.py @@ -2,11 +2,16 @@ from abc import ABC, abstractmethod from multiaddr import Multiaddr from libp2p.security.secure_conn_interface import ISecureConn -from libp2p.network.swarm import GenericProtocolHandlerFn +from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID -from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream from libp2p.stream_muxer.mplex.constants import HeaderTags +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream + + class IMuxedConn(ABC): """ @@ -55,7 +60,7 @@ class IMuxedConn(ABC): @abstractmethod async def open_stream( self, protocol_id: str, multi_addr: Multiaddr - ) -> IMuxedStream: + ) -> "IMuxedStream": """ creates a new muxed_stream :param protocol_id: protocol_id of stream diff --git a/libp2p/stream_muxer/muxed_stream_interface.py b/libp2p/stream_muxer/muxed_stream_interface.py index a4c602c2..141ba9f6 100644 --- a/libp2p/stream_muxer/muxed_stream_interface.py +++ b/libp2p/stream_muxer/muxed_stream_interface.py @@ -1,11 +1,14 @@ from abc import ABC, abstractmethod +from typing import TYPE_CHECKING -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn + +if TYPE_CHECKING: + from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn class IMuxedStream(ABC): - mplex_conn: IMuxedConn + mplex_conn: "IMuxedConn" @abstractmethod async def read(self) -> bytes: diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 56b4c619..6809fbb1 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -3,7 +3,7 @@ from typing import Dict, Sequence from libp2p.stream_muxer.mplex.mplex import Mplex from libp2p.security.security_multistream import SecurityMultistream, TProtocol from libp2p.network.connection.raw_connection_interface import IRawConnection -from libp2p.network.swarm import GenericProtocolHandlerFn +from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport From c804f5ad1985640566e405e47d1ef0c56e837e28 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Mon, 5 Aug 2019 10:47:59 +0800 Subject: [PATCH 09/15] minor --- libp2p/transport/tcp/tcp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index b3d2fc8d..60a1e6e6 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -4,10 +4,11 @@ from typing import List, Callable from multiaddr import Multiaddr from socket import socket + +from libp2p.peer.id import ID from libp2p.transport.typing import THandler from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.connection.raw_connection import RawConnection - from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport From 9e0a8062182cc5f69fd60c164a8a89a2ceb7e81e Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Mon, 5 Aug 2019 11:17:38 +0800 Subject: [PATCH 10/15] move stream and connection interfaces to abc --- libp2p/network/network_interface.py | 2 +- libp2p/network/notifee_interface.py | 2 +- libp2p/network/stream/net_stream.py | 3 +- libp2p/network/stream/net_stream_interface.py | 2 +- libp2p/network/swarm.py | 3 +- libp2p/network/typing.py | 2 +- libp2p/pubsub/pubsub_notifee.py | 2 +- .../{muxed_connection_interface.py => abc.py} | 59 +++++++++++++++---- libp2p/stream_muxer/mplex/mplex.py | 3 +- libp2p/stream_muxer/mplex/mplex_stream.py | 3 +- libp2p/stream_muxer/muxed_stream_interface.py | 47 --------------- 11 files changed, 56 insertions(+), 72 deletions(-) rename libp2p/stream_muxer/{muxed_connection_interface.py => abc.py} (61%) delete mode 100644 libp2p/stream_muxer/muxed_stream_interface.py diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 4989bdd3..c0085e55 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -5,7 +5,7 @@ from multiaddr import Multiaddr from libp2p.peer.id import ID from libp2p.peer.peerstore import PeerStore -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.abc import IMuxedConn from libp2p.transport.listener_interface import IListener from .stream.net_stream_interface import INetStream diff --git a/libp2p/network/notifee_interface.py b/libp2p/network/notifee_interface.py index 5f303b89..2eadd493 100644 --- a/libp2p/network/notifee_interface.py +++ b/libp2p/network/notifee_interface.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING from multiaddr import Multiaddr from libp2p.network.stream.net_stream_interface import INetStream -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.abc import IMuxedConn if TYPE_CHECKING: from .network_interface import INetwork diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 8ebe7913..c5a7c2ec 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -1,5 +1,4 @@ -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn -from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from .net_stream_interface import INetStream diff --git a/libp2p/network/stream/net_stream_interface.py b/libp2p/network/stream/net_stream_interface.py index ca3858ff..d3ac2ffd 100644 --- a/libp2p/network/stream/net_stream_interface.py +++ b/libp2p/network/stream/net_stream_interface.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.abc import IMuxedConn class INetStream(ABC): diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index eccca255..c743fd04 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -8,8 +8,7 @@ from libp2p.peer.peerstore import PeerStore from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.routing.interfaces import IPeerRouting -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn -from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader diff --git a/libp2p/network/typing.py b/libp2p/network/typing.py index 55b577e3..472510f2 100644 --- a/libp2p/network/typing.py +++ b/libp2p/network/typing.py @@ -1,4 +1,4 @@ from typing import Awaitable, Callable -from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.stream_muxer.abc import IMuxedStream GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]] diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index f8caa200..9d5132e2 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -5,7 +5,7 @@ from multiaddr import Multiaddr from libp2p.network.network_interface import INetwork from libp2p.network.notifee_interface import INotifee from libp2p.network.stream.net_stream_interface import INetStream -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.abc import IMuxedConn if TYPE_CHECKING: import asyncio diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/abc.py similarity index 61% rename from libp2p/stream_muxer/muxed_connection_interface.py rename to libp2p/stream_muxer/abc.py index 9c54007c..1e137eb9 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/abc.py @@ -1,16 +1,15 @@ from abc import ABC, abstractmethod -from multiaddr import Multiaddr -from libp2p.security.secure_conn_interface import ISecureConn -from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID +from libp2p.security.secure_conn_interface import ISecureConn from libp2p.stream_muxer.mplex.constants import HeaderTags +from multiaddr import Multiaddr from typing import TYPE_CHECKING if TYPE_CHECKING: - from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream - + # Prevent GenericProtocolHandlerFn introducing circular dependencies + from libp2p.network.typing import GenericProtocolHandlerFn # noqa: F401 class IMuxedConn(ABC): @@ -23,10 +22,7 @@ class IMuxedConn(ABC): @abstractmethod def __init__( - self, - conn: ISecureConn, - generic_protocol_handler: GenericProtocolHandlerFn, - peer_id: ID, + self, conn: ISecureConn, generic_protocol_handler: "GenericProtocolHandlerFn", peer_id: ID ) -> None: """ create a new muxed connection @@ -58,9 +54,7 @@ class IMuxedConn(ABC): """ @abstractmethod - async def open_stream( - self, protocol_id: str, multi_addr: Multiaddr - ) -> "IMuxedStream": + async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> "IMuxedStream": """ creates a new muxed_stream :param protocol_id: protocol_id of stream @@ -82,3 +76,44 @@ class IMuxedConn(ABC): :param data: data to send in the message :param stream_id: stream the message is in """ + + +class IMuxedStream(ABC): + + mplex_conn: IMuxedConn + + @abstractmethod + async def read(self) -> bytes: + """ + reads from the underlying muxed_conn + :return: bytes of input + """ + + @abstractmethod + async def write(self, data: bytes) -> int: + """ + writes to the underlying muxed_conn + :return: number of bytes written + """ + + @abstractmethod + async def close(self) -> bool: + """ + close the underlying muxed_conn + :return: true if successful + """ + + @abstractmethod + async def reset(self) -> bool: + """ + closes both ends of the stream + tells this remote side to hang up + :return: true if successful + """ + + @abstractmethod + def set_deadline(self, ttl: float) -> bool: + """ + set deadline for muxed stream + :return: a new stream + """ diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index c359ee3b..f113de17 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -6,8 +6,7 @@ from libp2p.security.secure_conn_interface import ISecureConn from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn -from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from .constants import HeaderTags from .utils import encode_uvarint, decode_uvarint_from_stream diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index d90a5932..3b8ceda3 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -1,6 +1,5 @@ import asyncio -from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.abc import IMuxedStream, IMuxedConn from .constants import HeaderTags diff --git a/libp2p/stream_muxer/muxed_stream_interface.py b/libp2p/stream_muxer/muxed_stream_interface.py deleted file mode 100644 index 141ba9f6..00000000 --- a/libp2p/stream_muxer/muxed_stream_interface.py +++ /dev/null @@ -1,47 +0,0 @@ -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING - - -if TYPE_CHECKING: - from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn - - -class IMuxedStream(ABC): - - mplex_conn: "IMuxedConn" - - @abstractmethod - async def read(self) -> bytes: - """ - reads from the underlying muxed_conn - :return: bytes of input - """ - - @abstractmethod - async def write(self, data: bytes) -> int: - """ - writes to the underlying muxed_conn - :return: number of bytes written - """ - - @abstractmethod - async def close(self) -> bool: - """ - close the underlying muxed_conn - :return: true if successful - """ - - @abstractmethod - async def reset(self) -> bool: - """ - closes both ends of the stream - tells this remote side to hang up - :return: true if successful - """ - - @abstractmethod - def set_deadline(self, ttl: float) -> bool: - """ - set deadline for muxed stream - :return: a new stream - """ From 7a04ebb51f1f1af68815b1edd45a172beeecc255 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Mon, 5 Aug 2019 11:21:20 +0800 Subject: [PATCH 11/15] run black --- libp2p/stream_muxer/mplex/mplex.py | 4 +--- libp2p/stream_muxer/mplex/mplex_stream.py | 12 ++---------- libp2p/stream_muxer/mplex/utils.py | 4 +--- libp2p/transport/tcp/tcp.py | 4 +--- libp2p/transport/typing.py | 1 - libp2p/transport/upgrader.py | 8 ++------ tests/peer/test_peerinfo.py | 4 +++- 7 files changed, 10 insertions(+), 27 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index f113de17..54a9e89d 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -90,9 +90,7 @@ class Mplex(IMuxedConn): # Stream not created yet return None - async def open_stream( - self, protocol_id: str, multi_addr: Multiaddr - ) -> IMuxedStream: + async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> IMuxedStream: """ creates a new muxed_stream :param protocol_id: protocol_id of stream diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 3b8ceda3..05a87071 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -47,11 +47,7 @@ class MplexStream(IMuxedStream): write to stream :return: number of bytes written """ - flag = ( - HeaderTags.MessageInitiator - if self.initiator - else HeaderTags.MessageReceiver - ) + flag = HeaderTags.MessageInitiator if self.initiator else HeaderTags.MessageReceiver return await self.mplex_conn.send_message(flag, data, self.stream_id) async def close(self) -> bool: @@ -93,11 +89,7 @@ class MplexStream(IMuxedStream): return True if not self.remote_closed: - flag = ( - HeaderTags.ResetInitiator - if self.initiator - else HeaderTags.ResetInitiator - ) + flag = HeaderTags.ResetInitiator if self.initiator else HeaderTags.ResetInitiator await self.mplex_conn.send_message(flag, None, self.stream_id) self.local_closed = True diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index 0de1f7d4..52cf0ff2 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -32,9 +32,7 @@ def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]: return result, index + 1 -async def decode_uvarint_from_stream( - reader: asyncio.StreamReader, timeout: float -) -> int: +async def decode_uvarint_from_stream(reader: asyncio.StreamReader, timeout: float) -> int: shift = 0 result = 0 while True: diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 60a1e6e6..2682c2b1 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -30,9 +30,7 @@ class TCPListener(IListener): :return: return True if successful """ self.server = await asyncio.start_server( - self.handler, - maddr.value_for_protocol("ip4"), - maddr.value_for_protocol("tcp"), + self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp") ) socket = self.server.sockets[0] self.multiaddrs.append(_multiaddr_from_socket(socket)) diff --git a/libp2p/transport/typing.py b/libp2p/transport/typing.py index 9fd36b9d..0f5eabbd 100644 --- a/libp2p/transport/typing.py +++ b/libp2p/transport/typing.py @@ -3,4 +3,3 @@ from typing import NewType, Callable THandler = Callable[[StreamReader, StreamWriter], None] - diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 6809fbb1..2d4dc63f 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -16,9 +16,7 @@ class TransportUpgrader: security_multistream: SecurityMultistream muxer: Sequence[str] - def __init__( - self, secOpt: Dict[TProtocol, ISecureTransport], muxerOpt: Sequence[str] - ) -> None: + def __init__(self, secOpt: Dict[TProtocol, ISecureTransport], muxerOpt: Sequence[str]) -> None: # Store security option self.security_multistream = SecurityMultistream() for key in secOpt: @@ -46,9 +44,7 @@ class TransportUpgrader: @staticmethod def upgrade_connection( - conn: IRawConnection, - generic_protocol_handler: GenericProtocolHandlerFn, - peer_id: ID, + conn: IRawConnection, generic_protocol_handler: GenericProtocolHandlerFn, peer_id: ID ) -> Mplex: """ Upgrade raw connection to muxed connection diff --git a/tests/peer/test_peerinfo.py b/tests/peer/test_peerinfo.py index d8690f5d..a25cb805 100644 --- a/tests/peer/test_peerinfo.py +++ b/tests/peer/test_peerinfo.py @@ -8,7 +8,9 @@ from libp2p.peer.peerdata import PeerData from libp2p.peer.peerinfo import InvalidAddrError, PeerInfo, info_from_p2p_addr ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" -VALID_MULTI_ADDR_STR = "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" # noqa: E501 +VALID_MULTI_ADDR_STR = ( + "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" +) # noqa: E501 def test_init_(): From e763f579303fb7f7352d53d75a76a57f990a6557 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Mon, 5 Aug 2019 11:22:44 +0800 Subject: [PATCH 12/15] run isort --- libp2p/network/connection/raw_connection_interface.py | 2 +- libp2p/network/typing.py | 1 + libp2p/stream_muxer/abc.py | 6 +++--- libp2p/stream_muxer/mplex/mplex.py | 7 ++++--- libp2p/stream_muxer/mplex/mplex_stream.py | 2 +- libp2p/stream_muxer/mplex/utils.py | 1 - libp2p/transport/listener_interface.py | 2 +- libp2p/transport/tcp/tcp.py | 11 +++++------ libp2p/transport/transport_interface.py | 4 +++- libp2p/transport/typing.py | 3 +-- libp2p/transport/upgrader.py | 6 +++--- 11 files changed, 23 insertions(+), 22 deletions(-) diff --git a/libp2p/network/connection/raw_connection_interface.py b/libp2p/network/connection/raw_connection_interface.py index 088eaec1..1810f58f 100644 --- a/libp2p/network/connection/raw_connection_interface.py +++ b/libp2p/network/connection/raw_connection_interface.py @@ -1,5 +1,5 @@ -import asyncio from abc import ABC, abstractmethod +import asyncio class IRawConnection(ABC): diff --git a/libp2p/network/typing.py b/libp2p/network/typing.py index 472510f2..713c1d8f 100644 --- a/libp2p/network/typing.py +++ b/libp2p/network/typing.py @@ -1,4 +1,5 @@ from typing import Awaitable, Callable + from libp2p.stream_muxer.abc import IMuxedStream GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]] diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 1e137eb9..05a4cfe8 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -1,11 +1,11 @@ from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +from multiaddr import Multiaddr from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.stream_muxer.mplex.constants import HeaderTags -from multiaddr import Multiaddr - -from typing import TYPE_CHECKING if TYPE_CHECKING: # Prevent GenericProtocolHandlerFn introducing circular dependencies diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 54a9e89d..a6e5b486 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -1,16 +1,17 @@ import asyncio -from typing import Tuple, Dict +from typing import Dict, Tuple from multiaddr import Multiaddr -from libp2p.security.secure_conn_interface import ISecureConn + from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID +from libp2p.security.secure_conn_interface import ISecureConn from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from .constants import HeaderTags -from .utils import encode_uvarint, decode_uvarint_from_stream from .mplex_stream import MplexStream +from .utils import decode_uvarint_from_stream, encode_uvarint class Mplex(IMuxedConn): diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 05a87071..e4fb21d0 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -1,6 +1,6 @@ import asyncio -from libp2p.stream_muxer.abc import IMuxedStream, IMuxedConn +from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from .constants import HeaderTags diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index 52cf0ff2..35467493 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -1,6 +1,5 @@ import asyncio import struct - from typing import Tuple diff --git a/libp2p/transport/listener_interface.py b/libp2p/transport/listener_interface.py index 051d289b..fecc3b9e 100644 --- a/libp2p/transport/listener_interface.py +++ b/libp2p/transport/listener_interface.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod - from typing import List + from multiaddr import Multiaddr diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 2682c2b1..4a9217db 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -1,16 +1,15 @@ import asyncio +from socket import socket +from typing import List -from typing import List, Callable from multiaddr import Multiaddr -from socket import socket - -from libp2p.peer.id import ID -from libp2p.transport.typing import THandler -from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.connection.raw_connection import RawConnection +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.peer.id import ID from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport +from libp2p.transport.typing import THandler class TCPListener(IListener): diff --git a/libp2p/transport/transport_interface.py b/libp2p/transport/transport_interface.py index 38ea365f..2f5a1ac6 100644 --- a/libp2p/transport/transport_interface.py +++ b/libp2p/transport/transport_interface.py @@ -1,8 +1,10 @@ from abc import ABC, abstractmethod from multiaddr import Multiaddr -from libp2p.peer.id import ID + from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.peer.id import ID + from .listener_interface import IListener from .typing import THandler diff --git a/libp2p/transport/typing.py b/libp2p/transport/typing.py index 0f5eabbd..147fe11d 100644 --- a/libp2p/transport/typing.py +++ b/libp2p/transport/typing.py @@ -1,5 +1,4 @@ from asyncio import StreamReader, StreamWriter -from typing import NewType, Callable - +from typing import Callable THandler = Callable[[StreamReader, StreamWriter], None] diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 2d4dc63f..363247a1 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -1,15 +1,15 @@ from typing import Dict, Sequence -from libp2p.stream_muxer.mplex.mplex import Mplex -from libp2p.security.security_multistream import SecurityMultistream, TProtocol from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport +from libp2p.security.security_multistream import SecurityMultistream, TProtocol +from libp2p.stream_muxer.mplex.mplex import Mplex -from .transport_interface import ITransport from .listener_interface import IListener +from .transport_interface import ITransport class TransportUpgrader: From ccfb6eb35ff46a0f5f0092dd9edc5b0e6c69eca5 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Mon, 5 Aug 2019 16:56:56 +0800 Subject: [PATCH 13/15] remove constructor of TCP --- libp2p/transport/tcp/tcp.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 4a9217db..20116f7f 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -61,9 +61,6 @@ class TCPListener(IListener): class TCP(ITransport): - def __init__(self) -> None: - self.listener = TCPListener() - async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: """ dial a transport to peer listening on multiaddr From 63c733c3f5c62c697ef16d3c067f8f9a819f912c Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Mon, 5 Aug 2019 16:58:34 +0800 Subject: [PATCH 14/15] PR feedback --- libp2p/transport/tcp/tcp.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 20116f7f..02d981be 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -69,9 +69,9 @@ class TCP(ITransport): :return: True if successful """ host = maddr.value_for_protocol("ip4") - port = int(maddr.value_for_protocol("tcp")) + port = maddr.value_for_protocol("tcp") - reader, writer = await asyncio.open_connection(host, port) + reader, writer = await asyncio.open_connection(host, int(port)) # First: send our peer ID so receiver knows it writer.write(self_id.to_base58().encode()) @@ -84,7 +84,7 @@ class TCP(ITransport): if ack != expected_ack_str: raise Exception("Receiver did not receive peer id") - return RawConnection(host, str(port), reader, writer, True) + return RawConnection(host, port, reader, writer, True) def create_listener(self, handler_function: THandler) -> TCPListener: """ From cb3a59e0acb726faf4c2db9c23496ac519ad5432 Mon Sep 17 00:00:00 2001 From: Chih Cheng Liang Date: Mon, 5 Aug 2019 17:02:18 +0800 Subject: [PATCH 15/15] ttl as int --- libp2p/stream_muxer/abc.py | 2 +- libp2p/stream_muxer/mplex/mplex_stream.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 05a4cfe8..0b779336 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -112,7 +112,7 @@ class IMuxedStream(ABC): """ @abstractmethod - def set_deadline(self, ttl: float) -> bool: + def set_deadline(self, ttl: int) -> bool: """ set deadline for muxed stream :return: a new stream diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index e4fb21d0..e4627ec5 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -13,8 +13,8 @@ class MplexStream(IMuxedStream): stream_id: int initiator: bool mplex_conn: IMuxedConn - read_deadline: float - write_deadline: float + read_deadline: int + write_deadline: int local_closed: bool remote_closed: bool stream_lock: asyncio.Lock @@ -103,7 +103,7 @@ class MplexStream(IMuxedStream): return True # TODO deadline not in use - def set_deadline(self, ttl: float) -> bool: + def set_deadline(self, ttl: int) -> bool: """ set deadline for muxed stream :return: True if successful @@ -112,7 +112,7 @@ class MplexStream(IMuxedStream): self.write_deadline = ttl return True - def set_read_deadline(self, ttl: float) -> bool: + def set_read_deadline(self, ttl: int) -> bool: """ set read deadline for muxed stream :return: True if successful @@ -120,7 +120,7 @@ class MplexStream(IMuxedStream): self.read_deadline = ttl return True - def set_write_deadline(self, ttl: float) -> bool: + def set_write_deadline(self, ttl: int) -> bool: """ set write deadline for muxed stream :return: True if successful