mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
remove message.py
This commit is contained in:
@ -1,11 +1,9 @@
|
||||
import asyncio
|
||||
import uuid
|
||||
|
||||
from .pb import rpc_pb2_grpc
|
||||
from .pb import rpc_pb2
|
||||
from .pubsub_notifee import PubsubNotifee
|
||||
from .message import MessageSub, MessageTalk
|
||||
from .message import create_message_talk, create_message_sub
|
||||
from. message import generate_message_id
|
||||
|
||||
|
||||
class Pubsub():
|
||||
@ -77,8 +75,7 @@ class Pubsub():
|
||||
async def continuously_read_stream(self, stream):
|
||||
"""
|
||||
Read from input stream in an infinite loop. Process
|
||||
messages from other nodes, which for now are considered MessageTalk
|
||||
and MessageSub messages.
|
||||
messages from other nodes
|
||||
:param stream: stream to continously read from
|
||||
"""
|
||||
|
||||
@ -94,13 +91,8 @@ class Pubsub():
|
||||
if rpc_incoming.publish:
|
||||
# deal with "talk messages"
|
||||
for msg in rpc_incoming.publish:
|
||||
old_format = MessageTalk(peer_id,
|
||||
msg.from_id,
|
||||
msg.topicIDs,
|
||||
msg.data,
|
||||
msg.seqno)
|
||||
self.seen_messages.append(msg.seqno)
|
||||
await self.handle_talk(old_format)
|
||||
await self.handle_talk(peer_id, msg)
|
||||
await self.router.publish(peer_id, msg)
|
||||
|
||||
if rpc_incoming.subscriptions:
|
||||
@ -189,21 +181,20 @@ class Pubsub():
|
||||
# TODO: Remove peer from topic
|
||||
pass
|
||||
|
||||
async def handle_talk(self, talk):
|
||||
async def handle_talk(self, peer_id, publish_message):
|
||||
"""
|
||||
Handle incoming Talk message from a peer. A Talk message contains some
|
||||
custom message that is published on a given topic(s)
|
||||
:param talk: raw data constituting a talk message
|
||||
"""
|
||||
msg = talk
|
||||
|
||||
# Check if this message has any topics that we are subscribed to
|
||||
for topic in msg.topics:
|
||||
for topic in publish_message.topicIDs:
|
||||
if topic in self.my_topics:
|
||||
# we are subscribed to a topic this message was sent for,
|
||||
# so add message to the subscription output queue
|
||||
# for each topic
|
||||
await self.my_topics[topic].put(talk)
|
||||
await self.my_topics[topic].put(publish_message)
|
||||
|
||||
async def subscribe(self, topic_id):
|
||||
"""
|
||||
@ -272,3 +263,10 @@ class Pubsub():
|
||||
|
||||
# Write message to stream
|
||||
await stream.write(rpc_msg)
|
||||
|
||||
def generate_message_id():
|
||||
"""
|
||||
Generate a unique message id
|
||||
:return: messgae id
|
||||
"""
|
||||
return str(uuid.uuid1())
|
||||
Reference in New Issue
Block a user