fixed the linter <> protobuf issues

This commit is contained in:
lla-dane
2025-07-17 23:22:49 +05:30
parent 8625226be8
commit 9d597012cc
11 changed files with 117 additions and 40 deletions

View File

@ -1,4 +1,4 @@
from typing import Any
from typing import Any, cast
from libp2p.crypto.ed25519 import Ed25519PublicKey
from libp2p.crypto.keys import PrivateKey, PublicKey
@ -58,7 +58,7 @@ class Envelope:
:return: Serialized envelope as bytes.
"""
pb_env = pb.Envelope( # type: ignore[attr-defined]
pb_env = pb.Envelope(
public_key=pub_key_to_protobuf(self.public_key),
payload_type=self.payload_type,
payload=self.raw_payload,
@ -97,7 +97,7 @@ class Envelope:
try:
if self.payload_type != PEER_RECORD_CODEC:
raise ValueError("Unsuported payload type in envelope")
msg = record_pb.PeerRecord() # type: ignore[attr-defined]
msg = record_pb.PeerRecord()
msg.ParseFromString(self.raw_payload)
self._cached_record = peer_record_from_protobuf(msg)
@ -128,20 +128,21 @@ class Envelope:
return False
def pub_key_to_protobuf(pub_key: PublicKey) -> cryto_pb.PublicKey: # type: ignore[name-defined]
def pub_key_to_protobuf(pub_key: PublicKey) -> cryto_pb.PublicKey:
"""
Convert a Python PublicKey object to its protobuf equivalent.
:param pub_key: A libp2p-compatible PublicKey instance.
:return: Serialized protobuf PublicKey message.
"""
key_type = pub_key.get_type().value
internal_key_type = pub_key.get_type()
key_type = cast(cryto_pb.KeyType, internal_key_type.value)
data = pub_key.to_bytes()
protobuf_key = cryto_pb.PublicKey(Type=key_type, Data=data) # type: ignore[attr-defined]
protobuf_key = cryto_pb.PublicKey(Type=key_type, Data=data)
return protobuf_key
def pub_key_from_protobuf(pb_key: cryto_pb.PublicKey) -> PublicKey: # type: ignore[name-defined]
def pub_key_from_protobuf(pb_key: cryto_pb.PublicKey) -> PublicKey:
"""
Parse a protobuf PublicKey message into a native libp2p PublicKey.
@ -151,11 +152,11 @@ def pub_key_from_protobuf(pb_key: cryto_pb.PublicKey) -> PublicKey: # type: ign
:return: Parsed PublicKey object.
:raises ValueError: If the key type is unrecognized.
"""
if pb_key.Type == cryto_pb.KeyType.Ed25519: # type: ignore[attr-defined]
if pb_key.Type == cryto_pb.KeyType.Ed25519:
return Ed25519PublicKey.from_bytes(pb_key.Data)
elif pb_key.Type == cryto_pb.KeyType.RSA: # type: ignore[attr-defined]
elif pb_key.Type == cryto_pb.KeyType.RSA:
return RSAPublicKey.from_bytes(pb_key.Data)
elif pb_key.Type == cryto_pb.KeyType.Secp256k1: # type: ignore[attr-defined]
elif pb_key.Type == cryto_pb.KeyType.Secp256k1:
return Secp256k1PublicKey.from_bytes(pb_key.Data)
# TODO: Add suport fot ECDSA parsing also
else:
@ -214,7 +215,7 @@ def unmarshal_envelope(data: bytes) -> Envelope:
:return: Parsed Envelope object.
:raises DecodeError: If protobuf parsing fails.
"""
pb_env = pb.Envelope() # type: ignore[attr-defined]
pb_env = pb.Envelope()
pb_env.ParseFromString(data)
pk = pub_key_from_protobuf(pb_env.public_key)