From 1b5d064a8d00b192560d7a8b82a6263d281fc6e0 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sat, 31 Aug 2019 00:18:22 +0800 Subject: [PATCH 01/33] Add utility functions for libp2p bindings To prepare for pubsub interop test --- libp2p/pubsub/floodsub.py | 2 + libp2p/pubsub/gossipsub.py | 2 + setup.py | 2 + tests/interop/constants.py | 1 + tests/interop/daemon.py | 81 +++++++++++++++++++++++++++++ tests/interop/envs.py | 4 ++ tests/interop/go_pkgs/echo/main.go | 63 ++-------------------- tests/interop/go_pkgs/utils/host.go | 69 ++++++++++++++++++++++++ tests/interop/test_echo.py | 6 +-- tests/interop/test_pubsub_bind.py | 11 ++++ 10 files changed, 177 insertions(+), 64 deletions(-) create mode 100644 tests/interop/daemon.py create mode 100644 tests/interop/envs.py create mode 100644 tests/interop/go_pkgs/utils/host.go create mode 100644 tests/interop/test_pubsub_bind.py diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index d35b97b6..3fc713b2 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -7,6 +7,8 @@ from .pb import rpc_pb2 from .pubsub import Pubsub from .pubsub_router_interface import IPubsubRouter +PROTOCOL_ID = TProtocol("/floodsub/1.0.0") + class FloodSub(IPubsubRouter): diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 8b3a62cb..3e8b0d9a 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -11,6 +11,8 @@ from .pb import rpc_pb2 from .pubsub import Pubsub from .pubsub_router_interface import IPubsubRouter +PROTOCOL_ID = TProtocol("/meshsub/1.0.0") + class GossipSub(IPubsubRouter): diff --git a/setup.py b/setup.py index ff3dfa1f..f5eff257 100644 --- a/setup.py +++ b/setup.py @@ -9,6 +9,8 @@ extras_require = { "pytest>=4.6.3,<5.0.0", "pytest-asyncio>=0.10.0,<1.0.0", "pexpect>=4.6,<5", + # FIXME: Master branch. Use PyPI instead after it is released. 
+ "p2pclient @ git+https://git@github.com/mhchia/py-libp2p-daemon-bindings@4777c62", ], "lint": [ "mypy>=0.701,<1.0", diff --git a/tests/interop/constants.py b/tests/interop/constants.py index dbef0437..331e2843 100644 --- a/tests/interop/constants.py +++ b/tests/interop/constants.py @@ -1 +1,2 @@ +LOCALHOST_IP = "127.0.0.1" PEXPECT_NEW_LINE = "\r\n" diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py new file mode 100644 index 00000000..e74329be --- /dev/null +++ b/tests/interop/daemon.py @@ -0,0 +1,81 @@ +from multiaddr import Multiaddr +from pexpect.spawnbase import SpawnBase + +from p2pclient import Client + +from libp2p.peer.peerinfo import info_from_p2p_addr, PeerInfo +from libp2p.peer.id import ID + +from .constants import PEXPECT_NEW_LINE, LOCALHOST_IP +from .envs import GO_BIN_PATH + +P2PD_PATH = GO_BIN_PATH / "p2pd" + + +class Daemon: + proc: SpawnBase + control: Client + peer_info: PeerInfo + + def __init__(self, proc: SpawnBase, control: Client, peer_info: PeerInfo) -> None: + self.proc = proc + self.control = control + self.peer_info = peer_info + + def __repr__(self) -> str: + return f"" + + @property + def peer_id(self) -> ID: + return self.peer_info.peer_id + + @property + def listen_maddr(self) -> Multiaddr: + return self.peer_info.addrs[0] + + +async def make_p2pd( + proc_factory, + unused_tcp_port_factory, + is_secure: bool, + is_pubsub_enabled=True, + is_gossipsub=True, + is_pubsub_signing=False, + is_pubsub_signing_strict=False, +) -> Daemon: + control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}") + args = [f"-listen={str(control_maddr)}"] + # NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`. + if not is_secure: + args.append("-insecure=true") + if is_pubsub_enabled: + args.append("-pubsub") + if is_gossipsub: + args.append("-pubsubRouter=gossipsub") + else: + args.append("-pubsubRouter=floodsub") + if not is_pubsub_signing: + args.append("-pubsubSign=false") + if not is_pubsub_signing_strict: + args.append("-pubsubSignStrict=false") + # NOTE: + # Two other params are possibly what we want to configure: + # - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond + # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second + # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 + proc = proc_factory(str(P2PD_PATH), args) + await proc.expect(r"Peer ID:\s+(\w+)" + PEXPECT_NEW_LINE, async_=True) + peer_id_base58 = proc.match.group(1) + await proc.expect(r"Peer Addrs:", async_=True) + await proc.expect( + rf"(/ip4/{LOCALHOST_IP}/tcp/[\w]+)" + PEXPECT_NEW_LINE, async_=True + ) + daemon_listener_maddr = Multiaddr(proc.match.group(1)) + daemon_pinfo = info_from_p2p_addr( + daemon_listener_maddr.encapsulate(f"/p2p/{peer_id_base58}") + ) + client_callback_maddr = Multiaddr( + f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}" + ) + p2pc = Client(control_maddr, client_callback_maddr) + return Daemon(proc, p2pc, daemon_pinfo) diff --git a/tests/interop/envs.py b/tests/interop/envs.py new file mode 100644 index 00000000..23d9f27a --- /dev/null +++ b/tests/interop/envs.py @@ -0,0 +1,4 @@ +import os +import pathlib + +GO_BIN_PATH = pathlib.Path(os.environ["GOPATH"]) / "bin" diff --git a/tests/interop/go_pkgs/echo/main.go b/tests/interop/go_pkgs/echo/main.go index ad958a3e..e9ec5844 100644 --- a/tests/interop/go_pkgs/echo/main.go +++ b/tests/interop/go_pkgs/echo/main.go @@ -3,17 +3,13 @@ 
package main import ( "bufio" "context" - "crypto/rand" "flag" "fmt" - "io" "io/ioutil" "log" - mrand "math/rand" - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/host" + utils "interop/utils" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" @@ -23,59 +19,6 @@ import ( gologging "github.com/whyrusleeping/go-logging" ) -// makeBasicHost creates a LibP2P host with a random peer ID listening on the -// given multiaddress. It won't encrypt the connection if insecure is true. -func makeBasicHost(listenPort int, insecure bool, randseed int64) (host.Host, error) { - - // If the seed is zero, use real cryptographic randomness. Otherwise, use a - // deterministic randomness source to make generated keys stay the same - // across multiple runs - var r io.Reader - if randseed == 0 { - r = rand.Reader - } else { - r = mrand.New(mrand.NewSource(randseed)) - } - - // Generate a key pair for this host. We will use it at least - // to obtain a valid host ID. - priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r) - if err != nil { - return nil, err - } - - opts := []libp2p.Option{ - libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)), - libp2p.Identity(priv), - libp2p.DisableRelay(), - } - - if insecure { - opts = append(opts, libp2p.NoSecurity) - } - - basicHost, err := libp2p.New(context.Background(), opts...) - if err != nil { - return nil, err - } - - // Build host multiaddress - hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty())) - - // Now we can build a full multiaddress to reach this host - // by encapsulating both addresses: - addr := basicHost.Addrs()[0] - fullAddr := addr.Encapsulate(hostAddr) - log.Printf("I am %s\n", fullAddr) - if insecure { - log.Printf("Now run \"./echo -l %d -d %s -insecure\" on a different terminal\n", listenPort+1, fullAddr) - } else { - log.Printf("Now run \"./echo -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr) - } - - return basicHost, nil -} - func main() { // LibP2P code uses golog to log messages. They log with different // string IDs (i.e. "swarm"). We can control the verbosity level for @@ -94,7 +37,7 @@ func main() { } // Make a host that listens on the given multiaddress - ha, err := makeBasicHost(*listenF, *insecure, *seed) + ha, err := utils.MakeBasicHost(*listenF, *insecure, *seed) if err != nil { log.Fatal(err) } diff --git a/tests/interop/go_pkgs/utils/host.go b/tests/interop/go_pkgs/utils/host.go new file mode 100644 index 00000000..4024da51 --- /dev/null +++ b/tests/interop/go_pkgs/utils/host.go @@ -0,0 +1,69 @@ +package utils + +import ( + "context" + "crypto/rand" + "fmt" + "io" + "log" + mrand "math/rand" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + + ma "github.com/multiformats/go-multiaddr" +) + +// MakeBasicHost creates a LibP2P host with a random peer ID listening on the +// given multiaddress. It won't encrypt the connection if insecure is true. +func MakeBasicHost(listenPort int, insecure bool, randseed int64) (host.Host, error) { + + // If the seed is zero, use real cryptographic randomness. 
Otherwise, use a + // deterministic randomness source to make generated keys stay the same + // across multiple runs + var r io.Reader + if randseed == 0 { + r = rand.Reader + } else { + r = mrand.New(mrand.NewSource(randseed)) + } + + // Generate a key pair for this host. We will use it at least + // to obtain a valid host ID. + priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r) + if err != nil { + return nil, err + } + + opts := []libp2p.Option{ + libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)), + libp2p.Identity(priv), + libp2p.DisableRelay(), + } + + if insecure { + opts = append(opts, libp2p.NoSecurity) + } + + basicHost, err := libp2p.New(context.Background(), opts...) + if err != nil { + return nil, err + } + + // Build host multiaddress + hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty())) + + // Now we can build a full multiaddress to reach this host + // by encapsulating both addresses: + addr := basicHost.Addrs()[0] + fullAddr := addr.Encapsulate(hostAddr) + log.Printf("I am %s\n", fullAddr) + if insecure { + log.Printf("Now run \"./echo -l %d -d %s -insecure\" on a different terminal\n", listenPort+1, fullAddr) + } else { + log.Printf("Now run \"./echo -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr) + } + + return basicHost, nil +} diff --git a/tests/interop/test_echo.py b/tests/interop/test_echo.py index 9b170db9..81b553c4 100644 --- a/tests/interop/test_echo.py +++ b/tests/interop/test_echo.py @@ -1,6 +1,4 @@ import asyncio -import os -import pathlib from multiaddr import Multiaddr import pytest @@ -9,9 +7,9 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.typing import TProtocol from .constants import PEXPECT_NEW_LINE +from .envs import GO_BIN_PATH -GOPATH = pathlib.Path(os.environ["GOPATH"]) -ECHO_PATH = GOPATH / "bin" / "echo" +ECHO_PATH = GO_BIN_PATH / "echo" ECHO_PROTOCOL_ID = TProtocol("/echo/1.0.0") diff --git a/tests/interop/test_pubsub_bind.py b/tests/interop/test_pubsub_bind.py new file mode 100644 index 00000000..d01dc2d5 --- /dev/null +++ b/tests/interop/test_pubsub_bind.py @@ -0,0 +1,11 @@ +import pytest + +from .daemon import make_p2pd + + +@pytest.mark.parametrize("num_hosts", (1,)) +@pytest.mark.asyncio +async def test_pubsub_init( + hosts, proc_factory, is_host_secure, unused_tcp_port_factory +): + p2pd = await make_p2pd(proc_factory, unused_tcp_port_factory, is_host_secure) From b77834d12971618f60909f1a9b56678c827c8310 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sun, 1 Sep 2019 23:39:53 +0800 Subject: [PATCH 02/33] Use `asyncio.subprocess` over pexpect In the test for pubsub, since there were unknown issues when I test against pexpect. 
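For context, a minimal sketch of the pattern the patch below adopts, using only the standard library: spawn the daemon with `asyncio.subprocess` and scan its stdout for the startup banner lines ("Control socket:", "Peer ID:", "Peer Addrs:") until all of them appear, instead of driving the process through pexpect. The function name and the timeout value here are illustrative placeholders, not the exact code added in this patch.

import asyncio

# Banner prefixes the go daemon prints once it is up (the same ones the patch waits for).
READY_LINE_PREFIXES = (b"Control socket:", b"Peer ID:", b"Peer Addrs:")

async def spawn_daemon_and_wait_ready(cmd, *args, timeout=30):
    # Spawn the daemon with pipes so its logs can be read directly.
    proc = await asyncio.subprocess.create_subprocess_exec(
        cmd, *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    async def _wait_for_banner():
        seen = {prefix: False for prefix in READY_LINE_PREFIXES}
        while not all(seen.values()):
            line = await proc.stdout.readline()
            if not line:  # EOF: the daemon exited before becoming ready
                raise RuntimeError(f"{cmd} exited before printing its startup banner")
            for prefix in seen:
                if line.startswith(prefix):
                    seen[prefix] = True

    # Bound the wait so a wedged daemon fails the test instead of hanging it.
    await asyncio.wait_for(_wait_for_banner(), timeout)
    return proc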
--- tests/interop/daemon.py | 164 +++++++++++++++++++++++------- tests/interop/test_pubsub_bind.py | 15 ++- 2 files changed, 137 insertions(+), 42 deletions(-) diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index e74329be..9bcb292f 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -1,24 +1,113 @@ -from multiaddr import Multiaddr -from pexpect.spawnbase import SpawnBase +import asyncio +import time +from typing import Any, List +import multiaddr +from multiaddr import Multiaddr from p2pclient import Client -from libp2p.peer.peerinfo import info_from_p2p_addr, PeerInfo from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr -from .constants import PEXPECT_NEW_LINE, LOCALHOST_IP +from .constants import LOCALHOST_IP from .envs import GO_BIN_PATH P2PD_PATH = GO_BIN_PATH / "p2pd" +TIMEOUT_DURATION = 30 + + +async def try_until_success(coro_func, timeout=TIMEOUT_DURATION): + """ + Keep running ``coro_func`` until the time is out. + All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments. + """ + t_start = time.monotonic() + while True: + result = await coro_func() + if result: + break + if (time.monotonic() - t_start) >= timeout: + # timeout + assert False, f"{coro_func} still failed after `{timeout}` seconds" + await asyncio.sleep(0.01) + + +class P2PDProcess: + proc: asyncio.subprocess.Process + cmd: str = str(P2PD_PATH) + args: List[Any] + + def __init__( + self, + control_maddr: Multiaddr, + is_secure: bool, + is_pubsub_enabled=True, + is_gossipsub=True, + is_pubsub_signing=False, + is_pubsub_signing_strict=False, + ) -> None: + args = [f"-listen={str(control_maddr)}"] + # NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`. + if not is_secure: + args.append("-insecure=true") + if is_pubsub_enabled: + args.append("-pubsub") + if is_gossipsub: + args.append("-pubsubRouter=gossipsub") + else: + args.append("-pubsubRouter=floodsub") + if not is_pubsub_signing: + args.append("-pubsubSign=false") + if not is_pubsub_signing_strict: + args.append("-pubsubSignStrict=false") + # NOTE: + # Two other params are possibly what we want to configure: + # - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond # noqa: E501 + # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second + # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 + self.args = args + + async def wait_until_ready(self): + lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:") + lines_head_occurred = {line: False for line in lines_head_pattern} + + async def read_from_daemon_and_check(): + line = await self.proc.stdout.readline() + for head_pattern in lines_head_occurred: + if line.startswith(head_pattern): + lines_head_occurred[head_pattern] = True + return all([value for _, value in lines_head_occurred.items()]) + + await try_until_success(read_from_daemon_and_check) + # Sleep a little bit to ensure the listener is up after logs are emitted. 
+ await asyncio.sleep(0.01) + + async def start(self) -> None: + self.proc = await asyncio.subprocess.create_subprocess_exec( + self.cmd, + *self.args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + bufsize=0, + ) + await self.wait_until_ready() + + async def close(self) -> None: + self.proc.terminate() + await self.proc.wait() + + class Daemon: - proc: SpawnBase + p2pd_proc: P2PDProcess control: Client peer_info: PeerInfo - def __init__(self, proc: SpawnBase, control: Client, peer_info: PeerInfo) -> None: - self.proc = proc + def __init__( + self, p2pd_proc: P2PDProcess, control: Client, peer_info: PeerInfo + ) -> None: + self.p2pd_proc = p2pd_proc self.control = control self.peer_info = peer_info @@ -33,9 +122,12 @@ class Daemon: def listen_maddr(self) -> Multiaddr: return self.peer_info.addrs[0] + async def close(self) -> None: + await self.p2pd_proc.close() + await self.control.close() + async def make_p2pd( - proc_factory, unused_tcp_port_factory, is_secure: bool, is_pubsub_enabled=True, @@ -44,38 +136,34 @@ async def make_p2pd( is_pubsub_signing_strict=False, ) -> Daemon: control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}") - args = [f"-listen={str(control_maddr)}"] - # NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`. - if not is_secure: - args.append("-insecure=true") - if is_pubsub_enabled: - args.append("-pubsub") - if is_gossipsub: - args.append("-pubsubRouter=gossipsub") - else: - args.append("-pubsubRouter=floodsub") - if not is_pubsub_signing: - args.append("-pubsubSign=false") - if not is_pubsub_signing_strict: - args.append("-pubsubSignStrict=false") - # NOTE: - # Two other params are possibly what we want to configure: - # - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond - # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second - # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 - proc = proc_factory(str(P2PD_PATH), args) - await proc.expect(r"Peer ID:\s+(\w+)" + PEXPECT_NEW_LINE, async_=True) - peer_id_base58 = proc.match.group(1) - await proc.expect(r"Peer Addrs:", async_=True) - await proc.expect( - rf"(/ip4/{LOCALHOST_IP}/tcp/[\w]+)" + PEXPECT_NEW_LINE, async_=True - ) - daemon_listener_maddr = Multiaddr(proc.match.group(1)) - daemon_pinfo = info_from_p2p_addr( - daemon_listener_maddr.encapsulate(f"/p2p/{peer_id_base58}") + p2pd_proc = P2PDProcess( + control_maddr, + is_secure, + is_pubsub_enabled, + is_gossipsub, + is_pubsub_signing, + is_pubsub_signing_strict, ) + await p2pd_proc.start() client_callback_maddr = Multiaddr( f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}" ) p2pc = Client(control_maddr, client_callback_maddr) - return Daemon(proc, p2pc, daemon_pinfo) + await p2pc.listen() + peer_id, maddrs = await p2pc.identify() + listen_maddr: Multiaddr = None + for maddr in maddrs: + try: + ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4) + maddr.value_for_protocol(multiaddr.protocols.P_TCP) + except multiaddr.exceptions.ProtocolLookupError: + continue + if ip == LOCALHOST_IP: + listen_maddr = maddr + break + assert listen_maddr is not None, "no loopback maddr is found" + peer_info = info_from_p2p_addr( + listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) + ) + print(f"!@# peer_info: peer_id={peer_info.peer_id}, maddrs={peer_info.addrs}") + return Daemon(p2pd_proc, p2pc, peer_info) diff --git 
a/tests/interop/test_pubsub_bind.py b/tests/interop/test_pubsub_bind.py index d01dc2d5..66793b40 100644 --- a/tests/interop/test_pubsub_bind.py +++ b/tests/interop/test_pubsub_bind.py @@ -5,7 +5,14 @@ from .daemon import make_p2pd @pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio -async def test_pubsub_init( - hosts, proc_factory, is_host_secure, unused_tcp_port_factory -): - p2pd = await make_p2pd(proc_factory, unused_tcp_port_factory, is_host_secure) +async def test_pubsub_init(hosts, is_host_secure, unused_tcp_port_factory): + try: + p2pd = await make_p2pd(unused_tcp_port_factory, is_host_secure) + host = hosts[0] + peers = await p2pd.control.list_peers() + assert len(peers) == 0 + await host.connect(p2pd.peer_info) + peers = await p2pd.control.list_peers() + assert len(peers) != 0 + finally: + await p2pd.close() From dfd9ebdc5e4489945daae10f4fd7911bd32ca0d2 Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 2 Sep 2019 13:52:33 +0800 Subject: [PATCH 03/33] Change `PeerInfo` to remove dep on `PeerData` --- libp2p/peer/peerinfo.py | 11 +++-------- libp2p/peer/peerstore.py | 2 +- tests/peer/test_peerinfo.py | 10 +--------- 3 files changed, 5 insertions(+), 18 deletions(-) diff --git a/libp2p/peer/peerinfo.py b/libp2p/peer/peerinfo.py index a2f08424..2f41a6c0 100644 --- a/libp2p/peer/peerinfo.py +++ b/libp2p/peer/peerinfo.py @@ -3,7 +3,6 @@ from typing import List import multiaddr from .id import ID -from .peerdata import PeerData class PeerInfo: @@ -11,9 +10,9 @@ class PeerInfo: peer_id: ID addrs: List[multiaddr.Multiaddr] - def __init__(self, peer_id: ID, peer_data: PeerData = None) -> None: + def __init__(self, peer_id: ID, addrs: List[multiaddr.Multiaddr]) -> None: self.peer_id = peer_id - self.addrs = peer_data.get_addrs() if peer_data else None + self.addrs = addrs def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: @@ -44,11 +43,7 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: if len(parts) > 1: addr = multiaddr.Multiaddr.join(*parts[:-1]) - peer_data = PeerData() - peer_data.add_addrs([addr]) - peer_data.set_protocols([p.code for p in addr.protocols()]) - - return PeerInfo(peer_id, peer_data) + return PeerInfo(peer_id, [addr]) class InvalidAddrError(ValueError): diff --git a/libp2p/peer/peerstore.py b/libp2p/peer/peerstore.py index 1d15ab2a..c1eae370 100644 --- a/libp2p/peer/peerstore.py +++ b/libp2p/peer/peerstore.py @@ -33,7 +33,7 @@ class PeerStore(IPeerStore): def peer_info(self, peer_id: ID) -> Optional[PeerInfo]: if peer_id in self.peer_map: peer_data = self.peer_map[peer_id] - return PeerInfo(peer_id, peer_data) + return PeerInfo(peer_id, peer_data.addrs) return None def get_protocols(self, peer_id: ID) -> List[str]: diff --git a/tests/peer/test_peerinfo.py b/tests/peer/test_peerinfo.py index 156305c2..29c46887 100644 --- a/tests/peer/test_peerinfo.py +++ b/tests/peer/test_peerinfo.py @@ -4,7 +4,6 @@ import multiaddr import pytest from libp2p.peer.id import ID -from libp2p.peer.peerdata import PeerData from libp2p.peer.peerinfo import InvalidAddrError, PeerInfo, info_from_p2p_addr ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" @@ -12,24 +11,17 @@ VALID_MULTI_ADDR_STR = "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJ def test_init_(): - peer_data = PeerData() random_addrs = [random.randint(0, 255) for r in range(4)] - peer_data.add_addrs(random_addrs) random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) peer_id = ID(random_id_string.encode()) - 
peer_info = PeerInfo(peer_id, peer_data) + peer_info = PeerInfo(peer_id, random_addrs) assert peer_info.peer_id == peer_id assert peer_info.addrs == random_addrs -def test_init_no_value(): - with pytest.raises(Exception): - PeerInfo() - - @pytest.mark.parametrize( "addr", ( From 56ef0b962cd1877c4534c72d255bf0acd3ff6a45 Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 2 Sep 2019 17:32:15 +0800 Subject: [PATCH 04/33] Add test for `host` `connect` and `disconnect` --- setup.py | 2 +- tests/conftest.py | 50 +++++++++++++++++++++++++++++- tests/interop/conftest.py | 19 ++++++++++++ tests/interop/daemon.py | 33 +++++++++++++++++--- tests/interop/test_bindings.py | 29 ++++++++++++++++++ tests/interop/test_pubsub_bind.py | 18 ----------- tests/pubsub/conftest.py | 51 ------------------------------- 7 files changed, 126 insertions(+), 76 deletions(-) create mode 100644 tests/interop/test_bindings.py delete mode 100644 tests/interop/test_pubsub_bind.py diff --git a/setup.py b/setup.py index f5eff257..9ddfbd12 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ extras_require = { "pytest-asyncio>=0.10.0,<1.0.0", "pexpect>=4.6,<5", # FIXME: Master branch. Use PyPI instead after it is released. - "p2pclient @ git+https://git@github.com/mhchia/py-libp2p-daemon-bindings@4777c62", + "p2pclient @ git+https://git@github.com/mhchia/py-libp2p-daemon-bindings@2647296", ], "lint": [ "mypy>=0.701,<1.0", diff --git a/tests/conftest.py b/tests/conftest.py index fd753be0..188c655d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,7 +3,8 @@ import asyncio import pytest from .configs import LISTEN_MADDR -from .factories import HostFactory +from .factories import FloodsubFactory, GossipsubFactory, HostFactory, PubsubFactory +from .pubsub.configs import GOSSIPSUB_PARAMS @pytest.fixture @@ -31,3 +32,50 @@ async def hosts(num_hosts, is_host_secure): await asyncio.gather( *[_host.close() for _host in _hosts], return_exceptions=True ) + + +@pytest.fixture +def floodsubs(num_hosts): + return FloodsubFactory.create_batch(num_hosts) + + +@pytest.fixture +def gossipsub_params(): + return GOSSIPSUB_PARAMS + + +@pytest.fixture +def gossipsubs(num_hosts, gossipsub_params): + yield GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict()) + # TODO: Clean up + + +def _make_pubsubs(hosts, pubsub_routers, cache_size): + if len(pubsub_routers) != len(hosts): + raise ValueError( + f"lenght of pubsub_routers={pubsub_routers} should be equaled to the " + f"length of hosts={len(hosts)}" + ) + return tuple( + PubsubFactory(host=host, router=router, cache_size=cache_size) + for host, router in zip(hosts, pubsub_routers) + ) + + +@pytest.fixture +def pubsub_cache_size(): + return None # default + + +@pytest.fixture +def pubsubs_fsub(hosts, floodsubs, pubsub_cache_size): + _pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size) + yield _pubsubs_fsub + # TODO: Clean up + + +@pytest.fixture +def pubsubs_gsub(hosts, gossipsubs, pubsub_cache_size): + _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size) + yield _pubsubs_gsub + # TODO: Clean up diff --git a/tests/interop/conftest.py b/tests/interop/conftest.py index e85f2f6a..ef9de3f5 100644 --- a/tests/interop/conftest.py +++ b/tests/interop/conftest.py @@ -1,8 +1,11 @@ +import asyncio import sys import pexpect import pytest +from .daemon import make_p2pd + @pytest.fixture def proc_factory(): @@ -22,3 +25,19 @@ def proc_factory(): finally: for proc in procs: proc.close() + + +@pytest.fixture +def num_p2pds(): + return 1 + + +@pytest.fixture 
+async def p2pds(num_p2pds, is_host_secure, unused_tcp_port_factory): + p2pds = await asyncio.gather( + *[make_p2pd(unused_tcp_port_factory, is_host_secure) for _ in range(num_p2pds)] + ) + try: + yield p2pds + finally: + await asyncio.gather(*[p2pd.close() for p2pd in p2pds]) diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index 9bcb292f..c56bff72 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -39,14 +39,16 @@ class P2PDProcess: cmd: str = str(P2PD_PATH) args: List[Any] + _tasks: List["asyncio.Future[Any]"] + def __init__( self, control_maddr: Multiaddr, is_secure: bool, - is_pubsub_enabled=True, - is_gossipsub=True, - is_pubsub_signing=False, - is_pubsub_signing_strict=False, + is_pubsub_enabled: bool = True, + is_gossipsub: bool = True, + is_pubsub_signing: bool = False, + is_pubsub_signing_strict: bool = False, ) -> None: args = [f"-listen={str(control_maddr)}"] # NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`. @@ -68,6 +70,7 @@ class P2PDProcess: # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 self.args = args + self._tasks = [] async def wait_until_ready(self): lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:") @@ -84,6 +87,24 @@ class P2PDProcess: # Sleep a little bit to ensure the listener is up after logs are emitted. await asyncio.sleep(0.01) + async def start_printing_logs(self) -> None: + async def _print_from_stream( + src_name: str, reader: asyncio.StreamReader + ) -> None: + while True: + line = await reader.readline() + if line != b"": + print(f"{src_name}\t: {line.rstrip().decode()}") + await asyncio.sleep(0.01) + + self._tasks.append( + asyncio.ensure_future(_print_from_stream("out", self.proc.stdout)) + ) + self._tasks.append( + asyncio.ensure_future(_print_from_stream("err", self.proc.stderr)) + ) + await asyncio.sleep(0) + async def start(self) -> None: self.proc = await asyncio.subprocess.create_subprocess_exec( self.cmd, @@ -93,10 +114,13 @@ class P2PDProcess: bufsize=0, ) await self.wait_until_ready() + await self.start_printing_logs() async def close(self) -> None: self.proc.terminate() await self.proc.wait() + for task in self._tasks: + task.cancel() class Daemon: @@ -165,5 +189,4 @@ async def make_p2pd( peer_info = info_from_p2p_addr( listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) ) - print(f"!@# peer_info: peer_id={peer_info.peer_id}, maddrs={peer_info.addrs}") return Daemon(p2pd_proc, p2pc, peer_info) diff --git a/tests/interop/test_bindings.py b/tests/interop/test_bindings.py new file mode 100644 index 00000000..7844d4bb --- /dev/null +++ b/tests/interop/test_bindings.py @@ -0,0 +1,29 @@ +import asyncio + +from multiaddr import Multiaddr +import pytest + + +@pytest.mark.parametrize("num_hosts", (1,)) +@pytest.mark.asyncio +async def test_connect(hosts, p2pds): + p2pd = p2pds[0] + host = hosts[0] + assert len(await p2pd.control.list_peers()) == 0 + # Test: connect from Py + await host.connect(p2pd.peer_info) + assert len(await p2pd.control.list_peers()) == 1 + # Test: `disconnect` from Py + await host.disconnect(p2pd.peer_id) + assert len(await p2pd.control.list_peers()) == 0 + # Test: connect from Go + py_peer_id = host.get_id() + await p2pd.control.connect( + host.get_id(), + [host.get_addrs()[0].decapsulate(Multiaddr(f"/p2p/{py_peer_id.to_string()}"))], + ) + assert 
len(host.get_network().connections) == 1 + # Test: `disconnect` from Go + await p2pd.control.disconnect(py_peer_id) + # FIXME: Failed to handle disconnect + # assert len(host.get_network().connections) == 0 diff --git a/tests/interop/test_pubsub_bind.py b/tests/interop/test_pubsub_bind.py deleted file mode 100644 index 66793b40..00000000 --- a/tests/interop/test_pubsub_bind.py +++ /dev/null @@ -1,18 +0,0 @@ -import pytest - -from .daemon import make_p2pd - - -@pytest.mark.parametrize("num_hosts", (1,)) -@pytest.mark.asyncio -async def test_pubsub_init(hosts, is_host_secure, unused_tcp_port_factory): - try: - p2pd = await make_p2pd(unused_tcp_port_factory, is_host_secure) - host = hosts[0] - peers = await p2pd.control.list_peers() - assert len(peers) == 0 - await host.connect(p2pd.peer_info) - peers = await p2pd.control.list_peers() - assert len(peers) != 0 - finally: - await p2pd.close() diff --git a/tests/pubsub/conftest.py b/tests/pubsub/conftest.py index 1755ee59..e69de29b 100644 --- a/tests/pubsub/conftest.py +++ b/tests/pubsub/conftest.py @@ -1,51 +0,0 @@ -import pytest - -from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory -from tests.pubsub.configs import GOSSIPSUB_PARAMS - - -@pytest.fixture -def floodsubs(num_hosts): - return FloodsubFactory.create_batch(num_hosts) - - -@pytest.fixture -def gossipsub_params(): - return GOSSIPSUB_PARAMS - - -@pytest.fixture -def gossipsubs(num_hosts, gossipsub_params): - yield GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict()) - # TODO: Clean up - - -def _make_pubsubs(hosts, pubsub_routers, cache_size): - if len(pubsub_routers) != len(hosts): - raise ValueError( - f"lenght of pubsub_routers={pubsub_routers} should be equaled to the " - f"length of hosts={len(hosts)}" - ) - return tuple( - PubsubFactory(host=host, router=router, cache_size=cache_size) - for host, router in zip(hosts, pubsub_routers) - ) - - -@pytest.fixture -def pubsub_cache_size(): - return None # default - - -@pytest.fixture -def pubsubs_fsub(hosts, floodsubs, pubsub_cache_size): - _pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size) - yield _pubsubs_fsub - # TODO: Clean up - - -@pytest.fixture -def pubsubs_gsub(hosts, gossipsubs, pubsub_cache_size): - _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size) - yield _pubsubs_gsub - # TODO: Clean up From a883816881f41465711d79051f16f89669ddf9cb Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 2 Sep 2019 18:40:12 +0800 Subject: [PATCH 05/33] Add `connect` utility function --- tests/interop/test_bindings.py | 15 +++++-------- tests/interop/test_pubsub.py | 7 ++++++ tests/interop/utils.py | 40 ++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 10 deletions(-) create mode 100644 tests/interop/test_pubsub.py create mode 100644 tests/interop/utils.py diff --git a/tests/interop/test_bindings.py b/tests/interop/test_bindings.py index 7844d4bb..1189e0b7 100644 --- a/tests/interop/test_bindings.py +++ b/tests/interop/test_bindings.py @@ -1,8 +1,7 @@ -import asyncio - -from multiaddr import Multiaddr import pytest +from .utils import connect + @pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio @@ -11,19 +10,15 @@ async def test_connect(hosts, p2pds): host = hosts[0] assert len(await p2pd.control.list_peers()) == 0 # Test: connect from Py - await host.connect(p2pd.peer_info) + await connect(host, p2pd) assert len(await p2pd.control.list_peers()) == 1 # Test: `disconnect` from Py await host.disconnect(p2pd.peer_id) assert len(await 
p2pd.control.list_peers()) == 0 # Test: connect from Go - py_peer_id = host.get_id() - await p2pd.control.connect( - host.get_id(), - [host.get_addrs()[0].decapsulate(Multiaddr(f"/p2p/{py_peer_id.to_string()}"))], - ) + await connect(p2pd, host) assert len(host.get_network().connections) == 1 # Test: `disconnect` from Go - await p2pd.control.disconnect(py_peer_id) + await p2pd.control.disconnect(host.get_id()) # FIXME: Failed to handle disconnect # assert len(host.get_network().connections) == 0 diff --git a/tests/interop/test_pubsub.py b/tests/interop/test_pubsub.py new file mode 100644 index 00000000..a9ad1f8e --- /dev/null +++ b/tests/interop/test_pubsub.py @@ -0,0 +1,7 @@ +import pytest + + +@pytest.mark.parametrize("num_hosts", (1,)) +@pytest.mark.asyncio +async def test_gossipsub(pubsubs_gsub, p2pds): + pass diff --git a/tests/interop/utils.py b/tests/interop/utils.py new file mode 100644 index 00000000..d6137279 --- /dev/null +++ b/tests/interop/utils.py @@ -0,0 +1,40 @@ +import asyncio +from typing import Union + +from multiaddr import Multiaddr + +from libp2p.host.host_interface import IHost +from libp2p.peer.peerinfo import PeerInfo + +from .daemon import Daemon + +TDaemonOrHost = Union[IHost, Daemon] + + +async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None: + # Type check + err_msg = ( + f"Type of type(a)={type(a)} or type(b)={type(b)} is wrong." + "Should be either `IHost` or `Daemon`" + ) + assert all( + [isinstance(node, IHost) or isinstance(node, Daemon) for node in (a, b)] + ), err_msg + + # TODO: Get peer info + peer_info: PeerInfo + if isinstance(b, Daemon): + peer_info = b.peer_info + else: # isinstance(b, IHost) + peer_id = b.get_id() + maddrs = [ + b.get_addrs()[0].decapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) + ] + peer_info = PeerInfo(peer_id, maddrs) + # TODO: connect to peer info + if isinstance(a, Daemon): + await a.control.connect(peer_info.peer_id, peer_info.addrs) + else: # isinstance(b, IHost) + await a.connect(peer_info) + # Allow additional sleep for both side to establish the connection. 
+ await asyncio.sleep(0.01) From 3717dc9adf6d45c6687bbfe9c2a6f2e791ba3420 Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 2 Sep 2019 21:01:13 +0800 Subject: [PATCH 06/33] Add helper functions --- libp2p/pubsub/pubsub.py | 13 +++++----- tests/interop/test_pubsub.py | 16 +++++++++++-- tests/interop/utils.py | 46 +++++++++++++++++++++++++----------- tests/pubsub/configs.py | 8 +++++-- tests/pubsub/test_pubsub.py | 3 +-- 5 files changed, 60 insertions(+), 26 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index a19c99ae..52c4b55c 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -16,6 +16,7 @@ from typing import ( from lru import LRU +from libp2p.utils import encode_varint_prefixed from libp2p.exceptions import ValidationError from libp2p.host.host_interface import IHost from libp2p.network.stream.net_stream_interface import INetStream @@ -131,7 +132,7 @@ class Pubsub: # Call handle peer to keep waiting for updates to peer queue asyncio.ensure_future(self.handle_peer_queue()) - def get_hello_packet(self) -> bytes: + def get_hello_packet(self) -> rpc_pb2.RPC: """ Generate subscription message with all topics we are subscribed to only send hello packet if we have subscribed topics @@ -141,7 +142,7 @@ class Pubsub: packet.subscriptions.extend( [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) - return packet.SerializeToString() + return packet async def continuously_read_stream(self, stream: INetStream) -> None: """ @@ -227,9 +228,9 @@ class Pubsub: self.router.add_peer(peer_id, stream.get_protocol()) # Send hello packet - hello: bytes = self.get_hello_packet() + hello = self.get_hello_packet() + await stream.write(hello.SerializeToString()) - await stream.write(hello) # Pass stream off to stream reader asyncio.ensure_future(self.continuously_read_stream(stream)) # Force context switch @@ -257,8 +258,8 @@ class Pubsub: self.router.add_peer(peer_id, stream.get_protocol()) # Send hello packet - hello: bytes = self.get_hello_packet() - await stream.write(hello) + hello = self.get_hello_packet() + await stream.write(hello.SerializeToString()) # TODO: Investigate whether this should be replaced by `handlePeerEOF` # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/49274b0e8aecdf6cad59d768e5702ff00aa48488/comm.go#L80 # noqa: E501 diff --git a/tests/interop/test_pubsub.py b/tests/interop/test_pubsub.py index a9ad1f8e..bbb9b8db 100644 --- a/tests/interop/test_pubsub.py +++ b/tests/interop/test_pubsub.py @@ -1,7 +1,19 @@ +import asyncio + import pytest +from .utils import connect + + +TOPIC = "TOPIC_0123" + @pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio -async def test_gossipsub(pubsubs_gsub, p2pds): - pass +async def test_pubsub_peers(pubsubs_gsub, p2pds): + # await connect(pubsubs_gsub[0].host, p2pds[0]) + await connect(p2pds[0], pubsubs_gsub[0].host) + sub = await pubsubs_gsub[0].subscribe(TOPIC) + await asyncio.sleep(1) + peers = await p2pds[0].control.pubsub_list_peers(TOPIC) + print(f"!@# peers={peers}") diff --git a/tests/interop/utils.py b/tests/interop/utils.py index d6137279..43b93a64 100644 --- a/tests/interop/utils.py +++ b/tests/interop/utils.py @@ -5,12 +5,35 @@ from multiaddr import Multiaddr from libp2p.host.host_interface import IHost from libp2p.peer.peerinfo import PeerInfo +from libp2p.peer.id import ID from .daemon import Daemon TDaemonOrHost = Union[IHost, Daemon] +def _get_peer_info(node: TDaemonOrHost) -> PeerInfo: + peer_info: PeerInfo + if isinstance(node, Daemon): + peer_info = node.peer_info + else: # 
isinstance(node, IHost) + peer_id = node.get_id() + maddrs = [ + node.get_addrs()[0].decapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) + ] + peer_info = PeerInfo(peer_id, maddrs) + return peer_info + + +async def _is_peer(peer_id: ID, node: TDaemonOrHost) -> bool: + if isinstance(node, Daemon): + pinfos = await node.control.list_peers() + peers = tuple([pinfo.peer_id for pinfo in pinfos]) + return peer_id in peers + else: # isinstance(node, IHost) + return peer_id in node.get_network().connections + + async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None: # Type check err_msg = ( @@ -21,20 +44,15 @@ async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None: [isinstance(node, IHost) or isinstance(node, Daemon) for node in (a, b)] ), err_msg - # TODO: Get peer info - peer_info: PeerInfo - if isinstance(b, Daemon): - peer_info = b.peer_info - else: # isinstance(b, IHost) - peer_id = b.get_id() - maddrs = [ - b.get_addrs()[0].decapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}")) - ] - peer_info = PeerInfo(peer_id, maddrs) - # TODO: connect to peer info + b_peer_info = _get_peer_info(b) if isinstance(a, Daemon): - await a.control.connect(peer_info.peer_id, peer_info.addrs) + await a.control.connect(b_peer_info.peer_id, b_peer_info.addrs) else: # isinstance(b, IHost) - await a.connect(peer_info) + await a.connect(b_peer_info) # Allow additional sleep for both side to establish the connection. - await asyncio.sleep(0.01) + await asyncio.sleep(0.1) + + a_peer_info = _get_peer_info(a) + + assert await _is_peer(b_peer_info.peer_id, a) + assert await _is_peer(a_peer_info.peer_id, b) diff --git a/tests/pubsub/configs.py b/tests/pubsub/configs.py index e5adfad4..e0737878 100644 --- a/tests/pubsub/configs.py +++ b/tests/pubsub/configs.py @@ -1,7 +1,11 @@ from typing import NamedTuple -FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" -GOSSIPSUB_PROTOCOL_ID = "/gossipsub/1.0.0" +from libp2p.pubsub import floodsub +from libp2p.pubsub import gossipsub + + +FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID +GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID class GossipsubParams(NamedTuple): diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 7a9ff3e7..59df7d7a 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -70,8 +70,7 @@ async def test_peers_subscribe(pubsubs_fsub): @pytest.mark.asyncio async def test_get_hello_packet(pubsubs_fsub): def _get_hello_packet_topic_ids(): - packet = rpc_pb2.RPC() - packet.ParseFromString(pubsubs_fsub[0].get_hello_packet()) + packet = pubsubs_fsub[0].get_hello_packet() return tuple(sub.topicid for sub in packet.subscriptions) # Test: No subscription, so there should not be any topic ids in the hello packet. 
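The next patch switches the pubsub wire I/O to varint length-delimited protobuf frames via `encode_varint_prefixed` and `read_varint_prefixed_bytes`, which matches how the go side delimits pubsub RPCs on a stream. A minimal sketch of that framing, assuming the helpers follow the multiformats unsigned-varint encoding; `encode_uvarint` and `frame_rpc` are illustrative names, not the `libp2p.utils` API:

def encode_uvarint(value: int) -> bytes:
    # Unsigned varint: 7 bits per byte, MSB set on every byte except the last.
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def frame_rpc(serialized_rpc: bytes) -> bytes:
    # What a call like `stream.write(encode_varint_prefixed(rpc.SerializeToString()))`
    # is expected to put on the wire: the payload length as a varint, then the payload.
    return encode_uvarint(len(serialized_rpc)) + serialized_rpc

# Example: a 300-byte payload gets the two-byte length prefix 0xAC 0x02.
assert frame_rpc(b"x" * 300)[:2] == bytes([0xAC, 0x02])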
From 194b494057d7631f8410946b1e1cd5097f16f82c Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 2 Sep 2019 23:21:57 +0800 Subject: [PATCH 07/33] Tested against subscriptions and publish --- libp2p/network/swarm.py | 4 +++ libp2p/pubsub/pubsub.py | 50 ++++++++++++++------------------- libp2p/pubsub/pubsub_notifee.py | 6 +--- tests/interop/test_pubsub.py | 23 +++++++++++---- tests/interop/utils.py | 2 +- tests/pubsub/configs.py | 4 +-- 6 files changed, 46 insertions(+), 43 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 300ad71e..e522a4b2 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -217,6 +217,10 @@ class Swarm(INetwork): "fail to upgrade the connection to a secured connection" ) from error peer_id = secured_conn.get_remote_peer() + peer_ip, peer_port = writer.get_extra_info("peername") + peer_maddr = Multiaddr(f"/ip4/{peer_ip}/tcp/{peer_port}") + # TODO: Fix the ttl + self.peerstore.add_addr(peer_id, peer_maddr, 12345678) try: muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 52c4b55c..a02f5c31 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -16,12 +16,12 @@ from typing import ( from lru import LRU -from libp2p.utils import encode_varint_prefixed from libp2p.exceptions import ValidationError from libp2p.host.host_interface import IHost from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID from libp2p.typing import TProtocol +from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee @@ -153,11 +153,14 @@ class Pubsub: peer_id = stream.mplex_conn.peer_id while True: - incoming: bytes = (await stream.read()) + print("!@# continuously_read_stream: waiting") + incoming: bytes = await read_varint_prefixed_bytes(stream) + print(f"!@# continuously_read_stream: incoming={incoming}") rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) if rpc_incoming.publish: + print("!@# continuously_read_stream: publish") # deal with RPC.publish for msg in rpc_incoming.publish: if not self._is_subscribed_to_msg(msg): @@ -167,6 +170,7 @@ class Pubsub: asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg)) if rpc_incoming.subscriptions: + print("!@# continuously_read_stream: subscriptions") # deal with RPC.subscriptions # We don't need to relay the subscription to our # peers because a given node only needs its peers @@ -179,6 +183,7 @@ class Pubsub: # This is necessary because `control` is an optional field in pb2. # Ref: https://developers.google.com/protocol-buffers/docs/reference/python-generated#singular-fields-proto2 # noqa: E501 if rpc_incoming.HasField("control"): + print("!@# continuously_read_stream: control") # Pass rpc to router so router could perform custom logic await self.router.handle_rpc(rpc_incoming, peer_id) @@ -221,20 +226,23 @@ class Pubsub: on one of the supported pubsub protocols. 
:param stream: newly created stream """ - # Add peer + await self.continuously_read_stream(stream) + + async def _handle_new_peer(self, peer_id: ID) -> None: + # Open a stream to peer on existing connection + # (we know connection exists since that's the only way + # an element gets added to peer_queue) + stream: INetStream = await self.host.new_stream(peer_id, self.protocols) + # Map peer to stream - peer_id: ID = stream.mplex_conn.peer_id self.peers[peer_id] = stream - self.router.add_peer(peer_id, stream.get_protocol()) # Send hello packet hello = self.get_hello_packet() - await stream.write(hello.SerializeToString()) - - # Pass stream off to stream reader - asyncio.ensure_future(self.continuously_read_stream(stream)) - # Force context switch - await asyncio.sleep(0) + await stream.write(encode_varint_prefixed(hello.SerializeToString())) + # TODO: Check EOF in the future in the stream's lifetime. + # TODO: Check if the peer in black list. + self.router.add_peer(peer_id, stream.get_protocol()) async def handle_peer_queue(self) -> None: """ @@ -247,25 +255,9 @@ class Pubsub: peer_id: ID = await self.peer_queue.get() - # Open a stream to peer on existing connection - # (we know connection exists since that's the only way - # an element gets added to peer_queue) - stream: INetStream = await self.host.new_stream(peer_id, self.protocols) - # Add Peer - # Map peer to stream - self.peers[peer_id] = stream - self.router.add_peer(peer_id, stream.get_protocol()) - - # Send hello packet - hello = self.get_hello_packet() - await stream.write(hello.SerializeToString()) - - # TODO: Investigate whether this should be replaced by `handlePeerEOF` - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/49274b0e8aecdf6cad59d768e5702ff00aa48488/comm.go#L80 # noqa: E501 - # Pass stream off to stream reader - asyncio.ensure_future(self.continuously_read_stream(stream)) + asyncio.ensure_future(self._handle_new_peer(peer_id)) # Force context switch await asyncio.sleep(0) @@ -366,7 +358,7 @@ class Pubsub: # Broadcast message for stream in self.peers.values(): # Write message to stream - await stream.write(raw_msg) + await stream.write(encode_varint_prefixed(raw_msg)) async def publish(self, topic_id: str, data: bytes) -> None: """ diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 8878a276..6ecab1ab 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -36,11 +36,7 @@ class PubsubNotifee(INotifee): :param network: network the connection was opened on :param conn: connection that was opened """ - - # Only add peer_id if we are initiator (otherwise we would end up - # with two pubsub streams between us and the peer) - if conn.initiator: - await self.initiator_peers_queue.put(conn.peer_id) + await self.initiator_peers_queue.put(conn.peer_id) async def disconnected(self, network: INetwork, conn: IMuxedConn) -> None: pass diff --git a/tests/interop/test_pubsub.py b/tests/interop/test_pubsub.py index bbb9b8db..5c7f70d5 100644 --- a/tests/interop/test_pubsub.py +++ b/tests/interop/test_pubsub.py @@ -2,18 +2,31 @@ import asyncio import pytest +from libp2p.peer.id import ID + from .utils import connect - TOPIC = "TOPIC_0123" +DATA = b"DATA_0123" @pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio -async def test_pubsub_peers(pubsubs_gsub, p2pds): +async def test_pubsub_subscribe(pubsubs_gsub, p2pds): # await connect(pubsubs_gsub[0].host, p2pds[0]) await connect(p2pds[0], pubsubs_gsub[0].host) + peers = await 
p2pds[0].control.pubsub_list_peers("") + assert pubsubs_gsub[0].host.get_id() in peers + # FIXME: + assert p2pds[0].peer_id in pubsubs_gsub[0].peers + sub = await pubsubs_gsub[0].subscribe(TOPIC) - await asyncio.sleep(1) - peers = await p2pds[0].control.pubsub_list_peers(TOPIC) - print(f"!@# peers={peers}") + peers_topic = await p2pds[0].control.pubsub_list_peers(TOPIC) + await asyncio.sleep(0.1) + assert pubsubs_gsub[0].host.get_id() in peers_topic + + await p2pds[0].control.pubsub_publish(TOPIC, DATA) + msg = await sub.get() + assert ID(msg.from_id) == p2pds[0].peer_id + assert msg.data == DATA + assert len(msg.topicIDs) == 1 and msg.topicIDs[0] == TOPIC diff --git a/tests/interop/utils.py b/tests/interop/utils.py index 43b93a64..d0506067 100644 --- a/tests/interop/utils.py +++ b/tests/interop/utils.py @@ -4,8 +4,8 @@ from typing import Union from multiaddr import Multiaddr from libp2p.host.host_interface import IHost -from libp2p.peer.peerinfo import PeerInfo from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo from .daemon import Daemon diff --git a/tests/pubsub/configs.py b/tests/pubsub/configs.py index e0737878..b2053252 100644 --- a/tests/pubsub/configs.py +++ b/tests/pubsub/configs.py @@ -1,8 +1,6 @@ from typing import NamedTuple -from libp2p.pubsub import floodsub -from libp2p.pubsub import gossipsub - +from libp2p.pubsub import floodsub, gossipsub FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID From fd1f466002726d0ba66983235e6ff164bc1eabdd Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 14:12:16 +0800 Subject: [PATCH 08/33] Fix: failed to open stream using existing conn Fix #233 --- libp2p/network/swarm.py | 88 +++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 48 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index e522a4b2..895ceb57 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -15,6 +15,7 @@ from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader from libp2p.typing import StreamHandlerFn, TProtocol +from libp2p.peer.peerstore import PeerStoreError from .connection.raw_connection import RawConnection from .exceptions import SwarmException @@ -92,55 +93,55 @@ class Swarm(INetwork): :return: muxed connection """ - # Get peer info from peer store - addrs = self.peerstore.addrs(peer_id) + if peer_id in self.connections: + # If muxed connection already exists for peer_id, + # set muxed connection equal to existing muxed connection + return self.connections[peer_id] + + try: + # Get peer info from peer store + addrs = self.peerstore.addrs(peer_id) + except PeerStoreError: + raise SwarmException(f"No known addresses to peer {peer_id}") if not addrs: - raise SwarmException("No known addresses to peer") + raise SwarmException(f"No known addresses to peer {peer_id}") if not self.router: multiaddr = addrs[0] else: multiaddr = self.router.find_peer(peer_id) + # Dial peer (connection to peer does not yet exist) + # Transport dials peer (gets back a raw conn) + raw_conn = await self.transport.dial(multiaddr, self.self_id) - if peer_id in self.connections: - # If muxed connection already exists for peer_id, - # set muxed connection equal to existing muxed connection - muxed_conn = self.connections[peer_id] - else: - # Dial peer (connection to peer does not yet exist) - # Transport dials peer (gets back a raw conn) - raw_conn = 
await self.transport.dial(multiaddr, self.self_id) + # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure + # the conn and then mux the conn + try: + secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) + except SecurityUpgradeFailure as error: + # TODO: Add logging to indicate the failure + await raw_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a secured connection from {peer_id}" + ) from error + try: + muxed_conn = await self.upgrader.upgrade_connection( + secured_conn, self.generic_protocol_handler, peer_id + ) + except MuxerUpgradeFailure as error: + # TODO: Add logging to indicate the failure + await secured_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a muxed connection from {peer_id}" + ) from error - # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure - # the conn and then mux the conn - try: - secured_conn = await self.upgrader.upgrade_security( - raw_conn, peer_id, True - ) - except SecurityUpgradeFailure as error: - # TODO: Add logging to indicate the failure - await raw_conn.close() - raise SwarmException( - f"fail to upgrade the connection to a secured connection from {peer_id}" - ) from error - try: - muxed_conn = await self.upgrader.upgrade_connection( - secured_conn, self.generic_protocol_handler, peer_id - ) - except MuxerUpgradeFailure as error: - # TODO: Add logging to indicate the failure - await secured_conn.close() - raise SwarmException( - f"fail to upgrade the connection to a muxed connection from {peer_id}" - ) from error + # Store muxed connection in connections + self.connections[peer_id] = muxed_conn - # Store muxed connection in connections - self.connections[peer_id] = muxed_conn - - # Call notifiers since event occurred - for notifee in self.notifees: - await notifee.connected(self, muxed_conn) + # Call notifiers since event occurred + for notifee in self.notifees: + await notifee.connected(self, muxed_conn) return muxed_conn @@ -152,11 +153,6 @@ class Swarm(INetwork): :param protocol_id: protocol id :return: net stream instance """ - # Get peer info from peer store - addrs = self.peerstore.addrs(peer_id) - - if not addrs: - raise SwarmException("No known addresses to peer") muxed_conn = await self.dial_peer(peer_id) @@ -217,10 +213,6 @@ class Swarm(INetwork): "fail to upgrade the connection to a secured connection" ) from error peer_id = secured_conn.get_remote_peer() - peer_ip, peer_port = writer.get_extra_info("peername") - peer_maddr = Multiaddr(f"/ip4/{peer_ip}/tcp/{peer_port}") - # TODO: Fix the ttl - self.peerstore.add_addr(peer_id, peer_maddr, 12345678) try: muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id From 33dae87c354142705546a62637ffb8d097af20f8 Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 16:07:44 +0800 Subject: [PATCH 09/33] Add pubsub test for gossipsub --- libp2p/pubsub/gossipsub.py | 20 +---- libp2p/pubsub/pubsub.py | 4 +- tests/interop/test_pubsub.py | 168 +++++++++++++++++++++++++++++++---- 3 files changed, 155 insertions(+), 37 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 3e8b0d9a..64a56d11 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -5,6 +5,7 @@ from typing import Any, Dict, Iterable, List, Sequence, Set from libp2p.peer.id import ID from libp2p.typing import TProtocol +from libp2p.utils import encode_varint_prefixed from .mcache import MessageCache from .pb 
import rpc_pb2 @@ -169,7 +170,7 @@ class GossipSub(IPubsubRouter): # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. - await stream.write(rpc_msg.SerializeToString()) + await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID @@ -276,19 +277,6 @@ class GossipSub(IPubsubRouter): return "flood" return "unknown" - async def deliver_messages_to_peers( - self, peers: List[ID], msg_sender: ID, origin_id: ID, serialized_packet: bytes - ) -> None: - for peer_id_in_topic in peers: - # Forward to all peers that are not the - # message sender and are not the message origin - - if peer_id_in_topic not in (msg_sender, origin_id): - stream = self.pubsub.peers[peer_id_in_topic] - - # Publish the packet - await stream.write(serialized_packet) - # Heartbeat async def heartbeat(self) -> None: """ @@ -511,7 +499,7 @@ class GossipSub(IPubsubRouter): peer_stream = self.pubsub.peers[sender_peer_id] # 4) And write the packet to the stream - await peer_stream.write(rpc_msg) + await peer_stream.write(encode_varint_prefixed(rpc_msg)) async def handle_graft( self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID @@ -603,4 +591,4 @@ class GossipSub(IPubsubRouter): peer_stream = self.pubsub.peers[to_peer] # Write rpc to stream - await peer_stream.write(rpc_msg) + await peer_stream.write(encode_varint_prefixed(rpc_msg)) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index a02f5c31..c26bc547 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -72,7 +72,7 @@ class Pubsub: topic_validators: Dict[str, TopicValidator] - # NOTE: Be sure it is increased atomically everytime. + # TODO: Be sure it is increased atomically everytime. counter: int # uint64 def __init__( @@ -165,8 +165,6 @@ class Pubsub: for msg in rpc_incoming.publish: if not self._is_subscribed_to_msg(msg): continue - # TODO(mhchia): This will block this read_stream loop until all data are pushed. - # Should investigate further if this is an issue. 
asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg)) if rpc_incoming.subscriptions: diff --git a/tests/interop/test_pubsub.py b/tests/interop/test_pubsub.py index 5c7f70d5..53aee074 100644 --- a/tests/interop/test_pubsub.py +++ b/tests/interop/test_pubsub.py @@ -1,32 +1,164 @@ import asyncio +import functools import pytest from libp2p.peer.id import ID +from libp2p.utils import read_varint_prefixed_bytes +from libp2p.pubsub.pb import rpc_pb2 + +from p2pclient.pb import p2pd_pb2 from .utils import connect -TOPIC = "TOPIC_0123" -DATA = b"DATA_0123" +TOPIC_0 = "ABALA" +TOPIC_1 = "YOOOO" -@pytest.mark.parametrize("num_hosts", (1,)) +async def p2pd_subscribe(p2pd, topic) -> "asyncio.Queue[rpc_pb2.Message]": + reader, writer = await p2pd.control.pubsub_subscribe(topic) + + queue = asyncio.Queue() + + async def _read_pubsub_msg() -> None: + writer_closed_task = asyncio.ensure_future(writer.wait_closed()) + + while True: + done, pending = await asyncio.wait( + [read_varint_prefixed_bytes(reader), writer_closed_task], + return_when=asyncio.FIRST_COMPLETED, + ) + done_tasks = tuple(done) + if writer.is_closing(): + return + read_task = done_tasks[0] + # Sanity check + assert read_task._coro.__name__ == "read_varint_prefixed_bytes" + msg_bytes = read_task.result() + ps_msg = p2pd_pb2.PSMessage() + ps_msg.ParseFromString(msg_bytes) + # Fill in the message used in py-libp2p + msg = rpc_pb2.Message( + from_id=ps_msg.from_field, + data=ps_msg.data, + seqno=ps_msg.seqno, + topicIDs=ps_msg.topicIDs, + signature=ps_msg.signature, + key=ps_msg.key, + ) + queue.put_nowait(msg) + + asyncio.ensure_future(_read_pubsub_msg()) + await asyncio.sleep(0) + return queue + + +def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) -> None: + assert msg.data == data and msg.from_id == from_peer_id + + +@pytest.mark.parametrize("num_hosts, num_p2pds", ((1, 2),)) @pytest.mark.asyncio -async def test_pubsub_subscribe(pubsubs_gsub, p2pds): - # await connect(pubsubs_gsub[0].host, p2pds[0]) - await connect(p2pds[0], pubsubs_gsub[0].host) - peers = await p2pds[0].control.pubsub_list_peers("") - assert pubsubs_gsub[0].host.get_id() in peers - # FIXME: - assert p2pds[0].peer_id in pubsubs_gsub[0].peers +async def test_pubsub(pubsubs_gsub, p2pds): + # + # Test: Recognize pubsub peers on connection. + # + py_pubsub = pubsubs_gsub[0] + # go0 <-> py <-> go1 + await connect(p2pds[0], py_pubsub.host) + await connect(py_pubsub.host, p2pds[1]) + py_peer_id = py_pubsub.host.get_id() + # Check pubsub peers + pubsub_peers_0 = await p2pds[0].control.pubsub_list_peers("") + assert len(pubsub_peers_0) == 1 and pubsub_peers_0[0] == py_peer_id + pubsub_peers_1 = await p2pds[1].control.pubsub_list_peers("") + assert len(pubsub_peers_1) == 1 and pubsub_peers_1[0] == py_peer_id + assert ( + len(py_pubsub.peers) == 2 + and p2pds[0].peer_id in py_pubsub.peers + and p2pds[1].peer_id in py_pubsub.peers + ) - sub = await pubsubs_gsub[0].subscribe(TOPIC) - peers_topic = await p2pds[0].control.pubsub_list_peers(TOPIC) + # + # Test: `subscribe`. 
+ # + # (name, topics) + # (go_0, [0, 1]) <-> (py, [0, 1]) <-> (go_1, [1]) + sub_py_topic_0 = await py_pubsub.subscribe(TOPIC_0) + sub_py_topic_1 = await py_pubsub.subscribe(TOPIC_1) + sub_go_0_topic_0 = await p2pd_subscribe(p2pds[0], TOPIC_0) + sub_go_0_topic_1 = await p2pd_subscribe(p2pds[0], TOPIC_1) + sub_go_1_topic_1 = await p2pd_subscribe(p2pds[1], TOPIC_1) + # Check topic peers await asyncio.sleep(0.1) - assert pubsubs_gsub[0].host.get_id() in peers_topic + # go_0 + go_0_topic_0_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_0) + assert len(go_0_topic_0_peers) == 1 and py_peer_id == go_0_topic_0_peers[0] + go_0_topic_1_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_1) + assert len(go_0_topic_1_peers) == 1 and py_peer_id == go_0_topic_1_peers[0] + # py + py_topic_0_peers = py_pubsub.peer_topics[TOPIC_0] + assert len(py_topic_0_peers) == 1 and p2pds[0].peer_id == py_topic_0_peers[0] + # go_1 + go_1_topic_1_peers = await p2pds[1].control.pubsub_list_peers(TOPIC_1) + assert len(go_1_topic_1_peers) == 1 and py_peer_id == go_1_topic_1_peers[0] - await p2pds[0].control.pubsub_publish(TOPIC, DATA) - msg = await sub.get() - assert ID(msg.from_id) == p2pds[0].peer_id - assert msg.data == DATA - assert len(msg.topicIDs) == 1 and msg.topicIDs[0] == TOPIC + # + # Test: `publish` + # + # 1. py publishes + # - 1.1. py publishes data_11 to topic_0, py and go_0 receives. + # - 1.2. py publishes data_12 to topic_1, all receive. + # 2. go publishes + # - 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. + # - 2.2. go_1 publishes data_22 to topic_1, all receive. + + # 1.1. py publishes data_11 to topic_0, py and go_0 receives. + data_11 = b"data_11" + await py_pubsub.publish(TOPIC_0, data_11) + validate_11 = functools.partial( + validate_pubsub_msg, data=data_11, from_peer_id=py_peer_id + ) + validate_11(await sub_py_topic_0.get()) + validate_11(await sub_go_0_topic_0.get()) + + # 1.2. py publishes data_12 to topic_1, all receive. + data_12 = b"data_12" + validate_12 = functools.partial( + validate_pubsub_msg, data=data_12, from_peer_id=py_peer_id + ) + await py_pubsub.publish(TOPIC_1, data_12) + validate_12(await sub_py_topic_1.get()) + validate_12(await sub_go_0_topic_1.get()) + validate_12(await sub_go_1_topic_1.get()) + + # 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive. + data_21 = b"data_21" + validate_21 = functools.partial( + validate_pubsub_msg, data=data_21, from_peer_id=p2pds[0].peer_id + ) + await p2pds[0].control.pubsub_publish(TOPIC_0, data_21) + validate_21(await sub_py_topic_0.get()) + validate_21(await sub_go_0_topic_0.get()) + + # 2.2. go_1 publishes data_22 to topic_1, all receive. 
+ data_22 = b"data_22" + validate_22 = functools.partial( + validate_pubsub_msg, data=data_22, from_peer_id=p2pds[1].peer_id + ) + await p2pds[1].control.pubsub_publish(TOPIC_1, data_22) + validate_22(await sub_py_topic_1.get()) + validate_22(await sub_go_0_topic_1.get()) + validate_22(await sub_go_1_topic_1.get()) + + # + # Test: `unsubscribe` and re`subscribe` + # + await py_pubsub.unsubscribe(TOPIC_0) + await asyncio.sleep(0.1) + assert py_peer_id not in (await p2pds[0].control.pubsub_list_peers(TOPIC_0)) + assert py_peer_id not in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) + await py_pubsub.subscribe(TOPIC_0) + await asyncio.sleep(0.1) + assert py_peer_id in (await p2pds[0].control.pubsub_list_peers(TOPIC_0)) + assert py_peer_id in (await p2pds[1].control.pubsub_list_peers(TOPIC_0)) From 7385a7a67760dd8b498139fdada08f7579862845 Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 16:49:00 +0800 Subject: [PATCH 10/33] Add `is_gossipsub` fixture in interop test To use the same code to test against both routers: floodsub and gossipsub. --- libp2p/network/swarm.py | 2 +- libp2p/pubsub/floodsub.py | 3 +- tests/conftest.py | 50 +--------------------------------- tests/interop/conftest.py | 31 +++++++++++++++++++-- tests/interop/test_pubsub.py | 10 +++---- tests/pubsub/conftest.py | 43 +++++++++++++++++++++++++++++ tests/pubsub/test_gossipsub.py | 7 +++-- 7 files changed, 86 insertions(+), 60 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 895ceb57..bccfdac1 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -4,6 +4,7 @@ from typing import Callable, Dict, List, Sequence from multiaddr import Multiaddr from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStoreError from libp2p.peer.peerstore_interface import IPeerStore from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient @@ -15,7 +16,6 @@ from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader from libp2p.typing import StreamHandlerFn, TProtocol -from libp2p.peer.peerstore import PeerStoreError from .connection.raw_connection import RawConnection from .exceptions import SwarmException diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 3fc713b2..3ded0fe2 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -2,6 +2,7 @@ from typing import Iterable, List, Sequence from libp2p.peer.id import ID from libp2p.typing import TProtocol +from libp2p.utils import encode_varint_prefixed from .pb import rpc_pb2 from .pubsub import Pubsub @@ -78,7 +79,7 @@ class FloodSub(IPubsubRouter): stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. 
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - await stream.write(rpc_msg.SerializeToString()) + await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) async def join(self, topic: str) -> None: """ diff --git a/tests/conftest.py b/tests/conftest.py index 188c655d..fd753be0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,8 +3,7 @@ import asyncio import pytest from .configs import LISTEN_MADDR -from .factories import FloodsubFactory, GossipsubFactory, HostFactory, PubsubFactory -from .pubsub.configs import GOSSIPSUB_PARAMS +from .factories import HostFactory @pytest.fixture @@ -32,50 +31,3 @@ async def hosts(num_hosts, is_host_secure): await asyncio.gather( *[_host.close() for _host in _hosts], return_exceptions=True ) - - -@pytest.fixture -def floodsubs(num_hosts): - return FloodsubFactory.create_batch(num_hosts) - - -@pytest.fixture -def gossipsub_params(): - return GOSSIPSUB_PARAMS - - -@pytest.fixture -def gossipsubs(num_hosts, gossipsub_params): - yield GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict()) - # TODO: Clean up - - -def _make_pubsubs(hosts, pubsub_routers, cache_size): - if len(pubsub_routers) != len(hosts): - raise ValueError( - f"lenght of pubsub_routers={pubsub_routers} should be equaled to the " - f"length of hosts={len(hosts)}" - ) - return tuple( - PubsubFactory(host=host, router=router, cache_size=cache_size) - for host, router in zip(hosts, pubsub_routers) - ) - - -@pytest.fixture -def pubsub_cache_size(): - return None # default - - -@pytest.fixture -def pubsubs_fsub(hosts, floodsubs, pubsub_cache_size): - _pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size) - yield _pubsubs_fsub - # TODO: Clean up - - -@pytest.fixture -def pubsubs_gsub(hosts, gossipsubs, pubsub_cache_size): - _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size) - yield _pubsubs_gsub - # TODO: Clean up diff --git a/tests/interop/conftest.py b/tests/interop/conftest.py index ef9de3f5..06dfeecb 100644 --- a/tests/interop/conftest.py +++ b/tests/interop/conftest.py @@ -4,6 +4,9 @@ import sys import pexpect import pytest +from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory +from tests.pubsub.configs import GOSSIPSUB_PARAMS + from .daemon import make_p2pd @@ -33,11 +36,35 @@ def num_p2pds(): @pytest.fixture -async def p2pds(num_p2pds, is_host_secure, unused_tcp_port_factory): +def is_gossipsub(): + return True + + +@pytest.fixture +async def p2pds(num_p2pds, is_host_secure, is_gossipsub, unused_tcp_port_factory): p2pds = await asyncio.gather( - *[make_p2pd(unused_tcp_port_factory, is_host_secure) for _ in range(num_p2pds)] + *[ + make_p2pd( + unused_tcp_port_factory, is_host_secure, is_gossipsub=is_gossipsub + ) + for _ in range(num_p2pds) + ] ) try: yield p2pds finally: await asyncio.gather(*[p2pd.close() for p2pd in p2pds]) + + +@pytest.fixture +def pubsubs(num_hosts, hosts, is_gossipsub): + routers = None + if is_gossipsub: + routers = GossipsubFactory.create_batch(num_hosts, **GOSSIPSUB_PARAMS._asdict()) + else: + routers = FloodsubFactory.create_batch(num_hosts) + _pubsubs = tuple( + PubsubFactory(host=host, router=router) for host, router in zip(hosts, routers) + ) + yield _pubsubs + # TODO: Clean up diff --git a/tests/interop/test_pubsub.py b/tests/interop/test_pubsub.py index 53aee074..bb37e352 100644 --- a/tests/interop/test_pubsub.py +++ b/tests/interop/test_pubsub.py @@ -1,13 +1,12 @@ import asyncio import functools +from p2pclient.pb import p2pd_pb2 import 
pytest from libp2p.peer.id import ID -from libp2p.utils import read_varint_prefixed_bytes from libp2p.pubsub.pb import rpc_pb2 - -from p2pclient.pb import p2pd_pb2 +from libp2p.utils import read_varint_prefixed_bytes from .utils import connect @@ -57,13 +56,14 @@ def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) -> assert msg.data == data and msg.from_id == from_peer_id +@pytest.mark.parametrize("is_gossipsub", (True, False)) @pytest.mark.parametrize("num_hosts, num_p2pds", ((1, 2),)) @pytest.mark.asyncio -async def test_pubsub(pubsubs_gsub, p2pds): +async def test_pubsub(pubsubs, p2pds): # # Test: Recognize pubsub peers on connection. # - py_pubsub = pubsubs_gsub[0] + py_pubsub = pubsubs[0] # go0 <-> py <-> go1 await connect(p2pds[0], py_pubsub.host) await connect(py_pubsub.host, p2pds[1]) diff --git a/tests/pubsub/conftest.py b/tests/pubsub/conftest.py index e69de29b..246ca158 100644 --- a/tests/pubsub/conftest.py +++ b/tests/pubsub/conftest.py @@ -0,0 +1,43 @@ +import pytest + +from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory + +from .configs import GOSSIPSUB_PARAMS + + +def _make_pubsubs(hosts, pubsub_routers, cache_size): + if len(pubsub_routers) != len(hosts): + raise ValueError( + f"lenght of pubsub_routers={pubsub_routers} should be equaled to the " + f"length of hosts={len(hosts)}" + ) + return tuple( + PubsubFactory(host=host, router=router, cache_size=cache_size) + for host, router in zip(hosts, pubsub_routers) + ) + + +@pytest.fixture +def pubsub_cache_size(): + return None # default + + +@pytest.fixture +def gossipsub_params(): + return GOSSIPSUB_PARAMS + + +@pytest.fixture +def pubsubs_fsub(num_hosts, hosts, pubsub_cache_size): + floodsubs = FloodsubFactory.create_batch(num_hosts) + _pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size) + yield _pubsubs_fsub + # TODO: Clean up + + +@pytest.fixture +def pubsubs_gsub(num_hosts, hosts, pubsub_cache_size, gossipsub_params): + gossipsubs = GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict()) + _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size) + yield _pubsubs_gsub + # TODO: Clean up diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index e091f669..a0a8a28d 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -14,7 +14,8 @@ from .utils import dense_connect, one_to_all_connect ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),), ) @pytest.mark.asyncio -async def test_join(num_hosts, hosts, gossipsubs, pubsubs_gsub): +async def test_join(num_hosts, hosts, pubsubs_gsub): + gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub) hosts_indices = list(range(num_hosts)) topic = "test_join" @@ -85,7 +86,9 @@ async def test_leave(pubsubs_gsub): @pytest.mark.parametrize("num_hosts", (2,)) @pytest.mark.asyncio -async def test_handle_graft(pubsubs_gsub, hosts, gossipsubs, event_loop, monkeypatch): +async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch): + gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub) + index_alice = 0 id_alice = hosts[index_alice].get_id() index_bob = 1 From 749ff275ede41276f83240a01e4610e2ac2ef434 Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 16:55:42 +0800 Subject: [PATCH 11/33] Refactor `make_p2pd` Let `make_p2pd` get rid of `unused_tcp_port_factory`, which should only exist in fixtures/tests. 
--- tests/interop/conftest.py | 5 ++++- tests/interop/daemon.py | 9 ++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/interop/conftest.py b/tests/interop/conftest.py index 06dfeecb..982e74d1 100644 --- a/tests/interop/conftest.py +++ b/tests/interop/conftest.py @@ -45,7 +45,10 @@ async def p2pds(num_p2pds, is_host_secure, is_gossipsub, unused_tcp_port_factory p2pds = await asyncio.gather( *[ make_p2pd( - unused_tcp_port_factory, is_host_secure, is_gossipsub=is_gossipsub + unused_tcp_port_factory(), + unused_tcp_port_factory(), + is_host_secure, + is_gossipsub=is_gossipsub, ) for _ in range(num_p2pds) ] diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index c56bff72..97391892 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -152,14 +152,15 @@ class Daemon: async def make_p2pd( - unused_tcp_port_factory, + daemon_control_port: int, + client_callback_port: int, is_secure: bool, is_pubsub_enabled=True, is_gossipsub=True, is_pubsub_signing=False, is_pubsub_signing_strict=False, ) -> Daemon: - control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}") + control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{daemon_control_port}") p2pd_proc = P2PDProcess( control_maddr, is_secure, @@ -169,9 +170,7 @@ async def make_p2pd( is_pubsub_signing_strict, ) await p2pd_proc.start() - client_callback_maddr = Multiaddr( - f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}" - ) + client_callback_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{client_callback_port}") p2pc = Client(control_maddr, client_callback_maddr) await p2pc.listen() peer_id, maddrs = await p2pc.identify() From 5280f3965c001ce65d8a7aaa66d0abe68732928e Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 17:41:17 +0800 Subject: [PATCH 12/33] Update install script for interop And adjust the structure of go packages for interop --- install_interop_go_pkgs.sh | 24 ++++++++++++++++++- .../interop/go_pkgs/{ => examples}/README.md | 0 .../go_pkgs/{ => examples}/echo/main.go | 0 tests/interop/go_pkgs/{ => examples}/go.mod | 0 tests/interop/go_pkgs/{ => examples}/go.sum | 0 .../go_pkgs/{ => examples}/utils/host.go | 0 6 files changed, 23 insertions(+), 1 deletion(-) rename tests/interop/go_pkgs/{ => examples}/README.md (100%) rename tests/interop/go_pkgs/{ => examples}/echo/main.go (100%) rename tests/interop/go_pkgs/{ => examples}/go.mod (100%) rename tests/interop/go_pkgs/{ => examples}/go.sum (100%) rename tests/interop/go_pkgs/{ => examples}/utils/host.go (100%) diff --git a/install_interop_go_pkgs.sh b/install_interop_go_pkgs.sh index cdf3193c..585d6205 100755 --- a/install_interop_go_pkgs.sh +++ b/install_interop_go_pkgs.sh @@ -1,5 +1,27 @@ #!/bin/bash +GO_PKGS_PATH=./tests/interop/go_pkgs + +DAEMON_REPO=go-libp2p-daemon +DAEMON_PATH=$GO_PKGS_PATH/$DAEMON_REPO + +EXAMPLES_PATHS=$GO_PKGS_PATH/examples + + go version -cd tests/interop/go_pkgs/ + +# Install `p2pd` +# FIXME: Use the canonical repo in libp2p, when we don't need `insecure`. +if [ ! -e "$DAEMON_PATH" ]; then + git clone git@github.com:mhchia/$DAEMON_REPO.git --branch test/add-options $DAEMON_PATH +fi +cd $DAEMON_PATH go install ./... + +cd - + +# Install example modeuls +cd $EXAMPLES_PATHS +go install ./... + +echo "Finish installing go modeuls for interop." 
diff --git a/tests/interop/go_pkgs/README.md b/tests/interop/go_pkgs/examples/README.md similarity index 100% rename from tests/interop/go_pkgs/README.md rename to tests/interop/go_pkgs/examples/README.md diff --git a/tests/interop/go_pkgs/echo/main.go b/tests/interop/go_pkgs/examples/echo/main.go similarity index 100% rename from tests/interop/go_pkgs/echo/main.go rename to tests/interop/go_pkgs/examples/echo/main.go diff --git a/tests/interop/go_pkgs/go.mod b/tests/interop/go_pkgs/examples/go.mod similarity index 100% rename from tests/interop/go_pkgs/go.mod rename to tests/interop/go_pkgs/examples/go.mod diff --git a/tests/interop/go_pkgs/go.sum b/tests/interop/go_pkgs/examples/go.sum similarity index 100% rename from tests/interop/go_pkgs/go.sum rename to tests/interop/go_pkgs/examples/go.sum diff --git a/tests/interop/go_pkgs/utils/host.go b/tests/interop/go_pkgs/examples/utils/host.go similarity index 100% rename from tests/interop/go_pkgs/utils/host.go rename to tests/interop/go_pkgs/examples/utils/host.go From 2ba7948f9561683ff8994feed1f8e7ad41a0de4b Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 18:52:55 +0800 Subject: [PATCH 13/33] Update bindings version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9ddfbd12..aa1aee77 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ extras_require = { "pytest-asyncio>=0.10.0,<1.0.0", "pexpect>=4.6,<5", # FIXME: Master branch. Use PyPI instead after it is released. - "p2pclient @ git+https://git@github.com/mhchia/py-libp2p-daemon-bindings@2647296", + "p2pclient @ git+https://git@github.com/mhchia/py-libp2p-daemon-bindings@19d6c1d", ], "lint": [ "mypy>=0.701,<1.0", From d1b0340164bda9ba340efe7a5a220a71bacb06ee Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 21:55:07 +0800 Subject: [PATCH 14/33] Update bindings version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index aa1aee77..f5c4a20a 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ extras_require = { "pytest-asyncio>=0.10.0,<1.0.0", "pexpect>=4.6,<5", # FIXME: Master branch. Use PyPI instead after it is released. - "p2pclient @ git+https://git@github.com/mhchia/py-libp2p-daemon-bindings@19d6c1d", + "p2pclient @ git+https://git@github.com/mhchia/py-libp2p-daemon-bindings@628266f", ], "lint": [ "mypy>=0.701,<1.0", From c6d81d70b3d155765d5280f811353b0de7c4297a Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 22:25:29 +0800 Subject: [PATCH 15/33] Try 3.7-dev in CI Use https over ssh when cloning go-libp2p-daemon To avoid """ Warning: Permanently added the RSA host key for IP address '140.82.113.4' to the list of known hosts. Permission denied (publickey). fatal: Could not read from remote repository. 
""" Exit if clone fails Fix daemon url --- .travis.yml | 6 +++--- install_interop_go_pkgs.sh | 13 ++++++++----- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/.travis.yml b/.travis.yml index 2373658c..a4c7a504 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,13 +2,13 @@ language: python matrix: include: - - python: 3.7 + - python: 3.7-dev dist: xenial env: TOXENV=py37-test - - python: 3.7 + - python: 3.7-dev dist: xenial env: TOXENV=lint - - python: 3.7 + - python: 3.7-dev dist: xenial env: TOXENV=py37-interop sudo: true diff --git a/install_interop_go_pkgs.sh b/install_interop_go_pkgs.sh index 585d6205..0f1e0259 100755 --- a/install_interop_go_pkgs.sh +++ b/install_interop_go_pkgs.sh @@ -13,15 +13,18 @@ go version # Install `p2pd` # FIXME: Use the canonical repo in libp2p, when we don't need `insecure`. if [ ! -e "$DAEMON_PATH" ]; then - git clone git@github.com:mhchia/$DAEMON_REPO.git --branch test/add-options $DAEMON_PATH + git clone https://github.com/mhchia/$DAEMON_REPO.git --branch test/add-options $DAEMON_PATH + if [ "$?" != 0 ]; then + echo "Failed to clone the daemon repo" + exit 1 + fi fi -cd $DAEMON_PATH -go install ./... + +cd $DAEMON_PATH && go install ./... cd - # Install example modeuls -cd $EXAMPLES_PATHS -go install ./... +cd $EXAMPLES_PATHS && go install ./... echo "Finish installing go modeuls for interop." From 4f7bb30d827d6b91bef9da754d1435df4878df18 Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 22:59:44 +0800 Subject: [PATCH 16/33] Add `INetStream` to type `StreamReader` TODO: Make stream readers implement `Reader` --- libp2p/network/stream/net_stream.py | 1 + libp2p/typing.py | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 010bd922..ff78f5a8 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -7,6 +7,7 @@ from .net_stream_interface import INetStream class NetStream(INetStream): muxed_stream: IMuxedStream + # TODO: Why we expose `mplex_conn` here? 
mplex_conn: IMuxedConn protocol_id: TProtocol diff --git a/libp2p/typing.py b/libp2p/typing.py index f36d8ab7..ba776e19 100644 --- a/libp2p/typing.py +++ b/libp2p/typing.py @@ -9,5 +9,4 @@ if TYPE_CHECKING: TProtocol = NewType("TProtocol", str) StreamHandlerFn = Callable[["INetStream"], Awaitable[None]] - -StreamReader = Union["IMuxedStream", IRawConnection] +StreamReader = Union["IMuxedStream", "INetStream", IRawConnection] From b23bf5d704f84c6f8b7f7bc9f9e96215e7104f8e Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 23:00:31 +0800 Subject: [PATCH 17/33] Avoid isort sorting the import wrong --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 0d85779c..2d50b08c 100644 --- a/tox.ini +++ b/tox.ini @@ -15,7 +15,7 @@ select = B,C,E,F,W,T4,B9 [isort] force_sort_within_sections=True -known_third_party=hypothesis,pytest,async_generator,cytoolz,trio_typing,pytest_trio +known_third_party=pytest,p2pclient multi_line_output=3 include_trailing_comma=True force_grid_wrap=0 From d7bce941d8c5f5ded759f98f35c9faa97611e94e Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 23:36:31 +0800 Subject: [PATCH 18/33] Fix wrong spelling --- install_interop_go_pkgs.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/install_interop_go_pkgs.sh b/install_interop_go_pkgs.sh index 0f1e0259..978e0b0d 100755 --- a/install_interop_go_pkgs.sh +++ b/install_interop_go_pkgs.sh @@ -27,4 +27,4 @@ cd - # Install example modeuls cd $EXAMPLES_PATHS && go install ./... -echo "Finish installing go modeuls for interop." +echo "Finish installing go modules for interop." From 7f20ab781d8c5d46a5d6e450cb2b8845d9df4124 Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 23:37:34 +0800 Subject: [PATCH 19/33] Fix gosssipsub tests --- libp2p/pubsub/gossipsub.py | 26 ++++++++++---------------- libp2p/pubsub/pubsub.py | 4 ---- tests/pubsub/test_gossipsub.py | 4 +++- 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 64a56d11..f2b45217 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -4,6 +4,7 @@ import random from typing import Any, Dict, Iterable, List, Sequence, Set from libp2p.peer.id import ID +from libp2p.pubsub import floodsub from libp2p.typing import TProtocol from libp2p.utils import encode_varint_prefixed @@ -107,16 +108,19 @@ class GossipSub(IPubsubRouter): :param peer_id: id of peer to add :param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub """ - - # Add peer to the correct peer list - peer_type = GossipSub.get_peer_type(protocol_id) - self.peers_to_protocol[peer_id] = protocol_id - if peer_type == "gossip": + if protocol_id == PROTOCOL_ID: self.peers_gossipsub.append(peer_id) - elif peer_type == "flood": + elif protocol_id == floodsub.PROTOCOL_ID: self.peers_floodsub.append(peer_id) + else: + # We should never enter here. Becuase the `protocol_id` is registered by your pubsub + # instance in multistream-select, but it is not the protocol that gossipsub supports, + # what we check above. In this case, probably we registered gossipsub to a wrong + # `protocol_id` in multistream-select, or wrong versions. 
+ # TODO: Better handling + raise Exception(f"protocol is not supported: protocol_id={protocol_id}") def remove_peer(self, peer_id: ID) -> None: """ @@ -267,16 +271,6 @@ class GossipSub(IPubsubRouter): # Forget mesh[topic] self.mesh.pop(topic, None) - # Interface Helper Functions - @staticmethod - def get_peer_type(protocol_id: str) -> str: - # TODO: Do this in a better, more efficient way - if "gossipsub" in protocol_id: - return "gossip" - if "floodsub" in protocol_id: - return "flood" - return "unknown" - # Heartbeat async def heartbeat(self) -> None: """ diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index c26bc547..46d2ee93 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -227,12 +227,8 @@ class Pubsub: await self.continuously_read_stream(stream) async def _handle_new_peer(self, peer_id: ID) -> None: - # Open a stream to peer on existing connection - # (we know connection exists since that's the only way - # an element gets added to peer_queue) stream: INetStream = await self.host.new_stream(peer_id, self.protocols) - # Map peer to stream self.peers[peer_id] = stream # Send hello packet diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index a0a8a28d..7a0efc2c 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -140,7 +140,9 @@ async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch): "num_hosts, gossipsub_params", ((2, GossipsubParams(heartbeat_interval=3)),) ) @pytest.mark.asyncio -async def test_handle_prune(pubsubs_gsub, hosts, gossipsubs): +async def test_handle_prune(pubsubs_gsub, hosts): + gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub) + index_alice = 0 id_alice = hosts[index_alice].get_id() index_bob = 1 From 961e51fa2ed1aedb51f6fd93b25f003d7586ddba Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 23:39:29 +0800 Subject: [PATCH 20/33] Remove leftover prints --- libp2p/pubsub/pubsub.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 46d2ee93..b0833eef 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -153,14 +153,11 @@ class Pubsub: peer_id = stream.mplex_conn.peer_id while True: - print("!@# continuously_read_stream: waiting") incoming: bytes = await read_varint_prefixed_bytes(stream) - print(f"!@# continuously_read_stream: incoming={incoming}") rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) if rpc_incoming.publish: - print("!@# continuously_read_stream: publish") # deal with RPC.publish for msg in rpc_incoming.publish: if not self._is_subscribed_to_msg(msg): @@ -168,7 +165,6 @@ class Pubsub: asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg)) if rpc_incoming.subscriptions: - print("!@# continuously_read_stream: subscriptions") # deal with RPC.subscriptions # We don't need to relay the subscription to our # peers because a given node only needs its peers @@ -181,7 +177,6 @@ class Pubsub: # This is necessary because `control` is an optional field in pb2. 
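# Aside: why the `HasField("control")` check below is needed. In proto2,
# reading a singular message field that was never set returns a default empty
# message, so a plain attribute access (or truthiness check) is not a reliable
# way to detect presence; only `HasField` reports it. A small stand-alone
# sketch using the same generated module:
from libp2p.pubsub.pb import rpc_pb2

rpc = rpc_pb2.RPC()
assert not rpc.HasField("control")   # nothing set yet
_ = rpc.control                      # readable, but still just a default empty message
assert not rpc.HasField("control")   # reading alone does not mark it as present

rpc.control.SetInParent()            # explicitly mark the submessage as set
assert rpc.HasField("control")
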
# Ref: https://developers.google.com/protocol-buffers/docs/reference/python-generated#singular-fields-proto2 # noqa: E501 if rpc_incoming.HasField("control"): - print("!@# continuously_read_stream: control") # Pass rpc to router so router could perform custom logic await self.router.handle_rpc(rpc_incoming, peer_id) From 677531db765b6d44b89853d1dae911819913faf5 Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 4 Sep 2019 15:33:07 +0800 Subject: [PATCH 21/33] Fix pubsub tests --- libp2p/pubsub/pubsub.py | 1 - tests/pubsub/test_pubsub.py | 40 +++++++++++++++++++++++++------------ 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index b0833eef..7b725d0c 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -156,7 +156,6 @@ class Pubsub: incoming: bytes = await read_varint_prefixed_bytes(stream) rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) - if rpc_incoming.publish: # deal with RPC.publish for msg in rpc_incoming.publish: diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 59df7d7a..34139494 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -1,5 +1,4 @@ import asyncio -import io from typing import NamedTuple import pytest @@ -7,6 +6,7 @@ import pytest from libp2p.exceptions import ValidationError from libp2p.peer.id import ID from libp2p.pubsub.pb import rpc_pb2 +from libp2p.utils import encode_varint_prefixed from tests.utils import connect from .utils import make_pubsub_msg @@ -238,11 +238,19 @@ class FakeNetStream: def __init__(self) -> None: self._queue = asyncio.Queue() - async def read(self) -> bytes: - buf = io.BytesIO() - while not self._queue.empty(): - buf.write(await self._queue.get()) - return buf.getvalue() + async def read(self, n: int = -1) -> bytes: + buf = bytearray() + # Force to blocking wait if no data available now. + if self._queue.empty(): + first_byte = await self._queue.get() + buf.extend(first_byte) + # If `n == -1`, read until no data is in the buffer(_queue). + # Else, read until no data is in the buffer(_queue) or we have read `n` bytes. + while (n == -1) or (len(buf) < n): + if self._queue.empty(): + break + buf.extend(await self._queue.get()) + return bytes(buf) async def write(self, data: bytes) -> int: for i in data: @@ -278,7 +286,7 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): async def wait_for_event_occurring(event): try: - await asyncio.wait_for(event.wait(), timeout=0.01) + await asyncio.wait_for(event.wait(), timeout=1) except asyncio.TimeoutError as error: event.clear() raise asyncio.TimeoutError( @@ -295,7 +303,9 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): publish_subscribed_topic = rpc_pb2.RPC( publish=[rpc_pb2.Message(topicIDs=[TESTING_TOPIC])] ) - await stream.write(publish_subscribed_topic.SerializeToString()) + await stream.write( + encode_varint_prefixed(publish_subscribed_topic.SerializeToString()) + ) await wait_for_event_occurring(event_push_msg) # Make sure the other events are not emitted. 
with pytest.raises(asyncio.TimeoutError): @@ -307,13 +317,15 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): publish_not_subscribed_topic = rpc_pb2.RPC( publish=[rpc_pb2.Message(topicIDs=["NOT_SUBSCRIBED"])] ) - await stream.write(publish_not_subscribed_topic.SerializeToString()) + await stream.write( + encode_varint_prefixed(publish_not_subscribed_topic.SerializeToString()) + ) with pytest.raises(asyncio.TimeoutError): await wait_for_event_occurring(event_push_msg) # Test: `handle_subscription` is called when a subscription message is received. subscription_msg = rpc_pb2.RPC(subscriptions=[rpc_pb2.RPC.SubOpts()]) - await stream.write(subscription_msg.SerializeToString()) + await stream.write(encode_varint_prefixed(subscription_msg.SerializeToString())) await wait_for_event_occurring(event_handle_subscription) # Make sure the other events are not emitted. with pytest.raises(asyncio.TimeoutError): @@ -323,7 +335,7 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): # Test: `handle_rpc` is called when a control message is received. control_msg = rpc_pb2.RPC(control=rpc_pb2.ControlMessage()) - await stream.write(control_msg.SerializeToString()) + await stream.write(encode_varint_prefixed(control_msg.SerializeToString())) await wait_for_event_occurring(event_handle_rpc) # Make sure the other events are not emitted. with pytest.raises(asyncio.TimeoutError): @@ -405,9 +417,11 @@ async def test_message_all_peers(pubsubs_fsub, monkeypatch): monkeypatch.setattr(pubsubs_fsub[0], "peers", mock_peers) empty_rpc = rpc_pb2.RPC() - await pubsubs_fsub[0].message_all_peers(empty_rpc.SerializeToString()) + empty_rpc_bytes = empty_rpc.SerializeToString() + empty_rpc_bytes_len_prefixed = encode_varint_prefixed(empty_rpc_bytes) + await pubsubs_fsub[0].message_all_peers(empty_rpc_bytes) for stream in mock_peers.values(): - assert (await stream.read()) == empty_rpc.SerializeToString() + assert (await stream.read()) == empty_rpc_bytes_len_prefixed @pytest.mark.parametrize("num_hosts", (1,)) From dddaacad62e6030d8e0ce694b97638431ef1e037 Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 4 Sep 2019 16:33:25 +0800 Subject: [PATCH 22/33] Move install script under `tests/interop/go_pkgs` --- .../interop/go_pkgs/install_interop_go_pkgs.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) rename install_interop_go_pkgs.sh => tests/interop/go_pkgs/install_interop_go_pkgs.sh (89%) diff --git a/install_interop_go_pkgs.sh b/tests/interop/go_pkgs/install_interop_go_pkgs.sh similarity index 89% rename from install_interop_go_pkgs.sh rename to tests/interop/go_pkgs/install_interop_go_pkgs.sh index 978e0b0d..b830e865 100755 --- a/install_interop_go_pkgs.sh +++ b/tests/interop/go_pkgs/install_interop_go_pkgs.sh @@ -1,13 +1,14 @@ #!/bin/bash -GO_PKGS_PATH=./tests/interop/go_pkgs +SCRIPT_RELATIVE_PATH=`dirname $0` + +GO_PKGS_PATH=$SCRIPT_RELATIVE_PATH DAEMON_REPO=go-libp2p-daemon DAEMON_PATH=$GO_PKGS_PATH/$DAEMON_REPO EXAMPLES_PATHS=$GO_PKGS_PATH/examples - go version # Install `p2pd` From 46c13ee1c090434ce9d0b6754e8889c05eb16e15 Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 4 Sep 2019 16:34:56 +0800 Subject: [PATCH 23/33] Fix CI to call the script in the correct path --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index a4c7a504..f5d470d1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,7 @@ matrix: - export GOPATH=$HOME/go - export GOROOT=/usr/local/go - export PATH=$GOROOT/bin:$GOPATH/bin:$PATH - - 
./install_interop_go_pkgs.sh + - ./tests/interop/go_pkgs/install_interop_go_pkgs.sh install: - pip install --upgrade pip From 51137855435f71d7e095b3b2ca9287a78cf535f9 Mon Sep 17 00:00:00 2001 From: Kevin Mai-Husan Chia Date: Wed, 4 Sep 2019 20:32:43 +0800 Subject: [PATCH 24/33] Update libp2p/pubsub/pubsub.py Co-Authored-By: NIC Lin --- libp2p/pubsub/pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 7b725d0c..b1812933 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -228,7 +228,7 @@ class Pubsub: # Send hello packet hello = self.get_hello_packet() await stream.write(encode_varint_prefixed(hello.SerializeToString())) - # TODO: Check EOF in the future in the stream's lifetime. + # TODO: Check EOF of this stream. # TODO: Check if the peer in black list. self.router.add_peer(peer_id, stream.get_protocol()) From bd21b2f66fbe9a914c4263d5f6b248947d0c82ef Mon Sep 17 00:00:00 2001 From: Kevin Mai-Husan Chia Date: Wed, 4 Sep 2019 20:33:29 +0800 Subject: [PATCH 25/33] Update tests/interop/conftest.py Co-Authored-By: NIC Lin --- tests/interop/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/interop/conftest.py b/tests/interop/conftest.py index 982e74d1..76b11d68 100644 --- a/tests/interop/conftest.py +++ b/tests/interop/conftest.py @@ -61,7 +61,6 @@ async def p2pds(num_p2pds, is_host_secure, is_gossipsub, unused_tcp_port_factory @pytest.fixture def pubsubs(num_hosts, hosts, is_gossipsub): - routers = None if is_gossipsub: routers = GossipsubFactory.create_batch(num_hosts, **GOSSIPSUB_PARAMS._asdict()) else: From 155f523c9f2f553092f8e8e967e6c191b3a02edf Mon Sep 17 00:00:00 2001 From: Kevin Mai-Husan Chia Date: Wed, 4 Sep 2019 20:33:50 +0800 Subject: [PATCH 26/33] Update tests/interop/daemon.py Co-Authored-By: NIC Lin --- tests/interop/daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index 97391892..4c133cd6 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -20,7 +20,7 @@ TIMEOUT_DURATION = 30 async def try_until_success(coro_func, timeout=TIMEOUT_DURATION): """ - Keep running ``coro_func`` until the time is out. + Keep running ``coro_func`` until either it succeed or time is up. All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments. """ t_start = time.monotonic() From a843514afb0777079b5900d21460b4be3f474505 Mon Sep 17 00:00:00 2001 From: Kevin Mai-Husan Chia Date: Wed, 4 Sep 2019 20:35:42 +0800 Subject: [PATCH 27/33] Update tests/interop/daemon.py Co-Authored-By: NIC Lin --- tests/interop/daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index 4c133cd6..5b8c7914 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -81,7 +81,7 @@ class P2PDProcess: for head_pattern in lines_head_occurred: if line.startswith(head_pattern): lines_head_occurred[head_pattern] = True - return all([value for _, value in lines_head_occurred.items()]) + return all([value for value in lines_head_occurred.values()]) await try_until_success(read_from_daemon_and_check) # Sleep a little bit to ensure the listener is up after logs are emitted. 
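For reference, the `try_until_success` helper whose docstring and assertion message were adjusted in the patches above re-awaits a zero-argument coroutine function until it succeeds or the timeout elapses, so any parameters must be bound up front. A minimal usage sketch inside the interop test package, assuming (as `wait_until_ready`'s log check suggests) that a truthy return value counts as success; `wait_until_topic_joined` is a hypothetical helper built from the `pubsub_list_peers` call already used in the tests:

from .daemon import try_until_success


async def wait_until_topic_joined(p2pd, py_peer_id, topic):
    # Poll the daemon until the py-libp2p peer shows up in its pubsub
    # peer list for `topic`.
    async def _check():
        peers = await p2pd.control.pubsub_list_peers(topic)
        return py_peer_id in peers

    # `try_until_success` calls the coroutine function with no arguments,
    # so bind parameters beforehand (a closure here, or `functools.partial`).
    await try_until_success(_check, timeout=5)
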
From b72c489f4e81184c7163cb35744b76fa0321fb70 Mon Sep 17 00:00:00 2001 From: Kevin Mai-Husan Chia Date: Wed, 4 Sep 2019 20:36:42 +0800 Subject: [PATCH 28/33] Update tests/interop/daemon.py Co-Authored-By: NIC Lin --- tests/interop/daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index 5b8c7914..607ec3a5 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -30,7 +30,7 @@ async def try_until_success(coro_func, timeout=TIMEOUT_DURATION): break if (time.monotonic() - t_start) >= timeout: # timeout - assert False, f"{coro_func} still failed after `{timeout}` seconds" + assert False, f"{coro_func} is still failing after `{timeout}` seconds" await asyncio.sleep(0.01) From 51d547ccc5d8592f482e98e246c0aaf59ddbc19b Mon Sep 17 00:00:00 2001 From: Kevin Mai-Husan Chia Date: Wed, 4 Sep 2019 20:38:38 +0800 Subject: [PATCH 29/33] Update tests/interop/utils.py Co-Authored-By: NIC Lin --- tests/interop/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/interop/utils.py b/tests/interop/utils.py index d0506067..c9174179 100644 --- a/tests/interop/utils.py +++ b/tests/interop/utils.py @@ -37,7 +37,7 @@ async def _is_peer(peer_id: ID, node: TDaemonOrHost) -> bool: async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None: # Type check err_msg = ( - f"Type of type(a)={type(a)} or type(b)={type(b)} is wrong." + f"Type of a={type(a)} or type of b={type(b)} is wrong." "Should be either `IHost` or `Daemon`" ) assert all( From db0da8083a9c86dc72a65d8349deda848ac8ef56 Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 4 Sep 2019 18:25:51 +0800 Subject: [PATCH 30/33] Do `p2pd.close` if not all of them succeed --- tests/interop/conftest.py | 14 +++++++++++--- tests/interop/daemon.py | 9 +++++++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/tests/interop/conftest.py b/tests/interop/conftest.py index 76b11d68..7261ee7b 100644 --- a/tests/interop/conftest.py +++ b/tests/interop/conftest.py @@ -1,5 +1,6 @@ import asyncio import sys +from typing import Union import pexpect import pytest @@ -7,7 +8,7 @@ import pytest from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory from tests.pubsub.configs import GOSSIPSUB_PARAMS -from .daemon import make_p2pd +from .daemon import Daemon, make_p2pd @pytest.fixture @@ -42,7 +43,7 @@ def is_gossipsub(): @pytest.fixture async def p2pds(num_p2pds, is_host_secure, is_gossipsub, unused_tcp_port_factory): - p2pds = await asyncio.gather( + p2pds: Union[Daemon, Exception] = await asyncio.gather( *[ make_p2pd( unused_tcp_port_factory(), @@ -51,8 +52,15 @@ async def p2pds(num_p2pds, is_host_secure, is_gossipsub, unused_tcp_port_factory is_gossipsub=is_gossipsub, ) for _ in range(num_p2pds) - ] + ], + return_exceptions=True, ) + p2pds_succeeded = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Daemon)) + if len(p2pds_succeeded) != len(p2pds): + # Not all succeeded. Close the succeeded ones and print the failed ones(exceptions). 
+ await asyncio.gather(*[p2pd.close() for p2pd in p2pds_succeeded]) + exceptions = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Exception)) + raise Exception(f"not all p2pds succeed: first exception={exceptions[0]}") try: yield p2pds finally: diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index 607ec3a5..754b563c 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -38,6 +38,7 @@ class P2PDProcess: proc: asyncio.subprocess.Process cmd: str = str(P2PD_PATH) args: List[Any] + is_running: bool _tasks: List["asyncio.Future[Any]"] @@ -70,6 +71,8 @@ class P2PDProcess: # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 self.args = args + self.is_running = False + self._tasks = [] async def wait_until_ready(self): @@ -117,8 +120,10 @@ class P2PDProcess: await self.start_printing_logs() async def close(self) -> None: - self.proc.terminate() - await self.proc.wait() + if self.is_running: + self.proc.terminate() + await self.proc.wait() + self.is_running = False for task in self._tasks: task.cancel() From 0e3d4508d6b766d6691634649b44da3574968dee Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 4 Sep 2019 20:49:14 +0800 Subject: [PATCH 31/33] PR feedback - Use `Sequence` instead of `List` - Add note - Remove redundant words in docstring --- libp2p/peer/peerinfo.py | 6 +++--- libp2p/pubsub/gossipsub.py | 6 +++--- tests/interop/daemon.py | 1 + 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/libp2p/peer/peerinfo.py b/libp2p/peer/peerinfo.py index 2f41a6c0..069a67dd 100644 --- a/libp2p/peer/peerinfo.py +++ b/libp2p/peer/peerinfo.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Sequence import multiaddr @@ -10,9 +10,9 @@ class PeerInfo: peer_id: ID addrs: List[multiaddr.Multiaddr] - def __init__(self, peer_id: ID, addrs: List[multiaddr.Multiaddr]) -> None: + def __init__(self, peer_id: ID, addrs: Sequence[multiaddr.Multiaddr]) -> None: self.peer_id = peer_id - self.addrs = addrs + self.addrs = list(addrs) def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index f2b45217..267bb81e 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -116,9 +116,9 @@ class GossipSub(IPubsubRouter): self.peers_floodsub.append(peer_id) else: # We should never enter here. Becuase the `protocol_id` is registered by your pubsub - # instance in multistream-select, but it is not the protocol that gossipsub supports, - # what we check above. In this case, probably we registered gossipsub to a wrong - # `protocol_id` in multistream-select, or wrong versions. + # instance in multistream-select, but it is not the protocol that gossipsub supports. + # In this case, probably we registered gossipsub to a wrong `protocol_id` + # in multistream-select, or wrong versions. # TODO: Better handling raise Exception(f"protocol is not supported: protocol_id={protocol_id}") diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index 754b563c..e9c07e8f 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -183,6 +183,7 @@ async def make_p2pd( for maddr in maddrs: try: ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4) + # NOTE: Check if this `maddr` uses `tcp`. 
maddr.value_for_protocol(multiaddr.protocols.P_TCP) except multiaddr.exceptions.ProtocolLookupError: continue From 34b489af25944c99fa7801aeab8362205762f211 Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 4 Sep 2019 21:37:33 +0800 Subject: [PATCH 32/33] Fix kad_peerinfo according to peerinfo --- libp2p/kademlia/kad_peerinfo.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/libp2p/kademlia/kad_peerinfo.py b/libp2p/kademlia/kad_peerinfo.py index 9fab8dc7..346f6714 100644 --- a/libp2p/kademlia/kad_peerinfo.py +++ b/libp2p/kademlia/kad_peerinfo.py @@ -1,11 +1,11 @@ import heapq from operator import itemgetter import random +from typing import List from multiaddr import Multiaddr from libp2p.peer.id import ID -from libp2p.peer.peerdata import PeerData from libp2p.peer.peerinfo import PeerInfo from .utils import digest @@ -15,16 +15,16 @@ P_UDP = "udp" class KadPeerInfo(PeerInfo): - def __init__(self, peer_id, peer_data=None): - super(KadPeerInfo, self).__init__(peer_id, peer_data) + def __init__(self, peer_id, addrs): + super(KadPeerInfo, self).__init__(peer_id, addrs) self.peer_id_bytes = peer_id.to_bytes() self.xor_id = peer_id.xor_id - self.addrs = peer_data.get_addrs() if peer_data else None + self.addrs = addrs - self.ip = self.addrs[0].value_for_protocol(P_IP) if peer_data else None - self.port = int(self.addrs[0].value_for_protocol(P_UDP)) if peer_data else None + self.ip = self.addrs[0].value_for_protocol(P_IP) if addrs else None + self.port = int(self.addrs[0].value_for_protocol(P_UDP)) if addrs else None def same_home_as(self, node): return sorted(self.addrs) == sorted(node.addrs) @@ -142,14 +142,14 @@ def create_kad_peerinfo(node_id_bytes=None, sender_ip=None, sender_port=None): node_id = ( ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255))) ) - peer_data = None + addrs: List[Multiaddr] if sender_ip and sender_port: - peer_data = PeerData() - addr = [ + addrs = [ Multiaddr( "/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port) ) ] - peer_data.add_addrs(addr) + else: + addrs = [] - return KadPeerInfo(node_id, peer_data) + return KadPeerInfo(node_id, addrs) From 1f3c9af45be9a341ec4d54d2b9242865bb3fa16a Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 4 Sep 2019 22:19:11 +0800 Subject: [PATCH 33/33] Add the missing `is_proc_running=True` --- tests/interop/daemon.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/interop/daemon.py b/tests/interop/daemon.py index e9c07e8f..97356845 100644 --- a/tests/interop/daemon.py +++ b/tests/interop/daemon.py @@ -38,7 +38,7 @@ class P2PDProcess: proc: asyncio.subprocess.Process cmd: str = str(P2PD_PATH) args: List[Any] - is_running: bool + is_proc_running: bool _tasks: List["asyncio.Future[Any]"] @@ -71,7 +71,7 @@ class P2PDProcess: # - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second # Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501 self.args = args - self.is_running = False + self.is_proc_running = False self._tasks = [] @@ -109,6 +109,8 @@ class P2PDProcess: await asyncio.sleep(0) async def start(self) -> None: + if self.is_proc_running: + return self.proc = await asyncio.subprocess.create_subprocess_exec( self.cmd, *self.args, @@ -116,14 +118,15 @@ class P2PDProcess: stderr=asyncio.subprocess.PIPE, bufsize=0, ) + self.is_proc_running = True await self.wait_until_ready() await self.start_printing_logs() async def 
close(self) -> None: - if self.is_running: + if self.is_proc_running: self.proc.terminate() await self.proc.wait() - self.is_running = False + self.is_proc_running = False for task in self._tasks: task.cancel()
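
The final patch makes the daemon wrapper's lifecycle idempotent: `start` is a no-op when the process is already up, and `close` only terminates a live process before cancelling the background log-reading tasks. A condensed, self-contained sketch of that pattern (the command and arguments are placeholders, and the real `P2PDProcess` additionally waits for readiness and streams logs):

import asyncio
from typing import Any, List


class SubprocessWrapper:
    # Minimal version of the start/close discipline used by `P2PDProcess`:
    # `is_proc_running` makes both operations safe to call more than once.
    proc: asyncio.subprocess.Process

    def __init__(self, cmd: str, args: List[str]) -> None:
        self.cmd = cmd
        self.args = args
        self.is_proc_running = False
        self._tasks: List["asyncio.Future[Any]"] = []

    async def start(self) -> None:
        if self.is_proc_running:
            return  # already started: nothing to do
        self.proc = await asyncio.subprocess.create_subprocess_exec(
            self.cmd,
            *self.args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        self.is_proc_running = True

    async def close(self) -> None:
        if self.is_proc_running:
            self.proc.terminate()
            await self.proc.wait()
            self.is_proc_running = False
        for task in self._tasks:
            task.cancel()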