implement trio queue interface

This commit is contained in:
Chih Cheng Liang
2019-11-19 18:01:29 +08:00
committed by mhchia
parent 41ff884eef
commit c55ea0e5bb
2 changed files with 44 additions and 0 deletions

19
tests/test_utils.py Normal file
View File

@ -0,0 +1,19 @@
import trio
import pytest
from libp2p.utils import TrioQueue
@pytest.mark.trio
async def test_trio_queue():
queue = TrioQueue()
async def queue_get(task_status=None):
result = await queue.get()
task_status.started(result)
async with trio.open_nursery() as nursery:
nursery.start_soon(queue.put, 123)
result = await nursery.start(queue_get)
assert result == 123