mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-07 13:40:56 +00:00
TODO: throttle on async validators (#755)
* fixed todo: throttle on async validators * added test: validate message respects concurrency limit * added newsfragment * added configurable validator semaphore in the PubSub constructor * added the concurrency-checker in the original test-validate-msg test case * separate out a _run_async_validator function * remove redundant run_async_validator
This commit is contained in:
committed by
GitHub
parent
3507531344
commit
11560f5cc9
@ -5,10 +5,12 @@ import inspect
|
||||
from typing import (
|
||||
NamedTuple,
|
||||
)
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
|
||||
from libp2p.custom_types import AsyncValidatorFn
|
||||
from libp2p.exceptions import (
|
||||
ValidationError,
|
||||
)
|
||||
@ -243,7 +245,37 @@ async def test_get_msg_validators():
|
||||
((False, True), (True, False), (True, True)),
|
||||
)
|
||||
@pytest.mark.trio
|
||||
async def test_validate_msg(is_topic_1_val_passed, is_topic_2_val_passed):
|
||||
async def test_validate_msg_with_throttle_condition(
|
||||
is_topic_1_val_passed, is_topic_2_val_passed
|
||||
):
|
||||
CONCURRENCY_LIMIT = 10
|
||||
|
||||
state = {
|
||||
"concurrency_counter": 0,
|
||||
"max_observed": 0,
|
||||
}
|
||||
lock = trio.Lock()
|
||||
|
||||
async def mock_run_async_validator(
|
||||
self,
|
||||
func: AsyncValidatorFn,
|
||||
msg_forwarder: ID,
|
||||
msg: rpc_pb2.Message,
|
||||
results: list[bool],
|
||||
) -> None:
|
||||
async with self._validator_semaphore:
|
||||
async with lock:
|
||||
state["concurrency_counter"] += 1
|
||||
if state["concurrency_counter"] > state["max_observed"]:
|
||||
state["max_observed"] = state["concurrency_counter"]
|
||||
|
||||
try:
|
||||
result = await func(msg_forwarder, msg)
|
||||
results.append(result)
|
||||
finally:
|
||||
async with lock:
|
||||
state["concurrency_counter"] -= 1
|
||||
|
||||
async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub:
|
||||
|
||||
def passed_sync_validator(peer_id: ID, msg: rpc_pb2.Message) -> bool:
|
||||
@ -280,11 +312,19 @@ async def test_validate_msg(is_topic_1_val_passed, is_topic_2_val_passed):
|
||||
seqno=b"\x00" * 8,
|
||||
)
|
||||
|
||||
if is_topic_1_val_passed and is_topic_2_val_passed:
|
||||
await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg)
|
||||
else:
|
||||
with pytest.raises(ValidationError):
|
||||
with patch(
|
||||
"libp2p.pubsub.pubsub.Pubsub._run_async_validator",
|
||||
new=mock_run_async_validator,
|
||||
):
|
||||
if is_topic_1_val_passed and is_topic_2_val_passed:
|
||||
await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg)
|
||||
else:
|
||||
with pytest.raises(ValidationError):
|
||||
await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg)
|
||||
|
||||
assert state["max_observed"] <= CONCURRENCY_LIMIT, (
|
||||
f"Max concurrency observed: {state['max_observed']}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
|
||||
Reference in New Issue
Block a user