4 Commits

38 changed files with 399 additions and 1443 deletions

View File

@ -68,6 +68,8 @@ def callback(cpu, event):
perf = b["events"].open_perf_buffer(callback, struct_name="data_t")
print("Starting to poll... (Ctrl+C to stop)")
print("Try running: fork() or clone() system calls to trigger events")
try:
while True:
b["events"].poll(1000)

View File

@ -26,7 +26,7 @@ classifiers = [
]
readme = "README.md"
license = {text = "Apache-2.0"}
requires-python = ">=3.10"
requires-python = ">=3.8"
dependencies = [
"llvmlite",

View File

@ -1,6 +1,6 @@
import ast
import logging
import ctypes
from llvmlite import ir
from .local_symbol import LocalSymbol
from pythonbpf.helper import HelperHandlerRegistry
@ -81,7 +81,7 @@ def _allocate_for_call(builder, var_name, rval, local_sym_tab, structs_sym_tab):
call_type = rval.func.id
# C type constructors
if call_type in ("c_int32", "c_int64", "c_uint32", "c_uint64", "c_void_p"):
if call_type in ("c_int32", "c_int64", "c_uint32", "c_uint64"):
ir_type = ctypes_to_ir(call_type)
var = builder.alloca(ir_type, name=var_name)
var.align = ir_type.width // 8
@ -177,33 +177,17 @@ def _allocate_for_binop(builder, var_name, local_sym_tab):
logger.info(f"Pre-allocated {var_name} for binop result")
def _get_type_name(ir_type):
"""Get a string representation of an IR type."""
if isinstance(ir_type, ir.IntType):
return f"i{ir_type.width}"
elif isinstance(ir_type, ir.PointerType):
return "ptr"
elif isinstance(ir_type, ir.ArrayType):
return f"[{ir_type.count}x{_get_type_name(ir_type.element)}]"
else:
return str(ir_type).replace(" ", "")
def allocate_temp_pool(builder, max_temps, local_sym_tab):
"""Allocate the temporary scratch space pool for helper arguments."""
if not max_temps:
logger.info("No temp pool allocation needed")
if max_temps == 0:
return
for tmp_type, cnt in max_temps.items():
type_name = _get_type_name(tmp_type)
logger.info(f"Allocating temp pool of {cnt} variables of type {type_name}")
for i in range(cnt):
temp_name = f"__helper_temp_{type_name}_{i}"
temp_var = builder.alloca(tmp_type, name=temp_name)
temp_var.align = _get_alignment(tmp_type)
local_sym_tab[temp_name] = LocalSymbol(temp_var, tmp_type)
logger.debug(f"Allocated temp variable: {temp_name}")
logger.info(f"Allocating temp pool of {max_temps} variables")
for i in range(max_temps):
temp_name = f"__helper_temp_{i}"
temp_var = builder.alloca(ir.IntType(64), name=temp_name)
temp_var.align = 8
local_sym_tab[temp_name] = LocalSymbol(temp_var, ir.IntType(64))
def _allocate_for_name(builder, var_name, rval, local_sym_tab):
@ -265,58 +249,7 @@ def _allocate_for_attribute(builder, var_name, rval, local_sym_tab, structs_sym_
].var = base_ptr # This is repurposing of var to store the pointer of the base type
local_sym_tab[struct_var].ir_type = field_ir
# Determine the actual IR type based on the field's type
actual_ir_type = None
# Check if it's a ctypes primitive
if field.type.__module__ == ctypes.__name__:
try:
field_size_bytes = ctypes.sizeof(field.type)
field_size_bits = field_size_bytes * 8
if field_size_bits in [8, 16, 32, 64]:
# Special case: struct_xdp_md i32 fields should allocate as i64
# because load_ctx_field will zero-extend them to i64
if (
vmlinux_struct_name == "struct_xdp_md"
and field_size_bits == 32
):
actual_ir_type = ir.IntType(64)
logger.info(
f"Allocating {var_name} as i64 for i32 field from struct_xdp_md.{field_name} "
"(will be zero-extended during load)"
)
else:
actual_ir_type = ir.IntType(field_size_bits)
else:
logger.warning(
f"Unusual field size {field_size_bits} bits for {field_name}"
)
actual_ir_type = ir.IntType(64)
except Exception as e:
logger.warning(
f"Could not determine size for ctypes field {field_name}: {e}"
)
actual_ir_type = ir.IntType(64)
# Check if it's a nested vmlinux struct or complex type
elif field.type.__module__ == "vmlinux":
# For pointers to structs, use pointer type (64-bit)
if field.ctype_complex_type is not None and issubclass(
field.ctype_complex_type, ctypes._Pointer
):
actual_ir_type = ir.IntType(64) # Pointer is always 64-bit
# For embedded structs, this is more complex - might need different handling
else:
logger.warning(
f"Field {field_name} is a nested vmlinux struct, using i64 for now"
)
actual_ir_type = ir.IntType(64)
else:
logger.warning(
f"Unknown field type module {field.type.__module__} for {field_name}"
)
actual_ir_type = ir.IntType(64)
actual_ir_type = ir.IntType(64)
# Allocate with the actual IR type, not the GlobalVariable
var = _allocate_with_type(builder, var_name, actual_ir_type)

View File

@ -152,30 +152,15 @@ def handle_variable_assignment(
if val_type != var_type:
if isinstance(val_type, Field):
logger.info("Handling assignment to struct field")
# Special handling for struct_xdp_md i32 fields that are zero-extended to i64
# The load_ctx_field already extended them, so val is i64 but val_type.type shows c_uint
if (
hasattr(val_type, "type")
and val_type.type.__name__ == "c_uint"
and isinstance(var_type, ir.IntType)
and var_type.width == 64
):
# This is the struct_xdp_md case - value is already i64
builder.store(val, var_ptr)
logger.info(
f"Assigned zero-extended struct_xdp_md i32 field to {var_name} (i64)"
)
return True
# TODO: handling only ctype struct fields for now. Handle other stuff too later.
elif var_type == ctypes_to_ir(val_type.type.__name__):
if var_type == ctypes_to_ir(val_type.type.__name__):
builder.store(val, var_ptr)
logger.info(f"Assigned ctype struct field to {var_name}")
return True
else:
logger.error(
f"Failed to assign ctype struct field to {var_name}: {val_type} != {var_type}"
)
return False
logger.error(
f"Failed to assign ctype struct field to {var_name}: {val_type} != {var_type}"
)
return False
elif isinstance(val_type, ir.IntType) and isinstance(var_type, ir.IntType):
# Allow implicit int widening
if val_type.width < var_type.width:

View File

@ -86,7 +86,7 @@ def processor(source_code, filename, module):
license_processing(tree, module)
globals_processing(tree, module)
structs_sym_tab = structs_proc(tree, module, bpf_chunks)
map_sym_tab = maps_proc(tree, module, bpf_chunks, structs_sym_tab)
map_sym_tab = maps_proc(tree, module, bpf_chunks)
func_proc(tree, module, bpf_chunks, map_sym_tab, structs_sym_tab)
globals_list_creation(tree, module)
@ -218,11 +218,13 @@ def compile(loglevel=logging.WARNING) -> bool:
def BPF(loglevel=logging.WARNING) -> BpfObject:
caller_frame = inspect.stack()[1]
src = inspect.getsource(caller_frame.frame)
with (
tempfile.NamedTemporaryFile(mode="w+", delete=True, suffix=".py") as f,
tempfile.NamedTemporaryFile(mode="w+", delete=True, suffix=".ll") as inter,
tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".o") as obj_file,
):
with tempfile.NamedTemporaryFile(
mode="w+", delete=True, suffix=".py"
) as f, tempfile.NamedTemporaryFile(
mode="w+", delete=True, suffix=".ll"
) as inter, tempfile.NamedTemporaryFile(
mode="w+", delete=False, suffix=".o"
) as obj_file:
f.write(src)
f.flush()
source = f.name

View File

@ -12,7 +12,6 @@ from .type_normalization import (
get_base_type_and_depth,
deref_to_depth,
)
from pythonbpf.vmlinux_parser.assignment_info import Field
from .vmlinux_registry import VmlinuxHandlerRegistry
logger: Logger = logging.getLogger(__name__)
@ -280,45 +279,16 @@ def _handle_ctypes_call(
call_type = expr.func.id
expected_type = ctypes_to_ir(call_type)
# Extract the actual IR value and type
# val could be (value, ir_type) or (value, Field)
value, val_type = val
# If val_type is a Field object (from vmlinux struct), get the actual IR type of the value
if isinstance(val_type, Field):
# The value is already the correct IR value (potentially zero-extended)
# Get the IR type from the value itself
actual_ir_type = value.type
logger.info(
f"Converting vmlinux field {val_type.name} (IR type: {actual_ir_type}) to {call_type}"
)
else:
actual_ir_type = val_type
if actual_ir_type != expected_type:
if val[1] != expected_type:
# NOTE: We are only considering casting to and from int types for now
if isinstance(actual_ir_type, ir.IntType) and isinstance(
expected_type, ir.IntType
):
if actual_ir_type.width < expected_type.width:
value = builder.sext(value, expected_type)
logger.info(
f"Sign-extended from i{actual_ir_type.width} to i{expected_type.width}"
)
elif actual_ir_type.width > expected_type.width:
value = builder.trunc(value, expected_type)
logger.info(
f"Truncated from i{actual_ir_type.width} to i{expected_type.width}"
)
if isinstance(val[1], ir.IntType) and isinstance(expected_type, ir.IntType):
if val[1].width < expected_type.width:
val = (builder.sext(val[0], expected_type), expected_type)
else:
# Same width, just use as-is (e.g., both i64)
pass
val = (builder.trunc(val[0], expected_type), expected_type)
else:
raise ValueError(
f"Type mismatch: expected {expected_type}, got {actual_ir_type} (original type: {val_type})"
)
return value, expected_type
raise ValueError(f"Type mismatch: expected {expected_type}, got {val[1]}")
return val
def _handle_compare(

View File

@ -49,27 +49,17 @@ def generate_function_debug_info(
"The first argument should always be a pointer to a struct or a void pointer"
)
context_debug_info = VmlinuxHandlerRegistry.get_struct_debug_info(annotation.id)
# Create pointer to context this must be created fresh for each function
# to avoid circular reference issues when the same struct is used in multiple functions
pointer_to_context_debug_info = generator.create_pointer_type(
context_debug_info, 64
)
# Create subroutine type - also fresh for each function
subroutine_type = generator.create_subroutine_type(
return_type, pointer_to_context_debug_info
)
# Create local variable - fresh for each function with unique name
context_local_variable = generator.create_local_variable_debug_info(
leading_argument_name, 1, pointer_to_context_debug_info
)
retained_nodes = [context_local_variable]
logger.info(f"Generating debug info for function {func_node.name}")
# Create subprogram with is_distinct=True to ensure each function gets unique debug info
print("function name", func_node.name)
subprogram_debug_info = generator.create_subprogram(
func_node.name, subroutine_type, retained_nodes
)

View File

@ -39,7 +39,7 @@ logger = logging.getLogger(__name__)
def count_temps_in_call(call_node, local_sym_tab):
"""Count the number of temporary variables needed for a function call."""
count = {}
count = 0
is_helper = False
# NOTE: We exclude print calls for now
@ -49,28 +49,21 @@ def count_temps_in_call(call_node, local_sym_tab):
and call_node.func.id != "print"
):
is_helper = True
func_name = call_node.func.id
elif isinstance(call_node.func, ast.Attribute):
if HelperHandlerRegistry.has_handler(call_node.func.attr):
is_helper = True
func_name = call_node.func.attr
if not is_helper:
return {} # No temps needed
return 0
for arg_idx in range(len(call_node.args)):
for arg in call_node.args:
# NOTE: Count all non-name arguments
# For struct fields, if it is being passed as an argument,
# The struct object should already exist in the local_sym_tab
arg = call_node.args[arg_idx]
if isinstance(arg, ast.Name) or (
if not isinstance(arg, ast.Name) and not (
isinstance(arg, ast.Attribute) and arg.value.id in local_sym_tab
):
continue
param_type = HelperHandlerRegistry.get_param_type(func_name, arg_idx)
if isinstance(param_type, ir.PointerType):
pointee_type = param_type.pointee
count[pointee_type] = count.get(pointee_type, 0) + 1
count += 1
return count
@ -106,15 +99,11 @@ def handle_if_allocation(
def allocate_mem(
module, builder, body, func, ret_type, map_sym_tab, local_sym_tab, structs_sym_tab
):
max_temps_needed = {}
def merge_type_counts(count_dict):
nonlocal max_temps_needed
for typ, cnt in count_dict.items():
max_temps_needed[typ] = max(max_temps_needed.get(typ, 0), cnt)
max_temps_needed = 0
def update_max_temps_for_stmt(stmt):
nonlocal max_temps_needed
temps_needed = 0
if isinstance(stmt, ast.If):
for s in stmt.body:
@ -123,13 +112,10 @@ def allocate_mem(
update_max_temps_for_stmt(s)
return
stmt_temps = {}
for node in ast.walk(stmt):
if isinstance(node, ast.Call):
call_temps = count_temps_in_call(node, local_sym_tab)
for typ, cnt in call_temps.items():
stmt_temps[typ] = stmt_temps.get(typ, 0) + cnt
merge_type_counts(stmt_temps)
temps_needed += count_temps_in_call(node, local_sym_tab)
max_temps_needed = max(max_temps_needed, temps_needed)
for stmt in body:
update_max_temps_for_stmt(stmt)

View File

@ -1,21 +1,7 @@
from .helper_registry import HelperHandlerRegistry
from .helper_utils import reset_scratch_pool
from .bpf_helper_handler import handle_helper_call, emit_probe_read_kernel_str_call
from .helpers import (
ktime,
pid,
deref,
comm,
probe_read_str,
random,
probe_read,
smp_processor_id,
uid,
skb_store_bytes,
get_stack,
XDP_DROP,
XDP_PASS,
)
from .helpers import ktime, pid, deref, comm, probe_read_str, XDP_DROP, XDP_PASS
# Register the helper handler with expr module
@ -79,12 +65,6 @@ __all__ = [
"deref",
"comm",
"probe_read_str",
"random",
"probe_read",
"smp_processor_id",
"uid",
"skb_store_bytes",
"get_stack",
"XDP_DROP",
"XDP_PASS",
]

View File

@ -8,43 +8,30 @@ from .helper_utils import (
get_flags_val,
get_data_ptr_and_size,
get_buffer_ptr_and_size,
get_char_array_ptr_and_size,
get_ptr_from_arg,
get_int_value_from_arg,
)
from .printk_formatter import simple_string_print, handle_fstring_print
from pythonbpf.maps import BPFMapType
from logging import Logger
import logging
logger = logging.getLogger(__name__)
logger: Logger = logging.getLogger(__name__)
class BPFHelperID(Enum):
BPF_MAP_LOOKUP_ELEM = 1
BPF_MAP_UPDATE_ELEM = 2
BPF_MAP_DELETE_ELEM = 3
BPF_PROBE_READ = 4
BPF_KTIME_GET_NS = 5
BPF_PRINTK = 6
BPF_GET_PRANDOM_U32 = 7
BPF_GET_SMP_PROCESSOR_ID = 8
BPF_SKB_STORE_BYTES = 9
BPF_GET_CURRENT_PID_TGID = 14
BPF_GET_CURRENT_UID_GID = 15
BPF_GET_CURRENT_COMM = 16
BPF_PERF_EVENT_OUTPUT = 25
BPF_GET_STACK = 67
BPF_PROBE_READ_KERNEL_STR = 115
BPF_RINGBUF_OUTPUT = 130
BPF_RINGBUF_RESERVE = 131
BPF_RINGBUF_SUBMIT = 132
BPF_RINGBUF_DISCARD = 133
@HelperHandlerRegistry.register(
"ktime",
param_types=[],
return_type=ir.IntType(64),
)
@HelperHandlerRegistry.register("ktime")
def bpf_ktime_get_ns_emitter(
call,
map_ptr,
@ -67,11 +54,7 @@ def bpf_ktime_get_ns_emitter(
return result, ir.IntType(64)
@HelperHandlerRegistry.register(
"lookup",
param_types=[ir.PointerType(ir.IntType(64))],
return_type=ir.PointerType(ir.IntType(64)),
)
@HelperHandlerRegistry.register("lookup")
def bpf_map_lookup_elem_emitter(
call,
map_ptr,
@ -113,7 +96,6 @@ def bpf_map_lookup_elem_emitter(
return result, ir.PointerType()
# NOTE: This has special handling so we won't reflect the signature here.
@HelperHandlerRegistry.register("print")
def bpf_printk_emitter(
call,
@ -162,15 +144,7 @@ def bpf_printk_emitter(
return True
@HelperHandlerRegistry.register(
"update",
param_types=[
ir.PointerType(ir.IntType(64)),
ir.PointerType(ir.IntType(64)),
ir.IntType(64),
],
return_type=ir.PointerType(ir.IntType(64)),
)
@HelperHandlerRegistry.register("update")
def bpf_map_update_elem_emitter(
call,
map_ptr,
@ -225,11 +199,7 @@ def bpf_map_update_elem_emitter(
return result, None
@HelperHandlerRegistry.register(
"delete",
param_types=[ir.PointerType(ir.IntType(64))],
return_type=ir.PointerType(ir.IntType(64)),
)
@HelperHandlerRegistry.register("delete")
def bpf_map_delete_elem_emitter(
call,
map_ptr,
@ -269,11 +239,7 @@ def bpf_map_delete_elem_emitter(
return result, None
@HelperHandlerRegistry.register(
"comm",
param_types=[ir.PointerType(ir.IntType(8))],
return_type=ir.IntType(64),
)
@HelperHandlerRegistry.register("comm")
def bpf_get_current_comm_emitter(
call,
map_ptr,
@ -330,11 +296,7 @@ def bpf_get_current_comm_emitter(
return result, None
@HelperHandlerRegistry.register(
"pid",
param_types=[],
return_type=ir.IntType(64),
)
@HelperHandlerRegistry.register("pid")
def bpf_get_current_pid_tgid_emitter(
call,
map_ptr,
@ -356,12 +318,12 @@ def bpf_get_current_pid_tgid_emitter(
result = builder.call(fn_ptr, [], tail=False)
# Extract the lower 32 bits (PID) using bitwise AND with 0xFFFFFFFF
# TODO: return both PID and TGID if we end up needing TGID somewhere
mask = ir.Constant(ir.IntType(64), 0xFFFFFFFF)
pid = builder.and_(result, mask)
return pid, ir.IntType(64)
@HelperHandlerRegistry.register("output")
def bpf_perf_event_output_handler(
call,
map_ptr,
@ -372,10 +334,6 @@ def bpf_perf_event_output_handler(
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_perf_event_output helper function call.
"""
if len(call.args) != 1:
raise ValueError(
f"Perf event output expects exactly one argument, got {len(call.args)}"
@ -413,98 +371,6 @@ def bpf_perf_event_output_handler(
return result, None
def bpf_ringbuf_output_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_ringbuf_output helper function call.
"""
if len(call.args) != 1:
raise ValueError(
f"Ringbuf output expects exactly one argument, got {len(call.args)}"
)
data_arg = call.args[0]
data_ptr, size_val = get_data_ptr_and_size(data_arg, local_sym_tab, struct_sym_tab)
flags_val = ir.Constant(ir.IntType(64), 0)
map_void_ptr = builder.bitcast(map_ptr, ir.PointerType())
data_void_ptr = builder.bitcast(data_ptr, ir.PointerType())
fn_type = ir.FunctionType(
ir.IntType(64),
[
ir.PointerType(),
ir.PointerType(),
ir.IntType(64),
ir.IntType(64),
],
var_arg=False,
)
fn_ptr_type = ir.PointerType(fn_type)
# helper id
fn_addr = ir.Constant(ir.IntType(64), BPFHelperID.BPF_RINGBUF_OUTPUT.value)
fn_ptr = builder.inttoptr(fn_addr, fn_ptr_type)
result = builder.call(
fn_ptr, [map_void_ptr, data_void_ptr, size_val, flags_val], tail=False
)
return result, None
@HelperHandlerRegistry.register(
"output",
param_types=[ir.PointerType(ir.IntType(8))],
return_type=ir.IntType(64),
)
def handle_output_helper(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Route output helper to the appropriate emitter based on map type.
"""
match map_sym_tab[map_ptr.name].type:
case BPFMapType.PERF_EVENT_ARRAY:
return bpf_perf_event_output_handler(
call,
map_ptr,
module,
builder,
func,
local_sym_tab,
struct_sym_tab,
map_sym_tab,
)
case BPFMapType.RINGBUF:
return bpf_ringbuf_output_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab,
struct_sym_tab,
map_sym_tab,
)
case _:
logger.error("Unsupported map type for output helper.")
raise NotImplementedError("Output helper for this map type is not implemented.")
def emit_probe_read_kernel_str_call(builder, dst_ptr, dst_size, src_ptr):
"""Emit LLVM IR call to bpf_probe_read_kernel_str"""
@ -532,14 +398,7 @@ def emit_probe_read_kernel_str_call(builder, dst_ptr, dst_size, src_ptr):
return result
@HelperHandlerRegistry.register(
"probe_read_str",
param_types=[
ir.PointerType(ir.IntType(8)),
ir.PointerType(ir.IntType(8)),
],
return_type=ir.IntType(64),
)
@HelperHandlerRegistry.register("probe_read_str")
def bpf_probe_read_kernel_str_emitter(
call,
map_ptr,
@ -558,8 +417,8 @@ def bpf_probe_read_kernel_str_emitter(
)
# Get destination buffer (char array -> i8*)
dst_ptr, dst_size = get_or_create_ptr_from_arg(
func, module, call.args[0], builder, local_sym_tab, map_sym_tab, struct_sym_tab
dst_ptr, dst_size = get_char_array_ptr_and_size(
call.args[0], builder, local_sym_tab, struct_sym_tab
)
# Get source pointer (evaluate expression)
@ -574,430 +433,6 @@ def bpf_probe_read_kernel_str_emitter(
return result, ir.IntType(64)
@HelperHandlerRegistry.register(
"random",
param_types=[],
return_type=ir.IntType(32),
)
def bpf_get_prandom_u32_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_get_prandom_u32 helper function call.
"""
helper_id = ir.Constant(ir.IntType(64), BPFHelperID.BPF_GET_PRANDOM_U32.value)
fn_type = ir.FunctionType(ir.IntType(32), [], var_arg=False)
fn_ptr_type = ir.PointerType(fn_type)
fn_ptr = builder.inttoptr(helper_id, fn_ptr_type)
result = builder.call(fn_ptr, [], tail=False)
return result, ir.IntType(32)
@HelperHandlerRegistry.register(
"probe_read",
param_types=[
ir.PointerType(ir.IntType(8)),
ir.IntType(32),
ir.PointerType(ir.IntType(8)),
],
return_type=ir.IntType(64),
)
def bpf_probe_read_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_probe_read helper function
"""
if len(call.args) != 3:
logger.warn("Expected 3 args for probe_read helper")
return
dst_ptr = get_or_create_ptr_from_arg(
func,
module,
call.args[0],
builder,
local_sym_tab,
map_sym_tab,
struct_sym_tab,
ir.IntType(8),
)
size_val = get_int_value_from_arg(
call.args[1],
func,
module,
builder,
local_sym_tab,
map_sym_tab,
struct_sym_tab,
)
src_ptr = get_or_create_ptr_from_arg(
func,
module,
call.args[2],
builder,
local_sym_tab,
map_sym_tab,
struct_sym_tab,
ir.IntType(8),
)
fn_type = ir.FunctionType(
ir.IntType(64),
[ir.PointerType(), ir.IntType(32), ir.PointerType()],
var_arg=False,
)
fn_ptr = builder.inttoptr(
ir.Constant(ir.IntType(64), BPFHelperID.BPF_PROBE_READ.value),
ir.PointerType(fn_type),
)
result = builder.call(
fn_ptr,
[
builder.bitcast(dst_ptr, ir.PointerType()),
builder.trunc(size_val, ir.IntType(32)),
builder.bitcast(src_ptr, ir.PointerType()),
],
tail=False,
)
logger.info(f"Emitted bpf_probe_read (size={size_val})")
return result, ir.IntType(64)
@HelperHandlerRegistry.register(
"smp_processor_id",
param_types=[],
return_type=ir.IntType(32),
)
def bpf_get_smp_processor_id_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_get_smp_processor_id helper function call.
"""
helper_id = ir.Constant(ir.IntType(64), BPFHelperID.BPF_GET_SMP_PROCESSOR_ID.value)
fn_type = ir.FunctionType(ir.IntType(32), [], var_arg=False)
fn_ptr_type = ir.PointerType(fn_type)
fn_ptr = builder.inttoptr(helper_id, fn_ptr_type)
result = builder.call(fn_ptr, [], tail=False)
logger.info("Emitted bpf_get_smp_processor_id call")
return result, ir.IntType(32)
@HelperHandlerRegistry.register(
"uid",
param_types=[],
return_type=ir.IntType(64),
)
def bpf_get_current_uid_gid_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_get_current_uid_gid helper function call.
"""
helper_id = ir.Constant(ir.IntType(64), BPFHelperID.BPF_GET_CURRENT_UID_GID.value)
fn_type = ir.FunctionType(ir.IntType(64), [], var_arg=False)
fn_ptr_type = ir.PointerType(fn_type)
fn_ptr = builder.inttoptr(helper_id, fn_ptr_type)
result = builder.call(fn_ptr, [], tail=False)
# Extract the lower 32 bits (UID) using bitwise AND with 0xFFFFFFFF
# TODO: return both UID and GID if we end up needing GID somewhere
mask = ir.Constant(ir.IntType(64), 0xFFFFFFFF)
pid = builder.and_(result, mask)
return pid, ir.IntType(64)
@HelperHandlerRegistry.register(
"skb_store_bytes",
param_types=[
ir.IntType(32),
ir.PointerType(ir.IntType(8)),
ir.IntType(32),
ir.IntType(64),
],
return_type=ir.IntType(64),
)
def bpf_skb_store_bytes_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_skb_store_bytes helper function call.
Expected call signature: skb_store_bytes(skb, offset, from, len, flags)
"""
args_signature = [
ir.PointerType(), # skb pointer
ir.IntType(32), # offset
ir.PointerType(), # from
ir.IntType(32), # len
ir.IntType(64), # flags
]
if len(call.args) not in (3, 4):
raise ValueError(
f"skb_store_bytes expects 3 or 4 args (offset, from, len, flags), got {len(call.args)}"
)
skb_ptr = func.args[0] # First argument to the function is skb
offset_val = get_int_value_from_arg(
call.args[0],
func,
module,
builder,
local_sym_tab,
map_sym_tab,
struct_sym_tab,
)
from_ptr = get_or_create_ptr_from_arg(
func,
module,
call.args[1],
builder,
local_sym_tab,
map_sym_tab,
struct_sym_tab,
args_signature[2],
)
len_val = get_int_value_from_arg(
call.args[2],
func,
module,
builder,
local_sym_tab,
map_sym_tab,
struct_sym_tab,
)
if len(call.args) == 4:
flags_val = get_flags_val(call.args[3], builder, local_sym_tab)
else:
flags_val = 0
if isinstance(flags_val, int):
flags = ir.Constant(ir.IntType(64), flags_val)
else:
flags = flags_val
fn_type = ir.FunctionType(
ir.IntType(64),
args_signature,
var_arg=False,
)
fn_ptr = builder.inttoptr(
ir.Constant(ir.IntType(64), BPFHelperID.BPF_SKB_STORE_BYTES.value),
ir.PointerType(fn_type),
)
result = builder.call(
fn_ptr,
[
builder.bitcast(skb_ptr, ir.PointerType()),
builder.trunc(offset_val, ir.IntType(32)),
builder.bitcast(from_ptr, ir.PointerType()),
builder.trunc(len_val, ir.IntType(32)),
flags,
],
tail=False,
)
logger.info("Emitted bpf_skb_store_bytes call")
return result, ir.IntType(64)
@HelperHandlerRegistry.register(
"reserve",
param_types=[ir.IntType(64)],
return_type=ir.PointerType(ir.IntType(8)),
)
def bpf_ringbuf_reserve_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_ringbuf_reserve helper function call.
Expected call signature: ringbuf.reserve(size)
"""
if len(call.args) != 1:
raise ValueError(
f"ringbuf.reserve expects exactly one argument (size), got {len(call.args)}"
)
size_val = get_int_value_from_arg(
call.args[0],
func,
module,
builder,
local_sym_tab,
map_sym_tab,
struct_sym_tab,
)
map_void_ptr = builder.bitcast(map_ptr, ir.PointerType())
fn_type = ir.FunctionType(
ir.PointerType(ir.IntType(8)),
[ir.PointerType(), ir.IntType(64)],
var_arg=False,
)
fn_ptr_type = ir.PointerType(fn_type)
fn_addr = ir.Constant(ir.IntType(64), BPFHelperID.BPF_RINGBUF_RESERVE.value)
fn_ptr = builder.inttoptr(fn_addr, fn_ptr_type)
result = builder.call(fn_ptr, [map_void_ptr, size_val], tail=False)
return result, ir.PointerType(ir.IntType(8))
@HelperHandlerRegistry.register(
"submit",
param_types=[ir.PointerType(ir.IntType(8)), ir.IntType(64)],
return_type=ir.VoidType(),
)
def bpf_ringbuf_submit_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_ringbuf_submit helper function call.
Expected call signature: ringbuf.submit(data, flags=0)
"""
if len(call.args) not in (1, 2):
raise ValueError(
f"ringbuf.submit expects 1 or 2 args (data, flags), got {len(call.args)}"
)
data_arg = call.args[0]
flags_arg = call.args[1] if len(call.args) == 2 else None
data_ptr = get_or_create_ptr_from_arg(
func,
module,
data_arg,
builder,
local_sym_tab,
map_sym_tab,
struct_sym_tab,
ir.PointerType(ir.IntType(8)),
)
flags_const = get_flags_val(flags_arg, builder, local_sym_tab)
if isinstance(flags_const, int):
flags_const = ir.Constant(ir.IntType(64), flags_const)
map_void_ptr = builder.bitcast(map_ptr, ir.PointerType())
fn_type = ir.FunctionType(
ir.VoidType(),
[ir.PointerType(), ir.PointerType(), ir.IntType(64)],
var_arg=False,
)
fn_ptr_type = ir.PointerType(fn_type)
fn_addr = ir.Constant(ir.IntType(64), BPFHelperID.BPF_RINGBUF_SUBMIT.value)
fn_ptr = builder.inttoptr(fn_addr, fn_ptr_type)
result = builder.call(fn_ptr, [map_void_ptr, data_ptr, flags_const], tail=False)
return result, None
@HelperHandlerRegistry.register(
"get_stack",
param_types=[ir.PointerType(ir.IntType(8)), ir.IntType(64)],
return_type=ir.IntType(64),
)
def bpf_get_stack_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_get_stack helper function call.
"""
if len(call.args) not in (1, 2):
raise ValueError(
f"get_stack expects atmost two arguments (buf, flags), got {len(call.args)}"
)
ctx_ptr = func.args[0] # First argument to the function is ctx
buf_arg = call.args[0]
flags_arg = call.args[1] if len(call.args) == 2 else None
buf_ptr, buf_size = get_buffer_ptr_and_size(
buf_arg, builder, local_sym_tab, struct_sym_tab
)
flags_val = get_flags_val(flags_arg, builder, local_sym_tab)
if isinstance(flags_val, int):
flags_val = ir.Constant(ir.IntType(64), flags_val)
buf_void_ptr = builder.bitcast(buf_ptr, ir.PointerType())
fn_type = ir.FunctionType(
ir.IntType(64),
[
ir.PointerType(ir.IntType(8)),
ir.PointerType(),
ir.IntType(64),
ir.IntType(64),
],
var_arg=False,
)
fn_ptr_type = ir.PointerType(fn_type)
fn_addr = ir.Constant(ir.IntType(64), BPFHelperID.BPF_GET_STACK.value)
fn_ptr = builder.inttoptr(fn_addr, fn_ptr_type)
result = builder.call(
fn_ptr,
[ctx_ptr, buf_void_ptr, ir.Constant(ir.IntType(64), buf_size), flags_val],
tail=False,
)
return result, ir.IntType(64)
def handle_helper_call(
call,
module,
@ -1052,6 +487,6 @@ def handle_helper_call(
if not map_sym_tab or map_name not in map_sym_tab:
raise ValueError(f"Map '{map_name}' not found in symbol table")
return invoke_helper(method_name, map_sym_tab[map_name].sym)
return invoke_helper(method_name, map_sym_tab[map_name])
return None

View File

@ -1,31 +1,17 @@
from dataclasses import dataclass
from llvmlite import ir
from typing import Callable
@dataclass
class HelperSignature:
"""Signature of a BPF helper function"""
arg_types: list[ir.Type]
return_type: ir.Type
func: Callable
class HelperHandlerRegistry:
"""Registry for BPF helpers"""
_handlers: dict[str, HelperSignature] = {}
_handlers: dict[str, Callable] = {}
@classmethod
def register(cls, helper_name, param_types=None, return_type=None):
def register(cls, helper_name):
"""Decorator to register a handler function for a helper"""
def decorator(func):
helper_sig = HelperSignature(
arg_types=param_types, return_type=return_type, func=func
)
cls._handlers[helper_name] = helper_sig
cls._handlers[helper_name] = func
return func
return decorator
@ -33,29 +19,9 @@ class HelperHandlerRegistry:
@classmethod
def get_handler(cls, helper_name):
"""Get the handler function for a helper"""
handler = cls._handlers.get(helper_name)
return handler.func if handler else None
return cls._handlers.get(helper_name)
@classmethod
def has_handler(cls, helper_name):
"""Check if a handler function is registered for a helper"""
return helper_name in cls._handlers
@classmethod
def get_signature(cls, helper_name):
"""Get the signature of a helper function"""
return cls._handlers.get(helper_name)
@classmethod
def get_param_type(cls, helper_name, index):
"""Get the type of a parameter of a helper function by the index"""
signature = cls.get_signature(helper_name)
if signature and signature.arg_types and 0 <= index < len(signature.arg_types):
return signature.arg_types[index]
return None
@classmethod
def get_return_type(cls, helper_name):
"""Get the return type of a helper function"""
signature = cls.get_signature(helper_name)
return signature.return_type if signature else None

View File

@ -14,43 +14,26 @@ class ScratchPoolManager:
"""Manage the temporary helper variables in local_sym_tab"""
def __init__(self):
self._counters = {}
self._counter = 0
@property
def counter(self):
return sum(self._counters.values())
return self._counter
def reset(self):
self._counters.clear()
self._counter = 0
logger.debug("Scratch pool counter reset to 0")
def _get_type_name(self, ir_type):
if isinstance(ir_type, ir.PointerType):
return "ptr"
elif isinstance(ir_type, ir.IntType):
return f"i{ir_type.width}"
elif isinstance(ir_type, ir.ArrayType):
return f"[{ir_type.count}x{self._get_type_name(ir_type.element)}]"
else:
return str(ir_type).replace(" ", "")
def get_next_temp(self, local_sym_tab, expected_type=None):
# Default to i64 if no expected type provided
type_name = self._get_type_name(expected_type) if expected_type else "i64"
if type_name not in self._counters:
self._counters[type_name] = 0
counter = self._counters[type_name]
temp_name = f"__helper_temp_{type_name}_{counter}"
self._counters[type_name] += 1
def get_next_temp(self, local_sym_tab):
temp_name = f"__helper_temp_{self._counter}"
self._counter += 1
if temp_name not in local_sym_tab:
raise ValueError(
f"Scratch pool exhausted or inadequate: {temp_name}. "
f"Type: {type_name} Counter: {counter}"
f"Current counter: {self._counter}"
)
logger.debug(f"Using {temp_name} for type {type_name}")
return local_sym_tab[temp_name].var, temp_name
@ -77,73 +60,24 @@ def get_var_ptr_from_name(var_name, local_sym_tab):
def create_int_constant_ptr(value, builder, local_sym_tab, int_width=64):
"""Create a pointer to an integer constant."""
int_type = ir.IntType(int_width)
ptr, temp_name = _temp_pool_manager.get_next_temp(local_sym_tab, int_type)
# Default to 64-bit integer
ptr, temp_name = _temp_pool_manager.get_next_temp(local_sym_tab)
logger.info(f"Using temp variable '{temp_name}' for int constant {value}")
const_val = ir.Constant(int_type, value)
const_val = ir.Constant(ir.IntType(int_width), value)
builder.store(const_val, ptr)
return ptr
def get_or_create_ptr_from_arg(
func,
module,
arg,
builder,
local_sym_tab,
map_sym_tab,
struct_sym_tab=None,
expected_type=None,
func, module, arg, builder, local_sym_tab, map_sym_tab, struct_sym_tab=None
):
"""Extract or create pointer from the call arguments."""
logger.info(f"Getting pointer from arg: {ast.dump(arg)}")
sz = None
if isinstance(arg, ast.Name):
# Stack space is already allocated
ptr = get_var_ptr_from_name(arg.id, local_sym_tab)
elif isinstance(arg, ast.Constant) and isinstance(arg.value, int):
int_width = 64 # Default to i64
if expected_type and isinstance(expected_type, ir.IntType):
int_width = expected_type.width
ptr = create_int_constant_ptr(arg.value, builder, local_sym_tab, int_width)
elif isinstance(arg, ast.Attribute):
# A struct field
struct_name = arg.value.id
field_name = arg.attr
if not local_sym_tab or struct_name not in local_sym_tab:
raise ValueError(f"Struct '{struct_name}' not found")
struct_type = local_sym_tab[struct_name].metadata
if not struct_sym_tab or struct_type not in struct_sym_tab:
raise ValueError(f"Struct type '{struct_type}' not found")
struct_info = struct_sym_tab[struct_type]
if field_name not in struct_info.fields:
raise ValueError(
f"Field '{field_name}' not found in struct '{struct_name}'"
)
field_type = struct_info.field_type(field_name)
struct_ptr = local_sym_tab[struct_name].var
# Special handling for char arrays
if (
isinstance(field_type, ir.ArrayType)
and isinstance(field_type.element, ir.IntType)
and field_type.element.width == 8
):
ptr, sz = get_char_array_ptr_and_size(
arg, builder, local_sym_tab, struct_sym_tab
)
if not ptr:
raise ValueError("Failed to get char array pointer from struct field")
else:
ptr = struct_info.gep(builder, struct_ptr, field_name)
ptr = create_int_constant_ptr(arg.value, builder, local_sym_tab)
else:
# NOTE: For any integer expression reaching this branch, it is probably a struct field or a binop
# Evaluate the expression and store the result in a temp variable
val = get_operand_value(
func, module, arg, builder, local_sym_tab, map_sym_tab, struct_sym_tab
@ -151,20 +85,13 @@ def get_or_create_ptr_from_arg(
if val is None:
raise ValueError("Failed to evaluate expression for helper arg.")
ptr, temp_name = _temp_pool_manager.get_next_temp(local_sym_tab, expected_type)
# NOTE: We assume the result is an int64 for now
# if isinstance(arg, ast.Attribute):
# return val
ptr, temp_name = _temp_pool_manager.get_next_temp(local_sym_tab)
logger.info(f"Using temp variable '{temp_name}' for expression result")
if (
isinstance(val.type, ir.IntType)
and expected_type
and val.type.width > expected_type.width
):
val = builder.trunc(val, expected_type)
builder.store(val, ptr)
# NOTE: For char arrays, also return size
if sz:
return ptr, sz
return ptr
@ -287,10 +214,7 @@ def get_char_array_ptr_and_size(buf_arg, builder, local_sym_tab, struct_sym_tab)
field_type = struct_info.field_type(field_name)
if not _is_char_array(field_type):
logger.info(
"Field is not a char array, falling back to int or ptr detection"
)
return None, 0
raise ValueError("Expected char array field")
struct_ptr = local_sym_tab[var_name].var
field_ptr = struct_info.gep(builder, struct_ptr, field_name)
@ -350,23 +274,3 @@ def get_ptr_from_arg(
raise ValueError(f"Expected pointer type, got {val_type}")
return val, val_type
def get_int_value_from_arg(
arg, func, module, builder, local_sym_tab, map_sym_tab, struct_sym_tab
):
"""Evaluate argument and return integer value"""
result = eval_expr(
func, module, builder, arg, local_sym_tab, map_sym_tab, struct_sym_tab
)
if not result:
raise ValueError("Failed to evaluate argument")
val, val_type = result
if not isinstance(val_type, ir.IntType):
raise ValueError(f"Expected integer type, got {val_type}")
return val

View File

@ -27,36 +27,6 @@ def probe_read_str(dst, src):
return ctypes.c_int64(0)
def random():
"""get a pseudorandom u32 number"""
return ctypes.c_int32(0)
def probe_read(dst, size, src):
"""Safely read data from kernel memory"""
return ctypes.c_int64(0)
def smp_processor_id():
"""get the current CPU id"""
return ctypes.c_int32(0)
def uid():
"""get current user id"""
return ctypes.c_int32(0)
def skb_store_bytes(offset, from_buf, size, flags=0):
"""store bytes into a socket buffer"""
return ctypes.c_int64(0)
def get_stack(buf, flags=0):
"""get the current stack trace"""
return ctypes.c_int64(0)
XDP_ABORTED = ctypes.c_int64(0)
XDP_DROP = ctypes.c_int64(1)
XDP_PASS = ctypes.c_int64(2)

View File

@ -4,7 +4,6 @@ import logging
from llvmlite import ir
from pythonbpf.expr import eval_expr, get_base_type_and_depth, deref_to_depth
from pythonbpf.expr.vmlinux_registry import VmlinuxHandlerRegistry
from pythonbpf.helper.helper_utils import get_char_array_ptr_and_size
logger = logging.getLogger(__name__)
@ -220,12 +219,11 @@ def _prepare_expr_args(expr, func, module, builder, local_sym_tab, struct_sym_ta
"""Evaluate and prepare an expression to use as an arg for bpf_printk."""
# Special case: struct field char array needs pointer to first element
if isinstance(expr, ast.Attribute):
char_array_ptr, _ = get_char_array_ptr_and_size(
expr, builder, local_sym_tab, struct_sym_tab
)
if char_array_ptr:
return char_array_ptr
char_array_ptr = _get_struct_char_array_ptr(
expr, builder, local_sym_tab, struct_sym_tab
)
if char_array_ptr:
return char_array_ptr
# Regular expression evaluation
val, _ = eval_expr(func, module, builder, expr, local_sym_tab, None, struct_sym_tab)
@ -244,6 +242,52 @@ def _prepare_expr_args(expr, func, module, builder, local_sym_tab, struct_sym_ta
return ir.Constant(ir.IntType(64), 0)
def _get_struct_char_array_ptr(expr, builder, local_sym_tab, struct_sym_tab):
"""Get pointer to first element of char array in struct field, or None."""
if not (isinstance(expr, ast.Attribute) and isinstance(expr.value, ast.Name)):
return None
var_name = expr.value.id
field_name = expr.attr
# Check if it's a valid struct field
if not (
local_sym_tab
and var_name in local_sym_tab
and struct_sym_tab
and local_sym_tab[var_name].metadata in struct_sym_tab
):
return None
struct_type = local_sym_tab[var_name].metadata
struct_info = struct_sym_tab[struct_type]
if field_name not in struct_info.fields:
return None
field_type = struct_info.field_type(field_name)
# Check if it's a char array
is_char_array = (
isinstance(field_type, ir.ArrayType)
and isinstance(field_type.element, ir.IntType)
and field_type.element.width == 8
)
if not is_char_array:
return None
# Get field pointer and GEP to first element: [N x i8]* -> i8*
struct_ptr = local_sym_tab[var_name].var
field_ptr = struct_info.gep(builder, struct_ptr, field_name)
return builder.gep(
field_ptr,
[ir.Constant(ir.IntType(32), 0), ir.Constant(ir.IntType(32), 0)],
inbounds=True,
)
def _handle_pointer_arg(val, func, builder):
"""Convert pointer type for bpf_printk."""
target, depth = get_base_type_and_depth(val.type)

View File

@ -1,5 +1,4 @@
from .maps import HashMap, PerfEventArray, RingBuffer
from .maps import HashMap, PerfEventArray, RingBuf
from .maps_pass import maps_proc
from .map_types import BPFMapType
__all__ = ["HashMap", "PerfEventArray", "maps_proc", "RingBuffer", "BPFMapType"]
__all__ = ["HashMap", "PerfEventArray", "maps_proc", "RingBuf"]

View File

@ -2,7 +2,7 @@ from pythonbpf.debuginfo import DebugInfoGenerator
from .map_types import BPFMapType
def create_map_debug_info(module, map_global, map_name, map_params, structs_sym_tab):
def create_map_debug_info(module, map_global, map_name, map_params):
"""Generate debug info metadata for BPF maps HASH and PERF_EVENT_ARRAY"""
generator = DebugInfoGenerator(module)
@ -64,13 +64,7 @@ def create_map_debug_info(module, map_global, map_name, map_params, structs_sym_
return global_var
# TODO: This should not be exposed outside of the module.
# Ideally we should expose a single create_map_debug_info function that handles all map types.
# We can probably use a registry pattern to register different map types and their debug info generators.
# map_params["type"] will be used to determine which generator to use.
def create_ringbuf_debug_info(
module, map_global, map_name, map_params, structs_sym_tab
):
def create_ringbuf_debug_info(module, map_global, map_name, map_params):
"""Generate debug information metadata for BPF RINGBUF map"""
generator = DebugInfoGenerator(module)

View File

@ -36,14 +36,11 @@ class PerfEventArray:
pass # Placeholder for output method
class RingBuffer:
class RingBuf:
def __init__(self, max_entries):
self.max_entries = max_entries
def output(self, data, flags=0):
pass
def reserve(self, size: int):
def reserve(self, size: int, flags=0):
if size > self.max_entries:
raise ValueError("size cannot be greater than set maximum entries")
return 0
@ -51,7 +48,4 @@ class RingBuffer:
def submit(self, data, flags=0):
pass
def discard(self, data, flags=0):
pass
# add discard, output and also give names to flags and stuff

View File

@ -3,7 +3,7 @@ import logging
from logging import Logger
from llvmlite import ir
from .maps_utils import MapProcessorRegistry, MapSymbol
from .maps_utils import MapProcessorRegistry
from .map_types import BPFMapType
from .map_debug_info import create_map_debug_info, create_ringbuf_debug_info
from pythonbpf.expr.vmlinux_registry import VmlinuxHandlerRegistry
@ -12,15 +12,13 @@ from pythonbpf.expr.vmlinux_registry import VmlinuxHandlerRegistry
logger: Logger = logging.getLogger(__name__)
def maps_proc(tree, module, chunks, structs_sym_tab):
def maps_proc(tree, module, chunks):
"""Process all functions decorated with @map to find BPF maps"""
map_sym_tab = {}
for func_node in chunks:
if is_map(func_node):
logger.info(f"Found BPF map: {func_node.name}")
map_sym_tab[func_node.name] = process_bpf_map(
func_node, module, structs_sym_tab
)
map_sym_tab[func_node.name] = process_bpf_map(func_node, module)
return map_sym_tab
@ -48,7 +46,7 @@ def create_bpf_map(module, map_name, map_params):
map_global.align = 8
logger.info(f"Created BPF map: {map_name} with params {map_params}")
return MapSymbol(type=map_params["type"], sym=map_global)
return map_global
def _parse_map_params(rval, expected_args=None):
@ -62,8 +60,7 @@ def _parse_map_params(rval, expected_args=None):
if i < len(rval.args):
arg = rval.args[i]
if isinstance(arg, ast.Name):
result = _get_vmlinux_enum(handler, arg.id)
params[arg_name] = result if result is not None else arg.id
params[arg_name] = arg.id
elif isinstance(arg, ast.Constant):
params[arg_name] = arg.value
@ -71,46 +68,33 @@ def _parse_map_params(rval, expected_args=None):
for keyword in rval.keywords:
if isinstance(keyword.value, ast.Name):
name = keyword.value.id
result = _get_vmlinux_enum(handler, name)
params[keyword.arg] = result if result is not None else name
if handler and handler.is_vmlinux_enum(name):
result = handler.get_vmlinux_enum_value(name)
params[keyword.arg] = result if result is not None else name
else:
params[keyword.arg] = name
elif isinstance(keyword.value, ast.Constant):
params[keyword.arg] = keyword.value.value
return params
def _get_vmlinux_enum(handler, name):
if handler and handler.is_vmlinux_enum(name):
return handler.get_vmlinux_enum_value(name)
@MapProcessorRegistry.register("RingBuffer")
def process_ringbuf_map(map_name, rval, module, structs_sym_tab):
@MapProcessorRegistry.register("RingBuf")
def process_ringbuf_map(map_name, rval, module):
"""Process a BPF_RINGBUF map declaration"""
logger.info(f"Processing Ringbuf: {map_name}")
map_params = _parse_map_params(rval, expected_args=["max_entries"])
map_params["type"] = BPFMapType.RINGBUF
# NOTE: constraints borrowed from https://docs.ebpf.io/linux/map-type/BPF_MAP_TYPE_RINGBUF/
max_entries = map_params.get("max_entries")
if (
not isinstance(max_entries, int)
or max_entries < 4096
or (max_entries & (max_entries - 1)) != 0
):
raise ValueError(
"Ringbuf max_entries must be a power of two greater than or equal to the page size (4096)"
)
logger.info(f"Ringbuf map parameters: {map_params}")
map_global = create_bpf_map(module, map_name, map_params)
create_ringbuf_debug_info(module, map_global.sym, map_name, map_params)
create_ringbuf_debug_info(module, map_global, map_name, map_params)
return map_global
@MapProcessorRegistry.register("HashMap")
def process_hash_map(map_name, rval, module, structs_sym_tab):
def process_hash_map(map_name, rval, module):
"""Process a BPF_HASH map declaration"""
logger.info(f"Processing HashMap: {map_name}")
map_params = _parse_map_params(rval, expected_args=["key", "value", "max_entries"])
@ -119,12 +103,12 @@ def process_hash_map(map_name, rval, module, structs_sym_tab):
logger.info(f"Map parameters: {map_params}")
map_global = create_bpf_map(module, map_name, map_params)
# Generate debug info for BTF
create_map_debug_info(module, map_global.sym, map_name, map_params)
create_map_debug_info(module, map_global, map_name, map_params)
return map_global
@MapProcessorRegistry.register("PerfEventArray")
def process_perf_event_map(map_name, rval, module, structs_sym_tab):
def process_perf_event_map(map_name, rval, module):
"""Process a BPF_PERF_EVENT_ARRAY map declaration"""
logger.info(f"Processing PerfEventArray: {map_name}")
map_params = _parse_map_params(rval, expected_args=["key_size", "value_size"])
@ -133,11 +117,11 @@ def process_perf_event_map(map_name, rval, module, structs_sym_tab):
logger.info(f"Map parameters: {map_params}")
map_global = create_bpf_map(module, map_name, map_params)
# Generate debug info for BTF
create_map_debug_info(module, map_global.sym, map_name, map_params)
create_map_debug_info(module, map_global, map_name, map_params)
return map_global
def process_bpf_map(func_node, module, structs_sym_tab):
def process_bpf_map(func_node, module):
"""Process a BPF map (a function decorated with @map)"""
map_name = func_node.name
logger.info(f"Processing BPF map: {map_name}")
@ -156,7 +140,7 @@ def process_bpf_map(func_node, module, structs_sym_tab):
if isinstance(rval, ast.Call) and isinstance(rval.func, ast.Name):
handler = MapProcessorRegistry.get_processor(rval.func.id)
if handler:
return handler(map_name, rval, module, structs_sym_tab)
return handler(map_name, rval, module)
else:
logger.warning(f"Unknown map type {rval.func.id}, defaulting to HashMap")
return process_hash_map(map_name, rval, module)

View File

@ -1,16 +1,5 @@
from collections.abc import Callable
from dataclasses import dataclass
from llvmlite import ir
from typing import Any
from .map_types import BPFMapType
@dataclass
class MapSymbol:
"""Class representing a symbol on the map"""
type: BPFMapType
sym: ir.GlobalVariable
class MapProcessorRegistry:

View File

@ -16,8 +16,6 @@ mapping = {
"c_long": ir.IntType(64),
"c_ulong": ir.IntType(64),
"c_longlong": ir.IntType(64),
"c_uint": ir.IntType(32),
"c_int": ir.IntType(32),
# Not so sure about this one
"str": ir.PointerType(ir.IntType(8)),
}

View File

@ -16,10 +16,37 @@ def get_module_symbols(module_name: str):
return [name for name in dir(imported_module)], imported_module
def unwrap_pointer_type(type_obj: Any) -> Any:
"""
Recursively unwrap all pointer layers to get the base type.
This handles multiply nested pointers like LP_LP_struct_attribute_group
and returns the base type (struct_attribute_group).
Stops unwrapping when reaching a non-pointer type (one without _type_ attribute).
Args:
type_obj: The type object to unwrap
Returns:
The base type after unwrapping all pointer layers
"""
current_type = type_obj
# Keep unwrapping while it's a pointer/array type (has _type_)
# But stop if _type_ is just a string or basic type marker
while hasattr(current_type, "_type_"):
next_type = current_type._type_
# Stop if _type_ is a string (like 'c' for c_char)
if isinstance(next_type, str):
break
current_type = next_type
return current_type
def process_vmlinux_class(
node,
llvm_module,
handler: DependencyHandler,
node,
llvm_module,
handler: DependencyHandler,
):
symbols_in_module, imported_module = get_module_symbols("vmlinux")
if node.name in symbols_in_module:
@ -30,10 +57,10 @@ def process_vmlinux_class(
def process_vmlinux_post_ast(
elem_type_class,
llvm_handler,
handler: DependencyHandler,
processing_stack=None,
elem_type_class,
llvm_handler,
handler: DependencyHandler,
processing_stack=None,
):
# Initialize processing stack on first call
if processing_stack is None:
@ -113,7 +140,7 @@ def process_vmlinux_post_ast(
# Process pointer to ctype
if isinstance(elem_type, type) and issubclass(
elem_type, ctypes._Pointer
elem_type, ctypes._Pointer
):
# Get the pointed-to type
pointed_type = elem_type._type_
@ -126,7 +153,7 @@ def process_vmlinux_post_ast(
# Process function pointers (CFUNCTYPE)
elif hasattr(elem_type, "_restype_") and hasattr(
elem_type, "_argtypes_"
elem_type, "_argtypes_"
):
# This is a CFUNCTYPE or similar
logger.info(
@ -158,13 +185,19 @@ def process_vmlinux_post_ast(
if hasattr(elem_type, "_length_") and is_complex_type:
type_length = elem_type._length_
if containing_type.__module__ == "vmlinux":
new_dep_node.add_dependent(
elem_type._type_.__name__
if hasattr(elem_type._type_, "__name__")
else str(elem_type._type_)
# Unwrap all pointer layers to get the base type for dependency tracking
base_type = unwrap_pointer_type(elem_type)
base_type_module = getattr(base_type, "__module__", None)
if base_type_module == "vmlinux":
base_type_name = (
base_type.__name__
if hasattr(base_type, "__name__")
else str(base_type)
)
elif containing_type.__module__ == ctypes.__name__:
new_dep_node.add_dependent(base_type_name)
elif base_type_module == ctypes.__name__ or base_type_module is None:
# Handle ctypes or types with no module (like some internal ctypes types)
if isinstance(elem_type, type):
if issubclass(elem_type, ctypes.Array):
ctype_complex_type = ctypes.Array
@ -178,7 +211,7 @@ def process_vmlinux_post_ast(
raise TypeError("Unsupported ctypes subclass")
else:
raise ImportError(
f"Unsupported module of {containing_type}"
f"Unsupported module of {base_type}: {base_type_module}"
)
logger.debug(
f"{containing_type} containing type of parent {elem_name} with {elem_type} and ctype {ctype_complex_type} and length {type_length}"
@ -191,11 +224,16 @@ def process_vmlinux_post_ast(
elem_name, ctype_complex_type
)
new_dep_node.set_field_type(elem_name, elem_type)
if containing_type.__module__ == "vmlinux":
# Check the containing_type module to decide whether to recurse
containing_type_module = getattr(containing_type, "__module__", None)
if containing_type_module == "vmlinux":
# Also unwrap containing_type to get base type name
base_containing_type = unwrap_pointer_type(containing_type)
containing_type_name = (
containing_type.__name__
if hasattr(containing_type, "__name__")
else str(containing_type)
base_containing_type.__name__
if hasattr(base_containing_type, "__name__")
else str(base_containing_type)
)
# Check for self-reference or already processed
@ -212,21 +250,21 @@ def process_vmlinux_post_ast(
)
new_dep_node.set_field_ready(elem_name, True)
else:
# Process recursively - THIS WAS MISSING
# Process recursively - use base containing type, not the pointer wrapper
new_dep_node.add_dependent(containing_type_name)
process_vmlinux_post_ast(
containing_type,
base_containing_type,
llvm_handler,
handler,
processing_stack,
)
new_dep_node.set_field_ready(elem_name, True)
elif containing_type.__module__ == ctypes.__name__:
elif containing_type_module == ctypes.__name__ or containing_type_module is None:
logger.debug(f"Processing ctype internal{containing_type}")
new_dep_node.set_field_ready(elem_name, True)
else:
raise TypeError(
"Module not supported in recursive resolution"
f"Module not supported in recursive resolution: {containing_type_module}"
)
else:
new_dep_node.add_dependent(
@ -245,9 +283,12 @@ def process_vmlinux_post_ast(
raise ValueError(
f"{elem_name} with type {elem_type} from module {module_name} not supported in recursive resolver"
)
elif module_name == ctypes.__name__ or module_name is None:
# Handle ctypes types - these don't need processing, just return
logger.debug(f"Skipping ctypes type {current_symbol_name}")
return True
else:
raise ImportError("UNSUPPORTED Module")
raise ImportError(f"UNSUPPORTED Module {module_name}")
logger.info(
f"{current_symbol_name} processed and handler readiness {handler.is_ready}"

View File

@ -11,7 +11,9 @@ from .class_handler import process_vmlinux_class
logger = logging.getLogger(__name__)
def detect_import_statement(tree: ast.AST) -> list[tuple[str, ast.ImportFrom]]:
def detect_import_statement(
tree: ast.AST,
) -> list[tuple[str, ast.ImportFrom, str, str]]:
"""
Parse AST and detect import statements from vmlinux.
@ -25,7 +27,7 @@ def detect_import_statement(tree: ast.AST) -> list[tuple[str, ast.ImportFrom]]:
List of tuples containing (module_name, imported_item) for each vmlinux import
Raises:
SyntaxError: If multiple imports from vmlinux are attempted or import * is used
SyntaxError: If import * is used
"""
vmlinux_imports = []
@ -40,28 +42,19 @@ def detect_import_statement(tree: ast.AST) -> list[tuple[str, ast.ImportFrom]]:
"Please import specific types explicitly."
)
# Check for multiple imports: from vmlinux import A, B, C
if len(node.names) > 1:
imported_names = [alias.name for alias in node.names]
raise SyntaxError(
f"Multiple imports from vmlinux are not supported. "
f"Found: {', '.join(imported_names)}. "
f"Please use separate import statements for each type."
)
# Check if no specific import is specified (should not happen with valid Python)
if len(node.names) == 0:
raise SyntaxError(
"Import from vmlinux must specify at least one type."
)
# Valid single import
# Support multiple imports: from vmlinux import A, B, C
for alias in node.names:
import_name = alias.name
# Use alias if provided, otherwise use the original name (commented)
# as_name = alias.asname if alias.asname else alias.name
vmlinux_imports.append(("vmlinux", node))
logger.info(f"Found vmlinux import: {import_name}")
# Use alias if provided, otherwise use the original name
as_name = alias.asname if alias.asname else alias.name
vmlinux_imports.append(("vmlinux", node, import_name, as_name))
logger.info(f"Found vmlinux import: {import_name} as {as_name}")
# Handle "import vmlinux" statements (not typical but should be rejected)
elif isinstance(node, ast.Import):
@ -73,6 +66,7 @@ def detect_import_statement(tree: ast.AST) -> list[tuple[str, ast.ImportFrom]]:
)
logger.info(f"Total vmlinux imports detected: {len(vmlinux_imports)}")
# print(f"\n**************\n{vmlinux_imports}\n**************\n")
return vmlinux_imports
@ -103,40 +97,37 @@ def vmlinux_proc(tree: ast.AST, module):
with open(source_file, "r") as f:
mod_ast = ast.parse(f.read(), filename=source_file)
for import_mod, import_node in import_statements:
for alias in import_node.names:
imported_name = alias.name
found = False
for mod_node in mod_ast.body:
if (
isinstance(mod_node, ast.ClassDef)
and mod_node.name == imported_name
):
process_vmlinux_class(mod_node, module, handler)
found = True
break
if isinstance(mod_node, ast.Assign):
for target in mod_node.targets:
if isinstance(target, ast.Name) and target.id == imported_name:
process_vmlinux_assign(mod_node, module, assignments)
found = True
break
if found:
break
if not found:
logger.info(
f"{imported_name} not found as ClassDef or Assign in vmlinux"
)
for import_mod, import_node, imported_name, as_name in import_statements:
found = False
for mod_node in mod_ast.body:
if isinstance(mod_node, ast.ClassDef) and mod_node.name == imported_name:
process_vmlinux_class(mod_node, module, handler)
found = True
break
if isinstance(mod_node, ast.Assign):
for target in mod_node.targets:
if isinstance(target, ast.Name) and target.id == imported_name:
process_vmlinux_assign(mod_node, module, assignments, as_name)
found = True
break
if found:
break
if not found:
logger.info(f"{imported_name} not found as ClassDef or Assign in vmlinux")
IRGenerator(module, handler, assignments)
return assignments
def process_vmlinux_assign(node, module, assignments: dict[str, AssignmentInfo]):
def process_vmlinux_assign(
node, module, assignments: dict[str, AssignmentInfo], target_name=None
):
"""Process assignments from vmlinux module."""
# Only handle single-target assignments
if len(node.targets) == 1 and isinstance(node.targets[0], ast.Name):
target_name = node.targets[0].id
# Use provided target_name (for aliased imports) or fall back to original name
if target_name is None:
target_name = node.targets[0].id
# Handle constant value assignments
if isinstance(node.value, ast.Constant):

View File

@ -46,13 +46,14 @@ def debug_info_generation(
if struct.name.startswith("struct_"):
struct_name = struct.name.removeprefix("struct_")
# Create struct type with all members
struct_type = generator.create_struct_type_with_name(
struct_name, members, struct.__sizeof__() * 8, is_distinct=True
)
else:
raise ValueError("Unions are not supported in the current version")
# Create struct type with all members
struct_type = generator.create_struct_type_with_name(
struct_name, members, struct.__sizeof__() * 8, is_distinct=True
)
logger.warning("Blindly handling Unions present in vmlinux dependencies")
struct_type = None
# raise ValueError("Unions are not supported in the current version")
return struct_type
@ -62,7 +63,7 @@ def _get_field_debug_type(
generator: DebugInfoGenerator,
parent_struct: DependencyNode,
generated_debug_info: List[Tuple[DependencyNode, Any]],
) -> tuple[Any, int]:
) -> tuple[Any, int] | None:
"""
Determine the appropriate debug type for a field based on its Python/ctypes type.
@ -78,7 +79,11 @@ def _get_field_debug_type(
"""
# Handle complex types (arrays, pointers)
if field.ctype_complex_type is not None:
if issubclass(field.ctype_complex_type, ctypes.Array):
#TODO: Check if this is a CFUNCTYPE (function pointer), but sadly it just checks callable for now
if callable(field.ctype_complex_type):
# Handle function pointer types, create a void pointer as a placeholder
return generator.create_pointer_type(None), 64
elif issubclass(field.ctype_complex_type, ctypes.Array):
# Handle array types
element_type, base_type_size = _get_basic_debug_type(
field.containing_type, generator

View File

@ -11,6 +11,9 @@ logger = logging.getLogger(__name__)
class IRGenerator:
# This field keeps track of the non_struct names to avoid duplicate name errors.
type_number = 0
unprocessed_store = []
# get the assignments dict and add this stuff to it.
def __init__(self, llvm_module, handler: DependencyHandler, assignments):
self.llvm_module = llvm_module
@ -68,6 +71,7 @@ class IRGenerator:
dep_node_from_dependency, processing_stack
)
else:
print(struct)
raise RuntimeError(
f"Warning: Dependency {dependency} not found in handler"
)
@ -82,6 +86,7 @@ class IRGenerator:
members_dict = {}
for field_name, field in struct.fields.items():
# Get the generated field name from our dictionary, or use field_name if not found
print(f"DEBUG: {struct.name}, {field_name}")
if (
struct.name in self.generated_field_names
and field_name in self.generated_field_names[struct.name]
@ -129,7 +134,20 @@ class IRGenerator:
for field_name, field in struct.fields.items():
# does not take arrays and similar types into consideration yet.
if field.ctype_complex_type is not None and issubclass(
if callable(field.ctype_complex_type):
# Function pointer case - generate a simple field accessor
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index
)
print(field_co_re_name)
field_index += 1
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
self.generated_field_names[struct.name][field_name] = globvar
elif field.ctype_complex_type is not None and issubclass(
field.ctype_complex_type, ctypes.Array
):
array_size = field.type_size
@ -137,7 +155,7 @@ class IRGenerator:
if containing_type.__module__ == ctypes.__name__:
containing_type_size = ctypes.sizeof(containing_type)
if array_size == 0:
field_co_re_name = self._struct_name_generator(
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index, True, 0, containing_type_size
)
globvar = ir.GlobalVariable(
@ -149,7 +167,7 @@ class IRGenerator:
field_index += 1
continue
for i in range(0, array_size):
field_co_re_name = self._struct_name_generator(
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index, True, i, containing_type_size
)
globvar = ir.GlobalVariable(
@ -163,12 +181,26 @@ class IRGenerator:
array_size = field.type_size
containing_type = field.containing_type
if containing_type.__module__ == "vmlinux":
containing_type_size = self.handler[
containing_type.__name__
].current_offset
for i in range(0, array_size):
field_co_re_name = self._struct_name_generator(
struct, field, field_index, True, i, containing_type_size
print(struct)
# Unwrap all pointer layers to get the base struct type
base_containing_type = containing_type
while hasattr(base_containing_type, "_type_"):
next_type = base_containing_type._type_
# Stop if _type_ is a string (like 'c' for c_char)
#TODO: stacked pointers not handl;ing ctypes check here as well
if isinstance(next_type, str):
break
base_containing_type = next_type
# Get the base struct name
base_struct_name = base_containing_type.__name__ if hasattr(base_containing_type, "__name__") else str(base_containing_type)
# Look up the size using the base struct name
containing_type_size = self.handler[base_struct_name].current_offset
print(f"GAY: {array_size}, {struct.name}, {field_name}")
if array_size == 0:
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index, True, 0, containing_type_size
)
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
@ -176,9 +208,21 @@ class IRGenerator:
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
self.generated_field_names[struct.name][field_name] = globvar
field_index += 1
field_index += 1
else:
for i in range(0, array_size):
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index, True, i, containing_type_size
)
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
self.generated_field_names[struct.name][field_name] = globvar
field_index += 1
else:
field_co_re_name = self._struct_name_generator(
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index
)
field_index += 1
@ -198,7 +242,7 @@ class IRGenerator:
is_indexed: bool = False,
index: int = 0,
containing_type_size: int = 0,
) -> str:
) -> tuple[str, bool]:
# TODO: Does not support Unions as well as recursive pointer and array type naming
if is_indexed:
name = (
@ -208,7 +252,7 @@ class IRGenerator:
+ "$"
+ f"0:{field_index}:{index}"
)
return name
return name, True
elif struct.name.startswith("struct_"):
name = (
"llvm."
@ -217,9 +261,18 @@ class IRGenerator:
+ "$"
+ f"0:{field_index}"
)
return name
return name, True
else:
print(self.handler[struct.name])
raise TypeError(
"Name generation cannot occur due to type name not starting with struct"
logger.warning(
"Blindly handling non-struct type to avoid type errors in vmlinux IR generation. Possibly a union."
)
self.type_number += 1
unprocessed_type = "unprocessed_type_" + str(self.handler[struct.name].name)
if self.unprocessed_store.__contains__(unprocessed_type):
return unprocessed_type + "_" + str(self.type_number), False
else:
self.unprocessed_store.append(unprocessed_type)
return unprocessed_type, False
# raise TypeError(
# "Name generation cannot occur due to type name not starting with struct"
# )

View File

@ -1,6 +1,6 @@
import logging
from typing import Any
import ctypes
from llvmlite import ir
from pythonbpf.local_symbol import LocalSymbol
@ -94,19 +94,22 @@ class VmlinuxHandler:
f"Attempting to access field {field_name} of possible vmlinux struct {struct_var_name}"
)
python_type: type = var_info.metadata
struct_name = python_type.__name__
globvar_ir, field_data = self.get_field_type(struct_name, field_name)
builder.function.args[0].type = ir.PointerType(ir.IntType(8))
field_ptr = self.load_ctx_field(
builder, builder.function.args[0], globvar_ir, field_data, struct_name
globvar_ir, field_data = self.get_field_type(
python_type.__name__, field_name
)
builder.function.args[0].type = ir.PointerType(ir.IntType(8))
print(builder.function.args[0])
field_ptr = self.load_ctx_field(
builder, builder.function.args[0], globvar_ir
)
print(field_ptr)
# Return pointer to field and field type
return field_ptr, field_data
else:
raise RuntimeError("Variable accessed not found in symbol table")
@staticmethod
def load_ctx_field(builder, ctx_arg, offset_global, field_data, struct_name=None):
def load_ctx_field(builder, ctx_arg, offset_global):
"""
Generate LLVM IR to load a field from BPF context using offset.
@ -114,10 +117,9 @@ class VmlinuxHandler:
builder: llvmlite IRBuilder instance
ctx_arg: The context pointer argument (ptr/i8*)
offset_global: Global variable containing the field offset (i64)
field_data: contains data about the field
struct_name: Name of the struct being accessed (optional)
Returns:
The loaded value (i64 register or appropriately sized)
The loaded value (i64 register)
"""
# Load the offset value
@ -162,61 +164,13 @@ class VmlinuxHandler:
passthrough_fn, [ir.Constant(ir.IntType(32), 0), field_ptr], tail=True
)
# Determine the appropriate IR type based on field information
int_width = 64 # Default to 64-bit
needs_zext = False # Track if we need zero-extension for xdp_md
if field_data is not None:
# Try to determine the size from field metadata
if field_data.type.__module__ == ctypes.__name__:
try:
field_size_bytes = ctypes.sizeof(field_data.type)
field_size_bits = field_size_bytes * 8
if field_size_bits in [8, 16, 32, 64]:
int_width = field_size_bits
logger.info(f"Determined field size: {int_width} bits")
# Special handling for struct_xdp_md i32 fields
# Load as i32 but extend to i64 before storing
if struct_name == "struct_xdp_md" and int_width == 32:
needs_zext = True
logger.info(
"struct_xdp_md i32 field detected, will zero-extend to i64"
)
else:
logger.warning(
f"Unusual field size {field_size_bits} bits, using default 64"
)
except Exception as e:
logger.warning(
f"Could not determine field size: {e}, using default 64"
)
elif field_data.type.__module__ == "vmlinux":
# For pointers to structs or complex vmlinux types
if field_data.ctype_complex_type is not None and issubclass(
field_data.ctype_complex_type, ctypes._Pointer
):
int_width = 64 # Pointers are always 64-bit
logger.info("Field is a pointer type, using 64 bits")
# TODO: Add handling for other complex types (arrays, embedded structs, etc.)
else:
logger.warning("Complex vmlinux field type, using default 64 bits")
# Bitcast to appropriate pointer type based on determined width
ptr_type = ir.PointerType(ir.IntType(int_width))
typed_ptr = builder.bitcast(verified_ptr, ptr_type)
# Bitcast to i64* (assuming field is 64-bit, adjust if needed)
i64_ptr_type = ir.PointerType(ir.IntType(64))
typed_ptr = builder.bitcast(verified_ptr, i64_ptr_type)
# Load and return the value
value = builder.load(typed_ptr)
# Zero-extend i32 to i64 for struct_xdp_md fields
if needs_zext:
value = builder.zext(value, ir.IntType(64))
logger.info("Zero-extended i32 value to i64 for struct_xdp_md field")
return value
def has_field(self, struct_name, field_name):

View File

@ -1,23 +1,19 @@
BPF_CLANG := clang
CFLAGS := -emit-llvm -target bpf -c
CFLAGS := -O0 -emit-llvm -target bpf -c
SRC := $(wildcard *.bpf.c)
LL := $(SRC:.bpf.c=.bpf.ll)
LL2 := $(SRC:.bpf.c=.bpf.o2.ll)
OBJ := $(SRC:.bpf.c=.bpf.o)
.PHONY: all clean
all: $(LL) $(OBJ) $(LL2)
all: $(LL) $(OBJ)
%.bpf.o: %.bpf.c
$(BPF_CLANG) -O2 -g -target bpf -c $< -o $@
%.bpf.ll: %.bpf.c
$(BPF_CLANG) -O0 $(CFLAGS) -g -S $< -o $@
%.bpf.o2.ll: %.bpf.c
$(BPF_CLANG) -O2 $(CFLAGS) -g -S $< -o $@
$(BPF_CLANG) $(CFLAGS) -g -S $< -o $@
clean:
rm -f $(LL) $(OBJ) $(LL2)
rm -f $(LL) $(OBJ)

View File

@ -1,15 +0,0 @@
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
SEC("xdp")
int print_xdp_data(struct xdp_md *ctx)
{
// 'data' is a pointer to the start of packet data
long data = (long)ctx->data;
bpf_printk("ctx->data = %lld\n", data);
return XDP_PASS;
}
char LICENSE[] SEC("license") = "GPL";

View File

@ -1,30 +0,0 @@
import logging
from pythonbpf import bpf, section, bpfglobal, compile_to_ir
from pythonbpf import compile # noqa: F401
from vmlinux import TASK_COMM_LEN # noqa: F401
from vmlinux import struct_trace_event_raw_sys_enter # noqa: F401
from ctypes import c_int64, c_int32, c_void_p # noqa: F401
# from vmlinux import struct_uinput_device
# from vmlinux import struct_blk_integrity_iter
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: struct_trace_event_raw_sys_enter) -> c_int64:
b = ctx.args
c = b[0]
print(f"This is context args field {c}")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("args_test.py", "args_test.ll", loglevel=logging.INFO)
compile()

View File

@ -0,0 +1,21 @@
from vmlinux import struct_request, struct_pt_regs, XDP_PASS
from pythonbpf import bpf, section, bpfglobal, compile_to_ir
import logging
@bpf
@section("kprobe/blk_mq_start_request")
def example(ctx: struct_pt_regs):
req = struct_request(ctx.di)
c = req.__data_len
d = XDP_PASS
print(f"data length {c} and test {d}")
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("requests.py", "requests.ll", loglevel=logging.INFO)

View File

@ -0,0 +1,19 @@
from vmlinux import struct_kobj_type
from pythonbpf import bpf, section, bpfglobal, compile_to_ir
import logging
from ctypes import c_void_p
@bpf
@section("kprobe/blk_mq_start_request")
def example(ctx: c_void_p):
print(f"data lengt")
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("requests.py", "requests.ll", loglevel=logging.INFO)

View File

@ -1,29 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile, struct
from ctypes import c_void_p, c_int64, c_uint64, c_uint32
from pythonbpf.helper import probe_read
@bpf
@struct
class data_t:
pid: c_uint32
value: c_uint64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def test_probe_read(ctx: c_void_p) -> c_int64:
"""Test bpf_probe_read helper function"""
data = data_t()
probe_read(data.value, 8, ctx)
probe_read(data.pid, 4, ctx)
return 0
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,25 +0,0 @@
from pythonbpf import bpf, bpfglobal, section, BPF, trace_pipe
from ctypes import c_void_p, c_int64
from pythonbpf.helper import random
@bpf
@section("tracepoint/syscalls/sys_enter_clone")
def hello_world(ctx: c_void_p) -> c_int64:
r = random()
print(f"Hello, World!, {r}")
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
# Compile and load
b = BPF()
b.load()
b.attach_all()
trace_pipe()

View File

@ -1,40 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile, struct
from ctypes import c_void_p, c_int64, c_uint32, c_uint64
from pythonbpf.helper import smp_processor_id, ktime
@bpf
@struct
class cpu_event_t:
cpu_id: c_uint32
timestamp: c_uint64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def trace_with_cpu(ctx: c_void_p) -> c_int64:
"""Test bpf_get_smp_processor_id helper function"""
# Get the current CPU ID
cpu = smp_processor_id()
# Print it
print(f"Running on CPU {cpu}")
# Use it in a struct
event = cpu_event_t()
event.cpu_id = smp_processor_id()
event.timestamp = ktime()
print(f"Event on CPU {event.cpu_id} at time {event.timestamp}")
return 0
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,31 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
from pythonbpf.helper import uid, pid
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def filter_by_user(ctx: c_void_p) -> c_int64:
"""Filter events by specific user ID"""
current_uid = uid()
# Only trace root user (UID 0)
if current_uid == 0:
process_id = pid()
print(f"Root process {process_id} executed")
# Or trace specific user (e.g., UID 1000)
if current_uid == 1002:
print("User 1002 executed something")
return 0
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,31 +0,0 @@
from ctypes import c_int64, c_void_p
from pythonbpf import bpf, section, bpfglobal, compile_to_ir, compile
from vmlinux import struct_xdp_md
from vmlinux import XDP_PASS
@bpf
@section("xdp")
def print_xdp_dat2a(ct2x: struct_xdp_md) -> c_int64:
data = ct2x.data # 32-bit field: packet start pointer
print(f"ct2x->data = {data}")
return c_int64(XDP_PASS)
@bpf
@section("xdp")
def print_xdp_data(ctx: struct_xdp_md) -> c_int64:
data = ctx.data # 32-bit field: packet start pointer
something = c_void_p(data)
print(f"ctx->data = {something}")
return c_int64(XDP_PASS)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("i32_test.py", "i32_test.ll")
compile()

View File

@ -1,24 +0,0 @@
from ctypes import c_int64
from pythonbpf import bpf, section, bpfglobal, compile
from vmlinux import struct_xdp_md
from vmlinux import XDP_PASS
import logging
@bpf
@section("xdp")
def print_xdp_data(ctx: struct_xdp_md) -> c_int64:
data = 0
data = ctx.data # 32-bit field: packet start pointer
something = 2 + data
print(f"ctx->data = {something}")
return c_int64(XDP_PASS)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile(logging.INFO)

View File

@ -1,24 +0,0 @@
from ctypes import c_int64
from pythonbpf import bpf, section, bpfglobal, compile, compile_to_ir
from vmlinux import struct_xdp_md
from vmlinux import XDP_PASS
import logging
@bpf
@section("xdp")
def print_xdp_data(ctx: struct_xdp_md) -> c_int64:
data = c_int64(ctx.data) # 32-bit field: packet start pointer
something = 2 + data
print(f"ctx->data = {something}")
return c_int64(XDP_PASS)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("i32_test_fail_2.py", "i32_test_fail_2.ll")
compile(logging.INFO)

View File

@ -44,4 +44,4 @@ def LICENSE() -> str:
compile_to_ir("simple_struct_test.py", "simple_struct_test.ll", loglevel=logging.DEBUG)
compile()
# compile()