mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Merge branch 'master' into peer_routing
This commit is contained in:
@ -1,12 +1,11 @@
|
||||
import asyncio
|
||||
import multiaddr
|
||||
import uuid
|
||||
|
||||
from utils import message_id_generator, generate_RPC_packet
|
||||
from libp2p import new_node
|
||||
from libp2p.pubsub.message import create_message_talk
|
||||
from libp2p.pubsub.pubsub import Pubsub
|
||||
from libp2p.pubsub.floodsub import FloodSub
|
||||
from libp2p.pubsub.message import MessageTalk
|
||||
from libp2p.pubsub.message import generate_message_id
|
||||
|
||||
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
|
||||
CRYPTO_TOPIC = "ethereum"
|
||||
@ -27,6 +26,8 @@ class DummyAccountNode():
|
||||
|
||||
def __init__(self):
|
||||
self.balances = {}
|
||||
self.next_msg_id_func = message_id_generator(0)
|
||||
self.node_id = str(uuid.uuid1())
|
||||
|
||||
@classmethod
|
||||
async def create(cls):
|
||||
@ -53,16 +54,13 @@ class DummyAccountNode():
|
||||
Handle all incoming messages on the CRYPTO_TOPIC from peers
|
||||
"""
|
||||
while True:
|
||||
message_raw = await self.q.get()
|
||||
message = create_message_talk(message_raw)
|
||||
contents = message.data
|
||||
|
||||
msg_comps = contents.split(",")
|
||||
incoming = await self.q.get()
|
||||
msg_comps = incoming.data.decode('utf-8').split(",")
|
||||
|
||||
if msg_comps[0] == "send":
|
||||
self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3]))
|
||||
elif msg_comps[0] == "set":
|
||||
self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2]))
|
||||
self.handle_set_crypto(msg_comps[1], int(msg_comps[2]))
|
||||
|
||||
async def setup_crypto_networking(self):
|
||||
"""
|
||||
@ -82,8 +80,8 @@ class DummyAccountNode():
|
||||
"""
|
||||
my_id = str(self.libp2p_node.get_id())
|
||||
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
|
||||
msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
|
||||
await self.floodsub.publish(my_id, msg.to_str())
|
||||
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func())
|
||||
await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||
|
||||
async def publish_set_crypto(self, user, amount):
|
||||
"""
|
||||
@ -93,8 +91,9 @@ class DummyAccountNode():
|
||||
"""
|
||||
my_id = str(self.libp2p_node.get_id())
|
||||
msg_contents = "set," + user + "," + str(amount)
|
||||
msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
|
||||
await self.floodsub.publish(my_id, msg.to_str())
|
||||
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func())
|
||||
|
||||
await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||
|
||||
def handle_send_crypto(self, source_user, dest_user, amount):
|
||||
"""
|
||||
@ -113,7 +112,7 @@ class DummyAccountNode():
|
||||
else:
|
||||
self.balances[dest_user] = amount
|
||||
|
||||
def handle_set_crypto_for_user(self, dest_user, amount):
|
||||
def handle_set_crypto(self, dest_user, amount):
|
||||
"""
|
||||
Handle incoming set_crypto message
|
||||
:param dest_user: user to set crypto for
|
||||
|
||||
@ -8,8 +8,6 @@ from libp2p import new_node
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
from libp2p.pubsub.pubsub import Pubsub
|
||||
from libp2p.pubsub.floodsub import FloodSub
|
||||
from libp2p.pubsub.message import MessageTalk
|
||||
from libp2p.pubsub.message import create_message_talk
|
||||
from dummy_account_node import DummyAccountNode
|
||||
|
||||
# pylint: disable=too-many-locals
|
||||
@ -66,7 +64,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
|
||||
await action_func(dummy_nodes)
|
||||
|
||||
# Allow time for action function to be performed (i.e. messages to propogate)
|
||||
await asyncio.sleep(0.25)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Perform assertion function
|
||||
for dummy_node in dummy_nodes:
|
||||
@ -187,3 +185,28 @@ async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography():
|
||||
assert dummy_node.get_balance("rob") == 12
|
||||
|
||||
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_then_send_from_five_diff_nodes_five_nodes_ring_topography():
|
||||
num_nodes = 5
|
||||
adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]}
|
||||
|
||||
async def action_func(dummy_nodes):
|
||||
await dummy_nodes[0].publish_set_crypto("alex", 20)
|
||||
await asyncio.sleep(1)
|
||||
await dummy_nodes[1].publish_send_crypto("alex", "rob", 3)
|
||||
await asyncio.sleep(1)
|
||||
await dummy_nodes[2].publish_send_crypto("rob", "aspyn", 2)
|
||||
await asyncio.sleep(1)
|
||||
await dummy_nodes[3].publish_send_crypto("aspyn", "zx", 1)
|
||||
await asyncio.sleep(1)
|
||||
await dummy_nodes[4].publish_send_crypto("zx", "raul", 1)
|
||||
|
||||
def assertion_func(dummy_node):
|
||||
assert dummy_node.get_balance("alex") == 17
|
||||
assert dummy_node.get_balance("rob") == 1
|
||||
assert dummy_node.get_balance("aspyn") == 1
|
||||
assert dummy_node.get_balance("zx") == 0
|
||||
assert dummy_node.get_balance("raul") == 1
|
||||
|
||||
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||
|
||||
@ -5,11 +5,10 @@ import pytest
|
||||
from tests.utils import cleanup
|
||||
from libp2p import new_node
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
from libp2p.pubsub.pb import rpc_pb2
|
||||
from libp2p.pubsub.pubsub import Pubsub
|
||||
from libp2p.pubsub.floodsub import FloodSub
|
||||
from libp2p.pubsub.message import MessageTalk
|
||||
from libp2p.pubsub.message import create_message_talk
|
||||
from libp2p.pubsub.message import generate_message_id
|
||||
from utils import message_id_generator, generate_RPC_packet
|
||||
|
||||
# pylint: disable=too-many-locals
|
||||
|
||||
@ -45,72 +44,94 @@ async def test_simple_two_nodes():
|
||||
|
||||
node_a_id = str(node_a.get_id())
|
||||
|
||||
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id())
|
||||
|
||||
await floodsub_a.publish(node_a.get_id(), msg.to_str())
|
||||
|
||||
next_msg_id_func = message_id_generator(0)
|
||||
msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", next_msg_id_func())
|
||||
await floodsub_a.publish(node_a_id, msg.SerializeToString())
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
res_b = await qb.get()
|
||||
|
||||
# Check that the msg received by node_b is the same
|
||||
# as the message sent by node_a
|
||||
assert res_b == msg.to_str()
|
||||
assert res_b.SerializeToString() == msg.publish[0].SerializeToString()
|
||||
|
||||
# Success, terminate pending tasks.
|
||||
await cleanup()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_simple_three_nodes():
|
||||
# Want to pass message from A -> B -> C
|
||||
async def test_lru_cache_two_nodes():
|
||||
# two nodes with cache_size of 4
|
||||
# node_a send the following messages to node_b
|
||||
# [1, 1, 2, 1, 3, 1, 4, 1, 5, 1]
|
||||
# node_b should only receive the following
|
||||
# [1, 2, 3, 4, 5, 1]
|
||||
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
|
||||
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||
await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||
|
||||
supported_protocols = ["/floodsub/1.0.0"]
|
||||
|
||||
# initialize PubSub with a cache_size of 4
|
||||
floodsub_a = FloodSub(supported_protocols)
|
||||
pubsub_a = Pubsub(node_a, floodsub_a, "a")
|
||||
pubsub_a = Pubsub(node_a, floodsub_a, "a", 4)
|
||||
floodsub_b = FloodSub(supported_protocols)
|
||||
pubsub_b = Pubsub(node_b, floodsub_b, "b")
|
||||
floodsub_c = FloodSub(supported_protocols)
|
||||
pubsub_c = Pubsub(node_c, floodsub_c, "c")
|
||||
pubsub_b = Pubsub(node_b, floodsub_b, "b", 4)
|
||||
|
||||
await connect(node_a, node_b)
|
||||
await connect(node_b, node_c)
|
||||
|
||||
await asyncio.sleep(0.25)
|
||||
qb = await pubsub_b.subscribe("my_topic")
|
||||
qc = await pubsub_c.subscribe("my_topic")
|
||||
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
node_a_id = str(node_a.get_id())
|
||||
|
||||
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id())
|
||||
|
||||
await floodsub_a.publish(node_a.get_id(), msg.to_str())
|
||||
# initialize message_id_generator
|
||||
# store first message
|
||||
next_msg_id_func = message_id_generator(0)
|
||||
first_message = generate_RPC_packet(node_a_id, ["my_topic"], "some data 1", next_msg_id_func())
|
||||
|
||||
await floodsub_a.publish(node_a_id, first_message.SerializeToString())
|
||||
await asyncio.sleep(0.25)
|
||||
print (first_message)
|
||||
|
||||
messages = [first_message]
|
||||
# for the next 5 messages
|
||||
for i in range(2, 6):
|
||||
# write first message
|
||||
await floodsub_a.publish(node_a_id, first_message.SerializeToString())
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
# generate and write next message
|
||||
msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data " + str(i), next_msg_id_func())
|
||||
messages.append(msg)
|
||||
|
||||
await floodsub_a.publish(node_a_id, msg.SerializeToString())
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
# write first message again
|
||||
await floodsub_a.publish(node_a_id, first_message.SerializeToString())
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
# check the first five messages in queue
|
||||
# should only see 1 first_message
|
||||
for i in range(5):
|
||||
# Check that the msg received by node_b is the same
|
||||
# as the message sent by node_a
|
||||
res_b = await qb.get()
|
||||
assert res_b.SerializeToString() == messages[i].publish[0].SerializeToString()
|
||||
|
||||
# the 6th message should be first_message
|
||||
res_b = await qb.get()
|
||||
res_c = await qc.get()
|
||||
|
||||
# Check that the msg received by node_b is the same
|
||||
# as the message sent by node_a
|
||||
assert res_b == msg.to_str()
|
||||
|
||||
# res_c should match original msg but with b as sender
|
||||
node_b_id = str(node_b.get_id())
|
||||
msg.from_id = node_b_id
|
||||
|
||||
assert res_c == msg.to_str()
|
||||
|
||||
assert res_b.SerializeToString() == first_message.publish[0].SerializeToString()
|
||||
assert qb.empty()
|
||||
|
||||
# Success, terminate pending tasks.
|
||||
await cleanup()
|
||||
|
||||
|
||||
async def perform_test_from_obj(obj):
|
||||
"""
|
||||
Perform a floodsub test from a test obj.
|
||||
@ -227,6 +248,8 @@ async def perform_test_from_obj(obj):
|
||||
topics_in_msgs_ordered = []
|
||||
messages = obj["messages"]
|
||||
tasks_publish = []
|
||||
next_msg_id_func = message_id_generator(0)
|
||||
|
||||
for msg in messages:
|
||||
topics = msg["topics"]
|
||||
|
||||
@ -237,11 +260,12 @@ async def perform_test_from_obj(obj):
|
||||
actual_node_id = str(node_map[node_id].get_id())
|
||||
|
||||
# Create correctly formatted message
|
||||
msg_talk = MessageTalk(actual_node_id, actual_node_id, topics, data, generate_message_id())
|
||||
|
||||
msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func())
|
||||
|
||||
# Publish message
|
||||
# await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str())
|
||||
tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str())))
|
||||
tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\
|
||||
actual_node_id, msg_talk.SerializeToString())))
|
||||
|
||||
# For each topic in topics, add topic, msg_talk tuple to ordered test list
|
||||
# TODO: Update message sender to be correct message sender before
|
||||
@ -258,15 +282,12 @@ async def perform_test_from_obj(obj):
|
||||
# TODO: Check message sender too
|
||||
for i in range(len(topics_in_msgs_ordered)):
|
||||
topic, actual_msg = topics_in_msgs_ordered[i]
|
||||
|
||||
# Look at each node in each topic
|
||||
for node_id in topic_map[topic]:
|
||||
# Get message from subscription queue
|
||||
msg_on_node_str = await queues_map[node_id][topic].get()
|
||||
msg_on_node = create_message_talk(msg_on_node_str)
|
||||
|
||||
# Perform checks
|
||||
assert actual_msg.origin_id == msg_on_node.origin_id
|
||||
assert actual_msg.topics == msg_on_node.topics
|
||||
assert actual_msg.data == msg_on_node.data
|
||||
assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString()
|
||||
|
||||
# Success, terminate pending tasks.
|
||||
await cleanup()
|
||||
@ -484,3 +505,112 @@ async def test_three_nodes_clique_two_topic_diff_origin_test_obj():
|
||||
]
|
||||
}
|
||||
await perform_test_from_obj(test_obj)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj():
|
||||
test_obj = {
|
||||
"supported_protocols": ["/floodsub/1.0.0"],
|
||||
"adj_list": {
|
||||
"1": ["2", "3", "4"],
|
||||
"2": ["1", "3", "4"],
|
||||
"3": ["1", "2", "4"],
|
||||
"4": ["1", "2", "3"]
|
||||
},
|
||||
"topic_map": {
|
||||
"astrophysics": ["1", "2", "3", "4"],
|
||||
"school": ["1", "2", "3", "4"]
|
||||
},
|
||||
"messages": [
|
||||
{
|
||||
"topics": ["astrophysics"],
|
||||
"data": "e=mc^2",
|
||||
"node_id": "1"
|
||||
},
|
||||
{
|
||||
"topics": ["school"],
|
||||
"data": "foobar",
|
||||
"node_id": "2"
|
||||
},
|
||||
{
|
||||
"topics": ["astrophysics"],
|
||||
"data": "I am allergic",
|
||||
"node_id": "1"
|
||||
},
|
||||
{
|
||||
"topics": ["school"],
|
||||
"data": "foobar2",
|
||||
"node_id": "2"
|
||||
},
|
||||
{
|
||||
"topics": ["astrophysics"],
|
||||
"data": "I am allergic2",
|
||||
"node_id": "1"
|
||||
},
|
||||
{
|
||||
"topics": ["school"],
|
||||
"data": "foobar3",
|
||||
"node_id": "2"
|
||||
},
|
||||
{
|
||||
"topics": ["astrophysics"],
|
||||
"data": "I am allergic3",
|
||||
"node_id": "1"
|
||||
}
|
||||
]
|
||||
}
|
||||
await perform_test_from_obj(test_obj)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj():
|
||||
test_obj = {
|
||||
"supported_protocols": ["/floodsub/1.0.0"],
|
||||
"adj_list": {
|
||||
"1": ["2"],
|
||||
"2": ["3"],
|
||||
"3": ["4"],
|
||||
"4": ["5"],
|
||||
"5": ["1"]
|
||||
},
|
||||
"topic_map": {
|
||||
"astrophysics": ["1", "2", "3", "4", "5"],
|
||||
"school": ["1", "2", "3", "4", "5"]
|
||||
},
|
||||
"messages": [
|
||||
{
|
||||
"topics": ["astrophysics"],
|
||||
"data": "e=mc^2",
|
||||
"node_id": "1"
|
||||
},
|
||||
{
|
||||
"topics": ["school"],
|
||||
"data": "foobar",
|
||||
"node_id": "2"
|
||||
},
|
||||
{
|
||||
"topics": ["astrophysics"],
|
||||
"data": "I am allergic",
|
||||
"node_id": "1"
|
||||
},
|
||||
{
|
||||
"topics": ["school"],
|
||||
"data": "foobar2",
|
||||
"node_id": "2"
|
||||
},
|
||||
{
|
||||
"topics": ["astrophysics"],
|
||||
"data": "I am allergic2",
|
||||
"node_id": "1"
|
||||
},
|
||||
{
|
||||
"topics": ["school"],
|
||||
"data": "foobar3",
|
||||
"node_id": "2"
|
||||
},
|
||||
{
|
||||
"topics": ["astrophysics"],
|
||||
"data": "I am allergic3",
|
||||
"node_id": "1"
|
||||
}
|
||||
]
|
||||
}
|
||||
await perform_test_from_obj(test_obj)
|
||||
44
tests/pubsub/utils.py
Normal file
44
tests/pubsub/utils.py
Normal file
@ -0,0 +1,44 @@
|
||||
import uuid
|
||||
import struct
|
||||
from libp2p.pubsub.pb import rpc_pb2
|
||||
|
||||
|
||||
def message_id_generator(start_val):
|
||||
"""
|
||||
Generate a unique message id
|
||||
:param start_val: value to start generating messages at
|
||||
:return: message id
|
||||
"""
|
||||
val = start_val
|
||||
def generator():
|
||||
# Allow manipulation of val within closure
|
||||
nonlocal val
|
||||
|
||||
# Increment id
|
||||
val += 1
|
||||
|
||||
# Convert val to big endian
|
||||
return struct.pack('>Q', val)
|
||||
|
||||
return generator
|
||||
|
||||
def generate_RPC_packet(origin_id, topics, msg_content, msg_id):
|
||||
"""
|
||||
Generate RPC packet to send over wire
|
||||
:param origin_id: peer id of the message origin
|
||||
:param topics: list of topics
|
||||
:param msg_content: string of content in data
|
||||
:param msg_id: seqno for the message
|
||||
"""
|
||||
packet = rpc_pb2.RPC()
|
||||
message = rpc_pb2.Message(
|
||||
from_id=origin_id.encode('utf-8'),
|
||||
seqno=msg_id,
|
||||
data=msg_content.encode('utf-8'),
|
||||
)
|
||||
|
||||
for topic in topics:
|
||||
message.topicIDs.extend([topic.encode('utf-8')])
|
||||
|
||||
packet.publish.extend([message])
|
||||
return packet
|
||||
Reference in New Issue
Block a user