mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
feat: identify-push protocol. initial release
This commit is contained in:
21
docs/examples.identify_push.rst
Normal file
21
docs/examples.identify_push.rst
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
examples.identify\_push package
|
||||||
|
===============================
|
||||||
|
|
||||||
|
Submodules
|
||||||
|
----------
|
||||||
|
|
||||||
|
examples.identify\_push.identify\_push\_example module
|
||||||
|
------------------------------------------------------
|
||||||
|
|
||||||
|
.. automodule:: examples.identify_push.identify_push_example
|
||||||
|
:members:
|
||||||
|
:undoc-members:
|
||||||
|
:show-inheritance:
|
||||||
|
|
||||||
|
Module contents
|
||||||
|
---------------
|
||||||
|
|
||||||
|
.. automodule:: examples.identify_push
|
||||||
|
:members:
|
||||||
|
:undoc-members:
|
||||||
|
:show-inheritance:
|
||||||
21
docs/libp2p.identity.identify_push.rst
Normal file
21
docs/libp2p.identity.identify_push.rst
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
libp2p.identity.identify\_push package
|
||||||
|
======================================
|
||||||
|
|
||||||
|
Submodules
|
||||||
|
----------
|
||||||
|
|
||||||
|
libp2p.identity.identify\_push.identify\_push module
|
||||||
|
----------------------------------------------------
|
||||||
|
|
||||||
|
.. automodule:: libp2p.identity.identify_push.identify_push
|
||||||
|
:members:
|
||||||
|
:undoc-members:
|
||||||
|
:show-inheritance:
|
||||||
|
|
||||||
|
Module contents
|
||||||
|
---------------
|
||||||
|
|
||||||
|
.. automodule:: libp2p.identity.identify_push
|
||||||
|
:members:
|
||||||
|
:undoc-members:
|
||||||
|
:show-inheritance:
|
||||||
0
examples/identify_push/__init__.py
Normal file
0
examples/identify_push/__init__.py
Normal file
97
examples/identify_push/identify_push_example.py
Normal file
97
examples/identify_push/identify_push_example.py
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Example demonstrating the identify/push protocol.
|
||||||
|
|
||||||
|
This example shows how to:
|
||||||
|
1. Set up a host with the identify/push protocol handler
|
||||||
|
2. Connect to another peer
|
||||||
|
3. Push identify information to the peer
|
||||||
|
4. Receive and process identify/push messages
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import trio
|
||||||
|
|
||||||
|
from libp2p import (
|
||||||
|
new_host,
|
||||||
|
)
|
||||||
|
from libp2p.crypto.secp256k1 import (
|
||||||
|
create_new_key_pair,
|
||||||
|
)
|
||||||
|
from libp2p.custom_types import (
|
||||||
|
TProtocol,
|
||||||
|
)
|
||||||
|
from libp2p.identity.identify import (
|
||||||
|
identify_handler_for,
|
||||||
|
)
|
||||||
|
from libp2p.identity.identify_push import (
|
||||||
|
ID_PUSH,
|
||||||
|
identify_push_handler_for,
|
||||||
|
push_identify_to_peer,
|
||||||
|
)
|
||||||
|
from libp2p.peer.peerinfo import (
|
||||||
|
info_from_p2p_addr,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.DEBUG,
|
||||||
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def main() -> None:
|
||||||
|
# Create key pairs for the two hosts
|
||||||
|
key_pair_1 = create_new_key_pair()
|
||||||
|
key_pair_2 = create_new_key_pair()
|
||||||
|
|
||||||
|
# Create the first host
|
||||||
|
host_1 = new_host(key_pair=key_pair_1)
|
||||||
|
|
||||||
|
# Set up the identify and identify/push handlers
|
||||||
|
host_1.set_stream_handler(TProtocol("/ipfs/id/1.0.0"), identify_handler_for(host_1))
|
||||||
|
host_1.set_stream_handler(ID_PUSH, identify_push_handler_for(host_1))
|
||||||
|
|
||||||
|
# Create the second host
|
||||||
|
host_2 = new_host(key_pair=key_pair_2)
|
||||||
|
|
||||||
|
# Set up the identify and identify/push handlers
|
||||||
|
host_2.set_stream_handler(TProtocol("/ipfs/id/1.0.0"), identify_handler_for(host_2))
|
||||||
|
host_2.set_stream_handler(ID_PUSH, identify_push_handler_for(host_2))
|
||||||
|
|
||||||
|
# Start listening on random ports using the run context manager
|
||||||
|
import multiaddr
|
||||||
|
|
||||||
|
listen_addr_1 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
||||||
|
listen_addr_2 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
||||||
|
|
||||||
|
async with host_1.run([listen_addr_1]), host_2.run([listen_addr_2]):
|
||||||
|
# Get the addresses of both hosts
|
||||||
|
addr_1 = host_1.get_addrs()[0]
|
||||||
|
logger.info("Host 1 listening on %s", addr_1)
|
||||||
|
|
||||||
|
addr_2 = host_2.get_addrs()[0]
|
||||||
|
logger.info("Host 2 listening on %s", addr_2)
|
||||||
|
|
||||||
|
# Connect host_2 to host_1
|
||||||
|
peer_info = info_from_p2p_addr(addr_1)
|
||||||
|
await host_2.connect(peer_info)
|
||||||
|
logger.info("Host 2 connected to Host 1")
|
||||||
|
|
||||||
|
# Wait a bit for the connection to establish
|
||||||
|
await trio.sleep(1)
|
||||||
|
|
||||||
|
# Push identify information from host_1 to host_2
|
||||||
|
logger.info("Host 1 pushing identify information to Host 2")
|
||||||
|
await push_identify_to_peer(host_1, host_2.get_id())
|
||||||
|
|
||||||
|
# Wait a bit for the push to complete
|
||||||
|
await trio.sleep(1)
|
||||||
|
|
||||||
|
logger.info("Example completed successfully")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
trio.run(main)
|
||||||
196
examples/identify_push/identify_push_listener_dialer.py
Normal file
196
examples/identify_push/identify_push_listener_dialer.py
Normal file
@ -0,0 +1,196 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Example demonstrating the identify/push protocol with separate listener and dialer
|
||||||
|
roles.
|
||||||
|
|
||||||
|
This example shows how to:
|
||||||
|
1. Set up a listener host with the identify/push protocol handler
|
||||||
|
2. Connect to the listener from a dialer peer
|
||||||
|
3. Push identify information to the listener
|
||||||
|
4. Receive and process identify/push messages
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
# First run this script as a listener (default port 8888):
|
||||||
|
python identify_push_listener_dialer.py
|
||||||
|
|
||||||
|
# Then in another console, run as a dialer (default port 8889):
|
||||||
|
python identify_push_listener_dialer.py -d /ip4/127.0.0.1/tcp/8888/p2p/PEER_ID
|
||||||
|
(where PEER_ID is the peer ID displayed by the listener)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import multiaddr
|
||||||
|
import trio
|
||||||
|
|
||||||
|
from libp2p import (
|
||||||
|
new_host,
|
||||||
|
)
|
||||||
|
from libp2p.crypto.secp256k1 import (
|
||||||
|
create_new_key_pair,
|
||||||
|
)
|
||||||
|
from libp2p.custom_types import (
|
||||||
|
TProtocol,
|
||||||
|
)
|
||||||
|
from libp2p.identity.identify import (
|
||||||
|
identify_handler_for,
|
||||||
|
)
|
||||||
|
from libp2p.identity.identify_push import (
|
||||||
|
ID_PUSH,
|
||||||
|
identify_push_handler_for,
|
||||||
|
push_identify_to_peer,
|
||||||
|
)
|
||||||
|
from libp2p.peer.peerinfo import (
|
||||||
|
info_from_p2p_addr,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.DEBUG,
|
||||||
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||||
|
)
|
||||||
|
logger = logging.getLogger("libp2p.identity.identify-push-example")
|
||||||
|
|
||||||
|
|
||||||
|
async def run_listener(port: int) -> None:
|
||||||
|
"""Run a host in listener mode."""
|
||||||
|
print(f"\n==== Starting Identify-Push Listener on port {port} ====\n")
|
||||||
|
|
||||||
|
# Create key pair for the listener
|
||||||
|
key_pair = create_new_key_pair()
|
||||||
|
|
||||||
|
# Create the listener host
|
||||||
|
host = new_host(key_pair=key_pair)
|
||||||
|
|
||||||
|
# Set up the identify and identify/push handlers
|
||||||
|
host.set_stream_handler(TProtocol("/ipfs/id/1.0.0"), identify_handler_for(host))
|
||||||
|
host.set_stream_handler(ID_PUSH, identify_push_handler_for(host))
|
||||||
|
|
||||||
|
# Start listening
|
||||||
|
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||||
|
|
||||||
|
async with host.run([listen_addr]):
|
||||||
|
addr = host.get_addrs()[0]
|
||||||
|
logger.info("Listener host ready")
|
||||||
|
logger.info("Listening on: %s", addr)
|
||||||
|
logger.info("Peer ID: %s", host.get_id().pretty())
|
||||||
|
|
||||||
|
# Print user-friendly information
|
||||||
|
print("Listener host ready!")
|
||||||
|
print(f"Listening on: {addr}")
|
||||||
|
print(f"Peer ID: {host.get_id().pretty()}")
|
||||||
|
print("\nRun dialer with command:")
|
||||||
|
print(f"python identify_push_listener_dialer.py -d {addr}")
|
||||||
|
print("\nWaiting for incoming connections... (Ctrl+C to exit)")
|
||||||
|
|
||||||
|
# Keep running until interrupted
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
async def run_dialer(port: int, destination: str) -> None:
|
||||||
|
"""Run a host in dialer mode that connects to a listener."""
|
||||||
|
print(f"\n==== Starting Identify-Push Dialer on port {port} ====\n")
|
||||||
|
|
||||||
|
# Create key pair for the dialer
|
||||||
|
key_pair = create_new_key_pair()
|
||||||
|
|
||||||
|
# Create the dialer host
|
||||||
|
host = new_host(key_pair=key_pair)
|
||||||
|
|
||||||
|
# Set up the identify and identify/push handlers
|
||||||
|
host.set_stream_handler(TProtocol("/ipfs/id/1.0.0"), identify_handler_for(host))
|
||||||
|
host.set_stream_handler(ID_PUSH, identify_push_handler_for(host))
|
||||||
|
|
||||||
|
# Start listening on a different port
|
||||||
|
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||||
|
|
||||||
|
async with host.run([listen_addr]):
|
||||||
|
logger.info("Dialer host ready")
|
||||||
|
logger.info("Listening on: %s", host.get_addrs()[0])
|
||||||
|
|
||||||
|
print("Dialer host ready!")
|
||||||
|
print(f"Listening on: {host.get_addrs()[0]}")
|
||||||
|
|
||||||
|
# Parse the destination multiaddress and connect to the listener
|
||||||
|
maddr = multiaddr.Multiaddr(destination)
|
||||||
|
peer_info = info_from_p2p_addr(maddr)
|
||||||
|
logger.info("Connecting to peer: %s", peer_info.peer_id)
|
||||||
|
print(f"\nConnecting to peer: {peer_info.peer_id}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
await host.connect(peer_info)
|
||||||
|
logger.info("Successfully connected to listener")
|
||||||
|
print("Successfully connected to listener!")
|
||||||
|
|
||||||
|
# Wait briefly for the connection to establish
|
||||||
|
await trio.sleep(1)
|
||||||
|
|
||||||
|
# Push identify information to the listener
|
||||||
|
logger.info("Pushing identify information to listener")
|
||||||
|
print("\nPushing identify information to listener...")
|
||||||
|
await push_identify_to_peer(host, peer_info.peer_id)
|
||||||
|
|
||||||
|
logger.info("Identify push completed. Waiting a moment...")
|
||||||
|
print("Identify push completed successfully!")
|
||||||
|
|
||||||
|
# Keep the connection open for a bit to observe what happens
|
||||||
|
print("Waiting a moment to keep connection open...")
|
||||||
|
await trio.sleep(5)
|
||||||
|
|
||||||
|
logger.info("Example completed successfully")
|
||||||
|
print("\nExample completed successfully!")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error during dialer operation: %s", str(e))
|
||||||
|
print(f"\nError during dialer operation: {str(e)}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""Parse arguments and start the appropriate mode."""
|
||||||
|
description = """
|
||||||
|
This program demonstrates the libp2p identify/push protocol.
|
||||||
|
Without arguments, it runs as a listener on port 8888.
|
||||||
|
With -d parameter, it runs as a dialer on port 8889.
|
||||||
|
"""
|
||||||
|
|
||||||
|
example = (
|
||||||
|
"/ip4/127.0.0.1/tcp/8888/p2p/QmQn4SwGkDZkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
|
||||||
|
)
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description=description)
|
||||||
|
parser.add_argument(
|
||||||
|
"-p",
|
||||||
|
"--port",
|
||||||
|
type=int,
|
||||||
|
help="port to listen on (default: 8888 for listener, 8889 for dialer)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-d",
|
||||||
|
"--destination",
|
||||||
|
type=str,
|
||||||
|
help=f"destination multiaddr string, e.g. {example}",
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
try:
|
||||||
|
if args.destination:
|
||||||
|
# Run in dialer mode with default port 8889 if not specified
|
||||||
|
port = args.port if args.port is not None else 8889
|
||||||
|
trio.run(run_dialer, port, args.destination)
|
||||||
|
else:
|
||||||
|
# Run in listener mode with default port 8888 if not specified
|
||||||
|
port = args.port if args.port is not None else 8888
|
||||||
|
trio.run(run_listener, port)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\nInterrupted by user")
|
||||||
|
logger.info("Interrupted by user")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\nError: {str(e)}")
|
||||||
|
logger.error("Error: %s", str(e))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,9 @@
|
|||||||
|
from . import (
|
||||||
|
identify,
|
||||||
|
identify_push,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"identify",
|
||||||
|
"identify_push",
|
||||||
|
]
|
||||||
|
|||||||
@ -0,0 +1,9 @@
|
|||||||
|
from .identify import (
|
||||||
|
ID,
|
||||||
|
identify_handler_for,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ID",
|
||||||
|
"identify_handler_for",
|
||||||
|
]
|
||||||
|
|||||||
13
libp2p/identity/identify_push/__init__.py
Normal file
13
libp2p/identity/identify_push/__init__.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
from .identify_push import (
|
||||||
|
ID_PUSH,
|
||||||
|
identify_push_handler_for,
|
||||||
|
push_identify_to_peer,
|
||||||
|
push_identify_to_peers,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ID_PUSH",
|
||||||
|
"identify_push_handler_for",
|
||||||
|
"push_identify_to_peer",
|
||||||
|
"push_identify_to_peers",
|
||||||
|
]
|
||||||
180
libp2p/identity/identify_push/identify_push.py
Normal file
180
libp2p/identity/identify_push/identify_push.py
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
import logging
|
||||||
|
from typing import (
|
||||||
|
Optional,
|
||||||
|
)
|
||||||
|
|
||||||
|
from multiaddr import (
|
||||||
|
Multiaddr,
|
||||||
|
)
|
||||||
|
|
||||||
|
from libp2p.abc import (
|
||||||
|
IHost,
|
||||||
|
INetStream,
|
||||||
|
IPeerStore,
|
||||||
|
)
|
||||||
|
from libp2p.crypto.serialization import (
|
||||||
|
deserialize_public_key,
|
||||||
|
)
|
||||||
|
from libp2p.custom_types import (
|
||||||
|
StreamHandlerFn,
|
||||||
|
TProtocol,
|
||||||
|
)
|
||||||
|
from libp2p.network.stream.exceptions import (
|
||||||
|
StreamClosed,
|
||||||
|
)
|
||||||
|
from libp2p.peer.id import (
|
||||||
|
ID,
|
||||||
|
)
|
||||||
|
from libp2p.utils import (
|
||||||
|
get_agent_version,
|
||||||
|
)
|
||||||
|
|
||||||
|
from ..identify.identify import (
|
||||||
|
_mk_identify_protobuf,
|
||||||
|
)
|
||||||
|
from ..identify.pb.identify_pb2 import (
|
||||||
|
Identify,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Protocol ID for identify/push
|
||||||
|
ID_PUSH = TProtocol("/ipfs/id/push/1.0.0")
|
||||||
|
PROTOCOL_VERSION = "ipfs/0.1.0"
|
||||||
|
AGENT_VERSION = get_agent_version()
|
||||||
|
|
||||||
|
|
||||||
|
def identify_push_handler_for(host: IHost) -> StreamHandlerFn:
|
||||||
|
"""
|
||||||
|
Create a handler for the identify/push protocol.
|
||||||
|
|
||||||
|
This handler receives pushed identify messages from remote peers and updates
|
||||||
|
the local peerstore with the new information.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def handle_identify_push(stream: INetStream) -> None:
|
||||||
|
peer_id = stream.muxed_conn.peer_id
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read the identify message from the stream
|
||||||
|
data = await stream.read()
|
||||||
|
identify_msg = Identify()
|
||||||
|
identify_msg.ParseFromString(data)
|
||||||
|
|
||||||
|
# Update the peerstore with the new information
|
||||||
|
await _update_peerstore_from_identify(
|
||||||
|
host.get_peerstore(), peer_id, identify_msg
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.debug("Successfully processed identify/push from peer %s", peer_id)
|
||||||
|
except StreamClosed:
|
||||||
|
logger.debug(
|
||||||
|
"Stream closed while processing identify/push from %s", peer_id
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error processing identify/push from %s: %s", peer_id, e)
|
||||||
|
finally:
|
||||||
|
# Close the stream after processing
|
||||||
|
await stream.close()
|
||||||
|
|
||||||
|
return handle_identify_push
|
||||||
|
|
||||||
|
|
||||||
|
async def _update_peerstore_from_identify(
|
||||||
|
peerstore: IPeerStore, peer_id: ID, identify_msg: Identify
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Update the peerstore with information from an identify message.
|
||||||
|
|
||||||
|
This function handles partial updates, where only some fields may be present
|
||||||
|
in the identify message.
|
||||||
|
"""
|
||||||
|
# Update public key if present
|
||||||
|
if identify_msg.HasField("public_key"):
|
||||||
|
try:
|
||||||
|
# Note: This assumes the peerstore has a method to update the public key
|
||||||
|
# You may need to adjust this based on your actual peerstore implementation
|
||||||
|
peerstore.add_protocols(peer_id, [])
|
||||||
|
# The actual public key update would go here
|
||||||
|
pubkey = deserialize_public_key(identify_msg.public_key)
|
||||||
|
peerstore.add_pubkey(peer_id, pubkey)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error updating public key for peer %s: %s", peer_id, e)
|
||||||
|
|
||||||
|
# Update listen addresses if present
|
||||||
|
if identify_msg.listen_addrs:
|
||||||
|
try:
|
||||||
|
# Convert bytes to Multiaddr objects
|
||||||
|
addrs = [Multiaddr(addr) for addr in identify_msg.listen_addrs]
|
||||||
|
# Add the addresses to the peerstore
|
||||||
|
for addr in addrs:
|
||||||
|
# Use a default TTL of 2 hours (7200 seconds)
|
||||||
|
peerstore.add_addr(peer_id, addr, 7200)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error updating listen addresses for peer %s: %s", peer_id, e)
|
||||||
|
|
||||||
|
# Update protocols if present
|
||||||
|
if identify_msg.protocols:
|
||||||
|
try:
|
||||||
|
# Add the protocols to the peerstore
|
||||||
|
peerstore.add_protocols(peer_id, identify_msg.protocols)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error updating protocols for peer %s: %s", peer_id, e)
|
||||||
|
|
||||||
|
# Update observed address if present
|
||||||
|
if identify_msg.HasField("observed_addr") and identify_msg.observed_addr:
|
||||||
|
try:
|
||||||
|
# Convert bytes to Multiaddr object
|
||||||
|
observed_addr = Multiaddr(identify_msg.observed_addr)
|
||||||
|
# Add the observed address to the peerstore
|
||||||
|
# Use a default TTL of 2 hours (7200 seconds)
|
||||||
|
peerstore.add_addr(peer_id, observed_addr, 7200)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error updating observed address for peer %s: %s", peer_id, e)
|
||||||
|
|
||||||
|
|
||||||
|
async def push_identify_to_peer(
|
||||||
|
host: IHost, peer_id: ID, observed_multiaddr: Optional[Multiaddr] = None
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Push an identify message to a specific peer.
|
||||||
|
|
||||||
|
This function opens a stream to the peer using the identify/push protocol,
|
||||||
|
sends the identify message, and closes the stream.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Create a new stream to the peer using the identify/push protocol
|
||||||
|
stream = await host.new_stream(peer_id, [ID_PUSH])
|
||||||
|
|
||||||
|
# Create the identify message
|
||||||
|
identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
|
||||||
|
response = identify_msg.SerializeToString()
|
||||||
|
|
||||||
|
# Send the identify message
|
||||||
|
await stream.write(response)
|
||||||
|
|
||||||
|
# Close the stream
|
||||||
|
await stream.close()
|
||||||
|
|
||||||
|
logger.debug("Successfully pushed identify to peer %s", peer_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error pushing identify to peer %s: %s", peer_id, e)
|
||||||
|
|
||||||
|
|
||||||
|
async def push_identify_to_peers(
|
||||||
|
host: IHost,
|
||||||
|
peer_ids: Optional[set[ID]] = None,
|
||||||
|
observed_multiaddr: Optional[Multiaddr] = None,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Push an identify message to multiple peers.
|
||||||
|
|
||||||
|
If peer_ids is None, push to all connected peers.
|
||||||
|
"""
|
||||||
|
if peer_ids is None:
|
||||||
|
# Get all connected peers
|
||||||
|
peer_ids = set(host.get_peerstore().peer_ids())
|
||||||
|
|
||||||
|
# Push to each peer
|
||||||
|
for peer_id in peer_ids:
|
||||||
|
await push_identify_to_peer(host, peer_id, observed_multiaddr)
|
||||||
278
tests/core/identity/identify_push/test_identify_push.py
Normal file
278
tests/core/identity/identify_push/test_identify_push.py
Normal file
@ -0,0 +1,278 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import trio
|
||||||
|
|
||||||
|
from libp2p.identity.identify.identify import (
|
||||||
|
_mk_identify_protobuf,
|
||||||
|
)
|
||||||
|
from libp2p.identity.identify.pb.identify_pb2 import (
|
||||||
|
Identify,
|
||||||
|
)
|
||||||
|
from libp2p.identity.identify_push.identify_push import (
|
||||||
|
ID_PUSH,
|
||||||
|
_update_peerstore_from_identify,
|
||||||
|
identify_push_handler_for,
|
||||||
|
push_identify_to_peer,
|
||||||
|
)
|
||||||
|
from tests.utils.factories import (
|
||||||
|
host_pair_factory,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger("libp2p.identity.identify-push-test")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_identify_push_protocol(security_protocol):
|
||||||
|
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||||
|
host_a,
|
||||||
|
host_b,
|
||||||
|
):
|
||||||
|
# Set up the identify/push handlers
|
||||||
|
host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b))
|
||||||
|
|
||||||
|
# Push identify information from host_a to host_b
|
||||||
|
await push_identify_to_peer(host_a, host_b.get_id())
|
||||||
|
|
||||||
|
# Wait a bit for the push to complete
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
# Get the peerstore from host_b
|
||||||
|
peerstore = host_b.get_peerstore()
|
||||||
|
|
||||||
|
# Check that host_b's peerstore has been updated with host_a's information
|
||||||
|
peer_id = host_a.get_id()
|
||||||
|
|
||||||
|
# Check that the peer is in the peerstore
|
||||||
|
assert peer_id in peerstore.peer_ids()
|
||||||
|
|
||||||
|
# Check that the addresses have been updated
|
||||||
|
host_a_addrs = set(host_a.get_addrs())
|
||||||
|
peerstore_addrs = set(peerstore.addrs(peer_id))
|
||||||
|
|
||||||
|
# The peerstore might have additional addresses from the connection
|
||||||
|
# So we just check that all of host_a's addresses are in the peerstore
|
||||||
|
assert all(addr in peerstore_addrs for addr in host_a_addrs)
|
||||||
|
|
||||||
|
# Check that the protocols have been updated
|
||||||
|
host_a_protocols = set(host_a.get_mux().get_protocols())
|
||||||
|
# Use get_protocols instead of protocols
|
||||||
|
peerstore_protocols = set(peerstore.get_protocols(peer_id))
|
||||||
|
|
||||||
|
# The peerstore might have additional protocols
|
||||||
|
# So we just check that all of host_a's protocols are in the peerstore
|
||||||
|
assert all(protocol in peerstore_protocols for protocol in host_a_protocols)
|
||||||
|
|
||||||
|
# Check that the public key has been updated
|
||||||
|
host_a_public_key = host_a.get_public_key().serialize()
|
||||||
|
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
|
||||||
|
|
||||||
|
assert host_a_public_key == peerstore_public_key
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_identify_push_handler(security_protocol):
|
||||||
|
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||||
|
host_a,
|
||||||
|
host_b,
|
||||||
|
):
|
||||||
|
# Set up the identify/push handlers
|
||||||
|
host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b))
|
||||||
|
|
||||||
|
# Push identify information from host_a to host_b
|
||||||
|
await push_identify_to_peer(host_a, host_b.get_id())
|
||||||
|
|
||||||
|
# Wait a bit for the push to complete
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
# Get the peerstore from host_b
|
||||||
|
peerstore = host_b.get_peerstore()
|
||||||
|
|
||||||
|
# Check that host_b's peerstore has been updated with host_a's information
|
||||||
|
peer_id = host_a.get_id()
|
||||||
|
|
||||||
|
# Check that the peer is in the peerstore
|
||||||
|
assert peer_id in peerstore.peer_ids()
|
||||||
|
|
||||||
|
# Check that the addresses have been updated
|
||||||
|
host_a_addrs = set(host_a.get_addrs())
|
||||||
|
peerstore_addrs = set(peerstore.addrs(peer_id))
|
||||||
|
|
||||||
|
# The peerstore might have additional addresses from the connection
|
||||||
|
# So we just check that all of host_a's addresses are in the peerstore
|
||||||
|
assert all(addr in peerstore_addrs for addr in host_a_addrs)
|
||||||
|
|
||||||
|
# Check that the protocols have been updated
|
||||||
|
host_a_protocols = set(host_a.get_mux().get_protocols())
|
||||||
|
# Use get_protocols instead of protocols
|
||||||
|
peerstore_protocols = set(peerstore.get_protocols(peer_id))
|
||||||
|
|
||||||
|
# The peerstore might have additional protocols
|
||||||
|
# So we just check that all of host_a's protocols are in the peerstore
|
||||||
|
assert all(protocol in peerstore_protocols for protocol in host_a_protocols)
|
||||||
|
|
||||||
|
# Check that the public key has been updated
|
||||||
|
host_a_public_key = host_a.get_public_key().serialize()
|
||||||
|
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
|
||||||
|
|
||||||
|
assert host_a_public_key == peerstore_public_key
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_identify_push_to_peers(security_protocol):
|
||||||
|
# Create three hosts
|
||||||
|
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||||
|
host_a,
|
||||||
|
host_b,
|
||||||
|
):
|
||||||
|
# Create a third host
|
||||||
|
# Instead of using key_pair, create a new host directly
|
||||||
|
import multiaddr
|
||||||
|
|
||||||
|
from libp2p import (
|
||||||
|
new_host,
|
||||||
|
)
|
||||||
|
from libp2p.crypto.secp256k1 import (
|
||||||
|
create_new_key_pair,
|
||||||
|
)
|
||||||
|
from libp2p.peer.peerinfo import (
|
||||||
|
info_from_p2p_addr,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a new key pair for host_c
|
||||||
|
key_pair_c = create_new_key_pair()
|
||||||
|
host_c = new_host(key_pair=key_pair_c)
|
||||||
|
|
||||||
|
# Set up the identify/push handlers
|
||||||
|
host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b))
|
||||||
|
host_c.set_stream_handler(ID_PUSH, identify_push_handler_for(host_c))
|
||||||
|
|
||||||
|
# Start listening on a random port using the run context manager
|
||||||
|
listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
||||||
|
async with host_c.run([listen_addr]):
|
||||||
|
# Connect host_c to host_a and host_b
|
||||||
|
await host_c.connect(info_from_p2p_addr(host_a.get_addrs()[0]))
|
||||||
|
await host_c.connect(info_from_p2p_addr(host_b.get_addrs()[0]))
|
||||||
|
|
||||||
|
# Push identify information from host_a to all connected peers
|
||||||
|
from libp2p.identity.identify_push.identify_push import (
|
||||||
|
push_identify_to_peers,
|
||||||
|
)
|
||||||
|
|
||||||
|
await push_identify_to_peers(host_a)
|
||||||
|
|
||||||
|
# Wait a bit for the push to complete
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
# Check that host_b's peerstore has been updated with host_a's information
|
||||||
|
peerstore_b = host_b.get_peerstore()
|
||||||
|
peer_id_a = host_a.get_id()
|
||||||
|
|
||||||
|
# Check that the peer is in the peerstore
|
||||||
|
assert peer_id_a in peerstore_b.peer_ids()
|
||||||
|
|
||||||
|
# Check that host_c's peerstore has been updated with host_a's information
|
||||||
|
peerstore_c = host_c.get_peerstore()
|
||||||
|
|
||||||
|
# Check that the peer is in the peerstore
|
||||||
|
assert peer_id_a in peerstore_c.peer_ids()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_update_peerstore_from_identify(security_protocol):
|
||||||
|
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||||
|
host_a,
|
||||||
|
host_b,
|
||||||
|
):
|
||||||
|
# Get the peerstore from host_b
|
||||||
|
peerstore = host_b.get_peerstore()
|
||||||
|
|
||||||
|
# Create an identify message with host_a's information
|
||||||
|
identify_msg = _mk_identify_protobuf(host_a, None)
|
||||||
|
|
||||||
|
# Update the peerstore with the identify message
|
||||||
|
await _update_peerstore_from_identify(peerstore, host_a.get_id(), identify_msg)
|
||||||
|
|
||||||
|
# Check that the peerstore has been updated with host_a's information
|
||||||
|
peer_id = host_a.get_id()
|
||||||
|
|
||||||
|
# Check that the peer is in the peerstore
|
||||||
|
assert peer_id in peerstore.peer_ids()
|
||||||
|
|
||||||
|
# Check that the addresses have been updated
|
||||||
|
host_a_addrs = set(host_a.get_addrs())
|
||||||
|
peerstore_addrs = set(peerstore.addrs(peer_id))
|
||||||
|
|
||||||
|
# The peerstore might have additional addresses from the connection
|
||||||
|
# So we just check that all of host_a's addresses are in the peerstore
|
||||||
|
assert all(addr in peerstore_addrs for addr in host_a_addrs)
|
||||||
|
|
||||||
|
# Check that the protocols have been updated
|
||||||
|
host_a_protocols = set(host_a.get_mux().get_protocols())
|
||||||
|
# Use get_protocols instead of protocols
|
||||||
|
peerstore_protocols = set(peerstore.get_protocols(peer_id))
|
||||||
|
|
||||||
|
# The peerstore might have additional protocols
|
||||||
|
# So we just check that all of host_a's protocols are in the peerstore
|
||||||
|
assert all(protocol in peerstore_protocols for protocol in host_a_protocols)
|
||||||
|
|
||||||
|
# Check that the public key has been updated
|
||||||
|
host_a_public_key = host_a.get_public_key().serialize()
|
||||||
|
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
|
||||||
|
|
||||||
|
assert host_a_public_key == peerstore_public_key
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_partial_update_peerstore_from_identify(security_protocol):
|
||||||
|
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||||
|
host_a,
|
||||||
|
host_b,
|
||||||
|
):
|
||||||
|
# Get the peerstore from host_b
|
||||||
|
peerstore = host_b.get_peerstore()
|
||||||
|
|
||||||
|
# First, update the peerstore with all of host_a's information
|
||||||
|
identify_msg_full = _mk_identify_protobuf(host_a, None)
|
||||||
|
await _update_peerstore_from_identify(
|
||||||
|
peerstore, host_a.get_id(), identify_msg_full
|
||||||
|
)
|
||||||
|
|
||||||
|
# Now create a partial identify message with only some fields
|
||||||
|
identify_msg_partial = Identify()
|
||||||
|
|
||||||
|
# Only include the protocols field
|
||||||
|
identify_msg_partial.protocols.extend(["new_protocol_1", "new_protocol_2"])
|
||||||
|
|
||||||
|
# Update the peerstore with the partial identify message
|
||||||
|
await _update_peerstore_from_identify(
|
||||||
|
peerstore, host_a.get_id(), identify_msg_partial
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check that the peerstore has been updated with the new protocols
|
||||||
|
peer_id = host_a.get_id()
|
||||||
|
|
||||||
|
# Check that the peer is still in the peerstore
|
||||||
|
assert peer_id in peerstore.peer_ids()
|
||||||
|
|
||||||
|
# Check that the new protocols have been added
|
||||||
|
# Use get_protocols instead of protocols
|
||||||
|
peerstore_protocols = set(peerstore.get_protocols(peer_id))
|
||||||
|
|
||||||
|
# The new protocols should be in the peerstore
|
||||||
|
assert "new_protocol_1" in peerstore_protocols
|
||||||
|
assert "new_protocol_2" in peerstore_protocols
|
||||||
|
|
||||||
|
# The original protocols should still be in the peerstore
|
||||||
|
host_a_protocols = set(host_a.get_mux().get_protocols())
|
||||||
|
assert all(protocol in peerstore_protocols for protocol in host_a_protocols)
|
||||||
|
|
||||||
|
# The addresses should still be in the peerstore
|
||||||
|
host_a_addrs = set(host_a.get_addrs())
|
||||||
|
peerstore_addrs = set(peerstore.addrs(peer_id))
|
||||||
|
assert all(addr in peerstore_addrs for addr in host_a_addrs)
|
||||||
|
|
||||||
|
# The public key should still be in the peerstore
|
||||||
|
host_a_public_key = host_a.get_public_key().serialize()
|
||||||
|
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
|
||||||
|
assert host_a_public_key == peerstore_public_key
|
||||||
Reference in New Issue
Block a user