mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
interop utilities for mplex ping
This commit is contained in:
19
interop/README.md
Normal file
19
interop/README.md
Normal file
@ -0,0 +1,19 @@
|
||||
These commands are to be run in `./interop/exec`
|
||||
|
||||
## Redis
|
||||
|
||||
```bash
|
||||
docker run -p 6379:6379 -it redis:latest
|
||||
```
|
||||
|
||||
## Listener
|
||||
|
||||
```bash
|
||||
transport=tcp ip=0.0.0.0 is_dialer=false redis_addr=6379 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
|
||||
```
|
||||
|
||||
## Dialer
|
||||
|
||||
```bash
|
||||
transport=tcp ip=0.0.0.0 is_dialer=true port=8001 redis_addr=6379 port=8001 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
|
||||
```
|
||||
0
interop/__init__.py
Normal file
0
interop/__init__.py
Normal file
73
interop/arch.py
Normal file
73
interop/arch.py
Normal file
@ -0,0 +1,73 @@
|
||||
from dataclasses import (
|
||||
dataclass,
|
||||
)
|
||||
|
||||
import multiaddr
|
||||
import redis
|
||||
import trio
|
||||
|
||||
from libp2p import (
|
||||
new_host,
|
||||
)
|
||||
from libp2p.crypto.keys import (
|
||||
KeyPair,
|
||||
)
|
||||
from libp2p.crypto.rsa import (
|
||||
create_new_key_pair,
|
||||
)
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
)
|
||||
from libp2p.security.insecure.transport import (
|
||||
PLAINTEXT_PROTOCOL_ID,
|
||||
InsecureTransport,
|
||||
)
|
||||
import libp2p.security.secio.transport as secio
|
||||
from libp2p.stream_muxer.mplex.mplex import (
|
||||
MPLEX_PROTOCOL_ID,
|
||||
Mplex,
|
||||
)
|
||||
|
||||
|
||||
def generate_new_rsa_identity() -> KeyPair:
|
||||
return create_new_key_pair()
|
||||
|
||||
|
||||
async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str):
|
||||
match (sec_protocol, muxer):
|
||||
case ("insecure", "mplex"):
|
||||
key_pair = create_new_key_pair()
|
||||
host = new_host(
|
||||
key_pair,
|
||||
{MPLEX_PROTOCOL_ID: Mplex},
|
||||
{
|
||||
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
|
||||
TProtocol(secio.ID): secio.Transport(key_pair),
|
||||
},
|
||||
)
|
||||
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
|
||||
return (host, muladdr)
|
||||
case _:
|
||||
raise ValueError("Protocols not supported")
|
||||
|
||||
|
||||
@dataclass
|
||||
class RedisClient:
|
||||
client: redis.Redis
|
||||
|
||||
def brpop(self, key: str, timeout: float) -> list[str]:
|
||||
result = self.client.brpop([key], timeout)
|
||||
return [result[1]] if result else []
|
||||
|
||||
def rpush(self, key: str, value: str) -> None:
|
||||
self.client.rpush(key, value)
|
||||
|
||||
|
||||
async def main():
|
||||
client = RedisClient(redis.Redis(host="localhost", port=6379, db=0))
|
||||
client.rpush("test", "hello")
|
||||
print(client.blpop("test", timeout=5))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
trio.run(main)
|
||||
57
interop/exec/config/mod.py
Normal file
57
interop/exec/config/mod.py
Normal file
@ -0,0 +1,57 @@
|
||||
from dataclasses import (
|
||||
dataclass,
|
||||
)
|
||||
import os
|
||||
from typing import (
|
||||
Optional,
|
||||
)
|
||||
|
||||
|
||||
def str_to_bool(val: str) -> bool:
|
||||
return val.lower() in ("true", "1")
|
||||
|
||||
|
||||
class ConfigError(Exception):
|
||||
"""Raised when the required environment variables are missing or invalid"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class Config:
|
||||
transport: str
|
||||
sec_protocol: Optional[str]
|
||||
muxer: Optional[str]
|
||||
ip: str
|
||||
is_dialer: bool
|
||||
test_timeout: int
|
||||
redis_addr: str
|
||||
port: str
|
||||
|
||||
@classmethod
|
||||
def from_env(cls) -> "Config":
|
||||
try:
|
||||
transport = os.environ["transport"]
|
||||
ip = os.environ["ip"]
|
||||
except KeyError as e:
|
||||
raise ConfigError(f"{e.args[0]} env variable not set") from None
|
||||
|
||||
try:
|
||||
is_dialer = str_to_bool(os.environ.get("is_dialer", "true"))
|
||||
test_timeout = int(os.environ.get("test_timeout", "180"))
|
||||
except ValueError as e:
|
||||
raise ConfigError(f"Invalid value in env: {e}") from None
|
||||
|
||||
redis_addr = os.environ.get("redis_addr", 6379)
|
||||
sec_protocol = os.environ.get("security")
|
||||
muxer = os.environ.get("muxer")
|
||||
port = os.environ.get("port", "8000")
|
||||
|
||||
return cls(
|
||||
transport=transport,
|
||||
sec_protocol=sec_protocol,
|
||||
muxer=muxer,
|
||||
ip=ip,
|
||||
is_dialer=is_dialer,
|
||||
test_timeout=test_timeout,
|
||||
redis_addr=redis_addr,
|
||||
port=port,
|
||||
)
|
||||
33
interop/exec/native_ping.py
Normal file
33
interop/exec/native_ping.py
Normal file
@ -0,0 +1,33 @@
|
||||
import trio
|
||||
|
||||
from interop.exec.config.mod import (
|
||||
Config,
|
||||
ConfigError,
|
||||
)
|
||||
from interop.lib import (
|
||||
run_test,
|
||||
)
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
try:
|
||||
config = Config.from_env()
|
||||
except ConfigError as e:
|
||||
print(f"Config error: {e}")
|
||||
return
|
||||
|
||||
# Uncomment and implement when ready
|
||||
_ = await run_test(
|
||||
config.transport,
|
||||
config.ip,
|
||||
config.port,
|
||||
config.is_dialer,
|
||||
config.test_timeout,
|
||||
config.redis_addr,
|
||||
config.sec_protocol,
|
||||
config.muxer,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
trio.run(main)
|
||||
112
interop/lib.py
Normal file
112
interop/lib.py
Normal file
@ -0,0 +1,112 @@
|
||||
from dataclasses import (
|
||||
dataclass,
|
||||
)
|
||||
import json
|
||||
import time
|
||||
|
||||
from loguru import (
|
||||
logger,
|
||||
)
|
||||
import multiaddr
|
||||
import redis
|
||||
import trio
|
||||
|
||||
from interop.arch import (
|
||||
RedisClient,
|
||||
build_host,
|
||||
)
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
)
|
||||
from libp2p.network.stream.net_stream import (
|
||||
INetStream,
|
||||
)
|
||||
from libp2p.peer.peerinfo import (
|
||||
info_from_p2p_addr,
|
||||
)
|
||||
|
||||
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
|
||||
PING_LENGTH = 32
|
||||
RESP_TIMEOUT = 60
|
||||
|
||||
|
||||
async def handle_ping(stream: INetStream) -> None:
|
||||
while True:
|
||||
try:
|
||||
payload = await stream.read(PING_LENGTH)
|
||||
peer_id = stream.muxed_conn.peer_id
|
||||
if payload is not None:
|
||||
print(f"received ping from {peer_id}")
|
||||
|
||||
await stream.write(payload)
|
||||
print(f"responded with pong to {peer_id}")
|
||||
|
||||
except Exception:
|
||||
await stream.reset()
|
||||
break
|
||||
|
||||
|
||||
async def send_ping(stream: INetStream) -> None:
|
||||
try:
|
||||
payload = b"\x01" * PING_LENGTH
|
||||
print(f"sending ping to {stream.muxed_conn.peer_id}")
|
||||
|
||||
await stream.write(payload)
|
||||
|
||||
with trio.fail_after(RESP_TIMEOUT):
|
||||
response = await stream.read(PING_LENGTH)
|
||||
|
||||
if response == payload:
|
||||
print(f"received pong from {stream.muxed_conn.peer_id}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"error occurred: {e}")
|
||||
|
||||
|
||||
async def run_test(
|
||||
transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer
|
||||
):
|
||||
logger.info("Starting run_test")
|
||||
|
||||
redis_client = RedisClient(
|
||||
redis.Redis(host="localhost", port=int(redis_addr), db=0)
|
||||
)
|
||||
(host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer)
|
||||
logger.info(f"Running ping test local_peer={host.get_id()}")
|
||||
|
||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||
if not is_dialer:
|
||||
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
|
||||
ma = f"{listen_addr}/p2p/{host.get_id().pretty()}"
|
||||
redis_client.rpush("listenerAddr", ma)
|
||||
|
||||
logger.info(f"Test instance, listening: {ma}")
|
||||
else:
|
||||
redis_addr = redis_client.brpop("listenerAddr", timeout=5)
|
||||
destination = redis_addr[0].decode()
|
||||
maddr = multiaddr.Multiaddr(destination)
|
||||
info = info_from_p2p_addr(maddr)
|
||||
|
||||
handshake_start = time.perf_counter()
|
||||
|
||||
await host.connect(info)
|
||||
stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID])
|
||||
|
||||
logger.info("Remote conection established")
|
||||
nursery.start_soon(send_ping, stream)
|
||||
|
||||
handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0
|
||||
|
||||
logger.info(f"handshake time: {handshake_plus_ping:.2f}ms")
|
||||
return
|
||||
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
@dataclass
|
||||
class Report:
|
||||
handshake_plus_one_rtt_millis: float
|
||||
ping_rtt_millis: float
|
||||
|
||||
def gen_report(self):
|
||||
return json.dumps(self.__dict__)
|
||||
6
setup.py
6
setup.py
@ -37,10 +37,14 @@ extras_require = {
|
||||
"pytest-trio>=0.5.2",
|
||||
"factory-boy>=2.12.0,<3.0.0",
|
||||
],
|
||||
"interop": ["redis==6.1.0", "logging==0.4.9.6" "loguru==0.7.3"],
|
||||
}
|
||||
|
||||
extras_require["dev"] = (
|
||||
extras_require["dev"] + extras_require["docs"] + extras_require["test"]
|
||||
extras_require["dev"]
|
||||
+ extras_require["docs"]
|
||||
+ extras_require["test"]
|
||||
+ extras_require["interop"]
|
||||
)
|
||||
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user