diff --git a/libp2p/abc.py b/libp2p/abc.py index a50a364d..dc941c43 100644 --- a/libp2p/abc.py +++ b/libp2p/abc.py @@ -2130,14 +2130,14 @@ class IPubsub(ServiceAPI): ... @abstractmethod - async def publish(self, topic_id: str, data: bytes) -> None: + async def publish(self, topic_id: str | list[str], data: bytes) -> None: """ - Publish a message to a topic. + Publish a message to a topic or multiple topics. Parameters ---------- - topic_id : str - The identifier of the topic. + topic_id : str | list[str] + The identifier of the topic (str) or topics (list[str]). data : bytes The data to publish. diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5f66f30a..8ba7d471 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -620,16 +620,22 @@ class Pubsub(Service, IPubsub): logger.debug("Fail to message peer %s: stream closed", peer_id) self._handle_dead_peer(peer_id) - async def publish(self, topic_id: str, data: bytes) -> None: + async def publish(self, topic_id: str | list[str], data: bytes) -> None: """ - Publish data to a topic. + Publish data to a topic or multiple topics. - :param topic_id: topic which we are going to publish the data to + :param topic_id: topic (str) or topics (list[str]) to publish the data to :param data: data which we are publishing """ + # Handle both single topic (str) and multiple topics (list[str]) + if isinstance(topic_id, str): + topic_ids = [topic_id] + else: + topic_ids = topic_id + msg = rpc_pb2.Message( data=data, - topicIDs=[topic_id], + topicIDs=topic_ids, # Origin is ourself. from_id=self.my_id.to_bytes(), seqno=self._next_seqno(), diff --git a/newsfragments/685.feature.rst b/newsfragments/685.feature.rst new file mode 100644 index 00000000..1ea15c49 --- /dev/null +++ b/newsfragments/685.feature.rst @@ -0,0 +1 @@ +Optimized pubsub publishing to send multiple topics in a single message instead of separate messages per topic. diff --git a/tests/utils/pubsub/floodsub_integration_test_settings.py b/tests/utils/pubsub/floodsub_integration_test_settings.py index ab895d71..4550a0f8 100644 --- a/tests/utils/pubsub/floodsub_integration_test_settings.py +++ b/tests/utils/pubsub/floodsub_integration_test_settings.py @@ -1,7 +1,3 @@ -# type: ignore -# To add typing to this module, it's better to do it after refactoring test cases -# into classes - import pytest import trio @@ -151,7 +147,7 @@ FLOODSUB_PROTOCOL_TEST_CASES = [ ] floodsub_protocol_pytest_params = [ - pytest.param(test_case, id=test_case["name"]) + pytest.param(test_case, id=str(test_case["name"])) for test_case in FLOODSUB_PROTOCOL_TEST_CASES ] @@ -241,10 +237,8 @@ async def perform_test_from_obj(obj, pubsub_factory) -> None: data = msg["data"] node_id = msg["node_id"] - # Publish message - # TODO: Should be single RPC package with several topics - for topic in topics: - await pubsub_map[node_id].publish(topic, data) + # Publish message - now uses single RPC package with several topics + await pubsub_map[node_id].publish(topics, data) # For each topic in topics, add (topic, node_id, data) tuple to # ordered test list