From 0238dff2173600a109cbbe73a4725e7104360e7e Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Tue, 2 Apr 2019 21:17:48 -0400 Subject: [PATCH 01/10] remove unused code --- libp2p/pubsub/pubsub.py | 12 ------------ tests/pubsub/test_floodsub.py | 2 -- 2 files changed, 14 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index bfad873c..bee5fba5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,5 +1,4 @@ import asyncio -import uuid from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee @@ -202,10 +201,6 @@ class Pubsub(): # Create subscribe message packet = rpc_pb2.RPC() - # packet.publish.extend([rpc_pb2.Message( - # from_id=str(self.host.get_id()).encode('utf-8'), - # seqno=str(generate_message_id()).encode('utf-8') - # )]) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( subscribe=True, topicid=topic_id.encode('utf-8') @@ -255,10 +250,3 @@ class Pubsub(): # Write message to stream await stream.write(rpc_msg) - -def generate_message_id(): - """ - Generate a unique message id - :return: messgae id - """ - return str(uuid.uuid1()) diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 4fc059bc..272af79e 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -54,8 +54,6 @@ async def test_simple_two_nodes_RPC(): # as the message sent by node_a assert res_b.SerializeToString() == msg.publish[0].SerializeToString() - - # Success, terminate pending tasks. await cleanup() From 3a52d29cb784d332045349c26f6c5a8d214752d2 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 3 Apr 2019 14:05:37 -0400 Subject: [PATCH 02/10] remove redundant proto file --- rpc.proto | 78 ------------------------------------------------------- 1 file changed, 78 deletions(-) delete mode 100644 rpc.proto diff --git a/rpc.proto b/rpc.proto deleted file mode 100644 index 2ae2ef2a..00000000 --- a/rpc.proto +++ /dev/null @@ -1,78 +0,0 @@ -// Borrowed from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto - -syntax = "proto2"; - -package pubsub.pb; - -message RPC { - repeated SubOpts subscriptions = 1; - repeated Message publish = 2; - - message SubOpts { - optional bool subscribe = 1; // subscribe or unsubcribe - optional string topicid = 2; - } - - optional ControlMessage control = 3; -} - -message Message { - optional bytes from = 1; - optional bytes data = 2; - optional bytes seqno = 3; - repeated string topicIDs = 4; - optional bytes signature = 5; - optional bytes key = 6; -} - -message ControlMessage { - repeated ControlIHave ihave = 1; - repeated ControlIWant iwant = 2; - repeated ControlGraft graft = 3; - repeated ControlPrune prune = 4; -} - -message ControlIHave { - optional string topicID = 1; - repeated string messageIDs = 2; -} - -message ControlIWant { - repeated string messageIDs = 1; -} - -message ControlGraft { - optional string topicID = 1; -} - -message ControlPrune { - optional string topicID = 1; -} - -message TopicDescriptor { - optional string name = 1; - optional AuthOpts auth = 2; - optional EncOpts enc = 3; - - message AuthOpts { - optional AuthMode mode = 1; - repeated bytes keys = 2; // root keys to trust - - enum AuthMode { - NONE = 0; // no authentication, anyone can publish - KEY = 1; // only messages signed by keys in the topic descriptor are accepted - WOT = 2; // web of trust, certificates can allow publisher set to grow - } - } - - message EncOpts { - optional EncMode mode = 1; - repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted) - - enum EncMode { - NONE = 0; // no encryption, anyone can read - SHAREDKEY = 1; // messages are encrypted with shared key - WOT = 2; // web of trust, certificates can allow publisher set to grow - } - } -} \ No newline at end of file From 225bd390dfc154a11238460097152a09b0150673 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 3 Apr 2019 14:08:05 -0400 Subject: [PATCH 03/10] add source to rpc.proto --- libp2p/pubsub/pb/rpc.proto | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto index f07da200..df38bad4 100644 --- a/libp2p/pubsub/pb/rpc.proto +++ b/libp2p/pubsub/pb/rpc.proto @@ -1,3 +1,5 @@ +// Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto + syntax = "proto2"; package pubsub.pb; From f6299c7dee33cfcb72c42e2c786f0fd6bd54ba9a Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Tue, 2 Apr 2019 22:05:14 -0400 Subject: [PATCH 04/10] Add priority queues to handle seqno --- libp2p/pubsub/pubsub.py | 14 ++++++++------ libp2p/pubsub/read_only_queue.py | 14 ++++++++++++++ 2 files changed, 22 insertions(+), 6 deletions(-) create mode 100644 libp2p/pubsub/read_only_queue.py diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index bee5fba5..5f018d7e 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -2,6 +2,7 @@ import asyncio from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee +from .read_only_queue import ReadOnlyQueue class Pubsub(): @@ -187,8 +188,9 @@ class Pubsub(): if topic in self.my_topics: # we are subscribed to a topic this message was sent for, # so add message to the subscription output queue - # for each topic - await self.my_topics[topic].put(publish_message) + # for each topic with priority being the message's seqno. + # Note: asyncio.PriorityQueue item format is (priority, data) + await self.my_topics[topic].put((publish_message.seqno, publish_message)) async def subscribe(self, topic_id): """ @@ -196,8 +198,8 @@ class Pubsub(): :param topic_id: topic_id to subscribe to """ - # Map topic_id to blocking queue - self.my_topics[topic_id] = asyncio.Queue() + # Map topic_id to a priority blocking queue + self.my_topics[topic_id] = asyncio.PriorityQueue() # Create subscribe message packet = rpc_pb2.RPC() @@ -212,8 +214,8 @@ class Pubsub(): # Tell router we are joining this topic self.router.join(topic_id) - # Return the asyncio queue for messages on this topic - return self.my_topics[topic_id] + # Return the readonly queue for messages on this topic + return ReadOnlyQueue(self.my_topics[topic_id]) async def unsubscribe(self, topic_id): """ diff --git a/libp2p/pubsub/read_only_queue.py b/libp2p/pubsub/read_only_queue.py new file mode 100644 index 00000000..16567685 --- /dev/null +++ b/libp2p/pubsub/read_only_queue.py @@ -0,0 +1,14 @@ +import asyncio + +class ReadOnlyQueue(): + + def __init__(self, queue): + self.queue = queue + + async def get(self): + """ + Get the next item from queue, which has items in the format (priority, data) + :return: next item from the queue + """ + return (await self.queue.get())[1] + From c2b538936224b18169a036ca5ee46b64d0a027ea Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Tue, 2 Apr 2019 22:05:32 -0400 Subject: [PATCH 05/10] Adjust floodsub tests for new seqno util --- tests/pubsub/test_floodsub.py | 11 ++++++++--- tests/pubsub/utils.py | 22 ++++++++++++++++++---- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 272af79e..4e012379 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -8,7 +8,7 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from utils import generate_message_id, generate_RPC_packet +from utils import message_id_generator, generate_RPC_packet # pylint: disable=too-many-locals @@ -44,7 +44,8 @@ async def test_simple_two_nodes_RPC(): node_a_id = str(node_a.get_id()) - msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", generate_message_id()) + next_msg_id_func = message_id_generator(0) + msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", next_msg_id_func()) await floodsub_a.publish(node_a_id, msg.SerializeToString()) await asyncio.sleep(0.25) @@ -173,6 +174,8 @@ async def perform_test_from_obj(obj): topics_in_msgs_ordered = [] messages = obj["messages"] tasks_publish = [] + next_msg_id_func = message_id_generator(0) + for msg in messages: topics = msg["topics"] @@ -183,7 +186,7 @@ async def perform_test_from_obj(obj): actual_node_id = str(node_map[node_id].get_id()) # Create correctly formatted message - msg_talk = generate_RPC_packet(actual_node_id, topics, data, generate_message_id()) + msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func()) # Publish message # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) @@ -205,6 +208,8 @@ async def perform_test_from_obj(obj): # TODO: Check message sender too for i in range(len(topics_in_msgs_ordered)): topic, actual_msg = topics_in_msgs_ordered[i] + + # Look at each node in each topic for node_id in topic_map[topic]: # Get message from subscription queue msg_on_node_str = await queues_map[node_id][topic].get() diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index d4695d7d..8aa3f882 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -1,12 +1,26 @@ import uuid +import struct from libp2p.pubsub.pb import rpc_pb2 -def generate_message_id(): + +def message_id_generator(start_val): """ Generate a unique message id - :return: messgae id + :param start_val: value to start generating messages at + :return: message id """ - return str(uuid.uuid1()) + val = start_val + def generator(): + # Allow manipulation of val within closure + nonlocal val + + # Increment id + val += 1 + + # Convert val to big endian + return struct.pack('>I', val) + + return generator def generate_RPC_packet(origin_id, topics, msg_content, msg_id): """ @@ -19,7 +33,7 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id): packet = rpc_pb2.RPC() message = rpc_pb2.Message( from_id=origin_id.encode('utf-8'), - seqno=msg_id.encode('utf-8'), + seqno=msg_id, data=msg_content.encode('utf-8'), ) From 9d16aa834d3220d85b6d087d85e2da4e555536a9 Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Tue, 2 Apr 2019 22:34:01 -0400 Subject: [PATCH 06/10] Modify pubsub to have seen message check incorporate seqno and node id --- libp2p/pubsub/pubsub.py | 16 +++++++--------- libp2p/pubsub/read_only_queue.py | 14 -------------- tests/pubsub/dummy_account_node.py | 13 +++++++++---- 3 files changed, 16 insertions(+), 27 deletions(-) delete mode 100644 libp2p/pubsub/read_only_queue.py diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5f018d7e..aec04dee 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -2,7 +2,6 @@ import asyncio from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee -from .read_only_queue import ReadOnlyQueue class Pubsub(): @@ -89,7 +88,7 @@ class Pubsub(): for message in rpc_incoming.publish: if message.seqno not in self.seen_messages: should_publish = True - self.seen_messages.append(message.seqno) + self.seen_messages.append((message.seqno, message.from_id)) await self.handle_talk(message) if rpc_incoming.subscriptions: @@ -188,9 +187,8 @@ class Pubsub(): if topic in self.my_topics: # we are subscribed to a topic this message was sent for, # so add message to the subscription output queue - # for each topic with priority being the message's seqno. - # Note: asyncio.PriorityQueue item format is (priority, data) - await self.my_topics[topic].put((publish_message.seqno, publish_message)) + # for each topic + await self.my_topics[topic].put(publish_message) async def subscribe(self, topic_id): """ @@ -198,8 +196,8 @@ class Pubsub(): :param topic_id: topic_id to subscribe to """ - # Map topic_id to a priority blocking queue - self.my_topics[topic_id] = asyncio.PriorityQueue() + # Map topic_id to blocking queue + self.my_topics[topic_id] = asyncio.Queue() # Create subscribe message packet = rpc_pb2.RPC() @@ -214,8 +212,8 @@ class Pubsub(): # Tell router we are joining this topic self.router.join(topic_id) - # Return the readonly queue for messages on this topic - return ReadOnlyQueue(self.my_topics[topic_id]) + # Return the asyncio queue for messages on this topic + return self.my_topics[topic_id] async def unsubscribe(self, topic_id): """ diff --git a/libp2p/pubsub/read_only_queue.py b/libp2p/pubsub/read_only_queue.py deleted file mode 100644 index 16567685..00000000 --- a/libp2p/pubsub/read_only_queue.py +++ /dev/null @@ -1,14 +0,0 @@ -import asyncio - -class ReadOnlyQueue(): - - def __init__(self, queue): - self.queue = queue - - async def get(self): - """ - Get the next item from queue, which has items in the format (priority, data) - :return: next item from the queue - """ - return (await self.queue.get())[1] - diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 05a02bf9..8fb7310c 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,7 +1,8 @@ import asyncio import multiaddr +import uuid -from utils import generate_message_id, generate_RPC_packet +from utils import message_id_generator, generate_RPC_packet from libp2p import new_node from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub @@ -25,6 +26,8 @@ class DummyAccountNode(): def __init__(self): self.balances = {} + self.next_msg_id_func = message_id_generator(0) + self.node_id = str(uuid.uuid1()) @classmethod async def create(cls): @@ -51,7 +54,7 @@ class DummyAccountNode(): Handle all incoming messages on the CRYPTO_TOPIC from peers """ while True: - incoming = await self.q.get() + incoming = await self.q.get() msg_comps = incoming.data.decode('utf-8').split(",") if msg_comps[0] == "send": @@ -77,7 +80,7 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) - packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) await self.floodsub.publish(my_id, packet.SerializeToString()) async def publish_set_crypto(self, user, amount): @@ -88,7 +91,7 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "set," + user + "," + str(amount) - packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) await self.floodsub.publish(my_id, packet.SerializeToString()) @@ -99,6 +102,7 @@ class DummyAccountNode(): :param dest_user: user to send crypto to :param amount: amount of crypto to send """ + print("handle send " + self.node_id) if source_user in self.balances: self.balances[source_user] -= amount else: @@ -115,6 +119,7 @@ class DummyAccountNode(): :param dest_user: user to set crypto for :param amount: amount of crypto """ + print("handle set " + self.node_id) self.balances[dest_user] = amount def get_balance(self, user): From feaa393c5ff1201a67c2fb54bf04b9c0922e2c98 Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Wed, 3 Apr 2019 00:34:39 -0400 Subject: [PATCH 07/10] Fix seen messages bug --- libp2p/pubsub/pubsub.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index aec04dee..40c4036e 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -86,9 +86,10 @@ class Pubsub(): if rpc_incoming.publish: # deal with RPC.publish for message in rpc_incoming.publish: - if message.seqno not in self.seen_messages: + id_in_seen_msgs = (message.seqno, message.from_id) + if id_in_seen_msgs not in self.seen_messages: should_publish = True - self.seen_messages.append((message.seqno, message.from_id)) + self.seen_messages.append(id_in_seen_msgs) await self.handle_talk(message) if rpc_incoming.subscriptions: From 211d6f6860bbf3df2f9e70f59ba04ff07b138947 Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Wed, 3 Apr 2019 00:35:39 -0400 Subject: [PATCH 08/10] Add dummy node test --- tests/pubsub/test_dummyaccount_demo.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index 1f08c8d6..9fa2aa7b 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -185,3 +185,28 @@ async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): assert dummy_node.get_balance("rob") == 12 await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_set_then_send_from_five_diff_nodes_five_nodes_ring_topography(): + num_nodes = 5 + adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("alex", 20) + await asyncio.sleep(1) + await dummy_nodes[1].publish_send_crypto("alex", "rob", 3) + await asyncio.sleep(1) + await dummy_nodes[2].publish_send_crypto("rob", "aspyn", 2) + await asyncio.sleep(1) + await dummy_nodes[3].publish_send_crypto("aspyn", "zx", 1) + await asyncio.sleep(1) + await dummy_nodes[4].publish_send_crypto("zx", "raul", 1) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("alex") == 17 + assert dummy_node.get_balance("rob") == 1 + assert dummy_node.get_balance("aspyn") == 1 + assert dummy_node.get_balance("zx") == 0 + assert dummy_node.get_balance("raul") == 1 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) From fc789280370a47206738e8f94811af775f7260da Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Wed, 3 Apr 2019 14:17:33 -0400 Subject: [PATCH 09/10] Add test for multiple messages from two origins --- tests/pubsub/test_floodsub.py | 54 +++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 4e012379..00c68b53 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -431,3 +431,57 @@ async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): ] } await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3", "4"], + "2": ["1", "3", "4"], + "3": ["1", "2", "4"], + "4": ["1", "2", "3"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4"], + "school": ["1", "2", "3", "4"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar2", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar3", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic3", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) From 22e503a2606ba4f3cf7212908301fcd74970be34 Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Wed, 3 Apr 2019 14:20:50 -0400 Subject: [PATCH 10/10] Add test for ring topology multiple messages from two origins --- tests/pubsub/test_floodsub.py | 55 +++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 00c68b53..69b2f8cc 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -485,3 +485,58 @@ async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj(): ] } await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2"], + "2": ["3"], + "3": ["4"], + "4": ["5"], + "5": ["1"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4", "5"], + "school": ["1", "2", "3", "4", "5"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar2", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar3", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic3", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) \ No newline at end of file