diff --git a/.travis.yml b/.travis.yml index 2373658c..f5d470d1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,13 +2,13 @@ language: python matrix: include: - - python: 3.7 + - python: 3.7-dev dist: xenial env: TOXENV=py37-test - - python: 3.7 + - python: 3.7-dev dist: xenial env: TOXENV=lint - - python: 3.7 + - python: 3.7-dev dist: xenial env: TOXENV=py37-interop sudo: true @@ -18,7 +18,7 @@ matrix: - export GOPATH=$HOME/go - export GOROOT=/usr/local/go - export PATH=$GOROOT/bin:$GOPATH/bin:$PATH - - ./install_interop_go_pkgs.sh + - ./tests/interop/go_pkgs/install_interop_go_pkgs.sh install: - pip install --upgrade pip diff --git a/install_interop_go_pkgs.sh b/install_interop_go_pkgs.sh deleted file mode 100755 index cdf3193c..00000000 --- a/install_interop_go_pkgs.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -go version -cd tests/interop/go_pkgs/ -go install ./... diff --git a/libp2p/kademlia/kad_peerinfo.py b/libp2p/kademlia/kad_peerinfo.py index 9fab8dc7..346f6714 100644 --- a/libp2p/kademlia/kad_peerinfo.py +++ b/libp2p/kademlia/kad_peerinfo.py @@ -1,11 +1,11 @@ import heapq from operator import itemgetter import random +from typing import List from multiaddr import Multiaddr from libp2p.peer.id import ID -from libp2p.peer.peerdata import PeerData from libp2p.peer.peerinfo import PeerInfo from .utils import digest @@ -15,16 +15,16 @@ P_UDP = "udp" class KadPeerInfo(PeerInfo): - def __init__(self, peer_id, peer_data=None): - super(KadPeerInfo, self).__init__(peer_id, peer_data) + def __init__(self, peer_id, addrs): + super(KadPeerInfo, self).__init__(peer_id, addrs) self.peer_id_bytes = peer_id.to_bytes() self.xor_id = peer_id.xor_id - self.addrs = peer_data.get_addrs() if peer_data else None + self.addrs = addrs - self.ip = self.addrs[0].value_for_protocol(P_IP) if peer_data else None - self.port = int(self.addrs[0].value_for_protocol(P_UDP)) if peer_data else None + self.ip = self.addrs[0].value_for_protocol(P_IP) if addrs else None + self.port = int(self.addrs[0].value_for_protocol(P_UDP)) if addrs else None def same_home_as(self, node): return sorted(self.addrs) == sorted(node.addrs) @@ -142,14 +142,14 @@ def create_kad_peerinfo(node_id_bytes=None, sender_ip=None, sender_port=None): node_id = ( ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255))) ) - peer_data = None + addrs: List[Multiaddr] if sender_ip and sender_port: - peer_data = PeerData() - addr = [ + addrs = [ Multiaddr( "/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port) ) ] - peer_data.add_addrs(addr) + else: + addrs = [] - return KadPeerInfo(node_id, peer_data) + return KadPeerInfo(node_id, addrs) diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 010bd922..ff78f5a8 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -7,6 +7,7 @@ from .net_stream_interface import INetStream class NetStream(INetStream): muxed_stream: IMuxedStream + # TODO: Why we expose `mplex_conn` here? mplex_conn: IMuxedConn protocol_id: TProtocol diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 300ad71e..bccfdac1 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -4,6 +4,7 @@ from typing import Callable, Dict, List, Sequence from multiaddr import Multiaddr from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStoreError from libp2p.peer.peerstore_interface import IPeerStore from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient @@ -92,55 +93,55 @@ class Swarm(INetwork): :return: muxed connection """ - # Get peer info from peer store - addrs = self.peerstore.addrs(peer_id) + if peer_id in self.connections: + # If muxed connection already exists for peer_id, + # set muxed connection equal to existing muxed connection + return self.connections[peer_id] + + try: + # Get peer info from peer store + addrs = self.peerstore.addrs(peer_id) + except PeerStoreError: + raise SwarmException(f"No known addresses to peer {peer_id}") if not addrs: - raise SwarmException("No known addresses to peer") + raise SwarmException(f"No known addresses to peer {peer_id}") if not self.router: multiaddr = addrs[0] else: multiaddr = self.router.find_peer(peer_id) + # Dial peer (connection to peer does not yet exist) + # Transport dials peer (gets back a raw conn) + raw_conn = await self.transport.dial(multiaddr, self.self_id) - if peer_id in self.connections: - # If muxed connection already exists for peer_id, - # set muxed connection equal to existing muxed connection - muxed_conn = self.connections[peer_id] - else: - # Dial peer (connection to peer does not yet exist) - # Transport dials peer (gets back a raw conn) - raw_conn = await self.transport.dial(multiaddr, self.self_id) + # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure + # the conn and then mux the conn + try: + secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) + except SecurityUpgradeFailure as error: + # TODO: Add logging to indicate the failure + await raw_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a secured connection from {peer_id}" + ) from error + try: + muxed_conn = await self.upgrader.upgrade_connection( + secured_conn, self.generic_protocol_handler, peer_id + ) + except MuxerUpgradeFailure as error: + # TODO: Add logging to indicate the failure + await secured_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a muxed connection from {peer_id}" + ) from error - # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure - # the conn and then mux the conn - try: - secured_conn = await self.upgrader.upgrade_security( - raw_conn, peer_id, True - ) - except SecurityUpgradeFailure as error: - # TODO: Add logging to indicate the failure - await raw_conn.close() - raise SwarmException( - f"fail to upgrade the connection to a secured connection from {peer_id}" - ) from error - try: - muxed_conn = await self.upgrader.upgrade_connection( - secured_conn, self.generic_protocol_handler, peer_id - ) - except MuxerUpgradeFailure as error: - # TODO: Add logging to indicate the failure - await secured_conn.close() - raise SwarmException( - f"fail to upgrade the connection to a muxed connection from {peer_id}" - ) from error + # Store muxed connection in connections + self.connections[peer_id] = muxed_conn - # Store muxed connection in connections - self.connections[peer_id] = muxed_conn - - # Call notifiers since event occurred - for notifee in self.notifees: - await notifee.connected(self, muxed_conn) + # Call notifiers since event occurred + for notifee in self.notifees: + await notifee.connected(self, muxed_conn) return muxed_conn @@ -152,11 +153,6 @@ class Swarm(INetwork): :param protocol_id: protocol id :return: net stream instance """ - # Get peer info from peer store - addrs = self.peerstore.addrs(peer_id) - - if not addrs: - raise SwarmException("No known addresses to peer") muxed_conn = await self.dial_peer(peer_id) diff --git a/libp2p/peer/peerinfo.py b/libp2p/peer/peerinfo.py index a2f08424..069a67dd 100644 --- a/libp2p/peer/peerinfo.py +++ b/libp2p/peer/peerinfo.py @@ -1,9 +1,8 @@ -from typing import List +from typing import List, Sequence import multiaddr from .id import ID -from .peerdata import PeerData class PeerInfo: @@ -11,9 +10,9 @@ class PeerInfo: peer_id: ID addrs: List[multiaddr.Multiaddr] - def __init__(self, peer_id: ID, peer_data: PeerData = None) -> None: + def __init__(self, peer_id: ID, addrs: Sequence[multiaddr.Multiaddr]) -> None: self.peer_id = peer_id - self.addrs = peer_data.get_addrs() if peer_data else None + self.addrs = list(addrs) def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: @@ -44,11 +43,7 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: if len(parts) > 1: addr = multiaddr.Multiaddr.join(*parts[:-1]) - peer_data = PeerData() - peer_data.add_addrs([addr]) - peer_data.set_protocols([p.code for p in addr.protocols()]) - - return PeerInfo(peer_id, peer_data) + return PeerInfo(peer_id, [addr]) class InvalidAddrError(ValueError): diff --git a/libp2p/peer/peerstore.py b/libp2p/peer/peerstore.py index 1d15ab2a..c1eae370 100644 --- a/libp2p/peer/peerstore.py +++ b/libp2p/peer/peerstore.py @@ -33,7 +33,7 @@ class PeerStore(IPeerStore): def peer_info(self, peer_id: ID) -> Optional[PeerInfo]: if peer_id in self.peer_map: peer_data = self.peer_map[peer_id] - return PeerInfo(peer_id, peer_data) + return PeerInfo(peer_id, peer_data.addrs) return None def get_protocols(self, peer_id: ID) -> List[str]: diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index d35b97b6..3ded0fe2 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -2,11 +2,14 @@ from typing import Iterable, List, Sequence from libp2p.peer.id import ID from libp2p.typing import TProtocol +from libp2p.utils import encode_varint_prefixed from .pb import rpc_pb2 from .pubsub import Pubsub from .pubsub_router_interface import IPubsubRouter +PROTOCOL_ID = TProtocol("/floodsub/1.0.0") + class FloodSub(IPubsubRouter): @@ -76,7 +79,7 @@ class FloodSub(IPubsubRouter): stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - await stream.write(rpc_msg.SerializeToString()) + await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) async def join(self, topic: str) -> None: """ diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 8b3a62cb..267bb81e 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -4,13 +4,17 @@ import random from typing import Any, Dict, Iterable, List, Sequence, Set from libp2p.peer.id import ID +from libp2p.pubsub import floodsub from libp2p.typing import TProtocol +from libp2p.utils import encode_varint_prefixed from .mcache import MessageCache from .pb import rpc_pb2 from .pubsub import Pubsub from .pubsub_router_interface import IPubsubRouter +PROTOCOL_ID = TProtocol("/meshsub/1.0.0") + class GossipSub(IPubsubRouter): @@ -104,16 +108,19 @@ class GossipSub(IPubsubRouter): :param peer_id: id of peer to add :param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub """ - - # Add peer to the correct peer list - peer_type = GossipSub.get_peer_type(protocol_id) - self.peers_to_protocol[peer_id] = protocol_id - if peer_type == "gossip": + if protocol_id == PROTOCOL_ID: self.peers_gossipsub.append(peer_id) - elif peer_type == "flood": + elif protocol_id == floodsub.PROTOCOL_ID: self.peers_floodsub.append(peer_id) + else: + # We should never enter here. Becuase the `protocol_id` is registered by your pubsub + # instance in multistream-select, but it is not the protocol that gossipsub supports. + # In this case, probably we registered gossipsub to a wrong `protocol_id` + # in multistream-select, or wrong versions. + # TODO: Better handling + raise Exception(f"protocol is not supported: protocol_id={protocol_id}") def remove_peer(self, peer_id: ID) -> None: """ @@ -167,7 +174,7 @@ class GossipSub(IPubsubRouter): # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. - await stream.write(rpc_msg.SerializeToString()) + await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID @@ -264,29 +271,6 @@ class GossipSub(IPubsubRouter): # Forget mesh[topic] self.mesh.pop(topic, None) - # Interface Helper Functions - @staticmethod - def get_peer_type(protocol_id: str) -> str: - # TODO: Do this in a better, more efficient way - if "gossipsub" in protocol_id: - return "gossip" - if "floodsub" in protocol_id: - return "flood" - return "unknown" - - async def deliver_messages_to_peers( - self, peers: List[ID], msg_sender: ID, origin_id: ID, serialized_packet: bytes - ) -> None: - for peer_id_in_topic in peers: - # Forward to all peers that are not the - # message sender and are not the message origin - - if peer_id_in_topic not in (msg_sender, origin_id): - stream = self.pubsub.peers[peer_id_in_topic] - - # Publish the packet - await stream.write(serialized_packet) - # Heartbeat async def heartbeat(self) -> None: """ @@ -509,7 +493,7 @@ class GossipSub(IPubsubRouter): peer_stream = self.pubsub.peers[sender_peer_id] # 4) And write the packet to the stream - await peer_stream.write(rpc_msg) + await peer_stream.write(encode_varint_prefixed(rpc_msg)) async def handle_graft( self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID @@ -601,4 +585,4 @@ class GossipSub(IPubsubRouter): peer_stream = self.pubsub.peers[to_peer] # Write rpc to stream - await peer_stream.write(rpc_msg) + await peer_stream.write(encode_varint_prefixed(rpc_msg)) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index a19c99ae..b1812933 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -21,6 +21,7 @@ from libp2p.host.host_interface import IHost from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID from libp2p.typing import TProtocol +from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee @@ -71,7 +72,7 @@ class Pubsub: topic_validators: Dict[str, TopicValidator] - # NOTE: Be sure it is increased atomically everytime. + # TODO: Be sure it is increased atomically everytime. counter: int # uint64 def __init__( @@ -131,7 +132,7 @@ class Pubsub: # Call handle peer to keep waiting for updates to peer queue asyncio.ensure_future(self.handle_peer_queue()) - def get_hello_packet(self) -> bytes: + def get_hello_packet(self) -> rpc_pb2.RPC: """ Generate subscription message with all topics we are subscribed to only send hello packet if we have subscribed topics @@ -141,7 +142,7 @@ class Pubsub: packet.subscriptions.extend( [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) - return packet.SerializeToString() + return packet async def continuously_read_stream(self, stream: INetStream) -> None: """ @@ -152,17 +153,14 @@ class Pubsub: peer_id = stream.mplex_conn.peer_id while True: - incoming: bytes = (await stream.read()) + incoming: bytes = await read_varint_prefixed_bytes(stream) rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) - if rpc_incoming.publish: # deal with RPC.publish for msg in rpc_incoming.publish: if not self._is_subscribed_to_msg(msg): continue - # TODO(mhchia): This will block this read_stream loop until all data are pushed. - # Should investigate further if this is an issue. asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg)) if rpc_incoming.subscriptions: @@ -220,20 +218,19 @@ class Pubsub: on one of the supported pubsub protocols. :param stream: newly created stream """ - # Add peer - # Map peer to stream - peer_id: ID = stream.mplex_conn.peer_id + await self.continuously_read_stream(stream) + + async def _handle_new_peer(self, peer_id: ID) -> None: + stream: INetStream = await self.host.new_stream(peer_id, self.protocols) + self.peers[peer_id] = stream - self.router.add_peer(peer_id, stream.get_protocol()) # Send hello packet - hello: bytes = self.get_hello_packet() - - await stream.write(hello) - # Pass stream off to stream reader - asyncio.ensure_future(self.continuously_read_stream(stream)) - # Force context switch - await asyncio.sleep(0) + hello = self.get_hello_packet() + await stream.write(encode_varint_prefixed(hello.SerializeToString())) + # TODO: Check EOF of this stream. + # TODO: Check if the peer in black list. + self.router.add_peer(peer_id, stream.get_protocol()) async def handle_peer_queue(self) -> None: """ @@ -246,25 +243,9 @@ class Pubsub: peer_id: ID = await self.peer_queue.get() - # Open a stream to peer on existing connection - # (we know connection exists since that's the only way - # an element gets added to peer_queue) - stream: INetStream = await self.host.new_stream(peer_id, self.protocols) - # Add Peer - # Map peer to stream - self.peers[peer_id] = stream - self.router.add_peer(peer_id, stream.get_protocol()) - - # Send hello packet - hello: bytes = self.get_hello_packet() - await stream.write(hello) - - # TODO: Investigate whether this should be replaced by `handlePeerEOF` - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/49274b0e8aecdf6cad59d768e5702ff00aa48488/comm.go#L80 # noqa: E501 - # Pass stream off to stream reader - asyncio.ensure_future(self.continuously_read_stream(stream)) + asyncio.ensure_future(self._handle_new_peer(peer_id)) # Force context switch await asyncio.sleep(0) @@ -365,7 +346,7 @@ class Pubsub: # Broadcast message for stream in self.peers.values(): # Write message to stream - await stream.write(raw_msg) + await stream.write(encode_varint_prefixed(raw_msg)) async def publish(self, topic_id: str, data: bytes) -> None: """ diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 8878a276..6ecab1ab 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -36,11 +36,7 @@ class PubsubNotifee(INotifee): :param network: network the connection was opened on :param conn: connection that was opened """ - - # Only add peer_id if we are initiator (otherwise we would end up - # with two pubsub streams between us and the peer) - if conn.initiator: - await self.initiator_peers_queue.put(conn.peer_id) + await self.initiator_peers_queue.put(conn.peer_id) async def disconnected(self, network: INetwork, conn: IMuxedConn) -> None: pass diff --git a/libp2p/typing.py b/libp2p/typing.py index f36d8ab7..ba776e19 100644 --- a/libp2p/typing.py +++ b/libp2p/typing.py @@ -9,5 +9,4 @@ if TYPE_CHECKING: TProtocol = NewType("TProtocol", str) StreamHandlerFn = Callable[["INetStream"], Awaitable[None]] - -StreamReader = Union["IMuxedStream", IRawConnection] +StreamReader = Union["IMuxedStream", "INetStream", IRawConnection] diff --git a/setup.py b/setup.py index 1567c039..e3ef58e7 100644 --- a/setup.py +++ b/setup.py @@ -9,6 +9,8 @@ extras_require = { "pytest>=4.6.3,<5.0.0", "pytest-asyncio>=0.10.0,<1.0.0", "pexpect>=4.6,<5", + # FIXME: Master branch. Use PyPI instead after it is released. + "p2pclient @ git+https://git@github.com/mhchia/py-libp2p-daemon-bindings@628266f", ], "lint": [ "mypy>=0.701,<1.0", diff --git a/tests/interop/conftest.py b/tests/interop/conftest.py index e85f2f6a..7261ee7b 100644 --- a/tests/interop/conftest.py +++ b/tests/interop/conftest.py @@ -1,8 +1,15 @@ +import asyncio import sys +from typing import Union import pexpect import pytest +from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory +from tests.pubsub.configs import GOSSIPSUB_PARAMS + +from .daemon import Daemon, make_p2pd + @pytest.fixture def proc_factory(): @@ -22,3 +29,52 @@ def proc_factory(): finally: for proc in procs: proc.close() + + +@pytest.fixture +def num_p2pds(): + return 1 + + +@pytest.fixture +def is_gossipsub(): + return True + + +@pytest.fixture +async def p2pds(num_p2pds, is_host_secure, is_gossipsub, unused_tcp_port_factory): + p2pds: Union[Daemon, Exception] = await asyncio.gather( + *[ + make_p2pd( + unused_tcp_port_factory(), + unused_tcp_port_factory(), + is_host_secure, + is_gossipsub=is_gossipsub, + ) + for _ in range(num_p2pds) + ], + return_exceptions=True, + ) + p2pds_succeeded = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Daemon)) + if len(p2pds_succeeded) != len(p2pds): + # Not all succeeded. Close the succeeded ones and print the failed ones(exceptions). + await asyncio.gather(*[p2pd.close() for p2pd in p2pds_succeeded]) + exceptions = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Exception)) + raise Exception(f"not all p2pds succeed: first exception={exceptions[0]}") + try: + yield p2pds + finally: + await asyncio.gather(*[p2pd.close() for p2pd in p2pds]) + + +@pytest.fixture +def pubsubs(num_hosts, hosts, is_gossipsub): + if is_gossipsub: + routers = GossipsubFactory.create_batch(num_hosts, **GOSSIPSUB_PARAMS._asdict()) + else: + routers = FloodsubFactory.create_batch(num_hosts) + _pubsubs = tuple( + PubsubFactory(host=host, router=router) for host, router in zip(hosts, routers) + ) + yield _pubsubs + # TODO: Clean up diff --git a/tests/interop/constants.py b/tests/interop/constants.py index dbef0437..331e2843 100644 --- a/tests/interop/constants.py +++ b/tests/interop/constants.py @@ -1 +1,2 @@ +LOCALHOST_IP = "127.0.0.1" PEXPECT_NEW_LINE = "\r\n" diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py new file mode 100644 index 00000000..97356845 --- /dev/null +++ b/tests/interop/daemon.py @@ -0,0 +1,200 @@ +import asyncio +import time +from typing import Any, List + +import multiaddr +from multiaddr import Multiaddr +from p2pclient import Client + +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr + +from .constants import LOCALHOST_IP +from .envs import GO_BIN_PATH + +P2PD_PATH = GO_BIN_PATH / "p2pd" + + +TIMEOUT_DURATION = 30 + + +async def try_until_success(coro_func, timeout=TIMEOUT_DURATION): + """ + Keep running ``coro_func`` until either it succeed or time is up. + All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments. + """ + t_start = time.monotonic() + while True: + result = await coro_func() + if result: + break + if (time.monotonic() - t_start) >= timeout: + # timeout + assert False, f"{coro_func} is still failing after `{timeout}` seconds" + await asyncio.sleep(0.01) + + +class P2PDProcess: + proc: asyncio.subprocess.Process + cmd: str = str(P2PD_PATH) + args: List[Any] + is_proc_running: bool + + _tasks: List["asyncio.Future[Any]"] + + def __init__( + self, + control_maddr: Multiaddr, + is_secure: bool, + is_pubsub_enabled: bool = True, + is_gossipsub: bool = True, + is_pubsub_signing: bool = False, + is_pubsub_signing_strict: bool = False, + ) -> None: + args = [f"-listen={str(control_maddr)}"] + # NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`. + if not is_secure: + args.append("-insecure=true") + if is_pubsub_enabled: + args.append("-pubsub") + if is_gossipsub: + args.append("-pubsubRouter=gossipsub") + else: + args.append("-pubsubRouter=floodsub") + if not is_pubsub_signing: + args.append("-pubsubSign=false") + if not is_pubsub_signing_strict: + args.append("-pubsubSignStrict=false") + # NOTE: + # Two other params are possibly what we want to configure: + # - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond # noqa: E501 + # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second + # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 + self.args = args + self.is_proc_running = False + + self._tasks = [] + + async def wait_until_ready(self): + lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:") + lines_head_occurred = {line: False for line in lines_head_pattern} + + async def read_from_daemon_and_check(): + line = await self.proc.stdout.readline() + for head_pattern in lines_head_occurred: + if line.startswith(head_pattern): + lines_head_occurred[head_pattern] = True + return all([value for value in lines_head_occurred.values()]) + + await try_until_success(read_from_daemon_and_check) + # Sleep a little bit to ensure the listener is up after logs are emitted. + await asyncio.sleep(0.01) + + async def start_printing_logs(self) -> None: + async def _print_from_stream( + src_name: str, reader: asyncio.StreamReader + ) -> None: + while True: + line = await reader.readline() + if line != b"": + print(f"{src_name}\t: {line.rstrip().decode()}") + await asyncio.sleep(0.01) + + self._tasks.append( + asyncio.ensure_future(_print_from_stream("out", self.proc.stdout)) + ) + self._tasks.append( + asyncio.ensure_future(_print_from_stream("err", self.proc.stderr)) + ) + await asyncio.sleep(0) + + async def start(self) -> None: + if self.is_proc_running: + return + self.proc = await asyncio.subprocess.create_subprocess_exec( + self.cmd, + *self.args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + bufsize=0, + ) + self.is_proc_running = True + await self.wait_until_ready() + await self.start_printing_logs() + + async def close(self) -> None: + if self.is_proc_running: + self.proc.terminate() + await self.proc.wait() + self.is_proc_running = False + for task in self._tasks: + task.cancel() + + +class Daemon: + p2pd_proc: P2PDProcess + control: Client + peer_info: PeerInfo + + def __init__( + self, p2pd_proc: P2PDProcess, control: Client, peer_info: PeerInfo + ) -> None: + self.p2pd_proc = p2pd_proc + self.control = control + self.peer_info = peer_info + + def __repr__(self) -> str: + return f"" + + @property + def peer_id(self) -> ID: + return self.peer_info.peer_id + + @property + def listen_maddr(self) -> Multiaddr: + return self.peer_info.addrs[0] + + async def close(self) -> None: + await self.p2pd_proc.close() + await self.control.close() + + +async def make_p2pd( + daemon_control_port: int, + client_callback_port: int, + is_secure: bool, + is_pubsub_enabled=True, + is_gossipsub=True, + is_pubsub_signing=False, + is_pubsub_signing_strict=False, +) -> Daemon: + control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{daemon_control_port}") + p2pd_proc = P2PDProcess( + control_maddr, + is_secure, + is_pubsub_enabled, + is_gossipsub, + is_pubsub_signing, + is_pubsub_signing_strict, + ) + await p2pd_proc.start() + client_callback_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{client_callback_port}") + p2pc = Client(control_maddr, client_callback_maddr) + await p2pc.listen() + peer_id, maddrs = await p2pc.identify() + listen_maddr: Multiaddr = None + for maddr in maddrs: + try: + ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4) + # NOTE: Check if this `maddr` uses `tcp`. + maddr.value_for_protocol(multiaddr.protocols.P_TCP) + except multiaddr.exceptions.ProtocolLookupError: + continue + if ip == LOCALHOST_IP: + listen_maddr = maddr + break + assert listen_maddr is not None, "no loopback maddr is found" + peer_info = info_from_p2p_addr( + listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) + ) + return Daemon(p2pd_proc, p2pc, peer_info) diff --git a/tests/interop/envs.py b/tests/interop/envs.py new file mode 100644 index 00000000..23d9f27a --- /dev/null +++ b/tests/interop/envs.py @@ -0,0 +1,4 @@ +import os +import pathlib + +GO_BIN_PATH = pathlib.Path(os.environ["GOPATH"]) / "bin" diff --git a/tests/interop/go_pkgs/README.md b/tests/interop/go_pkgs/examples/README.md similarity index 100% rename from tests/interop/go_pkgs/README.md rename to tests/interop/go_pkgs/examples/README.md diff --git a/tests/interop/go_pkgs/echo/main.go b/tests/interop/go_pkgs/examples/echo/main.go similarity index 64% rename from tests/interop/go_pkgs/echo/main.go rename to tests/interop/go_pkgs/examples/echo/main.go index ad958a3e..e9ec5844 100644 --- a/tests/interop/go_pkgs/echo/main.go +++ b/tests/interop/go_pkgs/examples/echo/main.go @@ -3,17 +3,13 @@ package main import ( "bufio" "context" - "crypto/rand" "flag" "fmt" - "io" "io/ioutil" "log" - mrand "math/rand" - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/host" + utils "interop/utils" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" @@ -23,59 +19,6 @@ import ( gologging "github.com/whyrusleeping/go-logging" ) -// makeBasicHost creates a LibP2P host with a random peer ID listening on the -// given multiaddress. It won't encrypt the connection if insecure is true. -func makeBasicHost(listenPort int, insecure bool, randseed int64) (host.Host, error) { - - // If the seed is zero, use real cryptographic randomness. Otherwise, use a - // deterministic randomness source to make generated keys stay the same - // across multiple runs - var r io.Reader - if randseed == 0 { - r = rand.Reader - } else { - r = mrand.New(mrand.NewSource(randseed)) - } - - // Generate a key pair for this host. We will use it at least - // to obtain a valid host ID. - priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r) - if err != nil { - return nil, err - } - - opts := []libp2p.Option{ - libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)), - libp2p.Identity(priv), - libp2p.DisableRelay(), - } - - if insecure { - opts = append(opts, libp2p.NoSecurity) - } - - basicHost, err := libp2p.New(context.Background(), opts...) - if err != nil { - return nil, err - } - - // Build host multiaddress - hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty())) - - // Now we can build a full multiaddress to reach this host - // by encapsulating both addresses: - addr := basicHost.Addrs()[0] - fullAddr := addr.Encapsulate(hostAddr) - log.Printf("I am %s\n", fullAddr) - if insecure { - log.Printf("Now run \"./echo -l %d -d %s -insecure\" on a different terminal\n", listenPort+1, fullAddr) - } else { - log.Printf("Now run \"./echo -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr) - } - - return basicHost, nil -} - func main() { // LibP2P code uses golog to log messages. They log with different // string IDs (i.e. "swarm"). We can control the verbosity level for @@ -94,7 +37,7 @@ func main() { } // Make a host that listens on the given multiaddress - ha, err := makeBasicHost(*listenF, *insecure, *seed) + ha, err := utils.MakeBasicHost(*listenF, *insecure, *seed) if err != nil { log.Fatal(err) } diff --git a/tests/interop/go_pkgs/go.mod b/tests/interop/go_pkgs/examples/go.mod similarity index 100% rename from tests/interop/go_pkgs/go.mod rename to tests/interop/go_pkgs/examples/go.mod diff --git a/tests/interop/go_pkgs/go.sum b/tests/interop/go_pkgs/examples/go.sum similarity index 100% rename from tests/interop/go_pkgs/go.sum rename to tests/interop/go_pkgs/examples/go.sum diff --git a/tests/interop/go_pkgs/examples/utils/host.go b/tests/interop/go_pkgs/examples/utils/host.go new file mode 100644 index 00000000..4024da51 --- /dev/null +++ b/tests/interop/go_pkgs/examples/utils/host.go @@ -0,0 +1,69 @@ +package utils + +import ( + "context" + "crypto/rand" + "fmt" + "io" + "log" + mrand "math/rand" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + + ma "github.com/multiformats/go-multiaddr" +) + +// MakeBasicHost creates a LibP2P host with a random peer ID listening on the +// given multiaddress. It won't encrypt the connection if insecure is true. +func MakeBasicHost(listenPort int, insecure bool, randseed int64) (host.Host, error) { + + // If the seed is zero, use real cryptographic randomness. Otherwise, use a + // deterministic randomness source to make generated keys stay the same + // across multiple runs + var r io.Reader + if randseed == 0 { + r = rand.Reader + } else { + r = mrand.New(mrand.NewSource(randseed)) + } + + // Generate a key pair for this host. We will use it at least + // to obtain a valid host ID. + priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r) + if err != nil { + return nil, err + } + + opts := []libp2p.Option{ + libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)), + libp2p.Identity(priv), + libp2p.DisableRelay(), + } + + if insecure { + opts = append(opts, libp2p.NoSecurity) + } + + basicHost, err := libp2p.New(context.Background(), opts...) + if err != nil { + return nil, err + } + + // Build host multiaddress + hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty())) + + // Now we can build a full multiaddress to reach this host + // by encapsulating both addresses: + addr := basicHost.Addrs()[0] + fullAddr := addr.Encapsulate(hostAddr) + log.Printf("I am %s\n", fullAddr) + if insecure { + log.Printf("Now run \"./echo -l %d -d %s -insecure\" on a different terminal\n", listenPort+1, fullAddr) + } else { + log.Printf("Now run \"./echo -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr) + } + + return basicHost, nil +} diff --git a/tests/interop/go_pkgs/install_interop_go_pkgs.sh b/tests/interop/go_pkgs/install_interop_go_pkgs.sh new file mode 100755 index 00000000..b830e865 --- /dev/null +++ b/tests/interop/go_pkgs/install_interop_go_pkgs.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +SCRIPT_RELATIVE_PATH=`dirname $0` + +GO_PKGS_PATH=$SCRIPT_RELATIVE_PATH + +DAEMON_REPO=go-libp2p-daemon +DAEMON_PATH=$GO_PKGS_PATH/$DAEMON_REPO + +EXAMPLES_PATHS=$GO_PKGS_PATH/examples + +go version + +# Install `p2pd` +# FIXME: Use the canonical repo in libp2p, when we don't need `insecure`. +if [ ! -e "$DAEMON_PATH" ]; then + git clone https://github.com/mhchia/$DAEMON_REPO.git --branch test/add-options $DAEMON_PATH + if [ "$?" != 0 ]; then + echo "Failed to clone the daemon repo" + exit 1 + fi +fi + +cd $DAEMON_PATH && go install ./... + +cd - + +# Install example modeuls +cd $EXAMPLES_PATHS && go install ./... + +echo "Finish installing go modules for interop." diff --git a/tests/interop/test_bindings.py b/tests/interop/test_bindings.py new file mode 100644 index 00000000..1189e0b7 --- /dev/null +++ b/tests/interop/test_bindings.py @@ -0,0 +1,24 @@ +import pytest + +from .utils import connect + + +@pytest.mark.parametrize("num_hosts", (1,)) +@pytest.mark.asyncio +async def test_connect(hosts, p2pds): + p2pd = p2pds[0] + host = hosts[0] + assert len(await p2pd.control.list_peers()) == 0 + # Test: connect from Py + await connect(host, p2pd) + assert len(await p2pd.control.list_peers()) == 1 + # Test: `disconnect` from Py + await host.disconnect(p2pd.peer_id) + assert len(await p2pd.control.list_peers()) == 0 + # Test: connect from Go + await connect(p2pd, host) + assert len(host.get_network().connections) == 1 + # Test: `disconnect` from Go + await p2pd.control.disconnect(host.get_id()) + # FIXME: Failed to handle disconnect + # assert len(host.get_network().connections) == 0 diff --git a/tests/interop/test_echo.py b/tests/interop/test_echo.py index 9b170db9..81b553c4 100644 --- a/tests/interop/test_echo.py +++ b/tests/interop/test_echo.py @@ -1,6 +1,4 @@ import asyncio -import os -import pathlib from multiaddr import Multiaddr import pytest @@ -9,9 +7,9 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.typing import TProtocol from .constants import PEXPECT_NEW_LINE +from .envs import GO_BIN_PATH -GOPATH = pathlib.Path(os.environ["GOPATH"]) -ECHO_PATH = GOPATH / "bin" / "echo" +ECHO_PATH = GO_BIN_PATH / "echo" ECHO_PROTOCOL_ID = TProtocol("/echo/1.0.0") diff --git a/tests/interop/test_pubsub.py b/tests/interop/test_pubsub.py new file mode 100644 index 00000000..bb37e352 --- /dev/null +++ b/tests/interop/test_pubsub.py @@ -0,0 +1,164 @@ +import asyncio +import functools + +from p2pclient.pb import p2pd_pb2 +import pytest + +from libp2p.peer.id import ID +from libp2p.pubsub.pb import rpc_pb2 +from libp2p.utils import read_varint_prefixed_bytes + +from .utils import connect + +TOPIC_0 = "ABALA" +TOPIC_1 = "YOOOO" + + +async def p2pd_subscribe(p2pd, topic) -> "asyncio.Queue[rpc_pb2.Message]": + reader, writer = await p2pd.control.pubsub_subscribe(topic) + + queue = asyncio.Queue() + + async def _read_pubsub_msg() -> None: + writer_closed_task = asyncio.ensure_future(writer.wait_closed()) + + while True: + done, pending = await asyncio.wait( + [read_varint_prefixed_bytes(reader), writer_closed_task], + return_when=asyncio.FIRST_COMPLETED, + ) + done_tasks = tuple(done) + if writer.is_closing(): + return + read_task = done_tasks[0] + # Sanity check + assert read_task._coro.__name__ == "read_varint_prefixed_bytes" + msg_bytes = read_task.result() + ps_msg = p2pd_pb2.PSMessage() + ps_msg.ParseFromString(msg_bytes) + # Fill in the message used in py-libp2p + msg = rpc_pb2.Message( + from_id=ps_msg.from_field, + data=ps_msg.data, + seqno=ps_msg.seqno, + topicIDs=ps_msg.topicIDs, + signature=ps_msg.signature, + key=ps_msg.key, + ) + queue.put_nowait(msg) + + asyncio.ensure_future(_read_pubsub_msg()) + await asyncio.sleep(0) + return queue + + +def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) -> None: + assert msg.data == data and msg.from_id == from_peer_id + + +@pytest.mark.parametrize("is_gossipsub", (True, False)) +@pytest.mark.parametrize("num_hosts, num_p2pds", ((1, 2),)) +@pytest.mark.asyncio +async def test_pubsub(pubsubs, p2pds): + # + # Test: Recognize pubsub peers on connection. + # + py_pubsub = pubsubs[0] + # go0 <-> py <-> go1 + await connect(p2pds[0], py_pubsub.host) + await connect(py_pubsub.host, p2pds[1]) + py_peer_id = py_pubsub.host.get_id() + # Check pubsub peers + pubsub_peers_0 = await p2pds[0].control.pubsub_list_peers("") + assert len(pubsub_peers_0) == 1 and pubsub_peers_0[0] == py_peer_id + pubsub_peers_1 = await p2pds[1].control.pubsub_list_peers("") + assert len(pubsub_peers_1) == 1 and pubsub_peers_1[0] == py_peer_id + assert ( + len(py_pubsub.peers) == 2 + and p2pds[0].peer_id in py_pubsub.peers + and p2pds[1].peer_id in py_pubsub.peers + ) + + # + # Test: `subscribe`. + # + # (name, topics) + # (go_0, [0, 1]) <-> (py, [0, 1]) <-> (go_1, [1]) + sub_py_topic_0 = await py_pubsub.subscribe(TOPIC_0) + sub_py_topic_1 = await py_pubsub.subscribe(TOPIC_1) + sub_go_0_topic_0 = await p2pd_subscribe(p2pds[0], TOPIC_0) + sub_go_0_topic_1 = await p2pd_subscribe(p2pds[0], TOPIC_1) + sub_go_1_topic_1 = await p2pd_subscribe(p2pds[1], TOPIC_1) + # Check topic peers + await asyncio.sleep(0.1) + # go_0 + go_0_topic_0_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_0) + assert len(go_0_topic_0_peers) == 1 and py_peer_id == go_0_topic_0_peers[0] + go_0_topic_1_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_1) + assert len(go_0_topic_1_peers) == 1 and py_peer_id == go_0_topic_1_peers[0] + # py + py_topic_0_peers = py_pubsub.peer_topics[TOPIC_0] + assert len(py_topic_0_peers) == 1 and p2pds[0].peer_id == py_topic_0_peers[0] + # go_1 + go_1_topic_1_peers = await p2pds[1].control.pubsub_list_peers(TOPIC_1) + assert len(go_1_topic_1_peers) == 1 and py_peer_id == go_1_topic_1_peers[0] + + # + # Test: `publish` + # + # 1. py publishes + # - 1.1. py publishes data_11 to topic_0, py and go_0 receives. + # - 1.2. py publishes data_12 to topic_1, all receive. + # 2. go publishes + # - 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. + # - 2.2. go_1 publishes data_22 to topic_1, all receive. + + # 1.1. py publishes data_11 to topic_0, py and go_0 receives. + data_11 = b"data_11" + await py_pubsub.publish(TOPIC_0, data_11) + validate_11 = functools.partial( + validate_pubsub_msg, data=data_11, from_peer_id=py_peer_id + ) + validate_11(await sub_py_topic_0.get()) + validate_11(await sub_go_0_topic_0.get()) + + # 1.2. py publishes data_12 to topic_1, all receive. + data_12 = b"data_12" + validate_12 = functools.partial( + validate_pubsub_msg, data=data_12, from_peer_id=py_peer_id + ) + await py_pubsub.publish(TOPIC_1, data_12) + validate_12(await sub_py_topic_1.get()) + validate_12(await sub_go_0_topic_1.get()) + validate_12(await sub_go_1_topic_1.get()) + + # 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. + data_21 = b"data_21" + validate_21 = functools.partial( + validate_pubsub_msg, data=data_21, from_peer_id=p2pds[0].peer_id + ) + await p2pds[0].control.pubsub_publish(TOPIC_0, data_21) + validate_21(await sub_py_topic_0.get()) + validate_21(await sub_go_0_topic_0.get()) + + # 2.2. go_1 publishes data_22 to topic_1, all receive. + data_22 = b"data_22" + validate_22 = functools.partial( + validate_pubsub_msg, data=data_22, from_peer_id=p2pds[1].peer_id + ) + await p2pds[1].control.pubsub_publish(TOPIC_1, data_22) + validate_22(await sub_py_topic_1.get()) + validate_22(await sub_go_0_topic_1.get()) + validate_22(await sub_go_1_topic_1.get()) + + # + # Test: `unsubscribe` and re`subscribe` + # + await py_pubsub.unsubscribe(TOPIC_0) + await asyncio.sleep(0.1) + assert py_peer_id not in (await p2pds[0].control.pubsub_list_peers(TOPIC_0)) + assert py_peer_id not in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) + await py_pubsub.subscribe(TOPIC_0) + await asyncio.sleep(0.1) + assert py_peer_id in (await p2pds[0].control.pubsub_list_peers(TOPIC_0)) + assert py_peer_id in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) diff --git a/tests/interop/utils.py b/tests/interop/utils.py new file mode 100644 index 00000000..c9174179 --- /dev/null +++ b/tests/interop/utils.py @@ -0,0 +1,58 @@ +import asyncio +from typing import Union + +from multiaddr import Multiaddr + +from libp2p.host.host_interface import IHost +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo + +from .daemon import Daemon + +TDaemonOrHost = Union[IHost, Daemon] + + +def _get_peer_info(node: TDaemonOrHost) -> PeerInfo: + peer_info: PeerInfo + if isinstance(node, Daemon): + peer_info = node.peer_info + else: # isinstance(node, IHost) + peer_id = node.get_id() + maddrs = [ + node.get_addrs()[0].decapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) + ] + peer_info = PeerInfo(peer_id, maddrs) + return peer_info + + +async def _is_peer(peer_id: ID, node: TDaemonOrHost) -> bool: + if isinstance(node, Daemon): + pinfos = await node.control.list_peers() + peers = tuple([pinfo.peer_id for pinfo in pinfos]) + return peer_id in peers + else: # isinstance(node, IHost) + return peer_id in node.get_network().connections + + +async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None: + # Type check + err_msg = ( + f"Type of a={type(a)} or type of b={type(b)} is wrong." + "Should be either `IHost` or `Daemon`" + ) + assert all( + [isinstance(node, IHost) or isinstance(node, Daemon) for node in (a, b)] + ), err_msg + + b_peer_info = _get_peer_info(b) + if isinstance(a, Daemon): + await a.control.connect(b_peer_info.peer_id, b_peer_info.addrs) + else: # isinstance(b, IHost) + await a.connect(b_peer_info) + # Allow additional sleep for both side to establish the connection. + await asyncio.sleep(0.1) + + a_peer_info = _get_peer_info(a) + + assert await _is_peer(b_peer_info.peer_id, a) + assert await _is_peer(a_peer_info.peer_id, b) diff --git a/tests/peer/test_peerinfo.py b/tests/peer/test_peerinfo.py index 156305c2..29c46887 100644 --- a/tests/peer/test_peerinfo.py +++ b/tests/peer/test_peerinfo.py @@ -4,7 +4,6 @@ import multiaddr import pytest from libp2p.peer.id import ID -from libp2p.peer.peerdata import PeerData from libp2p.peer.peerinfo import InvalidAddrError, PeerInfo, info_from_p2p_addr ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" @@ -12,24 +11,17 @@ VALID_MULTI_ADDR_STR = "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJ def test_init_(): - peer_data = PeerData() random_addrs = [random.randint(0, 255) for r in range(4)] - peer_data.add_addrs(random_addrs) random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) peer_id = ID(random_id_string.encode()) - peer_info = PeerInfo(peer_id, peer_data) + peer_info = PeerInfo(peer_id, random_addrs) assert peer_info.peer_id == peer_id assert peer_info.addrs == random_addrs -def test_init_no_value(): - with pytest.raises(Exception): - PeerInfo() - - @pytest.mark.parametrize( "addr", ( diff --git a/tests/pubsub/configs.py b/tests/pubsub/configs.py index e5adfad4..b2053252 100644 --- a/tests/pubsub/configs.py +++ b/tests/pubsub/configs.py @@ -1,7 +1,9 @@ from typing import NamedTuple -FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" -GOSSIPSUB_PROTOCOL_ID = "/gossipsub/1.0.0" +from libp2p.pubsub import floodsub, gossipsub + +FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID +GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID class GossipsubParams(NamedTuple): diff --git a/tests/pubsub/conftest.py b/tests/pubsub/conftest.py index 1755ee59..246ca158 100644 --- a/tests/pubsub/conftest.py +++ b/tests/pubsub/conftest.py @@ -1,23 +1,8 @@ import pytest from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory -from tests.pubsub.configs import GOSSIPSUB_PARAMS - -@pytest.fixture -def floodsubs(num_hosts): - return FloodsubFactory.create_batch(num_hosts) - - -@pytest.fixture -def gossipsub_params(): - return GOSSIPSUB_PARAMS - - -@pytest.fixture -def gossipsubs(num_hosts, gossipsub_params): - yield GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict()) - # TODO: Clean up +from .configs import GOSSIPSUB_PARAMS def _make_pubsubs(hosts, pubsub_routers, cache_size): @@ -38,14 +23,21 @@ def pubsub_cache_size(): @pytest.fixture -def pubsubs_fsub(hosts, floodsubs, pubsub_cache_size): +def gossipsub_params(): + return GOSSIPSUB_PARAMS + + +@pytest.fixture +def pubsubs_fsub(num_hosts, hosts, pubsub_cache_size): + floodsubs = FloodsubFactory.create_batch(num_hosts) _pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size) yield _pubsubs_fsub # TODO: Clean up @pytest.fixture -def pubsubs_gsub(hosts, gossipsubs, pubsub_cache_size): +def pubsubs_gsub(num_hosts, hosts, pubsub_cache_size, gossipsub_params): + gossipsubs = GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict()) _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size) yield _pubsubs_gsub # TODO: Clean up diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index e091f669..7a0efc2c 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -14,7 +14,8 @@ from .utils import dense_connect, one_to_all_connect ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),), ) @pytest.mark.asyncio -async def test_join(num_hosts, hosts, gossipsubs, pubsubs_gsub): +async def test_join(num_hosts, hosts, pubsubs_gsub): + gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub) hosts_indices = list(range(num_hosts)) topic = "test_join" @@ -85,7 +86,9 @@ async def test_leave(pubsubs_gsub): @pytest.mark.parametrize("num_hosts", (2,)) @pytest.mark.asyncio -async def test_handle_graft(pubsubs_gsub, hosts, gossipsubs, event_loop, monkeypatch): +async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch): + gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub) + index_alice = 0 id_alice = hosts[index_alice].get_id() index_bob = 1 @@ -137,7 +140,9 @@ async def test_handle_graft(pubsubs_gsub, hosts, gossipsubs, event_loop, monkeyp "num_hosts, gossipsub_params", ((2, GossipsubParams(heartbeat_interval=3)),) ) @pytest.mark.asyncio -async def test_handle_prune(pubsubs_gsub, hosts, gossipsubs): +async def test_handle_prune(pubsubs_gsub, hosts): + gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub) + index_alice = 0 id_alice = hosts[index_alice].get_id() index_bob = 1 diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 7a9ff3e7..34139494 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -1,5 +1,4 @@ import asyncio -import io from typing import NamedTuple import pytest @@ -7,6 +6,7 @@ import pytest from libp2p.exceptions import ValidationError from libp2p.peer.id import ID from libp2p.pubsub.pb import rpc_pb2 +from libp2p.utils import encode_varint_prefixed from tests.utils import connect from .utils import make_pubsub_msg @@ -70,8 +70,7 @@ async def test_peers_subscribe(pubsubs_fsub): @pytest.mark.asyncio async def test_get_hello_packet(pubsubs_fsub): def _get_hello_packet_topic_ids(): - packet = rpc_pb2.RPC() - packet.ParseFromString(pubsubs_fsub[0].get_hello_packet()) + packet = pubsubs_fsub[0].get_hello_packet() return tuple(sub.topicid for sub in packet.subscriptions) # Test: No subscription, so there should not be any topic ids in the hello packet. @@ -239,11 +238,19 @@ class FakeNetStream: def __init__(self) -> None: self._queue = asyncio.Queue() - async def read(self) -> bytes: - buf = io.BytesIO() - while not self._queue.empty(): - buf.write(await self._queue.get()) - return buf.getvalue() + async def read(self, n: int = -1) -> bytes: + buf = bytearray() + # Force to blocking wait if no data available now. + if self._queue.empty(): + first_byte = await self._queue.get() + buf.extend(first_byte) + # If `n == -1`, read until no data is in the buffer(_queue). + # Else, read until no data is in the buffer(_queue) or we have read `n` bytes. + while (n == -1) or (len(buf) < n): + if self._queue.empty(): + break + buf.extend(await self._queue.get()) + return bytes(buf) async def write(self, data: bytes) -> int: for i in data: @@ -279,7 +286,7 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): async def wait_for_event_occurring(event): try: - await asyncio.wait_for(event.wait(), timeout=0.01) + await asyncio.wait_for(event.wait(), timeout=1) except asyncio.TimeoutError as error: event.clear() raise asyncio.TimeoutError( @@ -296,7 +303,9 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): publish_subscribed_topic = rpc_pb2.RPC( publish=[rpc_pb2.Message(topicIDs=[TESTING_TOPIC])] ) - await stream.write(publish_subscribed_topic.SerializeToString()) + await stream.write( + encode_varint_prefixed(publish_subscribed_topic.SerializeToString()) + ) await wait_for_event_occurring(event_push_msg) # Make sure the other events are not emitted. with pytest.raises(asyncio.TimeoutError): @@ -308,13 +317,15 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): publish_not_subscribed_topic = rpc_pb2.RPC( publish=[rpc_pb2.Message(topicIDs=["NOT_SUBSCRIBED"])] ) - await stream.write(publish_not_subscribed_topic.SerializeToString()) + await stream.write( + encode_varint_prefixed(publish_not_subscribed_topic.SerializeToString()) + ) with pytest.raises(asyncio.TimeoutError): await wait_for_event_occurring(event_push_msg) # Test: `handle_subscription` is called when a subscription message is received. subscription_msg = rpc_pb2.RPC(subscriptions=[rpc_pb2.RPC.SubOpts()]) - await stream.write(subscription_msg.SerializeToString()) + await stream.write(encode_varint_prefixed(subscription_msg.SerializeToString())) await wait_for_event_occurring(event_handle_subscription) # Make sure the other events are not emitted. with pytest.raises(asyncio.TimeoutError): @@ -324,7 +335,7 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): # Test: `handle_rpc` is called when a control message is received. control_msg = rpc_pb2.RPC(control=rpc_pb2.ControlMessage()) - await stream.write(control_msg.SerializeToString()) + await stream.write(encode_varint_prefixed(control_msg.SerializeToString())) await wait_for_event_occurring(event_handle_rpc) # Make sure the other events are not emitted. with pytest.raises(asyncio.TimeoutError): @@ -406,9 +417,11 @@ async def test_message_all_peers(pubsubs_fsub, monkeypatch): monkeypatch.setattr(pubsubs_fsub[0], "peers", mock_peers) empty_rpc = rpc_pb2.RPC() - await pubsubs_fsub[0].message_all_peers(empty_rpc.SerializeToString()) + empty_rpc_bytes = empty_rpc.SerializeToString() + empty_rpc_bytes_len_prefixed = encode_varint_prefixed(empty_rpc_bytes) + await pubsubs_fsub[0].message_all_peers(empty_rpc_bytes) for stream in mock_peers.values(): - assert (await stream.read()) == empty_rpc.SerializeToString() + assert (await stream.read()) == empty_rpc_bytes_len_prefixed @pytest.mark.parametrize("num_hosts", (1,)) diff --git a/tox.ini b/tox.ini index 0d85779c..2d50b08c 100644 --- a/tox.ini +++ b/tox.ini @@ -15,7 +15,7 @@ select = B,C,E,F,W,T4,B9 [isort] force_sort_within_sections=True -known_third_party=hypothesis,pytest,async_generator,cytoolz,trio_typing,pytest_trio +known_third_party=pytest,p2pclient multi_line_output=3 include_trailing_comma=True force_grid_wrap=0