Fix: failed to open stream using existing conn

Fix #233
This commit is contained in:
mhchia
2019-09-03 14:12:16 +08:00
parent 194b494057
commit fd1f466002

View File

@ -15,6 +15,7 @@ from libp2p.transport.listener_interface import IListener
from libp2p.transport.transport_interface import ITransport from libp2p.transport.transport_interface import ITransport
from libp2p.transport.upgrader import TransportUpgrader from libp2p.transport.upgrader import TransportUpgrader
from libp2p.typing import StreamHandlerFn, TProtocol from libp2p.typing import StreamHandlerFn, TProtocol
from libp2p.peer.peerstore import PeerStoreError
from .connection.raw_connection import RawConnection from .connection.raw_connection import RawConnection
from .exceptions import SwarmException from .exceptions import SwarmException
@ -92,55 +93,55 @@ class Swarm(INetwork):
:return: muxed connection :return: muxed connection
""" """
# Get peer info from peer store if peer_id in self.connections:
addrs = self.peerstore.addrs(peer_id) # If muxed connection already exists for peer_id,
# set muxed connection equal to existing muxed connection
return self.connections[peer_id]
try:
# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)
except PeerStoreError:
raise SwarmException(f"No known addresses to peer {peer_id}")
if not addrs: if not addrs:
raise SwarmException("No known addresses to peer") raise SwarmException(f"No known addresses to peer {peer_id}")
if not self.router: if not self.router:
multiaddr = addrs[0] multiaddr = addrs[0]
else: else:
multiaddr = self.router.find_peer(peer_id) multiaddr = self.router.find_peer(peer_id)
# Dial peer (connection to peer does not yet exist)
# Transport dials peer (gets back a raw conn)
raw_conn = await self.transport.dial(multiaddr, self.self_id)
if peer_id in self.connections: # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure
# If muxed connection already exists for peer_id, # the conn and then mux the conn
# set muxed connection equal to existing muxed connection try:
muxed_conn = self.connections[peer_id] secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True)
else: except SecurityUpgradeFailure as error:
# Dial peer (connection to peer does not yet exist) # TODO: Add logging to indicate the failure
# Transport dials peer (gets back a raw conn) await raw_conn.close()
raw_conn = await self.transport.dial(multiaddr, self.self_id) raise SwarmException(
f"fail to upgrade the connection to a secured connection from {peer_id}"
) from error
try:
muxed_conn = await self.upgrader.upgrade_connection(
secured_conn, self.generic_protocol_handler, peer_id
)
except MuxerUpgradeFailure as error:
# TODO: Add logging to indicate the failure
await secured_conn.close()
raise SwarmException(
f"fail to upgrade the connection to a muxed connection from {peer_id}"
) from error
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # Store muxed connection in connections
# the conn and then mux the conn self.connections[peer_id] = muxed_conn
try:
secured_conn = await self.upgrader.upgrade_security(
raw_conn, peer_id, True
)
except SecurityUpgradeFailure as error:
# TODO: Add logging to indicate the failure
await raw_conn.close()
raise SwarmException(
f"fail to upgrade the connection to a secured connection from {peer_id}"
) from error
try:
muxed_conn = await self.upgrader.upgrade_connection(
secured_conn, self.generic_protocol_handler, peer_id
)
except MuxerUpgradeFailure as error:
# TODO: Add logging to indicate the failure
await secured_conn.close()
raise SwarmException(
f"fail to upgrade the connection to a muxed connection from {peer_id}"
) from error
# Store muxed connection in connections # Call notifiers since event occurred
self.connections[peer_id] = muxed_conn for notifee in self.notifees:
await notifee.connected(self, muxed_conn)
# Call notifiers since event occurred
for notifee in self.notifees:
await notifee.connected(self, muxed_conn)
return muxed_conn return muxed_conn
@ -152,11 +153,6 @@ class Swarm(INetwork):
:param protocol_id: protocol id :param protocol_id: protocol id
:return: net stream instance :return: net stream instance
""" """
# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)
if not addrs:
raise SwarmException("No known addresses to peer")
muxed_conn = await self.dial_peer(peer_id) muxed_conn = await self.dial_peer(peer_id)
@ -217,10 +213,6 @@ class Swarm(INetwork):
"fail to upgrade the connection to a secured connection" "fail to upgrade the connection to a secured connection"
) from error ) from error
peer_id = secured_conn.get_remote_peer() peer_id = secured_conn.get_remote_peer()
peer_ip, peer_port = writer.get_extra_info("peername")
peer_maddr = Multiaddr(f"/ip4/{peer_ip}/tcp/{peer_port}")
# TODO: Fix the ttl
self.peerstore.add_addr(peer_id, peer_maddr, 12345678)
try: try:
muxed_conn = await self.upgrader.upgrade_connection( muxed_conn = await self.upgrader.upgrade_connection(
secured_conn, self.generic_protocol_handler, peer_id secured_conn, self.generic_protocol_handler, peer_id