feat: Add cross-platform path utilities module

This commit is contained in:
yashksaini-coder
2025-09-01 01:35:32 +05:30
parent 5c11ac20e7
commit 6a24b138dd
3 changed files with 597 additions and 0 deletions

162
libp2p/utils/paths.py Normal file
View File

@ -0,0 +1,162 @@
"""
Cross-platform path utilities for py-libp2p.
This module provides standardized path operations to ensure consistent
behavior across Windows, macOS, and Linux platforms.
"""
import os
import tempfile
from pathlib import Path
from typing import Union, Optional
PathLike = Union[str, Path]
def get_temp_dir() -> Path:
"""
Get cross-platform temporary directory.
Returns:
Path: Platform-specific temporary directory path
"""
return Path(tempfile.gettempdir())
def get_project_root() -> Path:
"""
Get the project root directory.
Returns:
Path: Path to the py-libp2p project root
"""
# Navigate from libp2p/utils/paths.py to project root
return Path(__file__).parent.parent.parent
def join_paths(*parts: PathLike) -> Path:
"""
Cross-platform path joining.
Args:
*parts: Path components to join
Returns:
Path: Joined path using platform-appropriate separator
"""
return Path(*parts)
def ensure_dir_exists(path: PathLike) -> Path:
"""
Ensure directory exists, create if needed.
Args:
path: Directory path to ensure exists
Returns:
Path: Path object for the directory
"""
path_obj = Path(path)
path_obj.mkdir(parents=True, exist_ok=True)
return path_obj
def get_config_dir() -> Path:
"""
Get user config directory (cross-platform).
Returns:
Path: Platform-specific config directory
"""
if os.name == 'nt': # Windows
appdata = os.environ.get('APPDATA', '')
if appdata:
return Path(appdata) / 'py-libp2p'
else:
# Fallback to user home directory
return Path.home() / 'AppData' / 'Roaming' / 'py-libp2p'
else: # Unix-like (Linux, macOS)
return Path.home() / '.config' / 'py-libp2p'
def get_script_dir(script_path: Optional[PathLike] = None) -> Path:
"""
Get the directory containing a script file.
Args:
script_path: Path to the script file. If None, uses __file__
Returns:
Path: Directory containing the script
"""
if script_path is None:
# This will be the directory of the calling script
import inspect
frame = inspect.currentframe()
if frame and frame.f_back:
script_path = frame.f_back.f_globals.get('__file__')
else:
raise RuntimeError("Could not determine script path")
return Path(script_path).parent.absolute()
def create_temp_file(prefix: str = "py-libp2p_", suffix: str = ".log") -> Path:
"""
Create a temporary file with a unique name.
Args:
prefix: File name prefix
suffix: File name suffix
Returns:
Path: Path to the created temporary file
"""
temp_dir = get_temp_dir()
# Create a unique filename using timestamp and random bytes
import time
import secrets
timestamp = time.strftime("%Y%m%d_%H%M%S")
microseconds = f"{time.time() % 1:.6f}"[2:] # Get microseconds as string
unique_id = secrets.token_hex(4)
filename = f"{prefix}{timestamp}_{microseconds}_{unique_id}{suffix}"
temp_file = temp_dir / filename
# Create the file by touching it
temp_file.touch()
return temp_file
def resolve_relative_path(base_path: PathLike, relative_path: PathLike) -> Path:
"""
Resolve a relative path from a base path.
Args:
base_path: Base directory path
relative_path: Relative path to resolve
Returns:
Path: Resolved absolute path
"""
base = Path(base_path).resolve()
relative = Path(relative_path)
if relative.is_absolute():
return relative
else:
return (base / relative).resolve()
def normalize_path(path: PathLike) -> Path:
"""
Normalize a path, resolving any symbolic links and relative components.
Args:
path: Path to normalize
Returns:
Path: Normalized absolute path
"""
return Path(path).resolve()

222
scripts/audit_paths.py Normal file
View File

@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""
Audit script to identify path handling issues in the py-libp2p codebase.
This script scans for patterns that should be migrated to use the new
cross-platform path utilities.
"""
import re
import os
from pathlib import Path
from typing import List, Dict, Any
import argparse
def scan_for_path_issues(directory: Path) -> Dict[str, List[Dict[str, Any]]]:
"""
Scan for path handling issues in the codebase.
Args:
directory: Root directory to scan
Returns:
Dictionary mapping issue types to lists of found issues
"""
issues = {
'hard_coded_slash': [],
'os_path_join': [],
'temp_hardcode': [],
'os_path_dirname': [],
'os_path_abspath': [],
'direct_path_concat': [],
}
# Patterns to search for
patterns = {
'hard_coded_slash': r'["\'][^"\']*\/[^"\']*["\']',
'os_path_join': r'os\.path\.join\(',
'temp_hardcode': r'["\']\/tmp\/|["\']C:\\\\',
'os_path_dirname': r'os\.path\.dirname\(',
'os_path_abspath': r'os\.path\.abspath\(',
'direct_path_concat': r'["\'][^"\']*["\']\s*\+\s*["\'][^"\']*["\']',
}
# Files to exclude
exclude_patterns = [
r'__pycache__',
r'\.git',
r'\.pytest_cache',
r'\.mypy_cache',
r'\.ruff_cache',
r'env/',
r'venv/',
r'\.venv/',
]
for py_file in directory.rglob("*.py"):
# Skip excluded files
if any(re.search(pattern, str(py_file)) for pattern in exclude_patterns):
continue
try:
content = py_file.read_text(encoding='utf-8')
except UnicodeDecodeError:
print(f"Warning: Could not read {py_file} (encoding issue)")
continue
for issue_type, pattern in patterns.items():
matches = re.finditer(pattern, content, re.MULTILINE)
for match in matches:
line_num = content[:match.start()].count('\n') + 1
line_content = content.split('\n')[line_num - 1].strip()
issues[issue_type].append({
'file': py_file,
'line': line_num,
'content': match.group(),
'full_line': line_content,
'relative_path': py_file.relative_to(directory)
})
return issues
def generate_migration_suggestions(issues: Dict[str, List[Dict[str, Any]]]) -> str:
"""
Generate migration suggestions for found issues.
Args:
issues: Dictionary of found issues
Returns:
Formatted string with migration suggestions
"""
suggestions = []
for issue_type, issue_list in issues.items():
if not issue_list:
continue
suggestions.append(f"\n## {issue_type.replace('_', ' ').title()}")
suggestions.append(f"Found {len(issue_list)} instances:")
for issue in issue_list[:10]: # Show first 10 examples
suggestions.append(f"\n### {issue['relative_path']}:{issue['line']}")
suggestions.append(f"```python")
suggestions.append(f"# Current code:")
suggestions.append(f"{issue['full_line']}")
suggestions.append(f"```")
# Add migration suggestion based on issue type
if issue_type == 'os_path_join':
suggestions.append(f"```python")
suggestions.append(f"# Suggested fix:")
suggestions.append(f"from libp2p.utils.paths import join_paths")
suggestions.append(f"# Replace os.path.join(a, b, c) with join_paths(a, b, c)")
suggestions.append(f"```")
elif issue_type == 'temp_hardcode':
suggestions.append(f"```python")
suggestions.append(f"# Suggested fix:")
suggestions.append(f"from libp2p.utils.paths import get_temp_dir, create_temp_file")
suggestions.append(f"# Replace hard-coded temp paths with get_temp_dir() or create_temp_file()")
suggestions.append(f"```")
elif issue_type == 'os_path_dirname':
suggestions.append(f"```python")
suggestions.append(f"# Suggested fix:")
suggestions.append(f"from libp2p.utils.paths import get_script_dir")
suggestions.append(f"# Replace os.path.dirname(os.path.abspath(__file__)) with get_script_dir(__file__)")
suggestions.append(f"```")
if len(issue_list) > 10:
suggestions.append(f"\n... and {len(issue_list) - 10} more instances")
return "\n".join(suggestions)
def generate_summary_report(issues: Dict[str, List[Dict[str, Any]]]) -> str:
"""
Generate a summary report of all found issues.
Args:
issues: Dictionary of found issues
Returns:
Formatted summary report
"""
total_issues = sum(len(issue_list) for issue_list in issues.values())
report = [
"# Cross-Platform Path Handling Audit Report",
"",
f"## Summary",
f"Total issues found: {total_issues}",
"",
"## Issue Breakdown:",
]
for issue_type, issue_list in issues.items():
if issue_list:
report.append(f"- **{issue_type.replace('_', ' ').title()}**: {len(issue_list)} instances")
report.append("")
report.append("## Priority Matrix:")
report.append("")
report.append("| Priority | Issue Type | Risk Level | Impact |")
report.append("|----------|------------|------------|---------|")
priority_map = {
'temp_hardcode': ('🔴 P0', 'HIGH', 'Core functionality fails on different platforms'),
'os_path_join': ('🟡 P1', 'MEDIUM', 'Examples and utilities may break'),
'os_path_dirname': ('🟡 P1', 'MEDIUM', 'Script location detection issues'),
'hard_coded_slash': ('🟢 P2', 'LOW', 'Future-proofing and consistency'),
'os_path_abspath': ('🟢 P2', 'LOW', 'Path resolution consistency'),
'direct_path_concat': ('🟢 P2', 'LOW', 'String concatenation issues'),
}
for issue_type, issue_list in issues.items():
if issue_list:
priority, risk, impact = priority_map.get(issue_type, ('🟢 P2', 'LOW', 'General improvement'))
report.append(f"| {priority} | {issue_type.replace('_', ' ').title()} | {risk} | {impact} |")
return "\n".join(report)
def main():
"""Main function to run the audit."""
parser = argparse.ArgumentParser(description="Audit py-libp2p codebase for path handling issues")
parser.add_argument("--directory", default=".", help="Directory to scan (default: current directory)")
parser.add_argument("--output", help="Output file for detailed report")
parser.add_argument("--summary-only", action="store_true", help="Only show summary report")
args = parser.parse_args()
directory = Path(args.directory)
if not directory.exists():
print(f"Error: Directory {directory} does not exist")
return 1
print("🔍 Scanning for path handling issues...")
issues = scan_for_path_issues(directory)
# Generate and display summary
summary = generate_summary_report(issues)
print(summary)
if not args.summary_only:
# Generate detailed suggestions
suggestions = generate_migration_suggestions(issues)
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
f.write(summary)
f.write(suggestions)
print(f"\n📄 Detailed report saved to {args.output}")
else:
print(suggestions)
return 0
if __name__ == "__main__":
exit(main())

213
tests/utils/test_paths.py Normal file
View File

@ -0,0 +1,213 @@
"""
Tests for cross-platform path utilities.
"""
import os
import tempfile
from pathlib import Path
import pytest
from libp2p.utils.paths import (
get_temp_dir,
get_project_root,
join_paths,
ensure_dir_exists,
get_config_dir,
get_script_dir,
create_temp_file,
resolve_relative_path,
normalize_path,
)
class TestPathUtilities:
"""Test cross-platform path utilities."""
def test_get_temp_dir(self):
"""Test that temp directory is accessible and exists."""
temp_dir = get_temp_dir()
assert isinstance(temp_dir, Path)
assert temp_dir.exists()
assert temp_dir.is_dir()
# Should match system temp directory
assert temp_dir == Path(tempfile.gettempdir())
def test_get_project_root(self):
"""Test that project root is correctly determined."""
project_root = get_project_root()
assert isinstance(project_root, Path)
assert project_root.exists()
# Should contain pyproject.toml
assert (project_root / "pyproject.toml").exists()
# Should contain libp2p directory
assert (project_root / "libp2p").exists()
def test_join_paths(self):
"""Test cross-platform path joining."""
# Test with strings
result = join_paths("a", "b", "c")
expected = Path("a") / "b" / "c"
assert result == expected
# Test with mixed types
result = join_paths("a", Path("b"), "c")
expected = Path("a") / "b" / "c"
assert result == expected
# Test with absolute path
result = join_paths("/absolute", "path")
expected = Path("/absolute") / "path"
assert result == expected
def test_ensure_dir_exists(self, tmp_path):
"""Test directory creation and existence checking."""
# Test creating new directory
new_dir = tmp_path / "new_dir"
result = ensure_dir_exists(new_dir)
assert result == new_dir
assert new_dir.exists()
assert new_dir.is_dir()
# Test creating nested directory
nested_dir = tmp_path / "parent" / "child" / "grandchild"
result = ensure_dir_exists(nested_dir)
assert result == nested_dir
assert nested_dir.exists()
assert nested_dir.is_dir()
# Test with existing directory
result = ensure_dir_exists(new_dir)
assert result == new_dir
assert new_dir.exists()
def test_get_config_dir(self):
"""Test platform-specific config directory."""
config_dir = get_config_dir()
assert isinstance(config_dir, Path)
if os.name == 'nt': # Windows
# Should be in AppData/Roaming or user home
assert "AppData" in str(config_dir) or "py-libp2p" in str(config_dir)
else: # Unix-like
# Should be in ~/.config
assert ".config" in str(config_dir)
assert "py-libp2p" in str(config_dir)
def test_get_script_dir(self):
"""Test script directory detection."""
# Test with current file
script_dir = get_script_dir(__file__)
assert isinstance(script_dir, Path)
assert script_dir.exists()
assert script_dir.is_dir()
# Should contain this test file
assert (script_dir / "test_paths.py").exists()
def test_create_temp_file(self):
"""Test temporary file creation."""
temp_file = create_temp_file()
assert isinstance(temp_file, Path)
assert temp_file.parent == get_temp_dir()
assert temp_file.name.startswith("py-libp2p_")
assert temp_file.name.endswith(".log")
# Test with custom prefix and suffix
temp_file = create_temp_file(prefix="test_", suffix=".txt")
assert temp_file.name.startswith("test_")
assert temp_file.name.endswith(".txt")
def test_resolve_relative_path(self, tmp_path):
"""Test relative path resolution."""
base_path = tmp_path / "base"
base_path.mkdir()
# Test relative path
relative_path = "subdir/file.txt"
result = resolve_relative_path(base_path, relative_path)
expected = (base_path / "subdir" / "file.txt").resolve()
assert result == expected
# Test absolute path (platform-agnostic)
if os.name == 'nt': # Windows
absolute_path = "C:\\absolute\\path"
else: # Unix-like
absolute_path = "/absolute/path"
result = resolve_relative_path(base_path, absolute_path)
assert result == Path(absolute_path)
def test_normalize_path(self, tmp_path):
"""Test path normalization."""
# Test with relative path
relative_path = tmp_path / ".." / "normalize_test"
result = normalize_path(relative_path)
assert result.is_absolute()
assert "normalize_test" in str(result)
# Test with absolute path
absolute_path = tmp_path / "test_file"
result = normalize_path(absolute_path)
assert result.is_absolute()
assert result == absolute_path.resolve()
class TestCrossPlatformCompatibility:
"""Test cross-platform compatibility."""
def test_config_dir_platform_specific_windows(self, monkeypatch):
"""Test config directory respects Windows conventions."""
monkeypatch.setattr('os.name', 'nt')
monkeypatch.setenv('APPDATA', 'C:\\Users\\Test\\AppData\\Roaming')
config_dir = get_config_dir()
assert "AppData" in str(config_dir)
assert "py-libp2p" in str(config_dir)
def test_path_separators_consistent(self):
"""Test that path separators are handled consistently."""
# Test that join_paths uses platform-appropriate separators
result = join_paths("dir1", "dir2", "file.txt")
expected = Path("dir1") / "dir2" / "file.txt"
assert result == expected
# Test that the result uses correct separators for the platform
if os.name == 'nt': # Windows
assert "\\" in str(result) or "/" in str(result)
else: # Unix-like
assert "/" in str(result)
def test_temp_file_uniqueness(self):
"""Test that temporary files have unique names."""
files = set()
for _ in range(10):
temp_file = create_temp_file()
assert temp_file not in files
files.add(temp_file)
class TestBackwardCompatibility:
"""Test backward compatibility with existing code patterns."""
def test_path_operations_equivalent(self):
"""Test that new path operations are equivalent to old os.path operations."""
# Test join_paths vs os.path.join
parts = ["a", "b", "c"]
new_result = join_paths(*parts)
old_result = Path(os.path.join(*parts))
assert new_result == old_result
# Test get_script_dir vs os.path.dirname(os.path.abspath(__file__))
new_script_dir = get_script_dir(__file__)
old_script_dir = Path(os.path.dirname(os.path.abspath(__file__)))
assert new_script_dir == old_script_dir
def test_existing_functionality_preserved(self):
"""Ensure no existing functionality is broken."""
# Test that all functions return Path objects
assert isinstance(get_temp_dir(), Path)
assert isinstance(get_project_root(), Path)
assert isinstance(join_paths("a", "b"), Path)
assert isinstance(ensure_dir_exists(tempfile.gettempdir()), Path)
assert isinstance(get_config_dir(), Path)
assert isinstance(get_script_dir(__file__), Path)
assert isinstance(create_temp_file(), Path)
assert isinstance(resolve_relative_path(".", "test"), Path)
assert isinstance(normalize_path("."), Path)