delete old interop, turn on with placeholders, add py312 and py313 to CI testing

This commit is contained in:
pacrob
2025-05-08 12:57:24 -06:00
committed by Paul Robinson
parent 4c02c4ea02
commit fd893afba6
20 changed files with 24 additions and 2107 deletions

View File

@ -17,7 +17,7 @@ jobs:
strategy:
matrix:
python: ['3.9', '3.10', '3.11', '3.12', '3.13']
toxenv: [core, lint, wheel, demos]
toxenv: [core, interop, lint, wheel, demos]
include:
- python: '3.10'
toxenv: docs
@ -46,7 +46,7 @@ jobs:
runs-on: windows-latest
strategy:
matrix:
python-version: ['3.11'] # Using a stable Python version for Windows testing
python-version: ['3.11', '3.12', '3.13']
toxenv: [core, wheel]
fail-fast: false
steps:

View File

@ -38,8 +38,7 @@ lint:
)
test:
# remove core specification once interop tests pass
python -m pytest tests/core
python -m pytest tests
# protobufs management

View File

@ -212,9 +212,6 @@ We can run all tests with:
make test
At this time, the interop tests are not passing. You can run just the internal tests
with ``pytest tests/core``.
Code Style
~~~~~~~~~~

View File

@ -0,0 +1 @@
Removes old interop tests, creates placeholders for new ones, and turns on interop testing in CI.

View File

@ -1,23 +1,6 @@
import pytest
from tests.utils.factories import (
HostFactory,
)
@pytest.fixture
def security_protocol():
return None
@pytest.fixture
def num_hosts():
return 3
@pytest.fixture
async def hosts(num_hosts, security_protocol, nursery):
async with HostFactory.create_batch_and_listen(
num_hosts, security_protocol=security_protocol
) as _hosts:
yield _hosts

View File

@ -1,161 +0,0 @@
from contextlib import (
AsyncExitStack,
)
import pytest
import anyio
from p2pclient.datastructures import (
StreamInfo,
)
from p2pclient.utils import (
get_unused_tcp_port,
)
import trio
from libp2p.io.abc import (
ReadWriteCloser,
)
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
from libp2p.security.secio.transport import ID as SECIO_PROTOCOL_ID
from tests.utils.factories import (
HostFactory,
PubsubFactory,
)
from tests.utils.interop.daemon import (
make_p2pd,
)
from tests.utils.interop.utils import (
connect,
)
@pytest.fixture(params=[NOISE_PROTOCOL_ID, SECIO_PROTOCOL_ID])
def security_protocol(request):
return request.param
@pytest.fixture
def num_p2pds():
return 1
@pytest.fixture
def is_gossipsub():
return True
@pytest.fixture
def is_pubsub_signing():
return True
@pytest.fixture
def is_pubsub_signing_strict():
return True
@pytest.fixture
async def p2pds(
num_p2pds,
security_protocol,
is_gossipsub,
is_pubsub_signing,
is_pubsub_signing_strict,
):
async with AsyncExitStack() as stack:
p2pds = [
await stack.enter_async_context(
make_p2pd(
get_unused_tcp_port(),
get_unused_tcp_port(),
security_protocol,
is_gossipsub=is_gossipsub,
is_pubsub_signing=is_pubsub_signing,
is_pubsub_signing_strict=is_pubsub_signing_strict,
)
)
for _ in range(num_p2pds)
]
try:
yield p2pds
finally:
for p2pd in p2pds:
await p2pd.close()
@pytest.fixture
async def pubsubs(num_hosts, security_protocol, is_gossipsub, is_pubsub_signing_strict):
if is_gossipsub:
yield PubsubFactory.create_batch_with_gossipsub(
num_hosts,
security_protocol=security_protocol,
strict_signing=is_pubsub_signing_strict,
)
else:
yield PubsubFactory.create_batch_with_floodsub(
num_hosts, security_protocol, strict_signing=is_pubsub_signing_strict
)
class DaemonStream(ReadWriteCloser):
stream_info: StreamInfo
stream: anyio.abc.SocketStream
def __init__(self, stream_info: StreamInfo, stream: anyio.abc.SocketStream) -> None:
self.stream_info = stream_info
self.stream = stream
async def close(self) -> None:
await self.stream.close()
async def read(self, n: int = None) -> bytes:
return await self.stream.receive_some(n)
async def write(self, data: bytes) -> None:
return await self.stream.send_all(data)
@pytest.fixture
async def is_to_fail_daemon_stream():
return False
@pytest.fixture
async def py_to_daemon_stream_pair(p2pds, security_protocol, is_to_fail_daemon_stream):
async with HostFactory.create_batch_and_listen(
1, security_protocol=security_protocol
) as hosts:
assert len(p2pds) >= 1
host = hosts[0]
p2pd = p2pds[0]
protocol_id = "/protocol/id/123"
stream_py = None
stream_daemon = None
event_stream_handled = trio.Event()
await connect(host, p2pd)
async def daemon_stream_handler(stream_info, stream):
nonlocal stream_daemon
stream_daemon = DaemonStream(stream_info, stream)
event_stream_handled.set()
await trio.lowlevel.checkpoint()
await p2pd.control.stream_handler(protocol_id, daemon_stream_handler)
# Sleep for a while to wait for the handler being registered.
await trio.sleep(0.01)
if is_to_fail_daemon_stream:
# FIXME: This is a workaround to make daemon reset the stream.
# We intentionally close the listener on the python side, it makes the
# connection from daemon to us fail, and therefore the daemon resets the
# opened stream on their side.
# Reference: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/stream.go#L47-L50 # noqa: E501
# We need it because we want to test against `stream_py` after the remote
# side(daemon) is reset. This should be removed after the API
# `stream.reset` is exposed in daemon some day.
await p2pds[0].control.control.close()
stream_py = await host.new_stream(p2pd.peer_id, [protocol_id])
if not is_to_fail_daemon_stream:
await event_stream_handled.wait()
# NOTE: If `is_to_fail_daemon_stream == True`, then `stream_daemon == None`.
yield stream_py, stream_daemon

View File

@ -0,0 +1,5 @@
def test_go_libp2p_placeholder():
"""
Placeholder test for go-libp2p interop tests.
"""
assert True, "Placeholder test for go-libp2p interop tests"

View File

@ -1 +0,0 @@
Copied and modified from https://github.com/libp2p/go-libp2p-examples.

View File

@ -1,137 +0,0 @@
package main
import (
"bufio"
"context"
"flag"
"fmt"
"io/ioutil"
"log"
utils "interop/utils"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
golog "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
)
func main() {
// LibP2P code uses golog to log messages. They log with different
// string IDs (i.e. "swarm"). We can control the verbosity level for
// all loggers with:
golog.SetAllLoggers(golog.LevelDebug) // Change to DEBUG for extra info
// Parse options from the command line
listenF := flag.Int("l", 0, "wait for incoming connections")
target := flag.String("d", "", "target peer to dial")
protocolID := flag.String("security", "", "security protocol used for secure channel")
seed := flag.Int64("seed", 0, "set random seed for id generation")
flag.Parse()
if *listenF == 0 {
log.Fatal("Please provide a port to bind on with -l")
}
// Make a host that listens on the given multiaddress
ha, err := utils.MakeBasicHost(*listenF, *protocolID, *seed)
if err != nil {
log.Fatal(err)
}
// Set a stream handler on host A. /echo/1.0.0 is
// a user-defined protocol name.
ha.SetStreamHandler("/echo/1.0.0", func(s network.Stream) {
log.Println("Got a new stream!")
if err := doEcho(s); err != nil {
log.Println(err)
s.Reset()
} else {
s.Close()
}
})
if *target == "" {
log.Println("listening for connections")
select {} // hang forever
}
/**** This is where the listener code ends ****/
// The following code extracts target's the peer ID from the
// given multiaddress
ipfsaddr, err := ma.NewMultiaddr(*target)
if err != nil {
log.Fatalln(err)
}
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err)
}
peerid, err := peer.IDB58Decode(pid)
if err != nil {
log.Fatalln(err)
}
// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
targetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
log.Println("!@# targetAddr=", targetAddr)
// We have a peer ID and a targetAddr so we add it to the peerstore
// so LibP2P knows how to contact it
ha.Peerstore().AddAddr(peerid, targetAddr, peerstore.PermanentAddrTTL)
log.Println("!@# ipfsaddr=", ipfsaddr)
pinfo, err := peer.AddrInfoFromP2pAddr(ipfsaddr)
if err != nil {
log.Fatalf("failed to parse %v to pinfo\n", ipfsaddr)
}
err = ha.Connect(context.Background(), *pinfo)
if err != nil {
panic(err)
}
log.Println("connect with peer", *pinfo)
// make a new stream from host B to host A
// it should be handled on host A by the handler we set above because
// we use the same /echo/1.0.0 protocol
s, err := ha.NewStream(context.Background(), peerid, "/echo/1.0.0")
if err != nil {
log.Fatalln(err)
}
log.Println("opened stream")
_, err = s.Write([]byte("Hello, world!\n"))
if err != nil {
log.Fatalln(err)
}
out, err := ioutil.ReadAll(s)
if err != nil {
log.Fatalln(err)
}
log.Printf("read reply: %q\n", out)
}
// doEcho reads a line of data a stream and writes it back
func doEcho(s network.Stream) error {
buf := bufio.NewReader(s)
str, err := buf.ReadString('\n')
if err != nil {
return err
}
log.Printf("read: %s\n", str)
_, err = s.Write([]byte(str))
return err
}

View File

@ -1,12 +0,0 @@
module interop
go 1.12
require (
github.com/ipfs/go-log v1.0.5
github.com/libp2p/go-libp2p v0.27.8
github.com/libp2p/go-libp2p-core v0.19.0
github.com/libp2p/go-libp2p-noise v0.3.0
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/multiformats/go-multiaddr v0.9.0
)

File diff suppressed because it is too large Load Diff

View File

@ -1,82 +0,0 @@
package utils
import (
"context"
"crypto/rand"
"fmt"
"io"
"log"
mrand "math/rand"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
plaintext "github.com/libp2p/go-libp2p-core/sec/insecure"
noise "github.com/libp2p/go-libp2p-noise"
secio "github.com/libp2p/go-libp2p-secio"
ma "github.com/multiformats/go-multiaddr"
)
// MakeBasicHost creates a LibP2P host with a random peer ID listening on the
// given multiaddress. It won't encrypt the connection if insecure is true.
func MakeBasicHost(listenPort int, protocolID string, randseed int64) (host.Host, error) {
// If the seed is zero, use real cryptographic randomness. Otherwise, use a
// deterministic randomness source to make generated keys stay the same
// across multiple runs
var r io.Reader
if randseed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(randseed))
}
// Generate a key pair for this host. We will use it at least
// to obtain a valid host ID.
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
opts := []libp2p.Option{
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
libp2p.Identity(priv),
libp2p.DisableRelay(),
}
if protocolID == plaintext.ID {
opts = append(opts, libp2p.NoSecurity)
} else if protocolID == noise.ID {
tpt, err := noise.New(priv)
if err != nil {
return nil, err
}
opts = append(opts, libp2p.Security(protocolID, tpt))
} else if protocolID == secio.ID {
tpt, err := secio.New(priv)
if err != nil {
return nil, err
}
opts = append(opts, libp2p.Security(protocolID, tpt))
} else {
return nil, fmt.Errorf("security protocolID '%s' is not supported", protocolID)
}
basicHost, err := libp2p.New(context.Background(), opts...)
if err != nil {
return nil, err
}
// Build host multiaddress
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))
// Now we can build a full multiaddress to reach this host
// by encapsulating both addresses:
addr := basicHost.Addrs()[0]
fullAddr := addr.Encapsulate(hostAddr)
log.Printf("I am %s\n", fullAddr)
log.Printf("Now run \"./echo -l %d -d %s -security %s\" on a different terminal\n", listenPort+1, fullAddr, protocolID)
return basicHost, nil
}

View File

@ -1,32 +0,0 @@
#!/bin/bash
SCRIPT_RELATIVE_PATH=`dirname $0`
GO_PKGS_PATH=$SCRIPT_RELATIVE_PATH
DAEMON_REPO=go-libp2p-daemon
DAEMON_BRANCH=v0.2.4
DAEMON_PATH=$GO_PKGS_PATH/$DAEMON_REPO
EXAMPLES_PATHS=$GO_PKGS_PATH/examples
go version
# Install `p2pd`
# FIXME: Use the canonical repo in libp2p, when we don't need `insecure`.
if [ ! -e "$DAEMON_PATH" ]; then
git clone https://github.com/libp2p/$DAEMON_REPO.git --branch $DAEMON_BRANCH $DAEMON_PATH
if [ "$?" != 0 ]; then
echo "Failed to clone the daemon repo"
exit 1
fi
fi
cd $DAEMON_PATH && go install ./...
cd -
# Install example modeuls
cd $EXAMPLES_PATHS && go install ./...
echo "Finish installing go modules for interop."

View File

@ -0,0 +1,5 @@
def test_js_libp2p_placeholder():
"""
Placeholder test for js-libp2p interop tests.
"""
assert True, "Placeholder test for js-libp2p interop tests"

View File

@ -0,0 +1,5 @@
def test_rust_libp2p_placeholder():
"""
Placeholder test for rust-libp2p interop tests.
"""
assert True, "Placeholder test for rust-libp2p interop tests"

View File

@ -1,32 +0,0 @@
import pytest
import trio
from tests.utils.factories import (
HostFactory,
)
from tests.utils.interop.utils import (
connect,
)
@pytest.mark.trio
async def test_connect(security_protocol, p2pds):
async with HostFactory.create_batch_and_listen(
1, security_protocol=security_protocol
) as hosts:
p2pd = p2pds[0]
host = hosts[0]
assert len(await p2pd.control.list_peers()) == 0
# Test: connect from Py
await connect(host, p2pd)
assert len(await p2pd.control.list_peers()) == 1
# Test: `disconnect` from Py
await host.disconnect(p2pd.peer_id)
assert len(await p2pd.control.list_peers()) == 0
# Test: connect from Go
await connect(p2pd, host)
assert len(host.get_network().connections) == 1
# Test: `disconnect` from Go
await p2pd.control.disconnect(host.get_id())
await trio.sleep(0.01)
assert len(host.get_network().connections) == 0

View File

@ -1,116 +0,0 @@
import re
import pytest
from multiaddr import (
Multiaddr,
)
from p2pclient.utils import (
get_unused_tcp_port,
)
import trio
from libp2p.custom_types import (
TProtocol,
)
from libp2p.peer.peerinfo import (
PeerInfo,
info_from_p2p_addr,
)
from tests.utils.factories import (
HostFactory,
)
from tests.utils.interop.envs import (
GO_BIN_PATH,
)
from tests.utils.interop.process import (
BaseInteractiveProcess,
)
ECHO_PATH = GO_BIN_PATH / "echo"
ECHO_PROTOCOL_ID = TProtocol("/echo/1.0.0")
class EchoProcess(BaseInteractiveProcess):
port: int
_peer_info: PeerInfo
def __init__(
self, port: int, security_protocol: TProtocol, destination: Multiaddr = None
) -> None:
args = [f"-l={port}", f"-security={security_protocol}"]
if destination is not None:
args.append(f"-d={str(destination)}")
patterns = [b"I am"]
if destination is None:
patterns.append(b"listening for connections")
self.args = args
self.cmd = str(ECHO_PATH)
self.patterns = patterns
self.bytes_read = bytearray()
self.event_ready = trio.Event()
self.port = port
self._peer_info = None
self.regex_pat = re.compile(rb"I am ([\w\./]+)")
@property
def peer_info(self) -> None:
if self._peer_info is not None:
return self._peer_info
if not self.event_ready.is_set():
raise Exception("process is not ready yet. failed to parse the peer info")
# Example:
# b"I am /ip4/127.0.0.1/tcp/56171/ipfs/QmU41TRPs34WWqa1brJEojBLYZKrrBcJq9nyNfVvSrbZUJ\n" # noqa: E501
m = re.search(rb"I am ([\w\./]+)", self.bytes_read)
if m is None:
raise Exception("failed to find the pattern for the listening multiaddr")
maddr_bytes_str_ipfs = m.group(1)
maddr_str = maddr_bytes_str_ipfs.decode().replace("ipfs", "p2p")
maddr = Multiaddr(maddr_str)
self._peer_info = info_from_p2p_addr(maddr)
return self._peer_info
@pytest.mark.trio
async def test_insecure_conn_py_to_go(security_protocol):
async with HostFactory.create_batch_and_listen(
1, security_protocol=security_protocol
) as hosts:
go_proc = EchoProcess(get_unused_tcp_port(), security_protocol)
await go_proc.start()
host = hosts[0]
peer_info = go_proc.peer_info
await host.connect(peer_info)
s = await host.new_stream(peer_info.peer_id, [ECHO_PROTOCOL_ID])
data = "data321123\n"
await s.write(data.encode())
echoed_resp = await s.read(len(data))
assert echoed_resp.decode() == data
await s.close()
@pytest.mark.trio
async def test_insecure_conn_go_to_py(security_protocol):
async with HostFactory.create_batch_and_listen(
1, security_protocol=security_protocol
) as hosts:
host = hosts[0]
expected_data = "Hello, world!\n"
reply_data = "Replyooo!\n"
event_handler_finished = trio.Event()
async def _handle_echo(stream):
read_data = await stream.read(len(expected_data))
assert read_data == expected_data.encode()
await stream.write(reply_data.encode())
await stream.close()
event_handler_finished.set()
host.set_stream_handler(ECHO_PROTOCOL_ID, _handle_echo)
py_maddr = host.get_addrs()[0]
go_proc = EchoProcess(get_unused_tcp_port(), security_protocol, py_maddr)
await go_proc.start()
await event_handler_finished.wait()

View File

@ -1,81 +0,0 @@
import pytest
import trio
from libp2p.network.stream.exceptions import (
StreamClosed,
StreamEOF,
StreamReset,
)
from libp2p.tools.constants import (
MAX_READ_LEN,
)
DATA = b"data"
@pytest.mark.trio
async def test_net_stream_read_write(py_to_daemon_stream_pair, p2pds):
stream_py, stream_daemon = py_to_daemon_stream_pair
assert (
stream_py.protocol_id is not None
and stream_py.protocol_id == stream_daemon.stream_info.proto
)
await stream_py.write(DATA)
assert (await stream_daemon.read(MAX_READ_LEN)) == DATA
@pytest.mark.trio
async def test_net_stream_read_after_remote_closed(py_to_daemon_stream_pair, p2pds):
stream_py, stream_daemon = py_to_daemon_stream_pair
await stream_daemon.write(DATA)
await stream_daemon.close()
await trio.sleep(0.01)
assert (await stream_py.read(MAX_READ_LEN)) == DATA
# EOF
with pytest.raises(StreamEOF):
await stream_py.read(MAX_READ_LEN)
@pytest.mark.trio
async def test_net_stream_read_after_local_reset(py_to_daemon_stream_pair, p2pds):
stream_py, _ = py_to_daemon_stream_pair
await stream_py.reset()
with pytest.raises(StreamReset):
await stream_py.read(MAX_READ_LEN)
@pytest.mark.parametrize("is_to_fail_daemon_stream", (True,))
@pytest.mark.trio
@pytest.mark.skip
async def test_net_stream_read_after_remote_reset(py_to_daemon_stream_pair, p2pds):
stream_py, _ = py_to_daemon_stream_pair
await trio.sleep(0.01)
with pytest.raises(StreamReset):
await stream_py.read(MAX_READ_LEN)
@pytest.mark.trio
async def test_net_stream_write_after_local_closed(py_to_daemon_stream_pair, p2pds):
stream_py, _ = py_to_daemon_stream_pair
await stream_py.write(DATA)
await stream_py.close()
with pytest.raises(StreamClosed):
await stream_py.write(DATA)
@pytest.mark.trio
async def test_net_stream_write_after_local_reset(py_to_daemon_stream_pair, p2pds):
stream_py, stream_daemon = py_to_daemon_stream_pair
await stream_py.reset()
with pytest.raises(StreamClosed):
await stream_py.write(DATA)
@pytest.mark.parametrize("is_to_fail_daemon_stream", (True,))
@pytest.mark.trio
@pytest.mark.skip
async def test_net_stream_write_after_remote_reset(py_to_daemon_stream_pair, p2pds):
stream_py, _ = py_to_daemon_stream_pair
await trio.sleep(0.01)
with pytest.raises(StreamClosed):
await stream_py.write(DATA)

View File

@ -1,185 +0,0 @@
import functools
import math
import pytest
from p2pclient.pb import (
p2pd_pb2,
)
import trio
from libp2p.io.trio import (
TrioTCPStream,
)
from libp2p.peer.id import (
ID,
)
from libp2p.pubsub.pb import (
rpc_pb2,
)
from libp2p.pubsub.subscription import (
TrioSubscriptionAPI,
)
from libp2p.utils import (
read_varint_prefixed_bytes,
)
from tests.utils.factories import (
PubsubFactory,
)
from tests.utils.interop.utils import (
connect,
)
TOPIC_0 = "ABALA"
TOPIC_1 = "YOOOO"
async def p2pd_subscribe(p2pd, topic, nursery):
stream = TrioTCPStream(await p2pd.control.pubsub_subscribe(topic))
send_channel, receive_channel = trio.open_memory_channel(math.inf)
sub = TrioSubscriptionAPI(receive_channel, unsubscribe_fn=stream.close)
async def _read_pubsub_msg() -> None:
while True:
msg_bytes = await read_varint_prefixed_bytes(stream)
ps_msg = p2pd_pb2.PSMessage()
ps_msg.ParseFromString(msg_bytes)
# Fill in the message used in py-libp2p
msg = rpc_pb2.Message(
from_id=ps_msg.from_id,
data=ps_msg.data,
seqno=ps_msg.seqno,
topicIDs=ps_msg.topicIDs,
signature=ps_msg.signature,
key=ps_msg.key,
)
await send_channel.send(msg)
nursery.start_soon(_read_pubsub_msg)
return sub
def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) -> None:
assert msg.data == data and msg.from_id == from_peer_id
@pytest.mark.parametrize(
"is_pubsub_signing, is_pubsub_signing_strict", ((True, True), (False, False))
)
@pytest.mark.parametrize("is_gossipsub", (True, False))
@pytest.mark.parametrize("num_p2pds", (2,))
@pytest.mark.trio
async def test_pubsub(
p2pds, is_gossipsub, security_protocol, is_pubsub_signing_strict, nursery
):
pubsub_factory = None
if is_gossipsub:
pubsub_factory = PubsubFactory.create_batch_with_gossipsub
else:
pubsub_factory = PubsubFactory.create_batch_with_floodsub
async with pubsub_factory(
1, security_protocol=security_protocol, strict_signing=is_pubsub_signing_strict
) as pubsubs:
#
# Test: Recognize pubsub peers on connection.
#
py_pubsub = pubsubs[0]
# go0 <-> py <-> go1
await connect(p2pds[0], py_pubsub.host)
await connect(py_pubsub.host, p2pds[1])
py_peer_id = py_pubsub.host.get_id()
# Check pubsub peers
pubsub_peers_0 = await p2pds[0].control.pubsub_list_peers("")
assert len(pubsub_peers_0) == 1 and pubsub_peers_0[0] == py_peer_id
pubsub_peers_1 = await p2pds[1].control.pubsub_list_peers("")
assert len(pubsub_peers_1) == 1 and pubsub_peers_1[0] == py_peer_id
assert (
len(py_pubsub.peers) == 2
and p2pds[0].peer_id in py_pubsub.peers
and p2pds[1].peer_id in py_pubsub.peers
)
#
# Test: `subscribe`.
#
# (name, topics)
# (go_0, [0, 1]) <-> (py, [0, 1]) <-> (go_1, [1])
sub_py_topic_0 = await py_pubsub.subscribe(TOPIC_0)
sub_py_topic_1 = await py_pubsub.subscribe(TOPIC_1)
sub_go_0_topic_0 = await p2pd_subscribe(p2pds[0], TOPIC_0, nursery)
sub_go_0_topic_1 = await p2pd_subscribe(p2pds[0], TOPIC_1, nursery)
sub_go_1_topic_1 = await p2pd_subscribe(p2pds[1], TOPIC_1, nursery)
# Check topic peers
await trio.sleep(0.1)
# go_0
go_0_topic_0_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_0)
assert len(go_0_topic_0_peers) == 1 and py_peer_id == go_0_topic_0_peers[0]
go_0_topic_1_peers = await p2pds[0].control.pubsub_list_peers(TOPIC_1)
assert len(go_0_topic_1_peers) == 1 and py_peer_id == go_0_topic_1_peers[0]
# py
py_topic_0_peers = list(py_pubsub.peer_topics[TOPIC_0])
assert len(py_topic_0_peers) == 1 and p2pds[0].peer_id == py_topic_0_peers[0]
# go_1
go_1_topic_1_peers = await p2pds[1].control.pubsub_list_peers(TOPIC_1)
assert len(go_1_topic_1_peers) == 1 and py_peer_id == go_1_topic_1_peers[0]
#
# Test: `publish`
#
# 1. py publishes
# - 1.1. py publishes data_11 to topic_0, py and go_0 receives.
# - 1.2. py publishes data_12 to topic_1, all receive.
# 2. go publishes
# - 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive.
# - 2.2. go_1 publishes data_22 to topic_1, all receive.
# 1.1. py publishes data_11 to topic_0, py and go_0 receives.
data_11 = b"data_11"
await py_pubsub.publish(TOPIC_0, data_11)
validate_11 = functools.partial(
validate_pubsub_msg, data=data_11, from_peer_id=py_peer_id
)
validate_11(await sub_py_topic_0.get())
validate_11(await sub_go_0_topic_0.get())
# 1.2. py publishes data_12 to topic_1, all receive.
data_12 = b"data_12"
validate_12 = functools.partial(
validate_pubsub_msg, data=data_12, from_peer_id=py_peer_id
)
await py_pubsub.publish(TOPIC_1, data_12)
validate_12(await sub_py_topic_1.get())
validate_12(await sub_go_0_topic_1.get())
validate_12(await sub_go_1_topic_1.get())
# 2.1. go_0 publishes data_21 to topic_0, py and go_0 receive.
data_21 = b"data_21"
validate_21 = functools.partial(
validate_pubsub_msg, data=data_21, from_peer_id=p2pds[0].peer_id
)
await p2pds[0].control.pubsub_publish(TOPIC_0, data_21)
validate_21(await sub_py_topic_0.get())
validate_21(await sub_go_0_topic_0.get())
# 2.2. go_1 publishes data_22 to topic_1, all receive.
data_22 = b"data_22"
validate_22 = functools.partial(
validate_pubsub_msg, data=data_22, from_peer_id=p2pds[1].peer_id
)
await p2pds[1].control.pubsub_publish(TOPIC_1, data_22)
validate_22(await sub_py_topic_1.get())
validate_22(await sub_go_0_topic_1.get())
validate_22(await sub_go_1_topic_1.get())
#
# Test: `unsubscribe` and re`subscribe`
#
await py_pubsub.unsubscribe(TOPIC_0)
await trio.sleep(0.1)
assert py_peer_id not in (await p2pds[0].control.pubsub_list_peers(TOPIC_0))
assert py_peer_id not in (await p2pds[1].control.pubsub_list_peers(TOPIC_0))
await py_pubsub.subscribe(TOPIC_0)
await trio.sleep(0.1)
assert py_peer_id in (await p2pds[0].control.pubsub_list_peers(TOPIC_0))
assert py_peer_id in (await p2pds[1].control.pubsub_list_peers(TOPIC_0))

View File

@ -0,0 +1,5 @@
def test_zig_libp2p_placeholder():
"""
Placeholder test for zig-libp2p interop tests.
"""
assert True, "Placeholder test for zig-libp2p interop tests"