Compare commits

..

12 Commits

Author SHA1 Message Date
b95fbd0ed0 rollback example 2025-10-08 14:53:51 +05:30
d84ce0c6fa update helpers and change examples. 2025-10-08 13:57:09 +05:30
8d07a4cd05 add xdp struct to args
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-08 11:40:12 +05:30
1207730ce3 update vmlinux.py 2025-10-08 05:27:56 +05:30
0d9dcd122c Merge pull request #27 from pythonbpf/vmlinux
Add vmlinux transpiler from experiments
2025-10-08 05:19:44 +05:30
8a69e05ee2 fix duplicate variable in example
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-08 05:18:49 +05:30
976af290af Revert "format chore"
This reverts commit a3443ab1d5.
2025-10-08 05:17:59 +05:30
a3443ab1d5 format chore
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-08 05:16:36 +05:30
a27360482b complete vmlinux transpiler.
TODO: struct_kioctx for x86_64 vmlinux.h has anonymous structs that refused to transpile well, so an extra rule has been written to make only the structs of that external. Fix this in the future.
2025-10-08 05:15:29 +05:30
c423cc647d add vmlinux.py transpiler from experiment repository
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-08 00:45:30 +05:30
8554688230 Merge pull request #25 from pythonbpf/dependabot/github_actions/actions-6a14be197d
Bump the actions group with 2 updates
2025-10-06 19:32:01 +05:30
3e873f378e Bump the actions group with 2 updates
Bumps the actions group with 2 updates: [actions/checkout](https://github.com/actions/checkout) and [actions/setup-python](https://github.com/actions/setup-python).


Updates `actions/checkout` from 4 to 5
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4...v5)

Updates `actions/setup-python` from 5 to 6
- [Release notes](https://github.com/actions/setup-python/releases)
- [Commits](https://github.com/actions/setup-python/compare/v5...v6)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: actions
- dependency-name: actions/setup-python
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: actions
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-06 11:26:23 +00:00
32 changed files with 546 additions and 204166 deletions

View File

@ -12,8 +12,8 @@ jobs:
name: Format
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: actions/checkout@v5
- uses: actions/setup-python@v6
with:
python-version: "3.x"
- uses: pre-commit/action@v3.0.1

1
.gitignore vendored
View File

@ -7,3 +7,4 @@ __pycache__/
*.ll
*.o
.ipynb_checkpoints/
vmlinux.py

27
examples/kprobes.py Normal file
View File

@ -0,0 +1,27 @@
from pythonbpf import bpf, section, bpfglobal, BPF
from ctypes import c_void_p, c_int64
@bpf
@section("kretprobe/do_unlinkat")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return c_int64(0)
@bpf
@section("kprobe/do_unlinkat")
def hello_world2(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
b = BPF()
b.load_and_attach()
while True:
print("running")
# Now cat /sys/kernel/debug/tracing/trace_pipe to see results of unlink kprobe.

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,8 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from pythonbpf import bpf, map, section, bpfglobal, compile, compile_to_ir
from pythonbpf.helper import XDP_PASS
from pythonbpf.maps import HashMap
from ctypes import c_int64, c_void_p
from ctypes import c_void_p, c_int64
# Instructions to how to run this program
# 1. Install PythonBPF: pip install pythonbpf
@ -40,5 +40,5 @@ def hello_world(ctx: c_void_p) -> c_int64:
def LICENSE() -> str:
return "GPL"
compile_to_ir("xdp_pass.py", "xdp_pass.ll")
compile()

View File

@ -1,4 +0,0 @@
from .expr_pass import eval_expr, handle_expr
from .type_normalization import convert_to_bool
__all__ = ["eval_expr", "handle_expr", "convert_to_bool"]

View File

@ -1,128 +0,0 @@
from llvmlite import ir
import logging
import ast
logger = logging.getLogger(__name__)
COMPARISON_OPS = {
ast.Eq: "==",
ast.NotEq: "!=",
ast.Lt: "<",
ast.LtE: "<=",
ast.Gt: ">",
ast.GtE: ">=",
ast.Is: "==",
ast.IsNot: "!=",
}
def _get_base_type_and_depth(ir_type):
"""Get the base type for pointer types."""
cur_type = ir_type
depth = 0
while isinstance(cur_type, ir.PointerType):
depth += 1
cur_type = cur_type.pointee
return cur_type, depth
def _deref_to_depth(func, builder, val, target_depth):
"""Dereference a pointer to a certain depth."""
cur_val = val
cur_type = val.type
for depth in range(target_depth):
if not isinstance(val.type, ir.PointerType):
logger.error("Cannot dereference further, non-pointer type")
return None
# dereference with null check
pointee_type = cur_type.pointee
null_check_block = builder.block
not_null_block = func.append_basic_block(name=f"deref_not_null_{depth}")
merge_block = func.append_basic_block(name=f"deref_merge_{depth}")
null_ptr = ir.Constant(cur_type, None)
is_not_null = builder.icmp_signed("!=", cur_val, null_ptr)
logger.debug(f"Inserted null check for pointer at depth {depth}")
builder.cbranch(is_not_null, not_null_block, merge_block)
builder.position_at_end(not_null_block)
dereferenced_val = builder.load(cur_val)
logger.debug(f"Dereferenced to depth {depth - 1}, type: {pointee_type}")
builder.branch(merge_block)
builder.position_at_end(merge_block)
phi = builder.phi(pointee_type, name=f"deref_result_{depth}")
zero_value = (
ir.Constant(pointee_type, 0)
if isinstance(pointee_type, ir.IntType)
else ir.Constant(pointee_type, None)
)
phi.add_incoming(zero_value, null_check_block)
phi.add_incoming(dereferenced_val, not_null_block)
# Continue with phi result
cur_val = phi
cur_type = pointee_type
return cur_val
def _normalize_types(func, builder, lhs, rhs):
"""Normalize types for comparison."""
logger.info(f"Normalizing types: {lhs.type} vs {rhs.type}")
if isinstance(lhs.type, ir.IntType) and isinstance(rhs.type, ir.IntType):
if lhs.type.width < rhs.type.width:
lhs = builder.sext(lhs, rhs.type)
else:
rhs = builder.sext(rhs, lhs.type)
return lhs, rhs
elif not isinstance(lhs.type, ir.PointerType) and not isinstance(
rhs.type, ir.PointerType
):
logger.error(f"Type mismatch: {lhs.type} vs {rhs.type}")
return None, None
else:
lhs_base, lhs_depth = _get_base_type_and_depth(lhs.type)
rhs_base, rhs_depth = _get_base_type_and_depth(rhs.type)
if lhs_base == rhs_base:
if lhs_depth < rhs_depth:
rhs = _deref_to_depth(func, builder, rhs, rhs_depth - lhs_depth)
elif rhs_depth < lhs_depth:
lhs = _deref_to_depth(func, builder, lhs, lhs_depth - rhs_depth)
return _normalize_types(func, builder, lhs, rhs)
def convert_to_bool(builder, val):
"""Convert a value to boolean."""
if val.type == ir.IntType(1):
return val
if isinstance(val.type, ir.PointerType):
zero = ir.Constant(val.type, None)
else:
zero = ir.Constant(val.type, 0)
return builder.icmp_signed("!=", val, zero)
def handle_comparator(func, builder, op, lhs, rhs):
"""Handle comparison operations."""
if lhs.type != rhs.type:
lhs, rhs = _normalize_types(func, builder, lhs, rhs)
if lhs is None or rhs is None:
return None
if type(op) not in COMPARISON_OPS:
logger.error(f"Unsupported comparison operator: {type(op)}")
return None
predicate = COMPARISON_OPS[type(op)]
result = builder.icmp_signed(predicate, lhs, rhs)
logger.debug(f"Comparison result: {result}")
return result, ir.IntType(1)

View File

@ -4,8 +4,7 @@ from logging import Logger
import logging
from typing import Dict
from pythonbpf.type_deducer import ctypes_to_ir, is_ctypes
from .type_normalization import convert_to_bool, handle_comparator
from .type_deducer import ctypes_to_ir, is_ctypes
logger: Logger = logging.getLogger(__name__)
@ -23,10 +22,12 @@ def _handle_name_expr(expr: ast.Name, local_sym_tab: Dict, builder: ir.IRBuilder
def _handle_constant_expr(expr: ast.Constant):
"""Handle ast.Constant expressions."""
if isinstance(expr.value, int) or isinstance(expr.value, bool):
return ir.Constant(ir.IntType(64), int(expr.value)), ir.IntType(64)
if isinstance(expr.value, int):
return ir.Constant(ir.IntType(64), expr.value), ir.IntType(64)
elif isinstance(expr.value, bool):
return ir.Constant(ir.IntType(1), int(expr.value)), ir.IntType(1)
else:
logger.error("Unsupported constant type")
logger.info("Unsupported constant type")
return None
@ -44,6 +45,7 @@ def _handle_attribute_expr(
var_ptr, var_type, var_metadata = local_sym_tab[var_name]
logger.info(f"Loading attribute {attr_name} from variable {var_name}")
logger.info(f"Variable type: {var_type}, Variable ptr: {var_ptr}")
metadata = structs_sym_tab[var_metadata]
if attr_name in metadata.fields:
gep = metadata.gep(builder, var_ptr, attr_name)
@ -130,199 +132,6 @@ def _handle_ctypes_call(
return val
def _handle_compare(
func, module, builder, cond, local_sym_tab, map_sym_tab, structs_sym_tab=None
):
"""Handle ast.Compare expressions."""
if len(cond.ops) != 1 or len(cond.comparators) != 1:
logger.error("Only single comparisons are supported")
return None
lhs = eval_expr(
func,
module,
builder,
cond.left,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
rhs = eval_expr(
func,
module,
builder,
cond.comparators[0],
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
if lhs is None or rhs is None:
logger.error("Failed to evaluate comparison operands")
return None
lhs, _ = lhs
rhs, _ = rhs
return handle_comparator(func, builder, cond.ops[0], lhs, rhs)
def _handle_unary_op(
func,
module,
builder,
expr: ast.UnaryOp,
local_sym_tab,
map_sym_tab,
structs_sym_tab=None,
):
"""Handle ast.UnaryOp expressions."""
if not isinstance(expr.op, ast.Not):
logger.error("Only 'not' unary operator is supported")
return None
operand = eval_expr(
func, module, builder, expr.operand, local_sym_tab, map_sym_tab, structs_sym_tab
)
if operand is None:
logger.error("Failed to evaluate operand for unary operation")
return None
operand_val, operand_type = operand
true_const = ir.Constant(ir.IntType(1), 1)
result = builder.xor(convert_to_bool(builder, operand_val), true_const)
return result, ir.IntType(1)
def _handle_and_op(func, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab):
"""Handle `and` boolean operations."""
logger.debug(f"Handling 'and' operator with {len(expr.values)} operands")
merge_block = func.append_basic_block(name="and.merge")
false_block = func.append_basic_block(name="and.false")
incoming_values = []
for i, value in enumerate(expr.values):
is_last = i == len(expr.values) - 1
# Evaluate current operand
operand_result = eval_expr(
func, None, builder, value, local_sym_tab, map_sym_tab, structs_sym_tab
)
if operand_result is None:
logger.error(f"Failed to evaluate operand {i} in 'and' expression")
return None
operand_val, operand_type = operand_result
# Convert to boolean if needed
operand_bool = convert_to_bool(builder, operand_val)
current_block = builder.block
if is_last:
# Last operand: result is this value
builder.branch(merge_block)
incoming_values.append((operand_bool, current_block))
else:
# Not last: check if true, continue or short-circuit
next_check = func.append_basic_block(name=f"and.check_{i + 1}")
builder.cbranch(operand_bool, next_check, false_block)
builder.position_at_end(next_check)
# False block: short-circuit with false
builder.position_at_end(false_block)
builder.branch(merge_block)
false_value = ir.Constant(ir.IntType(1), 0)
incoming_values.append((false_value, false_block))
# Merge block: phi node
builder.position_at_end(merge_block)
phi = builder.phi(ir.IntType(1), name="and.result")
for val, block in incoming_values:
phi.add_incoming(val, block)
logger.debug(f"Generated 'and' with {len(incoming_values)} incoming values")
return phi, ir.IntType(1)
def _handle_or_op(func, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab):
"""Handle `or` boolean operations."""
logger.debug(f"Handling 'or' operator with {len(expr.values)} operands")
merge_block = func.append_basic_block(name="or.merge")
true_block = func.append_basic_block(name="or.true")
incoming_values = []
for i, value in enumerate(expr.values):
is_last = i == len(expr.values) - 1
# Evaluate current operand
operand_result = eval_expr(
func, None, builder, value, local_sym_tab, map_sym_tab, structs_sym_tab
)
if operand_result is None:
logger.error(f"Failed to evaluate operand {i} in 'or' expression")
return None
operand_val, operand_type = operand_result
# Convert to boolean if needed
operand_bool = convert_to_bool(builder, operand_val)
current_block = builder.block
if is_last:
# Last operand: result is this value
builder.branch(merge_block)
incoming_values.append((operand_bool, current_block))
else:
# Not last: check if false, continue or short-circuit
next_check = func.append_basic_block(name=f"or.check_{i + 1}")
builder.cbranch(operand_bool, true_block, next_check)
builder.position_at_end(next_check)
# True block: short-circuit with true
builder.position_at_end(true_block)
builder.branch(merge_block)
true_value = ir.Constant(ir.IntType(1), 1)
incoming_values.append((true_value, true_block))
# Merge block: phi node
builder.position_at_end(merge_block)
phi = builder.phi(ir.IntType(1), name="or.result")
for val, block in incoming_values:
phi.add_incoming(val, block)
logger.debug(f"Generated 'or' with {len(incoming_values)} incoming values")
return phi, ir.IntType(1)
def _handle_boolean_op(
func,
module,
builder,
expr: ast.BoolOp,
local_sym_tab,
map_sym_tab,
structs_sym_tab=None,
):
"""Handle `and` and `or` boolean operations."""
if isinstance(expr.op, ast.And):
return _handle_and_op(
func, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
elif isinstance(expr.op, ast.Or):
return _handle_or_op(
func, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
else:
logger.error(f"Unsupported boolean operator: {type(expr.op).__name__}")
return None
def eval_expr(
func,
module,
@ -403,18 +212,6 @@ def eval_expr(
from pythonbpf.binary_ops import handle_binary_op
return handle_binary_op(expr, builder, None, local_sym_tab)
elif isinstance(expr, ast.Compare):
return _handle_compare(
func, module, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
elif isinstance(expr, ast.UnaryOp):
return _handle_unary_op(
func, module, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
elif isinstance(expr, ast.BoolOp):
return _handle_boolean_op(
func, module, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab
)
logger.info("Unsupported expression evaluation")
return None

View File

@ -7,7 +7,7 @@ from dataclasses import dataclass
from pythonbpf.helper import HelperHandlerRegistry, handle_helper_call
from pythonbpf.type_deducer import ctypes_to_ir
from pythonbpf.binary_ops import handle_binary_op
from pythonbpf.expr import eval_expr, handle_expr, convert_to_bool
from pythonbpf.expr_pass import eval_expr, handle_expr
from .return_utils import _handle_none_return, _handle_xdp_return, _is_xdp_name
@ -240,13 +240,71 @@ def handle_assign(
logger.info("Unsupported assignment value type")
def handle_cond(
func, module, builder, cond, local_sym_tab, map_sym_tab, structs_sym_tab=None
):
val = eval_expr(
func, module, builder, cond, local_sym_tab, map_sym_tab, structs_sym_tab
)[0]
return convert_to_bool(builder, val)
def handle_cond(func, module, builder, cond, local_sym_tab, map_sym_tab):
if isinstance(cond, ast.Constant):
if isinstance(cond.value, bool):
return ir.Constant(ir.IntType(1), int(cond.value))
elif isinstance(cond.value, int):
return ir.Constant(ir.IntType(1), int(bool(cond.value)))
else:
logger.info("Unsupported constant type in condition")
return None
elif isinstance(cond, ast.Name):
if cond.id in local_sym_tab:
var = local_sym_tab[cond.id].var
val = builder.load(var)
if val.type != ir.IntType(1):
# Convert nonzero values to true, zero to false
if isinstance(val.type, ir.PointerType):
# For pointer types, compare with null pointer
zero = ir.Constant(val.type, None)
else:
# For integer types, compare with zero
zero = ir.Constant(val.type, 0)
val = builder.icmp_signed("!=", val, zero)
return val
else:
logger.info(f"Undefined variable {cond.id} in condition")
return None
elif isinstance(cond, ast.Compare):
lhs = eval_expr(func, module, builder, cond.left, local_sym_tab, map_sym_tab)[0]
if len(cond.ops) != 1 or len(cond.comparators) != 1:
logger.info("Unsupported complex comparison")
return None
rhs = eval_expr(
func, module, builder, cond.comparators[0], local_sym_tab, map_sym_tab
)[0]
op = cond.ops[0]
if lhs.type != rhs.type:
if isinstance(lhs.type, ir.IntType) and isinstance(rhs.type, ir.IntType):
# Extend the smaller type to the larger type
if lhs.type.width < rhs.type.width:
lhs = builder.sext(lhs, rhs.type)
elif lhs.type.width > rhs.type.width:
rhs = builder.sext(rhs, lhs.type)
else:
logger.info("Type mismatch in comparison")
return None
if isinstance(op, ast.Eq):
return builder.icmp_signed("==", lhs, rhs)
elif isinstance(op, ast.NotEq):
return builder.icmp_signed("!=", lhs, rhs)
elif isinstance(op, ast.Lt):
return builder.icmp_signed("<", lhs, rhs)
elif isinstance(op, ast.LtE):
return builder.icmp_signed("<=", lhs, rhs)
elif isinstance(op, ast.Gt):
return builder.icmp_signed(">", lhs, rhs)
elif isinstance(op, ast.GtE):
return builder.icmp_signed(">=", lhs, rhs)
else:
logger.info("Unsupported comparison operator")
return None
else:
logger.info("Unsupported condition expression")
return None
def handle_if(
@ -262,9 +320,7 @@ def handle_if(
else:
else_block = None
cond = handle_cond(
func, module, builder, stmt.test, local_sym_tab, map_sym_tab, structs_sym_tab
)
cond = handle_cond(func, module, builder, stmt.test, local_sym_tab, map_sym_tab)
if else_block:
builder.cbranch(cond, then_block, else_block)
else:

View File

@ -3,7 +3,7 @@ import logging
from collections.abc import Callable
from llvmlite import ir
from pythonbpf.expr import eval_expr
from pythonbpf.expr_pass import eval_expr
logger = logging.getLogger(__name__)

View File

@ -15,5 +15,8 @@ def deref(ptr):
return result if result is not None else 0
XDP_ABORTED = ctypes.c_int64(0)
XDP_DROP = ctypes.c_int64(1)
XDP_PASS = ctypes.c_int64(2)
XDP_TX = ctypes.c_int64(3)
XDP_REDIRECT = ctypes.c_int64(4)

View File

@ -3,7 +3,7 @@ from logging import Logger
from llvmlite import ir
from enum import Enum
from .maps_utils import MapProcessorRegistry
from pythonbpf.debuginfo import DebugInfoGenerator
from ..debuginfo import DebugInfoGenerator
import logging
logger: Logger = logging.getLogger(__name__)

View File

@ -1,11 +1,10 @@
#include <linux/bpf.h>
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#define u64 unsigned long long
#define u32 unsigned int
#include <bpf/bpf_endian.h>
SEC("xdp")
int hello(struct xdp_md *ctx) {
bpf_printk("Hello, World!\n");
bpf_printk("Hello, World! %ud \n", ctx->data);
return XDP_PASS;
}

19
tests/c-form/kprobe.bpf.c Normal file
View File

@ -0,0 +1,19 @@
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
char LICENSE[] SEC("license") = "Dual BSD/GPL";
SEC("kprobe/do_unlinkat")
int kprobe_execve(struct pt_regs *ctx)
{
bpf_printk("unlinkat created");
return 0;
}
SEC("kretprobe/do_unlinkat")
int kretprobe_execve(struct pt_regs *ctx)
{
bpf_printk("unlinkat returned\n");
return 0;
}

View File

@ -1,34 +0,0 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
# NOTE: Decided against fixing this
# as a workaround is assigning the result of lookup to a variable
# and then using that variable in the if statement.
# Might fix in future.
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
if last.lookup(0) > 0:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,34 +0,0 @@
from pythonbpf import bpf, struct, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
# NOTE: Decided against fixing this
# as one workaround is to just check any field of the struct
# in the if statement. Ugly but works.
# Might fix in future.
@bpf
@struct
class data_t:
pid: c_uint64
ts: c_uint64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
dat = data_t()
if dat:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -0,0 +1,43 @@
from pythonbpf import bpf, map, section, bpfglobal, compile, compile_to_ir
from pythonbpf.maps import HashMap
from vmlinux import struct_xdp_md, XDP_PASS
from ctypes import c_int64
# Instructions to how to run this program
# 1. Install PythonBPF: pip install pythonbpf
# 2. Run the program: python examples/xdp_pass.py
# 3. Run the program with sudo: sudo tools/check.sh run examples/xdp_pass.o
# 4. Attach object file to any network device with something like ./check.sh xdp examples/xdp_pass.o tailscale0
# 5. send traffic through the device and observe effects
@bpf
@map
def count() -> HashMap:
return HashMap(key=c_int64, value=c_int64, max_entries=1)
@bpf
@section("xdp")
def hello_world(ctx: struct_xdp_md) -> c_int64:
key = 0
one = 1
prev = count().lookup(key)
if prev:
prevval = prev + 1
print(f"count: {prevval}")
count().update(key, prevval)
return XDP_PASS
else:
count().update(key, one)
return XDP_PASS
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("xdp_pass.py", "xdp_pass.ll")
compile()

View File

@ -1,32 +0,0 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
last.update(1, 2)
x = last.lookup(0)
y = last.lookup(1)
if x and y:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,21 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
if True:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,21 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
if (0 + 1) * 0:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,21 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
if 0:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,30 +0,0 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
# last.update(0, 1)
tsp = last.lookup(0)
if tsp:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,30 +0,0 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
tsp = last.lookup(0)
if tsp > 0:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,30 +0,0 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
# last.update(0, 1)
tsp = last.lookup(0)
if not tsp:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,32 +0,0 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
# last.update(1, 2)
x = last.lookup(0)
y = last.lookup(1)
if x or y:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,29 +0,0 @@
from pythonbpf import bpf, struct, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
@bpf
@struct
class data_t:
pid: c_uint64
ts: c_uint64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
dat = data_t()
if dat.ts:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,23 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_int32
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
x = 0
y = c_int32(0)
if x == y:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,22 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
x = 0
if x:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,22 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
x = 0
if x * 1:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,22 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
x = 2
if x > 3:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,18 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return True
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

369
tools/vmlinux-gen.py Executable file
View File

@ -0,0 +1,369 @@
#!/usr/bin/env python3
"""
BTF to Python ctypes Converter
Converts Linux kernel BTF (BPF Type Format) to Python ctypes definitions.
This tool automates the process of:
1. Dumping BTF from vmlinux
2. Preprocessing enum definitions
3. Processing struct kioctx to extract anonymous nested structs
4. Running C preprocessor
5. Converting to Python ctypes using clang2py
6. Post-processing the output
Requirements:
- bpftool
- clang
- ctypeslib2 (pip install ctypeslib2)
"""
import argparse
import os
import re
import subprocess
import sys
import tempfile
class BTFConverter:
def __init__(self, btf_source="/sys/kernel/btf/vmlinux", output_file="vmlinux.py",
keep_intermediate=False, verbose=False):
self.btf_source = btf_source
self.output_file = output_file
self.keep_intermediate = keep_intermediate
self.verbose = verbose
self.temp_dir = tempfile.mkdtemp() if not keep_intermediate else "."
def log(self, message):
"""Print message if verbose mode is enabled."""
if self.verbose:
print(f"[*] {message}")
def run_command(self, cmd, description):
"""Run a shell command and handle errors."""
self.log(f"{description}...")
try:
result = subprocess.run(
cmd,
shell=True,
check=True,
capture_output=True,
text=True
)
if self.verbose and result.stdout:
print(result.stdout)
return result
except subprocess.CalledProcessError as e:
print(f"Error during {description}:", file=sys.stderr)
print(e.stderr, file=sys.stderr)
sys.exit(1)
def step1_dump_btf(self):
"""Step 1: Dump BTF from vmlinux."""
vmlinux_h = os.path.join(self.temp_dir, "vmlinux.h")
cmd = f"bpftool btf dump file {self.btf_source} format c > {vmlinux_h}"
self.run_command(cmd, "Dumping BTF from vmlinux")
return vmlinux_h
def step2_preprocess_enums(self, input_file):
"""Step 1.5: Preprocess enum definitions."""
self.log("Preprocessing enum definitions...")
with open(input_file, 'r') as f:
original_code = f.read()
# Extract anonymous enums
enums = re.findall(
r'(?<!typedef\s)(enum\s*\{[^}]*\})\s*(\w+)\s*(?::\s*\d+)?\s*;',
original_code
)
enum_defs = [enum_block + ';' for enum_block, _ in enums]
# Replace anonymous enums with int declarations
processed_code = re.sub(
r'(?<!typedef\s)enum\s*\{[^}]*\}\s*(\w+)\s*(?::\s*\d+)?\s*;',
r'int \1;',
original_code
)
# Prepend enum definitions
if enum_defs:
enum_text = '\n'.join(enum_defs) + '\n\n'
processed_code = enum_text + processed_code
output_file = os.path.join(self.temp_dir, "vmlinux_processed.h")
with open(output_file, 'w') as f:
f.write(processed_code)
return output_file
def step2_5_process_kioctx(self, input_file):
#TODO: this is a very bad bug and design decision. A single struct has an issue mostly.
"""Step 2.5: Process struct kioctx to extract nested anonymous structs."""
self.log("Processing struct kioctx nested structs...")
with open(input_file, 'r') as f:
content = f.read()
# Pattern to match struct kioctx with its full body (handles multiple nesting levels)
kioctx_pattern = r'struct\s+kioctx\s*\{(?:[^{}]|\{(?:[^{}]|\{[^{}]*\})*\})*\}\s*;'
def process_kioctx_replacement(match):
full_struct = match.group(0)
self.log(f"Found struct kioctx, length: {len(full_struct)} chars")
# Extract the struct body (everything between outermost { and })
body_match = re.search(r'struct\s+kioctx\s*\{(.*)\}\s*;', full_struct, re.DOTALL)
if not body_match:
return full_struct
body = body_match.group(1)
# Find all anonymous structs within the body
# Pattern: struct { ... } followed by ; (not a member name)
anon_struct_pattern = r'struct\s*\{[^}]*\}'
anon_structs = []
anon_counter = 4 # Start from 4, counting down to 1
def replace_anonymous_struct(m):
nonlocal anon_counter
anon_struct_content = m.group(0)
# Extract the body of the anonymous struct
anon_body_match = re.search(r'struct\s*\{(.*)\}', anon_struct_content, re.DOTALL)
if not anon_body_match:
return anon_struct_content
anon_body = anon_body_match.group(1)
# Create the named struct definition
anon_name = f"__anon{anon_counter}"
member_name = f"a{anon_counter}"
# Store the struct definition
anon_structs.append(f"struct {anon_name} {{{anon_body}}};")
anon_counter -= 1
# Return the member declaration
return f"struct {anon_name} {member_name}"
# Process the body, finding and replacing anonymous structs
# We need to be careful to only match anonymous structs followed by ;
processed_body = body
# Find all occurrences and process them
pattern_with_semicolon = r'struct\s*\{([^}]*)\}\s*;'
matches = list(re.finditer(pattern_with_semicolon, body, re.DOTALL))
if not matches:
self.log("No anonymous structs found in kioctx")
return full_struct
self.log(f"Found {len(matches)} anonymous struct(s)")
# Process in reverse order to maintain string positions
for match in reversed(matches):
anon_struct_content = match.group(1)
start_pos = match.start()
end_pos = match.end()
# Create the named struct definition
anon_name = f"__anon{anon_counter}"
member_name = f"a{anon_counter}"
# Store the struct definition
anon_structs.insert(0, f"struct {anon_name} {{{anon_struct_content}}};")
# Replace in the body
replacement = f"struct {anon_name} {member_name};"
processed_body = processed_body[:start_pos] + replacement + processed_body[end_pos:]
anon_counter -= 1
# Rebuild the complete definition
if anon_structs:
# Prepend the anonymous struct definitions
anon_definitions = '\n'.join(anon_structs) + '\n\n'
new_struct = f"struct kioctx {{{processed_body}}};"
return anon_definitions + new_struct
else:
return full_struct
# Apply the transformation
processed_content = re.sub(
kioctx_pattern,
process_kioctx_replacement,
content,
flags=re.DOTALL
)
output_file = os.path.join(self.temp_dir, "vmlinux_kioctx_processed.h")
with open(output_file, 'w') as f:
f.write(processed_content)
self.log(f"Saved kioctx-processed output to {output_file}")
return output_file
def step3_run_preprocessor(self, input_file):
"""Step 2: Run C preprocessor."""
output_file = os.path.join(self.temp_dir, "vmlinux.i")
cmd = f"clang -E {input_file} > {output_file}"
self.run_command(cmd, "Running C preprocessor")
return output_file
def step4_convert_to_ctypes(self, input_file):
"""Step 3: Convert to Python ctypes using clang2py."""
output_file = os.path.join(self.temp_dir, "vmlinux_raw.py")
cmd = (
f"clang2py {input_file} -o {output_file} "
f"--clang-args=\"-fno-ms-extensions -I/usr/include -I/usr/include/linux\""
)
self.run_command(cmd, "Converting to Python ctypes")
return output_file
def step5_postprocess(self, input_file):
"""Step 4: Post-process the generated Python file."""
self.log("Post-processing Python ctypes definitions...")
with open(input_file, "r") as f:
data = f.read()
# Remove lines like ('_45', ctypes.c_int64, 0)
data = re.sub(r"\('_[0-9]+',\s*ctypes\.[a-zA-Z0-9_]+,\s*0\),?\s*\n?", "", data)
# Replace ('_20', ctypes.c_uint64, 64) → ('_20', ctypes.c_uint64)
data = re.sub(r"\('(_[0-9]+)',\s*(ctypes\.[a-zA-Z0-9_]+),\s*[0-9]+\)", r"('\1', \2)", data)
# Replace ('_20', ctypes.c_char, 8) with ('_20', ctypes.c_uint8, 8)
data = re.sub(
r"(ctypes\.c_char)(\s*,\s*\d+\))",
r"ctypes.c_uint8\2",
data
)
# Remove ctypes. prefix from invalid entries
invalid_ctypes = ["bpf_iter_state", "_cache_type", "fs_context_purpose"]
for name in invalid_ctypes:
data = re.sub(rf"\bctypes\.{name}\b", name, data)
with open(self.output_file, "w") as f:
f.write(data)
self.log(f"Saved final output to {self.output_file}")
def cleanup(self):
"""Remove temporary files if not keeping them."""
if not self.keep_intermediate and self.temp_dir != ".":
self.log(f"Cleaning up temporary directory: {self.temp_dir}")
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
def convert(self):
"""Run the complete conversion pipeline."""
try:
self.log("Starting BTF to Python ctypes conversion...")
# Check dependencies
self.check_dependencies()
# Run conversion pipeline
vmlinux_h = self.step1_dump_btf()
vmlinux_processed_h = self.step2_preprocess_enums(vmlinux_h)
vmlinux_kioctx_h = self.step2_5_process_kioctx(vmlinux_processed_h)
vmlinux_i = self.step3_run_preprocessor(vmlinux_kioctx_h)
vmlinux_raw_py = self.step4_convert_to_ctypes(vmlinux_i)
self.step5_postprocess(vmlinux_raw_py)
print(f"\n✓ Conversion complete! Output saved to: {self.output_file}")
except Exception as e:
print(f"\n✗ Error during conversion: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
finally:
self.cleanup()
def check_dependencies(self):
"""Check if required tools are available."""
self.log("Checking dependencies...")
dependencies = {
"bpftool": "bpftool --version",
"clang": "clang --version",
"clang2py": "clang2py --version"
}
missing = []
for tool, cmd in dependencies.items():
try:
subprocess.run(
cmd,
shell=True,
check=True,
capture_output=True
)
except subprocess.CalledProcessError:
missing.append(tool)
if missing:
print("Error: Missing required dependencies:", file=sys.stderr)
for tool in missing:
print(f" - {tool}", file=sys.stderr)
if "clang2py" in missing:
print("\nInstall ctypeslib2: pip install ctypeslib2", file=sys.stderr)
sys.exit(1)
def main():
parser = argparse.ArgumentParser(
description="Convert Linux kernel BTF to Python ctypes definitions",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s
%(prog)s -o kernel_types.py
%(prog)s --btf-source /sys/kernel/btf/custom_module -k -v
"""
)
parser.add_argument(
"--btf-source",
default="/sys/kernel/btf/vmlinux",
help="Path to BTF source (default: /sys/kernel/btf/vmlinux)"
)
parser.add_argument(
"-o", "--output",
default="vmlinux.py",
help="Output Python file (default: vmlinux.py)"
)
parser.add_argument(
"-k", "--keep-intermediate",
action="store_true",
help="Keep intermediate files (vmlinux.h, vmlinux_processed.h, etc.)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Enable verbose output"
)
args = parser.parse_args()
converter = BTFConverter(
btf_source=args.btf_source,
output_file=args.output,
keep_intermediate=args.keep_intermediate,
verbose=args.verbose
)
converter.convert()
if __name__ == "__main__":
main()