mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-03-17 10:41:28 +00:00
Refactor floodsub.publish
Passed the first test of floodsub
This commit is contained in:
@ -45,7 +45,8 @@ class Pubsub:
|
||||
outgoing_messages: asyncio.Queue()
|
||||
seen_messages: LRU
|
||||
my_topics: Dict[str, asyncio.Queue]
|
||||
peer_topics: Dict[str, List[ID]]
|
||||
# FIXME: Should be changed to `Dict[str, List[ID]]`
|
||||
peer_topics: Dict[str, List[str]]
|
||||
# FIXME: Should be changed to `Dict[ID, INetStream]`
|
||||
peers: Dict[str, INetStream]
|
||||
# NOTE: Be sure it is increased atomically everytime.
|
||||
@ -320,23 +321,34 @@ class Pubsub:
|
||||
# Write message to stream
|
||||
await stream.write(rpc_msg)
|
||||
|
||||
def list_peers(self, topic_id: str) -> Tuple[ID]:
|
||||
def list_peers(self, topic_id: str) -> Tuple[ID, ...]:
|
||||
return
|
||||
|
||||
async def publish(self, topic_id: str, data: bytes) -> None:
|
||||
"""
|
||||
Publish data to a topic
|
||||
:param topic_id: topic which we are going to publish the data to
|
||||
:param data: data which we are publishing
|
||||
"""
|
||||
msg = rpc_pb2.Message(
|
||||
data=data,
|
||||
topicIDs=[topic_id],
|
||||
# Origin is myself.
|
||||
from_id=self.host.get_id().to_bytes(),
|
||||
seqno=self._next_seqno(),
|
||||
)
|
||||
|
||||
# TODO: Sign with our signing key
|
||||
|
||||
self.push_msg(self.host.get_id(), msg)
|
||||
await self.push_msg(self.host.get_id(), msg)
|
||||
|
||||
async def push_msg(self, src: ID, msg: rpc_pb2.Message):
|
||||
# TODO: - Check if the source is in the blacklist. If yes, reject.
|
||||
async def push_msg(self, src: ID, msg: rpc_pb2.Message) -> None:
|
||||
"""
|
||||
Push a pubsub message to others.
|
||||
:param src: the peer who forward us the message.
|
||||
:param msg: the message we are going to push out.
|
||||
"""
|
||||
# TODO: - Check if the `source` is in the blacklist. If yes, reject.
|
||||
|
||||
# TODO: - Check if the `from` is in the blacklist. If yes, reject.
|
||||
|
||||
@ -352,6 +364,9 @@ class Pubsub:
|
||||
await self.router.publish(src, msg)
|
||||
|
||||
def _next_seqno(self) -> bytes:
|
||||
"""
|
||||
Make the next message sequence id.
|
||||
"""
|
||||
self.counter += 1
|
||||
return self.counter.to_bytes(8, 'big')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user