diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 9a70b40b..aaf08ccd 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,6 +1,7 @@ import asyncio +from collections import namedtuple import time -from typing import TYPE_CHECKING, Any, Dict, List, Tuple +from typing import Any, Awaitable, Callable, Dict, List, Tuple, Union, TYPE_CHECKING from lru import LRU @@ -20,6 +21,9 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: return (msg.seqno, msg.from_id) +TopicValidator = namedtuple("TopicValidator", ["validator", "is_async"]) + + class Pubsub: host: IHost @@ -41,6 +45,8 @@ class Pubsub: peer_topics: Dict[str, List[ID]] peers: Dict[ID, INetStream] + topic_validators: Dict[str, TopicValidator] + # NOTE: Be sure it is increased atomically everytime. counter: int # uint64 @@ -93,6 +99,9 @@ class Pubsub: # Create peers map, which maps peer_id (as string) to stream (to a given peer) self.peers = {} + # Map of topic to topic validator + self.topic_validators = {} + self.counter = time.time_ns() # Call handle peer to keep waiting for updates to peer queue