Stream rearchitecture (#126)

* Add generic protocol handler

* Add generic protocol handler to stream muxing pipeline

* Modify conn_handler to only deal with connections

* mplex accept stream architecture changes

* Add create generic protocol handler

* Fix minor bugs

* who would win 4 devs or one not

* Debugging

* rearch with handle_incoming infinite loop, seems to work, needs cleanup"

* passing linting, still needs cleanup

* fixing linting again; code still needs cleanup

* fixing tests; code still needs cleanup

* adding test cleanup and task cleanup, removing prints

* linting, and cleanup complete

* storing connections based on peer id

* remove dead code

* remove unnecessary peer_id
This commit is contained in:
Robert Zajac
2019-02-24 20:58:23 -05:00
committed by GitHub
parent 17c778de15
commit 82840b5e6c
14 changed files with 367 additions and 120 deletions

View File

@ -1,12 +1,13 @@
import asyncio
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.peer.id import id_b58_decode
from .network_interface import INetwork
from .stream.net_stream import NetStream
from .connection.raw_connection import RawConnection
class Swarm(INetwork):
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
@ -23,6 +24,9 @@ class Swarm(INetwork):
self.multiselect = Multiselect()
self.multiselect_client = MultiselectClient()
# Create generic protocol handler
self.generic_protocol_handler = create_generic_protocol_handler(self)
def get_peer_id(self):
return self.self_id
@ -58,10 +62,10 @@ class Swarm(INetwork):
muxed_conn = self.connections[peer_id]
else:
# Transport dials peer (gets back a raw conn)
raw_conn = await self.transport.dial(multiaddr)
raw_conn = await self.transport.dial(multiaddr, self.self_id)
# Use upgrader to upgrade raw conn to muxed conn
muxed_conn = self.upgrader.upgrade_connection(raw_conn)
muxed_conn = self.upgrader.upgrade_connection(raw_conn, self.generic_protocol_handler)
# Store muxed connection in connections
self.connections[peer_id] = muxed_conn
@ -87,7 +91,7 @@ class Swarm(INetwork):
# Use muxed conn to open stream, which returns
# a muxed stream
# TODO: Remove protocol id from being passed into muxed_conn
muxed_stream = await muxed_conn.open_stream(protocol_ids[0], peer_id, multiaddr)
muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr)
# Perform protocol muxing to determine protocol to use
selected_protocol = await self.multiselect_client.select_one_of(protocol_ids, muxed_stream)
@ -117,26 +121,21 @@ class Swarm(INetwork):
return True
async def conn_handler(reader, writer):
# Read in first message (should be peer_id of initiator) and ack
peer_id = id_b58_decode((await reader.read(1024)).decode())
writer.write("received peer id".encode())
await writer.drain()
# Upgrade reader/write to a net_stream and pass \
# to appropriate stream handler (using multiaddr)
raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'),
multiaddr.value_for_protocol('tcp'), reader, writer, False)
muxed_conn = self.upgrader.upgrade_connection(raw_conn)
muxed_conn = self.upgrader.upgrade_connection(raw_conn, \
self.generic_protocol_handler)
# TODO: Remove protocol id from muxed_conn accept stream or
# move protocol muxing into accept_stream
muxed_stream, _, _ = await muxed_conn.accept_stream()
# Perform protocol muxing to determine protocol to use
selected_protocol, handler = await self.multiselect.negotiate(muxed_stream)
net_stream = NetStream(muxed_stream)
net_stream.set_protocol(selected_protocol)
# Give to stream handler
# TODO: handle case when stream handler is set
# TODO: handle case of multiple protocols over same raw connection
await handler(net_stream)
# Store muxed_conn with peer id
self.connections[peer_id] = muxed_conn
try:
# Success
@ -155,6 +154,23 @@ class Swarm(INetwork):
# TODO: Support more than one transport
self.transport = transport
def create_generic_protocol_handler(swarm):
"""
Create a generic protocol handler from the given swarm. We use swarm
to extract the multiselect module so that generic_protocol_handler
can use multiselect when generic_protocol_handler is called
from a different class
"""
multiselect = swarm.multiselect
async def generic_protocol_handler(muxed_stream):
# Perform protocol muxing to determine protocol to use
_, handler = await multiselect.negotiate(muxed_stream)
# Give to stream handler
asyncio.ensure_future(handler(muxed_stream))
return generic_protocol_handler
class SwarmException(Exception):
pass