mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
fix: message id type inonsistency in handle ihave and message id parsing improvement in handle iwant
This commit is contained in:
@ -37,3 +37,4 @@ SyncValidatorFn = Callable[[ID, rpc_pb2.Message], bool]
|
|||||||
AsyncValidatorFn = Callable[[ID, rpc_pb2.Message], Awaitable[bool]]
|
AsyncValidatorFn = Callable[[ID, rpc_pb2.Message], Awaitable[bool]]
|
||||||
ValidatorFn = Union[SyncValidatorFn, AsyncValidatorFn]
|
ValidatorFn = Union[SyncValidatorFn, AsyncValidatorFn]
|
||||||
UnsubscribeFn = Callable[[], Awaitable[None]]
|
UnsubscribeFn = Callable[[], Awaitable[None]]
|
||||||
|
MessageID = NewType("MessageID", str)
|
||||||
|
|||||||
@ -1,6 +1,3 @@
|
|||||||
from ast import (
|
|
||||||
literal_eval,
|
|
||||||
)
|
|
||||||
from collections import (
|
from collections import (
|
||||||
defaultdict,
|
defaultdict,
|
||||||
)
|
)
|
||||||
@ -22,6 +19,7 @@ from libp2p.abc import (
|
|||||||
IPubsubRouter,
|
IPubsubRouter,
|
||||||
)
|
)
|
||||||
from libp2p.custom_types import (
|
from libp2p.custom_types import (
|
||||||
|
MessageID,
|
||||||
TProtocol,
|
TProtocol,
|
||||||
)
|
)
|
||||||
from libp2p.peer.id import (
|
from libp2p.peer.id import (
|
||||||
@ -54,6 +52,10 @@ from .pb import (
|
|||||||
from .pubsub import (
|
from .pubsub import (
|
||||||
Pubsub,
|
Pubsub,
|
||||||
)
|
)
|
||||||
|
from .utils import (
|
||||||
|
parse_message_id_safe,
|
||||||
|
safe_parse_message_id,
|
||||||
|
)
|
||||||
|
|
||||||
PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
|
PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
|
||||||
PROTOCOL_ID_V11 = TProtocol("/meshsub/1.1.0")
|
PROTOCOL_ID_V11 = TProtocol("/meshsub/1.1.0")
|
||||||
@ -780,11 +782,10 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
|
|
||||||
# Add all unknown message ids (ids that appear in ihave_msg but not in
|
# Add all unknown message ids (ids that appear in ihave_msg but not in
|
||||||
# seen_seqnos) to list of messages we want to request
|
# seen_seqnos) to list of messages we want to request
|
||||||
# FIXME: Update type of message ID
|
msg_ids_wanted: list[MessageID] = [
|
||||||
msg_ids_wanted: list[Any] = [
|
parse_message_id_safe(msg_id)
|
||||||
msg_id
|
|
||||||
for msg_id in ihave_msg.messageIDs
|
for msg_id in ihave_msg.messageIDs
|
||||||
if literal_eval(msg_id) not in seen_seqnos_and_peers
|
if msg_id not in str(seen_seqnos_and_peers)
|
||||||
]
|
]
|
||||||
|
|
||||||
# Request messages with IWANT message
|
# Request messages with IWANT message
|
||||||
@ -798,9 +799,9 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
Forwards all request messages that are present in mcache to the
|
Forwards all request messages that are present in mcache to the
|
||||||
requesting peer.
|
requesting peer.
|
||||||
"""
|
"""
|
||||||
# FIXME: Update type of message ID
|
msg_ids: list[tuple[bytes, bytes]] = [
|
||||||
# FIXME: Find a better way to parse the msg ids
|
safe_parse_message_id(msg) for msg in iwant_msg.messageIDs
|
||||||
msg_ids: list[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs]
|
]
|
||||||
msgs_to_forward: list[rpc_pb2.Message] = []
|
msgs_to_forward: list[rpc_pb2.Message] = []
|
||||||
for msg_id_iwant in msg_ids:
|
for msg_id_iwant in msg_ids:
|
||||||
# Check if the wanted message ID is present in mcache
|
# Check if the wanted message ID is present in mcache
|
||||||
|
|||||||
31
libp2p/pubsub/utils.py
Normal file
31
libp2p/pubsub/utils.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
import ast
|
||||||
|
|
||||||
|
from libp2p.custom_types import (
|
||||||
|
MessageID,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_message_id_safe(msg_id_str: str) -> MessageID:
|
||||||
|
"""Safely handle message ID as string."""
|
||||||
|
return MessageID(msg_id_str)
|
||||||
|
|
||||||
|
|
||||||
|
def safe_parse_message_id(msg_id_str: str) -> tuple[bytes, bytes]:
|
||||||
|
"""
|
||||||
|
Safely parse message ID using ast.literal_eval with validation.
|
||||||
|
:param msg_id_str: String representation of message ID
|
||||||
|
:return: Tuple of (seqno, from_id) as bytes
|
||||||
|
:raises ValueError: If parsing fails
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
parsed = ast.literal_eval(msg_id_str)
|
||||||
|
if not isinstance(parsed, tuple) or len(parsed) != 2:
|
||||||
|
raise ValueError("Invalid message ID format")
|
||||||
|
|
||||||
|
seqno, from_id = parsed
|
||||||
|
if not isinstance(seqno, bytes) or not isinstance(from_id, bytes):
|
||||||
|
raise ValueError("Message ID components must be bytes")
|
||||||
|
|
||||||
|
return (seqno, from_id)
|
||||||
|
except (ValueError, SyntaxError) as e:
|
||||||
|
raise ValueError(f"Invalid message ID format: {e}")
|
||||||
Reference in New Issue
Block a user