diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 68e542c5..a4f2e86c 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,7 +1,9 @@ +from .pb import rpc_pb2 from .pubsub_router_interface import IPubsubRouter -from .message import create_message_talk + class FloodSub(IPubsubRouter): + # pylint: disable=no-member def __init__(self, protocols): self.protocols = protocols @@ -40,7 +42,7 @@ class FloodSub(IPubsubRouter): :param rpc: rpc message """ - async def publish(self, sender_peer_id, message): + async def publish(self, sender_peer_id, rpc_message): """ Invoked to forward a new message that has been validated. This is where the "flooding" part of floodsub happens @@ -52,35 +54,33 @@ class FloodSub(IPubsubRouter): It also never forwards a message back to the source or the peer that forwarded the message. :param sender_peer_id: peer_id of message sender - :param message: message to forward + :param rpc_message: pubsub message in RPC string format """ - - # Encode message - encoded_msg = message.encode() - - # Get message sender, origin, and topics - msg_talk = create_message_talk(message) + packet = rpc_pb2.RPC() + packet.ParseFromString(rpc_message) msg_sender = str(sender_peer_id) - msg_origin = msg_talk.origin_id - topics = msg_talk.topics - # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message - if msg_sender == msg_origin and msg_sender == str(self.pubsub.host.get_id()): - await self.pubsub.handle_talk(message) + for message in packet.publish: + decoded_from_id = message.from_id.decode('utf-8') + if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): + await self.pubsub.handle_talk(message) - # Deliver to self and peers - for topic in topics: - if topic in self.pubsub.peer_topics: - for peer_id_in_topic in self.pubsub.peer_topics[topic]: - # Forward to all known peers in the topic that are not the - # message sender and are not the message origin - if peer_id_in_topic not in (msg_sender, msg_origin): - stream = self.pubsub.peers[peer_id_in_topic] - await stream.write(encoded_msg) - else: - # Implies publish did not write - print("publish did not write") + # Deliver to self and peers + for topic in message.topicIDs: + if topic in self.pubsub.peer_topics: + for peer_id_in_topic in self.pubsub.peer_topics[topic]: + # Forward to all known peers in the topic that are not the + # message sender and are not the message origin + if peer_id_in_topic not in (msg_sender, decoded_from_id): + stream = self.pubsub.peers[peer_id_in_topic] + # create new packet with just publish message + new_packet = rpc_pb2.RPC() + new_packet.publish.extend([message]) + await stream.write(new_packet.SerializeToString()) + else: + # Implies publish did not write + print("publish did not write") def join(self, topic): """ diff --git a/libp2p/pubsub/message.py b/libp2p/pubsub/message.py deleted file mode 100644 index 2f839dc9..00000000 --- a/libp2p/pubsub/message.py +++ /dev/null @@ -1,118 +0,0 @@ -import uuid - - -class MessageTalk(): - - """ - Object to make parsing talk messages easier, where talk messages are - defined as custom messages published to a set of topics - """ - # pylint: disable=too-few-public-methods - def __init__(self, from_id, origin_id, topics, data, message_id): - # pylint: disable=too-many-arguments - self.msg_type = "talk" - self.from_id = from_id - self.origin_id = origin_id - self.topics = topics - self.data = data - self.message_id = message_id - - def to_str(self): - """ - Convert to string - :return: MessageTalk object in string representation - """ - out = self.msg_type + '\n' - out += self.from_id + '\n' - out += self.origin_id + '\n' - out += self.message_id + '\n' - for i in range(len(self.topics)): - out += self.topics[i] - if i < len(self.topics) - 1: - out += ',' - out += '\n' + self.data - return out - - -class MessageSub(): - """ - Object to make parsing subscription messages easier, where subscription - messages are defined as indicating the topics a node wishes to subscribe to - or unsubscribe from - """ - # pylint: disable=too-few-public-methods - def __init__(self, from_id, origin_id, subs_map, message_id): - self.msg_type = "subscription" - self.from_id = from_id - self.origin_id = origin_id - self.subs_map = subs_map - self.message_id = message_id - - def to_str(self): - """ - Convert to string - :return: MessageSub object in string representation - """ - out = self.msg_type + '\n' - out += self.from_id + '\n' - out += self.origin_id + '\n' - out += self.message_id - - if self.subs_map: - out += '\n' - - keys = list(self.subs_map) - - for i, topic in enumerate(keys): - sub = self.subs_map[topic] - if sub: - out += "sub:" - else: - out += "unsub:" - out += topic - if i < len(keys) - 1: - out += '\n' - - return out - -def create_message_talk(msg_talk_as_str): - """ - Create a MessageTalk object from a MessageTalk string representation - :param msg_talk_as_str: a MessageTalk object in its string representation - :return: MessageTalk object - """ - msg_comps = msg_talk_as_str.split('\n') - from_id = msg_comps[1] - origin_id = msg_comps[2] - message_id = msg_comps[3] - topics = msg_comps[4].split(',') - data = msg_comps[5] - return MessageTalk(from_id, origin_id, topics, data, message_id) - -def create_message_sub(msg_sub_as_str): - """ - Create a MessageSub object from a MessageSub string representation - :param msg_talk_as_str: a MessageSub object in its string representation - :return: MessageSub object - """ - msg_comps = msg_sub_as_str.split('\n') - from_id = msg_comps[1] - origin_id = msg_comps[2] - message_id = msg_comps[3] - - subs_map = {} - for i in range(4, len(msg_comps)): - sub_comps = msg_comps[i].split(":") - topic = sub_comps[1] - if sub_comps[0] == "sub": - subs_map[topic] = True - else: - subs_map[topic] = False - return MessageSub(from_id, origin_id, subs_map, message_id) - -def generate_message_id(): - """ - Generate a unique message id - :return: messgae id - """ - return str(uuid.uuid1()) diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto new file mode 100644 index 00000000..df38bad4 --- /dev/null +++ b/libp2p/pubsub/pb/rpc.proto @@ -0,0 +1,78 @@ +// Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto + +syntax = "proto2"; + +package pubsub.pb; + +message RPC { + repeated SubOpts subscriptions = 1; + repeated Message publish = 2; + + message SubOpts { + optional bool subscribe = 1; // subscribe or unsubcribe + optional string topicid = 2; + } + + optional ControlMessage control = 3; +} + +message Message { + optional bytes from_id = 1; + optional bytes data = 2; + optional bytes seqno = 3; + repeated string topicIDs = 4; + optional bytes signature = 5; + optional bytes key = 6; +} + +message ControlMessage { + repeated ControlIHave ihave = 1; + repeated ControlIWant iwant = 2; + repeated ControlGraft graft = 3; + repeated ControlPrune prune = 4; +} + +message ControlIHave { + optional string topicID = 1; + repeated string messageIDs = 2; +} + +message ControlIWant { + repeated string messageIDs = 1; +} + +message ControlGraft { + optional string topicID = 1; +} + +message ControlPrune { + optional string topicID = 1; +} + +message TopicDescriptor { + optional string name = 1; + optional AuthOpts auth = 2; + optional EncOpts enc = 3; + + message AuthOpts { + optional AuthMode mode = 1; + repeated bytes keys = 2; // root keys to trust + + enum AuthMode { + NONE = 0; // no authentication, anyone can publish + KEY = 1; // only messages signed by keys in the topic descriptor are accepted + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } + + message EncOpts { + optional EncMode mode = 1; + repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted) + + enum EncMode { + NONE = 0; // no encryption, anyone can read + SHAREDKEY = 1; // messages are encrypted with shared key + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } +} \ No newline at end of file diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py new file mode 100644 index 00000000..d3157822 --- /dev/null +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -0,0 +1,638 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: rpc.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='rpc.proto', + package='pubsub.pb', + syntax='proto2', + serialized_options=None, + serialized_pb=_b('\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') +) + + + +_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE = _descriptor.EnumDescriptor( + name='AuthMode', + full_name='pubsub.pb.TopicDescriptor.AuthOpts.AuthMode', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='NONE', index=0, number=0, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='KEY', index=1, number=1, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='WOT', index=2, number=2, + serialized_options=None, + type=None), + ], + containing_type=None, + serialized_options=None, + serialized_start=868, + serialized_end=906, +) +_sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE) + +_TOPICDESCRIPTOR_ENCOPTS_ENCMODE = _descriptor.EnumDescriptor( + name='EncMode', + full_name='pubsub.pb.TopicDescriptor.EncOpts.EncMode', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='NONE', index=0, number=0, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SHAREDKEY', index=1, number=1, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='WOT', index=2, number=2, + serialized_options=None, + type=None), + ], + containing_type=None, + serialized_options=None, + serialized_start=997, + serialized_end=1040, +) +_sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_ENCOPTS_ENCMODE) + + +_RPC_SUBOPTS = _descriptor.Descriptor( + name='SubOpts', + full_name='pubsub.pb.RPC.SubOpts', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='subscribe', full_name='pubsub.pb.RPC.SubOpts.subscribe', index=0, + number=1, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='topicid', full_name='pubsub.pb.RPC.SubOpts.topicid', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=160, + serialized_end=205, +) + +_RPC = _descriptor.Descriptor( + name='RPC', + full_name='pubsub.pb.RPC', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='subscriptions', full_name='pubsub.pb.RPC.subscriptions', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='publish', full_name='pubsub.pb.RPC.publish', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='control', full_name='pubsub.pb.RPC.control', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_RPC_SUBOPTS, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=25, + serialized_end=205, +) + + +_MESSAGE = _descriptor.Descriptor( + name='Message', + full_name='pubsub.pb.Message', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='from_id', full_name='pubsub.pb.Message.from_id', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='data', full_name='pubsub.pb.Message.data', index=1, + number=2, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='seqno', full_name='pubsub.pb.Message.seqno', index=2, + number=3, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='topicIDs', full_name='pubsub.pb.Message.topicIDs', index=3, + number=4, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='signature', full_name='pubsub.pb.Message.signature', index=4, + number=5, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='key', full_name='pubsub.pb.Message.key', index=5, + number=6, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=207, + serialized_end=312, +) + + +_CONTROLMESSAGE = _descriptor.Descriptor( + name='ControlMessage', + full_name='pubsub.pb.ControlMessage', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='ihave', full_name='pubsub.pb.ControlMessage.ihave', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='iwant', full_name='pubsub.pb.ControlMessage.iwant', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='graft', full_name='pubsub.pb.ControlMessage.graft', index=2, + number=3, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='prune', full_name='pubsub.pb.ControlMessage.prune', index=3, + number=4, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=315, + serialized_end=491, +) + + +_CONTROLIHAVE = _descriptor.Descriptor( + name='ControlIHave', + full_name='pubsub.pb.ControlIHave', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='topicID', full_name='pubsub.pb.ControlIHave.topicID', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='messageIDs', full_name='pubsub.pb.ControlIHave.messageIDs', index=1, + number=2, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=493, + serialized_end=544, +) + + +_CONTROLIWANT = _descriptor.Descriptor( + name='ControlIWant', + full_name='pubsub.pb.ControlIWant', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='messageIDs', full_name='pubsub.pb.ControlIWant.messageIDs', index=0, + number=1, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=546, + serialized_end=580, +) + + +_CONTROLGRAFT = _descriptor.Descriptor( + name='ControlGraft', + full_name='pubsub.pb.ControlGraft', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='topicID', full_name='pubsub.pb.ControlGraft.topicID', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=582, + serialized_end=613, +) + + +_CONTROLPRUNE = _descriptor.Descriptor( + name='ControlPrune', + full_name='pubsub.pb.ControlPrune', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='topicID', full_name='pubsub.pb.ControlPrune.topicID', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=615, + serialized_end=646, +) + + +_TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor( + name='AuthOpts', + full_name='pubsub.pb.TopicDescriptor.AuthOpts', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='mode', full_name='pubsub.pb.TopicDescriptor.AuthOpts.mode', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='keys', full_name='pubsub.pb.TopicDescriptor.AuthOpts.keys', index=1, + number=2, type=12, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE, + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=782, + serialized_end=906, +) + +_TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( + name='EncOpts', + full_name='pubsub.pb.TopicDescriptor.EncOpts', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='mode', full_name='pubsub.pb.TopicDescriptor.EncOpts.mode', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='keyHashes', full_name='pubsub.pb.TopicDescriptor.EncOpts.keyHashes', index=1, + number=2, type=12, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _TOPICDESCRIPTOR_ENCOPTS_ENCMODE, + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=909, + serialized_end=1040, +) + +_TOPICDESCRIPTOR = _descriptor.Descriptor( + name='TopicDescriptor', + full_name='pubsub.pb.TopicDescriptor', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='pubsub.pb.TopicDescriptor.name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='auth', full_name='pubsub.pb.TopicDescriptor.auth', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='enc', full_name='pubsub.pb.TopicDescriptor.enc', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_TOPICDESCRIPTOR_AUTHOPTS, _TOPICDESCRIPTOR_ENCOPTS, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=649, + serialized_end=1040, +) + +_RPC_SUBOPTS.containing_type = _RPC +_RPC.fields_by_name['subscriptions'].message_type = _RPC_SUBOPTS +_RPC.fields_by_name['publish'].message_type = _MESSAGE +_RPC.fields_by_name['control'].message_type = _CONTROLMESSAGE +_CONTROLMESSAGE.fields_by_name['ihave'].message_type = _CONTROLIHAVE +_CONTROLMESSAGE.fields_by_name['iwant'].message_type = _CONTROLIWANT +_CONTROLMESSAGE.fields_by_name['graft'].message_type = _CONTROLGRAFT +_CONTROLMESSAGE.fields_by_name['prune'].message_type = _CONTROLPRUNE +_TOPICDESCRIPTOR_AUTHOPTS.fields_by_name['mode'].enum_type = _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE +_TOPICDESCRIPTOR_AUTHOPTS.containing_type = _TOPICDESCRIPTOR +_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE.containing_type = _TOPICDESCRIPTOR_AUTHOPTS +_TOPICDESCRIPTOR_ENCOPTS.fields_by_name['mode'].enum_type = _TOPICDESCRIPTOR_ENCOPTS_ENCMODE +_TOPICDESCRIPTOR_ENCOPTS.containing_type = _TOPICDESCRIPTOR +_TOPICDESCRIPTOR_ENCOPTS_ENCMODE.containing_type = _TOPICDESCRIPTOR_ENCOPTS +_TOPICDESCRIPTOR.fields_by_name['auth'].message_type = _TOPICDESCRIPTOR_AUTHOPTS +_TOPICDESCRIPTOR.fields_by_name['enc'].message_type = _TOPICDESCRIPTOR_ENCOPTS +DESCRIPTOR.message_types_by_name['RPC'] = _RPC +DESCRIPTOR.message_types_by_name['Message'] = _MESSAGE +DESCRIPTOR.message_types_by_name['ControlMessage'] = _CONTROLMESSAGE +DESCRIPTOR.message_types_by_name['ControlIHave'] = _CONTROLIHAVE +DESCRIPTOR.message_types_by_name['ControlIWant'] = _CONTROLIWANT +DESCRIPTOR.message_types_by_name['ControlGraft'] = _CONTROLGRAFT +DESCRIPTOR.message_types_by_name['ControlPrune'] = _CONTROLPRUNE +DESCRIPTOR.message_types_by_name['TopicDescriptor'] = _TOPICDESCRIPTOR +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +RPC = _reflection.GeneratedProtocolMessageType('RPC', (_message.Message,), dict( + + SubOpts = _reflection.GeneratedProtocolMessageType('SubOpts', (_message.Message,), dict( + DESCRIPTOR = _RPC_SUBOPTS, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.RPC.SubOpts) + )) + , + DESCRIPTOR = _RPC, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.RPC) + )) +_sym_db.RegisterMessage(RPC) +_sym_db.RegisterMessage(RPC.SubOpts) + +Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict( + DESCRIPTOR = _MESSAGE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.Message) + )) +_sym_db.RegisterMessage(Message) + +ControlMessage = _reflection.GeneratedProtocolMessageType('ControlMessage', (_message.Message,), dict( + DESCRIPTOR = _CONTROLMESSAGE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlMessage) + )) +_sym_db.RegisterMessage(ControlMessage) + +ControlIHave = _reflection.GeneratedProtocolMessageType('ControlIHave', (_message.Message,), dict( + DESCRIPTOR = _CONTROLIHAVE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIHave) + )) +_sym_db.RegisterMessage(ControlIHave) + +ControlIWant = _reflection.GeneratedProtocolMessageType('ControlIWant', (_message.Message,), dict( + DESCRIPTOR = _CONTROLIWANT, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIWant) + )) +_sym_db.RegisterMessage(ControlIWant) + +ControlGraft = _reflection.GeneratedProtocolMessageType('ControlGraft', (_message.Message,), dict( + DESCRIPTOR = _CONTROLGRAFT, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlGraft) + )) +_sym_db.RegisterMessage(ControlGraft) + +ControlPrune = _reflection.GeneratedProtocolMessageType('ControlPrune', (_message.Message,), dict( + DESCRIPTOR = _CONTROLPRUNE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlPrune) + )) +_sym_db.RegisterMessage(ControlPrune) + +TopicDescriptor = _reflection.GeneratedProtocolMessageType('TopicDescriptor', (_message.Message,), dict( + + AuthOpts = _reflection.GeneratedProtocolMessageType('AuthOpts', (_message.Message,), dict( + DESCRIPTOR = _TOPICDESCRIPTOR_AUTHOPTS, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.AuthOpts) + )) + , + + EncOpts = _reflection.GeneratedProtocolMessageType('EncOpts', (_message.Message,), dict( + DESCRIPTOR = _TOPICDESCRIPTOR_ENCOPTS, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.EncOpts) + )) + , + DESCRIPTOR = _TOPICDESCRIPTOR, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor) + )) +_sym_db.RegisterMessage(TopicDescriptor) +_sym_db.RegisterMessage(TopicDescriptor.AuthOpts) +_sym_db.RegisterMessage(TopicDescriptor.EncOpts) + + +# @@protoc_insertion_point(module_scope) diff --git a/libp2p/pubsub/pb/rpc_pb2_grpc.py b/libp2p/pubsub/pb/rpc_pb2_grpc.py new file mode 100644 index 00000000..a8943526 --- /dev/null +++ b/libp2p/pubsub/pb/rpc_pb2_grpc.py @@ -0,0 +1,3 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 9bd072fe..bee5fba5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,44 +1,11 @@ import asyncio +from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee -from .message import MessageSub -from .message import create_message_talk, create_message_sub -from. message import generate_message_id class Pubsub(): - """ - For now, because I'm on a plane and don't have access to the go repo/protobuf stuff, - this is going to be the message format for the two types: subscription and talk - subscription indicates subscribing or unsubscribing from a topic - talk is sending a message on topic(s) - subscription format: - subscription - 'from' - :'topicid' - :'topicid' - ... - Ex. - subscription - msg_sender_peer_id - origin_peer_id - sub:topic1 - sub:topic2 - unsub:fav_topic - talk format: - talk - 'from' - 'origin' - [topic_ids comma-delimited] - 'data' - Ex. - talk - msg_sender_peer_id - origin_peer_id - topic1,topics_are_cool,foo - I like tacos - """ - # pylint: disable=too-many-instance-attributes + # pylint: disable=too-many-instance-attributes, no-member def __init__(self, host, router, my_id): """ @@ -89,60 +56,54 @@ class Pubsub(): def get_hello_packet(self): """ Generate subscription message with all topics we are subscribed to + only send hello packet if we have subscribed topics """ - subs_map = {} - for topic in self.my_topics: - subs_map[topic] = True - sub_msg = MessageSub( - str(self.host.get_id()),\ - str(self.host.get_id()), subs_map, generate_message_id()\ - ) - return sub_msg.to_str() + packet = rpc_pb2.RPC() + if self.my_topics: + for topic_id in self.my_topics: + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe=True, topicid=topic_id)]) - async def continously_read_stream(self, stream): + return packet.SerializeToString() + + async def continuously_read_stream(self, stream): """ Read from input stream in an infinite loop. Process - messages from other nodes, which for now are considered MessageTalk - and MessageSub messages. - TODO: Handle RPC messages instead of my Aspyn's own custom message format + messages from other nodes :param stream: stream to continously read from """ + + # TODO check on types here + peer_id = str(stream.mplex_conn.peer_id) + while True: - incoming = (await stream.read()).decode() - msg_comps = incoming.split('\n') - msg_type = msg_comps[0] + incoming = (await stream.read()) + rpc_incoming = rpc_pb2.RPC() + rpc_incoming.ParseFromString(incoming) - msg_sender = msg_comps[1] - # msg_origin = msg_comps[2] - msg_id = msg_comps[3] - print("HIT ME1") - if msg_id not in self.seen_messages: - print("HIT ME") - # Do stuff with incoming unseen message - should_publish = True - if msg_type == "subscription": - self.handle_subscription(incoming) + should_publish = False - # We don't need to relay the subscription to our - # peers because a given node only needs its peers - # to know that it is subscribed to the topic (doesn't - # need everyone to know) - should_publish = False - elif msg_type == "talk": - await self.handle_talk(incoming) + if rpc_incoming.publish: + # deal with RPC.publish + for message in rpc_incoming.publish: + if message.seqno not in self.seen_messages: + should_publish = True + self.seen_messages.append(message.seqno) + await self.handle_talk(message) - # Add message id to seen - self.seen_messages.append(msg_id) + if rpc_incoming.subscriptions: + # deal with RPC.subscriptions + # We don't need to relay the subscription to our + # peers because a given node only needs its peers + # to know that it is subscribed to the topic (doesn't + # need everyone to know) + for message in rpc_incoming.subscriptions: + if message.subscribe: + self.handle_subscription(peer_id, message) - # Publish message using router's publish - if should_publish: - msg = create_message_talk(incoming) - - # Adjust raw_msg to that the message sender - # is now our peer_id - msg.from_id = str(self.host.get_id()) - - await self.router.publish(msg_sender, msg.to_str()) + if should_publish: + # relay message to peers with router + await self.router.publish(peer_id, incoming) # Force context switch await asyncio.sleep(0) @@ -161,9 +122,10 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() - await stream.write(hello.encode()) + + await stream.write(hello) # Pass stream off to stream reader - asyncio.ensure_future(self.continously_read_stream(stream)) + asyncio.ensure_future(self.continuously_read_stream(stream)) async def handle_peer_queue(self): """ @@ -173,6 +135,7 @@ class Pubsub(): pubsub protocols we support """ while True: + peer_id = await self.peer_queue.get() # Open a stream to peer on existing connection @@ -187,69 +150,64 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() - await stream.write(hello.encode()) + await stream.write(hello) # Pass stream off to stream reader - asyncio.ensure_future(self.continously_read_stream(stream)) + asyncio.ensure_future(self.continuously_read_stream(stream)) # Force context switch await asyncio.sleep(0) - def handle_subscription(self, subscription): + def handle_subscription(self, origin_id, sub_message): """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message - :param subscription: raw data constituting a subscription message + :param origin_id: id of the peer who subscribe to the message + :param sub_message: RPC.SubOpts """ - sub_msg = create_message_sub(subscription) - if sub_msg.subs_map: - print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id) - for topic_id in sub_msg.subs_map: - # Look at each subscription in the msg individually - if sub_msg.subs_map[topic_id]: - if topic_id not in self.peer_topics: - # Create topic list if it did not yet exist - self.peer_topics[topic_id] = [sub_msg.origin_id] - elif sub_msg.origin_id not in self.peer_topics[topic_id]: - # Add peer to topic - self.peer_topics[topic_id].append(sub_msg.origin_id) - else: - # TODO: Remove peer from topic - pass + if sub_message.subscribe: + if sub_message.topicid not in self.peer_topics: + self.peer_topics[sub_message.topicid] = [origin_id] + elif origin_id not in self.peer_topics[sub_message.topicid]: + # Add peer to topic + self.peer_topics[sub_message.topicid].append(origin_id) + else: + # TODO: Remove peer from topic + pass - async def handle_talk(self, talk): + async def handle_talk(self, publish_message): """ - Handle incoming Talk message from a peer. A Talk message contains some - custom message that is published on a given topic(s) - :param talk: raw data constituting a talk message + Put incoming message from a peer onto my blocking queue + :param talk: RPC.Message format """ - msg = create_message_talk(talk) # Check if this message has any topics that we are subscribed to - for topic in msg.topics: + for topic in publish_message.topicIDs: if topic in self.my_topics: # we are subscribed to a topic this message was sent for, # so add message to the subscription output queue # for each topic - await self.my_topics[topic].put(talk) + await self.my_topics[topic].put(publish_message) async def subscribe(self, topic_id): """ Subscribe ourself to a topic :param topic_id: topic_id to subscribe to """ + # Map topic_id to blocking queue self.my_topics[topic_id] = asyncio.Queue() # Create subscribe message - sub_msg = MessageSub( - str(self.host.get_id()),\ - str(self.host.get_id()), {topic_id: True}, generate_message_id()\ - ) + packet = rpc_pb2.RPC() + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe=True, + topicid=topic_id.encode('utf-8') + )]) # Send out subscribe message to all peers - await self.message_all_peers(sub_msg.to_str()) + await self.message_all_peers(packet.SerializeToString()) # Tell router we are joining this topic self.router.join(topic_id) @@ -268,27 +226,27 @@ class Pubsub(): del self.my_topics[topic_id] # Create unsubscribe message - unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\ - {topic_id: False}, generate_message_id()) + packet = rpc_pb2.RPC() + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe=False, + topicid=topic_id.encode('utf-8') + )]) # Send out unsubscribe message to all peers - await self.message_all_peers(unsub_msg.to_str()) + await self.message_all_peers(packet.SerializeToString()) # Tell router we are leaving this topic self.router.leave(topic_id) - async def message_all_peers(self, raw_msg): + async def message_all_peers(self, rpc_msg): """ Broadcast a message to peers :param raw_msg: raw contents of the message to broadcast """ - # Encode message for sending - encoded_msg = raw_msg.encode() - # Broadcast message for peer in self.peers: stream = self.peers[peer] # Write message to stream - await stream.write(encoded_msg) + await stream.write(rpc_msg) diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 727b39e1..ec5132e8 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -39,11 +39,11 @@ class IPubsubRouter(ABC): """ @abstractmethod - def publish(self, sender_peer_id, message): + def publish(self, sender_peer_id, rpc_message): """ Invoked to forward a new message that has been validated :param sender_peer_id: peer_id of message sender - :param message: message to forward + :param rpc_message: message to forward """ @abstractmethod diff --git a/requirements_dev.txt b/requirements_dev.txt index 4bf18f60..e3d8c0ef 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -3,3 +3,5 @@ codecov pytest-cov pytest-asyncio pylint +grpcio +grpcio-tools diff --git a/setup.py b/setup.py index abaa1c73..c44a2763 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,8 @@ setuptools.setup( "base58", "pymultihash", "multiaddr", + "grpcio", + "grpcio-tools" ], packages=["libp2p"], zip_safe=False, diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index f328a6e3..05a02bf9 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,12 +1,10 @@ import asyncio import multiaddr +from utils import generate_message_id, generate_RPC_packet from libp2p import new_node -from libp2p.pubsub.message import create_message_talk from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.message import MessageTalk -from libp2p.pubsub.message import generate_message_id SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] CRYPTO_TOPIC = "ethereum" @@ -53,16 +51,13 @@ class DummyAccountNode(): Handle all incoming messages on the CRYPTO_TOPIC from peers """ while True: - message_raw = await self.q.get() - message = create_message_talk(message_raw) - contents = message.data - - msg_comps = contents.split(",") + incoming = await self.q.get() + msg_comps = incoming.data.decode('utf-8').split(",") if msg_comps[0] == "send": self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) elif msg_comps[0] == "set": - self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2])) + self.handle_set_crypto(msg_comps[1], int(msg_comps[2])) async def setup_crypto_networking(self): """ @@ -82,8 +77,8 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) - msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) - await self.floodsub.publish(my_id, msg.to_str()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + await self.floodsub.publish(my_id, packet.SerializeToString()) async def publish_set_crypto(self, user, amount): """ @@ -93,8 +88,9 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "set," + user + "," + str(amount) - msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) - await self.floodsub.publish(my_id, msg.to_str()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + + await self.floodsub.publish(my_id, packet.SerializeToString()) def handle_send_crypto(self, source_user, dest_user, amount): """ @@ -113,7 +109,7 @@ class DummyAccountNode(): else: self.balances[dest_user] = amount - def handle_set_crypto_for_user(self, dest_user, amount): + def handle_set_crypto(self, dest_user, amount): """ Handle incoming set_crypto message :param dest_user: user to set crypto for diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index a071fa63..1f08c8d6 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -8,8 +8,6 @@ from libp2p import new_node from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.message import MessageTalk -from libp2p.pubsub.message import create_message_talk from dummy_account_node import DummyAccountNode # pylint: disable=too-many-locals @@ -66,7 +64,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): await action_func(dummy_nodes) # Allow time for action function to be performed (i.e. messages to propogate) - await asyncio.sleep(0.25) + await asyncio.sleep(1) # Perform assertion function for dummy_node in dummy_nodes: diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index f4a5826a..272af79e 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -5,11 +5,10 @@ import pytest from tests.utils import cleanup from libp2p import new_node from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.message import MessageTalk -from libp2p.pubsub.message import create_message_talk -from libp2p.pubsub.message import generate_message_id +from utils import generate_message_id, generate_RPC_packet # pylint: disable=too-many-locals @@ -22,7 +21,7 @@ async def connect(node1, node2): await node1.connect(info) @pytest.mark.asyncio -async def test_simple_two_nodes(): +async def test_simple_two_nodes_RPC(): node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) @@ -45,68 +44,15 @@ async def test_simple_two_nodes(): node_a_id = str(node_a.get_id()) - msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) - - await floodsub_a.publish(node_a.get_id(), msg.to_str()) - + msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", generate_message_id()) + await floodsub_a.publish(node_a_id, msg.SerializeToString()) await asyncio.sleep(0.25) res_b = await qb.get() # Check that the msg received by node_b is the same # as the message sent by node_a - assert res_b == msg.to_str() - - # Success, terminate pending tasks. - await cleanup() - -@pytest.mark.asyncio -async def test_simple_three_nodes(): - # Want to pass message from A -> B -> C - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - - await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - - supported_protocols = ["/floodsub/1.0.0"] - - floodsub_a = FloodSub(supported_protocols) - pubsub_a = Pubsub(node_a, floodsub_a, "a") - floodsub_b = FloodSub(supported_protocols) - pubsub_b = Pubsub(node_b, floodsub_b, "b") - floodsub_c = FloodSub(supported_protocols) - pubsub_c = Pubsub(node_c, floodsub_c, "c") - - await connect(node_a, node_b) - await connect(node_b, node_c) - - await asyncio.sleep(0.25) - qb = await pubsub_b.subscribe("my_topic") - qc = await pubsub_c.subscribe("my_topic") - await asyncio.sleep(0.25) - - node_a_id = str(node_a.get_id()) - - msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) - - await floodsub_a.publish(node_a.get_id(), msg.to_str()) - - await asyncio.sleep(0.25) - res_b = await qb.get() - res_c = await qc.get() - - # Check that the msg received by node_b is the same - # as the message sent by node_a - assert res_b == msg.to_str() - - # res_c should match original msg but with b as sender - node_b_id = str(node_b.get_id()) - msg.from_id = node_b_id - - assert res_c == msg.to_str() + assert res_b.SerializeToString() == msg.publish[0].SerializeToString() # Success, terminate pending tasks. await cleanup() @@ -237,11 +183,12 @@ async def perform_test_from_obj(obj): actual_node_id = str(node_map[node_id].get_id()) # Create correctly formatted message - msg_talk = MessageTalk(actual_node_id, actual_node_id, topics, data, generate_message_id()) - + msg_talk = generate_RPC_packet(actual_node_id, topics, data, generate_message_id()) + # Publish message # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) - tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()))) + tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\ + actual_node_id, msg_talk.SerializeToString()))) # For each topic in topics, add topic, msg_talk tuple to ordered test list # TODO: Update message sender to be correct message sender before @@ -261,12 +208,7 @@ async def perform_test_from_obj(obj): for node_id in topic_map[topic]: # Get message from subscription queue msg_on_node_str = await queues_map[node_id][topic].get() - msg_on_node = create_message_talk(msg_on_node_str) - - # Perform checks - assert actual_msg.origin_id == msg_on_node.origin_id - assert actual_msg.topics == msg_on_node.topics - assert actual_msg.data == msg_on_node.data + assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString() # Success, terminate pending tasks. await cleanup() diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py new file mode 100644 index 00000000..d4695d7d --- /dev/null +++ b/tests/pubsub/utils.py @@ -0,0 +1,30 @@ +import uuid +from libp2p.pubsub.pb import rpc_pb2 + +def generate_message_id(): + """ + Generate a unique message id + :return: messgae id + """ + return str(uuid.uuid1()) + +def generate_RPC_packet(origin_id, topics, msg_content, msg_id): + """ + Generate RPC packet to send over wire + :param origin_id: peer id of the message origin + :param topics: list of topics + :param msg_content: string of content in data + :param msg_id: seqno for the message + """ + packet = rpc_pb2.RPC() + message = rpc_pb2.Message( + from_id=origin_id.encode('utf-8'), + seqno=msg_id.encode('utf-8'), + data=msg_content.encode('utf-8'), + ) + + for topic in topics: + message.topicIDs.extend([topic.encode('utf-8')]) + + packet.publish.extend([message]) + return packet