Add events in MplexStream

And modify a little bit of `close` and `reset`
This commit is contained in:
mhchia
2019-09-05 18:18:08 +08:00
parent 34b02588cf
commit 96230758e4
2 changed files with 64 additions and 51 deletions

View File

@ -10,6 +10,7 @@ from libp2p.typing import TProtocol
from libp2p.utils import (
decode_uvarint_from_stream,
encode_uvarint,
encode_varint_prefixed,
read_varint_prefixed_bytes,
)
@ -34,6 +35,8 @@ class Mplex(IMuxedConn):
buffers: Dict[StreamID, "asyncio.Queue[bytes]"]
stream_queue: "asyncio.Queue[StreamID]"
next_channel_id: int
buffers_lock: asyncio.Lock
shutdown: asyncio.Event
_tasks: List["asyncio.Future[Any]"]
@ -63,6 +66,8 @@ class Mplex(IMuxedConn):
# Mapping from stream ID -> buffer of messages for that stream
self.buffers = {}
self.buffers_lock = asyncio.Lock()
self.shutdown = asyncio.Event()
self.stream_queue = asyncio.Queue()
@ -145,7 +150,7 @@ class Mplex(IMuxedConn):
self._tasks.append(asyncio.ensure_future(self.generic_protocol_handler(stream)))
async def send_message(
self, flag: HeaderTags, data: bytes, stream_id: StreamID
self, flag: HeaderTags, data: Optional[bytes], stream_id: StreamID
) -> int:
"""
sends a message over the connection
@ -154,19 +159,16 @@ class Mplex(IMuxedConn):
:param stream_id: stream the message is in
"""
# << by 3, then or with flag
header = (stream_id.channel_id << 3) | flag.value
header = encode_uvarint(header)
header = encode_uvarint((stream_id.channel_id << 3) | flag.value)
if data is None:
data_length = encode_uvarint(0)
_bytes = header + data_length
else:
data_length = encode_uvarint(len(data))
_bytes = header + data_length + data
data = b""
_bytes = header + encode_varint_prefixed(data)
return await self.write_to_stream(_bytes)
async def write_to_stream(self, _bytes: bytearray) -> int:
async def write_to_stream(self, _bytes: bytes) -> int:
"""
writes a byte array to a secured connection
:param _bytes: byte array to write
@ -199,6 +201,10 @@ class Mplex(IMuxedConn):
HeaderTags.MessageReceiver.value,
):
await self.buffers[stream_id].put(message)
# elif flag in (
# HeaderTags.CloseInitiator.value,
# HeaderTags.CloseReceiver.value
# ):
# Force context switch
await asyncio.sleep(0)