Fix tests_interop

- Remove pexpect
- Use new version of `p2pclient`, which makes use of anyio
- Clean up tests
This commit is contained in:
mhchia
2020-01-07 14:14:34 +08:00
parent 000e777ac7
commit fe4354d377
11 changed files with 415 additions and 477 deletions

View File

@ -7,7 +7,7 @@ from libp2p.pubsub import floodsub, gossipsub
# Just a arbitrary large number. # Just a arbitrary large number.
# It is used when calling `MplexStream.read(MAX_READ_LEN)`, # It is used when calling `MplexStream.read(MAX_READ_LEN)`,
# to avoid `MplexStream.read()`, which blocking reads until EOF. # to avoid `MplexStream.read()`, which blocking reads until EOF.
MAX_READ_LEN = 2 ** 32 - 1 MAX_READ_LEN = 65535
LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")

View File

@ -1,2 +1 @@
LOCALHOST_IP = "127.0.0.1" LOCALHOST_IP = "127.0.0.1"
PEXPECT_NEW_LINE = "\r\n"

View File

@ -1,52 +1,22 @@
import asyncio from typing import AsyncIterator
import time
from typing import Any, Awaitable, Callable, List
from async_generator import asynccontextmanager
import multiaddr import multiaddr
from multiaddr import Multiaddr from multiaddr import Multiaddr
from p2pclient import Client from p2pclient import Client
import pytest import trio
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr
from .constants import LOCALHOST_IP from .constants import LOCALHOST_IP
from .envs import GO_BIN_PATH from .envs import GO_BIN_PATH
from .process import BaseInteractiveProcess
P2PD_PATH = GO_BIN_PATH / "p2pd" P2PD_PATH = GO_BIN_PATH / "p2pd"
TIMEOUT_DURATION = 30 class P2PDProcess(BaseInteractiveProcess):
async def try_until_success(
coro_func: Callable[[], Awaitable[Any]], timeout: int = TIMEOUT_DURATION
) -> None:
"""
Keep running ``coro_func`` until either it succeed or time is up.
All arguments of ``coro_func`` should be filled, i.e. it should be
called without arguments.
"""
t_start = time.monotonic()
while True:
result = await coro_func()
if result:
break
if (time.monotonic() - t_start) >= timeout:
# timeout
pytest.fail(f"{coro_func} is still failing after `{timeout}` seconds")
await asyncio.sleep(0.01)
class P2PDProcess:
proc: asyncio.subprocess.Process
cmd: str = str(P2PD_PATH)
args: List[Any]
is_proc_running: bool
_tasks: List["asyncio.Future[Any]"]
def __init__( def __init__(
self, self,
control_maddr: Multiaddr, control_maddr: Multiaddr,
@ -75,74 +45,21 @@ class P2PDProcess:
# - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond # noqa: E501 # - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond # noqa: E501
# - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second
# Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501
self.proc = None
self.cmd = str(P2PD_PATH)
self.args = args self.args = args
self.is_proc_running = False self.patterns = (b"Control socket:", b"Peer ID:", b"Peer Addrs:")
self.bytes_read = bytearray()
self._tasks = [] self.event_ready = trio.Event()
async def wait_until_ready(self) -> None:
lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:")
lines_head_occurred = {line: False for line in lines_head_pattern}
async def read_from_daemon_and_check() -> bool:
line = await self.proc.stdout.readline()
for head_pattern in lines_head_occurred:
if line.startswith(head_pattern):
lines_head_occurred[head_pattern] = True
return all([value for value in lines_head_occurred.values()])
await try_until_success(read_from_daemon_and_check)
# Sleep a little bit to ensure the listener is up after logs are emitted.
await asyncio.sleep(0.01)
async def start_printing_logs(self) -> None:
async def _print_from_stream(
src_name: str, reader: asyncio.StreamReader
) -> None:
while True:
line = await reader.readline()
if line != b"":
print(f"{src_name}\t: {line.rstrip().decode()}")
await asyncio.sleep(0.01)
self._tasks.append(
asyncio.ensure_future(_print_from_stream("out", self.proc.stdout))
)
self._tasks.append(
asyncio.ensure_future(_print_from_stream("err", self.proc.stderr))
)
await asyncio.sleep(0)
async def start(self) -> None:
if self.is_proc_running:
return
self.proc = await asyncio.subprocess.create_subprocess_exec(
self.cmd,
*self.args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
bufsize=0,
)
self.is_proc_running = True
await self.wait_until_ready()
await self.start_printing_logs()
async def close(self) -> None:
if self.is_proc_running:
self.proc.terminate()
await self.proc.wait()
self.is_proc_running = False
for task in self._tasks:
task.cancel()
class Daemon: class Daemon:
p2pd_proc: P2PDProcess p2pd_proc: BaseInteractiveProcess
control: Client control: Client
peer_info: PeerInfo peer_info: PeerInfo
def __init__( def __init__(
self, p2pd_proc: P2PDProcess, control: Client, peer_info: PeerInfo self, p2pd_proc: BaseInteractiveProcess, control: Client, peer_info: PeerInfo
) -> None: ) -> None:
self.p2pd_proc = p2pd_proc self.p2pd_proc = p2pd_proc
self.control = control self.control = control
@ -164,6 +81,7 @@ class Daemon:
await self.control.close() await self.control.close()
@asynccontextmanager
async def make_p2pd( async def make_p2pd(
daemon_control_port: int, daemon_control_port: int,
client_callback_port: int, client_callback_port: int,
@ -172,7 +90,7 @@ async def make_p2pd(
is_gossipsub: bool = True, is_gossipsub: bool = True,
is_pubsub_signing: bool = False, is_pubsub_signing: bool = False,
is_pubsub_signing_strict: bool = False, is_pubsub_signing_strict: bool = False,
) -> Daemon: ) -> AsyncIterator[Daemon]:
control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{daemon_control_port}") control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{daemon_control_port}")
p2pd_proc = P2PDProcess( p2pd_proc = P2PDProcess(
control_maddr, control_maddr,
@ -185,21 +103,22 @@ async def make_p2pd(
await p2pd_proc.start() await p2pd_proc.start()
client_callback_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{client_callback_port}") client_callback_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{client_callback_port}")
p2pc = Client(control_maddr, client_callback_maddr) p2pc = Client(control_maddr, client_callback_maddr)
await p2pc.listen()
peer_id, maddrs = await p2pc.identify() async with p2pc.listen():
listen_maddr: Multiaddr = None peer_id, maddrs = await p2pc.identify()
for maddr in maddrs: listen_maddr: Multiaddr = None
try: for maddr in maddrs:
ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4) try:
# NOTE: Check if this `maddr` uses `tcp`. ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4)
maddr.value_for_protocol(multiaddr.protocols.P_TCP) # NOTE: Check if this `maddr` uses `tcp`.
except multiaddr.exceptions.ProtocolLookupError: maddr.value_for_protocol(multiaddr.protocols.P_TCP)
continue except multiaddr.exceptions.ProtocolLookupError:
if ip == LOCALHOST_IP: continue
listen_maddr = maddr if ip == LOCALHOST_IP:
break listen_maddr = maddr
assert listen_maddr is not None, "no loopback maddr is found" break
peer_info = info_from_p2p_addr( assert listen_maddr is not None, "no loopback maddr is found"
listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) peer_info = info_from_p2p_addr(
) listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}"))
return Daemon(p2pd_proc, p2pc, peer_info) )
yield Daemon(p2pd_proc, p2pc, peer_info)

View File

@ -0,0 +1,66 @@
from abc import ABC, abstractmethod
import subprocess
from typing import Iterable, List
import trio
TIMEOUT_DURATION = 30
class AbstractInterativeProcess(ABC):
@abstractmethod
async def start(self) -> None:
...
@abstractmethod
async def close(self) -> None:
...
class BaseInteractiveProcess(AbstractInterativeProcess):
proc: trio.Process = None
cmd: str
args: List[str]
bytes_read: bytearray
patterns: Iterable[bytes] = None
event_ready: trio.Event
async def wait_until_ready(self) -> None:
patterns_occurred = {pat: False for pat in self.patterns}
async def read_from_daemon_and_check() -> None:
async for data in self.proc.stdout:
# TODO: It takes O(n^2), which is quite bad.
# But it should succeed in a few seconds.
self.bytes_read.extend(data)
for pat, occurred in patterns_occurred.items():
if occurred:
continue
if pat in self.bytes_read:
patterns_occurred[pat] = True
if all([value for value in patterns_occurred.values()]):
return
with trio.fail_after(TIMEOUT_DURATION):
await read_from_daemon_and_check()
self.event_ready.set()
# Sleep a little bit to ensure the listener is up after logs are emitted.
await trio.sleep(0.01)
async def start(self) -> None:
if self.proc is not None:
return
# NOTE: Ignore type checks here since mypy complains about bufsize=0
self.proc = await trio.open_process( # type: ignore
[self.cmd] + self.args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Redirect stderr to stdout, which makes parsing easier
bufsize=0,
)
await self.wait_until_ready()
async def close(self) -> None:
if self.proc is None:
return
self.proc.terminate()
await self.proc.wait()

View File

@ -1,7 +1,7 @@
import asyncio
from typing import Union from typing import Union
from multiaddr import Multiaddr from multiaddr import Multiaddr
import trio
from libp2p.host.host_interface import IHost from libp2p.host.host_interface import IHost
from libp2p.peer.id import ID from libp2p.peer.id import ID
@ -50,7 +50,7 @@ async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None:
else: # isinstance(b, IHost) else: # isinstance(b, IHost)
await a.connect(b_peer_info) await a.connect(b_peer_info)
# Allow additional sleep for both side to establish the connection. # Allow additional sleep for both side to establish the connection.
await asyncio.sleep(0.1) await trio.sleep(0.1)
a_peer_info = _get_peer_info(a) a_peer_info = _get_peer_info(a)

View File

@ -1,20 +1,13 @@
import asyncio import anyio
import sys from async_exit_stack import AsyncExitStack
from typing import Union
from p2pclient.datastructures import StreamInfo from p2pclient.datastructures import StreamInfo
import pexpect from p2pclient.utils import get_unused_tcp_port
import pytest import pytest
import trio
from libp2p.io.abc import ReadWriteCloser from libp2p.io.abc import ReadWriteCloser
from libp2p.tools.constants import GOSSIPSUB_PARAMS, LISTEN_MADDR from libp2p.tools.factories import HostFactory, PubsubFactory
from libp2p.tools.factories import ( from libp2p.tools.interop.daemon import make_p2pd
FloodsubFactory,
GossipsubFactory,
HostFactory,
PubsubFactory,
)
from libp2p.tools.interop.daemon import Daemon, make_p2pd
from libp2p.tools.interop.utils import connect from libp2p.tools.interop.utils import connect
@ -23,48 +16,6 @@ def is_host_secure():
return False return False
@pytest.fixture
def num_hosts():
return 3
@pytest.fixture
async def hosts(num_hosts, is_host_secure):
_hosts = HostFactory.create_batch(num_hosts, is_secure=is_host_secure)
await asyncio.gather(
*[_host.get_network().listen(LISTEN_MADDR) for _host in _hosts]
)
try:
yield _hosts
finally:
# TODO: It's possible that `close` raises exceptions currently,
# due to the connection reset things. Though we don't care much about that when
# cleaning up the tasks, it is probably better to handle the exceptions properly.
await asyncio.gather(
*[_host.close() for _host in _hosts], return_exceptions=True
)
@pytest.fixture
def proc_factory():
procs = []
def call_proc(cmd, args, logfile=None, encoding=None):
if logfile is None:
logfile = sys.stdout
if encoding is None:
encoding = "utf-8"
proc = pexpect.spawn(cmd, args, logfile=logfile, encoding=encoding)
procs.append(proc)
return proc
try:
yield call_proc
finally:
for proc in procs:
proc.close()
@pytest.fixture @pytest.fixture
def num_p2pds(): def num_p2pds():
return 1 return 1
@ -87,79 +38,60 @@ def is_pubsub_signing_strict():
@pytest.fixture @pytest.fixture
async def p2pds( async def p2pds(
num_p2pds, num_p2pds, is_host_secure, is_gossipsub, is_pubsub_signing, is_pubsub_signing_strict
is_host_secure,
is_gossipsub,
unused_tcp_port_factory,
is_pubsub_signing,
is_pubsub_signing_strict,
): ):
p2pds: Union[Daemon, Exception] = await asyncio.gather( async with AsyncExitStack() as stack:
*[ p2pds = [
make_p2pd( await stack.enter_async_context(
unused_tcp_port_factory(), make_p2pd(
unused_tcp_port_factory(), get_unused_tcp_port(),
is_host_secure, get_unused_tcp_port(),
is_gossipsub=is_gossipsub, is_host_secure,
is_pubsub_signing=is_pubsub_signing, is_gossipsub=is_gossipsub,
is_pubsub_signing_strict=is_pubsub_signing_strict, is_pubsub_signing=is_pubsub_signing,
is_pubsub_signing_strict=is_pubsub_signing_strict,
)
) )
for _ in range(num_p2pds) for _ in range(num_p2pds)
], ]
return_exceptions=True, try:
) yield p2pds
p2pds_succeeded = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Daemon)) finally:
if len(p2pds_succeeded) != len(p2pds): for p2pd in p2pds:
# Not all succeeded. Close the succeeded ones and print the failed ones(exceptions). await p2pd.close()
await asyncio.gather(*[p2pd.close() for p2pd in p2pds_succeeded])
exceptions = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Exception))
raise Exception(f"not all p2pds succeed: first exception={exceptions[0]}")
try:
yield p2pds
finally:
await asyncio.gather(*[p2pd.close() for p2pd in p2pds])
@pytest.fixture @pytest.fixture
def pubsubs(num_hosts, hosts, is_gossipsub, is_pubsub_signing_strict): async def pubsubs(num_hosts, is_host_secure, is_gossipsub, is_pubsub_signing_strict):
if is_gossipsub: if is_gossipsub:
routers = GossipsubFactory.create_batch(num_hosts, **GOSSIPSUB_PARAMS._asdict()) yield PubsubFactory.create_batch_with_gossipsub(
num_hosts, is_secure=is_host_secure, strict_signing=is_pubsub_signing_strict
)
else: else:
routers = FloodsubFactory.create_batch(num_hosts) yield PubsubFactory.create_batch_with_floodsub(
_pubsubs = tuple( num_hosts, is_host_secure, strict_signing=is_pubsub_signing_strict
PubsubFactory(host=host, router=router, strict_signing=is_pubsub_signing_strict) )
for host, router in zip(hosts, routers)
)
yield _pubsubs
# TODO: Clean up
class DaemonStream(ReadWriteCloser): class DaemonStream(ReadWriteCloser):
stream_info: StreamInfo stream_info: StreamInfo
reader: asyncio.StreamReader stream: anyio.abc.SocketStream
writer: asyncio.StreamWriter
def __init__( def __init__(self, stream_info: StreamInfo, stream: anyio.abc.SocketStream) -> None:
self,
stream_info: StreamInfo,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> None:
self.stream_info = stream_info self.stream_info = stream_info
self.reader = reader self.stream = stream
self.writer = writer
async def close(self) -> None: async def close(self) -> None:
self.writer.close() await self.stream.close()
if sys.version_info < (3, 7):
return
await self.writer.wait_closed()
async def read(self, n: int = -1) -> bytes: async def read(self, n: int = -1) -> bytes:
return await self.reader.read(n) if n == -1:
return await self.stream.receive_some()
else:
return await self.stream.receive_some(n)
async def write(self, data: bytes) -> int: async def write(self, data: bytes) -> None:
return self.writer.write(data) return await self.stream.send_all(data)
@pytest.fixture @pytest.fixture
@ -168,40 +100,38 @@ async def is_to_fail_daemon_stream():
@pytest.fixture @pytest.fixture
async def py_to_daemon_stream_pair(hosts, p2pds, is_to_fail_daemon_stream): async def py_to_daemon_stream_pair(p2pds, is_host_secure, is_to_fail_daemon_stream):
assert len(hosts) >= 1 async with HostFactory.create_batch_and_listen(is_host_secure, 1) as hosts:
assert len(p2pds) >= 1 assert len(p2pds) >= 1
host = hosts[0] host = hosts[0]
p2pd = p2pds[0] p2pd = p2pds[0]
protocol_id = "/protocol/id/123" protocol_id = "/protocol/id/123"
stream_py = None stream_py = None
stream_daemon = None stream_daemon = None
event_stream_handled = asyncio.Event() event_stream_handled = trio.Event()
await connect(host, p2pd) await connect(host, p2pd)
async def daemon_stream_handler(stream_info, reader, writer): async def daemon_stream_handler(stream_info, stream):
nonlocal stream_daemon nonlocal stream_daemon
stream_daemon = DaemonStream(stream_info, reader, writer) stream_daemon = DaemonStream(stream_info, stream)
event_stream_handled.set() event_stream_handled.set()
await trio.hazmat.checkpoint()
await p2pd.control.stream_handler(protocol_id, daemon_stream_handler) await p2pd.control.stream_handler(protocol_id, daemon_stream_handler)
# Sleep for a while to wait for the handler being registered. # Sleep for a while to wait for the handler being registered.
await asyncio.sleep(0.01) await trio.sleep(0.01)
if is_to_fail_daemon_stream: if is_to_fail_daemon_stream:
# FIXME: This is a workaround to make daemon reset the stream. # FIXME: This is a workaround to make daemon reset the stream.
# We intentionally close the listener on the python side, it makes the connection from # We intentionally close the listener on the python side, it makes the connection from
# daemon to us fail, and therefore the daemon resets the opened stream on their side. # daemon to us fail, and therefore the daemon resets the opened stream on their side.
# Reference: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/stream.go#L47-L50 # noqa: E501 # Reference: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/stream.go#L47-L50 # noqa: E501
# We need it because we want to test against `stream_py` after the remote side(daemon) # We need it because we want to test against `stream_py` after the remote side(daemon)
# is reset. This should be removed after the API `stream.reset` is exposed in daemon # is reset. This should be removed after the API `stream.reset` is exposed in daemon
# some day. # some day.
listener = p2pds[0].control.control.listener await p2pds[0].control.control.close()
listener.close() stream_py = await host.new_stream(p2pd.peer_id, [protocol_id])
if sys.version_info[0:2] > (3, 6): if not is_to_fail_daemon_stream:
await listener.wait_closed() await event_stream_handled.wait()
stream_py = await host.new_stream(p2pd.peer_id, [protocol_id]) # NOTE: If `is_to_fail_daemon_stream == True`, then `stream_daemon == None`.
if not is_to_fail_daemon_stream: yield stream_py, stream_daemon
await event_stream_handled.wait()
# NOTE: If `is_to_fail_daemon_stream == True`, then `stream_daemon == None`.
yield stream_py, stream_daemon

View File

@ -1,26 +1,26 @@
import asyncio
import pytest import pytest
import trio
from libp2p.tools.factories import HostFactory
from libp2p.tools.interop.utils import connect from libp2p.tools.interop.utils import connect
@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.trio
@pytest.mark.asyncio async def test_connect(is_host_secure, p2pds):
async def test_connect(hosts, p2pds): async with HostFactory.create_batch_and_listen(is_host_secure, 1) as hosts:
p2pd = p2pds[0] p2pd = p2pds[0]
host = hosts[0] host = hosts[0]
assert len(await p2pd.control.list_peers()) == 0 assert len(await p2pd.control.list_peers()) == 0
# Test: connect from Py # Test: connect from Py
await connect(host, p2pd) await connect(host, p2pd)
assert len(await p2pd.control.list_peers()) == 1 assert len(await p2pd.control.list_peers()) == 1
# Test: `disconnect` from Py # Test: `disconnect` from Py
await host.disconnect(p2pd.peer_id) await host.disconnect(p2pd.peer_id)
assert len(await p2pd.control.list_peers()) == 0 assert len(await p2pd.control.list_peers()) == 0
# Test: connect from Go # Test: connect from Go
await connect(p2pd, host) await connect(p2pd, host)
assert len(host.get_network().connections) == 1 assert len(host.get_network().connections) == 1
# Test: `disconnect` from Go # Test: `disconnect` from Go
await p2pd.control.disconnect(host.get_id()) await p2pd.control.disconnect(host.get_id())
await asyncio.sleep(0.01) await trio.sleep(0.01)
assert len(host.get_network().connections) == 0 assert len(host.get_network().connections) == 0

View File

@ -1,82 +1,104 @@
import asyncio import random
import re
from multiaddr import Multiaddr from multiaddr import Multiaddr
import pytest import pytest
import trio
from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr
from libp2p.tools.interop.constants import PEXPECT_NEW_LINE from libp2p.tools.factories import HostFactory
from libp2p.tools.interop.envs import GO_BIN_PATH from libp2p.tools.interop.envs import GO_BIN_PATH
from libp2p.tools.interop.process import BaseInteractiveProcess
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
ECHO_PATH = GO_BIN_PATH / "echo" ECHO_PATH = GO_BIN_PATH / "echo"
ECHO_PROTOCOL_ID = TProtocol("/echo/1.0.0") ECHO_PROTOCOL_ID = TProtocol("/echo/1.0.0")
async def make_echo_proc( # FIXME: Change to a reasonable implementation
proc_factory, port: int, is_secure: bool, destination: Multiaddr = None def unused_tcp_port_factory():
): return random.randint(1024, 65535)
args = [f"-l={port}"]
if not is_secure:
args.append("-insecure")
if destination is not None:
args.append(f"-d={str(destination)}")
echo_proc = proc_factory(str(ECHO_PATH), args)
await echo_proc.expect(r"I am ([\w\./]+)" + PEXPECT_NEW_LINE, async_=True)
maddr_str_ipfs = echo_proc.match.group(1)
maddr_str = maddr_str_ipfs.replace("ipfs", "p2p")
maddr = Multiaddr(maddr_str)
go_pinfo = info_from_p2p_addr(maddr)
if destination is None:
await echo_proc.expect("listening for connections", async_=True)
return echo_proc, go_pinfo
@pytest.mark.parametrize("num_hosts", (1,)) class EchoProcess(BaseInteractiveProcess):
@pytest.mark.asyncio port: int
async def test_insecure_conn_py_to_go( _peer_info: PeerInfo
hosts, proc_factory, is_host_secure, unused_tcp_port
):
go_proc, go_pinfo = await make_echo_proc(
proc_factory, unused_tcp_port, is_host_secure
)
host = hosts[0] def __init__(
await host.connect(go_pinfo) self, port: int, is_secure: bool, destination: Multiaddr = None
await go_proc.expect("swarm listener accepted connection", async_=True) ) -> None:
s = await host.new_stream(go_pinfo.peer_id, [ECHO_PROTOCOL_ID]) args = [f"-l={port}"]
if not is_secure:
args.append("-insecure")
if destination is not None:
args.append(f"-d={str(destination)}")
await go_proc.expect("Got a new stream!", async_=True) patterns = [b"I am"]
data = "data321123\n" if destination is None:
await s.write(data.encode()) patterns.append(b"listening for connections")
await go_proc.expect(f"read: {data[:-1]}", async_=True)
echoed_resp = await s.read(len(data)) self.args = args
assert echoed_resp.decode() == data self.cmd = str(ECHO_PATH)
await s.close() self.patterns = patterns
self.bytes_read = bytearray()
self.event_ready = trio.Event()
self.port = port
self._peer_info = None
self.regex_pat = re.compile(br"I am ([\w\./]+)")
@property
def peer_info(self) -> None:
if self._peer_info is not None:
return self._peer_info
if not self.event_ready.is_set():
raise Exception("process is not ready yet. failed to parse the peer info")
# Example:
# b"I am /ip4/127.0.0.1/tcp/56171/ipfs/QmU41TRPs34WWqa1brJEojBLYZKrrBcJq9nyNfVvSrbZUJ\n"
m = re.search(br"I am ([\w\./]+)", self.bytes_read)
if m is None:
raise Exception("failed to find the pattern for the listening multiaddr")
maddr_bytes_str_ipfs = m.group(1)
maddr_str = maddr_bytes_str_ipfs.decode().replace("ipfs", "p2p")
maddr = Multiaddr(maddr_str)
self._peer_info = info_from_p2p_addr(maddr)
return self._peer_info
@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.trio
@pytest.mark.asyncio async def test_insecure_conn_py_to_go(is_host_secure):
async def test_insecure_conn_go_to_py( async with HostFactory.create_batch_and_listen(is_host_secure, 1) as hosts:
hosts, proc_factory, is_host_secure, unused_tcp_port go_proc = EchoProcess(unused_tcp_port_factory(), is_host_secure)
): await go_proc.start()
host = hosts[0]
expected_data = "Hello, world!\n"
reply_data = "Replyooo!\n"
event_handler_finished = asyncio.Event()
async def _handle_echo(stream): host = hosts[0]
read_data = await stream.read(len(expected_data)) peer_info = go_proc.peer_info
assert read_data == expected_data.encode() await host.connect(peer_info)
event_handler_finished.set() s = await host.new_stream(peer_info.peer_id, [ECHO_PROTOCOL_ID])
await stream.write(reply_data.encode()) data = "data321123\n"
await stream.close() await s.write(data.encode())
echoed_resp = await s.read(len(data))
assert echoed_resp.decode() == data
await s.close()
host.set_stream_handler(ECHO_PROTOCOL_ID, _handle_echo)
py_maddr = host.get_addrs()[0] @pytest.mark.trio
go_proc, _ = await make_echo_proc( async def test_insecure_conn_go_to_py(is_host_secure):
proc_factory, unused_tcp_port, is_host_secure, py_maddr async with HostFactory.create_batch_and_listen(is_host_secure, 1) as hosts:
) host = hosts[0]
await go_proc.expect("connect with peer", async_=True) expected_data = "Hello, world!\n"
await go_proc.expect("opened stream", async_=True) reply_data = "Replyooo!\n"
await event_handler_finished.wait() event_handler_finished = trio.Event()
await go_proc.expect(f"read reply: .*{reply_data.rstrip()}.*", async_=True)
async def _handle_echo(stream):
read_data = await stream.read(len(expected_data))
assert read_data == expected_data.encode()
event_handler_finished.set()
await stream.write(reply_data.encode())
await stream.close()
host.set_stream_handler(ECHO_PROTOCOL_ID, _handle_echo)
py_maddr = host.get_addrs()[0]
go_proc = EchoProcess(unused_tcp_port_factory(), is_host_secure, py_maddr)
await go_proc.start()
await event_handler_finished.wait()

View File

@ -1,6 +1,5 @@
import asyncio
import pytest import pytest
import trio
from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset
from libp2p.tools.constants import MAX_READ_LEN from libp2p.tools.constants import MAX_READ_LEN
@ -8,7 +7,7 @@ from libp2p.tools.constants import MAX_READ_LEN
DATA = b"data" DATA = b"data"
@pytest.mark.asyncio @pytest.mark.trio
async def test_net_stream_read_write(py_to_daemon_stream_pair, p2pds): async def test_net_stream_read_write(py_to_daemon_stream_pair, p2pds):
stream_py, stream_daemon = py_to_daemon_stream_pair stream_py, stream_daemon = py_to_daemon_stream_pair
assert ( assert (
@ -19,19 +18,19 @@ async def test_net_stream_read_write(py_to_daemon_stream_pair, p2pds):
assert (await stream_daemon.read(MAX_READ_LEN)) == DATA assert (await stream_daemon.read(MAX_READ_LEN)) == DATA
@pytest.mark.asyncio @pytest.mark.trio
async def test_net_stream_read_after_remote_closed(py_to_daemon_stream_pair, p2pds): async def test_net_stream_read_after_remote_closed(py_to_daemon_stream_pair, p2pds):
stream_py, stream_daemon = py_to_daemon_stream_pair stream_py, stream_daemon = py_to_daemon_stream_pair
await stream_daemon.write(DATA) await stream_daemon.write(DATA)
await stream_daemon.close() await stream_daemon.close()
await asyncio.sleep(0.01) await trio.sleep(0.01)
assert (await stream_py.read(MAX_READ_LEN)) == DATA assert (await stream_py.read(MAX_READ_LEN)) == DATA
# EOF # EOF
with pytest.raises(StreamEOF): with pytest.raises(StreamEOF):
await stream_py.read(MAX_READ_LEN) await stream_py.read(MAX_READ_LEN)
@pytest.mark.asyncio @pytest.mark.trio
async def test_net_stream_read_after_local_reset(py_to_daemon_stream_pair, p2pds): async def test_net_stream_read_after_local_reset(py_to_daemon_stream_pair, p2pds):
stream_py, _ = py_to_daemon_stream_pair stream_py, _ = py_to_daemon_stream_pair
await stream_py.reset() await stream_py.reset()
@ -40,15 +39,15 @@ async def test_net_stream_read_after_local_reset(py_to_daemon_stream_pair, p2pds
@pytest.mark.parametrize("is_to_fail_daemon_stream", (True,)) @pytest.mark.parametrize("is_to_fail_daemon_stream", (True,))
@pytest.mark.asyncio @pytest.mark.trio
async def test_net_stream_read_after_remote_reset(py_to_daemon_stream_pair, p2pds): async def test_net_stream_read_after_remote_reset(py_to_daemon_stream_pair, p2pds):
stream_py, _ = py_to_daemon_stream_pair stream_py, _ = py_to_daemon_stream_pair
await asyncio.sleep(0.01) await trio.sleep(0.01)
with pytest.raises(StreamReset): with pytest.raises(StreamReset):
await stream_py.read(MAX_READ_LEN) await stream_py.read(MAX_READ_LEN)
@pytest.mark.asyncio @pytest.mark.trio
async def test_net_stream_write_after_local_closed(py_to_daemon_stream_pair, p2pds): async def test_net_stream_write_after_local_closed(py_to_daemon_stream_pair, p2pds):
stream_py, _ = py_to_daemon_stream_pair stream_py, _ = py_to_daemon_stream_pair
await stream_py.write(DATA) await stream_py.write(DATA)
@ -57,7 +56,7 @@ async def test_net_stream_write_after_local_closed(py_to_daemon_stream_pair, p2p
await stream_py.write(DATA) await stream_py.write(DATA)
@pytest.mark.asyncio @pytest.mark.trio
async def test_net_stream_write_after_local_reset(py_to_daemon_stream_pair, p2pds): async def test_net_stream_write_after_local_reset(py_to_daemon_stream_pair, p2pds):
stream_py, stream_daemon = py_to_daemon_stream_pair stream_py, stream_daemon = py_to_daemon_stream_pair
await stream_py.reset() await stream_py.reset()
@ -66,9 +65,9 @@ async def test_net_stream_write_after_local_reset(py_to_daemon_stream_pair, p2pd
@pytest.mark.parametrize("is_to_fail_daemon_stream", (True,)) @pytest.mark.parametrize("is_to_fail_daemon_stream", (True,))
@pytest.mark.asyncio @pytest.mark.trio
async def test_net_stream_write_after_remote_reset(py_to_daemon_stream_pair, p2pds): async def test_net_stream_write_after_remote_reset(py_to_daemon_stream_pair, p2pds):
stream_py, _ = py_to_daemon_stream_pair stream_py, _ = py_to_daemon_stream_pair
await asyncio.sleep(0.01) await trio.sleep(0.01)
with pytest.raises(StreamClosed): with pytest.raises(StreamClosed):
await stream_py.write(DATA) await stream_py.write(DATA)

View File

@ -1,11 +1,15 @@
import asyncio
import functools import functools
import math
from p2pclient.pb import p2pd_pb2 from p2pclient.pb import p2pd_pb2
import pytest import pytest
import trio
from libp2p.io.trio import TrioTCPStream
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pb import rpc_pb2
from libp2p.pubsub.subscription import TrioSubscriptionAPI
from libp2p.tools.factories import PubsubFactory
from libp2p.tools.interop.utils import connect from libp2p.tools.interop.utils import connect
from libp2p.utils import read_varint_prefixed_bytes from libp2p.utils import read_varint_prefixed_bytes
@ -13,26 +17,15 @@ TOPIC_0 = "ABALA"
TOPIC_1 = "YOOOO" TOPIC_1 = "YOOOO"
async def p2pd_subscribe(p2pd, topic) -> "asyncio.Queue[rpc_pb2.Message]": async def p2pd_subscribe(p2pd, topic, nursery):
reader, writer = await p2pd.control.pubsub_subscribe(topic) stream = TrioTCPStream(await p2pd.control.pubsub_subscribe(topic))
send_channel, receive_channel = trio.open_memory_channel(math.inf)
queue = asyncio.Queue() sub = TrioSubscriptionAPI(receive_channel)
async def _read_pubsub_msg() -> None: async def _read_pubsub_msg() -> None:
writer_closed_task = asyncio.ensure_future(writer.wait_closed())
while True: while True:
done, pending = await asyncio.wait( msg_bytes = await read_varint_prefixed_bytes(stream)
[read_varint_prefixed_bytes(reader), writer_closed_task],
return_when=asyncio.FIRST_COMPLETED,
)
done_tasks = tuple(done)
if writer.is_closing():
return
read_task = done_tasks[0]
# Sanity check
assert read_task._coro.__name__ == "read_varint_prefixed_bytes"
msg_bytes = read_task.result()
ps_msg = p2pd_pb2.PSMessage() ps_msg = p2pd_pb2.PSMessage()
ps_msg.ParseFromString(msg_bytes) ps_msg.ParseFromString(msg_bytes)
# Fill in the message used in py-libp2p # Fill in the message used in py-libp2p
@ -44,11 +37,10 @@ async def p2pd_subscribe(p2pd, topic) -> "asyncio.Queue[rpc_pb2.Message]":
signature=ps_msg.signature, signature=ps_msg.signature,
key=ps_msg.key, key=ps_msg.key,
) )
queue.put_nowait(msg) await send_channel.send(msg)
asyncio.ensure_future(_read_pubsub_msg()) nursery.start_soon(_read_pubsub_msg)
await asyncio.sleep(0) return sub
return queue
def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) -> None: def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) -> None:
@ -59,108 +51,119 @@ def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) ->
"is_pubsub_signing, is_pubsub_signing_strict", ((True, True), (False, False)) "is_pubsub_signing, is_pubsub_signing_strict", ((True, True), (False, False))
) )
@pytest.mark.parametrize("is_gossipsub", (True, False)) @pytest.mark.parametrize("is_gossipsub", (True, False))
@pytest.mark.parametrize("num_hosts, num_p2pds", ((1, 2),)) @pytest.mark.parametrize("num_p2pds", (2,))
@pytest.mark.asyncio @pytest.mark.trio
async def test_pubsub(pubsubs, p2pds): async def test_pubsub(
# p2pds, is_gossipsub, is_host_secure, is_pubsub_signing_strict, nursery
# Test: Recognize pubsub peers on connection. ):
# pubsub_factory = None
py_pubsub = pubsubs[0] if is_gossipsub:
# go0 <-> py <-> go1 pubsub_factory = PubsubFactory.create_batch_with_gossipsub
await connect(p2pds[0], py_pubsub.host) else:
await connect(py_pubsub.host, p2pds[1]) pubsub_factory = PubsubFactory.create_batch_with_floodsub
py_peer_id = py_pubsub.host.get_id()
# Check pubsub peers
pubsub_peers_0 = await p2pds[0].control.pubsub_list_peers("")
assert len(pubsub_peers_0) == 1 and pubsub_peers_0[0] == py_peer_id
pubsub_peers_1 = await p2pds[1].control.pubsub_list_peers("")
assert len(pubsub_peers_1) == 1 and pubsub_peers_1[0] == py_peer_id
assert (
len(py_pubsub.peers) == 2
and p2pds[0].peer_id in py_pubsub.peers
and p2pds[1].peer_id in py_pubsub.peers
)
# async with pubsub_factory(
# Test: `subscribe`. 1, is_secure=is_host_secure, strict_signing=is_pubsub_signing_strict
# ) as pubsubs:
# (name, topics) #
# (go_0, [0, 1]) <-> (py, [0, 1]) <-> (go_1, [1]) # Test: Recognize pubsub peers on connection.
sub_py_topic_0 = await py_pubsub.subscribe(TOPIC_0) #
sub_py_topic_1 = await py_pubsub.subscribe(TOPIC_1) py_pubsub = pubsubs[0]
sub_go_0_topic_0 = await p2pd_subscribe(p2pds[0], TOPIC_0) # go0 <-> py <-> go1
sub_go_0_topic_1 = await p2pd_subscribe(p2pds[0], TOPIC_1) await connect(p2pds[0], py_pubsub.host)
sub_go_1_topic_1 = await p2pd_subscribe(p2pds[1], TOPIC_1) await connect(py_pubsub.host, p2pds[1])
# Check topic peers py_peer_id = py_pubsub.host.get_id()
await asyncio.sleep(0.1) # Check pubsub peers
# go_0 pubsub_peers_0 = await p2pds[0].control.pubsub_list_peers("")
go_0_topic_0_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_0) assert len(pubsub_peers_0) == 1 and pubsub_peers_0[0] == py_peer_id
assert len(go_0_topic_0_peers) == 1 and py_peer_id == go_0_topic_0_peers[0] pubsub_peers_1 = await p2pds[1].control.pubsub_list_peers("")
go_0_topic_1_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_1) assert len(pubsub_peers_1) == 1 and pubsub_peers_1[0] == py_peer_id
assert len(go_0_topic_1_peers) == 1 and py_peer_id == go_0_topic_1_peers[0] assert (
# py len(py_pubsub.peers) == 2
py_topic_0_peers = list(py_pubsub.peer_topics[TOPIC_0]) and p2pds[0].peer_id in py_pubsub.peers
assert len(py_topic_0_peers) == 1 and p2pds[0].peer_id == py_topic_0_peers[0] and p2pds[1].peer_id in py_pubsub.peers
# go_1 )
go_1_topic_1_peers = await p2pds[1].control.pubsub_list_peers(TOPIC_1)
assert len(go_1_topic_1_peers) == 1 and py_peer_id == go_1_topic_1_peers[0]
# #
# Test: `publish` # Test: `subscribe`.
# #
# 1. py publishes # (name, topics)
# - 1.1. py publishes data_11 to topic_0, py and go_0 receives. # (go_0, [0, 1]) <-> (py, [0, 1]) <-> (go_1, [1])
# - 1.2. py publishes data_12 to topic_1, all receive. sub_py_topic_0 = await py_pubsub.subscribe(TOPIC_0)
# 2. go publishes sub_py_topic_1 = await py_pubsub.subscribe(TOPIC_1)
# - 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. sub_go_0_topic_0 = await p2pd_subscribe(p2pds[0], TOPIC_0, nursery)
# - 2.2. go_1 publishes data_22 to topic_1, all receive. sub_go_0_topic_1 = await p2pd_subscribe(p2pds[0], TOPIC_1, nursery)
sub_go_1_topic_1 = await p2pd_subscribe(p2pds[1], TOPIC_1, nursery)
# Check topic peers
await trio.sleep(0.1)
# go_0
go_0_topic_0_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_0)
assert len(go_0_topic_0_peers) == 1 and py_peer_id == go_0_topic_0_peers[0]
go_0_topic_1_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_1)
assert len(go_0_topic_1_peers) == 1 and py_peer_id == go_0_topic_1_peers[0]
# py
py_topic_0_peers = list(py_pubsub.peer_topics[TOPIC_0])
assert len(py_topic_0_peers) == 1 and p2pds[0].peer_id == py_topic_0_peers[0]
# go_1
go_1_topic_1_peers = await p2pds[1].control.pubsub_list_peers(TOPIC_1)
assert len(go_1_topic_1_peers) == 1 and py_peer_id == go_1_topic_1_peers[0]
# 1.1. py publishes data_11 to topic_0, py and go_0 receives. #
data_11 = b"data_11" # Test: `publish`
await py_pubsub.publish(TOPIC_0, data_11) #
validate_11 = functools.partial( # 1. py publishes
validate_pubsub_msg, data=data_11, from_peer_id=py_peer_id # - 1.1. py publishes data_11 to topic_0, py and go_0 receives.
) # - 1.2. py publishes data_12 to topic_1, all receive.
validate_11(await sub_py_topic_0.get()) # 2. go publishes
validate_11(await sub_go_0_topic_0.get()) # - 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive.
# - 2.2. go_1 publishes data_22 to topic_1, all receive.
# 1.2. py publishes data_12 to topic_1, all receive. # 1.1. py publishes data_11 to topic_0, py and go_0 receives.
data_12 = b"data_12" data_11 = b"data_11"
validate_12 = functools.partial( await py_pubsub.publish(TOPIC_0, data_11)
validate_pubsub_msg, data=data_12, from_peer_id=py_peer_id validate_11 = functools.partial(
) validate_pubsub_msg, data=data_11, from_peer_id=py_peer_id
await py_pubsub.publish(TOPIC_1, data_12) )
validate_12(await sub_py_topic_1.get()) validate_11(await sub_py_topic_0.get())
validate_12(await sub_go_0_topic_1.get()) validate_11(await sub_go_0_topic_0.get())
validate_12(await sub_go_1_topic_1.get())
# 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. # 1.2. py publishes data_12 to topic_1, all receive.
data_21 = b"data_21" data_12 = b"data_12"
validate_21 = functools.partial( validate_12 = functools.partial(
validate_pubsub_msg, data=data_21, from_peer_id=p2pds[0].peer_id validate_pubsub_msg, data=data_12, from_peer_id=py_peer_id
) )
await p2pds[0].control.pubsub_publish(TOPIC_0, data_21) await py_pubsub.publish(TOPIC_1, data_12)
validate_21(await sub_py_topic_0.get()) validate_12(await sub_py_topic_1.get())
validate_21(await sub_go_0_topic_0.get()) validate_12(await sub_go_0_topic_1.get())
validate_12(await sub_go_1_topic_1.get())
# 2.2. go_1 publishes data_22 to topic_1, all receive. # 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive.
data_22 = b"data_22" data_21 = b"data_21"
validate_22 = functools.partial( validate_21 = functools.partial(
validate_pubsub_msg, data=data_22, from_peer_id=p2pds[1].peer_id validate_pubsub_msg, data=data_21, from_peer_id=p2pds[0].peer_id
) )
await p2pds[1].control.pubsub_publish(TOPIC_1, data_22) await p2pds[0].control.pubsub_publish(TOPIC_0, data_21)
validate_22(await sub_py_topic_1.get()) validate_21(await sub_py_topic_0.get())
validate_22(await sub_go_0_topic_1.get()) validate_21(await sub_go_0_topic_0.get())
validate_22(await sub_go_1_topic_1.get())
# # 2.2. go_1 publishes data_22 to topic_1, all receive.
# Test: `unsubscribe` and re`subscribe` data_22 = b"data_22"
# validate_22 = functools.partial(
await py_pubsub.unsubscribe(TOPIC_0) validate_pubsub_msg, data=data_22, from_peer_id=p2pds[1].peer_id
await asyncio.sleep(0.1) )
assert py_peer_id not in (await p2pds[0].control.pubsub_list_peers(TOPIC_0)) await p2pds[1].control.pubsub_publish(TOPIC_1, data_22)
assert py_peer_id not in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) validate_22(await sub_py_topic_1.get())
await py_pubsub.subscribe(TOPIC_0) validate_22(await sub_go_0_topic_1.get())
await asyncio.sleep(0.1) validate_22(await sub_go_1_topic_1.get())
assert py_peer_id in (await p2pds[0].control.pubsub_list_peers(TOPIC_0))
assert py_peer_id in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) #
# Test: `unsubscribe` and re`subscribe`
#
await py_pubsub.unsubscribe(TOPIC_0)
await trio.sleep(0.1)
assert py_peer_id not in (await p2pds[0].control.pubsub_list_peers(TOPIC_0))
assert py_peer_id not in (await p2pds[1].control.pubsub_list_peers(TOPIC_0))
await py_pubsub.subscribe(TOPIC_0)
await trio.sleep(0.1)
assert py_peer_id in (await p2pds[0].control.pubsub_list_peers(TOPIC_0))
assert py_peer_id in (await p2pds[1].control.pubsub_list_peers(TOPIC_0))

View File

@ -12,7 +12,7 @@ envlist =
combine_as_imports=False combine_as_imports=False
force_sort_within_sections=True force_sort_within_sections=True
include_trailing_comma=True include_trailing_comma=True
known_third_party=hypothesis,pytest,p2pclient,pexpect,factory known_third_party=anyio,factory,p2pclient,pytest
known_first_party=libp2p known_first_party=libp2p
line_length=88 line_length=88
multi_line_output=3 multi_line_output=3