Files
py-libp2p/examples/pubsub/pubsub.py

286 lines
10 KiB
Python

import argparse
import logging
import base58
import multiaddr
import trio
from libp2p import (
new_host,
)
from libp2p.crypto.rsa import (
create_new_key_pair,
)
from libp2p.custom_types import (
TProtocol,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.pubsub.gossipsub import (
GossipSub,
)
from libp2p.pubsub.pubsub import (
Pubsub,
)
from libp2p.stream_muxer.mplex.mplex import (
MPLEX_PROTOCOL_ID,
Mplex,
)
from libp2p.tools.async_service.trio_service import (
background_trio_service,
)
from libp2p.utils.address_validation import (
find_free_port,
)
# Configure logging
logging.basicConfig(
level=logging.INFO, # Set default to DEBUG for more verbose output
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("pubsub-demo")
CHAT_TOPIC = "pubsub-chat"
GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
# Generate a key pair for the node
key_pair = create_new_key_pair()
async def receive_loop(subscription, termination_event):
logger.debug("Starting receive loop")
while not termination_event.is_set():
try:
message = await subscription.get()
logger.info(f"From peer: {base58.b58encode(message.from_id).decode()}")
print(f"Received message: {message.data.decode('utf-8')}")
except Exception:
logger.exception("Error in receive loop")
await trio.sleep(1)
async def publish_loop(pubsub, topic, termination_event):
"""Continuously read input from user and publish to the topic."""
logger.debug("Starting publish loop...")
print("Type messages to send (press Enter to send):")
while not termination_event.is_set():
try:
# Use trio's run_sync_in_worker_thread to avoid blocking the event loop
message = await trio.to_thread.run_sync(input)
if message.lower() == "quit":
termination_event.set() # Signal termination
break
if message:
logger.debug(f"Publishing message: {message}")
await pubsub.publish(topic, message.encode())
print(f"Published: {message}")
except Exception:
logger.exception("Error in publish loop")
await trio.sleep(1) # Avoid tight loop on error
async def monitor_peer_topics(pubsub, nursery, termination_event):
"""
Monitor for new topics that peers are subscribed to and
automatically subscribe the server to those topics.
"""
# Keep track of topics we've already subscribed to
subscribed_topics = set()
while not termination_event.is_set():
# Check for new topics in peer_topics
for topic in pubsub.peer_topics.keys():
if topic not in subscribed_topics:
logger.info(f"Auto-subscribing to new topic: {topic}")
subscription = await pubsub.subscribe(topic)
subscribed_topics.add(topic)
# Start a receive loop for this topic
nursery.start_soon(receive_loop, subscription, termination_event)
# Check every 2 seconds for new topics
await trio.sleep(2)
async def run(topic: str, destination: str | None, port: int | None) -> None:
# Initialize network settings
localhost_ip = "127.0.0.1"
if port is None or port == 0:
port = find_free_port()
logger.info(f"Using random available port: {port}")
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
# Create a new libp2p host
host = new_host(
key_pair=key_pair,
muxer_opt={MPLEX_PROTOCOL_ID: Mplex},
)
# Log available protocols
logger.debug(f"Host ID: {host.get_id()}")
logger.debug(
f"Host multiselect protocols: "
f"{host.get_mux().get_protocols() if hasattr(host, 'get_mux') else 'N/A'}"
)
# Create and start gossipsub with optimized parameters for testing
gossipsub = GossipSub(
protocols=[GOSSIPSUB_PROTOCOL_ID],
degree=3, # Number of peers to maintain in mesh
degree_low=2, # Lower bound for mesh peers
degree_high=4, # Upper bound for mesh peers
direct_peers=None, # Direct peers
time_to_live=60, # TTL for message cache in seconds
gossip_window=2, # Smaller window for faster gossip
gossip_history=5, # Keep more history
heartbeat_initial_delay=2.0, # Start heartbeats sooner
heartbeat_interval=5, # More frequent heartbeats for testing
)
pubsub = Pubsub(host, gossipsub)
termination_event = trio.Event() # Event to signal termination
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
# Start the peer-store cleanup task
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
logger.info(f"Node started with peer ID: {host.get_id()}")
logger.info(f"Listening on: {listen_addr}")
logger.info("Initializing PubSub and GossipSub...")
async with background_trio_service(pubsub):
async with background_trio_service(gossipsub):
logger.info("Pubsub and GossipSub services started.")
await pubsub.wait_until_ready()
logger.info("Pubsub ready.")
# Subscribe to the topic
subscription = await pubsub.subscribe(topic)
logger.info(f"Subscribed to topic: {topic}")
if not destination:
# Server mode
logger.info(
"Run this script in another console with:\n"
f"pubsub-demo "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id()}\n"
)
logger.info("Waiting for peers...")
# Start topic monitoring to auto-subscribe to client topics
nursery.start_soon(
monitor_peer_topics, pubsub, nursery, termination_event
)
# Start message publish and receive loops
nursery.start_soon(receive_loop, subscription, termination_event)
nursery.start_soon(publish_loop, pubsub, topic, termination_event)
else:
# Client mode
maddr = multiaddr.Multiaddr(destination)
protocols_in_maddr = maddr.protocols()
info = info_from_p2p_addr(maddr)
logger.debug(f"Multiaddr protocols: {protocols_in_maddr}")
logger.info(
f"Connecting to peer: {info.peer_id} "
f"using protocols: {protocols_in_maddr}"
)
logger.info(
"Run this script in another console with:\n"
f"pubsub-demo "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id()}\n"
)
try:
await host.connect(info)
logger.info(f"Connected to peer: {info.peer_id}")
if logger.isEnabledFor(logging.DEBUG):
await trio.sleep(1)
logger.debug(
f"After connection, pubsub.peers: {pubsub.peers}"
)
peer_protocols = [
gossipsub.peer_protocol.get(p)
for p in pubsub.peers.keys()
]
logger.debug(f"Peer protocols: {peer_protocols}")
# Start the loops
nursery.start_soon(
receive_loop, subscription, termination_event
)
nursery.start_soon(
publish_loop, pubsub, topic, termination_event
)
except Exception:
logger.exception(f"Failed to connect to peer: {info.peer_id}")
return
await termination_event.wait() # Wait for termination signal
# Ensure all tasks are completed before exiting
nursery.cancel_scope.cancel()
print("Application shutdown complete") # Print shutdown message
def main() -> None:
description = """
This program demonstrates a pubsub p2p chat application using libp2p with
the gossipsub protocol as the pubsub router.
To use it, first run 'python pubsub.py -p <PORT> -t <TOPIC>',
where <PORT> is the port number,
and <TOPIC> is the name of the topic you want to subscribe to.
Then, run another instance with 'python pubsub.py -p <ANOTHER_PORT> -t <TOPIC>
-d <DESTINATION>', where <DESTINATION> is the multiaddress of the previous
listener host. Messages typed in either terminal will be received by all peers
subscribed to the same topic.
"""
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"-t",
"--topic",
type=str,
help="topic name to subscribe",
default=CHAT_TOPIC,
)
parser.add_argument(
"-d",
"--destination",
type=str,
help="Address of peer to connect to",
default=None,
)
parser.add_argument(
"-p",
"--port",
type=int,
help="Port to listen on",
default=None,
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Enable debug logging",
)
args = parser.parse_args()
# Set debug level if verbose flag is provided
if args.verbose:
logger.setLevel(logging.DEBUG)
logger.debug("Debug logging enabled")
logger.info("Running pubsub chat example...")
logger.info(f"Your selected topic is: {args.topic}")
try:
trio.run(run, *(args.topic, args.destination, args.port))
except KeyboardInterrupt:
logger.info("Application terminated by user")
if __name__ == "__main__":
main()