Mirror of https://github.com/varun-r-mallya/py-libp2p.git (synced 2025-12-31 20:36:24 +00:00)
Run black and isort w/ the new config
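The "new config" named in the commit title is not part of this excerpt, so the following is only a sketch of the kind of settings that would produce the re-wrapping shown in the hunks below; it is an assumption, not the repository's actual configuration, and the file name (pyproject.toml) is assumed as well. Context lines that sit exactly at 88 columns are left untouched in the diff, which is consistent with black's default line length of 88, with isort pinned to a black-compatible multi-line style:

    [tool.black]
    # 88 is black's default and matches the wrapping seen in this diff
    line-length = 88

    [tool.isort]
    # common black-compatible isort settings (assumed, not taken from this commit)
    line_length = 88
    multi_line_output = 3
    include_trailing_comma = true
    force_grid_wrap = 0
    use_parentheses = true

With a configuration along these lines, re-running black and isort over the package reproduces the mechanical changes below: any signature, call, or comprehension longer than the limit is split across lines, one element per line where needed, with trailing commas added.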
@@ -66,7 +66,9 @@ class BasicHost(IHost):
                 addrs.append(addr.encapsulate(p2p_part))
         return addrs

-    def set_stream_handler(self, protocol_id: TProtocol, stream_handler: StreamHandlerFn) -> bool:
+    def set_stream_handler(
+        self, protocol_id: TProtocol, stream_handler: StreamHandlerFn
+    ) -> bool:
         """
         set stream handler for host
         :param protocol_id: protocol id used on stream
@@ -77,7 +79,9 @@ class BasicHost(IHost):

     # protocol_id can be a list of protocol_ids
     # stream will decide which protocol_id to run on
-    async def new_stream(self, peer_id: ID, protocol_ids: Sequence[TProtocol]) -> INetStream:
+    async def new_stream(
+        self, peer_id: ID, protocol_ids: Sequence[TProtocol]
+    ) -> INetStream:
         """
         :param peer_id: peer_id that host is connecting
         :param protocol_id: protocol id that stream runs on
@@ -37,7 +37,9 @@ class IHost(ABC):
         """

     @abstractmethod
-    def set_stream_handler(self, protocol_id: TProtocol, stream_handler: StreamHandlerFn) -> bool:
+    def set_stream_handler(
+        self, protocol_id: TProtocol, stream_handler: StreamHandlerFn
+    ) -> bool:
         """
         set stream handler for host
         :param protocol_id: protocol id used on stream
@@ -48,7 +50,9 @@ class IHost(ABC):
     # protocol_id can be a list of protocol_ids
     # stream will decide which protocol_id to run on
     @abstractmethod
-    async def new_stream(self, peer_id: ID, protocol_ids: Sequence[TProtocol]) -> INetStream:
+    async def new_stream(
+        self, peer_id: ID, protocol_ids: Sequence[TProtocol]
+    ) -> INetStream:
         """
         :param peer_id: peer_id that host is connecting
         :param protocol_ids: protocol ids that stream can run on
@@ -114,7 +114,9 @@ class ValueSpiderCrawl(SpiderCrawl):
         """
         value_counts = Counter(values)
         if len(value_counts) != 1:
-            log.warning("Got multiple values for key %i: %s", self.node.xor_id, str(values))
+            log.warning(
+                "Got multiple values for key %i: %s", self.node.xor_id, str(values)
+            )
         value = value_counts.most_common(1)[0][0]

         peer = self.nearest_without_value.popleft()
@@ -49,7 +49,9 @@ class KadPeerInfo(PeerInfo):

     def encode(self):
         return (
-            str(self.peer_id_bytes) + "\n" + str("/ip4/" + str(self.ip) + "/udp/" + str(self.port))
+            str(self.peer_id_bytes)
+            + "\n"
+            + str("/ip4/" + str(self.ip) + "/udp/" + str(self.port))
         )


@@ -137,11 +139,17 @@ class KadPeerHeap:


 def create_kad_peerinfo(node_id_bytes=None, sender_ip=None, sender_port=None):
-    node_id = ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255)))
+    node_id = (
+        ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255)))
+    )
     peer_data = None
     if sender_ip and sender_port:
         peer_data = PeerData()
-        addr = [Multiaddr("/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port))]
+        addr = [
+            Multiaddr(
+                "/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port)
+            )
+        ]
         peer_data.add_addrs(addr)

     return KadPeerInfo(node_id, peer_data)
@@ -62,7 +62,9 @@ class KademliaServer:
         Provide interface="::" to accept ipv6 address
         """
         loop = asyncio.get_event_loop()
-        listen = loop.create_datagram_endpoint(self._create_protocol, local_addr=(interface, port))
+        listen = loop.create_datagram_endpoint(
+            self._create_protocol, local_addr=(interface, port)
+        )
         log.info("Node %i listening on %s:%i", self.node.xor_id, interface, port)
         self.transport, self.protocol = await listen
         # finally, schedule refreshing table
@@ -83,7 +85,9 @@ class KademliaServer:
         for node_id in self.protocol.get_refresh_ids():
             node = create_kad_peerinfo(node_id)
             nearest = self.protocol.router.find_neighbors(node, self.alpha)
-            spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
+            spider = NodeSpiderCrawl(
+                self.protocol, node, nearest, self.ksize, self.alpha
+            )
             results.append(spider.find())

         # do our crawling
@@ -118,7 +122,9 @@ class KademliaServer:
         cos = list(map(self.bootstrap_node, addrs))
         gathered = await asyncio.gather(*cos)
         nodes = [node for node in gathered if node is not None]
-        spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha)
+        spider = NodeSpiderCrawl(
+            self.protocol, self.node, nodes, self.ksize, self.alpha
+        )
         return await spider.find()

     async def bootstrap_node(self, addr):
@@ -50,7 +50,9 @@ class KademliaProtocol(RPCProtocol):
         source = create_kad_peerinfo(nodeid, sender[0], sender[1])

         self.welcome_if_new(source)
-        log.debug("got a store request from %s, storing '%s'='%s'", sender, key.hex(), value)
+        log.debug(
+            "got a store request from %s, storing '%s'='%s'", sender, key.hex(), value
+        )
         self.storage[key] = value
         return True

@@ -80,7 +82,9 @@ class KademliaProtocol(RPCProtocol):
         we store a map of content_id to peer_id (non xor)
         """
         if nodeid == provider_id:
-            log.info("adding provider %s for key %s in local table", provider_id, str(key))
+            log.info(
+                "adding provider %s for key %s in local table", provider_id, str(key)
+            )
             self.storage[key] = provider_id
             return True
         return False
@@ -131,7 +135,9 @@ class KademliaProtocol(RPCProtocol):

     async def call_add_provider(self, node_to_ask, key, provider_id):
         address = (node_to_ask.ip, node_to_ask.port)
-        result = await self.add_provider(address, self.source_node.peer_id_bytes, key, provider_id)
+        result = await self.add_provider(
+            address, self.source_node.peer_id_bytes, key, provider_id
+        )

         return self.handle_call_response(result, node_to_ask)

@@ -38,7 +38,9 @@ class INetwork(ABC):
         """

     @abstractmethod
-    def set_stream_handler(self, protocol_id: TProtocol, stream_handler: StreamHandlerFn) -> bool:
+    def set_stream_handler(
+        self, protocol_id: TProtocol, stream_handler: StreamHandlerFn
+    ) -> bool:
         """
         :param protocol_id: protocol id used on stream
         :param stream_handler: a stream handler instance
@@ -46,7 +48,9 @@ class INetwork(ABC):
         """

     @abstractmethod
-    async def new_stream(self, peer_id: ID, protocol_ids: Sequence[TProtocol]) -> INetStream:
+    async def new_stream(
+        self, peer_id: ID, protocol_ids: Sequence[TProtocol]
+    ) -> INetStream:
         """
         :param peer_id: peer_id of destination
         :param protocol_ids: available protocol ids to use for stream
@@ -68,7 +68,9 @@ class Swarm(INetwork):
     def get_peer_id(self) -> ID:
         return self.self_id

-    def set_stream_handler(self, protocol_id: TProtocol, stream_handler: StreamHandlerFn) -> bool:
+    def set_stream_handler(
+        self, protocol_id: TProtocol, stream_handler: StreamHandlerFn
+    ) -> bool:
         """
         :param protocol_id: protocol id used on stream
         :param stream_handler: a stream handler instance
@@ -121,7 +123,9 @@ class Swarm(INetwork):

         return muxed_conn

-    async def new_stream(self, peer_id: ID, protocol_ids: Sequence[TProtocol]) -> NetStream:
+    async def new_stream(
+        self, peer_id: ID, protocol_ids: Sequence[TProtocol]
+    ) -> NetStream:
         """
         :param peer_id: peer_id of destination
         :param protocol_id: protocol id
@@ -196,7 +200,9 @@ class Swarm(INetwork):

                 # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure
                 # the conn and then mux the conn
-                secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, False)
+                secured_conn = await self.upgrader.upgrade_security(
+                    raw_conn, peer_id, False
+                )
                 muxed_conn = self.upgrader.upgrade_connection(
                     secured_conn, self.generic_protocol_handler, peer_id
                 )
@@ -25,7 +25,9 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:

     parts = addr.split()
     if not parts:
-        raise InvalidAddrError(f"`parts`={parts} should at least have a protocol `P_P2P`")
+        raise InvalidAddrError(
+            f"`parts`={parts} should at least have a protocol `P_P2P`"
+        )

     p2p_part = parts[-1]
     last_protocol_code = p2p_part.protocols()[0].code
@@ -30,7 +30,9 @@ class Multiselect(IMultiselectMuxer):
         """
         self.handlers[protocol] = handler

-    async def negotiate(self, stream: NegotiableTransport) -> Tuple[TProtocol, StreamHandlerFn]:
+    async def negotiate(
+        self, stream: NegotiableTransport
+    ) -> Tuple[TProtocol, StreamHandlerFn]:
         """
         Negotiate performs protocol selection
         :param stream: stream to negotiate on
@@ -39,7 +39,9 @@ class MultiselectClient(IMultiselectClient):

         # Handshake succeeded if this point is reached

-    async def select_protocol_or_fail(self, protocol: TProtocol, stream: IMuxedStream) -> TProtocol:
+    async def select_protocol_or_fail(
+        self, protocol: TProtocol, stream: IMuxedStream
+    ) -> TProtocol:
         """
         Send message to multiselect selecting protocol
         and fail if multiselect does not return same protocol
@@ -12,7 +12,9 @@ class IMultiselectClient(ABC):
         """

     @abstractmethod
-    async def select_protocol_or_fail(self, protocol: TProtocol, stream: IMuxedStream) -> TProtocol:
+    async def select_protocol_or_fail(
+        self, protocol: TProtocol, stream: IMuxedStream
+    ) -> TProtocol:
         """
         Send message to multiselect selecting protocol
         and fail if multiselect does not return same protocol
@@ -22,7 +22,9 @@ class IMultiselectMuxer(ABC):
         """

     @abstractmethod
-    async def negotiate(self, stream: NegotiableTransport) -> Tuple[TProtocol, StreamHandlerFn]:
+    async def negotiate(
+        self, stream: NegotiableTransport
+    ) -> Tuple[TProtocol, StreamHandlerFn]:
         """
         Negotiate performs protocol selection
         :param stream: stream to negotiate on
@@ -67,7 +67,9 @@ class FloodSub(IPubsubRouter):
         """

         peers_gen = self._get_peers_to_send(
-            pubsub_msg.topicIDs, msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id)
+            pubsub_msg.topicIDs,
+            msg_forwarder=msg_forwarder,
+            origin=ID(pubsub_msg.from_id),
         )
         rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
         for peer_id in peers_gen:
@@ -157,7 +157,9 @@ class GossipSub(IPubsubRouter):
         self.mcache.put(pubsub_msg)

         peers_gen = self._get_peers_to_send(
-            pubsub_msg.topicIDs, msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id)
+            pubsub_msg.topicIDs,
+            msg_forwarder=msg_forwarder,
+            origin=ID(pubsub_msg.from_id),
         )
         rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
         for peer_id in peers_gen:
@@ -352,7 +354,9 @@ class GossipSub(IPubsubRouter):
                 if num_fanout_peers_in_topic < self.degree:
                     # Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic]
                     selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                        topic, self.degree - num_fanout_peers_in_topic, self.fanout[topic]
+                        topic,
+                        self.degree - num_fanout_peers_in_topic,
+                        self.fanout[topic],
                     )
                     # Add the peers to fanout[topic]
                     self.fanout[topic].extend(selected_peers)
@@ -371,7 +375,9 @@ class GossipSub(IPubsubRouter):

                     for peer in peers_to_emit_ihave_to:
                         # TODO: this line is a monster, can hopefully be simplified
-                        if (topic not in self.mesh or (peer not in self.mesh[topic])) and (
+                        if (
+                            topic not in self.mesh or (peer not in self.mesh[topic])
+                        ) and (
                             topic not in self.fanout or (peer not in self.fanout[topic])
                         ):
                             msg_id_strs = [str(msg_id) for msg_id in msg_ids]
@@ -391,7 +397,10 @@ class GossipSub(IPubsubRouter):
                                 topic, self.degree, []
                             )
                             for peer in peers_to_emit_ihave_to:
-                                if peer not in self.mesh[topic] and peer not in self.fanout[topic]:
+                                if (
+                                    peer not in self.mesh[topic]
+                                    and peer not in self.fanout[topic]
+                                ):

                                     msg_id_strs = [str(msg) for msg in msg_ids]
                                     await self.emit_ihave(topic, msg_id_strs, peer)
@@ -431,13 +440,19 @@ class GossipSub(IPubsubRouter):
         self, topic: str, num_to_select: int, minus: Sequence[ID]
     ) -> List[ID]:
         gossipsub_peers_in_topic = [
-            peer_id for peer_id in self.pubsub.peer_topics[topic] if peer_id in self.peers_gossipsub
+            peer_id
+            for peer_id in self.pubsub.peer_topics[topic]
+            if peer_id in self.peers_gossipsub
         ]
-        return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, list(minus))
+        return self.select_from_minus(
+            num_to_select, gossipsub_peers_in_topic, list(minus)
+        )

     # RPC handlers

-    async def handle_ihave(self, ihave_msg: rpc_pb2.ControlIHave, sender_peer_id: ID) -> None:
+    async def handle_ihave(
+        self, ihave_msg: rpc_pb2.ControlIHave, sender_peer_id: ID
+    ) -> None:
         """
         Checks the seen set and requests unknown messages with an IWANT message.
         """
@@ -461,7 +476,9 @@ class GossipSub(IPubsubRouter):
         if msg_ids_wanted:
             await self.emit_iwant(msg_ids_wanted, sender_peer_id)

-    async def handle_iwant(self, iwant_msg: rpc_pb2.ControlIWant, sender_peer_id: ID) -> None:
+    async def handle_iwant(
+        self, iwant_msg: rpc_pb2.ControlIWant, sender_peer_id: ID
+    ) -> None:
         """
         Forwards all request messages that are present in mcache to the requesting peer.
         """
@@ -496,7 +513,9 @@ class GossipSub(IPubsubRouter):
         # 4) And write the packet to the stream
         await peer_stream.write(rpc_msg)

-    async def handle_graft(self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID) -> None:
+    async def handle_graft(
+        self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID
+    ) -> None:
         topic: str = graft_msg.topicID

         # Add peer to mesh for topic
@@ -507,7 +526,9 @@ class GossipSub(IPubsubRouter):
             # Respond with PRUNE if not subscribed to the topic
             await self.emit_prune(topic, sender_peer_id)

-    async def handle_prune(self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID) -> None:
+    async def handle_prune(
+        self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
+    ) -> None:
         topic: str = prune_msg.topicID

         # Remove peer from mesh for topic, if peer is in topic
@@ -569,7 +590,9 @@ class GossipSub(IPubsubRouter):

         await self.emit_control_message(control_msg, to_peer)

-    async def emit_control_message(self, control_msg: rpc_pb2.ControlMessage, to_peer: ID) -> None:
+    async def emit_control_message(
+        self, control_msg: rpc_pb2.ControlMessage, to_peer: ID
+    ) -> None:
         # Add control message to packet
         packet: rpc_pb2.RPC = rpc_pb2.RPC()
         packet.control.CopyFrom(control_msg)
@@ -138,7 +138,9 @@ class Pubsub:
         """
         packet = rpc_pb2.RPC()
         for topic_id in self.my_topics:
-            packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)])
+            packet.subscriptions.extend(
+                [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
+            )
         return packet.SerializeToString()

     async def continuously_read_stream(self, stream: INetStream) -> None:
@@ -207,7 +209,9 @@ class Pubsub:
         :param msg: the message published to the topic
         """
         return tuple(
-            self.topic_validators[topic] for topic in msg.topicIDs if topic in self.topic_validators
+            self.topic_validators[topic]
+            for topic in msg.topicIDs
+            if topic in self.topic_validators
         )

     async def stream_handler(self, stream: INetStream) -> None:
@@ -315,7 +319,9 @@ class Pubsub:

         # Create subscribe message
         packet: rpc_pb2.RPC = rpc_pb2.RPC()
-        packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)])
+        packet.subscriptions.extend(
+            [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
+        )

         # Send out subscribe message to all peers
         await self.message_all_peers(packet.SerializeToString())
@@ -340,7 +346,9 @@ class Pubsub:

         # Create unsubscribe message
         packet: rpc_pb2.RPC = rpc_pb2.RPC()
-        packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)])
+        packet.subscriptions.extend(
+            [rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]
+        )

         # Send out unsubscribe message to all peers
         await self.message_all_peers(packet.SerializeToString())
@@ -391,7 +399,9 @@ class Pubsub:
                     cast(Awaitable[bool], topic_validator.validator(msg_forwarder, msg))
                 )
             else:
-                sync_topic_validators.append(cast(SyncValidatorFn, topic_validator.validator))
+                sync_topic_validators.append(
+                    cast(SyncValidatorFn, topic_validator.validator)
+                )

         for validator in sync_topic_validators:
             if not validator(msg_forwarder, msg):
@@ -73,7 +73,9 @@ class SecurityMultistream(ABC):

         return secure_conn

-    async def select_transport(self, conn: IRawConnection, initiator: bool) -> ISecureTransport:
+    async def select_transport(
+        self, conn: IRawConnection, initiator: bool
+    ) -> ISecureTransport:
         """
         Select a transport that both us and the node on the
         other end of conn support and agree on
@@ -25,7 +25,9 @@ class SimpleSecurityTransport(ISecureTransport):
         incoming = (await conn.read()).decode()

         if incoming != self.key_phrase:
-            raise Exception("Key phrase differed between nodes. Expected " + self.key_phrase)
+            raise Exception(
+                "Key phrase differed between nodes. Expected " + self.key_phrase
+            )

         secure_conn = SimpleSecureConn(conn, self.key_phrase)
         return secure_conn
@@ -44,7 +46,9 @@ class SimpleSecurityTransport(ISecureTransport):
         await asyncio.sleep(0)

         if incoming != self.key_phrase:
-            raise Exception("Key phrase differed between nodes. Expected " + self.key_phrase)
+            raise Exception(
+                "Key phrase differed between nodes. Expected " + self.key_phrase
+            )

         secure_conn = SimpleSecureConn(conn, self.key_phrase)
         return secure_conn
@@ -22,7 +22,10 @@ class IMuxedConn(ABC):

     @abstractmethod
     def __init__(
-        self, conn: ISecureConn, generic_protocol_handler: "GenericProtocolHandlerFn", peer_id: ID
+        self,
+        conn: ISecureConn,
+        generic_protocol_handler: "GenericProtocolHandlerFn",
+        peer_id: ID,
     ) -> None:
         """
         create a new muxed connection
@@ -54,7 +57,9 @@ class IMuxedConn(ABC):
         """

     @abstractmethod
-    async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> "IMuxedStream":
+    async def open_stream(
+        self, protocol_id: str, multi_addr: Multiaddr
+    ) -> "IMuxedStream":
         """
         creates a new muxed_stream
         :param protocol_id: protocol_id of stream
@@ -90,7 +90,9 @@ class Mplex(IMuxedConn):
         # Stream not created yet
         return None

-    async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> IMuxedStream:
+    async def open_stream(
+        self, protocol_id: str, multi_addr: Multiaddr
+    ) -> IMuxedStream:
         """
         creates a new muxed_stream
         :param protocol_id: protocol_id of stream
@@ -177,7 +179,9 @@ class Mplex(IMuxedConn):
         try:
             header = await decode_uvarint_from_stream(self.raw_conn.reader, timeout)
             length = await decode_uvarint_from_stream(self.raw_conn.reader, timeout)
-            message = await asyncio.wait_for(self.raw_conn.reader.read(length), timeout=timeout)
+            message = await asyncio.wait_for(
+                self.raw_conn.reader.read(length), timeout=timeout
+            )
         except asyncio.TimeoutError:
             return None, None, None

@@ -47,7 +47,11 @@ class MplexStream(IMuxedStream):
         write to stream
         :return: number of bytes written
         """
-        flag = HeaderTags.MessageInitiator if self.initiator else HeaderTags.MessageReceiver
+        flag = (
+            HeaderTags.MessageInitiator
+            if self.initiator
+            else HeaderTags.MessageReceiver
+        )
         return await self.mplex_conn.send_message(flag, data, self.stream_id)

     async def close(self) -> bool:
@@ -89,7 +93,11 @@ class MplexStream(IMuxedStream):
             return True

         if not self.remote_closed:
-            flag = HeaderTags.ResetInitiator if self.initiator else HeaderTags.ResetInitiator
+            flag = (
+                HeaderTags.ResetInitiator
+                if self.initiator
+                else HeaderTags.ResetInitiator
+            )
             await self.mplex_conn.send_message(flag, None, self.stream_id)

         self.local_closed = True
@@ -31,7 +31,9 @@ def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]:
     return result, index + 1


-async def decode_uvarint_from_stream(reader: asyncio.StreamReader, timeout: float) -> int:
+async def decode_uvarint_from_stream(
+    reader: asyncio.StreamReader, timeout: float
+) -> int:
     shift = 0
     result = 0
     while True:
@@ -28,7 +28,9 @@ class TCPListener(IListener):
         :return: return True if successful
         """
         self.server = await asyncio.start_server(
-            self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp")
+            self.handler,
+            maddr.value_for_protocol("ip4"),
+            maddr.value_for_protocol("tcp"),
         )
         socket = self.server.sockets[0]
         self.multiaddrs.append(_multiaddr_from_socket(socket))
@@ -47,7 +47,9 @@ class TransportUpgrader:

     @staticmethod
     def upgrade_connection(
-        conn: ISecureConn, generic_protocol_handler: GenericProtocolHandlerFn, peer_id: ID
+        conn: ISecureConn,
+        generic_protocol_handler: GenericProtocolHandlerFn,
+        peer_id: ID,
     ) -> Mplex:
         """
         Upgrade raw connection to muxed connection