lru cache seen_messages

This commit is contained in:
zixuanzh
2019-04-05 21:46:03 -04:00
parent 1f3aaded1d
commit d04798ce7c

View File

@ -1,4 +1,6 @@
# pylint: disable=no-name-in-module
import asyncio import asyncio
from lru import LRU
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee from .pubsub_notifee import PubsubNotifee
@ -7,7 +9,7 @@ from .pubsub_notifee import PubsubNotifee
class Pubsub(): class Pubsub():
# pylint: disable=too-many-instance-attributes, no-member # pylint: disable=too-many-instance-attributes, no-member
def __init__(self, host, router, my_id): def __init__(self, host, router, my_id, cache_size=None):
""" """
Construct a new Pubsub object, which is responsible for handling all Construct a new Pubsub object, which is responsible for handling all
Pubsub-related messages and relaying messages as appropriate to the Pubsub-related messages and relaying messages as appropriate to the
@ -37,8 +39,12 @@ class Pubsub():
self.incoming_msgs_from_peers = asyncio.Queue() self.incoming_msgs_from_peers = asyncio.Queue()
self.outgoing_messages = asyncio.Queue() self.outgoing_messages = asyncio.Queue()
# TODO: Make seen_messages a cache (LRU cache?) # keeps track of seen messages as LRU cache
self.seen_messages = [] if cache_size is None:
self.cache_size = 128
else:
self.cache_size = cache_size
self.seen_messages = LRU(self.cache_size)
# Map of topics we are subscribed to to handler functions # Map of topics we are subscribed to to handler functions
# for when the given topic receives a message # for when the given topic receives a message
@ -89,7 +95,7 @@ class Pubsub():
id_in_seen_msgs = (message.seqno, message.from_id) id_in_seen_msgs = (message.seqno, message.from_id)
if id_in_seen_msgs not in self.seen_messages: if id_in_seen_msgs not in self.seen_messages:
should_publish = True should_publish = True
self.seen_messages.append(id_in_seen_msgs) self.seen_messages[id_in_seen_msgs] = 1
await self.handle_talk(message) await self.handle_talk(message)
if rpc_incoming.subscriptions: if rpc_incoming.subscriptions: