mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-11 15:40:54 +00:00
Add type hints to peer folder
This commit is contained in:
@ -1,3 +1,10 @@
|
||||
from typing import (
|
||||
Union,
|
||||
)
|
||||
|
||||
from Crypto.PublicKey.RSA import (
|
||||
RsaKey,
|
||||
)
|
||||
import hashlib
|
||||
import base58
|
||||
import multihash
|
||||
@ -13,67 +20,71 @@ MAX_INLINE_KEY_LENGTH = 42
|
||||
|
||||
class ID:
|
||||
|
||||
def __init__(self, id_str):
|
||||
_id_str: str
|
||||
|
||||
def __init__(self, id_str: str) -> None:
|
||||
self._id_str = id_str
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
return self._id_str
|
||||
|
||||
def get_raw_id(self):
|
||||
def get_raw_id(self) -> str:
|
||||
return self._id_str
|
||||
|
||||
def pretty(self):
|
||||
def pretty(self) -> str:
|
||||
return base58.b58encode(self._id_str).decode()
|
||||
|
||||
def get_xor_id(self):
|
||||
def get_xor_id(self) -> int:
|
||||
return int(digest(self.get_raw_id()).hex(), 16)
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
pid = self.pretty()
|
||||
return pid
|
||||
|
||||
__repr__ = __str__
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: object) -> bool:
|
||||
#pylint: disable=protected-access
|
||||
if not isinstance(other, ID):
|
||||
return NotImplemented
|
||||
return self._id_str == other._id_str
|
||||
|
||||
def __hash__(self):
|
||||
def __hash__(self) -> int:
|
||||
return hash(self._id_str)
|
||||
|
||||
|
||||
def id_b58_encode(peer_id):
|
||||
def id_b58_encode(peer_id: ID) -> str:
|
||||
"""
|
||||
return a b58-encoded string
|
||||
"""
|
||||
#pylint: disable=protected-access
|
||||
return base58.b58encode(peer_id._id_str).decode()
|
||||
return base58.b58encode(peer_id.get_raw_id()).decode()
|
||||
|
||||
|
||||
def id_b58_decode(peer_id_str):
|
||||
def id_b58_decode(peer_id_str: str) -> ID:
|
||||
"""
|
||||
return a base58-decoded peer ID
|
||||
"""
|
||||
return ID(base58.b58decode(peer_id_str))
|
||||
|
||||
|
||||
def id_from_public_key(key):
|
||||
def id_from_public_key(key: RsaKey) -> ID:
|
||||
# export into binary format
|
||||
key_bin = key.exportKey("DER")
|
||||
|
||||
algo = multihash.Func.sha2_256
|
||||
algo: int = multihash.Func.sha2_256
|
||||
# TODO: seems identity is not yet supported in pymultihash
|
||||
# if len(b) <= MAX_INLINE_KEY_LENGTH:
|
||||
# algo multihash.func.identity
|
||||
|
||||
mh_digest = multihash.digest(key_bin, algo)
|
||||
mh_digest: multihash.Multihash = multihash.digest(key_bin, algo)
|
||||
return ID(mh_digest.encode())
|
||||
|
||||
|
||||
def id_from_private_key(key):
|
||||
def id_from_private_key(key: RsaKey) -> ID:
|
||||
return id_from_public_key(key.publickey())
|
||||
|
||||
def digest(string):
|
||||
if not isinstance(string, bytes):
|
||||
string = str(string).encode('utf8')
|
||||
return hashlib.sha1(string).digest()
|
||||
def digest(data: Union[str, bytes]) -> bytes:
|
||||
if not isinstance(data, bytes):
|
||||
data_bytes = str(data).encode('utf8')
|
||||
return hashlib.sha1(data_bytes).digest()
|
||||
|
||||
Reference in New Issue
Block a user