From 19650d0f72f170e90c4721e26b061050ce5682dd Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 31 Oct 2018 22:31:00 +0100 Subject: [PATCH 1/2] muxer scaffolding --- connection/muxed_connection.py | 0 connection/muxed_connection_interface.py | 0 muxer/muxed_connection.py | 41 ++++++++++++ muxer/muxed_connection_interface.py | 32 +++++++++ muxer/muxed_stream.py | 33 ++++++++++ muxer/muxed_stream_interface.py | 25 +++++++ muxer/smux_multiplex.py | 44 +++++++++++++ network/tcp.py | 83 ++++++++++++++++++++++++ 8 files changed, 258 insertions(+) delete mode 100644 connection/muxed_connection.py delete mode 100644 connection/muxed_connection_interface.py create mode 100644 muxer/muxed_connection.py create mode 100644 muxer/muxed_connection_interface.py create mode 100644 muxer/muxed_stream.py create mode 100644 muxer/muxed_stream_interface.py create mode 100644 network/tcp.py diff --git a/connection/muxed_connection.py b/connection/muxed_connection.py deleted file mode 100644 index e69de29b..00000000 diff --git a/connection/muxed_connection_interface.py b/connection/muxed_connection_interface.py deleted file mode 100644 index e69de29b..00000000 diff --git a/muxer/muxed_connection.py b/muxer/muxed_connection.py new file mode 100644 index 00000000..26c80f5c --- /dev/null +++ b/muxer/muxed_connection.py @@ -0,0 +1,41 @@ +from .muxed_connection_interface import IMuxedConnection + +class MuxedConnection(IMuxedConnection): + """ + reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go + """ + def __init__(self, conn, initiator): + """ + create a new muxed connection + :param conn: an instance of raw connection + :param initiator: boolean to prevent multiplex with self + """ + self.raw_conn = conn + self.initiator = initiator + + def close(self): + """ + close the stream muxer and underlying raw connection + """ + pass + + def is_closed(self): + """ + check connection is fully closed + :return: true if successful + """ + pass + + def open_stream(self): + """ + creates a new muxed_stream + :return: a new stream + """ + pass + + def accept_stream(self): + """ + accepts a muxed stream opened by the other end + :return: the accepted stream + """ + pass diff --git a/muxer/muxed_connection_interface.py b/muxer/muxed_connection_interface.py new file mode 100644 index 00000000..cf125918 --- /dev/null +++ b/muxer/muxed_connection_interface.py @@ -0,0 +1,32 @@ +from abc import ABC, abstractmethod + +class IMuxedConn(ABC): + """ + reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go + """ + + # TODO closer + + @abstractmethod + def is_closed(self): + """ + check connection is fully closed + :return: true if successful + """ + pass + + @abstractmethod + def open_stream(self): + """ + creates a new muxed_stream + :return: a new stream + """ + pass + + @abstractmethod + def accept_stream(self): + """ + accepts a muxed stream opened by the other end + :return: the accepted stream + """ + pass diff --git a/muxer/muxed_stream.py b/muxer/muxed_stream.py new file mode 100644 index 00000000..be37d58a --- /dev/null +++ b/muxer/muxed_stream.py @@ -0,0 +1,33 @@ +from .muxed_stream_interface import IMuxedStream + +class MuxedStream(IMuxedStream): + """ + reference: https://github.com/libp2p/go-mplex/blob/master/stream.go + """ + def __init__(self, stream_id, stream_name): + self.id = stream_id + self.name = stream_name + + def read(self): + pass + + def write(self): + pass + + def close(self): + pass + + def reset(self): + """ + closes both ends of the stream + tells this remote side to hang up + :return: error/exception + """ + pass + + def set_deadline(self, ttl): + """ + set deadline for muxed stream + :return: a new stream + """ + pass diff --git a/muxer/muxed_stream_interface.py b/muxer/muxed_stream_interface.py new file mode 100644 index 00000000..0ac3ec9a --- /dev/null +++ b/muxer/muxed_stream_interface.py @@ -0,0 +1,25 @@ +from abc import ABC, abstractmethod +from datetime import time + +class IMuxedStream(ABC): + + # TODO Reader + # TODO Writer + # TODO Closer + + @abstractmethod + def reset(self): + """ + closes both ends of the stream + tells this remote side to hang up + :return: error/exception + """ + pass + + @abstractmethod + def set_deadline(self, ttl): + """ + set deadline for muxed stream + :return: a new stream + """ + pass diff --git a/muxer/smux_multiplex.py b/muxer/smux_multiplex.py index e69de29b..ae7feb97 100644 --- a/muxer/smux_multiplex.py +++ b/muxer/smux_multiplex.py @@ -0,0 +1,44 @@ +from .muxed_stream import MuxedStream +from .muxed_connection import MuxedConn + +class Multiplex(object): + """ + reference: https://github.com/whyrusleeping/go-smux-multiplex/blob/master/multiplex.go + """ + def __init__(self, conn, initiator): + """ + :param conn: an instance of raw connection + : param initiator: boolean to prevent multiplex with self + """ + self.muxed_conn = MuxedConn(conn, initiator) + + def close(self): + """ + close the stream muxer and underlying raw connection + """ + return self.muxed_conn.close() + + def is_closed(self): + """ + check connection is fully closed + :return: true if successful + """ + return self.muxed_conn.is_closed() + + def open_stream(self): + """ + creates a new muxed_stream + :return: a new stream + """ + return self.muxed_conn.open_stream() + + def accept_stream(self, _muxed_stream): + """ + accepts a muxed stream opened by the other end + :param _muxed_stream: stream to be accepted + :return: the accepted stream + """ + pass + + # def new_conn(raw_conn, is_server): + # pass diff --git a/network/tcp.py b/network/tcp.py new file mode 100644 index 00000000..6ea962dc --- /dev/null +++ b/network/tcp.py @@ -0,0 +1,83 @@ +import asyncio +from .transport_interface import ITransport +from .listener_interface import IListener + +class TCP(ITransport): + + def __init__(self): + self.listener = self.Listener() + + class Listener(IListener): + + def __init__(self, handler_function=None): + self.multiaddrs = [] + self.server = None + self.handler = staticmethod(handler_function) + + def listen(self, multiaddr): + """ + put listener in listening mode and wait for incoming connections + :param multiaddr: multiaddr of peer + :return: return True if successful + """ + # TODO check for exceptions + _multiaddr = multiaddr + if "ipfs" in multiaddr.get_protocols(): + # ipfs_id = multiaddr.get_ipfs_id() + _multiaddr = multiaddr.remove_protocol("ipfs") + + self.multiaddrs.append(_multiaddr) + _multiaddr_dict = _multiaddr.to_dict() + _loop = asyncio.get_event_loop() + _coroutine = asyncio.start_server(self.handler, _multiaddr_dict.host,\ + _multiaddr_dict.port, loop=_loop) + self.server = _loop.run_until_complete(_coroutine) + return True + + def get_addrs(self): + """ + retrieve list of addresses the listener is listening on + :return: return list of addrs + """ + # TODO check if server is listening + return self.multiaddrs + + def close(self, options=None): + """ + close the listener such that no more connections + can be open on this transport instance + :param options: optional object potential with timeout + a timeout value in ms that fires and destroy all connections + :return: return True if successful + """ + if self.server is None: + return False + self.server.close() + _loop = asyncio.get_event_loop() + _loop.run_until_complete(self.server.wait_closed()) + _loop.close() + self.server = None + return True + + def dial(self, multiaddr, options=None): + """ + dial a transport to peer listening on multiaddr + :param multiaddr: multiaddr of peer + :param options: optional object + :return: True if successful + """ + _multiaddr_dict = multiaddr.to_dict() + reader, writer = await asyncio.open_connection(_multiaddr_dict.host,\ + _multiaddr_dict.port) + return False + # TODO dial behavior not fully understood + + def create_listener(self, handler_function, options=None): + """ + create listener on transport + :param options: optional object with properties the listener must have + :param handler_function: a function called when a new conntion is received + that takes a connection as argument which implements interface-connection + :return: a listener object that implements listener_interface.py + """ + return self.Listener(handler_function) From 8756320e859d891ae915d7f92b30d0158e53f300 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 31 Oct 2018 23:02:00 +0100 Subject: [PATCH 2/2] muxer scaffolding --- muxer/muxed_connection.py | 8 ++++---- muxer/muxed_connection_interface.py | 6 ++++-- muxer/muxed_stream.py | 13 +++++++------ muxer/muxed_stream_interface.py | 2 +- muxer/smux_multiplex.py | 12 ++++++------ muxer/stream_muxer_interface.py | 0 6 files changed, 22 insertions(+), 19 deletions(-) delete mode 100644 muxer/stream_muxer_interface.py diff --git a/muxer/muxed_connection.py b/muxer/muxed_connection.py index 26c80f5c..c0b99263 100644 --- a/muxer/muxed_connection.py +++ b/muxer/muxed_connection.py @@ -1,6 +1,6 @@ -from .muxed_connection_interface import IMuxedConnection +from .muxed_connection_interface import IMuxedConn -class MuxedConnection(IMuxedConnection): +class MuxedConn(IMuxedConn): """ reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go """ @@ -26,7 +26,7 @@ class MuxedConnection(IMuxedConnection): """ pass - def open_stream(self): + def open_stream(self, protocol_id, stream_name): """ creates a new muxed_stream :return: a new stream @@ -38,4 +38,4 @@ class MuxedConnection(IMuxedConnection): accepts a muxed stream opened by the other end :return: the accepted stream """ - pass + pass diff --git a/muxer/muxed_connection_interface.py b/muxer/muxed_connection_interface.py index cf125918..df1694a0 100644 --- a/muxer/muxed_connection_interface.py +++ b/muxer/muxed_connection_interface.py @@ -16,9 +16,11 @@ class IMuxedConn(ABC): pass @abstractmethod - def open_stream(self): + def open_stream(self, protocol_id, stream_name): """ creates a new muxed_stream + :param protocol_id: id to be associated with stream + :param stream_name: name as part of identifier :return: a new stream """ pass @@ -29,4 +31,4 @@ class IMuxedConn(ABC): accepts a muxed stream opened by the other end :return: the accepted stream """ - pass + pass diff --git a/muxer/muxed_stream.py b/muxer/muxed_stream.py index be37d58a..92624e5f 100644 --- a/muxer/muxed_stream.py +++ b/muxer/muxed_stream.py @@ -1,11 +1,12 @@ from .muxed_stream_interface import IMuxedStream class MuxedStream(IMuxedStream): - """ - reference: https://github.com/libp2p/go-mplex/blob/master/stream.go - """ - def __init__(self, stream_id, stream_name): - self.id = stream_id + """ + reference: https://github.com/libp2p/go-mplex/blob/master/stream.go + """ + + def __init__(self, protocol_id, stream_name): + self.protocol_id = protocol_id self.name = stream_name def read(self): @@ -15,7 +16,7 @@ class MuxedStream(IMuxedStream): pass def close(self): - pass + pass def reset(self): """ diff --git a/muxer/muxed_stream_interface.py b/muxer/muxed_stream_interface.py index 0ac3ec9a..2c9bc25f 100644 --- a/muxer/muxed_stream_interface.py +++ b/muxer/muxed_stream_interface.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from datetime import time +# from datetime import time class IMuxedStream(ABC): diff --git a/muxer/smux_multiplex.py b/muxer/smux_multiplex.py index ae7feb97..6239ad73 100644 --- a/muxer/smux_multiplex.py +++ b/muxer/smux_multiplex.py @@ -25,19 +25,19 @@ class Multiplex(object): """ return self.muxed_conn.is_closed() - def open_stream(self): + def open_stream(self, protocol_id, stream_name): """ creates a new muxed_stream :return: a new stream """ - return self.muxed_conn.open_stream() + return self.muxed_conn.open_stream(protocol_id, stream_name) def accept_stream(self, _muxed_stream): - """ - accepts a muxed stream opened by the other end - :param _muxed_stream: stream to be accepted + """ + accepts a muxed stream opened by the other end + :param _muxed_stream: stream to be accepted :return: the accepted stream - """ + """ pass # def new_conn(raw_conn, is_server): diff --git a/muxer/stream_muxer_interface.py b/muxer/stream_muxer_interface.py deleted file mode 100644 index e69de29b..00000000