Mplex: change message channel size to 8

To avoid infinity sized channel, and to conform to the go
implementation.
This commit is contained in:
mhchia
2020-02-05 20:31:18 +08:00
parent 64c9c48dac
commit 1fff6ad6b4
2 changed files with 36 additions and 6 deletions

View File

@ -7,6 +7,7 @@ from libp2p.stream_muxer.mplex.exceptions import (
MplexStreamEOF,
MplexStreamReset,
)
from libp2p.stream_muxer.mplex.mplex import MPLEX_MESSAGE_CHANNEL_SIZE
from libp2p.tools.constants import MAX_READ_LEN
DATA = b"data_123"
@ -19,6 +20,28 @@ async def test_mplex_stream_read_write(mplex_stream_pair):
assert (await stream_1.read(MAX_READ_LEN)) == DATA
@pytest.mark.trio
async def test_mplex_stream_full_buffer(mplex_stream_pair):
stream_0, stream_1 = mplex_stream_pair
# Test: The message channel is of size `MPLEX_MESSAGE_CHANNEL_SIZE`.
# It should be fine to read even there are already `MPLEX_MESSAGE_CHANNEL_SIZE`
# messages arriving.
for _ in range(MPLEX_MESSAGE_CHANNEL_SIZE):
await stream_0.write(DATA)
await wait_all_tasks_blocked()
# Sanity check
assert MAX_READ_LEN >= MPLEX_MESSAGE_CHANNEL_SIZE * len(DATA)
assert (await stream_1.read(MAX_READ_LEN)) == MPLEX_MESSAGE_CHANNEL_SIZE * DATA
# Test: Read after `MPLEX_MESSAGE_CHANNEL_SIZE + 1` messages has arrived, which
# exceeds the channel size. The stream should have been reset.
for _ in range(MPLEX_MESSAGE_CHANNEL_SIZE + 1):
await stream_0.write(DATA)
await wait_all_tasks_blocked()
with pytest.raises(MplexStreamReset):
await stream_1.read(MAX_READ_LEN)
@pytest.mark.trio
async def test_mplex_stream_pair_read_until_eof(mplex_stream_pair):
read_bytes = bytearray()