mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Merge branch 'master' into fix/change-notifee-and-add-tests-for-swarm-conn-and-mplex
This commit is contained in:
@ -16,8 +16,11 @@ from typing import (
|
||||
import base58
|
||||
from lru import LRU
|
||||
|
||||
from libp2p.exceptions import ValidationError
|
||||
from libp2p.exceptions import ParseError, ValidationError
|
||||
from libp2p.host.host_interface import IHost
|
||||
from libp2p.io.exceptions import IncompleteReadError
|
||||
from libp2p.network.exceptions import SwarmException
|
||||
from libp2p.network.stream.exceptions import StreamEOF, StreamReset
|
||||
from libp2p.network.stream.net_stream_interface import INetStream
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.typing import TProtocol
|
||||
@ -154,7 +157,13 @@ class Pubsub:
|
||||
peer_id = stream.muxed_conn.peer_id
|
||||
|
||||
while True:
|
||||
incoming: bytes = await read_varint_prefixed_bytes(stream)
|
||||
try:
|
||||
incoming: bytes = await read_varint_prefixed_bytes(stream)
|
||||
except (ParseError, IncompleteReadError) as error:
|
||||
logger.debug(
|
||||
"read corrupted data from peer %s, error=%s", peer_id, error
|
||||
)
|
||||
continue
|
||||
rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC()
|
||||
rpc_incoming.ParseFromString(incoming)
|
||||
if rpc_incoming.publish:
|
||||
@ -228,10 +237,20 @@ class Pubsub:
|
||||
on one of the supported pubsub protocols.
|
||||
:param stream: newly created stream
|
||||
"""
|
||||
await self.continuously_read_stream(stream)
|
||||
try:
|
||||
await self.continuously_read_stream(stream)
|
||||
except (StreamEOF, StreamReset) as error:
|
||||
logger.debug("fail to read from stream, error=%s", error)
|
||||
stream.reset()
|
||||
# TODO: what to do when the stream is terminated?
|
||||
# disconnect the peer?
|
||||
|
||||
async def _handle_new_peer(self, peer_id: ID) -> None:
|
||||
stream: INetStream = await self.host.new_stream(peer_id, self.protocols)
|
||||
try:
|
||||
stream: INetStream = await self.host.new_stream(peer_id, self.protocols)
|
||||
except SwarmException as error:
|
||||
logger.debug("fail to add new peer %s, error %s", peer_id, error)
|
||||
return
|
||||
|
||||
self.peers[peer_id] = stream
|
||||
|
||||
|
||||
Reference in New Issue
Block a user