mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
121 lines
3.2 KiB
Python
121 lines
3.2 KiB
Python
from dataclasses import (
|
|
dataclass,
|
|
)
|
|
import json
|
|
import time
|
|
|
|
from loguru import (
|
|
logger,
|
|
)
|
|
import multiaddr
|
|
import redis
|
|
import trio
|
|
|
|
from interop.arch import (
|
|
RedisClient,
|
|
build_host,
|
|
)
|
|
from libp2p.custom_types import (
|
|
TProtocol,
|
|
)
|
|
from libp2p.network.stream.net_stream import (
|
|
INetStream,
|
|
)
|
|
from libp2p.peer.peerinfo import (
|
|
info_from_p2p_addr,
|
|
)
|
|
|
|
# Protocol identifier used both when registering the ping stream handler and
# when the dialer opens a ping stream.
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")

# Size of one ping payload; also the amount requested from each stream read.
PING_LENGTH = 32

# Deadline passed to ``trio.fail_after`` while the dialer waits for the pong
# (trio timeouts are expressed in seconds).
RESP_TIMEOUT = 60
|
|
|
|
|
|
async def handle_ping(stream: INetStream) -> None:
    """
    Echo ping payloads back to the remote peer until the stream fails.

    Each iteration reads up to ``PING_LENGTH`` bytes from ``stream`` and
    writes the same bytes back. Any exception raised while reading, writing
    or printing resets the stream and ends the handler.

    :param stream: stream on which the remote peer sends its pings
    """
    serving = True
    while serving:
        try:
            data = await stream.read(PING_LENGTH)
            remote = stream.muxed_conn.peer_id

            # Nothing to echo for this read; go back to waiting.
            if data is None:
                continue

            print(f"received ping from {remote}")

            await stream.write(data)
            print(f"responded with pong to {remote}")

        except Exception:
            # Any failure ends the session: reset the stream and stop looping.
            await stream.reset()
            serving = False
|
|
|
|
|
|
async def send_ping(stream: INetStream) -> None:
    """
    Send a single ping over ``stream`` and wait for the matching pong.

    The ping is ``PING_LENGTH`` bytes of ``0x01``. The reply must arrive
    within ``RESP_TIMEOUT`` or ``trio.fail_after`` aborts the read. Every
    exception is caught and printed; nothing is propagated to the caller.

    :param stream: stream already opened to the remote peer
    """
    try:
        ping = b"\x01" * PING_LENGTH
        print(f"sending ping to {stream.muxed_conn.peer_id}")

        await stream.write(ping)

        # Only the read is bounded by the deadline; the write above is not.
        with trio.fail_after(RESP_TIMEOUT):
            pong = await stream.read(PING_LENGTH)

        # A reply that differs from the ping is ignored without any output.
        if pong == ping:
            print(f"received pong from {stream.muxed_conn.peer_id}")

    except Exception as err:
        print(f"error occurred: {err}")
|
|
|
|
|
|
async def run_test(
    transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer
):
    """
    Run one side (listener or dialer) of the ping interop test.

    The listener registers ``handle_ping`` for ``PING_PROTOCOL_ID``, publishes
    its multiaddr on the Redis list ``listenerAddr`` and then sleeps forever.
    The dialer pops that multiaddr from Redis, connects, opens a ping stream,
    schedules ``send_ping`` on it and logs the elapsed time before returning.

    :param transport: forwarded to ``build_host``
    :param ip: forwarded to ``build_host``
    :param port: forwarded to ``build_host``
    :param is_dialer: truthy to act as the dialer, falsy to act as the listener
    :param test_timeout: currently unused by this function
    :param redis_addr: converted with ``int()`` and used as the port of a
        Redis server on ``localhost``
    :param sec_protocol: forwarded to ``build_host``
    :param muxer: forwarded to ``build_host``
    :raises RuntimeError: if the dialer gets no listener address from Redis
    """
    logger.info("Starting run_test")

    redis_client = RedisClient(
        redis.Redis(host="localhost", port=int(redis_addr), db=0)
    )
    (host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer)
    logger.info(f"Running ping test local_peer={host.get_id()}")

    async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
        if not is_dialer:
            host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
            ma = f"{listen_addr}/p2p/{host.get_id().pretty()}"
            redis_client.rpush("listenerAddr", ma)

            logger.info(f"Test instance, listening: {ma}")
        else:
            # Use a dedicated name so the ``redis_addr`` parameter is not
            # shadowed by the value popped from Redis.
            # NOTE(review): this looks like a synchronous Redis call inside a
            # coroutine; confirm whether RedisClient.brpop blocks the trio loop.
            listener_addrs = redis_client.brpop("listenerAddr", timeout=5)
            if not listener_addrs:
                # Fail with a clear message instead of an opaque
                # IndexError/TypeError when no listener published in time.
                raise RuntimeError(
                    "no listener address received from Redis key "
                    "'listenerAddr' within 5 seconds"
                )
            destination = listener_addrs[0].decode()
            maddr = multiaddr.Multiaddr(destination)
            info = info_from_p2p_addr(maddr)

            handshake_start = time.perf_counter()

            logger.info("GETTING READY FOR CONNECTION")
            await host.connect(info)
            logger.info("HOST CONNECTED")

            # Debugging note from the original author: everything up to this
            # point is known to work.

            stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID])
            logger.info("CREATED NEW STREAM")

            # Debugging note from the original author: execution reportedly
            # does not move forward from this point.
            logger.info("Remote conection established")

            # start_soon only schedules send_ping, so the elapsed time below
            # covers connect + stream open, not the ping round trip.
            nursery.start_soon(send_ping, stream)

            handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0

            logger.info(f"handshake time: {handshake_plus_ping:.2f}ms")
            # Leaving the nursery block still waits for send_ping to finish.
            return

        # Listener path: keep the host running so it can serve pings.
        await trio.sleep_forever()
|
|
|
|
|
|
@dataclass
class Report:
    """Timing results of one ping test run, serializable to JSON."""

    # Presumably connect + first ping round trip, in milliseconds (going by
    # the field name; nothing in this file populates it).
    handshake_plus_one_rtt_millis: float
    # Presumably a single ping round trip, in milliseconds (per the name).
    ping_rtt_millis: float

    def gen_report(self) -> str:
        """Return the instance attributes encoded as a JSON object string."""
        attributes = vars(self)
        return json.dumps(attributes)
|