From c33ab32c33bcbccd5a68d6080e9cd3d8cf9ae71b Mon Sep 17 00:00:00 2001 From: guha-rahul <69rahul16@gmail.com> Date: Mon, 16 Jun 2025 02:50:40 +0530 Subject: [PATCH] init --- libp2p/pubsub/floodsub.py | 14 +------------ libp2p/pubsub/gossipsub.py | 33 +++--------------------------- libp2p/pubsub/pubsub.py | 41 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 43 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 93d01f1a..3e0d454f 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -12,15 +12,9 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.network.stream.exceptions import ( - StreamClosed, -) from libp2p.peer.id import ( ID, ) -from libp2p.utils import ( - encode_varint_prefixed, -) from .exceptions import ( PubsubRouterError, @@ -120,13 +114,7 @@ class FloodSub(IPubsubRouter): if peer_id not in pubsub.peers: continue stream = pubsub.peers[peer_id] - # FIXME: We should add a `WriteMsg` similar to write delimited messages. - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - try: - await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) - except StreamClosed: - logger.debug("Fail to publish message to %s: stream closed", peer_id) - pubsub._handle_dead_peer(peer_id) + await pubsub.write_msg(stream, rpc_msg) async def join(self, topic: str) -> None: """ diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 813719dd..d2a52aaa 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -24,9 +24,6 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.network.stream.exceptions import ( - StreamClosed, -) from libp2p.peer.id import ( ID, ) @@ -42,9 +39,6 @@ from libp2p.pubsub import ( from libp2p.tools.async_service import ( Service, ) -from libp2p.utils import ( - encode_varint_prefixed, -) from .exceptions import ( NoPubsubAttached, @@ -249,14 +243,8 @@ class GossipSub(IPubsubRouter, Service): if peer_id not in self.pubsub.peers: continue stream = self.pubsub.peers[peer_id] - # FIXME: We should add a `WriteMsg` similar to write delimited messages. - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. - try: - await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) - except StreamClosed: - logger.debug("Fail to publish message to %s: stream closed", peer_id) - self.pubsub._handle_dead_peer(peer_id) + await self.pubsub.write_msg(stream, rpc_msg) for topic in pubsub_msg.topicIDs: self.time_since_last_publish[topic] = int(time.time()) @@ -705,8 +693,6 @@ class GossipSub(IPubsubRouter, Service): packet.publish.extend(msgs_to_forward) - # 2) Serialize that packet - rpc_msg: bytes = packet.SerializeToString() if self.pubsub is None: raise NoPubsubAttached @@ -720,14 +706,7 @@ class GossipSub(IPubsubRouter, Service): peer_stream = self.pubsub.peers[sender_peer_id] # 4) And write the packet to the stream - try: - await peer_stream.write(encode_varint_prefixed(rpc_msg)) - except StreamClosed: - logger.debug( - "Fail to responed to iwant request from %s: stream closed", - sender_peer_id, - ) - self.pubsub._handle_dead_peer(sender_peer_id) + await self.pubsub.write_msg(peer_stream, packet) async def handle_graft( self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID @@ -826,8 +805,6 @@ class GossipSub(IPubsubRouter, Service): packet: rpc_pb2.RPC = rpc_pb2.RPC() packet.control.CopyFrom(control_msg) - rpc_msg: bytes = packet.SerializeToString() - # Get stream for peer from pubsub if to_peer not in self.pubsub.peers: logger.debug( @@ -837,8 +814,4 @@ class GossipSub(IPubsubRouter, Service): peer_stream = self.pubsub.peers[to_peer] # Write rpc to stream - try: - await peer_stream.write(encode_varint_prefixed(rpc_msg)) - except StreamClosed: - logger.debug("Fail to emit control message to %s: stream closed", to_peer) - self.pubsub._handle_dead_peer(to_peer) + await self.pubsub.write_msg(peer_stream, packet) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5f66f30a..093e2754 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -66,6 +66,7 @@ from libp2p.utils import ( encode_varint_prefixed, read_varint_prefixed_bytes, ) +from libp2p.utils.varint import encode_uvarint from .pb import ( rpc_pb2, @@ -773,3 +774,43 @@ class Pubsub(Service, IPubsub): def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: return any(topic in self.topic_ids for topic in msg.topicIDs) + + async def write_msg(self, stream: INetStream, rpc_msg: rpc_pb2.RPC) -> bool: + """ + Write an RPC message to a stream with proper error handling. + + Implements WriteMsg similar to go-libp2p-pubsub comm.go + Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 + + + :param stream: stream to write the message to + :param rpc_msg: RPC message to write + :return: True if successful, False if stream was closed + """ + try: + # Calculate message size first + msg_bytes = rpc_msg.SerializeToString() + msg_size = len(msg_bytes) + + # Calculate varint size and allocate exact buffer size needed + + varint_bytes = encode_uvarint(msg_size) + varint_size = len(varint_bytes) + + # Allocate buffer with exact size (like Go's pool.Get()) + buf = bytearray(varint_size + msg_size) + + # Write varint length prefix to buffer (like Go's binary.PutUvarint()) + buf[:varint_size] = varint_bytes + + # Write serialized message after varint (like Go's rpc.MarshalTo()) + buf[varint_size:] = msg_bytes + + # Single write operation (like Go's s.Write(buf)) + await stream.write(bytes(buf)) + return True + except StreamClosed: + peer_id = stream.muxed_conn.peer_id + logger.debug("Fail to write message to %s: stream closed", peer_id) + self._handle_dead_peer(peer_id) + return False