mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Compare commits
8 Commits
py-rust-in
...
varun-r-ma
| Author | SHA1 | Date | |
|---|---|---|---|
| 5983c08379 | |||
| d020bbc066 | |||
| 00f10dbec3 | |||
| d75886b180 | |||
| d4785b9e26 | |||
| cef217358f | |||
| 338672214c | |||
| c2046e6aa4 |
2
Makefile
2
Makefile
@ -38,7 +38,7 @@ lint:
|
|||||||
)
|
)
|
||||||
|
|
||||||
test:
|
test:
|
||||||
python -m pytest tests -n auto
|
python -m pytest tests
|
||||||
|
|
||||||
# protobufs management
|
# protobufs management
|
||||||
|
|
||||||
|
|||||||
@ -1,19 +0,0 @@
|
|||||||
These commands are to be run in `./interop/exec`
|
|
||||||
|
|
||||||
## Redis
|
|
||||||
|
|
||||||
```bash
|
|
||||||
docker run -p 6379:6379 -it redis:latest
|
|
||||||
```
|
|
||||||
|
|
||||||
## Listener
|
|
||||||
|
|
||||||
```bash
|
|
||||||
transport=tcp ip=0.0.0.0 is_dialer=false redis_addr=6379 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
|
|
||||||
```
|
|
||||||
|
|
||||||
## Dialer
|
|
||||||
|
|
||||||
```bash
|
|
||||||
transport=tcp ip=0.0.0.0 is_dialer=true port=8001 redis_addr=6379 port=8001 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
|
|
||||||
```
|
|
||||||
107
interop/arch.py
107
interop/arch.py
@ -1,107 +0,0 @@
|
|||||||
from dataclasses import (
|
|
||||||
dataclass,
|
|
||||||
)
|
|
||||||
|
|
||||||
import multiaddr
|
|
||||||
import redis
|
|
||||||
import trio
|
|
||||||
|
|
||||||
from libp2p import (
|
|
||||||
new_host,
|
|
||||||
)
|
|
||||||
from libp2p.crypto.keys import (
|
|
||||||
KeyPair,
|
|
||||||
)
|
|
||||||
from libp2p.crypto.rsa import (
|
|
||||||
create_new_key_pair,
|
|
||||||
)
|
|
||||||
from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
|
|
||||||
from libp2p.custom_types import (
|
|
||||||
TProtocol,
|
|
||||||
)
|
|
||||||
from libp2p.security.insecure.transport import (
|
|
||||||
PLAINTEXT_PROTOCOL_ID,
|
|
||||||
InsecureTransport,
|
|
||||||
)
|
|
||||||
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
|
|
||||||
from libp2p.security.noise.transport import Transport as NoiseTransport
|
|
||||||
import libp2p.security.secio.transport as secio
|
|
||||||
from libp2p.stream_muxer.mplex.mplex import (
|
|
||||||
MPLEX_PROTOCOL_ID,
|
|
||||||
Mplex,
|
|
||||||
)
|
|
||||||
from libp2p.stream_muxer.yamux.yamux import (
|
|
||||||
Yamux,
|
|
||||||
)
|
|
||||||
from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID
|
|
||||||
|
|
||||||
|
|
||||||
def generate_new_rsa_identity() -> KeyPair:
|
|
||||||
return create_new_key_pair()
|
|
||||||
|
|
||||||
|
|
||||||
async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str):
|
|
||||||
match (sec_protocol, muxer):
|
|
||||||
case ("insecure", "mplex"):
|
|
||||||
key_pair = create_new_key_pair()
|
|
||||||
host = new_host(
|
|
||||||
key_pair,
|
|
||||||
{TProtocol(MPLEX_PROTOCOL_ID): Mplex},
|
|
||||||
{
|
|
||||||
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
|
|
||||||
TProtocol(secio.ID): secio.Transport(key_pair),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
|
|
||||||
return (host, muladdr)
|
|
||||||
case ("insecure", "yamux"):
|
|
||||||
key_pair = create_new_key_pair()
|
|
||||||
host = new_host(
|
|
||||||
key_pair,
|
|
||||||
{TProtocol(YAMUX_PROTOCOL_ID): Yamux},
|
|
||||||
{
|
|
||||||
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
|
|
||||||
TProtocol(secio.ID): secio.Transport(key_pair),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
|
|
||||||
return (host, muladdr)
|
|
||||||
case ("noise", "yamux"):
|
|
||||||
key_pair = create_new_key_pair()
|
|
||||||
noise_key_pair = create_new_x25519_key_pair()
|
|
||||||
|
|
||||||
host = new_host(
|
|
||||||
key_pair,
|
|
||||||
{TProtocol(YAMUX_PROTOCOL_ID): Yamux},
|
|
||||||
{
|
|
||||||
NOISE_PROTOCOL_ID: NoiseTransport(
|
|
||||||
key_pair, noise_privkey=noise_key_pair.private_key
|
|
||||||
)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
|
|
||||||
return (host, muladdr)
|
|
||||||
case _:
|
|
||||||
raise ValueError("Protocols not supported")
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class RedisClient:
|
|
||||||
client: redis.Redis
|
|
||||||
|
|
||||||
def brpop(self, key: str, timeout: float) -> list[str]:
|
|
||||||
result = self.client.brpop([key], timeout)
|
|
||||||
return [result[1]] if result else []
|
|
||||||
|
|
||||||
def rpush(self, key: str, value: str) -> None:
|
|
||||||
self.client.rpush(key, value)
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
client = RedisClient(redis.Redis(host="localhost", port=6379, db=0))
|
|
||||||
client.rpush("test", "hello")
|
|
||||||
print(client.blpop("test", timeout=5))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
trio.run(main)
|
|
||||||
@ -1,57 +0,0 @@
|
|||||||
from dataclasses import (
|
|
||||||
dataclass,
|
|
||||||
)
|
|
||||||
import os
|
|
||||||
from typing import (
|
|
||||||
Optional,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def str_to_bool(val: str) -> bool:
|
|
||||||
return val.lower() in ("true", "1")
|
|
||||||
|
|
||||||
|
|
||||||
class ConfigError(Exception):
|
|
||||||
"""Raised when the required environment variables are missing or invalid"""
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Config:
|
|
||||||
transport: str
|
|
||||||
sec_protocol: Optional[str]
|
|
||||||
muxer: Optional[str]
|
|
||||||
ip: str
|
|
||||||
is_dialer: bool
|
|
||||||
test_timeout: int
|
|
||||||
redis_addr: str
|
|
||||||
port: str
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_env(cls) -> "Config":
|
|
||||||
try:
|
|
||||||
transport = os.environ["transport"]
|
|
||||||
ip = os.environ["ip"]
|
|
||||||
except KeyError as e:
|
|
||||||
raise ConfigError(f"{e.args[0]} env variable not set") from None
|
|
||||||
|
|
||||||
try:
|
|
||||||
is_dialer = str_to_bool(os.environ.get("is_dialer", "true"))
|
|
||||||
test_timeout = int(os.environ.get("test_timeout", "180"))
|
|
||||||
except ValueError as e:
|
|
||||||
raise ConfigError(f"Invalid value in env: {e}") from None
|
|
||||||
|
|
||||||
redis_addr = os.environ.get("redis_addr", 6379)
|
|
||||||
sec_protocol = os.environ.get("security")
|
|
||||||
muxer = os.environ.get("muxer")
|
|
||||||
port = os.environ.get("port", "8000")
|
|
||||||
|
|
||||||
return cls(
|
|
||||||
transport=transport,
|
|
||||||
sec_protocol=sec_protocol,
|
|
||||||
muxer=muxer,
|
|
||||||
ip=ip,
|
|
||||||
is_dialer=is_dialer,
|
|
||||||
test_timeout=test_timeout,
|
|
||||||
redis_addr=redis_addr,
|
|
||||||
port=port,
|
|
||||||
)
|
|
||||||
@ -1,33 +0,0 @@
|
|||||||
import trio
|
|
||||||
|
|
||||||
from interop.exec.config.mod import (
|
|
||||||
Config,
|
|
||||||
ConfigError,
|
|
||||||
)
|
|
||||||
from interop.lib import (
|
|
||||||
run_test,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
|
||||||
try:
|
|
||||||
config = Config.from_env()
|
|
||||||
except ConfigError as e:
|
|
||||||
print(f"Config error: {e}")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Uncomment and implement when ready
|
|
||||||
_ = await run_test(
|
|
||||||
config.transport,
|
|
||||||
config.ip,
|
|
||||||
config.port,
|
|
||||||
config.is_dialer,
|
|
||||||
config.test_timeout,
|
|
||||||
config.redis_addr,
|
|
||||||
config.sec_protocol,
|
|
||||||
config.muxer,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
trio.run(main)
|
|
||||||
120
interop/lib.py
120
interop/lib.py
@ -1,120 +0,0 @@
|
|||||||
from dataclasses import (
|
|
||||||
dataclass,
|
|
||||||
)
|
|
||||||
import json
|
|
||||||
import time
|
|
||||||
|
|
||||||
from loguru import (
|
|
||||||
logger,
|
|
||||||
)
|
|
||||||
import multiaddr
|
|
||||||
import redis
|
|
||||||
import trio
|
|
||||||
|
|
||||||
from interop.arch import (
|
|
||||||
RedisClient,
|
|
||||||
build_host,
|
|
||||||
)
|
|
||||||
from libp2p.custom_types import (
|
|
||||||
TProtocol,
|
|
||||||
)
|
|
||||||
from libp2p.network.stream.net_stream import (
|
|
||||||
INetStream,
|
|
||||||
)
|
|
||||||
from libp2p.peer.peerinfo import (
|
|
||||||
info_from_p2p_addr,
|
|
||||||
)
|
|
||||||
|
|
||||||
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
|
|
||||||
PING_LENGTH = 32
|
|
||||||
RESP_TIMEOUT = 60
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_ping(stream: INetStream) -> None:
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
payload = await stream.read(PING_LENGTH)
|
|
||||||
peer_id = stream.muxed_conn.peer_id
|
|
||||||
if payload is not None:
|
|
||||||
print(f"received ping from {peer_id}")
|
|
||||||
|
|
||||||
await stream.write(payload)
|
|
||||||
print(f"responded with pong to {peer_id}")
|
|
||||||
|
|
||||||
except Exception:
|
|
||||||
await stream.reset()
|
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
async def send_ping(stream: INetStream) -> None:
|
|
||||||
try:
|
|
||||||
payload = b"\x01" * PING_LENGTH
|
|
||||||
print(f"sending ping to {stream.muxed_conn.peer_id}")
|
|
||||||
|
|
||||||
await stream.write(payload)
|
|
||||||
|
|
||||||
with trio.fail_after(RESP_TIMEOUT):
|
|
||||||
response = await stream.read(PING_LENGTH)
|
|
||||||
|
|
||||||
if response == payload:
|
|
||||||
print(f"received pong from {stream.muxed_conn.peer_id}")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"error occurred: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
async def run_test(
|
|
||||||
transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer
|
|
||||||
):
|
|
||||||
logger.info("Starting run_test")
|
|
||||||
|
|
||||||
redis_client = RedisClient(
|
|
||||||
redis.Redis(host="localhost", port=int(redis_addr), db=0)
|
|
||||||
)
|
|
||||||
(host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer)
|
|
||||||
logger.info(f"Running ping test local_peer={host.get_id()}")
|
|
||||||
|
|
||||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
|
||||||
if not is_dialer:
|
|
||||||
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
|
|
||||||
ma = f"{listen_addr}/p2p/{host.get_id().pretty()}"
|
|
||||||
redis_client.rpush("listenerAddr", ma)
|
|
||||||
|
|
||||||
logger.info(f"Test instance, listening: {ma}")
|
|
||||||
else:
|
|
||||||
redis_addr = redis_client.brpop("listenerAddr", timeout=5)
|
|
||||||
destination = redis_addr[0].decode()
|
|
||||||
maddr = multiaddr.Multiaddr(destination)
|
|
||||||
info = info_from_p2p_addr(maddr)
|
|
||||||
|
|
||||||
handshake_start = time.perf_counter()
|
|
||||||
|
|
||||||
logger.info("GETTING READY FOR CONNECTION")
|
|
||||||
await host.connect(info)
|
|
||||||
logger.info("HOST CONNECTED")
|
|
||||||
|
|
||||||
# TILL HERE EVERYTHING IS FINE
|
|
||||||
|
|
||||||
stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID])
|
|
||||||
logger.info("CREATED NEW STREAM")
|
|
||||||
|
|
||||||
# DOES NOT MORE FORWARD FROM THIS
|
|
||||||
logger.info("Remote conection established")
|
|
||||||
|
|
||||||
nursery.start_soon(send_ping, stream)
|
|
||||||
|
|
||||||
handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0
|
|
||||||
|
|
||||||
logger.info(f"handshake time: {handshake_plus_ping:.2f}ms")
|
|
||||||
return
|
|
||||||
|
|
||||||
await trio.sleep_forever()
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Report:
|
|
||||||
handshake_plus_one_rtt_millis: float
|
|
||||||
ping_rtt_millis: float
|
|
||||||
|
|
||||||
def gen_report(self):
|
|
||||||
return json.dumps(self.__dict__)
|
|
||||||
@ -5,6 +5,7 @@ from collections.abc import (
|
|||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager,
|
asynccontextmanager,
|
||||||
)
|
)
|
||||||
|
import logging
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Optional,
|
Optional,
|
||||||
@ -67,10 +68,7 @@ if TYPE_CHECKING:
|
|||||||
# telling it to listen on the given listen addresses.
|
# telling it to listen on the given listen addresses.
|
||||||
|
|
||||||
|
|
||||||
# logger = logging.getLogger("libp2p.network.basic_host")
|
logger = logging.getLogger("libp2p.network.basic_host")
|
||||||
from loguru import (
|
|
||||||
logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class BasicHost(IHost):
|
class BasicHost(IHost):
|
||||||
@ -183,15 +181,12 @@ class BasicHost(IHost):
|
|||||||
:return: stream: new stream created
|
:return: stream: new stream created
|
||||||
"""
|
"""
|
||||||
net_stream = await self._network.new_stream(peer_id)
|
net_stream = await self._network.new_stream(peer_id)
|
||||||
logger.info("INETSTREAM CHECKING IN")
|
|
||||||
logger.info(protocol_ids)
|
|
||||||
# Perform protocol muxing to determine protocol to use
|
# Perform protocol muxing to determine protocol to use
|
||||||
try:
|
try:
|
||||||
logger.debug("PROTOCOLS TRYING TO GET SENT")
|
|
||||||
selected_protocol = await self.multiselect_client.select_one_of(
|
selected_protocol = await self.multiselect_client.select_one_of(
|
||||||
list(protocol_ids), MultiselectCommunicator(net_stream)
|
list(protocol_ids), MultiselectCommunicator(net_stream)
|
||||||
)
|
)
|
||||||
logger.info("PROTOCOLS GOT SENT")
|
|
||||||
except MultiselectClientError as error:
|
except MultiselectClientError as error:
|
||||||
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
|
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
|
||||||
await net_stream.reset()
|
await net_stream.reset()
|
||||||
|
|||||||
@ -1,11 +1,8 @@
|
|||||||
|
import logging
|
||||||
from typing import (
|
from typing import (
|
||||||
Optional,
|
Optional,
|
||||||
)
|
)
|
||||||
|
|
||||||
# logger = logging.getLogger("libp2p.network.swarm")
|
|
||||||
from loguru import (
|
|
||||||
logger,
|
|
||||||
)
|
|
||||||
from multiaddr import (
|
from multiaddr import (
|
||||||
Multiaddr,
|
Multiaddr,
|
||||||
)
|
)
|
||||||
@ -58,6 +55,8 @@ from .exceptions import (
|
|||||||
SwarmException,
|
SwarmException,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger("libp2p.network.swarm")
|
||||||
|
|
||||||
|
|
||||||
def create_default_stream_handler(network: INetworkService) -> StreamHandlerFn:
|
def create_default_stream_handler(network: INetworkService) -> StreamHandlerFn:
|
||||||
async def stream_handler(stream: INetStream) -> None:
|
async def stream_handler(stream: INetStream) -> None:
|
||||||
@ -131,7 +130,6 @@ class Swarm(Service, INetworkService):
|
|||||||
:return: muxed connection
|
:return: muxed connection
|
||||||
"""
|
"""
|
||||||
if peer_id in self.connections:
|
if peer_id in self.connections:
|
||||||
logger.info("WE ARE RETURNING, PEER ALREADAY EXISTS")
|
|
||||||
# If muxed connection already exists for peer_id,
|
# If muxed connection already exists for peer_id,
|
||||||
# set muxed connection equal to existing muxed connection
|
# set muxed connection equal to existing muxed connection
|
||||||
return self.connections[peer_id]
|
return self.connections[peer_id]
|
||||||
@ -152,7 +150,6 @@ class Swarm(Service, INetworkService):
|
|||||||
# Try all known addresses
|
# Try all known addresses
|
||||||
for multiaddr in addrs:
|
for multiaddr in addrs:
|
||||||
try:
|
try:
|
||||||
logger.info("HANDSHAKE GOING TO HAPPEN")
|
|
||||||
return await self.dial_addr(multiaddr, peer_id)
|
return await self.dial_addr(multiaddr, peer_id)
|
||||||
except SwarmException as e:
|
except SwarmException as e:
|
||||||
exceptions.append(e)
|
exceptions.append(e)
|
||||||
@ -227,11 +224,8 @@ class Swarm(Service, INetworkService):
|
|||||||
logger.debug("attempting to open a stream to peer %s", peer_id)
|
logger.debug("attempting to open a stream to peer %s", peer_id)
|
||||||
|
|
||||||
swarm_conn = await self.dial_peer(peer_id)
|
swarm_conn = await self.dial_peer(peer_id)
|
||||||
logger.info("INETCONN CREATED")
|
|
||||||
|
|
||||||
net_stream = await swarm_conn.new_stream()
|
net_stream = await swarm_conn.new_stream()
|
||||||
logger.info("INETSTREAM CREATED")
|
|
||||||
|
|
||||||
logger.debug("successfully opened a stream to peer %s", peer_id)
|
logger.debug("successfully opened a stream to peer %s", peer_id)
|
||||||
return net_stream
|
return net_stream
|
||||||
|
|
||||||
|
|||||||
@ -2,10 +2,6 @@ from collections.abc import (
|
|||||||
Sequence,
|
Sequence,
|
||||||
)
|
)
|
||||||
|
|
||||||
from loguru import (
|
|
||||||
logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
from libp2p.abc import (
|
from libp2p.abc import (
|
||||||
IMultiselectClient,
|
IMultiselectClient,
|
||||||
IMultiselectCommunicator,
|
IMultiselectCommunicator,
|
||||||
@ -40,15 +36,11 @@ class MultiselectClient(IMultiselectClient):
|
|||||||
try:
|
try:
|
||||||
await communicator.write(MULTISELECT_PROTOCOL_ID)
|
await communicator.write(MULTISELECT_PROTOCOL_ID)
|
||||||
except MultiselectCommunicatorError as error:
|
except MultiselectCommunicatorError as error:
|
||||||
logger.error("WROTE FAIL")
|
|
||||||
raise MultiselectClientError() from error
|
raise MultiselectClientError() from error
|
||||||
|
|
||||||
logger.info(f"WROTE SUC, {MULTISELECT_PROTOCOL_ID}")
|
|
||||||
try:
|
try:
|
||||||
handshake_contents = await communicator.read()
|
handshake_contents = await communicator.read()
|
||||||
logger.info(f"READ SUC, {handshake_contents}")
|
|
||||||
except MultiselectCommunicatorError as error:
|
except MultiselectCommunicatorError as error:
|
||||||
logger.error(f"READ FAIL, {error}")
|
|
||||||
raise MultiselectClientError() from error
|
raise MultiselectClientError() from error
|
||||||
|
|
||||||
if not is_valid_handshake(handshake_contents):
|
if not is_valid_handshake(handshake_contents):
|
||||||
@ -67,12 +59,9 @@ class MultiselectClient(IMultiselectClient):
|
|||||||
:return: selected protocol
|
:return: selected protocol
|
||||||
:raise MultiselectClientError: raised when protocol negotiation failed
|
:raise MultiselectClientError: raised when protocol negotiation failed
|
||||||
"""
|
"""
|
||||||
logger.info("TRYING TO GET THE HANDSHAKE HAPPENED")
|
|
||||||
await self.handshake(communicator)
|
await self.handshake(communicator)
|
||||||
logger.info("HANDSHAKE HAPPENED")
|
|
||||||
|
|
||||||
for protocol in protocols:
|
for protocol in protocols:
|
||||||
logger.info(protocol)
|
|
||||||
try:
|
try:
|
||||||
selected_protocol = await self.try_select(communicator, protocol)
|
selected_protocol = await self.try_select(communicator, protocol)
|
||||||
return selected_protocol
|
return selected_protocol
|
||||||
@ -124,17 +113,11 @@ class MultiselectClient(IMultiselectClient):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
await communicator.write(protocol)
|
await communicator.write(protocol)
|
||||||
from loguru import (
|
|
||||||
logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(protocol)
|
|
||||||
except MultiselectCommunicatorError as error:
|
except MultiselectCommunicatorError as error:
|
||||||
raise MultiselectClientError() from error
|
raise MultiselectClientError() from error
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = await communicator.read()
|
response = await communicator.read()
|
||||||
logger.info("Response: ", response)
|
|
||||||
except MultiselectCommunicatorError as error:
|
except MultiselectCommunicatorError as error:
|
||||||
raise MultiselectClientError() from error
|
raise MultiselectClientError() from error
|
||||||
|
|
||||||
|
|||||||
@ -10,6 +10,7 @@ from collections.abc import (
|
|||||||
)
|
)
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
DefaultDict,
|
DefaultDict,
|
||||||
@ -80,8 +81,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
# The protocol peer supports
|
# The protocol peer supports
|
||||||
peer_protocol: dict[ID, TProtocol]
|
peer_protocol: dict[ID, TProtocol]
|
||||||
|
|
||||||
# TODO: Add `time_since_last_publish`
|
time_since_last_publish: dict[str, int]
|
||||||
# Create topic --> time since last publish map.
|
|
||||||
|
|
||||||
mcache: MessageCache
|
mcache: MessageCache
|
||||||
|
|
||||||
@ -138,6 +138,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
self.direct_peers[direct_peer.peer_id] = direct_peer
|
self.direct_peers[direct_peer.peer_id] = direct_peer
|
||||||
self.direct_connect_interval = direct_connect_interval
|
self.direct_connect_interval = direct_connect_interval
|
||||||
self.direct_connect_initial_delay = direct_connect_initial_delay
|
self.direct_connect_initial_delay = direct_connect_initial_delay
|
||||||
|
self.time_since_last_publish = {}
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
if self.pubsub is None:
|
if self.pubsub is None:
|
||||||
@ -253,6 +254,8 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
except StreamClosed:
|
except StreamClosed:
|
||||||
logger.debug("Fail to publish message to %s: stream closed", peer_id)
|
logger.debug("Fail to publish message to %s: stream closed", peer_id)
|
||||||
self.pubsub._handle_dead_peer(peer_id)
|
self.pubsub._handle_dead_peer(peer_id)
|
||||||
|
for topic in pubsub_msg.topicIDs:
|
||||||
|
self.time_since_last_publish[topic] = int(time.time())
|
||||||
|
|
||||||
def _get_peers_to_send(
|
def _get_peers_to_send(
|
||||||
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
|
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
|
||||||
@ -342,6 +345,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
await self.emit_graft(topic, peer)
|
await self.emit_graft(topic, peer)
|
||||||
|
|
||||||
self.fanout.pop(topic, None)
|
self.fanout.pop(topic, None)
|
||||||
|
self.time_since_last_publish.pop(topic, None)
|
||||||
|
|
||||||
async def leave(self, topic: str) -> None:
|
async def leave(self, topic: str) -> None:
|
||||||
# Note: the comments here are the near-exact algorithm description from the spec
|
# Note: the comments here are the near-exact algorithm description from the spec
|
||||||
@ -514,10 +518,12 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
|
|
||||||
def fanout_heartbeat(self) -> None:
|
def fanout_heartbeat(self) -> None:
|
||||||
# Note: the comments here are the exact pseudocode from the spec
|
# Note: the comments here are the exact pseudocode from the spec
|
||||||
for topic in self.fanout:
|
for topic in list(self.fanout):
|
||||||
# Delete topic entry if it's not in `pubsub.peer_topics`
|
if (
|
||||||
# or (TODO) if it's time-since-last-published > ttl
|
topic not in self.pubsub.peer_topics
|
||||||
if topic not in self.pubsub.peer_topics:
|
and self.time_since_last_publish.get(topic, 0) + self.time_to_live
|
||||||
|
< int(time.time())
|
||||||
|
):
|
||||||
# Remove topic from fanout
|
# Remove topic from fanout
|
||||||
del self.fanout[topic]
|
del self.fanout[topic]
|
||||||
else:
|
else:
|
||||||
|
|||||||
1
newsfragments/636.feature.rst
Normal file
1
newsfragments/636.feature.rst
Normal file
@ -0,0 +1 @@
|
|||||||
|
feat: add method to compute time since last message published by a peer and remove fanout peers based on ttl.
|
||||||
7
setup.py
7
setup.py
@ -37,14 +37,10 @@ extras_require = {
|
|||||||
"pytest-trio>=0.5.2",
|
"pytest-trio>=0.5.2",
|
||||||
"factory-boy>=2.12.0,<3.0.0",
|
"factory-boy>=2.12.0,<3.0.0",
|
||||||
],
|
],
|
||||||
"interop": ["redis==6.1.0", "logging==0.4.9.6", "loguru==0.7.3"],
|
|
||||||
}
|
}
|
||||||
|
|
||||||
extras_require["dev"] = (
|
extras_require["dev"] = (
|
||||||
extras_require["dev"]
|
extras_require["dev"] + extras_require["docs"] + extras_require["test"]
|
||||||
+ extras_require["docs"]
|
|
||||||
+ extras_require["test"]
|
|
||||||
+ extras_require["interop"]
|
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -69,7 +65,6 @@ install_requires = [
|
|||||||
"rpcudp>=3.0.0",
|
"rpcudp>=3.0.0",
|
||||||
"trio-typing>=0.0.4",
|
"trio-typing>=0.0.4",
|
||||||
"trio>=0.26.0",
|
"trio>=0.26.0",
|
||||||
"loguru>=0.7.3",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
# Add platform-specific dependencies
|
# Add platform-specific dependencies
|
||||||
|
|||||||
@ -22,13 +22,14 @@ from tests.utils.pubsub.utils import (
|
|||||||
@pytest.mark.trio
|
@pytest.mark.trio
|
||||||
async def test_join():
|
async def test_join():
|
||||||
async with PubsubFactory.create_batch_with_gossipsub(
|
async with PubsubFactory.create_batch_with_gossipsub(
|
||||||
4, degree=4, degree_low=3, degree_high=5
|
4, degree=4, degree_low=3, degree_high=5, heartbeat_interval=1, time_to_live=1
|
||||||
) as pubsubs_gsub:
|
) as pubsubs_gsub:
|
||||||
gossipsubs = [pubsub.router for pubsub in pubsubs_gsub]
|
gossipsubs = [pubsub.router for pubsub in pubsubs_gsub]
|
||||||
hosts = [pubsub.host for pubsub in pubsubs_gsub]
|
hosts = [pubsub.host for pubsub in pubsubs_gsub]
|
||||||
hosts_indices = list(range(len(pubsubs_gsub)))
|
hosts_indices = list(range(len(pubsubs_gsub)))
|
||||||
|
|
||||||
topic = "test_join"
|
topic = "test_join"
|
||||||
|
to_drop_topic = "test_drop_topic"
|
||||||
central_node_index = 0
|
central_node_index = 0
|
||||||
# Remove index of central host from the indices
|
# Remove index of central host from the indices
|
||||||
hosts_indices.remove(central_node_index)
|
hosts_indices.remove(central_node_index)
|
||||||
@ -42,23 +43,31 @@ async def test_join():
|
|||||||
# Connect central host to all other hosts
|
# Connect central host to all other hosts
|
||||||
await one_to_all_connect(hosts, central_node_index)
|
await one_to_all_connect(hosts, central_node_index)
|
||||||
|
|
||||||
# Wait 2 seconds for heartbeat to allow mesh to connect
|
# Wait 1 seconds for heartbeat to allow mesh to connect
|
||||||
await trio.sleep(2)
|
await trio.sleep(1)
|
||||||
|
|
||||||
# Central node publish to the topic so that this topic
|
# Central node publish to the topic so that this topic
|
||||||
# is added to central node's fanout
|
# is added to central node's fanout
|
||||||
# publish from the randomly chosen host
|
# publish from the randomly chosen host
|
||||||
await pubsubs_gsub[central_node_index].publish(topic, b"data")
|
await pubsubs_gsub[central_node_index].publish(topic, b"data")
|
||||||
|
await pubsubs_gsub[central_node_index].publish(to_drop_topic, b"data")
|
||||||
|
await trio.sleep(0.5)
|
||||||
|
# Check that the gossipsub of central node has fanout for the topics
|
||||||
|
assert topic, to_drop_topic in gossipsubs[central_node_index].fanout
|
||||||
|
# Check that the gossipsub of central node does not have a mesh for the topics
|
||||||
|
assert topic, to_drop_topic not in gossipsubs[central_node_index].mesh
|
||||||
|
# Check that the gossipsub of central node
|
||||||
|
# has a time_since_last_publish for the topics
|
||||||
|
assert topic in gossipsubs[central_node_index].time_since_last_publish
|
||||||
|
assert to_drop_topic in gossipsubs[central_node_index].time_since_last_publish
|
||||||
|
|
||||||
# Check that the gossipsub of central node has fanout for the topic
|
await trio.sleep(1)
|
||||||
assert topic in gossipsubs[central_node_index].fanout
|
# Check that after ttl the to_drop_topic is no more in fanout of central node
|
||||||
# Check that the gossipsub of central node does not have a mesh for the topic
|
assert to_drop_topic not in gossipsubs[central_node_index].fanout
|
||||||
assert topic not in gossipsubs[central_node_index].mesh
|
|
||||||
|
|
||||||
# Central node subscribes the topic
|
# Central node subscribes the topic
|
||||||
await pubsubs_gsub[central_node_index].subscribe(topic)
|
await pubsubs_gsub[central_node_index].subscribe(topic)
|
||||||
|
|
||||||
await trio.sleep(2)
|
await trio.sleep(1)
|
||||||
|
|
||||||
# Check that the gossipsub of central node no longer has fanout for the topic
|
# Check that the gossipsub of central node no longer has fanout for the topic
|
||||||
assert topic not in gossipsubs[central_node_index].fanout
|
assert topic not in gossipsubs[central_node_index].fanout
|
||||||
|
|||||||
166
tests/interop/rust_libp2p/README.md
Normal file
166
tests/interop/rust_libp2p/README.md
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
# libp2p Interoperability Tests
|
||||||
|
|
||||||
|
This directory contains interoperability tests between py-libp2p and rust-libp2p implementations, focusing on the ping protocol to verify core compatibility.
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
The tests verify the following libp2p components work correctly between implementations:
|
||||||
|
|
||||||
|
- **Transport Layer**: TCP connection establishment
|
||||||
|
- **Security Layer**: Noise encryption protocol
|
||||||
|
- **Stream Multiplexing**: Yamux multiplexer compatibility
|
||||||
|
- **Protocol Negotiation**: Multistream-select protocol selection
|
||||||
|
- **Application Protocol**: Ping protocol (`/ipfs/ping/1.0.0`)
|
||||||
|
|
||||||
|
## Test Structure
|
||||||
|
|
||||||
|
```
|
||||||
|
├── py_node/
|
||||||
|
│ └── ping.py # Python libp2p ping client/server
|
||||||
|
├── rust_node/
|
||||||
|
│ ├── src/main.rs # Rust libp2p ping client/server
|
||||||
|
│ └── Cargo.toml
|
||||||
|
└── scripts/
|
||||||
|
├── run_py_to_rust_test.ps1 # Test: Python client → Rust server
|
||||||
|
└── run_rust_to_py_test.ps1 # Test: Rust client → Python server
|
||||||
|
```
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
|
||||||
|
### Python Environment
|
||||||
|
```bash
|
||||||
|
# Install py-libp2p and dependencies
|
||||||
|
pip install .
|
||||||
|
```
|
||||||
|
|
||||||
|
### Rust Environment
|
||||||
|
```bash
|
||||||
|
# Ensure Rust is installed
|
||||||
|
rustc --version
|
||||||
|
cargo --version
|
||||||
|
|
||||||
|
# Dependencies are defined in rust_node/Cargo.toml
|
||||||
|
```
|
||||||
|
|
||||||
|
## Running Tests
|
||||||
|
|
||||||
|
### Test 1: Rust Client → Python Server
|
||||||
|
|
||||||
|
This test starts a Python server and connects with a Rust client:
|
||||||
|
|
||||||
|
```powershell
|
||||||
|
# Run the automated test
|
||||||
|
.\scripts\run_rust_to_py_test.ps1
|
||||||
|
|
||||||
|
# Or with custom parameters
|
||||||
|
.\scripts\run_rust_to_py_test.ps1 -Port 9000 -PingCount 10
|
||||||
|
```
|
||||||
|
|
||||||
|
**Manual steps:**
|
||||||
|
1. Start Python server: `python py_node/ping.py server --port 8000`
|
||||||
|
2. Note the Peer ID from server output
|
||||||
|
3. Run Rust client: `cargo run --manifest-path rust_node/Cargo.toml -- /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID>`
|
||||||
|
|
||||||
|
### Test 2: Python Client → Rust Server
|
||||||
|
|
||||||
|
This test starts a Rust server and connects with a Python client:
|
||||||
|
|
||||||
|
```powershell
|
||||||
|
# Run the automated test (requires manual intervention)
|
||||||
|
.\scripts\run_py_to_rust_test.ps1
|
||||||
|
|
||||||
|
# Follow the on-screen instructions to complete the test
|
||||||
|
```
|
||||||
|
|
||||||
|
**Manual steps:**
|
||||||
|
1. Start Rust server: `cargo run --manifest-path rust_node/Cargo.toml`
|
||||||
|
2. Note the Peer ID and port from server output
|
||||||
|
3. Run Python client: `python py_node/ping.py client /ip4/127.0.0.1/tcp/<PORT>/p2p/<PEER_ID> --count 5`
|
||||||
|
|
||||||
|
## Expected Behavior
|
||||||
|
|
||||||
|
### Successful Test Output
|
||||||
|
|
||||||
|
**Python Server Logs:**
|
||||||
|
```
|
||||||
|
[INFO] Starting py-libp2p ping server...
|
||||||
|
[INFO] Peer ID: QmYourPeerIdHere
|
||||||
|
[INFO] Listening: /ip4/0.0.0.0/tcp/8000
|
||||||
|
[INFO] New ping stream opened by 12D3KooW...
|
||||||
|
[PING 1] Received ping from 12D3KooW...: 32 bytes
|
||||||
|
[PING 1] Echoed ping back to 12D3KooW...
|
||||||
|
```
|
||||||
|
|
||||||
|
**Rust Client Logs:**
|
||||||
|
```
|
||||||
|
Local peer ID: 12D3KooW...
|
||||||
|
Listening on "/ip4/0.0.0.0/tcp/54321"
|
||||||
|
Dialed /ip4/127.0.0.1/tcp/8000/p2p/QmYourPeerIdHere
|
||||||
|
Behaviour(Event { peer: QmYourPeerIdHere, result: Ok(Pong) })
|
||||||
|
```
|
||||||
|
|
||||||
|
### Performance Metrics
|
||||||
|
|
||||||
|
The tests measure:
|
||||||
|
- **Connection Establishment Time**: Time to establish secure connection
|
||||||
|
- **Round-Trip Time (RTT)**: Latency for ping/pong exchanges
|
||||||
|
- **Success Rate**: Percentage of successful ping attempts
|
||||||
|
- **Protocol Negotiation**: Successful selection of `/ipfs/ping/1.0.0`
|
||||||
|
|
||||||
|
## Troubleshooting
|
||||||
|
|
||||||
|
### Common Issues
|
||||||
|
|
||||||
|
1. **Protocol Mismatch**: Ensure both implementations use the same protocol ID
|
||||||
|
- Python: `/ipfs/ping/1.0.0`
|
||||||
|
- Rust: `/ipfs/ping/1.0.0` (default ping protocol)
|
||||||
|
|
||||||
|
2. **Connection Timeout**:
|
||||||
|
- Check firewall settings
|
||||||
|
- Verify correct IP addresses and ports
|
||||||
|
- Ensure both peers are running
|
||||||
|
|
||||||
|
3. **Noise Encryption Errors**:
|
||||||
|
- Verify cryptography library versions
|
||||||
|
- Check that both implementations support the same Noise variants
|
||||||
|
|
||||||
|
4. **Yamux Multiplexing Issues**:
|
||||||
|
- Confirm Yamux protocol versions match
|
||||||
|
- Check stream handling implementation
|
||||||
|
|
||||||
|
### Debug Logging
|
||||||
|
|
||||||
|
Enable detailed logging:
|
||||||
|
|
||||||
|
**Python:**
|
||||||
|
```bash
|
||||||
|
# Logs are automatically written to ping_debug.log
|
||||||
|
tail -f ping_debug.log
|
||||||
|
```
|
||||||
|
|
||||||
|
**Rust:**
|
||||||
|
```bash
|
||||||
|
# Set environment variable for detailed logs
|
||||||
|
$env:RUST_LOG="debug"
|
||||||
|
cargo run --manifest-path rust_node/Cargo.toml
|
||||||
|
```
|
||||||
|
|
||||||
|
## Interoperability Checklist
|
||||||
|
|
||||||
|
- [ ] TCP transport connection establishment
|
||||||
|
- [ ] Noise encryption handshake
|
||||||
|
- [ ] Yamux stream multiplexing
|
||||||
|
- [ ] Multistream protocol negotiation
|
||||||
|
- [ ] Ping protocol payload exchange (32 bytes)
|
||||||
|
- [ ] Proper connection cleanup
|
||||||
|
- [ ] Error handling and timeouts
|
||||||
|
- [ ] Performance metrics collection
|
||||||
|
|
||||||
|
## Contributing
|
||||||
|
|
||||||
|
When adding new tests:
|
||||||
|
|
||||||
|
1. Follow the existing pattern for client/server implementations
|
||||||
|
2. Add appropriate error handling and logging
|
||||||
|
3. Update this README with new test procedures
|
||||||
|
4. Ensure tests clean up resources properly
|
||||||
427
tests/interop/rust_libp2p/py_node/ping.py
Normal file
427
tests/interop/rust_libp2p/py_node/ping.py
Normal file
@ -0,0 +1,427 @@
|
|||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import (
|
||||||
|
x25519,
|
||||||
|
)
|
||||||
|
import multiaddr
|
||||||
|
import trio
|
||||||
|
|
||||||
|
from libp2p import (
|
||||||
|
generate_new_rsa_identity,
|
||||||
|
new_host,
|
||||||
|
)
|
||||||
|
from libp2p.custom_types import (
|
||||||
|
TProtocol,
|
||||||
|
)
|
||||||
|
from libp2p.network.stream.net_stream import (
|
||||||
|
INetStream,
|
||||||
|
)
|
||||||
|
from libp2p.peer.peerinfo import (
|
||||||
|
info_from_p2p_addr,
|
||||||
|
)
|
||||||
|
from libp2p.security.noise.transport import Transport as NoiseTransport
|
||||||
|
from libp2p.stream_muxer.yamux.yamux import (
|
||||||
|
Yamux,
|
||||||
|
)
|
||||||
|
from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID
|
||||||
|
|
||||||
|
# Configure detailed logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.DEBUG,
|
||||||
|
format="%(asctime)s - %(levelname)s - %(message)s",
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler(),
|
||||||
|
logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Standard libp2p ping protocol - this is what rust-libp2p uses by default
|
||||||
|
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
|
||||||
|
PING_LENGTH = 32
|
||||||
|
RESP_TIMEOUT = 60
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_ping(stream: INetStream) -> None:
|
||||||
|
"""Handle incoming ping requests from rust-libp2p clients"""
|
||||||
|
peer_id = stream.muxed_conn.peer_id
|
||||||
|
print(f"[INFO] New ping stream opened by {peer_id}")
|
||||||
|
logging.info(f"Ping handler called for peer {peer_id}")
|
||||||
|
|
||||||
|
ping_count = 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print(f"[INFO] Waiting for ping data from {peer_id}...")
|
||||||
|
logging.debug(f"Stream state: {stream}")
|
||||||
|
data = await stream.read(PING_LENGTH)
|
||||||
|
|
||||||
|
if not data:
|
||||||
|
print(
|
||||||
|
f"[INFO] No data received,"
|
||||||
|
f" connection likely closed by {peer_id}"
|
||||||
|
)
|
||||||
|
logging.debug("No data received, stream closed")
|
||||||
|
break
|
||||||
|
|
||||||
|
if len(data) == 0:
|
||||||
|
print(f"[INFO] Empty data received, connection closed by {peer_id}")
|
||||||
|
logging.debug("Empty data received")
|
||||||
|
break
|
||||||
|
|
||||||
|
ping_count += 1
|
||||||
|
print(
|
||||||
|
f"[PING {ping_count}] Received ping from {peer_id}:"
|
||||||
|
f" {len(data)} bytes"
|
||||||
|
)
|
||||||
|
logging.debug(f"Ping data: {data.hex()}")
|
||||||
|
|
||||||
|
# Echo the data back (this is what ping protocol does)
|
||||||
|
await stream.write(data)
|
||||||
|
print(f"[PING {ping_count}] Echoed ping back to {peer_id}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Error in ping loop with {peer_id}: {e}")
|
||||||
|
logging.exception("Ping loop error")
|
||||||
|
break
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Error handling ping from {peer_id}: {e}")
|
||||||
|
logging.exception("Ping handler error")
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
print(f"[INFO] Closing ping stream with {peer_id}")
|
||||||
|
await stream.close()
|
||||||
|
except Exception as e:
|
||||||
|
logging.debug(f"Error closing stream: {e}")
|
||||||
|
|
||||||
|
print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)")
|
||||||
|
|
||||||
|
|
||||||
|
async def send_ping_sequence(stream: INetStream, count: int = 5) -> None:
|
||||||
|
"""Send a sequence of pings compatible with rust-libp2p."""
|
||||||
|
peer_id = stream.muxed_conn.peer_id
|
||||||
|
print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)")
|
||||||
|
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
|
rtts = []
|
||||||
|
|
||||||
|
for i in range(1, count + 1):
|
||||||
|
try:
|
||||||
|
# Generate random 32-byte payload as per ping protocol spec
|
||||||
|
payload = os.urandom(PING_LENGTH)
|
||||||
|
print(f"[PING {i}/{count}] Sending ping to {peer_id}")
|
||||||
|
logging.debug(f"Sending payload: {payload.hex()}")
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
await stream.write(payload)
|
||||||
|
|
||||||
|
with trio.fail_after(RESP_TIMEOUT):
|
||||||
|
response = await stream.read(PING_LENGTH)
|
||||||
|
|
||||||
|
end_time = time.time()
|
||||||
|
rtt = (end_time - start_time) * 1000
|
||||||
|
|
||||||
|
if (
|
||||||
|
response
|
||||||
|
and len(response) >= PING_LENGTH
|
||||||
|
and response[:PING_LENGTH] == payload
|
||||||
|
):
|
||||||
|
rtts.append(rtt)
|
||||||
|
print(f"[PING {i}] Successful! RTT: {rtt:.2f}ms")
|
||||||
|
else:
|
||||||
|
print(f"[ERROR] Ping {i} failed: response mismatch or incomplete")
|
||||||
|
if response:
|
||||||
|
logging.debug(f"Expected: {payload.hex()}")
|
||||||
|
logging.debug(f"Received: {response.hex()}")
|
||||||
|
|
||||||
|
if i < count:
|
||||||
|
await trio.sleep(1)
|
||||||
|
|
||||||
|
except trio.TooSlowError:
|
||||||
|
print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Ping {i} failed: {e}")
|
||||||
|
logging.exception(f"Ping {i} error")
|
||||||
|
|
||||||
|
# Print statistics
|
||||||
|
if rtts:
|
||||||
|
avg_rtt = sum(rtts) / len(rtts)
|
||||||
|
min_rtt = min(rtts)
|
||||||
|
max_rtt = max(rtts) # Fixed typo: was max_rtts
|
||||||
|
success_count = len(rtts)
|
||||||
|
loss_rate = ((count - success_count) / count) * 100
|
||||||
|
|
||||||
|
print(f"\n[STATS] Ping Statistics:")
|
||||||
|
print(
|
||||||
|
f" Packets: Sent={count}, Received={success_count},"
|
||||||
|
f" Lost={count - success_count}"
|
||||||
|
)
|
||||||
|
print(f" Loss rate: {loss_rate:.1f}%")
|
||||||
|
print(
|
||||||
|
f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms,"
|
||||||
|
f" max={max_rtt:.2f}ms"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print(f"\n[STATS] All pings failed ({count} attempts)")
|
||||||
|
|
||||||
|
|
||||||
|
def create_noise_keypair():
|
||||||
|
"""Create a Noise protocol keypair for secure communication"""
|
||||||
|
try:
|
||||||
|
x25519_private_key = x25519.X25519PrivateKey.generate()
|
||||||
|
|
||||||
|
class NoisePrivateKey:
|
||||||
|
def __init__(self, key):
|
||||||
|
self._key = key
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
return self._key.private_bytes_raw()
|
||||||
|
|
||||||
|
def public_key(self):
|
||||||
|
return NoisePublicKey(self._key.public_key())
|
||||||
|
|
||||||
|
def get_public_key(self):
|
||||||
|
return NoisePublicKey(self._key.public_key())
|
||||||
|
|
||||||
|
class NoisePublicKey:
|
||||||
|
def __init__(self, key):
|
||||||
|
self._key = key
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
return self._key.public_bytes_raw()
|
||||||
|
|
||||||
|
return NoisePrivateKey(x25519_private_key)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Failed to create Noise keypair: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def run_server(port: int) -> None:
|
||||||
|
"""Run ping server that accepts connections from rust-libp2p clients."""
|
||||||
|
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||||
|
|
||||||
|
key_pair = generate_new_rsa_identity()
|
||||||
|
logging.debug("Generated RSA keypair")
|
||||||
|
|
||||||
|
noise_privkey = create_noise_keypair()
|
||||||
|
if not noise_privkey:
|
||||||
|
print("[ERROR] Failed to create Noise keypair")
|
||||||
|
return
|
||||||
|
logging.debug("Generated Noise keypair")
|
||||||
|
|
||||||
|
noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey)
|
||||||
|
logging.debug(f"Noise transport initialized: {noise_transport}")
|
||||||
|
sec_opt = {TProtocol("/noise"): noise_transport}
|
||||||
|
muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux}
|
||||||
|
|
||||||
|
logging.info(f"Using muxer: {muxer_opt}")
|
||||||
|
|
||||||
|
host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt)
|
||||||
|
|
||||||
|
print("[INFO] Starting py-libp2p ping server...")
|
||||||
|
|
||||||
|
async with host.run(listen_addrs=[listen_addr]):
|
||||||
|
print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}")
|
||||||
|
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
|
||||||
|
|
||||||
|
# Also register alternative protocol IDs for better compatibility
|
||||||
|
alt_protocols = [
|
||||||
|
TProtocol("/ping/1.0.0"),
|
||||||
|
TProtocol("/libp2p/ping/1.0.0"),
|
||||||
|
]
|
||||||
|
|
||||||
|
for alt_proto in alt_protocols:
|
||||||
|
print(f"[INFO] Also registering handler for: {alt_proto}")
|
||||||
|
host.set_stream_handler(alt_proto, handle_ping)
|
||||||
|
|
||||||
|
print("[INFO] Server started successfully!")
|
||||||
|
print(f"[INFO] Peer ID: {host.get_id()}")
|
||||||
|
print(f"[INFO] Listening: /ip4/0.0.0.0/tcp/{port}")
|
||||||
|
print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}")
|
||||||
|
print(f"[INFO] Security: Noise encryption")
|
||||||
|
print(f"[INFO] Muxer: Yamux stream multiplexing")
|
||||||
|
|
||||||
|
print("\n[INFO] Registered protocols:")
|
||||||
|
print(f" - {PING_PROTOCOL_ID}")
|
||||||
|
for proto in alt_protocols:
|
||||||
|
print(f" - {proto}")
|
||||||
|
|
||||||
|
peer_id = host.get_id()
|
||||||
|
print("\n[TEST] Test with rust-libp2p:")
|
||||||
|
print(f" cargo run -- /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}")
|
||||||
|
|
||||||
|
print("\n[TEST] Test with py-libp2p:")
|
||||||
|
print(f" python ping.py client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}")
|
||||||
|
|
||||||
|
print("\n[INFO] Waiting for connections...")
|
||||||
|
print("Press Ctrl+C to exit")
|
||||||
|
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
async def run_client(destination: str, count: int = 5) -> None:
|
||||||
|
"""Run ping client to test connectivity with another peer."""
|
||||||
|
listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0")
|
||||||
|
|
||||||
|
key_pair = generate_new_rsa_identity()
|
||||||
|
logging.debug("Generated RSA keypair")
|
||||||
|
|
||||||
|
noise_privkey = create_noise_keypair()
|
||||||
|
if not noise_privkey:
|
||||||
|
print("[ERROR] Failed to create Noise keypair")
|
||||||
|
return 1
|
||||||
|
logging.debug("Generated Noise keypair")
|
||||||
|
|
||||||
|
noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey)
|
||||||
|
logging.debug(f"Noise transport initialized: {noise_transport}")
|
||||||
|
sec_opt = {TProtocol("/noise"): noise_transport}
|
||||||
|
muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux}
|
||||||
|
|
||||||
|
logging.info(f"Using muxer: {muxer_opt}")
|
||||||
|
|
||||||
|
host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt)
|
||||||
|
|
||||||
|
print("[INFO] Starting py-libp2p ping client...")
|
||||||
|
|
||||||
|
async with host.run(listen_addrs=[listen_addr]):
|
||||||
|
print(f"[INFO] Our Peer ID: {host.get_id()}")
|
||||||
|
print(f"[INFO] Target: {destination}")
|
||||||
|
print("[INFO] Security: Noise encryption")
|
||||||
|
print("[INFO] Muxer: Yamux stream multiplexing")
|
||||||
|
|
||||||
|
try:
|
||||||
|
maddr = multiaddr.Multiaddr(destination)
|
||||||
|
info = info_from_p2p_addr(maddr)
|
||||||
|
target_peer_id = info.peer_id
|
||||||
|
|
||||||
|
print(f"[INFO] Target Peer ID: {target_peer_id}")
|
||||||
|
print("[INFO] Connecting to peer...")
|
||||||
|
|
||||||
|
await host.connect(info)
|
||||||
|
print("[INFO] Connection established!")
|
||||||
|
|
||||||
|
# Try protocols in order of preference
|
||||||
|
# Start with the standard libp2p ping protocol
|
||||||
|
protocols_to_try = [
|
||||||
|
PING_PROTOCOL_ID, # /ipfs/ping/1.0.0 - standard protocol
|
||||||
|
TProtocol("/ping/1.0.0"), # Alternative
|
||||||
|
TProtocol("/libp2p/ping/1.0.0"), # Another alternative
|
||||||
|
]
|
||||||
|
|
||||||
|
stream = None
|
||||||
|
|
||||||
|
for proto in protocols_to_try:
|
||||||
|
try:
|
||||||
|
print(f"[INFO] Trying to open stream with protocol: {proto}")
|
||||||
|
stream = await host.new_stream(target_peer_id, [proto])
|
||||||
|
print(f"[INFO] Stream opened with protocol: {proto}")
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Failed to open stream with {proto}: {e}")
|
||||||
|
logging.debug(f"Protocol {proto} failed: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not stream:
|
||||||
|
print("[ERROR] Failed to open stream with any ping protocol")
|
||||||
|
print("[ERROR] Ensure the target peer supports one of these protocols:")
|
||||||
|
for proto in protocols_to_try:
|
||||||
|
print(f"[ERROR] - {proto}")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
await send_ping_sequence(stream, count)
|
||||||
|
|
||||||
|
await stream.close()
|
||||||
|
print("[INFO] Stream closed successfully")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Client error: {e}")
|
||||||
|
logging.exception("Client error")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return 1
|
||||||
|
|
||||||
|
print("\n[INFO] Client stopped")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""Main function with argument parsing."""
|
||||||
|
description = """
|
||||||
|
py-libp2p ping tool for interoperability testing with rust-libp2p.
|
||||||
|
Uses Noise encryption and Yamux multiplexing for compatibility.
|
||||||
|
|
||||||
|
Server mode: Listens for ping requests from rust-libp2p or py-libp2p clients.
|
||||||
|
Client mode: Sends ping requests to rust-libp2p or py-libp2p servers.
|
||||||
|
|
||||||
|
The tool implements the standard libp2p ping protocol (/ipfs/ping/1.0.0)
|
||||||
|
which exchanges 32-byte random payloads and measures round-trip time.
|
||||||
|
"""
|
||||||
|
|
||||||
|
example_maddr = (
|
||||||
|
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
|
||||||
|
)
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description=description,
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
epilog=f"""
|
||||||
|
Examples:
|
||||||
|
python ping.py server # Start server on port 8000
|
||||||
|
python ping.py server --port 9000 # Start server on port 9000
|
||||||
|
python ping.py client {example_maddr}
|
||||||
|
python ping.py client {example_maddr} --count 10
|
||||||
|
|
||||||
|
Protocols supported:
|
||||||
|
- /ipfs/ping/1.0.0 (primary, rust-libp2p default)
|
||||||
|
- /ping/1.0.0 (alternative)
|
||||||
|
- /libp2p/ping/1.0.0 (alternative)
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
subparsers = parser.add_subparsers(dest="mode", help="Operation mode")
|
||||||
|
|
||||||
|
server_parser = subparsers.add_parser("server", help="Run as ping server")
|
||||||
|
server_parser.add_argument(
|
||||||
|
"--port", "-p", type=int, default=8000, help="Port to listen on (default: 8000)"
|
||||||
|
)
|
||||||
|
|
||||||
|
client_parser = subparsers.add_parser("client", help="Run as ping client")
|
||||||
|
client_parser.add_argument("destination", help="Target peer multiaddr")
|
||||||
|
client_parser.add_argument(
|
||||||
|
"--count",
|
||||||
|
"-c",
|
||||||
|
type=int,
|
||||||
|
default=5,
|
||||||
|
help="Number of pings to send (default: 5)",
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if not args.mode:
|
||||||
|
parser.print_help()
|
||||||
|
return 1
|
||||||
|
|
||||||
|
try:
|
||||||
|
if args.mode == "server":
|
||||||
|
trio.run(run_server, args.port)
|
||||||
|
elif args.mode == "client":
|
||||||
|
return trio.run(run_client, args.destination, args.count)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\n[INFO] Goodbye!")
|
||||||
|
return 0
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Fatal error: {e}")
|
||||||
|
logging.exception("Fatal error")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return 1
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
exit(main())
|
||||||
18
tests/interop/rust_libp2p/rust_node/Cargo.toml
Normal file
18
tests/interop/rust_libp2p/rust_node/Cargo.toml
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
[package]
|
||||||
|
name = "ping-example"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition.workspace = true
|
||||||
|
publish = false
|
||||||
|
license = "MIT"
|
||||||
|
|
||||||
|
[package.metadata.release]
|
||||||
|
release = false
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
futures = { workspace = true }
|
||||||
|
libp2p = { path = "../../libp2p", features = ["noise", "ping", "tcp", "tokio", "yamux", "rsa"] }
|
||||||
|
tokio = { workspace = true, features = ["full"] }
|
||||||
|
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||||
|
|
||||||
|
[lints]
|
||||||
|
workspace = true
|
||||||
30
tests/interop/rust_libp2p/rust_node/README.md
Normal file
30
tests/interop/rust_libp2p/rust_node/README.md
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
## Description
|
||||||
|
|
||||||
|
The ping example showcases how to create a network of nodes that establish connections, negotiate the ping protocol, and ping each other.
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
To run the example, follow these steps:
|
||||||
|
|
||||||
|
1. In a first terminal window, run the following command:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
cargo run
|
||||||
|
```
|
||||||
|
|
||||||
|
This command starts a node and prints the `PeerId` and the listening addresses, such as `Listening on "/ip4/0.0.0.0/tcp/24915"`.
|
||||||
|
|
||||||
|
2. In a second terminal window, start a new instance of the example with the following command:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
cargo run -- /ip4/127.0.0.1/tcp/24915
|
||||||
|
```
|
||||||
|
|
||||||
|
Replace `/ip4/127.0.0.1/tcp/24915` with the listen address of the first node obtained from the first terminal window.
|
||||||
|
|
||||||
|
3. The two nodes will establish a connection, negotiate the ping protocol, and begin pinging each other.
|
||||||
|
|
||||||
|
## Conclusion
|
||||||
|
|
||||||
|
The ping example demonstrates the basic usage of **libp2p** to create a simple p2p network and implement a ping protocol.
|
||||||
|
By running multiple nodes and observing the ping behavior, users can gain insights into how **libp2p** facilitates communication and interaction between peers.
|
||||||
68
tests/interop/rust_libp2p/rust_node/src/main.rs
Normal file
68
tests/interop/rust_libp2p/rust_node/src/main.rs
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
#![doc = include_str!("../README.md")]
|
||||||
|
|
||||||
|
use std::{error::Error, time::Duration};
|
||||||
|
|
||||||
|
use futures::prelude::*;
|
||||||
|
use libp2p::{noise, ping, swarm::SwarmEvent, tcp, yamux, Multiaddr};
|
||||||
|
use tracing_subscriber::EnvFilter;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
let _ = tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(EnvFilter::from_default_env())
|
||||||
|
.try_init();
|
||||||
|
|
||||||
|
let mut swarm = libp2p::SwarmBuilder::with_new_identity()
|
||||||
|
.with_tokio()
|
||||||
|
.with_tcp(
|
||||||
|
tcp::Config::default(),
|
||||||
|
noise::Config::new,
|
||||||
|
yamux::Config::default,
|
||||||
|
)?
|
||||||
|
.with_behaviour(|_| ping::Behaviour::default())?
|
||||||
|
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Print the peer ID
|
||||||
|
println!("Local peer ID: {}", swarm.local_peer_id());
|
||||||
|
|
||||||
|
// Tell the swarm to listen on all interfaces and a random, OS-assigned
|
||||||
|
// port.
|
||||||
|
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
|
||||||
|
|
||||||
|
// Dial the peer identified by the multi-address given as the second
|
||||||
|
// command-line argument, if any.
|
||||||
|
if let Some(addr) = std::env::args().nth(1) {
|
||||||
|
let remote: Multiaddr = addr.parse()?;
|
||||||
|
swarm.dial(remote)?;
|
||||||
|
println!("Dialed {addr}")
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match swarm.select_next_some().await {
|
||||||
|
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"),
|
||||||
|
SwarmEvent::Behaviour(event) => println!("{event:?}"),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
44
tests/interop/rust_libp2p/scripts/run_py_to_rust_test.ps1
Normal file
44
tests/interop/rust_libp2p/scripts/run_py_to_rust_test.ps1
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
# scripts/run_py_to_rust_test.ps1
|
||||||
|
# Test script for py-libp2p client connecting to rust-libp2p server
|
||||||
|
|
||||||
|
param(
|
||||||
|
[int]$PingCount = 5,
|
||||||
|
[int]$TimeoutSeconds = 30
|
||||||
|
)
|
||||||
|
|
||||||
|
Write-Host "=== py-libp2p to rust-libp2p Interop Test ===" -ForegroundColor Cyan
|
||||||
|
Write-Host "Starting rust-libp2p server..." -ForegroundColor Yellow
|
||||||
|
|
||||||
|
# Start rust server in background
|
||||||
|
$rustProcess = Start-Process -FilePath "cargo" -ArgumentList "run" -WorkingDirectory "rust_node" -PassThru -WindowStyle Hidden
|
||||||
|
|
||||||
|
# Wait a moment for server to start
|
||||||
|
Start-Sleep -Seconds 3
|
||||||
|
|
||||||
|
try {
|
||||||
|
# Get the rust server's listening address from its output
|
||||||
|
# For now, we'll assume it's listening on a predictable port
|
||||||
|
# In a real scenario, you'd parse the server output to get the actual address
|
||||||
|
|
||||||
|
Write-Host "Waiting for rust server to start..." -ForegroundColor Yellow
|
||||||
|
Start-Sleep -Seconds 5
|
||||||
|
|
||||||
|
# Try to find the server's peer ID and port from netstat or process output
|
||||||
|
# For this test, we'll need to manually check the rust server output
|
||||||
|
Write-Host "Please check the rust server output for its Peer ID and port" -ForegroundColor Red
|
||||||
|
Write-Host "Then run the Python client manually with:" -ForegroundColor Yellow
|
||||||
|
Write-Host "python py_node/ping.py client /ip4/127.0.0.1/tcp/<PORT>/p2p/<PEER_ID> --count $PingCount" -ForegroundColor Green
|
||||||
|
|
||||||
|
# Keep the server running
|
||||||
|
Write-Host "Press any key to stop the test..." -ForegroundColor Cyan
|
||||||
|
$null = $Host.UI.RawUI.ReadKey("NoEcho,IncludeKeyDown")
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
# Clean up
|
||||||
|
Write-Host "Stopping rust server..." -ForegroundColor Yellow
|
||||||
|
if ($rustProcess -and !$rustProcess.HasExited) {
|
||||||
|
$rustProcess.Kill()
|
||||||
|
$rustProcess.WaitForExit(5000)
|
||||||
|
}
|
||||||
|
Write-Host "Test completed." -ForegroundColor Green
|
||||||
|
}
|
||||||
78
tests/interop/rust_libp2p/scripts/run_rust_to_py_test.ps1
Normal file
78
tests/interop/rust_libp2p/scripts/run_rust_to_py_test.ps1
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
# scripts/run_rust_to_py_test.ps1
|
||||||
|
# Test script for rust-libp2p client connecting to py-libp2p server
|
||||||
|
|
||||||
|
param(
|
||||||
|
[int]$Port = 8000,
|
||||||
|
[int]$PingCount = 5,
|
||||||
|
[int]$TimeoutSeconds = 30
|
||||||
|
)
|
||||||
|
|
||||||
|
Write-Host "=== rust-libp2p to py-libp2p Interop Test ===" -ForegroundColor Cyan
|
||||||
|
Write-Host "Starting py-libp2p server on port $Port..." -ForegroundColor Yellow
|
||||||
|
|
||||||
|
# Start Python server in background
|
||||||
|
$pyProcess = Start-Process -FilePath "python" -ArgumentList "py_node/ping.py", "server", "--port", $Port -PassThru -RedirectStandardOutput "py_server_output.txt" -RedirectStandardError "py_server_error.txt"
|
||||||
|
|
||||||
|
# Wait for server to start
|
||||||
|
Start-Sleep -Seconds 5
|
||||||
|
|
||||||
|
try {
|
||||||
|
# Read the server output to get peer ID
|
||||||
|
$maxWaitTime = 10
|
||||||
|
$waited = 0
|
||||||
|
$peerID = $null
|
||||||
|
|
||||||
|
while ($waited -lt $maxWaitTime -and !$peerID) {
|
||||||
|
if (Test-Path "py_server_output.txt") {
|
||||||
|
$output = Get-Content "py_server_output.txt" -Raw
|
||||||
|
if ($output -match "Peer ID: ([\w\d]+)") {
|
||||||
|
$peerID = $matches[1]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Start-Sleep -Seconds 1
|
||||||
|
$waited++
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$peerID) {
|
||||||
|
Write-Host "Could not extract Peer ID from Python server output" -ForegroundColor Red
|
||||||
|
Write-Host "Server output:" -ForegroundColor Yellow
|
||||||
|
if (Test-Path "py_server_output.txt") {
|
||||||
|
Get-Content "py_server_output.txt"
|
||||||
|
}
|
||||||
|
if (Test-Path "py_server_error.txt") {
|
||||||
|
Write-Host "Server errors:" -ForegroundColor Red
|
||||||
|
Get-Content "py_server_error.txt"
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
$multiaddr = "/ip4/127.0.0.1/tcp/$Port/p2p/$peerID"
|
||||||
|
Write-Host "Python server started with Peer ID: $peerID" -ForegroundColor Green
|
||||||
|
Write-Host "Full address: $multiaddr" -ForegroundColor Green
|
||||||
|
|
||||||
|
Write-Host "Starting rust client..." -ForegroundColor Yellow
|
||||||
|
|
||||||
|
# Run rust client
|
||||||
|
$rustResult = Start-Process -FilePath "cargo" -ArgumentList "run", "--", $multiaddr -WorkingDirectory "rust_node" -Wait -PassThru -NoNewWindow
|
||||||
|
|
||||||
|
if ($rustResult.ExitCode -eq 0) {
|
||||||
|
Write-Host "Rust client completed successfully!" -ForegroundColor Green
|
||||||
|
} else {
|
||||||
|
Write-Host "Rust client failed with exit code: $($rustResult.ExitCode)" -ForegroundColor Red
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
# Clean up
|
||||||
|
Write-Host "Stopping Python server..." -ForegroundColor Yellow
|
||||||
|
if ($pyProcess -and !$pyProcess.HasExited) {
|
||||||
|
$pyProcess.Kill()
|
||||||
|
$pyProcess.WaitForExit(5000)
|
||||||
|
}
|
||||||
|
|
||||||
|
# Clean up output files
|
||||||
|
if (Test-Path "py_server_output.txt") { Remove-Item "py_server_output.txt" }
|
||||||
|
if (Test-Path "py_server_error.txt") { Remove-Item "py_server_error.txt" }
|
||||||
|
|
||||||
|
Write-Host "Test completed." -ForegroundColor Green
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user