[WIP] PubSub and FloodSub development (#133)

* Add notifee interface

* Add notify function to network interface

* Implement notify feature

* Add tests for notify

* Make notifee functions all async

* Fix linting issue

* Fix linting issue

* Scaffold pubsub router interface

* Scaffold pubsub directory

* Store peer_id in muxed connection

* Implement pubsub notifee

* Remove outdated files

* Implement pubsub first attempt

* Prepare pubsub for floodsub

* Add mplex conn to net stream and add conn in notify tests

* Implement floodsub

* Use NetStream in generic protocol handler

* Debugging async issues

* Modify test to perform proper assert. Test passes

* Remove callbacks. Reduce sleep time

* Add simple three node test

* Clean up code. Add message classes

* Add test for two topics

* Add conn to net stream and conn tests

* Refactor test setup to remove duplicate code

* Fix linting issues

* Fix linting issue

* Fix linting issue

* Fix outstanding unrelated lint issue in multiselect_client

* Add connect function

* Remove debug prints

* Remove debug prints from floodsub

* Use MessageTalk in place of direct message breakdown

* Remove extra prints

* Remove outdated function

* Add message to queues for all topics in message

* Debugging

* Add message self delivery

* Increase read timeout to 5 to get pubsub tests passing

* Refactor testing helper func. Add tests

* Add tests and increase timeout to get tests passing

* Add dummy account demo scaffolding

* Attempt to use threads. Test fails

* Implement basic dummy node tests using threads

* Add generic testing function

* Add simple seven node tree test

* Add more complex seven node tree tests

* Add five node ring tests

* Remove unnecessary get_message_type func

* Add documentation to classes

* Add message id to messages

* Add documentation to test helper func

* Add docs to dummy account node helper func

* Add more docs to dummy account node test helper func

* fixed linting errors in floodsub

* small notify bugfix

* move pubsub into libp2p

* fixed pubsub linting

* fixing pubsub test failures

* linting
This commit is contained in:
stuckinaboot
2019-03-23 13:52:02 -04:00
committed by Stuckinaboot
parent fa53c5a866
commit 8d0f40a378
14 changed files with 1474 additions and 11 deletions

View File

@ -70,7 +70,8 @@ class Swarm(INetwork):
raw_conn = await self.transport.dial(multiaddr, self.self_id)
# Use upgrader to upgrade raw conn to muxed conn
muxed_conn = self.upgrader.upgrade_connection(raw_conn, self.generic_protocol_handler)
muxed_conn = self.upgrader.upgrade_connection(raw_conn, \
self.generic_protocol_handler, peer_id)
# Store muxed connection in connections
self.connections[peer_id] = muxed_conn
@ -145,7 +146,7 @@ class Swarm(INetwork):
raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'),
multiaddr.value_for_protocol('tcp'), reader, writer, False)
muxed_conn = self.upgrader.upgrade_connection(raw_conn, \
self.generic_protocol_handler)
self.generic_protocol_handler, peer_id)
# Store muxed_conn with peer id
self.connections[peer_id] = muxed_conn
@ -197,14 +198,17 @@ def create_generic_protocol_handler(swarm):
async def generic_protocol_handler(muxed_stream):
# Perform protocol muxing to determine protocol to use
_, handler = await multiselect.negotiate(muxed_stream)
protocol, handler = await multiselect.negotiate(muxed_stream)
net_stream = NetStream(muxed_stream)
net_stream.set_protocol(protocol)
# Call notifiers since event occurred
for notifee in swarm.notifees:
await notifee.opened_stream(swarm, muxed_stream)
await notifee.opened_stream(swarm, net_stream)
# Give to stream handler
asyncio.ensure_future(handler(muxed_stream))
asyncio.ensure_future(handler(net_stream))
return generic_protocol_handler

View File

98
libp2p/pubsub/floodsub.py Normal file
View File

@ -0,0 +1,98 @@
from .pubsub_router_interface import IPubsubRouter
from .message import create_message_talk
class FloodSub(IPubsubRouter):
def __init__(self, protocols):
self.protocols = protocols
self.pubsub = None
def get_protocols(self):
"""
:return: the list of protocols supported by the router
"""
return self.protocols
def attach(self, pubsub):
"""
Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance.
:param pubsub: pubsub instance to attach to
"""
self.pubsub = pubsub
def add_peer(self, peer_id, protocol_id):
"""
Notifies the router that a new peer has been connected
:param peer_id: id of peer to add
"""
def remove_peer(self, peer_id):
"""
Notifies the router that a peer has been disconnected
:param peer_id: id of peer to remove
"""
def handle_rpc(self, rpc):
"""
Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed
:param rpc: rpc message
"""
async def publish(self, sender_peer_id, message):
"""
Invoked to forward a new message that has been validated.
This is where the "flooding" part of floodsub happens
With flooding, routing is almost trivial: for each incoming message,
forward to all known peers in the topic. There is a bit of logic,
as the router maintains a timed cache of previous messages,
so that seen messages are not further forwarded.
It also never forwards a message back to the source
or the peer that forwarded the message.
:param sender_peer_id: peer_id of message sender
:param message: message to forward
"""
# Encode message
encoded_msg = message.encode()
# Get message sender, origin, and topics
msg_talk = create_message_talk(message)
msg_sender = str(sender_peer_id)
msg_origin = msg_talk.origin_id
topics = msg_talk.topics
# Deliver to self if self was origin
# Note: handle_talk checks if self is subscribed to topics in message
if msg_sender == msg_origin and msg_sender == str(self.pubsub.host.get_id()):
await self.pubsub.handle_talk(message)
# Deliver to self and peers
for topic in topics:
if topic in self.pubsub.peer_topics:
for peer_id_in_topic in self.pubsub.peer_topics[topic]:
# Forward to all known peers in the topic that are not the
# message sender and are not the message origin
if peer_id_in_topic not in (msg_sender, msg_origin):
stream = self.pubsub.peers[peer_id_in_topic]
await stream.write(encoded_msg)
else:
# Implies publish did not write
print("publish did not write")
def join(self, topic):
"""
Join notifies the router that we want to receive and
forward messages in a topic. It is invoked after the
subscription announcement
:param topic: topic to join
"""
def leave(self, topic):
"""
Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement.
:param topic: topic to leave
"""

118
libp2p/pubsub/message.py Normal file
View File

@ -0,0 +1,118 @@
import uuid
class MessageTalk():
"""
Object to make parsing talk messages easier, where talk messages are
defined as custom messages published to a set of topics
"""
# pylint: disable=too-few-public-methods
def __init__(self, from_id, origin_id, topics, data, message_id):
# pylint: disable=too-many-arguments
self.msg_type = "talk"
self.from_id = from_id
self.origin_id = origin_id
self.topics = topics
self.data = data
self.message_id = message_id
def to_str(self):
"""
Convert to string
:return: MessageTalk object in string representation
"""
out = self.msg_type + '\n'
out += self.from_id + '\n'
out += self.origin_id + '\n'
out += self.message_id + '\n'
for i in range(len(self.topics)):
out += self.topics[i]
if i < len(self.topics) - 1:
out += ','
out += '\n' + self.data
return out
class MessageSub():
"""
Object to make parsing subscription messages easier, where subscription
messages are defined as indicating the topics a node wishes to subscribe to
or unsubscribe from
"""
# pylint: disable=too-few-public-methods
def __init__(self, from_id, origin_id, subs_map, message_id):
self.msg_type = "subscription"
self.from_id = from_id
self.origin_id = origin_id
self.subs_map = subs_map
self.message_id = message_id
def to_str(self):
"""
Convert to string
:return: MessageSub object in string representation
"""
out = self.msg_type + '\n'
out += self.from_id + '\n'
out += self.origin_id + '\n'
out += self.message_id
if self.subs_map:
out += '\n'
keys = list(self.subs_map)
for i, topic in enumerate(keys):
sub = self.subs_map[topic]
if sub:
out += "sub:"
else:
out += "unsub:"
out += topic
if i < len(keys) - 1:
out += '\n'
return out
def create_message_talk(msg_talk_as_str):
"""
Create a MessageTalk object from a MessageTalk string representation
:param msg_talk_as_str: a MessageTalk object in its string representation
:return: MessageTalk object
"""
msg_comps = msg_talk_as_str.split('\n')
from_id = msg_comps[1]
origin_id = msg_comps[2]
message_id = msg_comps[3]
topics = msg_comps[4].split(',')
data = msg_comps[5]
return MessageTalk(from_id, origin_id, topics, data, message_id)
def create_message_sub(msg_sub_as_str):
"""
Create a MessageSub object from a MessageSub string representation
:param msg_talk_as_str: a MessageSub object in its string representation
:return: MessageSub object
"""
msg_comps = msg_sub_as_str.split('\n')
from_id = msg_comps[1]
origin_id = msg_comps[2]
message_id = msg_comps[3]
subs_map = {}
for i in range(4, len(msg_comps)):
sub_comps = msg_comps[i].split(":")
topic = sub_comps[1]
if sub_comps[0] == "sub":
subs_map[topic] = True
else:
subs_map[topic] = False
return MessageSub(from_id, origin_id, subs_map, message_id)
def generate_message_id():
"""
Generate a unique message id
:return: messgae id
"""
return str(uuid.uuid1())

294
libp2p/pubsub/pubsub.py Normal file
View File

@ -0,0 +1,294 @@
import asyncio
from .pubsub_notifee import PubsubNotifee
from .message import MessageSub
from .message import create_message_talk, create_message_sub
from. message import generate_message_id
class Pubsub():
"""
For now, because I'm on a plane and don't have access to the go repo/protobuf stuff,
this is going to be the message format for the two types: subscription and talk
subscription indicates subscribing or unsubscribing from a topic
talk is sending a message on topic(s)
subscription format:
subscription
'from'
<one of 'sub', 'unsub'>:'topicid'
<one of 'sub', 'unsub'>:'topicid'
...
Ex.
subscription
msg_sender_peer_id
origin_peer_id
sub:topic1
sub:topic2
unsub:fav_topic
talk format:
talk
'from'
'origin'
[topic_ids comma-delimited]
'data'
Ex.
talk
msg_sender_peer_id
origin_peer_id
topic1,topics_are_cool,foo
I like tacos
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, host, router, my_id):
"""
Construct a new Pubsub object, which is responsible for handling all
Pubsub-related messages and relaying messages as appropriate to the
Pubsub router (which is responsible for choosing who to send messages to).
Since the logic for choosing peers to send pubsub messages to is
in the router, the same Pubsub impl can back floodsub, gossipsub, etc.
"""
self.host = host
self.router = router
self.my_id = my_id
# Attach this new Pubsub object to the router
self.router.attach(self)
# Register a notifee
self.peer_queue = asyncio.Queue()
self.host.get_network().notify(PubsubNotifee(self.peer_queue))
# Register stream handlers for each pubsub router protocol to handle
# the pubsub streams opened on those protocols
self.protocols = self.router.get_protocols()
for protocol in self.protocols:
self.host.set_stream_handler(protocol, self.stream_handler)
# TODO: determine if these need to be asyncio queues, or if could possibly
# be ordinary blocking queues
self.incoming_msgs_from_peers = asyncio.Queue()
self.outgoing_messages = asyncio.Queue()
# TODO: Make seen_messages a cache (LRU cache?)
self.seen_messages = []
# Map of topics we are subscribed to to handler functions
# for when the given topic receives a message
self.my_topics = {}
# Map of topic to peers to keep track of what peers are subscribed to
self.peer_topics = {}
# Create peers map, which maps peer_id (as string) to stream (to a given peer)
self.peers = {}
# Call handle peer to keep waiting for updates to peer queue
asyncio.ensure_future(self.handle_peer_queue())
def get_hello_packet(self):
"""
Generate subscription message with all topics we are subscribed to
"""
subs_map = {}
for topic in self.my_topics:
subs_map[topic] = True
sub_msg = MessageSub(
str(self.host.get_id()),\
str(self.host.get_id()), subs_map, generate_message_id()\
)
return sub_msg.to_str()
async def continously_read_stream(self, stream):
"""
Read from input stream in an infinite loop. Process
messages from other nodes, which for now are considered MessageTalk
and MessageSub messages.
TODO: Handle RPC messages instead of my Aspyn's own custom message format
:param stream: stream to continously read from
"""
while True:
incoming = (await stream.read()).decode()
msg_comps = incoming.split('\n')
msg_type = msg_comps[0]
msg_sender = msg_comps[1]
# msg_origin = msg_comps[2]
msg_id = msg_comps[3]
print("HIT ME1")
if msg_id not in self.seen_messages:
print("HIT ME")
# Do stuff with incoming unseen message
should_publish = True
if msg_type == "subscription":
self.handle_subscription(incoming)
# We don't need to relay the subscription to our
# peers because a given node only needs its peers
# to know that it is subscribed to the topic (doesn't
# need everyone to know)
should_publish = False
elif msg_type == "talk":
await self.handle_talk(incoming)
# Add message id to seen
self.seen_messages.append(msg_id)
# Publish message using router's publish
if should_publish:
msg = create_message_talk(incoming)
# Adjust raw_msg to that the message sender
# is now our peer_id
msg.from_id = str(self.host.get_id())
await self.router.publish(msg_sender, msg.to_str())
# Force context switch
await asyncio.sleep(0)
async def stream_handler(self, stream):
"""
Stream handler for pubsub. Gets invoked whenever a new stream is created
on one of the supported pubsub protocols.
:param stream: newly created stream
"""
# Add peer
# Map peer to stream
peer_id = stream.mplex_conn.peer_id
self.peers[str(peer_id)] = stream
self.router.add_peer(peer_id, stream.get_protocol())
# Send hello packet
hello = self.get_hello_packet()
await stream.write(hello.encode())
# Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream))
async def handle_peer_queue(self):
"""
Continuously read from peer queue and each time a new peer is found,
open a stream to the peer using a supported pubsub protocol
TODO: Handle failure for when the peer does not support any of the
pubsub protocols we support
"""
while True:
peer_id = await self.peer_queue.get()
# Open a stream to peer on existing connection
# (we know connection exists since that's the only way
# an element gets added to peer_queue)
stream = await self.host.new_stream(peer_id, self.protocols)
# Add Peer
# Map peer to stream
self.peers[str(peer_id)] = stream
self.router.add_peer(peer_id, stream.get_protocol())
# Send hello packet
hello = self.get_hello_packet()
await stream.write(hello.encode())
# Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream))
# Force context switch
await asyncio.sleep(0)
def handle_subscription(self, subscription):
"""
Handle an incoming subscription message from a peer. Update internal
mapping to mark the peer as subscribed or unsubscribed to topics as
defined in the subscription message
:param subscription: raw data constituting a subscription message
"""
sub_msg = create_message_sub(subscription)
if sub_msg.subs_map:
print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id)
for topic_id in sub_msg.subs_map:
# Look at each subscription in the msg individually
if sub_msg.subs_map[topic_id]:
if topic_id not in self.peer_topics:
# Create topic list if it did not yet exist
self.peer_topics[topic_id] = [sub_msg.origin_id]
elif sub_msg.origin_id not in self.peer_topics[topic_id]:
# Add peer to topic
self.peer_topics[topic_id].append(sub_msg.origin_id)
else:
# TODO: Remove peer from topic
pass
async def handle_talk(self, talk):
"""
Handle incoming Talk message from a peer. A Talk message contains some
custom message that is published on a given topic(s)
:param talk: raw data constituting a talk message
"""
msg = create_message_talk(talk)
# Check if this message has any topics that we are subscribed to
for topic in msg.topics:
if topic in self.my_topics:
# we are subscribed to a topic this message was sent for,
# so add message to the subscription output queue
# for each topic
await self.my_topics[topic].put(talk)
async def subscribe(self, topic_id):
"""
Subscribe ourself to a topic
:param topic_id: topic_id to subscribe to
"""
# Map topic_id to blocking queue
self.my_topics[topic_id] = asyncio.Queue()
# Create subscribe message
sub_msg = MessageSub(
str(self.host.get_id()),\
str(self.host.get_id()), {topic_id: True}, generate_message_id()\
)
# Send out subscribe message to all peers
await self.message_all_peers(sub_msg.to_str())
# Tell router we are joining this topic
self.router.join(topic_id)
# Return the asyncio queue for messages on this topic
return self.my_topics[topic_id]
async def unsubscribe(self, topic_id):
"""
Unsubscribe ourself from a topic
:param topic_id: topic_id to unsubscribe from
"""
# Remove topic_id from map if present
if topic_id in self.my_topics:
del self.my_topics[topic_id]
# Create unsubscribe message
unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\
{topic_id: False}, generate_message_id())
# Send out unsubscribe message to all peers
await self.message_all_peers(unsub_msg.to_str())
# Tell router we are leaving this topic
self.router.leave(topic_id)
async def message_all_peers(self, raw_msg):
"""
Broadcast a message to peers
:param raw_msg: raw contents of the message to broadcast
"""
# Encode message for sending
encoded_msg = raw_msg.encode()
# Broadcast message
for peer in self.peers:
stream = self.peers[peer]
# Write message to stream
await stream.write(encoded_msg)

View File

@ -0,0 +1,40 @@
from libp2p.network.notifee_interface import INotifee
class PubsubNotifee(INotifee):
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
def __init__(self, initiator_peers_queue):
"""
:param initiator_peers_queue: queue to add new peers to so that pubsub
can process new peers after we connect to them
"""
self.initiator_peers_queue = initiator_peers_queue
async def opened_stream(self, network, stream):
pass
async def closed_stream(self, network, stream):
pass
async def connected(self, network, conn):
"""
Add peer_id to initiator_peers_queue, so that this peer_id can be used to
create a stream and we only want to have one pubsub stream with each peer.
:param network: network the connection was opened on
:param conn: connection that was opened
"""
# Only add peer_id if we are initiator (otherwise we would end up
# with two pubsub streams between us and the peer)
if conn.initiator:
await self.initiator_peers_queue.put(conn.peer_id)
async def disconnected(self, network, conn):
pass
async def listen(self, network, multiaddr):
pass
async def listen_close(self, network, multiaddr):
pass

View File

@ -0,0 +1,64 @@
from abc import ABC, abstractmethod
class IPubsubRouter(ABC):
@abstractmethod
def get_protocols(self):
"""
:return: the list of protocols supported by the router
"""
@abstractmethod
def attach(self, pubsub):
"""
Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance.
:param pubsub: pubsub instance to attach to
"""
@abstractmethod
def add_peer(self, peer_id, protocol_id):
"""
Notifies the router that a new peer has been connected
:param peer_id: id of peer to add
"""
@abstractmethod
def remove_peer(self, peer_id):
"""
Notifies the router that a peer has been disconnected
:param peer_id: id of peer to remove
"""
@abstractmethod
def handle_rpc(self, rpc):
"""
Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed
:param rpc: rpc message
"""
@abstractmethod
def publish(self, sender_peer_id, message):
"""
Invoked to forward a new message that has been validated
:param sender_peer_id: peer_id of message sender
:param message: message to forward
"""
@abstractmethod
def join(self, topic):
"""
Join notifies the router that we want to receive and
forward messages in a topic. It is invoked after the
subscription announcement
:param topic: topic to join
"""
@abstractmethod
def leave(self, topic):
"""
Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement.
:param topic: topic to leave
"""

View File

@ -11,14 +11,15 @@ class Mplex(IMuxedConn):
reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go
"""
def __init__(self, conn, generic_protocol_handler):
def __init__(self, conn, generic_protocol_handler, peer_id):
"""
create a new muxed connection
:param conn: an instance of raw connection
:param generic_protocol_handler: generic protocol handler
for new muxed streams
:param peer_id: peer_id of peer the connection is to
"""
super(Mplex, self).__init__(conn, generic_protocol_handler)
super(Mplex, self).__init__(conn, generic_protocol_handler, peer_id)
self.raw_conn = conn
self.initiator = conn.initiator
@ -26,6 +27,9 @@ class Mplex(IMuxedConn):
# Store generic protocol handler
self.generic_protocol_handler = generic_protocol_handler
# Set peer_id
self.peer_id = peer_id
# Mapping from stream ID -> buffer of messages for that stream
self.buffers = {}
@ -56,7 +60,7 @@ class Mplex(IMuxedConn):
# TODO: pass down timeout from user and use that
if stream_id in self.buffers:
try:
data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=3)
data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=8)
return data
except asyncio.TimeoutError:
return None

View File

@ -7,12 +7,13 @@ class IMuxedConn(ABC):
"""
@abstractmethod
def __init__(self, conn, generic_protocol_handler):
def __init__(self, conn, generic_protocol_handler, peer_id):
"""
create a new muxed connection
:param conn: an instance of raw connection
:param generic_protocol_handler: generic protocol handler
for new muxed streams
:param peer_id: peer_id of peer the connection is to
"""
@abstractmethod

View File

@ -17,11 +17,11 @@ class TransportUpgrader:
def upgrade_security(self):
pass
def upgrade_connection(self, conn, generic_protocol_handler):
def upgrade_connection(self, conn, generic_protocol_handler, peer_id):
"""
upgrade raw connection to muxed connection
"""
# For PoC, no security, default to mplex
# TODO do exchange to determine multiplexer
return Mplex(conn, generic_protocol_handler)
return Mplex(conn, generic_protocol_handler, peer_id)