diff --git a/libp2p/network/connection/raw_connection_interface.py b/libp2p/network/connection/raw_connection_interface.py index 1e35514b..1810f58f 100644 --- a/libp2p/network/connection/raw_connection_interface.py +++ b/libp2p/network/connection/raw_connection_interface.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +import asyncio class IRawConnection(ABC): @@ -6,6 +7,13 @@ class IRawConnection(ABC): A Raw Connection provides a Reader and a Writer """ + initiator: bool + + # TODO: reader and writer shouldn't be exposed. + # Need better API for the consumers + reader: asyncio.StreamReader + writer: asyncio.StreamWriter + @abstractmethod async def write(self, data: bytes) -> None: pass @@ -13,3 +21,11 @@ class IRawConnection(ABC): @abstractmethod async def read(self) -> bytes: pass + + @abstractmethod + def close(self) -> None: + pass + + @abstractmethod + def next_stream_id(self) -> int: + pass diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 4989bdd3..c0085e55 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -5,7 +5,7 @@ from multiaddr import Multiaddr from libp2p.peer.id import ID from libp2p.peer.peerstore import PeerStore -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.abc import IMuxedConn from libp2p.transport.listener_interface import IListener from .stream.net_stream_interface import INetStream diff --git a/libp2p/network/notifee_interface.py b/libp2p/network/notifee_interface.py index 5f303b89..2eadd493 100644 --- a/libp2p/network/notifee_interface.py +++ b/libp2p/network/notifee_interface.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING from multiaddr import Multiaddr from libp2p.network.stream.net_stream_interface import INetStream -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.abc import IMuxedConn if TYPE_CHECKING: from .network_interface import INetwork diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 8ebe7913..c5a7c2ec 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -1,5 +1,4 @@ -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn -from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from .net_stream_interface import INetStream diff --git a/libp2p/network/stream/net_stream_interface.py b/libp2p/network/stream/net_stream_interface.py index ca3858ff..d3ac2ffd 100644 --- a/libp2p/network/stream/net_stream_interface.py +++ b/libp2p/network/stream/net_stream_interface.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.abc import IMuxedConn class INetStream(ABC): diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index ce80b0ec..c743fd04 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -8,8 +8,7 @@ from libp2p.peer.peerstore import PeerStore from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.routing.interfaces import IPeerRouting -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn -from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader @@ -19,6 +18,7 @@ from .network_interface import INetwork from .notifee_interface import INotifee from .stream.net_stream import NetStream from .stream.net_stream_interface import INetStream +from .typing import GenericProtocolHandlerFn StreamHandlerFn = Callable[[INetStream], Awaitable[None]] @@ -248,9 +248,6 @@ class Swarm(INetwork): # TODO: `disconnect`? -GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]] - - def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn: """ Create a generic protocol handler from the given swarm. We use swarm diff --git a/libp2p/network/typing.py b/libp2p/network/typing.py new file mode 100644 index 00000000..713c1d8f --- /dev/null +++ b/libp2p/network/typing.py @@ -0,0 +1,5 @@ +from typing import Awaitable, Callable + +from libp2p.stream_muxer.abc import IMuxedStream + +GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]] diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index f8caa200..9d5132e2 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -5,7 +5,7 @@ from multiaddr import Multiaddr from libp2p.network.network_interface import INetwork from libp2p.network.notifee_interface import INotifee from libp2p.network.stream.net_stream_interface import INetStream -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.abc import IMuxedConn if TYPE_CHECKING: import asyncio diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py new file mode 100644 index 00000000..0b779336 --- /dev/null +++ b/libp2p/stream_muxer/abc.py @@ -0,0 +1,119 @@ +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +from multiaddr import Multiaddr + +from libp2p.peer.id import ID +from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.stream_muxer.mplex.constants import HeaderTags + +if TYPE_CHECKING: + # Prevent GenericProtocolHandlerFn introducing circular dependencies + from libp2p.network.typing import GenericProtocolHandlerFn # noqa: F401 + + +class IMuxedConn(ABC): + """ + reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go + """ + + initiator: bool + peer_id: ID + + @abstractmethod + def __init__( + self, conn: ISecureConn, generic_protocol_handler: "GenericProtocolHandlerFn", peer_id: ID + ) -> None: + """ + create a new muxed connection + :param conn: an instance of secured connection + :param generic_protocol_handler: generic protocol handler + for new muxed streams + :param peer_id: peer_id of peer the connection is to + """ + + @abstractmethod + def close(self) -> None: + """ + close connection + """ + + @abstractmethod + def is_closed(self) -> bool: + """ + check connection is fully closed + :return: true if successful + """ + + @abstractmethod + async def read_buffer(self, stream_id: int) -> bytes: + """ + Read a message from stream_id's buffer, check raw connection for new messages + :param stream_id: stream id of stream to read from + :return: message read + """ + + @abstractmethod + async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> "IMuxedStream": + """ + creates a new muxed_stream + :param protocol_id: protocol_id of stream + :param multi_addr: multi_addr that stream connects to + :return: a new stream + """ + + @abstractmethod + async def accept_stream(self) -> None: + """ + accepts a muxed stream opened by the other end + """ + + @abstractmethod + async def send_message(self, flag: HeaderTags, data: bytes, stream_id: int) -> int: + """ + sends a message over the connection + :param header: header to use + :param data: data to send in the message + :param stream_id: stream the message is in + """ + + +class IMuxedStream(ABC): + + mplex_conn: IMuxedConn + + @abstractmethod + async def read(self) -> bytes: + """ + reads from the underlying muxed_conn + :return: bytes of input + """ + + @abstractmethod + async def write(self, data: bytes) -> int: + """ + writes to the underlying muxed_conn + :return: number of bytes written + """ + + @abstractmethod + async def close(self) -> bool: + """ + close the underlying muxed_conn + :return: true if successful + """ + + @abstractmethod + async def reset(self) -> bool: + """ + closes both ends of the stream + tells this remote side to hang up + :return: true if successful + """ + + @abstractmethod + def set_deadline(self, ttl: int) -> bool: + """ + set deadline for muxed stream + :return: a new stream + """ diff --git a/libp2p/stream_muxer/mplex/constants.py b/libp2p/stream_muxer/mplex/constants.py index a0537b2e..8989e763 100644 --- a/libp2p/stream_muxer/mplex/constants.py +++ b/libp2p/stream_muxer/mplex/constants.py @@ -1 +1,11 @@ -HEADER_TAGS = {"NEW_STREAM": 0, "MESSAGE": 2, "CLOSE": 4, "RESET": 6} +from enum import Enum + + +class HeaderTags(Enum): + NewStream = 0 + MessageReceiver = 1 + MessageInitiator = 2 + CloseReceiver = 3 + CloseInitiator = 4 + ResetReceiver = 5 + ResetInitiator = 6 diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index f00588b0..a6e5b486 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -1,8 +1,17 @@ import asyncio +from typing import Dict, Tuple -from ..muxed_connection_interface import IMuxedConn +from multiaddr import Multiaddr + +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.network.typing import GenericProtocolHandlerFn +from libp2p.peer.id import ID +from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream + +from .constants import HeaderTags from .mplex_stream import MplexStream -from .utils import decode_uvarint_from_stream, encode_uvarint, get_flag +from .utils import decode_uvarint_from_stream, encode_uvarint class Mplex(IMuxedConn): @@ -10,7 +19,20 @@ class Mplex(IMuxedConn): reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go """ - def __init__(self, secured_conn, generic_protocol_handler, peer_id): + secured_conn: ISecureConn + raw_conn: IRawConnection + initiator: bool + generic_protocol_handler = None + peer_id: ID + buffers: Dict[int, "asyncio.Queue[bytes]"] + stream_queue: "asyncio.Queue[int]" + + def __init__( + self, + secured_conn: ISecureConn, + generic_protocol_handler: GenericProtocolHandlerFn, + peer_id: ID, + ) -> None: """ create a new muxed connection :param conn: an instance of raw connection @@ -18,7 +40,7 @@ class Mplex(IMuxedConn): for new muxed streams :param peer_id: peer_id of peer the connection is to """ - super(Mplex, self).__init__(secured_conn, generic_protocol_handler, peer_id) + super().__init__(secured_conn, generic_protocol_handler, peer_id) self.secured_conn = secured_conn self.raw_conn = secured_conn.get_conn() @@ -38,19 +60,20 @@ class Mplex(IMuxedConn): # Kick off reading asyncio.ensure_future(self.handle_incoming()) - def close(self): + def close(self) -> None: """ close the stream muxer and underlying raw connection """ self.raw_conn.close() - def is_closed(self): + def is_closed(self) -> bool: """ check connection is fully closed :return: true if successful """ + raise NotImplementedError() - async def read_buffer(self, stream_id): + async def read_buffer(self, stream_id: int) -> bytes: """ Read a message from stream_id's buffer, check raw connection for new messages :param stream_id: stream id of stream to read from @@ -68,7 +91,7 @@ class Mplex(IMuxedConn): # Stream not created yet return None - async def open_stream(self, protocol_id, multi_addr): + async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> IMuxedStream: """ creates a new muxed_stream :param protocol_id: protocol_id of stream @@ -78,28 +101,26 @@ class Mplex(IMuxedConn): stream_id = self.raw_conn.next_stream_id() stream = MplexStream(stream_id, multi_addr, self) self.buffers[stream_id] = asyncio.Queue() - await self.send_message(get_flag(self.initiator, "NEW_STREAM"), None, stream_id) + await self.send_message(HeaderTags.NewStream, None, stream_id) return stream - async def accept_stream(self): + async def accept_stream(self) -> None: """ accepts a muxed stream opened by the other end - :return: the accepted stream """ stream_id = await self.stream_queue.get() stream = MplexStream(stream_id, False, self) asyncio.ensure_future(self.generic_protocol_handler(stream)) - async def send_message(self, flag, data, stream_id): + async def send_message(self, flag: HeaderTags, data: bytes, stream_id: int) -> int: """ sends a message over the connection :param header: header to use :param data: data to send in the message :param stream_id: stream the message is in - :return: True if success """ # << by 3, then or with flag - header = (stream_id << 3) | flag + header = (stream_id << 3) | flag.value header = encode_uvarint(header) if data is None: @@ -111,7 +132,7 @@ class Mplex(IMuxedConn): return await self.write_to_stream(_bytes) - async def write_to_stream(self, _bytes): + async def write_to_stream(self, _bytes: bytearray) -> int: """ writes a byte array to a raw connection :param _bytes: byte array to write @@ -121,7 +142,7 @@ class Mplex(IMuxedConn): await self.raw_conn.writer.drain() return len(_bytes) - async def handle_incoming(self): + async def handle_incoming(self) -> None: """ Read a message off of the raw connection and add it to the corresponding message buffer """ @@ -135,7 +156,7 @@ class Mplex(IMuxedConn): self.buffers[stream_id] = asyncio.Queue() await self.stream_queue.put(stream_id) - if flag is get_flag(True, "NEW_STREAM"): + if flag == HeaderTags.NewStream.value: # new stream detected on connection await self.accept_stream() @@ -145,7 +166,7 @@ class Mplex(IMuxedConn): # Force context switch await asyncio.sleep(0) - async def read_message(self): + async def read_message(self) -> Tuple[int, int, bytes]: """ Read a single message off of the raw connection :return: stream_id, flag, message contents diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index a1a25b70..e4627ec5 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -1,8 +1,8 @@ import asyncio -from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream -from .utils import get_flag +from .constants import HeaderTags class MplexStream(IMuxedStream): @@ -10,7 +10,16 @@ class MplexStream(IMuxedStream): reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, stream_id, initiator, mplex_conn): + stream_id: int + initiator: bool + mplex_conn: IMuxedConn + read_deadline: int + write_deadline: int + local_closed: bool + remote_closed: bool + stream_lock: asyncio.Lock + + def __init__(self, stream_id: int, initiator: bool, mplex_conn: IMuxedConn) -> None: """ create new MuxedStream in muxer :param stream_id: stream stream id @@ -26,23 +35,22 @@ class MplexStream(IMuxedStream): self.remote_closed = False self.stream_lock = asyncio.Lock() - async def read(self): + async def read(self) -> bytes: """ read messages associated with stream from buffer til end of file :return: bytes of input """ return await self.mplex_conn.read_buffer(self.stream_id) - async def write(self, data): + async def write(self, data: bytes) -> int: """ write to stream :return: number of bytes written """ - return await self.mplex_conn.send_message( - get_flag(self.initiator, "MESSAGE"), data, self.stream_id - ) + flag = HeaderTags.MessageInitiator if self.initiator else HeaderTags.MessageReceiver + return await self.mplex_conn.send_message(flag, data, self.stream_id) - async def close(self): + async def close(self) -> bool: """ Closing a stream closes it for writing and closes the remote end for reading but allows writing in the other direction. @@ -50,9 +58,10 @@ class MplexStream(IMuxedStream): """ # TODO error handling with timeout # TODO understand better how mutexes are used from go repo - await self.mplex_conn.send_message(get_flag(self.initiator, "CLOSE"), None, self.stream_id) + flag = HeaderTags.CloseInitiator if self.initiator else HeaderTags.CloseReceiver + await self.mplex_conn.send_message(flag, None, self.stream_id) - remote_lock = "" + remote_lock = False async with self.stream_lock: if self.local_closed: return True @@ -60,12 +69,14 @@ class MplexStream(IMuxedStream): remote_lock = self.remote_closed if remote_lock: - async with self.mplex_conn.conn_lock: - self.mplex_conn.buffers.pop(self.stream_id) + # FIXME: mplex_conn has no conn_lock! + async with self.mplex_conn.conn_lock: # type: ignore + # FIXME: Don't access to buffers directly + self.mplex_conn.buffers.pop(self.stream_id) # type: ignore return True - async def reset(self): + async def reset(self) -> bool: """ closes both ends of the stream tells this remote side to hang up @@ -78,20 +89,21 @@ class MplexStream(IMuxedStream): return True if not self.remote_closed: - await self.mplex_conn.send_message( - get_flag(self.initiator, "RESET"), None, self.stream_id - ) + flag = HeaderTags.ResetInitiator if self.initiator else HeaderTags.ResetInitiator + await self.mplex_conn.send_message(flag, None, self.stream_id) self.local_closed = True self.remote_closed = True - async with self.mplex_conn.conn_lock: - self.mplex_conn.buffers.pop(self.stream_id, None) + # FIXME: mplex_conn has no conn_lock! + async with self.mplex_conn.conn_lock: # type: ignore + # FIXME: Don't access to buffers directly + self.mplex_conn.buffers.pop(self.stream_id, None) # type: ignore return True # TODO deadline not in use - def set_deadline(self, ttl): + def set_deadline(self, ttl: int) -> bool: """ set deadline for muxed stream :return: True if successful @@ -100,7 +112,7 @@ class MplexStream(IMuxedStream): self.write_deadline = ttl return True - def set_read_deadline(self, ttl): + def set_read_deadline(self, ttl: int) -> bool: """ set read deadline for muxed stream :return: True if successful @@ -108,7 +120,7 @@ class MplexStream(IMuxedStream): self.read_deadline = ttl return True - def set_write_deadline(self, ttl): + def set_write_deadline(self, ttl: int) -> bool: """ set write deadline for muxed stream :return: True if successful diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index 70dcd128..35467493 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -1,10 +1,9 @@ import asyncio import struct - -from .constants import HEADER_TAGS +from typing import Tuple -def encode_uvarint(number): +def encode_uvarint(number: int) -> bytes: """Pack `number` into varint bytes""" buf = b"" while True: @@ -18,7 +17,7 @@ def encode_uvarint(number): return buf -def decode_uvarint(buff, index): +def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]: shift = 0 result = 0 while True: @@ -32,7 +31,7 @@ def decode_uvarint(buff, index): return result, index + 1 -async def decode_uvarint_from_stream(reader, timeout): +async def decode_uvarint_from_stream(reader: asyncio.StreamReader, timeout: float) -> int: shift = 0 result = 0 while True: @@ -44,15 +43,3 @@ async def decode_uvarint_from_stream(reader, timeout): break return result - - -def get_flag(initiator, action): - """ - get header flag based on action for mplex - :param action: action type in str - :return: int flag - """ - if initiator or HEADER_TAGS[action] == 0: - return HEADER_TAGS[action] - - return HEADER_TAGS[action] - 1 diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py deleted file mode 100644 index b7bd4e6b..00000000 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ /dev/null @@ -1,52 +0,0 @@ -from abc import ABC, abstractmethod - -from libp2p.peer.id import ID - - -class IMuxedConn(ABC): - """ - reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go - """ - - initiator: bool - peer_id: ID - - @abstractmethod - def __init__(self, conn, generic_protocol_handler, peer_id): - """ - create a new muxed connection - :param conn: an instance of secured connection - :param generic_protocol_handler: generic protocol handler - for new muxed streams - :param peer_id: peer_id of peer the connection is to - """ - - @abstractmethod - def close(self): - """ - close connection - :return: true if successful - """ - - @abstractmethod - def is_closed(self): - """ - check connection is fully closed - :return: true if successful - """ - - @abstractmethod - def open_stream(self, protocol_id, multi_addr): - """ - creates a new muxed_stream - :param protocol_id: protocol_id of stream - :param multi_addr: multi_addr that stream connects to - :return: a new stream - """ - - @abstractmethod - def accept_stream(self): - """ - accepts a muxed stream opened by the other end - :return: the accepted stream - """ diff --git a/libp2p/stream_muxer/muxed_stream_interface.py b/libp2p/stream_muxer/muxed_stream_interface.py deleted file mode 100644 index 00ce5867..00000000 --- a/libp2p/stream_muxer/muxed_stream_interface.py +++ /dev/null @@ -1,44 +0,0 @@ -from abc import ABC, abstractmethod - -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn - - -class IMuxedStream(ABC): - - mplex_conn: IMuxedConn - - @abstractmethod - def read(self): - """ - reads from the underlying muxed_conn - :return: bytes of input - """ - - @abstractmethod - def write(self, _bytes): - """ - writes to the underlying muxed_conn - :return: number of bytes written - """ - - @abstractmethod - def close(self): - """ - close the underlying muxed_conn - :return: true if successful - """ - - @abstractmethod - def reset(self): - """ - closes both ends of the stream - tells this remote side to hang up - :return: error/exception - """ - - @abstractmethod - def set_deadline(self, ttl): - """ - set deadline for muxed stream - :return: a new stream - """ diff --git a/libp2p/transport/listener_interface.py b/libp2p/transport/listener_interface.py index 5f84ef6c..fecc3b9e 100644 --- a/libp2p/transport/listener_interface.py +++ b/libp2p/transport/listener_interface.py @@ -1,9 +1,12 @@ from abc import ABC, abstractmethod +from typing import List + +from multiaddr import Multiaddr class IListener(ABC): @abstractmethod - def listen(self, maddr): + async def listen(self, maddr: Multiaddr) -> bool: """ put listener in listening mode and wait for incoming connections :param maddr: multiaddr of peer @@ -11,18 +14,16 @@ class IListener(ABC): """ @abstractmethod - def get_addrs(self): + def get_addrs(self) -> List[Multiaddr]: """ retrieve list of addresses the listener is listening on :return: return list of addrs """ @abstractmethod - def close(self, options=None): + def close(self) -> bool: """ close the listener such that no more connections can be open on this transport instance - :param options: optional object potential with timeout - a timeout value in ms that fires and destroy all connections :return: return True if successful """ diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index c163beaa..02d981be 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -1,74 +1,77 @@ import asyncio +from socket import socket +from typing import List -import multiaddr +from multiaddr import Multiaddr from libp2p.network.connection.raw_connection import RawConnection +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.peer.id import ID +from libp2p.transport.listener_interface import IListener +from libp2p.transport.transport_interface import ITransport +from libp2p.transport.typing import THandler -from ..listener_interface import IListener -from ..transport_interface import ITransport + +class TCPListener(IListener): + multiaddrs: List[Multiaddr] + server = None + handler = None + + def __init__(self, handler_function: THandler = None) -> None: + self.multiaddrs = [] + self.server = None + self.handler = handler_function + + async def listen(self, maddr: Multiaddr) -> bool: + """ + put listener in listening mode and wait for incoming connections + :param maddr: maddr of peer + :return: return True if successful + """ + self.server = await asyncio.start_server( + self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp") + ) + socket = self.server.sockets[0] + self.multiaddrs.append(_multiaddr_from_socket(socket)) + + return True + + def get_addrs(self) -> List[Multiaddr]: + """ + retrieve list of addresses the listener is listening on + :return: return list of addrs + """ + # TODO check if server is listening + return self.multiaddrs + + def close(self) -> bool: + """ + close the listener such that no more connections + can be open on this transport instance + :return: return True if successful + """ + if self.server is None: + return False + self.server.close() + _loop = asyncio.get_event_loop() + _loop.run_until_complete(self.server.wait_closed()) + _loop.close() + self.server = None + return True class TCP(ITransport): - def __init__(self): - self.listener = self.Listener() - - class Listener(IListener): - def __init__(self, handler_function=None): - self.multiaddrs = [] - self.server = None - self.handler = handler_function - - async def listen(self, maddr): - """ - put listener in listening mode and wait for incoming connections - :param maddr: maddr of peer - :return: return True if successful - """ - self.server = await asyncio.start_server( - self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp") - ) - socket = self.server.sockets[0] - self.multiaddrs.append(_multiaddr_from_socket(socket)) - - return True - - def get_addrs(self): - """ - retrieve list of addresses the listener is listening on - :return: return list of addrs - """ - # TODO check if server is listening - return self.multiaddrs - - def close(self, options=None): - """ - close the listener such that no more connections - can be open on this transport instance - :param options: optional object potential with timeout - a timeout value in ms that fires and destroy all connections - :return: return True if successful - """ - if self.server is None: - return False - self.server.close() - _loop = asyncio.get_event_loop() - _loop.run_until_complete(self.server.wait_closed()) - _loop.close() - self.server = None - return True - - async def dial(self, maddr, self_id, options=None): + async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: """ dial a transport to peer listening on multiaddr :param maddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) - :param options: optional object :return: True if successful """ host = maddr.value_for_protocol("ip4") - port = int(maddr.value_for_protocol("tcp")) + port = maddr.value_for_protocol("tcp") - reader, writer = await asyncio.open_connection(host, port) + reader, writer = await asyncio.open_connection(host, int(port)) # First: send our peer ID so receiver knows it writer.write(self_id.to_base58().encode()) @@ -83,16 +86,15 @@ class TCP(ITransport): return RawConnection(host, port, reader, writer, True) - def create_listener(self, handler_function, options=None): + def create_listener(self, handler_function: THandler) -> TCPListener: """ create listener on transport - :param options: optional object with properties the listener must have :param handler_function: a function called when a new connection is received that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py """ - return self.Listener(handler_function) + return TCPListener(handler_function) -def _multiaddr_from_socket(socket): - return multiaddr.Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname()) +def _multiaddr_from_socket(socket: socket) -> Multiaddr: + return Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname()) diff --git a/libp2p/transport/transport_interface.py b/libp2p/transport/transport_interface.py index b067e5b5..2f5a1ac6 100644 --- a/libp2p/transport/transport_interface.py +++ b/libp2p/transport/transport_interface.py @@ -1,22 +1,28 @@ from abc import ABC, abstractmethod +from multiaddr import Multiaddr + +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.peer.id import ID + +from .listener_interface import IListener +from .typing import THandler + class ITransport(ABC): @abstractmethod - def dial(self, maddr, self_id, options=None): + async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: """ dial a transport to peer listening on multiaddr :param multiaddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) - :param options: optional object :return: list of multiaddrs """ @abstractmethod - def create_listener(self, handler_function, options=None): + def create_listener(self, handler_function: THandler) -> IListener: """ create listener on transport - :param options: optional object with properties the listener must have :param handler_function: a function called when a new conntion is received that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py diff --git a/libp2p/transport/typing.py b/libp2p/transport/typing.py new file mode 100644 index 00000000..147fe11d --- /dev/null +++ b/libp2p/transport/typing.py @@ -0,0 +1,4 @@ +from asyncio import StreamReader, StreamWriter +from typing import Callable + +THandler = Callable[[StreamReader, StreamWriter], None] diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index f14d7fcd..363247a1 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -1,9 +1,22 @@ -from libp2p.security.security_multistream import SecurityMultistream +from typing import Dict, Sequence + +from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.network.typing import GenericProtocolHandlerFn +from libp2p.peer.id import ID +from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.security.secure_transport_interface import ISecureTransport +from libp2p.security.security_multistream import SecurityMultistream, TProtocol from libp2p.stream_muxer.mplex.mplex import Mplex +from .listener_interface import IListener +from .transport_interface import ITransport + class TransportUpgrader: - def __init__(self, secOpt, muxerOpt): + security_multistream: SecurityMultistream + muxer: Sequence[str] + + def __init__(self, secOpt: Dict[TProtocol, ISecureTransport], muxerOpt: Sequence[str]) -> None: # Store security option self.security_multistream = SecurityMultistream() for key in secOpt: @@ -12,12 +25,15 @@ class TransportUpgrader: # Store muxer option self.muxer = muxerOpt - def upgrade_listener(self, transport, listeners): + def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None: """ Upgrade multiaddr listeners to libp2p-transport listeners """ + pass - async def upgrade_security(self, raw_conn, peer_id, initiator): + async def upgrade_security( + self, raw_conn: IRawConnection, peer_id: ID, initiator: bool + ) -> ISecureConn: """ Upgrade conn to be a secured connection """ @@ -26,7 +42,10 @@ class TransportUpgrader: return await self.security_multistream.secure_inbound(raw_conn) - def upgrade_connection(self, conn, generic_protocol_handler, peer_id): + @staticmethod + def upgrade_connection( + conn: IRawConnection, generic_protocol_handler: GenericProtocolHandlerFn, peer_id: ID + ) -> Mplex: """ Upgrade raw connection to muxed connection """ diff --git a/tests/peer/test_peerinfo.py b/tests/peer/test_peerinfo.py index d8690f5d..a25cb805 100644 --- a/tests/peer/test_peerinfo.py +++ b/tests/peer/test_peerinfo.py @@ -8,7 +8,9 @@ from libp2p.peer.peerdata import PeerData from libp2p.peer.peerinfo import InvalidAddrError, PeerInfo, info_from_p2p_addr ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" -VALID_MULTI_ADDR_STR = "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" # noqa: E501 +VALID_MULTI_ADDR_STR = ( + "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" +) # noqa: E501 def test_init_():