mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Merge branch 'master' into security-dev
This commit is contained in:
@ -68,18 +68,18 @@ def initialize_default_swarm(
|
|||||||
if not id_opt:
|
if not id_opt:
|
||||||
id_opt = generate_id()
|
id_opt = generate_id()
|
||||||
|
|
||||||
|
# TODO parse transport_opt to determine transport
|
||||||
transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"]
|
transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"]
|
||||||
transport = [multiaddr.Multiaddr(t) for t in transport_opt]
|
transport = TCP()
|
||||||
# TODO wire muxer up with swarm
|
|
||||||
# muxer = muxer_opt or ["mplex/6.7.0"]
|
# TODO TransportUpgrader is not doing anything really
|
||||||
|
# TODO parse muxer and sec to pass into TransportUpgrader
|
||||||
|
muxer = muxer_opt or ["mplex/6.7.0"]
|
||||||
|
sec = sec_opt or {"insecure/1.0.0": InsecureTransport("insecure")}
|
||||||
|
upgrader = TransportUpgrader(sec, muxer)
|
||||||
|
|
||||||
# Use passed in security option or the default insecure option
|
|
||||||
sec = sec_opt or {"/insecure/1.0.0": InsecureTransport("insecure")}
|
|
||||||
peerstore = peerstore_opt or PeerStore()
|
peerstore = peerstore_opt or PeerStore()
|
||||||
upgrader = TransportUpgrader(sec, transport)
|
swarm_opt = Swarm(id_opt, peerstore, upgrader, transport)
|
||||||
swarm_opt = Swarm(id_opt, peerstore, upgrader)
|
|
||||||
tcp = TCP()
|
|
||||||
swarm_opt.add_transport(tcp)
|
|
||||||
|
|
||||||
return swarm_opt
|
return swarm_opt
|
||||||
|
|
||||||
|
|||||||
@ -166,6 +166,20 @@ class KademliaServer:
|
|||||||
dkey = digest(key)
|
dkey = digest(key)
|
||||||
return await self.set_digest(dkey, value)
|
return await self.set_digest(dkey, value)
|
||||||
|
|
||||||
|
async def provide(self, key):
|
||||||
|
"""
|
||||||
|
publish to the network that it provides for a particular key
|
||||||
|
"""
|
||||||
|
neighbors = self.protocol.router.find_neighbors(self.node)
|
||||||
|
return [await self.protocol.call_add_provider(n, key, self.node.peer_id) for n in neighbors]
|
||||||
|
|
||||||
|
async def get_providers(self, key):
|
||||||
|
"""
|
||||||
|
get the list of providers for a key
|
||||||
|
"""
|
||||||
|
neighbors = self.protocol.router.find_neighbors(self.node)
|
||||||
|
return [await self.protocol.call_get_providers(n, key) for n in neighbors]
|
||||||
|
|
||||||
async def set_digest(self, dkey, value):
|
async def set_digest(self, dkey, value):
|
||||||
"""
|
"""
|
||||||
Set the given SHA1 digest key (bytes) to the given value in the
|
Set the given SHA1 digest key (bytes) to the given value in the
|
||||||
|
|||||||
@ -36,12 +36,6 @@ class KademliaProtocol(RPCProtocol):
|
|||||||
ids.append(rid)
|
ids.append(rid)
|
||||||
return ids
|
return ids
|
||||||
|
|
||||||
def rpc_add_provider(self, sender, nodeid, key):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def rpc_get_providers(self, sender, nodeid, key):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def rpc_stun(self, sender): # pylint: disable=no-self-use
|
def rpc_stun(self, sender): # pylint: disable=no-self-use
|
||||||
return sender
|
return sender
|
||||||
|
|
||||||
@ -79,6 +73,42 @@ class KademliaProtocol(RPCProtocol):
|
|||||||
return self.rpc_find_node(sender, nodeid, key)
|
return self.rpc_find_node(sender, nodeid, key)
|
||||||
return {'value': value}
|
return {'value': value}
|
||||||
|
|
||||||
|
def rpc_add_provider(self, sender, nodeid, key, provider_id):
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
"""
|
||||||
|
rpc when receiving an add_provider call
|
||||||
|
should validate received PeerInfo matches sender nodeid
|
||||||
|
if it does, receipient must store a record in its datastore
|
||||||
|
we store a map of content_id to peer_id (non xor)
|
||||||
|
"""
|
||||||
|
if nodeid == provider_id:
|
||||||
|
log.info("adding provider %s for key %s in local table",
|
||||||
|
provider_id, str(key))
|
||||||
|
self.storage[key] = provider_id
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def rpc_get_providers(self, sender, key):
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
"""
|
||||||
|
rpc when receiving a get_providers call
|
||||||
|
should look up key in data store and respond with records
|
||||||
|
plus a list of closer peers in its routing table
|
||||||
|
"""
|
||||||
|
providers = []
|
||||||
|
record = self.storage.get(key, None)
|
||||||
|
|
||||||
|
if record:
|
||||||
|
providers.append(record)
|
||||||
|
|
||||||
|
keynode = create_kad_peerinfo(key)
|
||||||
|
neighbors = self.router.find_neighbors(keynode)
|
||||||
|
for neighbor in neighbors:
|
||||||
|
if neighbor.peer_id != record:
|
||||||
|
providers.append(neighbor.peer_id)
|
||||||
|
|
||||||
|
return providers
|
||||||
|
|
||||||
async def call_find_node(self, node_to_ask, node_to_find):
|
async def call_find_node(self, node_to_ask, node_to_find):
|
||||||
address = (node_to_ask.ip, node_to_ask.port)
|
address = (node_to_ask.ip, node_to_ask.port)
|
||||||
result = await self.find_node(address, self.source_node.peer_id,
|
result = await self.find_node(address, self.source_node.peer_id,
|
||||||
@ -101,6 +131,19 @@ class KademliaProtocol(RPCProtocol):
|
|||||||
result = await self.store(address, self.source_node.peer_id, key, value)
|
result = await self.store(address, self.source_node.peer_id, key, value)
|
||||||
return self.handle_call_response(result, node_to_ask)
|
return self.handle_call_response(result, node_to_ask)
|
||||||
|
|
||||||
|
async def call_add_provider(self, node_to_ask, key, provider_id):
|
||||||
|
address = (node_to_ask.ip, node_to_ask.port)
|
||||||
|
result = await self.add_provider(address,
|
||||||
|
self.source_node.peer_id,
|
||||||
|
key, provider_id)
|
||||||
|
|
||||||
|
return self.handle_call_response(result, node_to_ask)
|
||||||
|
|
||||||
|
async def call_get_providers(self, node_to_ask, key):
|
||||||
|
address = (node_to_ask.ip, node_to_ask.port)
|
||||||
|
result = await self.get_providers(address, key)
|
||||||
|
return self.handle_call_response(result, node_to_ask)
|
||||||
|
|
||||||
def welcome_if_new(self, node):
|
def welcome_if_new(self, node):
|
||||||
"""
|
"""
|
||||||
Given a new node, send it all the keys/values it should be storing,
|
Given a new node, send it all the keys/values it should be storing,
|
||||||
|
|||||||
@ -12,14 +12,14 @@ from .connection.raw_connection import RawConnection
|
|||||||
class Swarm(INetwork):
|
class Swarm(INetwork):
|
||||||
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
|
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
|
||||||
|
|
||||||
def __init__(self, peer_id, peerstore, upgrader):
|
def __init__(self, peer_id, peerstore, upgrader, transport):
|
||||||
self.self_id = peer_id
|
self.self_id = peer_id
|
||||||
self.peerstore = peerstore
|
self.peerstore = peerstore
|
||||||
self.upgrader = upgrader
|
self.upgrader = upgrader
|
||||||
|
self.transport = transport
|
||||||
self.connections = dict()
|
self.connections = dict()
|
||||||
self.listeners = dict()
|
self.listeners = dict()
|
||||||
self.stream_handlers = dict()
|
self.stream_handlers = dict()
|
||||||
self.transport = None
|
|
||||||
|
|
||||||
# Protocol muxing
|
# Protocol muxing
|
||||||
self.multiselect = Multiselect()
|
self.multiselect = Multiselect()
|
||||||
@ -189,10 +189,6 @@ class Swarm(INetwork):
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def add_transport(self, transport):
|
|
||||||
# TODO: Support more than one transport
|
|
||||||
self.transport = transport
|
|
||||||
|
|
||||||
def create_generic_protocol_handler(swarm):
|
def create_generic_protocol_handler(swarm):
|
||||||
"""
|
"""
|
||||||
Create a generic protocol handler from the given swarm. We use swarm
|
Create a generic protocol handler from the given swarm. We use swarm
|
||||||
|
|||||||
31
tests/kademlia/test_providers.py
Normal file
31
tests/kademlia/test_providers.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
import pytest
|
||||||
|
from libp2p.kademlia.network import KademliaServer
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_example():
|
||||||
|
node_a = KademliaServer()
|
||||||
|
await node_a.listen(5801)
|
||||||
|
|
||||||
|
node_b = KademliaServer()
|
||||||
|
await node_b.listen(5802)
|
||||||
|
await node_b.bootstrap([("127.0.0.1", 5801)])
|
||||||
|
|
||||||
|
key = "hello"
|
||||||
|
value = "world"
|
||||||
|
await node_b.set(key, value)
|
||||||
|
await node_b.provide("hello")
|
||||||
|
|
||||||
|
providers = await node_b.get_providers("hello")
|
||||||
|
# print ("providers")
|
||||||
|
# print (providers)
|
||||||
|
|
||||||
|
# bmuller's handle_call_response wraps
|
||||||
|
# every rpc call result in a list of tuples
|
||||||
|
# [(True, [b'\xf9\xa1\xf5\x10a\xe5\xe0F'])]
|
||||||
|
first_tuple = providers[0]
|
||||||
|
# (True, [b'\xf9\xa1\xf5\x10a\xe5\xe0F'])
|
||||||
|
first_providers = first_tuple[1]
|
||||||
|
# [b'\xf9\xa1\xf5\x10a\xe5\xe0F']
|
||||||
|
first_provider = first_providers[0]
|
||||||
|
assert node_b.node.peer_id == first_provider
|
||||||
Reference in New Issue
Block a user