Move test utilities to tools (#356)

* move test factories to libp2p/tools

* remove unused inits

* move pubsub test utils to tools

* cleanup test_interop

* fix typing libp2p/tools/utils

* add typing to pubsub utils

* fix factories typing

* fix typing for floodsub_integration_test_settings

* fix rest of the typing

* fix isort
Author: Chih Cheng Liang
Committed: 2019-11-21 11:47:54 +08:00, by GitHub
Parent: 74198c70b1
Commit: bcd7890124
52 changed files with 171 additions and 198 deletions


@@ -117,7 +117,7 @@ async def new_node(
    sec_opt: TSecurityOptions = None,
    peerstore_opt: IPeerStore = None,
    disc_opt: IPeerRouting = None,
-) -> IHost:
+) -> BasicHost:
"""
Create a new libp2p node.


@@ -25,6 +25,7 @@ from libp2p.peer.id import ID as PeerID
from libp2p.security.base_session import BaseSession
from libp2p.security.base_transport import BaseSecureTransport
from libp2p.security.secure_conn_interface import ISecureConn
+from libp2p.typing import TProtocol
from .exceptions import (
    IncompatibleChoices,
@@ -37,7 +38,7 @@ from .exceptions import (
)
from .pb.spipe_pb2 import Exchange, Propose
-ID = "/secio/1.0.0"
+ID = TProtocol("/secio/1.0.0")
NONCE_SIZE = 16 # bytes

libp2p/tools/__init__.py (new file, 0 lines)

libp2p/tools/constants.py (new file, 30 lines)

@@ -0,0 +1,30 @@
from typing import NamedTuple
import multiaddr
from libp2p.pubsub import floodsub, gossipsub
# Just an arbitrary large number.
# It is used when calling `MplexStream.read(MAX_READ_LEN)`,
# to avoid `MplexStream.read()`, which blocks until EOF.
MAX_READ_LEN = 2 ** 32 - 1
LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID
GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID
class GossipsubParams(NamedTuple):
degree: int = 10
degree_low: int = 9
degree_high: int = 11
time_to_live: int = 30
gossip_window: int = 3
gossip_history: int = 5
heartbeat_interval: float = 0.5
GOSSIPSUB_PARAMS = GossipsubParams()
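
Since GossipsubParams is a NamedTuple, a test can derive a variant of the defaults with `_replace` while keeping the other fields; a minimal sketch (the 0.1 value is illustrative, not taken from this commit):

from libp2p.tools.constants import GOSSIPSUB_PARAMS

# Speed up the heartbeat for a test; untouched fields keep their defaults.
fast_params = GOSSIPSUB_PARAMS._replace(heartbeat_interval=0.1)
assert fast_params.degree == 10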

libp2p/tools/factories.py (new file, 233 lines)

@@ -0,0 +1,233 @@
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict, Tuple, cast
import factory
from libp2p import generate_new_rsa_identity, generate_peer_id_from
from libp2p.crypto.keys import KeyPair
from libp2p.host.basic_host import BasicHost
from libp2p.network.connection.swarm_connection import SwarmConn
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.network.swarm import Swarm
from libp2p.peer.peerstore import PeerStore
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.gossipsub import GossipSub
from libp2p.pubsub.pubsub import Pubsub
from libp2p.security.base_transport import BaseSecureTransport
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
import libp2p.security.secio.transport as secio
from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex
from libp2p.stream_muxer.mplex.mplex_stream import MplexStream
from libp2p.transport.tcp.tcp import TCP
from libp2p.transport.typing import TMuxerOptions
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.typing import TProtocol
from .constants import (
FLOODSUB_PROTOCOL_ID,
GOSSIPSUB_PARAMS,
GOSSIPSUB_PROTOCOL_ID,
LISTEN_MADDR,
)
from .utils import connect, connect_swarm
def security_transport_factory(
is_secure: bool, key_pair: KeyPair
) -> Dict[TProtocol, BaseSecureTransport]:
if not is_secure:
return {PLAINTEXT_PROTOCOL_ID: InsecureTransport(key_pair)}
else:
return {secio.ID: secio.Transport(key_pair)}
class SwarmFactory(factory.Factory):
class Meta:
model = Swarm
class Params:
is_secure = False
key_pair = factory.LazyFunction(generate_new_rsa_identity)
muxer_opt = {MPLEX_PROTOCOL_ID: Mplex}
peer_id = factory.LazyAttribute(lambda o: generate_peer_id_from(o.key_pair))
peerstore = factory.LazyFunction(PeerStore)
upgrader = factory.LazyAttribute(
lambda o: TransportUpgrader(
security_transport_factory(o.is_secure, o.key_pair), o.muxer_opt
)
)
transport = factory.LazyFunction(TCP)
@classmethod
async def create_and_listen(
cls, is_secure: bool, key_pair: KeyPair = None, muxer_opt: TMuxerOptions = None
) -> Swarm:
# If we pass an argument explicitly as `None`, `factory.Factory.__init__` does
# *not* fall back to the declared default. So when an argument is `None`, we
# omit it and let the factory initialize the default value.
optional_kwargs: Dict[str, Any] = {}
if key_pair is not None:
optional_kwargs["key_pair"] = key_pair
if muxer_opt is not None:
optional_kwargs["muxer_opt"] = muxer_opt
swarm = cls(is_secure=is_secure, **optional_kwargs)
await swarm.listen(LISTEN_MADDR)
return swarm
@classmethod
async def create_batch_and_listen(
cls, is_secure: bool, number: int, muxer_opt: TMuxerOptions = None
) -> Tuple[Swarm, ...]:
# Ignore typing since we are removing asyncio soon
return await asyncio.gather( # type: ignore
*[
cls.create_and_listen(is_secure=is_secure, muxer_opt=muxer_opt)
for _ in range(number)
]
)
class HostFactory(factory.Factory):
class Meta:
model = BasicHost
class Params:
is_secure = False
key_pair = factory.LazyFunction(generate_new_rsa_identity)
public_key = factory.LazyAttribute(lambda o: o.key_pair.public_key)
network = factory.LazyAttribute(
lambda o: SwarmFactory(is_secure=o.is_secure, key_pair=o.key_pair)
)
@classmethod
async def create_batch_and_listen(
cls, is_secure: bool, number: int
) -> Tuple[BasicHost, ...]:
key_pairs = [generate_new_rsa_identity() for _ in range(number)]
swarms = await asyncio.gather(
*[
SwarmFactory.create_and_listen(is_secure, key_pair)
for key_pair in key_pairs
]
)
return tuple(
BasicHost(key_pair.public_key, swarm)
for key_pair, swarm in zip(key_pairs, swarms)
)
class FloodsubFactory(factory.Factory):
class Meta:
model = FloodSub
protocols = (FLOODSUB_PROTOCOL_ID,)
class GossipsubFactory(factory.Factory):
class Meta:
model = GossipSub
protocols = (GOSSIPSUB_PROTOCOL_ID,)
degree = GOSSIPSUB_PARAMS.degree
degree_low = GOSSIPSUB_PARAMS.degree_low
degree_high = GOSSIPSUB_PARAMS.degree_high
time_to_live = GOSSIPSUB_PARAMS.time_to_live
gossip_window = GOSSIPSUB_PARAMS.gossip_window
gossip_history = GOSSIPSUB_PARAMS.gossip_history
heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval
class PubsubFactory(factory.Factory):
class Meta:
model = Pubsub
host = factory.SubFactory(HostFactory)
router = None
my_id = factory.LazyAttribute(lambda obj: obj.host.get_id())
cache_size = None
async def swarm_pair_factory(
is_secure: bool, muxer_opt: TMuxerOptions = None
) -> Tuple[Swarm, Swarm]:
swarms = await SwarmFactory.create_batch_and_listen(
is_secure, 2, muxer_opt=muxer_opt
)
await connect_swarm(swarms[0], swarms[1])
return swarms[0], swarms[1]
async def host_pair_factory(is_secure: bool) -> Tuple[BasicHost, BasicHost]:
hosts = await HostFactory.create_batch_and_listen(is_secure, 2)
await connect(hosts[0], hosts[1])
return hosts[0], hosts[1]
@asynccontextmanager
async def pair_of_connected_hosts(
is_secure: bool = True
) -> AsyncIterator[Tuple[BasicHost, BasicHost]]:
a, b = await host_pair_factory(is_secure)
yield a, b
close_tasks = (a.close(), b.close())
await asyncio.gather(*close_tasks)
async def swarm_conn_pair_factory(
is_secure: bool, muxer_opt: TMuxerOptions = None
) -> Tuple[SwarmConn, Swarm, SwarmConn, Swarm]:
swarms = await swarm_pair_factory(is_secure)
conn_0 = swarms[0].connections[swarms[1].get_peer_id()]
conn_1 = swarms[1].connections[swarms[0].get_peer_id()]
return cast(SwarmConn, conn_0), swarms[0], cast(SwarmConn, conn_1), swarms[1]
async def mplex_conn_pair_factory(is_secure: bool) -> Tuple[Mplex, Swarm, Mplex, Swarm]:
muxer_opt = {MPLEX_PROTOCOL_ID: Mplex}
conn_0, swarm_0, conn_1, swarm_1 = await swarm_conn_pair_factory(
is_secure, muxer_opt=muxer_opt
)
return (
cast(Mplex, conn_0.muxed_conn),
swarm_0,
cast(Mplex, conn_1.muxed_conn),
swarm_1,
)
async def mplex_stream_pair_factory(
is_secure: bool
) -> Tuple[MplexStream, Swarm, MplexStream, Swarm]:
mplex_conn_0, swarm_0, mplex_conn_1, swarm_1 = await mplex_conn_pair_factory(
is_secure
)
stream_0 = await mplex_conn_0.open_stream()
await asyncio.sleep(0.01)
stream_1: MplexStream
async with mplex_conn_1.streams_lock:
if len(mplex_conn_1.streams) != 1:
raise Exception("Mplex should have exactly one stream after the remote opens one")
stream_1 = tuple(mplex_conn_1.streams.values())[0]
return cast(MplexStream, stream_0), swarm_0, stream_1, swarm_1
async def net_stream_pair_factory(
is_secure: bool
) -> Tuple[INetStream, BasicHost, INetStream, BasicHost]:
protocol_id = TProtocol("/example/id/1")
stream_1: INetStream
# Just a proxy, we only care about the stream
def handler(stream: INetStream) -> None:
nonlocal stream_1
stream_1 = stream
host_0, host_1 = await host_pair_factory(is_secure)
host_1.set_stream_handler(protocol_id, handler)
stream_0 = await host_0.new_stream(host_1.get_id(), [protocol_id])
return stream_0, host_0, stream_1, host_1
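
A minimal sketch of how these factories are meant to be driven from a test, assuming pytest with the pytest-asyncio plugin (test name and assertions are illustrative):

import pytest

from libp2p.tools.factories import pair_of_connected_hosts

@pytest.mark.asyncio
async def test_two_connected_hosts() -> None:
    async with pair_of_connected_hosts() as (host_a, host_b):
        # Both sides should see each other in their connection tables.
        assert host_b.get_id() in host_a.get_network().connections
        assert host_a.get_id() in host_b.get_network().connections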



@@ -0,0 +1,2 @@
LOCALHOST_IP = "127.0.0.1"
PEXPECT_NEW_LINE = "\r\n"


@@ -0,0 +1,205 @@
import asyncio
import time
from typing import Any, Awaitable, Callable, List
import multiaddr
from multiaddr import Multiaddr
from p2pclient import Client
import pytest
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr
from .constants import LOCALHOST_IP
from .envs import GO_BIN_PATH
P2PD_PATH = GO_BIN_PATH / "p2pd"
TIMEOUT_DURATION = 30
async def try_until_success(
coro_func: Callable[[], Awaitable[Any]], timeout: int = TIMEOUT_DURATION
) -> None:
"""
Keep running ``coro_func`` until it either succeeds or time is up.
All arguments of ``coro_func`` should already be bound, i.e. it should be
callable without arguments.
"""
t_start = time.monotonic()
while True:
result = await coro_func()
if result:
break
if (time.monotonic() - t_start) >= timeout:
# timeout
pytest.fail(f"{coro_func} is still failing after `{timeout}` seconds")
await asyncio.sleep(0.01)
class P2PDProcess:
proc: asyncio.subprocess.Process
cmd: str = str(P2PD_PATH)
args: List[Any]
is_proc_running: bool
_tasks: List["asyncio.Future[Any]"]
def __init__(
self,
control_maddr: Multiaddr,
is_secure: bool,
is_pubsub_enabled: bool = True,
is_gossipsub: bool = True,
is_pubsub_signing: bool = False,
is_pubsub_signing_strict: bool = False,
) -> None:
args = [f"-listen={str(control_maddr)}"]
# NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`.
if not is_secure:
args.append("-insecure=true")
if is_pubsub_enabled:
args.append("-pubsub")
if is_gossipsub:
args.append("-pubsubRouter=gossipsub")
else:
args.append("-pubsubRouter=floodsub")
if not is_pubsub_signing:
args.append("-pubsubSign=false")
if not is_pubsub_signing_strict:
args.append("-pubsubSignStrict=false")
# NOTE:
# Two other params are possibly what we want to configure:
# - gossipsubHeartbeatInterval: GossipSubHeartbeatInterval = 1 * time.Second
# - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond  # noqa: E501
# Reference: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353  # noqa: E501
self.args = args
self.is_proc_running = False
self._tasks = []
async def wait_until_ready(self) -> None:
lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:")
lines_head_occurred = {line: False for line in lines_head_pattern}
async def read_from_daemon_and_check() -> bool:
line = await self.proc.stdout.readline()
for head_pattern in lines_head_occurred:
if line.startswith(head_pattern):
lines_head_occurred[head_pattern] = True
return all(lines_head_occurred.values())
await try_until_success(read_from_daemon_and_check)
# Sleep a little bit to ensure the listener is up after logs are emitted.
await asyncio.sleep(0.01)
async def start_printing_logs(self) -> None:
async def _print_from_stream(
src_name: str, reader: asyncio.StreamReader
) -> None:
while True:
line = await reader.readline()
if line != b"":
print(f"{src_name}\t: {line.rstrip().decode()}")
await asyncio.sleep(0.01)
self._tasks.append(
asyncio.ensure_future(_print_from_stream("out", self.proc.stdout))
)
self._tasks.append(
asyncio.ensure_future(_print_from_stream("err", self.proc.stderr))
)
await asyncio.sleep(0)
async def start(self) -> None:
if self.is_proc_running:
return
self.proc = await asyncio.subprocess.create_subprocess_exec(
self.cmd,
*self.args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
bufsize=0,
)
self.is_proc_running = True
await self.wait_until_ready()
await self.start_printing_logs()
async def close(self) -> None:
if self.is_proc_running:
self.proc.terminate()
await self.proc.wait()
self.is_proc_running = False
for task in self._tasks:
task.cancel()
class Daemon:
p2pd_proc: P2PDProcess
control: Client
peer_info: PeerInfo
def __init__(
self, p2pd_proc: P2PDProcess, control: Client, peer_info: PeerInfo
) -> None:
self.p2pd_proc = p2pd_proc
self.control = control
self.peer_info = peer_info
def __repr__(self) -> str:
return f"<Daemon {self.peer_id.to_string()[2:8]}>"
@property
def peer_id(self) -> ID:
return self.peer_info.peer_id
@property
def listen_maddr(self) -> Multiaddr:
return self.peer_info.addrs[0]
async def close(self) -> None:
await self.p2pd_proc.close()
await self.control.close()
async def make_p2pd(
daemon_control_port: int,
client_callback_port: int,
is_secure: bool,
is_pubsub_enabled: bool = True,
is_gossipsub: bool = True,
is_pubsub_signing: bool = False,
is_pubsub_signing_strict: bool = False,
) -> Daemon:
control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{daemon_control_port}")
p2pd_proc = P2PDProcess(
control_maddr,
is_secure,
is_pubsub_enabled,
is_gossipsub,
is_pubsub_signing,
is_pubsub_signing_strict,
)
await p2pd_proc.start()
client_callback_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{client_callback_port}")
p2pc = Client(control_maddr, client_callback_maddr)
await p2pc.listen()
peer_id, maddrs = await p2pc.identify()
listen_maddr: Multiaddr = None
for maddr in maddrs:
try:
ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4)
# NOTE: Check if this `maddr` uses `tcp`.
maddr.value_for_protocol(multiaddr.protocols.P_TCP)
except multiaddr.exceptions.ProtocolLookupError:
continue
if ip == LOCALHOST_IP:
listen_maddr = maddr
break
assert listen_maddr is not None, "no loopback maddr is found"
peer_info = info_from_p2p_addr(
listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}"))
)
return Daemon(p2pd_proc, p2pc, peer_info)
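
A sketch of how `make_p2pd` might be exercised directly; the port numbers are arbitrary placeholders, and tests should pick unused ones:

async def run_daemon_demo() -> None:
    daemon = await make_p2pd(
        daemon_control_port=29000,
        client_callback_port=29001,
        is_secure=False,
    )
    try:
        print("daemon peer:", daemon.peer_id, "listening on", daemon.listen_maddr)
    finally:
        await daemon.close()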


@@ -0,0 +1,4 @@
import os
import pathlib
GO_BIN_PATH = pathlib.Path(os.environ["GOPATH"]) / "bin"


@@ -0,0 +1,58 @@
import asyncio
from typing import Union
from multiaddr import Multiaddr
from libp2p.host.host_interface import IHost
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
from .daemon import Daemon
TDaemonOrHost = Union[IHost, Daemon]
def _get_peer_info(node: TDaemonOrHost) -> PeerInfo:
peer_info: PeerInfo
if isinstance(node, Daemon):
peer_info = node.peer_info
else: # isinstance(node, IHost)
peer_id = node.get_id()
maddrs = [
node.get_addrs()[0].decapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}"))
]
peer_info = PeerInfo(peer_id, maddrs)
return peer_info
async def _is_peer(peer_id: ID, node: TDaemonOrHost) -> bool:
if isinstance(node, Daemon):
pinfos = await node.control.list_peers()
peers = tuple([pinfo.peer_id for pinfo in pinfos])
return peer_id in peers
else: # isinstance(node, IHost)
return peer_id in node.get_network().connections
async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None:
# Type check
err_msg = (
f"Type of a={type(a)} or type of b={type(b)} is wrong."
"Should be either `IHost` or `Daemon`"
)
assert all(
[isinstance(node, IHost) or isinstance(node, Daemon) for node in (a, b)]
), err_msg
b_peer_info = _get_peer_info(b)
if isinstance(a, Daemon):
await a.control.connect(b_peer_info.peer_id, b_peer_info.addrs)
else: # isinstance(a, IHost)
await a.connect(b_peer_info)
# Allow additional sleep for both side to establish the connection.
await asyncio.sleep(0.1)
a_peer_info = _get_peer_info(a)
assert await _is_peer(b_peer_info.peer_id, a)
assert await _is_peer(a_peer_info.peer_id, b)
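
Putting the pieces together, an interop test could connect a Python host to a daemon roughly as follows; `make_p2pd` is the helper from the daemon module above (assumed to live in the same package), and the ports and security flag are illustrative:

from libp2p.tools.factories import HostFactory

async def interop_connect_demo() -> None:
    (host,) = await HostFactory.create_batch_and_listen(False, 1)
    daemon = await make_p2pd(29000, 29001, is_secure=False)
    try:
        await connect(host, daemon)  # works for any IHost/Daemon pair
    finally:
        await daemon.close()
        await host.close()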



@@ -0,0 +1,135 @@
import asyncio
from typing import Dict
import uuid
from libp2p.host.host_interface import IHost
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.pubsub import Pubsub
from libp2p.tools.constants import LISTEN_MADDR
from libp2p.tools.factories import FloodsubFactory, PubsubFactory
CRYPTO_TOPIC = "ethereum"
# Message format:
# Sending crypto: send,<source>,<dest>,<amount as integer>
# Ex. send,aspyn,alex,5
# Set crypto: set,<dest>,<amount as integer>
# Ex. set,rob,5
# Determine message type by looking at first item before first comma
class DummyAccountNode:
"""
Node which has an internal balance mapping, meant to serve as a dummy
crypto blockchain.
There is no actual blockchain, just a simple map indicating how much
crypto each user in the mappings holds.
"""
libp2p_node: IHost
pubsub: Pubsub
floodsub: FloodSub
def __init__(self, libp2p_node: IHost, pubsub: Pubsub, floodsub: FloodSub):
self.libp2p_node = libp2p_node
self.pubsub = pubsub
self.floodsub = floodsub
self.balances: Dict[str, int] = {}
self.node_id = str(uuid.uuid1())
@classmethod
async def create(cls) -> "DummyAccountNode":
"""
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and
a pubsub instance to this new node.
We use `create` since it serves as a factory function and allows us to
use async/await, unlike `__init__`.
"""
pubsub = PubsubFactory(router=FloodsubFactory())
await pubsub.host.get_network().listen(LISTEN_MADDR)
return cls(libp2p_node=pubsub.host, pubsub=pubsub, floodsub=pubsub.router)
async def handle_incoming_msgs(self) -> None:
"""Handle all incoming messages on the CRYPTO_TOPIC from peers."""
while True:
incoming = await self.q.get()
msg_comps = incoming.data.decode("utf-8").split(",")
if msg_comps[0] == "send":
self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3]))
elif msg_comps[0] == "set":
self.handle_set_crypto(msg_comps[1], int(msg_comps[2]))
async def setup_crypto_networking(self) -> None:
"""Subscribe to CRYPTO_TOPIC and perform call to function that handles
all incoming messages on said topic."""
self.q = await self.pubsub.subscribe(CRYPTO_TOPIC)
asyncio.ensure_future(self.handle_incoming_msgs())
async def publish_send_crypto(
self, source_user: str, dest_user: str, amount: int
) -> None:
"""
Create a send crypto message and publish that message to all other
nodes.
:param source_user: user to send crypto from
:param dest_user: user to send crypto to
:param amount: amount of crypto to send
"""
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
await self.pubsub.publish(CRYPTO_TOPIC, msg_contents.encode())
async def publish_set_crypto(self, user: str, amount: int) -> None:
"""
Create a set crypto message and publish that message to all other
nodes.
:param user: user to set crypto for
:param amount: amount of crypto
"""
msg_contents = "set," + user + "," + str(amount)
await self.pubsub.publish(CRYPTO_TOPIC, msg_contents.encode())
def handle_send_crypto(self, source_user: str, dest_user: str, amount: int) -> None:
"""
Handle incoming send_crypto message.
:param source_user: user to send crypto from
:param dest_user: user to send crypto to
:param amount: amount of crypto to send
"""
if source_user in self.balances:
self.balances[source_user] -= amount
else:
self.balances[source_user] = -amount
if dest_user in self.balances:
self.balances[dest_user] += amount
else:
self.balances[dest_user] = amount
def handle_set_crypto(self, dest_user: str, amount: int) -> None:
"""
Handle incoming set_crypto message.
:param dest_user: user to set crypto for
:param amount: amount of crypto
"""
self.balances[dest_user] = amount
def get_balance(self, user: str) -> int:
"""
Get balance in crypto for a particular user.
:param user: user to get balance for
:return: balance of user
"""
if user in self.balances:
return self.balances[user]
else:
return -1
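
A short sketch of how two DummyAccountNodes can exercise the message flow end to end; the sleep durations are illustrative settling times, not values from this commit:

import asyncio

from libp2p.tools.utils import connect

async def dummy_account_demo() -> None:
    node_a = await DummyAccountNode.create()
    node_b = await DummyAccountNode.create()
    await connect(node_a.libp2p_node, node_b.libp2p_node)
    await node_a.setup_crypto_networking()
    await node_b.setup_crypto_networking()
    await asyncio.sleep(0.25)  # let subscriptions propagate
    await node_a.publish_set_crypto("rob", 5)
    await asyncio.sleep(0.25)  # let the message propagate
    assert node_b.get_balance("rob") == 5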


@@ -0,0 +1,261 @@
# type: ignore
# To add typing to this module, it is better to wait until the test cases are refactored into classes
import asyncio
import pytest
from libp2p.tools.constants import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR
from libp2p.tools.factories import PubsubFactory
from libp2p.tools.utils import connect
SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
FLOODSUB_PROTOCOL_TEST_CASES = [
{
"name": "simple_two_nodes",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {"A": ["B"]},
"topic_map": {"topic1": ["B"]},
"messages": [{"topics": ["topic1"], "data": b"foo", "node_id": "A"}],
},
{
"name": "three_nodes_two_topics",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {"A": ["B"], "B": ["C"]},
"topic_map": {"topic1": ["B", "C"], "topic2": ["B", "C"]},
"messages": [
{"topics": ["topic1"], "data": b"foo", "node_id": "A"},
{"topics": ["topic2"], "data": b"Alex is tall", "node_id": "A"},
],
},
{
"name": "two_nodes_one_topic_single_subscriber_is_sender",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {"A": ["B"]},
"topic_map": {"topic1": ["B"]},
"messages": [{"topics": ["topic1"], "data": b"Alex is tall", "node_id": "B"}],
},
{
"name": "two_nodes_one_topic_two_msgs",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {"A": ["B"]},
"topic_map": {"topic1": ["B"]},
"messages": [
{"topics": ["topic1"], "data": b"Alex is tall", "node_id": "B"},
{"topics": ["topic1"], "data": b"foo", "node_id": "A"},
],
},
{
"name": "seven_nodes_tree_one_topics",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {"1": ["2", "3"], "2": ["4", "5"], "3": ["6", "7"]},
"topic_map": {"astrophysics": ["2", "3", "4", "5", "6", "7"]},
"messages": [{"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"}],
},
{
"name": "seven_nodes_tree_three_topics",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {"1": ["2", "3"], "2": ["4", "5"], "3": ["6", "7"]},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"],
"space": ["2", "3", "4", "5", "6", "7"],
"onions": ["2", "3", "4", "5", "6", "7"],
},
"messages": [
{"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"},
{"topics": ["space"], "data": b"foobar", "node_id": "1"},
{"topics": ["onions"], "data": b"I am allergic", "node_id": "1"},
],
},
{
"name": "seven_nodes_tree_three_topics_diff_origin",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {"1": ["2", "3"], "2": ["4", "5"], "3": ["6", "7"]},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5", "6", "7"],
"space": ["1", "2", "3", "4", "5", "6", "7"],
"onions": ["1", "2", "3", "4", "5", "6", "7"],
},
"messages": [
{"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"},
{"topics": ["space"], "data": b"foobar", "node_id": "4"},
{"topics": ["onions"], "data": b"I am allergic", "node_id": "7"},
],
},
{
"name": "three_nodes_clique_two_topic_diff_origin",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {"1": ["2", "3"], "2": ["3"]},
"topic_map": {"astrophysics": ["1", "2", "3"], "school": ["1", "2", "3"]},
"messages": [
{"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"},
{"topics": ["school"], "data": b"foobar", "node_id": "2"},
{"topics": ["astrophysics"], "data": b"I am allergic", "node_id": "1"},
],
},
{
"name": "four_nodes_clique_two_topic_diff_origin_many_msgs",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"1": ["2", "3", "4"],
"2": ["1", "3", "4"],
"3": ["1", "2", "4"],
"4": ["1", "2", "3"],
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4"],
"school": ["1", "2", "3", "4"],
},
"messages": [
{"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"},
{"topics": ["school"], "data": b"foobar", "node_id": "2"},
{"topics": ["astrophysics"], "data": b"I am allergic", "node_id": "1"},
{"topics": ["school"], "data": b"foobar2", "node_id": "2"},
{"topics": ["astrophysics"], "data": b"I am allergic2", "node_id": "1"},
{"topics": ["school"], "data": b"foobar3", "node_id": "2"},
{"topics": ["astrophysics"], "data": b"I am allergic3", "node_id": "1"},
],
},
{
"name": "five_nodes_ring_two_topic_diff_origin_many_msgs",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {"1": ["2"], "2": ["3"], "3": ["4"], "4": ["5"], "5": ["1"]},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5"],
"school": ["1", "2", "3", "4", "5"],
},
"messages": [
{"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"},
{"topics": ["school"], "data": b"foobar", "node_id": "2"},
{"topics": ["astrophysics"], "data": b"I am allergic", "node_id": "1"},
{"topics": ["school"], "data": b"foobar2", "node_id": "2"},
{"topics": ["astrophysics"], "data": b"I am allergic2", "node_id": "1"},
{"topics": ["school"], "data": b"foobar3", "node_id": "2"},
{"topics": ["astrophysics"], "data": b"I am allergic3", "node_id": "1"},
],
},
]
floodsub_protocol_pytest_params = [
pytest.param(test_case, id=test_case["name"])
for test_case in FLOODSUB_PROTOCOL_TEST_CASES
]
async def perform_test_from_obj(obj, router_factory) -> None:
"""
Perform pubsub tests from a test obj.
Test objs are composed as follows:
{
"supported_protocols": ["supported/protocol/1.0.0",...],
"adj_list": {
"node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...],
"node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...],
...
},
"topic_map": {
"topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...]
},
"messages": [
{
"topics": ["topic1_for_message", "topic2_for_message", ...],
"data": b"some contents of the message (newlines are not supported)",
"node_id": "message sender node id"
},
...
]
}
NOTE: In adj_list, list each edge only once: for any neighbors A and B,
include either B in A's list or A in B's list, but NOT both A: ["B"] and
B: ["A"], as the behavior is then undefined (even if it may work)
"""
# Step 1) Create graph
adj_list = obj["adj_list"]
node_map = {}
pubsub_map = {}
async def add_node(node_id_str: str) -> None:
pubsub_router = router_factory(protocols=obj["supported_protocols"])
pubsub = PubsubFactory(router=pubsub_router)
await pubsub.host.get_network().listen(LISTEN_MADDR)
node_map[node_id_str] = pubsub.host
pubsub_map[node_id_str] = pubsub
tasks_connect = []
for start_node_id in adj_list:
# Create node if node does not yet exist
if start_node_id not in node_map:
await add_node(start_node_id)
# For each neighbor of start_node, create if does not yet exist,
# then connect start_node to neighbor
for neighbor_id in adj_list[start_node_id]:
# Create neighbor if neighbor does not yet exist
if neighbor_id not in node_map:
await add_node(neighbor_id)
tasks_connect.append(
connect(node_map[start_node_id], node_map[neighbor_id])
)
# Connect nodes and wait for at least 2 seconds
await asyncio.gather(*tasks_connect, asyncio.sleep(2))
# Step 2) Subscribe to topics
queues_map = {}
topic_map = obj["topic_map"]
tasks_topic = []
tasks_topic_data = []
for topic, node_ids in topic_map.items():
for node_id in node_ids:
tasks_topic.append(pubsub_map[node_id].subscribe(topic))
tasks_topic_data.append((node_id, topic))
tasks_topic.append(asyncio.sleep(2))
# Gather is like Promise.all
responses = await asyncio.gather(*tasks_topic)
for i in range(len(responses) - 1):
node_id, topic = tasks_topic_data[i]
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = responses[i]
# Allow time for subscribing before continuing
await asyncio.sleep(0.01)
# Step 3) Publish messages
topics_in_msgs_ordered = []
messages = obj["messages"]
tasks_publish = []
for msg in messages:
topics = msg["topics"]
data = msg["data"]
node_id = msg["node_id"]
# Publish message
# TODO: Should be single RPC package with several topics
for topic in topics:
tasks_publish.append(pubsub_map[node_id].publish(topic, data))
# For each topic in topics, add (topic, node_id, data) tuple to ordered test list
for topic in topics:
topics_in_msgs_ordered.append((topic, node_id, data))
# Allow time for publishing before continuing
await asyncio.gather(*tasks_publish, asyncio.sleep(2))
# Step 4) Check that all messages were received correctly.
for topic, origin_node_id, data in topics_in_msgs_ordered:
# Look at each node in each topic
for node_id in topic_map[topic]:
# Get message from subscription queue
msg = await queues_map[node_id][topic].get()
assert data == msg.data
# Check the message origin
assert node_map[origin_node_id].get_id().to_bytes() == msg.from_id
# Success, terminate pending tasks.
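
These settings are consumed by the pubsub test modules roughly as follows, assuming pytest with the pytest-asyncio plugin (the test name is illustrative):

import pytest

from libp2p.tools.factories import FloodsubFactory

@pytest.mark.parametrize("test_case_obj", floodsub_protocol_pytest_params)
@pytest.mark.asyncio
async def test_floodsub_protocol(test_case_obj):
    await perform_test_from_obj(test_case_obj, FloodsubFactory)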


@@ -0,0 +1,32 @@
from typing import Sequence
from libp2p.host.host_interface import IHost
from libp2p.peer.id import ID
from libp2p.pubsub.pb import rpc_pb2
from libp2p.tools.utils import connect
def make_pubsub_msg(
origin_id: ID, topic_ids: Sequence[str], data: bytes, seqno: bytes
) -> rpc_pb2.Message:
return rpc_pb2.Message(
from_id=origin_id.to_bytes(), seqno=seqno, data=data, topicIDs=list(topic_ids)
)
# TODO: Implement sparse connect
async def dense_connect(hosts: Sequence[IHost]) -> None:
await connect_some(hosts, 10)
# FIXME: `degree` is not used at all
async def connect_some(hosts: Sequence[IHost], degree: int) -> None:
for i, host in enumerate(hosts):
for host2 in hosts[i + 1 :]:
await connect(host, host2)
async def one_to_all_connect(hosts: Sequence[IHost], central_host_index: int) -> None:
for i, host in enumerate(hosts):
if i != central_host_index:
await connect(hosts[central_host_index], host)
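
For example, a pubsub test can wire a star topology with `one_to_all_connect`; the host count here is illustrative:

from libp2p.tools.factories import HostFactory

async def star_topology_demo() -> None:
    hosts = await HostFactory.create_batch_and_listen(False, 4)
    # hosts[0] becomes the hub; every other host connects to it.
    await one_to_all_connect(hosts, 0)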

libp2p/tools/utils.py (new file, 97 lines)

@@ -0,0 +1,97 @@
from typing import List, Sequence, Tuple
import multiaddr
from libp2p import new_node
from libp2p.host.basic_host import BasicHost
from libp2p.host.host_interface import IHost
from libp2p.kademlia.network import KademliaServer
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.network.swarm import Swarm
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.routing.interfaces import IPeerRouting
from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter
from libp2p.typing import StreamHandlerFn, TProtocol
from .constants import MAX_READ_LEN
async def connect_swarm(swarm_0: Swarm, swarm_1: Swarm) -> None:
peer_id = swarm_1.get_peer_id()
addrs = tuple(
addr
for transport in swarm_1.listeners.values()
for addr in transport.get_addrs()
)
swarm_0.peerstore.add_addrs(peer_id, addrs, 10000)
await swarm_0.dial_peer(peer_id)
assert swarm_0.get_peer_id() in swarm_1.connections
assert swarm_1.get_peer_id() in swarm_0.connections
async def connect(node1: IHost, node2: IHost) -> None:
"""Connect node1 to node2."""
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
async def set_up_nodes_by_transport_opt(
transport_opt_list: Sequence[Sequence[str]]
) -> Tuple[BasicHost, ...]:
nodes_list = []
for transport_opt in transport_opt_list:
node = await new_node(transport_opt=transport_opt)
await node.get_network().listen(multiaddr.Multiaddr(transport_opt[0]))
nodes_list.append(node)
return tuple(nodes_list)
async def set_up_nodes_by_transport_and_disc_opt(
transport_disc_opt_list: Sequence[Tuple[Sequence[str], IPeerRouting]]
) -> Tuple[BasicHost, ...]:
nodes_list = []
for transport_opt, disc_opt in transport_disc_opt_list:
node = await new_node(transport_opt=transport_opt, disc_opt=disc_opt)
await node.get_network().listen(multiaddr.Multiaddr(transport_opt[0]))
nodes_list.append(node)
return tuple(nodes_list)
async def set_up_routers(
router_confs: Tuple[int, int] = (0, 0)
) -> List[KadmeliaPeerRouter]:
"""The default ``router_confs`` selects two free ports local to this
machine."""
bootstrap_node = KademliaServer() # type: ignore
await bootstrap_node.listen(router_confs[0])
routers = [KadmeliaPeerRouter(bootstrap_node)]
for port in router_confs[1:]:
node = KademliaServer() # type: ignore
await node.listen(port)
await node.bootstrap_node(bootstrap_node.address)
routers.append(KadmeliaPeerRouter(node))
return routers
async def echo_stream_handler(stream: INetStream) -> None:
while True:
read_string = (await stream.read(MAX_READ_LEN)).decode()
resp = "ack:" + read_string
await stream.write(resp.encode())
async def perform_two_host_set_up(
handler: StreamHandlerFn = echo_stream_handler
) -> Tuple[BasicHost, BasicHost]:
transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]]
(node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list)
node_b.set_stream_handler(TProtocol("/echo/1.0.0"), handler)
# Associate the peer with local ip address (see default parameters of Libp2p())
node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10)
return node_a, node_b
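
A sketch of the echo round-trip these helpers enable, using the default echo_stream_handler (the payload is illustrative):

from libp2p.typing import TProtocol

async def echo_demo() -> None:
    node_a, node_b = await perform_two_host_set_up()
    stream = await node_a.new_stream(node_b.get_id(), [TProtocol("/echo/1.0.0")])
    await stream.write(b"hi")
    # The handler prefixes every message with "ack:".
    assert (await stream.read(MAX_READ_LEN)).decode() == "ack:hi"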