From b77834d12971618f60909f1a9b56678c827c8310 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sun, 1 Sep 2019 23:39:53 +0800 Subject: [PATCH] Use `asyncio.subprocess` over pexpect In the test for pubsub, since there were unknown issues when I test against pexpect. --- tests/interop/daemon.py | 164 +++++++++++++++++++++++------- tests/interop/test_pubsub_bind.py | 15 ++- 2 files changed, 137 insertions(+), 42 deletions(-) diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index e74329be..9bcb292f 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -1,24 +1,113 @@ -from multiaddr import Multiaddr -from pexpect.spawnbase import SpawnBase +import asyncio +import time +from typing import Any, List +import multiaddr +from multiaddr import Multiaddr from p2pclient import Client -from libp2p.peer.peerinfo import info_from_p2p_addr, PeerInfo from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr -from .constants import PEXPECT_NEW_LINE, LOCALHOST_IP +from .constants import LOCALHOST_IP from .envs import GO_BIN_PATH P2PD_PATH = GO_BIN_PATH / "p2pd" +TIMEOUT_DURATION = 30 + + +async def try_until_success(coro_func, timeout=TIMEOUT_DURATION): + """ + Keep running ``coro_func`` until the time is out. + All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments. + """ + t_start = time.monotonic() + while True: + result = await coro_func() + if result: + break + if (time.monotonic() - t_start) >= timeout: + # timeout + assert False, f"{coro_func} still failed after `{timeout}` seconds" + await asyncio.sleep(0.01) + + +class P2PDProcess: + proc: asyncio.subprocess.Process + cmd: str = str(P2PD_PATH) + args: List[Any] + + def __init__( + self, + control_maddr: Multiaddr, + is_secure: bool, + is_pubsub_enabled=True, + is_gossipsub=True, + is_pubsub_signing=False, + is_pubsub_signing_strict=False, + ) -> None: + args = [f"-listen={str(control_maddr)}"] + # NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`. + if not is_secure: + args.append("-insecure=true") + if is_pubsub_enabled: + args.append("-pubsub") + if is_gossipsub: + args.append("-pubsubRouter=gossipsub") + else: + args.append("-pubsubRouter=floodsub") + if not is_pubsub_signing: + args.append("-pubsubSign=false") + if not is_pubsub_signing_strict: + args.append("-pubsubSignStrict=false") + # NOTE: + # Two other params are possibly what we want to configure: + # - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond # noqa: E501 + # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second + # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 + self.args = args + + async def wait_until_ready(self): + lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:") + lines_head_occurred = {line: False for line in lines_head_pattern} + + async def read_from_daemon_and_check(): + line = await self.proc.stdout.readline() + for head_pattern in lines_head_occurred: + if line.startswith(head_pattern): + lines_head_occurred[head_pattern] = True + return all([value for _, value in lines_head_occurred.items()]) + + await try_until_success(read_from_daemon_and_check) + # Sleep a little bit to ensure the listener is up after logs are emitted. + await asyncio.sleep(0.01) + + async def start(self) -> None: + self.proc = await asyncio.subprocess.create_subprocess_exec( + self.cmd, + *self.args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + bufsize=0, + ) + await self.wait_until_ready() + + async def close(self) -> None: + self.proc.terminate() + await self.proc.wait() + + class Daemon: - proc: SpawnBase + p2pd_proc: P2PDProcess control: Client peer_info: PeerInfo - def __init__(self, proc: SpawnBase, control: Client, peer_info: PeerInfo) -> None: - self.proc = proc + def __init__( + self, p2pd_proc: P2PDProcess, control: Client, peer_info: PeerInfo + ) -> None: + self.p2pd_proc = p2pd_proc self.control = control self.peer_info = peer_info @@ -33,9 +122,12 @@ class Daemon: def listen_maddr(self) -> Multiaddr: return self.peer_info.addrs[0] + async def close(self) -> None: + await self.p2pd_proc.close() + await self.control.close() + async def make_p2pd( - proc_factory, unused_tcp_port_factory, is_secure: bool, is_pubsub_enabled=True, @@ -44,38 +136,34 @@ async def make_p2pd( is_pubsub_signing_strict=False, ) -> Daemon: control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}") - args = [f"-listen={str(control_maddr)}"] - # NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`. - if not is_secure: - args.append("-insecure=true") - if is_pubsub_enabled: - args.append("-pubsub") - if is_gossipsub: - args.append("-pubsubRouter=gossipsub") - else: - args.append("-pubsubRouter=floodsub") - if not is_pubsub_signing: - args.append("-pubsubSign=false") - if not is_pubsub_signing_strict: - args.append("-pubsubSignStrict=false") - # NOTE: - # Two other params are possibly what we want to configure: - # - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond - # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second - # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 - proc = proc_factory(str(P2PD_PATH), args) - await proc.expect(r"Peer ID:\s+(\w+)" + PEXPECT_NEW_LINE, async_=True) - peer_id_base58 = proc.match.group(1) - await proc.expect(r"Peer Addrs:", async_=True) - await proc.expect( - rf"(/ip4/{LOCALHOST_IP}/tcp/[\w]+)" + PEXPECT_NEW_LINE, async_=True - ) - daemon_listener_maddr = Multiaddr(proc.match.group(1)) - daemon_pinfo = info_from_p2p_addr( - daemon_listener_maddr.encapsulate(f"/p2p/{peer_id_base58}") + p2pd_proc = P2PDProcess( + control_maddr, + is_secure, + is_pubsub_enabled, + is_gossipsub, + is_pubsub_signing, + is_pubsub_signing_strict, ) + await p2pd_proc.start() client_callback_maddr = Multiaddr( f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}" ) p2pc = Client(control_maddr, client_callback_maddr) - return Daemon(proc, p2pc, daemon_pinfo) + await p2pc.listen() + peer_id, maddrs = await p2pc.identify() + listen_maddr: Multiaddr = None + for maddr in maddrs: + try: + ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4) + maddr.value_for_protocol(multiaddr.protocols.P_TCP) + except multiaddr.exceptions.ProtocolLookupError: + continue + if ip == LOCALHOST_IP: + listen_maddr = maddr + break + assert listen_maddr is not None, "no loopback maddr is found" + peer_info = info_from_p2p_addr( + listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) + ) + print(f"!@# peer_info: peer_id={peer_info.peer_id}, maddrs={peer_info.addrs}") + return Daemon(p2pd_proc, p2pc, peer_info) diff --git a/tests/interop/test_pubsub_bind.py b/tests/interop/test_pubsub_bind.py index d01dc2d5..66793b40 100644 --- a/tests/interop/test_pubsub_bind.py +++ b/tests/interop/test_pubsub_bind.py @@ -5,7 +5,14 @@ from .daemon import make_p2pd @pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio -async def test_pubsub_init( - hosts, proc_factory, is_host_secure, unused_tcp_port_factory -): - p2pd = await make_p2pd(proc_factory, unused_tcp_port_factory, is_host_secure) +async def test_pubsub_init(hosts, is_host_secure, unused_tcp_port_factory): + try: + p2pd = await make_p2pd(unused_tcp_port_factory, is_host_secure) + host = hosts[0] + peers = await p2pd.control.list_peers() + assert len(peers) == 0 + await host.connect(p2pd.peer_info) + peers = await p2pd.control.list_peers() + assert len(peers) != 0 + finally: + await p2pd.close()