From f02d38c0ee5e532261dcfdcabe770e04928cf517 Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 29 Jul 2019 12:09:35 +0800 Subject: [PATCH] Reflect PR feedback * Rename `src` to `msg_forwarder` in pubsub/floodsub/gossipsub * Rename Variables * Sort imports * Clean up --- libp2p/pubsub/floodsub.py | 12 ++++----- libp2p/pubsub/gossipsub.py | 20 +++++++------- libp2p/pubsub/pubsub.py | 8 +++--- libp2p/pubsub/pubsub_router_interface.py | 4 +-- tests/pubsub/configs.py | 2 +- tests/pubsub/dummy_account_node.py | 7 ++--- .../floodsub_integration_test_settings.py | 26 +++++++++---------- tests/pubsub/test_dummyaccount_demo.py | 8 +----- tests/pubsub/test_gossipsub.py | 5 ++-- tests/pubsub/test_mcache.py | 5 +++- tests/pubsub/test_subscription.py | 5 +++- tests/pubsub/utils.py | 7 ++--- 12 files changed, 54 insertions(+), 55 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 162ce30a..04040213 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -51,7 +51,7 @@ class FloodSub(IPubsubRouter): :param rpc: rpc message """ - async def publish(self, src: ID, pubsub_msg: rpc_pb2.Message) -> None: + async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: """ Invoked to forward a new message that has been validated. This is where the "flooding" part of floodsub happens @@ -62,13 +62,13 @@ class FloodSub(IPubsubRouter): so that seen messages are not further forwarded. It also never forwards a message back to the source or the peer that forwarded the message. - :param src: peer ID of the peer who forwards the message to us + :param msg_forwarder: peer ID of the peer who forwards the message to us :param pubsub_msg: pubsub message in protobuf. """ peers_gen = self._get_peers_to_send( pubsub_msg.topicIDs, - src=src, + msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id), ) rpc_msg = rpc_pb2.RPC( @@ -98,11 +98,11 @@ class FloodSub(IPubsubRouter): def _get_peers_to_send( self, topic_ids: Iterable[str], - src: ID, + msg_forwarder: ID, origin: ID) -> Iterable[ID]: """ Get the eligible peers to send the data to. - :param src: peer ID of the peer who forwards the message to us. + :param msg_forwarder: peer ID of the peer who forwards the message to us. :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ @@ -111,7 +111,7 @@ class FloodSub(IPubsubRouter): continue for peer_id_str in self.pubsub.peer_topics[topic]: peer_id = id_b58_decode(peer_id_str) - if peer_id in (src, origin): + if peer_id in (msg_forwarder, origin): continue # FIXME: Should change `self.pubsub.peers` to Dict[PeerID, ...] if str(peer_id) not in self.pubsub.peers: diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 20e21a9e..0af5e5ff 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -119,7 +119,7 @@ class GossipSub(IPubsubRouter): for prune in control_message.prune: await self.handle_prune(prune, sender_peer_id) - async def publish(self, src: ID, pubsub_msg: rpc_pb2.Message) -> None: + async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: # pylint: disable=too-many-locals """ Invoked to forward a new message that has been validated. @@ -128,7 +128,7 @@ class GossipSub(IPubsubRouter): peers_gen = self._get_peers_to_send( pubsub_msg.topicIDs, - src=src, + msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id), ) rpc_msg = rpc_pb2.RPC( @@ -144,11 +144,11 @@ class GossipSub(IPubsubRouter): def _get_peers_to_send( self, topic_ids: Iterable[str], - src: ID, + msg_forwarder: ID, origin: ID) -> Iterable[ID]: """ Get the eligible peers to send the data to. - :param src: the peer id of the peer who forwards the message to me. + :param msg_forwarder: the peer id of the peer who forwards the message to me. :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ @@ -167,10 +167,10 @@ class GossipSub(IPubsubRouter): # gossipsub peers # FIXME: Change `str` to `ID` - gossipsub_peers: List[str] = None + in_topic_gossipsub_peers: List[str] = None # TODO: Do we need to check `topic in self.pubsub.my_topics`? if topic in self.mesh: - gossipsub_peers = self.mesh[topic] + in_topic_gossipsub_peers = self.mesh[topic] else: # TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not # subscribed? @@ -185,11 +185,11 @@ class GossipSub(IPubsubRouter): self.degree, [], ) - gossipsub_peers = self.fanout[topic] - for peer_id_str in gossipsub_peers: + in_topic_gossipsub_peers = self.fanout[topic] + for peer_id_str in in_topic_gossipsub_peers: send_to.add(id_b58_decode(peer_id_str)) - # Excludes `src` and `origin` - yield from send_to.difference([src, origin]) + # Excludes `msg_forwarder` and `origin` + yield from send_to.difference([msg_forwarder, origin]) async def join(self, topic): # Note: the comments here are the near-exact algorithm description from the spec diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 7711b080..2af4e245 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -142,7 +142,7 @@ class Pubsub: continue # TODO(mhchia): This will block this read_stream loop until all data are pushed. # Should investigate further if this is an issue. - await self.push_msg(src=peer_id, msg=msg) + await self.push_msg(msg_forwarder=peer_id, msg=msg) if rpc_incoming.subscriptions: # deal with RPC.subscriptions @@ -331,10 +331,10 @@ class Pubsub: await self.push_msg(self.host.get_id(), msg) - async def push_msg(self, src: ID, msg: rpc_pb2.Message) -> None: + async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: """ Push a pubsub message to others. - :param src: the peer who forward us the message. + :param msg_forwarder: the peer who forward us the message. :param msg: the message we are going to push out. """ # TODO: - Check if the `source` is in the blacklist. If yes, reject. @@ -350,7 +350,7 @@ class Pubsub: self._mark_msg_seen(msg) await self.handle_talk(msg) - await self.router.publish(src, msg) + await self.router.publish(msg_forwarder, msg) def _next_seqno(self) -> bytes: """ diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 6f69b9b0..8819e5f0 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -42,10 +42,10 @@ class IPubsubRouter(ABC): """ @abstractmethod - async def publish(self, src, pubsub_msg) -> None: + async def publish(self, msg_forwarder, pubsub_msg): """ Invoked to forward a new message that has been validated - :param src: peer_id of message sender + :param msg_forwarder: peer_id of message sender :param pubsub_msg: pubsub message to forward """ diff --git a/tests/pubsub/configs.py b/tests/pubsub/configs.py index d6849f7c..99295e47 100644 --- a/tests/pubsub/configs.py +++ b/tests/pubsub/configs.py @@ -2,6 +2,6 @@ import multiaddr FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" -SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] +GOSSIPSUB_PROTOCOL_ID = "/gossipsub/1.0.0" LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index f0a95f87..42d17ca6 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -5,13 +5,14 @@ import multiaddr from libp2p import new_node from libp2p.host.host_interface import IHost -from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.pubsub import Pubsub -from .utils import message_id_generator, generate_RPC_packet +from .configs import FLOODSUB_PROTOCOL_ID +from .utils import message_id_generator -SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] +SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] CRYPTO_TOPIC = "ethereum" # Message format: diff --git a/tests/pubsub/floodsub_integration_test_settings.py b/tests/pubsub/floodsub_integration_test_settings.py index eac62baf..77d96a71 100644 --- a/tests/pubsub/floodsub_integration_test_settings.py +++ b/tests/pubsub/floodsub_integration_test_settings.py @@ -2,8 +2,6 @@ import asyncio import pytest -import multiaddr - from libp2p import new_node from libp2p.peer.id import ID from libp2p.pubsub.pubsub import Pubsub @@ -13,8 +11,12 @@ from tests.utils import ( connect, ) +from .configs import ( + FLOODSUB_PROTOCOL_ID, + LISTEN_MADDR, +) + -FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] FLOODSUB_PROTOCOL_TEST_CASES = [ @@ -349,7 +351,6 @@ async def perform_test_from_obj(obj, router_factory): or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior is undefined (even if it may work) """ - listen_maddr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") # Step 1) Create graph adj_list = obj["adj_list"] @@ -357,8 +358,8 @@ async def perform_test_from_obj(obj, router_factory): pubsub_map = {} async def add_node(node_id: str) -> None: - node = await new_node(transport_opt=[str(listen_maddr)]) - await node.get_network().listen(listen_maddr) + node = await new_node(transport_opt=[str(LISTEN_MADDR)]) + await node.get_network().listen(LISTEN_MADDR) node_map[node_id] = node pubsub_router = router_factory(protocols=obj["supported_protocols"]) pubsub = Pubsub(node, pubsub_router, ID(node_id.encode())) @@ -417,7 +418,7 @@ async def perform_test_from_obj(obj, router_factory): node_id = msg["node_id"] # Publish message - # FIXME: Should be single RPC package with several topics + # TODO: Should be single RPC package with several topics for topic in topics: tasks_publish.append( pubsub_map[node_id].publish( @@ -426,23 +427,22 @@ async def perform_test_from_obj(obj, router_factory): ) ) - # For each topic in topics, add topic, msg_talk tuple to ordered test list - # TODO: Update message sender to be correct message sender before - # adding msg_talk to this list + # For each topic in topics, add (topic, node_id, data) tuple to ordered test list for topic in topics: - topics_in_msgs_ordered.append((topic, data)) + topics_in_msgs_ordered.append((topic, node_id, data)) # Allow time for publishing before continuing await asyncio.gather(*tasks_publish, asyncio.sleep(2)) # Step 4) Check that all messages were received correctly. - # TODO: Check message sender too - for topic, data in topics_in_msgs_ordered: + for topic, origin_node_id, data in topics_in_msgs_ordered: # Look at each node in each topic for node_id in topic_map[topic]: # Get message from subscription queue msg = await queues_map[node_id][topic].get() assert data == msg.data + # Check the message origin + assert node_map[origin_node_id].get_id().to_bytes() == msg.from_id # Success, terminate pending tasks. await cleanup() diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index b1c4a64a..1efa579f 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -1,19 +1,13 @@ import asyncio from threading import Thread -import multiaddr - import pytest -from libp2p import new_node -from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.pubsub.pubsub import Pubsub -from libp2p.pubsub.floodsub import FloodSub - from tests.utils import ( cleanup, connect, ) + from .dummy_account_node import DummyAccountNode # pylint: disable=too-many-locals diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 7f4c81ca..e0fed92d 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -8,6 +8,7 @@ from tests.utils import ( connect, ) +from .configs import GOSSIPSUB_PROTOCOL_ID from .utils import ( create_libp2p_hosts, create_pubsub_and_gossipsub_instances, @@ -16,7 +17,7 @@ from .utils import ( ) -SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] +SUPPORTED_PROTOCOLS = [GOSSIPSUB_PROTOCOL_ID] @pytest.mark.asyncio @@ -51,7 +52,7 @@ async def test_join(): # Central node publish to the topic so that this topic # is added to central node's fanout # publish from the randomly chosen host - await pubsubs[central_node_index].publish(topic, b"") + await pubsubs[central_node_index].publish(topic, b"data") # Check that the gossipsub of central node has fanout for the topic assert topic in gossipsubs[central_node_index].fanout diff --git a/tests/pubsub/test_mcache.py b/tests/pubsub/test_mcache.py index 5446f74c..0e73222c 100644 --- a/tests/pubsub/test_mcache.py +++ b/tests/pubsub/test_mcache.py @@ -1,14 +1,17 @@ import pytest + from libp2p.pubsub.mcache import MessageCache class Msg: def __init__(self, topicIDs, seqno, from_id): + # pylint: disable=invalid-name self.topicIDs = topicIDs - self.seqno = seqno, + self.seqno = seqno self.from_id = from_id + @pytest.mark.asyncio async def test_mcache(): # Ported from: diff --git a/tests/pubsub/test_subscription.py b/tests/pubsub/test_subscription.py index c8f46f56..0dfdc4dd 100644 --- a/tests/pubsub/test_subscription.py +++ b/tests/pubsub/test_subscription.py @@ -5,7 +5,10 @@ from libp2p import new_node from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] +from .configs import FLOODSUB_PROTOCOL_ID + + +SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] TESTING_TOPIC = "TEST_SUBSCRIBE" diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index 9bfc57e2..93fc2356 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -1,19 +1,16 @@ import asyncio -import random import struct from typing import ( Sequence, ) -import uuid import multiaddr from libp2p import new_node -from libp2p.pubsub.pb import rpc_pb2 -from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.id import ID -from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pb import rpc_pb2 +from libp2p.pubsub.pubsub import Pubsub from tests.utils import connect