Add automatic docstring formatter and apply

This commit is contained in:
Dominik Muhs
2019-10-24 08:41:10 +02:00
parent 30aeb35122
commit eef505f2d9
74 changed files with 565 additions and 760 deletions

View File

@ -31,36 +31,35 @@ class FloodSub(IPubsubRouter):
return self.protocols
def attach(self, pubsub: Pubsub) -> None:
"""
Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance.
"""Attach is invoked by the PubSub constructor to attach the router to
a freshly initialized PubSub instance.
:param pubsub: pubsub instance to attach to
"""
self.pubsub = pubsub
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
"""
Notifies the router that a new peer has been connected
"""Notifies the router that a new peer has been connected.
:param peer_id: id of peer to add
"""
def remove_peer(self, peer_id: ID) -> None:
"""
Notifies the router that a peer has been disconnected
"""Notifies the router that a peer has been disconnected.
:param peer_id: id of peer to remove
"""
async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
"""
Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed
"""Invoked to process control messages in the RPC envelope. It is
invoked after subscriptions and payload messages have been processed.
:param rpc: rpc message
"""
async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
"""
Invoked to forward a new message that has been validated.
This is where the "flooding" part of floodsub happens
"""Invoked to forward a new message that has been validated. This is
where the "flooding" part of floodsub happens.
With flooding, routing is almost trivial: for each incoming message,
forward to all known peers in the topic. There is a bit of logic,
@ -88,25 +87,24 @@ class FloodSub(IPubsubRouter):
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
async def join(self, topic: str) -> None:
"""
Join notifies the router that we want to receive and
forward messages in a topic. It is invoked after the
subscription announcement
"""Join notifies the router that we want to receive and forward
messages in a topic. It is invoked after the subscription announcement.
:param topic: topic to join
"""
async def leave(self, topic: str) -> None:
"""
Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement.
"""Leave notifies the router that we are no longer interested in a
topic. It is invoked after the unsubscription announcement.
:param topic: topic to leave
"""
def _get_peers_to_send(
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
) -> Iterable[ID]:
"""
Get the eligible peers to send the data to.
"""Get the eligible peers to send the data to.
:param msg_forwarder: peer ID of the peer who forwards the message to us.
:param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to.

View File

@ -94,9 +94,9 @@ class GossipSub(IPubsubRouter):
return self.protocols
def attach(self, pubsub: Pubsub) -> None:
"""
Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance.
"""Attach is invoked by the PubSub constructor to attach the router to
a freshly initialized PubSub instance.
:param pubsub: pubsub instance to attach to
"""
self.pubsub = pubsub
@ -108,8 +108,8 @@ class GossipSub(IPubsubRouter):
asyncio.ensure_future(self.heartbeat())
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
"""
Notifies the router that a new peer has been connected
"""Notifies the router that a new peer has been connected.
:param peer_id: id of peer to add
:param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub
"""
@ -129,8 +129,8 @@ class GossipSub(IPubsubRouter):
self.peers_to_protocol[peer_id] = protocol_id
def remove_peer(self, peer_id: ID) -> None:
"""
Notifies the router that a peer has been disconnected
"""Notifies the router that a peer has been disconnected.
:param peer_id: id of peer to remove
"""
logger.debug("removing peer %s", peer_id)
@ -144,9 +144,9 @@ class GossipSub(IPubsubRouter):
del self.peers_to_protocol[peer_id]
async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
"""
Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed
"""Invoked to process control messages in the RPC envelope. It is
invoked after subscriptions and payload messages have been processed.
:param rpc: RPC message
:param sender_peer_id: id of the peer who sent the message
"""
@ -167,9 +167,7 @@ class GossipSub(IPubsubRouter):
await self.handle_prune(prune, sender_peer_id)
async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
"""
Invoked to forward a new message that has been validated.
"""
"""Invoked to forward a new message that has been validated."""
self.mcache.put(pubsub_msg)
peers_gen = self._get_peers_to_send(
@ -191,8 +189,8 @@ class GossipSub(IPubsubRouter):
def _get_peers_to_send(
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
) -> Iterable[ID]:
"""
Get the eligible peers to send the data to.
"""Get the eligible peers to send the data to.
:param msg_forwarder: the peer id of the peer who forwards the message to me.
:param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to.
@ -233,10 +231,9 @@ class GossipSub(IPubsubRouter):
async def join(self, topic: str) -> None:
# Note: the comments here are the near-exact algorithm description from the spec
"""
Join notifies the router that we want to receive and
forward messages in a topic. It is invoked after the
subscription announcement
"""Join notifies the router that we want to receive and forward
messages in a topic. It is invoked after the subscription announcement.
:param topic: topic to join
"""
logger.debug("joining topic %s", topic)
@ -271,9 +268,9 @@ class GossipSub(IPubsubRouter):
async def leave(self, topic: str) -> None:
# Note: the comments here are the near-exact algorithm description from the spec
"""
Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement.
"""Leave notifies the router that we are no longer interested in a
topic. It is invoked after the unsubscription announcement.
:param topic: topic to leave
"""
logger.debug("leaving topic %s", topic)
@ -289,8 +286,8 @@ class GossipSub(IPubsubRouter):
# Heartbeat
async def heartbeat(self) -> None:
"""
Call individual heartbeats.
"""Call individual heartbeats.
Note: the heartbeats are called with awaits because each heartbeat depends on the
state changes in the preceding heartbeat
"""
@ -453,9 +450,8 @@ class GossipSub(IPubsubRouter):
async def handle_ihave(
self, ihave_msg: rpc_pb2.ControlIHave, sender_peer_id: ID
) -> None:
"""
Checks the seen set and requests unknown messages with an IWANT message.
"""
"""Checks the seen set and requests unknown messages with an IWANT
message."""
# Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache
seen_seqnos_and_peers = [
seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys()
@ -477,9 +473,8 @@ class GossipSub(IPubsubRouter):
async def handle_iwant(
self, iwant_msg: rpc_pb2.ControlIWant, sender_peer_id: ID
) -> None:
"""
Forwards all request messages that are present in mcache to the requesting peer.
"""
"""Forwards all request messages that are present in mcache to the
requesting peer."""
# FIXME: Update type of message ID
# FIXME: Find a better way to parse the msg ids
msg_ids: List[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs]
@ -536,9 +531,7 @@ class GossipSub(IPubsubRouter):
# RPC emitters
async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None:
"""
Emit ihave message, sent to to_peer, for topic and msg_ids
"""
"""Emit ihave message, sent to to_peer, for topic and msg_ids."""
ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave()
ihave_msg.messageIDs.extend(msg_ids)
@ -550,9 +543,7 @@ class GossipSub(IPubsubRouter):
await self.emit_control_message(control_msg, to_peer)
async def emit_iwant(self, msg_ids: Any, to_peer: ID) -> None:
"""
Emit iwant message, sent to to_peer, for msg_ids
"""
"""Emit iwant message, sent to to_peer, for msg_ids."""
iwant_msg: rpc_pb2.ControlIWant = rpc_pb2.ControlIWant()
iwant_msg.messageIDs.extend(msg_ids)
@ -563,9 +554,7 @@ class GossipSub(IPubsubRouter):
await self.emit_control_message(control_msg, to_peer)
async def emit_graft(self, topic: str, to_peer: ID) -> None:
"""
Emit graft message, sent to to_peer, for topic
"""
"""Emit graft message, sent to to_peer, for topic."""
graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft()
graft_msg.topicID = topic
@ -576,9 +565,7 @@ class GossipSub(IPubsubRouter):
await self.emit_control_message(control_msg, to_peer)
async def emit_prune(self, topic: str, to_peer: ID) -> None:
"""
Emit graft message, sent to to_peer, for topic
"""
"""Emit graft message, sent to to_peer, for topic."""
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
prune_msg.topicID = topic

View File

@ -13,8 +13,8 @@ class CacheEntry:
"""
def __init__(self, mid: Tuple[bytes, bytes], topics: Sequence[str]) -> None:
"""
Constructor.
"""Constructor.
:param mid: (seqno, from_id) of the msg
:param topics: list of topics this message was sent on
"""
@ -32,8 +32,8 @@ class MessageCache:
history: List[List[CacheEntry]]
def __init__(self, window_size: int, history_size: int) -> None:
"""
Constructor.
"""Constructor.
:param window_size: Size of the window desired.
:param history_size: Size of the history desired.
:return: the MessageCache
@ -49,8 +49,8 @@ class MessageCache:
self.history = [[] for _ in range(history_size)]
def put(self, msg: rpc_pb2.Message) -> None:
"""
Put a message into the mcache.
"""Put a message into the mcache.
:param msg: The rpc message to put in. Should contain seqno and from_id
"""
mid: Tuple[bytes, bytes] = (msg.seqno, msg.from_id)
@ -59,8 +59,8 @@ class MessageCache:
self.history[0].append(CacheEntry(mid, msg.topicIDs))
def get(self, mid: Tuple[bytes, bytes]) -> Optional[rpc_pb2.Message]:
"""
Get a message from the mcache.
"""Get a message from the mcache.
:param mid: (seqno, from_id) of the message to get.
:return: The rpc message associated with this mid
"""
@ -70,8 +70,8 @@ class MessageCache:
return None
def window(self, topic: str) -> List[Tuple[bytes, bytes]]:
"""
Get the window for this topic.
"""Get the window for this topic.
:param topic: Topic whose message ids we desire.
:return: List of mids in the current window.
"""
@ -86,9 +86,8 @@ class MessageCache:
return mids
def shift(self) -> None:
"""
Shift the window over by 1 position, dropping the last element of the history.
"""
"""Shift the window over by 1 position, dropping the last element of
the history."""
last_entries: List[CacheEntry] = self.history[len(self.history) - 1]
for entry in last_entries:

View File

@ -81,12 +81,14 @@ class Pubsub:
def __init__(
self, host: IHost, router: "IPubsubRouter", my_id: ID, cache_size: int = None
) -> None:
"""
Construct a new Pubsub object, which is responsible for handling all
"""Construct a new Pubsub object, which is responsible for handling all
Pubsub-related messages and relaying messages as appropriate to the
Pubsub router (which is responsible for choosing who to send messages to).
Pubsub router (which is responsible for choosing who to send messages
to).
Since the logic for choosing peers to send pubsub messages to is
in the router, the same Pubsub impl can back floodsub, gossipsub, etc.
in the router, the same Pubsub impl can back floodsub,
gossipsub, etc.
"""
self.host = host
self.router = router
@ -136,10 +138,8 @@ class Pubsub:
asyncio.ensure_future(self.handle_peer_queue())
def get_hello_packet(self) -> rpc_pb2.RPC:
"""
Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics
"""
"""Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics."""
packet = rpc_pb2.RPC()
for topic_id in self.my_topics:
packet.subscriptions.extend(
@ -148,9 +148,9 @@ class Pubsub:
return packet
async def continuously_read_stream(self, stream: INetStream) -> None:
"""
Read from input stream in an infinite loop. Process
messages from other nodes
"""Read from input stream in an infinite loop. Process messages from
other nodes.
:param stream: stream to continously read from
"""
peer_id = stream.muxed_conn.peer_id
@ -207,8 +207,9 @@ class Pubsub:
def set_topic_validator(
self, topic: str, validator: ValidatorFn, is_async_validator: bool
) -> None:
"""
Register a validator under the given topic. One topic can only have one validtor.
"""Register a validator under the given topic. One topic can only have
one validtor.
:param topic: the topic to register validator under
:param validator: the validator used to validate messages published to the topic
:param is_async_validator: indicate if the validator is an asynchronous validator
@ -216,16 +217,16 @@ class Pubsub:
self.topic_validators[topic] = TopicValidator(validator, is_async_validator)
def remove_topic_validator(self, topic: str) -> None:
"""
Remove the validator from the given topic.
"""Remove the validator from the given topic.
:param topic: the topic to remove validator from
"""
if topic in self.topic_validators:
del self.topic_validators[topic]
def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]:
"""
Get all validators corresponding to the topics in the message.
"""Get all validators corresponding to the topics in the message.
:param msg: the message published to the topic
"""
return tuple(
@ -235,9 +236,9 @@ class Pubsub:
)
async def stream_handler(self, stream: INetStream) -> None:
"""
Stream handler for pubsub. Gets invoked whenever a new stream is created
on one of the supported pubsub protocols.
"""Stream handler for pubsub. Gets invoked whenever a new stream is
created on one of the supported pubsub protocols.
:param stream: newly created stream
"""
try:
@ -290,10 +291,10 @@ class Pubsub:
def handle_subscription(
self, origin_id: ID, sub_message: rpc_pb2.RPC.SubOpts
) -> None:
"""
Handle an incoming subscription message from a peer. Update internal
"""Handle an incoming subscription message from a peer. Update internal
mapping to mark the peer as subscribed or unsubscribed to topics as
defined in the subscription message
defined in the subscription message.
:param origin_id: id of the peer who subscribe to the message
:param sub_message: RPC.SubOpts
"""
@ -310,8 +311,8 @@ class Pubsub:
# FIXME(mhchia): Change the function name?
async def handle_talk(self, publish_message: rpc_pb2.Message) -> None:
"""
Put incoming message from a peer onto my blocking queue
"""Put incoming message from a peer onto my blocking queue.
:param publish_message: RPC.Message format
"""
@ -324,8 +325,8 @@ class Pubsub:
await self.my_topics[topic].put(publish_message)
async def subscribe(self, topic_id: str) -> "asyncio.Queue[rpc_pb2.Message]":
"""
Subscribe ourself to a topic
"""Subscribe ourself to a topic.
:param topic_id: topic_id to subscribe to
"""
@ -354,8 +355,8 @@ class Pubsub:
return self.my_topics[topic_id]
async def unsubscribe(self, topic_id: str) -> None:
"""
Unsubscribe ourself from a topic
"""Unsubscribe ourself from a topic.
:param topic_id: topic_id to unsubscribe from
"""
@ -380,8 +381,8 @@ class Pubsub:
await self.router.leave(topic_id)
async def message_all_peers(self, raw_msg: bytes) -> None:
"""
Broadcast a message to peers
"""Broadcast a message to peers.
:param raw_msg: raw contents of the message to broadcast
"""
@ -391,8 +392,8 @@ class Pubsub:
await stream.write(encode_varint_prefixed(raw_msg))
async def publish(self, topic_id: str, data: bytes) -> None:
"""
Publish data to a topic
"""Publish data to a topic.
:param topic_id: topic which we are going to publish the data to
:param data: data which we are publishing
"""
@ -411,8 +412,8 @@ class Pubsub:
logger.debug("successfully published message %s", msg)
async def validate_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
"""
Validate the received message
"""Validate the received message.
:param msg_forwarder: the peer who forward us the message.
:param msg: the message.
"""
@ -440,8 +441,8 @@ class Pubsub:
raise ValidationError(f"Validation failed for msg={msg}")
async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
"""
Push a pubsub message to others.
"""Push a pubsub message to others.
:param msg_forwarder: the peer who forward us the message.
:param msg: the message we are going to push out.
"""
@ -481,9 +482,7 @@ class Pubsub:
await self.router.publish(msg_forwarder, msg)
def _next_seqno(self) -> bytes:
"""
Make the next message sequence id.
"""
"""Make the next message sequence id."""
self.counter += 1
return self.counter.to_bytes(8, "big")

View File

@ -30,9 +30,10 @@ class PubsubNotifee(INotifee):
pass
async def connected(self, network: INetwork, conn: INetConn) -> None:
"""
Add peer_id to initiator_peers_queue, so that this peer_id can be used to
create a stream and we only want to have one pubsub stream with each peer.
"""Add peer_id to initiator_peers_queue, so that this peer_id can be
used to create a stream and we only want to have one pubsub stream with
each peer.
:param network: network the connection was opened on
:param conn: connection that was opened
"""

View File

@ -19,23 +19,23 @@ class IPubsubRouter(ABC):
@abstractmethod
def attach(self, pubsub: "Pubsub") -> None:
"""
Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance.
"""Attach is invoked by the PubSub constructor to attach the router to
a freshly initialized PubSub instance.
:param pubsub: pubsub instance to attach to
"""
@abstractmethod
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
"""
Notifies the router that a new peer has been connected
"""Notifies the router that a new peer has been connected.
:param peer_id: id of peer to add
"""
@abstractmethod
def remove_peer(self, peer_id: ID) -> None:
"""
Notifies the router that a peer has been disconnected
"""Notifies the router that a peer has been disconnected.
:param peer_id: id of peer to remove
"""
@ -53,25 +53,24 @@ class IPubsubRouter(ABC):
# FIXME: Should be changed to type 'peer.ID'
@abstractmethod
async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
"""
Invoked to forward a new message that has been validated
"""Invoked to forward a new message that has been validated.
:param msg_forwarder: peer_id of message sender
:param pubsub_msg: pubsub message to forward
"""
@abstractmethod
async def join(self, topic: str) -> None:
"""
Join notifies the router that we want to receive and
forward messages in a topic. It is invoked after the
subscription announcement
"""Join notifies the router that we want to receive and forward
messages in a topic. It is invoked after the subscription announcement.
:param topic: topic to join
"""
@abstractmethod
async def leave(self, topic: str) -> None:
"""
Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement.
"""Leave notifies the router that we are no longer interested in a
topic. It is invoked after the unsubscription announcement.
:param topic: topic to leave
"""

View File

@ -1,7 +1,7 @@
# FIXME: Replace the type of `pubkey` with a custom type `Pubkey`
def signature_validator(pubkey: bytes, msg: bytes) -> bool:
"""
Verify the message against the given public key.
"""Verify the message against the given public key.
:param pubkey: the public key which signs the message.
:param msg: the message signed.
"""