From 6a24b138dd65b690ccc0e1f214d2d29a9f4c9b16 Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Mon, 1 Sep 2025 01:35:32 +0530 Subject: [PATCH] feat: Add cross-platform path utilities module --- libp2p/utils/paths.py | 162 ++++++++++++++++++++++++++++ scripts/audit_paths.py | 222 ++++++++++++++++++++++++++++++++++++++ tests/utils/test_paths.py | 213 ++++++++++++++++++++++++++++++++++++ 3 files changed, 597 insertions(+) create mode 100644 libp2p/utils/paths.py create mode 100644 scripts/audit_paths.py create mode 100644 tests/utils/test_paths.py diff --git a/libp2p/utils/paths.py b/libp2p/utils/paths.py new file mode 100644 index 00000000..27924d8f --- /dev/null +++ b/libp2p/utils/paths.py @@ -0,0 +1,162 @@ +""" +Cross-platform path utilities for py-libp2p. + +This module provides standardized path operations to ensure consistent +behavior across Windows, macOS, and Linux platforms. +""" + +import os +import tempfile +from pathlib import Path +from typing import Union, Optional + +PathLike = Union[str, Path] + + +def get_temp_dir() -> Path: + """ + Get cross-platform temporary directory. + + Returns: + Path: Platform-specific temporary directory path + """ + return Path(tempfile.gettempdir()) + + +def get_project_root() -> Path: + """ + Get the project root directory. + + Returns: + Path: Path to the py-libp2p project root + """ + # Navigate from libp2p/utils/paths.py to project root + return Path(__file__).parent.parent.parent + + +def join_paths(*parts: PathLike) -> Path: + """ + Cross-platform path joining. + + Args: + *parts: Path components to join + + Returns: + Path: Joined path using platform-appropriate separator + """ + return Path(*parts) + + +def ensure_dir_exists(path: PathLike) -> Path: + """ + Ensure directory exists, create if needed. + + Args: + path: Directory path to ensure exists + + Returns: + Path: Path object for the directory + """ + path_obj = Path(path) + path_obj.mkdir(parents=True, exist_ok=True) + return path_obj + + +def get_config_dir() -> Path: + """ + Get user config directory (cross-platform). + + Returns: + Path: Platform-specific config directory + """ + if os.name == 'nt': # Windows + appdata = os.environ.get('APPDATA', '') + if appdata: + return Path(appdata) / 'py-libp2p' + else: + # Fallback to user home directory + return Path.home() / 'AppData' / 'Roaming' / 'py-libp2p' + else: # Unix-like (Linux, macOS) + return Path.home() / '.config' / 'py-libp2p' + + +def get_script_dir(script_path: Optional[PathLike] = None) -> Path: + """ + Get the directory containing a script file. + + Args: + script_path: Path to the script file. If None, uses __file__ + + Returns: + Path: Directory containing the script + """ + if script_path is None: + # This will be the directory of the calling script + import inspect + frame = inspect.currentframe() + if frame and frame.f_back: + script_path = frame.f_back.f_globals.get('__file__') + else: + raise RuntimeError("Could not determine script path") + + return Path(script_path).parent.absolute() + + +def create_temp_file(prefix: str = "py-libp2p_", suffix: str = ".log") -> Path: + """ + Create a temporary file with a unique name. + + Args: + prefix: File name prefix + suffix: File name suffix + + Returns: + Path: Path to the created temporary file + """ + temp_dir = get_temp_dir() + # Create a unique filename using timestamp and random bytes + import time + import secrets + + timestamp = time.strftime("%Y%m%d_%H%M%S") + microseconds = f"{time.time() % 1:.6f}"[2:] # Get microseconds as string + unique_id = secrets.token_hex(4) + filename = f"{prefix}{timestamp}_{microseconds}_{unique_id}{suffix}" + + temp_file = temp_dir / filename + # Create the file by touching it + temp_file.touch() + return temp_file + + +def resolve_relative_path(base_path: PathLike, relative_path: PathLike) -> Path: + """ + Resolve a relative path from a base path. + + Args: + base_path: Base directory path + relative_path: Relative path to resolve + + Returns: + Path: Resolved absolute path + """ + base = Path(base_path).resolve() + relative = Path(relative_path) + + if relative.is_absolute(): + return relative + else: + return (base / relative).resolve() + + +def normalize_path(path: PathLike) -> Path: + """ + Normalize a path, resolving any symbolic links and relative components. + + Args: + path: Path to normalize + + Returns: + Path: Normalized absolute path + """ + return Path(path).resolve() diff --git a/scripts/audit_paths.py b/scripts/audit_paths.py new file mode 100644 index 00000000..b0079869 --- /dev/null +++ b/scripts/audit_paths.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +""" +Audit script to identify path handling issues in the py-libp2p codebase. + +This script scans for patterns that should be migrated to use the new +cross-platform path utilities. +""" + +import re +import os +from pathlib import Path +from typing import List, Dict, Any +import argparse + + +def scan_for_path_issues(directory: Path) -> Dict[str, List[Dict[str, Any]]]: + """ + Scan for path handling issues in the codebase. + + Args: + directory: Root directory to scan + + Returns: + Dictionary mapping issue types to lists of found issues + """ + issues = { + 'hard_coded_slash': [], + 'os_path_join': [], + 'temp_hardcode': [], + 'os_path_dirname': [], + 'os_path_abspath': [], + 'direct_path_concat': [], + } + + # Patterns to search for + patterns = { + 'hard_coded_slash': r'["\'][^"\']*\/[^"\']*["\']', + 'os_path_join': r'os\.path\.join\(', + 'temp_hardcode': r'["\']\/tmp\/|["\']C:\\\\', + 'os_path_dirname': r'os\.path\.dirname\(', + 'os_path_abspath': r'os\.path\.abspath\(', + 'direct_path_concat': r'["\'][^"\']*["\']\s*\+\s*["\'][^"\']*["\']', + } + + # Files to exclude + exclude_patterns = [ + r'__pycache__', + r'\.git', + r'\.pytest_cache', + r'\.mypy_cache', + r'\.ruff_cache', + r'env/', + r'venv/', + r'\.venv/', + ] + + for py_file in directory.rglob("*.py"): + # Skip excluded files + if any(re.search(pattern, str(py_file)) for pattern in exclude_patterns): + continue + + try: + content = py_file.read_text(encoding='utf-8') + except UnicodeDecodeError: + print(f"Warning: Could not read {py_file} (encoding issue)") + continue + + for issue_type, pattern in patterns.items(): + matches = re.finditer(pattern, content, re.MULTILINE) + for match in matches: + line_num = content[:match.start()].count('\n') + 1 + line_content = content.split('\n')[line_num - 1].strip() + + issues[issue_type].append({ + 'file': py_file, + 'line': line_num, + 'content': match.group(), + 'full_line': line_content, + 'relative_path': py_file.relative_to(directory) + }) + + return issues + + +def generate_migration_suggestions(issues: Dict[str, List[Dict[str, Any]]]) -> str: + """ + Generate migration suggestions for found issues. + + Args: + issues: Dictionary of found issues + + Returns: + Formatted string with migration suggestions + """ + suggestions = [] + + for issue_type, issue_list in issues.items(): + if not issue_list: + continue + + suggestions.append(f"\n## {issue_type.replace('_', ' ').title()}") + suggestions.append(f"Found {len(issue_list)} instances:") + + for issue in issue_list[:10]: # Show first 10 examples + suggestions.append(f"\n### {issue['relative_path']}:{issue['line']}") + suggestions.append(f"```python") + suggestions.append(f"# Current code:") + suggestions.append(f"{issue['full_line']}") + suggestions.append(f"```") + + # Add migration suggestion based on issue type + if issue_type == 'os_path_join': + suggestions.append(f"```python") + suggestions.append(f"# Suggested fix:") + suggestions.append(f"from libp2p.utils.paths import join_paths") + suggestions.append(f"# Replace os.path.join(a, b, c) with join_paths(a, b, c)") + suggestions.append(f"```") + elif issue_type == 'temp_hardcode': + suggestions.append(f"```python") + suggestions.append(f"# Suggested fix:") + suggestions.append(f"from libp2p.utils.paths import get_temp_dir, create_temp_file") + suggestions.append(f"# Replace hard-coded temp paths with get_temp_dir() or create_temp_file()") + suggestions.append(f"```") + elif issue_type == 'os_path_dirname': + suggestions.append(f"```python") + suggestions.append(f"# Suggested fix:") + suggestions.append(f"from libp2p.utils.paths import get_script_dir") + suggestions.append(f"# Replace os.path.dirname(os.path.abspath(__file__)) with get_script_dir(__file__)") + suggestions.append(f"```") + + if len(issue_list) > 10: + suggestions.append(f"\n... and {len(issue_list) - 10} more instances") + + return "\n".join(suggestions) + + +def generate_summary_report(issues: Dict[str, List[Dict[str, Any]]]) -> str: + """ + Generate a summary report of all found issues. + + Args: + issues: Dictionary of found issues + + Returns: + Formatted summary report + """ + total_issues = sum(len(issue_list) for issue_list in issues.values()) + + report = [ + "# Cross-Platform Path Handling Audit Report", + "", + f"## Summary", + f"Total issues found: {total_issues}", + "", + "## Issue Breakdown:", + ] + + for issue_type, issue_list in issues.items(): + if issue_list: + report.append(f"- **{issue_type.replace('_', ' ').title()}**: {len(issue_list)} instances") + + report.append("") + report.append("## Priority Matrix:") + report.append("") + report.append("| Priority | Issue Type | Risk Level | Impact |") + report.append("|----------|------------|------------|---------|") + + priority_map = { + 'temp_hardcode': ('šŸ”“ P0', 'HIGH', 'Core functionality fails on different platforms'), + 'os_path_join': ('🟔 P1', 'MEDIUM', 'Examples and utilities may break'), + 'os_path_dirname': ('🟔 P1', 'MEDIUM', 'Script location detection issues'), + 'hard_coded_slash': ('🟢 P2', 'LOW', 'Future-proofing and consistency'), + 'os_path_abspath': ('🟢 P2', 'LOW', 'Path resolution consistency'), + 'direct_path_concat': ('🟢 P2', 'LOW', 'String concatenation issues'), + } + + for issue_type, issue_list in issues.items(): + if issue_list: + priority, risk, impact = priority_map.get(issue_type, ('🟢 P2', 'LOW', 'General improvement')) + report.append(f"| {priority} | {issue_type.replace('_', ' ').title()} | {risk} | {impact} |") + + return "\n".join(report) + + +def main(): + """Main function to run the audit.""" + parser = argparse.ArgumentParser(description="Audit py-libp2p codebase for path handling issues") + parser.add_argument("--directory", default=".", help="Directory to scan (default: current directory)") + parser.add_argument("--output", help="Output file for detailed report") + parser.add_argument("--summary-only", action="store_true", help="Only show summary report") + + args = parser.parse_args() + + directory = Path(args.directory) + if not directory.exists(): + print(f"Error: Directory {directory} does not exist") + return 1 + + print("šŸ” Scanning for path handling issues...") + issues = scan_for_path_issues(directory) + + # Generate and display summary + summary = generate_summary_report(issues) + print(summary) + + if not args.summary_only: + # Generate detailed suggestions + suggestions = generate_migration_suggestions(issues) + + if args.output: + with open(args.output, 'w', encoding='utf-8') as f: + f.write(summary) + f.write(suggestions) + print(f"\nšŸ“„ Detailed report saved to {args.output}") + else: + print(suggestions) + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/tests/utils/test_paths.py b/tests/utils/test_paths.py new file mode 100644 index 00000000..fcd4c08a --- /dev/null +++ b/tests/utils/test_paths.py @@ -0,0 +1,213 @@ +""" +Tests for cross-platform path utilities. +""" + +import os +import tempfile +from pathlib import Path +import pytest + +from libp2p.utils.paths import ( + get_temp_dir, + get_project_root, + join_paths, + ensure_dir_exists, + get_config_dir, + get_script_dir, + create_temp_file, + resolve_relative_path, + normalize_path, +) + + +class TestPathUtilities: + """Test cross-platform path utilities.""" + + def test_get_temp_dir(self): + """Test that temp directory is accessible and exists.""" + temp_dir = get_temp_dir() + assert isinstance(temp_dir, Path) + assert temp_dir.exists() + assert temp_dir.is_dir() + # Should match system temp directory + assert temp_dir == Path(tempfile.gettempdir()) + + def test_get_project_root(self): + """Test that project root is correctly determined.""" + project_root = get_project_root() + assert isinstance(project_root, Path) + assert project_root.exists() + # Should contain pyproject.toml + assert (project_root / "pyproject.toml").exists() + # Should contain libp2p directory + assert (project_root / "libp2p").exists() + + def test_join_paths(self): + """Test cross-platform path joining.""" + # Test with strings + result = join_paths("a", "b", "c") + expected = Path("a") / "b" / "c" + assert result == expected + + # Test with mixed types + result = join_paths("a", Path("b"), "c") + expected = Path("a") / "b" / "c" + assert result == expected + + # Test with absolute path + result = join_paths("/absolute", "path") + expected = Path("/absolute") / "path" + assert result == expected + + def test_ensure_dir_exists(self, tmp_path): + """Test directory creation and existence checking.""" + # Test creating new directory + new_dir = tmp_path / "new_dir" + result = ensure_dir_exists(new_dir) + assert result == new_dir + assert new_dir.exists() + assert new_dir.is_dir() + + # Test creating nested directory + nested_dir = tmp_path / "parent" / "child" / "grandchild" + result = ensure_dir_exists(nested_dir) + assert result == nested_dir + assert nested_dir.exists() + assert nested_dir.is_dir() + + # Test with existing directory + result = ensure_dir_exists(new_dir) + assert result == new_dir + assert new_dir.exists() + + def test_get_config_dir(self): + """Test platform-specific config directory.""" + config_dir = get_config_dir() + assert isinstance(config_dir, Path) + + if os.name == 'nt': # Windows + # Should be in AppData/Roaming or user home + assert "AppData" in str(config_dir) or "py-libp2p" in str(config_dir) + else: # Unix-like + # Should be in ~/.config + assert ".config" in str(config_dir) + assert "py-libp2p" in str(config_dir) + + def test_get_script_dir(self): + """Test script directory detection.""" + # Test with current file + script_dir = get_script_dir(__file__) + assert isinstance(script_dir, Path) + assert script_dir.exists() + assert script_dir.is_dir() + # Should contain this test file + assert (script_dir / "test_paths.py").exists() + + def test_create_temp_file(self): + """Test temporary file creation.""" + temp_file = create_temp_file() + assert isinstance(temp_file, Path) + assert temp_file.parent == get_temp_dir() + assert temp_file.name.startswith("py-libp2p_") + assert temp_file.name.endswith(".log") + + # Test with custom prefix and suffix + temp_file = create_temp_file(prefix="test_", suffix=".txt") + assert temp_file.name.startswith("test_") + assert temp_file.name.endswith(".txt") + + def test_resolve_relative_path(self, tmp_path): + """Test relative path resolution.""" + base_path = tmp_path / "base" + base_path.mkdir() + + # Test relative path + relative_path = "subdir/file.txt" + result = resolve_relative_path(base_path, relative_path) + expected = (base_path / "subdir" / "file.txt").resolve() + assert result == expected + + # Test absolute path (platform-agnostic) + if os.name == 'nt': # Windows + absolute_path = "C:\\absolute\\path" + else: # Unix-like + absolute_path = "/absolute/path" + result = resolve_relative_path(base_path, absolute_path) + assert result == Path(absolute_path) + + def test_normalize_path(self, tmp_path): + """Test path normalization.""" + # Test with relative path + relative_path = tmp_path / ".." / "normalize_test" + result = normalize_path(relative_path) + assert result.is_absolute() + assert "normalize_test" in str(result) + + # Test with absolute path + absolute_path = tmp_path / "test_file" + result = normalize_path(absolute_path) + assert result.is_absolute() + assert result == absolute_path.resolve() + + +class TestCrossPlatformCompatibility: + """Test cross-platform compatibility.""" + + def test_config_dir_platform_specific_windows(self, monkeypatch): + """Test config directory respects Windows conventions.""" + monkeypatch.setattr('os.name', 'nt') + monkeypatch.setenv('APPDATA', 'C:\\Users\\Test\\AppData\\Roaming') + config_dir = get_config_dir() + assert "AppData" in str(config_dir) + assert "py-libp2p" in str(config_dir) + + def test_path_separators_consistent(self): + """Test that path separators are handled consistently.""" + # Test that join_paths uses platform-appropriate separators + result = join_paths("dir1", "dir2", "file.txt") + expected = Path("dir1") / "dir2" / "file.txt" + assert result == expected + + # Test that the result uses correct separators for the platform + if os.name == 'nt': # Windows + assert "\\" in str(result) or "/" in str(result) + else: # Unix-like + assert "/" in str(result) + + def test_temp_file_uniqueness(self): + """Test that temporary files have unique names.""" + files = set() + for _ in range(10): + temp_file = create_temp_file() + assert temp_file not in files + files.add(temp_file) + + +class TestBackwardCompatibility: + """Test backward compatibility with existing code patterns.""" + + def test_path_operations_equivalent(self): + """Test that new path operations are equivalent to old os.path operations.""" + # Test join_paths vs os.path.join + parts = ["a", "b", "c"] + new_result = join_paths(*parts) + old_result = Path(os.path.join(*parts)) + assert new_result == old_result + + # Test get_script_dir vs os.path.dirname(os.path.abspath(__file__)) + new_script_dir = get_script_dir(__file__) + old_script_dir = Path(os.path.dirname(os.path.abspath(__file__))) + assert new_script_dir == old_script_dir + + def test_existing_functionality_preserved(self): + """Ensure no existing functionality is broken.""" + # Test that all functions return Path objects + assert isinstance(get_temp_dir(), Path) + assert isinstance(get_project_root(), Path) + assert isinstance(join_paths("a", "b"), Path) + assert isinstance(ensure_dir_exists(tempfile.gettempdir()), Path) + assert isinstance(get_config_dir(), Path) + assert isinstance(get_script_dir(__file__), Path) + assert isinstance(create_temp_file(), Path) + assert isinstance(resolve_relative_path(".", "test"), Path) + assert isinstance(normalize_path("."), Path)