mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
refactored code
This commit is contained in:
@ -6,15 +6,9 @@ import trio
|
||||
from libp2p import (
|
||||
new_host,
|
||||
)
|
||||
from libp2p.abc import (
|
||||
PeerInfo,
|
||||
)
|
||||
from libp2p.crypto.secp256k1 import (
|
||||
create_new_key_pair,
|
||||
)
|
||||
from libp2p.discovery.events.peerDiscovery import (
|
||||
peerDiscovery,
|
||||
)
|
||||
|
||||
|
||||
async def main():
|
||||
@ -29,16 +23,10 @@ async def main():
|
||||
host = new_host(key_pair=key_pair, enable_mDNS=True)
|
||||
|
||||
async with host.run(listen_addrs=[listen_addr]):
|
||||
print("host peer id", host.get_id())
|
||||
|
||||
# Print discovered peers via mDNS
|
||||
print("Waiting for mDNS peer discovery events (Ctrl+C to exit)...")
|
||||
try:
|
||||
while True:
|
||||
peer_info = PeerInfo(host.get_id(), host.get_addrs())
|
||||
|
||||
await trio.sleep(1)
|
||||
await peerDiscovery.emit_peer_discovered(peer_info=peer_info)
|
||||
await trio.sleep(100)
|
||||
except KeyboardInterrupt:
|
||||
print("Exiting...")
|
||||
|
||||
|
||||
@ -47,7 +47,6 @@ class PeerBroadcaster:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
|
||||
s.connect(("8.8.8.8", 80))
|
||||
local_ip = s.getsockname()[0]
|
||||
print(f"Local IP determined: {local_ip}")
|
||||
return local_ip
|
||||
except Exception:
|
||||
# Fallback to localhost if we can't determine the IP
|
||||
@ -55,7 +54,6 @@ class PeerBroadcaster:
|
||||
|
||||
def register(self) -> None:
|
||||
"""Register the peer's mDNS service on the network."""
|
||||
print(repr(self.service_info))
|
||||
self.zeroconf.register_service(self.service_info)
|
||||
|
||||
def unregister(self) -> None:
|
||||
|
||||
@ -1,15 +1,13 @@
|
||||
import socket
|
||||
import time
|
||||
|
||||
from zeroconf import (
|
||||
ServiceBrowser,
|
||||
ServiceInfo,
|
||||
ServiceListener,
|
||||
ServiceStateChange,
|
||||
Zeroconf,
|
||||
)
|
||||
|
||||
from libp2p.abc import IPeerStore
|
||||
from libp2p.abc import IPeerStore, Multiaddr
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.peer.peerinfo import PeerInfo
|
||||
|
||||
@ -28,64 +26,51 @@ class PeerListener(ServiceListener):
|
||||
self.zeroconf = zeroconf
|
||||
self.service_type = service_type
|
||||
self.service_name = service_name
|
||||
self.discovered_services: set[str] = set()
|
||||
|
||||
# pass `self` as the listener object
|
||||
self.discovered_services: dict[str, ID] = {}
|
||||
self.browser = ServiceBrowser(self.zeroconf, self.service_type, listener=self)
|
||||
|
||||
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||
# map to your on_service_state_change logic
|
||||
self._on_state_change(zc, type_, name, ServiceStateChange.Added)
|
||||
|
||||
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||
self._on_state_change(zc, type_, name, ServiceStateChange.Removed)
|
||||
|
||||
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||
self._on_state_change(zc, type_, name, ServiceStateChange.Updated)
|
||||
|
||||
def _on_state_change(
|
||||
self,
|
||||
zeroconf: Zeroconf,
|
||||
service_type: str,
|
||||
name: str,
|
||||
state: ServiceStateChange,
|
||||
) -> None:
|
||||
# skip our own service
|
||||
if name == self.service_name:
|
||||
return
|
||||
|
||||
# handle Added
|
||||
if state is ServiceStateChange.Added:
|
||||
if name in self.discovered_services:
|
||||
return
|
||||
self.discovered_services.add(name)
|
||||
self._process_discovered_service(zeroconf, service_type, name)
|
||||
|
||||
# ...optional hooks for Removed/Updated if you need them
|
||||
|
||||
def _process_discovered_service(
|
||||
self, zeroconf: Zeroconf, service_type: str, name: str
|
||||
) -> None:
|
||||
# same retry logic you had before
|
||||
info = None
|
||||
for attempt in range(3):
|
||||
info = zeroconf.get_service_info(service_type, name, timeout=5000)
|
||||
if info:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
info = zc.get_service_info(type_, name, timeout=5000)
|
||||
if not info:
|
||||
return
|
||||
|
||||
peer_info = self._extract_peer_info(info)
|
||||
if peer_info:
|
||||
# your existing hook
|
||||
self._handle_discovered_peer(peer_info)
|
||||
self.discovered_services[name] = peer_info.peer_id
|
||||
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
|
||||
print("Discovered Peer:", peer_info.peer_id)
|
||||
|
||||
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||
peer_id = self.discovered_services.pop(name)
|
||||
self.peerstore.clear_addrs(peer_id)
|
||||
print(f"Removed Peer: {peer_id}")
|
||||
|
||||
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||
info = zc.get_service_info(type_, name, timeout=5000)
|
||||
if not info:
|
||||
return
|
||||
peer_info = self._extract_peer_info(info)
|
||||
if peer_info:
|
||||
self.peerstore.clear_addrs(peer_info.peer_id)
|
||||
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
|
||||
print("Updated Peer", peer_info.peer_id)
|
||||
|
||||
def _process_discovered_service(
|
||||
self, zeroconf: Zeroconf, type_: str, name: str
|
||||
) -> None:
|
||||
info = zeroconf.get_service_info(type_, name, timeout=5000)
|
||||
if not info:
|
||||
return
|
||||
peer_info = self._extract_peer_info(info)
|
||||
if peer_info:
|
||||
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
|
||||
print("Discovered:", peer_info.peer_id)
|
||||
|
||||
def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None:
|
||||
try:
|
||||
addrs = [
|
||||
f"/ip4/{socket.inet_ntoa(addr)}/udp/{info.port}"
|
||||
Multiaddr(f"/ip4/{socket.inet_ntoa(addr)}/udp/{info.port}")
|
||||
for addr in info.addresses
|
||||
]
|
||||
pid_bytes = info.properties.get(b"id")
|
||||
@ -96,10 +81,5 @@ class PeerListener(ServiceListener):
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _handle_discovered_peer(self, peer_info: PeerInfo) -> None:
|
||||
# your “emit” or “connect” logic goes here
|
||||
# print("Discovered:", peer_info)
|
||||
print("Discovered:", peer_info.peer_id)
|
||||
|
||||
def stop(self) -> None:
|
||||
self.browser.cancel()
|
||||
|
||||
@ -57,7 +57,6 @@ class MDNSDiscovery:
|
||||
def start(self) -> None:
|
||||
"""Register this peer and start listening for others."""
|
||||
print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}")
|
||||
print("host is listening on", self.swarm.listeners)
|
||||
self.broadcaster.register()
|
||||
# Listener is started in constructor
|
||||
|
||||
|
||||
Reference in New Issue
Block a user