Merge branch 'main' into main
@@ -15,6 +15,7 @@ from tests.utils.factories import (
     PubsubFactory,
 )
 from tests.utils.pubsub.utils import (
+    connect_some,
     dense_connect,
     one_to_all_connect,
     sparse_connect,
@@ -134,7 +135,7 @@ async def test_handle_graft(monkeypatch):
     # check if it is called in `handle_graft`
     event_emit_prune = trio.Event()

-    async def emit_prune(topic, sender_peer_id):
+    async def emit_prune(topic, sender_peer_id, do_px, is_unsubscribe):
         event_emit_prune.set()
         await trio.lowlevel.checkpoint()

@@ -193,7 +194,7 @@ async def test_handle_prune():

     # alice emits a prune message to bob; alice should be removed
     # from bob's mesh peers
-    await gossipsubs[index_alice].emit_prune(topic, id_bob)
+    await gossipsubs[index_alice].emit_prune(topic, id_bob, False, False)
     # `emit_prune` does not remove bob from alice's mesh peers
     assert id_bob in gossipsubs[index_alice].mesh[topic]

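The two new arguments track the gossipsub v1.1 PRUNE extensions: `do_px` asks the pruning peer to attach peer-exchange records, and `is_unsubscribe` selects the longer unsubscribe backoff window instead of the regular prune backoff. A minimal sketch of what the flag pair implies, assuming the router exposes `prune_back_off` and `unsubscribe_back_off` attributes named after the factory kwargs used below (a hypothetical helper, not the library's actual code):

    def backoff_duration(router, is_unsubscribe: bool) -> float:
        # An unsubscribe carries a longer cool-down than a plain mesh prune.
        return router.unsubscribe_back_off if is_unsubscribe else router.prune_back_off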
@@ -292,7 +293,9 @@ async def test_fanout():
 @pytest.mark.trio
 @pytest.mark.slow
 async def test_fanout_maintenance():
-    async with PubsubFactory.create_batch_with_gossipsub(10) as pubsubs_gsub:
+    async with PubsubFactory.create_batch_with_gossipsub(
+        10, unsubscribe_back_off=1
+    ) as pubsubs_gsub:
         hosts = [pubsub.host for pubsub in pubsubs_gsub]
         num_msgs = 5

@@ -588,3 +591,166 @@ async def test_sparse_connect():
            f"received the message. Ideally all nodes should receive it, but at "
            f"minimum {min_required} required for sparse network scalability."
        )


@pytest.mark.trio
async def test_connect_some_with_fewer_hosts_than_degree():
    """Test connect_some when there are fewer hosts than degree."""
    # Create 3 hosts with degree=5
    async with PubsubFactory.create_batch_with_floodsub(3) as pubsubs_fsub:
        hosts = [pubsub.host for pubsub in pubsubs_fsub]
        degree = 5

        await connect_some(hosts, degree)
        await trio.sleep(0.1)  # Allow connections to establish

        # Each host should connect to all other hosts (there are only 2 others)
        for i, pubsub in enumerate(pubsubs_fsub):
            connected_peers = len(pubsub.peers)
            expected_max_connections = len(hosts) - 1  # all others
            assert connected_peers <= expected_max_connections, (
                f"Host {i} has {connected_peers} connections, "
                f"but can only connect to {expected_max_connections} others"
            )


@pytest.mark.trio
async def test_connect_some_degree_limit_enforced():
    """Test that connect_some enforces degree limits and creates expected topology."""
    # Test with a small network where we can verify exact behavior
    async with PubsubFactory.create_batch_with_floodsub(6) as pubsubs_fsub:
        hosts = [pubsub.host for pubsub in pubsubs_fsub]
        degree = 2

        await connect_some(hosts, degree)
        await trio.sleep(0.1)

        # With 6 hosts and degree=2, expected connections:
        # Host 0 → connects to hosts 1,2 (2 peers total)
        # Host 1 → connects to hosts 2,3 (3 peers: 0,2,3)
        # Host 2 → connects to hosts 3,4 (4 peers: 0,1,3,4)
        # Host 3 → connects to hosts 4,5 (4 peers: 1,2,4,5)
        # Host 4 → connects to host 5 (3 peers: 2,3,5)
        # Host 5 → connects to no one (2 peers: 3,4)

        peer_counts = [len(pubsub.peers) for pubsub in pubsubs_fsub]

        # First and last hosts should have exactly degree connections
        assert peer_counts[0] == degree, (
            f"Host 0 should have {degree} peers, got {peer_counts[0]}"
        )
        assert peer_counts[-1] <= degree, (
            f"Last host should have ≤ {degree} peers, got {peer_counts[-1]}"
        )

        # Middle hosts may have more due to bidirectional connections,
        # but the pattern should be consistent with the degree limit
        total_connections = sum(peer_counts)

        # Should be less than a full mesh (each host connected to all others)
        full_mesh_connections = len(hosts) * (len(hosts) - 1)
        assert total_connections < full_mesh_connections, (
            f"Got {total_connections} total connections, "
            f"but full mesh would be {full_mesh_connections}"
        )

        # Should be more than a chain (each host connected to the next only)
        chain_connections = 2 * (len(hosts) - 1)  # bidirectional chain
        assert total_connections > chain_connections, (
            f"Got {total_connections} total connections, which is too few "
            f"(chain would be {chain_connections})"
        )
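The expected topology in the comments above is consistent with connect_some linking each host to the next `degree` hosts in list order. A minimal sketch of that behavior, with `connect_some_sketch` as a hypothetical stand-in for the real helper in tests/utils/pubsub/utils.py:

    from libp2p.tools.utils import connect

    async def connect_some_sketch(hosts, degree):
        # Connect host i to hosts i+1 .. i+degree, clamped at the end of the
        # list. A zero or negative degree yields an empty range and thus no
        # connections, matching the edge-case tests below.
        for i, host in enumerate(hosts):
            for j in range(i + 1, min(i + 1 + degree, len(hosts))):
                await connect(host, hosts[j])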
@pytest.mark.trio
async def test_connect_some_degree_zero():
    """Test edge case: degree=0 should result in no connections."""
    # Create 5 hosts with degree=0
    async with PubsubFactory.create_batch_with_floodsub(5) as pubsubs_fsub:
        hosts = [pubsub.host for pubsub in pubsubs_fsub]
        degree = 0

        await connect_some(hosts, degree)
        await trio.sleep(0.1)  # Allow any potential connections to establish

        # Verify no connections were made
        for i, pubsub in enumerate(pubsubs_fsub):
            connected_peers = len(pubsub.peers)
            assert connected_peers == 0, (
                f"Host {i} has {connected_peers} connections, "
                f"but degree=0 should result in no connections"
            )
@pytest.mark.trio
async def test_connect_some_negative_degree():
    """Test edge case: negative degree should be handled gracefully."""
    # Create 5 hosts with degree=-1
    async with PubsubFactory.create_batch_with_floodsub(5) as pubsubs_fsub:
        hosts = [pubsub.host for pubsub in pubsubs_fsub]
        degree = -1

        await connect_some(hosts, degree)
        await trio.sleep(0.1)  # Allow any potential connections to establish

        # Verify no connections were made (negative degree should behave like 0)
        for i, pubsub in enumerate(pubsubs_fsub):
            connected_peers = len(pubsub.peers)
            assert connected_peers == 0, (
                f"Host {i} has {connected_peers} connections, "
                f"but negative degree should result in no connections"
            )
@pytest.mark.trio
async def test_sparse_connect_degree_zero():
    """Test sparse_connect with degree=0."""
    async with PubsubFactory.create_batch_with_floodsub(8) as pubsubs_fsub:
        hosts = [pubsub.host for pubsub in pubsubs_fsub]
        degree = 0

        await sparse_connect(hosts, degree)
        await trio.sleep(0.1)  # Allow connections to establish

        # With degree=0, sparse_connect should still create neighbor connections
        # for connectivity (this is part of the algorithm design)
        for i, pubsub in enumerate(pubsubs_fsub):
            connected_peers = len(pubsub.peers)
            # Should have some connections due to neighbor connectivity
            # (each node connects to its immediate neighbors)
            expected_neighbors = 2  # previous and next in the ring
            assert connected_peers >= expected_neighbors, (
                f"Host {i} has {connected_peers} connections, "
                f"expected at least {expected_neighbors} neighbor connections"
            )
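The neighbor guarantee asserted above suggests sparse_connect lays down a ring before any degree-based extras, so even degree=0 leaves every node with two neighbors. A sketch of that connectivity floor (hypothetical; the real helper may pick extra links differently):

    from libp2p.tools.utils import connect

    async def sparse_connect_sketch(hosts, degree):
        # Ring for baseline connectivity: every host links to its successor,
        # so each node ends up with a previous and a next neighbor even when
        # degree == 0.
        n = len(hosts)
        if n < 2:
            return
        for i in range(n):
            await connect(hosts[i], hosts[(i + 1) % n])
        # Up to `degree` additional random links per host would then add
        # redundancy on top of the ring; that part is omitted here.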
@pytest.mark.trio
async def test_empty_host_list():
    """Test edge case: empty host list should be handled gracefully."""
    hosts = []

    # All functions should handle empty lists gracefully
    await connect_some(hosts, 5)
    await sparse_connect(hosts, 3)
    await dense_connect(hosts)

    # If we reach here without exceptions, the test passes
@pytest.mark.trio
async def test_single_host():
    """Test edge case: a single host should be handled gracefully."""
    async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub:
        hosts = [pubsub.host for pubsub in pubsubs_fsub]

        # All functions should handle a single host gracefully
        await connect_some(hosts, 5)
        await sparse_connect(hosts, 3)
        await dense_connect(hosts)

        # A single host should have no connections
        connected_peers = len(pubsubs_fsub[0].peers)
        assert connected_peers == 0, (
            f"Single host has {connected_peers} connections, expected 0"
        )
tests/core/pubsub/test_gossipsub_px_and_backoff.py (new file, 274 lines)
@@ -0,0 +1,274 @@
|
||||
import pytest
|
||||
import trio
|
||||
|
||||
from libp2p.pubsub.gossipsub import (
|
||||
GossipSub,
|
||||
)
|
||||
from libp2p.tools.utils import (
|
||||
connect,
|
||||
)
|
||||
from tests.utils.factories import (
|
||||
PubsubFactory,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trio
async def test_prune_backoff():
    async with PubsubFactory.create_batch_with_gossipsub(
        2, heartbeat_interval=0.5, prune_back_off=2
    ) as pubsubs:
        gsub0 = pubsubs[0].router
        gsub1 = pubsubs[1].router
        assert isinstance(gsub0, GossipSub)
        assert isinstance(gsub1, GossipSub)
        host_0 = pubsubs[0].host
        host_1 = pubsubs[1].host

        topic = "test_prune_backoff"

        # connect hosts
        await connect(host_0, host_1)
        await trio.sleep(0.5)

        # both join the topic
        await gsub0.join(topic)
        await gsub1.join(topic)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(0.5)

        # ensure the peer is registered in the mesh
        assert host_0.get_id() in gsub1.mesh[topic]

        # prune host_1 from gsub0's mesh
        await gsub0.emit_prune(topic, host_1.get_id(), False, False)
        await trio.sleep(0.5)

        # host_0 should not be in gsub1's mesh
        assert host_0.get_id() not in gsub1.mesh[topic]

        # try to graft again immediately (should be rejected due to backoff)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(0.5)
        assert host_0.get_id() not in gsub1.mesh[topic], (
            "peer should be backoffed and not re-added"
        )

        # try to graft again (should succeed after the backoff)
        await trio.sleep(2)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(1)
        assert host_0.get_id() in gsub1.mesh[topic], (
            "peer should be able to rejoin after backoff"
        )
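This flow implies a guard in the GRAFT handler: a freshly pruned peer must wait out its backoff window before it can re-enter the mesh. A sketch of such a check, assuming the `back_off` table of topic -> peer -> expiry described in test_stress_churn below and a wall-clock expiry (the guard's exact placement in GossipSub.handle_graft is an assumption):

    import time

    def graft_allowed(router: GossipSub, topic: str, peer_id) -> bool:
        # Honour a GRAFT only once the peer's backoff entry has expired.
        expiry = router.back_off.get(topic, {}).get(peer_id, 0)
        return time.time() >= expiry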
@pytest.mark.trio
async def test_unsubscribe_backoff():
    async with PubsubFactory.create_batch_with_gossipsub(
        2, heartbeat_interval=1, prune_back_off=1, unsubscribe_back_off=2
    ) as pubsubs:
        gsub0 = pubsubs[0].router
        gsub1 = pubsubs[1].router
        assert isinstance(gsub0, GossipSub)
        assert isinstance(gsub1, GossipSub)
        host_0 = pubsubs[0].host
        host_1 = pubsubs[1].host

        topic = "test_unsubscribe_backoff"

        # connect hosts
        await connect(host_0, host_1)
        await trio.sleep(0.5)

        # both join the topic
        await gsub0.join(topic)
        await gsub1.join(topic)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(0.5)

        # ensure the peer is registered in the mesh
        assert host_0.get_id() in gsub1.mesh[topic]

        # host_1 unsubscribes from the topic
        await gsub1.leave(topic)
        await trio.sleep(0.5)
        assert topic not in gsub1.mesh

        # host_1 resubscribes to the topic
        await gsub1.join(topic)
        await trio.sleep(0.5)
        assert topic in gsub1.mesh

        # try to graft again immediately (should be rejected due to backoff)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(0.5)
        assert host_0.get_id() not in gsub1.mesh[topic], (
            "peer should be backoffed and not re-added"
        )

        # try to graft again (should succeed after the backoff)
        await trio.sleep(1)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(1)
        assert host_0.get_id() in gsub1.mesh[topic], (
            "peer should be able to rejoin after backoff"
        )
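Here the backoff is installed by leave() rather than by an explicit PRUNE, and it uses the longer unsubscribe_back_off window (2s versus prune_back_off=1s), which is why the re-graft attempted about a second after the leave fails while the retry after a further second succeeds. A sketch of the bookkeeping this requires, under the same topic -> peer -> expiry assumption (attribute names are taken from the factory kwargs):

    import time

    def record_unsubscribe_backoff(router: GossipSub, topic: str) -> None:
        # On leaving a topic, stamp every current mesh peer with the longer
        # unsubscribe backoff so immediate re-grafts are refused.
        expiry = time.time() + router.unsubscribe_back_off
        for peer_id in router.mesh.get(topic, ()):
            router.back_off.setdefault(topic, {})[peer_id] = expiry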
@pytest.mark.trio
async def test_peer_exchange():
    async with PubsubFactory.create_batch_with_gossipsub(
        3,
        heartbeat_interval=0.5,
        do_px=True,
        px_peers_count=1,
    ) as pubsubs:
        gsub0 = pubsubs[0].router
        gsub1 = pubsubs[1].router
        gsub2 = pubsubs[2].router
        assert isinstance(gsub0, GossipSub)
        assert isinstance(gsub1, GossipSub)
        assert isinstance(gsub2, GossipSub)
        host_0 = pubsubs[0].host
        host_1 = pubsubs[1].host
        host_2 = pubsubs[2].host

        topic = "test_peer_exchange"

        # connect hosts
        await connect(host_1, host_0)
        await connect(host_1, host_2)
        await trio.sleep(0.5)

        # all join the topic, then graft 0 <-> 1 and 1 <-> 2
        await pubsubs[1].subscribe(topic)
        await pubsubs[0].subscribe(topic)
        await pubsubs[2].subscribe(topic)
        await gsub1.emit_graft(topic, host_0.get_id())
        await gsub1.emit_graft(topic, host_2.get_id())
        await gsub0.emit_graft(topic, host_1.get_id())
        await gsub2.emit_graft(topic, host_1.get_id())
        await trio.sleep(1)

        # ensure the peers are registered in the mesh
        assert host_0.get_id() in gsub1.mesh[topic]
        assert host_2.get_id() in gsub1.mesh[topic]
        assert host_2.get_id() not in gsub0.mesh[topic]

        # host_1 unsubscribes from the topic
        await gsub1.leave(topic)
        await trio.sleep(1)  # wait for the heartbeat to update the mesh
        assert topic not in gsub1.mesh

        # wait for gsub0 to graft host_2 into its mesh via PX
        await trio.sleep(1)
        assert host_2.get_id() in gsub0.mesh[topic]
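The mesh repair at the end rides on peer exchange: when gsub1 leaves, its PRUNE to gsub0 carries up to px_peers_count records for other mesh members (here host_2), which gsub0 can dial to refill its mesh. A rough sketch of the receiving side, with `px_peers` standing in for whatever peer infos the PRUNE advertised (names and connection details are assumptions, not the library's actual code):

    async def handle_px_sketch(router: GossipSub, topic: str, px_peers) -> None:
        # Dial each advertised peer and graft it to repair the mesh after a
        # prune; record validation is elided.
        for peer_info in px_peers:
            await router.pubsub.host.connect(peer_info)
            await router.emit_graft(topic, peer_info.peer_id)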
@pytest.mark.trio
async def test_topics_are_isolated():
    async with PubsubFactory.create_batch_with_gossipsub(
        2, heartbeat_interval=0.5, prune_back_off=2
    ) as pubsubs:
        gsub0 = pubsubs[0].router
        gsub1 = pubsubs[1].router
        assert isinstance(gsub0, GossipSub)
        assert isinstance(gsub1, GossipSub)
        host_0 = pubsubs[0].host
        host_1 = pubsubs[1].host

        topic1 = "test_prune_backoff"
        topic2 = "test_prune_backoff2"

        # connect hosts
        await connect(host_0, host_1)
        await trio.sleep(0.5)

        # both peers join both topics
        await gsub0.join(topic1)
        await gsub1.join(topic1)
        await gsub0.join(topic2)
        await gsub1.join(topic2)
        await gsub0.emit_graft(topic1, host_1.get_id())
        await trio.sleep(0.5)

        # ensure the peer is registered in the mesh for topic1
        assert host_0.get_id() in gsub1.mesh[topic1]

        # prune topic1 for host_1 from gsub0's mesh
        await gsub0.emit_prune(topic1, host_1.get_id(), False, False)
        await trio.sleep(0.5)

        # host_0 should not be in gsub1's mesh for topic1
        assert host_0.get_id() not in gsub1.mesh[topic1]

        # try to regraft topic1 and graft the new topic2
        await gsub0.emit_graft(topic1, host_1.get_id())
        await gsub0.emit_graft(topic2, host_1.get_id())
        await trio.sleep(0.5)
        assert host_0.get_id() not in gsub1.mesh[topic1], (
            "peer should be backoffed and not re-added"
        )
        assert host_0.get_id() in gsub1.mesh[topic2], (
            "peer should be able to join a different topic"
        )
@pytest.mark.trio
async def test_stress_churn():
    NUM_PEERS = 5
    CHURN_CYCLES = 30
    TOPIC = "stress_churn_topic"
    PRUNE_BACKOFF = 1
    HEARTBEAT_INTERVAL = 0.2

    async with PubsubFactory.create_batch_with_gossipsub(
        NUM_PEERS,
        heartbeat_interval=HEARTBEAT_INTERVAL,
        prune_back_off=PRUNE_BACKOFF,
    ) as pubsubs:
        routers: list[GossipSub] = []
        for ps in pubsubs:
            assert isinstance(ps.router, GossipSub)
            routers.append(ps.router)
        hosts = [ps.host for ps in pubsubs]

        # fully connect all peers
        for i in range(NUM_PEERS):
            for j in range(i + 1, NUM_PEERS):
                await connect(hosts[i], hosts[j])
        await trio.sleep(1)

        # all peers join the topic
        for router in routers:
            await router.join(TOPIC)
        await trio.sleep(1)

        # rapid join/prune cycles
        for cycle in range(CHURN_CYCLES):
            for i, router in enumerate(routers):
                # prune all other peers from this router's mesh
                for j, peer_host in enumerate(hosts):
                    if i != j:
                        await router.emit_prune(TOPIC, peer_host.get_id(), False, False)
            await trio.sleep(0.1)
            for i, router in enumerate(routers):
                # graft all other peers back
                for j, peer_host in enumerate(hosts):
                    if i != j:
                        await router.emit_graft(TOPIC, peer_host.get_id())
            await trio.sleep(0.1)

        # wait for backoff entries to expire and be cleaned up
        await trio.sleep(PRUNE_BACKOFF * 2)

        # check that the backoff table is not unbounded
        for router in routers:
            # backoff is a dict: topic -> peer -> expiry
            backoff = getattr(router, "back_off", None)
            assert backoff is not None, "router missing backoff table"
            # only a small number of entries should remain (ideally 0)
            total_entries = sum(len(peers) for peers in backoff.values())
            assert total_entries < NUM_PEERS * 2, (
                f"backoff table grew too large: {total_entries} entries"
            )
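The final assertion bounds the back_off table, which implies that the heartbeat (or an equivalent periodic task) evicts expired entries. A sketch of that sweep, under the same topic -> peer -> expiry layout (where the cleanup actually runs is an assumption):

    import time

    def sweep_backoff(router: GossipSub) -> None:
        # Drop expired entries so churn cannot grow the table without bound.
        now = time.time()
        for topic, peers in list(router.back_off.items()):
            for peer_id, expiry in list(peers.items()):
                if expiry <= now:
                    del peers[peer_id]
            if not peers:
                del router.back_off[topic]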