From 63fd531ed031e883f64f15fa94df1f051c06d45d Mon Sep 17 00:00:00 2001 From: Alex Stokes Date: Mon, 2 Dec 2019 15:57:22 -0800 Subject: [PATCH 1/5] Fixes to add python 3.6 compatibility --- libp2p/network/connection/raw_connection.py | 4 +- libp2p/pubsub/pubsub.py | 2 +- libp2p/tools/factories.py | 7 +- .../floodsub_integration_test_settings.py | 72 ++++++++----------- libp2p/transport/tcp/tcp.py | 4 +- setup.py | 4 +- tests/pubsub/test_pubsub.py | 4 +- tests/security/test_secio.py | 4 +- tests_interop/conftest.py | 6 +- 9 files changed, 54 insertions(+), 53 deletions(-) diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 08d22055..55b47efc 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -1,4 +1,5 @@ import asyncio +import sys from .exceptions import RawConnError from .raw_connection_interface import IRawConnection @@ -52,4 +53,5 @@ class RawConnection(IRawConnection): async def close(self) -> None: self.writer.close() - await self.writer.wait_closed() + if sys.version_info[0:2] > (3, 6): + await self.writer.wait_closed() diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index fab8024b..5233a32d 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -150,7 +150,7 @@ class Pubsub: # Map of topic to topic validator self.topic_validators = {} - self.counter = time.time_ns() + self.counter = int(time.time()) self._tasks = [] # Call handle peer to keep waiting for updates to peer queue diff --git a/libp2p/tools/factories.py b/libp2p/tools/factories.py index b189cfa1..5e223130 100644 --- a/libp2p/tools/factories.py +++ b/libp2p/tools/factories.py @@ -1,5 +1,4 @@ import asyncio -from contextlib import asynccontextmanager from typing import Any, AsyncIterator, Dict, Tuple, cast import factory @@ -33,6 +32,12 @@ from .constants import ( ) from .utils import connect, connect_swarm +try: + from contextlib import asynccontextmanager +except ImportError: + # NOTE: mypy complains about a duplicate import without the following ``# type: ignore`` + from async_generator import asynccontextmanager # type: ignore + def initialize_peerstore_with_our_keypair(self_id: ID, key_pair: KeyPair) -> PeerStore: peer_store = PeerStore() diff --git a/libp2p/tools/pubsub/floodsub_integration_test_settings.py b/libp2p/tools/pubsub/floodsub_integration_test_settings.py index b7b66244..d6b5b678 100644 --- a/libp2p/tools/pubsub/floodsub_integration_test_settings.py +++ b/libp2p/tools/pubsub/floodsub_integration_test_settings.py @@ -143,6 +143,14 @@ floodsub_protocol_pytest_params = [ ] +def _collect_node_ids(adj_list): + node_ids = set() + for node, neighbors in adj_list.items(): + node_ids.add(node) + node_ids.update(set(neighbors)) + return node_ids + + async def perform_test_from_obj(obj, router_factory) -> None: """ Perform pubsub tests from a test object, which is composed as follows: @@ -180,59 +188,43 @@ async def perform_test_from_obj(obj, router_factory) -> None: node_map = {} pubsub_map = {} - async def add_node(node_id_str: str) -> None: + async def add_node(node_id_str: str): pubsub_router = router_factory(protocols=obj["supported_protocols"]) pubsub = PubsubFactory(router=pubsub_router) await pubsub.host.get_network().listen(LISTEN_MADDR) node_map[node_id_str] = pubsub.host pubsub_map[node_id_str] = pubsub - tasks_connect = [] - for start_node_id in adj_list: - # Create node if node does not yet exist - if start_node_id not in node_map: - await add_node(start_node_id) + all_node_ids = _collect_node_ids(adj_list) - # For each neighbor of start_node, create if does not yet exist, - # then connect start_node to neighbor - for neighbor_id in adj_list[start_node_id]: - # Create neighbor if neighbor does not yet exist - if neighbor_id not in node_map: - await add_node(neighbor_id) - tasks_connect.append( - connect(node_map[start_node_id], node_map[neighbor_id]) - ) - # Connect nodes and wait at least for 2 seconds - await asyncio.gather(*tasks_connect, asyncio.sleep(2)) + for node in all_node_ids: + await add_node(node) + + for node, neighbors in adj_list.items(): + for neighbor_id in neighbors: + await connect(node_map[node], node_map[neighbor_id]) + + # NOTE: the test using this routine will fail w/o these sleeps... + await asyncio.sleep(1) # Step 2) Subscribe to topics queues_map = {} topic_map = obj["topic_map"] - tasks_topic = [] - tasks_topic_data = [] for topic, node_ids in topic_map.items(): for node_id in node_ids: - tasks_topic.append(pubsub_map[node_id].subscribe(topic)) - tasks_topic_data.append((node_id, topic)) - tasks_topic.append(asyncio.sleep(2)) + queue = await pubsub_map[node_id].subscribe(topic) + if node_id not in queues_map: + queues_map[node_id] = {} + # Store queue in topic-queue map for node + queues_map[node_id][topic] = queue - # Gather is like Promise.all - responses = await asyncio.gather(*tasks_topic) - for i in range(len(responses) - 1): - node_id, topic = tasks_topic_data[i] - if node_id not in queues_map: - queues_map[node_id] = {} - # Store queue in topic-queue map for node - queues_map[node_id][topic] = responses[i] - - # Allow time for subscribing before continuing - await asyncio.sleep(0.01) + # NOTE: the test using this routine will fail w/o these sleeps... + await asyncio.sleep(1) # Step 3) Publish messages topics_in_msgs_ordered = [] messages = obj["messages"] - tasks_publish = [] for msg in messages: topics = msg["topics"] @@ -242,21 +234,17 @@ async def perform_test_from_obj(obj, router_factory) -> None: # Publish message # TODO: Should be single RPC package with several topics for topic in topics: - tasks_publish.append(pubsub_map[node_id].publish(topic, data)) - - # For each topic in topics, add (topic, node_id, data) tuple to ordered test list - for topic in topics: + await pubsub_map[node_id].publish(topic, data) + # For each topic in topics, add (topic, node_id, data) tuple to ordered test list topics_in_msgs_ordered.append((topic, node_id, data)) - # Allow time for publishing before continuing - await asyncio.gather(*tasks_publish, asyncio.sleep(2)) - # Step 4) Check that all messages were received correctly. for topic, origin_node_id, data in topics_in_msgs_ordered: # Look at each node in each topic for node_id in topic_map[topic]: # Get message from subscription queue - msg = await queues_map[node_id][topic].get() + queue = queues_map[node_id][topic] + msg = await queue.get() assert data == msg.data # Check the message origin assert node_map[origin_node_id].get_id().to_bytes() == msg.from_id diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 7470510d..c27de0e1 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -1,5 +1,6 @@ import asyncio from socket import socket +import sys from typing import List from multiaddr import Multiaddr @@ -53,7 +54,8 @@ class TCPListener(IListener): if self.server is None: return self.server.close() - await self.server.wait_closed() + if sys.version_info[0:2] > (3, 6): + await self.server.wait_closed() self.server = None diff --git a/setup.py b/setup.py index a7b1c367..35b17bf8 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,8 @@ install_requires = [ "protobuf>=3.10.0,<4.0.0", "coincurve>=10.0.0,<11.0.0", "pynacl==1.3.0", + "dataclasses>=0.7, <1;python_version<'3.7'", + "async_generator==1.10;python_version<'3.7'", ] @@ -80,7 +82,7 @@ setup( url="https://github.com/libp2p/py-libp2p", include_package_data=True, install_requires=install_requires, - python_requires=">=3.7,<4", + python_requires=">=3.6,<4", extras_require=extras_require, py_modules=["libp2p"], license="MIT/APACHE2.0", diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 01d8ba74..6f3c6725 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -58,11 +58,11 @@ async def test_peers_subscribe(pubsubs_fsub): await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host) await pubsubs_fsub[0].subscribe(TESTING_TOPIC) # Yield to let 0 notify 1 - await asyncio.sleep(0.1) + await asyncio.sleep(1) assert pubsubs_fsub[0].my_id in pubsubs_fsub[1].peer_topics[TESTING_TOPIC] await pubsubs_fsub[0].unsubscribe(TESTING_TOPIC) # Yield to let 0 notify 1 - await asyncio.sleep(0.1) + await asyncio.sleep(1) assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics[TESTING_TOPIC] diff --git a/tests/security/test_secio.py b/tests/security/test_secio.py index c7808b46..50374809 100644 --- a/tests/security/test_secio.py +++ b/tests/security/test_secio.py @@ -76,8 +76,8 @@ async def test_create_secure_session(): local_conn = InMemoryConnection(local_peer, is_initiator=True) remote_conn = InMemoryConnection(remote_peer) - local_pipe_task = asyncio.create_task(create_pipe(local_conn, remote_conn)) - remote_pipe_task = asyncio.create_task(create_pipe(remote_conn, local_conn)) + local_pipe_task = asyncio.ensure_future(create_pipe(local_conn, remote_conn)) + remote_pipe_task = asyncio.ensure_future(create_pipe(remote_conn, local_conn)) local_session_builder = create_secure_session( local_nonce, local_peer, local_key_pair.private_key, local_conn, remote_peer diff --git a/tests_interop/conftest.py b/tests_interop/conftest.py index 8ae9769b..3fcc74f3 100644 --- a/tests_interop/conftest.py +++ b/tests_interop/conftest.py @@ -151,7 +151,8 @@ class DaemonStream(ReadWriteCloser): async def close(self) -> None: self.writer.close() - await self.writer.wait_closed() + if sys.version_info[0:2] > (3, 6): + await self.writer.wait_closed() async def read(self, n: int = -1) -> bytes: return await self.reader.read(n) @@ -196,7 +197,8 @@ async def py_to_daemon_stream_pair(hosts, p2pds, is_to_fail_daemon_stream): # some day. listener = p2pds[0].control.control.listener listener.close() - await listener.wait_closed() + if sys.version_info[0:2] > (3, 6): + await listener.wait_closed() stream_py = await host.new_stream(p2pd.peer_id, [protocol_id]) if not is_to_fail_daemon_stream: await event_stream_handled.wait() From 4c0f511516632975b706f106cfe5cbb88a4e6061 Mon Sep 17 00:00:00 2001 From: Alex Stokes Date: Mon, 2 Dec 2019 16:00:56 -0800 Subject: [PATCH 2/5] Add `py36` tox env for testing --- tox.ini | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index de4f8f1b..a482e70f 100644 --- a/tox.ini +++ b/tox.ini @@ -1,8 +1,9 @@ # Reference: https://github.com/ethereum/ethereum-python-project-template/blob/master/tox.ini -# TODO: consider py36 and pypy3 support +# TODO: consider pypy3 support [tox] envlist = + py36-test py37-test py37-interop lint @@ -37,6 +38,7 @@ commands = basepython = docs: python py37: python3.7 + py36: python3.6 extras = test docs: doc From a1fd106bf3e74ce0e6508a8bcb73cb433dab4a50 Mon Sep 17 00:00:00 2001 From: Alex Stokes Date: Mon, 2 Dec 2019 16:01:06 -0800 Subject: [PATCH 3/5] Ensure correct names of test envs in circle ci --- .circleci/config.yml | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index fa7691e6..c3094b9b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -48,30 +48,24 @@ jobs: - image: circleci/python:3.6 environment: TOXENV: lint - py36-core: + py36-test: <<: *common docker: - image: circleci/python:3.6 environment: - TOXENV: py36-core - py37-core: + TOXENV: py36-test + py37-test: <<: *common docker: - image: circleci/python:3.7 environment: - TOXENV: py37-core - pypy3-core: - <<: *common - docker: - - image: pypy - environment: - TOXENV: pypy3-core + TOXENV: py37-test + workflows: version: 2 test: jobs: - docs - lint - - py36-core - - py37-core - - pypy3-core + - py36-test + - py37-test From d516cf51b814aeb17a303e422b7dc6e35a234e3f Mon Sep 17 00:00:00 2001 From: Alex Stokes Date: Mon, 2 Dec 2019 16:05:28 -0800 Subject: [PATCH 4/5] Add py3.6 to travis config --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index ce9b061e..6b6b3ccc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,9 @@ language: python matrix: include: + - python: 3.6-dev + dist: xenial + env: TOXENV=py36-test - python: 3.7-dev dist: xenial env: TOXENV=py37-test From 3b9d7c7acd7af955b4e1855effac85aec44ca81f Mon Sep 17 00:00:00 2001 From: Alex Stokes Date: Tue, 10 Dec 2019 17:07:21 -0800 Subject: [PATCH 5/5] Apply PR feedback --- .circleci/config.yml | 20 +++++++++++++------- libp2p/network/connection/raw_connection.py | 5 +++-- libp2p/tools/factories.py | 10 +++------- libp2p/transport/tcp/tcp.py | 6 ++++-- setup.py | 3 ++- tests_interop/conftest.py | 5 +++-- tox.ini | 3 +-- 7 files changed, 29 insertions(+), 23 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c3094b9b..fa7691e6 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -48,24 +48,30 @@ jobs: - image: circleci/python:3.6 environment: TOXENV: lint - py36-test: + py36-core: <<: *common docker: - image: circleci/python:3.6 environment: - TOXENV: py36-test - py37-test: + TOXENV: py36-core + py37-core: <<: *common docker: - image: circleci/python:3.7 environment: - TOXENV: py37-test - + TOXENV: py37-core + pypy3-core: + <<: *common + docker: + - image: pypy + environment: + TOXENV: pypy3-core workflows: version: 2 test: jobs: - docs - lint - - py36-test - - py37-test + - py36-core + - py37-core + - pypy3-core diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 55b47efc..4f01b124 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -53,5 +53,6 @@ class RawConnection(IRawConnection): async def close(self) -> None: self.writer.close() - if sys.version_info[0:2] > (3, 6): - await self.writer.wait_closed() + if sys.version_info < (3, 7): + return + await self.writer.wait_closed() diff --git a/libp2p/tools/factories.py b/libp2p/tools/factories.py index 5e223130..5c9c310d 100644 --- a/libp2p/tools/factories.py +++ b/libp2p/tools/factories.py @@ -1,6 +1,8 @@ import asyncio from typing import Any, AsyncIterator, Dict, Tuple, cast +# NOTE: import ``asynccontextmanager`` from ``contextlib`` when support for python 3.6 is dropped. +from async_generator import asynccontextmanager import factory from libp2p import generate_new_rsa_identity, generate_peer_id_from @@ -32,12 +34,6 @@ from .constants import ( ) from .utils import connect, connect_swarm -try: - from contextlib import asynccontextmanager -except ImportError: - # NOTE: mypy complains about a duplicate import without the following ``# type: ignore`` - from async_generator import asynccontextmanager # type: ignore - def initialize_peerstore_with_our_keypair(self_id: ID, key_pair: KeyPair) -> PeerStore: peer_store = PeerStore() @@ -177,7 +173,7 @@ async def host_pair_factory(is_secure: bool) -> Tuple[BasicHost, BasicHost]: return hosts[0], hosts[1] -@asynccontextmanager +@asynccontextmanager # type: ignore async def pair_of_connected_hosts( is_secure: bool = True ) -> AsyncIterator[Tuple[BasicHost, BasicHost]]: diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index c27de0e1..f5c2aa40 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -54,9 +54,11 @@ class TCPListener(IListener): if self.server is None: return self.server.close() - if sys.version_info[0:2] > (3, 6): - await self.server.wait_closed() + server = self.server self.server = None + if sys.version_info < (3, 7): + return + await server.wait_closed() class TCP(ITransport): diff --git a/setup.py b/setup.py index 35b17bf8..56a6bf20 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ install_requires = [ "coincurve>=10.0.0,<11.0.0", "pynacl==1.3.0", "dataclasses>=0.7, <1;python_version<'3.7'", - "async_generator==1.10;python_version<'3.7'", + "async_generator==1.10", ] @@ -96,6 +96,7 @@ setup( "License :: OSI Approved :: Apache Software License", "Natural Language :: English", "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", ], platforms=["unix", "linux", "osx"], diff --git a/tests_interop/conftest.py b/tests_interop/conftest.py index 3fcc74f3..08df614c 100644 --- a/tests_interop/conftest.py +++ b/tests_interop/conftest.py @@ -151,8 +151,9 @@ class DaemonStream(ReadWriteCloser): async def close(self) -> None: self.writer.close() - if sys.version_info[0:2] > (3, 6): - await self.writer.wait_closed() + if sys.version_info < (3, 7): + return + await self.writer.wait_closed() async def read(self, n: int = -1) -> bytes: return await self.reader.read(n) diff --git a/tox.ini b/tox.ini index a482e70f..21f46435 100644 --- a/tox.ini +++ b/tox.ini @@ -3,8 +3,7 @@ # TODO: consider pypy3 support [tox] envlist = - py36-test - py37-test + py{36,37}-test py37-interop lint docs