From b528c211b97f78db2ae55df1ac6e2fb04c6a9621 Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 24 Jul 2019 14:35:14 +0800 Subject: [PATCH] Temp modified publish --- libp2p/pubsub/floodsub.py | 13 +++++++++---- libp2p/pubsub/pubsub.py | 7 +++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index c0562141..659b6728 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -46,7 +46,7 @@ class FloodSub(IPubsubRouter): :param rpc: rpc message """ - async def publish(self, sender_peer_id: ID, rpc_message: rpc_pb2.Message) -> None: + async def publish(self, from_peer: ID, pubsub_message: rpc_pb2.Message) -> None: """ Invoked to forward a new message that has been validated. This is where the "flooding" part of floodsub happens @@ -60,9 +60,14 @@ class FloodSub(IPubsubRouter): :param sender_peer_id: peer_id of message sender :param rpc_message: pubsub message in RPC string format """ - packet = rpc_pb2.RPC() - packet.ParseFromString(rpc_message) - msg_sender = str(sender_peer_id) + # packet = rpc_pb2.RPC() + # packet.ParseFromString(rpc_message) + + from_peer_str = str(from_peer) + for topic in pubsub_message.topicIDs: + if topic not in self.pubsub.topics: + continue + peers = self.pubsub.peer_topics[topic] # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message for message in packet.publish: diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5f8390fb..cd2345f2 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -341,16 +341,23 @@ class Pubsub: from_id=self.host.get_id().to_bytes(), seqno=self._next_seqno(), ) + # TODO: Sign with our signing key + self.push_msg(self.host.get_id(), msg) async def push_msg(self, src: ID, msg: rpc_pb2.Message): # TODO: - Check if the source is in the blacklist. If yes, reject. + # TODO: - Check if the `from` is in the blacklist. If yes, reject. + # TODO: - Check if signing is required and if so signature should be attached. + if self._is_msg_seen(msg): return + # TODO: - Validate the message. If failed, reject it. + self._mark_msg_seen(msg) await self.handle_talk(msg) await self.router.publish(src, msg)