From 356cac02bf2bafa0d72ff642e12e7b20ba372eaf Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Sun, 11 Nov 2018 16:04:57 -0500 Subject: [PATCH 1/9] Add mux stream --- network/stream/__init__.py | 0 network/stream/net_stream.py | 42 +++++++++++++++++++++++ network/stream/net_stream_interface.py | 47 ++++++++++++++++++++++++++ 3 files changed, 89 insertions(+) create mode 100644 network/stream/__init__.py create mode 100644 network/stream/net_stream.py create mode 100644 network/stream/net_stream_interface.py diff --git a/network/stream/__init__.py b/network/stream/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/network/stream/net_stream.py b/network/stream/net_stream.py new file mode 100644 index 00000000..0dca44dc --- /dev/null +++ b/network/stream/net_stream.py @@ -0,0 +1,42 @@ +import asyncio +from .net_stream_interface import INetStream + +class NetStream(INetStream): + + def __init__(self, muxed_stream): + self.muxed_stream = muxed_stream + + def get_protocol(self): + """ + :return: protocol id that stream runs on + """ + return self.protocol_id + + def set_protocol(self, protocol_id): + """ + :param protocol_id: protocol id that stream runs on + :return: true if successful + """ + self.protocol_id = protocol_id + + def read(self): + """ + read from stream + :return: bytes of input until EOF + """ + return self.muxed_stream.read() + + def write(self, bytes): + """ + write to stream + :return: number of bytes written + """ + return self.muxed_stream.write(bytes) + + def close(self): + """ + close stream + :return: true if successful + """ + self.muxed_stream.close() + return True diff --git a/network/stream/net_stream_interface.py b/network/stream/net_stream_interface.py new file mode 100644 index 00000000..920e2faf --- /dev/null +++ b/network/stream/net_stream_interface.py @@ -0,0 +1,47 @@ +from abc import ABC, abstractmethod + +class INetStream(ABC): + + def __init__(self, peer_id, multi_addr, connection): + self.peer_id = peer_id + self.multi_addr = multi_addr + self.connection = connection + + @abstractmethod + def get_protocol(self): + """ + :return: protocol id that stream runs on + """ + pass + + @abstractmethod + def set_protocol(self, protocol_id): + """ + :param protocol_id: protocol id that stream runs on + :return: true if successful + """ + pass + + @abstractmethod + def read(self): + """ + read from stream + :return: bytes of input + """ + pass + + @abstractmethod + def write(self, _bytes): + """ + write to stream + :return: number of bytes written + """ + pass + + @abstractmethod + def close(self): + """ + close stream + :return: true if successful + """ + pass From 6ba5793c1d21500efc47bd9682cdde35a018143a Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Sun, 11 Nov 2018 16:09:37 -0500 Subject: [PATCH 2/9] Reorganize folders (stream and connection) --- {transport => network}/connection/__init__.py | 0 .../connection/raw_connection.py | 0 .../connection/raw_connection_interface.py | 0 network/swarm.py | 14 ++--- transport/stream/__init__.py | 0 transport/stream/stream.py | 58 ------------------- transport/stream/stream_interface.py | 47 --------------- transport/upgrader.py | 32 +++++----- 8 files changed, 25 insertions(+), 126 deletions(-) rename {transport => network}/connection/__init__.py (100%) rename {transport => network}/connection/raw_connection.py (100%) rename {transport => network}/connection/raw_connection_interface.py (100%) delete mode 100644 transport/stream/__init__.py delete mode 100644 transport/stream/stream.py delete mode 100644 transport/stream/stream_interface.py diff --git a/transport/connection/__init__.py b/network/connection/__init__.py similarity index 100% rename from transport/connection/__init__.py rename to network/connection/__init__.py diff --git a/transport/connection/raw_connection.py b/network/connection/raw_connection.py similarity index 100% rename from transport/connection/raw_connection.py rename to network/connection/raw_connection.py diff --git a/transport/connection/raw_connection_interface.py b/network/connection/raw_connection_interface.py similarity index 100% rename from transport/connection/raw_connection_interface.py rename to network/connection/raw_connection_interface.py diff --git a/network/swarm.py b/network/swarm.py index edd4446a..e1ab9553 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -4,7 +4,7 @@ from transport.connection.raw_connection import RawConnection class Swarm(INetwork): - def __init__(self, my_peer_id, peerstore): + def __init__(self, my_peer_id, peerstore, upgrader): self.my_peer_id = my_peer_id self.peerstore = peerstore self.connections = {} @@ -18,19 +18,17 @@ class Swarm(INetwork): def new_stream(self, peer_id, protocol_id): """ - Determine if a connection to peer_id already exists - If a connection to peer_id exists, then - c = existing connection, - otherwise c = new muxed connection to peer_id - s = c.open_stream(protocol_id) - return s - :param peer_id: peer_id of destination :param protocol_id: protocol id :return: stream instance """ muxed_connection = None if peer_id in self.connections: + """ + If muxed connection already exists for peer_id, + set muxed connection equal to + existing muxed connection + """ muxed_connection = self.connections[peer_id] else: addrs = self.peerstore.addrs(peer_id) diff --git a/transport/stream/__init__.py b/transport/stream/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/transport/stream/stream.py b/transport/stream/stream.py deleted file mode 100644 index 34daf97f..00000000 --- a/transport/stream/stream.py +++ /dev/null @@ -1,58 +0,0 @@ -import asyncio -from .stream_interface import IStream - -class Stream(IStream): - - def __init__(self, peer_id, multi_addr, connection): - IStream.__init__(self, peer_id, multi_addr, connection) - self.peer_id = peer_id - - self.multi_addr = multi_addr - - self.stream_ip = multi_addr.get_protocol_value("ip4") - self.stream_port = multi_addr.get_protocol_value("tcp") - - self.reader = connection.reader - self.writer = connection.writer - - # TODO should construct protocol id from constructor - self.protocol_id = None - - def get_protocol(self): - """ - :return: protocol id that stream runs on - """ - return self.protocol_id - - def set_protocol(self, protocol_id): - """ - :param protocol_id: protocol id that stream runs on - :return: true if successful - """ - self.protocol_id = protocol_id - - def read(self): - """ - read from stream - :return: bytes of input - """ - return self.reader.read(-1) - - def write(self, _bytes): - """ - write to stream - :return: number of bytes written - """ - return self.write_to_stream(_bytes) - - async def write_to_stream(self, _bytes): - to_return = self.writer.write(_bytes) - await self.writer.drain() - return to_return - - def close(self): - """ - close stream - :return: true if successful - """ - self.writer.close() diff --git a/transport/stream/stream_interface.py b/transport/stream/stream_interface.py deleted file mode 100644 index 6d63a2f0..00000000 --- a/transport/stream/stream_interface.py +++ /dev/null @@ -1,47 +0,0 @@ -from abc import ABC, abstractmethod - -class IStream(ABC): - - def __init__(self, peer_id, multi_addr, connection): - self.peer_id = peer_id - self.multi_addr = multi_addr - self.connection = connection - - @abstractmethod - def get_protocol(self): - """ - :return: protocol id that stream runs on - """ - pass - - @abstractmethod - def set_protocol(self, protocol_id): - """ - :param protocol_id: protocol id that stream runs on - :return: true if successful - """ - pass - - @abstractmethod - def read(self): - """ - read from stream - :return: bytes of input - """ - pass - - @abstractmethod - def write(self, _bytes): - """ - write to stream - :return: number of bytes written - """ - pass - - @abstractmethod - def close(self): - """ - close stream - :return: true if successful - """ - pass diff --git a/transport/upgrader.py b/transport/upgrader.py index 64014244..e70d4377 100644 --- a/transport/upgrader.py +++ b/transport/upgrader.py @@ -1,18 +1,24 @@ class TransportUpgrader(object): - def __init__(self, secOpt, muxerOpt): - self.sec = secOpt - self.muxer = muxerOpt + def __init__(self, secOpt, muxerOpt): + self.sec = secOpt + self.muxer = muxerOpt - def upgrade_listener(self, transport, listeners): - """ - upgrade multiaddr listeners to libp2p-transport listeners + def upgrade_listener(self, transport, listeners): + """ + upgrade multiaddr listeners to libp2p-transport listeners - """ - pass - - def upgrade_security(self): - pass + """ + pass + + def upgrade_security(self): + pass - def upgrade_muxer(self): - pass \ No newline at end of file + def upgrade_connection(self, conn): + """ + upgrade raw connection to muxed connection + """ + # For PoC, no security + # Default to mplex + pass + \ No newline at end of file From 4851137ecefcf54a3b777370f138631ac27f4e4a Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Sun, 11 Nov 2018 16:42:10 -0500 Subject: [PATCH 3/9] Implement new stream function --- network/swarm.py | 53 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/network/swarm.py b/network/swarm.py index e1ab9553..e05e3abf 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -1,6 +1,6 @@ +import uuid from .network_interface import INetwork -from muxer.mplex.muxed_connection import MuxedConn -from transport.connection.raw_connection import RawConnection +from .stream.net_stream import NetStream class Swarm(INetwork): @@ -8,6 +8,7 @@ class Swarm(INetwork): self.my_peer_id = my_peer_id self.peerstore = peerstore self.connections = {} + self.upgrader = upgrader def set_stream_handler(self, stream_handler): """ @@ -20,26 +21,41 @@ class Swarm(INetwork): """ :param peer_id: peer_id of destination :param protocol_id: protocol id - :return: stream instance + :return: net stream instance """ - muxed_connection = None + muxed_conn = None if peer_id in self.connections: """ If muxed connection already exists for peer_id, - set muxed connection equal to + set muxed connection equal to existing muxed connection - """ - muxed_connection = self.connections[peer_id] + """ + muxed_conn = self.connections[peer_id] else: + # Get peer info from peer store addrs = self.peerstore.addrs(peer_id) - stream_ip = addrs.get_protocol_value("ip") - stream_port = addrs.get_protocol_value("port") - if len(addrs) > 0: - conn = RawConnection(stream_ip, stream_port) - muxed_connection = MuxedConnection(conn, True) - else: - raise Exception("No IP and port in addr") - return muxed_connection.open_stream(protocol_id, "", peer_id, addrs) + + # Transport dials peer (gets back a raw conn) + if not addrs: + raise SwarmException("No known addresses to peer") + first_addr = addrs[0] + raw_conn = self.transport.dial(first_addr) + + # Use upgrader to upgrade raw conn to muxed conn + muxed_conn = self.upgrader.upgrade_connection(raw_conn) + + # Store muxed connection in connections + self.connections[peer_id] = muxed_conn + + # Use muxed conn to open stream, which returns + # a muxed stream + stream_id = str(uuid.uuid4()) + muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, first_addr) + + # Create a net stream + net_stream = NetStream(muxed_stream) + + return net_stream def listen(self, *args): """ @@ -47,3 +63,10 @@ class Swarm(INetwork): :return: true if at least one success """ pass + + def add_transport(self, transport): + # TODO: Support more than one transport + self.transport = transport + +class SwarmException(Exception): + pass From f6f2f6725d28f1eab4a1e8036c5baf24e598ef16 Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Sun, 11 Nov 2018 17:10:37 -0500 Subject: [PATCH 4/9] Describe listen stub --- network/swarm.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/network/swarm.py b/network/swarm.py index e05e3abf..6dcdcb44 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -7,8 +7,9 @@ class Swarm(INetwork): def __init__(self, my_peer_id, peerstore, upgrader): self.my_peer_id = my_peer_id self.peerstore = peerstore - self.connections = {} self.upgrader = upgrader + self.connections = dict() + self.listeners = dict() def set_stream_handler(self, stream_handler): """ @@ -62,7 +63,24 @@ class Swarm(INetwork): :param *args: one or many multiaddrs to start listening on :return: true if at least one success """ - pass + + # Create a closure C that takes in a multiaddr and + # returns a function object O that takes in a reader and writer. + # This function O looks up the stream handler + # for the given protocol, creates the net_stream + # for the listener and calls the stream handler function + # passing in the net_stream + + # For each multiaddr in args + # Check if a listener for multiaddr exists already + # If listener already exists, continue + # Otherwise, do the following: + # Pass multiaddr into C and get back function H + # listener = transport.create_listener(H) + # Call listener listen with the multiaddr + # Map multiaddr to listener + + return True def add_transport(self, transport): # TODO: Support more than one transport From 478cd033d5f0796e71cac2662fed55e7f2292f6a Mon Sep 17 00:00:00 2001 From: Alex Haynes Date: Sun, 11 Nov 2018 17:15:55 -0500 Subject: [PATCH 5/9] handle incoming, definition of send_message --- muxer/mplex/muxed_connection.py | 42 +++++++++++++++++++++++--- muxer/mplex/utils.py | 26 ++++++++++++++++ transport/connection/raw_connection.py | 4 +++ 3 files changed, 68 insertions(+), 4 deletions(-) create mode 100644 muxer/mplex/utils.py diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index b7caa620..c4f36585 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -1,3 +1,4 @@ +import asyncio from .muxed_connection_interface import IMuxedConn from transport.stream.Stream import Stream @@ -13,12 +14,16 @@ class MuxedConn(IMuxedConn): """ self.raw_conn = conn self.initiator = initiator + self.buffers = {} + self.streams = {} + + self.add_incoming_task() def close(self): """ close the stream muxer and underlying raw connection """ - pass + self.raw_conn.close() def is_closed(self): """ @@ -27,13 +32,15 @@ class MuxedConn(IMuxedConn): """ pass - def open_stream(self, protocol_id, stream_name, peer_id, multi_addr): + def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): """ creates a new muxed_stream :return: a new stream """ - - return Stream(peer_id, multi_addr, self) + stream = Stream(peer_id, multi_addr, self) + self.streams[stream_id] = stream + self.buffers[stream_id] = bytearray() + return stream def accept_stream(self): @@ -42,3 +49,30 @@ class MuxedConn(IMuxedConn): :return: the accepted stream """ pass + + def send_message(self, header, data): + """ + sends a message over the connection + :param header: header to use + :param data: data to send in the message + :return: True if success + """ + pass + + async def handle_incoming(self): + data = bytearray() + while True: + chunk = self.raw_conn.reader.read(100) + if not chunk: + break + data += chunk + + # Read header + # Read message length + # Read message into corresponding buffer + + + def add_incoming_task(self): + loop = asyncio.get_event_loop() + handle_incoming_task = loop.create_task(self.handle_incoming()) + handle_incoming_task.add_done_callback(self.add_incoming_task) diff --git a/muxer/mplex/utils.py b/muxer/mplex/utils.py new file mode 100644 index 00000000..c4696f85 --- /dev/null +++ b/muxer/mplex/utils.py @@ -0,0 +1,26 @@ +def encode_uvarint(number): + """Pack `number` into varint bytes""" + buf = b'' + while True: + towrite = number & 0x7f + number >>= 7 + if number: + buf += bytes((towrite | 0x80, )) + else: + buf += bytes((towrite, )) + break + return buf + +def decode_uvarint(buff): + shift = 0 + result = 0 + index = 0 + while True: + i = buff[index] + result |= (i & 0x7f) << shift + shift += 7 + if not i & 0x80: + break + index += 1 + + return result, index diff --git a/transport/connection/raw_connection.py b/transport/connection/raw_connection.py index efc3706a..4947b1c2 100644 --- a/transport/connection/raw_connection.py +++ b/transport/connection/raw_connection.py @@ -9,6 +9,10 @@ class RawConnection(IRawConnection): self.reader = reader self.writer = writer + + def close(self): + self.writer.close() + # def __init__(self, ip, port): # self.conn_ip = ip # self.conn_port = port From 162f54af0cb963e9bbccd400cf83ee1add232481 Mon Sep 17 00:00:00 2001 From: Alex Haynes Date: Sun, 11 Nov 2018 17:38:11 -0500 Subject: [PATCH 6/9] send message done --- muxer/mplex/muxed_connection.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index c4f36585..6c77c9bc 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -1,4 +1,5 @@ import asyncio +from .utils import encode_uvarint, decode_uvarint from .muxed_connection_interface import IMuxedConn from transport.stream.Stream import Stream @@ -50,14 +51,25 @@ class MuxedConn(IMuxedConn): """ pass - def send_message(self, header, data): + def send_message(self, flag, data, stream_id): """ sends a message over the connection :param header: header to use :param data: data to send in the message + :param stream_id: stream the message is in :return: True if success """ - pass + # << by 3, then or with flag + header = (stream_id << 3) | flag + header = encode_uvarint(header) + data_length = encode_uvarint(len(data)) + _bytes = header + data_length + data + return self.write_to_stream(_bytes) + + async def write_to_stream(self, _bytes): + to_return = self.raw_conn.writer.write(_bytes) + await self.raw_conn.writer.drain() + return to_return async def handle_incoming(self): data = bytearray() From e44e31e55b752bb48f3c97850e31d818433f7580 Mon Sep 17 00:00:00 2001 From: Alex Haynes Date: Sun, 11 Nov 2018 17:48:31 -0500 Subject: [PATCH 7/9] read buffer and write_to_stream return --- muxer/mplex/muxed_connection.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index 6c77c9bc..98b4d54c 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -33,6 +33,11 @@ class MuxedConn(IMuxedConn): """ pass + def read_buffer(self, stream_id): + data = self.buffers[stream_id] + self.buffers[stream_id] = bytearray() + return data + def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): """ creates a new muxed_stream @@ -67,9 +72,9 @@ class MuxedConn(IMuxedConn): return self.write_to_stream(_bytes) async def write_to_stream(self, _bytes): - to_return = self.raw_conn.writer.write(_bytes) + self.raw_conn.writer.write(_bytes) await self.raw_conn.writer.drain() - return to_return + return len(_bytes) async def handle_incoming(self): data = bytearray() From cacbc6c11ac47c29d32259aa38732b75390de0b5 Mon Sep 17 00:00:00 2001 From: Alex Haynes Date: Sun, 11 Nov 2018 17:55:50 -0500 Subject: [PATCH 8/9] finished handle_incoming --- muxer/mplex/muxed_connection.py | 8 ++++++++ muxer/mplex/utils.py | 3 +-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index 98b4d54c..e6d5819a 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -83,7 +83,15 @@ class MuxedConn(IMuxedConn): if not chunk: break data += chunk + header, end_index = decode_uvarint(data, 0) + length, end_index = decode_uvarint(data, end_index) + message = data[end_index, end_index + length] + # Deal with other types of messages + flag = header & 0x07 + stream_id = header >> 3 + + self.buffers[stream_id] = self.buffers[stream_id] + message # Read header # Read message length # Read message into corresponding buffer diff --git a/muxer/mplex/utils.py b/muxer/mplex/utils.py index c4696f85..4e202416 100644 --- a/muxer/mplex/utils.py +++ b/muxer/mplex/utils.py @@ -11,10 +11,9 @@ def encode_uvarint(number): break return buf -def decode_uvarint(buff): +def decode_uvarint(buff, index): shift = 0 result = 0 - index = 0 while True: i = buff[index] result |= (i & 0x7f) << shift From ffc3fb059fbd4080b727aff31cc1dd4d402f9397 Mon Sep 17 00:00:00 2001 From: Alex Haynes Date: Sun, 11 Nov 2018 18:01:08 -0500 Subject: [PATCH 9/9] updated open stream --- muxer/mplex/muxed_connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index e6d5819a..fdae239c 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -1,7 +1,7 @@ import asyncio from .utils import encode_uvarint, decode_uvarint from .muxed_connection_interface import IMuxedConn -from transport.stream.Stream import Stream +from .muxed_stream import MuxedStream class MuxedConn(IMuxedConn): """ @@ -43,7 +43,7 @@ class MuxedConn(IMuxedConn): creates a new muxed_stream :return: a new stream """ - stream = Stream(peer_id, multi_addr, self) + stream = MuxedStream(peer_id, multi_addr, self) self.streams[stream_id] = stream self.buffers[stream_id] = bytearray() return stream