From 3973f1d13c53b323c2c5080fff3bec5d67ab1f67 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sat, 3 Aug 2019 18:44:40 +0800 Subject: [PATCH 01/17] Add `pubsub.topic_validators` --- libp2p/pubsub/pubsub.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 9a70b40b..aaf08ccd 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,6 +1,7 @@ import asyncio +from collections import namedtuple import time -from typing import TYPE_CHECKING, Any, Dict, List, Tuple +from typing import Any, Awaitable, Callable, Dict, List, Tuple, Union, TYPE_CHECKING from lru import LRU @@ -20,6 +21,9 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: return (msg.seqno, msg.from_id) +TopicValidator = namedtuple("TopicValidator", ["validator", "is_async"]) + + class Pubsub: host: IHost @@ -41,6 +45,8 @@ class Pubsub: peer_topics: Dict[str, List[ID]] peers: Dict[ID, INetStream] + topic_validators: Dict[str, TopicValidator] + # NOTE: Be sure it is increased atomically everytime. counter: int # uint64 @@ -93,6 +99,9 @@ class Pubsub: # Create peers map, which maps peer_id (as string) to stream (to a given peer) self.peers = {} + # Map of topic to topic validator + self.topic_validators = {} + self.counter = time.time_ns() # Call handle peer to keep waiting for updates to peer queue From b1f4813195012ae3a7d8db5b6ac8730a480bf30b Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 4 Aug 2019 10:44:12 +0800 Subject: [PATCH 02/17] Add add/remove topic validator functions --- libp2p/pubsub/pubsub.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index aaf08ccd..31ce4c7f 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -24,6 +24,9 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: TopicValidator = namedtuple("TopicValidator", ["validator", "is_async"]) +ValidatorFn = Union[Callable[[ID, rpc_pb2.Message], bool], Awaitable[None]] + + class Pubsub: host: IHost @@ -158,6 +161,15 @@ class Pubsub: # Force context switch await asyncio.sleep(0) + def add_topic_validator( + self, topic: str, validator: ValidatorFn, is_async_validator: bool + ) -> None: + self.topic_validators[topic] = TopicValidator(validator, is_async_validator) + + def remove_topic_validator(self, topic: str) -> None: + if topic in self.topic_validators: + del self.topic_validators[topic] + async def stream_handler(self, stream: INetStream) -> None: """ Stream handler for pubsub. Gets invoked whenever a new stream is created From cf69f7e8004fdc66cd287ec22005b034d7a97732 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 4 Aug 2019 11:03:29 +0800 Subject: [PATCH 03/17] Rename to `set_topic_validator` and add test --- libp2p/pubsub/pubsub.py | 2 +- tests/pubsub/test_pubsub.py | 48 +++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 31ce4c7f..671df9b2 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -161,7 +161,7 @@ class Pubsub: # Force context switch await asyncio.sleep(0) - def add_topic_validator( + def set_topic_validator( self, topic: str, validator: ValidatorFn, is_async_validator: bool ) -> None: self.topic_validators[topic] = TopicValidator(validator, is_async_validator) diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 530677bb..438c48b2 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -84,6 +84,54 @@ async def test_get_hello_packet(pubsubs_fsub): assert topic in topic_ids_in_hello +@pytest.mark.parametrize("num_hosts", (1,)) +@pytest.mark.asyncio +async def test_add_topic_validator(pubsubs_fsub): + + is_sync_validator_called = False + + def sync_validator(peer_id, msg): + nonlocal is_sync_validator_called + is_sync_validator_called = True + + is_async_validator_called = False + + async def async_validator(peer_id, msg): + nonlocal is_async_validator_called + is_async_validator_called = True + + topic = "TEST_VALIDATOR" + + assert topic not in pubsubs_fsub[0].topic_validators + + # Register sync validator + pubsubs_fsub[0].set_topic_validator(topic, sync_validator, False) + + assert topic in pubsubs_fsub[0].topic_validators + topic_validator = pubsubs_fsub[0].topic_validators[topic] + assert not topic_validator.is_async + + # Validate with sync validator + topic_validator.validator(peer_id=ID(b"peer"), msg="msg") + + assert is_sync_validator_called + assert not is_async_validator_called + + # Register with async validator + pubsubs_fsub[0].set_topic_validator(topic, async_validator, True) + + is_sync_validator_called = False + assert topic in pubsubs_fsub[0].topic_validators + topic_validator = pubsubs_fsub[0].topic_validators[topic] + assert topic_validator.is_async + + # Validate with async validator + await topic_validator.validator(peer_id=ID(b"peer"), msg="msg") + + assert is_async_validator_called + assert not is_sync_validator_called + + class FakeNetStream: _queue: asyncio.Queue From 1ed14d0cc8d30c242d32498b5f9f6333c1a49298 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 4 Aug 2019 11:11:35 +0800 Subject: [PATCH 04/17] Add `remove_topic_validator` test --- tests/pubsub/test_pubsub.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 438c48b2..5a397f85 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -86,7 +86,7 @@ async def test_get_hello_packet(pubsubs_fsub): @pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio -async def test_add_topic_validator(pubsubs_fsub): +async def test_set_and_remove_topic_validator(pubsubs_fsub): is_sync_validator_called = False @@ -131,6 +131,10 @@ async def test_add_topic_validator(pubsubs_fsub): assert is_async_validator_called assert not is_sync_validator_called + # Remove validator + pubsubs_fsub[0].remove_topic_validator(topic) + assert topic not in pubsubs_fsub[0].topic_validators + class FakeNetStream: _queue: asyncio.Queue From f8ca4fa1efeecc1978751c3b2ce5ffbeb92c48eb Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 4 Aug 2019 11:23:20 +0800 Subject: [PATCH 05/17] Add `get_msg_validators` and test --- libp2p/pubsub/pubsub.py | 7 ++++++ tests/pubsub/test_pubsub.py | 50 +++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 671df9b2..4b55fd0c 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -170,6 +170,13 @@ class Pubsub: if topic in self.topic_validators: del self.topic_validators[topic] + def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]: + return ( + self.topic_validators[topic] + for topic in msg.topicIDs + if topic in self.topic_validators + ) + async def stream_handler(self, stream: INetStream) -> None: """ Stream handler for pubsub. Gets invoked whenever a new stream is created diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 5a397f85..907f3b8e 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -136,6 +136,56 @@ async def test_set_and_remove_topic_validator(pubsubs_fsub): assert topic not in pubsubs_fsub[0].topic_validators +@pytest.mark.parametrize("num_hosts", (1,)) +@pytest.mark.asyncio +async def test_get_msg_validators(pubsubs_fsub): + + times_sync_validator_called = 0 + + def sync_validator(peer_id, msg): + nonlocal times_sync_validator_called + times_sync_validator_called += 1 + + times_async_validator_called = 0 + + async def async_validator(peer_id, msg): + nonlocal times_async_validator_called + times_async_validator_called += 1 + + topic_1 = "TEST_VALIDATOR_1" + topic_2 = "TEST_VALIDATOR_2" + topic_3 = "TEST_VALIDATOR_3" + + # Register sync validator for topic 1 and 2 + pubsubs_fsub[0].set_topic_validator(topic_1, sync_validator, False) + pubsubs_fsub[0].set_topic_validator(topic_2, sync_validator, False) + + assert topic_1 in pubsubs_fsub[0].topic_validators + assert topic_2 in pubsubs_fsub[0].topic_validators + + # Register async validator for topic 3 + pubsubs_fsub[0].set_topic_validator(topic_3, async_validator, True) + + assert topic_3 in pubsubs_fsub[0].topic_validators + + msg = make_pubsub_msg( + origin_id=pubsubs_fsub[0].my_id, + topic_ids=[topic_1, topic_2, topic_3], + data=b"1234", + seqno=b"\x00" * 8, + ) + + topic_validators = pubsubs_fsub[0].get_msg_validators(msg) + for topic_validator in topic_validators: + if topic_validator.is_async: + await topic_validator.validator(peer_id=ID(b"peer"), msg="msg") + else: + topic_validator.validator(peer_id=ID(b"peer"), msg="msg") + + assert times_sync_validator_called == 2 + assert times_async_validator_called == 1 + + class FakeNetStream: _queue: asyncio.Queue From ec2c566e5aab65fde6d2b4a1b73957f47e6ceb10 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 4 Aug 2019 12:33:22 +0800 Subject: [PATCH 06/17] Fix validator return type and add docstring --- libp2p/pubsub/pubsub.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 4b55fd0c..0e6cbaf7 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -24,7 +24,7 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: TopicValidator = namedtuple("TopicValidator", ["validator", "is_async"]) -ValidatorFn = Union[Callable[[ID, rpc_pb2.Message], bool], Awaitable[None]] +ValidatorFn = Union[Callable[[ID, rpc_pb2.Message], bool], Awaitable[bool]] class Pubsub: @@ -164,13 +164,22 @@ class Pubsub: def set_topic_validator( self, topic: str, validator: ValidatorFn, is_async_validator: bool ) -> None: + """ + Register a validator under the given topic. One topic can only have one validtor. + """ self.topic_validators[topic] = TopicValidator(validator, is_async_validator) def remove_topic_validator(self, topic: str) -> None: + """ + Remove the validator from the given topic. + """ if topic in self.topic_validators: del self.topic_validators[topic] def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]: + """ + Get all validators corresponding to the topics in the message. + """ return ( self.topic_validators[topic] for topic in msg.topicIDs From e1b86904e3f26f21e8f7ae0bc7d77af3cc1ae476 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 4 Aug 2019 18:13:23 +0800 Subject: [PATCH 07/17] Add `validate_msg` and test --- libp2p/pubsub/pubsub.py | 32 ++++++++--- libp2p/pubsub/validators.py | 9 +++ .../floodsub_integration_test_settings.py | 2 +- tests/pubsub/test_pubsub.py | 57 +++++++++++++++++-- 4 files changed, 87 insertions(+), 13 deletions(-) create mode 100644 libp2p/pubsub/validators.py diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 0e6cbaf7..84df48ff 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,7 +1,7 @@ import asyncio from collections import namedtuple import time -from typing import Any, Awaitable, Callable, Dict, List, Tuple, Union, TYPE_CHECKING +from typing import Any, Awaitable, Callable, Dict, Iterable, List, Tuple, Union, TYPE_CHECKING from lru import LRU @@ -176,15 +176,13 @@ class Pubsub: if topic in self.topic_validators: del self.topic_validators[topic] - def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]: + def get_msg_validators(self, msg: rpc_pb2.Message) -> Iterable[TopicValidator]: """ Get all validators corresponding to the topics in the message. """ - return ( - self.topic_validators[topic] - for topic in msg.topicIDs - if topic in self.topic_validators - ) + for topic in msg.topicIDs: + if topic in self.topic_validators: + yield self.topic_validators[topic] async def stream_handler(self, stream: INetStream) -> None: """ @@ -357,6 +355,26 @@ class Pubsub: await self.push_msg(self.host.get_id(), msg) + async def validate_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> bool: + sync_topic_validators = [] + async_topic_validator_futures = [] + for topic_validator in self.get_msg_validators(msg): + if topic_validator.is_async: + async_topic_validator_futures.append( + topic_validator.validator(msg_forwarder, msg) + ) + else: + sync_topic_validators.append(topic_validator.validator) + + for validator in sync_topic_validators: + if not validator(msg_forwarder, msg): + return False + + # TODO: Implement throttle on async validators + + results = await asyncio.gather(*async_topic_validator_futures) + return all(results) + async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: """ Push a pubsub message to others. diff --git a/libp2p/pubsub/validators.py b/libp2p/pubsub/validators.py new file mode 100644 index 00000000..e575980d --- /dev/null +++ b/libp2p/pubsub/validators.py @@ -0,0 +1,9 @@ +# FIXME: Replace the type of `pubkey` with a custom type `Pubkey` +def signature_validator(pubkey: bytes, msg: bytes) -> bool: + """ + Verify the message against the given public key. + :param pubkey: the public key which signs the message. + :param msg: the message signed. + """ + # TODO: Implement the signature validation + return True diff --git a/tests/pubsub/floodsub_integration_test_settings.py b/tests/pubsub/floodsub_integration_test_settings.py index f72dc227..736e7250 100644 --- a/tests/pubsub/floodsub_integration_test_settings.py +++ b/tests/pubsub/floodsub_integration_test_settings.py @@ -208,7 +208,7 @@ async def perform_test_from_obj(obj, router_factory): tasks_topic.append(asyncio.sleep(2)) # Gather is like Promise.all - responses = await asyncio.gather(*tasks_topic, return_exceptions=True) + responses = await asyncio.gather(*tasks_topic) for i in range(len(responses) - 1): node_id, topic = tasks_topic_data[i] if node_id not in queues_map: diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 907f3b8e..d66960ae 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -160,14 +160,9 @@ async def test_get_msg_validators(pubsubs_fsub): pubsubs_fsub[0].set_topic_validator(topic_1, sync_validator, False) pubsubs_fsub[0].set_topic_validator(topic_2, sync_validator, False) - assert topic_1 in pubsubs_fsub[0].topic_validators - assert topic_2 in pubsubs_fsub[0].topic_validators - # Register async validator for topic 3 pubsubs_fsub[0].set_topic_validator(topic_3, async_validator, True) - assert topic_3 in pubsubs_fsub[0].topic_validators - msg = make_pubsub_msg( origin_id=pubsubs_fsub[0].my_id, topic_ids=[topic_1, topic_2, topic_3], @@ -186,6 +181,58 @@ async def test_get_msg_validators(pubsubs_fsub): assert times_async_validator_called == 1 +@pytest.mark.parametrize("num_hosts", (1,)) +@pytest.mark.parametrize( + "is_topic_1_val_passed, is_topic_2_val_passed", + ( + (False, True), + (True, False), + (True, True), + ) +) +@pytest.mark.asyncio +async def test_validate_msg(pubsubs_fsub, is_topic_1_val_passed, is_topic_2_val_passed): + + def passed_sync_validator(peer_id, msg): + return True + + def failed_sync_validator(peer_id, msg): + return False + + async def passed_async_validator(peer_id, msg): + return True + + async def failed_async_validator(peer_id, msg): + return False + + topic_1 = "TEST_SYNC_VALIDATOR" + topic_2 = "TEST_ASYNC_VALIDATOR" + + if is_topic_1_val_passed: + pubsubs_fsub[0].set_topic_validator(topic_1, passed_sync_validator, False) + else: + pubsubs_fsub[0].set_topic_validator(topic_1, failed_sync_validator, False) + + if is_topic_2_val_passed: + pubsubs_fsub[0].set_topic_validator(topic_2, passed_async_validator, True) + else: + pubsubs_fsub[0].set_topic_validator(topic_2, failed_async_validator, True) + + msg = make_pubsub_msg( + origin_id=pubsubs_fsub[0].my_id, + topic_ids=[topic_1, topic_2], + data=b"1234", + seqno=b"\x00" * 8, + ) + + is_validation_passed = await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg) + + if is_topic_1_val_passed and is_topic_2_val_passed: + assert is_validation_passed + else: + assert not is_validation_passed + + class FakeNetStream: _queue: asyncio.Queue From 19ce5bb4200c7ff63aaf3afb6c79966bdcae35d0 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 4 Aug 2019 18:13:34 +0800 Subject: [PATCH 08/17] Add `signature_validator` stub and docstring --- libp2p/pubsub/pubsub.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 84df48ff..a94542cb 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,7 +1,17 @@ import asyncio from collections import namedtuple import time -from typing import Any, Awaitable, Callable, Dict, Iterable, List, Tuple, Union, TYPE_CHECKING +from typing import ( + Any, + Awaitable, + Callable, + Dict, + Iterable, + List, + Tuple, + Union, + TYPE_CHECKING, +) from lru import LRU @@ -166,12 +176,16 @@ class Pubsub: ) -> None: """ Register a validator under the given topic. One topic can only have one validtor. + :param topic: the topic to register validator under + :param validator: the validator used to validate messages published to the topic + :param is_async_validator: indicate if the validator is an asynchronous validator """ self.topic_validators[topic] = TopicValidator(validator, is_async_validator) def remove_topic_validator(self, topic: str) -> None: """ Remove the validator from the given topic. + :param topic: the topic to remove validator from """ if topic in self.topic_validators: del self.topic_validators[topic] @@ -179,6 +193,7 @@ class Pubsub: def get_msg_validators(self, msg: rpc_pb2.Message) -> Iterable[TopicValidator]: """ Get all validators corresponding to the topics in the message. + :param msg: the message published to the topic """ for topic in msg.topicIDs: if topic in self.topic_validators: @@ -356,6 +371,11 @@ class Pubsub: await self.push_msg(self.host.get_id(), msg) async def validate_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> bool: + """ + Validate the received message + :param msg_forwarder: the peer who forward us the message. + :param msg: the message. + """ sync_topic_validators = [] async_topic_validator_futures = [] for topic_validator in self.get_msg_validators(msg): From 2bb7f42c207f27ad362e168e0ee77ece91eba38b Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 4 Aug 2019 18:13:45 +0800 Subject: [PATCH 09/17] Add validators to `push_msg` --- libp2p/pubsub/pubsub.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index a94542cb..b367fe99 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -21,6 +21,7 @@ from libp2p.peer.id import ID from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee +from .validators import signature_validator if TYPE_CHECKING: from .pubsub_router_interface import IPubsubRouter @@ -392,8 +393,11 @@ class Pubsub: # TODO: Implement throttle on async validators - results = await asyncio.gather(*async_topic_validator_futures) - return all(results) + if len(async_topic_validator_futures) > 0: + results = await asyncio.gather(*async_topic_validator_futures) + return all(results) + else: + return True async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: """ @@ -411,6 +415,14 @@ class Pubsub: return # TODO: - Validate the message. If failed, reject it. + # Validate the signature of the message + # FIXME: `signature_validator` is currently a stub. + if not signature_validator(msg.key, msg.SerializeToString()): + return + # Validate the message with registered topic validators + is_validation_passed = await self.validate_msg(msg_forwarder, msg) + if not is_validation_passed: + return self._mark_msg_seen(msg) await self.handle_talk(msg) From a2efd03dfab6a337a7152fed414bb282f13242be Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 5 Aug 2019 18:15:48 +0800 Subject: [PATCH 10/17] Schedule `push_msg` into a task --- libp2p/pubsub/pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index b367fe99..5402cac1 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -151,7 +151,7 @@ class Pubsub: continue # TODO(mhchia): This will block this read_stream loop until all data are pushed. # Should investigate further if this is an issue. - await self.push_msg(msg_forwarder=peer_id, msg=msg) + asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg)) if rpc_incoming.subscriptions: # deal with RPC.subscriptions From b96ef0e6c7e9dbf42b69d97caf84c4f708d711f9 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 5 Aug 2019 18:20:04 +0800 Subject: [PATCH 11/17] Fix: `_is_subscribed_to_msg` need only subscribe to one of the topics --- libp2p/pubsub/pubsub.py | 18 +++--------------- tests/pubsub/test_pubsub.py | 8 +------- 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5402cac1..0dcee671 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,17 +1,7 @@ import asyncio from collections import namedtuple import time -from typing import ( - Any, - Awaitable, - Callable, - Dict, - Iterable, - List, - Tuple, - Union, - TYPE_CHECKING, -) +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Iterable, List, Tuple, Union from lru import LRU @@ -381,9 +371,7 @@ class Pubsub: async_topic_validator_futures = [] for topic_validator in self.get_msg_validators(msg): if topic_validator.is_async: - async_topic_validator_futures.append( - topic_validator.validator(msg_forwarder, msg) - ) + async_topic_validator_futures.append(topic_validator.validator(msg_forwarder, msg)) else: sync_topic_validators.append(topic_validator.validator) @@ -448,4 +436,4 @@ class Pubsub: def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: if not self.my_topics: return False - return all([topic in self.my_topics for topic in msg.topicIDs]) + return any([topic in self.my_topics for topic in msg.topicIDs]) diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index d66960ae..ad9dd434 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -183,16 +183,10 @@ async def test_get_msg_validators(pubsubs_fsub): @pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.parametrize( - "is_topic_1_val_passed, is_topic_2_val_passed", - ( - (False, True), - (True, False), - (True, True), - ) + "is_topic_1_val_passed, is_topic_2_val_passed", ((False, True), (True, False), (True, True)) ) @pytest.mark.asyncio async def test_validate_msg(pubsubs_fsub, is_topic_1_val_passed, is_topic_2_val_passed): - def passed_sync_validator(peer_id, msg): return True From 47643a67c61609b47b71052ab82f0d1d23a5e483 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 6 Aug 2019 12:32:18 +0800 Subject: [PATCH 12/17] Apply PR feedback --- libp2p/pubsub/pubsub.py | 30 ++++++++++++++++++------------ libp2p/pubsub/validators.py | 2 +- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 0dcee671..dd51be2f 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,7 +1,7 @@ import asyncio -from collections import namedtuple +import logging import time -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Iterable, List, Tuple, Union +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, NamedTuple, Tuple, Union from lru import LRU @@ -17,17 +17,20 @@ if TYPE_CHECKING: from .pubsub_router_interface import IPubsubRouter +log = logging.getLogger(__name__) + + def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: # NOTE: `string(from, seqno)` in Go return (msg.seqno, msg.from_id) -TopicValidator = namedtuple("TopicValidator", ["validator", "is_async"]) - - ValidatorFn = Union[Callable[[ID, rpc_pb2.Message], bool], Awaitable[bool]] +TopicValidator = NamedTuple("TopicValidator", (("validator", ValidatorFn), ("is_async", bool))) + + class Pubsub: host: IHost @@ -181,14 +184,14 @@ class Pubsub: if topic in self.topic_validators: del self.topic_validators[topic] - def get_msg_validators(self, msg: rpc_pb2.Message) -> Iterable[TopicValidator]: + def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]: """ Get all validators corresponding to the topics in the message. :param msg: the message published to the topic """ - for topic in msg.topicIDs: - if topic in self.topic_validators: - yield self.topic_validators[topic] + return ( + self.topic_validators[topic] for topic in msg.topicIDs if topic in self.topic_validators + ) async def stream_handler(self, stream: INetStream) -> None: """ @@ -399,15 +402,18 @@ class Pubsub: # TODO: Check if signing is required and if so signature should be attached. + # If the message is processed before, return(i.e., don't further process the message). if self._is_msg_seen(msg): return # TODO: - Validate the message. If failed, reject it. # Validate the signature of the message # FIXME: `signature_validator` is currently a stub. - if not signature_validator(msg.key, msg.SerializeToString()): + if not signature_validator(msg.key, msg.SerializeToString(), msg.singature): + log.debug(f"Signature validation failed for msg={msg}") return - # Validate the message with registered topic validators + # Validate the message with registered topic validators. + # If the validation failed, return(i.e., don't further process the message). is_validation_passed = await self.validate_msg(msg_forwarder, msg) if not is_validation_passed: return @@ -436,4 +442,4 @@ class Pubsub: def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: if not self.my_topics: return False - return any([topic in self.my_topics for topic in msg.topicIDs]) + return any(topic in self.my_topics for topic in msg.topicIDs) diff --git a/libp2p/pubsub/validators.py b/libp2p/pubsub/validators.py index e575980d..4d99eded 100644 --- a/libp2p/pubsub/validators.py +++ b/libp2p/pubsub/validators.py @@ -1,5 +1,5 @@ # FIXME: Replace the type of `pubkey` with a custom type `Pubkey` -def signature_validator(pubkey: bytes, msg: bytes) -> bool: +def signature_validator(pubkey: bytes, msg: bytes, sig: bytes) -> bool: """ Verify the message against the given public key. :param pubkey: the public key which signs the message. From 9a1e5fe813d4465bece409026a0058a4522bef23 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 6 Aug 2019 12:37:34 +0800 Subject: [PATCH 13/17] Add `ValidationError` --- libp2p/exceptions.py | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 libp2p/exceptions.py diff --git a/libp2p/exceptions.py b/libp2p/exceptions.py new file mode 100644 index 00000000..8d8af447 --- /dev/null +++ b/libp2p/exceptions.py @@ -0,0 +1,6 @@ +class ValidationError(Exception): + """ + Raised when something does not pass a validation check. + """ + + pass From 1cea1264a438ab3497c5886fd65bb03d87e329a5 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 6 Aug 2019 12:38:31 +0800 Subject: [PATCH 14/17] Raise exception when topic validation failed --- libp2p/pubsub/pubsub.py | 16 +++++++++------- tests/pubsub/test_pubsub.py | 12 ++++++------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index dd51be2f..37eb9321 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, NamedTup from lru import LRU +from libp2p.exceptions import ValidationError from libp2p.host.host_interface import IHost from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID @@ -364,7 +365,7 @@ class Pubsub: await self.push_msg(self.host.get_id(), msg) - async def validate_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> bool: + async def validate_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: """ Validate the received message :param msg_forwarder: the peer who forward us the message. @@ -380,15 +381,14 @@ class Pubsub: for validator in sync_topic_validators: if not validator(msg_forwarder, msg): - return False + raise ValidationError(f"Validation failed for msg={msg}") # TODO: Implement throttle on async validators if len(async_topic_validator_futures) > 0: results = await asyncio.gather(*async_topic_validator_futures) - return all(results) - else: - return True + if not all(results): + raise ValidationError(f"Validation failed for msg={msg}") async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: """ @@ -414,8 +414,10 @@ class Pubsub: return # Validate the message with registered topic validators. # If the validation failed, return(i.e., don't further process the message). - is_validation_passed = await self.validate_msg(msg_forwarder, msg) - if not is_validation_passed: + try: + await self.validate_msg(msg_forwarder, msg) + except ValidationError: + log.debug(f"Topic validation failed for msg={msg}") return self._mark_msg_seen(msg) diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index ad9dd434..0799c341 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -4,6 +4,7 @@ from typing import NamedTuple import pytest +from libp2p.exceptions import ValidationError from libp2p.peer.id import ID from libp2p.pubsub.pb import rpc_pb2 from tests.utils import connect @@ -191,13 +192,13 @@ async def test_validate_msg(pubsubs_fsub, is_topic_1_val_passed, is_topic_2_val_ return True def failed_sync_validator(peer_id, msg): - return False + raise ValidationError() async def passed_async_validator(peer_id, msg): return True async def failed_async_validator(peer_id, msg): - return False + raise ValidationError() topic_1 = "TEST_SYNC_VALIDATOR" topic_2 = "TEST_ASYNC_VALIDATOR" @@ -219,12 +220,11 @@ async def test_validate_msg(pubsubs_fsub, is_topic_1_val_passed, is_topic_2_val_ seqno=b"\x00" * 8, ) - is_validation_passed = await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg) - if is_topic_1_val_passed and is_topic_2_val_passed: - assert is_validation_passed + await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg) else: - assert not is_validation_passed + with pytest.raises(ValidationError): + await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg) class FakeNetStream: From d4febea46989f07cdbb6010046d2e77c4f20d07e Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 6 Aug 2019 13:05:31 +0800 Subject: [PATCH 15/17] Message was not enforced to carry signature yet --- libp2p/pubsub/pubsub.py | 2 +- libp2p/pubsub/validators.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 37eb9321..75058e5f 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -409,7 +409,7 @@ class Pubsub: # TODO: - Validate the message. If failed, reject it. # Validate the signature of the message # FIXME: `signature_validator` is currently a stub. - if not signature_validator(msg.key, msg.SerializeToString(), msg.singature): + if not signature_validator(msg.key, msg.SerializeToString()): log.debug(f"Signature validation failed for msg={msg}") return # Validate the message with registered topic validators. diff --git a/libp2p/pubsub/validators.py b/libp2p/pubsub/validators.py index 4d99eded..e575980d 100644 --- a/libp2p/pubsub/validators.py +++ b/libp2p/pubsub/validators.py @@ -1,5 +1,5 @@ # FIXME: Replace the type of `pubkey` with a custom type `Pubkey` -def signature_validator(pubkey: bytes, msg: bytes, sig: bytes) -> bool: +def signature_validator(pubkey: bytes, msg: bytes) -> bool: """ Verify the message against the given public key. :param pubkey: the public key which signs the message. From b26426214ec70a9d6a428c9fdba6dc32c4113b02 Mon Sep 17 00:00:00 2001 From: NIC Lin Date: Wed, 7 Aug 2019 11:43:32 +0800 Subject: [PATCH 16/17] Update libp2p/pubsub/pubsub.py Co-Authored-By: Kevin Mai-Husan Chia --- libp2p/pubsub/pubsub.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 75058e5f..d0dad89b 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -26,7 +26,9 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: return (msg.seqno, msg.from_id) -ValidatorFn = Union[Callable[[ID, rpc_pb2.Message], bool], Awaitable[bool]] +SyncValidatorFn = Callable[[ID, rpc_pb2.Message], bool] +AsyncValidatorFn = Callable[[ID, rpc_pb2.Message], Awaitable[bool]] +ValidatorFn = Union[SyncValidatorFn, AsyncValidatorFn] TopicValidator = NamedTuple("TopicValidator", (("validator", ValidatorFn), ("is_async", bool))) From a1dc68ab704498363b0737e16dfc6d824dfb2754 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Wed, 7 Aug 2019 11:53:54 +0800 Subject: [PATCH 17/17] Apply PR feedback: add validation failed test to `push_msg` test --- tests/pubsub/test_pubsub.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 0799c341..170b72b9 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -192,13 +192,13 @@ async def test_validate_msg(pubsubs_fsub, is_topic_1_val_passed, is_topic_2_val_ return True def failed_sync_validator(peer_id, msg): - raise ValidationError() + return False async def passed_async_validator(peer_id, msg): return True async def failed_async_validator(peer_id, msg): - raise ValidationError() + return False topic_1 = "TEST_SYNC_VALIDATOR" topic_2 = "TEST_ASYNC_VALIDATOR" @@ -462,3 +462,23 @@ async def test_push_msg(pubsubs_fsub, monkeypatch): await asyncio.wait_for(event.wait(), timeout=0.1) # Test: Subscribers are notified when `push_msg` new messages. assert (await sub.get()) == msg_1 + + # Test: add a topic validator and `push_msg` the message that + # does not pass the validation. + # `router_publish` is not called then. + def failed_sync_validator(peer_id, msg): + return False + + pubsubs_fsub[0].set_topic_validator(TESTING_TOPIC, failed_sync_validator, False) + + msg_2 = make_pubsub_msg( + origin_id=pubsubs_fsub[0].my_id, + topic_ids=[TESTING_TOPIC], + data=TESTING_DATA, + seqno=b"\x22" * 8, + ) + + event.clear() + await pubsubs_fsub[0].push_msg(pubsubs_fsub[0].my_id, msg_2) + await asyncio.sleep(0.01) + assert not event.is_set()