Add test for notifee disconnected

This commit is contained in:
mhchia
2019-09-17 21:54:20 +08:00
parent 675c61ce3b
commit b8b5ac5e06
5 changed files with 87 additions and 333 deletions

View File

@ -7,7 +7,7 @@ from libp2p.network.connection.net_connection_interface import INetConn
from libp2p.peer.id import ID
from libp2p.peer.peerstore_interface import IPeerStore
from libp2p.transport.listener_interface import IListener
from libp2p.typing import StreamHandlerFn, TProtocol
from libp2p.typing import StreamHandlerFn
from .stream.net_stream_interface import INetStream
@ -38,9 +38,7 @@ class INetwork(ABC):
"""
@abstractmethod
async def new_stream(
self, peer_id: ID, protocol_ids: Sequence[TProtocol]
) -> INetStream:
async def new_stream(self, peer_id: ID) -> INetStream:
"""
:param peer_id: peer_id of destination
:param protocol_ids: available protocol ids to use for stream
@ -61,7 +59,7 @@ class INetwork(ABC):
"""
@abstractmethod
def notify(self, notifee: "INotifee") -> bool:
def register_notifee(self, notifee: "INotifee") -> None:
"""
:param notifee: object implementing Notifee interface
:return: true if notifee registered successfully, false otherwise

View File

@ -1,6 +1,6 @@
import asyncio
import logging
from typing import Dict, List, Optional, Sequence
from typing import Dict, List, Optional
from multiaddr import Multiaddr
@ -14,7 +14,7 @@ from libp2p.transport.exceptions import MuxerUpgradeFailure, SecurityUpgradeFail
from libp2p.transport.listener_interface import IListener
from libp2p.transport.transport_interface import ITransport
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.typing import StreamHandlerFn, TProtocol
from libp2p.typing import StreamHandlerFn
from .connection.raw_connection import RawConnection
from .connection.swarm_connection import SwarmConn
@ -131,9 +131,7 @@ class Swarm(INetwork):
return swarm_conn
async def new_stream(
self, peer_id: ID, protocol_ids: Sequence[TProtocol]
) -> INetStream:
async def new_stream(self, peer_id: ID) -> INetStream:
"""
:param peer_id: peer_id of destination
:param protocol_id: protocol id
@ -229,15 +227,12 @@ class Swarm(INetwork):
# No maddr succeeded
return False
def notify(self, notifee: INotifee) -> bool:
def register_notifee(self, notifee: INotifee) -> None:
"""
:param notifee: object implementing Notifee interface
:return: true if notifee registered successfully, false otherwise
"""
if isinstance(notifee, INotifee):
self.notifees.append(notifee)
return True
return False
self.notifees.append(notifee)
def add_router(self, router: IPeerRouting) -> None:
self.router = router