From fe4354d377ed2dfa19d1746b024d379d108ecd5f Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 7 Jan 2020 14:14:34 +0800 Subject: [PATCH] Fix `tests_interop` - Remove pexpect - Use new version of `p2pclient`, which makes use of anyio - Clean up tests --- libp2p/tools/constants.py | 2 +- libp2p/tools/interop/constants.py | 1 - libp2p/tools/interop/daemon.py | 147 ++++-------------- libp2p/tools/interop/process.py | 66 ++++++++ libp2p/tools/interop/utils.py | 4 +- tests_interop/conftest.py | 216 +++++++++----------------- tests_interop/test_bindings.py | 42 +++--- tests_interop/test_echo.py | 148 ++++++++++-------- tests_interop/test_net_stream.py | 23 ++- tests_interop/test_pubsub.py | 241 +++++++++++++++--------------- tox.ini | 2 +- 11 files changed, 415 insertions(+), 477 deletions(-) create mode 100644 libp2p/tools/interop/process.py diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py index 8c22d151..b1ad2652 100644 --- a/libp2p/tools/constants.py +++ b/libp2p/tools/constants.py @@ -7,7 +7,7 @@ from libp2p.pubsub import floodsub, gossipsub # Just a arbitrary large number. # It is used when calling `MplexStream.read(MAX_READ_LEN)`, # to avoid `MplexStream.read()`, which blocking reads until EOF. -MAX_READ_LEN = 2 ** 32 - 1 +MAX_READ_LEN = 65535 LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") diff --git a/libp2p/tools/interop/constants.py b/libp2p/tools/interop/constants.py index 331e2843..8f039f1c 100644 --- a/libp2p/tools/interop/constants.py +++ b/libp2p/tools/interop/constants.py @@ -1,2 +1 @@ LOCALHOST_IP = "127.0.0.1" -PEXPECT_NEW_LINE = "\r\n" diff --git a/libp2p/tools/interop/daemon.py b/libp2p/tools/interop/daemon.py index 43cfd6db..f6e363e0 100644 --- a/libp2p/tools/interop/daemon.py +++ b/libp2p/tools/interop/daemon.py @@ -1,52 +1,22 @@ -import asyncio -import time -from typing import Any, Awaitable, Callable, List +from typing import AsyncIterator +from async_generator import asynccontextmanager import multiaddr from multiaddr import Multiaddr from p2pclient import Client -import pytest +import trio from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr from .constants import LOCALHOST_IP from .envs import GO_BIN_PATH +from .process import BaseInteractiveProcess P2PD_PATH = GO_BIN_PATH / "p2pd" -TIMEOUT_DURATION = 30 - - -async def try_until_success( - coro_func: Callable[[], Awaitable[Any]], timeout: int = TIMEOUT_DURATION -) -> None: - """ - Keep running ``coro_func`` until either it succeed or time is up. - - All arguments of ``coro_func`` should be filled, i.e. it should be - called without arguments. - """ - t_start = time.monotonic() - while True: - result = await coro_func() - if result: - break - if (time.monotonic() - t_start) >= timeout: - # timeout - pytest.fail(f"{coro_func} is still failing after `{timeout}` seconds") - await asyncio.sleep(0.01) - - -class P2PDProcess: - proc: asyncio.subprocess.Process - cmd: str = str(P2PD_PATH) - args: List[Any] - is_proc_running: bool - - _tasks: List["asyncio.Future[Any]"] - +class P2PDProcess(BaseInteractiveProcess): def __init__( self, control_maddr: Multiaddr, @@ -75,74 +45,21 @@ class P2PDProcess: # - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond # noqa: E501 # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 + self.proc = None + self.cmd = str(P2PD_PATH) self.args = args - self.is_proc_running = False - - self._tasks = [] - - async def wait_until_ready(self) -> None: - lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:") - lines_head_occurred = {line: False for line in lines_head_pattern} - - async def read_from_daemon_and_check() -> bool: - line = await self.proc.stdout.readline() - for head_pattern in lines_head_occurred: - if line.startswith(head_pattern): - lines_head_occurred[head_pattern] = True - return all([value for value in lines_head_occurred.values()]) - - await try_until_success(read_from_daemon_and_check) - # Sleep a little bit to ensure the listener is up after logs are emitted. - await asyncio.sleep(0.01) - - async def start_printing_logs(self) -> None: - async def _print_from_stream( - src_name: str, reader: asyncio.StreamReader - ) -> None: - while True: - line = await reader.readline() - if line != b"": - print(f"{src_name}\t: {line.rstrip().decode()}") - await asyncio.sleep(0.01) - - self._tasks.append( - asyncio.ensure_future(_print_from_stream("out", self.proc.stdout)) - ) - self._tasks.append( - asyncio.ensure_future(_print_from_stream("err", self.proc.stderr)) - ) - await asyncio.sleep(0) - - async def start(self) -> None: - if self.is_proc_running: - return - self.proc = await asyncio.subprocess.create_subprocess_exec( - self.cmd, - *self.args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - bufsize=0, - ) - self.is_proc_running = True - await self.wait_until_ready() - await self.start_printing_logs() - - async def close(self) -> None: - if self.is_proc_running: - self.proc.terminate() - await self.proc.wait() - self.is_proc_running = False - for task in self._tasks: - task.cancel() + self.patterns = (b"Control socket:", b"Peer ID:", b"Peer Addrs:") + self.bytes_read = bytearray() + self.event_ready = trio.Event() class Daemon: - p2pd_proc: P2PDProcess + p2pd_proc: BaseInteractiveProcess control: Client peer_info: PeerInfo def __init__( - self, p2pd_proc: P2PDProcess, control: Client, peer_info: PeerInfo + self, p2pd_proc: BaseInteractiveProcess, control: Client, peer_info: PeerInfo ) -> None: self.p2pd_proc = p2pd_proc self.control = control @@ -164,6 +81,7 @@ class Daemon: await self.control.close() +@asynccontextmanager async def make_p2pd( daemon_control_port: int, client_callback_port: int, @@ -172,7 +90,7 @@ async def make_p2pd( is_gossipsub: bool = True, is_pubsub_signing: bool = False, is_pubsub_signing_strict: bool = False, -) -> Daemon: +) -> AsyncIterator[Daemon]: control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{daemon_control_port}") p2pd_proc = P2PDProcess( control_maddr, @@ -185,21 +103,22 @@ async def make_p2pd( await p2pd_proc.start() client_callback_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{client_callback_port}") p2pc = Client(control_maddr, client_callback_maddr) - await p2pc.listen() - peer_id, maddrs = await p2pc.identify() - listen_maddr: Multiaddr = None - for maddr in maddrs: - try: - ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4) - # NOTE: Check if this `maddr` uses `tcp`. - maddr.value_for_protocol(multiaddr.protocols.P_TCP) - except multiaddr.exceptions.ProtocolLookupError: - continue - if ip == LOCALHOST_IP: - listen_maddr = maddr - break - assert listen_maddr is not None, "no loopback maddr is found" - peer_info = info_from_p2p_addr( - listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) - ) - return Daemon(p2pd_proc, p2pc, peer_info) + + async with p2pc.listen(): + peer_id, maddrs = await p2pc.identify() + listen_maddr: Multiaddr = None + for maddr in maddrs: + try: + ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4) + # NOTE: Check if this `maddr` uses `tcp`. + maddr.value_for_protocol(multiaddr.protocols.P_TCP) + except multiaddr.exceptions.ProtocolLookupError: + continue + if ip == LOCALHOST_IP: + listen_maddr = maddr + break + assert listen_maddr is not None, "no loopback maddr is found" + peer_info = info_from_p2p_addr( + listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) + ) + yield Daemon(p2pd_proc, p2pc, peer_info) diff --git a/libp2p/tools/interop/process.py b/libp2p/tools/interop/process.py new file mode 100644 index 00000000..0c17e51b --- /dev/null +++ b/libp2p/tools/interop/process.py @@ -0,0 +1,66 @@ +from abc import ABC, abstractmethod +import subprocess +from typing import Iterable, List + +import trio + +TIMEOUT_DURATION = 30 + + +class AbstractInterativeProcess(ABC): + @abstractmethod + async def start(self) -> None: + ... + + @abstractmethod + async def close(self) -> None: + ... + + +class BaseInteractiveProcess(AbstractInterativeProcess): + proc: trio.Process = None + cmd: str + args: List[str] + bytes_read: bytearray + patterns: Iterable[bytes] = None + event_ready: trio.Event + + async def wait_until_ready(self) -> None: + patterns_occurred = {pat: False for pat in self.patterns} + + async def read_from_daemon_and_check() -> None: + async for data in self.proc.stdout: + # TODO: It takes O(n^2), which is quite bad. + # But it should succeed in a few seconds. + self.bytes_read.extend(data) + for pat, occurred in patterns_occurred.items(): + if occurred: + continue + if pat in self.bytes_read: + patterns_occurred[pat] = True + if all([value for value in patterns_occurred.values()]): + return + + with trio.fail_after(TIMEOUT_DURATION): + await read_from_daemon_and_check() + self.event_ready.set() + # Sleep a little bit to ensure the listener is up after logs are emitted. + await trio.sleep(0.01) + + async def start(self) -> None: + if self.proc is not None: + return + # NOTE: Ignore type checks here since mypy complains about bufsize=0 + self.proc = await trio.open_process( # type: ignore + [self.cmd] + self.args, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # Redirect stderr to stdout, which makes parsing easier + bufsize=0, + ) + await self.wait_until_ready() + + async def close(self) -> None: + if self.proc is None: + return + self.proc.terminate() + await self.proc.wait() diff --git a/libp2p/tools/interop/utils.py b/libp2p/tools/interop/utils.py index c9174179..ce05c8fb 100644 --- a/libp2p/tools/interop/utils.py +++ b/libp2p/tools/interop/utils.py @@ -1,7 +1,7 @@ -import asyncio from typing import Union from multiaddr import Multiaddr +import trio from libp2p.host.host_interface import IHost from libp2p.peer.id import ID @@ -50,7 +50,7 @@ async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None: else: # isinstance(b, IHost) await a.connect(b_peer_info) # Allow additional sleep for both side to establish the connection. - await asyncio.sleep(0.1) + await trio.sleep(0.1) a_peer_info = _get_peer_info(a) diff --git a/tests_interop/conftest.py b/tests_interop/conftest.py index 08df614c..bad0054e 100644 --- a/tests_interop/conftest.py +++ b/tests_interop/conftest.py @@ -1,20 +1,13 @@ -import asyncio -import sys -from typing import Union - +import anyio +from async_exit_stack import AsyncExitStack from p2pclient.datastructures import StreamInfo -import pexpect +from p2pclient.utils import get_unused_tcp_port import pytest +import trio from libp2p.io.abc import ReadWriteCloser -from libp2p.tools.constants import GOSSIPSUB_PARAMS, LISTEN_MADDR -from libp2p.tools.factories import ( - FloodsubFactory, - GossipsubFactory, - HostFactory, - PubsubFactory, -) -from libp2p.tools.interop.daemon import Daemon, make_p2pd +from libp2p.tools.factories import HostFactory, PubsubFactory +from libp2p.tools.interop.daemon import make_p2pd from libp2p.tools.interop.utils import connect @@ -23,48 +16,6 @@ def is_host_secure(): return False -@pytest.fixture -def num_hosts(): - return 3 - - -@pytest.fixture -async def hosts(num_hosts, is_host_secure): - _hosts = HostFactory.create_batch(num_hosts, is_secure=is_host_secure) - await asyncio.gather( - *[_host.get_network().listen(LISTEN_MADDR) for _host in _hosts] - ) - try: - yield _hosts - finally: - # TODO: It's possible that `close` raises exceptions currently, - # due to the connection reset things. Though we don't care much about that when - # cleaning up the tasks, it is probably better to handle the exceptions properly. - await asyncio.gather( - *[_host.close() for _host in _hosts], return_exceptions=True - ) - - -@pytest.fixture -def proc_factory(): - procs = [] - - def call_proc(cmd, args, logfile=None, encoding=None): - if logfile is None: - logfile = sys.stdout - if encoding is None: - encoding = "utf-8" - proc = pexpect.spawn(cmd, args, logfile=logfile, encoding=encoding) - procs.append(proc) - return proc - - try: - yield call_proc - finally: - for proc in procs: - proc.close() - - @pytest.fixture def num_p2pds(): return 1 @@ -87,79 +38,60 @@ def is_pubsub_signing_strict(): @pytest.fixture async def p2pds( - num_p2pds, - is_host_secure, - is_gossipsub, - unused_tcp_port_factory, - is_pubsub_signing, - is_pubsub_signing_strict, + num_p2pds, is_host_secure, is_gossipsub, is_pubsub_signing, is_pubsub_signing_strict ): - p2pds: Union[Daemon, Exception] = await asyncio.gather( - *[ - make_p2pd( - unused_tcp_port_factory(), - unused_tcp_port_factory(), - is_host_secure, - is_gossipsub=is_gossipsub, - is_pubsub_signing=is_pubsub_signing, - is_pubsub_signing_strict=is_pubsub_signing_strict, + async with AsyncExitStack() as stack: + p2pds = [ + await stack.enter_async_context( + make_p2pd( + get_unused_tcp_port(), + get_unused_tcp_port(), + is_host_secure, + is_gossipsub=is_gossipsub, + is_pubsub_signing=is_pubsub_signing, + is_pubsub_signing_strict=is_pubsub_signing_strict, + ) ) for _ in range(num_p2pds) - ], - return_exceptions=True, - ) - p2pds_succeeded = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Daemon)) - if len(p2pds_succeeded) != len(p2pds): - # Not all succeeded. Close the succeeded ones and print the failed ones(exceptions). - await asyncio.gather(*[p2pd.close() for p2pd in p2pds_succeeded]) - exceptions = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Exception)) - raise Exception(f"not all p2pds succeed: first exception={exceptions[0]}") - try: - yield p2pds - finally: - await asyncio.gather(*[p2pd.close() for p2pd in p2pds]) + ] + try: + yield p2pds + finally: + for p2pd in p2pds: + await p2pd.close() @pytest.fixture -def pubsubs(num_hosts, hosts, is_gossipsub, is_pubsub_signing_strict): +async def pubsubs(num_hosts, is_host_secure, is_gossipsub, is_pubsub_signing_strict): if is_gossipsub: - routers = GossipsubFactory.create_batch(num_hosts, **GOSSIPSUB_PARAMS._asdict()) + yield PubsubFactory.create_batch_with_gossipsub( + num_hosts, is_secure=is_host_secure, strict_signing=is_pubsub_signing_strict + ) else: - routers = FloodsubFactory.create_batch(num_hosts) - _pubsubs = tuple( - PubsubFactory(host=host, router=router, strict_signing=is_pubsub_signing_strict) - for host, router in zip(hosts, routers) - ) - yield _pubsubs - # TODO: Clean up + yield PubsubFactory.create_batch_with_floodsub( + num_hosts, is_host_secure, strict_signing=is_pubsub_signing_strict + ) class DaemonStream(ReadWriteCloser): stream_info: StreamInfo - reader: asyncio.StreamReader - writer: asyncio.StreamWriter + stream: anyio.abc.SocketStream - def __init__( - self, - stream_info: StreamInfo, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter, - ) -> None: + def __init__(self, stream_info: StreamInfo, stream: anyio.abc.SocketStream) -> None: self.stream_info = stream_info - self.reader = reader - self.writer = writer + self.stream = stream async def close(self) -> None: - self.writer.close() - if sys.version_info < (3, 7): - return - await self.writer.wait_closed() + await self.stream.close() async def read(self, n: int = -1) -> bytes: - return await self.reader.read(n) + if n == -1: + return await self.stream.receive_some() + else: + return await self.stream.receive_some(n) - async def write(self, data: bytes) -> int: - return self.writer.write(data) + async def write(self, data: bytes) -> None: + return await self.stream.send_all(data) @pytest.fixture @@ -168,40 +100,38 @@ async def is_to_fail_daemon_stream(): @pytest.fixture -async def py_to_daemon_stream_pair(hosts, p2pds, is_to_fail_daemon_stream): - assert len(hosts) >= 1 - assert len(p2pds) >= 1 - host = hosts[0] - p2pd = p2pds[0] - protocol_id = "/protocol/id/123" - stream_py = None - stream_daemon = None - event_stream_handled = asyncio.Event() - await connect(host, p2pd) +async def py_to_daemon_stream_pair(p2pds, is_host_secure, is_to_fail_daemon_stream): + async with HostFactory.create_batch_and_listen(is_host_secure, 1) as hosts: + assert len(p2pds) >= 1 + host = hosts[0] + p2pd = p2pds[0] + protocol_id = "/protocol/id/123" + stream_py = None + stream_daemon = None + event_stream_handled = trio.Event() + await connect(host, p2pd) - async def daemon_stream_handler(stream_info, reader, writer): - nonlocal stream_daemon - stream_daemon = DaemonStream(stream_info, reader, writer) - event_stream_handled.set() + async def daemon_stream_handler(stream_info, stream): + nonlocal stream_daemon + stream_daemon = DaemonStream(stream_info, stream) + event_stream_handled.set() + await trio.hazmat.checkpoint() - await p2pd.control.stream_handler(protocol_id, daemon_stream_handler) - # Sleep for a while to wait for the handler being registered. - await asyncio.sleep(0.01) + await p2pd.control.stream_handler(protocol_id, daemon_stream_handler) + # Sleep for a while to wait for the handler being registered. + await trio.sleep(0.01) - if is_to_fail_daemon_stream: - # FIXME: This is a workaround to make daemon reset the stream. - # We intentionally close the listener on the python side, it makes the connection from - # daemon to us fail, and therefore the daemon resets the opened stream on their side. - # Reference: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/stream.go#L47-L50 # noqa: E501 - # We need it because we want to test against `stream_py` after the remote side(daemon) - # is reset. This should be removed after the API `stream.reset` is exposed in daemon - # some day. - listener = p2pds[0].control.control.listener - listener.close() - if sys.version_info[0:2] > (3, 6): - await listener.wait_closed() - stream_py = await host.new_stream(p2pd.peer_id, [protocol_id]) - if not is_to_fail_daemon_stream: - await event_stream_handled.wait() - # NOTE: If `is_to_fail_daemon_stream == True`, then `stream_daemon == None`. - yield stream_py, stream_daemon + if is_to_fail_daemon_stream: + # FIXME: This is a workaround to make daemon reset the stream. + # We intentionally close the listener on the python side, it makes the connection from + # daemon to us fail, and therefore the daemon resets the opened stream on their side. + # Reference: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/stream.go#L47-L50 # noqa: E501 + # We need it because we want to test against `stream_py` after the remote side(daemon) + # is reset. This should be removed after the API `stream.reset` is exposed in daemon + # some day. + await p2pds[0].control.control.close() + stream_py = await host.new_stream(p2pd.peer_id, [protocol_id]) + if not is_to_fail_daemon_stream: + await event_stream_handled.wait() + # NOTE: If `is_to_fail_daemon_stream == True`, then `stream_daemon == None`. + yield stream_py, stream_daemon diff --git a/tests_interop/test_bindings.py b/tests_interop/test_bindings.py index 9e70aa21..87cdbb1b 100644 --- a/tests_interop/test_bindings.py +++ b/tests_interop/test_bindings.py @@ -1,26 +1,26 @@ -import asyncio - import pytest +import trio +from libp2p.tools.factories import HostFactory from libp2p.tools.interop.utils import connect -@pytest.mark.parametrize("num_hosts", (1,)) -@pytest.mark.asyncio -async def test_connect(hosts, p2pds): - p2pd = p2pds[0] - host = hosts[0] - assert len(await p2pd.control.list_peers()) == 0 - # Test: connect from Py - await connect(host, p2pd) - assert len(await p2pd.control.list_peers()) == 1 - # Test: `disconnect` from Py - await host.disconnect(p2pd.peer_id) - assert len(await p2pd.control.list_peers()) == 0 - # Test: connect from Go - await connect(p2pd, host) - assert len(host.get_network().connections) == 1 - # Test: `disconnect` from Go - await p2pd.control.disconnect(host.get_id()) - await asyncio.sleep(0.01) - assert len(host.get_network().connections) == 0 +@pytest.mark.trio +async def test_connect(is_host_secure, p2pds): + async with HostFactory.create_batch_and_listen(is_host_secure, 1) as hosts: + p2pd = p2pds[0] + host = hosts[0] + assert len(await p2pd.control.list_peers()) == 0 + # Test: connect from Py + await connect(host, p2pd) + assert len(await p2pd.control.list_peers()) == 1 + # Test: `disconnect` from Py + await host.disconnect(p2pd.peer_id) + assert len(await p2pd.control.list_peers()) == 0 + # Test: connect from Go + await connect(p2pd, host) + assert len(host.get_network().connections) == 1 + # Test: `disconnect` from Go + await p2pd.control.disconnect(host.get_id()) + await trio.sleep(0.01) + assert len(host.get_network().connections) == 0 diff --git a/tests_interop/test_echo.py b/tests_interop/test_echo.py index 6ac867ba..d3cb72fd 100644 --- a/tests_interop/test_echo.py +++ b/tests_interop/test_echo.py @@ -1,82 +1,104 @@ -import asyncio +import random +import re from multiaddr import Multiaddr import pytest +import trio -from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.tools.interop.constants import PEXPECT_NEW_LINE +from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr +from libp2p.tools.factories import HostFactory from libp2p.tools.interop.envs import GO_BIN_PATH +from libp2p.tools.interop.process import BaseInteractiveProcess from libp2p.typing import TProtocol ECHO_PATH = GO_BIN_PATH / "echo" ECHO_PROTOCOL_ID = TProtocol("/echo/1.0.0") -async def make_echo_proc( - proc_factory, port: int, is_secure: bool, destination: Multiaddr = None -): - args = [f"-l={port}"] - if not is_secure: - args.append("-insecure") - if destination is not None: - args.append(f"-d={str(destination)}") - echo_proc = proc_factory(str(ECHO_PATH), args) - await echo_proc.expect(r"I am ([\w\./]+)" + PEXPECT_NEW_LINE, async_=True) - maddr_str_ipfs = echo_proc.match.group(1) - maddr_str = maddr_str_ipfs.replace("ipfs", "p2p") - maddr = Multiaddr(maddr_str) - go_pinfo = info_from_p2p_addr(maddr) - if destination is None: - await echo_proc.expect("listening for connections", async_=True) - return echo_proc, go_pinfo +# FIXME: Change to a reasonable implementation +def unused_tcp_port_factory(): + return random.randint(1024, 65535) -@pytest.mark.parametrize("num_hosts", (1,)) -@pytest.mark.asyncio -async def test_insecure_conn_py_to_go( - hosts, proc_factory, is_host_secure, unused_tcp_port -): - go_proc, go_pinfo = await make_echo_proc( - proc_factory, unused_tcp_port, is_host_secure - ) +class EchoProcess(BaseInteractiveProcess): + port: int + _peer_info: PeerInfo - host = hosts[0] - await host.connect(go_pinfo) - await go_proc.expect("swarm listener accepted connection", async_=True) - s = await host.new_stream(go_pinfo.peer_id, [ECHO_PROTOCOL_ID]) + def __init__( + self, port: int, is_secure: bool, destination: Multiaddr = None + ) -> None: + args = [f"-l={port}"] + if not is_secure: + args.append("-insecure") + if destination is not None: + args.append(f"-d={str(destination)}") - await go_proc.expect("Got a new stream!", async_=True) - data = "data321123\n" - await s.write(data.encode()) - await go_proc.expect(f"read: {data[:-1]}", async_=True) - echoed_resp = await s.read(len(data)) - assert echoed_resp.decode() == data - await s.close() + patterns = [b"I am"] + if destination is None: + patterns.append(b"listening for connections") + + self.args = args + self.cmd = str(ECHO_PATH) + self.patterns = patterns + self.bytes_read = bytearray() + self.event_ready = trio.Event() + + self.port = port + self._peer_info = None + self.regex_pat = re.compile(br"I am ([\w\./]+)") + + @property + def peer_info(self) -> None: + if self._peer_info is not None: + return self._peer_info + if not self.event_ready.is_set(): + raise Exception("process is not ready yet. failed to parse the peer info") + # Example: + # b"I am /ip4/127.0.0.1/tcp/56171/ipfs/QmU41TRPs34WWqa1brJEojBLYZKrrBcJq9nyNfVvSrbZUJ\n" + m = re.search(br"I am ([\w\./]+)", self.bytes_read) + if m is None: + raise Exception("failed to find the pattern for the listening multiaddr") + maddr_bytes_str_ipfs = m.group(1) + maddr_str = maddr_bytes_str_ipfs.decode().replace("ipfs", "p2p") + maddr = Multiaddr(maddr_str) + self._peer_info = info_from_p2p_addr(maddr) + return self._peer_info -@pytest.mark.parametrize("num_hosts", (1,)) -@pytest.mark.asyncio -async def test_insecure_conn_go_to_py( - hosts, proc_factory, is_host_secure, unused_tcp_port -): - host = hosts[0] - expected_data = "Hello, world!\n" - reply_data = "Replyooo!\n" - event_handler_finished = asyncio.Event() +@pytest.mark.trio +async def test_insecure_conn_py_to_go(is_host_secure): + async with HostFactory.create_batch_and_listen(is_host_secure, 1) as hosts: + go_proc = EchoProcess(unused_tcp_port_factory(), is_host_secure) + await go_proc.start() - async def _handle_echo(stream): - read_data = await stream.read(len(expected_data)) - assert read_data == expected_data.encode() - event_handler_finished.set() - await stream.write(reply_data.encode()) - await stream.close() + host = hosts[0] + peer_info = go_proc.peer_info + await host.connect(peer_info) + s = await host.new_stream(peer_info.peer_id, [ECHO_PROTOCOL_ID]) + data = "data321123\n" + await s.write(data.encode()) + echoed_resp = await s.read(len(data)) + assert echoed_resp.decode() == data + await s.close() - host.set_stream_handler(ECHO_PROTOCOL_ID, _handle_echo) - py_maddr = host.get_addrs()[0] - go_proc, _ = await make_echo_proc( - proc_factory, unused_tcp_port, is_host_secure, py_maddr - ) - await go_proc.expect("connect with peer", async_=True) - await go_proc.expect("opened stream", async_=True) - await event_handler_finished.wait() - await go_proc.expect(f"read reply: .*{reply_data.rstrip()}.*", async_=True) + +@pytest.mark.trio +async def test_insecure_conn_go_to_py(is_host_secure): + async with HostFactory.create_batch_and_listen(is_host_secure, 1) as hosts: + host = hosts[0] + expected_data = "Hello, world!\n" + reply_data = "Replyooo!\n" + event_handler_finished = trio.Event() + + async def _handle_echo(stream): + read_data = await stream.read(len(expected_data)) + assert read_data == expected_data.encode() + event_handler_finished.set() + await stream.write(reply_data.encode()) + await stream.close() + + host.set_stream_handler(ECHO_PROTOCOL_ID, _handle_echo) + py_maddr = host.get_addrs()[0] + go_proc = EchoProcess(unused_tcp_port_factory(), is_host_secure, py_maddr) + await go_proc.start() + await event_handler_finished.wait() diff --git a/tests_interop/test_net_stream.py b/tests_interop/test_net_stream.py index 2c897d2b..59812a9b 100644 --- a/tests_interop/test_net_stream.py +++ b/tests_interop/test_net_stream.py @@ -1,6 +1,5 @@ -import asyncio - import pytest +import trio from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset from libp2p.tools.constants import MAX_READ_LEN @@ -8,7 +7,7 @@ from libp2p.tools.constants import MAX_READ_LEN DATA = b"data" -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_write(py_to_daemon_stream_pair, p2pds): stream_py, stream_daemon = py_to_daemon_stream_pair assert ( @@ -19,19 +18,19 @@ async def test_net_stream_read_write(py_to_daemon_stream_pair, p2pds): assert (await stream_daemon.read(MAX_READ_LEN)) == DATA -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_after_remote_closed(py_to_daemon_stream_pair, p2pds): stream_py, stream_daemon = py_to_daemon_stream_pair await stream_daemon.write(DATA) await stream_daemon.close() - await asyncio.sleep(0.01) + await trio.sleep(0.01) assert (await stream_py.read(MAX_READ_LEN)) == DATA # EOF with pytest.raises(StreamEOF): await stream_py.read(MAX_READ_LEN) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_after_local_reset(py_to_daemon_stream_pair, p2pds): stream_py, _ = py_to_daemon_stream_pair await stream_py.reset() @@ -40,15 +39,15 @@ async def test_net_stream_read_after_local_reset(py_to_daemon_stream_pair, p2pds @pytest.mark.parametrize("is_to_fail_daemon_stream", (True,)) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_after_remote_reset(py_to_daemon_stream_pair, p2pds): stream_py, _ = py_to_daemon_stream_pair - await asyncio.sleep(0.01) + await trio.sleep(0.01) with pytest.raises(StreamReset): await stream_py.read(MAX_READ_LEN) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_write_after_local_closed(py_to_daemon_stream_pair, p2pds): stream_py, _ = py_to_daemon_stream_pair await stream_py.write(DATA) @@ -57,7 +56,7 @@ async def test_net_stream_write_after_local_closed(py_to_daemon_stream_pair, p2p await stream_py.write(DATA) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_write_after_local_reset(py_to_daemon_stream_pair, p2pds): stream_py, stream_daemon = py_to_daemon_stream_pair await stream_py.reset() @@ -66,9 +65,9 @@ async def test_net_stream_write_after_local_reset(py_to_daemon_stream_pair, p2pd @pytest.mark.parametrize("is_to_fail_daemon_stream", (True,)) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_write_after_remote_reset(py_to_daemon_stream_pair, p2pds): stream_py, _ = py_to_daemon_stream_pair - await asyncio.sleep(0.01) + await trio.sleep(0.01) with pytest.raises(StreamClosed): await stream_py.write(DATA) diff --git a/tests_interop/test_pubsub.py b/tests_interop/test_pubsub.py index db42c7cd..f15d89e3 100644 --- a/tests_interop/test_pubsub.py +++ b/tests_interop/test_pubsub.py @@ -1,11 +1,15 @@ -import asyncio import functools +import math from p2pclient.pb import p2pd_pb2 import pytest +import trio +from libp2p.io.trio import TrioTCPStream from libp2p.peer.id import ID from libp2p.pubsub.pb import rpc_pb2 +from libp2p.pubsub.subscription import TrioSubscriptionAPI +from libp2p.tools.factories import PubsubFactory from libp2p.tools.interop.utils import connect from libp2p.utils import read_varint_prefixed_bytes @@ -13,26 +17,15 @@ TOPIC_0 = "ABALA" TOPIC_1 = "YOOOO" -async def p2pd_subscribe(p2pd, topic) -> "asyncio.Queue[rpc_pb2.Message]": - reader, writer = await p2pd.control.pubsub_subscribe(topic) +async def p2pd_subscribe(p2pd, topic, nursery): + stream = TrioTCPStream(await p2pd.control.pubsub_subscribe(topic)) + send_channel, receive_channel = trio.open_memory_channel(math.inf) - queue = asyncio.Queue() + sub = TrioSubscriptionAPI(receive_channel) async def _read_pubsub_msg() -> None: - writer_closed_task = asyncio.ensure_future(writer.wait_closed()) - while True: - done, pending = await asyncio.wait( - [read_varint_prefixed_bytes(reader), writer_closed_task], - return_when=asyncio.FIRST_COMPLETED, - ) - done_tasks = tuple(done) - if writer.is_closing(): - return - read_task = done_tasks[0] - # Sanity check - assert read_task._coro.__name__ == "read_varint_prefixed_bytes" - msg_bytes = read_task.result() + msg_bytes = await read_varint_prefixed_bytes(stream) ps_msg = p2pd_pb2.PSMessage() ps_msg.ParseFromString(msg_bytes) # Fill in the message used in py-libp2p @@ -44,11 +37,10 @@ async def p2pd_subscribe(p2pd, topic) -> "asyncio.Queue[rpc_pb2.Message]": signature=ps_msg.signature, key=ps_msg.key, ) - queue.put_nowait(msg) + await send_channel.send(msg) - asyncio.ensure_future(_read_pubsub_msg()) - await asyncio.sleep(0) - return queue + nursery.start_soon(_read_pubsub_msg) + return sub def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) -> None: @@ -59,108 +51,119 @@ def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) -> "is_pubsub_signing, is_pubsub_signing_strict", ((True, True), (False, False)) ) @pytest.mark.parametrize("is_gossipsub", (True, False)) -@pytest.mark.parametrize("num_hosts, num_p2pds", ((1, 2),)) -@pytest.mark.asyncio -async def test_pubsub(pubsubs, p2pds): - # - # Test: Recognize pubsub peers on connection. - # - py_pubsub = pubsubs[0] - # go0 <-> py <-> go1 - await connect(p2pds[0], py_pubsub.host) - await connect(py_pubsub.host, p2pds[1]) - py_peer_id = py_pubsub.host.get_id() - # Check pubsub peers - pubsub_peers_0 = await p2pds[0].control.pubsub_list_peers("") - assert len(pubsub_peers_0) == 1 and pubsub_peers_0[0] == py_peer_id - pubsub_peers_1 = await p2pds[1].control.pubsub_list_peers("") - assert len(pubsub_peers_1) == 1 and pubsub_peers_1[0] == py_peer_id - assert ( - len(py_pubsub.peers) == 2 - and p2pds[0].peer_id in py_pubsub.peers - and p2pds[1].peer_id in py_pubsub.peers - ) +@pytest.mark.parametrize("num_p2pds", (2,)) +@pytest.mark.trio +async def test_pubsub( + p2pds, is_gossipsub, is_host_secure, is_pubsub_signing_strict, nursery +): + pubsub_factory = None + if is_gossipsub: + pubsub_factory = PubsubFactory.create_batch_with_gossipsub + else: + pubsub_factory = PubsubFactory.create_batch_with_floodsub - # - # Test: `subscribe`. - # - # (name, topics) - # (go_0, [0, 1]) <-> (py, [0, 1]) <-> (go_1, [1]) - sub_py_topic_0 = await py_pubsub.subscribe(TOPIC_0) - sub_py_topic_1 = await py_pubsub.subscribe(TOPIC_1) - sub_go_0_topic_0 = await p2pd_subscribe(p2pds[0], TOPIC_0) - sub_go_0_topic_1 = await p2pd_subscribe(p2pds[0], TOPIC_1) - sub_go_1_topic_1 = await p2pd_subscribe(p2pds[1], TOPIC_1) - # Check topic peers - await asyncio.sleep(0.1) - # go_0 - go_0_topic_0_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_0) - assert len(go_0_topic_0_peers) == 1 and py_peer_id == go_0_topic_0_peers[0] - go_0_topic_1_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_1) - assert len(go_0_topic_1_peers) == 1 and py_peer_id == go_0_topic_1_peers[0] - # py - py_topic_0_peers = list(py_pubsub.peer_topics[TOPIC_0]) - assert len(py_topic_0_peers) == 1 and p2pds[0].peer_id == py_topic_0_peers[0] - # go_1 - go_1_topic_1_peers = await p2pds[1].control.pubsub_list_peers(TOPIC_1) - assert len(go_1_topic_1_peers) == 1 and py_peer_id == go_1_topic_1_peers[0] + async with pubsub_factory( + 1, is_secure=is_host_secure, strict_signing=is_pubsub_signing_strict + ) as pubsubs: + # + # Test: Recognize pubsub peers on connection. + # + py_pubsub = pubsubs[0] + # go0 <-> py <-> go1 + await connect(p2pds[0], py_pubsub.host) + await connect(py_pubsub.host, p2pds[1]) + py_peer_id = py_pubsub.host.get_id() + # Check pubsub peers + pubsub_peers_0 = await p2pds[0].control.pubsub_list_peers("") + assert len(pubsub_peers_0) == 1 and pubsub_peers_0[0] == py_peer_id + pubsub_peers_1 = await p2pds[1].control.pubsub_list_peers("") + assert len(pubsub_peers_1) == 1 and pubsub_peers_1[0] == py_peer_id + assert ( + len(py_pubsub.peers) == 2 + and p2pds[0].peer_id in py_pubsub.peers + and p2pds[1].peer_id in py_pubsub.peers + ) - # - # Test: `publish` - # - # 1. py publishes - # - 1.1. py publishes data_11 to topic_0, py and go_0 receives. - # - 1.2. py publishes data_12 to topic_1, all receive. - # 2. go publishes - # - 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. - # - 2.2. go_1 publishes data_22 to topic_1, all receive. + # + # Test: `subscribe`. + # + # (name, topics) + # (go_0, [0, 1]) <-> (py, [0, 1]) <-> (go_1, [1]) + sub_py_topic_0 = await py_pubsub.subscribe(TOPIC_0) + sub_py_topic_1 = await py_pubsub.subscribe(TOPIC_1) + sub_go_0_topic_0 = await p2pd_subscribe(p2pds[0], TOPIC_0, nursery) + sub_go_0_topic_1 = await p2pd_subscribe(p2pds[0], TOPIC_1, nursery) + sub_go_1_topic_1 = await p2pd_subscribe(p2pds[1], TOPIC_1, nursery) + # Check topic peers + await trio.sleep(0.1) + # go_0 + go_0_topic_0_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_0) + assert len(go_0_topic_0_peers) == 1 and py_peer_id == go_0_topic_0_peers[0] + go_0_topic_1_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_1) + assert len(go_0_topic_1_peers) == 1 and py_peer_id == go_0_topic_1_peers[0] + # py + py_topic_0_peers = list(py_pubsub.peer_topics[TOPIC_0]) + assert len(py_topic_0_peers) == 1 and p2pds[0].peer_id == py_topic_0_peers[0] + # go_1 + go_1_topic_1_peers = await p2pds[1].control.pubsub_list_peers(TOPIC_1) + assert len(go_1_topic_1_peers) == 1 and py_peer_id == go_1_topic_1_peers[0] - # 1.1. py publishes data_11 to topic_0, py and go_0 receives. - data_11 = b"data_11" - await py_pubsub.publish(TOPIC_0, data_11) - validate_11 = functools.partial( - validate_pubsub_msg, data=data_11, from_peer_id=py_peer_id - ) - validate_11(await sub_py_topic_0.get()) - validate_11(await sub_go_0_topic_0.get()) + # + # Test: `publish` + # + # 1. py publishes + # - 1.1. py publishes data_11 to topic_0, py and go_0 receives. + # - 1.2. py publishes data_12 to topic_1, all receive. + # 2. go publishes + # - 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. + # - 2.2. go_1 publishes data_22 to topic_1, all receive. - # 1.2. py publishes data_12 to topic_1, all receive. - data_12 = b"data_12" - validate_12 = functools.partial( - validate_pubsub_msg, data=data_12, from_peer_id=py_peer_id - ) - await py_pubsub.publish(TOPIC_1, data_12) - validate_12(await sub_py_topic_1.get()) - validate_12(await sub_go_0_topic_1.get()) - validate_12(await sub_go_1_topic_1.get()) + # 1.1. py publishes data_11 to topic_0, py and go_0 receives. + data_11 = b"data_11" + await py_pubsub.publish(TOPIC_0, data_11) + validate_11 = functools.partial( + validate_pubsub_msg, data=data_11, from_peer_id=py_peer_id + ) + validate_11(await sub_py_topic_0.get()) + validate_11(await sub_go_0_topic_0.get()) - # 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. - data_21 = b"data_21" - validate_21 = functools.partial( - validate_pubsub_msg, data=data_21, from_peer_id=p2pds[0].peer_id - ) - await p2pds[0].control.pubsub_publish(TOPIC_0, data_21) - validate_21(await sub_py_topic_0.get()) - validate_21(await sub_go_0_topic_0.get()) + # 1.2. py publishes data_12 to topic_1, all receive. + data_12 = b"data_12" + validate_12 = functools.partial( + validate_pubsub_msg, data=data_12, from_peer_id=py_peer_id + ) + await py_pubsub.publish(TOPIC_1, data_12) + validate_12(await sub_py_topic_1.get()) + validate_12(await sub_go_0_topic_1.get()) + validate_12(await sub_go_1_topic_1.get()) - # 2.2. go_1 publishes data_22 to topic_1, all receive. - data_22 = b"data_22" - validate_22 = functools.partial( - validate_pubsub_msg, data=data_22, from_peer_id=p2pds[1].peer_id - ) - await p2pds[1].control.pubsub_publish(TOPIC_1, data_22) - validate_22(await sub_py_topic_1.get()) - validate_22(await sub_go_0_topic_1.get()) - validate_22(await sub_go_1_topic_1.get()) + # 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. + data_21 = b"data_21" + validate_21 = functools.partial( + validate_pubsub_msg, data=data_21, from_peer_id=p2pds[0].peer_id + ) + await p2pds[0].control.pubsub_publish(TOPIC_0, data_21) + validate_21(await sub_py_topic_0.get()) + validate_21(await sub_go_0_topic_0.get()) - # - # Test: `unsubscribe` and re`subscribe` - # - await py_pubsub.unsubscribe(TOPIC_0) - await asyncio.sleep(0.1) - assert py_peer_id not in (await p2pds[0].control.pubsub_list_peers(TOPIC_0)) - assert py_peer_id not in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) - await py_pubsub.subscribe(TOPIC_0) - await asyncio.sleep(0.1) - assert py_peer_id in (await p2pds[0].control.pubsub_list_peers(TOPIC_0)) - assert py_peer_id in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) + # 2.2. go_1 publishes data_22 to topic_1, all receive. + data_22 = b"data_22" + validate_22 = functools.partial( + validate_pubsub_msg, data=data_22, from_peer_id=p2pds[1].peer_id + ) + await p2pds[1].control.pubsub_publish(TOPIC_1, data_22) + validate_22(await sub_py_topic_1.get()) + validate_22(await sub_go_0_topic_1.get()) + validate_22(await sub_go_1_topic_1.get()) + + # + # Test: `unsubscribe` and re`subscribe` + # + await py_pubsub.unsubscribe(TOPIC_0) + await trio.sleep(0.1) + assert py_peer_id not in (await p2pds[0].control.pubsub_list_peers(TOPIC_0)) + assert py_peer_id not in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) + await py_pubsub.subscribe(TOPIC_0) + await trio.sleep(0.1) + assert py_peer_id in (await p2pds[0].control.pubsub_list_peers(TOPIC_0)) + assert py_peer_id in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) diff --git a/tox.ini b/tox.ini index 21f46435..00af177f 100644 --- a/tox.ini +++ b/tox.ini @@ -12,7 +12,7 @@ envlist = combine_as_imports=False force_sort_within_sections=True include_trailing_comma=True -known_third_party=hypothesis,pytest,p2pclient,pexpect,factory +known_third_party=anyio,factory,p2pclient,pytest known_first_party=libp2p line_length=88 multi_line_output=3