fix(app): 882 Comprehensive cross-platform path handling utilities

This commit is contained in:
yashksaini-coder
2025-09-01 02:03:51 +05:30
parent 6a24b138dd
commit 64ccce17eb
6 changed files with 367 additions and 163 deletions

View File

@ -27,7 +27,9 @@ except ModuleNotFoundError:
import tomli as tomllib # type: ignore (In case of >3.11 Pyrefly doesnt find tomli , which is right but a false flag)
# Path to pyproject.toml (assuming conf.py is in a 'docs' subdirectory)
pyproject_path = os.path.join(os.path.dirname(__file__), "..", "pyproject.toml")
from libp2p.utils.paths import get_project_root, join_paths
pyproject_path = join_paths(get_project_root(), "pyproject.toml")
with open(pyproject_path, "rb") as f:
pyproject_data = tomllib.load(f)

View File

@ -41,6 +41,7 @@ from libp2p.tools.async_service import (
from libp2p.tools.utils import (
info_from_p2p_addr,
)
from libp2p.utils.paths import get_script_dir, join_paths
# Configure logging
logging.basicConfig(
@ -53,8 +54,8 @@ logger = logging.getLogger("kademlia-example")
# Configure DHT module loggers to inherit from the parent logger
# This ensures all kademlia-example.* loggers use the same configuration
# Get the directory where this script is located
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SERVER_ADDR_LOG = os.path.join(SCRIPT_DIR, "server_node_addr.txt")
SCRIPT_DIR = get_script_dir(__file__)
SERVER_ADDR_LOG = join_paths(SCRIPT_DIR, "server_node_addr.txt")
# Set the level for all child loggers
for module in [

View File

@ -1,7 +1,4 @@
import atexit
from datetime import (
datetime,
)
import logging
import logging.handlers
import os
@ -148,13 +145,10 @@ def setup_logging() -> None:
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
else:
# Default log file with timestamp and unique identifier
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
unique_id = os.urandom(4).hex() # Add a unique identifier to prevent collisions
if os.name == "nt": # Windows
log_file = f"C:\\Windows\\Temp\\py-libp2p_{timestamp}_{unique_id}.log"
else: # Unix-like
log_file = f"/tmp/py-libp2p_{timestamp}_{unique_id}.log"
# Use cross-platform temp file creation
from libp2p.utils.paths import create_temp_file
log_file = str(create_temp_file(prefix="py-libp2p_", suffix=".log"))
# Print the log file path so users know where to find it
print(f"Logging to: {log_file}", file=sys.stderr)

View File

@ -6,9 +6,10 @@ behavior across Windows, macOS, and Linux platforms.
"""
import os
import tempfile
from pathlib import Path
from typing import Union, Optional
import sys
import tempfile
from typing import Union
PathLike = Union[str, Path]
@ -19,6 +20,7 @@ def get_temp_dir() -> Path:
Returns:
Path: Platform-specific temporary directory path
"""
return Path(tempfile.gettempdir())
@ -29,6 +31,7 @@ def get_project_root() -> Path:
Returns:
Path: Path to the py-libp2p project root
"""
# Navigate from libp2p/utils/paths.py to project root
return Path(__file__).parent.parent.parent
@ -43,6 +46,7 @@ def join_paths(*parts: PathLike) -> Path:
Returns:
Path: Joined path using platform-appropriate separator
"""
return Path(*parts)
@ -56,6 +60,7 @@ def ensure_dir_exists(path: PathLike) -> Path:
Returns:
Path: Path object for the directory
"""
path_obj = Path(path)
path_obj.mkdir(parents=True, exist_ok=True)
@ -68,19 +73,20 @@ def get_config_dir() -> Path:
Returns:
Path: Platform-specific config directory
"""
if os.name == 'nt': # Windows
appdata = os.environ.get('APPDATA', '')
if os.name == "nt": # Windows
appdata = os.environ.get("APPDATA", "")
if appdata:
return Path(appdata) / 'py-libp2p'
return Path(appdata) / "py-libp2p"
else:
# Fallback to user home directory
return Path.home() / 'AppData' / 'Roaming' / 'py-libp2p'
return Path.home() / "AppData" / "Roaming" / "py-libp2p"
else: # Unix-like (Linux, macOS)
return Path.home() / '.config' / 'py-libp2p'
return Path.home() / ".config" / "py-libp2p"
def get_script_dir(script_path: Optional[PathLike] = None) -> Path:
def get_script_dir(script_path: PathLike | None = None) -> Path:
"""
Get the directory containing a script file.
@ -89,16 +95,24 @@ def get_script_dir(script_path: Optional[PathLike] = None) -> Path:
Returns:
Path: Directory containing the script
Raises:
RuntimeError: If script path cannot be determined
"""
if script_path is None:
# This will be the directory of the calling script
import inspect
frame = inspect.currentframe()
if frame and frame.f_back:
script_path = frame.f_back.f_globals.get('__file__')
script_path = frame.f_back.f_globals.get("__file__")
else:
raise RuntimeError("Could not determine script path")
if script_path is None:
raise RuntimeError("Script path is None")
return Path(script_path).parent.absolute()
@ -112,11 +126,12 @@ def create_temp_file(prefix: str = "py-libp2p_", suffix: str = ".log") -> Path:
Returns:
Path: Path to the created temporary file
"""
temp_dir = get_temp_dir()
# Create a unique filename using timestamp and random bytes
import time
import secrets
import time
timestamp = time.strftime("%Y%m%d_%H%M%S")
microseconds = f"{time.time() % 1:.6f}"[2:] # Get microseconds as string
@ -139,6 +154,7 @@ def resolve_relative_path(base_path: PathLike, relative_path: PathLike) -> Path:
Returns:
Path: Resolved absolute path
"""
base = Path(base_path).resolve()
relative = Path(relative_path)
@ -158,5 +174,94 @@ def normalize_path(path: PathLike) -> Path:
Returns:
Path: Normalized absolute path
"""
return Path(path).resolve()
def get_venv_path() -> Path | None:
"""
Get virtual environment path if active.
Returns:
Path: Virtual environment path if active, None otherwise
"""
venv_path = os.environ.get("VIRTUAL_ENV")
if venv_path:
return Path(venv_path)
return None
def get_python_executable() -> Path:
"""
Get current Python executable path.
Returns:
Path: Path to the current Python executable
"""
return Path(sys.executable)
def find_executable(name: str) -> Path | None:
"""
Find executable in system PATH.
Args:
name: Name of the executable to find
Returns:
Path: Path to executable if found, None otherwise
"""
# Check if name already contains path
if os.path.dirname(name):
path = Path(name)
if path.exists() and os.access(path, os.X_OK):
return path
return None
# Search in PATH
for path_dir in os.environ.get("PATH", "").split(os.pathsep):
if not path_dir:
continue
path = Path(path_dir) / name
if path.exists() and os.access(path, os.X_OK):
return path
return None
def get_script_binary_path() -> Path:
"""
Get path to script's binary directory.
Returns:
Path: Directory containing the script's binary
"""
return get_python_executable().parent
def get_binary_path(binary_name: str) -> Path | None:
"""
Find binary in PATH or virtual environment.
Args:
binary_name: Name of the binary to find
Returns:
Path: Path to binary if found, None otherwise
"""
# First check in virtual environment if active
venv_path = get_venv_path()
if venv_path:
venv_bin = venv_path / "bin" if os.name != "nt" else venv_path / "Scripts"
binary_path = venv_bin / binary_name
if binary_path.exists() and os.access(binary_path, os.X_OK):
return binary_path
# Fall back to system PATH
return find_executable(binary_name)

View File

@ -6,14 +6,13 @@ This script scans for patterns that should be migrated to use the new
cross-platform path utilities.
"""
import re
import os
from pathlib import Path
from typing import List, Dict, Any
import argparse
from pathlib import Path
import re
from typing import Any
def scan_for_path_issues(directory: Path) -> Dict[str, List[Dict[str, Any]]]:
def scan_for_path_issues(directory: Path) -> dict[str, list[dict[str, Any]]]:
"""
Scan for path handling issues in the codebase.
@ -22,36 +21,37 @@ def scan_for_path_issues(directory: Path) -> Dict[str, List[Dict[str, Any]]]:
Returns:
Dictionary mapping issue types to lists of found issues
"""
issues = {
'hard_coded_slash': [],
'os_path_join': [],
'temp_hardcode': [],
'os_path_dirname': [],
'os_path_abspath': [],
'direct_path_concat': [],
"hard_coded_slash": [],
"os_path_join": [],
"temp_hardcode": [],
"os_path_dirname": [],
"os_path_abspath": [],
"direct_path_concat": [],
}
# Patterns to search for
patterns = {
'hard_coded_slash': r'["\'][^"\']*\/[^"\']*["\']',
'os_path_join': r'os\.path\.join\(',
'temp_hardcode': r'["\']\/tmp\/|["\']C:\\\\',
'os_path_dirname': r'os\.path\.dirname\(',
'os_path_abspath': r'os\.path\.abspath\(',
'direct_path_concat': r'["\'][^"\']*["\']\s*\+\s*["\'][^"\']*["\']',
"hard_coded_slash": r'["\'][^"\']*\/[^"\']*["\']',
"os_path_join": r"os\.path\.join\(",
"temp_hardcode": r'["\']\/tmp\/|["\']C:\\\\',
"os_path_dirname": r"os\.path\.dirname\(",
"os_path_abspath": r"os\.path\.abspath\(",
"direct_path_concat": r'["\'][^"\']*["\']\s*\+\s*["\'][^"\']*["\']',
}
# Files to exclude
exclude_patterns = [
r'__pycache__',
r'\.git',
r'\.pytest_cache',
r'\.mypy_cache',
r'\.ruff_cache',
r'env/',
r'venv/',
r'\.venv/',
r"__pycache__",
r"\.git",
r"\.pytest_cache",
r"\.mypy_cache",
r"\.ruff_cache",
r"env/",
r"venv/",
r"\.venv/",
]
for py_file in directory.rglob("*.py"):
@ -60,7 +60,7 @@ def scan_for_path_issues(directory: Path) -> Dict[str, List[Dict[str, Any]]]:
continue
try:
content = py_file.read_text(encoding='utf-8')
content = py_file.read_text(encoding="utf-8")
except UnicodeDecodeError:
print(f"Warning: Could not read {py_file} (encoding issue)")
continue
@ -68,21 +68,23 @@ def scan_for_path_issues(directory: Path) -> Dict[str, List[Dict[str, Any]]]:
for issue_type, pattern in patterns.items():
matches = re.finditer(pattern, content, re.MULTILINE)
for match in matches:
line_num = content[:match.start()].count('\n') + 1
line_content = content.split('\n')[line_num - 1].strip()
line_num = content[: match.start()].count("\n") + 1
line_content = content.split("\n")[line_num - 1].strip()
issues[issue_type].append({
'file': py_file,
'line': line_num,
'content': match.group(),
'full_line': line_content,
'relative_path': py_file.relative_to(directory)
})
issues[issue_type].append(
{
"file": py_file,
"line": line_num,
"content": match.group(),
"full_line": line_content,
"relative_path": py_file.relative_to(directory),
}
)
return issues
def generate_migration_suggestions(issues: Dict[str, List[Dict[str, Any]]]) -> str:
def generate_migration_suggestions(issues: dict[str, list[dict[str, Any]]]) -> str:
"""
Generate migration suggestions for found issues.
@ -91,6 +93,7 @@ def generate_migration_suggestions(issues: Dict[str, List[Dict[str, Any]]]) -> s
Returns:
Formatted string with migration suggestions
"""
suggestions = []
@ -103,30 +106,42 @@ def generate_migration_suggestions(issues: Dict[str, List[Dict[str, Any]]]) -> s
for issue in issue_list[:10]: # Show first 10 examples
suggestions.append(f"\n### {issue['relative_path']}:{issue['line']}")
suggestions.append(f"```python")
suggestions.append(f"# Current code:")
suggestions.append("```python")
suggestions.append("# Current code:")
suggestions.append(f"{issue['full_line']}")
suggestions.append(f"```")
suggestions.append("```")
# Add migration suggestion based on issue type
if issue_type == 'os_path_join':
suggestions.append(f"```python")
suggestions.append(f"# Suggested fix:")
suggestions.append(f"from libp2p.utils.paths import join_paths")
suggestions.append(f"# Replace os.path.join(a, b, c) with join_paths(a, b, c)")
suggestions.append(f"```")
elif issue_type == 'temp_hardcode':
suggestions.append(f"```python")
suggestions.append(f"# Suggested fix:")
suggestions.append(f"from libp2p.utils.paths import get_temp_dir, create_temp_file")
suggestions.append(f"# Replace hard-coded temp paths with get_temp_dir() or create_temp_file()")
suggestions.append(f"```")
elif issue_type == 'os_path_dirname':
suggestions.append(f"```python")
suggestions.append(f"# Suggested fix:")
suggestions.append(f"from libp2p.utils.paths import get_script_dir")
suggestions.append(f"# Replace os.path.dirname(os.path.abspath(__file__)) with get_script_dir(__file__)")
suggestions.append(f"```")
if issue_type == "os_path_join":
suggestions.append("```python")
suggestions.append("# Suggested fix:")
suggestions.append("from libp2p.utils.paths import join_paths")
suggestions.append(
"# Replace os.path.join(a, b, c) with join_paths(a, b, c)"
)
suggestions.append("```")
elif issue_type == "temp_hardcode":
suggestions.append("```python")
suggestions.append("# Suggested fix:")
suggestions.append(
"from libp2p.utils.paths import get_temp_dir, create_temp_file"
)
temp_fix_msg = (
"# Replace hard-coded temp paths with get_temp_dir() or "
"create_temp_file()"
)
suggestions.append(temp_fix_msg)
suggestions.append("```")
elif issue_type == "os_path_dirname":
suggestions.append("```python")
suggestions.append("# Suggested fix:")
suggestions.append("from libp2p.utils.paths import get_script_dir")
script_dir_fix_msg = (
"# Replace os.path.dirname(os.path.abspath(__file__)) with "
"get_script_dir(__file__)"
)
suggestions.append(script_dir_fix_msg)
suggestions.append("```")
if len(issue_list) > 10:
suggestions.append(f"\n... and {len(issue_list) - 10} more instances")
@ -134,7 +149,7 @@ def generate_migration_suggestions(issues: Dict[str, List[Dict[str, Any]]]) -> s
return "\n".join(suggestions)
def generate_summary_report(issues: Dict[str, List[Dict[str, Any]]]) -> str:
def generate_summary_report(issues: dict[str, list[dict[str, Any]]]) -> str:
"""
Generate a summary report of all found issues.
@ -143,13 +158,14 @@ def generate_summary_report(issues: Dict[str, List[Dict[str, Any]]]) -> str:
Returns:
Formatted summary report
"""
total_issues = sum(len(issue_list) for issue_list in issues.values())
report = [
"# Cross-Platform Path Handling Audit Report",
"",
f"## Summary",
"## Summary",
f"Total issues found: {total_issues}",
"",
"## Issue Breakdown:",
@ -157,7 +173,9 @@ def generate_summary_report(issues: Dict[str, List[Dict[str, Any]]]) -> str:
for issue_type, issue_list in issues.items():
if issue_list:
report.append(f"- **{issue_type.replace('_', ' ').title()}**: {len(issue_list)} instances")
issue_title = issue_type.replace("_", " ").title()
instances_count = len(issue_list)
report.append(f"- **{issue_title}**: {instances_count} instances")
report.append("")
report.append("## Priority Matrix:")
@ -166,28 +184,43 @@ def generate_summary_report(issues: Dict[str, List[Dict[str, Any]]]) -> str:
report.append("|----------|------------|------------|---------|")
priority_map = {
'temp_hardcode': ('🔴 P0', 'HIGH', 'Core functionality fails on different platforms'),
'os_path_join': ('🟡 P1', 'MEDIUM', 'Examples and utilities may break'),
'os_path_dirname': ('🟡 P1', 'MEDIUM', 'Script location detection issues'),
'hard_coded_slash': ('🟢 P2', 'LOW', 'Future-proofing and consistency'),
'os_path_abspath': ('🟢 P2', 'LOW', 'Path resolution consistency'),
'direct_path_concat': ('🟢 P2', 'LOW', 'String concatenation issues'),
"temp_hardcode": (
"🔴 P0",
"HIGH",
"Core functionality fails on different platforms",
),
"os_path_join": ("🟡 P1", "MEDIUM", "Examples and utilities may break"),
"os_path_dirname": ("🟡 P1", "MEDIUM", "Script location detection issues"),
"hard_coded_slash": ("🟢 P2", "LOW", "Future-proofing and consistency"),
"os_path_abspath": ("🟢 P2", "LOW", "Path resolution consistency"),
"direct_path_concat": ("🟢 P2", "LOW", "String concatenation issues"),
}
for issue_type, issue_list in issues.items():
if issue_list:
priority, risk, impact = priority_map.get(issue_type, ('🟢 P2', 'LOW', 'General improvement'))
report.append(f"| {priority} | {issue_type.replace('_', ' ').title()} | {risk} | {impact} |")
priority, risk, impact = priority_map.get(
issue_type, ("🟢 P2", "LOW", "General improvement")
)
issue_title = issue_type.replace("_", " ").title()
report.append(f"| {priority} | {issue_title} | {risk} | {impact} |")
return "\n".join(report)
def main():
"""Main function to run the audit."""
parser = argparse.ArgumentParser(description="Audit py-libp2p codebase for path handling issues")
parser.add_argument("--directory", default=".", help="Directory to scan (default: current directory)")
parser = argparse.ArgumentParser(
description="Audit py-libp2p codebase for path handling issues"
)
parser.add_argument(
"--directory",
default=".",
help="Directory to scan (default: current directory)",
)
parser.add_argument("--output", help="Output file for detailed report")
parser.add_argument("--summary-only", action="store_true", help="Only show summary report")
parser.add_argument(
"--summary-only", action="store_true", help="Only show summary report"
)
args = parser.parse_args()
@ -208,7 +241,7 @@ def main():
suggestions = generate_migration_suggestions(issues)
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
with open(args.output, "w", encoding="utf-8") as f:
f.write(summary)
f.write(suggestions)
print(f"\n📄 Detailed report saved to {args.output}")

View File

@ -3,20 +3,24 @@ Tests for cross-platform path utilities.
"""
import os
import tempfile
from pathlib import Path
import pytest
import tempfile
from libp2p.utils.paths import (
get_temp_dir,
get_project_root,
join_paths,
ensure_dir_exists,
get_config_dir,
get_script_dir,
create_temp_file,
resolve_relative_path,
ensure_dir_exists,
find_executable,
get_binary_path,
get_config_dir,
get_project_root,
get_python_executable,
get_script_binary_path,
get_script_dir,
get_temp_dir,
get_venv_path,
join_paths,
normalize_path,
resolve_relative_path,
)
@ -85,7 +89,7 @@ class TestPathUtilities:
config_dir = get_config_dir()
assert isinstance(config_dir, Path)
if os.name == 'nt': # Windows
if os.name == "nt": # Windows
# Should be in AppData/Roaming or user home
assert "AppData" in str(config_dir) or "py-libp2p" in str(config_dir)
else: # Unix-like
@ -128,7 +132,7 @@ class TestPathUtilities:
assert result == expected
# Test absolute path (platform-agnostic)
if os.name == 'nt': # Windows
if os.name == "nt": # Windows
absolute_path = "C:\\absolute\\path"
else: # Unix-like
absolute_path = "/absolute/path"
@ -149,14 +153,72 @@ class TestPathUtilities:
assert result.is_absolute()
assert result == absolute_path.resolve()
def test_get_venv_path(self, monkeypatch):
"""Test virtual environment path detection."""
# Test when no virtual environment is active
# Temporarily clear VIRTUAL_ENV to test the "no venv" case
monkeypatch.delenv("VIRTUAL_ENV", raising=False)
result = get_venv_path()
assert result is None
# Test when virtual environment is active
test_venv_path = "/path/to/venv"
monkeypatch.setenv("VIRTUAL_ENV", test_venv_path)
result = get_venv_path()
assert result == Path(test_venv_path)
def test_get_python_executable(self):
"""Test Python executable path detection."""
result = get_python_executable()
assert isinstance(result, Path)
assert result.exists()
assert result.name.startswith("python")
def test_find_executable(self):
"""Test executable finding in PATH."""
# Test with non-existent executable
result = find_executable("nonexistent_executable")
assert result is None
# Test with existing executable (python should be available)
result = find_executable("python")
if result:
assert isinstance(result, Path)
assert result.exists()
def test_get_script_binary_path(self):
"""Test script binary path detection."""
result = get_script_binary_path()
assert isinstance(result, Path)
assert result.exists()
assert result.is_dir()
def test_get_binary_path(self, monkeypatch):
"""Test binary path resolution with virtual environment."""
# Test when no virtual environment is active
result = get_binary_path("python")
if result:
assert isinstance(result, Path)
assert result.exists()
# Test when virtual environment is active
test_venv_path = "/path/to/venv"
monkeypatch.setenv("VIRTUAL_ENV", test_venv_path)
# This test is more complex as it depends on the actual venv structure
# We'll just verify the function doesn't crash
result = get_binary_path("python")
# Result can be None if binary not found in venv
if result:
assert isinstance(result, Path)
class TestCrossPlatformCompatibility:
"""Test cross-platform compatibility."""
def test_config_dir_platform_specific_windows(self, monkeypatch):
"""Test config directory respects Windows conventions."""
monkeypatch.setattr('os.name', 'nt')
monkeypatch.setenv('APPDATA', 'C:\\Users\\Test\\AppData\\Roaming')
monkeypatch.setattr("os.name", "nt")
monkeypatch.setenv("APPDATA", "C:\\Users\\Test\\AppData\\Roaming")
config_dir = get_config_dir()
assert "AppData" in str(config_dir)
assert "py-libp2p" in str(config_dir)
@ -169,7 +231,7 @@ class TestCrossPlatformCompatibility:
assert result == expected
# Test that the result uses correct separators for the platform
if os.name == 'nt': # Windows
if os.name == "nt": # Windows
assert "\\" in str(result) or "/" in str(result)
else: # Unix-like
assert "/" in str(result)
@ -211,3 +273,10 @@ class TestBackwardCompatibility:
assert isinstance(create_temp_file(), Path)
assert isinstance(resolve_relative_path(".", "test"), Path)
assert isinstance(normalize_path("."), Path)
assert isinstance(get_python_executable(), Path)
assert isinstance(get_script_binary_path(), Path)
# Test optional return types
venv_path = get_venv_path()
if venv_path is not None:
assert isinstance(venv_path, Path)