mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Merge branch 'main' into add-last-publish
This commit is contained in:
@ -122,6 +122,9 @@ class Pubsub(Service, IPubsub):
|
|||||||
strict_signing: bool
|
strict_signing: bool
|
||||||
sign_key: PrivateKey
|
sign_key: PrivateKey
|
||||||
|
|
||||||
|
# Set of blacklisted peer IDs
|
||||||
|
blacklisted_peers: set[ID]
|
||||||
|
|
||||||
event_handle_peer_queue_started: trio.Event
|
event_handle_peer_queue_started: trio.Event
|
||||||
event_handle_dead_peer_queue_started: trio.Event
|
event_handle_dead_peer_queue_started: trio.Event
|
||||||
|
|
||||||
@ -201,6 +204,9 @@ class Pubsub(Service, IPubsub):
|
|||||||
|
|
||||||
self.counter = int(time.time())
|
self.counter = int(time.time())
|
||||||
|
|
||||||
|
# Set of blacklisted peer IDs
|
||||||
|
self.blacklisted_peers = set()
|
||||||
|
|
||||||
self.event_handle_peer_queue_started = trio.Event()
|
self.event_handle_peer_queue_started = trio.Event()
|
||||||
self.event_handle_dead_peer_queue_started = trio.Event()
|
self.event_handle_dead_peer_queue_started = trio.Event()
|
||||||
|
|
||||||
@ -320,6 +326,82 @@ class Pubsub(Service, IPubsub):
|
|||||||
if topic in self.topic_validators
|
if topic in self.topic_validators
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def add_to_blacklist(self, peer_id: ID) -> None:
|
||||||
|
"""
|
||||||
|
Add a peer to the blacklist.
|
||||||
|
When a peer is blacklisted:
|
||||||
|
- Any existing connection to that peer is immediately closed and removed
|
||||||
|
- The peer is removed from all topic subscription mappings
|
||||||
|
- Future connection attempts from this peer will be rejected
|
||||||
|
- Messages forwarded by or originating from this peer will be dropped
|
||||||
|
- The peer will not be able to participate in pubsub communication
|
||||||
|
|
||||||
|
:param peer_id: the peer ID to blacklist
|
||||||
|
"""
|
||||||
|
self.blacklisted_peers.add(peer_id)
|
||||||
|
logger.debug("Added peer %s to blacklist", peer_id)
|
||||||
|
self.manager.run_task(self._teardown_if_connected, peer_id)
|
||||||
|
|
||||||
|
async def _teardown_if_connected(self, peer_id: ID) -> None:
|
||||||
|
"""Close their stream and remove them if connected"""
|
||||||
|
stream = self.peers.get(peer_id)
|
||||||
|
if stream is not None:
|
||||||
|
try:
|
||||||
|
await stream.reset()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
del self.peers[peer_id]
|
||||||
|
# Also remove from any subscription maps:
|
||||||
|
for _topic, peerset in self.peer_topics.items():
|
||||||
|
if peer_id in peerset:
|
||||||
|
peerset.discard(peer_id)
|
||||||
|
|
||||||
|
def remove_from_blacklist(self, peer_id: ID) -> None:
|
||||||
|
"""
|
||||||
|
Remove a peer from the blacklist.
|
||||||
|
Once removed from the blacklist:
|
||||||
|
- The peer can establish new connections to this node
|
||||||
|
- Messages from this peer will be processed normally
|
||||||
|
- The peer can participate in topic subscriptions and message forwarding
|
||||||
|
|
||||||
|
:param peer_id: the peer ID to remove from blacklist
|
||||||
|
"""
|
||||||
|
self.blacklisted_peers.discard(peer_id)
|
||||||
|
logger.debug("Removed peer %s from blacklist", peer_id)
|
||||||
|
|
||||||
|
def is_peer_blacklisted(self, peer_id: ID) -> bool:
|
||||||
|
"""
|
||||||
|
Check if a peer is blacklisted.
|
||||||
|
|
||||||
|
:param peer_id: the peer ID to check
|
||||||
|
:return: True if peer is blacklisted, False otherwise
|
||||||
|
"""
|
||||||
|
return peer_id in self.blacklisted_peers
|
||||||
|
|
||||||
|
def clear_blacklist(self) -> None:
|
||||||
|
"""
|
||||||
|
Clear all peers from the blacklist.
|
||||||
|
This removes all blacklist restrictions, allowing previously blacklisted
|
||||||
|
peers to:
|
||||||
|
- Establish new connections
|
||||||
|
- Send and forward messages
|
||||||
|
- Participate in topic subscriptions
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.blacklisted_peers.clear()
|
||||||
|
logger.debug("Cleared all peers from blacklist")
|
||||||
|
|
||||||
|
def get_blacklisted_peers(self) -> set[ID]:
|
||||||
|
"""
|
||||||
|
Get a copy of the current blacklisted peers.
|
||||||
|
Returns a snapshot of all currently blacklisted peer IDs. These peers
|
||||||
|
are completely isolated from pubsub communication - their connections
|
||||||
|
are rejected and their messages are dropped.
|
||||||
|
|
||||||
|
:return: a set containing all blacklisted peer IDs
|
||||||
|
"""
|
||||||
|
return self.blacklisted_peers.copy()
|
||||||
|
|
||||||
async def stream_handler(self, stream: INetStream) -> None:
|
async def stream_handler(self, stream: INetStream) -> None:
|
||||||
"""
|
"""
|
||||||
Stream handler for pubsub. Gets invoked whenever a new stream is
|
Stream handler for pubsub. Gets invoked whenever a new stream is
|
||||||
@ -346,6 +428,10 @@ class Pubsub(Service, IPubsub):
|
|||||||
await self.event_handle_dead_peer_queue_started.wait()
|
await self.event_handle_dead_peer_queue_started.wait()
|
||||||
|
|
||||||
async def _handle_new_peer(self, peer_id: ID) -> None:
|
async def _handle_new_peer(self, peer_id: ID) -> None:
|
||||||
|
if self.is_peer_blacklisted(peer_id):
|
||||||
|
logger.debug("Rejecting blacklisted peer %s", peer_id)
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
stream: INetStream = await self.host.new_stream(peer_id, self.protocols)
|
stream: INetStream = await self.host.new_stream(peer_id, self.protocols)
|
||||||
except SwarmException as error:
|
except SwarmException as error:
|
||||||
@ -359,7 +445,6 @@ class Pubsub(Service, IPubsub):
|
|||||||
except StreamClosed:
|
except StreamClosed:
|
||||||
logger.debug("Fail to add new peer %s: stream closed", peer_id)
|
logger.debug("Fail to add new peer %s: stream closed", peer_id)
|
||||||
return
|
return
|
||||||
# TODO: Check if the peer in black list.
|
|
||||||
try:
|
try:
|
||||||
self.router.add_peer(peer_id, stream.get_protocol())
|
self.router.add_peer(peer_id, stream.get_protocol())
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
@ -609,9 +694,20 @@ class Pubsub(Service, IPubsub):
|
|||||||
"""
|
"""
|
||||||
logger.debug("attempting to publish message %s", msg)
|
logger.debug("attempting to publish message %s", msg)
|
||||||
|
|
||||||
# TODO: Check if the `source` is in the blacklist. If yes, reject.
|
# Check if the message forwarder (source) is in the blacklist. If yes, reject.
|
||||||
|
if self.is_peer_blacklisted(msg_forwarder):
|
||||||
|
logger.debug(
|
||||||
|
"Rejecting message from blacklisted source peer %s", msg_forwarder
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
# TODO: Check if the `from` is in the blacklist. If yes, reject.
|
# Check if the message originator (from) is in the blacklist. If yes, reject.
|
||||||
|
msg_from_peer = ID(msg.from_id)
|
||||||
|
if self.is_peer_blacklisted(msg_from_peer):
|
||||||
|
logger.debug(
|
||||||
|
"Rejecting message from blacklisted originator peer %s", msg_from_peer
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
# If the message is processed before, return(i.e., don't further process the message) # noqa: E501
|
# If the message is processed before, return(i.e., don't further process the message) # noqa: E501
|
||||||
if self._is_msg_seen(msg):
|
if self._is_msg_seen(msg):
|
||||||
|
|||||||
1
newsfragments/641.feature.rst
Normal file
1
newsfragments/641.feature.rst
Normal file
@ -0,0 +1 @@
|
|||||||
|
implement blacklist management for `pubsub.Pubsub` with methods to get, add, remove, check, and clear blacklisted peer IDs.
|
||||||
@ -702,3 +702,369 @@ async def test_strict_signing_failed_validation(monkeypatch):
|
|||||||
await pubsubs_fsub[0].push_msg(pubsubs_fsub[0].my_id, msg)
|
await pubsubs_fsub[0].push_msg(pubsubs_fsub[0].my_id, msg)
|
||||||
await trio.sleep(0.01)
|
await trio.sleep(0.01)
|
||||||
assert event.is_set()
|
assert event.is_set()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_blacklist_basic_operations():
|
||||||
|
"""Test basic blacklist operations: add, remove, check, clear."""
|
||||||
|
async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub:
|
||||||
|
pubsub = pubsubs_fsub[0]
|
||||||
|
|
||||||
|
# Create test peer IDs
|
||||||
|
peer1 = IDFactory()
|
||||||
|
peer2 = IDFactory()
|
||||||
|
peer3 = IDFactory()
|
||||||
|
|
||||||
|
# Initially no peers should be blacklisted
|
||||||
|
assert len(pubsub.get_blacklisted_peers()) == 0
|
||||||
|
assert not pubsub.is_peer_blacklisted(peer1)
|
||||||
|
assert not pubsub.is_peer_blacklisted(peer2)
|
||||||
|
assert not pubsub.is_peer_blacklisted(peer3)
|
||||||
|
|
||||||
|
# Add peers to blacklist
|
||||||
|
pubsub.add_to_blacklist(peer1)
|
||||||
|
pubsub.add_to_blacklist(peer2)
|
||||||
|
|
||||||
|
# Check blacklist state
|
||||||
|
assert len(pubsub.get_blacklisted_peers()) == 2
|
||||||
|
assert pubsub.is_peer_blacklisted(peer1)
|
||||||
|
assert pubsub.is_peer_blacklisted(peer2)
|
||||||
|
assert not pubsub.is_peer_blacklisted(peer3)
|
||||||
|
|
||||||
|
# Remove one peer from blacklist
|
||||||
|
pubsub.remove_from_blacklist(peer1)
|
||||||
|
|
||||||
|
# Check state after removal
|
||||||
|
assert len(pubsub.get_blacklisted_peers()) == 1
|
||||||
|
assert not pubsub.is_peer_blacklisted(peer1)
|
||||||
|
assert pubsub.is_peer_blacklisted(peer2)
|
||||||
|
assert not pubsub.is_peer_blacklisted(peer3)
|
||||||
|
|
||||||
|
# Add peer3 and then clear all
|
||||||
|
pubsub.add_to_blacklist(peer3)
|
||||||
|
assert len(pubsub.get_blacklisted_peers()) == 2
|
||||||
|
|
||||||
|
pubsub.clear_blacklist()
|
||||||
|
assert len(pubsub.get_blacklisted_peers()) == 0
|
||||||
|
assert not pubsub.is_peer_blacklisted(peer1)
|
||||||
|
assert not pubsub.is_peer_blacklisted(peer2)
|
||||||
|
assert not pubsub.is_peer_blacklisted(peer3)
|
||||||
|
|
||||||
|
# Test duplicate additions (should not increase size)
|
||||||
|
pubsub.add_to_blacklist(peer1)
|
||||||
|
pubsub.add_to_blacklist(peer1)
|
||||||
|
assert len(pubsub.get_blacklisted_peers()) == 1
|
||||||
|
|
||||||
|
# Test removing non-blacklisted peer (should not cause errors)
|
||||||
|
pubsub.remove_from_blacklist(peer2)
|
||||||
|
assert len(pubsub.get_blacklisted_peers()) == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_blacklist_blocks_new_peer_connections(monkeypatch):
|
||||||
|
"""Test that blacklisted peers are rejected when trying to connect."""
|
||||||
|
async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub:
|
||||||
|
pubsub = pubsubs_fsub[0]
|
||||||
|
|
||||||
|
# Create a blacklisted peer ID
|
||||||
|
blacklisted_peer = IDFactory()
|
||||||
|
|
||||||
|
# Add peer to blacklist
|
||||||
|
pubsub.add_to_blacklist(blacklisted_peer)
|
||||||
|
|
||||||
|
new_stream_called = False
|
||||||
|
|
||||||
|
async def mock_new_stream(*args, **kwargs):
|
||||||
|
nonlocal new_stream_called
|
||||||
|
new_stream_called = True
|
||||||
|
# Create a mock stream
|
||||||
|
from unittest.mock import (
|
||||||
|
AsyncMock,
|
||||||
|
Mock,
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_stream = Mock()
|
||||||
|
mock_stream.write = AsyncMock()
|
||||||
|
mock_stream.reset = AsyncMock()
|
||||||
|
mock_stream.get_protocol = Mock(return_value="test_protocol")
|
||||||
|
return mock_stream
|
||||||
|
|
||||||
|
router_add_peer_called = False
|
||||||
|
|
||||||
|
def mock_add_peer(*args, **kwargs):
|
||||||
|
nonlocal router_add_peer_called
|
||||||
|
router_add_peer_called = True
|
||||||
|
|
||||||
|
with monkeypatch.context() as m:
|
||||||
|
m.setattr(pubsub.host, "new_stream", mock_new_stream)
|
||||||
|
m.setattr(pubsub.router, "add_peer", mock_add_peer)
|
||||||
|
|
||||||
|
# Attempt to handle the blacklisted peer
|
||||||
|
await pubsub._handle_new_peer(blacklisted_peer)
|
||||||
|
|
||||||
|
# Verify that both new_stream and router.add_peer was not called
|
||||||
|
assert (
|
||||||
|
not new_stream_called
|
||||||
|
), "new_stream should be not be called to get hello packet"
|
||||||
|
assert (
|
||||||
|
not router_add_peer_called
|
||||||
|
), "Router.add_peer should not be called for blacklisted peer"
|
||||||
|
assert (
|
||||||
|
blacklisted_peer not in pubsub.peers
|
||||||
|
), "Blacklisted peer should not be in peers dict"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_blacklist_blocks_messages_from_blacklisted_originator():
|
||||||
|
"""Test that messages from blacklisted originator (from field) are rejected."""
|
||||||
|
async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
|
||||||
|
pubsub = pubsubs_fsub[0]
|
||||||
|
blacklisted_originator = pubsubs_fsub[1].my_id # Use existing peer ID
|
||||||
|
|
||||||
|
# Add the originator to blacklist
|
||||||
|
pubsub.add_to_blacklist(blacklisted_originator)
|
||||||
|
|
||||||
|
# Create a message with blacklisted originator
|
||||||
|
msg = make_pubsub_msg(
|
||||||
|
origin_id=blacklisted_originator,
|
||||||
|
topic_ids=[TESTING_TOPIC],
|
||||||
|
data=TESTING_DATA,
|
||||||
|
seqno=b"\x00" * 8,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Subscribe to the topic
|
||||||
|
await pubsub.subscribe(TESTING_TOPIC)
|
||||||
|
|
||||||
|
# Track if router.publish is called
|
||||||
|
router_publish_called = False
|
||||||
|
|
||||||
|
async def mock_router_publish(*args, **kwargs):
|
||||||
|
nonlocal router_publish_called
|
||||||
|
router_publish_called = True
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
|
original_router_publish = pubsub.router.publish
|
||||||
|
pubsub.router.publish = mock_router_publish
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Attempt to push message from blacklisted originator
|
||||||
|
await pubsub.push_msg(blacklisted_originator, msg)
|
||||||
|
|
||||||
|
# Verify message was rejected
|
||||||
|
assert (
|
||||||
|
not router_publish_called
|
||||||
|
), "Router.publish should not be called for blacklisted originator"
|
||||||
|
assert not pubsub._is_msg_seen(
|
||||||
|
msg
|
||||||
|
), "Message from blacklisted originator should not be marked as seen"
|
||||||
|
|
||||||
|
finally:
|
||||||
|
pubsub.router.publish = original_router_publish
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_blacklist_allows_non_blacklisted_peers():
|
||||||
|
"""Test that non-blacklisted peers can send messages normally."""
|
||||||
|
async with PubsubFactory.create_batch_with_floodsub(3) as pubsubs_fsub:
|
||||||
|
pubsub = pubsubs_fsub[0]
|
||||||
|
allowed_peer = pubsubs_fsub[1].my_id
|
||||||
|
blacklisted_peer = pubsubs_fsub[2].my_id
|
||||||
|
|
||||||
|
# Blacklist one peer but not the other
|
||||||
|
pubsub.add_to_blacklist(blacklisted_peer)
|
||||||
|
|
||||||
|
# Create messages from both peers
|
||||||
|
msg_from_allowed = make_pubsub_msg(
|
||||||
|
origin_id=allowed_peer,
|
||||||
|
topic_ids=[TESTING_TOPIC],
|
||||||
|
data=b"allowed_data",
|
||||||
|
seqno=b"\x00" * 8,
|
||||||
|
)
|
||||||
|
|
||||||
|
msg_from_blacklisted = make_pubsub_msg(
|
||||||
|
origin_id=blacklisted_peer,
|
||||||
|
topic_ids=[TESTING_TOPIC],
|
||||||
|
data=b"blacklisted_data",
|
||||||
|
seqno=b"\x11" * 8,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Subscribe to the topic
|
||||||
|
sub = await pubsub.subscribe(TESTING_TOPIC)
|
||||||
|
|
||||||
|
# Track router.publish calls
|
||||||
|
router_publish_calls = []
|
||||||
|
|
||||||
|
async def mock_router_publish(*args, **kwargs):
|
||||||
|
router_publish_calls.append(args)
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
|
original_router_publish = pubsub.router.publish
|
||||||
|
pubsub.router.publish = mock_router_publish
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Send message from allowed peer (should succeed)
|
||||||
|
await pubsub.push_msg(allowed_peer, msg_from_allowed)
|
||||||
|
|
||||||
|
# Send message from blacklisted peer (should be rejected)
|
||||||
|
await pubsub.push_msg(allowed_peer, msg_from_blacklisted)
|
||||||
|
|
||||||
|
# Verify only allowed message was processed
|
||||||
|
assert (
|
||||||
|
len(router_publish_calls) == 1
|
||||||
|
), "Only one message should be processed"
|
||||||
|
assert pubsub._is_msg_seen(
|
||||||
|
msg_from_allowed
|
||||||
|
), "Allowed message should be marked as seen"
|
||||||
|
assert not pubsub._is_msg_seen(
|
||||||
|
msg_from_blacklisted
|
||||||
|
), "Blacklisted message should not be marked as seen"
|
||||||
|
|
||||||
|
# Verify subscription received the allowed message
|
||||||
|
received_msg = await sub.get()
|
||||||
|
assert received_msg.data == b"allowed_data"
|
||||||
|
|
||||||
|
finally:
|
||||||
|
pubsub.router.publish = original_router_publish
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_blacklist_integration_with_existing_functionality():
|
||||||
|
"""Test that blacklisting works correctly with existing pubsub functionality."""
|
||||||
|
async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
|
||||||
|
pubsub = pubsubs_fsub[0]
|
||||||
|
other_peer = pubsubs_fsub[1].my_id
|
||||||
|
|
||||||
|
# Test that seen messages cache still works with blacklisting
|
||||||
|
pubsub.add_to_blacklist(other_peer)
|
||||||
|
|
||||||
|
msg = make_pubsub_msg(
|
||||||
|
origin_id=other_peer,
|
||||||
|
topic_ids=[TESTING_TOPIC],
|
||||||
|
data=TESTING_DATA,
|
||||||
|
seqno=b"\x00" * 8,
|
||||||
|
)
|
||||||
|
|
||||||
|
# First attempt - should be rejected due to blacklist
|
||||||
|
await pubsub.push_msg(other_peer, msg)
|
||||||
|
assert not pubsub._is_msg_seen(msg)
|
||||||
|
|
||||||
|
# Remove from blacklist
|
||||||
|
pubsub.remove_from_blacklist(other_peer)
|
||||||
|
|
||||||
|
# Now the message should be processed
|
||||||
|
await pubsub.subscribe(TESTING_TOPIC)
|
||||||
|
await pubsub.push_msg(other_peer, msg)
|
||||||
|
assert pubsub._is_msg_seen(msg)
|
||||||
|
|
||||||
|
# If we try to send the same message again, it should be rejected
|
||||||
|
# due to seen cache (not blacklist)
|
||||||
|
router_publish_called = False
|
||||||
|
|
||||||
|
async def mock_router_publish(*args, **kwargs):
|
||||||
|
nonlocal router_publish_called
|
||||||
|
router_publish_called = True
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
|
original_router_publish = pubsub.router.publish
|
||||||
|
pubsub.router.publish = mock_router_publish
|
||||||
|
|
||||||
|
try:
|
||||||
|
await pubsub.push_msg(other_peer, msg)
|
||||||
|
assert (
|
||||||
|
not router_publish_called
|
||||||
|
), "Duplicate message should be rejected by seen cache"
|
||||||
|
finally:
|
||||||
|
pubsub.router.publish = original_router_publish
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_blacklist_blocks_messages_from_blacklisted_source():
|
||||||
|
"""Test that messages from blacklisted source (forwarder) are rejected."""
|
||||||
|
async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
|
||||||
|
pubsub = pubsubs_fsub[0]
|
||||||
|
blacklisted_forwarder = pubsubs_fsub[1].my_id
|
||||||
|
|
||||||
|
# Add the forwarder to blacklist
|
||||||
|
pubsub.add_to_blacklist(blacklisted_forwarder)
|
||||||
|
|
||||||
|
# Create a message
|
||||||
|
msg = make_pubsub_msg(
|
||||||
|
origin_id=pubsubs_fsub[1].my_id,
|
||||||
|
topic_ids=[TESTING_TOPIC],
|
||||||
|
data=TESTING_DATA,
|
||||||
|
seqno=b"\x00" * 8,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Subscribe to the topic so we can check if message is processed
|
||||||
|
await pubsub.subscribe(TESTING_TOPIC)
|
||||||
|
|
||||||
|
# Track if router.publish is called (it shouldn't be for blacklisted forwarder)
|
||||||
|
router_publish_called = False
|
||||||
|
|
||||||
|
async def mock_router_publish(*args, **kwargs):
|
||||||
|
nonlocal router_publish_called
|
||||||
|
router_publish_called = True
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
|
original_router_publish = pubsub.router.publish
|
||||||
|
pubsub.router.publish = mock_router_publish
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Attempt to push message from blacklisted forwarder
|
||||||
|
await pubsub.push_msg(blacklisted_forwarder, msg)
|
||||||
|
|
||||||
|
# Verify message was rejected
|
||||||
|
assert (
|
||||||
|
not router_publish_called
|
||||||
|
), "Router.publish should not be called for blacklisted forwarder"
|
||||||
|
assert not pubsub._is_msg_seen(
|
||||||
|
msg
|
||||||
|
), "Message from blacklisted forwarder should not be marked as seen"
|
||||||
|
|
||||||
|
finally:
|
||||||
|
pubsub.router.publish = original_router_publish
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_blacklist_tears_down_existing_connection():
|
||||||
|
"""
|
||||||
|
Verify that if a peer is already in pubsub.peers and pubsub.peer_topics,
|
||||||
|
calling add_to_blacklist(peer_id) immediately resets its stream and
|
||||||
|
removes it from both places.
|
||||||
|
"""
|
||||||
|
# Create two pubsub instances (floodsub), so they can connect to each other
|
||||||
|
async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
|
||||||
|
pubsub0, pubsub1 = pubsubs_fsub
|
||||||
|
|
||||||
|
# 1) Connect peer1 to peer0
|
||||||
|
await connect(pubsub0.host, pubsub1.host)
|
||||||
|
# Give handle_peer_queue some time to run
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
# After connect, pubsub0.peers should contain pubsub1.my_id
|
||||||
|
assert pubsub1.my_id in pubsub0.peers
|
||||||
|
|
||||||
|
# 2) Manually record a subscription from peer1 under TESTING_TOPIC,
|
||||||
|
# so that peer1 shows up in pubsub0.peer_topics[TESTING_TOPIC].
|
||||||
|
sub_msg = rpc_pb2.RPC.SubOpts(subscribe=True, topicid=TESTING_TOPIC)
|
||||||
|
pubsub0.handle_subscription(pubsub1.my_id, sub_msg)
|
||||||
|
|
||||||
|
assert TESTING_TOPIC in pubsub0.peer_topics
|
||||||
|
assert pubsub1.my_id in pubsub0.peer_topics[TESTING_TOPIC]
|
||||||
|
|
||||||
|
# 3) Now blacklist peer1
|
||||||
|
pubsub0.add_to_blacklist(pubsub1.my_id)
|
||||||
|
|
||||||
|
# Allow the asynchronous teardown task (_teardown_if_connected) to run
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
# 4a) pubsub0.peers should no longer contain peer1
|
||||||
|
assert pubsub1.my_id not in pubsub0.peers
|
||||||
|
|
||||||
|
# 4b) pubsub0.peer_topics[TESTING_TOPIC] should no longer contain peer1
|
||||||
|
# (or TESTING_TOPIC may have been removed entirely if no other peers remain)
|
||||||
|
if TESTING_TOPIC in pubsub0.peer_topics:
|
||||||
|
assert pubsub1.my_id not in pubsub0.peer_topics[TESTING_TOPIC]
|
||||||
|
else:
|
||||||
|
# It’s also fine if the entire topic entry was pruned
|
||||||
|
assert TESTING_TOPIC not in pubsub0.peer_topics
|
||||||
|
|||||||
@ -32,18 +32,25 @@ class BaseInteractiveProcess(AbstractInterativeProcess):
|
|||||||
|
|
||||||
async def wait_until_ready(self) -> None:
|
async def wait_until_ready(self) -> None:
|
||||||
patterns_occurred = {pat: False for pat in self.patterns}
|
patterns_occurred = {pat: False for pat in self.patterns}
|
||||||
|
buffers = {pat: bytearray() for pat in self.patterns}
|
||||||
|
|
||||||
async def read_from_daemon_and_check() -> None:
|
async def read_from_daemon_and_check() -> None:
|
||||||
async for data in self.proc.stdout:
|
async for data in self.proc.stdout:
|
||||||
# TODO: It takes O(n^2), which is quite bad.
|
|
||||||
# But it should succeed in a few seconds.
|
|
||||||
self.bytes_read.extend(data)
|
self.bytes_read.extend(data)
|
||||||
for pat, occurred in patterns_occurred.items():
|
for pat, occurred in patterns_occurred.items():
|
||||||
if occurred:
|
if occurred:
|
||||||
continue
|
continue
|
||||||
if pat in self.bytes_read:
|
|
||||||
|
# Check if pattern is in new data or spans across chunks
|
||||||
|
buf = buffers[pat]
|
||||||
|
buf.extend(data)
|
||||||
|
if pat in buf:
|
||||||
patterns_occurred[pat] = True
|
patterns_occurred[pat] = True
|
||||||
if all([value for value in patterns_occurred.values()]):
|
else:
|
||||||
|
keep = min(len(pat) - 1, len(buf))
|
||||||
|
buffers[pat] = buf[-keep:] if keep > 0 else bytearray()
|
||||||
|
|
||||||
|
if all(patterns_occurred.values()):
|
||||||
return
|
return
|
||||||
|
|
||||||
with trio.fail_after(TIMEOUT_DURATION):
|
with trio.fail_after(TIMEOUT_DURATION):
|
||||||
|
|||||||
Reference in New Issue
Block a user