mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Add back some comment and TODO. Add comment to tests
This commit is contained in:
@ -205,6 +205,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
stream = self.pubsub.peers[peer_id]
|
stream = self.pubsub.peers[peer_id]
|
||||||
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
|
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
|
||||||
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
|
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
|
||||||
|
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
|
||||||
try:
|
try:
|
||||||
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
|
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
|
||||||
except StreamClosed:
|
except StreamClosed:
|
||||||
@ -238,6 +239,8 @@ class GossipSub(IPubsubRouter):
|
|||||||
if topic in self.mesh:
|
if topic in self.mesh:
|
||||||
in_topic_gossipsub_peers = self.mesh[topic]
|
in_topic_gossipsub_peers = self.mesh[topic]
|
||||||
else:
|
else:
|
||||||
|
# It could be the case that we publish to a topic that we have not subscribe
|
||||||
|
# and the topic is not yet added to our `fanout`.
|
||||||
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
|
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
|
||||||
# If no peers in fanout, choose some peers from gossipsub peers in topic.
|
# If no peers in fanout, choose some peers from gossipsub peers in topic.
|
||||||
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
|
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
|
|||||||
@ -197,7 +197,7 @@ async def test_dense(num_hosts, pubsubs_gsub, hosts):
|
|||||||
# publish from the randomly chosen host
|
# publish from the randomly chosen host
|
||||||
await pubsubs_gsub[origin_idx].publish("foobar", msg_content)
|
await pubsubs_gsub[origin_idx].publish("foobar", msg_content)
|
||||||
|
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(2)
|
||||||
# Assert that all blocking queues receive the message
|
# Assert that all blocking queues receive the message
|
||||||
for queue in queues:
|
for queue in queues:
|
||||||
msg = await queue.get()
|
msg = await queue.get()
|
||||||
@ -377,6 +377,12 @@ async def test_gossip_propagation(hosts, pubsubs_gsub):
|
|||||||
async def test_mesh_heartbeat(
|
async def test_mesh_heartbeat(
|
||||||
num_hosts, initial_mesh_peer_count, pubsubs_gsub, hosts, monkeypatch
|
num_hosts, initial_mesh_peer_count, pubsubs_gsub, hosts, monkeypatch
|
||||||
):
|
):
|
||||||
|
# It's difficult to set up the initial peer subscription condition.
|
||||||
|
# Ideally I would like to have initial mesh peer count that's below ``GossipSubDegree``
|
||||||
|
# so I can test if `mesh_heartbeat` return correct peers to GRAFT.
|
||||||
|
# The problem is that I can not set it up so that we have peers subscribe to the topic
|
||||||
|
# but not being part of our mesh peers (as these peers are the peers to GRAFT).
|
||||||
|
# So I monkeypatch the peer subscriptions and our mesh peers.
|
||||||
total_peer_count = 14
|
total_peer_count = 14
|
||||||
topic = "TEST_MESH_HEARTBEAT"
|
topic = "TEST_MESH_HEARTBEAT"
|
||||||
|
|
||||||
@ -386,20 +392,24 @@ async def test_mesh_heartbeat(
|
|||||||
monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids)
|
monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids)
|
||||||
|
|
||||||
peer_topics = {topic: fake_peer_ids}
|
peer_topics = {topic: fake_peer_ids}
|
||||||
|
# Monkeypatch the peer subscriptions
|
||||||
monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics)
|
monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics)
|
||||||
|
|
||||||
mesh_peer_indices = random.sample(range(total_peer_count), initial_mesh_peer_count)
|
mesh_peer_indices = random.sample(range(total_peer_count), initial_mesh_peer_count)
|
||||||
mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices]
|
mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices]
|
||||||
router_mesh = {topic: list(mesh_peers)}
|
router_mesh = {topic: list(mesh_peers)}
|
||||||
|
# Monkeypatch our mesh peers
|
||||||
monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh)
|
monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh)
|
||||||
|
|
||||||
peers_to_graft, peers_to_prune = pubsubs_gsub[0].router.mesh_heartbeat()
|
peers_to_graft, peers_to_prune = pubsubs_gsub[0].router.mesh_heartbeat()
|
||||||
if initial_mesh_peer_count > GOSSIPSUB_PARAMS.degree:
|
if initial_mesh_peer_count > GOSSIPSUB_PARAMS.degree:
|
||||||
|
# If number of initial mesh peers is more than `GossipSubDegree`, we should PRUNE mesh peers
|
||||||
assert len(peers_to_graft) == 0
|
assert len(peers_to_graft) == 0
|
||||||
assert len(peers_to_prune) == initial_mesh_peer_count - GOSSIPSUB_PARAMS.degree
|
assert len(peers_to_prune) == initial_mesh_peer_count - GOSSIPSUB_PARAMS.degree
|
||||||
for peer in peers_to_prune:
|
for peer in peers_to_prune:
|
||||||
assert peer in mesh_peers
|
assert peer in mesh_peers
|
||||||
elif initial_mesh_peer_count < GOSSIPSUB_PARAMS.degree:
|
elif initial_mesh_peer_count < GOSSIPSUB_PARAMS.degree:
|
||||||
|
# If number of initial mesh peers is less than `GossipSubDegree`, we should GRAFT more peers
|
||||||
assert len(peers_to_prune) == 0
|
assert len(peers_to_prune) == 0
|
||||||
assert len(peers_to_graft) == GOSSIPSUB_PARAMS.degree - initial_mesh_peer_count
|
assert len(peers_to_graft) == GOSSIPSUB_PARAMS.degree - initial_mesh_peer_count
|
||||||
for peer in peers_to_graft:
|
for peer in peers_to_graft:
|
||||||
@ -416,6 +426,9 @@ async def test_mesh_heartbeat(
|
|||||||
async def test_gossip_heartbeat(
|
async def test_gossip_heartbeat(
|
||||||
num_hosts, initial_peer_count, pubsubs_gsub, hosts, monkeypatch
|
num_hosts, initial_peer_count, pubsubs_gsub, hosts, monkeypatch
|
||||||
):
|
):
|
||||||
|
# The problem is that I can not set it up so that we have peers subscribe to the topic
|
||||||
|
# but not being part of our mesh peers (as these peers are the peers to GRAFT).
|
||||||
|
# So I monkeypatch the peer subscriptions and our mesh peers.
|
||||||
total_peer_count = 28
|
total_peer_count = 28
|
||||||
topic_mesh = "TEST_GOSSIP_HEARTBEAT_1"
|
topic_mesh = "TEST_GOSSIP_HEARTBEAT_1"
|
||||||
topic_fanout = "TEST_GOSSIP_HEARTBEAT_2"
|
topic_fanout = "TEST_GOSSIP_HEARTBEAT_2"
|
||||||
@ -426,21 +439,25 @@ async def test_gossip_heartbeat(
|
|||||||
monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids)
|
monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids)
|
||||||
|
|
||||||
topic_mesh_peer_count = 14
|
topic_mesh_peer_count = 14
|
||||||
|
# Split into mesh peers and fanout peers
|
||||||
peer_topics = {
|
peer_topics = {
|
||||||
topic_mesh: fake_peer_ids[:topic_mesh_peer_count],
|
topic_mesh: fake_peer_ids[:topic_mesh_peer_count],
|
||||||
topic_fanout: fake_peer_ids[topic_mesh_peer_count:],
|
topic_fanout: fake_peer_ids[topic_mesh_peer_count:],
|
||||||
}
|
}
|
||||||
|
# Monkeypatch the peer subscriptions
|
||||||
monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics)
|
monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics)
|
||||||
|
|
||||||
mesh_peer_indices = random.sample(range(topic_mesh_peer_count), initial_peer_count)
|
mesh_peer_indices = random.sample(range(topic_mesh_peer_count), initial_peer_count)
|
||||||
mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices]
|
mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices]
|
||||||
router_mesh = {topic_mesh: list(mesh_peers)}
|
router_mesh = {topic_mesh: list(mesh_peers)}
|
||||||
|
# Monkeypatch our mesh peers
|
||||||
monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh)
|
monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh)
|
||||||
fanout_peer_indices = random.sample(
|
fanout_peer_indices = random.sample(
|
||||||
range(topic_mesh_peer_count, total_peer_count), initial_peer_count
|
range(topic_mesh_peer_count, total_peer_count), initial_peer_count
|
||||||
)
|
)
|
||||||
fanout_peers = [fake_peer_ids[i] for i in fanout_peer_indices]
|
fanout_peers = [fake_peer_ids[i] for i in fanout_peer_indices]
|
||||||
router_fanout = {topic_fanout: list(fanout_peers)}
|
router_fanout = {topic_fanout: list(fanout_peers)}
|
||||||
|
# Monkeypatch our fanout peers
|
||||||
monkeypatch.setattr(pubsubs_gsub[0].router, "fanout", router_fanout)
|
monkeypatch.setattr(pubsubs_gsub[0].router, "fanout", router_fanout)
|
||||||
|
|
||||||
def window(topic):
|
def window(topic):
|
||||||
@ -451,18 +468,24 @@ async def test_gossip_heartbeat(
|
|||||||
else:
|
else:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
# Monkeypatch the memory cache messages
|
||||||
monkeypatch.setattr(pubsubs_gsub[0].router.mcache, "window", window)
|
monkeypatch.setattr(pubsubs_gsub[0].router.mcache, "window", window)
|
||||||
|
|
||||||
peers_to_gossip = pubsubs_gsub[0].router.gossip_heartbeat()
|
peers_to_gossip = pubsubs_gsub[0].router.gossip_heartbeat()
|
||||||
|
# If our mesh peer count is less than `GossipSubDegree`, we should gossip to up to
|
||||||
|
# `GossipSubDegree` peers (exclude mesh peers).
|
||||||
if topic_mesh_peer_count - initial_peer_count < GOSSIPSUB_PARAMS.degree:
|
if topic_mesh_peer_count - initial_peer_count < GOSSIPSUB_PARAMS.degree:
|
||||||
|
# The same goes for fanout so it's two times the number of peers to gossip.
|
||||||
assert len(peers_to_gossip) == 2 * (topic_mesh_peer_count - initial_peer_count)
|
assert len(peers_to_gossip) == 2 * (topic_mesh_peer_count - initial_peer_count)
|
||||||
elif topic_mesh_peer_count - initial_peer_count >= GOSSIPSUB_PARAMS.degree:
|
elif topic_mesh_peer_count - initial_peer_count >= GOSSIPSUB_PARAMS.degree:
|
||||||
assert len(peers_to_gossip) == 2 * (GOSSIPSUB_PARAMS.degree)
|
assert len(peers_to_gossip) == 2 * (GOSSIPSUB_PARAMS.degree)
|
||||||
|
|
||||||
for peer in peers_to_gossip:
|
for peer in peers_to_gossip:
|
||||||
if peer in peer_topics[topic_mesh]:
|
if peer in peer_topics[topic_mesh]:
|
||||||
|
# Check that the peer to gossip to is not in our mesh peers
|
||||||
assert peer not in mesh_peers
|
assert peer not in mesh_peers
|
||||||
assert topic_mesh in peers_to_gossip[peer]
|
assert topic_mesh in peers_to_gossip[peer]
|
||||||
elif peer in peer_topics[topic_fanout]:
|
elif peer in peer_topics[topic_fanout]:
|
||||||
|
# Check that the peer to gossip to is not in our fanout peers
|
||||||
assert peer not in fanout_peers
|
assert peer not in fanout_peers
|
||||||
assert topic_fanout in peers_to_gossip[peer]
|
assert topic_fanout in peers_to_gossip[peer]
|
||||||
|
|||||||
Reference in New Issue
Block a user