[WIP] PubSub and FloodSub development (#133)

* Add notifee interface

* Add notify function to network interface

* Implement notify feature

* Add tests for notify

* Make notifee functions all async

* Fix linting issue

* Fix linting issue

* Scaffold pubsub router interface

* Scaffold pubsub directory

* Store peer_id in muxed connection

* Implement pubsub notifee

* Remove outdated files

* Implement pubsub first attempt

* Prepare pubsub for floodsub

* Add mplex conn to net stream and add conn in notify tests

* Implement floodsub

* Use NetStream in generic protocol handler

* Debugging async issues

* Modify test to perform proper assert. Test passes

* Remove callbacks. Reduce sleep time

* Add simple three node test

* Clean up code. Add message classes

* Add test for two topics

* Add conn to net stream and conn tests

* Refactor test setup to remove duplicate code

* Fix linting issues

* Fix linting issue

* Fix linting issue

* Fix outstanding unrelated lint issue in multiselect_client

* Add connect function

* Remove debug prints

* Remove debug prints from floodsub

* Use MessageTalk in place of direct message breakdown

* Remove extra prints

* Remove outdated function

* Add message to queues for all topics in message

* Debugging

* Add message self delivery

* Increase read timeout to 5 to get pubsub tests passing

* Refactor testing helper func. Add tests

* Add tests and increase timeout to get tests passing

* Add dummy account demo scaffolding

* Attempt to use threads. Test fails

* Implement basic dummy node tests using threads

* Add generic testing function

* Add simple seven node tree test

* Add more complex seven node tree tests

* Add five node ring tests

* Remove unnecessary get_message_type func

* Add documentation to classes

* Add message id to messages

* Add documentation to test helper func

* Add docs to dummy account node helper func

* Add more docs to dummy account node test helper func

* fixed linting errors in floodsub

* small notify bugfix

* move pubsub into libp2p

* fixed pubsub linting

* fixing pubsub test failures

* linting
This commit is contained in:
stuckinaboot
2019-03-23 13:52:02 -04:00
committed by Robert Zajac
parent 6c4bbd1e85
commit 57077cd3b4
14 changed files with 1474 additions and 11 deletions

View File

@ -49,6 +49,7 @@ class MyNotifee(INotifee):
async def listen_close(self, network, _multiaddr):
pass
class InvalidNotifee():
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
@ -70,6 +71,36 @@ class InvalidNotifee():
async def listen(self):
assert False
async def perform_two_host_simple_set_up():
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
async def my_stream_handler(stream):
while True:
read_string = (await stream.read()).decode()
resp = "ack:" + read_string
await stream.write(resp.encode())
node_b.set_stream_handler("/echo/1.0.0", my_stream_handler)
# Associate the peer with local ip address (see default parameters of Libp2p())
node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10)
return node_a, node_b
async def perform_two_host_simple_set_up_custom_handler(handler):
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b.set_stream_handler("/echo/1.0.0", handler)
# Associate the peer with local ip address (see default parameters of Libp2p())
node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10)
return node_a, node_b
@pytest.mark.asyncio
async def test_one_notifier():
node_a, node_b = await perform_two_host_set_up_custom_handler(echo_stream_handler)

View File

@ -0,0 +1,134 @@
import asyncio
import multiaddr
from libp2p import new_node
from libp2p.pubsub.message import create_message_talk
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.message import MessageTalk
from libp2p.pubsub.message import generate_message_id
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
CRYPTO_TOPIC = "ethereum"
# Message format:
# Sending crypto: <source>,<dest>,<amount as integer>
# Ex. send,aspyn,alex,5
# Set crypto: <dest>,<amount as integer>
# Ex. set,rob,5
# Determine message type by looking at first item before first comma
class DummyAccountNode():
"""
Node which has an internal balance mapping, meant to serve as
a dummy crypto blockchain. There is no actual blockchain, just a simple
map indicating how much crypto each user in the mappings holds
"""
def __init__(self):
self.balances = {}
@classmethod
async def create(cls):
"""
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
instance to this new node
We use create as this serves as a factory function and allows us
to use async await, unlike the init function
"""
self = DummyAccountNode()
libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
self.libp2p_node = libp2p_node
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
return self
async def handle_incoming_msgs(self):
"""
Handle all incoming messages on the CRYPTO_TOPIC from peers
"""
while True:
message_raw = await self.q.get()
message = create_message_talk(message_raw)
contents = message.data
msg_comps = contents.split(",")
if msg_comps[0] == "send":
self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3]))
elif msg_comps[0] == "set":
self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2]))
async def setup_crypto_networking(self):
"""
Subscribe to CRYPTO_TOPIC and perform call to function that handles
all incoming messages on said topic
"""
self.q = await self.pubsub.subscribe(CRYPTO_TOPIC)
asyncio.ensure_future(self.handle_incoming_msgs())
async def publish_send_crypto(self, source_user, dest_user, amount):
"""
Create a send crypto message and publish that message to all other nodes
:param source_user: user to send crypto from
:param dest_user: user to send crypto to
:param amount: amount of crypto to send
"""
my_id = str(self.libp2p_node.get_id())
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
await self.floodsub.publish(my_id, msg.to_str())
async def publish_set_crypto(self, user, amount):
"""
Create a set crypto message and publish that message to all other nodes
:param user: user to set crypto for
:param amount: amount of crypto
"""
my_id = str(self.libp2p_node.get_id())
msg_contents = "set," + user + "," + str(amount)
msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
await self.floodsub.publish(my_id, msg.to_str())
def handle_send_crypto(self, source_user, dest_user, amount):
"""
Handle incoming send_crypto message
:param source_user: user to send crypto from
:param dest_user: user to send crypto to
:param amount: amount of crypto to send
"""
if source_user in self.balances:
self.balances[source_user] -= amount
else:
self.balances[source_user] = -amount
if dest_user in self.balances:
self.balances[dest_user] += amount
else:
self.balances[dest_user] = amount
def handle_set_crypto_for_user(self, dest_user, amount):
"""
Handle incoming set_crypto message
:param dest_user: user to set crypto for
:param amount: amount of crypto
"""
self.balances[dest_user] = amount
def get_balance(self, user):
"""
Get balance in crypto for a particular user
:param user: user to get balance for
:return: balance of user
"""
if user in self.balances:
return self.balances[user]
else:
return -1

View File

@ -0,0 +1,189 @@
import asyncio
import multiaddr
import pytest
from threading import Thread
from tests.utils import cleanup
from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.message import MessageTalk
from libp2p.pubsub.message import create_message_talk
from dummy_account_node import DummyAccountNode
# pylint: disable=too-many-locals
async def connect(node1, node2):
# node1 connects to node2
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
def create_setup_in_new_thread_func(dummy_node):
def setup_in_new_thread():
asyncio.ensure_future(dummy_node.setup_crypto_networking())
return setup_in_new_thread
async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
"""
Helper function to allow for easy construction of custom tests for dummy account nodes
in various network topologies
:param num_nodes: number of nodes in the test
:param adjacency_map: adjacency map defining each node and its list of neighbors
:param action_func: function to execute that includes actions by the nodes,
such as send crypto and set crypto
:param assertion_func: assertions for testing the results of the actions are correct
"""
# Create nodes
dummy_nodes = []
for i in range(num_nodes):
dummy_nodes.append(await DummyAccountNode.create())
# Create network
for source_num in adjacency_map:
target_nums = adjacency_map[source_num]
for target_num in target_nums:
await connect(dummy_nodes[source_num].libp2p_node, \
dummy_nodes[target_num].libp2p_node)
# Allow time for network creation to take place
await asyncio.sleep(0.25)
# Start a thread for each node so that each node can listen and respond
# to messages on its own thread, which will avoid waiting indefinitely
# on the main thread. On this thread, call the setup func for the node,
# which subscribes the node to the CRYPTO_TOPIC topic
for dummy_node in dummy_nodes:
thread = Thread(target=create_setup_in_new_thread_func(dummy_node))
thread.run()
# Allow time for nodes to subscribe to CRYPTO_TOPIC topic
await asyncio.sleep(0.25)
# Perform action function
await action_func(dummy_nodes)
# Allow time for action function to be performed (i.e. messages to propogate)
await asyncio.sleep(0.25)
# Perform assertion function
for dummy_node in dummy_nodes:
assertion_func(dummy_node)
# Success, terminate pending tasks.
await cleanup()
@pytest.mark.asyncio
async def test_simple_two_nodes():
num_nodes = 2
adj_map = {0: [1]}
async def action_func(dummy_nodes):
await dummy_nodes[0].publish_set_crypto("aspyn", 10)
def assertion_func(dummy_node):
assert dummy_node.get_balance("aspyn") == 10
await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio
async def test_simple_three_nodes_line_topography():
num_nodes = 3
adj_map = {0: [1], 1: [2]}
async def action_func(dummy_nodes):
await dummy_nodes[0].publish_set_crypto("aspyn", 10)
def assertion_func(dummy_node):
assert dummy_node.get_balance("aspyn") == 10
await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio
async def test_simple_three_nodes_triangle_topography():
num_nodes = 3
adj_map = {0: [1, 2], 1: [2]}
async def action_func(dummy_nodes):
await dummy_nodes[0].publish_set_crypto("aspyn", 20)
def assertion_func(dummy_node):
assert dummy_node.get_balance("aspyn") == 20
await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio
async def test_simple_seven_nodes_tree_topography():
num_nodes = 7
adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
async def action_func(dummy_nodes):
await dummy_nodes[0].publish_set_crypto("aspyn", 20)
def assertion_func(dummy_node):
assert dummy_node.get_balance("aspyn") == 20
await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio
async def test_set_then_send_from_root_seven_nodes_tree_topography():
num_nodes = 7
adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
async def action_func(dummy_nodes):
await dummy_nodes[0].publish_set_crypto("aspyn", 20)
await asyncio.sleep(0.25)
await dummy_nodes[0].publish_send_crypto("aspyn", "alex", 5)
def assertion_func(dummy_node):
assert dummy_node.get_balance("aspyn") == 15
assert dummy_node.get_balance("alex") == 5
await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio
async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography():
num_nodes = 7
adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
async def action_func(dummy_nodes):
await dummy_nodes[6].publish_set_crypto("aspyn", 20)
await asyncio.sleep(0.25)
await dummy_nodes[4].publish_send_crypto("aspyn", "alex", 5)
def assertion_func(dummy_node):
assert dummy_node.get_balance("aspyn") == 15
assert dummy_node.get_balance("alex") == 5
await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio
async def test_simple_five_nodes_ring_topography():
num_nodes = 5
adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]}
async def action_func(dummy_nodes):
await dummy_nodes[0].publish_set_crypto("aspyn", 20)
def assertion_func(dummy_node):
assert dummy_node.get_balance("aspyn") == 20
await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio
async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography():
num_nodes = 5
adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]}
async def action_func(dummy_nodes):
await dummy_nodes[0].publish_set_crypto("alex", 20)
await asyncio.sleep(0.25)
await dummy_nodes[3].publish_send_crypto("alex", "rob", 12)
def assertion_func(dummy_node):
assert dummy_node.get_balance("alex") == 8
assert dummy_node.get_balance("rob") == 12
await perform_test(num_nodes, adj_map, action_func, assertion_func)

View File

@ -0,0 +1,486 @@
import asyncio
import multiaddr
import pytest
from tests.utils import cleanup
from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.message import MessageTalk
from libp2p.pubsub.message import create_message_talk
from libp2p.pubsub.message import generate_message_id
# pylint: disable=too-many-locals
async def connect(node1, node2):
"""
Connect node1 to node2
"""
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
@pytest.mark.asyncio
async def test_simple_two_nodes():
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
supported_protocols = ["/floodsub/1.0.0"]
floodsub_a = FloodSub(supported_protocols)
pubsub_a = Pubsub(node_a, floodsub_a, "a")
floodsub_b = FloodSub(supported_protocols)
pubsub_b = Pubsub(node_b, floodsub_b, "b")
await connect(node_a, node_b)
await asyncio.sleep(0.25)
qb = await pubsub_b.subscribe("my_topic")
await asyncio.sleep(0.25)
node_a_id = str(node_a.get_id())
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id())
await floodsub_a.publish(node_a.get_id(), msg.to_str())
await asyncio.sleep(0.25)
res_b = await qb.get()
# Check that the msg received by node_b is the same
# as the message sent by node_a
assert res_b == msg.to_str()
# Success, terminate pending tasks.
await cleanup()
@pytest.mark.asyncio
async def test_simple_three_nodes():
# Want to pass message from A -> B -> C
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
supported_protocols = ["/floodsub/1.0.0"]
floodsub_a = FloodSub(supported_protocols)
pubsub_a = Pubsub(node_a, floodsub_a, "a")
floodsub_b = FloodSub(supported_protocols)
pubsub_b = Pubsub(node_b, floodsub_b, "b")
floodsub_c = FloodSub(supported_protocols)
pubsub_c = Pubsub(node_c, floodsub_c, "c")
await connect(node_a, node_b)
await connect(node_b, node_c)
await asyncio.sleep(0.25)
qb = await pubsub_b.subscribe("my_topic")
qc = await pubsub_c.subscribe("my_topic")
await asyncio.sleep(0.25)
node_a_id = str(node_a.get_id())
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id())
await floodsub_a.publish(node_a.get_id(), msg.to_str())
await asyncio.sleep(0.25)
res_b = await qb.get()
res_c = await qc.get()
# Check that the msg received by node_b is the same
# as the message sent by node_a
assert res_b == msg.to_str()
# res_c should match original msg but with b as sender
node_b_id = str(node_b.get_id())
msg.from_id = node_b_id
assert res_c == msg.to_str()
# Success, terminate pending tasks.
await cleanup()
async def perform_test_from_obj(obj):
"""
Perform a floodsub test from a test obj.
test obj are composed as follows:
{
"supported_protocols": ["supported/protocol/1.0.0",...],
"adj_list": {
"node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...],
"node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...],
...
},
"topic_map": {
"topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...]
},
"messages": [
{
"topics": ["topic1_for_message", "topic2_for_message", ...],
"data": "some contents of the message (newlines are not supported)",
"node_id": "message sender node id"
},
...
]
}
NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A
or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior
is undefined (even if it may work)
"""
# Step 1) Create graph
adj_list = obj["adj_list"]
node_map = {}
floodsub_map = {}
pubsub_map = {}
supported_protocols = obj["supported_protocols"]
tasks_connect = []
for start_node_id in adj_list:
# Create node if node does not yet exist
if start_node_id not in node_map:
node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
node_map[start_node_id] = node
floodsub = FloodSub(supported_protocols)
floodsub_map[start_node_id] = floodsub
pubsub = Pubsub(node, floodsub, start_node_id)
pubsub_map[start_node_id] = pubsub
# For each neighbor of start_node, create if does not yet exist,
# then connect start_node to neighbor
for neighbor_id in adj_list[start_node_id]:
# Create neighbor if neighbor does not yet exist
if neighbor_id not in node_map:
neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
node_map[neighbor_id] = neighbor_node
floodsub = FloodSub(supported_protocols)
floodsub_map[neighbor_id] = floodsub
pubsub = Pubsub(neighbor_node, floodsub, neighbor_id)
pubsub_map[neighbor_id] = pubsub
# Connect node and neighbor
# await connect(node_map[start_node_id], node_map[neighbor_id])
tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id])))
tasks_connect.append(asyncio.sleep(2))
await asyncio.gather(*tasks_connect)
# Allow time for graph creation before continuing
# await asyncio.sleep(0.25)
# Step 2) Subscribe to topics
queues_map = {}
topic_map = obj["topic_map"]
tasks_topic = []
tasks_topic_data = []
for topic in topic_map:
for node_id in topic_map[topic]:
"""
# Subscribe node to topic
q = await pubsub_map[node_id].subscribe(topic)
# Create topic-queue map for node_id if one does not yet exist
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = q
"""
tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic)))
tasks_topic_data.append((node_id, topic))
tasks_topic.append(asyncio.sleep(2))
# Gather is like Promise.all
responses = await asyncio.gather(*tasks_topic, return_exceptions=True)
for i in range(len(responses) - 1):
q = responses[i]
node_id, topic = tasks_topic_data[i]
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = q
# Allow time for subscribing before continuing
# await asyncio.sleep(0.01)
# Step 3) Publish messages
topics_in_msgs_ordered = []
messages = obj["messages"]
tasks_publish = []
for msg in messages:
topics = msg["topics"]
data = msg["data"]
node_id = msg["node_id"]
# Get actual id for sender node (not the id from the test obj)
actual_node_id = str(node_map[node_id].get_id())
# Create correctly formatted message
msg_talk = MessageTalk(actual_node_id, actual_node_id, topics, data, generate_message_id())
# Publish message
# await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str())
tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str())))
# For each topic in topics, add topic, msg_talk tuple to ordered test list
# TODO: Update message sender to be correct message sender before
# adding msg_talk to this list
for topic in topics:
topics_in_msgs_ordered.append((topic, msg_talk))
# Allow time for publishing before continuing
# await asyncio.sleep(0.4)
tasks_publish.append(asyncio.sleep(2))
await asyncio.gather(*tasks_publish)
# Step 4) Check that all messages were received correctly.
# TODO: Check message sender too
for i in range(len(topics_in_msgs_ordered)):
topic, actual_msg = topics_in_msgs_ordered[i]
for node_id in topic_map[topic]:
# Get message from subscription queue
msg_on_node_str = await queues_map[node_id][topic].get()
msg_on_node = create_message_talk(msg_on_node_str)
# Perform checks
assert actual_msg.origin_id == msg_on_node.origin_id
assert actual_msg.topics == msg_on_node.topics
assert actual_msg.data == msg_on_node.data
# Success, terminate pending tasks.
await cleanup()
@pytest.mark.asyncio
async def test_simple_two_nodes_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_three_nodes_two_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"],
"B": ["C"]
},
"topic_map": {
"topic1": ["B", "C"],
"topic2": ["B", "C"]
},
"messages": [
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
},
{
"topics": ["topic2"],
"data": "Alex is tall",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "Alex is tall",
"node_id": "B"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_two_nodes_one_topic_two_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "Alex is tall",
"node_id": "B"
},
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_one_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_three_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"],
"space": ["2", "3", "4", "5", "6", "7"],
"onions": ["2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["space"],
"data": "foobar",
"node_id": "1"
},
{
"topics": ["onions"],
"data": "I am allergic",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_three_topics_diff_origin_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5", "6", "7"],
"space": ["1", "2", "3", "4", "5", "6", "7"],
"onions": ["1", "2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["space"],
"data": "foobar",
"node_id": "4"
},
{
"topics": ["onions"],
"data": "I am allergic",
"node_id": "7"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_three_nodes_clique_two_topic_diff_origin_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["3"]
},
"topic_map": {
"astrophysics": ["1", "2", "3"],
"school": ["1", "2", "3"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)