diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 07b5e144..52df7273 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -156,15 +156,10 @@ class Swarm(INetwork): if not addrs: raise SwarmException("No known addresses to peer") - multiaddr = addrs[0] - muxed_conn = await self.dial_peer(peer_id) - # Use muxed conn to open stream, which returns - # a muxed stream - # TODO: Remove protocol id from being passed into muxed_conn - # FIXME: Remove multiaddr from being passed into muxed_conn - muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr) + # Use muxed conn to open stream, which returns a muxed stream + muxed_stream = await muxed_conn.open_stream() # Perform protocol muxing to determine protocol to use selected_protocol = await self.multiselect_client.select_one_of( diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 245a739e..f16b8052 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -1,8 +1,6 @@ from abc import ABC, abstractmethod from typing import TYPE_CHECKING, Optional -from multiaddr import Multiaddr - from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.stream_muxer.mplex.constants import HeaderTags @@ -66,20 +64,15 @@ class IMuxedConn(ABC): Read a message from `stream_id`'s buffer, non-blockingly. """ - # FIXME: Remove multiaddr from being passed into muxed_conn @abstractmethod - async def open_stream( - self, protocol_id: str, multi_addr: Multiaddr - ) -> "IMuxedStream": + async def open_stream(self) -> "IMuxedStream": """ creates a new muxed_stream - :param protocol_id: protocol_id of stream - :param multi_addr: multi_addr that stream connects to - :return: a new stream + :return: a new ``IMuxedStream`` stream """ @abstractmethod - async def accept_stream(self) -> None: + async def accept_stream(self, name: str) -> None: """ accepts a muxed stream opened by the other end """ diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 8c784313..4a0ba41d 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -1,8 +1,6 @@ import asyncio from typing import Dict, Optional, Tuple -from multiaddr import Multiaddr - from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn @@ -31,6 +29,7 @@ class Mplex(IMuxedConn): stream_queue: "asyncio.Queue[int]" next_stream_id: int + # TODO: `generic_protocol_handler` should be refactored out of mplex conn. def __init__( self, secured_conn: ISecureConn, @@ -114,28 +113,25 @@ class Mplex(IMuxedConn): self.next_stream_id += 2 return next_id - # FIXME: Remove multiaddr from being passed into muxed_conn - async def open_stream( - self, protocol_id: str, multi_addr: Multiaddr - ) -> IMuxedStream: + async def open_stream(self) -> IMuxedStream: """ creates a new muxed_stream - :param protocol_id: protocol_id of stream - :param multi_addr: multi_addr that stream connects to - :return: a new muxed stream + :return: a new ``MplexStream`` """ stream_id = self._get_next_stream_id() - stream = MplexStream(stream_id, True, self) + name = str(stream_id).encode() + stream = MplexStream(name, stream_id, True, self) self.buffers[stream_id] = asyncio.Queue() - await self.send_message(HeaderTags.NewStream, None, stream_id) + # Default stream name is the `stream_id` + await self.send_message(HeaderTags.NewStream, name, stream_id) return stream - async def accept_stream(self) -> None: + async def accept_stream(self, name: str) -> None: """ accepts a muxed stream opened by the other end """ stream_id = await self.stream_queue.get() - stream = MplexStream(stream_id, False, self) + stream = MplexStream(name, stream_id, False, self) asyncio.ensure_future(self.generic_protocol_handler(stream)) async def send_message(self, flag: HeaderTags, data: bytes, stream_id: int) -> int: @@ -181,11 +177,14 @@ class Mplex(IMuxedConn): self.buffers[stream_id] = asyncio.Queue() await self.stream_queue.put(stream_id) + # TODO: Handle more tags, and refactor `HeaderTags` if flag == HeaderTags.NewStream.value: # new stream detected on connection - await self.accept_stream() - - if message: + await self.accept_stream(message) + elif flag in ( + HeaderTags.MessageInitiator.value, + HeaderTags.MessageReceiver.value, + ): await self.buffers[stream_id].put(message) # Force context switch diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 2ec23f1a..e90a1d40 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -10,6 +10,7 @@ class MplexStream(IMuxedStream): reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ + name: str stream_id: int initiator: bool mplex_conn: IMuxedConn @@ -21,13 +22,16 @@ class MplexStream(IMuxedStream): _buf: bytearray - def __init__(self, stream_id: int, initiator: bool, mplex_conn: IMuxedConn) -> None: + def __init__( + self, name: str, stream_id: int, initiator: bool, mplex_conn: IMuxedConn + ) -> None: """ create new MuxedStream in muxer :param stream_id: stream stream id :param initiator: boolean if this is an initiator :param mplex_conn: muxed connection of this muxed_stream """ + self.name = name self.stream_id = stream_id self.initiator = initiator self.mplex_conn = mplex_conn