diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 7aefe3f9..8c784313 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -44,9 +44,9 @@ class Mplex(IMuxedConn): for new muxed streams :param peer_id: peer_id of peer the connection is to """ - self.conn = secured_conn + self.secured_conn = secured_conn - if self.conn.initiator: + if self.secured_conn.initiator: self.next_stream_id = 0 else: self.next_stream_id = 1 @@ -67,13 +67,13 @@ class Mplex(IMuxedConn): @property def initiator(self) -> bool: - return self.conn.initiator + return self.secured_conn.initiator async def close(self) -> None: """ - close the stream muxer and underlying raw connection + close the stream muxer and underlying secured connection """ - await self.conn.close() + await self.secured_conn.close() def is_closed(self) -> bool: """ @@ -84,7 +84,8 @@ class Mplex(IMuxedConn): async def read_buffer(self, stream_id: int) -> bytes: """ - Read a message from stream_id's buffer, check raw connection for new messages. + Read a message from buffer of the stream specified by `stream_id`, + check secured connection for new messages. `StreamNotFound` is raised when stream `stream_id` is not found in `Mplex`. :param stream_id: stream id of stream to read from :return: message read @@ -95,7 +96,7 @@ class Mplex(IMuxedConn): async def read_buffer_nonblocking(self, stream_id: int) -> Optional[bytes]: """ - Read a message from `stream_id`'s buffer, non-blockingly. + Read a message from buffer of the stream specified by `stream_id`, non-blockingly. `StreamNotFound` is raised when stream `stream_id` is not found in `Mplex`. """ if stream_id not in self.buffers: @@ -121,7 +122,7 @@ class Mplex(IMuxedConn): creates a new muxed_stream :param protocol_id: protocol_id of stream :param multi_addr: multi_addr that stream connects to - :return: a new stream + :return: a new muxed stream """ stream_id = self._get_next_stream_id() stream = MplexStream(stream_id, True, self) @@ -159,16 +160,16 @@ class Mplex(IMuxedConn): async def write_to_stream(self, _bytes: bytearray) -> int: """ - writes a byte array to a raw connection + writes a byte array to a secured connection :param _bytes: byte array to write :return: length written """ - await self.conn.write(_bytes) + await self.secured_conn.write(_bytes) return len(_bytes) async def handle_incoming(self) -> None: """ - Read a message off of the raw connection and add it to the corresponding message buffer + Read a message off of the secured connection and add it to the corresponding message buffer """ # TODO Deal with other types of messages using flag (currently _) @@ -192,7 +193,7 @@ class Mplex(IMuxedConn): async def read_message(self) -> Tuple[int, int, bytes]: """ - Read a single message off of the raw connection + Read a single message off of the secured connection :return: stream_id, flag, message contents """ @@ -201,9 +202,11 @@ class Mplex(IMuxedConn): # loop in handle_incoming timeout = 0.1 try: - header = await decode_uvarint_from_stream(self.conn, timeout) - length = await decode_uvarint_from_stream(self.conn, timeout) - message = await asyncio.wait_for(self.conn.read(length), timeout=timeout) + header = await decode_uvarint_from_stream(self.secured_conn, timeout) + length = await decode_uvarint_from_stream(self.secured_conn, timeout) + message = await asyncio.wait_for( + self.secured_conn.read(length), timeout=timeout + ) except asyncio.TimeoutError: return None, None, None diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index d3a5b4ec..1d87e7b6 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -53,8 +53,8 @@ async def perform_simple_test( node2_conn = node2.get_network().connections[peer_id_for_node(node1)] # Perform assertion - assertion_func(node1_conn.conn) - assertion_func(node2_conn.conn) + assertion_func(node1_conn.secured_conn) + assertion_func(node2_conn.secured_conn) # Success, terminate pending tasks. await cleanup()