mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-10 07:00:54 +00:00
Add type hints to routing folder
This commit is contained in:
@ -1,11 +1,23 @@
|
|||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Coroutine,
|
||||||
|
Iterable,
|
||||||
|
)
|
||||||
|
|
||||||
|
from libp2p.peer.id import (
|
||||||
|
ID,
|
||||||
|
)
|
||||||
|
from libp2p.peer.peerinfo import (
|
||||||
|
PeerInfo,
|
||||||
|
)
|
||||||
# pylint: disable=too-few-public-methods
|
# pylint: disable=too-few-public-methods
|
||||||
|
|
||||||
|
|
||||||
class IContentRouting(ABC):
|
class IContentRouting(ABC):
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def provide(self, cid, announce=True):
|
def provide(self, cid: bytes, announce: bool=True) -> None:
|
||||||
"""
|
"""
|
||||||
Provide adds the given cid to the content routing system. If announce is True,
|
Provide adds the given cid to the content routing system. If announce is True,
|
||||||
it also announces it, otherwise it is just kept in the local
|
it also announces it, otherwise it is just kept in the local
|
||||||
@ -13,7 +25,7 @@ class IContentRouting(ABC):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def find_provider_iter(self, cid, count):
|
def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]:
|
||||||
"""
|
"""
|
||||||
Search for peers who are able to provide a given key
|
Search for peers who are able to provide a given key
|
||||||
returns an iterator of peer.PeerInfo
|
returns an iterator of peer.PeerInfo
|
||||||
@ -23,7 +35,7 @@ class IContentRouting(ABC):
|
|||||||
class IPeerRouting(ABC):
|
class IPeerRouting(ABC):
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def find_peer(self, peer_id):
|
def find_peer(self, peer_id: ID) -> Coroutine[Any, Any, PeerInfo]:
|
||||||
"""
|
"""
|
||||||
Find specific Peer
|
Find specific Peer
|
||||||
FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo
|
FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo
|
||||||
|
|||||||
@ -1,9 +1,14 @@
|
|||||||
|
from typing import (
|
||||||
|
Iterable,
|
||||||
|
)
|
||||||
|
|
||||||
|
from libp2p.peer.peerinfo import PeerInfo
|
||||||
from libp2p.routing.interfaces import IContentRouting
|
from libp2p.routing.interfaces import IContentRouting
|
||||||
|
|
||||||
|
|
||||||
class KadmeliaContentRouter(IContentRouting):
|
class KadmeliaContentRouter(IContentRouting):
|
||||||
|
|
||||||
def provide(self, cid, announce=True):
|
def provide(self, cid: bytes, announce: bool=True) -> None:
|
||||||
"""
|
"""
|
||||||
Provide adds the given cid to the content routing system. If announce is True,
|
Provide adds the given cid to the content routing system. If announce is True,
|
||||||
it also announces it, otherwise it is just kept in the local
|
it also announces it, otherwise it is just kept in the local
|
||||||
@ -12,7 +17,7 @@ class KadmeliaContentRouter(IContentRouting):
|
|||||||
# the DHT finds the closest peers to `key` using the `FIND_NODE` RPC
|
# the DHT finds the closest peers to `key` using the `FIND_NODE` RPC
|
||||||
# then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers.
|
# then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers.
|
||||||
|
|
||||||
def find_provider_iter(self, cid, count):
|
def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]:
|
||||||
"""
|
"""
|
||||||
Search for peers who are able to provide a given key
|
Search for peers who are able to provide a given key
|
||||||
returns an iterator of peer.PeerInfo
|
returns an iterator of peer.PeerInfo
|
||||||
|
|||||||
@ -1,16 +1,26 @@
|
|||||||
import ast
|
import ast
|
||||||
|
from typing import (
|
||||||
|
Union,
|
||||||
|
)
|
||||||
|
|
||||||
|
from libp2p.kademlia.kad_peerinfo import (
|
||||||
|
KadPeerInfo,
|
||||||
|
create_kad_peerinfo,
|
||||||
|
)
|
||||||
|
from libp2p.kademlia.network import KademliaServer
|
||||||
|
from libp2p.peer.id import ID
|
||||||
from libp2p.routing.interfaces import IPeerRouting
|
from libp2p.routing.interfaces import IPeerRouting
|
||||||
from libp2p.kademlia.kad_peerinfo import create_kad_peerinfo
|
|
||||||
|
|
||||||
|
|
||||||
class KadmeliaPeerRouter(IPeerRouting):
|
class KadmeliaPeerRouter(IPeerRouting):
|
||||||
# pylint: disable=too-few-public-methods
|
# pylint: disable=too-few-public-methods
|
||||||
|
|
||||||
def __init__(self, dht_server):
|
server: KademliaServer
|
||||||
|
|
||||||
|
def __init__(self, dht_server: KademliaServer) -> None:
|
||||||
self.server = dht_server
|
self.server = dht_server
|
||||||
|
|
||||||
async def find_peer(self, peer_id):
|
async def find_peer(self, peer_id: ID) -> KadPeerInfo:
|
||||||
"""
|
"""
|
||||||
Find a specific peer
|
Find a specific peer
|
||||||
:param peer_id: peer to search for
|
:param peer_id: peer to search for
|
||||||
@ -21,7 +31,7 @@ class KadmeliaPeerRouter(IPeerRouting):
|
|||||||
value = await self.server.get(xor_id)
|
value = await self.server.get(xor_id)
|
||||||
return decode_peerinfo(value)
|
return decode_peerinfo(value)
|
||||||
|
|
||||||
def decode_peerinfo(encoded):
|
def decode_peerinfo(encoded: Union[bytes, str]) -> KadPeerInfo:
|
||||||
if isinstance(encoded, bytes):
|
if isinstance(encoded, bytes):
|
||||||
encoded = encoded.decode()
|
encoded = encoded.decode()
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user