From 592ef69d8d59df6eaf3489ed45f9a74cca192d55 Mon Sep 17 00:00:00 2001 From: Robert Zajac Date: Sun, 25 Nov 2018 11:05:56 -0500 Subject: [PATCH 1/2] Improved stream IDs --- network/swarm.py | 4 +--- stream_muxer/mplex/mplex.py | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/network/swarm.py b/network/swarm.py index 41ef9bc6..dc1f67e7 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -59,9 +59,7 @@ class Swarm(INetwork): # Use muxed conn to open stream, which returns # a muxed stream - # TODO: use better stream IDs - stream_id = (uuid.uuid4().int & (1<<64)-1) >> 3 - muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, multiaddr) + muxed_stream = await muxed_conn.open_stream(protocol_id, peer_id, multiaddr) # Create a net stream net_stream = NetStream(muxed_stream) diff --git a/stream_muxer/mplex/mplex.py b/stream_muxer/mplex/mplex.py index 516dce2d..2d677d57 100644 --- a/stream_muxer/mplex/mplex.py +++ b/stream_muxer/mplex/mplex.py @@ -16,16 +16,24 @@ class Mplex(IMuxedConn): """ self.raw_conn = conn self.initiator = initiator + + # Mapping from stream ID -> buffer of messages for that stream self.buffers = {} - self.streams = {} + self.stream_queue = asyncio.Queue() self.conn_lock = asyncio.Lock() + self._next_id = 0 # The initiator need not read upon construction time. # It should read when the user decides that it wants to read from the constructed stream. if not initiator: asyncio.ensure_future(self.handle_incoming()) + def _next_stream_id(self): + next_id = self._next_id + self._next_id += 1 + return next_id + def close(self): """ close the stream muxer and underlying raw connection @@ -49,7 +57,7 @@ class Mplex(IMuxedConn): self.buffers[stream_id] = bytearray() return data - def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): + async def open_stream(self, protocol_id, peer_id, multi_addr): """ creates a new muxed_stream :param protocol_id: protocol_id of stream @@ -58,8 +66,9 @@ class Mplex(IMuxedConn): :param multi_addr: multi_addr that stream connects to :return: a new stream """ + stream_id = self._next_stream_id() stream = MplexStream(stream_id, multi_addr, self) - self.streams[stream_id] = stream + self.buffers[stream_id] = bytearray() return stream async def accept_stream(self): From 6c99d854dd932cf520f8ce88b8a53eadcac6594c Mon Sep 17 00:00:00 2001 From: Robert Zajac Date: Sun, 25 Nov 2018 15:27:54 -0500 Subject: [PATCH 2/2] fixes for muxed_conn interface and usage of self.buffers over self.stream --- stream_muxer/mplex/mplex_stream.py | 4 ++-- stream_muxer/muxed_connection_interface.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/stream_muxer/mplex/mplex_stream.py b/stream_muxer/mplex/mplex_stream.py index 417ef647..0a505d26 100644 --- a/stream_muxer/mplex/mplex_stream.py +++ b/stream_muxer/mplex/mplex_stream.py @@ -68,7 +68,7 @@ class MplexStream(IMuxedStream): if remote_lock: async with self.mplex_conn.conn_lock: - self.mplex_conn.streams.pop(self.stream_id) + self.mplex_conn.buffers.pop(self.stream_id) return True @@ -91,7 +91,7 @@ class MplexStream(IMuxedStream): self.remote_closed = True async with self.mplex_conn.conn_lock: - self.mplex_conn.streams.pop(self.stream_id, None) + self.mplex_conn.buffers.pop(self.stream_id, None) return True diff --git a/stream_muxer/muxed_connection_interface.py b/stream_muxer/muxed_connection_interface.py index 7a22970f..489615a2 100644 --- a/stream_muxer/muxed_connection_interface.py +++ b/stream_muxer/muxed_connection_interface.py @@ -23,7 +23,7 @@ class IMuxedConn(ABC): pass @abstractmethod - def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): + def open_stream(self, protocol_id, peer_id, multi_addr): """ creates a new muxed_stream :param protocol_id: protocol_id of stream