From 0ab548aee5cbf0d3b9952ea925766b225439a2b2 Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 9 Sep 2019 16:58:58 +0800 Subject: [PATCH] Add the missing tests --- tests/network/__init__.py | 0 tests/network/conftest.py | 14 ++++ tests/network/test_net_stream.py | 111 +++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+) create mode 100644 tests/network/__init__.py create mode 100644 tests/network/conftest.py create mode 100644 tests/network/test_net_stream.py diff --git a/tests/network/__init__.py b/tests/network/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/network/conftest.py b/tests/network/conftest.py new file mode 100644 index 00000000..10f77918 --- /dev/null +++ b/tests/network/conftest.py @@ -0,0 +1,14 @@ +import asyncio + +import pytest + +from tests.factories import net_stream_pair_factory + + +@pytest.fixture +async def net_stream_pair(): + stream_0, host_0, stream_1, host_1 = await net_stream_pair_factory() + try: + yield stream_0, stream_1 + finally: + await asyncio.gather(*[host_0.close(), host_1.close()]) diff --git a/tests/network/test_net_stream.py b/tests/network/test_net_stream.py new file mode 100644 index 00000000..e7029125 --- /dev/null +++ b/tests/network/test_net_stream.py @@ -0,0 +1,111 @@ +import asyncio + +import pytest + +from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset +from tests.constants import MAX_READ_LEN + +DATA = b"data_123" + +# TODO: Move `muxed_stream` specific(currently we are using `MplexStream`) tests to its +# own file, after `generic_protocol_handler` is refactored out of `Mplex`. + + +@pytest.mark.asyncio +async def test_net_stream_read_write(net_stream_pair): + stream_0, stream_1 = net_stream_pair + assert ( + stream_0.protocol_id is not None + and stream_0.protocol_id == stream_1.protocol_id + ) + await stream_0.write(DATA) + assert (await stream_1.read(MAX_READ_LEN)) == DATA + + +@pytest.mark.asyncio +async def test_net_stream_read_until_eof(net_stream_pair): + read_bytes = bytearray() + stream_0, stream_1 = net_stream_pair + + async def read_until_eof(): + read_bytes.extend(await stream_1.read()) + + task = asyncio.ensure_future(read_until_eof()) + + expected_data = bytearray() + + # Test: `read` doesn't return before `close` is called. + await stream_0.write(DATA) + expected_data.extend(DATA) + await asyncio.sleep(0.01) + assert len(read_bytes) == 0 + # Test: `read` doesn't return before `close` is called. + await stream_0.write(DATA) + expected_data.extend(DATA) + await asyncio.sleep(0.01) + assert len(read_bytes) == 0 + + # Test: Close the stream, `read` returns, and receive previous sent data. + await stream_0.close() + await asyncio.sleep(0.01) + assert read_bytes == expected_data + + task.cancel() + + +@pytest.mark.asyncio +async def test_net_stream_read_after_remote_closed(net_stream_pair): + stream_0, stream_1 = net_stream_pair + assert not stream_1.muxed_stream.event_remote_closed.is_set() + await stream_0.write(DATA) + await stream_0.close() + await asyncio.sleep(0.01) + assert stream_1.muxed_stream.event_remote_closed.is_set() + assert (await stream_1.read(MAX_READ_LEN)) == DATA + with pytest.raises(StreamEOF): + await stream_1.read(MAX_READ_LEN) + + +@pytest.mark.asyncio +async def test_net_stream_read_after_local_reset(net_stream_pair): + stream_0, stream_1 = net_stream_pair + await stream_0.reset() + with pytest.raises(StreamReset): + await stream_0.read(MAX_READ_LEN) + + +@pytest.mark.asyncio +async def test_net_stream_read_after_remote_reset(net_stream_pair): + stream_0, stream_1 = net_stream_pair + await stream_0.write(DATA) + await stream_0.reset() + # Sleep to let `stream_1` receive the message. + await asyncio.sleep(0.01) + with pytest.raises(StreamReset): + await stream_1.read(MAX_READ_LEN) + + +@pytest.mark.asyncio +async def test_net_stream_write_after_local_closed(net_stream_pair): + stream_0, stream_1 = net_stream_pair + await stream_0.write(DATA) + await stream_0.close() + with pytest.raises(StreamClosed): + await stream_0.write(DATA) + + +@pytest.mark.asyncio +async def test_net_stream_write_after_local_reset(net_stream_pair): + stream_0, stream_1 = net_stream_pair + await stream_0.reset() + with pytest.raises(StreamClosed): + await stream_0.write(DATA) + + +@pytest.mark.asyncio +async def test_net_stream_write_after_remote_reset(net_stream_pair): + stream_0, stream_1 = net_stream_pair + await stream_1.reset() + await asyncio.sleep(0.01) + with pytest.raises(StreamClosed): + await stream_0.write(DATA)