FIXME: Make TProtocol Optional[TProtocol] to keep types consistent (#770)

* FIXME: Make TProtocol Optional[TProtocol] to keep types consistent

* correct test case of test_protocol_muxer

* add newsfragment

* unit test added

---------

Co-authored-by: Manu Sheel Gupta <manusheel.edu@gmail.com>
This commit is contained in:
Jinesh Jain
2025-08-20 06:50:37 +05:30
committed by GitHub
parent e20a9a3814
commit dabb3a0962
9 changed files with 167 additions and 17 deletions

View File

@ -1,9 +1,9 @@
from collections import deque
import pytest
import trio
from libp2p.abc import (
IMultiselectCommunicator,
)
from libp2p.abc import IMultiselectCommunicator, INetStream
from libp2p.custom_types import TProtocol
from libp2p.protocol_muxer.exceptions import (
MultiselectClientError,
@ -13,6 +13,10 @@ from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
async def dummy_handler(stream: INetStream) -> None:
pass
class DummyMultiselectCommunicator(IMultiselectCommunicator):
"""
Dummy MultiSelectCommunicator to test out negotiate timmeout.
@ -31,7 +35,7 @@ class DummyMultiselectCommunicator(IMultiselectCommunicator):
@pytest.mark.trio
async def test_select_one_of_timeout():
async def test_select_one_of_timeout() -> None:
ECHO = TProtocol("/echo/1.0.0")
communicator = DummyMultiselectCommunicator()
@ -42,7 +46,7 @@ async def test_select_one_of_timeout():
@pytest.mark.trio
async def test_query_multistream_command_timeout():
async def test_query_multistream_command_timeout() -> None:
communicator = DummyMultiselectCommunicator()
client = MultiselectClient()
@ -51,9 +55,95 @@ async def test_query_multistream_command_timeout():
@pytest.mark.trio
async def test_negotiate_timeout():
async def test_negotiate_timeout() -> None:
communicator = DummyMultiselectCommunicator()
server = Multiselect()
with pytest.raises(MultiselectError, match="handshake read timeout"):
await server.negotiate(communicator, 2)
class HandshakeThenHangCommunicator(IMultiselectCommunicator):
handshaked: bool
def __init__(self) -> None:
self.handshaked = False
async def write(self, msg_str: str) -> None:
if msg_str == "/multistream/1.0.0":
self.handshaked = True
return
async def read(self) -> str:
if not self.handshaked:
return "/multistream/1.0.0"
# After handshake, hang on read.
await trio.sleep_forever()
# Should not be reached.
return ""
@pytest.mark.trio
async def test_negotiate_timeout_post_handshake() -> None:
communicator = HandshakeThenHangCommunicator()
server = Multiselect()
with pytest.raises(MultiselectError, match="handshake read timeout"):
await server.negotiate(communicator, 1)
class MockCommunicator(IMultiselectCommunicator):
def __init__(self, commands_to_read: list[str]):
self.read_queue = deque(commands_to_read)
self.written_data: list[str] = []
async def write(self, msg_str: str) -> None:
self.written_data.append(msg_str)
async def read(self) -> str:
if not self.read_queue:
raise EOFError
return self.read_queue.popleft()
@pytest.mark.trio
async def test_negotiate_empty_string_command() -> None:
# server receives an empty string, which means client wants `None` protocol.
server = Multiselect({None: dummy_handler})
# Handshake, then empty command
communicator = MockCommunicator(["/multistream/1.0.0", ""])
protocol, handler = await server.negotiate(communicator)
assert protocol is None
assert handler == dummy_handler
# Check that server sent back handshake and the protocol confirmation (empty string)
assert communicator.written_data == ["/multistream/1.0.0", ""]
@pytest.mark.trio
async def test_negotiate_with_none_handler() -> None:
# server has None handler, client sends "" to select it.
server = Multiselect({None: dummy_handler, TProtocol("/proto1"): dummy_handler})
# Handshake, then empty command
communicator = MockCommunicator(["/multistream/1.0.0", ""])
protocol, handler = await server.negotiate(communicator)
assert protocol is None
assert handler == dummy_handler
# Check written data: handshake, protocol confirmation
assert communicator.written_data == ["/multistream/1.0.0", ""]
@pytest.mark.trio
async def test_negotiate_with_none_handler_ls() -> None:
# server has None handler, client sends "ls" then empty string.
server = Multiselect({None: dummy_handler, TProtocol("/proto1"): dummy_handler})
# Handshake, ls, empty command
communicator = MockCommunicator(["/multistream/1.0.0", "ls", ""])
protocol, handler = await server.negotiate(communicator)
assert protocol is None
assert handler == dummy_handler
# Check written data: handshake, ls response, protocol confirmation
assert communicator.written_data[0] == "/multistream/1.0.0"
assert "/proto1" in communicator.written_data[1]
# Note: `ls` should not list the `None` protocol.
assert "None" not in communicator.written_data[1]
assert "\n\n" not in communicator.written_data[1]
assert communicator.written_data[2] == ""

View File

@ -159,3 +159,41 @@ async def test_get_protocols_returns_all_registered_protocols():
protocols = ms.get_protocols()
assert set(protocols) == {p1, p2, p3}
@pytest.mark.trio
async def test_negotiate_optional_tprotocol(security_protocol):
with pytest.raises(Exception):
await perform_simple_test(
None,
[None],
[None],
security_protocol,
)
@pytest.mark.trio
async def test_negotiate_optional_tprotocol_client_none_server_no_none(
security_protocol,
):
with pytest.raises(Exception):
await perform_simple_test(None, [None], [PROTOCOL_ECHO], security_protocol)
@pytest.mark.trio
async def test_negotiate_optional_tprotocol_client_none_in_list(security_protocol):
expected_selected_protocol = PROTOCOL_ECHO
await perform_simple_test(
expected_selected_protocol,
[None, PROTOCOL_ECHO],
[PROTOCOL_ECHO],
security_protocol,
)
@pytest.mark.trio
async def test_negotiate_optional_tprotocol_server_none_client_other(
security_protocol,
):
with pytest.raises(Exception):
await perform_simple_test(None, [PROTOCOL_ECHO], [None], security_protocol)