24 Commits

Author SHA1 Message Date
7ae629e8f7 bump version to v0.1.5 2025-10-16 19:04:04 +05:30
dd734ea2aa Merge pull request #56 from pythonbpf/vmlinux-ir-gen
Adds IR and debug info generation capabilities for vmlinux imported structs
2025-10-16 18:59:32 +05:30
71d005b6b1 complete vmlinux struct name generation in IR.
* Breaks when it finds unions.
* Still does not support function pointers.
2025-10-16 18:58:28 +05:30
5d9a29ee8e format chore 2025-10-16 18:22:25 +05:30
041e538b53 fix errors. Does not support union name resolution yet. 2025-10-16 18:21:14 +05:30
5413cc793b something fixed itself. 2025-10-16 18:06:36 +05:30
f21837aefe support most bitfields 2025-10-16 04:13:04 +05:30
0f5c1fa752 format chore 2025-10-16 04:10:24 +05:30
de02731ea1 add support with ctypes getattr offset. Also supports bitfields.
* breaks when struct_ring_buffer_per_cpu
2025-10-16 04:08:06 +05:30
c22d85ceb8 add array field generation support 2025-10-15 23:56:04 +05:30
2b3c81affa TODO added for llvmlite attribute issue
*Refer: https://github.com/numba/llvmlite/issues/1331

Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-15 21:35:28 +05:30
8372111616 add basic IR gen strategy 2025-10-15 21:25:53 +05:30
eb4ee64ee5 Revert "float vmlinux_assignments_symtab"
This reverts commit ce7b170fea.
2025-10-15 19:11:53 +05:30
ce7b170fea float vmlinux_assignments_symtab 2025-10-15 18:19:51 +05:30
9a60dd87e3 Merge pull request #55 from pythonbpf/vmlinux-ir-gen
remove bitfield support and add assignment support
2025-10-15 18:07:27 +05:30
c499fe7421 solve static typing issues 2025-10-15 18:05:57 +05:30
8239097fbb format chore 2025-10-15 17:49:38 +05:30
a4cfc2b7aa add assignments table and offset handler 2025-10-15 17:49:20 +05:30
69b73003ca setup skeleton for offset calculation 2025-10-15 04:42:38 +05:30
11e8e72188 add base for ir gen 2025-10-15 02:00:23 +05:30
d3f0e3b2ef remove tbaa_gen and make IR generator module 2025-10-14 03:09:18 +05:30
09ba749b46 Merge pull request #52 from pythonbpf/vmlinux-ir-gen
Dependency tree functionality to semantic analyser
2025-10-14 02:37:43 +05:30
a03d3e5d4c format chore 2025-10-14 02:36:04 +05:30
e1f9ac6ba0 add dependency tree functionality 2025-10-14 02:35:49 +05:30
32 changed files with 545 additions and 1188 deletions

View File

@ -1,34 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, BPF, trace_fields
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_clone")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
# Compile and load
b = BPF()
b.load()
b.attach_all()
# header
print(f"{'TIME(s)':<18} {'COMM':<16} {'PID':<6} {'MESSAGE'}")
# format output
while True:
try:
(task, pid, cpu, flags, ts, msg) = trace_fields()
except ValueError:
continue
except KeyboardInterrupt:
exit()
print(f"{ts:<18} {task:<16} {pid:<6} {msg}")

View File

@ -1,61 +0,0 @@
from pythonbpf import bpf, map, struct, section, bpfglobal, BPF
from pythonbpf.helper import ktime, pid, comm
from pythonbpf.maps import PerfEventArray
from ctypes import c_void_p, c_int64, c_uint64
@bpf
@struct
class data_t:
pid: c_uint64
ts: c_uint64
comm: str(16) # type: ignore [valid-type]
@bpf
@map
def events() -> PerfEventArray:
return PerfEventArray(key_size=c_int64, value_size=c_int64)
@bpf
@section("tracepoint/syscalls/sys_enter_clone")
def hello(ctx: c_void_p) -> c_int64:
dataobj = data_t()
dataobj.pid, dataobj.ts = pid(), ktime()
comm(dataobj.comm)
events.output(dataobj)
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
# Compile and load
b = BPF()
b.load()
b.attach_all()
start = 0
def callback(cpu, event):
global start
if start == 0:
start = event.ts
ts = (event.ts - start) / 1e9
print(f"[CPU {cpu}] PID: {event.pid}, TS: {ts}, COMM: {event.comm.decode()}")
perf = b["events"].open_perf_buffer(callback, struct_name="data_t")
print("Starting to poll... (Ctrl+C to stop)")
print("Try running: fork() or clone() system calls to trigger events")
try:
while True:
b["events"].poll(1000)
except KeyboardInterrupt:
print("Stopping...")

View File

@ -1,23 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, BPF, trace_pipe
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_clone")
def hello_world(ctx: c_void_p) -> c_int64:
print("Hello, World!")
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
# Compile and load
b = BPF()
b.load()
b.attach_all()
trace_pipe()

View File

@ -1,58 +0,0 @@
from pythonbpf import bpf, map, section, bpfglobal, BPF, trace_fields
from pythonbpf.helper import ktime
from pythonbpf.maps import HashMap
from ctypes import c_void_p, c_int64
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_int64, value=c_int64, max_entries=2)
@bpf
@section("tracepoint/syscalls/sys_enter_sync")
def do_trace(ctx: c_void_p) -> c_int64:
ts_key, cnt_key = 0, 1
tsp, cntp = last.lookup(ts_key), last.lookup(cnt_key)
if not cntp:
last.update(cnt_key, 0)
cntp = last.lookup(cnt_key)
if tsp:
delta = ktime() - tsp
if delta < 1000000000:
time_ms = delta // 1000000
print(f"{time_ms} {cntp}")
last.delete(ts_key)
else:
last.update(ts_key, ktime())
last.update(cnt_key, cntp + 1)
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
# Compile and load
b = BPF()
b.load()
b.attach_all()
print("Tracing for quick sync's... Ctrl-C to end")
# format output
start = 0
while True:
try:
task, pid, cpu, flags, ts, msg = trace_fields()
if start == 0:
start = ts
ts -= start
ms, cnt = msg.split()
print(f"At time {ts} s: Multiple syncs detected, last {ms} ms ago. Count {cnt}")
except KeyboardInterrupt:
exit()

View File

@ -1,78 +0,0 @@
from pythonbpf import bpf, map, struct, section, bpfglobal, BPF
from pythonbpf.helper import ktime
from pythonbpf.maps import HashMap
from pythonbpf.maps import PerfEventArray
from ctypes import c_void_p, c_int64
@bpf
@struct
class data_t:
ts: c_int64
ms: c_int64
@bpf
@map
def events() -> PerfEventArray:
return PerfEventArray(key_size=c_int64, value_size=c_int64)
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_int64, value=c_int64, max_entries=1)
@bpf
@section("tracepoint/syscalls/sys_enter_sync")
def do_trace(ctx: c_void_p) -> c_int64:
dat, dat.ts, key = data_t(), ktime(), 0
tsp = last.lookup(key)
if tsp:
delta = ktime() - tsp
if delta < 1000000000:
dat.ms = delta // 1000000
events.output(dat)
last.delete(key)
else:
last.update(key, ktime())
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
# Compile and load
b = BPF()
b.load()
b.attach_all()
print("Tracing for quick sync's... Ctrl-C to end")
# format output
start = 0
def callback(cpu, event):
global start
if start == 0:
start = event.ts
event.ts -= start
print(
f"At time {event.ts / 1e9} s: Multiple sync detected, Last sync: {event.ms} ms ago"
)
perf = b["events"].open_perf_buffer(callback, struct_name="data_t")
print("Starting to poll... (Ctrl+C to stop)")
print("Try running: fork() or clone() system calls to trigger events")
try:
while True:
b["events"].poll(1000)
except KeyboardInterrupt:
print("Stopping...")

View File

@ -1,53 +0,0 @@
from pythonbpf import bpf, map, section, bpfglobal, BPF, trace_fields
from pythonbpf.helper import ktime
from pythonbpf.maps import HashMap
from ctypes import c_void_p, c_int64
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_int64, value=c_int64, max_entries=1)
@bpf
@section("tracepoint/syscalls/sys_enter_sync")
def do_trace(ctx: c_void_p) -> c_int64:
key = 0
tsp = last.lookup(key)
if tsp:
delta = ktime() - tsp
if delta < 1000000000:
time_ms = delta // 1000000
print(f"{time_ms}")
last.delete(key)
else:
last.update(key, ktime())
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
# Compile and load
b = BPF()
b.load()
b.attach_all()
print("Tracing for quick sync's... Ctrl-C to end")
# format output
start = 0
while True:
try:
task, pid, cpu, flags, ts, ms = trace_fields()
if start == 0:
start = ts
ts -= start
print(f"At time {ts} s: Multiple syncs detected, last {ms} ms ago")
except KeyboardInterrupt:
exit()

View File

@ -1,23 +0,0 @@
from pythonbpf import bpf, section, bpfglobal, BPF, trace_pipe
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_sync")
def hello_world(ctx: c_void_p) -> c_int64:
print("sys_sync() called")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
# Compile and load
b = BPF()
b.load()
b.attach_all()
print("Tracing sys_sync()... Ctrl-C to end.")
trace_pipe()

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "pythonbpf"
-version = "0.1.4"
+version = "0.1.5"
 description = "Reduced Python frontend for eBPF"
 authors = [
     { name = "r41k0u", email="pragyanshchaturvedi18@gmail.com" },

View File

@ -1,6 +1,5 @@
from .decorators import bpf, map, section, bpfglobal, struct from .decorators import bpf, map, section, bpfglobal, struct
from .codegen import compile_to_ir, compile, BPF from .codegen import compile_to_ir, compile, BPF
from .utils import trace_pipe, trace_fields
__all__ = [ __all__ = [
"bpf", "bpf",
@ -11,6 +10,4 @@ __all__ = [
"compile_to_ir", "compile_to_ir",
"compile", "compile",
"BPF", "BPF",
"trace_pipe",
"trace_fields",
] ]

View File

@@ -22,68 +22,44 @@ class LocalSymbol:
         yield self.metadata
 
 
-def create_targets_and_rvals(stmt):
-    """Create lists of targets and right-hand values from an assignment statement."""
-    if isinstance(stmt.targets[0], ast.Tuple):
-        if not isinstance(stmt.value, ast.Tuple):
-            logger.warning("Mismatched multi-target assignment, skipping allocation")
-            return
-        targets, rvals = stmt.targets[0].elts, stmt.value.elts
-        if len(targets) != len(rvals):
-            logger.warning("length of LHS != length of RHS, skipping allocation")
-            return
-        return targets, rvals
-    return stmt.targets, [stmt.value]
-
-
 def handle_assign_allocation(builder, stmt, local_sym_tab, structs_sym_tab):
     """Handle memory allocation for assignment statements."""
-    logger.info(f"Handling assignment for allocation: {ast.dump(stmt)}")
-
-    # NOTE: Support multi-target assignments (e.g.: a, b = 1, 2)
-    targets, rvals = create_targets_and_rvals(stmt)
-
-    for target, rval in zip(targets, rvals):
-        # Skip non-name targets (e.g., struct field assignments)
-        if isinstance(target, ast.Attribute):
-            logger.debug(
-                f"Struct field assignment to {target.attr}, no allocation needed"
-            )
-            continue
-        if not isinstance(target, ast.Name):
-            logger.warning(
-                f"Unsupported assignment target type: {type(target).__name__}"
-            )
-            continue
-        var_name = target.id
-        # Skip if already allocated
-        if var_name in local_sym_tab:
-            logger.debug(f"Variable {var_name} already allocated, skipping")
-            continue
-        # Determine type and allocate based on rval
-        if isinstance(rval, ast.Call):
-            _allocate_for_call(builder, var_name, rval, local_sym_tab, structs_sym_tab)
-        elif isinstance(rval, ast.Constant):
-            _allocate_for_constant(builder, var_name, rval, local_sym_tab)
-        elif isinstance(rval, ast.BinOp):
-            _allocate_for_binop(builder, var_name, local_sym_tab)
-        elif isinstance(rval, ast.Name):
-            # Variable-to-variable assignment (b = a)
-            _allocate_for_name(builder, var_name, rval, local_sym_tab)
-        elif isinstance(rval, ast.Attribute):
-            # Struct field-to-variable assignment (a = dat.fld)
-            _allocate_for_attribute(
-                builder, var_name, rval, local_sym_tab, structs_sym_tab
-            )
-        else:
-            logger.warning(
-                f"Unsupported assignment value type for {var_name}: {type(rval).__name__}"
-            )
+    # Validate assignment
+    if len(stmt.targets) != 1:
+        logger.warning("Multi-target assignment not supported, skipping allocation")
+        return
+
+    target = stmt.targets[0]
+
+    # Skip non-name targets (e.g., struct field assignments)
+    if isinstance(target, ast.Attribute):
+        logger.debug(f"Struct field assignment to {target.attr}, no allocation needed")
+        return
+    if not isinstance(target, ast.Name):
+        logger.warning(f"Unsupported assignment target type: {type(target).__name__}")
+        return
+
+    var_name = target.id
+    rval = stmt.value
+
+    # Skip if already allocated
+    if var_name in local_sym_tab:
+        logger.debug(f"Variable {var_name} already allocated, skipping")
+        return
+
+    # Determine type and allocate based on rval
+    if isinstance(rval, ast.Call):
+        _allocate_for_call(builder, var_name, rval, local_sym_tab, structs_sym_tab)
+    elif isinstance(rval, ast.Constant):
+        _allocate_for_constant(builder, var_name, rval, local_sym_tab)
+    elif isinstance(rval, ast.BinOp):
+        _allocate_for_binop(builder, var_name, local_sym_tab)
+    else:
+        logger.warning(
+            f"Unsupported assignment value type for {var_name}: {type(rval).__name__}"
+        )
 
 
 def _allocate_for_call(builder, var_name, rval, local_sym_tab, structs_sym_tab):
@@ -200,88 +176,3 @@ def allocate_temp_pool(builder, max_temps, local_sym_tab):
         temp_var = builder.alloca(ir.IntType(64), name=temp_name)
         temp_var.align = 8
         local_sym_tab[temp_name] = LocalSymbol(temp_var, ir.IntType(64))
def _allocate_for_name(builder, var_name, rval, local_sym_tab):
"""Allocate memory for variable-to-variable assignment (b = a)."""
source_var = rval.id
if source_var not in local_sym_tab:
logger.error(f"Source variable '{source_var}' not found in symbol table")
return
# Get type and metadata from source variable
source_symbol = local_sym_tab[source_var]
# Allocate with same type and alignment
var = _allocate_with_type(builder, var_name, source_symbol.ir_type)
local_sym_tab[var_name] = LocalSymbol(
var, source_symbol.ir_type, source_symbol.metadata
)
logger.info(
f"Pre-allocated {var_name} from {source_var} with type {source_symbol.ir_type}"
)
def _allocate_for_attribute(builder, var_name, rval, local_sym_tab, structs_sym_tab):
"""Allocate memory for struct field-to-variable assignment (a = dat.fld)."""
if not isinstance(rval.value, ast.Name):
logger.warning(f"Complex attribute access not supported for {var_name}")
return
struct_var = rval.value.id
field_name = rval.attr
# Validate struct and field
if struct_var not in local_sym_tab:
logger.error(f"Struct variable '{struct_var}' not found")
return
struct_type = local_sym_tab[struct_var].metadata
if not struct_type or struct_type not in structs_sym_tab:
logger.error(f"Struct type '{struct_type}' not found")
return
struct_info = structs_sym_tab[struct_type]
if field_name not in struct_info.fields:
logger.error(f"Field '{field_name}' not found in struct '{struct_type}'")
return
# Get field type
field_type = struct_info.field_type(field_name)
# Special case: char array -> allocate as i8* pointer instead
if (
isinstance(field_type, ir.ArrayType)
and isinstance(field_type.element, ir.IntType)
and field_type.element.width == 8
):
alloc_type = ir.PointerType(ir.IntType(8))
logger.info(f"Allocating {var_name} as i8* (pointer to char array)")
else:
alloc_type = field_type
var = _allocate_with_type(builder, var_name, alloc_type)
local_sym_tab[var_name] = LocalSymbol(var, alloc_type)
logger.info(
f"Pre-allocated {var_name} from {struct_var}.{field_name} with type {alloc_type}"
)
def _allocate_with_type(builder, var_name, ir_type):
"""Allocate variable with appropriate alignment for type."""
var = builder.alloca(ir_type, name=var_name)
var.align = _get_alignment(ir_type)
return var
def _get_alignment(ir_type):
"""Get appropriate alignment for IR type."""
if isinstance(ir_type, ir.IntType):
return ir_type.width // 8
elif isinstance(ir_type, ir.ArrayType) and isinstance(ir_type.element, ir.IntType):
return ir_type.element.width // 8
else:
return 8 # Default: pointer size

View File

@@ -2,7 +2,6 @@ import ast
 import logging
 
 from llvmlite import ir
 from pythonbpf.expr import eval_expr
-from pythonbpf.helper import emit_probe_read_kernel_str_call
 
 logger = logging.getLogger(__name__)
@@ -28,82 +27,27 @@ def handle_struct_field_assignment(
     # Get field pointer and evaluate value
     field_ptr = struct_info.gep(builder, local_sym_tab[var_name].var, field_name)
-    field_type = struct_info.field_type(field_name)
-    val_result = eval_expr(
+    val = eval_expr(
         func, module, builder, rval, local_sym_tab, map_sym_tab, structs_sym_tab
     )
-    if val_result is None:
+    if val is None:
         logger.error(f"Failed to evaluate value for {var_name}.{field_name}")
         return
-    val, val_type = val_result
 
-    # Special case: i8* string to [N x i8] char array
-    if _is_char_array(field_type) and _is_i8_ptr(val_type):
-        _copy_string_to_char_array(
-            func,
-            module,
-            builder,
-            val,
-            field_ptr,
-            field_type,
-            local_sym_tab,
-            map_sym_tab,
-            structs_sym_tab,
-        )
-        logger.info(f"Copied string to char array {var_name}.{field_name}")
+    # TODO: Handle string assignment to char array (not a priority)
+    field_type = struct_info.field_type(field_name)
+    if isinstance(field_type, ir.ArrayType) and val[1] == ir.PointerType(ir.IntType(8)):
+        logger.warning(
+            f"String to char array assignment not implemented for {var_name}.{field_name}"
+        )
         return
 
-    # Regular assignment
-    builder.store(val, field_ptr)
+    # Store the value
+    builder.store(val[0], field_ptr)
     logger.info(f"Assigned to struct field {var_name}.{field_name}")
def _copy_string_to_char_array(
func,
module,
builder,
src_ptr,
dst_ptr,
array_type,
local_sym_tab,
map_sym_tab,
struct_sym_tab,
):
"""Copy string (i8*) to char array ([N x i8]) using bpf_probe_read_kernel_str"""
array_size = array_type.count
# Get pointer to first element: [N x i8]* -> i8*
dst_i8_ptr = builder.gep(
dst_ptr,
[ir.Constant(ir.IntType(32), 0), ir.Constant(ir.IntType(32), 0)],
inbounds=True,
)
# Use the shared emitter function
emit_probe_read_kernel_str_call(builder, dst_i8_ptr, array_size, src_ptr)
def _is_char_array(ir_type):
"""Check if type is [N x i8]."""
return (
isinstance(ir_type, ir.ArrayType)
and isinstance(ir_type.element, ir.IntType)
and ir_type.element.width == 8
)
def _is_i8_ptr(ir_type):
"""Check if type is i8*."""
return (
isinstance(ir_type, ir.PointerType)
and isinstance(ir_type.pointee, ir.IntType)
and ir_type.pointee.width == 8
)
 def handle_variable_assignment(
     func, module, builder, var_name, rval, local_sym_tab, map_sym_tab, structs_sym_tab
 ):
@@ -127,17 +71,6 @@ def handle_variable_assignment(
         logger.info(f"Initialized struct {struct_name} for variable {var_name}")
         return True
 
-    # Special case: struct field char array -> pointer
-    # Handle this before eval_expr to get the pointer, not the value
-    if isinstance(rval, ast.Attribute) and isinstance(rval.value, ast.Name):
-        converted_val = _try_convert_char_array_to_ptr(
-            rval, var_type, builder, local_sym_tab, structs_sym_tab
-        )
-        if converted_val is not None:
-            builder.store(converted_val, var_ptr)
-            logger.info(f"Assigned char array pointer to {var_name}")
-            return True
-
     val_result = eval_expr(
         func, module, builder, rval, local_sym_tab, map_sym_tab, structs_sym_tab
     )
@@ -173,52 +106,3 @@ def handle_variable_assignment(
     builder.store(val, var_ptr)
     logger.info(f"Assigned value to variable {var_name}")
     return True
def _try_convert_char_array_to_ptr(
rval, var_type, builder, local_sym_tab, structs_sym_tab
):
"""Try to convert char array field to i8* pointer"""
# Only convert if target is i8*
if not (
isinstance(var_type, ir.PointerType)
and isinstance(var_type.pointee, ir.IntType)
and var_type.pointee.width == 8
):
return None
struct_var = rval.value.id
field_name = rval.attr
# Validate struct
if struct_var not in local_sym_tab:
return None
struct_type = local_sym_tab[struct_var].metadata
if not struct_type or struct_type not in structs_sym_tab:
return None
struct_info = structs_sym_tab[struct_type]
if field_name not in struct_info.fields:
return None
field_type = struct_info.field_type(field_name)
# Check if it's a char array
if not (
isinstance(field_type, ir.ArrayType)
and isinstance(field_type.element, ir.IntType)
and field_type.element.width == 8
):
return None
# Get pointer to struct field
struct_ptr = local_sym_tab[struct_var].var
field_ptr = struct_info.gep(builder, struct_ptr, field_name)
# GEP to first element: [N x i8]* -> i8*
return builder.gep(
field_ptr,
[ir.Constant(ir.IntType(32), 0), ir.Constant(ir.IntType(32), 0)],
inbounds=True,
)

View File

@@ -15,14 +15,24 @@ import os
 import subprocess
 import inspect
 from pathlib import Path
-from pylibbpf import BpfObject
+from pylibbpf import BpfProgram
 import tempfile
 from logging import Logger
 import logging
+import re
 
 logger: Logger = logging.getLogger(__name__)
 
-VERSION = "v0.1.4"
+VERSION = "v0.1.5"
+
+
+def finalize_module(original_str):
+    """After all IR generation is complete, we monkey patch btf_ama attribute"""
+    # Create a string with applied transformation of btf_ama attribute addition
+    # to BTF struct field accesses.
+    pattern = r'(@"llvm\.[^"]+:[^"]*" = external global i64, !llvm\.preserve\.access\.index ![0-9]+)'
+    replacement = r'\1 "btf_ama"'
+    return re.sub(pattern, replacement, original_str)
 
 
 def find_bpf_chunks(tree):
@@ -55,7 +65,6 @@ def processor(source_code, filename, module):
     func_proc(tree, module, bpf_chunks, map_sym_tab, structs_sym_tab)
     globals_list_creation(tree, module)
-    return structs_sym_tab, map_sym_tab
 
 
 def compile_to_ir(filename: str, output: str, loglevel=logging.INFO):
@@ -81,7 +90,7 @@ def compile_to_ir(filename: str, output: str, loglevel=logging.INFO):
         True,
     )
 
-    structs_sym_tab, maps_sym_tab = processor(source, filename, module)
+    processor(source, filename, module)
 
     wchar_size = module.add_metadata(
         [
@@ -122,13 +131,15 @@ def compile_to_ir(filename: str, output: str, loglevel=logging.INFO):
     module.add_named_metadata("llvm.ident", [f"PythonBPF {VERSION}"])
 
+    module_string = finalize_module(str(module))
+
     logger.info(f"IR written to {output}")
     with open(output, "w") as f:
         f.write(f'source_filename = "{filename}"\n')
-        f.write(str(module))
+        f.write(module_string)
         f.write("\n")
 
-    return output, structs_sym_tab, maps_sym_tab
+    return output
 
 
 def _run_llc(ll_file, obj_file):
@@ -158,7 +169,7 @@ def _run_llc(ll_file, obj_file):
     return False
 
 
-def compile(loglevel=logging.WARNING) -> bool:
+def compile(loglevel=logging.INFO) -> bool:
     # Look one level up the stack to the caller of this function
     caller_frame = inspect.stack()[1]
     caller_file = Path(caller_frame.filename).resolve()
@@ -166,19 +177,18 @@ def compile(loglevel=logging.WARNING) -> bool:
     ll_file = Path("/tmp") / caller_file.with_suffix(".ll").name
     o_file = caller_file.with_suffix(".o")
 
-    _, structs_sym_tab, maps_sym_tab = compile_to_ir(
-        str(caller_file), str(ll_file), loglevel=loglevel
-    )
-    if not _run_llc(ll_file, o_file):
-        logger.error("Compilation to object file failed.")
-        return False
+    success = True
+    success = (
+        compile_to_ir(str(caller_file), str(ll_file), loglevel=loglevel) and success
+    )
+    success = _run_llc(ll_file, o_file) and success
 
     logger.info(f"Object written to {o_file}")
-    return True
+    return success
 
 
-def BPF(loglevel=logging.WARNING) -> BpfObject:
+def BPF(loglevel=logging.INFO) -> BpfProgram:
     caller_frame = inspect.stack()[1]
     src = inspect.getsource(caller_frame.frame)
     with tempfile.NamedTemporaryFile(
@@ -191,9 +201,7 @@ def BPF(loglevel=logging.WARNING) -> BpfObject:
         f.write(src)
         f.flush()
         source = f.name
-        _, structs_sym_tab, maps_sym_tab = compile_to_ir(
-            source, str(inter.name), loglevel=loglevel
-        )
+        compile_to_ir(source, str(inter.name), loglevel=loglevel)
         _run_llc(str(inter.name), str(obj_file.name))
-        return BpfObject(str(obj_file.name), structs=structs_sym_tab)
+        return BpfProgram(str(obj_file.name))
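A hedged illustration of the new finalize_module() rewrite above; the IR line below is invented for demonstration, only the pattern and replacement come from the diff:

before = '@"llvm.task_struct:0:16" = external global i64, !llvm.preserve.access.index !42'
after = finalize_module(before)
# after == '@"llvm.task_struct:0:16" = external global i64, !llvm.preserve.access.index !42 "btf_ama"'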

View File

@@ -12,11 +12,7 @@ from pythonbpf.assign_pass import (
     handle_variable_assignment,
     handle_struct_field_assignment,
 )
-from pythonbpf.allocation_pass import (
-    handle_assign_allocation,
-    allocate_temp_pool,
-    create_targets_and_rvals,
-)
+from pythonbpf.allocation_pass import handle_assign_allocation, allocate_temp_pool
 from .return_utils import handle_none_return, handle_xdp_return, is_xdp_name
 from .function_metadata import get_probe_string, is_global_function, infer_return_type
 
@@ -144,43 +140,48 @@ def handle_assign(
 ):
     """Handle assignment statements in the function body."""
 
-    # NOTE: Support multi-target assignments (e.g.: a, b = 1, 2)
-    targets, rvals = create_targets_and_rvals(stmt)
-
-    for target, rval in zip(targets, rvals):
-        if isinstance(target, ast.Name):
-            # NOTE: Simple variable assignment case: x = 5
-            var_name = target.id
-            result = handle_variable_assignment(
-                func,
-                module,
-                builder,
-                var_name,
-                rval,
-                local_sym_tab,
-                map_sym_tab,
-                structs_sym_tab,
-            )
-            if not result:
-                logger.error(f"Failed to handle assignment to {var_name}")
-            continue
-
-        if isinstance(target, ast.Attribute):
-            # NOTE: Struct field assignment case: pkt.field = value
-            handle_struct_field_assignment(
-                func,
-                module,
-                builder,
-                target,
-                rval,
-                local_sym_tab,
-                map_sym_tab,
-                structs_sym_tab,
-            )
-            continue
-
-        # Unsupported target type
-        logger.error(f"Unsupported assignment target: {ast.dump(target)}")
+    # TODO: Support this later
+    # GH #37
+    if len(stmt.targets) != 1:
+        logger.error("Multi-target assignment is not supported for now")
+        return
+
+    target = stmt.targets[0]
+    rval = stmt.value
+
+    if isinstance(target, ast.Name):
+        # NOTE: Simple variable assignment case: x = 5
+        var_name = target.id
+        result = handle_variable_assignment(
+            func,
+            module,
+            builder,
+            var_name,
+            rval,
+            local_sym_tab,
+            map_sym_tab,
+            structs_sym_tab,
+        )
+        if not result:
+            logger.error(f"Failed to handle assignment to {var_name}")
+        return
+
+    if isinstance(target, ast.Attribute):
+        # NOTE: Struct field assignment case: pkt.field = value
+        handle_struct_field_assignment(
+            func,
+            module,
+            builder,
+            target,
+            rval,
+            local_sym_tab,
+            map_sym_tab,
+            structs_sym_tab,
+        )
+        return
+
+    # Unsupported target type
+    logger.error(f"Unsupported assignment target: {ast.dump(target)}")
 
 
 def handle_cond(

View File

@@ -1,7 +1,7 @@
 from .helper_registry import HelperHandlerRegistry
 from .helper_utils import reset_scratch_pool
-from .bpf_helper_handler import handle_helper_call, emit_probe_read_kernel_str_call
-from .helpers import ktime, pid, deref, comm, probe_read_str, XDP_DROP, XDP_PASS
+from .bpf_helper_handler import handle_helper_call
+from .helpers import ktime, pid, deref, XDP_DROP, XDP_PASS
 
 # Register the helper handler with expr module
@@ -59,12 +59,9 @@ __all__ = [
     "HelperHandlerRegistry",
     "reset_scratch_pool",
     "handle_helper_call",
-    "emit_probe_read_kernel_str_call",
     "ktime",
     "pid",
     "deref",
-    "comm",
-    "probe_read_str",
     "XDP_DROP",
     "XDP_PASS",
 ]

View File

@@ -7,9 +7,6 @@ from .helper_utils import (
     get_or_create_ptr_from_arg,
     get_flags_val,
     get_data_ptr_and_size,
-    get_buffer_ptr_and_size,
-    get_char_array_ptr_and_size,
-    get_ptr_from_arg,
 )
 from .printk_formatter import simple_string_print, handle_fstring_print
 
@@ -26,9 +23,7 @@ class BPFHelperID(Enum):
     BPF_KTIME_GET_NS = 5
     BPF_PRINTK = 6
     BPF_GET_CURRENT_PID_TGID = 14
-    BPF_GET_CURRENT_COMM = 16
     BPF_PERF_EVENT_OUTPUT = 25
-    BPF_PROBE_READ_KERNEL_STR = 115
 
 
 @HelperHandlerRegistry.register("ktime")
@@ -239,63 +234,6 @@ def bpf_map_delete_elem_emitter(
     return result, None
@HelperHandlerRegistry.register("comm")
def bpf_get_current_comm_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_get_current_comm helper function call.
Accepts: comm(dataobj.field) or comm(my_buffer)
"""
if not call.args or len(call.args) != 1:
raise ValueError(
f"comm expects exactly one argument (buffer), got {len(call.args)}"
)
buf_arg = call.args[0]
# Extract buffer pointer and size
buf_ptr, buf_size = get_buffer_ptr_and_size(
buf_arg, builder, local_sym_tab, struct_sym_tab
)
# Validate it's a char array
if not isinstance(
buf_ptr.type.pointee, ir.ArrayType
) or buf_ptr.type.pointee.element != ir.IntType(8):
raise ValueError(
f"comm expects a char array buffer, got {buf_ptr.type.pointee}"
)
# Cast to void* and call helper
buf_void_ptr = builder.bitcast(buf_ptr, ir.PointerType())
fn_type = ir.FunctionType(
ir.IntType(64),
[ir.PointerType(), ir.IntType(32)],
var_arg=False,
)
fn_ptr = builder.inttoptr(
ir.Constant(ir.IntType(64), BPFHelperID.BPF_GET_CURRENT_COMM.value),
ir.PointerType(fn_type),
)
result = builder.call(
fn_ptr, [buf_void_ptr, ir.Constant(ir.IntType(32), buf_size)], tail=False
)
logger.info(f"Emitted bpf_get_current_comm with {buf_size} byte buffer")
return result, None
 @HelperHandlerRegistry.register("pid")
 def bpf_get_current_pid_tgid_emitter(
     call,
@@ -371,68 +309,6 @@ def bpf_perf_event_output_handler(
     return result, None
def emit_probe_read_kernel_str_call(builder, dst_ptr, dst_size, src_ptr):
"""Emit LLVM IR call to bpf_probe_read_kernel_str"""
fn_type = ir.FunctionType(
ir.IntType(64),
[ir.PointerType(), ir.IntType(32), ir.PointerType()],
var_arg=False,
)
fn_ptr = builder.inttoptr(
ir.Constant(ir.IntType(64), BPFHelperID.BPF_PROBE_READ_KERNEL_STR.value),
ir.PointerType(fn_type),
)
result = builder.call(
fn_ptr,
[
builder.bitcast(dst_ptr, ir.PointerType()),
ir.Constant(ir.IntType(32), dst_size),
builder.bitcast(src_ptr, ir.PointerType()),
],
tail=False,
)
logger.info(f"Emitted bpf_probe_read_kernel_str (size={dst_size})")
return result
@HelperHandlerRegistry.register("probe_read_str")
def bpf_probe_read_kernel_str_emitter(
call,
map_ptr,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""Emit LLVM IR for bpf_probe_read_kernel_str helper."""
if len(call.args) != 2:
raise ValueError(
f"probe_read_str expects 2 args (dst, src), got {len(call.args)}"
)
# Get destination buffer (char array -> i8*)
dst_ptr, dst_size = get_char_array_ptr_and_size(
call.args[0], builder, local_sym_tab, struct_sym_tab
)
# Get source pointer (evaluate expression)
src_ptr, src_type = get_ptr_from_arg(
call.args[1], func, module, builder, local_sym_tab, map_sym_tab, struct_sym_tab
)
# Emit the helper call
result = emit_probe_read_kernel_str_call(builder, dst_ptr, dst_size, src_ptr)
logger.info(f"Emitted bpf_probe_read_kernel_str (size={dst_size})")
return result, ir.IntType(64)
 def handle_helper_call(
     call,
     module,

View File

@@ -4,7 +4,6 @@ import logging
 
 from llvmlite import ir
 from pythonbpf.expr import (
     get_operand_value,
-    eval_expr,
 )
 
 logger = logging.getLogger(__name__)
@@ -137,140 +136,3 @@ def get_data_ptr_and_size(data_arg, local_sym_tab, struct_sym_tab):
         raise NotImplementedError(
             "Only simple object names are supported as data in perf event output."
         )
def get_buffer_ptr_and_size(buf_arg, builder, local_sym_tab, struct_sym_tab):
"""Extract buffer pointer and size from either a struct field or variable."""
# Case 1: Struct field (obj.field)
if isinstance(buf_arg, ast.Attribute):
if not isinstance(buf_arg.value, ast.Name):
raise ValueError(
"Only simple struct field access supported (e.g., obj.field)"
)
struct_name = buf_arg.value.id
field_name = buf_arg.attr
# Lookup struct
if not local_sym_tab or struct_name not in local_sym_tab:
raise ValueError(f"Struct '{struct_name}' not found")
struct_type = local_sym_tab[struct_name].metadata
if not struct_sym_tab or struct_type not in struct_sym_tab:
raise ValueError(f"Struct type '{struct_type}' not found")
struct_info = struct_sym_tab[struct_type]
# Get field pointer and type
struct_ptr = local_sym_tab[struct_name].var
field_ptr = struct_info.gep(builder, struct_ptr, field_name)
field_type = struct_info.field_type(field_name)
if not isinstance(field_type, ir.ArrayType):
raise ValueError(f"Field '{field_name}' must be an array type")
return field_ptr, field_type.count
# Case 2: Variable name
elif isinstance(buf_arg, ast.Name):
var_name = buf_arg.id
if not local_sym_tab or var_name not in local_sym_tab:
raise ValueError(f"Variable '{var_name}' not found")
var_ptr = local_sym_tab[var_name].var
var_type = local_sym_tab[var_name].ir_type
if not isinstance(var_type, ir.ArrayType):
raise ValueError(f"Variable '{var_name}' must be an array type")
return var_ptr, var_type.count
else:
raise ValueError(
"comm expects either a struct field (obj.field) or variable name"
)
def get_char_array_ptr_and_size(buf_arg, builder, local_sym_tab, struct_sym_tab):
"""Get pointer to char array and its size."""
# Struct field: obj.field
if isinstance(buf_arg, ast.Attribute) and isinstance(buf_arg.value, ast.Name):
var_name = buf_arg.value.id
field_name = buf_arg.attr
if not (local_sym_tab and var_name in local_sym_tab):
raise ValueError(f"Variable '{var_name}' not found")
struct_type = local_sym_tab[var_name].metadata
if not (struct_sym_tab and struct_type in struct_sym_tab):
raise ValueError(f"Struct type '{struct_type}' not found")
struct_info = struct_sym_tab[struct_type]
if field_name not in struct_info.fields:
raise ValueError(f"Field '{field_name}' not found")
field_type = struct_info.field_type(field_name)
if not _is_char_array(field_type):
raise ValueError("Expected char array field")
struct_ptr = local_sym_tab[var_name].var
field_ptr = struct_info.gep(builder, struct_ptr, field_name)
# GEP to first element: [N x i8]* -> i8*
buf_ptr = builder.gep(
field_ptr,
[ir.Constant(ir.IntType(32), 0), ir.Constant(ir.IntType(32), 0)],
inbounds=True,
)
return buf_ptr, field_type.count
elif isinstance(buf_arg, ast.Name):
# NOTE: We shouldn't be doing this as we can't get size info
var_name = buf_arg.id
if not (local_sym_tab and var_name in local_sym_tab):
raise ValueError(f"Variable '{var_name}' not found")
var_ptr = local_sym_tab[var_name].var
var_type = local_sym_tab[var_name].ir_type
if not isinstance(var_type, ir.PointerType) and not isinstance(
var_type.pointee, ir.IntType(8)
):
raise ValueError("Expected str ptr variable")
return var_ptr, 256 # Size unknown for str ptr, using 256 as default
else:
raise ValueError("Expected struct field or variable name")
def _is_char_array(ir_type):
"""Check if IR type is [N x i8]."""
return (
isinstance(ir_type, ir.ArrayType)
and isinstance(ir_type.element, ir.IntType)
and ir_type.element.width == 8
)
def get_ptr_from_arg(
arg, func, module, builder, local_sym_tab, map_sym_tab, struct_sym_tab
):
"""Evaluate argument and return pointer value"""
result = eval_expr(
func, module, builder, arg, local_sym_tab, map_sym_tab, struct_sym_tab
)
if not result:
raise ValueError("Failed to evaluate argument")
val, val_type = result
if not isinstance(val_type, ir.PointerType):
raise ValueError(f"Expected pointer type, got {val_type}")
return val, val_type

View File

@@ -2,31 +2,19 @@ import ctypes
 
 
 def ktime():
-    """get current ktime"""
     return ctypes.c_int64(0)
 
 
 def pid():
-    """get current process id"""
     return ctypes.c_int32(0)
 
 
 def deref(ptr):
-    """dereference a pointer"""
+    "dereference a pointer"
     result = ctypes.cast(ptr, ctypes.POINTER(ctypes.c_void_p)).contents.value
     return result if result is not None else 0
 
 
-def comm(buf):
-    """get current process command name"""
-    return ctypes.c_int64(0)
-
-
-def probe_read_str(dst, src):
-    """Safely read a null-terminated string from kernel memory"""
-    return ctypes.c_int64(0)
-
-
 XDP_ABORTED = ctypes.c_int64(0)
 XDP_DROP = ctypes.c_int64(1)
 XDP_PASS = ctypes.c_int64(2)

View File

@@ -173,15 +173,6 @@ def _populate_fval(ftype, node, fmt_parts, exprs):
         raise NotImplementedError(
             f"Unsupported pointer target type in f-string: {target}"
         )
-    elif isinstance(ftype, ir.ArrayType):
-        if isinstance(ftype.element, ir.IntType) and ftype.element.width == 8:
-            # Char array
-            fmt_parts.append("%s")
-            exprs.append(node)
-        else:
-            raise NotImplementedError(
-                f"Unsupported array element type in f-string: {ftype.element}"
-            )
     else:
         raise NotImplementedError(f"Unsupported field type in f-string: {ftype}")
@@ -206,100 +197,44 @@ def _create_format_string_global(fmt_str, func, module, builder):
 
 def _prepare_expr_args(expr, func, module, builder, local_sym_tab, struct_sym_tab):
     """Evaluate and prepare an expression to use as an arg for bpf_printk."""
-
-    # Special case: struct field char array needs pointer to first element
-    char_array_ptr = _get_struct_char_array_ptr(
-        expr, builder, local_sym_tab, struct_sym_tab
-    )
-    if char_array_ptr:
-        return char_array_ptr
-
-    # Regular expression evaluation
-    val, _ = eval_expr(func, module, builder, expr, local_sym_tab, None, struct_sym_tab)
-    if not val:
-        logger.warning("Failed to evaluate expression for bpf_printk, defaulting to 0")
-        return ir.Constant(ir.IntType(64), 0)
-
-    # Convert value to bpf_printk compatible type
-    if isinstance(val.type, ir.PointerType):
-        return _handle_pointer_arg(val, func, builder)
-    elif isinstance(val.type, ir.IntType):
-        return _handle_int_arg(val, builder)
-    else:
-        logger.warning(f"Unsupported type {val.type} in bpf_printk, defaulting to 0")
-        return ir.Constant(ir.IntType(64), 0)
+    val, _ = eval_expr(
+        func,
+        module,
+        builder,
+        expr,
+        local_sym_tab,
+        None,
+        struct_sym_tab,
+    )
+    if val:
+        if isinstance(val.type, ir.PointerType):
+            target, depth = get_base_type_and_depth(val.type)
+            if isinstance(target, ir.IntType):
+                if target.width >= 32:
+                    val = deref_to_depth(func, builder, val, depth)
+                    val = builder.sext(val, ir.IntType(64))
+                elif target.width == 8 and depth == 1:
+                    # NOTE: i8* is string, no need to deref
+                    pass
+            else:
+                logger.warning(
+                    "Only int and ptr supported in bpf_printk args. Others default to 0."
+                )
+                val = ir.Constant(ir.IntType(64), 0)
+        elif isinstance(val.type, ir.IntType):
+            if val.type.width < 64:
+                val = builder.sext(val, ir.IntType(64))
+        else:
+            logger.warning(
+                "Only int and ptr supported in bpf_printk args. Others default to 0."
+            )
+            val = ir.Constant(ir.IntType(64), 0)
+        return val
+    else:
+        logger.warning(
+            "Failed to evaluate expression for bpf_printk argument. "
+            "It will be converted to 0."
+        )
+        return ir.Constant(ir.IntType(64), 0)
-
-
-def _get_struct_char_array_ptr(expr, builder, local_sym_tab, struct_sym_tab):
-    """Get pointer to first element of char array in struct field, or None."""
-    if not (isinstance(expr, ast.Attribute) and isinstance(expr.value, ast.Name)):
-        return None
-    var_name = expr.value.id
-    field_name = expr.attr
-    # Check if it's a valid struct field
-    if not (
-        local_sym_tab
-        and var_name in local_sym_tab
-        and struct_sym_tab
-        and local_sym_tab[var_name].metadata in struct_sym_tab
-    ):
-        return None
-    struct_type = local_sym_tab[var_name].metadata
-    struct_info = struct_sym_tab[struct_type]
-    if field_name not in struct_info.fields:
-        return None
-    field_type = struct_info.field_type(field_name)
-    # Check if it's a char array
-    is_char_array = (
-        isinstance(field_type, ir.ArrayType)
-        and isinstance(field_type.element, ir.IntType)
-        and field_type.element.width == 8
-    )
-    if not is_char_array:
-        return None
-
-    # Get field pointer and GEP to first element: [N x i8]* -> i8*
-    struct_ptr = local_sym_tab[var_name].var
-    field_ptr = struct_info.gep(builder, struct_ptr, field_name)
-
-    return builder.gep(
-        field_ptr,
-        [ir.Constant(ir.IntType(32), 0), ir.Constant(ir.IntType(32), 0)],
-        inbounds=True,
-    )
-
-
-def _handle_pointer_arg(val, func, builder):
-    """Convert pointer type for bpf_printk."""
-    target, depth = get_base_type_and_depth(val.type)
-    if not isinstance(target, ir.IntType):
-        logger.warning("Only int pointers supported in bpf_printk, defaulting to 0")
-        return ir.Constant(ir.IntType(64), 0)
-
-    # i8* is string - use as-is
-    if target.width == 8 and depth == 1:
-        return val
-
-    # Integer pointers: dereference and sign-extend to i64
-    if target.width >= 32:
-        val = deref_to_depth(func, builder, val, depth)
-        return builder.sext(val, ir.IntType(64))
-
-    logger.warning("Unsupported pointer width in bpf_printk, defaulting to 0")
-    return ir.Constant(ir.IntType(64), 0)
-
-
-def _handle_int_arg(val, builder):
-    """Convert integer type for bpf_printk (sign-extend to i64)."""
-    if val.type.width < 64:
-        return builder.sext(val, ir.IntType(64))
-    return val

View File

@ -1,56 +0,0 @@
import subprocess
def trace_pipe():
"""Util to read from the trace pipe."""
try:
subprocess.run(["cat", "/sys/kernel/tracing/trace_pipe"])
except KeyboardInterrupt:
print("Tracing stopped.")
def trace_fields():
"""Parse one line from trace_pipe into fields."""
with open("/sys/kernel/tracing/trace_pipe", "rb", buffering=0) as f:
while True:
line = f.readline().rstrip()
if not line:
continue
# Skip lost event lines
if line.startswith(b"CPU:"):
continue
# Parse BCC-style: first 16 bytes = task
task = line[:16].lstrip().decode("utf-8")
line = line[17:] # Skip past task field and space
# Find the colon that ends "pid cpu flags timestamp"
ts_end = line.find(b":")
if ts_end == -1:
raise ValueError("Cannot parse trace line")
# Split "pid [cpu] flags timestamp"
try:
parts = line[:ts_end].split()
if len(parts) < 4:
raise ValueError("Not enough fields")
pid = int(parts[0])
cpu = parts[1][1:-1] # Remove brackets from [cpu]
cpu = int(cpu)
flags = parts[2]
ts = float(parts[3])
except (ValueError, IndexError):
raise ValueError("Cannot parse trace line")
# Get message: skip ": symbol:" part
line = line[ts_end + 1 :] # Skip first ":"
sym_end = line.find(b":")
if sym_end != -1:
msg = line[sym_end + 2 :].decode("utf-8") # Skip ": " after symbol
else:
msg = line.lstrip().decode("utf-8")
return (task, pid, cpu, flags, ts, msg)

View File

@@ -60,6 +60,10 @@ def process_vmlinux_post_ast(
             pass
         else:
             new_dep_node = DependencyNode(name=current_symbol_name)
+            # elem_type_class is the actual vmlinux struct/class
+            new_dep_node.set_ctype_struct(elem_type_class)
             handler.add_node(new_dep_node)
             class_obj = getattr(imported_module, current_symbol_name)
             # Inspect the class fields
@@ -112,7 +116,11 @@ def process_vmlinux_post_ast(
                     type_length = elem_type._length_
                     if containing_type.__module__ == "vmlinux":
-                        pass
+                        new_dep_node.add_dependent(
+                            elem_type._type_.__name__
+                            if hasattr(elem_type._type_, "__name__")
+                            else str(elem_type._type_)
+                        )
                     elif containing_type.__module__ == ctypes.__name__:
                         if isinstance(elem_type, type):
                             if issubclass(elem_type, ctypes.Array):
@@ -137,10 +145,35 @@ def process_vmlinux_post_ast(
                         )
                     new_dep_node.set_field_type(elem_name, elem_type)
                     if containing_type.__module__ == "vmlinux":
-                        process_vmlinux_post_ast(
-                            containing_type, llvm_handler, handler, processing_stack
-                        )
-                        new_dep_node.set_field_ready(elem_name, True)
+                        containing_type_name = (
+                            containing_type.__name__
+                            if hasattr(containing_type, "__name__")
+                            else str(containing_type)
+                        )
+                        # Check for self-reference or already processed
+                        if containing_type_name == current_symbol_name:
+                            # Self-referential pointer
+                            logger.debug(
+                                f"Self-referential pointer in {current_symbol_name}.{elem_name}"
+                            )
+                            new_dep_node.set_field_ready(elem_name, True)
+                        elif handler.has_node(containing_type_name):
+                            # Already processed
+                            logger.debug(
+                                f"Reusing already processed {containing_type_name}"
+                            )
+                            new_dep_node.set_field_ready(elem_name, True)
+                        else:
+                            # Process recursively - THIS WAS MISSING
+                            new_dep_node.add_dependent(containing_type_name)
+                            process_vmlinux_post_ast(
+                                containing_type,
+                                llvm_handler,
+                                handler,
+                                processing_stack,
+                            )
+                            new_dep_node.set_field_ready(elem_name, True)
                     elif containing_type.__module__ == ctypes.__name__:
                         logger.debug(f"Processing ctype internal{containing_type}")
                         new_dep_node.set_field_ready(elem_name, True)
@@ -149,6 +182,11 @@ def process_vmlinux_post_ast(
                             "Module not supported in recursive resolution"
                         )
                     else:
+                        new_dep_node.add_dependent(
+                            elem_type.__name__
+                            if hasattr(elem_type, "__name__")
+                            else str(elem_type)
+                        )
                         process_vmlinux_post_ast(
                             elem_type, llvm_handler, handler, processing_stack
                         )

View File

@@ -147,3 +147,27 @@ class DependencyHandler:
             int: The number of nodes
         """
         return len(self._nodes)
+
+    def __getitem__(self, name: str) -> DependencyNode:
+        """
+        Get a node by name using dictionary-style access.
+
+        Args:
+            name: The name of the node to retrieve
+
+        Returns:
+            DependencyNode: The node with the given name
+
+        Raises:
+            KeyError: If no node with the given name exists
+
+        Example:
+            node = handler["some-dep_node_name"]
+        """
+        if name not in self._nodes:
+            raise KeyError(f"No node with name '{name}' found")
+        return self._nodes[name]
+
+    @property
+    def nodes(self):
+        return self._nodes

View File

@@ -1,5 +1,6 @@
 from dataclasses import dataclass, field
 from typing import Dict, Any, Optional
+import ctypes
 
 # TODO: FIX THE FUCKING TYPE NAME CONVENTION.
@@ -13,6 +14,7 @@ class Field:
     containing_type: Optional[Any]
     type_size: Optional[int]
     bitfield_size: Optional[int]
+    offset: int
     value: Any = None
     ready: bool = False
@@ -60,6 +62,10 @@ class Field:
         if mark_ready:
             self.ready = True
 
+    def set_offset(self, offset: int) -> None:
+        """Set the offset of this field"""
+        self.offset = offset
+
 
 @dataclass
 class DependencyNode:
@@ -106,8 +112,11 @@ class DependencyNode:
     """
 
     name: str
+    depends_on: Optional[list[str]] = None
     fields: Dict[str, Field] = field(default_factory=dict)
     _ready_cache: Optional[bool] = field(default=None, repr=False)
+    current_offset: int = 0
+    ctype_struct: Optional[Any] = field(default=None, repr=False)
 
     def add_field(
         self,
@@ -119,8 +128,11 @@ class DependencyNode:
         ctype_complex_type: Optional[int] = None,
         bitfield_size: Optional[int] = None,
         ready: bool = False,
+        offset: int = 0,
    ) -> None:
         """Add a field to the node with an optional initial value and readiness state."""
+        if self.depends_on is None:
+            self.depends_on = []
         self.fields[name] = Field(
             name=name,
             type=field_type,
@@ -130,10 +142,21 @@ class DependencyNode:
             type_size=type_size,
             ctype_complex_type=ctype_complex_type,
             bitfield_size=bitfield_size,
+            offset=offset,
         )
         # Invalidate readiness cache
         self._ready_cache = None
 
+    def set_ctype_struct(self, ctype_struct: Any) -> None:
+        """Set the ctypes structure for automatic offset calculation."""
+        self.ctype_struct = ctype_struct
+
+    def __sizeof__(self):
+        # If we have a ctype_struct, use its size
+        if self.ctype_struct is not None:
+            return ctypes.sizeof(self.ctype_struct)
+        return self.current_offset
+
     def get_field(self, name: str) -> Field:
         """Get a field by name."""
         return self.fields[name]
@@ -200,15 +223,112 @@ class DependencyNode:
         # Invalidate readiness cache
         self._ready_cache = None
 
-    def set_field_ready(self, name: str, is_ready: bool = False) -> None:
+    def set_field_ready(
+        self,
+        name: str,
+        is_ready: bool = False,
+        size_of_containing_type: Optional[int] = None,
+    ) -> None:
         """Mark a field as ready or not ready."""
         if name not in self.fields:
             raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
         self.fields[name].set_ready(is_ready)
+        # Use ctypes built-in offset if available
+        if self.ctype_struct is not None:
+            try:
+                self.fields[name].set_offset(getattr(self.ctype_struct, name).offset)
+            except AttributeError:
+                # Fallback to manual calculation if field not found in ctype_struct
+                self.fields[name].set_offset(self.current_offset)
+                self.current_offset += self._calculate_size(
+                    name, size_of_containing_type
+                )
+        else:
+            # Manual offset calculation when no ctype_struct is available
+            self.fields[name].set_offset(self.current_offset)
+            self.current_offset += self._calculate_size(name, size_of_containing_type)
         # Invalidate readiness cache
         self._ready_cache = None
def _calculate_size(
self, name: str, size_of_containing_type: Optional[int] = None
) -> int:
processing_field = self.fields[name]
# size_of_field will be in bytes
if processing_field.type.__module__ == ctypes.__name__:
size_of_field = ctypes.sizeof(processing_field.type)
return size_of_field
elif processing_field.type.__module__ == "vmlinux":
if processing_field.ctype_complex_type is not None:
if issubclass(processing_field.ctype_complex_type, ctypes.Array):
if processing_field.containing_type.__module__ == ctypes.__name__:
if (
processing_field.containing_type is not None
and processing_field.type_size is not None
):
size_of_field = (
ctypes.sizeof(processing_field.containing_type)
* processing_field.type_size
)
else:
raise RuntimeError(
f"{processing_field} has no containing_type or type_size"
)
return size_of_field
elif processing_field.containing_type.__module__ == "vmlinux":
if (
size_of_containing_type is not None
and processing_field.type_size is not None
):
size_of_field = (
size_of_containing_type * processing_field.type_size
)
else:
raise RuntimeError(
f"{processing_field} has no containing_type or type_size"
)
return size_of_field
elif issubclass(processing_field.ctype_complex_type, ctypes._Pointer):
return ctypes.sizeof(ctypes.c_void_p)
else:
raise NotImplementedError(
"This subclass of ctype not supported yet"
)
elif processing_field.type_size is not None:
# Handle vmlinux types with type_size but no ctype_complex_type
# This means it's a direct vmlinux struct field (not array/pointer wrapped)
# The type_size should already contain the full size of the struct
# But if there's a containing_type from vmlinux, we need that size
if processing_field.containing_type is not None:
if processing_field.containing_type.__module__ == "vmlinux":
# For vmlinux containing types, we need the pre-calculated size
if size_of_containing_type is not None:
return size_of_containing_type * processing_field.type_size
else:
raise RuntimeError(
f"Field {name}: vmlinux containing_type requires size_of_containing_type"
)
else:
raise ModuleNotFoundError(
f"Containing type module {processing_field.containing_type.__module__} not supported"
)
else:
raise RuntimeError("Wrong type found with no containing type")
else:
# No ctype_complex_type and no type_size, must rely on size_of_containing_type
if size_of_containing_type is None:
raise RuntimeError(
f"Size of containing type {size_of_containing_type} is None"
)
return size_of_containing_type
else:
raise ModuleNotFoundError("Module is not supported for the operation")
raise RuntimeError("control should not reach here")
     @property
     def is_ready(self) -> bool:
         """Check if the node is ready (all fields are ready)."""
@@ -235,3 +355,9 @@ class DependencyNode:
     def get_not_ready_fields(self) -> Dict[str, Field]:
         """Get all fields that are marked as not ready."""
         return {name: elem for name, elem in self.fields.items() if not elem.ready}
+
+    def add_dependent(self, dep_type):
+        if dep_type in self.depends_on:
+            return
+        else:
+            self.depends_on.append(dep_type)
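A hedged side note on the ctypes machinery that set_field_ready() above leans on: field descriptors of a ctypes.Structure subclass already carry an .offset attribute, so getattr(self.ctype_struct, name).offset yields the ABI offset without manual accounting. Minimal sketch with a made-up struct (not a vmlinux type):

import ctypes

class demo(ctypes.Structure):  # hypothetical stand-in for a vmlinux struct
    _fields_ = [("a", ctypes.c_uint32), ("b", ctypes.c_uint64)]

print(demo.a.offset, demo.b.offset)  # 0 8 on a typical 64-bit ABI (b is padded to natural alignment)
print(ctypes.sizeof(demo))           # 16, which is what __sizeof__() above would report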

View File

@@ -1,11 +1,11 @@
 import ast
 import logging
-from typing import List, Tuple, Dict
+from typing import List, Tuple, Any
 import importlib
 import inspect
 from .dependency_handler import DependencyHandler
-from .ir_generation import IRGenerator
+from .ir_gen import IRGenerator
 from .class_handler import process_vmlinux_class

 logger = logging.getLogger(__name__)
@@ -82,7 +82,7 @@ def vmlinux_proc(tree: ast.AST, module):
     # initialise dependency handler
     handler = DependencyHandler()
     # initialise assignment dictionary of name to type
-    assignments: Dict[str, type] = {}
+    assignments: dict[str, tuple[type, Any]] = {}

     if not import_statements:
         logger.info("No vmlinux imports found")
@@ -129,7 +129,19 @@ def vmlinux_proc(tree: ast.AST, module):
     )
     IRGenerator(module, handler)
+    return assignments


-def process_vmlinux_assign(node, module, assignments: Dict[str, type]):
-    raise NotImplementedError("Assignment handling has not been implemented yet")
+def process_vmlinux_assign(node, module, assignments: dict[str, tuple[type, Any]]):
+    # Check if this is a simple assignment with a constant value
+    if len(node.targets) == 1 and isinstance(node.targets[0], ast.Name):
+        target_name = node.targets[0].id
+        if isinstance(node.value, ast.Constant):
+            assignments[target_name] = (type(node.value.value), node.value.value)
+            logger.info(
+                f"Added assignment: {target_name} = {node.value.value!r} of type {type(node.value.value)}"
+            )
+        else:
+            raise ValueError(f"Unsupported assignment type for {target_name}")
+    else:
+        raise ValueError("Not a simple assignment")

View File

@ -0,0 +1,3 @@
from .ir_generation import IRGenerator
__all__ = ["IRGenerator"]

View File

@ -0,0 +1,15 @@
from pythonbpf.debuginfo import DebugInfoGenerator
def debug_info_generation(struct, llvm_module):
generator = DebugInfoGenerator(llvm_module)
# this is sample debug info generation
# i64type = generator.get_uint64_type()
struct_type = generator.create_struct_type([], 64 * 4, is_distinct=True)
global_var = generator.create_global_var_debug_info(
struct.name, struct_type, is_local=False
)
return global_var

View File

@ -0,0 +1,161 @@
import ctypes
import logging
from ..dependency_handler import DependencyHandler
from .debug_info_gen import debug_info_generation
from ..dependency_node import DependencyNode
import llvmlite.ir as ir
logger = logging.getLogger(__name__)
class IRGenerator:
# get the assignments dict and add this stuff to it.
def __init__(self, llvm_module, handler: DependencyHandler, assignment=None):
self.llvm_module = llvm_module
self.handler: DependencyHandler = handler
self.generated: list[str] = []
if not handler.is_ready:
raise ImportError(
"Semantic analysis of vmlinux imports failed. Cannot generate IR"
)
for struct in handler:
self.struct_processor(struct)
def struct_processor(self, struct, processing_stack=None):
# Initialize processing stack on first call
if processing_stack is None:
processing_stack = set()
# If already generated, skip
if struct.name in self.generated:
return
# Detect circular dependency
if struct.name in processing_stack:
logger.info(
f"Circular dependency detected for {struct.name}, skipping recursive processing"
)
# For circular dependencies, we can either:
# 1. Use forward declarations (opaque pointers)
# 2. Mark as incomplete and process later
# 3. Generate a placeholder type
# Here we'll just skip and let it be processed in its own call
return
logger.info(f"IR generating for {struct.name}")
# Add to processing stack before processing dependencies
processing_stack.add(struct.name)
try:
# Process all dependencies first
if struct.depends_on is None:
pass
else:
for dependency in struct.depends_on:
if dependency not in self.generated:
# Check if dependency exists in handler
if dependency in self.handler.nodes:
dep_node_from_dependency = self.handler[dependency]
# Pass the processing_stack down to track circular refs
self.struct_processor(
dep_node_from_dependency, processing_stack
)
else:
raise RuntimeError(
f"Warning: Dependency {dependency} not found in handler"
)
# Actual processor logic here after dependencies are resolved
self.gen_ir(struct)
self.generated.append(struct.name)
finally:
# Remove from processing stack after we're done
processing_stack.discard(struct.name)
def gen_ir(self, struct):
# TODO: we add the btf_ama attribute by monkey patching in the end of compilation, but once llvmlite
# accepts our issue, we will resort to normal accessed attribute based attribute addition
# currently we generate all possible field accesses for CO-RE and put into the assignment table
debug_info = debug_info_generation(struct, self.llvm_module)
field_index = 0
for field_name, field in struct.fields.items():
# does not take arrays and similar types into consideration yet.
if field.ctype_complex_type is not None and issubclass(
field.ctype_complex_type, ctypes.Array
):
array_size = field.type_size
containing_type = field.containing_type
if containing_type.__module__ == ctypes.__name__:
containing_type_size = ctypes.sizeof(containing_type)
for i in range(0, array_size):
field_co_re_name = self._struct_name_generator(
struct, field, field_index, True, i, containing_type_size
)
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
field_index += 1
elif field.type_size is not None:
array_size = field.type_size
containing_type = field.containing_type
if containing_type.__module__ == "vmlinux":
containing_type_size = self.handler[
containing_type.__name__
].current_offset
for i in range(0, array_size):
field_co_re_name = self._struct_name_generator(
struct, field, field_index, True, i, containing_type_size
)
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
field_index += 1
else:
field_co_re_name = self._struct_name_generator(
struct, field, field_index
)
field_index += 1
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
def _struct_name_generator(
self,
struct: DependencyNode,
field,
field_index: int,
is_indexed: bool = False,
index: int = 0,
containing_type_size: int = 0,
) -> str:
if is_indexed:
name = (
"llvm."
+ struct.name.removeprefix("struct_")
+ f":0:{field.offset + index * containing_type_size}"
+ "$"
+ f"0:{field_index}:{index}"
)
return name
elif struct.name.startswith("struct_"):
name = (
"llvm."
+ struct.name.removeprefix("struct_")
+ f":0:{field.offset}"
+ "$"
+ f"0:{field_index}"
)
return name
else:
print(self.handler[struct.name])
raise TypeError(
"Name generation cannot occur due to type name not starting with struct"
)
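Editorial aside on the generator above: the externally-linked globals it emits carry names of the form llvm.<struct>:<kind>:<byte offset>$<access indices>, which is the access-path encoding the CO-RE comment in gen_ir refers to. A tiny self-contained sketch reproduces the non-indexed format string; the struct name, offset, and field index are invented for illustration and are not taken from the commit.

# Reproduces the name format built by _struct_name_generator for a
# non-array field; the concrete struct/offset/index values are made up.
struct_name = "struct_xdp_md"
field_offset = 4      # byte offset of the field inside the struct
field_index = 1       # positional index of the field within the struct

name = (
    "llvm."
    + struct_name.removeprefix("struct_")
    + f":0:{field_offset}"
    + "$"
    + f"0:{field_index}"
)
print(name)  # llvm.xdp_md:0:4$0:1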

View File

@ -1,14 +0,0 @@
import logging
from .dependency_handler import DependencyHandler
logger = logging.getLogger(__name__)
class IRGenerator:
def __init__(self, module, handler: DependencyHandler):
self.module = module
self.handler: DependencyHandler = handler
if not handler.is_ready:
raise ImportError(
"Semantic analysis of vmlinux imports failed. Cannot generate IR"
)

View File

@@ -1,23 +1,9 @@
 // SPDX-License-Identifier: GPL-2.0
-#include <linux/bpf.h>
+#include "vmlinux.h"
 #include <bpf/bpf_helpers.h>
 #include <bpf/bpf_tracing.h>

-struct trace_entry {
-    short unsigned int type;
-    unsigned char flags;
-    unsigned char preempt_count;
-    int pid;
-};
-
-struct trace_event_raw_sys_enter {
-    struct trace_entry ent;
-    long int id;
-    long unsigned int args[6];
-    char __data[0];
-};
-
 struct event {
     __u32 pid;
     __u32 uid;

View File

@@ -1,10 +1,17 @@
 from pythonbpf import bpf, map, section, bpfglobal, compile_to_ir
 from pythonbpf.maps import HashMap
 from pythonbpf.helper import XDP_PASS
-from vmlinux import struct_xdp_md
+from vmlinux import TASK_COMM_LEN # noqa: F401
+from vmlinux import struct_xdp_buff # noqa: F401
+from vmlinux import struct_ring_buffer_per_cpu # noqa: F401
+from vmlinux import struct_qspinlock # noqa: F401
+# from vmlinux import struct_trace_event_raw_sys_enter # noqa: F401
+# from vmlinux import struct_posix_cputimers # noqa: F401
+from vmlinux import struct_xdp_md
+# from vmlinux import struct_trace_event_raw_sys_enter # noqa: F401
+# from vmlinux import struct_ring_buffer_per_cpu # noqa: F401
+# from vmlinux import struct_request # noqa: F401
 from ctypes import c_int64

 # Instructions to how to run this program

View File

@ -1,28 +0,0 @@
from pythonbpf import bpf, struct, section, bpfglobal
from pythonbpf.helper import comm
from ctypes import c_void_p, c_int64
@bpf
@struct
class data_t:
comm: str(16) # type: ignore [valid-type]
copp: str(16) # type: ignore [valid-type]
@bpf
@section("tracepoint/syscalls/sys_enter_clone")
def hello(ctx: c_void_p) -> c_int64:
dataobj = data_t()
comm(dataobj.comm)
strobj = dataobj.comm
dataobj.copp = strobj
print(f"clone called by comm {dataobj.copp}")
return 0
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"

View File

@ -1,26 +0,0 @@
from pythonbpf import bpf, struct, section, bpfglobal
from pythonbpf.helper import comm
from ctypes import c_void_p, c_int64
@bpf
@struct
class data_t:
comm: str(16) # type: ignore [valid-type]
@bpf
@section("tracepoint/syscalls/sys_enter_clone")
def hello(ctx: c_void_p) -> c_int64:
dataobj = data_t()
comm(dataobj.comm)
strobj = dataobj.comm
print(f"clone called by comm {strobj}")
return 0
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"