diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 0fa2fdbf..62e014b7 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -11,6 +11,9 @@ from .transport.tcp.tcp import TCP async def cleanup_done_tasks(): + """ + clean up asyncio done tasks to free up resources + """ while True: for task in asyncio.all_tasks(): if task.done(): @@ -20,30 +23,59 @@ async def cleanup_done_tasks(): # Some sleep necessary to context switch await asyncio.sleep(3) -async def new_node( +def initialize_default_swarm( id_opt=None, transport_opt=None, - muxer_opt=None, sec_opt=None, peerstore=None): - - if id_opt is None: + muxer_opt=None, sec_opt=None, peerstore_opt=None): + """ + initialize swarm when no swarm is passed in + :param id_opt: optional id for host + :param transport_opt: optional choice of transport upgrade + :param muxer_opt: optional choice of stream muxer + :param sec_opt: optional choice of security upgrade + :param peerstore_opt: optional peerstore + :return: return a default swarm instance + """ + # pylint: disable=too-many-arguments, unused-argument + if not id_opt: new_key = RSA.generate(2048, e=65537) id_opt = id_from_public_key(new_key.publickey()) # private_key = new_key.exportKey("PEM") transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"] - transport_opt = [multiaddr.Multiaddr(t) for t in transport_opt] - muxer_opt = muxer_opt or ["mplex/6.7.0"] - sec_opt = sec_opt or ["secio"] - peerstore = peerstore or PeerStore() - - upgrader = TransportUpgrader(sec_opt, transport_opt) - swarm = Swarm(id_opt, peerstore, upgrader) + transport = [multiaddr.Multiaddr(t) for t in transport_opt] + # TODO wire muxer up with swarm + # muxer = muxer_opt or ["mplex/6.7.0"] + sec = sec_opt or ["secio"] + peerstore = peerstore_opt or PeerStore() + upgrader = TransportUpgrader(sec, transport) + swarm_opt = Swarm(id_opt, peerstore, upgrader) tcp = TCP() - swarm.add_transport(tcp) - await swarm.listen(transport_opt[0]) + swarm_opt.add_transport(tcp) + + return swarm_opt + +async def new_node( + swarm_opt=None, id_opt=None, transport_opt=None, + muxer_opt=None, sec_opt=None, peerstore_opt=None): + """ + create new libp2p node + :param id_opt: optional id for host + :param transport_opt: optional choice of transport upgrade + :param muxer_opt: optional choice of stream muxer + :param sec_opt: optional choice of security upgrade + :param peerstore_opt: optional peerstore + :return: return a default swarm instance + """ + # pylint: disable=too-many-arguments + if not swarm_opt: + swarm_opt = initialize_default_swarm( + id_opt=id_opt, transport_opt=transport_opt, + muxer_opt=muxer_opt, sec_opt=sec_opt, + peerstore_opt=peerstore_opt) # TODO enable support for other host type # TODO routing unimplemented - host = BasicHost(swarm) + host = BasicHost(swarm_opt) # Kick off cleanup job asyncio.ensure_future(cleanup_done_tasks()) diff --git a/tests/examples/test_chat.py b/tests/examples/test_chat.py index 0ea19793..ee192cca 100644 --- a/tests/examples/test_chat.py +++ b/tests/examples/test_chat.py @@ -1,7 +1,8 @@ import pytest import asyncio +import multiaddr -from tests.utils import cleanup +from tests.utils import cleanup, set_up_nodes_by_transport_opt from libp2p import new_node from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.protocol_muxer.multiselect_client import MultiselectClientError @@ -9,7 +10,6 @@ from libp2p.protocol_muxer.multiselect_client import MultiselectClientError PROTOCOL_ID = '/chat/1.0.0' - async def hello_world(host_a, host_b): async def stream_handler(stream): read = await stream.read() @@ -100,8 +100,8 @@ async def no_common_protocol(host_a, host_b): (no_common_protocol), ]) async def test_chat(test): - host_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - host_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] + (host_a, host_b) = await set_up_nodes_by_transport_opt(transport_opt_list) addr = host_a.get_addrs()[0] info = info_from_p2p_addr(addr) diff --git a/tests/libp2p/test_libp2p.py b/tests/libp2p/test_libp2p.py index 4b7e59c3..e9bfb83f 100644 --- a/tests/libp2p/test_libp2p.py +++ b/tests/libp2p/test_libp2p.py @@ -1,17 +1,15 @@ import multiaddr import pytest -from tests.utils import cleanup -from libp2p import new_node +from tests.utils import cleanup, set_up_nodes_by_transport_opt from libp2p.peer.peerinfo import info_from_p2p_addr + # pylint: disable=too-many-locals - - @pytest.mark.asyncio async def test_simple_messages(): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] + (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) async def stream_handler(stream): while True: @@ -41,8 +39,8 @@ async def test_simple_messages(): @pytest.mark.asyncio async def test_double_response(): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] + (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) async def stream_handler(stream): while True: @@ -78,8 +76,8 @@ async def test_double_response(): async def test_multiple_streams(): # Node A should be able to open a stream with node B and then vice versa. # Stream IDs should be generated uniquely so that the stream state is not overwritten - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] + (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) async def stream_handler_a(stream): while True: @@ -124,8 +122,8 @@ async def test_multiple_streams(): @pytest.mark.asyncio async def test_multiple_streams_same_initiator_different_protocols(): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] + (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) async def stream_handler_a1(stream): while True: @@ -184,8 +182,8 @@ async def test_multiple_streams_same_initiator_different_protocols(): @pytest.mark.asyncio async def test_multiple_streams_two_initiators(): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] + (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) async def stream_handler_a1(stream): while True: @@ -262,9 +260,9 @@ async def test_multiple_streams_two_initiators(): @pytest.mark.asyncio async def test_triangle_nodes_connection(): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"],\ + ["/ip4/127.0.0.1/tcp/0"]] + (node_a, node_b, node_c) = await set_up_nodes_by_transport_opt(transport_opt_list) async def stream_handler(stream): while True: @@ -315,8 +313,8 @@ async def test_triangle_nodes_connection(): @pytest.mark.asyncio async def test_host_connect(): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] + (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) assert not node_a.get_peerstore().peers() diff --git a/tests/libp2p/test_notify.py b/tests/libp2p/test_notify.py index 108f04a2..71b455ba 100644 --- a/tests/libp2p/test_notify.py +++ b/tests/libp2p/test_notify.py @@ -10,10 +10,14 @@ features are implemented in swarm """ import pytest +import multiaddr -from tests.utils import cleanup -from libp2p import new_node + +from tests.utils import cleanup, echo_stream_handler, \ + perform_two_host_set_up_custom_handler +from libp2p import new_node, initialize_default_swarm from libp2p.network.notifee_interface import INotifee +from libp2p.host.basic_host import BasicHost # pylint: disable=too-many-locals @@ -38,10 +42,11 @@ class MyNotifee(INotifee): async def disconnected(self, network, conn): pass - async def listen(self, network, multiaddr): - pass + async def listen(self, network, _multiaddr): + self.events.append(["listened" + self.val_to_append_to_event,\ + _multiaddr]) - async def listen_close(self, network, multiaddr): + async def listen_close(self, network, _multiaddr): pass class InvalidNotifee(): @@ -65,36 +70,9 @@ class InvalidNotifee(): async def listen(self): assert False -async def perform_two_host_simple_set_up(): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - - async def my_stream_handler(stream): - while True: - read_string = (await stream.read()).decode() - - resp = "ack:" + read_string - await stream.write(resp.encode()) - - node_b.set_stream_handler("/echo/1.0.0", my_stream_handler) - - # Associate the peer with local ip address (see default parameters of Libp2p()) - node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10) - return node_a, node_b - -async def perform_two_host_simple_set_up_custom_handler(handler): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - - node_b.set_stream_handler("/echo/1.0.0", handler) - - # Associate the peer with local ip address (see default parameters of Libp2p()) - node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10) - return node_a, node_b - @pytest.mark.asyncio async def test_one_notifier(): - node_a, node_b = await perform_two_host_simple_set_up() + node_a, node_b = await perform_two_host_set_up_custom_handler(echo_stream_handler) # Add notifee for node_a events = [] @@ -135,7 +113,7 @@ async def test_one_notifier_on_two_nodes(): resp = "ack:" + read_string await stream.write(resp.encode()) - node_a, node_b = await perform_two_host_simple_set_up_custom_handler(my_stream_handler) + node_a, node_b = await perform_two_host_set_up_custom_handler(my_stream_handler) # Add notifee for node_a events_a = [] @@ -163,9 +141,72 @@ async def test_one_notifier_on_two_nodes(): # Success, terminate pending tasks. await cleanup() +@pytest.mark.asyncio +async def test_one_notifier_on_two_nodes_with_listen(): + events_b = [] + + node_a_transport_opt = ["/ip4/127.0.0.1/tcp/0"] + node_a = await new_node(transport_opt=node_a_transport_opt) + await node_a.get_network().listen(multiaddr.Multiaddr(node_a_transport_opt[0])) + + # Set up node_b swarm to pass into host + node_b_transport_opt = ["/ip4/127.0.0.1/tcp/0"] + node_b_multiaddr = multiaddr.Multiaddr(node_b_transport_opt[0]) + node_b_swarm = initialize_default_swarm(transport_opt=node_b_transport_opt) + node_b = BasicHost(node_b_swarm) + + async def my_stream_handler(stream): + # Ensure the listened, connected and opened_stream events were hit in Notifee obj + # and that the stream passed into opened_stream matches the stream created on + # node_b + assert events_b == [ + ["listenedb", node_b_multiaddr], \ + ["connectedb", stream.mplex_conn], \ + ["opened_streamb", stream] + ] + while True: + read_string = (await stream.read()).decode() + + resp = "ack:" + read_string + await stream.write(resp.encode()) + + # Add notifee for node_a + events_a = [] + assert node_a.get_network().notify(MyNotifee(events_a, "a")) + + # Add notifee for node_b + assert node_b.get_network().notify(MyNotifee(events_b, "b")) + + # start listen on node_b_swarm + await node_b.get_network().listen(node_b_multiaddr) + + node_b.set_stream_handler("/echo/1.0.0", my_stream_handler) + # Associate the peer with local ip address (see default parameters of Libp2p()) + node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10) + stream = await node_a.new_stream(node_b.get_id(), ["/echo/1.0.0"]) + + # Ensure the connected and opened_stream events were hit in MyNotifee obj + # and that stream passed into opened_stream matches the stream created on + # node_a + assert events_a == [ + ["connecteda", stream.mplex_conn], \ + ["opened_streama", stream] + ] + + messages = ["hello", "hello"] + for message in messages: + await stream.write(message.encode()) + + response = (await stream.read()).decode() + + assert response == ("ack:" + message) + + # Success, terminate pending tasks. + await cleanup() + @pytest.mark.asyncio async def test_two_notifiers(): - node_a, node_b = await perform_two_host_simple_set_up() + node_a, node_b = await perform_two_host_set_up_custom_handler(echo_stream_handler) # Add notifee for node_a events0 = [] @@ -198,7 +239,7 @@ async def test_two_notifiers(): async def test_ten_notifiers(): num_notifiers = 10 - node_a, node_b = await perform_two_host_simple_set_up() + node_a, node_b = await perform_two_host_set_up_custom_handler(echo_stream_handler) # Add notifee for node_a events_lst = [] @@ -244,7 +285,7 @@ async def test_ten_notifiers_on_two_nodes(): resp = "ack:" + read_string await stream.write(resp.encode()) - node_a, node_b = await perform_two_host_simple_set_up_custom_handler(my_stream_handler) + node_a, node_b = await perform_two_host_set_up_custom_handler(my_stream_handler) # Add notifee for node_a and node_b events_lst_a = [] @@ -278,7 +319,7 @@ async def test_ten_notifiers_on_two_nodes(): async def test_invalid_notifee(): num_notifiers = 10 - node_a, node_b = await perform_two_host_simple_set_up() + node_a, node_b = await perform_two_host_set_up_custom_handler(echo_stream_handler) # Add notifee for node_a events_lst = [] diff --git a/tests/protocol_muxer/test_protocol_muxer.py b/tests/protocol_muxer/test_protocol_muxer.py index a7e19afd..00949faf 100644 --- a/tests/protocol_muxer/test_protocol_muxer.py +++ b/tests/protocol_muxer/test_protocol_muxer.py @@ -1,7 +1,6 @@ import pytest -from tests.utils import cleanup -from libp2p import new_node +from tests.utils import cleanup, set_up_nodes_by_transport_opt from libp2p.protocol_muxer.multiselect_client import MultiselectClientError # TODO: Add tests for multiple streams being opened on different @@ -15,12 +14,8 @@ from libp2p.protocol_muxer.multiselect_client import MultiselectClientError async def perform_simple_test(expected_selected_protocol, protocols_for_client, protocols_with_handlers): - transport_opt_a = ["/ip4/127.0.0.1/tcp/0"] - transport_opt_b = ["/ip4/127.0.0.1/tcp/0"] - node_a = await new_node( - transport_opt=transport_opt_a) - node_b = await new_node( - transport_opt=transport_opt_b) + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] + (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) async def stream_handler(stream): while True: diff --git a/tests/utils.py b/tests/utils.py index c9954123..4efde83c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,6 +1,8 @@ -import asyncio - from contextlib import suppress +import asyncio +import multiaddr + +from libp2p import new_node async def cleanup(): @@ -12,3 +14,28 @@ async def cleanup(): # Cancelled task raises asyncio.CancelledError that we can suppress: with suppress(asyncio.CancelledError): await task + +async def set_up_nodes_by_transport_opt(transport_opt_list): + nodes_list = [] + for transport_opt in transport_opt_list: + node = await new_node(transport_opt=transport_opt) + await node.get_network().listen(multiaddr.Multiaddr(transport_opt[0])) + nodes_list.append(node) + return tuple(nodes_list) + +async def echo_stream_handler(stream): + while True: + read_string = (await stream.read()).decode() + + resp = "ack:" + read_string + await stream.write(resp.encode()) + +async def perform_two_host_set_up_custom_handler(handler): + transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] + (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) + + node_b.set_stream_handler("/echo/1.0.0", handler) + + # Associate the peer with local ip address (see default parameters of Libp2p()) + node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10) + return node_a, node_b