mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Merge pull request #203 from ChihChengLiang/tranport-typing
add typing to transport and stream_muxer
This commit is contained in:
@ -1,4 +1,5 @@
|
||||
from abc import ABC, abstractmethod
|
||||
import asyncio
|
||||
|
||||
|
||||
class IRawConnection(ABC):
|
||||
@ -6,6 +7,13 @@ class IRawConnection(ABC):
|
||||
A Raw Connection provides a Reader and a Writer
|
||||
"""
|
||||
|
||||
initiator: bool
|
||||
|
||||
# TODO: reader and writer shouldn't be exposed.
|
||||
# Need better API for the consumers
|
||||
reader: asyncio.StreamReader
|
||||
writer: asyncio.StreamWriter
|
||||
|
||||
@abstractmethod
|
||||
async def write(self, data: bytes) -> None:
|
||||
pass
|
||||
@ -13,3 +21,11 @@ class IRawConnection(ABC):
|
||||
@abstractmethod
|
||||
async def read(self) -> bytes:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def close(self) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def next_stream_id(self) -> int:
|
||||
pass
|
||||
|
||||
@ -5,7 +5,7 @@ from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.peer.peerstore import PeerStore
|
||||
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
|
||||
from libp2p.stream_muxer.abc import IMuxedConn
|
||||
from libp2p.transport.listener_interface import IListener
|
||||
|
||||
from .stream.net_stream_interface import INetStream
|
||||
|
||||
@ -4,7 +4,7 @@ from typing import TYPE_CHECKING
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.network.stream.net_stream_interface import INetStream
|
||||
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
|
||||
from libp2p.stream_muxer.abc import IMuxedConn
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .network_interface import INetwork
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
|
||||
from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream
|
||||
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
|
||||
|
||||
from .net_stream_interface import INetStream
|
||||
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
|
||||
from libp2p.stream_muxer.abc import IMuxedConn
|
||||
|
||||
|
||||
class INetStream(ABC):
|
||||
|
||||
@ -8,8 +8,7 @@ from libp2p.peer.peerstore import PeerStore
|
||||
from libp2p.protocol_muxer.multiselect import Multiselect
|
||||
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
||||
from libp2p.routing.interfaces import IPeerRouting
|
||||
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
|
||||
from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream
|
||||
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
|
||||
from libp2p.transport.listener_interface import IListener
|
||||
from libp2p.transport.transport_interface import ITransport
|
||||
from libp2p.transport.upgrader import TransportUpgrader
|
||||
@ -19,6 +18,7 @@ from .network_interface import INetwork
|
||||
from .notifee_interface import INotifee
|
||||
from .stream.net_stream import NetStream
|
||||
from .stream.net_stream_interface import INetStream
|
||||
from .typing import GenericProtocolHandlerFn
|
||||
|
||||
StreamHandlerFn = Callable[[INetStream], Awaitable[None]]
|
||||
|
||||
@ -248,9 +248,6 @@ class Swarm(INetwork):
|
||||
# TODO: `disconnect`?
|
||||
|
||||
|
||||
GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]]
|
||||
|
||||
|
||||
def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn:
|
||||
"""
|
||||
Create a generic protocol handler from the given swarm. We use swarm
|
||||
|
||||
5
libp2p/network/typing.py
Normal file
5
libp2p/network/typing.py
Normal file
@ -0,0 +1,5 @@
|
||||
from typing import Awaitable, Callable
|
||||
|
||||
from libp2p.stream_muxer.abc import IMuxedStream
|
||||
|
||||
GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]]
|
||||
@ -5,7 +5,7 @@ from multiaddr import Multiaddr
|
||||
from libp2p.network.network_interface import INetwork
|
||||
from libp2p.network.notifee_interface import INotifee
|
||||
from libp2p.network.stream.net_stream_interface import INetStream
|
||||
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
|
||||
from libp2p.stream_muxer.abc import IMuxedConn
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import asyncio
|
||||
|
||||
119
libp2p/stream_muxer/abc.py
Normal file
119
libp2p/stream_muxer/abc.py
Normal file
@ -0,0 +1,119 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.security.secure_conn_interface import ISecureConn
|
||||
from libp2p.stream_muxer.mplex.constants import HeaderTags
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# Prevent GenericProtocolHandlerFn introducing circular dependencies
|
||||
from libp2p.network.typing import GenericProtocolHandlerFn # noqa: F401
|
||||
|
||||
|
||||
class IMuxedConn(ABC):
|
||||
"""
|
||||
reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go
|
||||
"""
|
||||
|
||||
initiator: bool
|
||||
peer_id: ID
|
||||
|
||||
@abstractmethod
|
||||
def __init__(
|
||||
self, conn: ISecureConn, generic_protocol_handler: "GenericProtocolHandlerFn", peer_id: ID
|
||||
) -> None:
|
||||
"""
|
||||
create a new muxed connection
|
||||
:param conn: an instance of secured connection
|
||||
:param generic_protocol_handler: generic protocol handler
|
||||
for new muxed streams
|
||||
:param peer_id: peer_id of peer the connection is to
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def close(self) -> None:
|
||||
"""
|
||||
close connection
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def is_closed(self) -> bool:
|
||||
"""
|
||||
check connection is fully closed
|
||||
:return: true if successful
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def read_buffer(self, stream_id: int) -> bytes:
|
||||
"""
|
||||
Read a message from stream_id's buffer, check raw connection for new messages
|
||||
:param stream_id: stream id of stream to read from
|
||||
:return: message read
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> "IMuxedStream":
|
||||
"""
|
||||
creates a new muxed_stream
|
||||
:param protocol_id: protocol_id of stream
|
||||
:param multi_addr: multi_addr that stream connects to
|
||||
:return: a new stream
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def accept_stream(self) -> None:
|
||||
"""
|
||||
accepts a muxed stream opened by the other end
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def send_message(self, flag: HeaderTags, data: bytes, stream_id: int) -> int:
|
||||
"""
|
||||
sends a message over the connection
|
||||
:param header: header to use
|
||||
:param data: data to send in the message
|
||||
:param stream_id: stream the message is in
|
||||
"""
|
||||
|
||||
|
||||
class IMuxedStream(ABC):
|
||||
|
||||
mplex_conn: IMuxedConn
|
||||
|
||||
@abstractmethod
|
||||
async def read(self) -> bytes:
|
||||
"""
|
||||
reads from the underlying muxed_conn
|
||||
:return: bytes of input
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def write(self, data: bytes) -> int:
|
||||
"""
|
||||
writes to the underlying muxed_conn
|
||||
:return: number of bytes written
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def close(self) -> bool:
|
||||
"""
|
||||
close the underlying muxed_conn
|
||||
:return: true if successful
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def reset(self) -> bool:
|
||||
"""
|
||||
closes both ends of the stream
|
||||
tells this remote side to hang up
|
||||
:return: true if successful
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def set_deadline(self, ttl: int) -> bool:
|
||||
"""
|
||||
set deadline for muxed stream
|
||||
:return: a new stream
|
||||
"""
|
||||
@ -1 +1,11 @@
|
||||
HEADER_TAGS = {"NEW_STREAM": 0, "MESSAGE": 2, "CLOSE": 4, "RESET": 6}
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class HeaderTags(Enum):
|
||||
NewStream = 0
|
||||
MessageReceiver = 1
|
||||
MessageInitiator = 2
|
||||
CloseReceiver = 3
|
||||
CloseInitiator = 4
|
||||
ResetReceiver = 5
|
||||
ResetInitiator = 6
|
||||
|
||||
@ -1,8 +1,17 @@
|
||||
import asyncio
|
||||
from typing import Dict, Tuple
|
||||
|
||||
from ..muxed_connection_interface import IMuxedConn
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.network.connection.raw_connection_interface import IRawConnection
|
||||
from libp2p.network.typing import GenericProtocolHandlerFn
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.security.secure_conn_interface import ISecureConn
|
||||
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
|
||||
|
||||
from .constants import HeaderTags
|
||||
from .mplex_stream import MplexStream
|
||||
from .utils import decode_uvarint_from_stream, encode_uvarint, get_flag
|
||||
from .utils import decode_uvarint_from_stream, encode_uvarint
|
||||
|
||||
|
||||
class Mplex(IMuxedConn):
|
||||
@ -10,7 +19,20 @@ class Mplex(IMuxedConn):
|
||||
reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go
|
||||
"""
|
||||
|
||||
def __init__(self, secured_conn, generic_protocol_handler, peer_id):
|
||||
secured_conn: ISecureConn
|
||||
raw_conn: IRawConnection
|
||||
initiator: bool
|
||||
generic_protocol_handler = None
|
||||
peer_id: ID
|
||||
buffers: Dict[int, "asyncio.Queue[bytes]"]
|
||||
stream_queue: "asyncio.Queue[int]"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
secured_conn: ISecureConn,
|
||||
generic_protocol_handler: GenericProtocolHandlerFn,
|
||||
peer_id: ID,
|
||||
) -> None:
|
||||
"""
|
||||
create a new muxed connection
|
||||
:param conn: an instance of raw connection
|
||||
@ -18,7 +40,7 @@ class Mplex(IMuxedConn):
|
||||
for new muxed streams
|
||||
:param peer_id: peer_id of peer the connection is to
|
||||
"""
|
||||
super(Mplex, self).__init__(secured_conn, generic_protocol_handler, peer_id)
|
||||
super().__init__(secured_conn, generic_protocol_handler, peer_id)
|
||||
|
||||
self.secured_conn = secured_conn
|
||||
self.raw_conn = secured_conn.get_conn()
|
||||
@ -38,19 +60,20 @@ class Mplex(IMuxedConn):
|
||||
# Kick off reading
|
||||
asyncio.ensure_future(self.handle_incoming())
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
"""
|
||||
close the stream muxer and underlying raw connection
|
||||
"""
|
||||
self.raw_conn.close()
|
||||
|
||||
def is_closed(self):
|
||||
def is_closed(self) -> bool:
|
||||
"""
|
||||
check connection is fully closed
|
||||
:return: true if successful
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
async def read_buffer(self, stream_id):
|
||||
async def read_buffer(self, stream_id: int) -> bytes:
|
||||
"""
|
||||
Read a message from stream_id's buffer, check raw connection for new messages
|
||||
:param stream_id: stream id of stream to read from
|
||||
@ -68,7 +91,7 @@ class Mplex(IMuxedConn):
|
||||
# Stream not created yet
|
||||
return None
|
||||
|
||||
async def open_stream(self, protocol_id, multi_addr):
|
||||
async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> IMuxedStream:
|
||||
"""
|
||||
creates a new muxed_stream
|
||||
:param protocol_id: protocol_id of stream
|
||||
@ -78,28 +101,26 @@ class Mplex(IMuxedConn):
|
||||
stream_id = self.raw_conn.next_stream_id()
|
||||
stream = MplexStream(stream_id, multi_addr, self)
|
||||
self.buffers[stream_id] = asyncio.Queue()
|
||||
await self.send_message(get_flag(self.initiator, "NEW_STREAM"), None, stream_id)
|
||||
await self.send_message(HeaderTags.NewStream, None, stream_id)
|
||||
return stream
|
||||
|
||||
async def accept_stream(self):
|
||||
async def accept_stream(self) -> None:
|
||||
"""
|
||||
accepts a muxed stream opened by the other end
|
||||
:return: the accepted stream
|
||||
"""
|
||||
stream_id = await self.stream_queue.get()
|
||||
stream = MplexStream(stream_id, False, self)
|
||||
asyncio.ensure_future(self.generic_protocol_handler(stream))
|
||||
|
||||
async def send_message(self, flag, data, stream_id):
|
||||
async def send_message(self, flag: HeaderTags, data: bytes, stream_id: int) -> int:
|
||||
"""
|
||||
sends a message over the connection
|
||||
:param header: header to use
|
||||
:param data: data to send in the message
|
||||
:param stream_id: stream the message is in
|
||||
:return: True if success
|
||||
"""
|
||||
# << by 3, then or with flag
|
||||
header = (stream_id << 3) | flag
|
||||
header = (stream_id << 3) | flag.value
|
||||
header = encode_uvarint(header)
|
||||
|
||||
if data is None:
|
||||
@ -111,7 +132,7 @@ class Mplex(IMuxedConn):
|
||||
|
||||
return await self.write_to_stream(_bytes)
|
||||
|
||||
async def write_to_stream(self, _bytes):
|
||||
async def write_to_stream(self, _bytes: bytearray) -> int:
|
||||
"""
|
||||
writes a byte array to a raw connection
|
||||
:param _bytes: byte array to write
|
||||
@ -121,7 +142,7 @@ class Mplex(IMuxedConn):
|
||||
await self.raw_conn.writer.drain()
|
||||
return len(_bytes)
|
||||
|
||||
async def handle_incoming(self):
|
||||
async def handle_incoming(self) -> None:
|
||||
"""
|
||||
Read a message off of the raw connection and add it to the corresponding message buffer
|
||||
"""
|
||||
@ -135,7 +156,7 @@ class Mplex(IMuxedConn):
|
||||
self.buffers[stream_id] = asyncio.Queue()
|
||||
await self.stream_queue.put(stream_id)
|
||||
|
||||
if flag is get_flag(True, "NEW_STREAM"):
|
||||
if flag == HeaderTags.NewStream.value:
|
||||
# new stream detected on connection
|
||||
await self.accept_stream()
|
||||
|
||||
@ -145,7 +166,7 @@ class Mplex(IMuxedConn):
|
||||
# Force context switch
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def read_message(self):
|
||||
async def read_message(self) -> Tuple[int, int, bytes]:
|
||||
"""
|
||||
Read a single message off of the raw connection
|
||||
:return: stream_id, flag, message contents
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
import asyncio
|
||||
|
||||
from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream
|
||||
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
|
||||
|
||||
from .utils import get_flag
|
||||
from .constants import HeaderTags
|
||||
|
||||
|
||||
class MplexStream(IMuxedStream):
|
||||
@ -10,7 +10,16 @@ class MplexStream(IMuxedStream):
|
||||
reference: https://github.com/libp2p/go-mplex/blob/master/stream.go
|
||||
"""
|
||||
|
||||
def __init__(self, stream_id, initiator, mplex_conn):
|
||||
stream_id: int
|
||||
initiator: bool
|
||||
mplex_conn: IMuxedConn
|
||||
read_deadline: int
|
||||
write_deadline: int
|
||||
local_closed: bool
|
||||
remote_closed: bool
|
||||
stream_lock: asyncio.Lock
|
||||
|
||||
def __init__(self, stream_id: int, initiator: bool, mplex_conn: IMuxedConn) -> None:
|
||||
"""
|
||||
create new MuxedStream in muxer
|
||||
:param stream_id: stream stream id
|
||||
@ -26,23 +35,22 @@ class MplexStream(IMuxedStream):
|
||||
self.remote_closed = False
|
||||
self.stream_lock = asyncio.Lock()
|
||||
|
||||
async def read(self):
|
||||
async def read(self) -> bytes:
|
||||
"""
|
||||
read messages associated with stream from buffer til end of file
|
||||
:return: bytes of input
|
||||
"""
|
||||
return await self.mplex_conn.read_buffer(self.stream_id)
|
||||
|
||||
async def write(self, data):
|
||||
async def write(self, data: bytes) -> int:
|
||||
"""
|
||||
write to stream
|
||||
:return: number of bytes written
|
||||
"""
|
||||
return await self.mplex_conn.send_message(
|
||||
get_flag(self.initiator, "MESSAGE"), data, self.stream_id
|
||||
)
|
||||
flag = HeaderTags.MessageInitiator if self.initiator else HeaderTags.MessageReceiver
|
||||
return await self.mplex_conn.send_message(flag, data, self.stream_id)
|
||||
|
||||
async def close(self):
|
||||
async def close(self) -> bool:
|
||||
"""
|
||||
Closing a stream closes it for writing and closes the remote end for reading
|
||||
but allows writing in the other direction.
|
||||
@ -50,9 +58,10 @@ class MplexStream(IMuxedStream):
|
||||
"""
|
||||
# TODO error handling with timeout
|
||||
# TODO understand better how mutexes are used from go repo
|
||||
await self.mplex_conn.send_message(get_flag(self.initiator, "CLOSE"), None, self.stream_id)
|
||||
flag = HeaderTags.CloseInitiator if self.initiator else HeaderTags.CloseReceiver
|
||||
await self.mplex_conn.send_message(flag, None, self.stream_id)
|
||||
|
||||
remote_lock = ""
|
||||
remote_lock = False
|
||||
async with self.stream_lock:
|
||||
if self.local_closed:
|
||||
return True
|
||||
@ -60,12 +69,14 @@ class MplexStream(IMuxedStream):
|
||||
remote_lock = self.remote_closed
|
||||
|
||||
if remote_lock:
|
||||
async with self.mplex_conn.conn_lock:
|
||||
self.mplex_conn.buffers.pop(self.stream_id)
|
||||
# FIXME: mplex_conn has no conn_lock!
|
||||
async with self.mplex_conn.conn_lock: # type: ignore
|
||||
# FIXME: Don't access to buffers directly
|
||||
self.mplex_conn.buffers.pop(self.stream_id) # type: ignore
|
||||
|
||||
return True
|
||||
|
||||
async def reset(self):
|
||||
async def reset(self) -> bool:
|
||||
"""
|
||||
closes both ends of the stream
|
||||
tells this remote side to hang up
|
||||
@ -78,20 +89,21 @@ class MplexStream(IMuxedStream):
|
||||
return True
|
||||
|
||||
if not self.remote_closed:
|
||||
await self.mplex_conn.send_message(
|
||||
get_flag(self.initiator, "RESET"), None, self.stream_id
|
||||
)
|
||||
flag = HeaderTags.ResetInitiator if self.initiator else HeaderTags.ResetInitiator
|
||||
await self.mplex_conn.send_message(flag, None, self.stream_id)
|
||||
|
||||
self.local_closed = True
|
||||
self.remote_closed = True
|
||||
|
||||
async with self.mplex_conn.conn_lock:
|
||||
self.mplex_conn.buffers.pop(self.stream_id, None)
|
||||
# FIXME: mplex_conn has no conn_lock!
|
||||
async with self.mplex_conn.conn_lock: # type: ignore
|
||||
# FIXME: Don't access to buffers directly
|
||||
self.mplex_conn.buffers.pop(self.stream_id, None) # type: ignore
|
||||
|
||||
return True
|
||||
|
||||
# TODO deadline not in use
|
||||
def set_deadline(self, ttl):
|
||||
def set_deadline(self, ttl: int) -> bool:
|
||||
"""
|
||||
set deadline for muxed stream
|
||||
:return: True if successful
|
||||
@ -100,7 +112,7 @@ class MplexStream(IMuxedStream):
|
||||
self.write_deadline = ttl
|
||||
return True
|
||||
|
||||
def set_read_deadline(self, ttl):
|
||||
def set_read_deadline(self, ttl: int) -> bool:
|
||||
"""
|
||||
set read deadline for muxed stream
|
||||
:return: True if successful
|
||||
@ -108,7 +120,7 @@ class MplexStream(IMuxedStream):
|
||||
self.read_deadline = ttl
|
||||
return True
|
||||
|
||||
def set_write_deadline(self, ttl):
|
||||
def set_write_deadline(self, ttl: int) -> bool:
|
||||
"""
|
||||
set write deadline for muxed stream
|
||||
:return: True if successful
|
||||
|
||||
@ -1,10 +1,9 @@
|
||||
import asyncio
|
||||
import struct
|
||||
|
||||
from .constants import HEADER_TAGS
|
||||
from typing import Tuple
|
||||
|
||||
|
||||
def encode_uvarint(number):
|
||||
def encode_uvarint(number: int) -> bytes:
|
||||
"""Pack `number` into varint bytes"""
|
||||
buf = b""
|
||||
while True:
|
||||
@ -18,7 +17,7 @@ def encode_uvarint(number):
|
||||
return buf
|
||||
|
||||
|
||||
def decode_uvarint(buff, index):
|
||||
def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]:
|
||||
shift = 0
|
||||
result = 0
|
||||
while True:
|
||||
@ -32,7 +31,7 @@ def decode_uvarint(buff, index):
|
||||
return result, index + 1
|
||||
|
||||
|
||||
async def decode_uvarint_from_stream(reader, timeout):
|
||||
async def decode_uvarint_from_stream(reader: asyncio.StreamReader, timeout: float) -> int:
|
||||
shift = 0
|
||||
result = 0
|
||||
while True:
|
||||
@ -44,15 +43,3 @@ async def decode_uvarint_from_stream(reader, timeout):
|
||||
break
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def get_flag(initiator, action):
|
||||
"""
|
||||
get header flag based on action for mplex
|
||||
:param action: action type in str
|
||||
:return: int flag
|
||||
"""
|
||||
if initiator or HEADER_TAGS[action] == 0:
|
||||
return HEADER_TAGS[action]
|
||||
|
||||
return HEADER_TAGS[action] - 1
|
||||
|
||||
@ -1,52 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from libp2p.peer.id import ID
|
||||
|
||||
|
||||
class IMuxedConn(ABC):
|
||||
"""
|
||||
reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go
|
||||
"""
|
||||
|
||||
initiator: bool
|
||||
peer_id: ID
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, conn, generic_protocol_handler, peer_id):
|
||||
"""
|
||||
create a new muxed connection
|
||||
:param conn: an instance of secured connection
|
||||
:param generic_protocol_handler: generic protocol handler
|
||||
for new muxed streams
|
||||
:param peer_id: peer_id of peer the connection is to
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def close(self):
|
||||
"""
|
||||
close connection
|
||||
:return: true if successful
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def is_closed(self):
|
||||
"""
|
||||
check connection is fully closed
|
||||
:return: true if successful
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def open_stream(self, protocol_id, multi_addr):
|
||||
"""
|
||||
creates a new muxed_stream
|
||||
:param protocol_id: protocol_id of stream
|
||||
:param multi_addr: multi_addr that stream connects to
|
||||
:return: a new stream
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def accept_stream(self):
|
||||
"""
|
||||
accepts a muxed stream opened by the other end
|
||||
:return: the accepted stream
|
||||
"""
|
||||
@ -1,44 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
|
||||
|
||||
|
||||
class IMuxedStream(ABC):
|
||||
|
||||
mplex_conn: IMuxedConn
|
||||
|
||||
@abstractmethod
|
||||
def read(self):
|
||||
"""
|
||||
reads from the underlying muxed_conn
|
||||
:return: bytes of input
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def write(self, _bytes):
|
||||
"""
|
||||
writes to the underlying muxed_conn
|
||||
:return: number of bytes written
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def close(self):
|
||||
"""
|
||||
close the underlying muxed_conn
|
||||
:return: true if successful
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def reset(self):
|
||||
"""
|
||||
closes both ends of the stream
|
||||
tells this remote side to hang up
|
||||
:return: error/exception
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def set_deadline(self, ttl):
|
||||
"""
|
||||
set deadline for muxed stream
|
||||
:return: a new stream
|
||||
"""
|
||||
@ -1,9 +1,12 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List
|
||||
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
|
||||
class IListener(ABC):
|
||||
@abstractmethod
|
||||
def listen(self, maddr):
|
||||
async def listen(self, maddr: Multiaddr) -> bool:
|
||||
"""
|
||||
put listener in listening mode and wait for incoming connections
|
||||
:param maddr: multiaddr of peer
|
||||
@ -11,18 +14,16 @@ class IListener(ABC):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_addrs(self):
|
||||
def get_addrs(self) -> List[Multiaddr]:
|
||||
"""
|
||||
retrieve list of addresses the listener is listening on
|
||||
:return: return list of addrs
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def close(self, options=None):
|
||||
def close(self) -> bool:
|
||||
"""
|
||||
close the listener such that no more connections
|
||||
can be open on this transport instance
|
||||
:param options: optional object potential with timeout
|
||||
a timeout value in ms that fires and destroy all connections
|
||||
:return: return True if successful
|
||||
"""
|
||||
|
||||
@ -1,74 +1,77 @@
|
||||
import asyncio
|
||||
from socket import socket
|
||||
from typing import List
|
||||
|
||||
import multiaddr
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.network.connection.raw_connection import RawConnection
|
||||
from libp2p.network.connection.raw_connection_interface import IRawConnection
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.transport.listener_interface import IListener
|
||||
from libp2p.transport.transport_interface import ITransport
|
||||
from libp2p.transport.typing import THandler
|
||||
|
||||
from ..listener_interface import IListener
|
||||
from ..transport_interface import ITransport
|
||||
|
||||
class TCPListener(IListener):
|
||||
multiaddrs: List[Multiaddr]
|
||||
server = None
|
||||
handler = None
|
||||
|
||||
def __init__(self, handler_function: THandler = None) -> None:
|
||||
self.multiaddrs = []
|
||||
self.server = None
|
||||
self.handler = handler_function
|
||||
|
||||
async def listen(self, maddr: Multiaddr) -> bool:
|
||||
"""
|
||||
put listener in listening mode and wait for incoming connections
|
||||
:param maddr: maddr of peer
|
||||
:return: return True if successful
|
||||
"""
|
||||
self.server = await asyncio.start_server(
|
||||
self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp")
|
||||
)
|
||||
socket = self.server.sockets[0]
|
||||
self.multiaddrs.append(_multiaddr_from_socket(socket))
|
||||
|
||||
return True
|
||||
|
||||
def get_addrs(self) -> List[Multiaddr]:
|
||||
"""
|
||||
retrieve list of addresses the listener is listening on
|
||||
:return: return list of addrs
|
||||
"""
|
||||
# TODO check if server is listening
|
||||
return self.multiaddrs
|
||||
|
||||
def close(self) -> bool:
|
||||
"""
|
||||
close the listener such that no more connections
|
||||
can be open on this transport instance
|
||||
:return: return True if successful
|
||||
"""
|
||||
if self.server is None:
|
||||
return False
|
||||
self.server.close()
|
||||
_loop = asyncio.get_event_loop()
|
||||
_loop.run_until_complete(self.server.wait_closed())
|
||||
_loop.close()
|
||||
self.server = None
|
||||
return True
|
||||
|
||||
|
||||
class TCP(ITransport):
|
||||
def __init__(self):
|
||||
self.listener = self.Listener()
|
||||
|
||||
class Listener(IListener):
|
||||
def __init__(self, handler_function=None):
|
||||
self.multiaddrs = []
|
||||
self.server = None
|
||||
self.handler = handler_function
|
||||
|
||||
async def listen(self, maddr):
|
||||
"""
|
||||
put listener in listening mode and wait for incoming connections
|
||||
:param maddr: maddr of peer
|
||||
:return: return True if successful
|
||||
"""
|
||||
self.server = await asyncio.start_server(
|
||||
self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp")
|
||||
)
|
||||
socket = self.server.sockets[0]
|
||||
self.multiaddrs.append(_multiaddr_from_socket(socket))
|
||||
|
||||
return True
|
||||
|
||||
def get_addrs(self):
|
||||
"""
|
||||
retrieve list of addresses the listener is listening on
|
||||
:return: return list of addrs
|
||||
"""
|
||||
# TODO check if server is listening
|
||||
return self.multiaddrs
|
||||
|
||||
def close(self, options=None):
|
||||
"""
|
||||
close the listener such that no more connections
|
||||
can be open on this transport instance
|
||||
:param options: optional object potential with timeout
|
||||
a timeout value in ms that fires and destroy all connections
|
||||
:return: return True if successful
|
||||
"""
|
||||
if self.server is None:
|
||||
return False
|
||||
self.server.close()
|
||||
_loop = asyncio.get_event_loop()
|
||||
_loop.run_until_complete(self.server.wait_closed())
|
||||
_loop.close()
|
||||
self.server = None
|
||||
return True
|
||||
|
||||
async def dial(self, maddr, self_id, options=None):
|
||||
async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection:
|
||||
"""
|
||||
dial a transport to peer listening on multiaddr
|
||||
:param maddr: multiaddr of peer
|
||||
:param self_id: peer_id of the dialer (to send to receiver)
|
||||
:param options: optional object
|
||||
:return: True if successful
|
||||
"""
|
||||
host = maddr.value_for_protocol("ip4")
|
||||
port = int(maddr.value_for_protocol("tcp"))
|
||||
port = maddr.value_for_protocol("tcp")
|
||||
|
||||
reader, writer = await asyncio.open_connection(host, port)
|
||||
reader, writer = await asyncio.open_connection(host, int(port))
|
||||
|
||||
# First: send our peer ID so receiver knows it
|
||||
writer.write(self_id.to_base58().encode())
|
||||
@ -83,16 +86,15 @@ class TCP(ITransport):
|
||||
|
||||
return RawConnection(host, port, reader, writer, True)
|
||||
|
||||
def create_listener(self, handler_function, options=None):
|
||||
def create_listener(self, handler_function: THandler) -> TCPListener:
|
||||
"""
|
||||
create listener on transport
|
||||
:param options: optional object with properties the listener must have
|
||||
:param handler_function: a function called when a new connection is received
|
||||
that takes a connection as argument which implements interface-connection
|
||||
:return: a listener object that implements listener_interface.py
|
||||
"""
|
||||
return self.Listener(handler_function)
|
||||
return TCPListener(handler_function)
|
||||
|
||||
|
||||
def _multiaddr_from_socket(socket):
|
||||
return multiaddr.Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname())
|
||||
def _multiaddr_from_socket(socket: socket) -> Multiaddr:
|
||||
return Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname())
|
||||
|
||||
@ -1,22 +1,28 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.network.connection.raw_connection_interface import IRawConnection
|
||||
from libp2p.peer.id import ID
|
||||
|
||||
from .listener_interface import IListener
|
||||
from .typing import THandler
|
||||
|
||||
|
||||
class ITransport(ABC):
|
||||
@abstractmethod
|
||||
def dial(self, maddr, self_id, options=None):
|
||||
async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection:
|
||||
"""
|
||||
dial a transport to peer listening on multiaddr
|
||||
:param multiaddr: multiaddr of peer
|
||||
:param self_id: peer_id of the dialer (to send to receiver)
|
||||
:param options: optional object
|
||||
:return: list of multiaddrs
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def create_listener(self, handler_function, options=None):
|
||||
def create_listener(self, handler_function: THandler) -> IListener:
|
||||
"""
|
||||
create listener on transport
|
||||
:param options: optional object with properties the listener must have
|
||||
:param handler_function: a function called when a new conntion is received
|
||||
that takes a connection as argument which implements interface-connection
|
||||
:return: a listener object that implements listener_interface.py
|
||||
|
||||
4
libp2p/transport/typing.py
Normal file
4
libp2p/transport/typing.py
Normal file
@ -0,0 +1,4 @@
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from typing import Callable
|
||||
|
||||
THandler = Callable[[StreamReader, StreamWriter], None]
|
||||
@ -1,9 +1,22 @@
|
||||
from libp2p.security.security_multistream import SecurityMultistream
|
||||
from typing import Dict, Sequence
|
||||
|
||||
from libp2p.network.connection.raw_connection_interface import IRawConnection
|
||||
from libp2p.network.typing import GenericProtocolHandlerFn
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.security.secure_conn_interface import ISecureConn
|
||||
from libp2p.security.secure_transport_interface import ISecureTransport
|
||||
from libp2p.security.security_multistream import SecurityMultistream, TProtocol
|
||||
from libp2p.stream_muxer.mplex.mplex import Mplex
|
||||
|
||||
from .listener_interface import IListener
|
||||
from .transport_interface import ITransport
|
||||
|
||||
|
||||
class TransportUpgrader:
|
||||
def __init__(self, secOpt, muxerOpt):
|
||||
security_multistream: SecurityMultistream
|
||||
muxer: Sequence[str]
|
||||
|
||||
def __init__(self, secOpt: Dict[TProtocol, ISecureTransport], muxerOpt: Sequence[str]) -> None:
|
||||
# Store security option
|
||||
self.security_multistream = SecurityMultistream()
|
||||
for key in secOpt:
|
||||
@ -12,12 +25,15 @@ class TransportUpgrader:
|
||||
# Store muxer option
|
||||
self.muxer = muxerOpt
|
||||
|
||||
def upgrade_listener(self, transport, listeners):
|
||||
def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None:
|
||||
"""
|
||||
Upgrade multiaddr listeners to libp2p-transport listeners
|
||||
"""
|
||||
pass
|
||||
|
||||
async def upgrade_security(self, raw_conn, peer_id, initiator):
|
||||
async def upgrade_security(
|
||||
self, raw_conn: IRawConnection, peer_id: ID, initiator: bool
|
||||
) -> ISecureConn:
|
||||
"""
|
||||
Upgrade conn to be a secured connection
|
||||
"""
|
||||
@ -26,7 +42,10 @@ class TransportUpgrader:
|
||||
|
||||
return await self.security_multistream.secure_inbound(raw_conn)
|
||||
|
||||
def upgrade_connection(self, conn, generic_protocol_handler, peer_id):
|
||||
@staticmethod
|
||||
def upgrade_connection(
|
||||
conn: IRawConnection, generic_protocol_handler: GenericProtocolHandlerFn, peer_id: ID
|
||||
) -> Mplex:
|
||||
"""
|
||||
Upgrade raw connection to muxed connection
|
||||
"""
|
||||
|
||||
@ -8,7 +8,9 @@ from libp2p.peer.peerdata import PeerData
|
||||
from libp2p.peer.peerinfo import InvalidAddrError, PeerInfo, info_from_p2p_addr
|
||||
|
||||
ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
|
||||
VALID_MULTI_ADDR_STR = "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" # noqa: E501
|
||||
VALID_MULTI_ADDR_STR = (
|
||||
"/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ"
|
||||
) # noqa: E501
|
||||
|
||||
|
||||
def test_init_():
|
||||
|
||||
Reference in New Issue
Block a user