Conform stream_id to go-mplex

This commit is contained in:
mhchia
2019-08-28 21:43:34 +08:00
parent 9b60e1757d
commit d35b8ffc64
5 changed files with 55 additions and 46 deletions

View File

@ -3,6 +3,7 @@ import asyncio
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
from .constants import HeaderTags
from .datastructures import StreamID
class MplexStream(IMuxedStream):
@ -11,8 +12,7 @@ class MplexStream(IMuxedStream):
"""
name: str
stream_id: int
initiator: bool
stream_id: StreamID
mplex_conn: IMuxedConn
read_deadline: int
write_deadline: int
@ -22,18 +22,14 @@ class MplexStream(IMuxedStream):
_buf: bytearray
def __init__(
self, name: str, stream_id: int, initiator: bool, mplex_conn: IMuxedConn
) -> None:
def __init__(self, name: str, stream_id: StreamID, mplex_conn: IMuxedConn) -> None:
"""
create new MuxedStream in muxer
:param stream_id: stream stream id
:param initiator: boolean if this is an initiator
:param stream_id: stream id of this stream
:param mplex_conn: muxed connection of this muxed_stream
"""
self.name = name
self.stream_id = stream_id
self.initiator = initiator
self.mplex_conn = mplex_conn
self.read_deadline = None
self.write_deadline = None
@ -42,6 +38,10 @@ class MplexStream(IMuxedStream):
self.stream_lock = asyncio.Lock()
self._buf = bytearray()
@property
def is_initiator(self) -> bool:
return self.stream_id.is_initiator
async def read(self, n: int = -1) -> bytes:
"""
Read up to n bytes. Read possibly returns fewer than `n` bytes,
@ -85,7 +85,7 @@ class MplexStream(IMuxedStream):
"""
flag = (
HeaderTags.MessageInitiator
if self.initiator
if self.is_initiator
else HeaderTags.MessageReceiver
)
return await self.mplex_conn.send_message(flag, data, self.stream_id)
@ -98,7 +98,9 @@ class MplexStream(IMuxedStream):
"""
# TODO error handling with timeout
# TODO understand better how mutexes are used from go repo
flag = HeaderTags.CloseInitiator if self.initiator else HeaderTags.CloseReceiver
flag = (
HeaderTags.CloseInitiator if self.is_initiator else HeaderTags.CloseReceiver
)
await self.mplex_conn.send_message(flag, None, self.stream_id)
remote_lock = False
@ -131,7 +133,7 @@ class MplexStream(IMuxedStream):
if not self.remote_closed:
flag = (
HeaderTags.ResetInitiator
if self.initiator
if self.is_initiator
else HeaderTags.ResetReceiver
)
await self.mplex_conn.send_message(flag, None, self.stream_id)