Compare commits

..

12 Commits

Author SHA1 Message Date
1d555ddd47 Tests: Add tests/README.md
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-30 19:35:08 +05:30
ee444447b9 Tests: Add tests/__init__.py and pythonpath config
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-30 19:34:47 +05:30
da57911122 Tests: Add automated testing framework with coverage support
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-30 19:33:43 +05:30
0498885f71 Merge pull request #82 from pythonbpf/compilation-context
PythonBPF: Add Compilation Context to allow parallel compilation of multiple jobs
2026-03-29 10:02:10 +02:00
3f4f95115f Core: Pass compilation_context to _prepare_expr_args as it calls eval_expr 2026-03-29 13:28:38 +05:30
f2b9767098 Core: Fix args in helper/printk_formatter 2026-03-29 13:11:44 +05:30
0e087b9ea5 Core: Revert unnecessary changes to allocation_pass 2026-03-29 02:48:16 +05:30
ccbdfee9de Core: Remove unused args in assign_pass 2026-03-29 02:44:42 +05:30
61bca6bad9 Core: Fix global pass to divide internal functions 2026-03-29 02:40:55 +05:30
305a8ba9e3 Core: Fix unnecessary args and changes in maps pass 2026-03-29 01:36:46 +05:30
b7f917c3c2 Merge pull request #83 from pythonbpf/dependabot/github_actions/actions-985357984d
build(deps): bump the actions group with 2 updates
2026-03-07 14:11:31 +05:30
b025ae7158 build(deps): bump the actions group with 2 updates
Bumps the actions group with 2 updates: [actions/upload-artifact](https://github.com/actions/upload-artifact) and [actions/download-artifact](https://github.com/actions/download-artifact).


Updates `actions/upload-artifact` from 6 to 7
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](https://github.com/actions/upload-artifact/compare/v6...v7)

Updates `actions/download-artifact` from 7 to 8
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v7...v8)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-version: '7'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: actions
- dependency-name: actions/download-artifact
  dependency-version: '8'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: actions
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-02 11:45:33 +00:00
21 changed files with 590 additions and 75 deletions

View File

@ -33,7 +33,7 @@ jobs:
python -m build python -m build
- name: Upload distributions - name: Upload distributions
uses: actions/upload-artifact@v6 uses: actions/upload-artifact@v7
with: with:
name: release-dists name: release-dists
path: dist/ path: dist/
@ -59,7 +59,7 @@ jobs:
steps: steps:
- name: Retrieve release distributions - name: Retrieve release distributions
uses: actions/download-artifact@v7 uses: actions/download-artifact@v8
with: with:
name: release-dists name: release-dists
path: dist/ path: dist/

View File

@ -1,10 +1,22 @@
install: install:
pip install -e . uv pip install -e ".[test]"
clean: clean:
rm -rf build dist *.egg-info rm -rf build dist *.egg-info
rm -rf examples/*.ll examples/*.o rm -rf examples/*.ll examples/*.o
rm -rf htmlcov .coverage
test:
pytest tests/ -v --tb=short -m "not verifier"
test-cov:
pytest tests/ -v --tb=short -m "not verifier" \
--cov=pythonbpf --cov-report=term-missing --cov-report=html
test-verifier:
@echo "NOTE: verifier tests require sudo and bpftool. Uses sudo .venv/bin/python3."
pytest tests/test_verifier.py -v --tb=short -m verifier
all: clean install all: clean install
.PHONY: all clean .PHONY: all clean install test test-cov test-verifier

View File

@ -41,7 +41,30 @@ docs = [
"sphinx-rtd-theme>=2.0", "sphinx-rtd-theme>=2.0",
"sphinx-copybutton", "sphinx-copybutton",
] ]
test = [
"pytest>=8.0",
"pytest-cov>=5.0",
"tomli>=2.0; python_version < '3.11'",
]
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["."] where = ["."]
include = ["pythonbpf*"] include = ["pythonbpf*"]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["."]
python_files = ["test_*.py"]
markers = [
"verifier: requires sudo/root for kernel verifier tests (not run by default)",
"vmlinux: requires vmlinux.py for current kernel",
]
log_cli = false
[tool.coverage.run]
source = ["pythonbpf"]
omit = ["*/vmlinux*", "*/__pycache__/*"]
[tool.coverage.report]
show_missing = true
skip_covered = false

View File

@ -298,15 +298,6 @@ def allocate_temp_pool(builder, max_temps, local_sym_tab):
logger.debug(f"Allocated temp variable: {temp_name}") logger.debug(f"Allocated temp variable: {temp_name}")
def _get_alignment(tmp_type):
"""Return alignment for a given type."""
if isinstance(tmp_type, ir.PointerType):
return 8
elif isinstance(tmp_type, ir.IntType):
return tmp_type.width // 8
return 8
def _allocate_for_name(builder, var_name, rval, local_sym_tab): def _allocate_for_name(builder, var_name, rval, local_sym_tab):
"""Allocate memory for variable-to-variable assignment (b = a).""" """Allocate memory for variable-to-variable assignment (b = a)."""
source_var = rval.id source_var = rval.id
@ -329,16 +320,6 @@ def _allocate_for_name(builder, var_name, rval, local_sym_tab):
) )
def _allocate_with_type(builder, var_name, ir_type):
"""Allocate memory for a variable with a specific type."""
var = builder.alloca(ir_type, name=var_name)
if isinstance(ir_type, ir.IntType):
var.align = ir_type.width // 8
elif isinstance(ir_type, ir.PointerType):
var.align = 8
return var
def _allocate_for_attribute( def _allocate_for_attribute(
builder, var_name, rval, local_sym_tab, compilation_context builder, var_name, rval, local_sym_tab, compilation_context
): ):
@ -477,3 +458,20 @@ def _allocate_for_attribute(
logger.info( logger.info(
f"Pre-allocated {var_name} from {struct_var}.{field_name} with type {alloc_type}" f"Pre-allocated {var_name} from {struct_var}.{field_name} with type {alloc_type}"
) )
def _allocate_with_type(builder, var_name, ir_type):
"""Allocate variable with appropriate alignment for type."""
var = builder.alloca(ir_type, name=var_name)
var.align = _get_alignment(ir_type)
return var
def _get_alignment(ir_type):
"""Get appropriate alignment for IR type."""
if isinstance(ir_type, ir.IntType):
return ir_type.width // 8
elif isinstance(ir_type, ir.ArrayType) and isinstance(ir_type.element, ir.IntType):
return ir_type.element.width // 8
else:
return 8 # Default: pointer size

View File

@ -45,7 +45,6 @@ def handle_struct_field_assignment(
if _is_char_array(field_type) and _is_i8_ptr(val_type): if _is_char_array(field_type) and _is_i8_ptr(val_type):
_copy_string_to_char_array( _copy_string_to_char_array(
func, func,
compilation_context,
builder, builder,
val, val,
field_ptr, field_ptr,
@ -62,7 +61,6 @@ def handle_struct_field_assignment(
def _copy_string_to_char_array( def _copy_string_to_char_array(
func, func,
compilation_context,
builder, builder,
src_ptr, src_ptr,
dst_ptr, dst_ptr,

View File

@ -32,7 +32,7 @@ def populate_global_symbol_table(tree, compilation_context):
return False return False
def emit_global(module: ir.Module, node, name): def _emit_global(module: ir.Module, node, name):
logger.info(f"global identifier {name} processing") logger.info(f"global identifier {name} processing")
# deduce LLVM type from the annotated return # deduce LLVM type from the annotated return
if not isinstance(node.returns, ast.Name): if not isinstance(node.returns, ast.Name):
@ -111,14 +111,14 @@ def globals_processing(tree, compilation_context):
node.body[0].value, (ast.Constant, ast.Name, ast.Call) node.body[0].value, (ast.Constant, ast.Name, ast.Call)
) )
): ):
emit_global(compilation_context.module, node, name) _emit_global(compilation_context.module, node, name)
else: else:
raise SyntaxError(f"ERROR: Invalid syntax for {name} global") raise SyntaxError(f"ERROR: Invalid syntax for {name} global")
return None return None
def emit_llvm_compiler_used(module: ir.Module, names: list[str]): def _emit_llvm_compiler_used(module: ir.Module, names: list[str]):
""" """
Emit the @llvm.compiler.used global given a list of function/global names. Emit the @llvm.compiler.used global given a list of function/global names.
""" """
@ -164,4 +164,4 @@ def globals_list_creation(tree, compilation_context):
elif isinstance(dec, ast.Name) and dec.id == "map": elif isinstance(dec, ast.Name) and dec.id == "map":
collected.append(node.name) collected.append(node.name)
emit_llvm_compiler_used(module, collected) _emit_llvm_compiler_used(module, collected)

View File

@ -41,7 +41,7 @@ def handle_fstring_print(
fmt_parts, fmt_parts,
exprs, exprs,
local_sym_tab, local_sym_tab,
compilation_context.struct_sym_tab, compilation_context.structs_sym_tab,
) )
else: else:
raise NotImplementedError(f"Unsupported f-string value type: {type(value)}") raise NotImplementedError(f"Unsupported f-string value type: {type(value)}")
@ -55,12 +55,7 @@ def handle_fstring_print(
for expr in exprs[:3]: for expr in exprs[:3]:
arg_value = _prepare_expr_args( arg_value = _prepare_expr_args(
expr, expr, func, compilation_context, builder, local_sym_tab
func,
compilation_context.module,
builder,
local_sym_tab,
compilation_context.struct_sym_tab,
) )
args.append(arg_value) args.append(arg_value)
@ -216,19 +211,19 @@ def _create_format_string_global(fmt_str, func, module, builder):
return builder.bitcast(fmt_gvar, ir.PointerType()) return builder.bitcast(fmt_gvar, ir.PointerType())
def _prepare_expr_args(expr, func, module, builder, local_sym_tab, struct_sym_tab): def _prepare_expr_args(expr, func, compilation_context, builder, local_sym_tab):
"""Evaluate and prepare an expression to use as an arg for bpf_printk.""" """Evaluate and prepare an expression to use as an arg for bpf_printk."""
# Special case: struct field char array needs pointer to first element # Special case: struct field char array needs pointer to first element
if isinstance(expr, ast.Attribute): if isinstance(expr, ast.Attribute):
char_array_ptr, _ = get_char_array_ptr_and_size( char_array_ptr, _ = get_char_array_ptr_and_size(
expr, builder, local_sym_tab, struct_sym_tab, func expr, builder, local_sym_tab, compilation_context, func
) )
if char_array_ptr: if char_array_ptr:
return char_array_ptr return char_array_ptr
# Regular expression evaluation # Regular expression evaluation
val, _ = eval_expr(func, module, builder, expr, local_sym_tab, None, struct_sym_tab) val, _ = eval_expr(func, compilation_context, builder, expr, local_sym_tab)
if not val: if not val:
logger.warning("Failed to evaluate expression for bpf_printk, defaulting to 0") logger.warning("Failed to evaluate expression for bpf_printk, defaulting to 0")

View File

@ -6,9 +6,10 @@ from .map_types import BPFMapType
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
def create_map_debug_info(module, map_global, map_name, map_params, structs_sym_tab): def create_map_debug_info(compilation_context, map_global, map_name, map_params):
"""Generate debug info metadata for BPF maps HASH and PERF_EVENT_ARRAY""" """Generate debug info metadata for BPF maps HASH and PERF_EVENT_ARRAY"""
generator = DebugInfoGenerator(module) generator = DebugInfoGenerator(compilation_context.module)
structs_sym_tab = compilation_context.structs_sym_tab
logger.info(f"Creating debug info for map {map_name} with params {map_params}") logger.info(f"Creating debug info for map {map_name} with params {map_params}")
uint_type = generator.get_uint32_type() uint_type = generator.get_uint32_type()
array_type = generator.create_array_type( array_type = generator.create_array_type(
@ -77,11 +78,9 @@ def create_map_debug_info(module, map_global, map_name, map_params, structs_sym_
# Ideally we should expose a single create_map_debug_info function that handles all map types. # Ideally we should expose a single create_map_debug_info function that handles all map types.
# We can probably use a registry pattern to register different map types and their debug info generators. # We can probably use a registry pattern to register different map types and their debug info generators.
# map_params["type"] will be used to determine which generator to use. # map_params["type"] will be used to determine which generator to use.
def create_ringbuf_debug_info( def create_ringbuf_debug_info(compilation_context, map_global, map_name, map_params):
module, map_global, map_name, map_params, structs_sym_tab
):
"""Generate debug information metadata for BPF RINGBUF map""" """Generate debug information metadata for BPF RINGBUF map"""
generator = DebugInfoGenerator(module) generator = DebugInfoGenerator(compilation_context.module)
int_type = generator.get_int32_type() int_type = generator.get_int32_type()

View File

@ -31,7 +31,7 @@ def is_map(func_node):
) )
def create_bpf_map(module, map_name, map_params): def create_bpf_map(compilation_context, map_name, map_params):
"""Create a BPF map in the module with given parameters and debug info""" """Create a BPF map in the module with given parameters and debug info"""
# Create the anonymous struct type for BPF map # Create the anonymous struct type for BPF map
@ -40,7 +40,9 @@ def create_bpf_map(module, map_name, map_params):
) )
# Create the global variable # Create the global variable
map_global = ir.GlobalVariable(module, map_struct_type, name=map_name) map_global = ir.GlobalVariable(
compilation_context.module, map_struct_type, name=map_name
)
map_global.linkage = "dso_local" map_global.linkage = "dso_local"
map_global.global_constant = False map_global.global_constant = False
map_global.initializer = ir.Constant(map_struct_type, None) map_global.initializer = ir.Constant(map_struct_type, None)
@ -51,11 +53,13 @@ def create_bpf_map(module, map_name, map_params):
return MapSymbol(type=map_params["type"], sym=map_global, params=map_params) return MapSymbol(type=map_params["type"], sym=map_global, params=map_params)
def _parse_map_params(rval, compilation_context, expected_args=None): def _parse_map_params(rval, expected_args=None):
"""Parse map parameters from call arguments and keywords.""" """Parse map parameters from call arguments and keywords."""
params = {} params = {}
handler = compilation_context.vmlinux_handler
# TODO: Replace it with compilation_context.vmlinux_handler someday?
handler = VmlinuxHandlerRegistry.get_handler()
# Parse positional arguments # Parse positional arguments
if expected_args: if expected_args:
for i, arg_name in enumerate(expected_args): for i, arg_name in enumerate(expected_args):
@ -82,14 +86,6 @@ def _parse_map_params(rval, compilation_context, expected_args=None):
def _get_vmlinux_enum(handler, name): def _get_vmlinux_enum(handler, name):
if handler and handler.is_vmlinux_enum(name): if handler and handler.is_vmlinux_enum(name):
return handler.get_vmlinux_enum_value(name) return handler.get_vmlinux_enum_value(name)
# Fallback to VmlinuxHandlerRegistry if handler invalid
# This is for backward compatibility or if refactoring isn't complete
if (
VmlinuxHandlerRegistry.get_handler()
and VmlinuxHandlerRegistry.get_handler().is_vmlinux_enum(name)
):
return VmlinuxHandlerRegistry.get_handler().get_vmlinux_enum_value(name)
return None return None
@ -97,9 +93,7 @@ def _get_vmlinux_enum(handler, name):
def process_ringbuf_map(map_name, rval, compilation_context): def process_ringbuf_map(map_name, rval, compilation_context):
"""Process a BPF_RINGBUF map declaration""" """Process a BPF_RINGBUF map declaration"""
logger.info(f"Processing Ringbuf: {map_name}") logger.info(f"Processing Ringbuf: {map_name}")
map_params = _parse_map_params( map_params = _parse_map_params(rval, expected_args=["max_entries"])
rval, compilation_context, expected_args=["max_entries"]
)
map_params["type"] = BPFMapType.RINGBUF map_params["type"] = BPFMapType.RINGBUF
# NOTE: constraints borrowed from https://docs.ebpf.io/linux/map-type/BPF_MAP_TYPE_RINGBUF/ # NOTE: constraints borrowed from https://docs.ebpf.io/linux/map-type/BPF_MAP_TYPE_RINGBUF/
@ -115,13 +109,12 @@ def process_ringbuf_map(map_name, rval, compilation_context):
logger.info(f"Ringbuf map parameters: {map_params}") logger.info(f"Ringbuf map parameters: {map_params}")
map_global = create_bpf_map(compilation_context.module, map_name, map_params) map_global = create_bpf_map(compilation_context, map_name, map_params)
create_ringbuf_debug_info( create_ringbuf_debug_info(
compilation_context.module, compilation_context,
map_global.sym, map_global.sym,
map_name, map_name,
map_params, map_params,
compilation_context.structs_sym_tab,
) )
return map_global return map_global
@ -130,20 +123,17 @@ def process_ringbuf_map(map_name, rval, compilation_context):
def process_hash_map(map_name, rval, compilation_context): def process_hash_map(map_name, rval, compilation_context):
"""Process a BPF_HASH map declaration""" """Process a BPF_HASH map declaration"""
logger.info(f"Processing HashMap: {map_name}") logger.info(f"Processing HashMap: {map_name}")
map_params = _parse_map_params( map_params = _parse_map_params(rval, expected_args=["key", "value", "max_entries"])
rval, compilation_context, expected_args=["key", "value", "max_entries"]
)
map_params["type"] = BPFMapType.HASH map_params["type"] = BPFMapType.HASH
logger.info(f"Map parameters: {map_params}") logger.info(f"Map parameters: {map_params}")
map_global = create_bpf_map(compilation_context.module, map_name, map_params) map_global = create_bpf_map(compilation_context, map_name, map_params)
# Generate debug info for BTF # Generate debug info for BTF
create_map_debug_info( create_map_debug_info(
compilation_context.module, compilation_context,
map_global.sym, map_global.sym,
map_name, map_name,
map_params, map_params,
compilation_context.structs_sym_tab,
) )
return map_global return map_global
@ -152,20 +142,17 @@ def process_hash_map(map_name, rval, compilation_context):
def process_perf_event_map(map_name, rval, compilation_context): def process_perf_event_map(map_name, rval, compilation_context):
"""Process a BPF_PERF_EVENT_ARRAY map declaration""" """Process a BPF_PERF_EVENT_ARRAY map declaration"""
logger.info(f"Processing PerfEventArray: {map_name}") logger.info(f"Processing PerfEventArray: {map_name}")
map_params = _parse_map_params( map_params = _parse_map_params(rval, expected_args=["key_size", "value_size"])
rval, compilation_context, expected_args=["key_size", "value_size"]
)
map_params["type"] = BPFMapType.PERF_EVENT_ARRAY map_params["type"] = BPFMapType.PERF_EVENT_ARRAY
logger.info(f"Map parameters: {map_params}") logger.info(f"Map parameters: {map_params}")
map_global = create_bpf_map(compilation_context.module, map_name, map_params) map_global = create_bpf_map(compilation_context, map_name, map_params)
# Generate debug info for BTF # Generate debug info for BTF
create_map_debug_info( create_map_debug_info(
compilation_context.module, compilation_context,
map_global.sym, map_global.sym,
map_name, map_name,
map_params, map_params,
compilation_context.structs_sym_tab,
) )
return map_global return map_global

116
tests/README.md Normal file
View File

@ -0,0 +1,116 @@
# PythonBPF Test Suite
## Quick start
```bash
# Activate the venv and install test deps (once)
source .venv/bin/activate
uv pip install -e ".[test]"
# Run the full suite (IR + LLC levels, no sudo required)
make test
# Run with coverage report
make test-cov
```
## Test levels
Tests are split into three levels, each in a separate file:
| Level | File | What it checks | Needs sudo? |
|---|---|---|---|
| 1 — IR generation | `test_ir_generation.py` | `compile_to_ir()` completes without exception or `logging.ERROR` | No |
| 2 — LLC compilation | `test_llc_compilation.py` | Level 1 + `llc` produces a non-empty `.o` file | No |
| 3 — Kernel verifier | `test_verifier.py` | `bpftool prog load -d` exits 0 | Yes |
Levels 1 and 2 run together with `make test`. Level 3 is opt-in:
```bash
make test-verifier # requires bpftool and sudo
```
## Running a single test
Tests are parametrized by file path. Use `-k` to filter:
```bash
# By file name
pytest tests/ -v -k "and.py" -m "not verifier"
# By category
pytest tests/ -v -k "conditionals" -m "not verifier"
# One specific level only
pytest tests/test_ir_generation.py -v -k "hash_map.py"
```
## Coverage report
```bash
make test-cov
```
- **Terminal**: shows per-file coverage with missing lines after the test run.
- **HTML**: written to `htmlcov/index.html` — open in a browser for line-by-line detail.
```bash
xdg-open htmlcov/index.html
```
`htmlcov/` and `.coverage` are excluded from git — add them to `.gitignore` if they are not already listed.
## Expected failures (`test_config.toml`)
Known-broken tests are declared in `tests/test_config.toml`:
```toml
[xfail]
"failing_tests/my_test.py" = {reason = "...", level = "ir"}
```
- `level = "ir"` — fails during IR generation; both IR and LLC tests are marked xfail.
- `level = "llc"` — IR generates fine but `llc` rejects it; only the LLC test is marked xfail.
All xfails use `strict = True`: if a test starts **passing** it shows up as **XPASS** and is treated as a test failure. This is intentional — it means the bug was fixed and the test should be promoted to `passing_tests/`.
## Adding a new test
1. Create a `.py` file in `tests/passing_tests/<category>/` with the usual `@bpf` decorators and a `compile()` call at the bottom.
2. Run `make test` — the file is discovered and tested automatically at all levels.
3. If the test is expected to fail, add it to `tests/test_config.toml` instead of `passing_tests/`.
## Directory structure
```
tests/
├── README.md ← you are here
├── conftest.py ← pytest config: discovery, xfail/skip injection, fixtures
├── test_config.toml ← expected-failure list
├── test_ir_generation.py ← Level 1
├── test_llc_compilation.py ← Level 2
├── test_verifier.py ← Level 3 (opt-in, sudo)
├── framework/
│ ├── bpf_test_case.py ← BpfTestCase dataclass
│ ├── collector.py ← discovers test files, reads test_config.toml
│ ├── compiler.py ← wrappers around compile_to_ir() + _run_llc()
│ └── verifier.py ← bpftool subprocess wrapper
├── passing_tests/ ← programs that should compile and verify cleanly
└── failing_tests/ ← programs with known issues (declared in test_config.toml)
```
## Known regressions (as of compilation-context PR merge)
Three tests in `passing_tests/` currently fail — these are bugs to fix, not xfails:
| Test | Error |
|---|---|
| `passing_tests/assign/comprehensive.py` | `TypeError: cannot store i64* to i64*` |
| `passing_tests/helpers/bpf_probe_read.py` | `ValueError: 'ctx' not in local symbol table` |
| `passing_tests/vmlinux/register_state_dump.py` | `KeyError: 'cs'` |
Nine tests in `failing_tests/` were fixed by the compilation-context PR (they show as XPASS). They can be moved to `passing_tests/` when convenient:
`assign/retype.py`, `conditionals/helper_cond.py`, `conditionals/oneline.py`,
`direct_assign.py`, `globals.py`, `if.py`, `license.py` (IR only), `named_arg.py`,
`xdp/xdp_test_1.py`

0
tests/__init__.py Normal file
View File

103
tests/conftest.py Normal file
View File

@ -0,0 +1,103 @@
"""
pytest configuration for the PythonBPF test suite.
Test discovery:
All .py files under tests/passing_tests/ and tests/failing_tests/ are
collected as parametrized BPF test cases.
Markers applied automatically from test_config.toml:
- xfail (strict=True): failing_tests/ entries that are expected to fail
- skip: vmlinux tests when vmlinux.py is not importable
Run the suite:
pytest tests/ -v -m "not verifier" # IR + LLC only (no sudo)
pytest tests/ -v --cov=pythonbpf # with coverage
pytest tests/test_verifier.py -m verifier # kernel verifier (sudo required)
"""
import logging
import pytest
from tests.framework.collector import collect_all_test_files
# ── vmlinux availability ────────────────────────────────────────────────────
try:
import vmlinux # noqa: F401
VMLINUX_AVAILABLE = True
except ImportError:
VMLINUX_AVAILABLE = False
# ── shared fixture: collected test cases ───────────────────────────────────
def _all_cases():
    """Return every discovered BPF test case (passing_tests/ + failing_tests/)."""
    return collect_all_test_files()
# ── pytest_generate_tests: parametrize on bpf_test_file ───────────────────
def pytest_generate_tests(metafunc):
    """Parametrize any test requesting the `bpf_test_file` fixture.

    One parametrization per discovered BPF test program; the test ID is the
    path relative to tests/ so failures are easy to locate.
    """
    if "bpf_test_file" not in metafunc.fixturenames:
        return
    cases = _all_cases()
    file_paths = [case.path for case in cases]
    test_ids = [case.rel_path for case in cases]
    metafunc.parametrize("bpf_test_file", file_paths, ids=test_ids)
# ── pytest_collection_modifyitems: apply xfail / skip markers ─────────────
def pytest_collection_modifyitems(items):
    """Attach skip / xfail markers to collected items from test_config.toml."""
    cases_by_rel_path = {c.rel_path: c for c in _all_cases()}
    for item in items:
        # The parametrize ID embedded in the node id is the rel_path, e.g.
        # tests/test_foo.py::test_bar[passing_tests/helpers/pid.py]
        callspec = getattr(item, "callspec", None)
        if callspec is None:
            continue
        case = cases_by_rel_path.get(callspec.id)
        if case is None:
            continue

        # Skip vmlinux-dependent tests when vmlinux.py cannot be imported.
        if case.needs_vmlinux and not VMLINUX_AVAILABLE:
            item.add_marker(
                pytest.mark.skip(reason="vmlinux.py not available for current kernel")
            )
            continue

        # strict xfail: an unexpected pass (XPASS) is reported as a failure,
        # which alerts us when a known bug has been fixed.
        if case.is_expected_fail:
            # Level "ir"  → fails at IR generation: xfail both IR and LLC tests
            # Level "llc" → IR succeeds but LLC fails: only xfail the LLC test
            is_llc_test = item.nodeid.startswith("tests/test_llc_compilation.py")
            should_xfail = case.xfail_level == "ir" or (
                case.xfail_level == "llc" and is_llc_test
            )
            if should_xfail:
                item.add_marker(
                    pytest.mark.xfail(
                        reason=case.xfail_reason,
                        strict=True,
                        raises=Exception,
                    )
                )
# ── caplog level fixture: capture ERROR+ from pythonbpf ───────────────────
@pytest.fixture(autouse=True)
def set_log_level(caplog):
    # Capture ERROR-and-above records from the "pythonbpf" logger for every
    # test, so tests can assert that compilation emitted no ERROR logs.
    with caplog.at_level(logging.ERROR, logger="pythonbpf"):
        yield

View File

View File

@ -0,0 +1,17 @@
from dataclasses import dataclass
from pathlib import Path
@dataclass
class BpfTestCase:
    """A single discovered BPF test program plus its expected-result metadata."""

    path: Path  # absolute path to the test .py file
    rel_path: str  # path relative to tests/, used as the pytest parametrize ID
    is_expected_fail: bool = False  # listed under [xfail] in test_config.toml
    xfail_reason: str = ""  # human-readable reason from test_config.toml
    xfail_level: str = "ir"  # "ir" or "llc" — stage at which the failure occurs
    needs_vmlinux: bool = False  # requires vmlinux.py for the running kernel
    skip_reason: str = ""  # optional reason for skipping the test outright

    @property
    def test_id(self) -> str:
        # Slash-free variant of rel_path, e.g. "passing_tests::helpers::pid.py".
        return self.rel_path.replace("/", "::")

View File

@ -0,0 +1,60 @@
import sys
from pathlib import Path
if sys.version_info >= (3, 11):
import tomllib
else:
import tomli as tomllib
from .bpf_test_case import BpfTestCase
TESTS_DIR = Path(__file__).parent.parent
CONFIG_FILE = TESTS_DIR / "test_config.toml"
VMLINUX_TEST_DIRS = {"passing_tests/vmlinux"}
VMLINUX_TEST_PREFIXES = {
"failing_tests/vmlinux",
"failing_tests/xdp",
}
def _is_vmlinux_test(rel_path: str) -> bool:
for prefix in VMLINUX_TEST_DIRS | VMLINUX_TEST_PREFIXES:
if rel_path.startswith(prefix):
return True
return False
def _load_config() -> dict:
    """Parse tests/test_config.toml; return {} when the file does not exist."""
    if not CONFIG_FILE.exists():
        return {}
    # tomllib requires binary mode.
    with open(CONFIG_FILE, "rb") as f:
        return tomllib.load(f)
def collect_all_test_files() -> list[BpfTestCase]:
    """Discover every BPF test program under passing_tests/ and failing_tests/.

    Returns one BpfTestCase per .py file, annotated with xfail metadata from
    test_config.toml and a vmlinux-requirement flag derived from its location.
    """
    xfail_map: dict = _load_config().get("xfail", {})
    cases: list[BpfTestCase] = []
    for subdir in ("passing_tests", "failing_tests"):
        for py_file in sorted((TESTS_DIR / subdir).rglob("*.py")):
            rel = str(py_file.relative_to(TESTS_DIR))
            entry = xfail_map.get(rel)
            cases.append(
                BpfTestCase(
                    path=py_file,
                    rel_path=rel,
                    is_expected_fail=entry is not None,
                    xfail_reason=entry.get("reason", "") if entry else "",
                    xfail_level=entry.get("level", "ir") if entry else "ir",
                    needs_vmlinux=_is_vmlinux_test(rel),
                )
            )
    return cases

View File

@ -0,0 +1,23 @@
import logging
from pathlib import Path
from pythonbpf.codegen import compile_to_ir, _run_llc
def run_ir_generation(test_path: Path, output_ll: Path):
    """Run compile_to_ir on a BPF test file.

    Returns the (output, structs_sym_tab, maps_sym_tab) tuple from compile_to_ir.

    Raises on exception. Any logging.ERROR records captured by pytest caplog
    indicate a compile failure even when no exception is raised.
    """
    # loglevel=WARNING keeps routine compiler chatter quiet while still
    # letting caplog capture ERROR records emitted during compilation.
    return compile_to_ir(str(test_path), str(output_ll), loglevel=logging.WARNING)
def run_llc(ll_path: Path, obj_path: Path) -> bool:
    """Compile a .ll file to a BPF .o using llc.

    Raises subprocess.CalledProcessError on failure (llc uses check=True).
    Returns True on success.
    """
    # Thin wrapper over the compiler's internal llc runner so test code has a
    # stable entry point independent of pythonbpf internals.
    return _run_llc(str(ll_path), str(obj_path))

View File

@ -0,0 +1,25 @@
import subprocess
import uuid
from pathlib import Path
def verify_object(obj_path: Path) -> tuple[bool, str]:
    """Run bpftool prog load -d to verify a BPF object file against the kernel verifier.

    Pins the program temporarily at /sys/fs/bpf/bpf_prog_test_<uuid>, then removes it.

    Returns (success, combined_output). Requires sudo / root.
    """
    pin_path = f"/sys/fs/bpf/bpf_prog_test_{uuid.uuid4().hex[:8]}"
    load_cmd = ["sudo", "bpftool", "prog", "load", "-d", str(obj_path), pin_path]
    try:
        proc = subprocess.run(
            load_cmd,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        return False, "bpftool timed out after 30s"
    else:
        return proc.returncode == 0, proc.stdout + proc.stderr
    finally:
        # Best-effort removal of the temporary pin; ignore any failure.
        subprocess.run(["sudo", "rm", "-f", pin_path], check=False, capture_output=True)

33
tests/test_config.toml Normal file
View File

@ -0,0 +1,33 @@
# test_config.toml
#
# [xfail] — tests expected to fail.
# key = path relative to tests/
# value = {reason = "...", level = "ir" | "llc"}
# level "ir" = fails during pythonbpf IR generation (exception or ERROR log)
# level "llc" = IR generates but llc rejects it
#
# Tests removed from this list because they now pass (fixed by compilation-context PR):
# failing_tests/assign/retype.py
# failing_tests/conditionals/helper_cond.py
# failing_tests/conditionals/oneline.py
# failing_tests/direct_assign.py
# failing_tests/globals.py
# failing_tests/if.py
# failing_tests/license.py
# failing_tests/named_arg.py
# failing_tests/xdp/xdp_test_1.py
# These files can be moved to passing_tests/ when convenient.
[xfail]
"failing_tests/conditionals/struct_ptr.py" = {reason = "Struct pointer used directly as boolean condition not supported", level = "ir"}
"failing_tests/license.py" = {reason = "Missing LICENSE global produces IR that llc rejects — should be caught earlier with a clear error message", level = "llc"}
"failing_tests/undeclared_values.py" = {reason = "Undeclared variable used in f-string — should raise SyntaxError (correct behaviour, test documents it)", level = "ir"}
"failing_tests/vmlinux/args_test.py" = {reason = "struct_trace_event_raw_sys_enter args field access not supported", level = "ir"}
"failing_tests/vmlinux/assignment_handling.py" = {reason = "Assigning vmlinux enum value (XDP_PASS) to a local variable not yet supported", level = "ir"}
"failing_tests/xdp_pass.py" = {reason = "XDP program using vmlinux structs (struct_xdp_md) and complex map/struct interaction not yet supported", level = "ir"}

View File

@ -0,0 +1,29 @@
"""
Level 1 — IR Generation tests.
For every BPF test file, calls compile_to_ir() and asserts:
1. No exception is raised by the pythonbpf compiler.
2. No logging.ERROR records are emitted during compilation.
3. A .ll file is produced.
Tests in failing_tests/ are marked xfail (strict=True) by conftest.py —
they must raise an exception or produce an ERROR log to pass the suite.
"""
import logging
from pathlib import Path
from tests.framework.compiler import run_ir_generation
def test_ir_generation(bpf_test_file: Path, tmp_path, caplog):
    """Level 1: compile_to_ir must succeed with no exception and no ERROR logs."""
    ll_path = tmp_path / "output.ll"
    run_ir_generation(bpf_test_file, ll_path)
    # Treat any ERROR-level log record as a compile failure, even when no
    # exception was raised.
    error_records = [r for r in caplog.records if r.levelno >= logging.ERROR]
    assert not error_records, "IR generation produced ERROR log(s):\n" + "\n".join(
        f" [{r.name}] {r.getMessage()}" for r in error_records
    )
    assert ll_path.exists(), "compile_to_ir() returned without writing a .ll file"

View File

@ -0,0 +1,32 @@
"""
Level 2 — LLC compilation tests.
For every BPF test file, runs the full compile_to_ir() + _run_llc() pipeline
and asserts a non-empty .o file is produced.
Tests in failing_tests/ are marked xfail (strict=True) by conftest.py.
"""
import logging
from pathlib import Path
from tests.framework.compiler import run_ir_generation, run_llc
def test_llc_compilation(bpf_test_file: Path, tmp_path, caplog):
    """Level 2: full compile_to_ir + llc pipeline must yield a non-empty .o."""
    ll_path = tmp_path / "output.ll"
    obj_path = tmp_path / "output.o"
    # Level 1 must pass first: IR generation with no exceptions or ERROR logs.
    run_ir_generation(bpf_test_file, ll_path)
    error_records = [r for r in caplog.records if r.levelno >= logging.ERROR]
    assert not error_records, "IR generation produced ERROR log(s):\n" + "\n".join(
        f" [{r.name}] {r.getMessage()}" for r in error_records
    )
    # run_llc raises subprocess.CalledProcessError if llc rejects the IR.
    run_llc(ll_path, obj_path)
    assert obj_path.exists() and obj_path.stat().st_size > 0, (
        "llc did not produce a non-empty .o file"
    )

65
tests/test_verifier.py Normal file
View File

@ -0,0 +1,65 @@
"""
Level 3 — Kernel verifier tests.
For every passing BPF test file, compiles to a .o and runs:
sudo bpftool prog load -d <file.o> /sys/fs/bpf/bpf_prog_test_<id>
These tests are opt-in: they require sudo and kernel access, and are gated
behind the `verifier` pytest mark. Run with:
pytest tests/test_verifier.py -m verifier -v
Note: uses the venv Python binary for any in-process calls, but bpftool
itself is invoked via subprocess with sudo. Ensure bpftool is installed
and the user can sudo.
"""
import logging
from pathlib import Path
import pytest
from tests.framework.collector import collect_all_test_files
from tests.framework.compiler import run_ir_generation, run_llc
from tests.framework.verifier import verify_object
def _selected_cases():
    """Cases eligible for verifier testing: expected to pass, no vmlinux needed."""
    return [
        c
        for c in collect_all_test_files()
        if not c.is_expected_fail and not c.needs_vmlinux
    ]


def _passing_test_files():
    """Paths of the test files to run through the kernel verifier."""
    return [c.path for c in _selected_cases()]


def _passing_test_ids():
    """Pytest IDs (paths relative to tests/) aligned with _passing_test_files().

    Uses the same selection helper so the two lists can never diverge.
    """
    return [c.rel_path for c in _selected_cases()]
@pytest.mark.verifier
@pytest.mark.parametrize(
    "verifier_test_file",
    _passing_test_files(),
    ids=_passing_test_ids(),
)
def test_kernel_verifier(verifier_test_file: Path, tmp_path, caplog):
    """Compile the BPF test and verify it passes the kernel verifier."""
    ll_path = tmp_path / "output.ll"
    obj_path = tmp_path / "output.o"
    # Levels 1+2 first: IR generation and llc must succeed before verification.
    run_ir_generation(verifier_test_file, ll_path)
    error_records = [r for r in caplog.records if r.levelno >= logging.ERROR]
    assert not error_records, "IR generation produced ERROR log(s):\n" + "\n".join(
        f" [{r.name}] {r.getMessage()}" for r in error_records
    )
    run_llc(ll_path, obj_path)
    assert obj_path.exists() and obj_path.stat().st_size > 0
    # Level 3: load the object through bpftool (sudo) against the running kernel.
    ok, output = verify_object(obj_path)
    assert ok, f"Kernel verifier rejected {verifier_test_file.name}:\n{output}"