Optimize pubsub publishing to support multiple topics in single RPC message (#686)

* init

* add newsfragment

* lint

---------

Co-authored-by: Manu Sheel Gupta <manusheel.edu@gmail.com>
This commit is contained in:
guha-rahul
2025-06-18 02:53:03 +05:30
committed by GitHub
parent 2ed2587fc9
commit 79094d70d3
4 changed files with 18 additions and 17 deletions

View File

@ -2130,14 +2130,14 @@ class IPubsub(ServiceAPI):
...
@abstractmethod
async def publish(self, topic_id: str, data: bytes) -> None:
async def publish(self, topic_id: str | list[str], data: bytes) -> None:
"""
Publish a message to a topic.
Publish a message to a topic or multiple topics.
Parameters
----------
topic_id : str
The identifier of the topic.
topic_id : str | list[str]
The identifier of the topic (str) or topics (list[str]).
data : bytes
The data to publish.

View File

@ -620,16 +620,22 @@ class Pubsub(Service, IPubsub):
logger.debug("Fail to message peer %s: stream closed", peer_id)
self._handle_dead_peer(peer_id)
async def publish(self, topic_id: str, data: bytes) -> None:
async def publish(self, topic_id: str | list[str], data: bytes) -> None:
"""
Publish data to a topic.
Publish data to a topic or multiple topics.
:param topic_id: topic which we are going to publish the data to
:param topic_id: topic (str) or topics (list[str]) to publish the data to
:param data: data which we are publishing
"""
# Handle both single topic (str) and multiple topics (list[str])
if isinstance(topic_id, str):
topic_ids = [topic_id]
else:
topic_ids = topic_id
msg = rpc_pb2.Message(
data=data,
topicIDs=[topic_id],
topicIDs=topic_ids,
# Origin is ourself.
from_id=self.my_id.to_bytes(),
seqno=self._next_seqno(),

View File

@ -0,0 +1 @@
Optimized pubsub publishing to send multiple topics in a single message instead of separate messages per topic.

View File

@ -1,7 +1,3 @@
# type: ignore
# To add typing to this module, it's better to do it after refactoring test cases
# into classes
import pytest
import trio
@ -151,7 +147,7 @@ FLOODSUB_PROTOCOL_TEST_CASES = [
]
floodsub_protocol_pytest_params = [
pytest.param(test_case, id=test_case["name"])
pytest.param(test_case, id=str(test_case["name"]))
for test_case in FLOODSUB_PROTOCOL_TEST_CASES
]
@ -241,10 +237,8 @@ async def perform_test_from_obj(obj, pubsub_factory) -> None:
data = msg["data"]
node_id = msg["node_id"]
# Publish message
# TODO: Should be single RPC package with several topics
for topic in topics:
await pubsub_map[node_id].publish(topic, data)
# Publish message - now uses single RPC package with several topics
await pubsub_map[node_id].publish(topics, data)
# For each topic in topics, add (topic, node_id, data) tuple to
# ordered test list