Pubsub example for py-libp2p (#515)

* Initial setup for pubsup

* Created node and trying to setup gossipsub

* Fix: Use pubsub object for publishing messages instead of gossipsub

* Correct help message for port argument.

* Fix: Used pubsub object instead of gossipsub object on Client side

* Fix: handle_new_peer method of pubsub is used to connect to new peers.

* used for host.connect to connect to peers

* Corrected script for connecting to other peers.

* message receiving function created

* message publishing function created

* Refactored the code for improved clarity and maintainability.

* fix: make publish loop input non-blocking to prevent event loop blocking

* refactored the code for better user experience while publishing message

* corrected the name of protocol

* Fix: Correct the implementation of the port argument

* Added pubsub initialization

* added logging

* pubsub instance is running

* Enhance publish loop with user prompts and error handling

* Connection monitoring added

* Add key pair generation and security options to pubsub host initialization

* Refactor pubsub logging and corrected gossipsub protocol id

* Started gossipsub service

* Add dynamic port assignment

* Refactor pubsub example for CI

* feat: monitor_peer_topics function added

* Noise protocol added

* refactor: default port set to none and some logging changes.

* refactor: Add graceful shutdown with termination events

- Replace infinite loops with termination events
- Add proper shutdown handling for all loops
- Implement clean resource cleanup on exit
- Add shutdown message for better user feedback
- Update signal handling for graceful termination

* Changed import path for factories file.
- to align import statement with changes from PR 543

* Added News Fragment

* Added pub-sub demo to the console_scripts section in setup.py

* Added pubsub example to Documentation

* Fix formatting and path in PubSub documentation example

* Added pubsub example in toctree

* Added tests for pubsub example

* updated the description of pubsub example

* corrected the name of pubsub docs file

* Remove unused imports and security options from pubsub example

* Update script usage instructions in pubsub example

* Enhanced compatibility for python 3.9

* Corrected console output
This commit is contained in:
Sumanjeet
2025-04-07 02:08:14 +05:30
committed by GitHub
parent 7793e322dc
commit 346a0a14db
6 changed files with 422 additions and 3 deletions

64
docs/examples.pubsub.rst Normal file
View File

@ -0,0 +1,64 @@
PubSub Chat Demo
================
This example demonstrates how to create a chat application using libp2p's PubSub implementation with the GossipSub protocol.
.. code-block:: console
$ python -m pip install libp2p
Collecting libp2p
...
Successfully installed libp2p-x.x.x
$ pubsub-demo
2025-04-06 23:59:17,471 - pubsub-demo - INFO - Running pubsub chat example...
2025-04-06 23:59:17,471 - pubsub-demo - INFO - Your selected topic is: pubsub-chat
2025-04-06 23:59:17,472 - pubsub-demo - INFO - Using random available port: 33269
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Node started with peer ID: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Listening on: /ip4/0.0.0.0/tcp/33269
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Initializing PubSub and GossipSub...
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Pubsub and GossipSub services started.
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Pubsub ready.
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Subscribed to topic: pubsub-chat
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Run this script in another console with:
pubsub-demo -d /ip4/127.0.0.1/tcp/33269/p2p/QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Waiting for peers...
Type messages to send (press Enter to send):
Copy the line that starts with ``pubsub-demo -d``, open a new terminal and paste it in:
.. code-block:: console
$ pubsub-demo -d /ip4/127.0.0.1/tcp/33269/p2p/QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
2025-04-07 00:00:59,845 - pubsub-demo - INFO - Running pubsub chat example...
2025-04-07 00:00:59,846 - pubsub-demo - INFO - Your selected topic is: pubsub-chat
2025-04-07 00:00:59,846 - pubsub-demo - INFO - Using random available port: 51977
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Node started with peer ID: QmYQKCm95Ut1aXsjHmWVYqdaVbno1eKTYC8KbEVjqUaKaQ
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Listening on: /ip4/0.0.0.0/tcp/51977
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Initializing PubSub and GossipSub...
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Pubsub and GossipSub services started.
2025-04-07 00:00:59,865 - pubsub-demo - INFO - Pubsub ready.
2025-04-07 00:00:59,865 - pubsub-demo - INFO - Subscribed to topic: pubsub-chat
2025-04-07 00:00:59,866 - pubsub-demo - INFO - Connecting to peer: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7 using protocols: MultiAddrKeys(<Multiaddr /ip4/127.0.0.1/tcp/33269/p2p/QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7>)
2025-04-07 00:00:59,866 - pubsub-demo - INFO - Run this script in another console with:
pubsub-demo -d /ip4/127.0.0.1/tcp/51977/p2p/QmYQKCm95Ut1aXsjHmWVYqdaVbno1eKTYC8KbEVjqUaKaQ
2025-04-07 00:00:59,881 - pubsub-demo - INFO - Connected to peer: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
Type messages to send (press Enter to send):
You can then start typing messages in either terminal and see them relayed to the other terminal. The messages will be distributed using the GossipSub protocol to all peers subscribed to the same topic. To exit the demo, type "quit" or send a keyboard interrupt (``Ctrl+C``) in either terminal.
Command Line Options
--------------------
- ``-t, --topic``: Specify the topic name to subscribe to (default: "pubsub-chat")
- ``-d, --destination``: Address of peer to connect to
- ``-p, --port``: Port to listen on (default: random available port)
- ``-v, --verbose``: Enable debug logging
The full source code for this example is below:
.. literalinclude:: ../examples/pubsub/pubsub.py
:language: python
:linenos:

View File

@ -9,3 +9,4 @@ Examples
examples.chat
examples.echo
examples.ping
examples.pubsub

289
examples/pubsub/pubsub.py Normal file
View File

@ -0,0 +1,289 @@
import argparse
import logging
import socket
from typing import (
Optional,
)
import base58
import multiaddr
import trio
from libp2p import (
new_host,
)
from libp2p.crypto.rsa import (
create_new_key_pair,
)
from libp2p.custom_types import (
TProtocol,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.pubsub.gossipsub import (
GossipSub,
)
from libp2p.pubsub.pubsub import (
Pubsub,
)
from libp2p.stream_muxer.mplex.mplex import (
MPLEX_PROTOCOL_ID,
Mplex,
)
from libp2p.tools.async_service.trio_service import (
background_trio_service,
)
# Configure logging
logging.basicConfig(
level=logging.INFO, # Set default to DEBUG for more verbose output
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("pubsub-demo")
CHAT_TOPIC = "pubsub-chat"
GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
# Generate a key pair for the node
key_pair = create_new_key_pair()
async def receive_loop(subscription, termination_event):
logger.debug("Starting receive loop")
while not termination_event.is_set():
try:
message = await subscription.get()
logger.info(f"From peer: {base58.b58encode(message.from_id).decode()}")
print(f"Received message: {message.data.decode('utf-8')}")
except Exception:
logger.exception("Error in receive loop")
await trio.sleep(1)
async def publish_loop(pubsub, topic, termination_event):
"""Continuously read input from user and publish to the topic."""
logger.debug("Starting publish loop...")
print("Type messages to send (press Enter to send):")
while not termination_event.is_set():
try:
# Use trio's run_sync_in_worker_thread to avoid blocking the event loop
message = await trio.to_thread.run_sync(input)
if message.lower() == "quit":
termination_event.set() # Signal termination
break
if message:
logger.debug(f"Publishing message: {message}")
await pubsub.publish(topic, message.encode())
print(f"Published: {message}")
except Exception:
logger.exception("Error in publish loop")
await trio.sleep(1) # Avoid tight loop on error
def find_free_port():
"""Find a free port on localhost."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0)) # Bind to a free port provided by the OS
return s.getsockname()[1]
async def monitor_peer_topics(pubsub, nursery, termination_event):
"""
Monitor for new topics that peers are subscribed to and
automatically subscribe the server to those topics.
"""
# Keep track of topics we've already subscribed to
subscribed_topics = set()
while not termination_event.is_set():
# Check for new topics in peer_topics
for topic in pubsub.peer_topics.keys():
if topic not in subscribed_topics:
logger.info(f"Auto-subscribing to new topic: {topic}")
subscription = await pubsub.subscribe(topic)
subscribed_topics.add(topic)
# Start a receive loop for this topic
nursery.start_soon(receive_loop, subscription, termination_event)
# Check every 2 seconds for new topics
await trio.sleep(2)
async def run(topic: str, destination: Optional[str], port: Optional[int]) -> None:
# Initialize network settings
localhost_ip = "127.0.0.1"
if port is None or port == 0:
port = find_free_port()
logger.info(f"Using random available port: {port}")
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
# Create a new libp2p host
host = new_host(
key_pair=key_pair,
muxer_opt={MPLEX_PROTOCOL_ID: Mplex},
)
# Log available protocols
logger.debug(f"Host ID: {host.get_id()}")
logger.debug(
f"Host multiselect protocols: "
f"{host.get_mux().get_protocols() if hasattr(host, 'get_mux') else 'N/A'}"
)
# Create and start gossipsub with optimized parameters for testing
gossipsub = GossipSub(
protocols=[GOSSIPSUB_PROTOCOL_ID],
degree=3, # Number of peers to maintain in mesh
degree_low=2, # Lower bound for mesh peers
degree_high=4, # Upper bound for mesh peers
time_to_live=60, # TTL for message cache in seconds
gossip_window=2, # Smaller window for faster gossip
gossip_history=5, # Keep more history
heartbeat_initial_delay=2.0, # Start heartbeats sooner
heartbeat_interval=5, # More frequent heartbeats for testing
)
pubsub = Pubsub(host, gossipsub)
termination_event = trio.Event() # Event to signal termination
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
logger.info(f"Node started with peer ID: {host.get_id()}")
logger.info(f"Listening on: {listen_addr}")
logger.info("Initializing PubSub and GossipSub...")
async with background_trio_service(pubsub):
async with background_trio_service(gossipsub):
logger.info("Pubsub and GossipSub services started.")
await pubsub.wait_until_ready()
logger.info("Pubsub ready.")
# Subscribe to the topic
subscription = await pubsub.subscribe(topic)
logger.info(f"Subscribed to topic: {topic}")
if not destination:
# Server mode
logger.info(
"Run this script in another console with:\n"
f"pubsub-demo "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id()}\n"
)
logger.info("Waiting for peers...")
# Start topic monitoring to auto-subscribe to client topics
nursery.start_soon(
monitor_peer_topics, pubsub, nursery, termination_event
)
# Start message publish and receive loops
nursery.start_soon(receive_loop, subscription, termination_event)
nursery.start_soon(publish_loop, pubsub, topic, termination_event)
else:
# Client mode
maddr = multiaddr.Multiaddr(destination)
protocols_in_maddr = maddr.protocols()
info = info_from_p2p_addr(maddr)
logger.debug(f"Multiaddr protocols: {protocols_in_maddr}")
logger.info(
f"Connecting to peer: {info.peer_id} "
f"using protocols: {protocols_in_maddr}"
)
logger.info(
"Run this script in another console with:\n"
f"pubsub-demo "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id()}\n"
)
try:
await host.connect(info)
logger.info(f"Connected to peer: {info.peer_id}")
if logger.isEnabledFor(logging.DEBUG):
await trio.sleep(1)
logger.debug(
f"After connection, pubsub.peers: {pubsub.peers}"
)
peer_protocols = [
gossipsub.peer_protocol.get(p)
for p in pubsub.peers.keys()
]
logger.debug(f"Peer protocols: {peer_protocols}")
# Start the loops
nursery.start_soon(
receive_loop, subscription, termination_event
)
nursery.start_soon(
publish_loop, pubsub, topic, termination_event
)
except Exception:
logger.exception(f"Failed to connect to peer: {info.peer_id}")
return
await termination_event.wait() # Wait for termination signal
# Ensure all tasks are completed before exiting
nursery.cancel_scope.cancel()
print("Application shutdown complete") # Print shutdown message
def main() -> None:
description = """
This program demonstrates a pubsub p2p chat application using libp2p with
the gossipsub protocol as the pubsub router.
To use it, first run 'python pubsub.py -p <PORT> -t <TOPIC>',
where <PORT> is the port number,
and <TOPIC> is the name of the topic you want to subscribe to.
Then, run another instance with 'python pubsub.py -p <ANOTHER_PORT> -t <TOPIC>
-d <DESTINATION>', where <DESTINATION> is the multiaddress of the previous
listener host. Messages typed in either terminal will be received by all peers
subscribed to the same topic.
"""
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"-t",
"--topic",
type=str,
help="topic name to subscribe",
default=CHAT_TOPIC,
)
parser.add_argument(
"-d",
"--destination",
type=str,
help="Address of peer to connect to",
default=None,
)
parser.add_argument(
"-p",
"--port",
type=int,
help="Port to listen on",
default=None,
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Enable debug logging",
)
args = parser.parse_args()
# Set debug level if verbose flag is provided
if args.verbose:
logger.setLevel(logging.DEBUG)
logger.debug("Debug logging enabled")
logger.info("Running pubsub chat example...")
logger.info(f"Your selected topic is: {args.topic}")
try:
trio.run(run, *(args.topic, args.destination, args.port))
except KeyboardInterrupt:
logger.info("Application terminated by user")
if __name__ == "__main__":
main()

View File

@ -0,0 +1 @@
Added a ``pub-sub`` example having ``gossipsub`` as the router to demonstrate how to use the pub-sub module in py-libp2p.

View File

@ -109,6 +109,7 @@ setup(
"echo-demo=examples.echo.echo:main",
"ping-demo=examples.ping.ping:main",
"identify-demo=examples.identify.identify:main",
"pubsub-demo=examples.pubsub.pubsub:main",
],
},
)

View File

@ -1,12 +1,24 @@
import pytest
import trio
from libp2p.custom_types import (
TProtocol,
)
from libp2p.host.exceptions import (
StreamFailure,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.pubsub.gossipsub import (
GossipSub,
)
from libp2p.pubsub.pubsub import (
Pubsub,
)
from libp2p.tools.async_service.trio_service import (
background_trio_service,
)
from libp2p.tools.utils import (
MAX_READ_LEN,
)
@ -17,6 +29,8 @@ from tests.utils.factories import (
CHAT_PROTOCOL_ID = "/chat/1.0.0"
ECHO_PROTOCOL_ID = "/echo/1.0.0"
PING_PROTOCOL_ID = "/ipfs/ping/1.0.0"
GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
PUBSUB_TEST_TOPIC = "test-pubsub-topic"
async def hello_world(host_a, host_b):
@ -185,6 +199,53 @@ async def ping_demo(host_a, host_b):
assert response == ping_data
async def pubsub_demo(host_a, host_b):
gossipsub_a = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, 0.1, 1)
gossipsub_b = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, 0.1, 1)
pubsub_a = Pubsub(host_a, gossipsub_a)
pubsub_b = Pubsub(host_b, gossipsub_b)
message_a_to_b = "Hello from A to B"
b_received = trio.Event()
received_by_b = None
async def handle_subscription_b(subscription):
nonlocal received_by_b
message = await subscription.get()
received_by_b = message.data.decode("utf-8")
print(f"Host B received: {received_by_b}")
b_received.set()
async with background_trio_service(pubsub_a):
async with background_trio_service(pubsub_b):
async with background_trio_service(gossipsub_a):
async with background_trio_service(gossipsub_b):
await pubsub_a.wait_until_ready()
await pubsub_b.wait_until_ready()
listen_addrs_b = host_b.get_addrs()
peer_info_b = info_from_p2p_addr(listen_addrs_b[0])
try:
await pubsub_a.host.connect(peer_info_b)
print("Connection attempt completed")
except Exception as e:
print(f"Connection error: {e}")
raise
subscription_b = await pubsub_b.subscribe(PUBSUB_TEST_TOPIC)
async with trio.open_nursery() as nursery:
nursery.start_soon(handle_subscription_b, subscription_b)
await trio.sleep(0.1)
await pubsub_a.publish(
PUBSUB_TEST_TOPIC, message_a_to_b.encode()
)
with trio.move_on_after(3):
await b_received.wait()
nursery.cancel_scope.cancel()
assert received_by_b == message_a_to_b
assert b_received.is_set()
@pytest.mark.parametrize(
"test",
[
@ -195,6 +256,7 @@ async def ping_demo(host_a, host_b):
chat_demo,
echo_demo,
ping_demo,
pubsub_demo,
],
)
@pytest.mark.trio
@ -203,8 +265,9 @@ async def test_protocols(test, security_protocol):
async with HostFactory.create_batch_and_listen(
2, security_protocol=security_protocol
) as hosts:
addr = hosts[0].get_addrs()[0]
info = info_from_p2p_addr(addr)
await hosts[1].connect(info)
if test != pubsub_demo:
addr = hosts[0].get_addrs()[0]
info = info_from_p2p_addr(addr)
await hosts[1].connect(info)
await test(hosts[0], hosts[1])