This commit is contained in:
guha-rahul
2025-06-16 02:50:40 +05:30
parent d2825af045
commit c33ab32c33
3 changed files with 45 additions and 43 deletions

View File

@ -12,15 +12,9 @@ from libp2p.abc import (
from libp2p.custom_types import (
TProtocol,
)
from libp2p.network.stream.exceptions import (
StreamClosed,
)
from libp2p.peer.id import (
ID,
)
from libp2p.utils import (
encode_varint_prefixed,
)
from .exceptions import (
PubsubRouterError,
@ -120,13 +114,7 @@ class FloodSub(IPubsubRouter):
if peer_id not in pubsub.peers:
continue
stream = pubsub.peers[peer_id]
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
pubsub._handle_dead_peer(peer_id)
await pubsub.write_msg(stream, rpc_msg)
async def join(self, topic: str) -> None:
"""

View File

@ -24,9 +24,6 @@ from libp2p.abc import (
from libp2p.custom_types import (
TProtocol,
)
from libp2p.network.stream.exceptions import (
StreamClosed,
)
from libp2p.peer.id import (
ID,
)
@ -42,9 +39,6 @@ from libp2p.pubsub import (
from libp2p.tools.async_service import (
Service,
)
from libp2p.utils import (
encode_varint_prefixed,
)
from .exceptions import (
NoPubsubAttached,
@ -249,14 +243,8 @@ class GossipSub(IPubsubRouter, Service):
if peer_id not in self.pubsub.peers:
continue
stream = self.pubsub.peers[peer_id]
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
self.pubsub._handle_dead_peer(peer_id)
await self.pubsub.write_msg(stream, rpc_msg)
for topic in pubsub_msg.topicIDs:
self.time_since_last_publish[topic] = int(time.time())
@ -705,8 +693,6 @@ class GossipSub(IPubsubRouter, Service):
packet.publish.extend(msgs_to_forward)
# 2) Serialize that packet
rpc_msg: bytes = packet.SerializeToString()
if self.pubsub is None:
raise NoPubsubAttached
@ -720,14 +706,7 @@ class GossipSub(IPubsubRouter, Service):
peer_stream = self.pubsub.peers[sender_peer_id]
# 4) And write the packet to the stream
try:
await peer_stream.write(encode_varint_prefixed(rpc_msg))
except StreamClosed:
logger.debug(
"Fail to responed to iwant request from %s: stream closed",
sender_peer_id,
)
self.pubsub._handle_dead_peer(sender_peer_id)
await self.pubsub.write_msg(peer_stream, packet)
async def handle_graft(
self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID
@ -826,8 +805,6 @@ class GossipSub(IPubsubRouter, Service):
packet: rpc_pb2.RPC = rpc_pb2.RPC()
packet.control.CopyFrom(control_msg)
rpc_msg: bytes = packet.SerializeToString()
# Get stream for peer from pubsub
if to_peer not in self.pubsub.peers:
logger.debug(
@ -837,8 +814,4 @@ class GossipSub(IPubsubRouter, Service):
peer_stream = self.pubsub.peers[to_peer]
# Write rpc to stream
try:
await peer_stream.write(encode_varint_prefixed(rpc_msg))
except StreamClosed:
logger.debug("Fail to emit control message to %s: stream closed", to_peer)
self.pubsub._handle_dead_peer(to_peer)
await self.pubsub.write_msg(peer_stream, packet)

View File

@ -66,6 +66,7 @@ from libp2p.utils import (
encode_varint_prefixed,
read_varint_prefixed_bytes,
)
from libp2p.utils.varint import encode_uvarint
from .pb import (
rpc_pb2,
@ -773,3 +774,43 @@ class Pubsub(Service, IPubsub):
def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool:
return any(topic in self.topic_ids for topic in msg.topicIDs)
async def write_msg(self, stream: INetStream, rpc_msg: rpc_pb2.RPC) -> bool:
"""
Write an RPC message to a stream with proper error handling.
Implements WriteMsg similar to go-libp2p-pubsub comm.go
Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
:param stream: stream to write the message to
:param rpc_msg: RPC message to write
:return: True if successful, False if stream was closed
"""
try:
# Calculate message size first
msg_bytes = rpc_msg.SerializeToString()
msg_size = len(msg_bytes)
# Calculate varint size and allocate exact buffer size needed
varint_bytes = encode_uvarint(msg_size)
varint_size = len(varint_bytes)
# Allocate buffer with exact size (like Go's pool.Get())
buf = bytearray(varint_size + msg_size)
# Write varint length prefix to buffer (like Go's binary.PutUvarint())
buf[:varint_size] = varint_bytes
# Write serialized message after varint (like Go's rpc.MarshalTo())
buf[varint_size:] = msg_bytes
# Single write operation (like Go's s.Write(buf))
await stream.write(bytes(buf))
return True
except StreamClosed:
peer_id = stream.muxed_conn.peer_id
logger.debug("Fail to write message to %s: stream closed", peer_id)
self._handle_dead_peer(peer_id)
return False