diff --git a/interop/README.md b/interop/README.md new file mode 100644 index 00000000..5ecff1f1 --- /dev/null +++ b/interop/README.md @@ -0,0 +1,19 @@ +These commands are to be run in `./interop/exec` + +## Redis + +```bash +docker run -p 6379:6379 -it redis:latest +``` + +## Listener + +```bash +transport=tcp ip=0.0.0.0 is_dialer=false redis_addr=6379 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py +``` + +## Dialer + +```bash +transport=tcp ip=0.0.0.0 is_dialer=true port=8001 redis_addr=6379 port=8001 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py +``` diff --git a/interop/__init__.py b/interop/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/interop/arch.py b/interop/arch.py new file mode 100644 index 00000000..5c79283d --- /dev/null +++ b/interop/arch.py @@ -0,0 +1,73 @@ +from dataclasses import ( + dataclass, +) + +import multiaddr +import redis +import trio + +from libp2p import ( + new_host, +) +from libp2p.crypto.keys import ( + KeyPair, +) +from libp2p.crypto.rsa import ( + create_new_key_pair, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.security.insecure.transport import ( + PLAINTEXT_PROTOCOL_ID, + InsecureTransport, +) +import libp2p.security.secio.transport as secio +from libp2p.stream_muxer.mplex.mplex import ( + MPLEX_PROTOCOL_ID, + Mplex, +) + + +def generate_new_rsa_identity() -> KeyPair: + return create_new_key_pair() + + +async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str): + match (sec_protocol, muxer): + case ("insecure", "mplex"): + key_pair = create_new_key_pair() + host = new_host( + key_pair, + {MPLEX_PROTOCOL_ID: Mplex}, + { + TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair), + TProtocol(secio.ID): secio.Transport(key_pair), + }, + ) + muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") + return (host, muladdr) + case _: + raise ValueError("Protocols not supported") + + +@dataclass +class RedisClient: + client: redis.Redis + + def brpop(self, key: str, timeout: float) -> list[str]: + result = self.client.brpop([key], timeout) + return [result[1]] if result else [] + + def rpush(self, key: str, value: str) -> None: + self.client.rpush(key, value) + + +async def main(): + client = RedisClient(redis.Redis(host="localhost", port=6379, db=0)) + client.rpush("test", "hello") + print(client.blpop("test", timeout=5)) + + +if __name__ == "__main__": + trio.run(main) diff --git a/interop/exec/config/mod.py b/interop/exec/config/mod.py new file mode 100644 index 00000000..9da19dcb --- /dev/null +++ b/interop/exec/config/mod.py @@ -0,0 +1,57 @@ +from dataclasses import ( + dataclass, +) +import os +from typing import ( + Optional, +) + + +def str_to_bool(val: str) -> bool: + return val.lower() in ("true", "1") + + +class ConfigError(Exception): + """Raised when the required environment variables are missing or invalid""" + + +@dataclass +class Config: + transport: str + sec_protocol: Optional[str] + muxer: Optional[str] + ip: str + is_dialer: bool + test_timeout: int + redis_addr: str + port: str + + @classmethod + def from_env(cls) -> "Config": + try: + transport = os.environ["transport"] + ip = os.environ["ip"] + except KeyError as e: + raise ConfigError(f"{e.args[0]} env variable not set") from None + + try: + is_dialer = str_to_bool(os.environ.get("is_dialer", "true")) + test_timeout = int(os.environ.get("test_timeout", "180")) + except ValueError as e: + raise ConfigError(f"Invalid value in env: {e}") from None + + redis_addr = os.environ.get("redis_addr", 6379) + sec_protocol = os.environ.get("security") + muxer = os.environ.get("muxer") + port = os.environ.get("port", "8000") + + return cls( + transport=transport, + sec_protocol=sec_protocol, + muxer=muxer, + ip=ip, + is_dialer=is_dialer, + test_timeout=test_timeout, + redis_addr=redis_addr, + port=port, + ) diff --git a/interop/exec/native_ping.py b/interop/exec/native_ping.py new file mode 100644 index 00000000..3578d0c6 --- /dev/null +++ b/interop/exec/native_ping.py @@ -0,0 +1,33 @@ +import trio + +from interop.exec.config.mod import ( + Config, + ConfigError, +) +from interop.lib import ( + run_test, +) + + +async def main() -> None: + try: + config = Config.from_env() + except ConfigError as e: + print(f"Config error: {e}") + return + + # Uncomment and implement when ready + _ = await run_test( + config.transport, + config.ip, + config.port, + config.is_dialer, + config.test_timeout, + config.redis_addr, + config.sec_protocol, + config.muxer, + ) + + +if __name__ == "__main__": + trio.run(main) diff --git a/interop/lib.py b/interop/lib.py new file mode 100644 index 00000000..8c884c3f --- /dev/null +++ b/interop/lib.py @@ -0,0 +1,112 @@ +from dataclasses import ( + dataclass, +) +import json +import time + +from loguru import ( + logger, +) +import multiaddr +import redis +import trio + +from interop.arch import ( + RedisClient, + build_host, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.network.stream.net_stream import ( + INetStream, +) +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) + +PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0") +PING_LENGTH = 32 +RESP_TIMEOUT = 60 + + +async def handle_ping(stream: INetStream) -> None: + while True: + try: + payload = await stream.read(PING_LENGTH) + peer_id = stream.muxed_conn.peer_id + if payload is not None: + print(f"received ping from {peer_id}") + + await stream.write(payload) + print(f"responded with pong to {peer_id}") + + except Exception: + await stream.reset() + break + + +async def send_ping(stream: INetStream) -> None: + try: + payload = b"\x01" * PING_LENGTH + print(f"sending ping to {stream.muxed_conn.peer_id}") + + await stream.write(payload) + + with trio.fail_after(RESP_TIMEOUT): + response = await stream.read(PING_LENGTH) + + if response == payload: + print(f"received pong from {stream.muxed_conn.peer_id}") + + except Exception as e: + print(f"error occurred: {e}") + + +async def run_test( + transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer +): + logger.info("Starting run_test") + + redis_client = RedisClient( + redis.Redis(host="localhost", port=int(redis_addr), db=0) + ) + (host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer) + logger.info(f"Running ping test local_peer={host.get_id()}") + + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + if not is_dialer: + host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) + ma = f"{listen_addr}/p2p/{host.get_id().pretty()}" + redis_client.rpush("listenerAddr", ma) + + logger.info(f"Test instance, listening: {ma}") + else: + redis_addr = redis_client.brpop("listenerAddr", timeout=5) + destination = redis_addr[0].decode() + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + + handshake_start = time.perf_counter() + + await host.connect(info) + stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID]) + + logger.info("Remote conection established") + nursery.start_soon(send_ping, stream) + + handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0 + + logger.info(f"handshake time: {handshake_plus_ping:.2f}ms") + return + + await trio.sleep_forever() + + +@dataclass +class Report: + handshake_plus_one_rtt_millis: float + ping_rtt_millis: float + + def gen_report(self): + return json.dumps(self.__dict__) diff --git a/setup.py b/setup.py index a23d811a..dd736318 100644 --- a/setup.py +++ b/setup.py @@ -37,10 +37,14 @@ extras_require = { "pytest-trio>=0.5.2", "factory-boy>=2.12.0,<3.0.0", ], + "interop": ["redis==6.1.0", "logging==0.4.9.6" "loguru==0.7.3"], } extras_require["dev"] = ( - extras_require["dev"] + extras_require["docs"] + extras_require["test"] + extras_require["dev"] + + extras_require["docs"] + + extras_require["test"] + + extras_require["interop"] ) try: