4 Commits

Author SHA1 Message Date
203ae14b06 fix: update pytest command to use auto for parallel execution
feat: add loguru dependency for enhanced logging capabilities

Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-06-06 15:47:00 +05:30
81abe4ff64 fix: correct syntax error in interop dependencies list
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-06-06 15:32:42 +05:30
e868f77f93 added: logs 2025-06-06 15:04:13 +05:30
b3137aa159 interop utilities for mplex ping 2025-06-06 15:04:12 +05:30
11 changed files with 377 additions and 8 deletions

View File

@ -38,7 +38,7 @@ lint:
)
test:
python -m pytest tests
python -m pytest tests -n auto
# protobufs management

19
interop/README.md Normal file
View File

@ -0,0 +1,19 @@
These commands are to be run in `./interop/exec`
## Redis
```bash
docker run -p 6379:6379 -it redis:latest
```
## Listener
```bash
transport=tcp ip=0.0.0.0 is_dialer=false redis_addr=6379 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
```
## Dialer
```bash
transport=tcp ip=0.0.0.0 is_dialer=true port=8001 redis_addr=6379 port=8001 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
```

0
interop/__init__.py Normal file
View File

107
interop/arch.py Normal file
View File

@ -0,0 +1,107 @@
from dataclasses import (
dataclass,
)
import multiaddr
import redis
import trio
from libp2p import (
new_host,
)
from libp2p.crypto.keys import (
KeyPair,
)
from libp2p.crypto.rsa import (
create_new_key_pair,
)
from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
from libp2p.custom_types import (
TProtocol,
)
from libp2p.security.insecure.transport import (
PLAINTEXT_PROTOCOL_ID,
InsecureTransport,
)
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
from libp2p.security.noise.transport import Transport as NoiseTransport
import libp2p.security.secio.transport as secio
from libp2p.stream_muxer.mplex.mplex import (
MPLEX_PROTOCOL_ID,
Mplex,
)
from libp2p.stream_muxer.yamux.yamux import (
Yamux,
)
from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID
def generate_new_rsa_identity() -> KeyPair:
return create_new_key_pair()
async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str):
match (sec_protocol, muxer):
case ("insecure", "mplex"):
key_pair = create_new_key_pair()
host = new_host(
key_pair,
{TProtocol(MPLEX_PROTOCOL_ID): Mplex},
{
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
TProtocol(secio.ID): secio.Transport(key_pair),
},
)
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
return (host, muladdr)
case ("insecure", "yamux"):
key_pair = create_new_key_pair()
host = new_host(
key_pair,
{TProtocol(YAMUX_PROTOCOL_ID): Yamux},
{
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
TProtocol(secio.ID): secio.Transport(key_pair),
},
)
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
return (host, muladdr)
case ("noise", "yamux"):
key_pair = create_new_key_pair()
noise_key_pair = create_new_x25519_key_pair()
host = new_host(
key_pair,
{TProtocol(YAMUX_PROTOCOL_ID): Yamux},
{
NOISE_PROTOCOL_ID: NoiseTransport(
key_pair, noise_privkey=noise_key_pair.private_key
)
},
)
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
return (host, muladdr)
case _:
raise ValueError("Protocols not supported")
@dataclass
class RedisClient:
client: redis.Redis
def brpop(self, key: str, timeout: float) -> list[str]:
result = self.client.brpop([key], timeout)
return [result[1]] if result else []
def rpush(self, key: str, value: str) -> None:
self.client.rpush(key, value)
async def main():
client = RedisClient(redis.Redis(host="localhost", port=6379, db=0))
client.rpush("test", "hello")
print(client.blpop("test", timeout=5))
if __name__ == "__main__":
trio.run(main)

View File

@ -0,0 +1,57 @@
from dataclasses import (
dataclass,
)
import os
from typing import (
Optional,
)
def str_to_bool(val: str) -> bool:
return val.lower() in ("true", "1")
class ConfigError(Exception):
"""Raised when the required environment variables are missing or invalid"""
@dataclass
class Config:
transport: str
sec_protocol: Optional[str]
muxer: Optional[str]
ip: str
is_dialer: bool
test_timeout: int
redis_addr: str
port: str
@classmethod
def from_env(cls) -> "Config":
try:
transport = os.environ["transport"]
ip = os.environ["ip"]
except KeyError as e:
raise ConfigError(f"{e.args[0]} env variable not set") from None
try:
is_dialer = str_to_bool(os.environ.get("is_dialer", "true"))
test_timeout = int(os.environ.get("test_timeout", "180"))
except ValueError as e:
raise ConfigError(f"Invalid value in env: {e}") from None
redis_addr = os.environ.get("redis_addr", 6379)
sec_protocol = os.environ.get("security")
muxer = os.environ.get("muxer")
port = os.environ.get("port", "8000")
return cls(
transport=transport,
sec_protocol=sec_protocol,
muxer=muxer,
ip=ip,
is_dialer=is_dialer,
test_timeout=test_timeout,
redis_addr=redis_addr,
port=port,
)

View File

@ -0,0 +1,33 @@
import trio
from interop.exec.config.mod import (
Config,
ConfigError,
)
from interop.lib import (
run_test,
)
async def main() -> None:
try:
config = Config.from_env()
except ConfigError as e:
print(f"Config error: {e}")
return
# Uncomment and implement when ready
_ = await run_test(
config.transport,
config.ip,
config.port,
config.is_dialer,
config.test_timeout,
config.redis_addr,
config.sec_protocol,
config.muxer,
)
if __name__ == "__main__":
trio.run(main)

120
interop/lib.py Normal file
View File

@ -0,0 +1,120 @@
from dataclasses import (
dataclass,
)
import json
import time
from loguru import (
logger,
)
import multiaddr
import redis
import trio
from interop.arch import (
RedisClient,
build_host,
)
from libp2p.custom_types import (
TProtocol,
)
from libp2p.network.stream.net_stream import (
INetStream,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
PING_LENGTH = 32
RESP_TIMEOUT = 60
async def handle_ping(stream: INetStream) -> None:
while True:
try:
payload = await stream.read(PING_LENGTH)
peer_id = stream.muxed_conn.peer_id
if payload is not None:
print(f"received ping from {peer_id}")
await stream.write(payload)
print(f"responded with pong to {peer_id}")
except Exception:
await stream.reset()
break
async def send_ping(stream: INetStream) -> None:
try:
payload = b"\x01" * PING_LENGTH
print(f"sending ping to {stream.muxed_conn.peer_id}")
await stream.write(payload)
with trio.fail_after(RESP_TIMEOUT):
response = await stream.read(PING_LENGTH)
if response == payload:
print(f"received pong from {stream.muxed_conn.peer_id}")
except Exception as e:
print(f"error occurred: {e}")
async def run_test(
transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer
):
logger.info("Starting run_test")
redis_client = RedisClient(
redis.Redis(host="localhost", port=int(redis_addr), db=0)
)
(host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer)
logger.info(f"Running ping test local_peer={host.get_id()}")
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
if not is_dialer:
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
ma = f"{listen_addr}/p2p/{host.get_id().pretty()}"
redis_client.rpush("listenerAddr", ma)
logger.info(f"Test instance, listening: {ma}")
else:
redis_addr = redis_client.brpop("listenerAddr", timeout=5)
destination = redis_addr[0].decode()
maddr = multiaddr.Multiaddr(destination)
info = info_from_p2p_addr(maddr)
handshake_start = time.perf_counter()
logger.info("GETTING READY FOR CONNECTION")
await host.connect(info)
logger.info("HOST CONNECTED")
# TILL HERE EVERYTHING IS FINE
stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID])
logger.info("CREATED NEW STREAM")
# DOES NOT MORE FORWARD FROM THIS
logger.info("Remote conection established")
nursery.start_soon(send_ping, stream)
handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0
logger.info(f"handshake time: {handshake_plus_ping:.2f}ms")
return
await trio.sleep_forever()
@dataclass
class Report:
handshake_plus_one_rtt_millis: float
ping_rtt_millis: float
def gen_report(self):
return json.dumps(self.__dict__)

View File

@ -5,7 +5,6 @@ from collections.abc import (
from contextlib import (
asynccontextmanager,
)
import logging
from typing import (
TYPE_CHECKING,
Optional,
@ -68,7 +67,10 @@ if TYPE_CHECKING:
# telling it to listen on the given listen addresses.
logger = logging.getLogger("libp2p.network.basic_host")
# logger = logging.getLogger("libp2p.network.basic_host")
from loguru import (
logger,
)
class BasicHost(IHost):
@ -181,12 +183,15 @@ class BasicHost(IHost):
:return: stream: new stream created
"""
net_stream = await self._network.new_stream(peer_id)
logger.info("INETSTREAM CHECKING IN")
logger.info(protocol_ids)
# Perform protocol muxing to determine protocol to use
try:
logger.debug("PROTOCOLS TRYING TO GET SENT")
selected_protocol = await self.multiselect_client.select_one_of(
list(protocol_ids), MultiselectCommunicator(net_stream)
)
logger.info("PROTOCOLS GOT SENT")
except MultiselectClientError as error:
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
await net_stream.reset()

View File

@ -1,8 +1,11 @@
import logging
from typing import (
Optional,
)
# logger = logging.getLogger("libp2p.network.swarm")
from loguru import (
logger,
)
from multiaddr import (
Multiaddr,
)
@ -55,8 +58,6 @@ from .exceptions import (
SwarmException,
)
logger = logging.getLogger("libp2p.network.swarm")
def create_default_stream_handler(network: INetworkService) -> StreamHandlerFn:
async def stream_handler(stream: INetStream) -> None:
@ -130,6 +131,7 @@ class Swarm(Service, INetworkService):
:return: muxed connection
"""
if peer_id in self.connections:
logger.info("WE ARE RETURNING, PEER ALREADAY EXISTS")
# If muxed connection already exists for peer_id,
# set muxed connection equal to existing muxed connection
return self.connections[peer_id]
@ -150,6 +152,7 @@ class Swarm(Service, INetworkService):
# Try all known addresses
for multiaddr in addrs:
try:
logger.info("HANDSHAKE GOING TO HAPPEN")
return await self.dial_addr(multiaddr, peer_id)
except SwarmException as e:
exceptions.append(e)
@ -224,8 +227,11 @@ class Swarm(Service, INetworkService):
logger.debug("attempting to open a stream to peer %s", peer_id)
swarm_conn = await self.dial_peer(peer_id)
logger.info("INETCONN CREATED")
net_stream = await swarm_conn.new_stream()
logger.info("INETSTREAM CREATED")
logger.debug("successfully opened a stream to peer %s", peer_id)
return net_stream

View File

@ -2,6 +2,10 @@ from collections.abc import (
Sequence,
)
from loguru import (
logger,
)
from libp2p.abc import (
IMultiselectClient,
IMultiselectCommunicator,
@ -36,11 +40,15 @@ class MultiselectClient(IMultiselectClient):
try:
await communicator.write(MULTISELECT_PROTOCOL_ID)
except MultiselectCommunicatorError as error:
logger.error("WROTE FAIL")
raise MultiselectClientError() from error
logger.info(f"WROTE SUC, {MULTISELECT_PROTOCOL_ID}")
try:
handshake_contents = await communicator.read()
logger.info(f"READ SUC, {handshake_contents}")
except MultiselectCommunicatorError as error:
logger.error(f"READ FAIL, {error}")
raise MultiselectClientError() from error
if not is_valid_handshake(handshake_contents):
@ -59,9 +67,12 @@ class MultiselectClient(IMultiselectClient):
:return: selected protocol
:raise MultiselectClientError: raised when protocol negotiation failed
"""
logger.info("TRYING TO GET THE HANDSHAKE HAPPENED")
await self.handshake(communicator)
logger.info("HANDSHAKE HAPPENED")
for protocol in protocols:
logger.info(protocol)
try:
selected_protocol = await self.try_select(communicator, protocol)
return selected_protocol
@ -113,11 +124,17 @@ class MultiselectClient(IMultiselectClient):
"""
try:
await communicator.write(protocol)
from loguru import (
logger,
)
logger.info(protocol)
except MultiselectCommunicatorError as error:
raise MultiselectClientError() from error
try:
response = await communicator.read()
logger.info("Response: ", response)
except MultiselectCommunicatorError as error:
raise MultiselectClientError() from error

View File

@ -37,10 +37,14 @@ extras_require = {
"pytest-trio>=0.5.2",
"factory-boy>=2.12.0,<3.0.0",
],
"interop": ["redis==6.1.0", "logging==0.4.9.6", "loguru==0.7.3"],
}
extras_require["dev"] = (
extras_require["dev"] + extras_require["docs"] + extras_require["test"]
extras_require["dev"]
+ extras_require["docs"]
+ extras_require["test"]
+ extras_require["interop"]
)
try:
@ -65,6 +69,7 @@ install_requires = [
"rpcudp>=3.0.0",
"trio-typing>=0.0.4",
"trio>=0.26.0",
"loguru>=0.7.3",
]
# Add platform-specific dependencies