From a3c9ac61e66ad4a160a34935a6da184bb7632071 Mon Sep 17 00:00:00 2001 From: varunrmallya <100590632+varun-r-mallya@users.noreply.github.com> Date: Thu, 5 Jun 2025 18:55:59 +0530 Subject: [PATCH 1/2] Improve performance of read from daemon test (#646) Signed-off-by: varun-r-mallya --- tests/utils/interop/process.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/utils/interop/process.py b/tests/utils/interop/process.py index cce4d78e..0d9e2650 100644 --- a/tests/utils/interop/process.py +++ b/tests/utils/interop/process.py @@ -32,18 +32,25 @@ class BaseInteractiveProcess(AbstractInterativeProcess): async def wait_until_ready(self) -> None: patterns_occurred = {pat: False for pat in self.patterns} + buffers = {pat: bytearray() for pat in self.patterns} async def read_from_daemon_and_check() -> None: async for data in self.proc.stdout: - # TODO: It takes O(n^2), which is quite bad. - # But it should succeed in a few seconds. self.bytes_read.extend(data) for pat, occurred in patterns_occurred.items(): if occurred: continue - if pat in self.bytes_read: + + # Check if pattern is in new data or spans across chunks + buf = buffers[pat] + buf.extend(data) + if pat in buf: patterns_occurred[pat] = True - if all([value for value in patterns_occurred.values()]): + else: + keep = min(len(pat) - 1, len(buf)) + buffers[pat] = buf[-keep:] if keep > 0 else bytearray() + + if all(patterns_occurred.values()): return with trio.fail_after(TIMEOUT_DURATION): From 5ca6f26933cfb5ddd8b81c1423be6f7f30a880e6 Mon Sep 17 00:00:00 2001 From: guha-rahul <52607971+guha-rahul@users.noreply.github.com> Date: Thu, 5 Jun 2025 20:40:04 +0530 Subject: [PATCH 2/2] feat: Add blacklisting of peers (#651) * init * remove blacklist validation after hello packet * add docs and newsfragment --- libp2p/pubsub/pubsub.py | 102 ++++++++- newsfragments/641.feature.rst | 1 + tests/core/pubsub/test_pubsub.py | 366 +++++++++++++++++++++++++++++++ 3 files changed, 466 insertions(+), 3 deletions(-) create mode 100644 newsfragments/641.feature.rst diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index ed6b75b0..1f37607e 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -122,6 +122,9 @@ class Pubsub(Service, IPubsub): strict_signing: bool sign_key: PrivateKey + # Set of blacklisted peer IDs + blacklisted_peers: set[ID] + event_handle_peer_queue_started: trio.Event event_handle_dead_peer_queue_started: trio.Event @@ -201,6 +204,9 @@ class Pubsub(Service, IPubsub): self.counter = int(time.time()) + # Set of blacklisted peer IDs + self.blacklisted_peers = set() + self.event_handle_peer_queue_started = trio.Event() self.event_handle_dead_peer_queue_started = trio.Event() @@ -320,6 +326,82 @@ class Pubsub(Service, IPubsub): if topic in self.topic_validators ) + def add_to_blacklist(self, peer_id: ID) -> None: + """ + Add a peer to the blacklist. + When a peer is blacklisted: + - Any existing connection to that peer is immediately closed and removed + - The peer is removed from all topic subscription mappings + - Future connection attempts from this peer will be rejected + - Messages forwarded by or originating from this peer will be dropped + - The peer will not be able to participate in pubsub communication + + :param peer_id: the peer ID to blacklist + """ + self.blacklisted_peers.add(peer_id) + logger.debug("Added peer %s to blacklist", peer_id) + self.manager.run_task(self._teardown_if_connected, peer_id) + + async def _teardown_if_connected(self, peer_id: ID) -> None: + """Close their stream and remove them if connected""" + stream = self.peers.get(peer_id) + if stream is not None: + try: + await stream.reset() + except Exception: + pass + del self.peers[peer_id] + # Also remove from any subscription maps: + for _topic, peerset in self.peer_topics.items(): + if peer_id in peerset: + peerset.discard(peer_id) + + def remove_from_blacklist(self, peer_id: ID) -> None: + """ + Remove a peer from the blacklist. + Once removed from the blacklist: + - The peer can establish new connections to this node + - Messages from this peer will be processed normally + - The peer can participate in topic subscriptions and message forwarding + + :param peer_id: the peer ID to remove from blacklist + """ + self.blacklisted_peers.discard(peer_id) + logger.debug("Removed peer %s from blacklist", peer_id) + + def is_peer_blacklisted(self, peer_id: ID) -> bool: + """ + Check if a peer is blacklisted. + + :param peer_id: the peer ID to check + :return: True if peer is blacklisted, False otherwise + """ + return peer_id in self.blacklisted_peers + + def clear_blacklist(self) -> None: + """ + Clear all peers from the blacklist. + This removes all blacklist restrictions, allowing previously blacklisted + peers to: + - Establish new connections + - Send and forward messages + - Participate in topic subscriptions + + """ + self.blacklisted_peers.clear() + logger.debug("Cleared all peers from blacklist") + + def get_blacklisted_peers(self) -> set[ID]: + """ + Get a copy of the current blacklisted peers. + Returns a snapshot of all currently blacklisted peer IDs. These peers + are completely isolated from pubsub communication - their connections + are rejected and their messages are dropped. + + :return: a set containing all blacklisted peer IDs + """ + return self.blacklisted_peers.copy() + async def stream_handler(self, stream: INetStream) -> None: """ Stream handler for pubsub. Gets invoked whenever a new stream is @@ -346,6 +428,10 @@ class Pubsub(Service, IPubsub): await self.event_handle_dead_peer_queue_started.wait() async def _handle_new_peer(self, peer_id: ID) -> None: + if self.is_peer_blacklisted(peer_id): + logger.debug("Rejecting blacklisted peer %s", peer_id) + return + try: stream: INetStream = await self.host.new_stream(peer_id, self.protocols) except SwarmException as error: @@ -359,7 +445,6 @@ class Pubsub(Service, IPubsub): except StreamClosed: logger.debug("Fail to add new peer %s: stream closed", peer_id) return - # TODO: Check if the peer in black list. try: self.router.add_peer(peer_id, stream.get_protocol()) except Exception as error: @@ -609,9 +694,20 @@ class Pubsub(Service, IPubsub): """ logger.debug("attempting to publish message %s", msg) - # TODO: Check if the `source` is in the blacklist. If yes, reject. + # Check if the message forwarder (source) is in the blacklist. If yes, reject. + if self.is_peer_blacklisted(msg_forwarder): + logger.debug( + "Rejecting message from blacklisted source peer %s", msg_forwarder + ) + return - # TODO: Check if the `from` is in the blacklist. If yes, reject. + # Check if the message originator (from) is in the blacklist. If yes, reject. + msg_from_peer = ID(msg.from_id) + if self.is_peer_blacklisted(msg_from_peer): + logger.debug( + "Rejecting message from blacklisted originator peer %s", msg_from_peer + ) + return # If the message is processed before, return(i.e., don't further process the message) # noqa: E501 if self._is_msg_seen(msg): diff --git a/newsfragments/641.feature.rst b/newsfragments/641.feature.rst new file mode 100644 index 00000000..80e75a09 --- /dev/null +++ b/newsfragments/641.feature.rst @@ -0,0 +1 @@ +implement blacklist management for `pubsub.Pubsub` with methods to get, add, remove, check, and clear blacklisted peer IDs. diff --git a/tests/core/pubsub/test_pubsub.py b/tests/core/pubsub/test_pubsub.py index 55897a68..ff145887 100644 --- a/tests/core/pubsub/test_pubsub.py +++ b/tests/core/pubsub/test_pubsub.py @@ -702,3 +702,369 @@ async def test_strict_signing_failed_validation(monkeypatch): await pubsubs_fsub[0].push_msg(pubsubs_fsub[0].my_id, msg) await trio.sleep(0.01) assert event.is_set() + + +@pytest.mark.trio +async def test_blacklist_basic_operations(): + """Test basic blacklist operations: add, remove, check, clear.""" + async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub: + pubsub = pubsubs_fsub[0] + + # Create test peer IDs + peer1 = IDFactory() + peer2 = IDFactory() + peer3 = IDFactory() + + # Initially no peers should be blacklisted + assert len(pubsub.get_blacklisted_peers()) == 0 + assert not pubsub.is_peer_blacklisted(peer1) + assert not pubsub.is_peer_blacklisted(peer2) + assert not pubsub.is_peer_blacklisted(peer3) + + # Add peers to blacklist + pubsub.add_to_blacklist(peer1) + pubsub.add_to_blacklist(peer2) + + # Check blacklist state + assert len(pubsub.get_blacklisted_peers()) == 2 + assert pubsub.is_peer_blacklisted(peer1) + assert pubsub.is_peer_blacklisted(peer2) + assert not pubsub.is_peer_blacklisted(peer3) + + # Remove one peer from blacklist + pubsub.remove_from_blacklist(peer1) + + # Check state after removal + assert len(pubsub.get_blacklisted_peers()) == 1 + assert not pubsub.is_peer_blacklisted(peer1) + assert pubsub.is_peer_blacklisted(peer2) + assert not pubsub.is_peer_blacklisted(peer3) + + # Add peer3 and then clear all + pubsub.add_to_blacklist(peer3) + assert len(pubsub.get_blacklisted_peers()) == 2 + + pubsub.clear_blacklist() + assert len(pubsub.get_blacklisted_peers()) == 0 + assert not pubsub.is_peer_blacklisted(peer1) + assert not pubsub.is_peer_blacklisted(peer2) + assert not pubsub.is_peer_blacklisted(peer3) + + # Test duplicate additions (should not increase size) + pubsub.add_to_blacklist(peer1) + pubsub.add_to_blacklist(peer1) + assert len(pubsub.get_blacklisted_peers()) == 1 + + # Test removing non-blacklisted peer (should not cause errors) + pubsub.remove_from_blacklist(peer2) + assert len(pubsub.get_blacklisted_peers()) == 1 + + +@pytest.mark.trio +async def test_blacklist_blocks_new_peer_connections(monkeypatch): + """Test that blacklisted peers are rejected when trying to connect.""" + async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub: + pubsub = pubsubs_fsub[0] + + # Create a blacklisted peer ID + blacklisted_peer = IDFactory() + + # Add peer to blacklist + pubsub.add_to_blacklist(blacklisted_peer) + + new_stream_called = False + + async def mock_new_stream(*args, **kwargs): + nonlocal new_stream_called + new_stream_called = True + # Create a mock stream + from unittest.mock import ( + AsyncMock, + Mock, + ) + + mock_stream = Mock() + mock_stream.write = AsyncMock() + mock_stream.reset = AsyncMock() + mock_stream.get_protocol = Mock(return_value="test_protocol") + return mock_stream + + router_add_peer_called = False + + def mock_add_peer(*args, **kwargs): + nonlocal router_add_peer_called + router_add_peer_called = True + + with monkeypatch.context() as m: + m.setattr(pubsub.host, "new_stream", mock_new_stream) + m.setattr(pubsub.router, "add_peer", mock_add_peer) + + # Attempt to handle the blacklisted peer + await pubsub._handle_new_peer(blacklisted_peer) + + # Verify that both new_stream and router.add_peer was not called + assert ( + not new_stream_called + ), "new_stream should be not be called to get hello packet" + assert ( + not router_add_peer_called + ), "Router.add_peer should not be called for blacklisted peer" + assert ( + blacklisted_peer not in pubsub.peers + ), "Blacklisted peer should not be in peers dict" + + +@pytest.mark.trio +async def test_blacklist_blocks_messages_from_blacklisted_originator(): + """Test that messages from blacklisted originator (from field) are rejected.""" + async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub: + pubsub = pubsubs_fsub[0] + blacklisted_originator = pubsubs_fsub[1].my_id # Use existing peer ID + + # Add the originator to blacklist + pubsub.add_to_blacklist(blacklisted_originator) + + # Create a message with blacklisted originator + msg = make_pubsub_msg( + origin_id=blacklisted_originator, + topic_ids=[TESTING_TOPIC], + data=TESTING_DATA, + seqno=b"\x00" * 8, + ) + + # Subscribe to the topic + await pubsub.subscribe(TESTING_TOPIC) + + # Track if router.publish is called + router_publish_called = False + + async def mock_router_publish(*args, **kwargs): + nonlocal router_publish_called + router_publish_called = True + await trio.lowlevel.checkpoint() + + original_router_publish = pubsub.router.publish + pubsub.router.publish = mock_router_publish + + try: + # Attempt to push message from blacklisted originator + await pubsub.push_msg(blacklisted_originator, msg) + + # Verify message was rejected + assert ( + not router_publish_called + ), "Router.publish should not be called for blacklisted originator" + assert not pubsub._is_msg_seen( + msg + ), "Message from blacklisted originator should not be marked as seen" + + finally: + pubsub.router.publish = original_router_publish + + +@pytest.mark.trio +async def test_blacklist_allows_non_blacklisted_peers(): + """Test that non-blacklisted peers can send messages normally.""" + async with PubsubFactory.create_batch_with_floodsub(3) as pubsubs_fsub: + pubsub = pubsubs_fsub[0] + allowed_peer = pubsubs_fsub[1].my_id + blacklisted_peer = pubsubs_fsub[2].my_id + + # Blacklist one peer but not the other + pubsub.add_to_blacklist(blacklisted_peer) + + # Create messages from both peers + msg_from_allowed = make_pubsub_msg( + origin_id=allowed_peer, + topic_ids=[TESTING_TOPIC], + data=b"allowed_data", + seqno=b"\x00" * 8, + ) + + msg_from_blacklisted = make_pubsub_msg( + origin_id=blacklisted_peer, + topic_ids=[TESTING_TOPIC], + data=b"blacklisted_data", + seqno=b"\x11" * 8, + ) + + # Subscribe to the topic + sub = await pubsub.subscribe(TESTING_TOPIC) + + # Track router.publish calls + router_publish_calls = [] + + async def mock_router_publish(*args, **kwargs): + router_publish_calls.append(args) + await trio.lowlevel.checkpoint() + + original_router_publish = pubsub.router.publish + pubsub.router.publish = mock_router_publish + + try: + # Send message from allowed peer (should succeed) + await pubsub.push_msg(allowed_peer, msg_from_allowed) + + # Send message from blacklisted peer (should be rejected) + await pubsub.push_msg(allowed_peer, msg_from_blacklisted) + + # Verify only allowed message was processed + assert ( + len(router_publish_calls) == 1 + ), "Only one message should be processed" + assert pubsub._is_msg_seen( + msg_from_allowed + ), "Allowed message should be marked as seen" + assert not pubsub._is_msg_seen( + msg_from_blacklisted + ), "Blacklisted message should not be marked as seen" + + # Verify subscription received the allowed message + received_msg = await sub.get() + assert received_msg.data == b"allowed_data" + + finally: + pubsub.router.publish = original_router_publish + + +@pytest.mark.trio +async def test_blacklist_integration_with_existing_functionality(): + """Test that blacklisting works correctly with existing pubsub functionality.""" + async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub: + pubsub = pubsubs_fsub[0] + other_peer = pubsubs_fsub[1].my_id + + # Test that seen messages cache still works with blacklisting + pubsub.add_to_blacklist(other_peer) + + msg = make_pubsub_msg( + origin_id=other_peer, + topic_ids=[TESTING_TOPIC], + data=TESTING_DATA, + seqno=b"\x00" * 8, + ) + + # First attempt - should be rejected due to blacklist + await pubsub.push_msg(other_peer, msg) + assert not pubsub._is_msg_seen(msg) + + # Remove from blacklist + pubsub.remove_from_blacklist(other_peer) + + # Now the message should be processed + await pubsub.subscribe(TESTING_TOPIC) + await pubsub.push_msg(other_peer, msg) + assert pubsub._is_msg_seen(msg) + + # If we try to send the same message again, it should be rejected + # due to seen cache (not blacklist) + router_publish_called = False + + async def mock_router_publish(*args, **kwargs): + nonlocal router_publish_called + router_publish_called = True + await trio.lowlevel.checkpoint() + + original_router_publish = pubsub.router.publish + pubsub.router.publish = mock_router_publish + + try: + await pubsub.push_msg(other_peer, msg) + assert ( + not router_publish_called + ), "Duplicate message should be rejected by seen cache" + finally: + pubsub.router.publish = original_router_publish + + +@pytest.mark.trio +async def test_blacklist_blocks_messages_from_blacklisted_source(): + """Test that messages from blacklisted source (forwarder) are rejected.""" + async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub: + pubsub = pubsubs_fsub[0] + blacklisted_forwarder = pubsubs_fsub[1].my_id + + # Add the forwarder to blacklist + pubsub.add_to_blacklist(blacklisted_forwarder) + + # Create a message + msg = make_pubsub_msg( + origin_id=pubsubs_fsub[1].my_id, + topic_ids=[TESTING_TOPIC], + data=TESTING_DATA, + seqno=b"\x00" * 8, + ) + + # Subscribe to the topic so we can check if message is processed + await pubsub.subscribe(TESTING_TOPIC) + + # Track if router.publish is called (it shouldn't be for blacklisted forwarder) + router_publish_called = False + + async def mock_router_publish(*args, **kwargs): + nonlocal router_publish_called + router_publish_called = True + await trio.lowlevel.checkpoint() + + original_router_publish = pubsub.router.publish + pubsub.router.publish = mock_router_publish + + try: + # Attempt to push message from blacklisted forwarder + await pubsub.push_msg(blacklisted_forwarder, msg) + + # Verify message was rejected + assert ( + not router_publish_called + ), "Router.publish should not be called for blacklisted forwarder" + assert not pubsub._is_msg_seen( + msg + ), "Message from blacklisted forwarder should not be marked as seen" + + finally: + pubsub.router.publish = original_router_publish + + +@pytest.mark.trio +async def test_blacklist_tears_down_existing_connection(): + """ + Verify that if a peer is already in pubsub.peers and pubsub.peer_topics, + calling add_to_blacklist(peer_id) immediately resets its stream and + removes it from both places. + """ + # Create two pubsub instances (floodsub), so they can connect to each other + async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub: + pubsub0, pubsub1 = pubsubs_fsub + + # 1) Connect peer1 to peer0 + await connect(pubsub0.host, pubsub1.host) + # Give handle_peer_queue some time to run + await trio.sleep(0.1) + + # After connect, pubsub0.peers should contain pubsub1.my_id + assert pubsub1.my_id in pubsub0.peers + + # 2) Manually record a subscription from peer1 under TESTING_TOPIC, + # so that peer1 shows up in pubsub0.peer_topics[TESTING_TOPIC]. + sub_msg = rpc_pb2.RPC.SubOpts(subscribe=True, topicid=TESTING_TOPIC) + pubsub0.handle_subscription(pubsub1.my_id, sub_msg) + + assert TESTING_TOPIC in pubsub0.peer_topics + assert pubsub1.my_id in pubsub0.peer_topics[TESTING_TOPIC] + + # 3) Now blacklist peer1 + pubsub0.add_to_blacklist(pubsub1.my_id) + + # Allow the asynchronous teardown task (_teardown_if_connected) to run + await trio.sleep(0.1) + + # 4a) pubsub0.peers should no longer contain peer1 + assert pubsub1.my_id not in pubsub0.peers + + # 4b) pubsub0.peer_topics[TESTING_TOPIC] should no longer contain peer1 + # (or TESTING_TOPIC may have been removed entirely if no other peers remain) + if TESTING_TOPIC in pubsub0.peer_topics: + assert pubsub1.my_id not in pubsub0.peer_topics[TESTING_TOPIC] + else: + # It’s also fine if the entire topic entry was pruned + assert TESTING_TOPIC not in pubsub0.peer_topics