added event emmiter

This commit is contained in:
sumanjeet0012@gmail.com
2025-06-20 11:37:02 +05:30
parent cd7eaba4a4
commit 3262749db7
9 changed files with 199 additions and 127 deletions

View File

View File

@ -1,9 +1,21 @@
import secrets
import multiaddr
import trio
from libp2p import new_host
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p import (
new_host,
)
from libp2p.abc import (
PeerInfo,
)
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.discovery.events.peerDiscovery import (
peerDiscovery,
)
async def main():
# Generate a key pair for the host
@ -17,20 +29,19 @@ async def main():
host = new_host(key_pair=key_pair, enable_mDNS=True)
async with host.run(listen_addrs=[listen_addr]):
print("Host started!")
print("Peer ID:", host.get_id())
print("Listening on:", [str(addr) for addr in host.get_addrs()])
print("host peer id", host.get_id())
# Print discovered peers via mDNS
print("Waiting for mDNS peer discovery events (Ctrl+C to exit)...")
try:
while True:
# Print all known peers every 5 seconds
peers = host.get_peerstore().peer_ids()
print("Known peers:", [str(p) for p in peers if p != host.get_id()])
await trio.sleep(5)
peer_info = PeerInfo(host.get_id(), host.get_addrs())
await trio.sleep(1)
await peerDiscovery.emit_peer_discovered(peer_info=peer_info)
except KeyboardInterrupt:
print("Exiting...")
if __name__ == "__main__":
trio.run(main)
trio.run(main)

View File

@ -32,6 +32,9 @@ from libp2p.custom_types import (
TProtocol,
TSecurityOptions,
)
from libp2p.discovery.mdns.mdns import (
MDNSDiscovery,
)
from libp2p.host.basic_host import (
BasicHost,
)
@ -71,9 +74,6 @@ from libp2p.transport.upgrader import (
from libp2p.utils.logging import (
setup_logging,
)
from libp2p.discovery.mdns.mdns import (
MDNSDiscovery
)
# Initialize logging configuration
setup_logging()
@ -269,16 +269,13 @@ def new_host(
listen_addrs=listen_addrs,
)
if disc_opt is not None:
host = RoutedHost(swarm, disc_opt)
else:
host = BasicHost(swarm)
if enable_mDNS:
mdns = MDNSDiscovery(swarm)
mdns.start()
host._mdns = mdns
return host
if disc_opt is not None:
return RoutedHost(swarm, disc_opt)
else:
return BasicHost(swarm)
__version__ = __version("libp2p")

View File

View File

@ -0,0 +1,37 @@
from collections.abc import (
Awaitable,
Callable,
)
import trio
from libp2p.abc import (
PeerInfo,
)
TTL: int = 60 * 60 # Time-to-live for discovered peers in seconds
class PeerDiscovery:
def __init__(self) -> None:
self._peer_discovered_handlers: list[Callable[[PeerInfo], Awaitable[None]]] = []
def register_peer_discovered_handler(
self, handler: Callable[[PeerInfo], Awaitable[None]]
) -> None:
self._peer_discovered_handlers.append(handler)
async def emit_peer_discovered(self, peer_info: PeerInfo) -> None:
for handler in self._peer_discovered_handlers:
await handler(peer_info)
peerDiscovery = PeerDiscovery()
async def peerDiscoveryHandler(peerInfo: PeerInfo) -> None:
await trio.sleep(5) # Simulate some processing delay
# print("Discovered peer is", peerInfo.peer_id)
peerDiscovery.register_peer_discovered_handler(peerDiscoveryHandler)

View File

@ -1,32 +1,45 @@
from zeroconf import Zeroconf, ServiceInfo
from .utils import stringGen
import socket
from zeroconf import (
ServiceInfo,
Zeroconf,
)
class PeerBroadcaster:
"""
Broadcasts this peer's presence on the local network using mDNS/zeroconf.
Registers a service with the peer's ID in the TXT record as per libp2p spec.
"""
def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, peer_id: str, port: int):
def __init__(
self,
zeroconf: Zeroconf,
service_type: str,
service_name: str,
peer_id: str,
port: int,
):
self.zeroconf = zeroconf
self.service_type = service_type
self.peer_id = peer_id
self.port = port
self.service_name = service_name
# Get the local IP address
local_ip = self._get_local_ip()
hostname = socket.gethostname()
self.service_info = ServiceInfo(
type_=self.service_type,
name=self.service_name,
port=self.port,
properties={b'id': self.peer_id.encode()},
server=f"{self.service_name}",
addresses=[socket.inet_aton(local_ip)]
properties={b"id": self.peer_id.encode()},
server=f"{hostname}.local.",
addresses=[socket.inet_aton(local_ip)],
)
def _get_local_ip(self):
def _get_local_ip(self) -> str:
"""Get the local IP address of this machine"""
try:
# Connect to a remote address to determine the local IP
@ -34,16 +47,17 @@ class PeerBroadcaster:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
print(f"Local IP determined: {local_ip}")
return local_ip
except Exception:
# Fallback to localhost if we can't determine the IP
return "127.0.0.1"
def register(self):
def register(self) -> None:
"""Register the peer's mDNS service on the network."""
print(f"Registering with name {self.service_name} and peer_id {self.peer_id} on port {self.port}")
print(repr(self.service_info))
self.zeroconf.register_service(self.service_info)
def unregister(self):
def unregister(self) -> None:
"""Unregister the peer's mDNS service from the network."""
self.zeroconf.unregister_service(self.service_info)

View File

@ -1,102 +1,105 @@
import time
import socket
from zeroconf import ServiceBrowser, ServiceStateChange, Zeroconf
from libp2p.peer.peerinfo import PeerInfo
from libp2p.peer.id import ID
import time
class PeerListener:
"""Enhanced mDNS listener for libp2p peer discovery."""
def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, on_peer_discovery=None):
from zeroconf import (
ServiceBrowser,
ServiceInfo,
ServiceListener,
ServiceStateChange,
Zeroconf,
)
from libp2p.abc import IPeerStore
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
class PeerListener(ServiceListener):
"""mDNS listener — now a true ServiceListener subclass."""
def __init__(
self,
peerstore: IPeerStore,
zeroconf: Zeroconf,
service_type: str,
service_name: str,
) -> None:
self.peerstore = peerstore
self.zeroconf = zeroconf
self.service_type = service_type
self.service_name = service_name
self.on_peer_discovery = on_peer_discovery
self.discovered_services = set()
self.browser = ServiceBrowser(
self.zeroconf, self.service_type, handlers=[self.on_service_state_change]
)
self.discovered_services: set[str] = set()
def on_service_state_change(self, zeroconf: Zeroconf, service_type, name, state_change):
if state_change != ServiceStateChange.Added or name == self.service_name or name in self.discovered_services:
# pass `self` as the listener object
self.browser = ServiceBrowser(self.zeroconf, self.service_type, listener=self)
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
# map to your on_service_state_change logic
self._on_state_change(zc, type_, name, ServiceStateChange.Added)
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
self._on_state_change(zc, type_, name, ServiceStateChange.Removed)
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
self._on_state_change(zc, type_, name, ServiceStateChange.Updated)
def _on_state_change(
self,
zeroconf: Zeroconf,
service_type: str,
name: str,
state: ServiceStateChange,
) -> None:
# skip our own service
if name == self.service_name:
return
print(f"Discovered service: {name}")
self.discovered_services.add(name)
# Process the discovered service
self._process_discovered_service(zeroconf, service_type, name)
def _process_discovered_service(self, zeroconf, service_type, name):
# Try to get service info with retries
# handle Added
if state is ServiceStateChange.Added:
if name in self.discovered_services:
return
self.discovered_services.add(name)
self._process_discovered_service(zeroconf, service_type, name)
# ...optional hooks for Removed/Updated if you need them
def _process_discovered_service(
self, zeroconf: Zeroconf, service_type: str, name: str
) -> None:
# same retry logic you had before
info = None
for attempt in range(3):
info = zeroconf.get_service_info(service_type, name, timeout=5000)
if info:
print(f"Service info successfully retrieved for {name}")
break
print(f"Retrying service info retrieval (attempt {attempt + 1}/3)")
time.sleep(1)
if not info:
print(f"Failed to retrieve service info for {name}")
return
# Extract peer information
peer_info = self._extract_peer_info(info)
print(f"Extracted peer info: {peer_info}")
if peer_info:
# your existing hook
self._handle_discovered_peer(peer_info)
def _extract_peer_info(self, service_info):
def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None:
try:
# Extract IP addresses
addresses = []
for addr in service_info.addresses:
ip = socket.inet_ntoa(addr)
addresses.append(ip)
# Extract port
port = service_info.port
# Extract peer ID from TXT record
peer_id = None
if service_info.properties:
peer_id_bytes = service_info.properties.get(b'id')
if peer_id_bytes:
peer_id = peer_id_bytes.decode('utf-8')
if not peer_id:
print(f"No peer ID found in TXT record for {service_info.name}")
addrs = [
f"/ip4/{socket.inet_ntoa(addr)}/udp/{info.port}"
for addr in info.addresses
]
pid_bytes = info.properties.get(b"id")
if not pid_bytes:
return None
# Create multiaddresses
multiaddrs = []
for ip in addresses:
multiaddr = f"/ip4/{ip}/udp/{port}"
multiaddrs.append(multiaddr)
# Create PeerInfo object
peer_info = PeerInfo(
peer_id=ID.from_base58(peer_id),
addrs=multiaddrs
)
return peer_info
except Exception as e:
print(f"Error extracting peer info from {service_info.name}: {e}")
pid = ID.from_base58(pid_bytes.decode())
return PeerInfo(peer_id=pid, addrs=addrs)
except Exception:
return None
def _handle_discovered_peer(self, peer_info):
print(f"Successfully discovered peer: {peer_info.peer_id}")
print(f"Peer addresses: {peer_info.addrs}")
# Trigger callback if provided
if self.on_peer_discovery:
self.on_peer_discovery(peer_info)
def _handle_discovered_peer(self, peer_info: PeerInfo) -> None:
# your “emit” or “connect” logic goes here
# print("Discovered:", peer_info)
print("Discovered:", peer_info.peer_id)
def stop(self):
"""Stop the listener."""
if self.browser:
self.browser.cancel()
def stop(self) -> None:
self.browser.cancel()

View File

@ -3,55 +3,65 @@ mDNS-based peer discovery for py-libp2p.
Conforms to https://github.com/libp2p/specs/blob/master/discovery/mdns.md
Uses zeroconf for mDNS broadcast/listen. Async operations use trio.
"""
import trio
from typing import Callable, Optional
from zeroconf import ServiceInfo, Zeroconf, ServiceBrowser, ServiceStateChange
from .utils import (
stringGen
from zeroconf import (
Zeroconf,
)
from libp2p.abc import (
INetworkService
INetworkService,
)
from .broadcaster import (
PeerBroadcaster,
)
from .listener import (
PeerListener,
)
from .utils import (
stringGen,
)
from .listener import PeerListener
from .broadcaster import PeerBroadcaster
from libp2p.peer.peerinfo import PeerInfo
SERVICE_TYPE = "_p2p._udp.local."
MCAST_PORT = 5353
MCAST_ADDR = "224.0.0.251"
class MDNSDiscovery:
"""
mDNS-based peer discovery for py-libp2p, using zeroconf.
Conforms to the libp2p mDNS discovery spec.
"""
def __init__(self, swarm: INetworkService, port: int = 8000, on_peer_discovery=None):
def __init__(self, swarm: INetworkService, port: int = 8000):
self.peer_id = str(swarm.get_peer_id())
self.port = port
self.on_peer_discovery = on_peer_discovery
self.zeroconf = Zeroconf()
self.serviceName = f"{stringGen()}.{SERVICE_TYPE}"
self.peerstore = swarm.peerstore
self.swarm = swarm
self.broadcaster = PeerBroadcaster(
zeroconf=self.zeroconf,
service_type=SERVICE_TYPE,
service_name=self.serviceName,
peer_id = self.peer_id,
port = self.port
peer_id=self.peer_id,
port=self.port,
)
self.listener = PeerListener(
zeroconf=self.zeroconf,
peerstore=self.peerstore,
service_type=SERVICE_TYPE,
service_name=self.serviceName,
on_peer_discovery=self.on_peer_discovery
)
def start(self):
def start(self) -> None:
"""Register this peer and start listening for others."""
print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}")
print("host is listening on", self.swarm.listeners)
self.broadcaster.register()
# Listener is started in constructor
def stop(self):
def stop(self) -> None:
"""Unregister this peer and clean up zeroconf resources."""
self.broadcaster.unregister()
self.zeroconf.close()
self.zeroconf.close()

View File

@ -1,11 +1,11 @@
import random
import string
def stringGen(len: int = 63) -> str:
"""Generate a random string of lowercase letters and digits."""
charset = string.ascii_lowercase + string.digits
result = []
for _ in range(len):
result.append(random.choice(charset))
return ''.join(result)
return "".join(result)