diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index b7caa620..c4f36585 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -1,3 +1,4 @@ +import asyncio from .muxed_connection_interface import IMuxedConn from transport.stream.Stream import Stream @@ -13,12 +14,16 @@ class MuxedConn(IMuxedConn): """ self.raw_conn = conn self.initiator = initiator + self.buffers = {} + self.streams = {} + + self.add_incoming_task() def close(self): """ close the stream muxer and underlying raw connection """ - pass + self.raw_conn.close() def is_closed(self): """ @@ -27,13 +32,15 @@ class MuxedConn(IMuxedConn): """ pass - def open_stream(self, protocol_id, stream_name, peer_id, multi_addr): + def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): """ creates a new muxed_stream :return: a new stream """ - - return Stream(peer_id, multi_addr, self) + stream = Stream(peer_id, multi_addr, self) + self.streams[stream_id] = stream + self.buffers[stream_id] = bytearray() + return stream def accept_stream(self): @@ -42,3 +49,30 @@ class MuxedConn(IMuxedConn): :return: the accepted stream """ pass + + def send_message(self, header, data): + """ + sends a message over the connection + :param header: header to use + :param data: data to send in the message + :return: True if success + """ + pass + + async def handle_incoming(self): + data = bytearray() + while True: + chunk = self.raw_conn.reader.read(100) + if not chunk: + break + data += chunk + + # Read header + # Read message length + # Read message into corresponding buffer + + + def add_incoming_task(self): + loop = asyncio.get_event_loop() + handle_incoming_task = loop.create_task(self.handle_incoming()) + handle_incoming_task.add_done_callback(self.add_incoming_task) diff --git a/muxer/mplex/utils.py b/muxer/mplex/utils.py new file mode 100644 index 00000000..c4696f85 --- /dev/null +++ b/muxer/mplex/utils.py @@ -0,0 +1,26 @@ +def encode_uvarint(number): + """Pack `number` into varint bytes""" + buf = b'' + while True: + towrite = number & 0x7f + number >>= 7 + if number: + buf += bytes((towrite | 0x80, )) + else: + buf += bytes((towrite, )) + break + return buf + +def decode_uvarint(buff): + shift = 0 + result = 0 + index = 0 + while True: + i = buff[index] + result |= (i & 0x7f) << shift + shift += 7 + if not i & 0x80: + break + index += 1 + + return result, index diff --git a/transport/connection/raw_connection.py b/transport/connection/raw_connection.py index efc3706a..4947b1c2 100644 --- a/transport/connection/raw_connection.py +++ b/transport/connection/raw_connection.py @@ -9,6 +9,10 @@ class RawConnection(IRawConnection): self.reader = reader self.writer = writer + + def close(self): + self.writer.close() + # def __init__(self, ip, port): # self.conn_ip = ip # self.conn_port = port