fix merge conflicts

zixuanzh
2019-01-10 20:45:03 +08:00
71 changed files with 154 additions and 107 deletions

View File

@@ -0,0 +1,38 @@
from Crypto.PublicKey import RSA
import multiaddr
from .peer.peerstore import PeerStore
from .peer.id import id_from_public_key
from .network.swarm import Swarm
from .host.basic_host import BasicHost
from .transport.upgrader import TransportUpgrader
from .transport.tcp.tcp import TCP
async def new_node(
id_opt=None, transport_opt=None,
muxer_opt=None, sec_opt=None, peerstore=None):
if id_opt is None:
new_key = RSA.generate(2048, e=65537)
id_opt = id_from_public_key(new_key.publickey())
# private_key = new_key.exportKey("PEM")
transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"]
transport_opt = [multiaddr.Multiaddr(t) for t in transport_opt]
muxer_opt = muxer_opt or ["mplex/6.7.0"]
sec_opt = sec_opt or ["secio"]
peerstore = peerstore or PeerStore()
upgrader = TransportUpgrader(sec_opt, transport_opt)
swarm = Swarm(id_opt, peerstore, upgrader)
tcp = TCP()
swarm.add_transport(tcp)
await swarm.listen(transport_opt[0])
# TODO enable support for other host type
# TODO routing unimplemented
host = BasicHost(swarm)
return host
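
For reference, a minimal usage sketch of new_node (a sketch, not part of the commit: it assumes this file is the package's top-level module so that new_node is importable from libp2p, and the listen address is illustrative):

import asyncio
from libp2p import new_node

async def main():
    # All arguments are optional: the defaults generate a fresh RSA identity,
    # listen on /ip4/127.0.0.1/tcp/8001, and request mplex + secio.
    host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8002"])
    print("host peer id:", host.get_id().pretty())

asyncio.get_event_loop().run_until_complete(main())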

0
libp2p/host/__init__.py Normal file
View File

90
libp2p/host/basic_host.py Normal file
View File

@@ -0,0 +1,90 @@
import multiaddr
from .host_interface import IHost
# Upon host creation, host takes in options,
# including the list of addresses on which to listen.
# Host then parses these options and delegates to its Network instance,
# telling it to listen on the given listen addresses.
class BasicHost(IHost):
# default options constructor
def __init__(self, _network):
self.network = _network
self.peerstore = self.network.peerstore
def get_id(self):
"""
:return: peer_id of host
"""
return self.network.get_peer_id()
def get_network(self):
"""
:return: network instance of host
"""
return self.network
def get_peerstore(self):
"""
:return: peerstore of the host (same one as in its network instance)
"""
return self.peerstore
def get_mux(self):
"""
:return: mux instance of host
"""
def get_addrs(self):
"""
        :return: all the multiaddr addresses this host is listening on
"""
p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty()))
addrs = []
for transport in self.network.listeners.values():
for addr in transport.get_addrs():
addrs.append(addr.encapsulate(p2p_part))
return addrs
def set_stream_handler(self, protocol_id, stream_handler):
"""
set stream handler for host
:param protocol_id: protocol id used on stream
:param stream_handler: a stream handler function
:return: true if successful
"""
return self.network.set_stream_handler(protocol_id, stream_handler)
# protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on
async def new_stream(self, peer_id, protocol_ids):
"""
        :param peer_id: peer_id of the peer the host is connecting to
        :param protocol_ids: available protocol ids to use for the stream
        :return: net stream instance
"""
stream = await self.network.new_stream(peer_id, protocol_ids)
return stream
async def connect(self, peer_info):
"""
connect ensures there is a connection between this host and the peer with
given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal
peerstore. If there is not an active connection, connect will issue a
dial, and block until a connection is open, or an error is
returned.
:param peer_info: peer_info of the host we want to connect to
:type peer_info: peer.peerinfo.PeerInfo
"""
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
# there is already a connection to this peer
if peer_info.peer_id in self.network.connections:
return
await self.network.dial_peer(peer_info.peer_id)
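
A hedged sketch of how connect and new_stream combine, assuming two hosts built by the setup code above; the protocol id /echo/1.0.0 is a placeholder and muxer/transport details are glossed over:

from libp2p.peer.peerdata import PeerData
from libp2p.peer.peerinfo import PeerInfo

async def dial_example(host_a, host_b):
    # Describe host_b to host_a: its peer id plus its listen addresses.
    peer_data = PeerData()
    peer_data.add_addrs(host_b.get_addrs())
    info = PeerInfo(host_b.get_id(), peer_data)

    await host_a.connect(info)  # stores addrs, dials if not yet connected
    stream = await host_a.new_stream(host_b.get_id(), ["/echo/1.0.0"])
    await stream.write(b"hi")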

View File

@@ -0,0 +1,60 @@
from abc import ABC, abstractmethod
class IHost(ABC):
@abstractmethod
def get_id(self):
"""
:return: peer_id of host
"""
@abstractmethod
def get_network(self):
"""
:return: network instance of host
"""
@abstractmethod
def get_mux(self):
"""
:return: mux instance of host
"""
@abstractmethod
def get_addrs(self):
"""
        :return: all the multiaddr addresses this host is listening on
"""
@abstractmethod
def set_stream_handler(self, protocol_id, stream_handler):
"""
set stream handler for host
:param protocol_id: protocol id used on stream
:param stream_handler: a stream handler function
:return: true if successful
"""
# protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on
@abstractmethod
def new_stream(self, peer_id, protocol_ids):
"""
        :param peer_id: peer_id of the peer the host is connecting to
        :param protocol_ids: available protocol ids to use for the stream
        :return: net stream instance
"""
@abstractmethod
def connect(self, peer_info):
"""
connect ensures there is a connection between this host and the peer with
given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal
peerstore. If there is not an active connection, connect will issue a
dial, and block until a connection is open, or an error is
returned.
:param peer_info: peer_info of the host we want to connect to
:type peer_info: peer.peerinfo.PeerInfo
"""

View File

@@ -0,0 +1,5 @@
"""
Kademlia is a Python implementation of the Kademlia protocol which
utilizes the asyncio library.
"""
__version__ = "1.1"

181
libp2p/kademlia/crawling.py Normal file
View File

@@ -0,0 +1,181 @@
from collections import Counter
import logging
from .node import Node, NodeHeap
from .utils import gather_dict
log = logging.getLogger(__name__)
class SpiderCrawl:
"""
Crawl the network and look for given 160-bit keys.
"""
def __init__(self, protocol, node, peers, ksize, alpha):
"""
Create a new C{SpiderCrawl}er.
Args:
protocol: A :class:`~kademlia.protocol.KademliaProtocol` instance.
node: A :class:`~kademlia.node.Node` representing the key we're
looking for
peers: A list of :class:`~kademlia.node.Node` instances that
provide the entry point for the network
ksize: The value for k based on the paper
alpha: The value for alpha based on the paper
"""
self.protocol = protocol
self.ksize = ksize
self.alpha = alpha
self.node = node
self.nearest = NodeHeap(self.node, self.ksize)
self.lastIDsCrawled = []
log.info("creating spider with peers: %s", peers)
self.nearest.push(peers)
async def _find(self, rpcmethod):
"""
Get either a value or list of nodes.
Args:
            rpcmethod: The protocol's callFindValue or callFindNode.
The process:
1. calls find_* to current ALPHA nearest not already queried nodes,
adding results to current nearest list of k nodes.
2. current nearest list needs to keep track of who has been queried
already sort by nearest, keep KSIZE
3. if list is same as last time, next call should be to everyone not
yet queried
          4. repeat, until the nearest list has all been queried; then you're done
"""
log.info("crawling network with nearest: %s", str(tuple(self.nearest)))
count = self.alpha
if self.nearest.getIDs() == self.lastIDsCrawled:
count = len(self.nearest)
self.lastIDsCrawled = self.nearest.getIDs()
ds = {}
for peer in self.nearest.getUncontacted()[:count]:
ds[peer.id] = rpcmethod(peer, self.node)
self.nearest.markContacted(peer)
found = await gather_dict(ds)
return await self._nodesFound(found)
async def _nodesFound(self, responses):
raise NotImplementedError
class ValueSpiderCrawl(SpiderCrawl):
def __init__(self, protocol, node, peers, ksize, alpha):
SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
# keep track of the single nearest node without value - per
# section 2.3 so we can set the key there if found
self.nearestWithoutValue = NodeHeap(self.node, 1)
async def find(self):
"""
Find either the closest nodes or the value requested.
"""
return await self._find(self.protocol.callFindValue)
async def _nodesFound(self, responses):
"""
Handle the result of an iteration in _find.
"""
toremove = []
foundValues = []
for peerid, response in responses.items():
response = RPCFindResponse(response)
if not response.happened():
toremove.append(peerid)
elif response.hasValue():
foundValues.append(response.getValue())
else:
peer = self.nearest.getNodeById(peerid)
self.nearestWithoutValue.push(peer)
self.nearest.push(response.getNodeList())
self.nearest.remove(toremove)
if len(foundValues) > 0:
return await self._handleFoundValues(foundValues)
if self.nearest.allBeenContacted():
# not found!
return None
return await self.find()
async def _handleFoundValues(self, values):
"""
We got some values! Exciting. But let's make sure
they're all the same or freak out a little bit. Also,
make sure we tell the nearest node that *didn't* have
the value to store it.
"""
valueCounts = Counter(values)
if len(valueCounts) != 1:
log.warning("Got multiple values for key %i: %s",
self.node.long_id, str(values))
value = valueCounts.most_common(1)[0][0]
peerToSaveTo = self.nearestWithoutValue.popleft()
if peerToSaveTo is not None:
await self.protocol.callStore(peerToSaveTo, self.node.id, value)
return value
class NodeSpiderCrawl(SpiderCrawl):
async def find(self):
"""
Find the closest nodes.
"""
return await self._find(self.protocol.callFindNode)
async def _nodesFound(self, responses):
"""
Handle the result of an iteration in _find.
"""
toremove = []
for peerid, response in responses.items():
response = RPCFindResponse(response)
if not response.happened():
toremove.append(peerid)
else:
self.nearest.push(response.getNodeList())
self.nearest.remove(toremove)
if self.nearest.allBeenContacted():
return list(self.nearest)
return await self.find()
class RPCFindResponse:
def __init__(self, response):
"""
A wrapper for the result of a RPC find.
Args:
response: This will be a tuple of (<response received>, <value>)
where <value> will be a list of tuples if not found or
a dictionary of {'value': v} where v is the value desired
"""
self.response = response
def happened(self):
"""
Did the other host actually respond?
"""
return self.response[0]
def hasValue(self):
return isinstance(self.response[1], dict)
def getValue(self):
return self.response[1]['value']
def getNodeList(self):
"""
Get the node list in the response. If there's no value, this should
be set.
"""
nodelist = self.response[1] or []
return [Node(*nodeple) for nodeple in nodelist]
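
The response wrapper can be exercised on its own; a small sketch of the two reply shapes it distinguishes (all values fabricated):

from libp2p.kademlia.crawling import RPCFindResponse

# A successful find_value reply carries a dict holding the value...
value_resp = RPCFindResponse((True, {'value': b'hello'}))
assert value_resp.happened() and value_resp.hasValue()
assert value_resp.getValue() == b'hello'

# ...while a find_node reply carries a list of (id, ip, port) triples.
node_resp = RPCFindResponse((True, [(b'\x01' * 20, '127.0.0.1', 8468)]))
assert not node_resp.hasValue()
nodes = node_resp.getNodeList()  # converted into Node instances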

258
libp2p/kademlia/network.py Normal file
View File

@@ -0,0 +1,258 @@
"""
Package for interacting on the network at a high level.
"""
import random
import pickle
import asyncio
import logging
from .protocol import KademliaProtocol
from .utils import digest
from .storage import ForgetfulStorage
from .node import Node
from .crawling import ValueSpiderCrawl
from .crawling import NodeSpiderCrawl
log = logging.getLogger(__name__)
class Server:
"""
High level view of a node instance. This is the object that should be
created to start listening as an active node on the network.
"""
protocol_class = KademliaProtocol
def __init__(self, ksize=20, alpha=3, node_id=None, storage=None):
"""
        Create a server instance. Call listen() to start serving on a port.
Args:
ksize (int): The k parameter from the paper
alpha (int): The alpha parameter from the paper
node_id: The id for this node on the network.
storage: An instance that implements
:interface:`~kademlia.storage.IStorage`
"""
self.ksize = ksize
self.alpha = alpha
self.storage = storage or ForgetfulStorage()
self.node = Node(node_id or digest(random.getrandbits(255)))
self.transport = None
self.protocol = None
self.refresh_loop = None
self.save_state_loop = None
def stop(self):
if self.transport is not None:
self.transport.close()
if self.refresh_loop:
self.refresh_loop.cancel()
if self.save_state_loop:
self.save_state_loop.cancel()
def _create_protocol(self):
return self.protocol_class(self.node, self.storage, self.ksize)
def listen(self, port, interface='0.0.0.0'):
"""
Start listening on the given port.
        Provide interface="::" to accept IPv6 addresses.
"""
loop = asyncio.get_event_loop()
listen = loop.create_datagram_endpoint(self._create_protocol,
local_addr=(interface, port))
log.info("Node %i listening on %s:%i",
self.node.long_id, interface, port)
self.transport, self.protocol = loop.run_until_complete(listen)
# finally, schedule refreshing table
self.refresh_table()
def refresh_table(self):
log.debug("Refreshing routing table")
asyncio.ensure_future(self._refresh_table())
loop = asyncio.get_event_loop()
self.refresh_loop = loop.call_later(3600, self.refresh_table)
async def _refresh_table(self):
"""
Refresh buckets that haven't had any lookups in the last hour
(per section 2.3 of the paper).
"""
ds = []
for node_id in self.protocol.getRefreshIDs():
node = Node(node_id)
nearest = self.protocol.router.findNeighbors(node, self.alpha)
spider = NodeSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha)
ds.append(spider.find())
# do our crawling
await asyncio.gather(*ds)
# now republish keys older than one hour
for dkey, value in self.storage.iteritemsOlderThan(3600):
await self.set_digest(dkey, value)
def bootstrappableNeighbors(self):
"""
Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for
use as an argument to the bootstrap method.
The server should have been bootstrapped
already - this is just a utility for getting some neighbors and then
storing them if this server is going down for a while. When it comes
back up, the list of nodes can be used to bootstrap.
"""
neighbors = self.protocol.router.findNeighbors(self.node)
return [tuple(n)[-2:] for n in neighbors]
async def bootstrap(self, addrs):
"""
Bootstrap the server by connecting to other known nodes in the network.
Args:
addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP
addresses are acceptable - hostnames will cause an error.
"""
log.debug("Attempting to bootstrap node with %i initial contacts",
len(addrs))
cos = list(map(self.bootstrap_node, addrs))
gathered = await asyncio.gather(*cos)
nodes = [node for node in gathered if node is not None]
spider = NodeSpiderCrawl(self.protocol, self.node, nodes,
self.ksize, self.alpha)
return await spider.find()
async def bootstrap_node(self, addr):
result = await self.protocol.ping(addr, self.node.id)
return Node(result[1], addr[0], addr[1]) if result[0] else None
async def get(self, key):
"""
Get a key if the network has it.
Returns:
:class:`None` if not found, the value otherwise.
"""
log.info("Looking up key %s", key)
dkey = digest(key)
# if this node has it, return it
if self.storage.get(dkey) is not None:
return self.storage.get(dkey)
node = Node(dkey)
nearest = self.protocol.router.findNeighbors(node)
if len(nearest) == 0:
log.warning("There are no known neighbors to get key %s", key)
return None
spider = ValueSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha)
return await spider.find()
async def set(self, key, value):
"""
Set the given string key to the given value in the network.
"""
if not check_dht_value_type(value):
raise TypeError(
"Value must be of type int, float, bool, str, or bytes"
)
log.info("setting '%s' = '%s' on network", key, value)
dkey = digest(key)
return await self.set_digest(dkey, value)
async def set_digest(self, dkey, value):
"""
Set the given SHA1 digest key (bytes) to the given value in the
network.
"""
node = Node(dkey)
nearest = self.protocol.router.findNeighbors(node)
if len(nearest) == 0:
log.warning("There are no known neighbors to set key %s",
dkey.hex())
return False
spider = NodeSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha)
nodes = await spider.find()
log.info("setting '%s' on %s", dkey.hex(), list(map(str, nodes)))
# if this node is close too, then store here as well
biggest = max([n.distanceTo(node) for n in nodes])
if self.node.distanceTo(node) < biggest:
self.storage[dkey] = value
ds = [self.protocol.callStore(n, dkey, value) for n in nodes]
# return true only if at least one store call succeeded
return any(await asyncio.gather(*ds))
def saveState(self, fname):
"""
Save the state of this node (the alpha/ksize/id/immediate neighbors)
to a cache file with the given fname.
"""
log.info("Saving state to %s", fname)
data = {
'ksize': self.ksize,
'alpha': self.alpha,
'id': self.node.id,
'neighbors': self.bootstrappableNeighbors()
}
if len(data['neighbors']) == 0:
log.warning("No known neighbors, so not writing to cache.")
return
with open(fname, 'wb') as f:
pickle.dump(data, f)
@classmethod
    def loadState(cls, fname):
"""
Load the state of this node (the alpha/ksize/id/immediate neighbors)
from a cache file with the given fname.
"""
log.info("Loading state from %s", fname)
with open(fname, 'rb') as f:
data = pickle.load(f)
        s = cls(data['ksize'], data['alpha'], data['id'])
        if len(data['neighbors']) > 0:
            # bootstrap is a coroutine; schedule it so it actually runs
            asyncio.ensure_future(s.bootstrap(data['neighbors']))
return s
def saveStateRegularly(self, fname, frequency=600):
"""
Save the state of node with a given regularity to the given
filename.
Args:
            fname: File name to save regularly to
frequency: Frequency in seconds that the state should be saved.
By default, 10 minutes.
"""
self.saveState(fname)
loop = asyncio.get_event_loop()
self.save_state_loop = loop.call_later(frequency,
self.saveStateRegularly,
fname,
frequency)
def check_dht_value_type(value):
"""
Checks to see if the type of the value is a valid type for
placing in the dht.
"""
typeset = set(
[
int,
float,
bool,
str,
bytes,
]
)
return type(value) in typeset
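
A minimal usage sketch for Server (the port and bootstrap address are placeholders; note that listen drives the loop itself via run_until_complete, so call it before running the coroutines below):

import asyncio
from libp2p.kademlia.network import Server

server = Server()
server.listen(8468)  # binds a UDP datagram endpoint

loop = asyncio.get_event_loop()
# Join an existing network via any known (ip, port) node.
loop.run_until_complete(server.bootstrap([("127.0.0.1", 8469)]))
loop.run_until_complete(server.set("a-key", "a-value"))  # False if no neighbors found
print(loop.run_until_complete(server.get("a-key")))
server.stop()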

115
libp2p/kademlia/node.py Normal file
View File

@@ -0,0 +1,115 @@
from operator import itemgetter
import heapq
class Node:
def __init__(self, node_id, ip=None, port=None):
self.id = node_id
self.ip = ip
self.port = port
self.long_id = int(node_id.hex(), 16)
def sameHomeAs(self, node):
return self.ip == node.ip and self.port == node.port
def distanceTo(self, node):
"""
Get the distance between this node and another.
"""
return self.long_id ^ node.long_id
def __iter__(self):
"""
Enables use of Node as a tuple - i.e., tuple(node) works.
"""
return iter([self.id, self.ip, self.port])
def __repr__(self):
return repr([self.long_id, self.ip, self.port])
def __str__(self):
return "%s:%s" % (self.ip, str(self.port))
class NodeHeap:
"""
A heap of nodes ordered by distance to a given node.
"""
def __init__(self, node, maxsize):
"""
Constructor.
        @param node: The node to measure all distances from.
@param maxsize: The maximum size that this heap can grow to.
"""
self.node = node
self.heap = []
self.contacted = set()
self.maxsize = maxsize
def remove(self, peerIDs):
"""
Remove a list of peer ids from this heap. Note that while this
heap retains a constant visible size (based on the iterator), it's
actual size may be quite a bit larger than what's exposed. Therefore,
removal of nodes may not change the visible size as previously added
nodes suddenly become visible.
"""
peerIDs = set(peerIDs)
if len(peerIDs) == 0:
return
nheap = []
for distance, node in self.heap:
if node.id not in peerIDs:
heapq.heappush(nheap, (distance, node))
self.heap = nheap
def getNodeById(self, node_id):
for _, node in self.heap:
if node.id == node_id:
return node
return None
def allBeenContacted(self):
return len(self.getUncontacted()) == 0
def getIDs(self):
return [n.id for n in self]
def markContacted(self, node):
self.contacted.add(node.id)
def popleft(self):
if len(self) > 0:
return heapq.heappop(self.heap)[1]
return None
def push(self, nodes):
"""
Push nodes onto heap.
@param nodes: This can be a single item or a C{list}.
"""
if not isinstance(nodes, list):
nodes = [nodes]
for node in nodes:
if node not in self:
distance = self.node.distanceTo(node)
heapq.heappush(self.heap, (distance, node))
def __len__(self):
return min(len(self.heap), self.maxsize)
def __iter__(self):
nodes = heapq.nsmallest(self.maxsize, self.heap)
return iter(map(itemgetter(1), nodes))
def __contains__(self, node):
for _, n in self.heap:
if node.id == n.id:
return True
return False
def getUncontacted(self):
return [n for n in self if n.id not in self.contacted]
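
A quick sketch of the XOR distance metric and the size-bounded heap (the 20-byte ids are fabricated):

from libp2p.kademlia.node import Node, NodeHeap

target = Node(bytes(20))            # id of all zero bytes
near = Node(bytes(19) + b'\x01')    # differs only in the lowest bit
far = Node(b'\xff' + bytes(19))     # differs in the highest bits

assert target.distanceTo(near) < target.distanceTo(far)

heap = NodeHeap(target, 1)          # keep only the single nearest node
heap.push([near, far])
assert list(heap) == [near]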

128
libp2p/kademlia/protocol.py Normal file
View File

@@ -0,0 +1,128 @@
import random
import asyncio
import logging
from rpcudp.protocol import RPCProtocol
from .node import Node
from .routing import RoutingTable
from .utils import digest
log = logging.getLogger(__name__)
class KademliaProtocol(RPCProtocol):
def __init__(self, sourceNode, storage, ksize):
RPCProtocol.__init__(self)
self.router = RoutingTable(self, ksize, sourceNode)
self.storage = storage
self.sourceNode = sourceNode
def getRefreshIDs(self):
"""
Get ids to search for to keep old buckets up to date.
"""
ids = []
for bucket in self.router.getLonelyBuckets():
rid = random.randint(*bucket.range).to_bytes(20, byteorder='big')
ids.append(rid)
return ids
def rpc_stun(self, sender):
return sender
def rpc_ping(self, sender, nodeid):
source = Node(nodeid, sender[0], sender[1])
self.welcomeIfNewNode(source)
return self.sourceNode.id
def rpc_store(self, sender, nodeid, key, value):
source = Node(nodeid, sender[0], sender[1])
self.welcomeIfNewNode(source)
log.debug("got a store request from %s, storing '%s'='%s'",
sender, key.hex(), value)
self.storage[key] = value
return True
def rpc_find_node(self, sender, nodeid, key):
log.info("finding neighbors of %i in local table",
int(nodeid.hex(), 16))
source = Node(nodeid, sender[0], sender[1])
self.welcomeIfNewNode(source)
node = Node(key)
neighbors = self.router.findNeighbors(node, exclude=source)
return list(map(tuple, neighbors))
def rpc_find_value(self, sender, nodeid, key):
source = Node(nodeid, sender[0], sender[1])
self.welcomeIfNewNode(source)
value = self.storage.get(key, None)
if value is None:
return self.rpc_find_node(sender, nodeid, key)
return {'value': value}
async def callFindNode(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port)
result = await self.find_node(address, self.sourceNode.id,
nodeToFind.id)
return self.handleCallResponse(result, nodeToAsk)
async def callFindValue(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port)
result = await self.find_value(address, self.sourceNode.id,
nodeToFind.id)
return self.handleCallResponse(result, nodeToAsk)
async def callPing(self, nodeToAsk):
address = (nodeToAsk.ip, nodeToAsk.port)
result = await self.ping(address, self.sourceNode.id)
return self.handleCallResponse(result, nodeToAsk)
async def callStore(self, nodeToAsk, key, value):
address = (nodeToAsk.ip, nodeToAsk.port)
result = await self.store(address, self.sourceNode.id, key, value)
return self.handleCallResponse(result, nodeToAsk)
def welcomeIfNewNode(self, node):
"""
Given a new node, send it all the keys/values it should be storing,
then add it to the routing table.
@param node: A new node that just joined (or that we just found out
about).
Process:
        For each key in storage, get the k closest nodes. If the new node is
        closer than the furthest in that list, and the node for this server
        is closer than the closest in that list, then store the key/value
        on the new node (per section 2.5 of the paper).
"""
if not self.router.isNewNode(node):
return
log.info("never seen %s before, adding to router", node)
for key, value in self.storage.items():
keynode = Node(digest(key))
neighbors = self.router.findNeighbors(keynode)
if len(neighbors) > 0:
last = neighbors[-1].distanceTo(keynode)
newNodeClose = node.distanceTo(keynode) < last
first = neighbors[0].distanceTo(keynode)
thisNodeClosest = self.sourceNode.distanceTo(keynode) < first
if len(neighbors) == 0 or (newNodeClose and thisNodeClosest):
asyncio.ensure_future(self.callStore(node, key, value))
self.router.addContact(node)
def handleCallResponse(self, result, node):
"""
If we get a response, add the node to the routing table. If
we get no response, make sure it's removed from the routing table.
"""
if not result[0]:
log.warning("no response from %s, removing from router", node)
self.router.removeContact(node)
return result
log.info("got successful response from %s", node)
self.welcomeIfNewNode(node)
return result

185
libp2p/kademlia/routing.py Normal file
View File

@@ -0,0 +1,185 @@
import heapq
import time
import operator
import asyncio
from collections import OrderedDict
from .utils import OrderedSet, sharedPrefix, bytesToBitString
class KBucket:
def __init__(self, rangeLower, rangeUpper, ksize):
self.range = (rangeLower, rangeUpper)
self.nodes = OrderedDict()
self.replacementNodes = OrderedSet()
self.touchLastUpdated()
self.ksize = ksize
def touchLastUpdated(self):
self.lastUpdated = time.monotonic()
def getNodes(self):
return list(self.nodes.values())
def split(self):
        # integer division: these are 160-bit ints, beyond float precision
        midpoint = (self.range[0] + self.range[1]) // 2
one = KBucket(self.range[0], midpoint, self.ksize)
two = KBucket(midpoint + 1, self.range[1], self.ksize)
for node in self.nodes.values():
bucket = one if node.long_id <= midpoint else two
bucket.nodes[node.id] = node
return (one, two)
def removeNode(self, node):
if node.id not in self.nodes:
return
# delete node, and see if we can add a replacement
del self.nodes[node.id]
if len(self.replacementNodes) > 0:
newnode = self.replacementNodes.pop()
self.nodes[newnode.id] = newnode
def hasInRange(self, node):
return self.range[0] <= node.long_id <= self.range[1]
def isNewNode(self, node):
return node.id not in self.nodes
def addNode(self, node):
"""
Add a C{Node} to the C{KBucket}. Return True if successful,
False if the bucket is full.
If the bucket is full, keep track of node in a replacement list,
per section 4.1 of the paper.
"""
if node.id in self.nodes:
del self.nodes[node.id]
self.nodes[node.id] = node
elif len(self) < self.ksize:
self.nodes[node.id] = node
else:
self.replacementNodes.push(node)
return False
return True
def depth(self):
vals = self.nodes.values()
sp = sharedPrefix([bytesToBitString(n.id) for n in vals])
return len(sp)
def head(self):
return list(self.nodes.values())[0]
def __getitem__(self, node_id):
return self.nodes.get(node_id, None)
def __len__(self):
return len(self.nodes)
class TableTraverser:
def __init__(self, table, startNode):
index = table.getBucketFor(startNode)
table.buckets[index].touchLastUpdated()
self.currentNodes = table.buckets[index].getNodes()
self.leftBuckets = table.buckets[:index]
self.rightBuckets = table.buckets[(index + 1):]
self.left = True
def __iter__(self):
return self
def __next__(self):
"""
Pop an item from the left subtree, then right, then left, etc.
"""
if len(self.currentNodes) > 0:
return self.currentNodes.pop()
if self.left and len(self.leftBuckets) > 0:
self.currentNodes = self.leftBuckets.pop().getNodes()
self.left = False
return next(self)
if len(self.rightBuckets) > 0:
self.currentNodes = self.rightBuckets.pop(0).getNodes()
self.left = True
return next(self)
raise StopIteration
class RoutingTable:
def __init__(self, protocol, ksize, node):
"""
@param node: The node that represents this server. It won't
be added to the routing table, but will be needed later to
determine which buckets to split or not.
"""
self.node = node
self.protocol = protocol
self.ksize = ksize
self.flush()
def flush(self):
self.buckets = [KBucket(0, 2 ** 160, self.ksize)]
def splitBucket(self, index):
one, two = self.buckets[index].split()
self.buckets[index] = one
self.buckets.insert(index + 1, two)
def getLonelyBuckets(self):
"""
Get all of the buckets that haven't been updated in over
an hour.
"""
hrago = time.monotonic() - 3600
return [b for b in self.buckets if b.lastUpdated < hrago]
def removeContact(self, node):
index = self.getBucketFor(node)
self.buckets[index].removeNode(node)
def isNewNode(self, node):
index = self.getBucketFor(node)
return self.buckets[index].isNewNode(node)
def addContact(self, node):
index = self.getBucketFor(node)
bucket = self.buckets[index]
# this will succeed unless the bucket is full
if bucket.addNode(node):
return
# Per section 4.2 of paper, split if the bucket has the node
# in its range or if the depth is not congruent to 0 mod 5
if bucket.hasInRange(self.node) or bucket.depth() % 5 != 0:
self.splitBucket(index)
self.addContact(node)
else:
asyncio.ensure_future(self.protocol.callPing(bucket.head()))
def getBucketFor(self, node):
"""
Get the index of the bucket that the given node would fall into.
"""
for index, bucket in enumerate(self.buckets):
if node.long_id < bucket.range[1]:
return index
def findNeighbors(self, node, k=None, exclude=None):
k = k or self.ksize
nodes = []
for neighbor in TableTraverser(self, node):
notexcluded = exclude is None or not neighbor.sameHomeAs(exclude)
if neighbor.id != node.id and notexcluded:
heapq.heappush(nodes, (node.distanceTo(neighbor), neighbor))
if len(nodes) == k:
break
return list(map(operator.itemgetter(1), heapq.nsmallest(k, nodes)))
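
A small sketch of KBucket behavior, with ksize kept tiny so the overflow path is visible (ids fabricated):

from libp2p.kademlia.routing import KBucket
from libp2p.kademlia.node import Node

bucket = KBucket(0, 2 ** 160, 2)
a, b, c = (Node(bytes([i]) + bytes(19)) for i in (1, 2, 3))

assert bucket.addNode(a) and bucket.addNode(b)
assert not bucket.addNode(c)        # bucket full: c goes to the replacement list
assert c in bucket.replacementNodes

one, two = bucket.split()           # lower and upper halves of the id range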

View File

@@ -0,0 +1,97 @@
import time
from itertools import takewhile
import operator
from collections import OrderedDict
class IStorage:
"""
Local storage for this node.
    Implementations of get must return the same type that set stored.
"""
def __setitem__(self, key, value):
"""
Set a key to the given value.
"""
raise NotImplementedError
def __getitem__(self, key):
"""
Get the given key. If item doesn't exist, raises C{KeyError}
"""
raise NotImplementedError
def get(self, key, default=None):
"""
Get given key. If not found, return default.
"""
raise NotImplementedError
def iteritemsOlderThan(self, secondsOld):
"""
        Return an iterator over (key, value) tuples for items older
than the given secondsOld.
"""
raise NotImplementedError
def __iter__(self):
"""
Get the iterator for this storage, should yield tuple of (key, value)
"""
raise NotImplementedError
class ForgetfulStorage(IStorage):
def __init__(self, ttl=604800):
"""
By default, max age is a week.
"""
self.data = OrderedDict()
self.ttl = ttl
def __setitem__(self, key, value):
if key in self.data:
del self.data[key]
self.data[key] = (time.monotonic(), value)
self.cull()
def cull(self):
for _, _ in self.iteritemsOlderThan(self.ttl):
self.data.popitem(last=False)
def get(self, key, default=None):
self.cull()
if key in self.data:
return self[key]
return default
def __getitem__(self, key):
self.cull()
return self.data[key][1]
def __iter__(self):
self.cull()
return iter(self.data)
def __repr__(self):
self.cull()
return repr(self.data)
def iteritemsOlderThan(self, secondsOld):
minBirthday = time.monotonic() - secondsOld
zipped = self._tripleIterable()
matches = takewhile(lambda r: minBirthday >= r[1], zipped)
return list(map(operator.itemgetter(0, 2), matches))
def _tripleIterable(self):
ikeys = self.data.keys()
ibirthday = map(operator.itemgetter(0), self.data.values())
ivalues = map(operator.itemgetter(1), self.data.values())
return zip(ikeys, ibirthday, ivalues)
def items(self):
self.cull()
ikeys = self.data.keys()
ivalues = map(operator.itemgetter(1), self.data.values())
return zip(ikeys, ivalues)
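
A usage sketch of ForgetfulStorage:

from libp2p.kademlia.storage import ForgetfulStorage

store = ForgetfulStorage(ttl=3600)  # entries expire after an hour
store[b'key'] = b'value'            # stored with a monotonic birthday
assert store.get(b'key') == b'value'
assert store.get(b'missing', default=b'fallback') == b'fallback'

# (key, value) pairs at least the given number of seconds old:
old = store.iteritemsOlderThan(0)   # everything stored so far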

57
libp2p/kademlia/utils.py Normal file
View File

@@ -0,0 +1,57 @@
"""
General catchall for functions that don't make sense as methods.
"""
import hashlib
import operator
import asyncio
async def gather_dict(d):
cors = list(d.values())
results = await asyncio.gather(*cors)
return dict(zip(d.keys(), results))
def digest(s):
if not isinstance(s, bytes):
s = str(s).encode('utf8')
return hashlib.sha1(s).digest()
class OrderedSet(list):
"""
Acts like a list in all ways, except in the behavior of the
:meth:`push` method.
"""
def push(self, thing):
"""
1. If the item exists in the list, it's removed
2. The item is pushed to the end of the list
"""
if thing in self:
self.remove(thing)
self.append(thing)
def sharedPrefix(args):
"""
Find the shared prefix between the strings.
For instance:
sharedPrefix(['blahblah', 'blahwhat'])
returns 'blah'.
"""
i = 0
while i < min(map(len, args)):
if len(set(map(operator.itemgetter(i), args))) != 1:
break
i += 1
return args[0][:i]
def bytesToBitString(bites):
bits = [bin(bite)[2:].rjust(8, '0') for bite in bites]
return "".join(bits)

View File

@@ -1,35 +0,0 @@
from Crypto.PublicKey import RSA
import multiaddr
from peer.peerstore import PeerStore
from peer.id import id_from_public_key
from network.swarm import Swarm
from host.basic_host import BasicHost
from transport.upgrader import TransportUpgrader
from transport.tcp.tcp import TCP
async def new_node(id_opt=None, transport_opt=None, \
muxer_opt=None, sec_opt=None, peerstore=None):
if id_opt is None:
new_key = RSA.generate(2048, e=65537)
id_opt = id_from_public_key(new_key.publickey())
# private_key = new_key.exportKey("PEM")
transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"]
transport_opt = [multiaddr.Multiaddr(t) for t in transport_opt]
muxer_opt = muxer_opt or ["mplex/6.7.0"]
sec_opt = sec_opt or ["secio"]
peerstore = peerstore or PeerStore()
upgrader = TransportUpgrader(sec_opt, transport_opt)
swarm = Swarm(id_opt, peerstore, upgrader)
tcp = TCP()
swarm.add_transport(tcp)
await swarm.listen(transport_opt[0])
# TODO enable support for other host type
# TODO routing unimplemented
host = BasicHost(swarm)
return host

View File

@@ -0,0 +1,25 @@
from .raw_connection_interface import IRawConnection
class RawConnection(IRawConnection):
def __init__(self, ip, port, reader, writer, initiator):
# pylint: disable=too-many-arguments
self.conn_ip = ip
self.conn_port = port
self.reader = reader
self.writer = writer
self._next_id = 0 if initiator else 1
self.initiator = initiator
def close(self):
self.writer.close()
def next_stream_id(self):
"""
Get next available stream id
:return: next available stream id for the connection
"""
next_id = self._next_id
self._next_id += 2
return next_id
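
The id scheme keeps initiator and responder streams disjoint (even vs. odd ids). A sketch, assuming this file is libp2p/network/connection/raw_connection.py, consistent with the import in swarm.py; None stands in for the reader/writer, which next_stream_id never touches:

from libp2p.network.connection.raw_connection import RawConnection

dialer = RawConnection('127.0.0.1', 8001, None, None, initiator=True)
listener = RawConnection('127.0.0.1', 8001, None, None, initiator=False)

assert [dialer.next_stream_id() for _ in range(2)] == [0, 2]
assert [listener.next_stream_id() for _ in range(2)] == [1, 3]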

View File

@@ -0,0 +1,9 @@
from abc import ABC
# pylint: disable=too-few-public-methods
class IRawConnection(ABC):
"""
A Raw Connection provides a Reader and a Writer
"""

122
libp2p/network/multiaddr.py Normal file
View File

@@ -0,0 +1,122 @@
class MultiAddr:
# Validates input string and constructs internal representation.
def __init__(self, addr):
self.protocol_map = dict()
# Empty multiaddrs are valid.
if not addr:
self.protocol_map = dict()
return
if not addr[0] == "/":
raise MultiAddrValueError("Invalid input multiaddr.")
addr = addr[1:]
protocol_map = dict()
split_addr = addr.split("/")
if not split_addr or len(split_addr) % 2 != 0:
raise MultiAddrValueError("Invalid input multiaddr.")
is_protocol = True
curr_protocol = ""
for addr_part in split_addr:
if is_protocol:
curr_protocol = addr_part
else:
protocol_map[curr_protocol] = addr_part
is_protocol = not is_protocol
        # Basic validation of protocols
        # TODO(rzajac): Add more validation as necessary.
        # Validate the freshly parsed map; self.protocol_map is still empty here.
        if 'ip4' in protocol_map and 'ip6' in protocol_map:
            raise MultiAddrValueError("Multiaddr should not specify two IP layers.")
        if 'tcp' in protocol_map and 'udp' in protocol_map:
            raise MultiAddrValueError("Multiaddr should not specify two transport layers.")
        self.protocol_map = protocol_map
def get_protocols(self):
"""
:return: List of protocols contained in this multiaddr.
"""
return list(self.protocol_map.keys())
def get_protocol_value(self, protocol):
"""
Getter for protocol values in this multiaddr.
:param protocol: the protocol whose value to retrieve
:return: value of input protocol
"""
if protocol not in self.protocol_map:
return None
return self.protocol_map[protocol]
def add_protocol(self, protocol, value):
"""
Setter for protocol values in this multiaddr.
:param protocol: the protocol whose value to set or add
:param value: the value for the input protocol
:return: True if successful
"""
self.protocol_map[protocol] = value
return True
    def remove_protocol(self, protocol):
        """
        Remove protocol and its value from this multiaddr.
        :param protocol: the protocol to remove
        :return: True if remove succeeded, False if protocol was not contained in this multiaddr
        """
        if protocol not in self.protocol_map:
            return False
        del self.protocol_map[protocol]
        return True
def get_multiaddr_string(self):
"""
:return: the string representation of this multiaddr.
"""
addr = ""
for protocol in self.protocol_map:
addr += "/" + protocol + "/" + self.get_protocol_value(protocol)
return addr
def to_options(self):
"""
Gives back a dictionary with access to transport information from this multiaddr.
Example: MultiAddr('/ip4/127.0.0.1/tcp/4001').to_options()
= { family: 'ipv4', host: '127.0.0.1', transport: 'tcp', port: '4001' }
:return: {{family: String, host: String, transport: String, port: String}}
with None if field does not exist
"""
options = dict()
if 'ip4' in self.protocol_map:
options['family'] = 'ipv4'
options['host'] = self.protocol_map['ip4']
elif 'ip6' in self.protocol_map:
options['family'] = 'ipv6'
options['host'] = self.protocol_map['ip6']
else:
options['family'] = None
options['host'] = None
if 'tcp' in self.protocol_map:
options['transport'] = 'tcp'
options['port'] = self.protocol_map['tcp']
elif 'udp' in self.protocol_map:
options['transport'] = 'udp'
options['port'] = self.protocol_map['udp']
else:
options['transport'] = None
options['port'] = None
return options
class MultiAddrValueError(ValueError):
"""Raised when the input string to the MultiAddr constructor was invalid."""

View File

@@ -0,0 +1,43 @@
from abc import ABC, abstractmethod
class INetwork(ABC):
@abstractmethod
def get_peer_id(self):
"""
:return: the peer id
"""
@abstractmethod
def dial_peer(self, peer_id):
"""
        dial_peer tries to create a connection to peer_id
        :param peer_id: peer id of the peer we want to dial
        :raises SwarmException: raised when no address is found for peer_id
:return: muxed connection
"""
@abstractmethod
def set_stream_handler(self, protocol_id, stream_handler):
"""
:param protocol_id: protocol id used on stream
:param stream_handler: a stream handler instance
:return: true if successful
"""
@abstractmethod
def new_stream(self, peer_id, protocol_ids):
"""
:param peer_id: peer_id of destination
:param protocol_ids: available protocol ids to use for stream
:return: net stream instance
"""
@abstractmethod
def listen(self, *args):
"""
:param *args: one or many multiaddrs to start listening on
:return: True if at least one success
"""

View File

@@ -0,0 +1,43 @@
from .net_stream_interface import INetStream
class NetStream(INetStream):
def __init__(self, muxed_stream):
self.muxed_stream = muxed_stream
self.protocol_id = None
def get_protocol(self):
"""
:return: protocol id that stream runs on
"""
return self.protocol_id
def set_protocol(self, protocol_id):
"""
:param protocol_id: protocol id that stream runs on
:return: true if successful
"""
self.protocol_id = protocol_id
async def read(self):
"""
read from stream
:return: bytes of input until EOF
"""
return await self.muxed_stream.read()
async def write(self, data):
"""
write to stream
:return: number of bytes written
"""
return await self.muxed_stream.write(data)
async def close(self):
"""
close stream
:return: true if successful
"""
await self.muxed_stream.close()
return True

View File

@@ -0,0 +1,38 @@
from abc import ABC, abstractmethod
class INetStream(ABC):
@abstractmethod
def get_protocol(self):
"""
:return: protocol id that stream runs on
"""
@abstractmethod
def set_protocol(self, protocol_id):
"""
:param protocol_id: protocol id that stream runs on
:return: true if successful
"""
@abstractmethod
def read(self):
"""
reads from the underlying muxed_stream
:return: bytes of input
"""
@abstractmethod
def write(self, _bytes):
"""
write to the underlying muxed_stream
:return: number of bytes written
"""
@abstractmethod
def close(self):
"""
close the underlying muxed stream
:return: true if successful
"""

160
libp2p/network/swarm.py Normal file
View File

@@ -0,0 +1,160 @@
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
from libp2p.protocol_muxer.multiselect import Multiselect
from .network_interface import INetwork
from .stream.net_stream import NetStream
from .connection.raw_connection import RawConnection
class Swarm(INetwork):
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
def __init__(self, peer_id, peerstore, upgrader):
self.self_id = peer_id
self.peerstore = peerstore
self.upgrader = upgrader
self.connections = dict()
self.listeners = dict()
self.stream_handlers = dict()
self.transport = None
# Protocol muxing
self.multiselect = Multiselect()
self.multiselect_client = MultiselectClient()
def get_peer_id(self):
return self.self_id
def set_stream_handler(self, protocol_id, stream_handler):
"""
:param protocol_id: protocol id used on stream
:param stream_handler: a stream handler instance
:return: true if successful
"""
self.multiselect.add_handler(protocol_id, stream_handler)
return True
async def dial_peer(self, peer_id):
"""
        dial_peer tries to create a connection to peer_id
        :param peer_id: peer id of the peer we want to dial
        :raises SwarmException: raised when no address is found for peer_id
:return: muxed connection
"""
# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)
if not addrs:
raise SwarmException("No known addresses to peer")
# TODO: define logic to choose which address to use, or try them all ?
multiaddr = addrs[0]
if peer_id in self.connections:
# If muxed connection already exists for peer_id,
# set muxed connection equal to existing muxed connection
muxed_conn = self.connections[peer_id]
else:
# Transport dials peer (gets back a raw conn)
raw_conn = await self.transport.dial(multiaddr)
# Use upgrader to upgrade raw conn to muxed conn
muxed_conn = self.upgrader.upgrade_connection(raw_conn)
# Store muxed connection in connections
self.connections[peer_id] = muxed_conn
return muxed_conn
async def new_stream(self, peer_id, protocol_ids):
"""
:param peer_id: peer_id of destination
        :param protocol_ids: available protocol ids to use for the stream
:return: net stream instance
"""
# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)
if not addrs:
raise SwarmException("No known addresses to peer")
multiaddr = addrs[0]
muxed_conn = await self.dial_peer(peer_id)
# Use muxed conn to open stream, which returns
# a muxed stream
# TODO: Remove protocol id from being passed into muxed_conn
muxed_stream = await muxed_conn.open_stream(protocol_ids[0], peer_id, multiaddr)
# Perform protocol muxing to determine protocol to use
selected_protocol = await self.multiselect_client.select_one_of(protocol_ids, muxed_stream)
# Create a net stream with the selected protocol
net_stream = NetStream(muxed_stream)
net_stream.set_protocol(selected_protocol)
return net_stream
async def listen(self, *args):
"""
:param *args: one or many multiaddrs to start listening on
:return: true if at least one success
For each multiaddr in args
Check if a listener for multiaddr exists already
If listener already exists, continue
Otherwise:
Capture multiaddr in conn handler
Have conn handler delegate to stream handler
Call listener listen with the multiaddr
Map multiaddr to listener
"""
for multiaddr in args:
if str(multiaddr) in self.listeners:
return True
async def conn_handler(reader, writer):
                # Upgrade reader/writer to a net_stream and pass
                # to appropriate stream handler (using multiaddr)
raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'),
multiaddr.value_for_protocol('tcp'), reader, writer, False)
muxed_conn = self.upgrader.upgrade_connection(raw_conn)
# TODO: Remove protocol id from muxed_conn accept stream or
# move protocol muxing into accept_stream
muxed_stream, _, _ = await muxed_conn.accept_stream()
# Perform protocol muxing to determine protocol to use
selected_protocol, handler = await self.multiselect.negotiate(muxed_stream)
net_stream = NetStream(muxed_stream)
net_stream.set_protocol(selected_protocol)
# Give to stream handler
# TODO: handle case when stream handler is set
# TODO: handle case of multiple protocols over same raw connection
await handler(net_stream)
            try:
                listener = self.transport.create_listener(conn_handler)
                self.listeners[str(multiaddr)] = listener
                await listener.listen(multiaddr)
                return True
            except IOError:
                # Listening failed; continue with the next multiaddr
                print("Failed to connect to: " + str(multiaddr))
# No multiaddr succeeded
return False
def add_transport(self, transport):
# TODO: Support more than one transport
self.transport = transport
class SwarmException(Exception):
pass

5
libp2p/peer/README.md Normal file
View File

@@ -0,0 +1,5 @@
# PeerStore
The PeerStore maps peer IDs to PeerData objects. Each PeerData object represents a peer and holds that peer's protocols, addresses, and metadata. PeerStore implements the IPeerStore (peer protocols), IAddrBook (address book), and IPeerMetadata (peer metadata) interfaces, which together let the peer store function as a dictionary from peer ID to protocols, addresses, and metadata.
Note: PeerInfo is a read-only summary of a PeerData object. Only the attributes copied into PeerInfo are readable through references to PeerInfo objects.

0
libp2p/peer/__init__.py Normal file
View File

View File

@@ -0,0 +1,47 @@
from abc import ABC, abstractmethod
class IAddrBook(ABC):
def __init__(self):
pass
@abstractmethod
def add_addr(self, peer_id, addr, ttl):
"""
Calls add_addrs(peer_id, [addr], ttl)
:param peer_id: the peer to add address for
:param addr: multiaddress of the peer
:param ttl: time-to-live for the address (after this time, address is no longer valid)
"""
@abstractmethod
def add_addrs(self, peer_id, addrs, ttl):
"""
Adds addresses for a given peer all with the same time-to-live. If one of the
addresses already exists for the peer and has a longer TTL, no operation should take place.
If one of the addresses exists with a shorter TTL, extend the TTL to equal param ttl.
        :param peer_id: the peer to add addresses for
        :param addrs: multiaddresses of the peer
        :param ttl: time-to-live for the addresses (after this time, the addresses are no longer valid)
"""
@abstractmethod
def addrs(self, peer_id):
"""
:param peer_id: peer to get addresses of
:return: all known (and valid) addresses for the given peer
"""
@abstractmethod
def clear_addrs(self, peer_id):
"""
Removes all previously stored addresses
:param peer_id: peer to remove addresses of
"""
@abstractmethod
def peers_with_addrs(self):
"""
:return: all of the peer IDs stored with addresses
"""

66
libp2p/peer/id.py Normal file
View File

@@ -0,0 +1,66 @@
import base58
import multihash
# MaxInlineKeyLength is the maximum length a key can be for it to be inlined in
# the peer ID.
# * When `len(pubKey.Bytes()) <= MaxInlineKeyLength`, the peer ID is the
# identity multihash hash of the public key.
# * When `len(pubKey.Bytes()) > MaxInlineKeyLength`, the peer ID is the
# sha2-256 multihash of the public key.
MAX_INLINE_KEY_LENGTH = 42
class ID:
def __init__(self, id_str):
self._id_str = id_str
def pretty(self):
return base58.b58encode(self._id_str).decode()
def __str__(self):
pid = self.pretty()
if len(pid) <= 10:
return "<peer.ID %s>" % pid
return "<peer.ID %s*%s>" % (pid[:2], pid[len(pid)-6:])
__repr__ = __str__
def __eq__(self, other):
#pylint: disable=protected-access
return self._id_str == other._id_str
def __hash__(self):
return hash(self._id_str)
def id_b58_encode(peer_id):
"""
return a b58-encoded string
"""
#pylint: disable=protected-access
return base58.b58encode(peer_id._id_str).decode()
def id_b58_decode(peer_id_str):
"""
return a base58-decoded peer ID
"""
return ID(base58.b58decode(peer_id_str))
def id_from_public_key(key):
# export into binary format
key_bin = key.exportKey("DER")
algo = multihash.Func.sha2_256
# TODO: seems identity is not yet supported in pymultihash
    # if len(key_bin) <= MAX_INLINE_KEY_LENGTH:
    #     algo = multihash.Func.identity
mh_digest = multihash.digest(key_bin, algo)
return ID(mh_digest.encode())
def id_from_private_key(key):
return id_from_public_key(key.publickey())
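
A sketch of deriving a peer ID from a fresh RSA key, mirroring what the node setup code does (pycryptodome's RSA and pymultihash are the imports this file already uses):

from Crypto.PublicKey import RSA
from libp2p.peer.id import id_from_public_key, id_b58_encode, id_b58_decode

key = RSA.generate(2048, e=65537)
peer_id = id_from_public_key(key.publickey())  # sha2-256 multihash of the DER key

encoded = id_b58_encode(peer_id)               # base58 string, e.g. "Qm..."
assert id_b58_decode(encoded) == peer_id       # round-trips via ID.__eq__
print(peer_id)                                 # e.g. <peer.ID Qm*abcdef>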

39
libp2p/peer/peerdata.py Normal file
View File

@@ -0,0 +1,39 @@
from .peerdata_interface import IPeerData
class PeerData(IPeerData):
def __init__(self):
self.metadata = {}
self.protocols = []
self.addrs = []
def get_protocols(self):
return self.protocols
def add_protocols(self, protocols):
self.protocols.extend(protocols)
def set_protocols(self, protocols):
self.protocols = protocols
def add_addrs(self, addrs):
self.addrs.extend(addrs)
def get_addrs(self):
return self.addrs
def clear_addrs(self):
self.addrs = []
def put_metadata(self, key, val):
self.metadata[key] = val
def get_metadata(self, key):
if key in self.metadata:
return self.metadata[key]
raise PeerDataError("key not found")
class PeerDataError(KeyError):
"""Raised when a key is not found in peer metadata"""

View File

@@ -0,0 +1,56 @@
from abc import ABC, abstractmethod
class IPeerData(ABC):
@abstractmethod
def get_protocols(self):
"""
:return: all protocols associated with given peer
"""
@abstractmethod
def add_protocols(self, protocols):
"""
:param protocols: protocols to add
"""
@abstractmethod
def set_protocols(self, protocols):
"""
        :param protocols: protocols to set
"""
@abstractmethod
def add_addrs(self, addrs):
"""
:param addrs: multiaddresses to add
"""
@abstractmethod
def get_addrs(self):
"""
:return: all multiaddresses
"""
@abstractmethod
def clear_addrs(self):
"""
Clear all addresses
"""
@abstractmethod
def put_metadata(self, key, val):
"""
:param key: key in KV pair
:param val: val to associate with key
        :raise Exception: unsuccessful put
"""
@abstractmethod
def get_metadata(self, key):
"""
:param key: key in KV pair
:return: val for key
:raise Exception: key not found
"""

43
libp2p/peer/peerinfo.py Normal file
View File

@@ -0,0 +1,43 @@
import multiaddr
import multiaddr.util
from .id import id_b58_decode
from .peerdata import PeerData
class PeerInfo:
# pylint: disable=too-few-public-methods
def __init__(self, peer_id, peer_data):
self.peer_id = peer_id
self.addrs = peer_data.get_addrs()
def info_from_p2p_addr(addr):
if not addr:
raise InvalidAddrError()
parts = multiaddr.util.split(addr)
if not parts:
raise InvalidAddrError()
p2p_part = parts[-1]
if p2p_part.protocols()[0].code != multiaddr.protocols.P_P2P:
raise InvalidAddrError()
# make sure the /p2p value parses as a peer.ID
peer_id_str = p2p_part.value_for_protocol(multiaddr.protocols.P_P2P)
peer_id = id_b58_decode(peer_id_str)
    # we might have received just a /p2p part, which means there's no addr.
if len(parts) > 1:
addr = multiaddr.util.join(parts[:-1])
peer_data = PeerData()
peer_data.addrs = [addr]
peer_data.protocols = [p.code for p in addr.protocols()]
return PeerInfo(peer_id, peer_data)
class InvalidAddrError(ValueError):
pass

View File

@@ -0,0 +1,25 @@
from abc import ABC, abstractmethod
class IPeerMetadata(ABC):
def __init__(self):
pass
@abstractmethod
def get(self, peer_id, key):
"""
:param peer_id: peer ID to lookup key for
:param key: key to look up
:return: value at key for given peer
:raise Exception: peer ID not found
"""
@abstractmethod
def put(self, peer_id, key, val):
"""
        :param peer_id: peer ID to associate the key with
        :param key: key to associate with peer
        :param val: value to associate with key
:raise Exception: unsuccessful put
"""

89
libp2p/peer/peerstore.py Normal file
View File

@@ -0,0 +1,89 @@
from .peerstore_interface import IPeerStore
from .peerdata import PeerData
from .peerinfo import PeerInfo
class PeerStore(IPeerStore):
def __init__(self):
IPeerStore.__init__(self)
self.peer_map = {}
def __create_or_get_peer(self, peer_id):
"""
Returns the peer data for peer_id or creates a new
peer data (and stores it in peer_map) if peer
data for peer_id does not yet exist
:param peer_id: peer ID
:return: peer data
"""
if peer_id in self.peer_map:
return self.peer_map[peer_id]
data = PeerData()
self.peer_map[peer_id] = data
return self.peer_map[peer_id]
def peer_info(self, peer_id):
if peer_id in self.peer_map:
peer = self.peer_map[peer_id]
return PeerInfo(peer_id, peer)
return None
def get_protocols(self, peer_id):
if peer_id in self.peer_map:
return self.peer_map[peer_id].get_protocols()
raise PeerStoreError("peer ID not found")
def add_protocols(self, peer_id, protocols):
peer = self.__create_or_get_peer(peer_id)
peer.add_protocols(protocols)
def set_protocols(self, peer_id, protocols):
peer = self.__create_or_get_peer(peer_id)
peer.set_protocols(protocols)
def peers(self):
return list(self.peer_map.keys())
def get(self, peer_id, key):
if peer_id in self.peer_map:
val = self.peer_map[peer_id].get_metadata(key)
return val
raise PeerStoreError("peer ID not found")
def put(self, peer_id, key, val):
        # TODO: document the errors that put_metadata can raise
peer = self.__create_or_get_peer(peer_id)
peer.put_metadata(key, val)
def add_addr(self, peer_id, addr, ttl):
self.add_addrs(peer_id, [addr], ttl)
def add_addrs(self, peer_id, addrs, ttl):
# Ignore ttl for now
peer = self.__create_or_get_peer(peer_id)
peer.add_addrs(addrs)
def addrs(self, peer_id):
if peer_id in self.peer_map:
return self.peer_map[peer_id].get_addrs()
raise PeerStoreError("peer ID not found")
def clear_addrs(self, peer_id):
# Only clear addresses if the peer is in peer map
if peer_id in self.peer_map:
self.peer_map[peer_id].clear_addrs()
def peers_with_addrs(self):
# Add all peers with addrs at least 1 to output
output = []
for key in self.peer_map:
if len(self.peer_map[key].get_addrs()) >= 1:
output.append(key)
return output
class PeerStoreError(KeyError):
"""Raised when peer ID is not found in peer store"""

View File

@@ -0,0 +1,48 @@
from abc import abstractmethod
from .addrbook_interface import IAddrBook
from .peermetadata_interface import IPeerMetadata
class IPeerStore(IAddrBook, IPeerMetadata):
def __init__(self):
IPeerMetadata.__init__(self)
IAddrBook.__init__(self)
@abstractmethod
def peer_info(self, peer_id):
"""
:param peer_id: peer ID to get info for
:return: peer info object
"""
@abstractmethod
def get_protocols(self, peer_id):
"""
:param peer_id: peer ID to get protocols for
:return: protocols (as strings)
:raise Exception: peer ID not found exception
"""
@abstractmethod
def add_protocols(self, peer_id, protocols):
"""
:param peer_id: peer ID to add protocols for
:param protocols: protocols to add
:raise Exception: peer ID not found
"""
@abstractmethod
def set_protocols(self, peer_id, protocols):
"""
:param peer_id: peer ID to set protocols for
:param protocols: protocols to set
:raise Exception: peer ID not found
"""
@abstractmethod
def peers(self):
"""
:return: all of the peer IDs stored in peer store
"""

View File

@@ -0,0 +1,96 @@
from .multiselect_muxer_interface import IMultiselectMuxer
from .multiselect_communicator import MultiselectCommunicator
MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0"
PROTOCOL_NOT_FOUND_MSG = "na"
class Multiselect(IMultiselectMuxer):
"""
Multiselect module that is responsible for responding to
a multiselect client and deciding on
a specific protocol and handler pair to use for communication
"""
def __init__(self):
self.handlers = {}
def add_handler(self, protocol, handler):
"""
Store the handler with the given protocol
:param protocol: protocol name
:param handler: handler function
"""
self.handlers[protocol] = handler
async def negotiate(self, stream):
"""
Negotiate performs protocol selection
:param stream: stream to negotiate on
:return: selected protocol name, handler function
:raise Exception: negotiation failed exception
"""
# Create a communicator to handle all communication across the stream
communicator = MultiselectCommunicator(stream)
# Perform handshake to ensure multiselect protocol IDs match
await self.handshake(communicator)
# Read and respond to commands until a valid protocol ID is sent
while True:
# Read message
command = await communicator.read_stream_until_eof()
# Command is ls or a protocol
if command == "ls":
# TODO: handle ls command
pass
else:
protocol = command
if protocol in self.handlers:
# Tell counterparty we have decided on a protocol
await communicator.write(protocol)
# Return the decided on protocol
return protocol, self.handlers[protocol]
# Tell counterparty this protocol was not found
await communicator.write(PROTOCOL_NOT_FOUND_MSG)
async def handshake(self, communicator):
"""
Perform handshake to agree on multiselect protocol
:param communicator: communicator to use
:raise Exception: error in handshake
"""
# TODO: Use format used by go repo for messages
# Send our MULTISELECT_PROTOCOL_ID to other party
await communicator.write(MULTISELECT_PROTOCOL_ID)
# Read in the protocol ID from other party
handshake_contents = await communicator.read_stream_until_eof()
# Confirm that the protocols are the same
if not validate_handshake(handshake_contents):
raise MultiselectError("multiselect protocol ID mismatch")
# Handshake succeeded if this point is reached
def validate_handshake(handshake_contents):
"""
Determine if handshake is valid and should be confirmed
:param handshake_contents: contents of handshake message
:return: true if handshake is complete, false otherwise
"""
# TODO: Modify this when format used by go repo for messages
# is added
return handshake_contents == MULTISELECT_PROTOCOL_ID
class MultiselectError(ValueError):
"""Raised when an error occurs in multiselect process"""

View File

@@ -0,0 +1,124 @@
from .multiselect_client_interface import IMultiselectClient
from .multiselect_communicator import MultiselectCommunicator
MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0"
PROTOCOL_NOT_FOUND_MSG = "na"
class MultiselectClient(IMultiselectClient):
"""
Client for communicating with receiver's multiselect
module in order to select a protocol id to communicate over
"""
def __init__(self):
pass
async def handshake(self, communicator):
"""
Ensure that the client and multiselect
are both using the same multiselect protocol
:param communicator: communicator to use to communicate with multiselect
:raise Exception: multiselect protocol ID mismatch
"""
# TODO: Use format used by go repo for messages
# Send our MULTISELECT_PROTOCOL_ID to counterparty
await communicator.write(MULTISELECT_PROTOCOL_ID)
# Read in the protocol ID from other party
handshake_contents = await communicator.read_stream_until_eof()
# Confirm that the protocols are the same
if not validate_handshake(handshake_contents):
raise MultiselectClientError("multiselect protocol ID mismatch")
# Handshake succeeded if this point is reached
async def select_protocol_or_fail(self, protocol, stream):
"""
Send message to multiselect selecting protocol
and fail if multiselect does not return same protocol
:param protocol: protocol to select
:param stream: stream to communicate with multiselect over
:return: selected protocol
"""
# Create a communicator to handle all communication across the stream
communicator = MultiselectCommunicator(stream)
# Perform handshake to ensure multiselect protocol IDs match
await self.handshake(communicator)
# Try to select the given protocol
selected_protocol = await self.try_select(communicator, protocol)
return selected_protocol
async def select_one_of(self, protocols, stream):
"""
For each protocol, send a message to multiselect selecting that protocol
and move on to the next if multiselect does not agree. Returns the first
protocol that multiselect agrees on (i.e. that multiselect selects)
:param protocols: protocols to try, in order of preference
:param stream: stream to communicate with multiselect over
:return: selected protocol
"""
# Create a communicator to handle all communication across the stream
communicator = MultiselectCommunicator(stream)
# Perform handshake to ensure multiselect protocol IDs match
await self.handshake(communicator)
# For each protocol, attempt to select that protocol
# and return the first protocol selected
for protocol in protocols:
try:
selected_protocol = await self.try_select(communicator, protocol)
return selected_protocol
except MultiselectClientError:
pass
# None of the protocols were accepted, so raise a not-supported error
raise MultiselectClientError("protocols not supported")
async def try_select(self, communicator, protocol):
"""
Try to select the given protocol or raise exception if fails
:param communicator: communicator to use to communicate with counterparty
:param protocol: protocol to select
:raise Exception: error in protocol selection
:return: selected protocol
"""
# Tell counterparty we want to use protocol
await communicator.write(protocol)
# Get what counterparty says in response
response = await communicator.read_stream_until_eof()
# Return protocol if response is equal to protocol or raise error
if response == protocol:
return protocol
if response == PROTOCOL_NOT_FOUND_MSG:
raise MultiselectClientError("protocol not supported")
raise MultiselectClientError("unrecognized response: " + response)
def validate_handshake(handshake_contents):
"""
Determine if handshake is valid and should be confirmed
:param handshake_contents: contents of handshake message
:return: true if handshake is valid, false otherwise
"""
# TODO: Modify this when format used by go repo for messages
# is added
return handshake_contents == MULTISELECT_PROTOCOL_ID
class MultiselectClientError(ValueError):
"""Raised when an error occurs in protocol selection process"""

View File

@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
class IMultiselectClient(ABC):
"""
Client for communicating with receiver's multiselect
module in order to select a protocol id to communicate over
"""
@abstractmethod
def select_protocol_or_fail(self, protocol, stream):
"""
Send message to multiselect selecting protocol
and fail if multiselect does not return same protocol
:param protocol: protocol to select
:param stream: stream to communicate with multiselect over
:return: selected protocol
"""
@abstractmethod
def select_one_of(self, protocols, stream):
"""
For each protocol, send a message to multiselect selecting that protocol
and move on to the next if multiselect does not agree. Returns the first
protocol that multiselect agrees on (i.e. that multiselect selects)
:param protocols: protocols to try, in order of preference
:param stream: stream to communicate with multiselect over
:return: selected protocol
"""

View File

@ -0,0 +1,26 @@
from .multiselect_communicator_interface import IMultiselectCommunicator
class MultiselectCommunicator(IMultiselectCommunicator):
"""
Communicator helper class that ensures both the client
and the multiselect module follow the same multistream protocol,
which is necessary for them to interoperate
"""
def __init__(self, stream):
self.stream = stream
async def write(self, msg_str):
"""
Write message to stream
:param msg_str: message to write
"""
await self.stream.write(msg_str.encode())
async def read_stream_until_eof(self):
"""
Read a message from the stream until EOF
:return: decoded message read from the stream
"""
read_str = (await self.stream.read()).decode()
return read_str

View File

@ -0,0 +1,22 @@
from abc import ABC, abstractmethod
class IMultiselectCommunicator(ABC):
"""
Communicator helper class that ensures both the client
and the multiselect module follow the same multistream protocol,
which is necessary for them to interoperate
"""
@abstractmethod
def write(self, msg_str):
"""
Write message to stream
:param msg_str: message to write
"""
@abstractmethod
def read_stream_until_eof(self):
"""
Read a message from the stream until EOF
:return: decoded message read from the stream
"""

View File

@ -0,0 +1,26 @@
from abc import ABC, abstractmethod
class IMultiselectMuxer(ABC):
"""
Multiselect module that is responsible for responding to
a multiselect client and deciding on
a specific protocol and handler pair to use for communication
"""
@abstractmethod
def add_handler(self, protocol, handler):
"""
Store the handler with the given protocol
:param protocol: protocol name
:param handler: handler function
"""
@abstractmethod
def negotiate(self, stream):
"""
Negotiate performs protocol selection
:param stream: stream to negotiate on
:return: selected protocol name, handler function
:raise Exception: negotiation failed exception
"""

View File

View File

View File

@ -0,0 +1,6 @@
HEADER_TAGS = {
"NEW_STREAM": 0,
"MESSAGE": 2,
"CLOSE": 4,
"RESET": 6
}
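Each mplex frame header packs a stream ID and one of these flags into a single varint: the stream ID is shifted left three bits and OR'd with the flag. The tags above are the initiator-side values; the receiver side of an action uses the tag minus one, as MplexStream.get_flag below computes. A worked example with hypothetical values:

stream_id = 5
header = (stream_id << 3) | HEADER_TAGS["MESSAGE"]  # (5 << 3) | 2 == 42

# the receiver recovers both fields from the same value:
assert header >> 3 == 5      # stream ID
assert header & 0x07 == 2    # flag: MESSAGE from the initiator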

View File

@ -0,0 +1,172 @@
import asyncio
from .utils import encode_uvarint, decode_uvarint_from_stream
from .mplex_stream import MplexStream
from ..muxed_connection_interface import IMuxedConn
class Mplex(IMuxedConn):
# pylint: disable=too-many-instance-attributes
"""
reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go
"""
def __init__(self, conn):
"""
create a new muxed connection
:param conn: an instance of raw connection; its initiator flag
determines which side of the multiplexed connection this is
"""
self.raw_conn = conn
self.initiator = conn.initiator
# Mapping from stream ID -> buffer of messages for that stream
self.buffers = {}
self.stream_queue = asyncio.Queue()
self.data_buffer = bytearray()
# guards the buffers map, which MplexStream.close/reset mutate
self.conn_lock = asyncio.Lock()
# The initiator of the raw connection need not read at construction time;
# it reads once the user reads from a constructed stream.
if not self.initiator:
asyncio.ensure_future(self.handle_incoming())
def close(self):
"""
close the stream muxer and underlying raw connection
"""
self.raw_conn.close()
def is_closed(self):
"""
check connection is fully closed
:return: true if the connection is fully closed
"""
async def read_buffer(self, stream_id):
"""
Read a message from stream_id's buffer, check raw connection for new messages
:param stream_id: stream id of stream to read from
:return: message read
"""
# Empty buffer or nonexistent stream
# TODO: propagate up timeout exception and catch
if stream_id not in self.buffers or self.buffers[stream_id].empty():
await self.handle_incoming()
if stream_id in self.buffers:
return await self._read_buffer_exists(stream_id)
return None
async def _read_buffer_exists(self, stream_id):
"""
Read from the message buffer for stream_id, with the assumption that the buffer exists
:param stream_id: stream id of stream to read from
:return: message read
"""
try:
data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=5)
return data
except asyncio.TimeoutError:
return None
async def open_stream(self, protocol_id, peer_id, multi_addr):
"""
creates a new muxed_stream
:param protocol_id: protocol_id of stream
:param peer_id: peer_id that stream connects to
:param multi_addr: multi_addr that stream connects to
:return: a new stream
"""
stream_id = self.raw_conn.next_stream_id()
stream = MplexStream(stream_id, True, self)  # the opener is the initiator side of this stream
self.buffers[stream_id] = asyncio.Queue()
return stream
async def accept_stream(self):
"""
accepts a muxed stream opened by the other end
:return: the accepted stream, its stream id, and the protocol id
"""
# TODO update to pull out protocol_id from message
protocol_id = "/echo/1.0.0"
stream_id = await self.stream_queue.get()
stream = MplexStream(stream_id, False, self)
return stream, stream_id, protocol_id
async def send_message(self, flag, data, stream_id):
"""
sends a message over the connection
:param flag: header flag for the message
:param data: data to send in the message
:param stream_id: stream the message is in
:return: True if success
"""
# << by 3, then or with flag
header = (stream_id << 3) | flag
header = encode_uvarint(header)
if data is None:
data_length = encode_uvarint(0)
_bytes = header + data_length
else:
data_length = encode_uvarint(len(data))
_bytes = header + data_length + data
return await self.write_to_stream(_bytes)
async def write_to_stream(self, _bytes):
"""
writes a byte array to a raw connection
:param _bytes: byte array to write
:return: length written
"""
self.raw_conn.writer.write(_bytes)
await self.raw_conn.writer.drain()
return len(_bytes)
async def handle_incoming(self):
"""
Read a message off of the raw connection and add it to the corresponding message buffer
"""
# TODO Deal with other types of messages using flag (currently _)
# TODO call read_message in loop to handle case message for other stream was in conn
stream_id, _, message = await self.read_message()
if stream_id is None:
# read_message timed out or was malformed; nothing to buffer
return
if stream_id not in self.buffers:
self.buffers[stream_id] = asyncio.Queue()
await self.stream_queue.put(stream_id)
await self.buffers[stream_id].put(message)
async def read_chunk(self):
"""
Read a chunk of bytes off of the raw connection into data_buffer
"""
# unused now but possibly useful in the future
try:
chunk = await asyncio.wait_for(self.raw_conn.reader.read(-1), timeout=5)
self.data_buffer += chunk
except asyncio.TimeoutError:
print('timeout!')
return
async def read_message(self):
"""
Read a single message off of the raw connection
:return: stream_id, flag, message contents
"""
try:
header = await decode_uvarint_from_stream(self.raw_conn.reader)
length = await decode_uvarint_from_stream(self.raw_conn.reader)
message = await asyncio.wait_for(self.raw_conn.reader.read(length), timeout=5)
except asyncio.TimeoutError:
print("message malformed")
return None, None, None
flag = header & 0x07
stream_id = header >> 3
return stream_id, flag, message
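Putting send_message and the uvarint helpers together, the bytes produced for a small message look like this (a sketch with hypothetical values):

# stream_id=5, flag=HEADER_TAGS["MESSAGE"] (2), data=b"hi"
# header varint:  (5 << 3) | 2 == 42  ->  b'\x2a'
# length varint:  len(b"hi") == 2     ->  b'\x02'
# payload:                                b'hi'
# on the wire:                            b'\x2a\x02hi'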

View File

@ -0,0 +1,124 @@
import asyncio
from .constants import HEADER_TAGS
from ..muxed_stream_interface import IMuxedStream
class MplexStream(IMuxedStream):
# pylint: disable=too-many-instance-attributes
"""
reference: https://github.com/libp2p/go-mplex/blob/master/stream.go
"""
def __init__(self, stream_id, initiator, mplex_conn):
"""
create new MuxedStream in muxer
:param stream_id: stream id of the stream
:param initiator: boolean if this is an initiator
:param mplex_conn: muxed connection of this muxed_stream
"""
self.stream_id = stream_id
self.initiator = initiator
self.mplex_conn = mplex_conn
self.read_deadline = None
self.write_deadline = None
self.local_closed = False
self.remote_closed = False
self.stream_lock = asyncio.Lock()
def get_flag(self, action):
"""
get header flag based on action for mplex
:param action: action type in str
:return: int flag
"""
if self.initiator:
return HEADER_TAGS[action]
return HEADER_TAGS[action] - 1
async def read(self):
"""
read messages associated with stream from buffer until end of file
:return: bytes of input
"""
return await self.mplex_conn.read_buffer(self.stream_id)
async def write(self, data):
"""
write to stream
:return: number of bytes written
"""
return await self.mplex_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id)
async def close(self):
"""
Closing a stream closes it for writing and closes the remote end for reading
but allows writing in the other direction.
:return: true if successful
"""
# TODO error handling with timeout
# TODO understand better how mutexes are used from go repo
await self.mplex_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id)
remote_lock = ""
async with self.stream_lock:
if self.local_closed:
return True
self.local_closed = True
remote_lock = self.remote_closed
if remote_lock:
async with self.mplex_conn.conn_lock:
self.mplex_conn.buffers.pop(self.stream_id)
return True
async def reset(self):
"""
closes both ends of the stream
tells the remote side to hang up
:return: true if successful
"""
# TODO understand better how mutexes are used here
# TODO understand the difference between close and reset
async with self.stream_lock:
if self.remote_closed and self.local_closed:
return True
if not self.remote_closed:
await self.mplex_conn.send_message(self.get_flag("RESET"), None, self.stream_id)
self.local_closed = True
self.remote_closed = True
async with self.mplex_conn.conn_lock:
self.mplex_conn.buffers.pop(self.stream_id, None)
return True
# TODO deadline not in use
def set_deadline(self, ttl):
"""
set deadline for muxed stream
:return: True if successful
"""
self.read_deadline = ttl
self.write_deadline = ttl
return True
def set_read_deadline(self, ttl):
"""
set read deadline for muxed stream
:return: True if successful
"""
self.read_deadline = ttl
return True
def set_write_deadline(self, ttl):
"""
set write deadline for muxed stream
:return: True if successful
"""
self.write_deadline = ttl
return True

View File

@ -0,0 +1,44 @@
import asyncio
def encode_uvarint(number):
"""Pack `number` into varint bytes"""
buf = b''
while True:
towrite = number & 0x7f
number >>= 7
if number:
buf += bytes((towrite | 0x80, ))
else:
buf += bytes((towrite, ))
break
return buf
def decode_uvarint(buff, index):
shift = 0
result = 0
while True:
i = buff[index]
result |= (i & 0x7f) << shift
shift += 7
if not i & 0x80:
break
index += 1
return result, index + 1
async def decode_uvarint_from_stream(reader):
shift = 0
result = 0
while True:
byte = await asyncio.wait_for(reader.read(1), timeout=5)
i = byte[0]  # single byte as an int
result |= (i & 0x7f) << shift
shift += 7
if not i & 0x80:
break
return result
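A quick round trip through the helpers above, with the value chosen to exercise the continuation bit:

buf = encode_uvarint(300)                # b'\xac\x02': 300 needs two varint bytes
value, next_index = decode_uvarint(buf, 0)
assert (value, next_index) == (300, 2)   # decoded value, index one past the varint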

View File

@ -0,0 +1,39 @@
from abc import ABC, abstractmethod
class IMuxedConn(ABC):
"""
reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go
"""
@abstractmethod
def close(self):
"""
close connection
:return: true if successful
"""
@abstractmethod
def is_closed(self):
"""
check connection is fully closed
:return: true if the connection is fully closed
"""
@abstractmethod
def open_stream(self, protocol_id, peer_id, multi_addr):
"""
creates a new muxed_stream
:param protocol_id: protocol_id of stream
:param peer_id: peer_id that stream connects to
:param multi_addr: multi_addr that stream connects to
:return: a new stream
"""
@abstractmethod
def accept_stream(self):
"""
accepts a muxed stream opened by the other end
:return: the accepted stream, its stream id, and the protocol id
"""

View File

@ -0,0 +1,40 @@
from abc import ABC, abstractmethod
class IMuxedStream(ABC):
@abstractmethod
def read(self):
"""
reads from the underlying muxed_conn
:return: bytes of input
"""
@abstractmethod
def write(self, _bytes):
"""
writes to the underlying muxed_conn
:return: number of bytes written
"""
@abstractmethod
def close(self):
"""
close the underlying muxed_conn
:return: true if successful
"""
@abstractmethod
def reset(self):
"""
closes both ends of the stream
tells the remote side to hang up
:return: true if successful
"""
@abstractmethod
def set_deadline(self, ttl):
"""
set deadline for muxed stream
:return: true if successful
"""

View File

View File

View File

@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
class IListener(ABC):
@abstractmethod
def listen(self, multiaddr):
"""
put listener in listening mode and wait for incoming connections
:param multiaddr: multiaddr of peer
:return: return True if successful
"""
@abstractmethod
def get_addrs(self):
"""
retrieve list of addresses the listener is listening on
:return: return list of addrs
"""
@abstractmethod
def close(self, options=None):
"""
close the listener such that no more connections
can be open on this transport instance
:param options: optional object with a timeout value in ms
that fires and destroys all connections
:return: return True if successful
"""

View File

@ -0,0 +1,92 @@
import asyncio
import multiaddr
from libp2p.network.connection.raw_connection import RawConnection
from ..listener_interface import IListener
from ..transport_interface import ITransport
class TCP(ITransport):
def __init__(self):
self.listener = self.Listener()
class Listener(IListener):
def __init__(self, handler_function=None):
self.multiaddrs = []
self.server = None
self.handler = handler_function
async def listen(self, maddr):
"""
put listener in listening mode and wait for incoming connections
:param maddr: multiaddr of peer
:return: return True if successful
"""
maddr = maddr.decapsulate('/p2p')
coroutine = asyncio.start_server(self.handler,
maddr.value_for_protocol('ip4'),
maddr.value_for_protocol('tcp'))
self.server = await coroutine
socket = self.server.sockets[0]
self.multiaddrs.append(_multiaddr_from_socket(socket))
return True
def get_addrs(self):
"""
retrieve list of addresses the listener is listening on
:return: return list of addrs
"""
# TODO check if server is listening
return self.multiaddrs
def close(self, options=None):
"""
close the listener such that no more connections
can be open on this transport instance
:param options: optional object with a timeout value in ms
that fires and destroys all connections
:return: return True if successful
"""
if self.server is None:
return False
self.server.close()
_loop = asyncio.get_event_loop()
_loop.run_until_complete(self.server.wait_closed())
# do not close the shared event loop; the rest of the application still needs it
self.server = None
return True
async def dial(self, maddr, options=None):
"""
dial a transport to a peer listening on maddr
:param maddr: multiaddr of peer
:param options: optional object
:return: a raw connection to the peer
"""
host = maddr.value_for_protocol('ip4')
port = int(maddr.value_for_protocol('tcp'))
reader, writer = await asyncio.open_connection(host, port)
return RawConnection(host, port, reader, writer, True)
def create_listener(self, handler_function, options=None):
"""
create listener on transport
:param options: optional object with properties the listener must have
:param handler_function: a function called when a new connection is received
that takes a connection as argument which implements interface-connection
:return: a listener object that implements listener_interface.py
"""
return self.Listener(handler_function)
def _multiaddr_from_socket(socket):
return multiaddr.Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname())
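A minimal end-to-end sketch of the transport: create a listener with a toy echo handler, then dial it back. handle_conn, demo, and port 8005 are hypothetical, and RawConnection is assumed to expose the reader/writer it was constructed with, as the mplex code above relies on:

import asyncio
import multiaddr

async def handle_conn(reader, writer):
    writer.write(await reader.read(100))  # echo whatever arrives
    await writer.drain()

async def demo():
    tcp = TCP()
    listener = tcp.create_listener(handle_conn)
    await listener.listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/8005"))
    conn = await tcp.dial(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/8005"))
    conn.writer.write(b"ping")
    await conn.writer.drain()
    print(await conn.reader.read(4))  # b'ping'

asyncio.get_event_loop().run_until_complete(demo())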

View File

@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
class ITransport(ABC):
@abstractmethod
def dial(self, multiaddr, options=None):
"""
dial a transport to peer listening on multiaddr
:param multiaddr: multiaddr of peer
:param options: optional object
:return: a raw connection to the peer
"""
@abstractmethod
def create_listener(self, handler_function, options=None):
"""
create listener on transport
:param options: optional object with properties the listener must have
:param handler_function: a function called when a new connection is received
that takes a connection as argument which implements interface-connection
:return: a listener object that implements listener_interface.py
"""

View File

@ -0,0 +1,27 @@
from libp2p.stream_muxer.mplex.mplex import Mplex
class TransportUpgrader:
# pylint: disable=no-self-use
def __init__(self, sec_opt, muxer_opt):
self.sec = sec_opt
self.muxer = muxer_opt
def upgrade_listener(self, transport, listeners):
"""
upgrade multiaddr listeners to libp2p-transport listeners
"""
def upgrade_security(self):
pass
def upgrade_connection(self, conn):
"""
upgrade raw connection to muxed connection
"""
# For PoC, no security, default to mplex
# TODO do exchange to determine multiplexer
return Mplex(conn)
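A sketch of using the upgrader with a dialed raw connection; tcp, maddr, and peer_id are assumed to be in scope, and since security and muxer negotiation are still TODOs, the upgrade is a plain Mplex wrap:

raw_conn = await tcp.dial(maddr)                    # RawConnection from the TCP transport
upgrader = TransportUpgrader(["secio"], ["mplex/6.7.0"])
muxed_conn = upgrader.upgrade_connection(raw_conn)  # Mplex over the raw connection
stream = await muxed_conn.open_stream("/echo/1.0.0", peer_id, maddr)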