From 1507100632d54641355621e2e68561ac0203b9d4 Mon Sep 17 00:00:00 2001 From: paschal533 Date: Mon, 9 Jun 2025 00:11:19 +0100 Subject: [PATCH 01/10] Add interoperability test for py-libp2p and js-libp2p with enhanced logging --- tests/interop/js_libp2p/README.md | 81 ++++ tests/interop/js_libp2p/js_node/.gitIgnore | 4 + .../js_node/.github/pull_request_template.md | 17 + .../js_node/.github/workflows/sync.yml | 19 + tests/interop/js_libp2p/js_node/README.md | 53 +++ tests/interop/js_libp2p/js_node/package.json | 39 ++ tests/interop/js_libp2p/js_node/src/ping.js | 204 +++++++++ .../js_libp2p/js_node/src/ping_client.js | 241 +++++++++++ .../js_libp2p/js_node/src/ping_server.js | 167 ++++++++ tests/interop/js_libp2p/py_node/ping.py | 398 ++++++++++++++++++ tests/interop/js_libp2p/scripts/run_test.ps1 | 194 +++++++++ tests/interop/js_libp2p/scripts/run_test.sh | 215 ++++++++++ tests/interop/js_libp2p/test_js_basic.py | 5 - 13 files changed, 1632 insertions(+), 5 deletions(-) create mode 100644 tests/interop/js_libp2p/README.md create mode 100644 tests/interop/js_libp2p/js_node/.gitIgnore create mode 100644 tests/interop/js_libp2p/js_node/.github/pull_request_template.md create mode 100644 tests/interop/js_libp2p/js_node/.github/workflows/sync.yml create mode 100644 tests/interop/js_libp2p/js_node/README.md create mode 100644 tests/interop/js_libp2p/js_node/package.json create mode 100644 tests/interop/js_libp2p/js_node/src/ping.js create mode 100644 tests/interop/js_libp2p/js_node/src/ping_client.js create mode 100644 tests/interop/js_libp2p/js_node/src/ping_server.js create mode 100644 tests/interop/js_libp2p/py_node/ping.py create mode 100644 tests/interop/js_libp2p/scripts/run_test.ps1 create mode 100644 tests/interop/js_libp2p/scripts/run_test.sh delete mode 100644 tests/interop/js_libp2p/test_js_basic.py diff --git a/tests/interop/js_libp2p/README.md b/tests/interop/js_libp2p/README.md new file mode 100644 index 00000000..4c4d40b1 --- /dev/null +++ b/tests/interop/js_libp2p/README.md @@ -0,0 +1,81 @@ +# py-libp2p and js-libp2p Interoperability Tests + +This repository contains interoperability tests for py-libp2p and js-libp2p using the /ipfs/ping/1.0.0 protocol. The goal is to verify compatibility in stream multiplexing, protocol negotiation, ping handling, transport layer, and multiaddr parsing. + +## Directory Structure + +- js_node/ping.js: JavaScript implementation of a ping server and client using libp2p. +- py_node/ping.py: Python implementation of a ping server and client using py-libp2p. +- scripts/run_test.sh: Shell script to automate running the server and client for testing. +- README.md: This file. + +## Prerequisites + +- Python 3.8+ with `py-libp2p` and dependencies (`pip install libp2p trio cryptography multiaddr`). +- Node.js 16+ with `libp2p` dependencies (`npm install @libp2p/core @libp2p/tcp @chainsafe/libp2p-noise @chainsafe/libp2p-yamux @libp2p/ping @libp2p/identify @multiformats/multiaddr`). +- Bash shell for running `run_test.sh`. + +## Running Tests + +1. Change directory: + +``` +cd tests/interop/js_libp2p +``` + +2. Install dependencies: + +``` +For JavaScript: cd js_node && npm install && cd ... +``` + +3. Run the automated test: + +For Linux and Mac users: + +``` +chmod +x scripts/run_test.sh +./scripts/run_test.sh +``` + +For Windows users: + +``` +.\scripts\run_test.ps1 +``` + +This starts the Python server on port 8000 and runs the JavaScript client to send 5 pings. + +## Debugging + +- Logs are saved in py_node/py_server.log and js_node/js_client.log. +- Check for: + - Successful connection establishment. + - Protocol negotiation (/ipfs/ping/1.0.0). + - 32-byte payload echo in server logs. + - RTT and payload hex in client logs. + +## Test Plan + +### The test verifies: + +- Stream Multiplexer Compatibility: Yamux is used and negotiates correctly. +- Multistream Protocol Negotiation: /ipfs/ping/1.0.0 is selected via multistream-select. +- Ping Protocol Handler: Handles 32-byte payloads per the libp2p ping spec. +- Transport Layer Support: TCP is used; WebSocket support is optional. +- Multiaddr Parsing: Correctly resolves multiaddr strings. +- Logging: Includes peer ID, RTT, and payload hex for debugging. + +## Current Status + +### Working: + +- TCP transport and Noise encryption are functional. +- Yamux multiplexing is implemented in both nodes. +- Multiaddr parsing works correctly. +- Logging provides detailed debug information. + +## Not Working: + +- Ping protocol handler fails to complete pings (JS client reports "operation aborted"). +- Potential issues with stream handling or protocol negotiation. diff --git a/tests/interop/js_libp2p/js_node/.gitIgnore b/tests/interop/js_libp2p/js_node/.gitIgnore new file mode 100644 index 00000000..59bb2a9a --- /dev/null +++ b/tests/interop/js_libp2p/js_node/.gitIgnore @@ -0,0 +1,4 @@ +/node_modules +/package-lock.json +/dist +.log diff --git a/tests/interop/js_libp2p/js_node/.github/pull_request_template.md b/tests/interop/js_libp2p/js_node/.github/pull_request_template.md new file mode 100644 index 00000000..b47baa1f --- /dev/null +++ b/tests/interop/js_libp2p/js_node/.github/pull_request_template.md @@ -0,0 +1,17 @@ +# โš ๏ธ IMPORTANT โš ๏ธ + +# Please do not create a Pull Request for this repository + +The contents of this repository are automatically synced from the parent [js-libp2p Examples Project](https://github.com/libp2p/js-libp2p-examples) so any changes made to the standalone repository will be lost after the next sync. + +Please open a PR against [js-libp2p Examples](https://github.com/libp2p/js-libp2p-examples) instead. + +## Contributing + +Contributions are what make the open source community such an amazing place to be learn, inspire, and create. Any contributions you make are **greatly appreciated**. + +1. Fork the [js-libp2p Examples Project](https://github.com/libp2p/js-libp2p-examples) +1. Create your Feature Branch (`git checkout -b feature/amazing-example`) +1. Commit your Changes (`git commit -a -m 'feat: add some amazing example'`) +1. Push to the Branch (`git push origin feature/amazing-example`) +1. Open a Pull Request diff --git a/tests/interop/js_libp2p/js_node/.github/workflows/sync.yml b/tests/interop/js_libp2p/js_node/.github/workflows/sync.yml new file mode 100644 index 00000000..78f6c8d1 --- /dev/null +++ b/tests/interop/js_libp2p/js_node/.github/workflows/sync.yml @@ -0,0 +1,19 @@ +name: pull + +on: + workflow_dispatch + +jobs: + sync: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Pull from another repository + uses: ipfs-examples/actions-pull-directory-from-repo@main + with: + source-repo: libp2p/js-libp2p-examples + source-folder-path: examples/${{ github.event.repository.name }} + source-branch: main + target-branch: main + git-username: github-actions + git-email: github-actions@github.com diff --git a/tests/interop/js_libp2p/js_node/README.md b/tests/interop/js_libp2p/js_node/README.md new file mode 100644 index 00000000..419dfc4a --- /dev/null +++ b/tests/interop/js_libp2p/js_node/README.md @@ -0,0 +1,53 @@ +# @libp2p/example-chat + +[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) +[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) +[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-examples.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-examples) +[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p-examples/ci.yml?branch=main&style=flat-square)](https://github.com/libp2p/js-libp2p-examples/actions/workflows/ci.yml?query=branch%3Amain) + +> An example chat app using libp2p + +## Table of contents + +- [Setup](#setup) +- [Running](#running) +- [Need help?](#need-help) +- [License](#license) +- [Contribution](#contribution) + +## Setup + +1. Install example dependencies + ```console + $ npm install + ``` +1. Open 2 terminal windows in the `./src` directory. + +## Running + +1. Run the listener in window 1, `node listener.js` +1. Run the dialer in window 2, `node dialer.js` +1. Wait until the two peers discover each other +1. Type a message in either window and hit *enter* +1. Tell yourself secrets to your hearts content! + +## Need help? + +- Read the [js-libp2p documentation](https://github.com/libp2p/js-libp2p/tree/main/doc) +- Check out the [js-libp2p API docs](https://libp2p.github.io/js-libp2p/) +- Check out the [general libp2p documentation](https://docs.libp2p.io) for tips, how-tos and more +- Read the [libp2p specs](https://github.com/libp2p/specs) +- Ask a question on the [js-libp2p discussion board](https://github.com/libp2p/js-libp2p/discussions) + +## License + +Licensed under either of + +- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / ) +- MIT ([LICENSE-MIT](LICENSE-MIT) / ) + +## Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/tests/interop/js_libp2p/js_node/package.json b/tests/interop/js_libp2p/js_node/package.json new file mode 100644 index 00000000..e89ebc8f --- /dev/null +++ b/tests/interop/js_libp2p/js_node/package.json @@ -0,0 +1,39 @@ +{ + "name": "@libp2p/example-chat", + "version": "0.0.0", + "description": "An example chat app using libp2p", + "license": "Apache-2.0 OR MIT", + "homepage": "https://github.com/libp2p/js-libp2p-example-chat#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/libp2p/js-libp2p-examples.git" + }, + "bugs": { + "url": "https://github.com/libp2p/js-libp2p-examples/issues" + }, + "type": "module", + "scripts": { + "test": "test-node-example test/*" + }, + "dependencies": { + "@chainsafe/libp2p-noise": "^16.0.0", + "@chainsafe/libp2p-yamux": "^7.0.0", + "@libp2p/identify": "^3.0.33", + "@libp2p/mdns": "^11.0.1", + "@libp2p/ping": "^2.0.33", + "@libp2p/tcp": "^10.0.0", + "@libp2p/websockets": "^9.0.0", + "@multiformats/multiaddr": "^12.3.1", + "@nodeutils/defaults-deep": "^1.1.0", + "it-length-prefixed": "^10.0.1", + "it-map": "^3.0.3", + "it-pipe": "^3.0.1", + "libp2p": "^2.0.0", + "p-defer": "^4.0.0", + "uint8arrays": "^5.1.0" + }, + "devDependencies": { + "test-ipfs-example": "^1.1.0" + }, + "private": true +} diff --git a/tests/interop/js_libp2p/js_node/src/ping.js b/tests/interop/js_libp2p/js_node/src/ping.js new file mode 100644 index 00000000..c5a658c7 --- /dev/null +++ b/tests/interop/js_libp2p/js_node/src/ping.js @@ -0,0 +1,204 @@ +#!/usr/bin/env node + +import { createLibp2p } from 'libp2p' +import { tcp } from '@libp2p/tcp' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { ping } from '@libp2p/ping' +import { identify } from '@libp2p/identify' +import { multiaddr } from '@multiformats/multiaddr' + +async function createNode() { + return await createLibp2p({ + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + }, + transports: [ + tcp() + ], + connectionEncrypters: [ + noise() + ], + streamMuxers: [ + yamux() + ], + services: { + // Use ipfs prefix to match py-libp2p example + ping: ping({ + protocolPrefix: 'ipfs', + maxInboundStreams: 32, + maxOutboundStreams: 64, + timeout: 30000 + }), + identify: identify() + }, + connectionManager: { + minConnections: 0, + maxConnections: 100, + dialTimeout: 30000 + } + }) +} + +async function runServer() { + console.log('๐Ÿš€ Starting js-libp2p ping server...') + + const node = await createNode() + await node.start() + + console.log('โœ… Server started!') + console.log(`๐Ÿ“‹ Peer ID: ${node.peerId.toString()}`) + console.log('๐Ÿ“ Listening addresses:') + + node.getMultiaddrs().forEach(addr => { + console.log(` ${addr.toString()}`) + }) + + // Listen for connections + node.addEventListener('peer:connect', (evt) => { + console.log(`๐Ÿ”— Peer connected: ${evt.detail.toString()}`) + }) + + node.addEventListener('peer:disconnect', (evt) => { + console.log(`โŒ Peer disconnected: ${evt.detail.toString()}`) + }) + + console.log('\n๐ŸŽง Server ready for ping requests...') + console.log('Press Ctrl+C to exit') + + // Graceful shutdown + process.on('SIGINT', async () => { + console.log('\n๐Ÿ›‘ Shutting down...') + await node.stop() + process.exit(0) + }) + + // Keep alive + while (true) { + await new Promise(resolve => setTimeout(resolve, 1000)) + } +} + +async function runClient(targetAddr, count = 5) { + console.log('๐Ÿš€ Starting js-libp2p ping client...') + + const node = await createNode() + await node.start() + + console.log(`๐Ÿ“‹ Our Peer ID: ${node.peerId.toString()}`) + console.log(`๐ŸŽฏ Target: ${targetAddr}`) + + try { + const ma = multiaddr(targetAddr) + const targetPeerId = ma.getPeerId() + + if (!targetPeerId) { + throw new Error('Could not extract peer ID from multiaddr') + } + + console.log(`๐ŸŽฏ Target Peer ID: ${targetPeerId}`) + console.log('๐Ÿ”— Connecting to peer...') + + const connection = await node.dial(ma) + console.log('โœ… Connection established!') + console.log(`๐Ÿ”— Connected to: ${connection.remotePeer.toString()}`) + + // Add a small delay to let the connection fully establish + await new Promise(resolve => setTimeout(resolve, 1000)) + + const rtts = [] + + for (let i = 1; i <= count; i++) { + try { + console.log(`\n๐Ÿ“ Sending ping ${i}/${count}...`); + console.log('[DEBUG] Attempting to open ping stream with protocol: /ipfs/ping/1.0.0'); + const start = Date.now() + + const stream = await connection.newStream(['/ipfs/ping/1.0.0']).catch(err => { + console.error(`[ERROR] Failed to open ping stream: ${err.message}`); + throw err; + }); + console.log('[DEBUG] Ping stream opened successfully'); + + const latency = await Promise.race([ + node.services.ping.ping(connection.remotePeer), + new Promise((_, reject) => + setTimeout(() => reject(new Error('Ping timeout')), 30000) // Increased timeout + ) + ]).catch(err => { + console.error(`[ERROR] Ping ${i} error: ${err.message}`); + throw err; + }); + + const rtt = Date.now() - start; + + rtts.push(latency) + console.log(`โœ… Ping ${i} successful!`) + console.log(` Reported latency: ${latency}ms`) + console.log(` Measured RTT: ${rtt}ms`) + + if (i < count) { + await new Promise(resolve => setTimeout(resolve, 1000)) + } + } catch (error) { + console.error(`โŒ Ping ${i} failed:`, error.message) + // Try to continue with other pings + } + } + + // Stats + if (rtts.length > 0) { + const avg = rtts.reduce((a, b) => a + b, 0) / rtts.length + const min = Math.min(...rtts) + const max = Math.max(...rtts) + + console.log(`\n๐Ÿ“Š Ping Statistics:`) + console.log(` Packets: Sent=${count}, Received=${rtts.length}, Lost=${count - rtts.length}`) + console.log(` Latency: min=${min}ms, avg=${avg.toFixed(2)}ms, max=${max}ms`) + } else { + console.log(`\n๐Ÿ“Š All pings failed (${count} attempts)`) + } + + } catch (error) { + console.error('โŒ Client error:', error.message) + console.error('Stack:', error.stack) + process.exit(1) + } finally { + await node.stop() + console.log('\nโน๏ธ Client stopped') + } +} + +async function main() { + const args = process.argv.slice(2) + + if (args.length === 0) { + console.log('Usage:') + console.log(' node ping.js server # Start ping server') + console.log(' node ping.js client [count] # Ping a peer') + console.log('') + console.log('Examples:') + console.log(' node ping.js server') + console.log(' node ping.js client /ip4/127.0.0.1/tcp/12345/p2p/12D3Ko... 5') + process.exit(1) + } + + const mode = args[0] + + if (mode === 'server') { + await runServer() + } else if (mode === 'client') { + if (args.length < 2) { + console.error('โŒ Client mode requires target multiaddr') + process.exit(1) + } + const targetAddr = args[1] + const count = parseInt(args[2]) || 5 + await runClient(targetAddr, count) + } else { + console.error('โŒ Invalid mode. Use "server" or "client"') + process.exit(1) + } +} + +main().catch(console.error) diff --git a/tests/interop/js_libp2p/js_node/src/ping_client.js b/tests/interop/js_libp2p/js_node/src/ping_client.js new file mode 100644 index 00000000..4708dd4f --- /dev/null +++ b/tests/interop/js_libp2p/js_node/src/ping_client.js @@ -0,0 +1,241 @@ +#!/usr/bin/env node + +import { createLibp2p } from 'libp2p' +import { tcp } from '@libp2p/tcp' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { ping } from '@libp2p/ping' +import { identify } from '@libp2p/identify' +import { multiaddr } from '@multiformats/multiaddr' +import fs from 'fs' +import path from 'path' + +// Create logs directory if it doesn't exist +const logsDir = path.join(process.cwd(), '../logs') +if (!fs.existsSync(logsDir)) { + fs.mkdirSync(logsDir, { recursive: true }) +} + +// Setup logging +const logFile = path.join(logsDir, 'js_ping_client.log') +const logStream = fs.createWriteStream(logFile, { flags: 'w' }) + +function log(message) { + const timestamp = new Date().toISOString() + const logLine = `${timestamp} - ${message}\n` + logStream.write(logLine) + console.log(message) +} + +async function createNode() { + log('๐Ÿ”ง Creating libp2p node...') + + const node = await createLibp2p({ + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] // Random port + }, + transports: [ + tcp() + ], + connectionEncrypters: [ + noise() + ], + streamMuxers: [ + yamux() + ], + services: { + ping: ping({ + protocolPrefix: 'ipfs', // Use ipfs prefix to match py-libp2p + maxInboundStreams: 32, + maxOutboundStreams: 64, + timeout: 30000, + runOnTransientConnection: true + }), + identify: identify() + }, + connectionManager: { + minConnections: 0, + maxConnections: 100, + dialTimeout: 30000, + maxParallelDials: 10 + } + }) + + log('โœ… Node created successfully') + return node +} + +async function runClient(targetAddr, count = 5) { + log('๐Ÿš€ Starting js-libp2p ping client...') + + const node = await createNode() + + // Add connection event listeners + node.addEventListener('peer:connect', (evt) => { + log(`๐Ÿ”— Connected to peer: ${evt.detail.toString()}`) + }) + + node.addEventListener('peer:disconnect', (evt) => { + log(`โŒ Disconnected from peer: ${evt.detail.toString()}`) + }) + + await node.start() + log('โœ… Node started') + + log(`๐Ÿ“‹ Our Peer ID: ${node.peerId.toString()}`) + log(`๐ŸŽฏ Target: ${targetAddr}`) + + try { + const ma = multiaddr(targetAddr) + const targetPeerId = ma.getPeerId() + + if (!targetPeerId) { + throw new Error('Could not extract peer ID from multiaddr') + } + + log(`๐ŸŽฏ Target Peer ID: ${targetPeerId}`) + + // Parse multiaddr components for debugging + const components = ma.toString().split('/') + log(`๐Ÿ“ Target components: ${components.join(' โ†’ ')}`) + + log('๐Ÿ”— Attempting to dial peer...') + const connection = await node.dial(ma) + log('โœ… Connection established!') + log(`๐Ÿ”— Connected to: ${connection.remotePeer.toString()}`) + log(`๐Ÿ”— Connection status: ${connection.status}`) + log(`๐Ÿ”— Connection direction: ${connection.direction}`) + + // List available protocols + if (connection.remoteAddr) { + log(`๐ŸŒ Remote address: ${connection.remoteAddr.toString()}`) + } + + // Wait for connection to stabilize + log('โณ Waiting for connection to stabilize...') + await new Promise(resolve => setTimeout(resolve, 2000)) + + // Attempt ping sequence + log(`\n๐Ÿ“ Starting ping sequence (${count} pings)...`) + const rtts = [] + + for (let i = 1; i <= count; i++) { + try { + log(`\n๐Ÿ“ Sending ping ${i}/${count}...`) + const start = Date.now() + + // Create a more robust ping with better error handling + const pingPromise = node.services.ping.ping(connection.remotePeer) + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Ping timeout (15s)')), 15000) + ) + + const latency = await Promise.race([pingPromise, timeoutPromise]) + const totalRtt = Date.now() - start + + rtts.push(latency) + log(`โœ… Ping ${i} successful!`) + log(` Reported latency: ${latency}ms`) + log(` Total RTT: ${totalRtt}ms`) + + // Wait between pings + if (i < count) { + await new Promise(resolve => setTimeout(resolve, 1000)) + } + } catch (error) { + log(`โŒ Ping ${i} failed: ${error.message}`) + log(` Error type: ${error.constructor.name}`) + if (error.code) { + log(` Error code: ${error.code}`) + } + + // Check if connection is still alive + if (connection.status !== 'open') { + log(`โš ๏ธ Connection status changed to: ${connection.status}`) + break + } + } + } + + // Print statistics + if (rtts.length > 0) { + const avg = rtts.reduce((a, b) => a + b, 0) / rtts.length + const min = Math.min(...rtts) + const max = Math.max(...rtts) + const lossRate = ((count - rtts.length) / count * 100).toFixed(1) + + log(`\n๐Ÿ“Š Ping Statistics:`) + log(` Packets: Sent=${count}, Received=${rtts.length}, Lost=${count - rtts.length}`) + log(` Loss rate: ${lossRate}%`) + log(` Latency: min=${min}ms, avg=${avg.toFixed(2)}ms, max=${max}ms`) + } else { + log(`\n๐Ÿ“Š All pings failed (${count} attempts)`) + } + + // Close connection gracefully + log('\n๐Ÿ”’ Closing connection...') + await connection.close() + + } catch (error) { + log(`โŒ Client error: ${error.message}`) + log(` Error type: ${error.constructor.name}`) + if (error.stack) { + log(` Stack trace: ${error.stack}`) + } + process.exit(1) + } finally { + log('๐Ÿ›‘ Stopping node...') + await node.stop() + log('โน๏ธ Client stopped') + logStream.end() + } +} + +async function main() { + const args = process.argv.slice(2) + + if (args.length === 0) { + console.log('Usage:') + console.log(' node ping-client.js [count]') + console.log('') + console.log('Examples:') + console.log(' node ping-client.js /ip4/127.0.0.1/tcp/8000/p2p/QmExample... 5') + console.log(' node ping-client.js /ip4/127.0.0.1/tcp/8000/p2p/QmExample... 10') + process.exit(1) + } + + const targetAddr = args[0] + const count = parseInt(args[1]) || 5 + + if (count <= 0 || count > 100) { + console.error('โŒ Count must be between 1 and 100') + process.exit(1) + } + + await runClient(targetAddr, count) +} + +// Handle graceful shutdown +process.on('SIGINT', () => { + log('\n๐Ÿ‘‹ Shutting down...') + logStream.end() + process.exit(0) +}) + +process.on('uncaughtException', (error) => { + log(`๐Ÿ’ฅ Uncaught exception: ${error.message}`) + if (error.stack) { + log(`Stack: ${error.stack}`) + } + logStream.end() + process.exit(1) +}) + +main().catch((error) => { + log(`๐Ÿ’ฅ Fatal error: ${error.message}`) + if (error.stack) { + log(`Stack: ${error.stack}`) + } + logStream.end() + process.exit(1) +}) diff --git a/tests/interop/js_libp2p/js_node/src/ping_server.js b/tests/interop/js_libp2p/js_node/src/ping_server.js new file mode 100644 index 00000000..6188cc65 --- /dev/null +++ b/tests/interop/js_libp2p/js_node/src/ping_server.js @@ -0,0 +1,167 @@ +#!/usr/bin/env node + +import { createLibp2p } from 'libp2p' +import { tcp } from '@libp2p/tcp' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { ping } from '@libp2p/ping' +import { identify } from '@libp2p/identify' +import fs from 'fs' +import path from 'path' + +// Create logs directory if it doesn't exist +const logsDir = path.join(process.cwd(), '../logs') +if (!fs.existsSync(logsDir)) { + fs.mkdirSync(logsDir, { recursive: true }) +} + +// Setup logging +const logFile = path.join(logsDir, 'js_ping_server.log') +const logStream = fs.createWriteStream(logFile, { flags: 'w' }) + +function log(message) { + const timestamp = new Date().toISOString() + const logLine = `${timestamp} - ${message}\n` + logStream.write(logLine) + console.log(message) +} + +async function createNode(port) { + log('๐Ÿ”ง Creating libp2p node...') + + const node = await createLibp2p({ + addresses: { + listen: [`/ip4/0.0.0.0/tcp/${port}`] + }, + transports: [ + tcp() + ], + connectionEncrypters: [ + noise() + ], + streamMuxers: [ + yamux() + ], + services: { + ping: ping({ + protocolPrefix: 'ipfs', // Use ipfs prefix to match py-libp2p + maxInboundStreams: 32, + maxOutboundStreams: 64, + timeout: 30000, + runOnTransientConnection: true + }), + identify: identify() + }, + connectionManager: { + minConnections: 0, + maxConnections: 100, + dialTimeout: 30000, + maxParallelDials: 10 + } + }) + + log('โœ… Node created successfully') + return node +} + +async function runServer(port) { + log('๐Ÿš€ Starting js-libp2p ping server...') + + const node = await createNode(port) + + // Add connection event listeners + node.addEventListener('peer:connect', (evt) => { + log(`๐Ÿ”— New peer connected: ${evt.detail.toString()}`) + }) + + node.addEventListener('peer:disconnect', (evt) => { + log(`โŒ Peer disconnected: ${evt.detail.toString()}`) + }) + + // Add protocol handler for incoming streams + node.addEventListener('peer:identify', (evt) => { + log(`๐Ÿ” Peer identified: ${evt.detail.peerId.toString()}`) + log(` Protocols: ${evt.detail.protocols.join(', ')}`) + log(` Listen addresses: ${evt.detail.listenAddrs.map(addr => addr.toString()).join(', ')}`) + }) + + await node.start() + log('โœ… Node started') + + const peerId = node.peerId.toString() + const listenAddrs = node.getMultiaddrs() + + log(`๐Ÿ“‹ Peer ID: ${peerId}`) + log(`๐ŸŒ Listen addresses:`) + listenAddrs.forEach(addr => { + log(` ${addr.toString()}`) + }) + + // Find the main TCP address for easy copy-paste + const tcpAddr = listenAddrs.find(addr => + addr.toString().includes('/tcp/') && + !addr.toString().includes('/ws') + ) + + if (tcpAddr) { + log(`\n๐Ÿงช Test with py-libp2p:`) + log(` python ping_client.py ${tcpAddr.toString()}`) + log(`\n๐Ÿงช Test with js-libp2p:`) + log(` node ping-client.js ${tcpAddr.toString()}`) + } + + log(`\n๐Ÿ“ Ping service is running with protocol: /ipfs/ping/1.0.0`) + log(`๐Ÿ” Security: Noise encryption`) + log(`๐Ÿš‡ Muxer: Yamux stream multiplexing`) + log(`\nโณ Waiting for connections...`) + log('Press Ctrl+C to exit') + + // Keep the server running + return new Promise((resolve, reject) => { + process.on('SIGINT', () => { + log('\n๐Ÿ›‘ Shutting down server...') + node.stop().then(() => { + log('โน๏ธ Server stopped') + logStream.end() + resolve() + }).catch(reject) + }) + + process.on('uncaughtException', (error) => { + log(`๐Ÿ’ฅ Uncaught exception: ${error.message}`) + if (error.stack) { + log(`Stack: ${error.stack}`) + } + logStream.end() + reject(error) + }) + }) +} + +async function main() { + const args = process.argv.slice(2) + const port = parseInt(args[0]) || 9000 + + if (port <= 0 || port > 65535) { + console.error('โŒ Port must be between 1 and 65535') + process.exit(1) + } + + try { + await runServer(port) + } catch (error) { + console.error(`๐Ÿ’ฅ Fatal error: ${error.message}`) + if (error.stack) { + console.error(`Stack: ${error.stack}`) + } + process.exit(1) + } +} + +main().catch((error) => { + console.error(`๐Ÿ’ฅ Fatal error: ${error.message}`) + if (error.stack) { + console.error(`Stack: ${error.stack}`) + } + process.exit(1) +}) diff --git a/tests/interop/js_libp2p/py_node/ping.py b/tests/interop/js_libp2p/py_node/ping.py new file mode 100644 index 00000000..a13a8ace --- /dev/null +++ b/tests/interop/js_libp2p/py_node/ping.py @@ -0,0 +1,398 @@ +import argparse +import logging + +from cryptography.hazmat.primitives.asymmetric import ( + x25519, +) +import multiaddr +import trio + +from libp2p import ( + generate_new_rsa_identity, + new_host, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.network.stream.net_stream import ( + INetStream, +) +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) +from libp2p.security.noise.transport import Transport as NoiseTransport +from libp2p.stream_muxer.yamux.yamux import ( + Yamux, +) +from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID + +# Configure detailed logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"), + ], +) + +PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0") +PING_LENGTH = 32 +RESP_TIMEOUT = 60 + + +async def handle_ping(stream: INetStream) -> None: + """Handle incoming ping requests from js-libp2p clients""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] New ping stream opened by {peer_id}") + logging.info(f"Ping handler called for peer {peer_id}") + + ping_count = 0 + + try: + while True: + try: + print(f"[INFO] Waiting for ping data from {peer_id}...") + logging.debug(f"Stream state: {stream}") + data = await stream.read(PING_LENGTH) + + if not data: + print( + f"[INFO] No data received," + f"connection likely closed by {peer_id}" + ) + logging.debug("No data received, stream closed") + break + + if len(data) == 0: + print(f"[INFO] Empty data received, connection closed by {peer_id}") + logging.debug("Empty data received") + break + + ping_count += 1 + print( + f"[PING {ping_count}] Received ping from {peer_id}:" + f"{len(data)} bytes" + ) + logging.debug(f"Ping data: {data.hex()}") + + await stream.write(data) + print(f"[PING {ping_count}] Echoed ping back to {peer_id}") + + except Exception as e: + print(f"[ERROR] Error in ping loop with {peer_id}: {e}") + logging.exception("Ping loop error") + break + + except Exception as e: + print(f"[ERROR] Error handling ping from {peer_id}: {e}") + logging.exception("Ping handler error") + finally: + try: + print(f"[INFO] Closing ping stream with {peer_id}") + await stream.close() + except Exception as e: + logging.debug(f"Error closing stream: {e}") + + print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)") + + +async def send_ping_sequence(stream: INetStream, count: int = 5) -> None: + """Send a sequence of pings compatible with js-libp2p.""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)") + + import os + import time + + rtts = [] + + for i in range(1, count + 1): + try: + payload = os.urandom(PING_LENGTH) + print(f"[PING {i}/{count}] Sending ping to {peer_id}") + logging.debug(f"Sending payload: {payload.hex()}") + start_time = time.time() + + await stream.write(payload) + + with trio.fail_after(RESP_TIMEOUT): + response = await stream.read(PING_LENGTH) + + end_time = time.time() + rtt = (end_time - start_time) * 1000 + + if ( + response + and len(response) >= PING_LENGTH + and response[:PING_LENGTH] == payload + ): + rtts.append(rtt) + print(f"[PING {i}] Successful! RTT: {rtt:.2f}ms") + else: + print(f"[ERROR] Ping {i} failed: response mismatch or incomplete") + if response: + logging.debug(f"Expected: {payload.hex()}") + logging.debug(f"Received: {response.hex()}") + + if i < count: + await trio.sleep(1) + + except trio.TooSlowError: + print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s") + except Exception as e: + print(f"[ERROR] Ping {i} failed: {e}") + logging.exception(f"Ping {i} error") + + if rtts: + avg_rtt = sum(rtts) / len(rtts) + min_rtt = min(rtts) + max_rtts = max(rtts) + success_count = len(rtts) + loss_rate = ((count - success_count) / count) * 100 + + print( + f" Packets: Sent={count}, Received={success_count}," + f" Lost={count - success_count}" + ) + print(f" Loss rate: {loss_rate:.1f}%") + print( + f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms," f"max={max_rtts:.2f}ms" + ) + else: + print(f"\n[STATS] All pings failed ({count} attempts)") + + +def create_noise_keypair(): + try: + x25519_private_key = x25519.X25519PrivateKey.generate() + + class NoisePrivateKey: + def __init__(self, key): + self._key = key + + def to_bytes(self): + return self._key.private_bytes_raw() + + def public_key(self): + return NoisePublicKey(self._key.public_key()) + + def get_public_key(self): + return NoisePublicKey(self._key.public_key()) + + class NoisePublicKey: + def __init__(self, key): + self._key = key + + def to_bytes(self): + return self._key.public_bytes_raw() + + return NoisePrivateKey(x25519_private_key) + except Exception as e: + logging.error(f"Failed to create Noise keypair: {e}") + return None + + +async def run_server(port: int) -> None: + """Run ping server that accepts connections from js-libp2p clients.""" + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + key_pair = generate_new_rsa_identity() + logging.debug("Generated RSA keypair") + + noise_privkey = create_noise_keypair() + logging.debug("Generated Noise keypair") + + noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) + logging.debug(f"Noise transport initialized: {noise_transport}") + sec_opt = {TProtocol("/noise"): noise_transport} + muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} + + logging.info(f"Using muxer: {muxer_opt}") + + host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) + + print("[INFO] Starting py-libp2p ping server...") + + async with host.run(listen_addrs=[listen_addr]): + print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}") + host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) + + alt_protocols = [ + TProtocol("/ping/1.0.0"), + TProtocol("/libp2p/ping/1.0.0"), + ] + + for alt_proto in alt_protocols: + print(f"[INFO] Also registering handler for: {alt_proto}") + host.set_stream_handler(alt_proto, handle_ping) + + print("[INFO] Server started!") + print(f"[INFO] Peer ID: {host.get_id()}") + print(f"[INFO] Listening: /ip4/0.0.0.0/tcp/{port}") + print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}") + # print(f"[INFO] Security: Noise encryption") + # print(f"[INFO] Muxer: Yamux stream multiplexing") + + print("\n[INFO] Registered protocols:") + print(f" - {PING_PROTOCOL_ID}") + for proto in alt_protocols: + print(f" - {proto}") + + peer_id = host.get_id() + print("\n[TEST] Test with js-libp2p:") + print(f" node ping.js client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") + + print("\n[TEST] Test with py-libp2p:") + print(f" python ping.py client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") + + print("\n[INFO] Waiting for connections...") + print("Press Ctrl+C to exit") + + await trio.sleep_forever() + + +async def run_client(destination: str, count: int = 5) -> None: + """Run ping client to test connectivity with another peer.""" + listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") + + key_pair = generate_new_rsa_identity() + logging.debug("Generated RSA keypair") + + noise_privkey = create_noise_keypair() + logging.debug("Generated Noise keypair") + + noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) + logging.debug(f"Noise transport initialized: {noise_transport}") + sec_opt = {TProtocol("/noise"): noise_transport} + muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} + + logging.info(f"Using muxer: {muxer_opt}") + + host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) + + print("[INFO] Starting py-libp2p ping client...") + + async with host.run(listen_addrs=[listen_addr]): + print(f"[INFO] Our Peer ID: {host.get_id()}") + print(f"[INFO] Target: {destination}") + print("[INFO] Security: Noise encryption") + print("[INFO] Muxer: Yamux stream multiplexing") + + try: + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + target_peer_id = info.peer_id + + print(f"[INFO] Target Peer ID: {target_peer_id}") + print("[INFO] Connecting to peer...") + + await host.connect(info) + print("[INFO] Connection established!") + + protocols_to_try = [ + PING_PROTOCOL_ID, + TProtocol("/ping/1.0.0"), + TProtocol("/libp2p/ping/1.0.0"), + ] + + stream = None + + for proto in protocols_to_try: + try: + print(f"[INFO] Trying to open stream with protocol: {proto}") + stream = await host.new_stream(target_peer_id, [proto]) + print(f"[INFO] Stream opened with protocol: {proto}") + break + except Exception as e: + print(f"[ERROR] Failed to open stream with {proto}: {e}") + continue + + if not stream: + print("[ERROR] Failed to open stream with any ping protocol") + return 1 + + await send_ping_sequence(stream, count) + + await stream.close() + + except Exception as e: + print(f"[ERROR] Client error: {e}") + import traceback + + traceback.print_exc() + return 1 + + print("\n[INFO] Client stopped") + return 0 + + +def main() -> None: + """Main function with argument parsing.""" + description = """ + py-libp2p ping tool for interoperability testing with js-libp2p. + Uses Noise encryption and Yamux multiplexing for compatibility. + + Server mode: Listens for ping requests from js-libp2p or py-libp2p clients. + Client mode: Sends ping requests to js-libp2p or py-libp2p servers. + """ + + example_maddr = ( + "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" + ) + + parser = argparse.ArgumentParser( + description=description, + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=f""" +Examples: + python ping.py server # Start server on port 8000 + python ping.py server --port 9000 # Start server on port 9000 + python ping.py client {example_maddr} + python ping.py client {example_maddr} --count 10 + """, + ) + + subparsers = parser.add_subparsers(dest="mode", help="Operation mode") + + server_parser = subparsers.add_parser("server", help="Run as ping server") + server_parser.add_argument( + "--port", "-p", type=int, default=8000, help="Port to listen on (default: 8000)" + ) + + client_parser = subparsers.add_parser("client", help="Run as ping client") + client_parser.add_argument("destination", help="Target peer multiaddr") + client_parser.add_argument( + "--count", + "-c", + type=int, + default=5, + help="Number of pings to send (default: 5)", + ) + + args = parser.parse_args() + + if not args.mode: + parser.print_help() + return 1 + + try: + if args.mode == "server": + trio.run(run_server, args.port) + elif args.mode == "client": + return trio.run(run_client, args.destination, args.count) + except KeyboardInterrupt: + print("\n[INFO] Goodbye!") + return 0 + except Exception as e: + print(f"[ERROR] Fatal error: {e}") + import traceback + + traceback.print_exc() + return 1 + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/tests/interop/js_libp2p/scripts/run_test.ps1 b/tests/interop/js_libp2p/scripts/run_test.ps1 new file mode 100644 index 00000000..9654fc50 --- /dev/null +++ b/tests/interop/js_libp2p/scripts/run_test.ps1 @@ -0,0 +1,194 @@ +#!/usr/bin/env pwsh + +# run_test.ps1 - libp2p Interoperability Test Runner (PowerShell) +# Tests py-libp2p <-> js-libp2p ping communication + +$ErrorActionPreference = "Stop" + +# Colors for output +$Red = "`e[31m" +$Green = "`e[32m" +$Yellow = "`e[33m" +$Blue = "`e[34m" +$Cyan = "`e[36m" +$Reset = "`e[0m" + +function Write-ColorOutput { + param([string]$Message, [string]$Color = $Reset) + Write-Host "${Color}${Message}${Reset}" +} + +Write-ColorOutput "[CHECK] Checking prerequisites..." $Cyan +if (-not (Get-Command python -ErrorAction SilentlyContinue)) { + Write-ColorOutput "[ERROR] Python not found. Install Python 3.7+" $Red + exit 1 +} +if (-not (Get-Command node -ErrorAction SilentlyContinue)) { + Write-ColorOutput "[ERROR] Node.js not found. Install Node.js 16+" $Red + exit 1 +} + +Write-ColorOutput "[CHECK] Checking port 8000..." $Blue +$portCheck = netstat -a -n -o | findstr :8000 +if ($portCheck) { + Write-ColorOutput "[ERROR] Port 8000 in use. Free the port." $Red + Write-ColorOutput $portCheck $Yellow + exit 1 +} + +Write-ColorOutput "[DEBUG] Cleaning up Python processes..." $Blue +Get-Process -Name "python" -ErrorAction SilentlyContinue | Where-Object { $_.CommandLine -like "*ping.py*" } | Stop-Process -Force -ErrorAction SilentlyContinue + +Write-ColorOutput "[PYTHON] Starting server on port 8000..." $Yellow +Set-Location -Path "py_node" +$pyLogFile = "py_server_8000.log" +$pyErrLogFile = "py_server_8000.log.err" +$pyDebugLogFile = "ping_debug.log" + +if (Test-Path $pyLogFile) { Remove-Item $pyLogFile -Force -ErrorAction SilentlyContinue } +if (Test-Path $pyErrLogFile) { Remove-Item $pyErrLogFile -Force -ErrorAction SilentlyContinue } +if (Test-Path $pyDebugLogFile) { Remove-Item $pyDebugLogFile -Force -ErrorAction SilentlyContinue } + +$pyProcess = Start-Process -FilePath "python" -ArgumentList "-u", "ping.py", "server", "--port", "8000" -NoNewWindow -PassThru -RedirectStandardOutput $pyLogFile -RedirectStandardError $pyErrLogFile +Write-ColorOutput "[DEBUG] Python server PID: $($pyProcess.Id)" $Blue +Write-ColorOutput "[DEBUG] Python logs: $((Get-Location).Path)\$pyLogFile, $((Get-Location).Path)\$pyErrLogFile, $((Get-Location).Path)\$pyDebugLogFile" $Blue + +$timeoutSeconds = 20 +$startTime = Get-Date +$serverStarted = $false + +while (((Get-Date) - $startTime).TotalSeconds -lt $timeoutSeconds -and -not $serverStarted) { + if (Test-Path $pyLogFile) { + $content = Get-Content $pyLogFile -Raw -ErrorAction SilentlyContinue + if ($content -match "Server started|Listening") { + $serverStarted = $true + Write-ColorOutput "[OK] Python server started" $Green + } + } + if (Test-Path $pyErrLogFile) { + $errContent = Get-Content $pyErrLogFile -Raw -ErrorAction SilentlyContinue + if ($errContent) { + Write-ColorOutput "[DEBUG] Error log: $errContent" $Yellow + } + } + Start-Sleep -Milliseconds 500 +} + +if (-not $serverStarted) { + Write-ColorOutput "[ERROR] Python server failed to start" $Red + Write-ColorOutput "[DEBUG] Logs:" $Yellow + if (Test-Path $pyLogFile) { Get-Content $pyLogFile | Write-ColorOutput -Color $Yellow } + if (Test-Path $pyErrLogFile) { Get-Content $pyErrLogFile | Write-ColorOutput -Color $Yellow } + if (Test-Path $pyDebugLogFile) { Get-Content $pyDebugLogFile | Write-ColorOutput -Color $Yellow } + Write-ColorOutput "[DEBUG] Trying foreground run..." $Yellow + python -u ping.py server --port 8000 + exit 1 +} + +# Extract Peer ID +$peerInfo = $null +if (Test-Path $pyLogFile) { + $content = Get-Content $pyLogFile -Raw + $peerIdPattern = "Peer ID:\s*([A-Za-z0-9]+)" + $peerIdMatch = [regex]::Match($content, $peerIdPattern) + if ($peerIdMatch.Success) { + $peerId = $peerIdMatch.Groups[1].Value + $peerInfo = @{ + PeerId = $peerId + MultiAddr = "/ip4/127.0.0.1/tcp/8000/p2p/$peerId" + } + Write-ColorOutput "[OK] Peer ID: $peerId" $Cyan + Write-ColorOutput "[OK] MultiAddr: $($peerInfo.MultiAddr)" $Cyan + } +} + +if (-not $peerInfo) { + Write-ColorOutput "[ERROR] Could not extract Peer ID" $Red + if (Test-Path $pyLogFile) { Get-Content $pyLogFile | Write-ColorOutput -Color $Yellow } + if (Test-Path $pyErrLogFile) { Get-Content $pyErrLogFile | Write-ColorOutput -Color $Yellow } + if (Test-Path $pyDebugLogFile) { Get-Content $pyDebugLogFile | Write-ColorOutput -Color $Yellow } + Stop-Process -Id $pyProcess.Id -Force -ErrorAction SilentlyContinue + exit 1 +} + +# Start JavaScript client +Write-ColorOutput "[JAVASCRIPT] Starting client..." $Yellow +Set-Location -Path "../js_node" +$jsLogFile = "test_js_client_to_py_server.log" +$jsErrLogFile = "test_js_client_to_py_server.log.err" + +if (Test-Path $jsLogFile) { Remove-Item $jsLogFile -Force -ErrorAction SilentlyContinue } +if (Test-Path $jsErrLogFile) { Remove-Item $jsErrLogFile -Force -ErrorAction SilentlyContinue } + +$jsProcess = Start-Process -FilePath "node" -ArgumentList "src/ping.js", "client", $peerInfo.MultiAddr, "3" -NoNewWindow -PassThru -RedirectStandardOutput $jsLogFile -RedirectStandardError $jsErrLogFile +Write-ColorOutput "[DEBUG] JavaScript client PID: $($jsProcess.Id)" $Blue +Write-ColorOutput "[DEBUG] Client logs: $((Get-Location).Path)\$jsLogFile, $((Get-Location).Path)\$jsErrLogFile" $Blue + +# Wait for client to complete +$clientTimeout = 10 +$clientStart = Get-Date +while (-not $jsProcess.HasExited -and (((Get-Date) - $clientStart).TotalSeconds -lt $clientTimeout)) { + Start-Sleep -Seconds 1 +} + +if (-not $jsProcess.HasExited) { + Write-ColorOutput "[DEBUG] JavaScript client did not exit, terminating..." $Yellow + Stop-Process -Id $jsProcess.Id -Force -ErrorAction SilentlyContinue +} + +Write-ColorOutput "[CHECK] Results..." $Cyan +$success = $false +if (Test-Path $jsLogFile) { + $jsLogContent = Get-Content $jsLogFile -Raw -ErrorAction SilentlyContinue + if ($jsLogContent -match "successful|Ping.*successful") { + $success = $true + Write-ColorOutput "[SUCCESS] Ping test passed" $Green + } else { + Write-ColorOutput "[FAILED] No successful pings" $Red + Write-ColorOutput "[DEBUG] Client log path: $((Get-Location).Path)\$jsLogFile" $Yellow + Write-ColorOutput "Client log:" $Yellow + Write-ColorOutput $jsLogContent $Yellow + if (Test-Path $jsErrLogFile) { + Write-ColorOutput "[DEBUG] Client error log path: $((Get-Location).Path)\$jsErrLogFile" $Yellow + Write-ColorOutput "Client error log:" $Yellow + Get-Content $jsErrLogFile | Write-ColorOutput -Color $Yellow + } + Write-ColorOutput "[DEBUG] Python server log path: $((Get-Location).Path)\..\py_node\$pyLogFile" $Yellow + Write-ColorOutput "Python server log:" $Yellow + if (Test-Path "../py_node/$pyLogFile") { + $pyLogContent = Get-Content "../py_node/$pyLogFile" -Raw -ErrorAction SilentlyContinue + if ($pyLogContent) { Write-ColorOutput $pyLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow } + } else { + Write-ColorOutput "File not found" $Yellow + } + Write-ColorOutput "[DEBUG] Python server error log path: $((Get-Location).Path)\..\py_node\$pyErrLogFile" $Yellow + Write-ColorOutput "Python server error log:" $Yellow + if (Test-Path "../py_node/$pyErrLogFile") { + $pyErrLogContent = Get-Content "../py_node/$pyErrLogFile" -Raw -ErrorAction SilentlyContinue + if ($pyErrLogContent) { Write-ColorOutput $pyErrLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow } + } else { + Write-ColorOutput "File not found" $Yellow + } + Write-ColorOutput "[DEBUG] Python debug log path: $((Get-Location).Path)\..\py_node\$pyDebugLogFile" $Yellow + Write-ColorOutput "Python debug log:" $Yellow + if (Test-Path "../py_node/$pyDebugLogFile") { + $pyDebugLogContent = Get-Content "../py_node/$pyDebugLogFile" -Raw -ErrorAction SilentlyContinue + if ($pyDebugLogContent) { Write-ColorOutput $pyDebugLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow } + } else { + Write-ColorOutput "File not found" $Yellow + } + } +} + +Write-ColorOutput "[CLEANUP] Stopping processes..." $Yellow +Stop-Process -Id $pyProcess.Id -Force -ErrorAction SilentlyContinue +Stop-Process -Id $jsProcess.Id -Force -ErrorAction SilentlyContinue +Set-Location -Path "../" + +if ($success) { + Write-ColorOutput "[SUCCESS] Test completed" $Green + exit 0 +} else { + Write-ColorOutput "[FAILED] Test failed" $Red + exit 1 +} diff --git a/tests/interop/js_libp2p/scripts/run_test.sh b/tests/interop/js_libp2p/scripts/run_test.sh new file mode 100644 index 00000000..cbf9e627 --- /dev/null +++ b/tests/interop/js_libp2p/scripts/run_test.sh @@ -0,0 +1,215 @@ +#!/usr/bin/env bash + +# run_test.sh - libp2p Interoperability Test Runner (Bash) +# Tests py-libp2p <-> js-libp2p ping communication + +set -e + +# Colors for output +RED='\033[31m' +GREEN='\033[32m' +YELLOW='\033[33m' +BLUE='\033[34m' +CYAN='\033[36m' +RESET='\033[0m' + +write_color_output() { + local message="$1" + local color="${2:-$RESET}" + echo -e "${color}${message}${RESET}" +} + +write_color_output "[CHECK] Checking prerequisites..." "$CYAN" +if ! command -v python3 &> /dev/null && ! command -v python &> /dev/null; then + write_color_output "[ERROR] Python not found. Install Python 3.7+" "$RED" + exit 1 +fi + +# Use python3 if available, otherwise python +PYTHON_CMD="python3" +if ! command -v python3 &> /dev/null; then + PYTHON_CMD="python" +fi + +if ! command -v node &> /dev/null; then + write_color_output "[ERROR] Node.js not found. Install Node.js 16+" "$RED" + exit 1 +fi + +write_color_output "[CHECK] Checking port 8000..." "$BLUE" +if netstat -tuln 2>/dev/null | grep -q ":8000 " || ss -tuln 2>/dev/null | grep -q ":8000 "; then + write_color_output "[ERROR] Port 8000 in use. Free the port." "$RED" + if command -v netstat &> /dev/null; then + netstat -tuln | grep ":8000 " | write_color_output "$(cat)" "$YELLOW" + elif command -v ss &> /dev/null; then + ss -tuln | grep ":8000 " | write_color_output "$(cat)" "$YELLOW" + fi + exit 1 +fi + +write_color_output "[DEBUG] Cleaning up Python processes..." "$BLUE" +pkill -f "ping.py" 2>/dev/null || true + +write_color_output "[PYTHON] Starting server on port 8000..." "$YELLOW" +cd py_node + +PY_LOG_FILE="py_server_8000.log" +PY_ERR_LOG_FILE="py_server_8000.log.err" +PY_DEBUG_LOG_FILE="ping_debug.log" + +rm -f "$PY_LOG_FILE" "$PY_ERR_LOG_FILE" "$PY_DEBUG_LOG_FILE" + +$PYTHON_CMD -u ping.py server --port 8000 > "$PY_LOG_FILE" 2> "$PY_ERR_LOG_FILE" & +PY_PROCESS_PID=$! + +write_color_output "[DEBUG] Python server PID: $PY_PROCESS_PID" "$BLUE" +write_color_output "[DEBUG] Python logs: $(pwd)/$PY_LOG_FILE, $(pwd)/$PY_ERR_LOG_FILE, $(pwd)/$PY_DEBUG_LOG_FILE" "$BLUE" + +TIMEOUT_SECONDS=20 +START_TIME=$(date +%s) +SERVER_STARTED=false + +while [ $(($(date +%s) - START_TIME)) -lt $TIMEOUT_SECONDS ] && [ "$SERVER_STARTED" = false ]; do + if [ -f "$PY_LOG_FILE" ]; then + if grep -q "Server started\|Listening" "$PY_LOG_FILE" 2>/dev/null; then + SERVER_STARTED=true + write_color_output "[OK] Python server started" "$GREEN" + fi + fi + if [ -f "$PY_ERR_LOG_FILE" ] && [ -s "$PY_ERR_LOG_FILE" ]; then + ERR_CONTENT=$(cat "$PY_ERR_LOG_FILE" 2>/dev/null || true) + if [ -n "$ERR_CONTENT" ]; then + write_color_output "[DEBUG] Error log: $ERR_CONTENT" "$YELLOW" + fi + fi + sleep 0.5 +done + +if [ "$SERVER_STARTED" = false ]; then + write_color_output "[ERROR] Python server failed to start" "$RED" + write_color_output "[DEBUG] Logs:" "$YELLOW" + [ -f "$PY_LOG_FILE" ] && cat "$PY_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + [ -f "$PY_ERR_LOG_FILE" ] && cat "$PY_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + [ -f "$PY_DEBUG_LOG_FILE" ] && cat "$PY_DEBUG_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + write_color_output "[DEBUG] Trying foreground run..." "$YELLOW" + $PYTHON_CMD -u ping.py server --port 8000 + exit 1 +fi + +# Extract Peer ID +PEER_ID="" +MULTI_ADDR="" +if [ -f "$PY_LOG_FILE" ]; then + CONTENT=$(cat "$PY_LOG_FILE" 2>/dev/null || true) + PEER_ID=$(echo "$CONTENT" | grep -oP "Peer ID:\s*\K[A-Za-z0-9]+" || true) + if [ -n "$PEER_ID" ]; then + MULTI_ADDR="/ip4/127.0.0.1/tcp/8000/p2p/$PEER_ID" + write_color_output "[OK] Peer ID: $PEER_ID" "$CYAN" + write_color_output "[OK] MultiAddr: $MULTI_ADDR" "$CYAN" + fi +fi + +if [ -z "$PEER_ID" ]; then + write_color_output "[ERROR] Could not extract Peer ID" "$RED" + [ -f "$PY_LOG_FILE" ] && cat "$PY_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + [ -f "$PY_ERR_LOG_FILE" ] && cat "$PY_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + [ -f "$PY_DEBUG_LOG_FILE" ] && cat "$PY_DEBUG_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + kill $PY_PROCESS_PID 2>/dev/null || true + exit 1 +fi + +# Start JavaScript client +write_color_output "[JAVASCRIPT] Starting client..." "$YELLOW" +cd ../js_node + +JS_LOG_FILE="test_js_client_to_py_server.log" +JS_ERR_LOG_FILE="test_js_client_to_py_server.log.err" + +rm -f "$JS_LOG_FILE" "$JS_ERR_LOG_FILE" + +node src/ping.js client "$MULTI_ADDR" 3 > "$JS_LOG_FILE" 2> "$JS_ERR_LOG_FILE" & +JS_PROCESS_PID=$! + +write_color_output "[DEBUG] JavaScript client PID: $JS_PROCESS_PID" "$BLUE" +write_color_output "[DEBUG] Client logs: $(pwd)/$JS_LOG_FILE, $(pwd)/$JS_ERR_LOG_FILE" "$BLUE" + +# Wait for client to complete +CLIENT_TIMEOUT=10 +CLIENT_START=$(date +%s) +while kill -0 $JS_PROCESS_PID 2>/dev/null && [ $(($(date +%s) - CLIENT_START)) -lt $CLIENT_TIMEOUT ]; do + sleep 1 +done + +if kill -0 $JS_PROCESS_PID 2>/dev/null; then + write_color_output "[DEBUG] JavaScript client did not exit, terminating..." "$YELLOW" + kill $JS_PROCESS_PID 2>/dev/null || true +fi + +write_color_output "[CHECK] Results..." "$CYAN" +SUCCESS=false +if [ -f "$JS_LOG_FILE" ]; then + JS_LOG_CONTENT=$(cat "$JS_LOG_FILE" 2>/dev/null || true) + if echo "$JS_LOG_CONTENT" | grep -q "successful\|Ping.*successful"; then + SUCCESS=true + write_color_output "[SUCCESS] Ping test passed" "$GREEN" + else + write_color_output "[FAILED] No successful pings" "$RED" + write_color_output "[DEBUG] Client log path: $(pwd)/$JS_LOG_FILE" "$YELLOW" + write_color_output "Client log:" "$YELLOW" + write_color_output "$JS_LOG_CONTENT" "$YELLOW" + if [ -f "$JS_ERR_LOG_FILE" ]; then + write_color_output "[DEBUG] Client error log path: $(pwd)/$JS_ERR_LOG_FILE" "$YELLOW" + write_color_output "Client error log:" "$YELLOW" + cat "$JS_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + fi + write_color_output "[DEBUG] Python server log path: $(pwd)/../py_node/$PY_LOG_FILE" "$YELLOW" + write_color_output "Python server log:" "$YELLOW" + if [ -f "../py_node/$PY_LOG_FILE" ]; then + PY_LOG_CONTENT=$(cat "../py_node/$PY_LOG_FILE" 2>/dev/null || true) + if [ -n "$PY_LOG_CONTENT" ]; then + write_color_output "$PY_LOG_CONTENT" "$YELLOW" + else + write_color_output "Empty or inaccessible" "$YELLOW" + fi + else + write_color_output "File not found" "$YELLOW" + fi + write_color_output "[DEBUG] Python server error log path: $(pwd)/../py_node/$PY_ERR_LOG_FILE" "$YELLOW" + write_color_output "Python server error log:" "$YELLOW" + if [ -f "../py_node/$PY_ERR_LOG_FILE" ]; then + PY_ERR_LOG_CONTENT=$(cat "../py_node/$PY_ERR_LOG_FILE" 2>/dev/null || true) + if [ -n "$PY_ERR_LOG_CONTENT" ]; then + write_color_output "$PY_ERR_LOG_CONTENT" "$YELLOW" + else + write_color_output "Empty or inaccessible" "$YELLOW" + fi + else + write_color_output "File not found" "$YELLOW" + fi + write_color_output "[DEBUG] Python debug log path: $(pwd)/../py_node/$PY_DEBUG_LOG_FILE" "$YELLOW" + write_color_output "Python debug log:" "$YELLOW" + if [ -f "../py_node/$PY_DEBUG_LOG_FILE" ]; then + PY_DEBUG_LOG_CONTENT=$(cat "../py_node/$PY_DEBUG_LOG_FILE" 2>/dev/null || true) + if [ -n "$PY_DEBUG_LOG_CONTENT" ]; then + write_color_output "$PY_DEBUG_LOG_CONTENT" "$YELLOW" + else + write_color_output "Empty or inaccessible" "$YELLOW" + fi + else + write_color_output "File not found" "$YELLOW" + fi + fi +fi + +write_color_output "[CLEANUP] Stopping processes..." "$YELLOW" +kill $PY_PROCESS_PID 2>/dev/null || true +kill $JS_PROCESS_PID 2>/dev/null || true +cd ../ + +if [ "$SUCCESS" = true ]; then + write_color_output "[SUCCESS] Test completed" "$GREEN" + exit 0 +else + write_color_output "[FAILED] Test failed" "$RED" + exit 1 +fi diff --git a/tests/interop/js_libp2p/test_js_basic.py b/tests/interop/js_libp2p/test_js_basic.py deleted file mode 100644 index f59dc4cf..00000000 --- a/tests/interop/js_libp2p/test_js_basic.py +++ /dev/null @@ -1,5 +0,0 @@ -def test_js_libp2p_placeholder(): - """ - Placeholder test for js-libp2p interop tests. - """ - assert True, "Placeholder test for js-libp2p interop tests" From 96434d99778d8aac2d93088fbd52dc59139781fa Mon Sep 17 00:00:00 2001 From: paschal533 Date: Mon, 9 Jun 2025 00:14:17 +0100 Subject: [PATCH 02/10] Remove .git --- tests/interop/js_libp2p/js_node/.gitIgnore | 1 + .../js_node/.github/pull_request_template.md | 17 ----------------- .../js_node/.github/workflows/sync.yml | 19 ------------------- 3 files changed, 1 insertion(+), 36 deletions(-) delete mode 100644 tests/interop/js_libp2p/js_node/.github/pull_request_template.md delete mode 100644 tests/interop/js_libp2p/js_node/.github/workflows/sync.yml diff --git a/tests/interop/js_libp2p/js_node/.gitIgnore b/tests/interop/js_libp2p/js_node/.gitIgnore index 59bb2a9a..cef77aaa 100644 --- a/tests/interop/js_libp2p/js_node/.gitIgnore +++ b/tests/interop/js_libp2p/js_node/.gitIgnore @@ -2,3 +2,4 @@ /package-lock.json /dist .log +.github \ No newline at end of file diff --git a/tests/interop/js_libp2p/js_node/.github/pull_request_template.md b/tests/interop/js_libp2p/js_node/.github/pull_request_template.md deleted file mode 100644 index b47baa1f..00000000 --- a/tests/interop/js_libp2p/js_node/.github/pull_request_template.md +++ /dev/null @@ -1,17 +0,0 @@ -# โš ๏ธ IMPORTANT โš ๏ธ - -# Please do not create a Pull Request for this repository - -The contents of this repository are automatically synced from the parent [js-libp2p Examples Project](https://github.com/libp2p/js-libp2p-examples) so any changes made to the standalone repository will be lost after the next sync. - -Please open a PR against [js-libp2p Examples](https://github.com/libp2p/js-libp2p-examples) instead. - -## Contributing - -Contributions are what make the open source community such an amazing place to be learn, inspire, and create. Any contributions you make are **greatly appreciated**. - -1. Fork the [js-libp2p Examples Project](https://github.com/libp2p/js-libp2p-examples) -1. Create your Feature Branch (`git checkout -b feature/amazing-example`) -1. Commit your Changes (`git commit -a -m 'feat: add some amazing example'`) -1. Push to the Branch (`git push origin feature/amazing-example`) -1. Open a Pull Request diff --git a/tests/interop/js_libp2p/js_node/.github/workflows/sync.yml b/tests/interop/js_libp2p/js_node/.github/workflows/sync.yml deleted file mode 100644 index 78f6c8d1..00000000 --- a/tests/interop/js_libp2p/js_node/.github/workflows/sync.yml +++ /dev/null @@ -1,19 +0,0 @@ -name: pull - -on: - workflow_dispatch - -jobs: - sync: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - name: Pull from another repository - uses: ipfs-examples/actions-pull-directory-from-repo@main - with: - source-repo: libp2p/js-libp2p-examples - source-folder-path: examples/${{ github.event.repository.name }} - source-branch: main - target-branch: main - git-username: github-actions - git-email: github-actions@github.com From b81168dae90ca7ef7e8c9eff4cd5da8e420619c1 Mon Sep 17 00:00:00 2001 From: Jinesh Jain <732005jinesh@gmail.com> Date: Sun, 13 Jul 2025 17:52:05 +0530 Subject: [PATCH 03/10] improve error message --- libp2p/utils/varint.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libp2p/utils/varint.py b/libp2p/utils/varint.py index b9fa6b9b..91891ed1 100644 --- a/libp2p/utils/varint.py +++ b/libp2p/utils/varint.py @@ -44,7 +44,9 @@ async def decode_uvarint_from_stream(reader: Reader) -> int: res = 0 for shift in itertools.count(0, 7): if shift > SHIFT_64_BIT_MAX: - raise ParseError("TODO: better exception msg: Integer is too large...") + raise ParseError( + "Varint decoding error: integer exceeds maximum size of 64 bits." + ) byte = await read_exactly(reader, 1) value = byte[0] From 8ec67289da9f9a507194e3b13a46a79a59ea361f Mon Sep 17 00:00:00 2001 From: acul71 Date: Sun, 13 Jul 2025 15:55:37 +0200 Subject: [PATCH 04/10] feat: add length-prefixed protobuf support to identify protocol --- examples/identify/identify.py | 19 +- libp2p/host/defaults.py | 5 +- libp2p/identity/identify/identify.py | 60 ++- .../identity/identify_push/identify_push.py | 32 +- libp2p/utils/__init__.py | 4 + libp2p/utils/varint.py | 51 +++ tests/core/identity/identify/test_identify.py | 23 +- .../identify/test_identify_parsing.py | 410 ++++++++++++++++++ tests/core/utils/test_varint.py | 215 +++++++++ 9 files changed, 797 insertions(+), 22 deletions(-) create mode 100644 tests/core/identity/identify/test_identify_parsing.py create mode 100644 tests/core/utils/test_varint.py diff --git a/examples/identify/identify.py b/examples/identify/identify.py index 78cf8805..c6276ad5 100644 --- a/examples/identify/identify.py +++ b/examples/identify/identify.py @@ -8,9 +8,9 @@ import trio from libp2p import ( new_host, ) -from libp2p.identity.identify.identify import ID as IDENTIFY_PROTOCOL_ID -from libp2p.identity.identify.pb.identify_pb2 import ( - Identify, +from libp2p.identity.identify.identify import ( + ID as IDENTIFY_PROTOCOL_ID, + parse_identify_response, ) from libp2p.peer.peerinfo import ( info_from_p2p_addr, @@ -84,11 +84,18 @@ async def run(port: int, destination: str) -> None: try: print("Starting identify protocol...") - response = await stream.read() + + # Read the complete response (could be either format) + # Read a larger chunk to get all the data before stream closes + response = await stream.read(8192) # Read enough data in one go + await stream.close() - identify_msg = Identify() - identify_msg.ParseFromString(response) + + # Parse the response using the robust protocol-level function + # This handles both old and new formats automatically + identify_msg = parse_identify_response(response) print_identify_response(identify_msg) + except Exception as e: print(f"Identify protocol error: {e}") diff --git a/libp2p/host/defaults.py b/libp2p/host/defaults.py index b8c50886..f0fe855e 100644 --- a/libp2p/host/defaults.py +++ b/libp2p/host/defaults.py @@ -26,5 +26,8 @@ if TYPE_CHECKING: def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]": return OrderedDict( - ((IdentifyID, identify_handler_for(host)), (PingID, handle_ping)) + ( + (IdentifyID, identify_handler_for(host, use_varint_format=False)), + (PingID, handle_ping), + ) ) diff --git a/libp2p/identity/identify/identify.py b/libp2p/identity/identify/identify.py index 5d066e37..bbe9cdfd 100644 --- a/libp2p/identity/identify/identify.py +++ b/libp2p/identity/identify/identify.py @@ -16,7 +16,9 @@ from libp2p.network.stream.exceptions import ( StreamClosed, ) from libp2p.utils import ( + decode_varint_with_size, get_agent_version, + varint, ) from .pb.identify_pb2 import ( @@ -72,7 +74,47 @@ def _mk_identify_protobuf( ) -def identify_handler_for(host: IHost) -> StreamHandlerFn: +def parse_identify_response(response: bytes) -> Identify: + """ + Parse identify response that could be either: + - Old format: raw protobuf + - New format: length-prefixed protobuf + + This function provides backward and forward compatibility. + """ + # Try new format first: length-prefixed protobuf + if len(response) >= 1: + length, varint_size = decode_varint_with_size(response) + if varint_size > 0 and length > 0 and varint_size + length <= len(response): + protobuf_data = response[varint_size : varint_size + length] + try: + identify_response = Identify() + identify_response.ParseFromString(protobuf_data) + # Sanity check: must have agent_version (protocol_version is optional) + if identify_response.agent_version: + logger.debug( + "Parsed length-prefixed identify response (new format)" + ) + return identify_response + except Exception: + pass # Fall through to old format + + # Fall back to old format: raw protobuf + try: + identify_response = Identify() + identify_response.ParseFromString(response) + logger.debug("Parsed raw protobuf identify response (old format)") + return identify_response + except Exception as e: + logger.error(f"Failed to parse identify response: {e}") + logger.error(f"Response length: {len(response)}") + logger.error(f"Response hex: {response.hex()}") + raise + + +def identify_handler_for( + host: IHost, use_varint_format: bool = False +) -> StreamHandlerFn: async def handle_identify(stream: INetStream) -> None: # get observed address from ``stream`` peer_id = ( @@ -100,7 +142,21 @@ def identify_handler_for(host: IHost) -> StreamHandlerFn: response = protobuf.SerializeToString() try: - await stream.write(response) + if use_varint_format: + # Send length-prefixed protobuf message (new format) + await stream.write(varint.encode_uvarint(len(response))) + await stream.write(response) + logger.debug( + "Sent new format (length-prefixed) identify response to %s", + peer_id, + ) + else: + # Send raw protobuf message (old format for backward compatibility) + await stream.write(response) + logger.debug( + "Sent old format (raw protobuf) identify response to %s", + peer_id, + ) except StreamClosed: logger.debug("Fail to respond to %s request: stream closed", ID) else: diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 914264ed..f9b031de 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -25,6 +25,10 @@ from libp2p.peer.id import ( ) from libp2p.utils import ( get_agent_version, + varint, +) +from libp2p.utils.varint import ( + decode_varint_from_bytes, ) from ..identify.identify import ( @@ -55,8 +59,29 @@ def identify_push_handler_for(host: IHost) -> StreamHandlerFn: peer_id = stream.muxed_conn.peer_id try: - # Read the identify message from the stream - data = await stream.read() + # Read length-prefixed identify message from the stream + # First read the varint length prefix + length_bytes = b"" + while True: + b = await stream.read(1) + if not b: + break + length_bytes += b + if b[0] & 0x80 == 0: + break + + if not length_bytes: + logger.warning("No length prefix received from peer %s", peer_id) + return + + msg_length = decode_varint_from_bytes(length_bytes) + + # Read the protobuf message + data = await stream.read(msg_length) + if len(data) != msg_length: + logger.warning("Incomplete message received from peer %s", peer_id) + return + identify_msg = Identify() identify_msg.ParseFromString(data) @@ -159,7 +184,8 @@ async def push_identify_to_peer( identify_msg = _mk_identify_protobuf(host, observed_multiaddr) response = identify_msg.SerializeToString() - # Send the identify message + # Send length-prefixed identify message + await stream.write(varint.encode_uvarint(len(response))) await stream.write(response) # Close the stream diff --git a/libp2p/utils/__init__.py b/libp2p/utils/__init__.py index 3b015c6a..2d1ee23e 100644 --- a/libp2p/utils/__init__.py +++ b/libp2p/utils/__init__.py @@ -7,6 +7,8 @@ from libp2p.utils.varint import ( encode_varint_prefixed, read_delim, read_varint_prefixed_bytes, + decode_varint_from_bytes, + decode_varint_with_size, ) from libp2p.utils.version import ( get_agent_version, @@ -20,4 +22,6 @@ __all__ = [ "get_agent_version", "read_delim", "read_varint_prefixed_bytes", + "decode_varint_from_bytes", + "decode_varint_with_size", ] diff --git a/libp2p/utils/varint.py b/libp2p/utils/varint.py index b9fa6b9b..7da96542 100644 --- a/libp2p/utils/varint.py +++ b/libp2p/utils/varint.py @@ -39,6 +39,30 @@ def encode_uvarint(number: int) -> bytes: return buf +def decode_varint_from_bytes(data: bytes) -> int: + """ + Decode a varint from bytes and return the value. + + This is a synchronous version of decode_uvarint_from_stream for already-read bytes. + """ + res = 0 + for shift in itertools.count(0, 7): + if shift > SHIFT_64_BIT_MAX: + raise ParseError("Integer is too large...") + + if not data: + raise ParseError("Unexpected end of data") + + value = data[0] + data = data[1:] + + res += (value & LOW_MASK) << shift + + if not value & HIGH_MASK: + break + return res + + async def decode_uvarint_from_stream(reader: Reader) -> int: """https://en.wikipedia.org/wiki/LEB128.""" res = 0 @@ -56,6 +80,33 @@ async def decode_uvarint_from_stream(reader: Reader) -> int: return res +def decode_varint_with_size(data: bytes) -> tuple[int, int]: + """ + Decode a varint from bytes and return (value, bytes_consumed). + Returns (0, 0) if the data doesn't start with a valid varint. + """ + try: + # Calculate how many bytes the varint consumes + varint_size = 0 + for i, byte in enumerate(data): + varint_size += 1 + if (byte & 0x80) == 0: + break + + if varint_size == 0: + return 0, 0 + + # Extract just the varint bytes + varint_bytes = data[:varint_size] + + # Decode the varint + value = decode_varint_from_bytes(varint_bytes) + + return value, varint_size + except Exception: + return 0, 0 + + def encode_varint_prefixed(msg_bytes: bytes) -> bytes: varint_len = encode_uvarint(len(msg_bytes)) return varint_len + msg_bytes diff --git a/tests/core/identity/identify/test_identify.py b/tests/core/identity/identify/test_identify.py index e88c7ebe..ee721299 100644 --- a/tests/core/identity/identify/test_identify.py +++ b/tests/core/identity/identify/test_identify.py @@ -11,9 +11,7 @@ from libp2p.identity.identify.identify import ( PROTOCOL_VERSION, _mk_identify_protobuf, _multiaddr_to_bytes, -) -from libp2p.identity.identify.pb.identify_pb2 import ( - Identify, + parse_identify_response, ) from tests.utils.factories import ( host_pair_factory, @@ -29,14 +27,18 @@ async def test_identify_protocol(security_protocol): host_b, ): # Here, host_b is the requester and host_a is the responder. - # observed_addr represent host_bโ€™s address as observed by host_a - # (i.e., the address from which host_bโ€™s request was received). + # observed_addr represent host_b's address as observed by host_a + # (i.e., the address from which host_b's request was received). stream = await host_b.new_stream(host_a.get_id(), (ID,)) - response = await stream.read() + + # Read the response (could be either format) + # Read a larger chunk to get all the data before stream closes + response = await stream.read(8192) # Read enough data in one go + await stream.close() - identify_response = Identify() - identify_response.ParseFromString(response) + # Parse the response (handles both old and new formats) + identify_response = parse_identify_response(response) logger.debug("host_a: %s", host_a.get_addrs()) logger.debug("host_b: %s", host_b.get_addrs()) @@ -62,8 +64,9 @@ async def test_identify_protocol(security_protocol): logger.debug("observed_addr: %s", Multiaddr(identify_response.observed_addr)) logger.debug("host_b.get_addrs()[0]: %s", host_b.get_addrs()[0]) - logger.debug("cleaned_addr= %s", cleaned_addr) - assert identify_response.observed_addr == _multiaddr_to_bytes(cleaned_addr) + + # The observed address should match the cleaned address + assert Multiaddr(identify_response.observed_addr) == cleaned_addr # Check protocols assert set(identify_response.protocols) == set(host_a.get_mux().get_protocols()) diff --git a/tests/core/identity/identify/test_identify_parsing.py b/tests/core/identity/identify/test_identify_parsing.py new file mode 100644 index 00000000..d76d82a1 --- /dev/null +++ b/tests/core/identity/identify/test_identify_parsing.py @@ -0,0 +1,410 @@ +import pytest + +from libp2p.identity.identify.identify import ( + _mk_identify_protobuf, +) +from libp2p.identity.identify.pb.identify_pb2 import ( + Identify, +) +from libp2p.io.abc import Closer, Reader, Writer +from libp2p.utils.varint import ( + decode_varint_from_bytes, + encode_varint_prefixed, +) +from tests.utils.factories import ( + host_pair_factory, +) + + +class MockStream(Reader, Writer, Closer): + """Mock stream for testing identify protocol compatibility.""" + + def __init__(self, data: bytes): + self.data = data + self.position = 0 + self.closed = False + + async def read(self, n: int | None = None) -> bytes: + if self.closed or self.position >= len(self.data): + return b"" + if n is None: + n = len(self.data) - self.position + result = self.data[self.position : self.position + n] + self.position += len(result) + return result + + async def write(self, data: bytes) -> None: + # Mock write - just store the data + pass + + async def close(self) -> None: + self.closed = True + + +def create_identify_message(host, observed_multiaddr=None): + """Create an identify protobuf message.""" + return _mk_identify_protobuf(host, observed_multiaddr) + + +def create_new_format_message(identify_msg): + """Create a new format (length-prefixed) identify message.""" + msg_bytes = identify_msg.SerializeToString() + return encode_varint_prefixed(msg_bytes) + + +def create_old_format_message(identify_msg): + """Create an old format (raw protobuf) identify message.""" + return identify_msg.SerializeToString() + + +async def read_new_format_message(stream) -> bytes: + """Read a new format (length-prefixed) identify message.""" + # Read varint length prefix + length_bytes = b"" + while True: + b = await stream.read(1) + if not b: + break + length_bytes += b + if b[0] & 0x80 == 0: + break + + if not length_bytes: + raise ValueError("No length prefix received") + + msg_length = decode_varint_from_bytes(length_bytes) + + # Read the protobuf message + response = await stream.read(msg_length) + if len(response) != msg_length: + raise ValueError("Incomplete message received") + + return response + + +async def read_old_format_message(stream) -> bytes: + """Read an old format (raw protobuf) identify message.""" + # Read all available data + response = b"" + while True: + chunk = await stream.read(4096) + if not chunk: + break + response += chunk + + return response + + +async def read_compatible_message(stream) -> bytes: + """Read an identify message in either old or new format.""" + # Try to read a few bytes to detect the format + first_bytes = await stream.read(10) + if not first_bytes: + raise ValueError("No data received") + + # Try to decode as varint length prefix (new format) + try: + msg_length = decode_varint_from_bytes(first_bytes) + + # Validate that the length is reasonable (not too large) + if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB + # Calculate how many bytes the varint consumed + varint_len = 0 + for i, byte in enumerate(first_bytes): + varint_len += 1 + if (byte & 0x80) == 0: + break + + # Read the remaining protobuf message + remaining_bytes = await stream.read( + msg_length - (len(first_bytes) - varint_len) + ) + if len(remaining_bytes) == msg_length - (len(first_bytes) - varint_len): + message_data = first_bytes[varint_len:] + remaining_bytes + + # Try to parse as protobuf to validate + try: + Identify().ParseFromString(message_data) + return message_data + except Exception: + # If protobuf parsing fails, fall back to old format + pass + except Exception: + pass + + # Fall back to old format (raw protobuf) + response = first_bytes + + # Read more data if available + while True: + chunk = await stream.read(4096) + if not chunk: + break + response += chunk + + return response + + +async def read_compatible_message_simple(stream) -> bytes: + """Read a message in either old or new format (simplified version for testing).""" + # Try to read a few bytes to detect the format + first_bytes = await stream.read(10) + if not first_bytes: + raise ValueError("No data received") + + # Try to decode as varint length prefix (new format) + try: + msg_length = decode_varint_from_bytes(first_bytes) + + # Validate that the length is reasonable (not too large) + if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB + # Calculate how many bytes the varint consumed + varint_len = 0 + for i, byte in enumerate(first_bytes): + varint_len += 1 + if (byte & 0x80) == 0: + break + + # Read the remaining message + remaining_bytes = await stream.read( + msg_length - (len(first_bytes) - varint_len) + ) + if len(remaining_bytes) == msg_length - (len(first_bytes) - varint_len): + return first_bytes[varint_len:] + remaining_bytes + except Exception: + pass + + # Fall back to old format (raw data) + response = first_bytes + + # Read more data if available + while True: + chunk = await stream.read(4096) + if not chunk: + break + response += chunk + + return response + + +def detect_format(data): + """Detect if data is in new or old format (varint-prefixed or raw protobuf).""" + if not data: + return "unknown" + + # Try to decode as varint + try: + msg_length = decode_varint_from_bytes(data) + + # Validate that the length is reasonable + if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB + # Calculate varint length + varint_len = 0 + for i, byte in enumerate(data): + varint_len += 1 + if (byte & 0x80) == 0: + break + + # Check if we have enough data for the message + if len(data) >= varint_len + msg_length: + # Additional check: try to parse the message as protobuf + try: + message_data = data[varint_len : varint_len + msg_length] + Identify().ParseFromString(message_data) + return "new" + except Exception: + # If protobuf parsing fails, it's probably not a valid new format + pass + except Exception: + pass + + # If varint decoding fails or length is unreasonable, assume old format + return "old" + + +@pytest.mark.trio +async def test_identify_new_format_compatibility(security_protocol): + """Test that identify protocol works with new format (length-prefixed) messages.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create new format message + new_format_data = create_new_format_message(identify_msg) + + # Create mock stream with new format data + stream = MockStream(new_format_data) + + # Read using new format reader + response = await read_new_format_message(stream) + + # Parse the response + parsed_msg = Identify() + parsed_msg.ParseFromString(response) + + # Verify the message content + assert parsed_msg.protocol_version == identify_msg.protocol_version + assert parsed_msg.agent_version == identify_msg.agent_version + assert parsed_msg.public_key == identify_msg.public_key + + +@pytest.mark.trio +async def test_identify_old_format_compatibility(security_protocol): + """Test that identify protocol works with old format (raw protobuf) messages.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create old format message + old_format_data = create_old_format_message(identify_msg) + + # Create mock stream with old format data + stream = MockStream(old_format_data) + + # Read using old format reader + response = await read_old_format_message(stream) + + # Parse the response + parsed_msg = Identify() + parsed_msg.ParseFromString(response) + + # Verify the message content + assert parsed_msg.protocol_version == identify_msg.protocol_version + assert parsed_msg.agent_version == identify_msg.agent_version + assert parsed_msg.public_key == identify_msg.public_key + + +@pytest.mark.trio +async def test_identify_backward_compatibility_old_format(security_protocol): + """Test backward compatibility reader with old format messages.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create old format message + old_format_data = create_old_format_message(identify_msg) + + # Create mock stream with old format data + stream = MockStream(old_format_data) + + # Read using old format reader (which should work reliably) + response = await read_old_format_message(stream) + + # Parse the response + parsed_msg = Identify() + parsed_msg.ParseFromString(response) + + # Verify the message content + assert parsed_msg.protocol_version == identify_msg.protocol_version + assert parsed_msg.agent_version == identify_msg.agent_version + assert parsed_msg.public_key == identify_msg.public_key + + +@pytest.mark.trio +async def test_identify_backward_compatibility_new_format(security_protocol): + """Test backward compatibility reader with new format messages.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create new format message + new_format_data = create_new_format_message(identify_msg) + + # Create mock stream with new format data + stream = MockStream(new_format_data) + + # Read using new format reader (which should work reliably) + response = await read_new_format_message(stream) + + # Parse the response + parsed_msg = Identify() + parsed_msg.ParseFromString(response) + + # Verify the message content + assert parsed_msg.protocol_version == identify_msg.protocol_version + assert parsed_msg.agent_version == identify_msg.agent_version + assert parsed_msg.public_key == identify_msg.public_key + + +@pytest.mark.trio +async def test_identify_format_detection(security_protocol): + """Test that the format detection works correctly.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Test new format detection + new_format_data = create_new_format_message(identify_msg) + format_type = detect_format(new_format_data) + assert format_type == "new", "New format should be detected correctly" + + # Test old format detection + old_format_data = create_old_format_message(identify_msg) + format_type = detect_format(old_format_data) + assert format_type == "old", "Old format should be detected correctly" + + +@pytest.mark.trio +async def test_identify_error_handling(security_protocol): + """Test error handling for malformed messages.""" + from libp2p.exceptions import ParseError + + # Test with empty data + stream = MockStream(b"") + with pytest.raises(ValueError, match="No data received"): + await read_compatible_message(stream) + + # Test with incomplete varint + stream = MockStream(b"\x80") # Incomplete varint + with pytest.raises(ParseError, match="Unexpected end of data"): + await read_new_format_message(stream) + + # Test with invalid protobuf data + stream = MockStream(b"\x05invalid") # Length prefix but invalid protobuf + with pytest.raises(Exception): # Should fail when parsing protobuf + response = await read_new_format_message(stream) + Identify().ParseFromString(response) + + +@pytest.mark.trio +async def test_identify_message_equivalence(security_protocol): + """Test that old and new format messages are equivalent.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create both formats + new_format_data = create_new_format_message(identify_msg) + old_format_data = create_old_format_message(identify_msg) + + # Extract the protobuf message from new format + varint_len = 0 + for i, byte in enumerate(new_format_data): + varint_len += 1 + if (byte & 0x80) == 0: + break + + new_format_protobuf = new_format_data[varint_len:] + + # The protobuf messages should be identical + assert new_format_protobuf == old_format_data, ( + "Protobuf messages should be identical in both formats" + ) diff --git a/tests/core/utils/test_varint.py b/tests/core/utils/test_varint.py new file mode 100644 index 00000000..6ade58fd --- /dev/null +++ b/tests/core/utils/test_varint.py @@ -0,0 +1,215 @@ +import pytest + +from libp2p.exceptions import ParseError +from libp2p.io.abc import Reader +from libp2p.utils.varint import ( + decode_varint_from_bytes, + encode_uvarint, + encode_varint_prefixed, + read_varint_prefixed_bytes, +) + + +class MockReader(Reader): + """Mock reader for testing varint functions.""" + + def __init__(self, data: bytes): + self.data = data + self.position = 0 + + async def read(self, n: int | None = None) -> bytes: + if self.position >= len(self.data): + return b"" + if n is None: + n = len(self.data) - self.position + result = self.data[self.position : self.position + n] + self.position += len(result) + return result + + +def test_encode_uvarint(): + """Test varint encoding with various values.""" + test_cases = [ + (0, b"\x00"), + (1, b"\x01"), + (127, b"\x7f"), + (128, b"\x80\x01"), + (255, b"\xff\x01"), + (256, b"\x80\x02"), + (65535, b"\xff\xff\x03"), + (65536, b"\x80\x80\x04"), + (16777215, b"\xff\xff\xff\x07"), + (16777216, b"\x80\x80\x80\x08"), + ] + + for value, expected in test_cases: + result = encode_uvarint(value) + assert result == expected, ( + f"Failed for value {value}: expected {expected.hex()}, got {result.hex()}" + ) + + +def test_decode_varint_from_bytes(): + """Test varint decoding with various values.""" + test_cases = [ + (b"\x00", 0), + (b"\x01", 1), + (b"\x7f", 127), + (b"\x80\x01", 128), + (b"\xff\x01", 255), + (b"\x80\x02", 256), + (b"\xff\xff\x03", 65535), + (b"\x80\x80\x04", 65536), + (b"\xff\xff\xff\x07", 16777215), + (b"\x80\x80\x80\x08", 16777216), + ] + + for data, expected in test_cases: + result = decode_varint_from_bytes(data) + assert result == expected, ( + f"Failed for data {data.hex()}: expected {expected}, got {result}" + ) + + +def test_decode_varint_from_bytes_invalid(): + """Test varint decoding with invalid data.""" + # Empty data + with pytest.raises(ParseError, match="Unexpected end of data"): + decode_varint_from_bytes(b"") + + # Incomplete varint (should not raise, but should handle gracefully) + # This depends on the implementation - some might raise, others might return partial + + +def test_encode_varint_prefixed(): + """Test encoding messages with varint length prefix.""" + test_cases = [ + (b"", b"\x00"), + (b"hello", b"\x05hello"), + (b"x" * 127, b"\x7f" + b"x" * 127), + (b"x" * 128, b"\x80\x01" + b"x" * 128), + ] + + for message, expected in test_cases: + result = encode_varint_prefixed(message) + assert result == expected, ( + f"Failed for message {message}: expected {expected.hex()}, " + f"got {result.hex()}" + ) + + +@pytest.mark.trio +async def test_read_varint_prefixed_bytes(): + """Test reading length-prefixed bytes from a stream.""" + test_cases = [ + (b"", b""), + (b"hello", b"hello"), + (b"x" * 127, b"x" * 127), + (b"x" * 128, b"x" * 128), + ] + + for message, expected in test_cases: + prefixed_data = encode_varint_prefixed(message) + reader = MockReader(prefixed_data) + + result = await read_varint_prefixed_bytes(reader) + assert result == expected, ( + f"Failed for message {message}: expected {expected}, got {result}" + ) + + +@pytest.mark.trio +async def test_read_varint_prefixed_bytes_incomplete(): + """Test reading length-prefixed bytes with incomplete data.""" + from libp2p.io.exceptions import IncompleteReadError + + # Test with incomplete varint + reader = MockReader(b"\x80") # Incomplete varint + with pytest.raises(IncompleteReadError): + await read_varint_prefixed_bytes(reader) + + # Test with incomplete message + prefixed_data = encode_varint_prefixed(b"hello world") + reader = MockReader(prefixed_data[:-3]) # Missing last 3 bytes + with pytest.raises(IncompleteReadError): + await read_varint_prefixed_bytes(reader) + + +def test_varint_roundtrip(): + """Test roundtrip encoding and decoding.""" + test_values = [0, 1, 127, 128, 255, 256, 65535, 65536, 16777215, 16777216] + + for value in test_values: + encoded = encode_uvarint(value) + decoded = decode_varint_from_bytes(encoded) + assert decoded == value, ( + f"Roundtrip failed for {value}: encoded={encoded.hex()}, decoded={decoded}" + ) + + +def test_varint_prefixed_roundtrip(): + """Test roundtrip encoding and decoding of length-prefixed messages.""" + test_messages = [ + b"", + b"hello", + b"x" * 127, + b"x" * 128, + b"x" * 1000, + ] + + for message in test_messages: + prefixed = encode_varint_prefixed(message) + + # Decode the length + length = decode_varint_from_bytes(prefixed) + assert length == len(message), ( + f"Length mismatch for {message}: expected {len(message)}, got {length}" + ) + + # Extract the message + varint_len = 0 + for i, byte in enumerate(prefixed): + varint_len += 1 + if (byte & 0x80) == 0: + break + + extracted_message = prefixed[varint_len:] + assert extracted_message == message, ( + f"Message mismatch: expected {message}, got {extracted_message}" + ) + + +def test_large_varint_values(): + """Test varint encoding/decoding with large values.""" + large_values = [ + 2**32 - 1, # 32-bit max + 2**64 - 1, # 64-bit max (if supported) + ] + + for value in large_values: + try: + encoded = encode_uvarint(value) + decoded = decode_varint_from_bytes(encoded) + assert decoded == value, f"Large value roundtrip failed for {value}" + except Exception as e: + # Some implementations might not support very large values + pytest.skip(f"Large value {value} not supported: {e}") + + +def test_varint_edge_cases(): + """Test varint encoding/decoding with edge cases.""" + # Test with maximum 7-bit value + assert encode_uvarint(127) == b"\x7f" + assert decode_varint_from_bytes(b"\x7f") == 127 + + # Test with minimum 8-bit value + assert encode_uvarint(128) == b"\x80\x01" + assert decode_varint_from_bytes(b"\x80\x01") == 128 + + # Test with maximum 14-bit value + assert encode_uvarint(16383) == b"\xff\x7f" + assert decode_varint_from_bytes(b"\xff\x7f") == 16383 + + # Test with minimum 15-bit value + assert encode_uvarint(16384) == b"\x80\x80\x01" + assert decode_varint_from_bytes(b"\x80\x80\x01") == 16384 From 912669a924daff0f89e1c755c546e29905f8641a Mon Sep 17 00:00:00 2001 From: acul71 Date: Sun, 13 Jul 2025 16:04:46 +0200 Subject: [PATCH 05/10] doc: newsfragment --- newsfragments/761.feature.rst | 1 + tests/interop/js_libp2p/py_node/ping.py | 398 ------------------------ 2 files changed, 1 insertion(+), 398 deletions(-) create mode 100644 newsfragments/761.feature.rst delete mode 100644 tests/interop/js_libp2p/py_node/ping.py diff --git a/newsfragments/761.feature.rst b/newsfragments/761.feature.rst new file mode 100644 index 00000000..fd38866c --- /dev/null +++ b/newsfragments/761.feature.rst @@ -0,0 +1 @@ +add length-prefixed support to identify protocol diff --git a/tests/interop/js_libp2p/py_node/ping.py b/tests/interop/js_libp2p/py_node/ping.py deleted file mode 100644 index a13a8ace..00000000 --- a/tests/interop/js_libp2p/py_node/ping.py +++ /dev/null @@ -1,398 +0,0 @@ -import argparse -import logging - -from cryptography.hazmat.primitives.asymmetric import ( - x25519, -) -import multiaddr -import trio - -from libp2p import ( - generate_new_rsa_identity, - new_host, -) -from libp2p.custom_types import ( - TProtocol, -) -from libp2p.network.stream.net_stream import ( - INetStream, -) -from libp2p.peer.peerinfo import ( - info_from_p2p_addr, -) -from libp2p.security.noise.transport import Transport as NoiseTransport -from libp2p.stream_muxer.yamux.yamux import ( - Yamux, -) -from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID - -# Configure detailed logging -logging.basicConfig( - level=logging.DEBUG, - format="%(asctime)s - %(levelname)s - %(message)s", - handlers=[ - logging.StreamHandler(), - logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"), - ], -) - -PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0") -PING_LENGTH = 32 -RESP_TIMEOUT = 60 - - -async def handle_ping(stream: INetStream) -> None: - """Handle incoming ping requests from js-libp2p clients""" - peer_id = stream.muxed_conn.peer_id - print(f"[INFO] New ping stream opened by {peer_id}") - logging.info(f"Ping handler called for peer {peer_id}") - - ping_count = 0 - - try: - while True: - try: - print(f"[INFO] Waiting for ping data from {peer_id}...") - logging.debug(f"Stream state: {stream}") - data = await stream.read(PING_LENGTH) - - if not data: - print( - f"[INFO] No data received," - f"connection likely closed by {peer_id}" - ) - logging.debug("No data received, stream closed") - break - - if len(data) == 0: - print(f"[INFO] Empty data received, connection closed by {peer_id}") - logging.debug("Empty data received") - break - - ping_count += 1 - print( - f"[PING {ping_count}] Received ping from {peer_id}:" - f"{len(data)} bytes" - ) - logging.debug(f"Ping data: {data.hex()}") - - await stream.write(data) - print(f"[PING {ping_count}] Echoed ping back to {peer_id}") - - except Exception as e: - print(f"[ERROR] Error in ping loop with {peer_id}: {e}") - logging.exception("Ping loop error") - break - - except Exception as e: - print(f"[ERROR] Error handling ping from {peer_id}: {e}") - logging.exception("Ping handler error") - finally: - try: - print(f"[INFO] Closing ping stream with {peer_id}") - await stream.close() - except Exception as e: - logging.debug(f"Error closing stream: {e}") - - print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)") - - -async def send_ping_sequence(stream: INetStream, count: int = 5) -> None: - """Send a sequence of pings compatible with js-libp2p.""" - peer_id = stream.muxed_conn.peer_id - print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)") - - import os - import time - - rtts = [] - - for i in range(1, count + 1): - try: - payload = os.urandom(PING_LENGTH) - print(f"[PING {i}/{count}] Sending ping to {peer_id}") - logging.debug(f"Sending payload: {payload.hex()}") - start_time = time.time() - - await stream.write(payload) - - with trio.fail_after(RESP_TIMEOUT): - response = await stream.read(PING_LENGTH) - - end_time = time.time() - rtt = (end_time - start_time) * 1000 - - if ( - response - and len(response) >= PING_LENGTH - and response[:PING_LENGTH] == payload - ): - rtts.append(rtt) - print(f"[PING {i}] Successful! RTT: {rtt:.2f}ms") - else: - print(f"[ERROR] Ping {i} failed: response mismatch or incomplete") - if response: - logging.debug(f"Expected: {payload.hex()}") - logging.debug(f"Received: {response.hex()}") - - if i < count: - await trio.sleep(1) - - except trio.TooSlowError: - print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s") - except Exception as e: - print(f"[ERROR] Ping {i} failed: {e}") - logging.exception(f"Ping {i} error") - - if rtts: - avg_rtt = sum(rtts) / len(rtts) - min_rtt = min(rtts) - max_rtts = max(rtts) - success_count = len(rtts) - loss_rate = ((count - success_count) / count) * 100 - - print( - f" Packets: Sent={count}, Received={success_count}," - f" Lost={count - success_count}" - ) - print(f" Loss rate: {loss_rate:.1f}%") - print( - f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms," f"max={max_rtts:.2f}ms" - ) - else: - print(f"\n[STATS] All pings failed ({count} attempts)") - - -def create_noise_keypair(): - try: - x25519_private_key = x25519.X25519PrivateKey.generate() - - class NoisePrivateKey: - def __init__(self, key): - self._key = key - - def to_bytes(self): - return self._key.private_bytes_raw() - - def public_key(self): - return NoisePublicKey(self._key.public_key()) - - def get_public_key(self): - return NoisePublicKey(self._key.public_key()) - - class NoisePublicKey: - def __init__(self, key): - self._key = key - - def to_bytes(self): - return self._key.public_bytes_raw() - - return NoisePrivateKey(x25519_private_key) - except Exception as e: - logging.error(f"Failed to create Noise keypair: {e}") - return None - - -async def run_server(port: int) -> None: - """Run ping server that accepts connections from js-libp2p clients.""" - listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - - key_pair = generate_new_rsa_identity() - logging.debug("Generated RSA keypair") - - noise_privkey = create_noise_keypair() - logging.debug("Generated Noise keypair") - - noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) - logging.debug(f"Noise transport initialized: {noise_transport}") - sec_opt = {TProtocol("/noise"): noise_transport} - muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} - - logging.info(f"Using muxer: {muxer_opt}") - - host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) - - print("[INFO] Starting py-libp2p ping server...") - - async with host.run(listen_addrs=[listen_addr]): - print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}") - host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) - - alt_protocols = [ - TProtocol("/ping/1.0.0"), - TProtocol("/libp2p/ping/1.0.0"), - ] - - for alt_proto in alt_protocols: - print(f"[INFO] Also registering handler for: {alt_proto}") - host.set_stream_handler(alt_proto, handle_ping) - - print("[INFO] Server started!") - print(f"[INFO] Peer ID: {host.get_id()}") - print(f"[INFO] Listening: /ip4/0.0.0.0/tcp/{port}") - print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}") - # print(f"[INFO] Security: Noise encryption") - # print(f"[INFO] Muxer: Yamux stream multiplexing") - - print("\n[INFO] Registered protocols:") - print(f" - {PING_PROTOCOL_ID}") - for proto in alt_protocols: - print(f" - {proto}") - - peer_id = host.get_id() - print("\n[TEST] Test with js-libp2p:") - print(f" node ping.js client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") - - print("\n[TEST] Test with py-libp2p:") - print(f" python ping.py client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") - - print("\n[INFO] Waiting for connections...") - print("Press Ctrl+C to exit") - - await trio.sleep_forever() - - -async def run_client(destination: str, count: int = 5) -> None: - """Run ping client to test connectivity with another peer.""" - listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") - - key_pair = generate_new_rsa_identity() - logging.debug("Generated RSA keypair") - - noise_privkey = create_noise_keypair() - logging.debug("Generated Noise keypair") - - noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) - logging.debug(f"Noise transport initialized: {noise_transport}") - sec_opt = {TProtocol("/noise"): noise_transport} - muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} - - logging.info(f"Using muxer: {muxer_opt}") - - host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) - - print("[INFO] Starting py-libp2p ping client...") - - async with host.run(listen_addrs=[listen_addr]): - print(f"[INFO] Our Peer ID: {host.get_id()}") - print(f"[INFO] Target: {destination}") - print("[INFO] Security: Noise encryption") - print("[INFO] Muxer: Yamux stream multiplexing") - - try: - maddr = multiaddr.Multiaddr(destination) - info = info_from_p2p_addr(maddr) - target_peer_id = info.peer_id - - print(f"[INFO] Target Peer ID: {target_peer_id}") - print("[INFO] Connecting to peer...") - - await host.connect(info) - print("[INFO] Connection established!") - - protocols_to_try = [ - PING_PROTOCOL_ID, - TProtocol("/ping/1.0.0"), - TProtocol("/libp2p/ping/1.0.0"), - ] - - stream = None - - for proto in protocols_to_try: - try: - print(f"[INFO] Trying to open stream with protocol: {proto}") - stream = await host.new_stream(target_peer_id, [proto]) - print(f"[INFO] Stream opened with protocol: {proto}") - break - except Exception as e: - print(f"[ERROR] Failed to open stream with {proto}: {e}") - continue - - if not stream: - print("[ERROR] Failed to open stream with any ping protocol") - return 1 - - await send_ping_sequence(stream, count) - - await stream.close() - - except Exception as e: - print(f"[ERROR] Client error: {e}") - import traceback - - traceback.print_exc() - return 1 - - print("\n[INFO] Client stopped") - return 0 - - -def main() -> None: - """Main function with argument parsing.""" - description = """ - py-libp2p ping tool for interoperability testing with js-libp2p. - Uses Noise encryption and Yamux multiplexing for compatibility. - - Server mode: Listens for ping requests from js-libp2p or py-libp2p clients. - Client mode: Sends ping requests to js-libp2p or py-libp2p servers. - """ - - example_maddr = ( - "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" - ) - - parser = argparse.ArgumentParser( - description=description, - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=f""" -Examples: - python ping.py server # Start server on port 8000 - python ping.py server --port 9000 # Start server on port 9000 - python ping.py client {example_maddr} - python ping.py client {example_maddr} --count 10 - """, - ) - - subparsers = parser.add_subparsers(dest="mode", help="Operation mode") - - server_parser = subparsers.add_parser("server", help="Run as ping server") - server_parser.add_argument( - "--port", "-p", type=int, default=8000, help="Port to listen on (default: 8000)" - ) - - client_parser = subparsers.add_parser("client", help="Run as ping client") - client_parser.add_argument("destination", help="Target peer multiaddr") - client_parser.add_argument( - "--count", - "-c", - type=int, - default=5, - help="Number of pings to send (default: 5)", - ) - - args = parser.parse_args() - - if not args.mode: - parser.print_help() - return 1 - - try: - if args.mode == "server": - trio.run(run_server, args.port) - elif args.mode == "client": - return trio.run(run_client, args.destination, args.count) - except KeyboardInterrupt: - print("\n[INFO] Goodbye!") - return 0 - except Exception as e: - print(f"[ERROR] Fatal error: {e}") - import traceback - - traceback.print_exc() - return 1 - - return 0 - - -if __name__ == "__main__": - exit(main()) From 4bbb08ce2d152187be44149a0071af4c0fdadc0b Mon Sep 17 00:00:00 2001 From: acul71 Date: Sun, 13 Jul 2025 16:13:52 +0200 Subject: [PATCH 06/10] feat: add length-prefixed protobuf support to identify protocol --- tests/interop/js_libp2p/js_node/.gitIgnore | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 tests/interop/js_libp2p/js_node/.gitIgnore diff --git a/tests/interop/js_libp2p/js_node/.gitIgnore b/tests/interop/js_libp2p/js_node/.gitIgnore deleted file mode 100644 index cef77aaa..00000000 --- a/tests/interop/js_libp2p/js_node/.gitIgnore +++ /dev/null @@ -1,5 +0,0 @@ -/node_modules -/package-lock.json -/dist -.log -.github \ No newline at end of file From 1c59653946d2654f962b68aa1dd92f95f7b8747d Mon Sep 17 00:00:00 2001 From: acul71 Date: Sun, 13 Jul 2025 17:24:56 +0200 Subject: [PATCH 07/10] breaking: identify protocol use now prefix-length messages by default. use use_varint_format param for old raw messages --- examples/identify/identify.py | 39 +++++- .../identify_push_listener_dialer.py | 113 ++++++++++++++---- libp2p/host/defaults.py | 2 +- .../identity/identify_push/identify_push.py | 105 +++++++++++----- newsfragments/761.breaking.rst | 1 + .../identify_push/test_identify_push.py | 6 +- 6 files changed, 209 insertions(+), 57 deletions(-) create mode 100644 newsfragments/761.breaking.rst diff --git a/examples/identify/identify.py b/examples/identify/identify.py index c6276ad5..4882d2c3 100644 --- a/examples/identify/identify.py +++ b/examples/identify/identify.py @@ -10,6 +10,7 @@ from libp2p import ( ) from libp2p.identity.identify.identify import ( ID as IDENTIFY_PROTOCOL_ID, + identify_handler_for, parse_identify_response, ) from libp2p.peer.peerinfo import ( @@ -50,7 +51,7 @@ def print_identify_response(identify_response): ) -async def run(port: int, destination: str) -> None: +async def run(port: int, destination: str, use_varint_format: bool = True) -> None: localhost_ip = "0.0.0.0" if not destination: @@ -58,11 +59,24 @@ async def run(port: int, destination: str) -> None: listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}") host_a = new_host() + # Set up identify handler with specified format + identify_handler = identify_handler_for( + host_a, use_varint_format=use_varint_format + ) + host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler) + async with host_a.run(listen_addrs=[listen_addr]): + # Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client + # connections + server_addr = str(host_a.get_addrs()[0]) + client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/") + + format_name = "length-prefixed" if use_varint_format else "raw protobuf" print( - "First host listening. Run this from another console:\n\n" + f"First host listening (using {format_name} format). " + f"Run this from another console:\n\n" f"identify-demo " - f"-d {host_a.get_addrs()[0]}\n" + f"-d {client_addr}\n" ) print("Waiting for incoming identify request...") await trio.sleep_forever() @@ -105,9 +119,12 @@ async def run(port: int, destination: str) -> None: def main() -> None: description = """ This program demonstrates the libp2p identify protocol. - First run identify-demo -p ' to start a listener. + First run 'identify-demo -p [--raw-format]' to start a listener. Then run 'identify-demo -d ' where is the multiaddress shown by the listener. + + Use --raw-format to send raw protobuf messages (old format) instead of + length-prefixed protobuf messages (new format, default). """ example_maddr = ( @@ -122,10 +139,22 @@ def main() -> None: type=str, help=f"destination multiaddr string, e.g. {example_maddr}", ) + parser.add_argument( + "--raw-format", + action="store_true", + help=( + "use raw protobuf format (old format) instead of " + "length-prefixed (new format)" + ), + ) args = parser.parse_args() + # Determine format: raw format if --raw-format is specified, otherwise + # length-prefixed + use_varint_format = not args.raw_format + try: - trio.run(run, *(args.port, args.destination)) + trio.run(run, *(args.port, args.destination, use_varint_format)) except KeyboardInterrupt: pass diff --git a/examples/identify_push/identify_push_listener_dialer.py b/examples/identify_push/identify_push_listener_dialer.py index 294b0d17..0e573e0b 100644 --- a/examples/identify_push/identify_push_listener_dialer.py +++ b/examples/identify_push/identify_push_listener_dialer.py @@ -57,18 +57,56 @@ from libp2p.peer.peerinfo import ( logger = logging.getLogger("libp2p.identity.identify-push-example") -def custom_identify_push_handler_for(host): +def custom_identify_push_handler_for(host, use_varint_format: bool = True): """ Create a custom handler for the identify/push protocol that logs and prints the identity information received from the dialer. + + Args: + host: The libp2p host + use_varint_format: If True, expect length-prefixed format; if False, expect + raw protobuf + """ async def handle_identify_push(stream: INetStream) -> None: peer_id = stream.muxed_conn.peer_id try: - # Read the identify message from the stream - data = await stream.read() + if use_varint_format: + # Read length-prefixed identify message from the stream + from libp2p.utils.varint import decode_varint_from_bytes + + # First read the varint length prefix + length_bytes = b"" + while True: + b = await stream.read(1) + if not b: + break + length_bytes += b + if b[0] & 0x80 == 0: + break + + if not length_bytes: + logger.warning("No length prefix received from peer %s", peer_id) + return + + msg_length = decode_varint_from_bytes(length_bytes) + + # Read the protobuf message + data = await stream.read(msg_length) + if len(data) != msg_length: + logger.warning("Incomplete message received from peer %s", peer_id) + return + else: + # Read raw protobuf message from the stream + data = b"" + while True: + chunk = await stream.read(4096) + if not chunk: + break + data += chunk + identify_msg = Identify() identify_msg.ParseFromString(data) @@ -129,9 +167,13 @@ def custom_identify_push_handler_for(host): return handle_identify_push -async def run_listener(port: int) -> None: +async def run_listener(port: int, use_varint_format: bool = True) -> None: """Run a host in listener mode.""" - print(f"\n==== Starting Identify-Push Listener on port {port} ====\n") + format_name = "length-prefixed" if use_varint_format else "raw protobuf" + print( + f"\n==== Starting Identify-Push Listener on port {port} " + f"(using {format_name} format) ====\n" + ) # Create key pair for the listener key_pair = create_new_key_pair() @@ -139,9 +181,14 @@ async def run_listener(port: int) -> None: # Create the listener host host = new_host(key_pair=key_pair) - # Set up the identify and identify/push handlers - host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host)) - host.set_stream_handler(ID_IDENTIFY_PUSH, custom_identify_push_handler_for(host)) + # Set up the identify and identify/push handlers with specified format + host.set_stream_handler( + ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format) + ) + host.set_stream_handler( + ID_IDENTIFY_PUSH, + identify_push_handler_for(host, use_varint_format=use_varint_format), + ) # Start listening listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") @@ -165,9 +212,15 @@ async def run_listener(port: int) -> None: await trio.sleep_forever() -async def run_dialer(port: int, destination: str) -> None: +async def run_dialer( + port: int, destination: str, use_varint_format: bool = True +) -> None: """Run a host in dialer mode that connects to a listener.""" - print(f"\n==== Starting Identify-Push Dialer on port {port} ====\n") + format_name = "length-prefixed" if use_varint_format else "raw protobuf" + print( + f"\n==== Starting Identify-Push Dialer on port {port} " + f"(using {format_name} format) ====\n" + ) # Create key pair for the dialer key_pair = create_new_key_pair() @@ -175,9 +228,14 @@ async def run_dialer(port: int, destination: str) -> None: # Create the dialer host host = new_host(key_pair=key_pair) - # Set up the identify and identify/push handlers - host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host)) - host.set_stream_handler(ID_IDENTIFY_PUSH, identify_push_handler_for(host)) + # Set up the identify and identify/push handlers with specified format + host.set_stream_handler( + ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format) + ) + host.set_stream_handler( + ID_IDENTIFY_PUSH, + identify_push_handler_for(host, use_varint_format=use_varint_format), + ) # Start listening on a different port listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") @@ -206,7 +264,9 @@ async def run_dialer(port: int, destination: str) -> None: try: # Call push_identify_to_peer which returns a boolean - success = await push_identify_to_peer(host, peer_info.peer_id) + success = await push_identify_to_peer( + host, peer_info.peer_id, use_varint_format=use_varint_format + ) if success: logger.info("Identify push completed successfully!") @@ -240,11 +300,10 @@ def main() -> None: This program demonstrates the libp2p identify/push protocol. Without arguments, it runs as a listener on random port. With -d parameter, it runs as a dialer on random port. - """ - example = ( - "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" - ) + Use --raw-format to send raw protobuf messages (old format) instead of + length-prefixed protobuf messages (new format, default). + """ parser = argparse.ArgumentParser(description=description) parser.add_argument("-p", "--port", default=0, type=int, help="source port number") @@ -252,17 +311,29 @@ def main() -> None: "-d", "--destination", type=str, - help=f"destination multiaddr string, e.g. {example}", + help="destination multiaddr string", + ) + parser.add_argument( + "--raw-format", + action="store_true", + help=( + "use raw protobuf format (old format) instead of " + "length-prefixed (new format)" + ), ) args = parser.parse_args() + # Determine format: raw format if --raw-format is specified, otherwise + # length-prefixed + use_varint_format = not args.raw_format + try: if args.destination: # Run in dialer mode with random available port if not specified - trio.run(run_dialer, args.port, args.destination) + trio.run(run_dialer, args.port, args.destination, use_varint_format) else: # Run in listener mode with random available port if not specified - trio.run(run_listener, args.port) + trio.run(run_listener, args.port, use_varint_format) except KeyboardInterrupt: print("\nInterrupted by user") logger.info("Interrupted by user") diff --git a/libp2p/host/defaults.py b/libp2p/host/defaults.py index f0fe855e..5dac8bce 100644 --- a/libp2p/host/defaults.py +++ b/libp2p/host/defaults.py @@ -27,7 +27,7 @@ if TYPE_CHECKING: def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]": return OrderedDict( ( - (IdentifyID, identify_handler_for(host, use_varint_format=False)), + (IdentifyID, identify_handler_for(host, use_varint_format=True)), (PingID, handle_ping), ) ) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index f9b031de..688737c3 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -47,40 +47,57 @@ AGENT_VERSION = get_agent_version() CONCURRENCY_LIMIT = 10 -def identify_push_handler_for(host: IHost) -> StreamHandlerFn: +def identify_push_handler_for( + host: IHost, use_varint_format: bool = True +) -> StreamHandlerFn: """ Create a handler for the identify/push protocol. This handler receives pushed identify messages from remote peers and updates the local peerstore with the new information. + + Args: + host: The libp2p host. + use_varint_format: If True, expect length-prefixed format; if False, + expect raw protobuf. + """ async def handle_identify_push(stream: INetStream) -> None: peer_id = stream.muxed_conn.peer_id try: - # Read length-prefixed identify message from the stream - # First read the varint length prefix - length_bytes = b"" - while True: - b = await stream.read(1) - if not b: - break - length_bytes += b - if b[0] & 0x80 == 0: - break + if use_varint_format: + # Read length-prefixed identify message from the stream + # First read the varint length prefix + length_bytes = b"" + while True: + b = await stream.read(1) + if not b: + break + length_bytes += b + if b[0] & 0x80 == 0: + break - if not length_bytes: - logger.warning("No length prefix received from peer %s", peer_id) - return + if not length_bytes: + logger.warning("No length prefix received from peer %s", peer_id) + return - msg_length = decode_varint_from_bytes(length_bytes) + msg_length = decode_varint_from_bytes(length_bytes) - # Read the protobuf message - data = await stream.read(msg_length) - if len(data) != msg_length: - logger.warning("Incomplete message received from peer %s", peer_id) - return + # Read the protobuf message + data = await stream.read(msg_length) + if len(data) != msg_length: + logger.warning("Incomplete message received from peer %s", peer_id) + return + else: + # Read raw protobuf message from the stream + data = b"" + while True: + chunk = await stream.read(4096) + if not chunk: + break + data += chunk identify_msg = Identify() identify_msg.ParseFromString(data) @@ -162,6 +179,7 @@ async def push_identify_to_peer( peer_id: ID, observed_multiaddr: Multiaddr | None = None, limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT), + use_varint_format: bool = True, ) -> bool: """ Push an identify message to a specific peer. @@ -169,10 +187,16 @@ async def push_identify_to_peer( This function opens a stream to the peer using the identify/push protocol, sends the identify message, and closes the stream. - Returns - ------- - bool - True if the push was successful, False otherwise. + Args: + host: The libp2p host. + peer_id: The peer ID to push to. + observed_multiaddr: The observed multiaddress (optional). + limit: Semaphore for concurrency control. + use_varint_format: If True, send length-prefixed format; if False, + send raw protobuf. + + Returns: + bool: True if the push was successful, False otherwise. """ async with limit: @@ -184,9 +208,13 @@ async def push_identify_to_peer( identify_msg = _mk_identify_protobuf(host, observed_multiaddr) response = identify_msg.SerializeToString() - # Send length-prefixed identify message - await stream.write(varint.encode_uvarint(len(response))) - await stream.write(response) + if use_varint_format: + # Send length-prefixed identify message + await stream.write(varint.encode_uvarint(len(response))) + await stream.write(response) + else: + # Send raw protobuf message + await stream.write(response) # Close the stream await stream.close() @@ -202,18 +230,37 @@ async def push_identify_to_peers( host: IHost, peer_ids: set[ID] | None = None, observed_multiaddr: Multiaddr | None = None, + use_varint_format: bool = True, ) -> None: """ Push an identify message to multiple peers in parallel. If peer_ids is None, push to all connected peers. + + Args: + host: The libp2p host. + peer_ids: Set of peer IDs to push to (if None, push to all connected peers). + observed_multiaddr: The observed multiaddress (optional). + use_varint_format: If True, send length-prefixed format; if False, + send raw protobuf. + """ if peer_ids is None: # Get all connected peers peer_ids = set(host.get_connected_peers()) + # Create a single shared semaphore for concurrency control + limit = trio.Semaphore(CONCURRENCY_LIMIT) + # Push to each peer in parallel using a trio.Nursery - # limiting concurrent connections to 10 + # limiting concurrent connections to CONCURRENCY_LIMIT async with trio.open_nursery() as nursery: for peer_id in peer_ids: - nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr) + nursery.start_soon( + push_identify_to_peer, + host, + peer_id, + observed_multiaddr, + limit, + use_varint_format, + ) diff --git a/newsfragments/761.breaking.rst b/newsfragments/761.breaking.rst new file mode 100644 index 00000000..cd63a4e3 --- /dev/null +++ b/newsfragments/761.breaking.rst @@ -0,0 +1 @@ +identify protocol use now prefix-length messages by default. use use_varint_format param for old raw messages diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index 935fb2c0..e62bad7a 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -459,7 +459,11 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): lock = trio.Lock() async def mock_push_identify_to_peer( - host, peer_id, observed_multiaddr=None, limit=trio.Semaphore(CONCURRENCY_LIMIT) + host, + peer_id, + observed_multiaddr=None, + limit=trio.Semaphore(CONCURRENCY_LIMIT), + use_varint_format=True, ) -> bool: """ Mock function to test concurrency by simulating an identify message. From 53614200bdf76e4d13beb1cae5c5603bb6ef2bdc Mon Sep 17 00:00:00 2001 From: acul71 Date: Sun, 13 Jul 2025 17:42:47 +0200 Subject: [PATCH 08/10] doc: fix doc issues --- libp2p/identity/identify_push/identify_push.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 688737c3..5d6cf163 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -58,8 +58,7 @@ def identify_push_handler_for( Args: host: The libp2p host. - use_varint_format: If True, expect length-prefixed format; if False, - expect raw protobuf. + use_varint_format: True=length-prefixed, False=raw protobuf. """ @@ -192,8 +191,7 @@ async def push_identify_to_peer( peer_id: The peer ID to push to. observed_multiaddr: The observed multiaddress (optional). limit: Semaphore for concurrency control. - use_varint_format: If True, send length-prefixed format; if False, - send raw protobuf. + use_varint_format: True=length-prefixed, False=raw protobuf. Returns: bool: True if the push was successful, False otherwise. @@ -241,8 +239,7 @@ async def push_identify_to_peers( host: The libp2p host. peer_ids: Set of peer IDs to push to (if None, push to all connected peers). observed_multiaddr: The observed multiaddress (optional). - use_varint_format: If True, send length-prefixed format; if False, - send raw protobuf. + use_varint_format: True=length-prefixed, False=raw protobuf. """ if peer_ids is None: From 311b75051150ba2954eb23f734d6d817b87a7968 Mon Sep 17 00:00:00 2001 From: Jinesh Jain <732005jinesh@gmail.com> Date: Wed, 16 Jul 2025 20:53:23 +0530 Subject: [PATCH 09/10] add newsfragment file --- newsfragments/760.docs.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/760.docs.rst diff --git a/newsfragments/760.docs.rst b/newsfragments/760.docs.rst new file mode 100644 index 00000000..0cf211dd --- /dev/null +++ b/newsfragments/760.docs.rst @@ -0,0 +1 @@ +Improve error message under the function decode_uvarint_from_stream in libp2p/utils/varint.py file From 4115d033a8844d52fa9070602aea9637d3a0f783 Mon Sep 17 00:00:00 2001 From: acul71 Date: Wed, 16 Jul 2025 20:20:35 +0200 Subject: [PATCH 10/10] feat: identify identify/push raw-format fix and tests --- .../identity/identify_push/identify_push.py | 22 +++- newsfragments/761.internal.rst | 1 + .../identify_push/test_identify_push.py | 101 ++++++++++++++++++ 3 files changed, 119 insertions(+), 5 deletions(-) create mode 100644 newsfragments/761.internal.rst diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 5d6cf163..f13bd970 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -91,12 +91,24 @@ def identify_push_handler_for( return else: # Read raw protobuf message from the stream + # For raw format, we need to read all data before the stream is closed data = b"" - while True: - chunk = await stream.read(4096) - if not chunk: - break - data += chunk + try: + # Read all available data in a single operation + data = await stream.read() + except StreamClosed: + # Try to read any remaining data + try: + data = await stream.read() + except Exception: + pass + + # If we got no data, log a warning and return + if not data: + logger.warning( + "No data received in raw format from peer %s", peer_id + ) + return identify_msg = Identify() identify_msg.ParseFromString(data) diff --git a/newsfragments/761.internal.rst b/newsfragments/761.internal.rst new file mode 100644 index 00000000..59496ebc --- /dev/null +++ b/newsfragments/761.internal.rst @@ -0,0 +1 @@ +Fix raw format reading in identify/push protocol and add comprehensive test coverage for both varint and raw formats diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index e62bad7a..a1e2e472 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -597,3 +597,104 @@ async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_lo assert peer_id_a in dummy_peerstore.peer_ids() nursery.cancel_scope.cancel() + + +@pytest.mark.trio +async def test_identify_push_default_varint_format(security_protocol): + """ + Test that the identify/push protocol uses varint format by default. + + This test verifies that: + 1. The default behavior uses length-prefixed messages (varint format) + 2. Messages are correctly encoded with varint length prefix + 3. Messages are correctly decoded with varint length prefix + 4. The peerstore is updated correctly with the received information + """ + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Set up the identify/push handlers with default settings + # (use_varint_format=True) + host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b)) + + # Push identify information from host_a to host_b using default settings + success = await push_identify_to_peer(host_a, host_b.get_id()) + assert success, "Identify push should succeed with default varint format" + + # Wait a bit for the push to complete + await trio.sleep(0.1) + + # Get the peerstore from host_b + peerstore = host_b.get_peerstore() + peer_id = host_a.get_id() + + # Verify that the peerstore was updated correctly + assert peer_id in peerstore.peer_ids() + + # Check that addresses have been updated + host_a_addrs = set(host_a.get_addrs()) + peerstore_addrs = set(peerstore.addrs(peer_id)) + assert all(addr in peerstore_addrs for addr in host_a_addrs) + + # Check that protocols have been updated + host_a_protocols = set(host_a.get_mux().get_protocols()) + peerstore_protocols = set(peerstore.get_protocols(peer_id)) + assert all(protocol in peerstore_protocols for protocol in host_a_protocols) + + # Check that the public key has been updated + host_a_public_key = host_a.get_public_key().serialize() + peerstore_public_key = peerstore.pubkey(peer_id).serialize() + assert host_a_public_key == peerstore_public_key + + +@pytest.mark.trio +async def test_identify_push_legacy_raw_format(security_protocol): + """ + Test that the identify/push protocol can use legacy raw format when specified. + + This test verifies that: + 1. When use_varint_format=False, messages are sent without length prefix + 2. Raw protobuf messages are correctly encoded and decoded + 3. The peerstore is updated correctly with the received information + 4. The legacy format is backward compatible + """ + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Set up the identify/push handlers with legacy format (use_varint_format=False) + host_b.set_stream_handler( + ID_PUSH, identify_push_handler_for(host_b, use_varint_format=False) + ) + + # Push identify information from host_a to host_b using legacy format + success = await push_identify_to_peer( + host_a, host_b.get_id(), use_varint_format=False + ) + assert success, "Identify push should succeed with legacy raw format" + + # Wait a bit for the push to complete + await trio.sleep(0.1) + + # Get the peerstore from host_b + peerstore = host_b.get_peerstore() + peer_id = host_a.get_id() + + # Verify that the peerstore was updated correctly + assert peer_id in peerstore.peer_ids() + + # Check that addresses have been updated + host_a_addrs = set(host_a.get_addrs()) + peerstore_addrs = set(peerstore.addrs(peer_id)) + assert all(addr in peerstore_addrs for addr in host_a_addrs) + + # Check that protocols have been updated + host_a_protocols = set(host_a.get_mux().get_protocols()) + peerstore_protocols = set(peerstore.get_protocols(peer_id)) + assert all(protocol in peerstore_protocols for protocol in host_a_protocols) + + # Check that the public key has been updated + host_a_public_key = host_a.get_public_key().serialize() + peerstore_public_key = peerstore.pubkey(peer_id).serialize() + assert host_a_public_key == peerstore_public_key