From fd1f466002726d0ba66983235e6ff164bc1eabdd Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 14:12:16 +0800 Subject: [PATCH] Fix: failed to open stream using existing conn Fix #233 --- libp2p/network/swarm.py | 88 +++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 48 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index e522a4b2..895ceb57 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -15,6 +15,7 @@ from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader from libp2p.typing import StreamHandlerFn, TProtocol +from libp2p.peer.peerstore import PeerStoreError from .connection.raw_connection import RawConnection from .exceptions import SwarmException @@ -92,55 +93,55 @@ class Swarm(INetwork): :return: muxed connection """ - # Get peer info from peer store - addrs = self.peerstore.addrs(peer_id) + if peer_id in self.connections: + # If muxed connection already exists for peer_id, + # set muxed connection equal to existing muxed connection + return self.connections[peer_id] + + try: + # Get peer info from peer store + addrs = self.peerstore.addrs(peer_id) + except PeerStoreError: + raise SwarmException(f"No known addresses to peer {peer_id}") if not addrs: - raise SwarmException("No known addresses to peer") + raise SwarmException(f"No known addresses to peer {peer_id}") if not self.router: multiaddr = addrs[0] else: multiaddr = self.router.find_peer(peer_id) + # Dial peer (connection to peer does not yet exist) + # Transport dials peer (gets back a raw conn) + raw_conn = await self.transport.dial(multiaddr, self.self_id) - if peer_id in self.connections: - # If muxed connection already exists for peer_id, - # set muxed connection equal to existing muxed connection - muxed_conn = self.connections[peer_id] - else: - # Dial peer (connection to peer does not yet exist) - # Transport dials peer (gets back a raw conn) - raw_conn = await self.transport.dial(multiaddr, self.self_id) + # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure + # the conn and then mux the conn + try: + secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) + except SecurityUpgradeFailure as error: + # TODO: Add logging to indicate the failure + await raw_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a secured connection from {peer_id}" + ) from error + try: + muxed_conn = await self.upgrader.upgrade_connection( + secured_conn, self.generic_protocol_handler, peer_id + ) + except MuxerUpgradeFailure as error: + # TODO: Add logging to indicate the failure + await secured_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a muxed connection from {peer_id}" + ) from error - # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure - # the conn and then mux the conn - try: - secured_conn = await self.upgrader.upgrade_security( - raw_conn, peer_id, True - ) - except SecurityUpgradeFailure as error: - # TODO: Add logging to indicate the failure - await raw_conn.close() - raise SwarmException( - f"fail to upgrade the connection to a secured connection from {peer_id}" - ) from error - try: - muxed_conn = await self.upgrader.upgrade_connection( - secured_conn, self.generic_protocol_handler, peer_id - ) - except MuxerUpgradeFailure as error: - # TODO: Add logging to indicate the failure - await secured_conn.close() - raise SwarmException( - f"fail to upgrade the connection to a muxed connection from {peer_id}" - ) from error + # Store muxed connection in connections + self.connections[peer_id] = muxed_conn - # Store muxed connection in connections - self.connections[peer_id] = muxed_conn - - # Call notifiers since event occurred - for notifee in self.notifees: - await notifee.connected(self, muxed_conn) + # Call notifiers since event occurred + for notifee in self.notifees: + await notifee.connected(self, muxed_conn) return muxed_conn @@ -152,11 +153,6 @@ class Swarm(INetwork): :param protocol_id: protocol id :return: net stream instance """ - # Get peer info from peer store - addrs = self.peerstore.addrs(peer_id) - - if not addrs: - raise SwarmException("No known addresses to peer") muxed_conn = await self.dial_peer(peer_id) @@ -217,10 +213,6 @@ class Swarm(INetwork): "fail to upgrade the connection to a secured connection" ) from error peer_id = secured_conn.get_remote_peer() - peer_ip, peer_port = writer.get_extra_info("peername") - peer_maddr = Multiaddr(f"/ip4/{peer_ip}/tcp/{peer_port}") - # TODO: Fix the ttl - self.peerstore.add_addr(peer_id, peer_maddr, 12345678) try: muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id