diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index aaf08ccd..31ce4c7f 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -24,6 +24,9 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: TopicValidator = namedtuple("TopicValidator", ["validator", "is_async"]) +ValidatorFn = Union[Callable[[ID, rpc_pb2.Message], bool], Awaitable[None]] + + class Pubsub: host: IHost @@ -158,6 +161,15 @@ class Pubsub: # Force context switch await asyncio.sleep(0) + def add_topic_validator( + self, topic: str, validator: ValidatorFn, is_async_validator: bool + ) -> None: + self.topic_validators[topic] = TopicValidator(validator, is_async_validator) + + def remove_topic_validator(self, topic: str) -> None: + if topic in self.topic_validators: + del self.topic_validators[topic] + async def stream_handler(self, stream: INetStream) -> None: """ Stream handler for pubsub. Gets invoked whenever a new stream is created