Merge pull request #141 from libp2p/floodsub-rpc

Floodsub RPC
This commit is contained in:
ZX
2019-04-03 14:15:32 -04:00
committed by GitHub
13 changed files with 880 additions and 351 deletions

View File

@ -1,7 +1,9 @@
from .pb import rpc_pb2
from .pubsub_router_interface import IPubsubRouter from .pubsub_router_interface import IPubsubRouter
from .message import create_message_talk
class FloodSub(IPubsubRouter): class FloodSub(IPubsubRouter):
# pylint: disable=no-member
def __init__(self, protocols): def __init__(self, protocols):
self.protocols = protocols self.protocols = protocols
@ -40,7 +42,7 @@ class FloodSub(IPubsubRouter):
:param rpc: rpc message :param rpc: rpc message
""" """
async def publish(self, sender_peer_id, message): async def publish(self, sender_peer_id, rpc_message):
""" """
Invoked to forward a new message that has been validated. Invoked to forward a new message that has been validated.
This is where the "flooding" part of floodsub happens This is where the "flooding" part of floodsub happens
@ -52,35 +54,33 @@ class FloodSub(IPubsubRouter):
It also never forwards a message back to the source It also never forwards a message back to the source
or the peer that forwarded the message. or the peer that forwarded the message.
:param sender_peer_id: peer_id of message sender :param sender_peer_id: peer_id of message sender
:param message: message to forward :param rpc_message: pubsub message in RPC string format
""" """
packet = rpc_pb2.RPC()
# Encode message packet.ParseFromString(rpc_message)
encoded_msg = message.encode()
# Get message sender, origin, and topics
msg_talk = create_message_talk(message)
msg_sender = str(sender_peer_id) msg_sender = str(sender_peer_id)
msg_origin = msg_talk.origin_id
topics = msg_talk.topics
# Deliver to self if self was origin # Deliver to self if self was origin
# Note: handle_talk checks if self is subscribed to topics in message # Note: handle_talk checks if self is subscribed to topics in message
if msg_sender == msg_origin and msg_sender == str(self.pubsub.host.get_id()): for message in packet.publish:
await self.pubsub.handle_talk(message) decoded_from_id = message.from_id.decode('utf-8')
if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()):
await self.pubsub.handle_talk(message)
# Deliver to self and peers # Deliver to self and peers
for topic in topics: for topic in message.topicIDs:
if topic in self.pubsub.peer_topics: if topic in self.pubsub.peer_topics:
for peer_id_in_topic in self.pubsub.peer_topics[topic]: for peer_id_in_topic in self.pubsub.peer_topics[topic]:
# Forward to all known peers in the topic that are not the # Forward to all known peers in the topic that are not the
# message sender and are not the message origin # message sender and are not the message origin
if peer_id_in_topic not in (msg_sender, msg_origin): if peer_id_in_topic not in (msg_sender, decoded_from_id):
stream = self.pubsub.peers[peer_id_in_topic] stream = self.pubsub.peers[peer_id_in_topic]
await stream.write(encoded_msg) # create new packet with just publish message
else: new_packet = rpc_pb2.RPC()
# Implies publish did not write new_packet.publish.extend([message])
print("publish did not write") await stream.write(new_packet.SerializeToString())
else:
# Implies publish did not write
print("publish did not write")
def join(self, topic): def join(self, topic):
""" """

View File

@ -1,118 +0,0 @@
import uuid
class MessageTalk():
"""
Object to make parsing talk messages easier, where talk messages are
defined as custom messages published to a set of topics
"""
# pylint: disable=too-few-public-methods
def __init__(self, from_id, origin_id, topics, data, message_id):
# pylint: disable=too-many-arguments
self.msg_type = "talk"
self.from_id = from_id
self.origin_id = origin_id
self.topics = topics
self.data = data
self.message_id = message_id
def to_str(self):
"""
Convert to string
:return: MessageTalk object in string representation
"""
out = self.msg_type + '\n'
out += self.from_id + '\n'
out += self.origin_id + '\n'
out += self.message_id + '\n'
for i in range(len(self.topics)):
out += self.topics[i]
if i < len(self.topics) - 1:
out += ','
out += '\n' + self.data
return out
class MessageSub():
"""
Object to make parsing subscription messages easier, where subscription
messages are defined as indicating the topics a node wishes to subscribe to
or unsubscribe from
"""
# pylint: disable=too-few-public-methods
def __init__(self, from_id, origin_id, subs_map, message_id):
self.msg_type = "subscription"
self.from_id = from_id
self.origin_id = origin_id
self.subs_map = subs_map
self.message_id = message_id
def to_str(self):
"""
Convert to string
:return: MessageSub object in string representation
"""
out = self.msg_type + '\n'
out += self.from_id + '\n'
out += self.origin_id + '\n'
out += self.message_id
if self.subs_map:
out += '\n'
keys = list(self.subs_map)
for i, topic in enumerate(keys):
sub = self.subs_map[topic]
if sub:
out += "sub:"
else:
out += "unsub:"
out += topic
if i < len(keys) - 1:
out += '\n'
return out
def create_message_talk(msg_talk_as_str):
"""
Create a MessageTalk object from a MessageTalk string representation
:param msg_talk_as_str: a MessageTalk object in its string representation
:return: MessageTalk object
"""
msg_comps = msg_talk_as_str.split('\n')
from_id = msg_comps[1]
origin_id = msg_comps[2]
message_id = msg_comps[3]
topics = msg_comps[4].split(',')
data = msg_comps[5]
return MessageTalk(from_id, origin_id, topics, data, message_id)
def create_message_sub(msg_sub_as_str):
"""
Create a MessageSub object from a MessageSub string representation
:param msg_talk_as_str: a MessageSub object in its string representation
:return: MessageSub object
"""
msg_comps = msg_sub_as_str.split('\n')
from_id = msg_comps[1]
origin_id = msg_comps[2]
message_id = msg_comps[3]
subs_map = {}
for i in range(4, len(msg_comps)):
sub_comps = msg_comps[i].split(":")
topic = sub_comps[1]
if sub_comps[0] == "sub":
subs_map[topic] = True
else:
subs_map[topic] = False
return MessageSub(from_id, origin_id, subs_map, message_id)
def generate_message_id():
"""
Generate a unique message id
:return: messgae id
"""
return str(uuid.uuid1())

View File

@ -0,0 +1,78 @@
// Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto
syntax = "proto2";
package pubsub.pb;
message RPC {
repeated SubOpts subscriptions = 1;
repeated Message publish = 2;
message SubOpts {
optional bool subscribe = 1; // subscribe or unsubcribe
optional string topicid = 2;
}
optional ControlMessage control = 3;
}
message Message {
optional bytes from_id = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
optional bytes signature = 5;
optional bytes key = 6;
}
message ControlMessage {
repeated ControlIHave ihave = 1;
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
}
message ControlIHave {
optional string topicID = 1;
repeated string messageIDs = 2;
}
message ControlIWant {
repeated string messageIDs = 1;
}
message ControlGraft {
optional string topicID = 1;
}
message ControlPrune {
optional string topicID = 1;
}
message TopicDescriptor {
optional string name = 1;
optional AuthOpts auth = 2;
optional EncOpts enc = 3;
message AuthOpts {
optional AuthMode mode = 1;
repeated bytes keys = 2; // root keys to trust
enum AuthMode {
NONE = 0; // no authentication, anyone can publish
KEY = 1; // only messages signed by keys in the topic descriptor are accepted
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}
message EncOpts {
optional EncMode mode = 1;
repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted)
enum EncMode {
NONE = 0; // no encryption, anyone can read
SHAREDKEY = 1; // messages are encrypted with shared key
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}
}

638
libp2p/pubsub/pb/rpc_pb2.py Normal file
View File

@ -0,0 +1,638 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: rpc.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='rpc.proto',
package='pubsub.pb',
syntax='proto2',
serialized_options=None,
serialized_pb=_b('\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
)
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE = _descriptor.EnumDescriptor(
name='AuthMode',
full_name='pubsub.pb.TopicDescriptor.AuthOpts.AuthMode',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='NONE', index=0, number=0,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='KEY', index=1, number=1,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='WOT', index=2, number=2,
serialized_options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=868,
serialized_end=906,
)
_sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE)
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE = _descriptor.EnumDescriptor(
name='EncMode',
full_name='pubsub.pb.TopicDescriptor.EncOpts.EncMode',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='NONE', index=0, number=0,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='SHAREDKEY', index=1, number=1,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='WOT', index=2, number=2,
serialized_options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=997,
serialized_end=1040,
)
_sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_ENCOPTS_ENCMODE)
_RPC_SUBOPTS = _descriptor.Descriptor(
name='SubOpts',
full_name='pubsub.pb.RPC.SubOpts',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='subscribe', full_name='pubsub.pb.RPC.SubOpts.subscribe', index=0,
number=1, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='topicid', full_name='pubsub.pb.RPC.SubOpts.topicid', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=160,
serialized_end=205,
)
_RPC = _descriptor.Descriptor(
name='RPC',
full_name='pubsub.pb.RPC',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='subscriptions', full_name='pubsub.pb.RPC.subscriptions', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='publish', full_name='pubsub.pb.RPC.publish', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='control', full_name='pubsub.pb.RPC.control', index=2,
number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[_RPC_SUBOPTS, ],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=25,
serialized_end=205,
)
_MESSAGE = _descriptor.Descriptor(
name='Message',
full_name='pubsub.pb.Message',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='from_id', full_name='pubsub.pb.Message.from_id', index=0,
number=1, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='data', full_name='pubsub.pb.Message.data', index=1,
number=2, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='seqno', full_name='pubsub.pb.Message.seqno', index=2,
number=3, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='topicIDs', full_name='pubsub.pb.Message.topicIDs', index=3,
number=4, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='signature', full_name='pubsub.pb.Message.signature', index=4,
number=5, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='key', full_name='pubsub.pb.Message.key', index=5,
number=6, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=207,
serialized_end=312,
)
_CONTROLMESSAGE = _descriptor.Descriptor(
name='ControlMessage',
full_name='pubsub.pb.ControlMessage',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='ihave', full_name='pubsub.pb.ControlMessage.ihave', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='iwant', full_name='pubsub.pb.ControlMessage.iwant', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='graft', full_name='pubsub.pb.ControlMessage.graft', index=2,
number=3, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='prune', full_name='pubsub.pb.ControlMessage.prune', index=3,
number=4, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=315,
serialized_end=491,
)
_CONTROLIHAVE = _descriptor.Descriptor(
name='ControlIHave',
full_name='pubsub.pb.ControlIHave',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='topicID', full_name='pubsub.pb.ControlIHave.topicID', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='messageIDs', full_name='pubsub.pb.ControlIHave.messageIDs', index=1,
number=2, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=493,
serialized_end=544,
)
_CONTROLIWANT = _descriptor.Descriptor(
name='ControlIWant',
full_name='pubsub.pb.ControlIWant',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='messageIDs', full_name='pubsub.pb.ControlIWant.messageIDs', index=0,
number=1, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=546,
serialized_end=580,
)
_CONTROLGRAFT = _descriptor.Descriptor(
name='ControlGraft',
full_name='pubsub.pb.ControlGraft',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='topicID', full_name='pubsub.pb.ControlGraft.topicID', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=582,
serialized_end=613,
)
_CONTROLPRUNE = _descriptor.Descriptor(
name='ControlPrune',
full_name='pubsub.pb.ControlPrune',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='topicID', full_name='pubsub.pb.ControlPrune.topicID', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=615,
serialized_end=646,
)
_TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor(
name='AuthOpts',
full_name='pubsub.pb.TopicDescriptor.AuthOpts',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='mode', full_name='pubsub.pb.TopicDescriptor.AuthOpts.mode', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='keys', full_name='pubsub.pb.TopicDescriptor.AuthOpts.keys', index=1,
number=2, type=12, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE,
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=782,
serialized_end=906,
)
_TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor(
name='EncOpts',
full_name='pubsub.pb.TopicDescriptor.EncOpts',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='mode', full_name='pubsub.pb.TopicDescriptor.EncOpts.mode', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='keyHashes', full_name='pubsub.pb.TopicDescriptor.EncOpts.keyHashes', index=1,
number=2, type=12, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE,
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=909,
serialized_end=1040,
)
_TOPICDESCRIPTOR = _descriptor.Descriptor(
name='TopicDescriptor',
full_name='pubsub.pb.TopicDescriptor',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='pubsub.pb.TopicDescriptor.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='auth', full_name='pubsub.pb.TopicDescriptor.auth', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='enc', full_name='pubsub.pb.TopicDescriptor.enc', index=2,
number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[_TOPICDESCRIPTOR_AUTHOPTS, _TOPICDESCRIPTOR_ENCOPTS, ],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=649,
serialized_end=1040,
)
_RPC_SUBOPTS.containing_type = _RPC
_RPC.fields_by_name['subscriptions'].message_type = _RPC_SUBOPTS
_RPC.fields_by_name['publish'].message_type = _MESSAGE
_RPC.fields_by_name['control'].message_type = _CONTROLMESSAGE
_CONTROLMESSAGE.fields_by_name['ihave'].message_type = _CONTROLIHAVE
_CONTROLMESSAGE.fields_by_name['iwant'].message_type = _CONTROLIWANT
_CONTROLMESSAGE.fields_by_name['graft'].message_type = _CONTROLGRAFT
_CONTROLMESSAGE.fields_by_name['prune'].message_type = _CONTROLPRUNE
_TOPICDESCRIPTOR_AUTHOPTS.fields_by_name['mode'].enum_type = _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE
_TOPICDESCRIPTOR_AUTHOPTS.containing_type = _TOPICDESCRIPTOR
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE.containing_type = _TOPICDESCRIPTOR_AUTHOPTS
_TOPICDESCRIPTOR_ENCOPTS.fields_by_name['mode'].enum_type = _TOPICDESCRIPTOR_ENCOPTS_ENCMODE
_TOPICDESCRIPTOR_ENCOPTS.containing_type = _TOPICDESCRIPTOR
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE.containing_type = _TOPICDESCRIPTOR_ENCOPTS
_TOPICDESCRIPTOR.fields_by_name['auth'].message_type = _TOPICDESCRIPTOR_AUTHOPTS
_TOPICDESCRIPTOR.fields_by_name['enc'].message_type = _TOPICDESCRIPTOR_ENCOPTS
DESCRIPTOR.message_types_by_name['RPC'] = _RPC
DESCRIPTOR.message_types_by_name['Message'] = _MESSAGE
DESCRIPTOR.message_types_by_name['ControlMessage'] = _CONTROLMESSAGE
DESCRIPTOR.message_types_by_name['ControlIHave'] = _CONTROLIHAVE
DESCRIPTOR.message_types_by_name['ControlIWant'] = _CONTROLIWANT
DESCRIPTOR.message_types_by_name['ControlGraft'] = _CONTROLGRAFT
DESCRIPTOR.message_types_by_name['ControlPrune'] = _CONTROLPRUNE
DESCRIPTOR.message_types_by_name['TopicDescriptor'] = _TOPICDESCRIPTOR
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
RPC = _reflection.GeneratedProtocolMessageType('RPC', (_message.Message,), dict(
SubOpts = _reflection.GeneratedProtocolMessageType('SubOpts', (_message.Message,), dict(
DESCRIPTOR = _RPC_SUBOPTS,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.RPC.SubOpts)
))
,
DESCRIPTOR = _RPC,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.RPC)
))
_sym_db.RegisterMessage(RPC)
_sym_db.RegisterMessage(RPC.SubOpts)
Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict(
DESCRIPTOR = _MESSAGE,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.Message)
))
_sym_db.RegisterMessage(Message)
ControlMessage = _reflection.GeneratedProtocolMessageType('ControlMessage', (_message.Message,), dict(
DESCRIPTOR = _CONTROLMESSAGE,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlMessage)
))
_sym_db.RegisterMessage(ControlMessage)
ControlIHave = _reflection.GeneratedProtocolMessageType('ControlIHave', (_message.Message,), dict(
DESCRIPTOR = _CONTROLIHAVE,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlIHave)
))
_sym_db.RegisterMessage(ControlIHave)
ControlIWant = _reflection.GeneratedProtocolMessageType('ControlIWant', (_message.Message,), dict(
DESCRIPTOR = _CONTROLIWANT,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlIWant)
))
_sym_db.RegisterMessage(ControlIWant)
ControlGraft = _reflection.GeneratedProtocolMessageType('ControlGraft', (_message.Message,), dict(
DESCRIPTOR = _CONTROLGRAFT,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlGraft)
))
_sym_db.RegisterMessage(ControlGraft)
ControlPrune = _reflection.GeneratedProtocolMessageType('ControlPrune', (_message.Message,), dict(
DESCRIPTOR = _CONTROLPRUNE,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlPrune)
))
_sym_db.RegisterMessage(ControlPrune)
TopicDescriptor = _reflection.GeneratedProtocolMessageType('TopicDescriptor', (_message.Message,), dict(
AuthOpts = _reflection.GeneratedProtocolMessageType('AuthOpts', (_message.Message,), dict(
DESCRIPTOR = _TOPICDESCRIPTOR_AUTHOPTS,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.AuthOpts)
))
,
EncOpts = _reflection.GeneratedProtocolMessageType('EncOpts', (_message.Message,), dict(
DESCRIPTOR = _TOPICDESCRIPTOR_ENCOPTS,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.EncOpts)
))
,
DESCRIPTOR = _TOPICDESCRIPTOR,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor)
))
_sym_db.RegisterMessage(TopicDescriptor)
_sym_db.RegisterMessage(TopicDescriptor.AuthOpts)
_sym_db.RegisterMessage(TopicDescriptor.EncOpts)
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,3 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc

View File

@ -1,44 +1,11 @@
import asyncio import asyncio
from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee from .pubsub_notifee import PubsubNotifee
from .message import MessageSub
from .message import create_message_talk, create_message_sub
from. message import generate_message_id
class Pubsub(): class Pubsub():
""" # pylint: disable=too-many-instance-attributes, no-member
For now, because I'm on a plane and don't have access to the go repo/protobuf stuff,
this is going to be the message format for the two types: subscription and talk
subscription indicates subscribing or unsubscribing from a topic
talk is sending a message on topic(s)
subscription format:
subscription
'from'
<one of 'sub', 'unsub'>:'topicid'
<one of 'sub', 'unsub'>:'topicid'
...
Ex.
subscription
msg_sender_peer_id
origin_peer_id
sub:topic1
sub:topic2
unsub:fav_topic
talk format:
talk
'from'
'origin'
[topic_ids comma-delimited]
'data'
Ex.
talk
msg_sender_peer_id
origin_peer_id
topic1,topics_are_cool,foo
I like tacos
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, host, router, my_id): def __init__(self, host, router, my_id):
""" """
@ -89,60 +56,54 @@ class Pubsub():
def get_hello_packet(self): def get_hello_packet(self):
""" """
Generate subscription message with all topics we are subscribed to Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics
""" """
subs_map = {} packet = rpc_pb2.RPC()
for topic in self.my_topics: if self.my_topics:
subs_map[topic] = True for topic_id in self.my_topics:
sub_msg = MessageSub( packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
str(self.host.get_id()),\ subscribe=True, topicid=topic_id)])
str(self.host.get_id()), subs_map, generate_message_id()\
)
return sub_msg.to_str()
async def continously_read_stream(self, stream): return packet.SerializeToString()
async def continuously_read_stream(self, stream):
""" """
Read from input stream in an infinite loop. Process Read from input stream in an infinite loop. Process
messages from other nodes, which for now are considered MessageTalk messages from other nodes
and MessageSub messages.
TODO: Handle RPC messages instead of my Aspyn's own custom message format
:param stream: stream to continously read from :param stream: stream to continously read from
""" """
# TODO check on types here
peer_id = str(stream.mplex_conn.peer_id)
while True: while True:
incoming = (await stream.read()).decode() incoming = (await stream.read())
msg_comps = incoming.split('\n') rpc_incoming = rpc_pb2.RPC()
msg_type = msg_comps[0] rpc_incoming.ParseFromString(incoming)
msg_sender = msg_comps[1] should_publish = False
# msg_origin = msg_comps[2]
msg_id = msg_comps[3]
print("HIT ME1")
if msg_id not in self.seen_messages:
print("HIT ME")
# Do stuff with incoming unseen message
should_publish = True
if msg_type == "subscription":
self.handle_subscription(incoming)
# We don't need to relay the subscription to our if rpc_incoming.publish:
# peers because a given node only needs its peers # deal with RPC.publish
# to know that it is subscribed to the topic (doesn't for message in rpc_incoming.publish:
# need everyone to know) if message.seqno not in self.seen_messages:
should_publish = False should_publish = True
elif msg_type == "talk": self.seen_messages.append(message.seqno)
await self.handle_talk(incoming) await self.handle_talk(message)
# Add message id to seen if rpc_incoming.subscriptions:
self.seen_messages.append(msg_id) # deal with RPC.subscriptions
# We don't need to relay the subscription to our
# peers because a given node only needs its peers
# to know that it is subscribed to the topic (doesn't
# need everyone to know)
for message in rpc_incoming.subscriptions:
if message.subscribe:
self.handle_subscription(peer_id, message)
# Publish message using router's publish if should_publish:
if should_publish: # relay message to peers with router
msg = create_message_talk(incoming) await self.router.publish(peer_id, incoming)
# Adjust raw_msg to that the message sender
# is now our peer_id
msg.from_id = str(self.host.get_id())
await self.router.publish(msg_sender, msg.to_str())
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
@ -161,9 +122,10 @@ class Pubsub():
# Send hello packet # Send hello packet
hello = self.get_hello_packet() hello = self.get_hello_packet()
await stream.write(hello.encode())
await stream.write(hello)
# Pass stream off to stream reader # Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream)) asyncio.ensure_future(self.continuously_read_stream(stream))
async def handle_peer_queue(self): async def handle_peer_queue(self):
""" """
@ -173,6 +135,7 @@ class Pubsub():
pubsub protocols we support pubsub protocols we support
""" """
while True: while True:
peer_id = await self.peer_queue.get() peer_id = await self.peer_queue.get()
# Open a stream to peer on existing connection # Open a stream to peer on existing connection
@ -187,69 +150,64 @@ class Pubsub():
# Send hello packet # Send hello packet
hello = self.get_hello_packet() hello = self.get_hello_packet()
await stream.write(hello.encode()) await stream.write(hello)
# Pass stream off to stream reader # Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream)) asyncio.ensure_future(self.continuously_read_stream(stream))
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
def handle_subscription(self, subscription): def handle_subscription(self, origin_id, sub_message):
""" """
Handle an incoming subscription message from a peer. Update internal Handle an incoming subscription message from a peer. Update internal
mapping to mark the peer as subscribed or unsubscribed to topics as mapping to mark the peer as subscribed or unsubscribed to topics as
defined in the subscription message defined in the subscription message
:param subscription: raw data constituting a subscription message :param origin_id: id of the peer who subscribe to the message
:param sub_message: RPC.SubOpts
""" """
sub_msg = create_message_sub(subscription) if sub_message.subscribe:
if sub_msg.subs_map: if sub_message.topicid not in self.peer_topics:
print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id) self.peer_topics[sub_message.topicid] = [origin_id]
for topic_id in sub_msg.subs_map: elif origin_id not in self.peer_topics[sub_message.topicid]:
# Look at each subscription in the msg individually # Add peer to topic
if sub_msg.subs_map[topic_id]: self.peer_topics[sub_message.topicid].append(origin_id)
if topic_id not in self.peer_topics: else:
# Create topic list if it did not yet exist # TODO: Remove peer from topic
self.peer_topics[topic_id] = [sub_msg.origin_id] pass
elif sub_msg.origin_id not in self.peer_topics[topic_id]:
# Add peer to topic
self.peer_topics[topic_id].append(sub_msg.origin_id)
else:
# TODO: Remove peer from topic
pass
async def handle_talk(self, talk): async def handle_talk(self, publish_message):
""" """
Handle incoming Talk message from a peer. A Talk message contains some Put incoming message from a peer onto my blocking queue
custom message that is published on a given topic(s) :param talk: RPC.Message format
:param talk: raw data constituting a talk message
""" """
msg = create_message_talk(talk)
# Check if this message has any topics that we are subscribed to # Check if this message has any topics that we are subscribed to
for topic in msg.topics: for topic in publish_message.topicIDs:
if topic in self.my_topics: if topic in self.my_topics:
# we are subscribed to a topic this message was sent for, # we are subscribed to a topic this message was sent for,
# so add message to the subscription output queue # so add message to the subscription output queue
# for each topic # for each topic
await self.my_topics[topic].put(talk) await self.my_topics[topic].put(publish_message)
async def subscribe(self, topic_id): async def subscribe(self, topic_id):
""" """
Subscribe ourself to a topic Subscribe ourself to a topic
:param topic_id: topic_id to subscribe to :param topic_id: topic_id to subscribe to
""" """
# Map topic_id to blocking queue # Map topic_id to blocking queue
self.my_topics[topic_id] = asyncio.Queue() self.my_topics[topic_id] = asyncio.Queue()
# Create subscribe message # Create subscribe message
sub_msg = MessageSub( packet = rpc_pb2.RPC()
str(self.host.get_id()),\ packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
str(self.host.get_id()), {topic_id: True}, generate_message_id()\ subscribe=True,
) topicid=topic_id.encode('utf-8')
)])
# Send out subscribe message to all peers # Send out subscribe message to all peers
await self.message_all_peers(sub_msg.to_str()) await self.message_all_peers(packet.SerializeToString())
# Tell router we are joining this topic # Tell router we are joining this topic
self.router.join(topic_id) self.router.join(topic_id)
@ -268,27 +226,27 @@ class Pubsub():
del self.my_topics[topic_id] del self.my_topics[topic_id]
# Create unsubscribe message # Create unsubscribe message
unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\ packet = rpc_pb2.RPC()
{topic_id: False}, generate_message_id()) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe=False,
topicid=topic_id.encode('utf-8')
)])
# Send out unsubscribe message to all peers # Send out unsubscribe message to all peers
await self.message_all_peers(unsub_msg.to_str()) await self.message_all_peers(packet.SerializeToString())
# Tell router we are leaving this topic # Tell router we are leaving this topic
self.router.leave(topic_id) self.router.leave(topic_id)
async def message_all_peers(self, raw_msg): async def message_all_peers(self, rpc_msg):
""" """
Broadcast a message to peers Broadcast a message to peers
:param raw_msg: raw contents of the message to broadcast :param raw_msg: raw contents of the message to broadcast
""" """
# Encode message for sending
encoded_msg = raw_msg.encode()
# Broadcast message # Broadcast message
for peer in self.peers: for peer in self.peers:
stream = self.peers[peer] stream = self.peers[peer]
# Write message to stream # Write message to stream
await stream.write(encoded_msg) await stream.write(rpc_msg)

View File

@ -39,11 +39,11 @@ class IPubsubRouter(ABC):
""" """
@abstractmethod @abstractmethod
def publish(self, sender_peer_id, message): def publish(self, sender_peer_id, rpc_message):
""" """
Invoked to forward a new message that has been validated Invoked to forward a new message that has been validated
:param sender_peer_id: peer_id of message sender :param sender_peer_id: peer_id of message sender
:param message: message to forward :param rpc_message: message to forward
""" """
@abstractmethod @abstractmethod

View File

@ -3,3 +3,5 @@ codecov
pytest-cov pytest-cov
pytest-asyncio pytest-asyncio
pylint pylint
grpcio
grpcio-tools

View File

@ -22,6 +22,8 @@ setuptools.setup(
"base58", "base58",
"pymultihash", "pymultihash",
"multiaddr", "multiaddr",
"grpcio",
"grpcio-tools"
], ],
packages=["libp2p"], packages=["libp2p"],
zip_safe=False, zip_safe=False,

View File

@ -1,12 +1,10 @@
import asyncio import asyncio
import multiaddr import multiaddr
from utils import generate_message_id, generate_RPC_packet
from libp2p import new_node from libp2p import new_node
from libp2p.pubsub.message import create_message_talk
from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.message import MessageTalk
from libp2p.pubsub.message import generate_message_id
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
CRYPTO_TOPIC = "ethereum" CRYPTO_TOPIC = "ethereum"
@ -53,16 +51,13 @@ class DummyAccountNode():
Handle all incoming messages on the CRYPTO_TOPIC from peers Handle all incoming messages on the CRYPTO_TOPIC from peers
""" """
while True: while True:
message_raw = await self.q.get() incoming = await self.q.get()
message = create_message_talk(message_raw) msg_comps = incoming.data.decode('utf-8').split(",")
contents = message.data
msg_comps = contents.split(",")
if msg_comps[0] == "send": if msg_comps[0] == "send":
self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3]))
elif msg_comps[0] == "set": elif msg_comps[0] == "set":
self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2])) self.handle_set_crypto(msg_comps[1], int(msg_comps[2]))
async def setup_crypto_networking(self): async def setup_crypto_networking(self):
""" """
@ -82,8 +77,8 @@ class DummyAccountNode():
""" """
my_id = str(self.libp2p_node.get_id()) my_id = str(self.libp2p_node.get_id())
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
await self.floodsub.publish(my_id, msg.to_str()) await self.floodsub.publish(my_id, packet.SerializeToString())
async def publish_set_crypto(self, user, amount): async def publish_set_crypto(self, user, amount):
""" """
@ -93,8 +88,9 @@ class DummyAccountNode():
""" """
my_id = str(self.libp2p_node.get_id()) my_id = str(self.libp2p_node.get_id())
msg_contents = "set," + user + "," + str(amount) msg_contents = "set," + user + "," + str(amount)
msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
await self.floodsub.publish(my_id, msg.to_str())
await self.floodsub.publish(my_id, packet.SerializeToString())
def handle_send_crypto(self, source_user, dest_user, amount): def handle_send_crypto(self, source_user, dest_user, amount):
""" """
@ -113,7 +109,7 @@ class DummyAccountNode():
else: else:
self.balances[dest_user] = amount self.balances[dest_user] = amount
def handle_set_crypto_for_user(self, dest_user, amount): def handle_set_crypto(self, dest_user, amount):
""" """
Handle incoming set_crypto message Handle incoming set_crypto message
:param dest_user: user to set crypto for :param dest_user: user to set crypto for

View File

@ -8,8 +8,6 @@ from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.message import MessageTalk
from libp2p.pubsub.message import create_message_talk
from dummy_account_node import DummyAccountNode from dummy_account_node import DummyAccountNode
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
@ -66,7 +64,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
await action_func(dummy_nodes) await action_func(dummy_nodes)
# Allow time for action function to be performed (i.e. messages to propogate) # Allow time for action function to be performed (i.e. messages to propogate)
await asyncio.sleep(0.25) await asyncio.sleep(1)
# Perform assertion function # Perform assertion function
for dummy_node in dummy_nodes: for dummy_node in dummy_nodes:

View File

@ -5,11 +5,10 @@ import pytest
from tests.utils import cleanup from tests.utils import cleanup
from libp2p import new_node from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pb import rpc_pb2
from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.message import MessageTalk from utils import generate_message_id, generate_RPC_packet
from libp2p.pubsub.message import create_message_talk
from libp2p.pubsub.message import generate_message_id
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
@ -22,7 +21,7 @@ async def connect(node1, node2):
await node1.connect(info) await node1.connect(info)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_two_nodes(): async def test_simple_two_nodes_RPC():
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
@ -45,68 +44,15 @@ async def test_simple_two_nodes():
node_a_id = str(node_a.get_id()) node_a_id = str(node_a.get_id())
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", generate_message_id())
await floodsub_a.publish(node_a_id, msg.SerializeToString())
await floodsub_a.publish(node_a.get_id(), msg.to_str())
await asyncio.sleep(0.25) await asyncio.sleep(0.25)
res_b = await qb.get() res_b = await qb.get()
# Check that the msg received by node_b is the same # Check that the msg received by node_b is the same
# as the message sent by node_a # as the message sent by node_a
assert res_b == msg.to_str() assert res_b.SerializeToString() == msg.publish[0].SerializeToString()
# Success, terminate pending tasks.
await cleanup()
@pytest.mark.asyncio
async def test_simple_three_nodes():
# Want to pass message from A -> B -> C
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
supported_protocols = ["/floodsub/1.0.0"]
floodsub_a = FloodSub(supported_protocols)
pubsub_a = Pubsub(node_a, floodsub_a, "a")
floodsub_b = FloodSub(supported_protocols)
pubsub_b = Pubsub(node_b, floodsub_b, "b")
floodsub_c = FloodSub(supported_protocols)
pubsub_c = Pubsub(node_c, floodsub_c, "c")
await connect(node_a, node_b)
await connect(node_b, node_c)
await asyncio.sleep(0.25)
qb = await pubsub_b.subscribe("my_topic")
qc = await pubsub_c.subscribe("my_topic")
await asyncio.sleep(0.25)
node_a_id = str(node_a.get_id())
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id())
await floodsub_a.publish(node_a.get_id(), msg.to_str())
await asyncio.sleep(0.25)
res_b = await qb.get()
res_c = await qc.get()
# Check that the msg received by node_b is the same
# as the message sent by node_a
assert res_b == msg.to_str()
# res_c should match original msg but with b as sender
node_b_id = str(node_b.get_id())
msg.from_id = node_b_id
assert res_c == msg.to_str()
# Success, terminate pending tasks. # Success, terminate pending tasks.
await cleanup() await cleanup()
@ -237,11 +183,12 @@ async def perform_test_from_obj(obj):
actual_node_id = str(node_map[node_id].get_id()) actual_node_id = str(node_map[node_id].get_id())
# Create correctly formatted message # Create correctly formatted message
msg_talk = MessageTalk(actual_node_id, actual_node_id, topics, data, generate_message_id()) msg_talk = generate_RPC_packet(actual_node_id, topics, data, generate_message_id())
# Publish message # Publish message
# await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str())
tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()))) tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\
actual_node_id, msg_talk.SerializeToString())))
# For each topic in topics, add topic, msg_talk tuple to ordered test list # For each topic in topics, add topic, msg_talk tuple to ordered test list
# TODO: Update message sender to be correct message sender before # TODO: Update message sender to be correct message sender before
@ -261,12 +208,7 @@ async def perform_test_from_obj(obj):
for node_id in topic_map[topic]: for node_id in topic_map[topic]:
# Get message from subscription queue # Get message from subscription queue
msg_on_node_str = await queues_map[node_id][topic].get() msg_on_node_str = await queues_map[node_id][topic].get()
msg_on_node = create_message_talk(msg_on_node_str) assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString()
# Perform checks
assert actual_msg.origin_id == msg_on_node.origin_id
assert actual_msg.topics == msg_on_node.topics
assert actual_msg.data == msg_on_node.data
# Success, terminate pending tasks. # Success, terminate pending tasks.
await cleanup() await cleanup()

30
tests/pubsub/utils.py Normal file
View File

@ -0,0 +1,30 @@
import uuid
from libp2p.pubsub.pb import rpc_pb2
def generate_message_id():
"""
Generate a unique message id
:return: messgae id
"""
return str(uuid.uuid1())
def generate_RPC_packet(origin_id, topics, msg_content, msg_id):
"""
Generate RPC packet to send over wire
:param origin_id: peer id of the message origin
:param topics: list of topics
:param msg_content: string of content in data
:param msg_id: seqno for the message
"""
packet = rpc_pb2.RPC()
message = rpc_pb2.Message(
from_id=origin_id.encode('utf-8'),
seqno=msg_id.encode('utf-8'),
data=msg_content.encode('utf-8'),
)
for topic in topics:
message.topicIDs.extend([topic.encode('utf-8')])
packet.publish.extend([message])
return packet