mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-03-30 00:51:27 +00:00
Migrate to new project structure.
This commit is contained in:
0
libp2p/network/__init__.py
Normal file
0
libp2p/network/__init__.py
Normal file
0
libp2p/network/connection/__init__.py
Normal file
0
libp2p/network/connection/__init__.py
Normal file
25
libp2p/network/connection/raw_connection.py
Normal file
25
libp2p/network/connection/raw_connection.py
Normal file
@ -0,0 +1,25 @@
|
||||
from .raw_connection_interface import IRawConnection
|
||||
|
||||
|
||||
class RawConnection(IRawConnection):
|
||||
|
||||
def __init__(self, ip, port, reader, writer, initiator):
|
||||
# pylint: disable=too-many-arguments
|
||||
self.conn_ip = ip
|
||||
self.conn_port = port
|
||||
self.reader = reader
|
||||
self.writer = writer
|
||||
self._next_id = 0 if initiator else 1
|
||||
self.initiator = initiator
|
||||
|
||||
def close(self):
|
||||
self.writer.close()
|
||||
|
||||
def next_stream_id(self):
|
||||
"""
|
||||
Get next available stream id
|
||||
:return: next available stream id for the connection
|
||||
"""
|
||||
next_id = self._next_id
|
||||
self._next_id += 2
|
||||
return next_id
|
||||
9
libp2p/network/connection/raw_connection_interface.py
Normal file
9
libp2p/network/connection/raw_connection_interface.py
Normal file
@ -0,0 +1,9 @@
|
||||
from abc import ABC
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
|
||||
|
||||
class IRawConnection(ABC):
|
||||
"""
|
||||
A Raw Connection provides a Reader and a Writer
|
||||
"""
|
||||
122
libp2p/network/multiaddr.py
Normal file
122
libp2p/network/multiaddr.py
Normal file
@ -0,0 +1,122 @@
|
||||
class MultiAddr:
|
||||
|
||||
# Validates input string and constructs internal representation.
|
||||
def __init__(self, addr):
|
||||
self.protocol_map = dict()
|
||||
|
||||
# Empty multiaddrs are valid.
|
||||
if not addr:
|
||||
self.protocol_map = dict()
|
||||
return
|
||||
|
||||
if not addr[0] == "/":
|
||||
raise MultiAddrValueError("Invalid input multiaddr.")
|
||||
|
||||
addr = addr[1:]
|
||||
protocol_map = dict()
|
||||
split_addr = addr.split("/")
|
||||
|
||||
if not split_addr or len(split_addr) % 2 != 0:
|
||||
raise MultiAddrValueError("Invalid input multiaddr.")
|
||||
|
||||
is_protocol = True
|
||||
curr_protocol = ""
|
||||
|
||||
for addr_part in split_addr:
|
||||
if is_protocol:
|
||||
curr_protocol = addr_part
|
||||
else:
|
||||
protocol_map[curr_protocol] = addr_part
|
||||
is_protocol = not is_protocol
|
||||
|
||||
# Basic validation of protocols
|
||||
# TODO(rzajac): Add more validation as necessary.
|
||||
if 'ip4' in self.protocol_map and 'ip6' in self.protocol_map:
|
||||
raise MultiAddrValueError("Multiaddr should not specify two IP layers.")
|
||||
|
||||
if 'tcp' in self.protocol_map and 'udp' in self.protocol_map:
|
||||
raise MultiAddrValueError("Multiaddr should not specify two transport layers.")
|
||||
|
||||
self.protocol_map = protocol_map
|
||||
|
||||
def get_protocols(self):
|
||||
"""
|
||||
:return: List of protocols contained in this multiaddr.
|
||||
"""
|
||||
return list(self.protocol_map.keys())
|
||||
|
||||
def get_protocol_value(self, protocol):
|
||||
"""
|
||||
Getter for protocol values in this multiaddr.
|
||||
:param protocol: the protocol whose value to retrieve
|
||||
:return: value of input protocol
|
||||
"""
|
||||
if protocol not in self.protocol_map:
|
||||
return None
|
||||
|
||||
return self.protocol_map[protocol]
|
||||
|
||||
def add_protocol(self, protocol, value):
|
||||
"""
|
||||
Setter for protocol values in this multiaddr.
|
||||
:param protocol: the protocol whose value to set or add
|
||||
:param value: the value for the input protocol
|
||||
:return: True if successful
|
||||
"""
|
||||
self.protocol_map[protocol] = value
|
||||
return True
|
||||
|
||||
def remove_protocol(self, protocol):
|
||||
"""
|
||||
Remove protocol and its value from this multiaddr.
|
||||
:param protocol: the protocol to remove
|
||||
:return: True if remove succeeded, False if protocol was not contained in this multiaddr
|
||||
"""
|
||||
del self.protocol_map[protocol]
|
||||
|
||||
def get_multiaddr_string(self):
|
||||
"""
|
||||
:return: the string representation of this multiaddr.
|
||||
"""
|
||||
addr = ""
|
||||
|
||||
for protocol in self.protocol_map:
|
||||
addr += "/" + protocol + "/" + self.get_protocol_value(protocol)
|
||||
|
||||
return addr
|
||||
|
||||
def to_options(self):
|
||||
"""
|
||||
Gives back a dictionary with access to transport information from this multiaddr.
|
||||
Example: MultiAddr('/ip4/127.0.0.1/tcp/4001').to_options()
|
||||
= { family: 'ipv4', host: '127.0.0.1', transport: 'tcp', port: '4001' }
|
||||
:return: {{family: String, host: String, transport: String, port: String}}
|
||||
with None if field does not exist
|
||||
"""
|
||||
options = dict()
|
||||
|
||||
if 'ip4' in self.protocol_map:
|
||||
options['family'] = 'ipv4'
|
||||
options['host'] = self.protocol_map['ip4']
|
||||
elif 'ip6' in self.protocol_map:
|
||||
options['family'] = 'ipv6'
|
||||
options['host'] = self.protocol_map['ip6']
|
||||
else:
|
||||
options['family'] = None
|
||||
options['host'] = None
|
||||
|
||||
if 'tcp' in self.protocol_map:
|
||||
options['transport'] = 'tcp'
|
||||
options['port'] = self.protocol_map['tcp']
|
||||
elif 'udp' in self.protocol_map:
|
||||
options['transport'] = 'udp'
|
||||
options['port'] = self.protocol_map['udp']
|
||||
else:
|
||||
options['transport'] = None
|
||||
options['port'] = None
|
||||
|
||||
return options
|
||||
|
||||
|
||||
class MultiAddrValueError(ValueError):
|
||||
"""Raised when the input string to the MultiAddr constructor was invalid."""
|
||||
43
libp2p/network/network_interface.py
Normal file
43
libp2p/network/network_interface.py
Normal file
@ -0,0 +1,43 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class INetwork(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def get_peer_id(self):
|
||||
"""
|
||||
:return: the peer id
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def dial_peer(self, peer_id):
|
||||
"""
|
||||
dial_peer try to create a connection to peer_id
|
||||
|
||||
:param peer_id: peer if we want to dial
|
||||
:raises SwarmException: raised when no address if found for peer_id
|
||||
:return: muxed connection
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def set_stream_handler(self, protocol_id, stream_handler):
|
||||
"""
|
||||
:param protocol_id: protocol id used on stream
|
||||
:param stream_handler: a stream handler instance
|
||||
:return: true if successful
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def new_stream(self, peer_id, protocol_ids):
|
||||
"""
|
||||
:param peer_id: peer_id of destination
|
||||
:param protocol_ids: available protocol ids to use for stream
|
||||
:return: net stream instance
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def listen(self, *args):
|
||||
"""
|
||||
:param *args: one or many multiaddrs to start listening on
|
||||
:return: True if at least one success
|
||||
"""
|
||||
0
libp2p/network/stream/__init__.py
Normal file
0
libp2p/network/stream/__init__.py
Normal file
43
libp2p/network/stream/net_stream.py
Normal file
43
libp2p/network/stream/net_stream.py
Normal file
@ -0,0 +1,43 @@
|
||||
from .net_stream_interface import INetStream
|
||||
|
||||
|
||||
class NetStream(INetStream):
|
||||
|
||||
def __init__(self, muxed_stream):
|
||||
self.muxed_stream = muxed_stream
|
||||
self.protocol_id = None
|
||||
|
||||
def get_protocol(self):
|
||||
"""
|
||||
:return: protocol id that stream runs on
|
||||
"""
|
||||
return self.protocol_id
|
||||
|
||||
def set_protocol(self, protocol_id):
|
||||
"""
|
||||
:param protocol_id: protocol id that stream runs on
|
||||
:return: true if successful
|
||||
"""
|
||||
self.protocol_id = protocol_id
|
||||
|
||||
async def read(self):
|
||||
"""
|
||||
read from stream
|
||||
:return: bytes of input until EOF
|
||||
"""
|
||||
return await self.muxed_stream.read()
|
||||
|
||||
async def write(self, data):
|
||||
"""
|
||||
write to stream
|
||||
:return: number of bytes written
|
||||
"""
|
||||
return await self.muxed_stream.write(data)
|
||||
|
||||
async def close(self):
|
||||
"""
|
||||
close stream
|
||||
:return: true if successful
|
||||
"""
|
||||
await self.muxed_stream.close()
|
||||
return True
|
||||
38
libp2p/network/stream/net_stream_interface.py
Normal file
38
libp2p/network/stream/net_stream_interface.py
Normal file
@ -0,0 +1,38 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class INetStream(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def get_protocol(self):
|
||||
"""
|
||||
:return: protocol id that stream runs on
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def set_protocol(self, protocol_id):
|
||||
"""
|
||||
:param protocol_id: protocol id that stream runs on
|
||||
:return: true if successful
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def read(self):
|
||||
"""
|
||||
reads from the underlying muxed_stream
|
||||
:return: bytes of input
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def write(self, _bytes):
|
||||
"""
|
||||
write to the underlying muxed_stream
|
||||
:return: number of bytes written
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def close(self):
|
||||
"""
|
||||
close the underlying muxed stream
|
||||
:return: true if successful
|
||||
"""
|
||||
160
libp2p/network/swarm.py
Normal file
160
libp2p/network/swarm.py
Normal file
@ -0,0 +1,160 @@
|
||||
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
||||
from libp2p.protocol_muxer.multiselect import Multiselect
|
||||
|
||||
|
||||
from .network_interface import INetwork
|
||||
from .stream.net_stream import NetStream
|
||||
from .connection.raw_connection import RawConnection
|
||||
|
||||
|
||||
class Swarm(INetwork):
|
||||
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
|
||||
|
||||
def __init__(self, peer_id, peerstore, upgrader):
|
||||
self.self_id = peer_id
|
||||
self.peerstore = peerstore
|
||||
self.upgrader = upgrader
|
||||
self.connections = dict()
|
||||
self.listeners = dict()
|
||||
self.stream_handlers = dict()
|
||||
self.transport = None
|
||||
|
||||
# Protocol muxing
|
||||
self.multiselect = Multiselect()
|
||||
self.multiselect_client = MultiselectClient()
|
||||
|
||||
def get_peer_id(self):
|
||||
return self.self_id
|
||||
|
||||
def set_stream_handler(self, protocol_id, stream_handler):
|
||||
"""
|
||||
:param protocol_id: protocol id used on stream
|
||||
:param stream_handler: a stream handler instance
|
||||
:return: true if successful
|
||||
"""
|
||||
self.multiselect.add_handler(protocol_id, stream_handler)
|
||||
return True
|
||||
|
||||
async def dial_peer(self, peer_id):
|
||||
"""
|
||||
dial_peer try to create a connection to peer_id
|
||||
:param peer_id: peer if we want to dial
|
||||
:raises SwarmException: raised when no address if found for peer_id
|
||||
:return: muxed connection
|
||||
"""
|
||||
|
||||
# Get peer info from peer store
|
||||
addrs = self.peerstore.addrs(peer_id)
|
||||
|
||||
if not addrs:
|
||||
raise SwarmException("No known addresses to peer")
|
||||
|
||||
# TODO: define logic to choose which address to use, or try them all ?
|
||||
multiaddr = addrs[0]
|
||||
|
||||
if peer_id in self.connections:
|
||||
# If muxed connection already exists for peer_id,
|
||||
# set muxed connection equal to existing muxed connection
|
||||
muxed_conn = self.connections[peer_id]
|
||||
else:
|
||||
# Transport dials peer (gets back a raw conn)
|
||||
raw_conn = await self.transport.dial(multiaddr)
|
||||
|
||||
# Use upgrader to upgrade raw conn to muxed conn
|
||||
muxed_conn = self.upgrader.upgrade_connection(raw_conn)
|
||||
|
||||
# Store muxed connection in connections
|
||||
self.connections[peer_id] = muxed_conn
|
||||
|
||||
return muxed_conn
|
||||
|
||||
async def new_stream(self, peer_id, protocol_ids):
|
||||
"""
|
||||
:param peer_id: peer_id of destination
|
||||
:param protocol_id: protocol id
|
||||
:return: net stream instance
|
||||
"""
|
||||
# Get peer info from peer store
|
||||
addrs = self.peerstore.addrs(peer_id)
|
||||
|
||||
if not addrs:
|
||||
raise SwarmException("No known addresses to peer")
|
||||
|
||||
multiaddr = addrs[0]
|
||||
|
||||
muxed_conn = await self.dial_peer(peer_id)
|
||||
|
||||
# Use muxed conn to open stream, which returns
|
||||
# a muxed stream
|
||||
# TODO: Remove protocol id from being passed into muxed_conn
|
||||
muxed_stream = await muxed_conn.open_stream(protocol_ids[0], peer_id, multiaddr)
|
||||
|
||||
# Perform protocol muxing to determine protocol to use
|
||||
selected_protocol = await self.multiselect_client.select_one_of(protocol_ids, muxed_stream)
|
||||
|
||||
# Create a net stream with the selected protocol
|
||||
net_stream = NetStream(muxed_stream)
|
||||
net_stream.set_protocol(selected_protocol)
|
||||
|
||||
return net_stream
|
||||
|
||||
async def listen(self, *args):
|
||||
"""
|
||||
:param *args: one or many multiaddrs to start listening on
|
||||
:return: true if at least one success
|
||||
|
||||
For each multiaddr in args
|
||||
Check if a listener for multiaddr exists already
|
||||
If listener already exists, continue
|
||||
Otherwise:
|
||||
Capture multiaddr in conn handler
|
||||
Have conn handler delegate to stream handler
|
||||
Call listener listen with the multiaddr
|
||||
Map multiaddr to listener
|
||||
"""
|
||||
for multiaddr in args:
|
||||
if str(multiaddr) in self.listeners:
|
||||
return True
|
||||
|
||||
async def conn_handler(reader, writer):
|
||||
# Upgrade reader/write to a net_stream and pass \
|
||||
# to appropriate stream handler (using multiaddr)
|
||||
raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'),
|
||||
multiaddr.value_for_protocol('tcp'), reader, writer, False)
|
||||
muxed_conn = self.upgrader.upgrade_connection(raw_conn)
|
||||
|
||||
# TODO: Remove protocol id from muxed_conn accept stream or
|
||||
# move protocol muxing into accept_stream
|
||||
muxed_stream, _, _ = await muxed_conn.accept_stream()
|
||||
|
||||
# Perform protocol muxing to determine protocol to use
|
||||
selected_protocol, handler = await self.multiselect.negotiate(muxed_stream)
|
||||
|
||||
net_stream = NetStream(muxed_stream)
|
||||
net_stream.set_protocol(selected_protocol)
|
||||
|
||||
# Give to stream handler
|
||||
# TODO: handle case when stream handler is set
|
||||
# TODO: handle case of multiple protocols over same raw connection
|
||||
await handler(net_stream)
|
||||
|
||||
try:
|
||||
# Success
|
||||
listener = self.transport.create_listener(conn_handler)
|
||||
self.listeners[str(multiaddr)] = listener
|
||||
await listener.listen(multiaddr)
|
||||
return True
|
||||
except IOError:
|
||||
# Failed. Continue looping.
|
||||
print("Failed to connect to: " + str(multiaddr))
|
||||
|
||||
# No multiaddr succeeded
|
||||
return False
|
||||
|
||||
def add_transport(self, transport):
|
||||
# TODO: Support more than one transport
|
||||
self.transport = transport
|
||||
|
||||
|
||||
class SwarmException(Exception):
|
||||
pass
|
||||
Reference in New Issue
Block a user