Compare commits

...

107 Commits

Author SHA1 Message Date
6881d2e960 Complete documentation coverage: add final module docstrings
Co-authored-by: varun-r-mallya <100590632+varun-r-mallya@users.noreply.github.com>
2025-10-08 17:30:03 +00:00
d9dfb61000 Add remaining docstrings to complete documentation coverage
Co-authored-by: varun-r-mallya <100590632+varun-r-mallya@users.noreply.github.com>
2025-10-08 17:25:29 +00:00
cdf4f3e885 Add module-level docstrings and helper utility docstrings
Co-authored-by: varun-r-mallya <100590632+varun-r-mallya@users.noreply.github.com>
2025-10-08 17:20:45 +00:00
5b20b08d9f Add docstrings to core modules and helper functions
Co-authored-by: varun-r-mallya <100590632+varun-r-mallya@users.noreply.github.com>
2025-10-08 17:16:05 +00:00
9f103c34a0 Initial plan 2025-10-08 17:04:52 +00:00
6402cf7be5 remove todos and move to projects on github. 2025-10-08 22:27:51 +05:30
9a96e1247b Merge pull request #29 from pythonbpf/smol_pp
add patch for Kernel 6.14 BTF in transpiler
2025-10-08 21:47:49 +05:30
989134f4be add patch for Kernel 6.14 BTF
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-08 21:47:02 +05:30
120aec08da Update TODO.md 2025-10-08 21:40:14 +05:30
e66ae7cc89 Add failing oneline IfExpr conditional test 2025-10-08 21:36:08 +05:30
32dc8e6636 Merge pull request #21 from pythonbpf/globals
Adds support for globals
SO......
*I'm not merging this because it's complete, but because I don't want it to diverge from master too much.
*Stuff I still need to complete:
-> Structs and eval expressions in these globals.
-> handling the global keyword.
-> assigning back to the global and reading from inside a function.
-> Basically, `global` keyword in Python is used to write only and reading can be done directly without declaring as global as a direct assign without global declaration is going to diverge from Python.
-> The above logic is going to be supported by `global_sym_tab` generated using the new order of passes that we are doing.
-> This needs to be fixed and done ASAP to avoid conflicts. so yes, im  gonna do it soon.
2025-10-08 14:48:37 +05:30
8485460374 Merge pull request #26 from pythonbpf/refactor_conds
Refactor conds
2025-10-08 07:28:08 +05:30
9fdc6fa3ed Add compile to tests/failing_tests/conditionals/helper_cond.py 2025-10-08 07:26:41 +05:30
17004d58df Remove completed short term goal from TODO.md 2025-10-08 07:25:14 +05:30
6362a5e665 Fix expr imports 2025-10-08 07:24:14 +05:30
d38d73d5c6 Move handle_comparator to type_normalization 2025-10-08 07:20:04 +05:30
0a6571726a Move convert_to_bool to type_normalization 2025-10-08 07:14:42 +05:30
e62557bd1d Seperate type_normalization from expr_pass 2025-10-08 06:59:32 +05:30
ee90ee9392 Fix type_deducer import in expr 2025-10-08 06:50:53 +05:30
5f9eaff59c Fix expr imports 2025-10-08 06:49:34 +05:30
b86341ce7a Rework dir structure for expr 2025-10-08 06:45:52 +05:30
4857739eec cleanup handle_cond in functions_pass 2025-10-08 06:42:34 +05:30
3bb4b099c1 Add passing and.py test for conditionals 2025-10-08 06:28:03 +05:30
e7912a088f Add passing or.py test for conditionals 2025-10-08 06:27:18 +05:30
95d63d969e Add _handle_or_or in expr_pass 2025-10-08 06:24:57 +05:30
1f96bab944 Add _handle_and_op in expr_pass 2025-10-08 06:24:13 +05:30
f98491f3bd Add handle_and and handle_or handling stub in eval_expr 2025-10-08 06:14:32 +05:30
98f262ae22 Add BoolOp handling stub in eval_expr 2025-10-08 06:11:59 +05:30
d2ff53052c Add support for is and is not keywords 2025-10-08 06:04:29 +05:30
ecac24c1d2 Add explanation notes to failing conditionals tests 2025-10-08 05:57:17 +05:30
a764b095f8 Add helper_cond failing test for conditionals 2025-10-08 05:54:49 +05:30
95a196a91f Move map_comp test to passing 2025-10-08 05:53:52 +05:30
6b59980874 Add null checks for pointer derefs to avoid map_value_or_null verifier errors 2025-10-08 05:53:12 +05:30
0c977514af Add TODO for fixing struct_kioctx issue 2025-10-08 05:34:25 +05:30
1207730ce3 update vmlinux.py 2025-10-08 05:27:56 +05:30
0d9dcd122c Merge pull request #27 from pythonbpf/vmlinux
Add vmlinux transpiler from experiments
2025-10-08 05:19:44 +05:30
8a69e05ee2 fix duplicate variable in example
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-08 05:18:49 +05:30
976af290af Revert "format chore"
This reverts commit a3443ab1d5.
2025-10-08 05:17:59 +05:30
a3443ab1d5 format chore
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-08 05:16:36 +05:30
a27360482b complete vmlinux transpiler.
TODO: struct_kioctx for x86_64 vmlinux.h has anonymous structs that refused to transpile well, so an extra rule has been written to make only the structs of that external. Fix this in the future.
2025-10-08 05:15:29 +05:30
3f9604a370 Add _deref_to_depth in expr_pass 2025-10-08 03:12:17 +05:30
480afd1341 Move _get_base_type to _get_base_type_and_depth 2025-10-08 03:02:31 +05:30
ab71275566 Add _get_base_type to expr_pass 2025-10-08 03:00:52 +05:30
2d850f457f Add _normalize_types to handle mismatched ints, move type_mismatch test to passing 2025-10-08 02:22:41 +05:30
c423cc647d add vmlinux.py transpiler from experiment repository
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-08 00:45:30 +05:30
9e1142bf05 Add type_mismatch failing test for conditionals 2025-10-07 14:02:09 +05:30
1843ca6c53 Add failing struct_ptr test for conditionals 2025-10-07 13:42:58 +05:30
caa5d92c32 Fix struct_access in eval_expr, move struct_access conditional test to passing 2025-10-07 13:35:31 +05:30
f41693bc6d Add 'and' and 'or' BoolOps as future deliverables 2025-10-07 05:27:31 +05:30
b7092fa362 Add failing test map_comp for conditionals 2025-10-07 05:20:43 +05:30
0e7dcafbab Add var_comp test for conditionals 2025-10-07 05:02:26 +05:30
a574527891 Add support for unary op 'not' in eval_expr, move not test to passing 2025-10-07 04:49:45 +05:30
176673017c Add failing tests struct and not for conditionals 2025-10-07 04:17:26 +05:30
1d6226d829 Add map test to conditionals 2025-10-07 04:06:16 +05:30
12b712c217 Add var_binop test for conditionals 2025-10-07 03:43:36 +05:30
2de280915a Add var test for conditionals 2025-10-07 03:37:13 +05:30
1cce49f5e0 Add const_binop test for conditionals 2025-10-07 03:24:11 +05:30
682a7e6566 Add const_int test for conditionals 2025-10-07 03:15:34 +05:30
fb63dbd698 Move conditional logic to eval_expr, add _conver_to_bool, add passing bool test 2025-10-07 03:11:23 +05:30
4f433d00cc Add Boolean return support 2025-10-06 23:04:45 +05:30
6cf5115ea9 Eval LHS and RHS in _handle_compare 2025-10-06 22:38:43 +05:30
f11a43010d Add _handle_cond to expr_pass 2025-10-06 22:33:03 +05:30
d1055e4d41 Reduce a condition from handle_cond 2025-10-06 22:20:54 +05:30
8554688230 Merge pull request #25 from pythonbpf/dependabot/github_actions/actions-6a14be197d
Bump the actions group with 2 updates
2025-10-06 19:32:01 +05:30
3e873f378e Bump the actions group with 2 updates
Bumps the actions group with 2 updates: [actions/checkout](https://github.com/actions/checkout) and [actions/setup-python](https://github.com/actions/setup-python).


Updates `actions/checkout` from 4 to 5
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4...v5)

Updates `actions/setup-python` from 5 to 6
- [Release notes](https://github.com/actions/setup-python/releases)
- [Commits](https://github.com/actions/setup-python/compare/v5...v6)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: actions
- dependency-name: actions/setup-python
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: actions
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-06 11:26:23 +00:00
28ce14ce34 Merge pull request #24 from pythonbpf/func_refactor
Refactor handle_return
2025-10-06 05:12:20 +05:30
5066cd4cfe Use named args for eval_expr call in handle_return 2025-10-06 05:11:33 +05:30
0bfb3855b6 Remove dead code from _handle_ctypes_call 2025-10-06 05:10:22 +05:30
2f0dd20f1e Add false case for _handle_xdp in return_utils 2025-10-06 05:09:03 +05:30
abef68c274 Remove redundant TODO from return_utils 2025-10-06 05:04:06 +05:30
9aff614ff5 Remove unnecessary parts from return_utils 2025-10-06 05:02:02 +05:30
7b0e8a2fca Add xdp example for passing return type 2025-10-06 04:59:20 +05:30
3e68d6df4f Add passing test examples for return statements 2025-10-06 04:57:04 +05:30
b75dc82f90 Remove clutter from handle_return 2025-10-06 04:44:55 +05:30
f53ca3bd5b Add ctypes in eval_expr 2025-10-06 04:43:04 +05:30
02885af1ca Add binops to eval_expr 2025-10-06 03:36:44 +05:30
e6e2a69506 Add _is_xdp_name 2025-10-06 03:02:08 +05:30
e4e92710c0 Move XDP pass above general return handling 2025-10-06 02:58:57 +05:30
f08bc9976c Add _handle_wrapped_return 2025-10-06 02:22:43 +05:30
23183da2e1 Add _handle_variable_return 2025-10-06 00:05:23 +05:30
c6fef1693e Add _handle_binop_return 2025-10-06 00:03:34 +05:30
192e03aa98 Add _handle_typed_constant_return 2025-10-05 23:59:04 +05:30
6f02b61527 Add _handle_xdp_return 2025-10-05 23:54:06 +05:30
a21ff5633c Add _handle_none_return 2025-10-05 23:44:46 +05:30
f96a6b94dc Remove useless args from handle_return 2025-10-05 23:40:48 +05:30
e9f3aa25d2 Make handle_return (crude for now) 2025-10-05 23:19:06 +05:30
d0a8e96b70 Use getitem dunder for StatementHandlerRegistry 2025-10-05 20:10:07 +05:30
b09dc815fc Add StatementHandlerRegistry 2025-10-05 15:19:16 +05:30
ceaac78633 Janitorial: fix lint 2025-10-05 15:12:01 +05:30
dc7a127fa6 Restructure dir for functions 2025-10-05 15:09:39 +05:30
552cd352f2 Merge pull request #20 from pythonbpf/fix-failing-tests
Fix failing tests in tests/
2025-10-05 14:04:14 +05:30
c7f2955ee9 Fix typo in process_stmt 2025-10-05 14:03:19 +05:30
ef36ea1e03 Add nullcheck for var_name in handle_binary_ops 2025-10-05 14:02:08 +05:30
d341cb24c0 Update explanation for named_arg 2025-10-05 04:27:37 +05:30
2fabb67942 Add note for faling test named_arg 2025-10-05 03:15:17 +05:30
a0b0ad370e Merge pull request #23 from pythonbpf/formatter
update formatter and pre-commit
2025-10-05 01:15:01 +05:30
283b947fc5 Add named_arg failing test 2025-10-04 19:50:33 +05:30
bf78ac21fe Remove 'Static Typing' from short term tasks 2025-10-04 07:30:11 +05:30
ac49cd8b1c Fix hashmap access in direct_assign.py 2025-10-04 02:14:33 +05:30
af44bd063c Add explanation for direct_assign.py failing test 2025-10-04 02:13:46 +05:30
1239d1c35f Fix handle_binary_ops calls in functions_pass 2025-10-04 02:09:11 +05:30
f41a9ccf26 Remove unnecessary args from binary_ops 2025-10-04 02:07:31 +05:30
be05b5d102 Allow local symbols to be used within return 2025-10-03 19:50:56 +05:30
3f061750cf fix return value error 2025-10-03 19:11:11 +05:30
6d5d6345e2 Add var_rval failing test 2025-10-03 18:01:15 +05:30
6fea580693 Fix t/f/return.py, tweak handle_binary_ops 2025-10-03 17:56:21 +05:30
86b9ec56d7 update formatter and pre-commit
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-02 22:43:05 +05:30
68 changed files with 235420 additions and 188030 deletions

View File

@ -12,8 +12,8 @@ jobs:
name: Format name: Format
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v5
- uses: actions/setup-python@v5 - uses: actions/setup-python@v6
with: with:
python-version: "3.x" python-version: "3.x"
- uses: pre-commit/action@v3.0.1 - uses: pre-commit/action@v3.0.1

View File

@ -21,7 +21,7 @@ ci:
repos: repos:
# Standard hooks # Standard hooks
- repo: https://github.com/pre-commit/pre-commit-hooks - repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0 rev: v6.0.0
hooks: hooks:
- id: check-added-large-files - id: check-added-large-files
- id: check-case-conflict - id: check-case-conflict
@ -36,7 +36,7 @@ repos:
- id: trailing-whitespace - id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit - repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.4.2" rev: "v0.13.2"
hooks: hooks:
- id: ruff - id: ruff
args: ["--fix", "--show-fixes"] args: ["--fix", "--show-fixes"]
@ -45,7 +45,7 @@ repos:
# Checking static types # Checking static types
- repo: https://github.com/pre-commit/mirrors-mypy - repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.10.0" rev: "v1.18.2"
hooks: hooks:
- id: mypy - id: mypy
exclude: ^(tests)|^(examples) exclude: ^(tests)|^(examples)

13
TODO.md
View File

@ -1,13 +0,0 @@
## Short term
- Implement enough functionality to port the BCC tutorial examples in PythonBPF
- Static Typing
- Add all maps
- XDP support in pylibbpf
- ringbuf support
- recursive expression resolution
## Long term
- Refactor the codebase to be better than a hackathon project
- Port to C++ and use actual LLVM?

View File

@ -12,7 +12,7 @@
"from pythonbpf import bpf, map, section, bpfglobal, BPF\n", "from pythonbpf import bpf, map, section, bpfglobal, BPF\n",
"from pythonbpf.helper import pid\n", "from pythonbpf.helper import pid\n",
"from pythonbpf.maps import HashMap\n", "from pythonbpf.maps import HashMap\n",
"from pylibbpf import *\n", "from pylibbpf import BpfMap\n",
"from ctypes import c_void_p, c_int64, c_uint64, c_int32\n", "from ctypes import c_void_p, c_int64, c_uint64, c_int32\n",
"import matplotlib.pyplot as plt" "import matplotlib.pyplot as plt"
] ]

27
examples/kprobes.py Normal file
View File

@ -0,0 +1,27 @@
from pythonbpf import bpf, section, bpfglobal, BPF
from ctypes import c_void_p, c_int64
@bpf
@section("kretprobe/do_unlinkat")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return c_int64(0)
@bpf
@section("kprobe/do_unlinkat")
def hello_world2(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
b = BPF()
b.load_and_attach()
while True:
print("running")
# Now cat /sys/kernel/debug/tracing/trace_pipe to see results of unlink kprobe.

File diff suppressed because it is too large Load Diff

View File

@ -1,3 +1,10 @@
"""
PythonBPF - A Python frontend for eBPF programs.
This package provides decorators and compilation tools to write BPF programs
in Python syntax and compile them to eBPF bytecode that can run in the kernel.
"""
from .decorators import bpf, map, section, bpfglobal, struct from .decorators import bpf, map, section, bpfglobal, struct
from .codegen import compile_to_ir, compile, BPF from .codegen import compile_to_ir, compile, BPF

View File

@ -1,3 +1,10 @@
"""
Binary operations handling for BPF programs.
This module provides functions to handle binary operations (add, subtract,
multiply, etc.) and emit the corresponding LLVM IR instructions.
"""
import ast import ast
from llvmlite import ir from llvmlite import ir
from logging import Logger from logging import Logger
@ -9,6 +16,7 @@ logger: Logger = logging.getLogger(__name__)
def recursive_dereferencer(var, builder): def recursive_dereferencer(var, builder):
"""dereference until primitive type comes out""" """dereference until primitive type comes out"""
# TODO: Not worrying about stack overflow for now # TODO: Not worrying about stack overflow for now
logger.info(f"Dereferencing {var}, type is {var.type}")
if isinstance(var.type, ir.PointerType): if isinstance(var.type, ir.PointerType):
a = builder.load(var) a = builder.load(var)
return recursive_dereferencer(a, builder) return recursive_dereferencer(a, builder)
@ -18,7 +26,7 @@ def recursive_dereferencer(var, builder):
raise TypeError(f"Unsupported type for dereferencing: {var.type}") raise TypeError(f"Unsupported type for dereferencing: {var.type}")
def get_operand_value(operand, module, builder, local_sym_tab): def get_operand_value(operand, builder, local_sym_tab):
"""Extract the value from an operand, handling variables and constants.""" """Extract the value from an operand, handling variables and constants."""
if isinstance(operand, ast.Name): if isinstance(operand, ast.Name):
if operand.id in local_sym_tab: if operand.id in local_sym_tab:
@ -29,14 +37,25 @@ def get_operand_value(operand, module, builder, local_sym_tab):
return ir.Constant(ir.IntType(64), operand.value) return ir.Constant(ir.IntType(64), operand.value)
raise TypeError(f"Unsupported constant type: {type(operand.value)}") raise TypeError(f"Unsupported constant type: {type(operand.value)}")
elif isinstance(operand, ast.BinOp): elif isinstance(operand, ast.BinOp):
return handle_binary_op_impl(operand, module, builder, local_sym_tab) return handle_binary_op_impl(operand, builder, local_sym_tab)
raise TypeError(f"Unsupported operand type: {type(operand)}") raise TypeError(f"Unsupported operand type: {type(operand)}")
def handle_binary_op_impl(rval, module, builder, local_sym_tab): def handle_binary_op_impl(rval, builder, local_sym_tab):
"""
Handle binary operations and emit corresponding LLVM IR instructions.
Args:
rval: The AST BinOp node representing the binary operation
builder: LLVM IR builder for emitting instructions
local_sym_tab: Symbol table mapping variable names to their IR representations
Returns:
The LLVM IR value representing the result of the binary operation
"""
op = rval.op op = rval.op
left = get_operand_value(rval.left, module, builder, local_sym_tab) left = get_operand_value(rval.left, builder, local_sym_tab)
right = get_operand_value(rval.right, module, builder, local_sym_tab) right = get_operand_value(rval.right, builder, local_sym_tab)
logger.info(f"left is {left}, right is {right}, op is {op}") logger.info(f"left is {left}, right is {right}, op is {op}")
# Map AST operation nodes to LLVM IR builder methods # Map AST operation nodes to LLVM IR builder methods
@ -61,6 +80,23 @@ def handle_binary_op_impl(rval, module, builder, local_sym_tab):
raise SyntaxError("Unsupported binary operation") raise SyntaxError("Unsupported binary operation")
def handle_binary_op(rval, module, builder, var_name, local_sym_tab): def handle_binary_op(rval, builder, var_name, local_sym_tab):
result = handle_binary_op_impl(rval, module, builder, local_sym_tab) """
builder.store(result, local_sym_tab[var_name].var) Handle binary operations and optionally store the result to a variable.
Args:
rval: The AST BinOp node representing the binary operation
builder: LLVM IR builder for emitting instructions
var_name: Optional variable name to store the result
local_sym_tab: Symbol table mapping variable names to their IR representations
Returns:
A tuple of (result_value, result_type)
"""
result = handle_binary_op_impl(rval, builder, local_sym_tab)
if var_name and var_name in local_sym_tab:
logger.info(
f"Storing result {result} into variable {local_sym_tab[var_name].var}"
)
builder.store(result, local_sym_tab[var_name].var)
return result, result.type

View File

@ -1,7 +1,15 @@
"""
Code generation module for PythonBPF.
This module handles the conversion of Python BPF programs to LLVM IR and
object files. It provides the main compilation pipeline from Python AST
to BPF bytecode.
"""
import ast import ast
from llvmlite import ir from llvmlite import ir
from .license_pass import license_processing from .license_pass import license_processing
from .functions_pass import func_proc from .functions import func_proc
from .maps import maps_proc from .maps import maps_proc
from .structs import structs_proc from .structs import structs_proc
from .globals_pass import ( from .globals_pass import (
@ -37,6 +45,14 @@ def find_bpf_chunks(tree):
def processor(source_code, filename, module): def processor(source_code, filename, module):
"""
Process Python source code and convert BPF-decorated functions to LLVM IR.
Args:
source_code: The Python source code to process
filename: The name of the source file
module: The LLVM IR module to populate
"""
tree = ast.parse(source_code, filename) tree = ast.parse(source_code, filename)
logger.debug(ast.dump(tree, indent=4)) logger.debug(ast.dump(tree, indent=4))
@ -55,7 +71,18 @@ def processor(source_code, filename, module):
globals_list_creation(tree, module) globals_list_creation(tree, module)
def compile_to_ir(filename: str, output: str, loglevel=logging.WARNING): def compile_to_ir(filename: str, output: str, loglevel=logging.INFO):
"""
Compile a Python BPF program to LLVM IR.
Args:
filename: Path to the Python source file containing BPF programs
output: Path where the LLVM IR (.ll) file will be written
loglevel: Logging level for compilation messages
Returns:
Path to the generated LLVM IR file
"""
logging.basicConfig( logging.basicConfig(
level=loglevel, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" level=loglevel, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
) )
@ -128,7 +155,19 @@ def compile_to_ir(filename: str, output: str, loglevel=logging.WARNING):
return output return output
def compile(loglevel=logging.WARNING) -> bool: def compile(loglevel=logging.INFO) -> bool:
"""
Compile the calling Python BPF program to an object file.
This function should be called from a Python file containing BPF programs.
It will compile the calling file to LLVM IR and then to a BPF object file.
Args:
loglevel: Logging level for compilation messages
Returns:
True if compilation succeeded, False otherwise
"""
# Look one level up the stack to the caller of this function # Look one level up the stack to the caller of this function
caller_frame = inspect.stack()[1] caller_frame = inspect.stack()[1]
caller_file = Path(caller_frame.filename).resolve() caller_file = Path(caller_frame.filename).resolve()
@ -161,7 +200,19 @@ def compile(loglevel=logging.WARNING) -> bool:
return success return success
def BPF(loglevel=logging.WARNING) -> BpfProgram: def BPF(loglevel=logging.INFO) -> BpfProgram:
"""
Compile the calling Python BPF program and return a BpfProgram object.
This function compiles the calling file's BPF programs to an object file
and loads it into a BpfProgram object for immediate use.
Args:
loglevel: Logging level for compilation messages
Returns:
A BpfProgram object that can be used to load and attach BPF programs
"""
caller_frame = inspect.stack()[1] caller_frame = inspect.stack()[1]
src = inspect.getsource(caller_frame.frame) src = inspect.getsource(caller_frame.frame)
with tempfile.NamedTemporaryFile( with tempfile.NamedTemporaryFile(

View File

@ -1,3 +1,5 @@
"""Debug information generation for BPF programs (DWARF/BTF)."""
from .dwarf_constants import * # noqa: F403 from .dwarf_constants import * # noqa: F403
from .dtypes import * # noqa: F403 from .dtypes import * # noqa: F403
from .debug_info_generator import DebugInfoGenerator from .debug_info_generator import DebugInfoGenerator

View File

@ -8,11 +8,31 @@ from typing import Any, List
class DebugInfoGenerator: class DebugInfoGenerator:
"""
Generator for DWARF/BTF debug information in LLVM IR modules.
This class provides methods to create debug metadata for BPF programs,
including types, structs, globals, and compilation units.
"""
def __init__(self, module): def __init__(self, module):
"""
Initialize the debug info generator.
Args:
module: LLVM IR module to attach debug info to
"""
self.module = module self.module = module
self._type_cache = {} # Cache for common debug types self._type_cache = {} # Cache for common debug types
def generate_file_metadata(self, filename, dirname): def generate_file_metadata(self, filename, dirname):
"""
Generate file metadata for debug info.
Args:
filename: Name of the source file
dirname: Directory containing the source file
"""
self.module._file_metadata = self.module.add_debug_info( self.module._file_metadata = self.module.add_debug_info(
"DIFile", "DIFile",
{ # type: ignore { # type: ignore
@ -24,6 +44,15 @@ class DebugInfoGenerator:
def generate_debug_cu( def generate_debug_cu(
self, language, producer: str, is_optimized: bool, is_distinct: bool self, language, producer: str, is_optimized: bool, is_distinct: bool
): ):
"""
Generate debug compile unit metadata.
Args:
language: DWARF language code (e.g., DW_LANG_C11)
producer: Compiler/producer string
is_optimized: Whether the code is optimized
is_distinct: Whether the compile unit should be distinct
"""
self.module._debug_compile_unit = self.module.add_debug_info( self.module._debug_compile_unit = self.module.add_debug_info(
"DICompileUnit", "DICompileUnit",
{ # type: ignore { # type: ignore
@ -83,6 +112,16 @@ class DebugInfoGenerator:
@staticmethod @staticmethod
def _compute_array_size(base_type: Any, count: int) -> int: def _compute_array_size(base_type: Any, count: int) -> int:
"""
Compute the size of an array in bits.
Args:
base_type: The base type of the array
count: Number of elements in the array
Returns:
Total size in bits
"""
# Extract size from base_type if possible # Extract size from base_type if possible
# For simplicity, assuming base_type has a size attribute # For simplicity, assuming base_type has a size attribute
return getattr(base_type, "size", 32) * count return getattr(base_type, "size", 32) * count

View File

@ -1,7 +1,10 @@
"""Debug information types and constants."""
import llvmlite.ir as ir import llvmlite.ir as ir
class DwarfBehaviorEnum: class DwarfBehaviorEnum:
"""DWARF module flag behavior constants for LLVM."""
ERROR_IF_MISMATCH = ir.Constant(ir.IntType(32), 1) ERROR_IF_MISMATCH = ir.Constant(ir.IntType(32), 1)
WARNING_IF_MISMATCH = ir.Constant(ir.IntType(32), 2) WARNING_IF_MISMATCH = ir.Constant(ir.IntType(32), 2)
OVERRIDE_USE_LARGEST = ir.Constant(ir.IntType(32), 7) OVERRIDE_USE_LARGEST = ir.Constant(ir.IntType(32), 7)

View File

@ -1,3 +1,9 @@
"""
DWARF debugging format constants.
Generated constants from dwarf.h for use in debug information generation.
"""
# generated constants from dwarf.h # generated constants from dwarf.h
DW_UT_compile = 0x01 DW_UT_compile = 0x01

View File

@ -1,3 +1,11 @@
"""
Decorators for marking BPF functions, maps, structs, and globals.
This module provides the core decorators used to annotate Python code
for BPF compilation.
"""
def bpf(func): def bpf(func):
"""Decorator to mark a function for BPF compilation.""" """Decorator to mark a function for BPF compilation."""
func._is_bpf = True func._is_bpf = True
@ -23,7 +31,17 @@ def struct(cls):
def section(name: str): def section(name: str):
"""
Decorator to specify the ELF section name for a BPF program.
Args:
name: The section name (e.g., 'xdp', 'tracepoint/syscalls/sys_enter_execve')
Returns:
A decorator function that marks the function with the section name
"""
def wrapper(fn): def wrapper(fn):
"""Decorator that sets the section name on the function."""
fn._section = name fn._section = name
return fn return fn

View File

@ -0,0 +1,6 @@
"""Expression evaluation and processing for BPF programs."""
from .expr_pass import eval_expr, handle_expr
from .type_normalization import convert_to_bool
__all__ = ["eval_expr", "handle_expr", "convert_to_bool"]

468
pythonbpf/expr/expr_pass.py Normal file
View File

@ -0,0 +1,468 @@
"""
Expression evaluation and LLVM IR generation.
This module handles the evaluation of Python expressions in BPF programs,
including variables, constants, function calls, comparisons, boolean
operations, and more.
"""
import ast
from llvmlite import ir
from logging import Logger
import logging
from typing import Dict
from pythonbpf.type_deducer import ctypes_to_ir, is_ctypes
from .type_normalization import convert_to_bool, handle_comparator
logger: Logger = logging.getLogger(__name__)
def _handle_name_expr(expr: ast.Name, local_sym_tab: Dict, builder: ir.IRBuilder):
"""Handle ast.Name expressions."""
if expr.id in local_sym_tab:
var = local_sym_tab[expr.id].var
val = builder.load(var)
return val, local_sym_tab[expr.id].ir_type
else:
logger.info(f"Undefined variable {expr.id}")
return None
def _handle_constant_expr(expr: ast.Constant):
"""Handle ast.Constant expressions."""
if isinstance(expr.value, int) or isinstance(expr.value, bool):
return ir.Constant(ir.IntType(64), int(expr.value)), ir.IntType(64)
else:
logger.error("Unsupported constant type")
return None
def _handle_attribute_expr(
expr: ast.Attribute,
local_sym_tab: Dict,
structs_sym_tab: Dict,
builder: ir.IRBuilder,
):
"""Handle ast.Attribute expressions for struct field access."""
if isinstance(expr.value, ast.Name):
var_name = expr.value.id
attr_name = expr.attr
if var_name in local_sym_tab:
var_ptr, var_type, var_metadata = local_sym_tab[var_name]
logger.info(f"Loading attribute {attr_name} from variable {var_name}")
logger.info(f"Variable type: {var_type}, Variable ptr: {var_ptr}")
metadata = structs_sym_tab[var_metadata]
if attr_name in metadata.fields:
gep = metadata.gep(builder, var_ptr, attr_name)
val = builder.load(gep)
field_type = metadata.field_type(attr_name)
return val, field_type
return None
def _handle_deref_call(expr: ast.Call, local_sym_tab: Dict, builder: ir.IRBuilder):
"""Handle deref function calls."""
logger.info(f"Handling deref {ast.dump(expr)}")
if len(expr.args) != 1:
logger.info("deref takes exactly one argument")
return None
arg = expr.args[0]
if (
isinstance(arg, ast.Call)
and isinstance(arg.func, ast.Name)
and arg.func.id == "deref"
):
logger.info("Multiple deref not supported")
return None
if isinstance(arg, ast.Name):
if arg.id in local_sym_tab:
arg_ptr = local_sym_tab[arg.id].var
else:
logger.info(f"Undefined variable {arg.id}")
return None
else:
logger.info("Unsupported argument type for deref")
return None
if arg_ptr is None:
logger.info("Failed to evaluate deref argument")
return None
# Load the value from pointer
val = builder.load(arg_ptr)
return val, local_sym_tab[arg.id].ir_type
def _handle_ctypes_call(
func,
module,
builder,
expr,
local_sym_tab,
map_sym_tab,
structs_sym_tab=None,
):
"""Handle ctypes type constructor calls."""
if len(expr.args) != 1:
logger.info("ctypes constructor takes exactly one argument")
return None
arg = expr.args[0]
val = eval_expr(
func,
module,
builder,
arg,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
if val is None:
logger.info("Failed to evaluate argument to ctypes constructor")
return None
call_type = expr.func.id
expected_type = ctypes_to_ir(call_type)
if val[1] != expected_type:
# NOTE: We are only considering casting to and from int types for now
if isinstance(val[1], ir.IntType) and isinstance(expected_type, ir.IntType):
if val[1].width < expected_type.width:
val = (builder.sext(val[0], expected_type), expected_type)
else:
val = (builder.trunc(val[0], expected_type), expected_type)
else:
raise ValueError(f"Type mismatch: expected {expected_type}, got {val[1]}")
return val
def _handle_compare(
func, module, builder, cond, local_sym_tab, map_sym_tab, structs_sym_tab=None
):
"""Handle ast.Compare expressions."""
if len(cond.ops) != 1 or len(cond.comparators) != 1:
logger.error("Only single comparisons are supported")
return None
lhs = eval_expr(
func,
module,
builder,
cond.left,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
rhs = eval_expr(
func,
module,
builder,
cond.comparators[0],
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
if lhs is None or rhs is None:
logger.error("Failed to evaluate comparison operands")
return None
lhs, _ = lhs
rhs, _ = rhs
return handle_comparator(func, builder, cond.ops[0], lhs, rhs)
def _handle_unary_op(
func,
module,
builder,
expr: ast.UnaryOp,
local_sym_tab,
map_sym_tab,
structs_sym_tab=None,
):
"""Handle ast.UnaryOp expressions."""
if not isinstance(expr.op, ast.Not):
logger.error("Only 'not' unary operator is supported")
return None
operand = eval_expr(
func, module, builder, expr.operand, local_sym_tab, map_sym_tab, structs_sym_tab
)
if operand is None:
logger.error("Failed to evaluate operand for unary operation")
return None
operand_val, operand_type = operand
true_const = ir.Constant(ir.IntType(1), 1)
result = builder.xor(convert_to_bool(builder, operand_val), true_const)
return result, ir.IntType(1)
def _handle_and_op(func, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab):
"""Handle `and` boolean operations."""
logger.debug(f"Handling 'and' operator with {len(expr.values)} operands")
merge_block = func.append_basic_block(name="and.merge")
false_block = func.append_basic_block(name="and.false")
incoming_values = []
for i, value in enumerate(expr.values):
is_last = i == len(expr.values) - 1
# Evaluate current operand
operand_result = eval_expr(
func, None, builder, value, local_sym_tab, map_sym_tab, structs_sym_tab
)
if operand_result is None:
logger.error(f"Failed to evaluate operand {i} in 'and' expression")
return None
operand_val, operand_type = operand_result
# Convert to boolean if needed
operand_bool = convert_to_bool(builder, operand_val)
current_block = builder.block
if is_last:
# Last operand: result is this value
builder.branch(merge_block)
incoming_values.append((operand_bool, current_block))
else:
# Not last: check if true, continue or short-circuit
next_check = func.append_basic_block(name=f"and.check_{i + 1}")
builder.cbranch(operand_bool, next_check, false_block)
builder.position_at_end(next_check)
# False block: short-circuit with false
builder.position_at_end(false_block)
builder.branch(merge_block)
false_value = ir.Constant(ir.IntType(1), 0)
incoming_values.append((false_value, false_block))
# Merge block: phi node
builder.position_at_end(merge_block)
phi = builder.phi(ir.IntType(1), name="and.result")
for val, block in incoming_values:
phi.add_incoming(val, block)
logger.debug(f"Generated 'and' with {len(incoming_values)} incoming values")
return phi, ir.IntType(1)
def _handle_or_op(func, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab):
"""Handle `or` boolean operations."""
logger.debug(f"Handling 'or' operator with {len(expr.values)} operands")
merge_block = func.append_basic_block(name="or.merge")
true_block = func.append_basic_block(name="or.true")
incoming_values = []
for i, value in enumerate(expr.values):
is_last = i == len(expr.values) - 1
# Evaluate current operand
operand_result = eval_expr(
func, None, builder, value, local_sym_tab, map_sym_tab, structs_sym_tab
)
if operand_result is None:
logger.error(f"Failed to evaluate operand {i} in 'or' expression")
return None
operand_val, operand_type = operand_result
# Convert to boolean if needed
operand_bool = convert_to_bool(builder, operand_val)
current_block = builder.block
if is_last:
# Last operand: result is this value
builder.branch(merge_block)
incoming_values.append((operand_bool, current_block))
else:
# Not last: check if false, continue or short-circuit
next_check = func.append_basic_block(name=f"or.check_{i + 1}")
builder.cbranch(operand_bool, true_block, next_check)
builder.position_at_end(next_check)
# True block: short-circuit with true
builder.position_at_end(true_block)
builder.branch(merge_block)
true_value = ir.Constant(ir.IntType(1), 1)
incoming_values.append((true_value, true_block))
# Merge block: phi node
builder.position_at_end(merge_block)
phi = builder.phi(ir.IntType(1), name="or.result")
for val, block in incoming_values:
phi.add_incoming(val, block)
logger.debug(f"Generated 'or' with {len(incoming_values)} incoming values")
return phi, ir.IntType(1)
def _handle_boolean_op(
func,
module,
builder,
expr: ast.BoolOp,
local_sym_tab,
map_sym_tab,
structs_sym_tab=None,
):
"""Handle `and` and `or` boolean operations."""
if isinstance(expr.op, ast.And):
return _handle_and_op(
func, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
elif isinstance(expr.op, ast.Or):
return _handle_or_op(
func, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
else:
logger.error(f"Unsupported boolean operator: {type(expr.op).__name__}")
return None
def eval_expr(
func,
module,
builder,
expr,
local_sym_tab,
map_sym_tab,
structs_sym_tab=None,
):
"""
Evaluate an expression and return its LLVM IR value and type.
Args:
func: The LLVM IR function being built
module: The LLVM IR module
builder: LLVM IR builder
expr: The AST expression node to evaluate
local_sym_tab: Local symbol table
map_sym_tab: Map symbol table
structs_sym_tab: Struct symbol table
Returns:
A tuple of (value, type) or None if evaluation fails
"""
logger.info(f"Evaluating expression: {ast.dump(expr)}")
if isinstance(expr, ast.Name):
return _handle_name_expr(expr, local_sym_tab, builder)
elif isinstance(expr, ast.Constant):
return _handle_constant_expr(expr)
elif isinstance(expr, ast.Call):
if isinstance(expr.func, ast.Name) and expr.func.id == "deref":
return _handle_deref_call(expr, local_sym_tab, builder)
if isinstance(expr.func, ast.Name) and is_ctypes(expr.func.id):
return _handle_ctypes_call(
func,
module,
builder,
expr,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
# delayed import to avoid circular dependency
from pythonbpf.helper import HelperHandlerRegistry, handle_helper_call
if isinstance(expr.func, ast.Name) and HelperHandlerRegistry.has_handler(
expr.func.id
):
return handle_helper_call(
expr,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
elif isinstance(expr.func, ast.Attribute):
logger.info(f"Handling method call: {ast.dump(expr.func)}")
if isinstance(expr.func.value, ast.Call) and isinstance(
expr.func.value.func, ast.Name
):
method_name = expr.func.attr
if HelperHandlerRegistry.has_handler(method_name):
return handle_helper_call(
expr,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
elif isinstance(expr.func.value, ast.Name):
obj_name = expr.func.value.id
method_name = expr.func.attr
if obj_name in map_sym_tab:
if HelperHandlerRegistry.has_handler(method_name):
return handle_helper_call(
expr,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
elif isinstance(expr, ast.Attribute):
return _handle_attribute_expr(expr, local_sym_tab, structs_sym_tab, builder)
elif isinstance(expr, ast.BinOp):
from pythonbpf.binary_ops import handle_binary_op
return handle_binary_op(expr, builder, None, local_sym_tab)
elif isinstance(expr, ast.Compare):
return _handle_compare(
func, module, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
elif isinstance(expr, ast.UnaryOp):
return _handle_unary_op(
func, module, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
elif isinstance(expr, ast.BoolOp):
return _handle_boolean_op(
func, module, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
logger.info("Unsupported expression evaluation")
return None
def handle_expr(
func,
module,
builder,
expr,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
):
"""Handle expression statements in the function body."""
logger.info(f"Handling expression: {ast.dump(expr)}")
call = expr.value
if isinstance(call, ast.Call):
eval_expr(
func,
module,
builder,
call,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
else:
logger.info("Unsupported expression type")

View File

@ -0,0 +1,186 @@
"""
Type normalization and comparison operations for expressions.
This module provides utilities for normalizing types between expressions,
handling pointer dereferencing, and generating comparison operations.
"""
from llvmlite import ir
import logging
import ast
logger = logging.getLogger(__name__)
COMPARISON_OPS = {
ast.Eq: "==",
ast.NotEq: "!=",
ast.Lt: "<",
ast.LtE: "<=",
ast.Gt: ">",
ast.GtE: ">=",
ast.Is: "==",
ast.IsNot: "!=",
}
def _get_base_type_and_depth(ir_type):
"""
Get the base type and pointer depth for an LLVM IR type.
Args:
ir_type: The LLVM IR type to analyze
Returns:
A tuple of (base_type, depth) where depth is the number of pointer levels
"""
cur_type = ir_type
depth = 0
while isinstance(cur_type, ir.PointerType):
depth += 1
cur_type = cur_type.pointee
return cur_type, depth
def _deref_to_depth(func, builder, val, target_depth):
"""
Dereference a pointer to a certain depth with null checks.
Args:
func: The LLVM IR function being built
builder: LLVM IR builder
val: The pointer value to dereference
target_depth: Number of levels to dereference
Returns:
The dereferenced value, or None if dereferencing fails
"""
cur_val = val
cur_type = val.type
for depth in range(target_depth):
if not isinstance(val.type, ir.PointerType):
logger.error("Cannot dereference further, non-pointer type")
return None
# dereference with null check
pointee_type = cur_type.pointee
null_check_block = builder.block
not_null_block = func.append_basic_block(name=f"deref_not_null_{depth}")
merge_block = func.append_basic_block(name=f"deref_merge_{depth}")
null_ptr = ir.Constant(cur_type, None)
is_not_null = builder.icmp_signed("!=", cur_val, null_ptr)
logger.debug(f"Inserted null check for pointer at depth {depth}")
builder.cbranch(is_not_null, not_null_block, merge_block)
builder.position_at_end(not_null_block)
dereferenced_val = builder.load(cur_val)
logger.debug(f"Dereferenced to depth {depth - 1}, type: {pointee_type}")
builder.branch(merge_block)
builder.position_at_end(merge_block)
phi = builder.phi(pointee_type, name=f"deref_result_{depth}")
zero_value = (
ir.Constant(pointee_type, 0)
if isinstance(pointee_type, ir.IntType)
else ir.Constant(pointee_type, None)
)
phi.add_incoming(zero_value, null_check_block)
phi.add_incoming(dereferenced_val, not_null_block)
# Continue with phi result
cur_val = phi
cur_type = pointee_type
return cur_val
def _normalize_types(func, builder, lhs, rhs):
"""
Normalize types for comparison by casting or dereferencing as needed.
Args:
func: The LLVM IR function being built
builder: LLVM IR builder
lhs: Left-hand side value
rhs: Right-hand side value
Returns:
A tuple of (normalized_lhs, normalized_rhs) or (None, None) on error
"""
logger.info(f"Normalizing types: {lhs.type} vs {rhs.type}")
if isinstance(lhs.type, ir.IntType) and isinstance(rhs.type, ir.IntType):
if lhs.type.width < rhs.type.width:
lhs = builder.sext(lhs, rhs.type)
else:
rhs = builder.sext(rhs, lhs.type)
return lhs, rhs
elif not isinstance(lhs.type, ir.PointerType) and not isinstance(
rhs.type, ir.PointerType
):
logger.error(f"Type mismatch: {lhs.type} vs {rhs.type}")
return None, None
else:
lhs_base, lhs_depth = _get_base_type_and_depth(lhs.type)
rhs_base, rhs_depth = _get_base_type_and_depth(rhs.type)
if lhs_base == rhs_base:
if lhs_depth < rhs_depth:
rhs = _deref_to_depth(func, builder, rhs, rhs_depth - lhs_depth)
elif rhs_depth < lhs_depth:
lhs = _deref_to_depth(func, builder, lhs, lhs_depth - rhs_depth)
return _normalize_types(func, builder, lhs, rhs)
def convert_to_bool(builder, val):
"""
Convert an LLVM IR value to a boolean (i1) type.
Args:
builder: LLVM IR builder
val: The value to convert
Returns:
An i1 boolean value
"""
if val.type == ir.IntType(1):
return val
if isinstance(val.type, ir.PointerType):
zero = ir.Constant(val.type, None)
else:
zero = ir.Constant(val.type, 0)
return builder.icmp_signed("!=", val, zero)
def handle_comparator(func, builder, op, lhs, rhs):
"""
Handle comparison operations between two values.
Args:
func: The LLVM IR function being built
builder: LLVM IR builder
op: The AST comparison operator node
lhs: Left-hand side value
rhs: Right-hand side value
Returns:
A tuple of (result, ir.IntType(1)) or None on error
"""
if lhs.type != rhs.type:
lhs, rhs = _normalize_types(func, builder, lhs, rhs)
if lhs is None or rhs is None:
return None
if type(op) not in COMPARISON_OPS:
logger.error(f"Unsupported comparison operator: {type(op)}")
return None
predicate = COMPARISON_OPS[type(op)]
result = builder.icmp_signed(predicate, lhs, rhs)
logger.debug(f"Comparison result: {result}")
return result, ir.IntType(1)

View File

@ -1,183 +0,0 @@
import ast
from llvmlite import ir
from logging import Logger
import logging
from typing import Dict
logger: Logger = logging.getLogger(__name__)
def _handle_name_expr(expr: ast.Name, local_sym_tab: Dict, builder: ir.IRBuilder):
"""Handle ast.Name expressions."""
if expr.id in local_sym_tab:
var = local_sym_tab[expr.id].var
val = builder.load(var)
return val, local_sym_tab[expr.id].ir_type
else:
logger.info(f"Undefined variable {expr.id}")
return None
def _handle_constant_expr(expr: ast.Constant):
"""Handle ast.Constant expressions."""
if isinstance(expr.value, int):
return ir.Constant(ir.IntType(64), expr.value), ir.IntType(64)
elif isinstance(expr.value, bool):
return ir.Constant(ir.IntType(1), int(expr.value)), ir.IntType(1)
else:
logger.info("Unsupported constant type")
return None
def _handle_attribute_expr(
expr: ast.Attribute,
local_sym_tab: Dict,
structs_sym_tab: Dict,
builder: ir.IRBuilder,
):
"""Handle ast.Attribute expressions for struct field access."""
if isinstance(expr.value, ast.Name):
var_name = expr.value.id
attr_name = expr.attr
if var_name in local_sym_tab:
var_ptr, var_type, var_metadata = local_sym_tab[var_name]
logger.info(f"Loading attribute {attr_name} from variable {var_name}")
logger.info(f"Variable type: {var_type}, Variable ptr: {var_ptr}")
metadata = structs_sym_tab[var_metadata]
if attr_name in metadata.fields:
gep = metadata.gep(builder, var_ptr, attr_name)
val = builder.load(gep)
field_type = metadata.field_type(attr_name)
return val, field_type
return None
def _handle_deref_call(expr: ast.Call, local_sym_tab: Dict, builder: ir.IRBuilder):
"""Handle deref function calls."""
logger.info(f"Handling deref {ast.dump(expr)}")
if len(expr.args) != 1:
logger.info("deref takes exactly one argument")
return None
arg = expr.args[0]
if (
isinstance(arg, ast.Call)
and isinstance(arg.func, ast.Name)
and arg.func.id == "deref"
):
logger.info("Multiple deref not supported")
return None
if isinstance(arg, ast.Name):
if arg.id in local_sym_tab:
arg_ptr = local_sym_tab[arg.id].var
else:
logger.info(f"Undefined variable {arg.id}")
return None
else:
logger.info("Unsupported argument type for deref")
return None
if arg_ptr is None:
logger.info("Failed to evaluate deref argument")
return None
# Load the value from pointer
val = builder.load(arg_ptr)
return val, local_sym_tab[arg.id].ir_type
def eval_expr(
func,
module,
builder,
expr,
local_sym_tab,
map_sym_tab,
structs_sym_tab=None,
):
logger.info(f"Evaluating expression: {ast.dump(expr)}")
if isinstance(expr, ast.Name):
return _handle_name_expr(expr, local_sym_tab, builder)
elif isinstance(expr, ast.Constant):
return _handle_constant_expr(expr)
elif isinstance(expr, ast.Call):
if isinstance(expr.func, ast.Name) and expr.func.id == "deref":
return _handle_deref_call(expr, local_sym_tab, builder)
# delayed import to avoid circular dependency
from pythonbpf.helper import HelperHandlerRegistry, handle_helper_call
if isinstance(expr.func, ast.Name) and HelperHandlerRegistry.has_handler(
expr.func.id
):
return handle_helper_call(
expr,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
elif isinstance(expr.func, ast.Attribute):
logger.info(f"Handling method call: {ast.dump(expr.func)}")
if isinstance(expr.func.value, ast.Call) and isinstance(
expr.func.value.func, ast.Name
):
method_name = expr.func.attr
if HelperHandlerRegistry.has_handler(method_name):
return handle_helper_call(
expr,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
elif isinstance(expr.func.value, ast.Name):
obj_name = expr.func.value.id
method_name = expr.func.attr
if obj_name in map_sym_tab:
if HelperHandlerRegistry.has_handler(method_name):
return handle_helper_call(
expr,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
elif isinstance(expr, ast.Attribute):
return _handle_attribute_expr(expr, local_sym_tab, structs_sym_tab, builder)
logger.info("Unsupported expression evaluation")
return None
def handle_expr(
func,
module,
builder,
expr,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
):
"""Handle expression statements in the function body."""
logger.info(f"Handling expression: {ast.dump(expr)}")
call = expr.value
if isinstance(call, ast.Call):
eval_expr(
func,
module,
builder,
call,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
else:
logger.info("Unsupported expression type")

View File

@ -0,0 +1,5 @@
"""BPF function processing and LLVM IR generation."""
from .functions_pass import func_proc
__all__ = ["func_proc"]

View File

@ -0,0 +1,25 @@
"""Registry for statement handler functions."""
from typing import Dict
class StatementHandlerRegistry:
"""Registry for statement handlers."""
_handlers: Dict = {}
@classmethod
def register(cls, stmt_type):
"""Register a handler for a specific statement type."""
def decorator(handler):
"""Decorator that registers the handler."""
cls._handlers[stmt_type] = handler
return handler
return decorator
@classmethod
def __getitem__(cls, stmt_type):
"""Get the handler for a specific statement type."""
return cls._handlers.get(stmt_type, None)

View File

@ -1,24 +1,44 @@
"""
BPF function processing and LLVM IR generation.
This module handles the core function processing, converting Python function
definitions into LLVM IR for BPF programs. It manages local variables,
control flow, and statement processing.
"""
from llvmlite import ir from llvmlite import ir
import ast import ast
import logging import logging
from typing import Any from typing import Any
from dataclasses import dataclass from dataclasses import dataclass
from .helper import HelperHandlerRegistry, handle_helper_call from pythonbpf.helper import HelperHandlerRegistry, handle_helper_call
from .type_deducer import ctypes_to_ir from pythonbpf.type_deducer import ctypes_to_ir
from .binary_ops import handle_binary_op from pythonbpf.binary_ops import handle_binary_op
from .expr_pass import eval_expr, handle_expr from pythonbpf.expr import eval_expr, handle_expr, convert_to_bool
from .return_utils import _handle_none_return, _handle_xdp_return, _is_xdp_name
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass @dataclass
class LocalSymbol: class LocalSymbol:
"""
Represents a local variable in a BPF function.
Attributes:
var: LLVM IR alloca instruction for the variable
ir_type: LLVM IR type of the variable
metadata: Optional metadata (e.g., struct type name)
"""
var: ir.AllocaInstr var: ir.AllocaInstr
ir_type: ir.Type ir_type: ir.Type
metadata: Any = None metadata: Any = None
def __iter__(self): def __iter__(self):
"""Support tuple unpacking of LocalSymbol."""
yield self.var yield self.var
yield self.ir_type yield self.ir_type
yield self.metadata yield self.metadata
@ -146,8 +166,7 @@ def handle_assign(
local_sym_tab[var_name].var, local_sym_tab[var_name].var,
) )
logger.info( logger.info(
f"Assigned {call_type} constant " f"Assigned {call_type} constant {rval.args[0].value} to {var_name}"
f"{rval.args[0].value} to {var_name}"
) )
elif HelperHandlerRegistry.has_handler(call_type): elif HelperHandlerRegistry.has_handler(call_type):
# var = builder.alloca(ir.IntType(64), name=var_name) # var = builder.alloca(ir.IntType(64), name=var_name)
@ -233,76 +252,33 @@ def handle_assign(
else: else:
logger.info("Unsupported assignment call function type") logger.info("Unsupported assignment call function type")
elif isinstance(rval, ast.BinOp): elif isinstance(rval, ast.BinOp):
handle_binary_op(rval, module, builder, var_name, local_sym_tab) handle_binary_op(rval, builder, var_name, local_sym_tab)
else: else:
logger.info("Unsupported assignment value type") logger.info("Unsupported assignment value type")
def handle_cond(func, module, builder, cond, local_sym_tab, map_sym_tab): def handle_cond(
if isinstance(cond, ast.Constant): func, module, builder, cond, local_sym_tab, map_sym_tab, structs_sym_tab=None
if isinstance(cond.value, bool): ):
return ir.Constant(ir.IntType(1), int(cond.value)) """
elif isinstance(cond.value, int): Evaluate a condition expression and convert it to a boolean value.
return ir.Constant(ir.IntType(1), int(bool(cond.value)))
else: Args:
logger.info("Unsupported constant type in condition") func: The LLVM IR function being built
return None module: The LLVM IR module
elif isinstance(cond, ast.Name): builder: LLVM IR builder
if cond.id in local_sym_tab: cond: The AST condition node to evaluate
var = local_sym_tab[cond.id].var local_sym_tab: Local symbol table
val = builder.load(var) map_sym_tab: Map symbol table
if val.type != ir.IntType(1): structs_sym_tab: Struct symbol table
# Convert nonzero values to true, zero to false
if isinstance(val.type, ir.PointerType): Returns:
# For pointer types, compare with null pointer LLVM IR boolean value representing the condition result
zero = ir.Constant(val.type, None) """
else: val = eval_expr(
# For integer types, compare with zero func, module, builder, cond, local_sym_tab, map_sym_tab, structs_sym_tab
zero = ir.Constant(val.type, 0) )[0]
val = builder.icmp_signed("!=", val, zero) return convert_to_bool(builder, val)
return val
else:
logger.info(f"Undefined variable {cond.id} in condition")
return None
elif isinstance(cond, ast.Compare):
lhs = eval_expr(func, module, builder, cond.left, local_sym_tab, map_sym_tab)[0]
if len(cond.ops) != 1 or len(cond.comparators) != 1:
logger.info("Unsupported complex comparison")
return None
rhs = eval_expr(
func, module, builder, cond.comparators[0], local_sym_tab, map_sym_tab
)[0]
op = cond.ops[0]
if lhs.type != rhs.type:
if isinstance(lhs.type, ir.IntType) and isinstance(rhs.type, ir.IntType):
# Extend the smaller type to the larger type
if lhs.type.width < rhs.type.width:
lhs = builder.sext(lhs, rhs.type)
elif lhs.type.width > rhs.type.width:
rhs = builder.sext(rhs, lhs.type)
else:
logger.info("Type mismatch in comparison")
return None
if isinstance(op, ast.Eq):
return builder.icmp_signed("==", lhs, rhs)
elif isinstance(op, ast.NotEq):
return builder.icmp_signed("!=", lhs, rhs)
elif isinstance(op, ast.Lt):
return builder.icmp_signed("<", lhs, rhs)
elif isinstance(op, ast.LtE):
return builder.icmp_signed("<=", lhs, rhs)
elif isinstance(op, ast.Gt):
return builder.icmp_signed(">", lhs, rhs)
elif isinstance(op, ast.GtE):
return builder.icmp_signed(">=", lhs, rhs)
else:
logger.info("Unsupported comparison operator")
return None
else:
logger.info("Unsupported condition expression")
return None
def handle_if( def handle_if(
@ -318,7 +294,9 @@ def handle_if(
else: else:
else_block = None else_block = None
cond = handle_cond(func, module, builder, stmt.test, local_sym_tab, map_sym_tab) cond = handle_cond(
func, module, builder, stmt.test, local_sym_tab, map_sym_tab, structs_sym_tab
)
if else_block: if else_block:
builder.cbranch(cond, then_block, else_block) builder.cbranch(cond, then_block, else_block)
else: else:
@ -351,6 +329,39 @@ def handle_if(
builder.position_at_end(merge_block) builder.position_at_end(merge_block)
def handle_return(builder, stmt, local_sym_tab, ret_type):
"""
Handle return statements in BPF functions.
Args:
builder: LLVM IR builder
stmt: The AST Return node
local_sym_tab: Local symbol table
ret_type: Expected return type
Returns:
True if a return was emitted, False otherwise
"""
logger.info(f"Handling return statement: {ast.dump(stmt)}")
if stmt.value is None:
return _handle_none_return(builder)
elif isinstance(stmt.value, ast.Name) and _is_xdp_name(stmt.value.id):
return _handle_xdp_return(stmt, builder, ret_type)
else:
val = eval_expr(
func=None,
module=None,
builder=builder,
expr=stmt.value,
local_sym_tab=local_sym_tab,
map_sym_tab={},
structs_sym_tab={},
)
logger.info(f"Evaluated return expression to {val}")
builder.ret(val[0])
return True
def process_stmt( def process_stmt(
func, func,
module, module,
@ -362,6 +373,23 @@ def process_stmt(
did_return, did_return,
ret_type=ir.IntType(64), ret_type=ir.IntType(64),
): ):
"""
Process a single statement in a BPF function.
Args:
func: The LLVM IR function being built
module: The LLVM IR module
builder: LLVM IR builder
stmt: The AST statement node to process
local_sym_tab: Local symbol table
map_sym_tab: Map symbol table
structs_sym_tab: Struct symbol table
did_return: Whether a return has been emitted
ret_type: Expected return type
Returns:
True if a return was emitted, False otherwise
"""
logger.info(f"Processing statement: {ast.dump(stmt)}") logger.info(f"Processing statement: {ast.dump(stmt)}")
if isinstance(stmt, ast.Expr): if isinstance(stmt, ast.Expr):
handle_expr( handle_expr(
@ -384,42 +412,37 @@ def process_stmt(
func, module, builder, stmt, map_sym_tab, local_sym_tab, structs_sym_tab func, module, builder, stmt, map_sym_tab, local_sym_tab, structs_sym_tab
) )
elif isinstance(stmt, ast.Return): elif isinstance(stmt, ast.Return):
if stmt.value is None: did_return = handle_return(
builder.ret(ir.Constant(ir.IntType(32), 0)) builder,
did_return = True stmt,
elif ( local_sym_tab,
isinstance(stmt.value, ast.Call) ret_type,
and isinstance(stmt.value.func, ast.Name) )
and len(stmt.value.args) == 1
and isinstance(stmt.value.args[0], ast.Constant)
and isinstance(stmt.value.args[0].value, int)
):
call_type = stmt.value.func.id
if ctypes_to_ir(call_type) != ret_type:
raise ValueError(
"Return type mismatch: expected"
f"{ctypes_to_ir(call_type)}, got {call_type}"
)
else:
builder.ret(ir.Constant(ret_type, stmt.value.args[0].value))
did_return = True
elif isinstance(stmt.value, ast.Name):
if stmt.value.id == "XDP_PASS":
builder.ret(ir.Constant(ret_type, 2))
did_return = True
elif stmt.value.id == "XDP_DROP":
builder.ret(ir.Constant(ret_type, 1))
did_return = True
else:
raise ValueError("Failed to evaluate return expression")
else:
raise ValueError("Unsupported return value")
return did_return return did_return
def allocate_mem( def allocate_mem(
module, builder, body, func, ret_type, map_sym_tab, local_sym_tab, structs_sym_tab module, builder, body, func, ret_type, map_sym_tab, local_sym_tab, structs_sym_tab
): ):
"""
Pre-allocate stack memory for local variables in a BPF function.
This function scans the function body and creates alloca instructions
for all local variables before processing the function statements.
Args:
module: The LLVM IR module
builder: LLVM IR builder
body: List of AST statements in the function body
func: The LLVM IR function being built
ret_type: Expected return type
map_sym_tab: Map symbol table
local_sym_tab: Local symbol table to populate
structs_sym_tab: Struct symbol table
Returns:
Updated local symbol table
"""
for stmt in body: for stmt in body:
has_metadata = False has_metadata = False
if isinstance(stmt, ast.If): if isinstance(stmt, ast.If):
@ -455,6 +478,9 @@ def allocate_mem(
continue continue
var_name = target.id var_name = target.id
rval = stmt.value rval = stmt.value
if var_name in local_sym_tab:
logger.info(f"Variable {var_name} already allocated")
continue
if isinstance(rval, ast.Call): if isinstance(rval, ast.Call):
if isinstance(rval.func, ast.Name): if isinstance(rval.func, ast.Name):
call_type = rval.func.id call_type = rval.func.id
@ -483,8 +509,7 @@ def allocate_mem(
var = builder.alloca(ir_type, name=var_name) var = builder.alloca(ir_type, name=var_name)
has_metadata = True has_metadata = True
logger.info( logger.info(
f"Pre-allocated variable {var_name} " f"Pre-allocated variable {var_name} for struct {call_type}"
f"for struct {call_type}"
) )
elif isinstance(rval.func, ast.Attribute): elif isinstance(rval.func, ast.Attribute):
ir_type = ir.PointerType(ir.IntType(64)) ir_type = ir.PointerType(ir.IntType(64))
@ -568,7 +593,7 @@ def process_func_body(
) )
if not did_return: if not did_return:
builder.ret(ir.Constant(ir.IntType(32), 0)) builder.ret(ir.Constant(ir.IntType(64), 0))
def process_bpf_chunk(func_node, module, return_type, map_sym_tab, structs_sym_tab): def process_bpf_chunk(func_node, module, return_type, map_sym_tab, structs_sym_tab):
@ -611,6 +636,16 @@ def process_bpf_chunk(func_node, module, return_type, map_sym_tab, structs_sym_t
def func_proc(tree, module, chunks, map_sym_tab, structs_sym_tab): def func_proc(tree, module, chunks, map_sym_tab, structs_sym_tab):
"""
Process all BPF function chunks and generate LLVM IR.
Args:
tree: The Python AST (not used in current implementation)
module: The LLVM IR module to add functions to
chunks: List of AST function nodes decorated with @bpf
map_sym_tab: Map symbol table
structs_sym_tab: Struct symbol table
"""
for func_node in chunks: for func_node in chunks:
is_global = False is_global = False
for decorator in func_node.decorator_list: for decorator in func_node.decorator_list:
@ -636,6 +671,18 @@ def func_proc(tree, module, chunks, map_sym_tab, structs_sym_tab):
def infer_return_type(func_node: ast.FunctionDef): def infer_return_type(func_node: ast.FunctionDef):
"""
Infer the return type of a BPF function from annotations or return statements.
Args:
func_node: The AST function node
Returns:
String representation of the return type (e.g., 'c_int64')
Raises:
TypeError: If func_node is not a FunctionDef
"""
if not isinstance(func_node, (ast.FunctionDef, ast.AsyncFunctionDef)): if not isinstance(func_node, (ast.FunctionDef, ast.AsyncFunctionDef)):
raise TypeError("Expected ast.FunctionDef") raise TypeError("Expected ast.FunctionDef")
if func_node.returns is not None: if func_node.returns is not None:
@ -654,6 +701,7 @@ def infer_return_type(func_node: ast.FunctionDef):
found_type = None found_type = None
def _expr_type(e): def _expr_type(e):
"""Helper function to extract type from an expression."""
if e is None: if e is None:
return "None" return "None"
if isinstance(e, ast.Constant): if isinstance(e, ast.Constant):

View File

@ -0,0 +1,52 @@
"""
Utility functions for handling return statements in BPF functions.
Provides handlers for different types of returns including XDP actions,
None returns, and standard returns.
"""
import logging
import ast
from llvmlite import ir
logger: logging.Logger = logging.getLogger(__name__)
XDP_ACTIONS = {
"XDP_ABORTED": 0,
"XDP_DROP": 1,
"XDP_PASS": 2,
"XDP_TX": 3,
"XDP_REDIRECT": 4,
}
def _handle_none_return(builder) -> bool:
"""Handle return or return None -> returns 0."""
builder.ret(ir.Constant(ir.IntType(64), 0))
logger.debug("Generated default return: 0")
return True
def _is_xdp_name(name: str) -> bool:
"""Check if a name is an XDP action"""
return name in XDP_ACTIONS
def _handle_xdp_return(stmt: ast.Return, builder, ret_type) -> bool:
"""Handle XDP returns"""
if not isinstance(stmt.value, ast.Name):
return False
action_name = stmt.value.id
if action_name not in XDP_ACTIONS:
raise ValueError(
f"Unknown XDP action: {action_name}. Available: {XDP_ACTIONS.keys()}"
)
return False
value = XDP_ACTIONS[action_name]
builder.ret(ir.Constant(ret_type, value))
logger.debug(f"Generated XDP action return: {action_name} = {value}")
return True

View File

@ -1,3 +1,10 @@
"""
Global variables and compiler metadata processing.
This module handles BPF global variables and emits the @llvm.compiler.used
metadata to prevent LLVM from optimizing away important symbols.
"""
from llvmlite import ir from llvmlite import ir
import ast import ast
@ -12,6 +19,16 @@ global_sym_tab = []
def populate_global_symbol_table(tree, module: ir.Module): def populate_global_symbol_table(tree, module: ir.Module):
"""
Populate the global symbol table with BPF functions, maps, and globals.
Args:
tree: The Python AST to scan for global symbols
module: The LLVM IR module (not used in current implementation)
Returns:
False (legacy return value)
"""
for node in tree.body: for node in tree.body:
if isinstance(node, ast.FunctionDef): if isinstance(node, ast.FunctionDef):
for dec in node.decorator_list: for dec in node.decorator_list:
@ -33,6 +50,17 @@ def populate_global_symbol_table(tree, module: ir.Module):
def emit_global(module: ir.Module, node, name): def emit_global(module: ir.Module, node, name):
"""
Emit a BPF global variable into the LLVM IR module.
Args:
module: The LLVM IR module to add the global variable to
node: The AST function node containing the global definition
name: The name of the global variable
Returns:
The created global variable
"""
logger.info(f"global identifier {name} processing") logger.info(f"global identifier {name} processing")
# deduce LLVM type from the annotated return # deduce LLVM type from the annotated return
if not isinstance(node.returns, ast.Name): if not isinstance(node.returns, ast.Name):
@ -117,7 +145,11 @@ def globals_processing(tree, module):
def emit_llvm_compiler_used(module: ir.Module, names: list[str]): def emit_llvm_compiler_used(module: ir.Module, names: list[str]):
""" """
Emit the @llvm.compiler.used global given a list of function/global names. Emit the @llvm.compiler.used global to prevent LLVM from optimizing away symbols.
Args:
module: The LLVM IR module to add the compiler.used metadata to
names: List of function/global names that must be preserved
""" """
ptr_ty = ir.PointerType() ptr_ty = ir.PointerType()
used_array_ty = ir.ArrayType(ptr_ty, len(names)) used_array_ty = ir.ArrayType(ptr_ty, len(names))
@ -138,6 +170,13 @@ def emit_llvm_compiler_used(module: ir.Module, names: list[str]):
def globals_list_creation(tree, module: ir.Module): def globals_list_creation(tree, module: ir.Module):
"""
Collect all BPF symbols and emit @llvm.compiler.used metadata.
Args:
tree: The Python AST to scan for symbols
module: The LLVM IR module to add metadata to
"""
collected = ["LICENSE"] collected = ["LICENSE"]
for node in tree.body: for node in tree.body:

View File

@ -1,3 +1,5 @@
"""BPF helper functions and handlers."""
from .helper_utils import HelperHandlerRegistry from .helper_utils import HelperHandlerRegistry
from .bpf_helper_handler import handle_helper_call from .bpf_helper_handler import handle_helper_call
from .helpers import ktime, pid, deref, XDP_DROP, XDP_PASS from .helpers import ktime, pid, deref, XDP_DROP, XDP_PASS

View File

@ -1,3 +1,11 @@
"""
BPF helper function handlers for LLVM IR emission.
This module provides handlers for various BPF helper functions, emitting
the appropriate LLVM IR to call kernel BPF helpers like map operations,
printing, time functions, etc.
"""
import ast import ast
from llvmlite import ir from llvmlite import ir
from enum import Enum from enum import Enum
@ -16,6 +24,7 @@ logger: Logger = logging.getLogger(__name__)
class BPFHelperID(Enum): class BPFHelperID(Enum):
"""Enumeration of BPF helper function IDs."""
BPF_MAP_LOOKUP_ELEM = 1 BPF_MAP_LOOKUP_ELEM = 1
BPF_MAP_UPDATE_ELEM = 2 BPF_MAP_UPDATE_ELEM = 2
BPF_MAP_DELETE_ELEM = 3 BPF_MAP_DELETE_ELEM = 3
@ -62,7 +71,7 @@ def bpf_map_lookup_elem_emitter(
""" """
if not call.args or len(call.args) != 1: if not call.args or len(call.args) != 1:
raise ValueError( raise ValueError(
"Map lookup expects exactly one argument (key), got " f"{len(call.args)}" f"Map lookup expects exactly one argument (key), got {len(call.args)}"
) )
key_ptr = get_or_create_ptr_from_arg(call.args[0], builder, local_sym_tab) key_ptr = get_or_create_ptr_from_arg(call.args[0], builder, local_sym_tab)
map_void_ptr = builder.bitcast(map_ptr, ir.PointerType()) map_void_ptr = builder.bitcast(map_ptr, ir.PointerType())
@ -145,8 +154,7 @@ def bpf_map_update_elem_emitter(
""" """
if not call.args or len(call.args) < 2 or len(call.args) > 3: if not call.args or len(call.args) < 2 or len(call.args) > 3:
raise ValueError( raise ValueError(
"Map update expects 2 or 3 args (key, value, flags), " f"Map update expects 2 or 3 args (key, value, flags), got {len(call.args)}"
f"got {len(call.args)}"
) )
key_arg = call.args[0] key_arg = call.args[0]
@ -196,7 +204,7 @@ def bpf_map_delete_elem_emitter(
""" """
if not call.args or len(call.args) != 1: if not call.args or len(call.args) != 1:
raise ValueError( raise ValueError(
"Map delete expects exactly one argument (key), got " f"{len(call.args)}" f"Map delete expects exactly one argument (key), got {len(call.args)}"
) )
key_ptr = get_or_create_ptr_from_arg(call.args[0], builder, local_sym_tab) key_ptr = get_or_create_ptr_from_arg(call.args[0], builder, local_sym_tab)
map_void_ptr = builder.bitcast(map_ptr, ir.PointerType()) map_void_ptr = builder.bitcast(map_ptr, ir.PointerType())
@ -253,9 +261,14 @@ def bpf_perf_event_output_handler(
local_sym_tab=None, local_sym_tab=None,
struct_sym_tab=None, struct_sym_tab=None,
): ):
"""
Emit LLVM IR for bpf_perf_event_output helper function call.
This allows sending data to userspace via a perf event array.
"""
if len(call.args) != 1: if len(call.args) != 1:
raise ValueError( raise ValueError(
"Perf event output expects exactly one argument, " f"got {len(call.args)}" f"Perf event output expects exactly one argument, got {len(call.args)}"
) )
data_arg = call.args[0] data_arg = call.args[0]
ctx_ptr = func.args[0] # First argument to the function is ctx ctx_ptr = func.args[0] # First argument to the function is ctx
@ -303,6 +316,7 @@ def handle_helper_call(
# Helper function to get map pointer and invoke handler # Helper function to get map pointer and invoke handler
def invoke_helper(method_name, map_ptr=None): def invoke_helper(method_name, map_ptr=None):
"""Helper function to look up and invoke a registered handler."""
handler = HelperHandlerRegistry.get_handler(method_name) handler = HelperHandlerRegistry.get_handler(method_name)
if not handler: if not handler:
raise NotImplementedError( raise NotImplementedError(

View File

@ -1,9 +1,17 @@
"""
Utility functions for BPF helper function handling.
This module provides utility functions for processing BPF helper function
calls, including argument handling, string formatting for bpf_printk,
and a registry for helper function handlers.
"""
import ast import ast
import logging import logging
from collections.abc import Callable from collections.abc import Callable
from llvmlite import ir from llvmlite import ir
from pythonbpf.expr_pass import eval_expr from pythonbpf.expr import eval_expr
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -18,6 +26,7 @@ class HelperHandlerRegistry:
"""Decorator to register a handler function for a helper""" """Decorator to register a handler function for a helper"""
def decorator(func): def decorator(func):
"""Decorator that registers the handler function."""
cls._handlers[helper_name] = func cls._handlers[helper_name] = func
return func return func
@ -35,14 +44,36 @@ class HelperHandlerRegistry:
def get_var_ptr_from_name(var_name, local_sym_tab): def get_var_ptr_from_name(var_name, local_sym_tab):
"""Get a pointer to a variable from the symbol table.""" """
Get a pointer to a variable from the symbol table.
Args:
var_name: Name of the variable to look up
local_sym_tab: Local symbol table
Returns:
Pointer to the variable
Raises:
ValueError: If the variable is not found
"""
if local_sym_tab and var_name in local_sym_tab: if local_sym_tab and var_name in local_sym_tab:
return local_sym_tab[var_name].var return local_sym_tab[var_name].var
raise ValueError(f"Variable '{var_name}' not found in local symbol table") raise ValueError(f"Variable '{var_name}' not found in local symbol table")
def create_int_constant_ptr(value, builder, int_width=64): def create_int_constant_ptr(value, builder, int_width=64):
"""Create a pointer to an integer constant.""" """
Create a pointer to an integer constant.
Args:
value: The integer value
builder: LLVM IR builder
int_width: Width of the integer in bits (default: 64)
Returns:
Pointer to the allocated integer constant
"""
# Default to 64-bit integer # Default to 64-bit integer
int_type = ir.IntType(int_width) int_type = ir.IntType(int_width)
ptr = builder.alloca(int_type) ptr = builder.alloca(int_type)
@ -52,7 +83,20 @@ def create_int_constant_ptr(value, builder, int_width=64):
def get_or_create_ptr_from_arg(arg, builder, local_sym_tab): def get_or_create_ptr_from_arg(arg, builder, local_sym_tab):
"""Extract or create pointer from the call arguments.""" """
Extract or create pointer from call arguments.
Args:
arg: The AST argument node
builder: LLVM IR builder
local_sym_tab: Local symbol table
Returns:
Pointer to the argument value
Raises:
NotImplementedError: If the argument type is not supported
"""
if isinstance(arg, ast.Name): if isinstance(arg, ast.Name):
ptr = get_var_ptr_from_name(arg.id, local_sym_tab) ptr = get_var_ptr_from_name(arg.id, local_sym_tab)
@ -66,7 +110,21 @@ def get_or_create_ptr_from_arg(arg, builder, local_sym_tab):
def get_flags_val(arg, builder, local_sym_tab): def get_flags_val(arg, builder, local_sym_tab):
"""Extract or create flags value from the call arguments.""" """
Extract or create flags value from call arguments.
Args:
arg: The AST argument node for flags
builder: LLVM IR builder
local_sym_tab: Local symbol table
Returns:
Integer flags value or LLVM IR value
Raises:
ValueError: If a variable is not found in symbol table
NotImplementedError: If the argument type is not supported
"""
if not arg: if not arg:
return 0 return 0
@ -85,7 +143,18 @@ def get_flags_val(arg, builder, local_sym_tab):
def simple_string_print(string_value, module, builder, func): def simple_string_print(string_value, module, builder, func):
"""Prepare arguments for bpf_printk from a simple string value""" """
Prepare arguments for bpf_printk from a simple string value.
Args:
string_value: The string to print
module: LLVM IR module
builder: LLVM IR builder
func: The LLVM IR function being built
Returns:
List of arguments for bpf_printk
"""
fmt_str = string_value + "\n\0" fmt_str = string_value + "\n\0"
fmt_ptr = _create_format_string_global(fmt_str, func, module, builder) fmt_ptr = _create_format_string_global(fmt_str, func, module, builder)
@ -101,7 +170,23 @@ def handle_fstring_print(
local_sym_tab=None, local_sym_tab=None,
struct_sym_tab=None, struct_sym_tab=None,
): ):
"""Handle f-string formatting for bpf_printk emitter.""" """
Handle f-string formatting for bpf_printk emitter.
Args:
joined_str: AST JoinedStr node representing the f-string
module: LLVM IR module
builder: LLVM IR builder
func: The LLVM IR function being built
local_sym_tab: Local symbol table
struct_sym_tab: Struct symbol table
Returns:
List of arguments for bpf_printk
Raises:
NotImplementedError: If f-string contains unsupported value types
"""
fmt_parts = [] fmt_parts = []
exprs = [] exprs = []
@ -270,7 +355,7 @@ def _prepare_expr_args(expr, func, module, builder, local_sym_tab, struct_sym_ta
val = builder.sext(val, ir.IntType(64)) val = builder.sext(val, ir.IntType(64))
else: else:
logger.warning( logger.warning(
"Only int and ptr supported in bpf_printk args. " "Others default to 0." "Only int and ptr supported in bpf_printk args. Others default to 0."
) )
val = ir.Constant(ir.IntType(64), 0) val = ir.Constant(ir.IntType(64), 0)
return val return val

View File

@ -1,11 +1,31 @@
"""
BPF helper function stubs for Python type hints.
This module provides Python stub functions that represent BPF helper functions.
These stubs are used for type checking and will be replaced with actual BPF
helper calls during compilation.
"""
import ctypes import ctypes
def ktime(): def ktime():
"""
Get the current kernel time in nanoseconds.
Returns:
A c_int64 stub value (actual implementation is in BPF runtime)
"""
return ctypes.c_int64(0) return ctypes.c_int64(0)
def pid(): def pid():
"""
Get the current process ID (PID).
Returns:
A c_int32 stub value (actual implementation is in BPF runtime)
"""
return ctypes.c_int32(0) return ctypes.c_int32(0)

View File

@ -1,3 +1,10 @@
"""
LICENSE global variable processing for BPF programs.
This module handles the processing of the LICENSE function which is required
for BPF programs to declare their license (typically "GPL").
"""
from llvmlite import ir from llvmlite import ir
import ast import ast
from logging import Logger from logging import Logger
@ -7,6 +14,16 @@ logger: Logger = logging.getLogger(__name__)
def emit_license(module: ir.Module, license_str: str): def emit_license(module: ir.Module, license_str: str):
"""
Emit a LICENSE global variable into the LLVM IR module.
Args:
module: The LLVM IR module to add the LICENSE variable to
license_str: The license string (e.g., 'GPL')
Returns:
The created global variable
"""
license_bytes = license_str.encode("utf8") + b"\x00" license_bytes = license_str.encode("utf8") + b"\x00"
elems = [ir.Constant(ir.IntType(8), b) for b in license_bytes] elems = [ir.Constant(ir.IntType(8), b) for b in license_bytes]
ty = ir.ArrayType(ir.IntType(8), len(elems)) ty = ir.ArrayType(ir.IntType(8), len(elems))

View File

@ -1,3 +1,5 @@
"""BPF map types and processing."""
from .maps import HashMap, PerfEventArray, RingBuf from .maps import HashMap, PerfEventArray, RingBuf
from .maps_pass import maps_proc from .maps_pass import maps_proc

View File

@ -1,18 +1,59 @@
"""
BPF map type definitions for Python type hints.
This module provides Python classes that represent BPF map types.
These are used for type checking and map definition; the actual BPF maps
are generated as LLVM IR during compilation.
"""
# This file provides type and function hints only and does not actually give any functionality. # This file provides type and function hints only and does not actually give any functionality.
class HashMap: class HashMap:
"""
A BPF hash map for storing key-value pairs.
This is a type hint class used during compilation. The actual BPF map
implementation is generated as LLVM IR.
"""
def __init__(self, key, value, max_entries): def __init__(self, key, value, max_entries):
"""
Initialize a HashMap definition.
Args:
key: The ctypes type for keys (e.g., c_int64)
value: The ctypes type for values (e.g., c_int64)
max_entries: Maximum number of entries the map can hold
"""
self.key = key self.key = key
self.value = value self.value = value
self.max_entries = max_entries self.max_entries = max_entries
self.entries = {} self.entries = {}
def lookup(self, key): def lookup(self, key):
"""
Look up a value by key in the map.
Args:
key: The key to look up
Returns:
The value if found, None otherwise
"""
if key in self.entries: if key in self.entries:
return self.entries[key] return self.entries[key]
else: else:
return None return None
def delete(self, key): def delete(self, key):
"""
Delete an entry from the map by key.
Args:
key: The key to delete
Raises:
KeyError: If the key is not found in the map
"""
if key in self.entries: if key in self.entries:
del self.entries[key] del self.entries[key]
else: else:
@ -20,6 +61,17 @@ class HashMap:
# TODO: define the flags that can be added # TODO: define the flags that can be added
def update(self, key, value, flags=None): def update(self, key, value, flags=None):
"""
Update or insert a key-value pair in the map.
Args:
key: The key to update
value: The new value
flags: Optional flags for update behavior
Raises:
KeyError: If the key is not found in the map
"""
if key in self.entries: if key in self.entries:
self.entries[key] = value self.entries[key] = value
else: else:
@ -27,25 +79,76 @@ class HashMap:
class PerfEventArray: class PerfEventArray:
"""
A BPF perf event array for sending data to userspace.
This is a type hint class used during compilation.
"""
def __init__(self, key_size, value_size): def __init__(self, key_size, value_size):
"""
Initialize a PerfEventArray definition.
Args:
key_size: The size/type for keys
value_size: The size/type for values
"""
self.key_type = key_size self.key_type = key_size
self.value_type = value_size self.value_type = value_size
self.entries = {} self.entries = {}
def output(self, data): def output(self, data):
"""
Output data to the perf event array.
Args:
data: The data to output
"""
pass # Placeholder for output method pass # Placeholder for output method
class RingBuf: class RingBuf:
"""
A BPF ring buffer for efficient data transfer to userspace.
This is a type hint class used during compilation.
"""
def __init__(self, max_entries): def __init__(self, max_entries):
"""
Initialize a RingBuf definition.
Args:
max_entries: Maximum number of entries the ring buffer can hold
"""
self.max_entries = max_entries self.max_entries = max_entries
def reserve(self, size: int, flags=0): def reserve(self, size: int, flags=0):
"""
Reserve space in the ring buffer.
Args:
size: Size in bytes to reserve
flags: Optional reservation flags
Returns:
0 as a placeholder (actual implementation is in BPF runtime)
Raises:
ValueError: If size exceeds max_entries
"""
if size > self.max_entries: if size > self.max_entries:
raise ValueError("size cannot be greater than set maximum entries") raise ValueError("size cannot be greater than set maximum entries")
return 0 return 0
def submit(self, data, flags=0): def submit(self, data, flags=0):
"""
Submit data to the ring buffer.
Args:
data: The data to submit
flags: Optional submission flags
"""
pass pass
# add discard, output and also give names to flags and stuff # add discard, output and also give names to flags and stuff

View File

@ -1,9 +1,16 @@
"""
BPF map processing and LLVM IR generation.
This module handles the processing of BPF map definitions decorated with @map,
converting them to appropriate LLVM IR global variables with BTF debug info.
"""
import ast import ast
from logging import Logger from logging import Logger
from llvmlite import ir from llvmlite import ir
from enum import Enum from enum import Enum
from .maps_utils import MapProcessorRegistry from .maps_utils import MapProcessorRegistry
from ..debuginfo import DebugInfoGenerator from pythonbpf.debuginfo import DebugInfoGenerator
import logging import logging
logger: Logger = logging.getLogger(__name__) logger: Logger = logging.getLogger(__name__)
@ -20,6 +27,15 @@ def maps_proc(tree, module, chunks):
def is_map(func_node): def is_map(func_node):
"""
Check if a function node is decorated with @map.
Args:
func_node: The AST function node to check
Returns:
True if the function is decorated with @map, False otherwise
"""
return any( return any(
isinstance(decorator, ast.Name) and decorator.id == "map" isinstance(decorator, ast.Name) and decorator.id == "map"
for decorator in func_node.decorator_list for decorator in func_node.decorator_list
@ -27,6 +43,7 @@ def is_map(func_node):
class BPFMapType(Enum): class BPFMapType(Enum):
"""Enumeration of BPF map types."""
UNSPEC = 0 UNSPEC = 0
HASH = 1 HASH = 1
ARRAY = 2 ARRAY = 2
@ -65,7 +82,17 @@ class BPFMapType(Enum):
def create_bpf_map(module, map_name, map_params): def create_bpf_map(module, map_name, map_params):
"""Create a BPF map in the module with given parameters and debug info""" """
Create a BPF map in the module with given parameters and debug info.
Args:
module: The LLVM IR module to add the map to
map_name: The name of the BPF map
map_params: Dictionary of map parameters (type, key_size, value_size, max_entries)
Returns:
The created global variable representing the map
"""
# Create the anonymous struct type for BPF map # Create the anonymous struct type for BPF map
map_struct_type = ir.LiteralStructType( map_struct_type = ir.LiteralStructType(
@ -278,9 +305,7 @@ def process_bpf_map(func_node, module):
if handler: if handler:
return handler(map_name, rval, module) return handler(map_name, rval, module)
else: else:
logger.warning( logger.warning(f"Unknown map type {rval.func.id}, defaulting to HashMap")
f"Unknown map type " f"{rval.func.id}, defaulting to HashMap"
)
return process_hash_map(map_name, rval, module) return process_hash_map(map_name, rval, module)
else: else:
raise ValueError("Function under @map must return a map") raise ValueError("Function under @map must return a map")

View File

@ -1,3 +1,5 @@
"""Registry for BPF map processor functions."""
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any
@ -12,6 +14,7 @@ class MapProcessorRegistry:
"""Decorator to register a processor function for a map type""" """Decorator to register a processor function for a map type"""
def decorator(func): def decorator(func):
"""Decorator that registers the processor function."""
cls._processors[map_type_name] = func cls._processors[map_type_name] = func
return func return func

View File

@ -1,3 +1,5 @@
"""Struct processing for BPF programs."""
from .structs_pass import structs_proc from .structs_pass import structs_proc
__all__ = ["structs_proc"] __all__ = ["structs_proc"]

View File

@ -1,19 +1,72 @@
"""
Struct type wrapper for BPF structs.
This module provides a wrapper class for LLVM IR struct types with
helper methods for field access and manipulation.
"""
from llvmlite import ir from llvmlite import ir
class StructType: class StructType:
"""
Wrapper class for LLVM IR struct types with field access helpers.
Attributes:
ir_type: The LLVM IR struct type
fields: Dictionary mapping field names to their types
size: Total size of the struct in bytes
"""
def __init__(self, ir_type, fields, size): def __init__(self, ir_type, fields, size):
"""
Initialize a StructType.
Args:
ir_type: The LLVM IR struct type
fields: Dictionary mapping field names to their types
size: Total size of the struct in bytes
"""
self.ir_type = ir_type self.ir_type = ir_type
self.fields = fields self.fields = fields
self.size = size self.size = size
def field_idx(self, field_name): def field_idx(self, field_name):
"""
Get the index of a field in the struct.
Args:
field_name: The name of the field
Returns:
The zero-based index of the field
"""
return list(self.fields.keys()).index(field_name) return list(self.fields.keys()).index(field_name)
def field_type(self, field_name): def field_type(self, field_name):
"""
Get the LLVM IR type of a field.
Args:
field_name: The name of the field
Returns:
The LLVM IR type of the field
"""
return self.fields[field_name] return self.fields[field_name]
def gep(self, builder, ptr, field_name): def gep(self, builder, ptr, field_name):
"""
Generate a GEP (GetElementPtr) instruction to access a struct field.
Args:
builder: LLVM IR builder
ptr: Pointer to the struct
field_name: Name of the field to access
Returns:
A pointer to the field
"""
idx = self.field_idx(field_name) idx = self.field_idx(field_name)
return builder.gep( return builder.gep(
ptr, ptr,
@ -22,6 +75,18 @@ class StructType:
) )
def field_size(self, field_name): def field_size(self, field_name):
"""
Calculate the size of a field in bytes.
Args:
field_name: The name of the field
Returns:
The size of the field in bytes
Raises:
TypeError: If the field type is not supported
"""
fld = self.fields[field_name] fld = self.fields[field_name]
if isinstance(fld, ir.ArrayType): if isinstance(fld, ir.ArrayType):
return fld.count * (fld.element.width // 8) return fld.count * (fld.element.width // 8)

View File

@ -1,3 +1,10 @@
"""
BPF struct processing and LLVM IR type generation.
This module handles the processing of Python classes decorated with @struct,
converting them to LLVM IR struct types for use in BPF programs.
"""
import ast import ast
import logging import logging
from llvmlite import ir from llvmlite import ir
@ -26,6 +33,15 @@ def structs_proc(tree, module, chunks):
def is_bpf_struct(cls_node): def is_bpf_struct(cls_node):
"""
Check if a class node is decorated with @struct.
Args:
cls_node: The AST class node to check
Returns:
True if the class is decorated with @struct, False otherwise
"""
return any( return any(
isinstance(decorator, ast.Name) and decorator.id == "struct" isinstance(decorator, ast.Name) and decorator.id == "struct"
for decorator in cls_node.decorator_list for decorator in cls_node.decorator_list
@ -33,7 +49,16 @@ def is_bpf_struct(cls_node):
def process_bpf_struct(cls_node, module): def process_bpf_struct(cls_node, module):
"""Process a single BPF struct definition""" """
Process a single BPF struct definition and create its LLVM IR representation.
Args:
cls_node: The AST class node representing the struct
module: The LLVM IR module (not used in current implementation)
Returns:
A StructType object containing the struct's type information
"""
fields = parse_struct_fields(cls_node) fields = parse_struct_fields(cls_node)
field_types = list(fields.values()) field_types = list(fields.values())
@ -44,7 +69,18 @@ def process_bpf_struct(cls_node, module):
def parse_struct_fields(cls_node): def parse_struct_fields(cls_node):
"""Parse fields of a struct class node""" """
Parse fields of a struct class node.
Args:
cls_node: The AST class node representing the struct
Returns:
A dictionary mapping field names to their LLVM IR types
Raises:
TypeError: If a field has an unsupported type annotation
"""
fields = {} fields = {}
for item in cls_node.body: for item in cls_node.body:
@ -57,7 +93,18 @@ def parse_struct_fields(cls_node):
def get_type_from_ann(annotation): def get_type_from_ann(annotation):
"""Convert an AST annotation node to an LLVM IR type for struct fields""" """
Convert an AST annotation node to an LLVM IR type for struct fields.
Args:
annotation: The AST annotation node (e.g., c_int64, str(32))
Returns:
The corresponding LLVM IR type
Raises:
TypeError: If the annotation type is not supported
"""
if isinstance(annotation, ast.Call) and isinstance(annotation.func, ast.Name): if isinstance(annotation, ast.Call) and isinstance(annotation.func, ast.Name):
if annotation.func.id == "str": if annotation.func.id == "str":
# Char array # Char array
@ -72,7 +119,15 @@ def get_type_from_ann(annotation):
def calc_struct_size(field_types): def calc_struct_size(field_types):
"""Calculate total size of the struct with alignment and padding""" """
Calculate total size of the struct with alignment and padding.
Args:
field_types: List of LLVM IR types for each field
Returns:
The total size of the struct in bytes
"""
curr_offset = 0 curr_offset = 0
for ftype in field_types: for ftype in field_types:
if isinstance(ftype, ir.IntType): if isinstance(ftype, ir.IntType):

View File

@ -1,24 +1,56 @@
"""
Type mapping from Python ctypes to LLVM IR types.
This module provides utilities to convert Python ctypes type names
to their corresponding LLVM IR representations.
"""
from llvmlite import ir from llvmlite import ir
# TODO: THIS IS NOT SUPPOSED TO MATCH STRINGS :skull: # TODO: THIS IS NOT SUPPOSED TO MATCH STRINGS :skull:
mapping = {
"c_int8": ir.IntType(8),
"c_uint8": ir.IntType(8),
"c_int16": ir.IntType(16),
"c_uint16": ir.IntType(16),
"c_int32": ir.IntType(32),
"c_uint32": ir.IntType(32),
"c_int64": ir.IntType(64),
"c_uint64": ir.IntType(64),
"c_float": ir.FloatType(),
"c_double": ir.DoubleType(),
"c_void_p": ir.IntType(64),
# Not so sure about this one
"str": ir.PointerType(ir.IntType(8)),
}
def ctypes_to_ir(ctype: str): def ctypes_to_ir(ctype: str):
mapping = { """
"c_int8": ir.IntType(8), Convert a ctypes type name to its corresponding LLVM IR type.
"c_uint8": ir.IntType(8),
"c_int16": ir.IntType(16), Args:
"c_uint16": ir.IntType(16), ctype: String name of the ctypes type (e.g., 'c_int64', 'c_void_p')
"c_int32": ir.IntType(32),
"c_uint32": ir.IntType(32), Returns:
"c_int64": ir.IntType(64), The corresponding LLVM IR type
"c_uint64": ir.IntType(64),
"c_float": ir.FloatType(), Raises:
"c_double": ir.DoubleType(), NotImplementedError: If the ctype is not supported
"c_void_p": ir.IntType(64), """
# Not so sure about this one
"str": ir.PointerType(ir.IntType(8)),
}
if ctype in mapping: if ctype in mapping:
return mapping[ctype] return mapping[ctype]
raise NotImplementedError(f"No mapping for {ctype}") raise NotImplementedError(f"No mapping for {ctype}")
def is_ctypes(ctype: str) -> bool:
"""
Check if a given type name is a supported ctypes type.
Args:
ctype: String name of the type to check
Returns:
True if the type is a supported ctypes type, False otherwise
"""
return ctype in mapping

19
tests/c-form/kprobe.bpf.c Normal file
View File

@ -0,0 +1,19 @@
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
char LICENSE[] SEC("license") = "Dual BSD/GPL";
SEC("kprobe/do_unlinkat")
int kprobe_execve(struct pt_regs *ctx)
{
bpf_printk("unlinkat created");
return 0;
}
SEC("kretprobe/do_unlinkat")
int kretprobe_execve(struct pt_regs *ctx)
{
bpf_printk("unlinkat returned\n");
return 0;
}

View File

@ -0,0 +1,34 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
# NOTE: Decided against fixing this
# as a workaround is assigning the result of lookup to a variable
# and then using that variable in the if statement.
# Might fix in future.
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
if last.lookup(0) > 0:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,18 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!") if True else print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,34 @@
from pythonbpf import bpf, struct, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
# NOTE: Decided against fixing this
# as one workaround is to just check any field of the struct
# in the if statement. Ugly but works.
# Might fix in future.
@bpf
@struct
class data_t:
pid: c_uint64
ts: c_uint64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
dat = data_t()
if dat:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -4,6 +4,18 @@ from pythonbpf.maps import HashMap
from ctypes import c_void_p, c_int64 from ctypes import c_void_p, c_int64
# NOTE: I have decided to not fix this example for now.
# The issue is in line 31, where we are passing an expression.
# The update helper expects a pointer type. But the problem is
# that we must allocate the space for said pointer in the first
# basic block. As that usage is in a different basic block, we
# are unable to cast the expression to a pointer type. (as we never
# allocated space for it).
# Shall we change our space allocation logic? That allows users to
# spam the same helper with the same args, and still run out of
# stack space. So we consider this usage invalid for now.
# Might fix it later.
@bpf @bpf
@map @map
@ -14,12 +26,12 @@ def count() -> HashMap:
@bpf @bpf
@section("xdp") @section("xdp")
def hello_world(ctx: c_void_p) -> c_int64: def hello_world(ctx: c_void_p) -> c_int64:
prev = count().lookup(0) prev = count.lookup(0)
if prev: if prev:
count().update(0, prev + 1) count.update(0, prev + 1)
return XDP_PASS return XDP_PASS
else: else:
count().update(0, 1) count.update(0, 1)
return XDP_PASS return XDP_PASS

View File

@ -0,0 +1,40 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from pythonbpf.helper import XDP_PASS
from pythonbpf.maps import HashMap
from ctypes import c_void_p, c_int64
# NOTE: This example exposes the problems with our typing system.
# We can't do steps on line 25 and 27.
# prev is of type i64**. For prev + 1, we deref it down to i64
# To assign it back to prev, we need to go back to i64**.
# We cannot allocate space for the intermediate type now.
# We probably need to track the ref/deref chain for each variable.
@bpf
@map
def count() -> HashMap:
return HashMap(key=c_int64, value=c_int64, max_entries=1)
@bpf
@section("xdp")
def hello_world(ctx: c_void_p) -> c_int64:
prev = count.lookup(0)
if prev:
prev = prev + 1
count.update(0, prev)
return XDP_PASS
else:
count.update(0, 1)
return XDP_PASS
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,32 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
last.update(1, 2)
x = last.lookup(0)
y = last.lookup(1)
if x and y:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,21 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
if True:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,21 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
if (0 + 1) * 0:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,21 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
if 0:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,30 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
# last.update(0, 1)
tsp = last.lookup(0)
if tsp:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,30 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
tsp = last.lookup(0)
if tsp > 0:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,30 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
# last.update(0, 1)
tsp = last.lookup(0)
if not tsp:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,32 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
# last.update(1, 2)
x = last.lookup(0)
y = last.lookup(1)
if x or y:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,29 @@
from pythonbpf import bpf, struct, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
@bpf
@struct
class data_t:
pid: c_uint64
ts: c_uint64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
dat = data_t()
if dat.ts:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,23 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_int32
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
x = 0
y = c_int32(0)
if x == y:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,22 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
x = 0
if x:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,22 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
x = 0
if x * 1:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,22 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
x = 2
if x > 3:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,18 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return 1 + 1 - 2
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,19 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
a = 2
return a - 2
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,18 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return True
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,18 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return 1
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,18 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,20 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int32
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int32:
print("Hello, World!")
a = 1 # int64
x = 1 # int64
return c_int32(a - x) # typecast to int32
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,18 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int32
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int32:
print("Hello, World!")
return c_int32(1)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,19 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int32
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int32:
print("Hello, World!")
a = 1 # int64
return c_int32(a) # typecast to int32
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,19 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
a = 1
return a
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,19 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
from pythonbpf.helper import XDP_PASS
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return XDP_PASS
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,20 @@
import logging
from pythonbpf import compile, bpf, section, bpfglobal
from ctypes import c_void_p, c_int64
@bpf
@section("sometag1")
def sometag(ctx: c_void_p) -> c_int64:
a = 1 - 1
return c_int64(a)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile(loglevel=logging.INFO)

380
tools/vmlinux-gen.py Executable file
View File

@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
BTF to Python ctypes Converter
Converts Linux kernel BTF (BPF Type Format) to Python ctypes definitions.
This tool automates the process of:
1. Dumping BTF from vmlinux
2. Preprocessing enum definitions
3. Processing struct kioctx to extract anonymous nested structs
4. Running C preprocessor
5. Converting to Python ctypes using clang2py
6. Post-processing the output
Requirements:
- bpftool
- clang
- ctypeslib2 (pip install ctypeslib2)
"""
import argparse
import os
import re
import subprocess
import sys
import tempfile
class BTFConverter:
def __init__(self, btf_source="/sys/kernel/btf/vmlinux", output_file="vmlinux.py",
keep_intermediate=False, verbose=False):
self.btf_source = btf_source
self.output_file = output_file
self.keep_intermediate = keep_intermediate
self.verbose = verbose
self.temp_dir = tempfile.mkdtemp() if not keep_intermediate else "."
def log(self, message):
"""Print message if verbose mode is enabled."""
if self.verbose:
print(f"[*] {message}")
def run_command(self, cmd, description):
"""Run a shell command and handle errors."""
self.log(f"{description}...")
try:
result = subprocess.run(
cmd,
shell=True,
check=True,
capture_output=True,
text=True
)
if self.verbose and result.stdout:
print(result.stdout)
return result
except subprocess.CalledProcessError as e:
print(f"Error during {description}:", file=sys.stderr)
print(e.stderr, file=sys.stderr)
sys.exit(1)
def step1_dump_btf(self):
"""Step 1: Dump BTF from vmlinux."""
vmlinux_h = os.path.join(self.temp_dir, "vmlinux.h")
cmd = f"bpftool btf dump file {self.btf_source} format c > {vmlinux_h}"
self.run_command(cmd, "Dumping BTF from vmlinux")
return vmlinux_h
def step2_preprocess_enums(self, input_file):
"""Step 1.5: Preprocess enum definitions."""
self.log("Preprocessing enum definitions...")
with open(input_file, 'r') as f:
original_code = f.read()
# Extract anonymous enums
enums = re.findall(
r'(?<!typedef\s)(enum\s*\{[^}]*\})\s*(\w+)\s*(?::\s*\d+)?\s*;',
original_code
)
enum_defs = [enum_block + ';' for enum_block, _ in enums]
# Replace anonymous enums with int declarations
processed_code = re.sub(
r'(?<!typedef\s)enum\s*\{[^}]*\}\s*(\w+)\s*(?::\s*\d+)?\s*;',
r'int \1;',
original_code
)
# Prepend enum definitions
if enum_defs:
enum_text = '\n'.join(enum_defs) + '\n\n'
processed_code = enum_text + processed_code
output_file = os.path.join(self.temp_dir, "vmlinux_processed.h")
with open(output_file, 'w') as f:
f.write(processed_code)
return output_file
def step2_5_process_kioctx(self, input_file):
#TODO: this is a very bad bug and design decision. A single struct has an issue mostly.
"""Step 2.5: Process struct kioctx to extract nested anonymous structs."""
self.log("Processing struct kioctx nested structs...")
with open(input_file, 'r') as f:
content = f.read()
# Pattern to match struct kioctx with its full body (handles multiple nesting levels)
kioctx_pattern = r'struct\s+kioctx\s*\{(?:[^{}]|\{(?:[^{}]|\{[^{}]*\})*\})*\}\s*;'
def process_kioctx_replacement(match):
full_struct = match.group(0)
self.log(f"Found struct kioctx, length: {len(full_struct)} chars")
# Extract the struct body (everything between outermost { and })
body_match = re.search(r'struct\s+kioctx\s*\{(.*)\}\s*;', full_struct, re.DOTALL)
if not body_match:
return full_struct
body = body_match.group(1)
# Find all anonymous structs within the body
# Pattern: struct { ... } followed by ; (not a member name)
anon_struct_pattern = r'struct\s*\{[^}]*\}'
anon_structs = []
anon_counter = 4 # Start from 4, counting down to 1
def replace_anonymous_struct(m):
nonlocal anon_counter
anon_struct_content = m.group(0)
# Extract the body of the anonymous struct
anon_body_match = re.search(r'struct\s*\{(.*)\}', anon_struct_content, re.DOTALL)
if not anon_body_match:
return anon_struct_content
anon_body = anon_body_match.group(1)
# Create the named struct definition
anon_name = f"__anon{anon_counter}"
member_name = f"a{anon_counter}"
# Store the struct definition
anon_structs.append(f"struct {anon_name} {{{anon_body}}};")
anon_counter -= 1
# Return the member declaration
return f"struct {anon_name} {member_name}"
# Process the body, finding and replacing anonymous structs
# We need to be careful to only match anonymous structs followed by ;
processed_body = body
# Find all occurrences and process them
pattern_with_semicolon = r'struct\s*\{([^}]*)\}\s*;'
matches = list(re.finditer(pattern_with_semicolon, body, re.DOTALL))
if not matches:
self.log("No anonymous structs found in kioctx")
return full_struct
self.log(f"Found {len(matches)} anonymous struct(s)")
# Process in reverse order to maintain string positions
for match in reversed(matches):
anon_struct_content = match.group(1)
start_pos = match.start()
end_pos = match.end()
# Create the named struct definition
anon_name = f"__anon{anon_counter}"
member_name = f"a{anon_counter}"
# Store the struct definition
anon_structs.insert(0, f"struct {anon_name} {{{anon_struct_content}}};")
# Replace in the body
replacement = f"struct {anon_name} {member_name};"
processed_body = processed_body[:start_pos] + replacement + processed_body[end_pos:]
anon_counter -= 1
# Rebuild the complete definition
if anon_structs:
# Prepend the anonymous struct definitions
anon_definitions = '\n'.join(anon_structs) + '\n\n'
new_struct = f"struct kioctx {{{processed_body}}};"
return anon_definitions + new_struct
else:
return full_struct
# Apply the transformation
processed_content = re.sub(
kioctx_pattern,
process_kioctx_replacement,
content,
flags=re.DOTALL
)
output_file = os.path.join(self.temp_dir, "vmlinux_kioctx_processed.h")
with open(output_file, 'w') as f:
f.write(processed_content)
self.log(f"Saved kioctx-processed output to {output_file}")
return output_file
def step3_run_preprocessor(self, input_file):
"""Step 2: Run C preprocessor."""
output_file = os.path.join(self.temp_dir, "vmlinux.i")
cmd = f"clang -E {input_file} > {output_file}"
self.run_command(cmd, "Running C preprocessor")
return output_file
def step4_convert_to_ctypes(self, input_file):
"""Step 3: Convert to Python ctypes using clang2py."""
output_file = os.path.join(self.temp_dir, "vmlinux_raw.py")
cmd = (
f"clang2py {input_file} -o {output_file} "
f"--clang-args=\"-fno-ms-extensions -I/usr/include -I/usr/include/linux\""
)
self.run_command(cmd, "Converting to Python ctypes")
return output_file
def step5_postprocess(self, input_file):
"""Step 4: Post-process the generated Python file."""
self.log("Post-processing Python ctypes definitions...")
with open(input_file, "r") as f:
data = f.read()
# Remove lines like ('_45', ctypes.c_int64, 0)
data = re.sub(r"\('_[0-9]+',\s*ctypes\.[a-zA-Z0-9_]+,\s*0\),?\s*\n?", "", data)
# Replace ('_20', ctypes.c_uint64, 64) → ('_20', ctypes.c_uint64)
data = re.sub(r"\('(_[0-9]+)',\s*(ctypes\.[a-zA-Z0-9_]+),\s*[0-9]+\)", r"('\1', \2)", data)
# Replace ('_20', ctypes.c_char, 8) with ('_20', ctypes.c_uint8, 8)
data = re.sub(
r"(ctypes\.c_char)(\s*,\s*\d+\))",
r"ctypes.c_uint8\2",
data
)
# below to replace those c_bool with bitfield greater than 8
def repl(m):
name, bits = m.groups()
return f"('{name}', ctypes.c_uint32, {bits})" if int(bits) > 8 else m.group(0)
data = re.sub(
r"\('([^']+)',\s*ctypes\.c_bool,\s*(\d+)\)",
repl,
data
)
# Remove ctypes. prefix from invalid entries
invalid_ctypes = ["bpf_iter_state", "_cache_type", "fs_context_purpose"]
for name in invalid_ctypes:
data = re.sub(rf"\bctypes\.{name}\b", name, data)
with open(self.output_file, "w") as f:
f.write(data)
self.log(f"Saved final output to {self.output_file}")
def cleanup(self):
"""Remove temporary files if not keeping them."""
if not self.keep_intermediate and self.temp_dir != ".":
self.log(f"Cleaning up temporary directory: {self.temp_dir}")
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
def convert(self):
"""Run the complete conversion pipeline."""
try:
self.log("Starting BTF to Python ctypes conversion...")
# Check dependencies
self.check_dependencies()
# Run conversion pipeline
vmlinux_h = self.step1_dump_btf()
vmlinux_processed_h = self.step2_preprocess_enums(vmlinux_h)
vmlinux_kioctx_h = self.step2_5_process_kioctx(vmlinux_processed_h)
vmlinux_i = self.step3_run_preprocessor(vmlinux_kioctx_h)
vmlinux_raw_py = self.step4_convert_to_ctypes(vmlinux_i)
self.step5_postprocess(vmlinux_raw_py)
print(f"\n✓ Conversion complete! Output saved to: {self.output_file}")
except Exception as e:
print(f"\n✗ Error during conversion: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
finally:
self.cleanup()
def check_dependencies(self):
"""Check if required tools are available."""
self.log("Checking dependencies...")
dependencies = {
"bpftool": "bpftool --version",
"clang": "clang --version",
"clang2py": "clang2py --version"
}
missing = []
for tool, cmd in dependencies.items():
try:
subprocess.run(
cmd,
shell=True,
check=True,
capture_output=True
)
except subprocess.CalledProcessError:
missing.append(tool)
if missing:
print("Error: Missing required dependencies:", file=sys.stderr)
for tool in missing:
print(f" - {tool}", file=sys.stderr)
if "clang2py" in missing:
print("\nInstall ctypeslib2: pip install ctypeslib2", file=sys.stderr)
sys.exit(1)
def main():
parser = argparse.ArgumentParser(
description="Convert Linux kernel BTF to Python ctypes definitions",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s
%(prog)s -o kernel_types.py
%(prog)s --btf-source /sys/kernel/btf/custom_module -k -v
"""
)
parser.add_argument(
"--btf-source",
default="/sys/kernel/btf/vmlinux",
help="Path to BTF source (default: /sys/kernel/btf/vmlinux)"
)
parser.add_argument(
"-o", "--output",
default="vmlinux.py",
help="Output Python file (default: vmlinux.py)"
)
parser.add_argument(
"-k", "--keep-intermediate",
action="store_true",
help="Keep intermediate files (vmlinux.h, vmlinux_processed.h, etc.)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Enable verbose output"
)
args = parser.parse_args()
converter = BTFConverter(
btf_source=args.btf_source,
output_file=args.output,
keep_intermediate=args.keep_intermediate,
verbose=args.verbose
)
converter.convert()
if __name__ == "__main__":
main()