mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Compare commits
4 Commits
80b58a2ae0
...
py-rust-in
| Author | SHA1 | Date | |
|---|---|---|---|
| 203ae14b06 | |||
| 81abe4ff64 | |||
| e868f77f93 | |||
| b3137aa159 |
2
Makefile
2
Makefile
@ -38,7 +38,7 @@ lint:
|
||||
)
|
||||
|
||||
test:
|
||||
python -m pytest tests
|
||||
python -m pytest tests -n auto
|
||||
|
||||
# protobufs management
|
||||
|
||||
|
||||
19
interop/README.md
Normal file
19
interop/README.md
Normal file
@ -0,0 +1,19 @@
|
||||
These commands are to be run in `./interop/exec`
|
||||
|
||||
## Redis
|
||||
|
||||
```bash
|
||||
docker run -p 6379:6379 -it redis:latest
|
||||
```
|
||||
|
||||
## Listener
|
||||
|
||||
```bash
|
||||
transport=tcp ip=0.0.0.0 is_dialer=false redis_addr=6379 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
|
||||
```
|
||||
|
||||
## Dialer
|
||||
|
||||
```bash
|
||||
transport=tcp ip=0.0.0.0 is_dialer=true port=8001 redis_addr=6379 port=8001 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
|
||||
```
|
||||
0
interop/__init__.py
Normal file
0
interop/__init__.py
Normal file
107
interop/arch.py
Normal file
107
interop/arch.py
Normal file
@ -0,0 +1,107 @@
|
||||
from dataclasses import (
|
||||
dataclass,
|
||||
)
|
||||
|
||||
import multiaddr
|
||||
import redis
|
||||
import trio
|
||||
|
||||
from libp2p import (
|
||||
new_host,
|
||||
)
|
||||
from libp2p.crypto.keys import (
|
||||
KeyPair,
|
||||
)
|
||||
from libp2p.crypto.rsa import (
|
||||
create_new_key_pair,
|
||||
)
|
||||
from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
)
|
||||
from libp2p.security.insecure.transport import (
|
||||
PLAINTEXT_PROTOCOL_ID,
|
||||
InsecureTransport,
|
||||
)
|
||||
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
|
||||
from libp2p.security.noise.transport import Transport as NoiseTransport
|
||||
import libp2p.security.secio.transport as secio
|
||||
from libp2p.stream_muxer.mplex.mplex import (
|
||||
MPLEX_PROTOCOL_ID,
|
||||
Mplex,
|
||||
)
|
||||
from libp2p.stream_muxer.yamux.yamux import (
|
||||
Yamux,
|
||||
)
|
||||
from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID
|
||||
|
||||
|
||||
def generate_new_rsa_identity() -> KeyPair:
|
||||
return create_new_key_pair()
|
||||
|
||||
|
||||
async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str):
|
||||
match (sec_protocol, muxer):
|
||||
case ("insecure", "mplex"):
|
||||
key_pair = create_new_key_pair()
|
||||
host = new_host(
|
||||
key_pair,
|
||||
{TProtocol(MPLEX_PROTOCOL_ID): Mplex},
|
||||
{
|
||||
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
|
||||
TProtocol(secio.ID): secio.Transport(key_pair),
|
||||
},
|
||||
)
|
||||
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
|
||||
return (host, muladdr)
|
||||
case ("insecure", "yamux"):
|
||||
key_pair = create_new_key_pair()
|
||||
host = new_host(
|
||||
key_pair,
|
||||
{TProtocol(YAMUX_PROTOCOL_ID): Yamux},
|
||||
{
|
||||
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
|
||||
TProtocol(secio.ID): secio.Transport(key_pair),
|
||||
},
|
||||
)
|
||||
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
|
||||
return (host, muladdr)
|
||||
case ("noise", "yamux"):
|
||||
key_pair = create_new_key_pair()
|
||||
noise_key_pair = create_new_x25519_key_pair()
|
||||
|
||||
host = new_host(
|
||||
key_pair,
|
||||
{TProtocol(YAMUX_PROTOCOL_ID): Yamux},
|
||||
{
|
||||
NOISE_PROTOCOL_ID: NoiseTransport(
|
||||
key_pair, noise_privkey=noise_key_pair.private_key
|
||||
)
|
||||
},
|
||||
)
|
||||
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
|
||||
return (host, muladdr)
|
||||
case _:
|
||||
raise ValueError("Protocols not supported")
|
||||
|
||||
|
||||
@dataclass
|
||||
class RedisClient:
|
||||
client: redis.Redis
|
||||
|
||||
def brpop(self, key: str, timeout: float) -> list[str]:
|
||||
result = self.client.brpop([key], timeout)
|
||||
return [result[1]] if result else []
|
||||
|
||||
def rpush(self, key: str, value: str) -> None:
|
||||
self.client.rpush(key, value)
|
||||
|
||||
|
||||
async def main():
|
||||
client = RedisClient(redis.Redis(host="localhost", port=6379, db=0))
|
||||
client.rpush("test", "hello")
|
||||
print(client.blpop("test", timeout=5))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
trio.run(main)
|
||||
57
interop/exec/config/mod.py
Normal file
57
interop/exec/config/mod.py
Normal file
@ -0,0 +1,57 @@
|
||||
from dataclasses import (
|
||||
dataclass,
|
||||
)
|
||||
import os
|
||||
from typing import (
|
||||
Optional,
|
||||
)
|
||||
|
||||
|
||||
def str_to_bool(val: str) -> bool:
|
||||
return val.lower() in ("true", "1")
|
||||
|
||||
|
||||
class ConfigError(Exception):
|
||||
"""Raised when the required environment variables are missing or invalid"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class Config:
|
||||
transport: str
|
||||
sec_protocol: Optional[str]
|
||||
muxer: Optional[str]
|
||||
ip: str
|
||||
is_dialer: bool
|
||||
test_timeout: int
|
||||
redis_addr: str
|
||||
port: str
|
||||
|
||||
@classmethod
|
||||
def from_env(cls) -> "Config":
|
||||
try:
|
||||
transport = os.environ["transport"]
|
||||
ip = os.environ["ip"]
|
||||
except KeyError as e:
|
||||
raise ConfigError(f"{e.args[0]} env variable not set") from None
|
||||
|
||||
try:
|
||||
is_dialer = str_to_bool(os.environ.get("is_dialer", "true"))
|
||||
test_timeout = int(os.environ.get("test_timeout", "180"))
|
||||
except ValueError as e:
|
||||
raise ConfigError(f"Invalid value in env: {e}") from None
|
||||
|
||||
redis_addr = os.environ.get("redis_addr", 6379)
|
||||
sec_protocol = os.environ.get("security")
|
||||
muxer = os.environ.get("muxer")
|
||||
port = os.environ.get("port", "8000")
|
||||
|
||||
return cls(
|
||||
transport=transport,
|
||||
sec_protocol=sec_protocol,
|
||||
muxer=muxer,
|
||||
ip=ip,
|
||||
is_dialer=is_dialer,
|
||||
test_timeout=test_timeout,
|
||||
redis_addr=redis_addr,
|
||||
port=port,
|
||||
)
|
||||
33
interop/exec/native_ping.py
Normal file
33
interop/exec/native_ping.py
Normal file
@ -0,0 +1,33 @@
|
||||
import trio
|
||||
|
||||
from interop.exec.config.mod import (
|
||||
Config,
|
||||
ConfigError,
|
||||
)
|
||||
from interop.lib import (
|
||||
run_test,
|
||||
)
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
try:
|
||||
config = Config.from_env()
|
||||
except ConfigError as e:
|
||||
print(f"Config error: {e}")
|
||||
return
|
||||
|
||||
# Uncomment and implement when ready
|
||||
_ = await run_test(
|
||||
config.transport,
|
||||
config.ip,
|
||||
config.port,
|
||||
config.is_dialer,
|
||||
config.test_timeout,
|
||||
config.redis_addr,
|
||||
config.sec_protocol,
|
||||
config.muxer,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
trio.run(main)
|
||||
120
interop/lib.py
Normal file
120
interop/lib.py
Normal file
@ -0,0 +1,120 @@
|
||||
from dataclasses import (
|
||||
dataclass,
|
||||
)
|
||||
import json
|
||||
import time
|
||||
|
||||
from loguru import (
|
||||
logger,
|
||||
)
|
||||
import multiaddr
|
||||
import redis
|
||||
import trio
|
||||
|
||||
from interop.arch import (
|
||||
RedisClient,
|
||||
build_host,
|
||||
)
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
)
|
||||
from libp2p.network.stream.net_stream import (
|
||||
INetStream,
|
||||
)
|
||||
from libp2p.peer.peerinfo import (
|
||||
info_from_p2p_addr,
|
||||
)
|
||||
|
||||
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
|
||||
PING_LENGTH = 32
|
||||
RESP_TIMEOUT = 60
|
||||
|
||||
|
||||
async def handle_ping(stream: INetStream) -> None:
|
||||
while True:
|
||||
try:
|
||||
payload = await stream.read(PING_LENGTH)
|
||||
peer_id = stream.muxed_conn.peer_id
|
||||
if payload is not None:
|
||||
print(f"received ping from {peer_id}")
|
||||
|
||||
await stream.write(payload)
|
||||
print(f"responded with pong to {peer_id}")
|
||||
|
||||
except Exception:
|
||||
await stream.reset()
|
||||
break
|
||||
|
||||
|
||||
async def send_ping(stream: INetStream) -> None:
|
||||
try:
|
||||
payload = b"\x01" * PING_LENGTH
|
||||
print(f"sending ping to {stream.muxed_conn.peer_id}")
|
||||
|
||||
await stream.write(payload)
|
||||
|
||||
with trio.fail_after(RESP_TIMEOUT):
|
||||
response = await stream.read(PING_LENGTH)
|
||||
|
||||
if response == payload:
|
||||
print(f"received pong from {stream.muxed_conn.peer_id}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"error occurred: {e}")
|
||||
|
||||
|
||||
async def run_test(
|
||||
transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer
|
||||
):
|
||||
logger.info("Starting run_test")
|
||||
|
||||
redis_client = RedisClient(
|
||||
redis.Redis(host="localhost", port=int(redis_addr), db=0)
|
||||
)
|
||||
(host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer)
|
||||
logger.info(f"Running ping test local_peer={host.get_id()}")
|
||||
|
||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||
if not is_dialer:
|
||||
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
|
||||
ma = f"{listen_addr}/p2p/{host.get_id().pretty()}"
|
||||
redis_client.rpush("listenerAddr", ma)
|
||||
|
||||
logger.info(f"Test instance, listening: {ma}")
|
||||
else:
|
||||
redis_addr = redis_client.brpop("listenerAddr", timeout=5)
|
||||
destination = redis_addr[0].decode()
|
||||
maddr = multiaddr.Multiaddr(destination)
|
||||
info = info_from_p2p_addr(maddr)
|
||||
|
||||
handshake_start = time.perf_counter()
|
||||
|
||||
logger.info("GETTING READY FOR CONNECTION")
|
||||
await host.connect(info)
|
||||
logger.info("HOST CONNECTED")
|
||||
|
||||
# TILL HERE EVERYTHING IS FINE
|
||||
|
||||
stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID])
|
||||
logger.info("CREATED NEW STREAM")
|
||||
|
||||
# DOES NOT MORE FORWARD FROM THIS
|
||||
logger.info("Remote conection established")
|
||||
|
||||
nursery.start_soon(send_ping, stream)
|
||||
|
||||
handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0
|
||||
|
||||
logger.info(f"handshake time: {handshake_plus_ping:.2f}ms")
|
||||
return
|
||||
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
@dataclass
|
||||
class Report:
|
||||
handshake_plus_one_rtt_millis: float
|
||||
ping_rtt_millis: float
|
||||
|
||||
def gen_report(self):
|
||||
return json.dumps(self.__dict__)
|
||||
@ -5,7 +5,6 @@ from collections.abc import (
|
||||
from contextlib import (
|
||||
asynccontextmanager,
|
||||
)
|
||||
import logging
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Optional,
|
||||
@ -68,7 +67,10 @@ if TYPE_CHECKING:
|
||||
# telling it to listen on the given listen addresses.
|
||||
|
||||
|
||||
logger = logging.getLogger("libp2p.network.basic_host")
|
||||
# logger = logging.getLogger("libp2p.network.basic_host")
|
||||
from loguru import (
|
||||
logger,
|
||||
)
|
||||
|
||||
|
||||
class BasicHost(IHost):
|
||||
@ -181,12 +183,15 @@ class BasicHost(IHost):
|
||||
:return: stream: new stream created
|
||||
"""
|
||||
net_stream = await self._network.new_stream(peer_id)
|
||||
|
||||
logger.info("INETSTREAM CHECKING IN")
|
||||
logger.info(protocol_ids)
|
||||
# Perform protocol muxing to determine protocol to use
|
||||
try:
|
||||
logger.debug("PROTOCOLS TRYING TO GET SENT")
|
||||
selected_protocol = await self.multiselect_client.select_one_of(
|
||||
list(protocol_ids), MultiselectCommunicator(net_stream)
|
||||
)
|
||||
logger.info("PROTOCOLS GOT SENT")
|
||||
except MultiselectClientError as error:
|
||||
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
|
||||
await net_stream.reset()
|
||||
|
||||
@ -1,8 +1,11 @@
|
||||
import logging
|
||||
from typing import (
|
||||
Optional,
|
||||
)
|
||||
|
||||
# logger = logging.getLogger("libp2p.network.swarm")
|
||||
from loguru import (
|
||||
logger,
|
||||
)
|
||||
from multiaddr import (
|
||||
Multiaddr,
|
||||
)
|
||||
@ -55,8 +58,6 @@ from .exceptions import (
|
||||
SwarmException,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("libp2p.network.swarm")
|
||||
|
||||
|
||||
def create_default_stream_handler(network: INetworkService) -> StreamHandlerFn:
|
||||
async def stream_handler(stream: INetStream) -> None:
|
||||
@ -130,6 +131,7 @@ class Swarm(Service, INetworkService):
|
||||
:return: muxed connection
|
||||
"""
|
||||
if peer_id in self.connections:
|
||||
logger.info("WE ARE RETURNING, PEER ALREADAY EXISTS")
|
||||
# If muxed connection already exists for peer_id,
|
||||
# set muxed connection equal to existing muxed connection
|
||||
return self.connections[peer_id]
|
||||
@ -150,6 +152,7 @@ class Swarm(Service, INetworkService):
|
||||
# Try all known addresses
|
||||
for multiaddr in addrs:
|
||||
try:
|
||||
logger.info("HANDSHAKE GOING TO HAPPEN")
|
||||
return await self.dial_addr(multiaddr, peer_id)
|
||||
except SwarmException as e:
|
||||
exceptions.append(e)
|
||||
@ -224,8 +227,11 @@ class Swarm(Service, INetworkService):
|
||||
logger.debug("attempting to open a stream to peer %s", peer_id)
|
||||
|
||||
swarm_conn = await self.dial_peer(peer_id)
|
||||
logger.info("INETCONN CREATED")
|
||||
|
||||
net_stream = await swarm_conn.new_stream()
|
||||
logger.info("INETSTREAM CREATED")
|
||||
|
||||
logger.debug("successfully opened a stream to peer %s", peer_id)
|
||||
return net_stream
|
||||
|
||||
|
||||
@ -2,6 +2,10 @@ from collections.abc import (
|
||||
Sequence,
|
||||
)
|
||||
|
||||
from loguru import (
|
||||
logger,
|
||||
)
|
||||
|
||||
from libp2p.abc import (
|
||||
IMultiselectClient,
|
||||
IMultiselectCommunicator,
|
||||
@ -36,11 +40,15 @@ class MultiselectClient(IMultiselectClient):
|
||||
try:
|
||||
await communicator.write(MULTISELECT_PROTOCOL_ID)
|
||||
except MultiselectCommunicatorError as error:
|
||||
logger.error("WROTE FAIL")
|
||||
raise MultiselectClientError() from error
|
||||
|
||||
logger.info(f"WROTE SUC, {MULTISELECT_PROTOCOL_ID}")
|
||||
try:
|
||||
handshake_contents = await communicator.read()
|
||||
logger.info(f"READ SUC, {handshake_contents}")
|
||||
except MultiselectCommunicatorError as error:
|
||||
logger.error(f"READ FAIL, {error}")
|
||||
raise MultiselectClientError() from error
|
||||
|
||||
if not is_valid_handshake(handshake_contents):
|
||||
@ -59,9 +67,12 @@ class MultiselectClient(IMultiselectClient):
|
||||
:return: selected protocol
|
||||
:raise MultiselectClientError: raised when protocol negotiation failed
|
||||
"""
|
||||
logger.info("TRYING TO GET THE HANDSHAKE HAPPENED")
|
||||
await self.handshake(communicator)
|
||||
logger.info("HANDSHAKE HAPPENED")
|
||||
|
||||
for protocol in protocols:
|
||||
logger.info(protocol)
|
||||
try:
|
||||
selected_protocol = await self.try_select(communicator, protocol)
|
||||
return selected_protocol
|
||||
@ -113,11 +124,17 @@ class MultiselectClient(IMultiselectClient):
|
||||
"""
|
||||
try:
|
||||
await communicator.write(protocol)
|
||||
from loguru import (
|
||||
logger,
|
||||
)
|
||||
|
||||
logger.info(protocol)
|
||||
except MultiselectCommunicatorError as error:
|
||||
raise MultiselectClientError() from error
|
||||
|
||||
try:
|
||||
response = await communicator.read()
|
||||
logger.info("Response: ", response)
|
||||
except MultiselectCommunicatorError as error:
|
||||
raise MultiselectClientError() from error
|
||||
|
||||
|
||||
7
setup.py
7
setup.py
@ -37,10 +37,14 @@ extras_require = {
|
||||
"pytest-trio>=0.5.2",
|
||||
"factory-boy>=2.12.0,<3.0.0",
|
||||
],
|
||||
"interop": ["redis==6.1.0", "logging==0.4.9.6", "loguru==0.7.3"],
|
||||
}
|
||||
|
||||
extras_require["dev"] = (
|
||||
extras_require["dev"] + extras_require["docs"] + extras_require["test"]
|
||||
extras_require["dev"]
|
||||
+ extras_require["docs"]
|
||||
+ extras_require["test"]
|
||||
+ extras_require["interop"]
|
||||
)
|
||||
|
||||
try:
|
||||
@ -65,6 +69,7 @@ install_requires = [
|
||||
"rpcudp>=3.0.0",
|
||||
"trio-typing>=0.0.4",
|
||||
"trio>=0.26.0",
|
||||
"loguru>=0.7.3",
|
||||
]
|
||||
|
||||
# Add platform-specific dependencies
|
||||
|
||||
Reference in New Issue
Block a user