mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Noise: add noise option in the factories and tests
This commit is contained in:
@ -27,7 +27,6 @@ class SecurityMultistream(ABC):
|
||||
Go implementation: github.com/libp2p/go-conn-security-multistream/ssms.go
|
||||
"""
|
||||
|
||||
# NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2.
|
||||
transports: "OrderedDict[TProtocol, ISecureTransport]"
|
||||
multiselect: Multiselect
|
||||
multiselect_client: MultiselectClient
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
from typing import Any, AsyncIterator, Dict, List, Sequence, Tuple, cast
|
||||
from typing import Any, AsyncIterator, Callable, Dict, List, Sequence, Tuple, cast
|
||||
|
||||
from async_exit_stack import AsyncExitStack
|
||||
from async_generator import asynccontextmanager
|
||||
@ -33,6 +33,7 @@ from libp2p.security.noise.messages import (
|
||||
NoiseHandshakePayload,
|
||||
make_handshake_payload_sig,
|
||||
)
|
||||
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
|
||||
from libp2p.security.noise.transport import Transport as NoiseTransport
|
||||
import libp2p.security.secio.transport as secio
|
||||
from libp2p.security.secure_conn_interface import ISecureConn
|
||||
@ -41,20 +42,26 @@ from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex
|
||||
from libp2p.stream_muxer.mplex.mplex_stream import MplexStream
|
||||
from libp2p.tools.constants import GOSSIPSUB_PARAMS
|
||||
from libp2p.transport.tcp.tcp import TCP
|
||||
from libp2p.transport.typing import TMuxerOptions
|
||||
from libp2p.transport.typing import TMuxerOptions, TSecurityOptions
|
||||
from libp2p.transport.upgrader import TransportUpgrader
|
||||
from libp2p.typing import TProtocol
|
||||
|
||||
from .constants import FLOODSUB_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID, LISTEN_MADDR
|
||||
from .utils import connect, connect_swarm
|
||||
|
||||
DEFAULT_SECURITY_PROTOCOL_ID = PLAINTEXT_PROTOCOL_ID
|
||||
|
||||
|
||||
def default_key_pair_factory() -> KeyPair:
|
||||
return generate_new_rsa_identity()
|
||||
|
||||
|
||||
class IDFactory(factory.Factory):
|
||||
class Meta:
|
||||
model = ID
|
||||
|
||||
peer_id_bytes = factory.LazyFunction(
|
||||
lambda: generate_peer_id_from(generate_new_rsa_identity())
|
||||
lambda: generate_peer_id_from(default_key_pair_factory())
|
||||
)
|
||||
|
||||
|
||||
@ -64,15 +71,6 @@ def initialize_peerstore_with_our_keypair(self_id: ID, key_pair: KeyPair) -> Pee
|
||||
return peer_store
|
||||
|
||||
|
||||
def security_transport_factory(
|
||||
is_secure: bool, key_pair: KeyPair
|
||||
) -> Dict[TProtocol, ISecureTransport]:
|
||||
if not is_secure:
|
||||
return {PLAINTEXT_PROTOCOL_ID: InsecureTransport(key_pair)}
|
||||
else:
|
||||
return {secio.ID: secio.Transport(key_pair)}
|
||||
|
||||
|
||||
def noise_static_key_factory() -> PrivateKey:
|
||||
return create_ed25519_key_pair().private_key
|
||||
|
||||
@ -88,15 +86,52 @@ def noise_handshake_payload_factory() -> NoiseHandshakePayload:
|
||||
)
|
||||
|
||||
|
||||
def noise_transport_factory() -> NoiseTransport:
|
||||
def plaintext_transport_factory(key_pair: KeyPair) -> ISecureTransport:
|
||||
return InsecureTransport(key_pair)
|
||||
|
||||
|
||||
def secio_transport_factory(key_pair: KeyPair) -> ISecureTransport:
|
||||
return secio.Transport(key_pair)
|
||||
|
||||
|
||||
def noise_transport_factory(key_pair: KeyPair) -> ISecureTransport:
|
||||
return NoiseTransport(
|
||||
libp2p_keypair=create_secp256k1_key_pair(),
|
||||
libp2p_keypair=key_pair,
|
||||
noise_privkey=noise_static_key_factory(),
|
||||
early_data=None,
|
||||
with_noise_pipes=False,
|
||||
)
|
||||
|
||||
|
||||
def security_options_factory_factory(
|
||||
protocol_id: TProtocol = None
|
||||
) -> Callable[[KeyPair], TSecurityOptions]:
|
||||
if protocol_id is None:
|
||||
protocol_id = DEFAULT_SECURITY_PROTOCOL_ID
|
||||
|
||||
def security_options_factory(key_pair: KeyPair) -> TSecurityOptions:
|
||||
transport_factory: Callable[[KeyPair], ISecureTransport]
|
||||
if protocol_id == PLAINTEXT_PROTOCOL_ID:
|
||||
transport_factory = plaintext_transport_factory
|
||||
elif protocol_id == secio.ID:
|
||||
transport_factory = secio_transport_factory
|
||||
elif protocol_id == NOISE_PROTOCOL_ID:
|
||||
transport_factory = noise_transport_factory
|
||||
else:
|
||||
raise Exception(f"security transport {protocol_id} is not supported")
|
||||
return {protocol_id: transport_factory(key_pair)}
|
||||
|
||||
return security_options_factory
|
||||
|
||||
|
||||
def mplex_transport_factory() -> TMuxerOptions:
|
||||
return {MPLEX_PROTOCOL_ID: Mplex}
|
||||
|
||||
|
||||
def default_muxer_transport_factory() -> TMuxerOptions:
|
||||
return mplex_transport_factory()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def raw_conn_factory(
|
||||
nursery: trio.Nursery
|
||||
@ -124,8 +159,12 @@ async def raw_conn_factory(
|
||||
async def noise_conn_factory(
|
||||
nursery: trio.Nursery
|
||||
) -> AsyncIterator[Tuple[ISecureConn, ISecureConn]]:
|
||||
local_transport = noise_transport_factory()
|
||||
remote_transport = noise_transport_factory()
|
||||
local_transport = cast(
|
||||
NoiseTransport, noise_transport_factory(create_secp256k1_key_pair())
|
||||
)
|
||||
remote_transport = cast(
|
||||
NoiseTransport, noise_transport_factory(create_secp256k1_key_pair())
|
||||
)
|
||||
|
||||
local_secure_conn: ISecureConn = None
|
||||
remote_secure_conn: ISecureConn = None
|
||||
@ -158,9 +197,9 @@ class SwarmFactory(factory.Factory):
|
||||
model = Swarm
|
||||
|
||||
class Params:
|
||||
is_secure = False
|
||||
key_pair = factory.LazyFunction(generate_new_rsa_identity)
|
||||
muxer_opt = {MPLEX_PROTOCOL_ID: Mplex}
|
||||
key_pair = factory.LazyFunction(default_key_pair_factory)
|
||||
security_protocol = DEFAULT_SECURITY_PROTOCOL_ID
|
||||
muxer_opt = factory.LazyFunction(default_muxer_transport_factory)
|
||||
|
||||
peer_id = factory.LazyAttribute(lambda o: generate_peer_id_from(o.key_pair))
|
||||
peerstore = factory.LazyAttribute(
|
||||
@ -168,7 +207,8 @@ class SwarmFactory(factory.Factory):
|
||||
)
|
||||
upgrader = factory.LazyAttribute(
|
||||
lambda o: TransportUpgrader(
|
||||
security_transport_factory(o.is_secure, o.key_pair), o.muxer_opt
|
||||
(security_options_factory_factory(o.security_protocol))(o.key_pair),
|
||||
o.muxer_opt,
|
||||
)
|
||||
)
|
||||
transport = factory.LazyFunction(TCP)
|
||||
@ -176,7 +216,10 @@ class SwarmFactory(factory.Factory):
|
||||
@classmethod
|
||||
@asynccontextmanager
|
||||
async def create_and_listen(
|
||||
cls, is_secure: bool, key_pair: KeyPair = None, muxer_opt: TMuxerOptions = None
|
||||
cls,
|
||||
key_pair: KeyPair = None,
|
||||
security_protocol: TProtocol = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
) -> AsyncIterator[Swarm]:
|
||||
# `factory.Factory.__init__` does *not* prepare a *default value* if we pass
|
||||
# an argument explicitly with `None`. If an argument is `None`, we don't pass it to
|
||||
@ -184,9 +227,11 @@ class SwarmFactory(factory.Factory):
|
||||
optional_kwargs: Dict[str, Any] = {}
|
||||
if key_pair is not None:
|
||||
optional_kwargs["key_pair"] = key_pair
|
||||
if security_protocol is not None:
|
||||
optional_kwargs["security_protocol"] = security_protocol
|
||||
if muxer_opt is not None:
|
||||
optional_kwargs["muxer_opt"] = muxer_opt
|
||||
swarm = cls(is_secure=is_secure, **optional_kwargs)
|
||||
swarm = cls(**optional_kwargs)
|
||||
async with background_trio_service(swarm):
|
||||
await swarm.listen(LISTEN_MADDR)
|
||||
yield swarm
|
||||
@ -194,12 +239,17 @@ class SwarmFactory(factory.Factory):
|
||||
@classmethod
|
||||
@asynccontextmanager
|
||||
async def create_batch_and_listen(
|
||||
cls, is_secure: bool, number: int, muxer_opt: TMuxerOptions = None
|
||||
cls,
|
||||
number: int,
|
||||
security_protocol: TProtocol = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
) -> AsyncIterator[Tuple[Swarm, ...]]:
|
||||
async with AsyncExitStack() as stack:
|
||||
ctx_mgrs = [
|
||||
await stack.enter_async_context(
|
||||
cls.create_and_listen(is_secure=is_secure, muxer_opt=muxer_opt)
|
||||
cls.create_and_listen(
|
||||
security_protocol=security_protocol, muxer_opt=muxer_opt
|
||||
)
|
||||
)
|
||||
for _ in range(number)
|
||||
]
|
||||
@ -211,17 +261,27 @@ class HostFactory(factory.Factory):
|
||||
model = BasicHost
|
||||
|
||||
class Params:
|
||||
is_secure = False
|
||||
key_pair = factory.LazyFunction(generate_new_rsa_identity)
|
||||
key_pair = factory.LazyFunction(default_key_pair_factory)
|
||||
security_protocol: TProtocol = None
|
||||
muxer_opt = factory.LazyFunction(default_muxer_transport_factory)
|
||||
|
||||
network = factory.LazyAttribute(lambda o: SwarmFactory(is_secure=o.is_secure))
|
||||
network = factory.LazyAttribute(
|
||||
lambda o: SwarmFactory(
|
||||
security_protocol=o.security_protocol, muxer_opt=o.muxer_opt
|
||||
)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@asynccontextmanager
|
||||
async def create_batch_and_listen(
|
||||
cls, is_secure: bool, number: int
|
||||
cls,
|
||||
number: int,
|
||||
security_protocol: TProtocol = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
) -> AsyncIterator[Tuple[BasicHost, ...]]:
|
||||
async with SwarmFactory.create_batch_and_listen(is_secure, number) as swarms:
|
||||
async with SwarmFactory.create_batch_and_listen(
|
||||
number, security_protocol=security_protocol, muxer_opt=muxer_opt
|
||||
) as swarms:
|
||||
hosts = tuple(BasicHost(swarm) for swarm in swarms)
|
||||
yield hosts
|
||||
|
||||
@ -245,20 +305,29 @@ class RoutedHostFactory(factory.Factory):
|
||||
model = RoutedHost
|
||||
|
||||
class Params:
|
||||
is_secure = False
|
||||
key_pair = factory.LazyFunction(default_key_pair_factory)
|
||||
security_protocol: TProtocol = None
|
||||
muxer_opt = factory.LazyFunction(default_muxer_transport_factory)
|
||||
|
||||
network = factory.LazyAttribute(
|
||||
lambda o: HostFactory(is_secure=o.is_secure).get_network()
|
||||
lambda o: HostFactory(
|
||||
security_protocol=o.security_protocol, muxer_opt=o.muxer_opt
|
||||
).get_network()
|
||||
)
|
||||
router = factory.LazyFunction(DummyRouter)
|
||||
|
||||
@classmethod
|
||||
@asynccontextmanager
|
||||
async def create_batch_and_listen(
|
||||
cls, is_secure: bool, number: int
|
||||
cls,
|
||||
number: int,
|
||||
security_protocol: TProtocol = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
) -> AsyncIterator[Tuple[RoutedHost, ...]]:
|
||||
routing_table = DummyRouter()
|
||||
async with HostFactory.create_batch_and_listen(is_secure, number) as hosts:
|
||||
async with HostFactory.create_batch_and_listen(
|
||||
number, security_protocol=security_protocol, muxer_opt=muxer_opt
|
||||
) as hosts:
|
||||
for host in hosts:
|
||||
routing_table._add_peer(host.get_id(), host.get_addrs())
|
||||
routed_hosts = tuple(
|
||||
@ -319,11 +388,14 @@ class PubsubFactory(factory.Factory):
|
||||
cls,
|
||||
number: int,
|
||||
routers: Sequence[IPubsubRouter],
|
||||
is_secure: bool = False,
|
||||
cache_size: int = None,
|
||||
strict_signing: bool = False,
|
||||
security_protocol: TProtocol = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
) -> AsyncIterator[Tuple[Pubsub, ...]]:
|
||||
async with HostFactory.create_batch_and_listen(is_secure, number) as hosts:
|
||||
async with HostFactory.create_batch_and_listen(
|
||||
number, security_protocol=security_protocol, muxer_opt=muxer_opt
|
||||
) as hosts:
|
||||
# Pubsubs should exit before hosts
|
||||
async with AsyncExitStack() as stack:
|
||||
pubsubs = [
|
||||
@ -339,17 +411,23 @@ class PubsubFactory(factory.Factory):
|
||||
async def create_batch_with_floodsub(
|
||||
cls,
|
||||
number: int,
|
||||
is_secure: bool = False,
|
||||
cache_size: int = None,
|
||||
strict_signing: bool = False,
|
||||
protocols: Sequence[TProtocol] = None,
|
||||
security_protocol: TProtocol = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
) -> AsyncIterator[Tuple[Pubsub, ...]]:
|
||||
if protocols is not None:
|
||||
floodsubs = FloodsubFactory.create_batch(number, protocols=list(protocols))
|
||||
else:
|
||||
floodsubs = FloodsubFactory.create_batch(number)
|
||||
async with cls._create_batch_with_router(
|
||||
number, floodsubs, is_secure, cache_size, strict_signing
|
||||
number,
|
||||
floodsubs,
|
||||
cache_size,
|
||||
strict_signing,
|
||||
security_protocol=security_protocol,
|
||||
muxer_opt=muxer_opt,
|
||||
) as pubsubs:
|
||||
yield pubsubs
|
||||
|
||||
@ -359,7 +437,6 @@ class PubsubFactory(factory.Factory):
|
||||
cls,
|
||||
number: int,
|
||||
*,
|
||||
is_secure: bool = False,
|
||||
cache_size: int = None,
|
||||
strict_signing: bool = False,
|
||||
protocols: Sequence[TProtocol] = None,
|
||||
@ -371,6 +448,8 @@ class PubsubFactory(factory.Factory):
|
||||
gossip_history: int = GOSSIPSUB_PARAMS.gossip_history,
|
||||
heartbeat_interval: float = GOSSIPSUB_PARAMS.heartbeat_interval,
|
||||
heartbeat_initial_delay: float = GOSSIPSUB_PARAMS.heartbeat_initial_delay,
|
||||
security_protocol: TProtocol = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
) -> AsyncIterator[Tuple[Pubsub, ...]]:
|
||||
if protocols is not None:
|
||||
gossipsubs = GossipsubFactory.create_batch(
|
||||
@ -395,7 +474,12 @@ class PubsubFactory(factory.Factory):
|
||||
)
|
||||
|
||||
async with cls._create_batch_with_router(
|
||||
number, gossipsubs, is_secure, cache_size, strict_signing
|
||||
number,
|
||||
gossipsubs,
|
||||
cache_size,
|
||||
strict_signing,
|
||||
security_protocol=security_protocol,
|
||||
muxer_opt=muxer_opt,
|
||||
) as pubsubs:
|
||||
async with AsyncExitStack() as stack:
|
||||
for router in gossipsubs:
|
||||
@ -405,10 +489,10 @@ class PubsubFactory(factory.Factory):
|
||||
|
||||
@asynccontextmanager
|
||||
async def swarm_pair_factory(
|
||||
is_secure: bool, muxer_opt: TMuxerOptions = None
|
||||
security_protocol: TProtocol = None, muxer_opt: TMuxerOptions = None
|
||||
) -> AsyncIterator[Tuple[Swarm, Swarm]]:
|
||||
async with SwarmFactory.create_batch_and_listen(
|
||||
is_secure, 2, muxer_opt=muxer_opt
|
||||
2, security_protocol=security_protocol, muxer_opt=muxer_opt
|
||||
) as swarms:
|
||||
await connect_swarm(swarms[0], swarms[1])
|
||||
yield swarms[0], swarms[1]
|
||||
@ -416,18 +500,22 @@ async def swarm_pair_factory(
|
||||
|
||||
@asynccontextmanager
|
||||
async def host_pair_factory(
|
||||
is_secure: bool
|
||||
security_protocol: TProtocol = None, muxer_opt: TMuxerOptions = None
|
||||
) -> AsyncIterator[Tuple[BasicHost, BasicHost]]:
|
||||
async with HostFactory.create_batch_and_listen(is_secure, 2) as hosts:
|
||||
async with HostFactory.create_batch_and_listen(
|
||||
2, security_protocol=security_protocol, muxer_opt=muxer_opt
|
||||
) as hosts:
|
||||
await connect(hosts[0], hosts[1])
|
||||
yield hosts[0], hosts[1]
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def swarm_conn_pair_factory(
|
||||
is_secure: bool, muxer_opt: TMuxerOptions = None
|
||||
security_protocol: TProtocol = None, muxer_opt: TMuxerOptions = None
|
||||
) -> AsyncIterator[Tuple[SwarmConn, SwarmConn]]:
|
||||
async with swarm_pair_factory(is_secure) as swarms:
|
||||
async with swarm_pair_factory(
|
||||
security_protocol=security_protocol, muxer_opt=muxer_opt
|
||||
) as swarms:
|
||||
conn_0 = swarms[0].connections[swarms[1].get_peer_id()]
|
||||
conn_1 = swarms[1].connections[swarms[0].get_peer_id()]
|
||||
yield cast(SwarmConn, conn_0), cast(SwarmConn, conn_1)
|
||||
@ -435,10 +523,11 @@ async def swarm_conn_pair_factory(
|
||||
|
||||
@asynccontextmanager
|
||||
async def mplex_conn_pair_factory(
|
||||
is_secure: bool
|
||||
security_protocol: TProtocol = None
|
||||
) -> AsyncIterator[Tuple[Mplex, Mplex]]:
|
||||
muxer_opt = {MPLEX_PROTOCOL_ID: Mplex}
|
||||
async with swarm_conn_pair_factory(is_secure, muxer_opt=muxer_opt) as swarm_pair:
|
||||
async with swarm_conn_pair_factory(
|
||||
security_protocol=security_protocol, muxer_opt=default_muxer_transport_factory()
|
||||
) as swarm_pair:
|
||||
yield (
|
||||
cast(Mplex, swarm_pair[0].muxed_conn),
|
||||
cast(Mplex, swarm_pair[1].muxed_conn),
|
||||
@ -447,9 +536,11 @@ async def mplex_conn_pair_factory(
|
||||
|
||||
@asynccontextmanager
|
||||
async def mplex_stream_pair_factory(
|
||||
is_secure: bool
|
||||
security_protocol: TProtocol = None
|
||||
) -> AsyncIterator[Tuple[MplexStream, MplexStream]]:
|
||||
async with mplex_conn_pair_factory(is_secure) as mplex_conn_pair_info:
|
||||
async with mplex_conn_pair_factory(
|
||||
security_protocol=security_protocol
|
||||
) as mplex_conn_pair_info:
|
||||
mplex_conn_0, mplex_conn_1 = mplex_conn_pair_info
|
||||
stream_0 = cast(MplexStream, await mplex_conn_0.open_stream())
|
||||
await trio.sleep(0.01)
|
||||
@ -463,7 +554,7 @@ async def mplex_stream_pair_factory(
|
||||
|
||||
@asynccontextmanager
|
||||
async def net_stream_pair_factory(
|
||||
is_secure: bool
|
||||
security_protocol: TProtocol = None, muxer_opt: TMuxerOptions = None
|
||||
) -> AsyncIterator[Tuple[INetStream, INetStream]]:
|
||||
protocol_id = TProtocol("/example/id/1")
|
||||
|
||||
@ -478,7 +569,9 @@ async def net_stream_pair_factory(
|
||||
stream_1 = stream
|
||||
await event_handler_finished.wait()
|
||||
|
||||
async with host_pair_factory(is_secure) as hosts:
|
||||
async with host_pair_factory(
|
||||
security_protocol=security_protocol, muxer_opt=muxer_opt
|
||||
) as hosts:
|
||||
hosts[1].set_stream_handler(protocol_id, handler)
|
||||
|
||||
stream_0 = await hosts[0].new_stream(hosts[1].get_id(), [protocol_id])
|
||||
|
||||
@ -8,6 +8,8 @@ import trio
|
||||
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr
|
||||
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID
|
||||
from libp2p.typing import TProtocol
|
||||
|
||||
from .constants import LOCALHOST_IP
|
||||
from .envs import GO_BIN_PATH
|
||||
@ -20,7 +22,7 @@ class P2PDProcess(BaseInteractiveProcess):
|
||||
def __init__(
|
||||
self,
|
||||
control_maddr: Multiaddr,
|
||||
is_secure: bool,
|
||||
security_protocol: TProtocol,
|
||||
is_pubsub_enabled: bool = True,
|
||||
is_gossipsub: bool = True,
|
||||
is_pubsub_signing: bool = False,
|
||||
@ -28,7 +30,7 @@ class P2PDProcess(BaseInteractiveProcess):
|
||||
) -> None:
|
||||
args = [f"-listen={control_maddr!s}"]
|
||||
# NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`.
|
||||
if not is_secure:
|
||||
if security_protocol == PLAINTEXT_PROTOCOL_ID:
|
||||
args.append("-insecure=true")
|
||||
if is_pubsub_enabled:
|
||||
args.append("-pubsub")
|
||||
@ -85,7 +87,7 @@ class Daemon:
|
||||
async def make_p2pd(
|
||||
daemon_control_port: int,
|
||||
client_callback_port: int,
|
||||
is_secure: bool,
|
||||
security_protocol: TProtocol,
|
||||
is_pubsub_enabled: bool = True,
|
||||
is_gossipsub: bool = True,
|
||||
is_pubsub_signing: bool = False,
|
||||
@ -94,7 +96,7 @@ async def make_p2pd(
|
||||
control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{daemon_control_port}")
|
||||
p2pd_proc = P2PDProcess(
|
||||
control_maddr,
|
||||
is_secure,
|
||||
security_protocol,
|
||||
is_pubsub_enabled,
|
||||
is_gossipsub,
|
||||
is_pubsub_signing,
|
||||
|
||||
Reference in New Issue
Block a user