fix all tests

This commit is contained in:
zixuanzh
2019-04-01 16:23:20 -04:00
parent 2e5e7e3c10
commit 6eb070b78e
5 changed files with 221 additions and 154 deletions

View File

@ -65,7 +65,16 @@ class FloodSub(IPubsubRouter):
# Deliver to self if self was origin # Deliver to self if self was origin
# Note: handle_talk checks if self is subscribed to topics in message # Note: handle_talk checks if self is subscribed to topics in message
for message in packet.publish: for message in packet.publish:
if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): decoded_from_id = message.from_id.decode('utf-8')
print ("MESSAGE SENDER")
print (msg_sender)
print ("FROM ID")
print (message.from_id)
print (str(self.pubsub.host.get_id()))
if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()):
await self.pubsub.handle_talk(sender_peer_id, message) await self.pubsub.handle_talk(sender_peer_id, message)
@ -82,9 +91,12 @@ class FloodSub(IPubsubRouter):
# message sender and are not the message origin # message sender and are not the message origin
print ("PEERID") print ("PEERID")
print (peer_id_in_topic) print (peer_id_in_topic)
if peer_id_in_topic not in (msg_sender, message.from_id): if peer_id_in_topic not in (msg_sender, decoded_from_id):
stream = self.pubsub.peers[peer_id_in_topic] stream = self.pubsub.peers[peer_id_in_topic]
await stream.write(packet.SerializeToString()) # create new packet with just publish message
new_packet = rpc_pb2.RPC()
new_packet.publish.extend([message])
await stream.write(new_packet.SerializeToString())
else: else:
# Implies publish did not write # Implies publish did not write
print("publish did not write") print("publish did not write")

View File

@ -58,17 +58,19 @@ class Pubsub():
def get_hello_packet(self): def get_hello_packet(self):
""" """
Generate subscription message with all topics we are subscribed to Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics
""" """
packet = rpc_pb2.RPC() packet = rpc_pb2.RPC()
message = rpc_pb2.Message( if self.my_topics:
from_id=str(self.host.get_id()).encode('utf-8'), for topic_id in self.my_topics:
seqno=str(generate_message_id()).encode('utf-8') packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
) subscribe=True, topicid=topic_id)])
packet.publish.extend([message])
for topic_id in self.my_topics: # message = rpc_pb2.Message(
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( # from_id=str(self.host.get_id()).encode('utf-8'),
subscribe=True, topicid=topic_id)]) # seqno=str(generate_message_id()).encode('utf-8')
# )
# packet.publish.extend([message])
return packet.SerializeToString() return packet.SerializeToString()
@ -80,9 +82,11 @@ class Pubsub():
""" """
# TODO check on types here # TODO check on types here
print ("++++++++++ASPYN+++++++++++++++++")
peer_id = str(stream.mplex_conn.peer_id) peer_id = str(stream.mplex_conn.peer_id)
while True: while True:
print ("HIT ME")
incoming = (await stream.read()) incoming = (await stream.read())
rpc_incoming = rpc_pb2.RPC() rpc_incoming = rpc_pb2.RPC()
rpc_incoming.ParseFromString(incoming) rpc_incoming.ParseFromString(incoming)
@ -91,13 +95,15 @@ class Pubsub():
print (rpc_incoming) print (rpc_incoming)
print ("###########################") print ("###########################")
should_publish = True should_publish = False
if rpc_incoming.publish: if rpc_incoming.publish:
# deal with RPC.publish # deal with RPC.publish
for message in rpc_incoming.publish: for message in rpc_incoming.publish:
self.seen_messages.append(message.seqno) if message.seqno not in self.seen_messages:
await self.handle_talk(peer_id, message) should_publish = True
self.seen_messages.append(message.seqno)
await self.handle_talk(peer_id, message)
if rpc_incoming.subscriptions: if rpc_incoming.subscriptions:
@ -106,13 +112,9 @@ class Pubsub():
# peers because a given node only needs its peers # peers because a given node only needs its peers
# to know that it is subscribed to the topic (doesn't # to know that it is subscribed to the topic (doesn't
# need everyone to know) # need everyone to know)
should_publish = False
# TODO check that peer_id is the same as origin_id
from_id = str(rpc_incoming.publish[0].from_id.decode('utf-8'))
for message in rpc_incoming.subscriptions: for message in rpc_incoming.subscriptions:
if message.subscribe: if message.subscribe:
self.handle_subscription(from_id, message) self.handle_subscription(peer_id, message)
if should_publish: if should_publish:
# relay message to peers with router # relay message to peers with router
@ -182,6 +184,8 @@ class Pubsub():
:param sub_message: RPC.SubOpts :param sub_message: RPC.SubOpts
""" """
# TODO verify logic here # TODO verify logic here
if sub_message.subscribe: if sub_message.subscribe:
if sub_message.topicid not in self.peer_topics: if sub_message.topicid not in self.peer_topics:
self.peer_topics[sub_message.topicid] = [peer_id] self.peer_topics[sub_message.topicid] = [peer_id]
@ -213,19 +217,23 @@ class Pubsub():
:param topic_id: topic_id to subscribe to :param topic_id: topic_id to subscribe to
""" """
# Map topic_id to blocking queue # Map topic_id to blocking queue
print ("**PUBSUB** in SUBSCRIBE")
self.my_topics[topic_id] = asyncio.Queue() self.my_topics[topic_id] = asyncio.Queue()
# Create subscribe message # Create subscribe message
packet = rpc_pb2.RPC() packet = rpc_pb2.RPC()
packet.publish.extend([rpc_pb2.Message( # packet.publish.extend([rpc_pb2.Message(
from_id=str(self.host.get_id()).encode('utf-8'), # from_id=str(self.host.get_id()).encode('utf-8'),
seqno=str(generate_message_id()).encode('utf-8') # seqno=str(generate_message_id()).encode('utf-8')
)]) # )])
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe = True, subscribe = True,
topicid = topic_id.encode('utf-8') topicid = topic_id.encode('utf-8')
)]) )])
print (packet)
print ("**PUBSUB** PEEERS")
print (self.peers)
# Send out subscribe message to all peers # Send out subscribe message to all peers
await self.message_all_peers(packet.SerializeToString()) await self.message_all_peers(packet.SerializeToString())
@ -247,10 +255,10 @@ class Pubsub():
# Create unsubscribe message # Create unsubscribe message
packet = rpc_pb2.RPC() packet = rpc_pb2.RPC()
packet.publish.extend([rpc_pb2.Message( # packet.publish.extend([rpc_pb2.Message(
from_id=str(self.host.get_id()).encode('utf-8'), # from_id=str(self.host.get_id()).encode('utf-8'),
seqno=str(generate_message_id()).encode('utf-8') # seqno=str(generate_message_id()).encode('utf-8')
)]) # )])
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe = False, subscribe = False,
topicid = topic_id.encode('utf-8') topicid = topic_id.encode('utf-8')
@ -267,6 +275,8 @@ class Pubsub():
Broadcast a message to peers Broadcast a message to peers
:param raw_msg: raw contents of the message to broadcast :param raw_msg: raw contents of the message to broadcast
""" """
print ("**PUBSU** IN MESSAGE ALL PEERS")
print (rpc_msg)
# Broadcast message # Broadcast message
for peer in self.peers: for peer in self.peers:

View File

@ -86,7 +86,7 @@ class DummyAccountNode():
my_id = str(self.libp2p_node.get_id()) my_id = str(self.libp2p_node.get_id())
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
await self.floodsub.publish(my_id, msg.SerializeToString()) await self.floodsub.publish(my_id, packet.SerializeToString())
async def publish_set_crypto(self, user, amount): async def publish_set_crypto(self, user, amount):
""" """
@ -128,6 +128,8 @@ class DummyAccountNode():
print (dest_user) print (dest_user)
print (amount) print (amount)
self.balances[dest_user] = amount self.balances[dest_user] = amount
print (self.balances)
print ("^^ balance")
def get_balance(self, user): def get_balance(self, user):
""" """

View File

@ -86,102 +86,102 @@ async def test_simple_two_nodes():
await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
# @pytest.mark.asyncio @pytest.mark.asyncio
# async def test_simple_three_nodes_line_topography(): async def test_simple_three_nodes_line_topography():
# num_nodes = 3 num_nodes = 3
# adj_map = {0: [1], 1: [2]} adj_map = {0: [1], 1: [2]}
# async def action_func(dummy_nodes): async def action_func(dummy_nodes):
# await dummy_nodes[0].publish_set_crypto("aspyn", 10) await dummy_nodes[0].publish_set_crypto("aspyn", 10)
# def assertion_func(dummy_node): def assertion_func(dummy_node):
# assert dummy_node.get_balance("aspyn") == 10 assert dummy_node.get_balance("aspyn") == 10
# await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
# @pytest.mark.asyncio @pytest.mark.asyncio
# async def test_simple_three_nodes_triangle_topography(): async def test_simple_three_nodes_triangle_topography():
# num_nodes = 3 num_nodes = 3
# adj_map = {0: [1, 2], 1: [2]} adj_map = {0: [1, 2], 1: [2]}
# async def action_func(dummy_nodes): async def action_func(dummy_nodes):
# await dummy_nodes[0].publish_set_crypto("aspyn", 20) await dummy_nodes[0].publish_set_crypto("aspyn", 20)
# def assertion_func(dummy_node): def assertion_func(dummy_node):
# assert dummy_node.get_balance("aspyn") == 20 assert dummy_node.get_balance("aspyn") == 20
# await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
# @pytest.mark.asyncio @pytest.mark.asyncio
# async def test_simple_seven_nodes_tree_topography(): async def test_simple_seven_nodes_tree_topography():
# num_nodes = 7 num_nodes = 7
# adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
# async def action_func(dummy_nodes): async def action_func(dummy_nodes):
# await dummy_nodes[0].publish_set_crypto("aspyn", 20) await dummy_nodes[0].publish_set_crypto("aspyn", 20)
# def assertion_func(dummy_node): def assertion_func(dummy_node):
# assert dummy_node.get_balance("aspyn") == 20 assert dummy_node.get_balance("aspyn") == 20
# await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
# @pytest.mark.asyncio @pytest.mark.asyncio
# async def test_set_then_send_from_root_seven_nodes_tree_topography(): async def test_set_then_send_from_root_seven_nodes_tree_topography():
# num_nodes = 7 num_nodes = 7
# adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
# async def action_func(dummy_nodes): async def action_func(dummy_nodes):
# await dummy_nodes[0].publish_set_crypto("aspyn", 20) await dummy_nodes[0].publish_set_crypto("aspyn", 20)
# await asyncio.sleep(0.25) await asyncio.sleep(0.25)
# await dummy_nodes[0].publish_send_crypto("aspyn", "alex", 5) await dummy_nodes[0].publish_send_crypto("aspyn", "alex", 5)
# def assertion_func(dummy_node): def assertion_func(dummy_node):
# assert dummy_node.get_balance("aspyn") == 15 assert dummy_node.get_balance("aspyn") == 15
# assert dummy_node.get_balance("alex") == 5 assert dummy_node.get_balance("alex") == 5
# await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
# @pytest.mark.asyncio @pytest.mark.asyncio
# async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography(): async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography():
# num_nodes = 7 num_nodes = 7
# adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
# async def action_func(dummy_nodes): async def action_func(dummy_nodes):
# await dummy_nodes[6].publish_set_crypto("aspyn", 20) await dummy_nodes[6].publish_set_crypto("aspyn", 20)
# await asyncio.sleep(0.25) await asyncio.sleep(0.25)
# await dummy_nodes[4].publish_send_crypto("aspyn", "alex", 5) await dummy_nodes[4].publish_send_crypto("aspyn", "alex", 5)
# def assertion_func(dummy_node): def assertion_func(dummy_node):
# assert dummy_node.get_balance("aspyn") == 15 assert dummy_node.get_balance("aspyn") == 15
# assert dummy_node.get_balance("alex") == 5 assert dummy_node.get_balance("alex") == 5
# await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
# @pytest.mark.asyncio @pytest.mark.asyncio
# async def test_simple_five_nodes_ring_topography(): async def test_simple_five_nodes_ring_topography():
# num_nodes = 5 num_nodes = 5
# adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]}
# async def action_func(dummy_nodes): async def action_func(dummy_nodes):
# await dummy_nodes[0].publish_set_crypto("aspyn", 20) await dummy_nodes[0].publish_set_crypto("aspyn", 20)
# def assertion_func(dummy_node): def assertion_func(dummy_node):
# assert dummy_node.get_balance("aspyn") == 20 assert dummy_node.get_balance("aspyn") == 20
# await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
# @pytest.mark.asyncio @pytest.mark.asyncio
# async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography():
# num_nodes = 5 num_nodes = 5
# adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]}
# async def action_func(dummy_nodes): async def action_func(dummy_nodes):
# await dummy_nodes[0].publish_set_crypto("alex", 20) await dummy_nodes[0].publish_set_crypto("alex", 20)
# await asyncio.sleep(0.25) await asyncio.sleep(0.25)
# await dummy_nodes[3].publish_send_crypto("alex", "rob", 12) await dummy_nodes[3].publish_send_crypto("alex", "rob", 12)
# def assertion_func(dummy_node): def assertion_func(dummy_node):
# assert dummy_node.get_balance("alex") == 8 assert dummy_node.get_balance("alex") == 8
# assert dummy_node.get_balance("rob") == 12 assert dummy_node.get_balance("rob") == 12
# await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)

View File

@ -1,15 +1,14 @@
import asyncio import asyncio
import uuid
import multiaddr import multiaddr
import pytest import pytest
from tests.utils import cleanup from tests.utils import cleanup
from libp2p import new_node from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pb import rpc_pb2
from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.message import MessageTalk
from libp2p.pubsub.message import create_message_talk
from libp2p.pubsub.message import generate_message_id
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
@ -22,7 +21,7 @@ async def connect(node1, node2):
await node1.connect(info) await node1.connect(info)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_two_nodes(): async def test_simple_two_nodes_RPC():
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
@ -45,71 +44,77 @@ async def test_simple_two_nodes():
node_a_id = str(node_a.get_id()) node_a_id = str(node_a.get_id())
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", generate_message_id())
await floodsub_a.publish(node_a_id, msg.SerializeToString())
await floodsub_a.publish(node_a.get_id(), msg.to_str()) print ("MESSAGE B")
print (msg.SerializeToString())
print ("=========")
await asyncio.sleep(0.25) await asyncio.sleep(0.25)
res_b = await qb.get() res_b = await qb.get()
print ("RES B")
print (res_b)
print ("-----")
# Check that the msg received by node_b is the same # Check that the msg received by node_b is the same
# as the message sent by node_a # as the message sent by node_a
assert res_b == msg.to_str() assert res_b.SerializeToString() == msg.publish[0].SerializeToString()
# Success, terminate pending tasks. # Success, terminate pending tasks.
await cleanup() await cleanup()
@pytest.mark.asyncio # @pytest.mark.asyncio
async def test_simple_three_nodes(): # async def test_simple_three_nodes():
# Want to pass message from A -> B -> C # # Want to pass message from A -> B -> C
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) # node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) # node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) # node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) # await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) # await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) # await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
supported_protocols = ["/floodsub/1.0.0"] # supported_protocols = ["/floodsub/1.0.0"]
floodsub_a = FloodSub(supported_protocols) # floodsub_a = FloodSub(supported_protocols)
pubsub_a = Pubsub(node_a, floodsub_a, "a") # pubsub_a = Pubsub(node_a, floodsub_a, "a")
floodsub_b = FloodSub(supported_protocols) # floodsub_b = FloodSub(supported_protocols)
pubsub_b = Pubsub(node_b, floodsub_b, "b") # pubsub_b = Pubsub(node_b, floodsub_b, "b")
floodsub_c = FloodSub(supported_protocols) # floodsub_c = FloodSub(supported_protocols)
pubsub_c = Pubsub(node_c, floodsub_c, "c") # pubsub_c = Pubsub(node_c, floodsub_c, "c")
await connect(node_a, node_b) # await connect(node_a, node_b)
await connect(node_b, node_c) # await connect(node_b, node_c)
await asyncio.sleep(0.25) # await asyncio.sleep(0.25)
qb = await pubsub_b.subscribe("my_topic") # qb = await pubsub_b.subscribe("my_topic")
qc = await pubsub_c.subscribe("my_topic") # qc = await pubsub_c.subscribe("my_topic")
await asyncio.sleep(0.25) # await asyncio.sleep(0.25)
node_a_id = str(node_a.get_id()) # node_a_id = str(node_a.get_id())
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) # msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id())
await floodsub_a.publish(node_a.get_id(), msg.to_str()) # await floodsub_a.publish(node_a.get_id(), msg.to_str())
await asyncio.sleep(0.25) # await asyncio.sleep(0.25)
res_b = await qb.get() # res_b = await qb.get()
res_c = await qc.get() # res_c = await qc.get()
# Check that the msg received by node_b is the same # # Check that the msg received by node_b is the same
# as the message sent by node_a # # as the message sent by node_a
assert res_b == msg.to_str() # assert res_b == msg.to_str()
# res_c should match original msg but with b as sender # # res_c should match original msg but with b as sender
node_b_id = str(node_b.get_id()) # node_b_id = str(node_b.get_id())
msg.from_id = node_b_id # msg.from_id = node_b_id
assert res_c == msg.to_str() # assert res_c == msg.to_str()
# Success, terminate pending tasks. # # Success, terminate pending tasks.
await cleanup() # await cleanup()
async def perform_test_from_obj(obj): async def perform_test_from_obj(obj):
""" """
@ -237,11 +242,14 @@ async def perform_test_from_obj(obj):
actual_node_id = str(node_map[node_id].get_id()) actual_node_id = str(node_map[node_id].get_id())
# Create correctly formatted message # Create correctly formatted message
msg_talk = MessageTalk(actual_node_id, actual_node_id, topics, data, generate_message_id()) msg_talk = generate_RPC_packet(actual_node_id, topics, data, generate_message_id())
print ("**TEST FLOODSUB** MESSAGE TALK")
print (msg_talk)
# Publish message # Publish message
# await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str())
tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()))) tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\
actual_node_id, msg_talk.SerializeToString())))
# For each topic in topics, add topic, msg_talk tuple to ordered test list # For each topic in topics, add topic, msg_talk tuple to ordered test list
# TODO: Update message sender to be correct message sender before # TODO: Update message sender to be correct message sender before
@ -261,12 +269,20 @@ async def perform_test_from_obj(obj):
for node_id in topic_map[topic]: for node_id in topic_map[topic]:
# Get message from subscription queue # Get message from subscription queue
msg_on_node_str = await queues_map[node_id][topic].get() msg_on_node_str = await queues_map[node_id][topic].get()
msg_on_node = create_message_talk(msg_on_node_str)
print ("MESSAGE ON NODE STR")
print (msg_on_node_str)
print ("ACTUAL MESSSSAGE")
print (actual_msg)
assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString()
# msg_on_node = create_message_talk(msg_on_node_str)
# Perform checks # Perform checks
assert actual_msg.origin_id == msg_on_node.origin_id # assert actual_msg.origin_id == msg_on_node.origin_id
assert actual_msg.topics == msg_on_node.topics # assert actual_msg.topics == msg_on_node.topics
assert actual_msg.data == msg_on_node.data # assert actual_msg.data == msg_on_node.data
# Success, terminate pending tasks. # Success, terminate pending tasks.
await cleanup() await cleanup()
@ -484,3 +500,30 @@ async def test_three_nodes_clique_two_topic_diff_origin_test_obj():
] ]
} }
await perform_test_from_obj(test_obj) await perform_test_from_obj(test_obj)
def generate_message_id():
"""
Generate a unique message id
:return: messgae id
"""
return str(uuid.uuid1())
def generate_RPC_packet(origin_id, topics, msg_content, msg_id):
packet = rpc_pb2.RPC()
message = rpc_pb2.Message(
from_id=origin_id.encode('utf-8'),
seqno=msg_id.encode('utf-8'),
data=msg_content.encode('utf-8'),
)
for topic in topics:
message.topicIDs.extend([topic.encode('utf-8')])
# for topic in topics:
# message.topicIDs.extend([topic.encode('utf-8')])
# packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
# subscribe=True,
# topicid = topic.encode('utf-8')
# )])
packet.publish.extend([message])
return packet