40 Commits

Author SHA1 Message Date
c8801f4c3e nonetype not parsed 2025-11-19 23:35:10 +05:30
49740598ea format chore 2025-11-13 09:31:10 +05:30
73bbf00e7c add tests 2025-11-13 09:29:53 +05:30
f7dee329cb fix nested-pointer issue in array generation and zero-length array IR generation 2025-11-10 20:29:28 +05:30
5031f90377 fix stacked vmlinux struct parsing issue 2025-11-10 20:06:04 +05:30
95a624044a fix type error 2025-11-08 20:28:56 +05:30
c5bef26b88 support multiple imports on a single import line. 2025-11-08 18:08:04 +05:30
a9d82d40d3 Merge pull request #60 from pythonbpf/vmlinux-handler
vmlinux handler with struct support for int64 and unsigned int64 struct fields only.
2025-11-01 08:15:14 +05:30
85a62d6cd8 add example and support unsigned i64 2025-11-01 08:13:22 +05:30
c3fc790c71 remove fixed TODOs 2025-11-01 07:05:42 +05:30
22e30f04b4 Merge pull request #66 from pythonbpf/dependabot/github_actions/actions-3249c11fdc
Bump the actions group with 2 updates
2025-10-27 17:21:49 +05:30
620b8cb1e7 Bump the actions group with 2 updates
Bumps the actions group with 2 updates: [actions/upload-artifact](https://github.com/actions/upload-artifact) and [actions/download-artifact](https://github.com/actions/download-artifact).


Updates `actions/upload-artifact` from 4 to 5
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](https://github.com/actions/upload-artifact/compare/v4...v5)

Updates `actions/download-artifact` from 5 to 6
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v5...v6)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: actions
- dependency-name: actions/download-artifact
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: actions
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-27 11:49:59 +00:00
1207fe9f92 Update .gitattributes to include new directories 2025-10-27 03:43:38 +05:30
b138405931 Merge pull request #65 from pythonbpf/varun-r-mallya-patch-1
Mark Jupyter Notebook files as vendored
2025-10-27 03:41:59 +05:30
262f00f635 Mark Jupyter Notebook files as vendored 2025-10-27 03:41:15 +05:30
07580dabf2 revert struct reference pointer sizes to i8 so that the compiler does not optimize them away 2025-10-27 03:29:15 +05:30
ac74b03b14 Add TODO to specify flags and DISubprogram. 2025-10-27 03:01:56 +05:30
3bf85e733e add DI subprogram to make CO-RE work fully. 2025-10-27 03:00:13 +05:30
73f7c80eca add scope field separately to subroutine type to remove circular dependency 2025-10-27 02:48:06 +05:30
238697469a create debug info for subroutine type 2025-10-27 02:19:08 +05:30
8bd210cede add debug info storage to the assignment_info.py dataclass 2025-10-26 15:46:42 +05:30
7bf6f9c48c add function_debug_info.py and format 2025-10-26 15:12:36 +05:30
a1fe2ed4bc change to 64 bit pointers. May be an issue. revert this commit if issues arise 2025-10-26 15:00:53 +05:30
93285dbdd8 generate GEP IR 2025-10-26 02:12:33 +05:30
1ea44dd8e1 Use pointer arithmetic to resolve vmlinux struct fields 2025-10-25 05:40:45 +05:30
96216d4411 Consistently use dataclass syntax for AssignmentInfo and related classes 2025-10-25 05:10:47 +05:30
028d9c2c08 generate IR partly 2025-10-25 04:41:13 +05:30
c6b5ecb47e find global variable IR and field data from metadata 2025-10-24 03:34:27 +05:30
30bcfcbbd0 remove compile error on plain c_void_p argument and separate LocalSymbol to avoid circular dependency 2025-10-24 03:08:39 +05:30
f18a4399ea format chore 2025-10-24 02:40:07 +05:30
4e01df735f complete part of expr passing for attribute of i64 type 2025-10-24 02:38:39 +05:30
64674cf646 add alloc for only i64 2025-10-24 02:06:39 +05:30
36a1a0903e Merge branch 'master' into vmlinux-handler 2025-10-22 12:02:51 +05:30
f2bc7f1434 pass context to memory allocation 2025-10-22 12:01:52 +05:30
b3921c424d parse context from first function argument to local symbol table 2025-10-22 11:40:49 +05:30
adf32560a0 bpf passthrough gen in codegen
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-22 03:45:54 +05:30
21cea97d78 add return None statements 2025-10-21 07:02:34 +05:30
d8729342dc add bpf_passthrough generation 2025-10-21 07:01:37 +05:30
4179fbfc88 move around examples 2025-10-21 06:03:16 +05:30
ba397036b4 add failing examples to work on 2025-10-21 05:49:44 +05:30
31 changed files with 1020 additions and 121857 deletions

2
.gitattributes vendored
View File

@ -1 +1,3 @@
tests/c-form/vmlinux.h linguist-vendored
examples/ linguist-vendored
BCC-Examples/ linguist-vendored

View File

@ -33,7 +33,7 @@ jobs:
python -m build
- name: Upload distributions
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v5
with:
name: release-dists
path: dist/
@ -59,7 +59,7 @@ jobs:
steps:
- name: Retrieve release distributions
uses: actions/download-artifact@v5
uses: actions/download-artifact@v6
with:
name: release-dists
path: dist/

View File

@ -2,27 +2,15 @@ import ast
import logging
from llvmlite import ir
from dataclasses import dataclass
from typing import Any
from .local_symbol import LocalSymbol
from pythonbpf.helper import HelperHandlerRegistry
from pythonbpf.vmlinux_parser.dependency_node import Field
from .expr import VmlinuxHandlerRegistry
from pythonbpf.type_deducer import ctypes_to_ir
logger = logging.getLogger(__name__)
@dataclass
class LocalSymbol:
var: ir.AllocaInstr
ir_type: ir.Type
metadata: Any = None
def __iter__(self):
yield self.var
yield self.ir_type
yield self.metadata
def create_targets_and_rvals(stmt):
"""Create lists of targets and right-hand values from an assignment statement."""
if isinstance(stmt.targets[0], ast.Tuple):
@ -60,21 +48,11 @@ def handle_assign_allocation(builder, stmt, local_sym_tab, structs_sym_tab):
continue
var_name = target.id
# Skip if already allocated
if var_name in local_sym_tab:
logger.debug(f"Variable {var_name} already allocated, skipping")
continue
# When allocating a variable, check if it's a vmlinux struct type
if isinstance(
stmt.value, ast.Name
) and VmlinuxHandlerRegistry.is_vmlinux_struct(stmt.value.id):
# Handle vmlinux struct allocation
# This requires more implementation
print(stmt.value)
pass
# Determine type and allocate based on rval
if isinstance(rval, ast.Call):
_allocate_for_call(builder, var_name, rval, local_sym_tab, structs_sym_tab)
@ -248,9 +226,41 @@ def _allocate_for_attribute(builder, var_name, rval, local_sym_tab, structs_sym_
logger.error(f"Struct variable '{struct_var}' not found")
return
struct_type = local_sym_tab[struct_var].metadata
struct_type: type = local_sym_tab[struct_var].metadata
if not struct_type or struct_type not in structs_sym_tab:
logger.error(f"Struct type '{struct_type}' not found")
if VmlinuxHandlerRegistry.is_vmlinux_struct(struct_type.__name__):
# Handle vmlinux struct field access
vmlinux_struct_name = struct_type.__name__
if not VmlinuxHandlerRegistry.has_field(vmlinux_struct_name, field_name):
logger.error(
f"Field '{field_name}' not found in vmlinux struct '{vmlinux_struct_name}'"
)
return
field_type: tuple[ir.GlobalVariable, Field] = (
VmlinuxHandlerRegistry.get_field_type(vmlinux_struct_name, field_name)
)
field_ir, field = field_type
# TODO: For now, we only support integer type allocations.
# This always assumes first argument of function to be the context struct
base_ptr = builder.function.args[0]
local_sym_tab[
struct_var
].var = base_ptr  # Repurpose var to store the pointer to the base type
local_sym_tab[struct_var].ir_type = field_ir
actual_ir_type = ir.IntType(64)
# Allocate with the actual IR type, not the GlobalVariable
var = _allocate_with_type(builder, var_name, actual_ir_type)
local_sym_tab[var_name] = LocalSymbol(var, actual_ir_type, field)
logger.info(
f"Pre-allocated {var_name} from vmlinux struct {vmlinux_struct_name}.{field_name}"
)
return
else:
logger.error(f"Struct type '{struct_type}' not found")
return
struct_info = structs_sym_tab[struct_type]

View File

@ -3,6 +3,8 @@ import logging
from llvmlite import ir
from pythonbpf.expr import eval_expr
from pythonbpf.helper import emit_probe_read_kernel_str_call
from pythonbpf.type_deducer import ctypes_to_ir
from pythonbpf.vmlinux_parser.dependency_node import Field
logger = logging.getLogger(__name__)
@ -148,7 +150,18 @@ def handle_variable_assignment(
val, val_type = val_result
logger.info(f"Evaluated value for {var_name}: {val} of type {val_type}, {var_type}")
if val_type != var_type:
if isinstance(val_type, ir.IntType) and isinstance(var_type, ir.IntType):
if isinstance(val_type, Field):
logger.info("Handling assignment to struct field")
# TODO: handling only ctype struct fields for now. Handle other stuff too later.
if var_type == ctypes_to_ir(val_type.type.__name__):
builder.store(val, var_ptr)
logger.info(f"Assigned ctype struct field to {var_name}")
return True
logger.error(
f"Failed to assign ctype struct field to {var_name}: {val_type} != {var_type}"
)
return False
elif isinstance(val_type, ir.IntType) and isinstance(var_type, ir.IntType):
# Allow implicit int widening
if val_type.width < var_type.width:
val = builder.sext(val, var_type)

View File

@ -37,6 +37,24 @@ def finalize_module(original_str):
return re.sub(pattern, replacement, original_str)
def bpf_passthrough_gen(module):
i32_ty = ir.IntType(32)
ptr_ty = ir.PointerType(ir.IntType(8))
fnty = ir.FunctionType(ptr_ty, [i32_ty, ptr_ty])
# Declare the intrinsic
passthrough = ir.Function(module, fnty, "llvm.bpf.passthrough.p0.p0")
# Set function attributes
# TODO: the commented-out attributes are supposed to be set as well, but cannot be added due to llvmlite limitations at the moment
# passthrough.attributes.add("nofree")
# passthrough.attributes.add("nosync")
passthrough.attributes.add("nounwind")
# passthrough.attributes.add("memory(none)")
return passthrough
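# Hedged usage sketch (not part of this diff): the intrinsic declared above is
# intended to be called the way load_ctx_field does later in this changeset.
# The i32 tag only needs to be a distinct constant; the pointer is passed
# through so the BPF verifier keeps the CO-RE field access visible.
#
#   passthrough = bpf_passthrough_gen(module)
#   verified_ptr = builder.call(
#       passthrough, [ir.Constant(ir.IntType(32), 0), field_ptr], tail=True
#   )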
def find_bpf_chunks(tree):
"""Find all functions decorated with @bpf in the AST."""
bpf_functions = []
@ -57,6 +75,8 @@ def processor(source_code, filename, module):
for func_node in bpf_chunks:
logger.info(f"Found BPF function/struct: {func_node.name}")
bpf_passthrough_gen(module)
vmlinux_symtab = vmlinux_proc(tree, module)
if vmlinux_symtab:
handler = VmlinuxHandler.initialize(vmlinux_symtab)
@ -137,7 +157,7 @@ def compile_to_ir(filename: str, output: str, loglevel=logging.INFO):
module.add_named_metadata("llvm.ident", [f"PythonBPF {VERSION}"])
module_string = finalize_module(str(module))
module_string: str = finalize_module(str(module))
logger.info(f"IR written to {output}")
with open(output, "w") as f:

View File

@ -184,3 +184,83 @@ class DebugInfoGenerator:
"DIGlobalVariableExpression",
{"var": global_var, "expr": self.module.add_debug_info("DIExpression", {})},
)
def get_int64_type(self):
return self.get_basic_type("long", 64, dc.DW_ATE_signed)
def create_subroutine_type(self, return_type, param_types):
"""
Create a DISubroutineType given return type and list of parameter types.
Equivalent to: !DISubroutineType(types: !{ret, args...})
"""
type_array = [return_type]
if isinstance(param_types, (list, tuple)):
type_array.extend(param_types)
else:
type_array.append(param_types)
return self.module.add_debug_info("DISubroutineType", {"types": type_array})
def create_local_variable_debug_info(
self, name: str, arg: int, var_type: Any
) -> Any:
"""
Create debug info for a local variable (DILocalVariable) without scope.
Example:
!DILocalVariable(name: "ctx", arg: 1, file: !3, line: 20, type: !7)
"""
return self.module.add_debug_info(
"DILocalVariable",
{
"name": name,
"arg": arg,
"file": self.module._file_metadata,
"type": var_type,
},
)
def add_scope_to_local_variable(self, local_variable_debug_info, scope_value):
"""
Add scope information to an existing local variable debug info object.
"""
# TODO: this is a workaround for a flaw in the debug info generation. Fix this if possible in the future.
# We should not be touching llvmlite's internals like this.
if hasattr(local_variable_debug_info, "operands"):
# LLVM metadata operands is a tuple, so we need to rebuild it
existing_operands = local_variable_debug_info.operands
# Convert tuple to list, add scope, convert back to tuple
operands_list = list(existing_operands)
operands_list.append(("scope", scope_value))
# Reassign the new tuple
local_variable_debug_info.operands = tuple(operands_list)
def create_subprogram(
self, name: str, subroutine_type: Any, retained_nodes: List[Any]
) -> Any:
"""
Create a DISubprogram for a function.
Args:
name: Function name
subroutine_type: DISubroutineType for the function signature
retained_nodes: List of DILocalVariable nodes for function parameters/variables
Returns:
DISubprogram metadata
"""
return self.module.add_debug_info(
"DISubprogram",
{
"name": name,
"scope": self.module._file_metadata,
"file": self.module._file_metadata,
"type": subroutine_type,
# TODO: the following flags do not exist at the moment in our dwarf constants file. We need to add them.
# "flags": dc.DW_FLAG_Prototyped | dc.DW_FLAG_AllCallsDescribed,
# "spFlags": dc.DW_SPFLAG_Definition | dc.DW_SPFLAG_Optimized,
"unit": self.module._debug_compile_unit,
"retainedNodes": retained_nodes,
},
is_distinct=True,
)

View File

@ -72,20 +72,28 @@ def _handle_attribute_expr(
if var_name in local_sym_tab:
var_ptr, var_type, var_metadata = local_sym_tab[var_name]
logger.info(f"Loading attribute {attr_name} from variable {var_name}")
logger.info(f"Variable type: {var_type}, Variable ptr: {var_ptr}")
logger.info(
f"Variable type: {var_type}, Variable ptr: {var_ptr}, Variable Metadata: {var_metadata}"
)
if (
hasattr(var_metadata, "__module__")
and var_metadata.__module__ == "vmlinux"
):
# Try vmlinux handler when var_metadata is not a string, but has a module attribute.
# This has been done to keep everything separate in vmlinux struct handling.
vmlinux_result = VmlinuxHandlerRegistry.handle_attribute(
expr, local_sym_tab, None, builder
)
if vmlinux_result is not None:
return vmlinux_result
else:
raise RuntimeError("Vmlinux struct did not process successfully")
metadata = structs_sym_tab[var_metadata]
if attr_name in metadata.fields:
gep = metadata.gep(builder, var_ptr, attr_name)
val = builder.load(gep)
field_type = metadata.field_type(attr_name)
return val, field_type
# Try vmlinux handler as fallback
vmlinux_result = VmlinuxHandlerRegistry.handle_attribute(
expr, local_sym_tab, None, builder
)
if vmlinux_result is not None:
return vmlinux_result
return None

View File

@ -1,5 +1,7 @@
import ast
from pythonbpf.vmlinux_parser.vmlinux_exports_handler import VmlinuxHandler
class VmlinuxHandlerRegistry:
"""Registry for vmlinux handler operations"""
@ -7,7 +9,7 @@ class VmlinuxHandlerRegistry:
_handler = None
@classmethod
def set_handler(cls, handler):
def set_handler(cls, handler: VmlinuxHandler):
"""Set the vmlinux handler"""
cls._handler = handler
@ -37,9 +39,37 @@ class VmlinuxHandlerRegistry:
)
return None
@classmethod
def get_struct_debug_info(cls, name):
if cls._handler is None:
return False
return cls._handler.get_struct_debug_info(name)
@classmethod
def is_vmlinux_struct(cls, name):
"""Check if a name refers to a vmlinux struct"""
if cls._handler is None:
return False
return cls._handler.is_vmlinux_struct(name)
@classmethod
def get_struct_type(cls, name):
"""Try to handle a struct name as vmlinux struct"""
if cls._handler is None:
return None
return cls._handler.get_vmlinux_struct_type(name)
@classmethod
def has_field(cls, vmlinux_struct_name, field_name):
"""Check if a vmlinux struct has a specific field"""
if cls._handler is None:
return False
return cls._handler.has_field(vmlinux_struct_name, field_name)
@classmethod
def get_field_type(cls, vmlinux_struct_name, field_name):
"""Get the type of a field in a vmlinux struct"""
if cls._handler is None:
return None
assert isinstance(cls._handler, VmlinuxHandler)
return cls._handler.get_field_type(vmlinux_struct_name, field_name)

View File

@ -0,0 +1,72 @@
import ast
import llvmlite.ir as ir
import logging
from pythonbpf.debuginfo import DebugInfoGenerator
from pythonbpf.expr import VmlinuxHandlerRegistry
import ctypes
logger = logging.getLogger(__name__)
def generate_function_debug_info(
func_node: ast.FunctionDef, module: ir.Module, func: ir.Function
):
generator = DebugInfoGenerator(module)
leading_argument = func_node.args.args[0]
leading_argument_name = leading_argument.arg
annotation = leading_argument.annotation
if func_node.returns is None:
# TODO: should check if this logic is consistent with function return type handling elsewhere
return_type = ctypes.c_int64()
elif hasattr(func_node.returns, "id"):
return_type = func_node.returns.id
if return_type == "c_int32":
return_type = generator.get_int32_type()
elif return_type == "c_int64":
return_type = generator.get_int64_type()
elif return_type == "c_uint32":
return_type = generator.get_uint32_type()
elif return_type == "c_uint64":
return_type = generator.get_uint64_type()
else:
logger.warning(
"Return type should be int32, int64, uint32 or uint64 only. Falling back to int64"
)
return_type = generator.get_int64_type()
else:
return_type = ctypes.c_int64()
# context processing
if annotation is None:
logger.warning("Type of context of function not found.")
return
if hasattr(annotation, "id"):
ctype_name = annotation.id
if ctype_name == "c_void_p":
return
elif ctype_name.startswith("ctypes"):
raise SyntaxError(
"The first argument should always be a pointer to a struct or a void pointer"
)
context_debug_info = VmlinuxHandlerRegistry.get_struct_debug_info(annotation.id)
pointer_to_context_debug_info = generator.create_pointer_type(
context_debug_info, 64
)
subroutine_type = generator.create_subroutine_type(
return_type, pointer_to_context_debug_info
)
context_local_variable = generator.create_local_variable_debug_info(
leading_argument_name, 1, pointer_to_context_debug_info
)
retained_nodes = [context_local_variable]
print("function name", func_node.name)
subprogram_debug_info = generator.create_subprogram(
func_node.name, subroutine_type, retained_nodes
)
generator.add_scope_to_local_variable(
context_local_variable, subprogram_debug_info
)
func.set_metadata("dbg", subprogram_debug_info)
else:
logger.error(f"Invalid annotation type for argument '{leading_argument_name}'")

View File

@ -7,7 +7,12 @@ from pythonbpf.helper import (
reset_scratch_pool,
)
from pythonbpf.type_deducer import ctypes_to_ir
from pythonbpf.expr import eval_expr, handle_expr, convert_to_bool
from pythonbpf.expr import (
eval_expr,
handle_expr,
convert_to_bool,
VmlinuxHandlerRegistry,
)
from pythonbpf.assign_pass import (
handle_variable_assignment,
handle_struct_field_assignment,
@ -16,8 +21,9 @@ from pythonbpf.allocation_pass import (
handle_assign_allocation,
allocate_temp_pool,
create_targets_and_rvals,
LocalSymbol,
)
from .function_debug_info import generate_function_debug_info
from .return_utils import handle_none_return, handle_xdp_return, is_xdp_name
from .function_metadata import get_probe_string, is_global_function, infer_return_type
@ -324,6 +330,28 @@ def process_func_body(
local_sym_tab = {}
# Add the context parameter (first function argument) to the local symbol table
if func_node.args.args and len(func_node.args.args) > 0:
context_arg = func_node.args.args[0]
context_name = context_arg.arg
if hasattr(context_arg, "annotation") and context_arg.annotation:
if isinstance(context_arg.annotation, ast.Name):
context_type_name = context_arg.annotation.id
elif isinstance(context_arg.annotation, ast.Attribute):
context_type_name = context_arg.annotation.attr
else:
raise TypeError(
f"Unsupported annotation type: {ast.dump(context_arg.annotation)}"
)
if VmlinuxHandlerRegistry.is_vmlinux_struct(context_type_name):
resolved_type = VmlinuxHandlerRegistry.get_struct_type(
context_type_name
)
context_type = LocalSymbol(None, None, resolved_type)
local_sym_tab[context_name] = context_type
logger.info(f"Added argument '{context_name}' to local symbol table")
# pre-allocate dynamic variables
local_sym_tab = allocate_mem(
module,
@ -374,7 +402,7 @@ def process_bpf_chunk(func_node, module, return_type, map_sym_tab, structs_sym_t
func.linkage = "dso_local"
func.attributes.add("nounwind")
func.attributes.add("noinline")
func.attributes.add("optnone")
# func.attributes.add("optnone")
if func_node.args.args:
# Only look at the first argument for now
@ -412,7 +440,7 @@ def func_proc(tree, module, chunks, map_sym_tab, structs_sym_tab):
func_type = get_probe_string(func_node)
logger.info(f"Found probe_string of {func_node.name}: {func_type}")
process_bpf_chunk(
func = process_bpf_chunk(
func_node,
module,
ctypes_to_ir(infer_return_type(func_node)),
@ -420,6 +448,9 @@ def func_proc(tree, module, chunks, map_sym_tab, structs_sym_tab):
structs_sym_tab,
)
logger.info(f"Generating Debug Info for Function {func_node.name}")
generate_function_debug_info(func_node, module, func)
# TODO: WIP, for string assignment to fixed-size arrays
def assign_string_to_array(builder, target_array_ptr, source_string_ptr, array_length):

View File

@ -78,9 +78,9 @@ def bpf_map_lookup_elem_emitter(
map_void_ptr = builder.bitcast(map_ptr, ir.PointerType())
# TODO: I have changed the return type to i64*, as we are
# allocating space for that type in allocate_mem. This is
# temporary, and we will honour other widths later. But this
# allows us to have cool binary ops on the returned value.
# allocating space for that type in allocate_mem. This is
# temporary, and we will honour other widths later. But this
# allows us to have cool binary ops on the returned value.
fn_type = ir.FunctionType(
ir.PointerType(ir.IntType(64)), # Return type: void*
[ir.PointerType(), ir.PointerType()], # Args: (void*, void*)

15
pythonbpf/local_symbol.py Normal file
View File

@ -0,0 +1,15 @@
import llvmlite.ir as ir
from dataclasses import dataclass
from typing import Any
@dataclass
class LocalSymbol:
var: ir.AllocaInstr
ir_type: ir.Type
metadata: Any = None
def __iter__(self):
yield self.var
yield self.ir_type
yield self.metadata
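# Hedged usage sketch: __iter__ lets a symbol be unpacked like a 3-tuple, which
# is how the expr pass consumes it elsewhere in this diff (some_alloca is an
# assumed placeholder):
#
#   sym = LocalSymbol(var=some_alloca, ir_type=ir.IntType(64))
#   var_ptr, var_type, var_metadata = sym  # metadata defaults to None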

View File

@ -13,6 +13,9 @@ mapping = {
"c_float": ir.FloatType(),
"c_double": ir.DoubleType(),
"c_void_p": ir.IntType(64),
"c_long": ir.IntType(64),
"c_ulong": ir.IntType(64),
"c_longlong": ir.IntType(64),
# Not so sure about this one
"str": ir.PointerType(ir.IntType(8)),
}

View File

@ -6,7 +6,6 @@ import llvmlite.ir as ir
from pythonbpf.vmlinux_parser.dependency_node import Field
@dataclass
class AssignmentType(Enum):
CONSTANT = auto()
STRUCT = auto()
@ -34,3 +33,4 @@ class AssignmentInfo:
# Value is a tuple that contains the global variable representing that field
# along with all the information about that field as a Field type.
members: Optional[Dict[str, tuple[ir.GlobalVariable, Field]]] # For structs.
debug_info: Any

View File

@ -16,10 +16,37 @@ def get_module_symbols(module_name: str):
return [name for name in dir(imported_module)], imported_module
def unwrap_pointer_type(type_obj: Any) -> Any:
"""
Recursively unwrap all pointer layers to get the base type.
This handles multiply nested pointers like LP_LP_struct_attribute_group
and returns the base type (struct_attribute_group).
Stops unwrapping when reaching a non-pointer type (one without _type_ attribute).
Args:
type_obj: The type object to unwrap
Returns:
The base type after unwrapping all pointer layers
"""
current_type = type_obj
# Keep unwrapping while it's a pointer/array type (has _type_)
# But stop if _type_ is just a string or basic type marker
while hasattr(current_type, "_type_"):
next_type = current_type._type_
# Stop if _type_ is a string (like 'c' for c_char)
if isinstance(next_type, str):
break
current_type = next_type
return current_type
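# Hedged sketch of the behaviour described in the docstring (the struct class
# here is a stand-in, not the real vmlinux type):
#
#   class struct_attribute_group(ctypes.Structure):
#       _fields_ = []
#
#   lp_lp = ctypes.POINTER(ctypes.POINTER(struct_attribute_group))
#   unwrap_pointer_type(lp_lp)          # -> struct_attribute_group
#   unwrap_pointer_type(ctypes.c_char)  # -> ctypes.c_char (_type_ is the string 'c')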
def process_vmlinux_class(
node,
llvm_module,
handler: DependencyHandler,
node,
llvm_module,
handler: DependencyHandler,
):
symbols_in_module, imported_module = get_module_symbols("vmlinux")
if node.name in symbols_in_module:
@ -30,10 +57,10 @@ def process_vmlinux_class(
def process_vmlinux_post_ast(
elem_type_class,
llvm_handler,
handler: DependencyHandler,
processing_stack=None,
elem_type_class,
llvm_handler,
handler: DependencyHandler,
processing_stack=None,
):
# Initialize processing stack on first call
if processing_stack is None:
@ -113,7 +140,7 @@ def process_vmlinux_post_ast(
# Process pointer to ctype
if isinstance(elem_type, type) and issubclass(
elem_type, ctypes._Pointer
elem_type, ctypes._Pointer
):
# Get the pointed-to type
pointed_type = elem_type._type_
@ -126,7 +153,7 @@ def process_vmlinux_post_ast(
# Process function pointers (CFUNCTYPE)
elif hasattr(elem_type, "_restype_") and hasattr(
elem_type, "_argtypes_"
elem_type, "_argtypes_"
):
# This is a CFUNCTYPE or similar
logger.info(
@ -158,13 +185,90 @@ def process_vmlinux_post_ast(
if hasattr(elem_type, "_length_") and is_complex_type:
type_length = elem_type._length_
if containing_type.__module__ == "vmlinux":
new_dep_node.add_dependent(
elem_type._type_.__name__
if hasattr(elem_type._type_, "__name__")
else str(elem_type._type_)
# Unwrap all pointer layers to get the base type for dependency tracking
base_type = unwrap_pointer_type(elem_type)
base_type_module = getattr(base_type, "__module__", None)
if base_type_module == "vmlinux":
base_type_name = (
base_type.__name__
if hasattr(base_type, "__name__")
else str(base_type)
)
# ONLY add vmlinux types as dependencies
new_dep_node.add_dependent(base_type_name)
logger.debug(
f"{containing_type} containing type of parent {elem_name} with {elem_type} and ctype {ctype_complex_type} and length {type_length}"
)
new_dep_node.set_field_containing_type(
elem_name, containing_type
)
new_dep_node.set_field_type_size(elem_name, type_length)
new_dep_node.set_field_ctype_complex_type(
elem_name, ctype_complex_type
)
new_dep_node.set_field_type(elem_name, elem_type)
# Check the containing_type module to decide whether to recurse
containing_type_module = getattr(
containing_type, "__module__", None
)
if containing_type_module == "vmlinux":
# Also unwrap containing_type to get base type name
base_containing_type = unwrap_pointer_type(
containing_type
)
containing_type_name = (
base_containing_type.__name__
if hasattr(base_containing_type, "__name__")
else str(base_containing_type)
)
# Check for self-reference or already processed
if containing_type_name == current_symbol_name:
# Self-referential pointer
logger.debug(
f"Self-referential pointer in {current_symbol_name}.{elem_name}"
)
new_dep_node.set_field_ready(elem_name, True)
elif handler.has_node(containing_type_name):
# Already processed
logger.debug(
f"Reusing already processed {containing_type_name}"
)
new_dep_node.set_field_ready(elem_name, True)
else:
# Process recursively - use base containing type, not the pointer wrapper
new_dep_node.add_dependent(containing_type_name)
process_vmlinux_post_ast(
base_containing_type,
llvm_handler,
handler,
processing_stack,
)
new_dep_node.set_field_ready(elem_name, True)
elif (
containing_type_module == ctypes.__name__
or containing_type_module is None
):
logger.debug(
f"Processing ctype internal{containing_type}"
)
new_dep_node.set_field_ready(elem_name, True)
else:
raise TypeError(
f"Module not supported in recursive resolution: {containing_type_module}"
)
elif (
base_type_module == ctypes.__name__
or base_type_module is None
):
# Handle ctypes or types with no module (like some internal ctypes types)
# DO NOT add ctypes as dependencies - just set field metadata and mark ready
logger.debug(
f"Base type {base_type} is ctypes - NOT adding as dependency, just processing field"
)
elif containing_type.__module__ == ctypes.__name__:
if isinstance(elem_type, type):
if issubclass(elem_type, ctypes.Array):
ctype_complex_type = ctypes.Array
@ -176,57 +280,20 @@ def process_vmlinux_post_ast(
)
else:
raise TypeError("Unsupported ctypes subclass")
else:
raise ImportError(
f"Unsupported module of {containing_type}"
)
logger.debug(
f"{containing_type} containing type of parent {elem_name} with {elem_type} and ctype {ctype_complex_type} and length {type_length}"
)
new_dep_node.set_field_containing_type(
elem_name, containing_type
)
new_dep_node.set_field_type_size(elem_name, type_length)
new_dep_node.set_field_ctype_complex_type(
elem_name, ctype_complex_type
)
new_dep_node.set_field_type(elem_name, elem_type)
if containing_type.__module__ == "vmlinux":
containing_type_name = (
containing_type.__name__
if hasattr(containing_type, "__name__")
else str(containing_type)
)
# Check for self-reference or already processed
if containing_type_name == current_symbol_name:
# Self-referential pointer
logger.debug(
f"Self-referential pointer in {current_symbol_name}.{elem_name}"
)
new_dep_node.set_field_ready(elem_name, True)
elif handler.has_node(containing_type_name):
# Already processed
logger.debug(
f"Reusing already processed {containing_type_name}"
)
new_dep_node.set_field_ready(elem_name, True)
else:
# Process recursively - THIS WAS MISSING
new_dep_node.add_dependent(containing_type_name)
process_vmlinux_post_ast(
containing_type,
llvm_handler,
handler,
processing_stack,
)
new_dep_node.set_field_ready(elem_name, True)
elif containing_type.__module__ == ctypes.__name__:
logger.debug(f"Processing ctype internal{containing_type}")
# Set field metadata but DO NOT add dependency or recurse
new_dep_node.set_field_containing_type(
elem_name, containing_type
)
new_dep_node.set_field_type_size(elem_name, type_length)
new_dep_node.set_field_ctype_complex_type(
elem_name, ctype_complex_type
)
new_dep_node.set_field_type(elem_name, elem_type)
new_dep_node.set_field_ready(elem_name, True)
else:
raise TypeError(
"Module not supported in recursive resolution"
raise ImportError(
f"Unsupported module of {base_type}: {base_type_module}"
)
else:
new_dep_node.add_dependent(
@ -245,9 +312,12 @@ def process_vmlinux_post_ast(
raise ValueError(
f"{elem_name} with type {elem_type} from module {module_name} not supported in recursive resolver"
)
elif module_name == ctypes.__name__ or module_name is None:
# Handle ctypes types - these don't need processing, just return
logger.debug(f"Skipping ctypes type {current_symbol_name}")
return True
else:
raise ImportError("UNSUPPORTED Module")
raise ImportError(f"UNSUPPORTED Module {module_name}")
logger.info(
f"{current_symbol_name} processed and handler readiness {handler.is_ready}"

View File

@ -11,7 +11,9 @@ from .class_handler import process_vmlinux_class
logger = logging.getLogger(__name__)
def detect_import_statement(tree: ast.AST) -> list[tuple[str, ast.ImportFrom]]:
def detect_import_statement(
tree: ast.AST,
) -> list[tuple[str, ast.ImportFrom, str, str]]:
"""
Parse AST and detect import statements from vmlinux.
@ -25,7 +27,7 @@ def detect_import_statement(tree: ast.AST) -> list[tuple[str, ast.ImportFrom]]:
List of tuples containing (module_name, imported_item) for each vmlinux import
Raises:
SyntaxError: If multiple imports from vmlinux are attempted or import * is used
SyntaxError: If import * is used
"""
vmlinux_imports = []
@ -40,28 +42,19 @@ def detect_import_statement(tree: ast.AST) -> list[tuple[str, ast.ImportFrom]]:
"Please import specific types explicitly."
)
# Check for multiple imports: from vmlinux import A, B, C
if len(node.names) > 1:
imported_names = [alias.name for alias in node.names]
raise SyntaxError(
f"Multiple imports from vmlinux are not supported. "
f"Found: {', '.join(imported_names)}. "
f"Please use separate import statements for each type."
)
# Check if no specific import is specified (should not happen with valid Python)
if len(node.names) == 0:
raise SyntaxError(
"Import from vmlinux must specify at least one type."
)
# Valid single import
# Support multiple imports: from vmlinux import A, B, C
for alias in node.names:
import_name = alias.name
# Use alias if provided, otherwise use the original name (commented)
# as_name = alias.asname if alias.asname else alias.name
vmlinux_imports.append(("vmlinux", node))
logger.info(f"Found vmlinux import: {import_name}")
# Use alias if provided, otherwise use the original name
as_name = alias.asname if alias.asname else alias.name
vmlinux_imports.append(("vmlinux", node, import_name, as_name))
logger.info(f"Found vmlinux import: {import_name} as {as_name}")
# Handle "import vmlinux" statements (not typical but should be rejected)
elif isinstance(node, ast.Import):
@ -73,6 +66,7 @@ def detect_import_statement(tree: ast.AST) -> list[tuple[str, ast.ImportFrom]]:
)
logger.info(f"Total vmlinux imports detected: {len(vmlinux_imports)}")
# print(f"\n**************\n{vmlinux_imports}\n**************\n")
return vmlinux_imports
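# Hedged sketch of what the branches above accept and return (ImportFrom nodes
# elided):
#
#   tree = ast.parse("from vmlinux import TASK_COMM_LEN, struct_pt_regs as regs")
#   detect_import_statement(tree)
#   # -> [("vmlinux", <ImportFrom>, "TASK_COMM_LEN", "TASK_COMM_LEN"),
#   #     ("vmlinux", <ImportFrom>, "struct_pt_regs", "regs")]
#
#   # "from vmlinux import *" still raises SyntaxError; bare "import vmlinux"
#   # is likewise rejected.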
@ -86,57 +80,54 @@ def vmlinux_proc(tree: ast.AST, module):
if not import_statements:
logger.info("No vmlinux imports found")
return
return None
# Import vmlinux module directly
try:
vmlinux_mod = importlib.import_module("vmlinux")
except ImportError:
logger.warning("Could not import vmlinux module")
return
return None
source_file = inspect.getsourcefile(vmlinux_mod)
if source_file is None:
logger.warning("Cannot find source for vmlinux module")
return
return None
with open(source_file, "r") as f:
mod_ast = ast.parse(f.read(), filename=source_file)
for import_mod, import_node in import_statements:
for alias in import_node.names:
imported_name = alias.name
found = False
for mod_node in mod_ast.body:
if (
isinstance(mod_node, ast.ClassDef)
and mod_node.name == imported_name
):
process_vmlinux_class(mod_node, module, handler)
found = True
break
if isinstance(mod_node, ast.Assign):
for target in mod_node.targets:
if isinstance(target, ast.Name) and target.id == imported_name:
process_vmlinux_assign(mod_node, module, assignments)
found = True
break
if found:
break
if not found:
logger.info(
f"{imported_name} not found as ClassDef or Assign in vmlinux"
)
for import_mod, import_node, imported_name, as_name in import_statements:
found = False
for mod_node in mod_ast.body:
if isinstance(mod_node, ast.ClassDef) and mod_node.name == imported_name:
process_vmlinux_class(mod_node, module, handler)
found = True
break
if isinstance(mod_node, ast.Assign):
for target in mod_node.targets:
if isinstance(target, ast.Name) and target.id == imported_name:
process_vmlinux_assign(mod_node, module, assignments, as_name)
found = True
break
if found:
break
if not found:
logger.info(f"{imported_name} not found as ClassDef or Assign in vmlinux")
IRGenerator(module, handler, assignments)
return assignments
def process_vmlinux_assign(node, module, assignments: dict[str, AssignmentInfo]):
def process_vmlinux_assign(
node, module, assignments: dict[str, AssignmentInfo], target_name=None
):
"""Process assignments from vmlinux module."""
# Only handle single-target assignments
if len(node.targets) == 1 and isinstance(node.targets[0], ast.Name):
target_name = node.targets[0].id
# Use provided target_name (for aliased imports) or fall back to original name
if target_name is None:
target_name = node.targets[0].id
# Handle constant value assignments
if isinstance(node.value, ast.Constant):
@ -148,6 +139,7 @@ def process_vmlinux_assign(node, module, assignments: dict[str, AssignmentInfo])
pointer_level=None,
signature=None,
members=None,
debug_info=None,
)
logger.info(
f"Added assignment: {target_name} = {node.value.value!r} of type {type(node.value.value)}"

View File

@ -46,13 +46,14 @@ def debug_info_generation(
if struct.name.startswith("struct_"):
struct_name = struct.name.removeprefix("struct_")
# Create struct type with all members
struct_type = generator.create_struct_type_with_name(
struct_name, members, struct.__sizeof__() * 8, is_distinct=True
)
else:
raise ValueError("Unions are not supported in the current version")
# Create struct type with all members
struct_type = generator.create_struct_type_with_name(
struct_name, members, struct.__sizeof__() * 8, is_distinct=True
)
logger.warning("Blindly handling Unions present in vmlinux dependencies")
struct_type = None
# raise ValueError("Unions are not supported in the current version")
return struct_type
@ -62,7 +63,7 @@ def _get_field_debug_type(
generator: DebugInfoGenerator,
parent_struct: DependencyNode,
generated_debug_info: List[Tuple[DependencyNode, Any]],
) -> tuple[Any, int]:
) -> tuple[Any, int] | None:
"""
Determine the appropriate debug type for a field based on its Python/ctypes type.
@ -78,7 +79,11 @@ def _get_field_debug_type(
"""
# Handle complex types (arrays, pointers)
if field.ctype_complex_type is not None:
if issubclass(field.ctype_complex_type, ctypes.Array):
# TODO: check whether this is actually a CFUNCTYPE (function pointer); for now it only checks callable
if callable(field.ctype_complex_type):
# Handle function pointer types, create a void pointer as a placeholder
return generator.create_pointer_type(None), 64
elif issubclass(field.ctype_complex_type, ctypes.Array):
# Handle array types
element_type, base_type_size = _get_basic_debug_type(
field.containing_type, generator

View File

@ -11,6 +11,10 @@ logger = logging.getLogger(__name__)
class IRGenerator:
# This field keeps track of the non_struct names to avoid duplicate name errors.
type_number = 0
unprocessed_store = []
# TODO: get the assignments dict and add these unprocessed entries to it.
def __init__(self, llvm_module, handler: DependencyHandler, assignments):
self.llvm_module = llvm_module
@ -68,14 +72,14 @@ class IRGenerator:
dep_node_from_dependency, processing_stack
)
else:
print(struct)
raise RuntimeError(
f"Warning: Dependency {dependency} not found in handler"
)
# Generate IR first to populate field names
self.generated_debug_info.append(
(struct, self.gen_ir(struct, self.generated_debug_info))
)
struct_debug_info = self.gen_ir(struct, self.generated_debug_info)
self.generated_debug_info.append((struct, struct_debug_info))
# Fill the assignments dictionary with struct information
if struct.name not in self.assignments:
@ -83,6 +87,7 @@ class IRGenerator:
members_dict = {}
for field_name, field in struct.fields.items():
# Get the generated field name from our dictionary, or use field_name if not found
print(f"DEBUG: {struct.name}, {field_name}")
if (
struct.name in self.generated_field_names
and field_name in self.generated_field_names[struct.name]
@ -105,6 +110,7 @@ class IRGenerator:
pointer_level=None,
signature=None,
members=members_dict,
debug_info=struct_debug_info,
)
logger.info(f"Added struct assignment info for {struct.name}")
@ -129,7 +135,20 @@ class IRGenerator:
for field_name, field in struct.fields.items():
# does not take arrays and similar types into consideration yet.
if field.ctype_complex_type is not None and issubclass(
if callable(field.ctype_complex_type):
# Function pointer case - generate a simple field accessor
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index
)
print(field_co_re_name)
field_index += 1
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
self.generated_field_names[struct.name][field_name] = globvar
elif field.ctype_complex_type is not None and issubclass(
field.ctype_complex_type, ctypes.Array
):
array_size = field.type_size
@ -137,7 +156,7 @@ class IRGenerator:
if containing_type.__module__ == ctypes.__name__:
containing_type_size = ctypes.sizeof(containing_type)
if array_size == 0:
field_co_re_name = self._struct_name_generator(
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index, True, 0, containing_type_size
)
globvar = ir.GlobalVariable(
@ -149,7 +168,7 @@ class IRGenerator:
field_index += 1
continue
for i in range(0, array_size):
field_co_re_name = self._struct_name_generator(
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index, True, i, containing_type_size
)
globvar = ir.GlobalVariable(
@ -163,12 +182,30 @@ class IRGenerator:
array_size = field.type_size
containing_type = field.containing_type
if containing_type.__module__ == "vmlinux":
containing_type_size = self.handler[
containing_type.__name__
].current_offset
for i in range(0, array_size):
field_co_re_name = self._struct_name_generator(
struct, field, field_index, True, i, containing_type_size
print(struct)
# Unwrap all pointer layers to get the base struct type
base_containing_type = containing_type
while hasattr(base_containing_type, "_type_"):
next_type = base_containing_type._type_
# Stop if _type_ is a string (like 'c' for c_char)
# TODO: stacked pointers: not handling the ctypes check here as well
if isinstance(next_type, str):
break
base_containing_type = next_type
# Get the base struct name
base_struct_name = (
base_containing_type.__name__
if hasattr(base_containing_type, "__name__")
else str(base_containing_type)
)
# Look up the size using the base struct name
containing_type_size = self.handler[base_struct_name].current_offset
print(f"GAY: {array_size}, {struct.name}, {field_name}")
if array_size == 0:
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index, True, 0, containing_type_size
)
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
@ -176,9 +213,30 @@ class IRGenerator:
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
self.generated_field_names[struct.name][field_name] = globvar
field_index += 1
field_index += 1
else:
for i in range(0, array_size):
field_co_re_name, returned = self._struct_name_generator(
struct,
field,
field_index,
True,
i,
containing_type_size,
)
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata(
"llvm.preserve.access.index", debug_info
)
self.generated_field_names[struct.name][field_name] = (
globvar
)
field_index += 1
else:
field_co_re_name = self._struct_name_generator(
field_co_re_name, returned = self._struct_name_generator(
struct, field, field_index
)
field_index += 1
@ -198,7 +256,7 @@ class IRGenerator:
is_indexed: bool = False,
index: int = 0,
containing_type_size: int = 0,
) -> str:
) -> tuple[str, bool]:
# TODO: Does not support Unions as well as recursive pointer and array type naming
if is_indexed:
name = (
@ -208,7 +266,7 @@ class IRGenerator:
+ "$"
+ f"0:{field_index}:{index}"
)
return name
return name, True
elif struct.name.startswith("struct_"):
name = (
"llvm."
@ -217,9 +275,18 @@ class IRGenerator:
+ "$"
+ f"0:{field_index}"
)
return name
return name, True
else:
print(self.handler[struct.name])
raise TypeError(
"Name generation cannot occur due to type name not starting with struct"
logger.warning(
"Blindly handling non-struct type to avoid type errors in vmlinux IR generation. Possibly a union."
)
self.type_number += 1
unprocessed_type = "unprocessed_type_" + str(self.handler[struct.name].name)
if self.unprocessed_store.__contains__(unprocessed_type):
return unprocessed_type + "_" + str(self.type_number), False
else:
self.unprocessed_store.append(unprocessed_type)
return unprocessed_type, False
# raise TypeError(
# "Name generation cannot occur due to type name not starting with struct"
# )

View File

@ -1,6 +1,9 @@
import logging
from typing import Any
from llvmlite import ir
from pythonbpf.local_symbol import LocalSymbol
from pythonbpf.vmlinux_parser.assignment_info import AssignmentType
logger = logging.getLogger(__name__)
@ -36,20 +39,39 @@ class VmlinuxHandler:
"""Check if name is a vmlinux enum constant"""
return (
name in self.vmlinux_symtab
and self.vmlinux_symtab[name]["value_type"] == AssignmentType.CONSTANT
and self.vmlinux_symtab[name].value_type == AssignmentType.CONSTANT
)
def get_struct_debug_info(self, name: str) -> Any:
if (
name in self.vmlinux_symtab
and self.vmlinux_symtab[name].value_type == AssignmentType.STRUCT
):
return self.vmlinux_symtab[name].debug_info
else:
raise ValueError(f"{name} is not a vmlinux struct type")
def get_vmlinux_struct_type(self, name):
"""Check if name is a vmlinux struct type"""
if (
name in self.vmlinux_symtab
and self.vmlinux_symtab[name].value_type == AssignmentType.STRUCT
):
return self.vmlinux_symtab[name].python_type
else:
raise ValueError(f"{name} is not a vmlinux struct type")
def is_vmlinux_struct(self, name):
"""Check if name is a vmlinux struct"""
return (
name in self.vmlinux_symtab
and self.vmlinux_symtab[name]["value_type"] == AssignmentType.STRUCT
and self.vmlinux_symtab[name].value_type == AssignmentType.STRUCT
)
def handle_vmlinux_enum(self, name):
"""Handle vmlinux enum constants by returning LLVM IR constants"""
if self.is_vmlinux_enum(name):
value = self.vmlinux_symtab[name]["value"]
value = self.vmlinux_symtab[name].value
logger.info(f"Resolving vmlinux enum {name} = {value}")
return ir.Constant(ir.IntType(64), value), ir.IntType(64)
return None
@ -57,34 +79,131 @@ class VmlinuxHandler:
def get_vmlinux_enum_value(self, name):
"""Handle vmlinux enum constants by returning LLVM IR constants"""
if self.is_vmlinux_enum(name):
value = self.vmlinux_symtab[name]["value"]
value = self.vmlinux_symtab[name].value
logger.info(f"The value of vmlinux enum {name} = {value}")
return value
return None
def handle_vmlinux_struct(self, struct_name, module, builder):
"""Handle vmlinux struct initializations"""
if self.is_vmlinux_struct(struct_name):
# TODO: Implement core-specific struct handling
# This will be more complex and depends on the BTF information
logger.info(f"Handling vmlinux struct {struct_name}")
# Return struct type and allocated pointer
# This is a stub, actual implementation will be more complex
return None
return None
def handle_vmlinux_struct_field(
self, struct_var_name, field_name, module, builder, local_sym_tab
):
"""Handle access to vmlinux struct fields"""
# Check if it's a variable of vmlinux struct type
if struct_var_name in local_sym_tab:
var_info = local_sym_tab[struct_var_name] # noqa: F841
# Need to check if this variable is a vmlinux struct
# This will depend on how you track vmlinux struct types in your symbol table
var_info: LocalSymbol = local_sym_tab[struct_var_name]
logger.info(
f"Attempting to access field {field_name} of possible vmlinux struct {struct_var_name}"
)
python_type: type = var_info.metadata
globvar_ir, field_data = self.get_field_type(
python_type.__name__, field_name
)
builder.function.args[0].type = ir.PointerType(ir.IntType(8))
print(builder.function.args[0])
field_ptr = self.load_ctx_field(
builder, builder.function.args[0], globvar_ir
)
print(field_ptr)
# Return pointer to field and field type
return None
return None
return field_ptr, field_data
else:
raise RuntimeError("Variable accessed not found in symbol table")
@staticmethod
def load_ctx_field(builder, ctx_arg, offset_global):
"""
Generate LLVM IR to load a field from BPF context using offset.
Args:
builder: llvmlite IRBuilder instance
ctx_arg: The context pointer argument (ptr/i8*)
offset_global: Global variable containing the field offset (i64)
Returns:
The loaded value (i64 register)
"""
# Load the offset value
offset = builder.load(offset_global)
# Ensure ctx_arg is treated as i8* (byte pointer)
i8_ptr_type = ir.PointerType()
# Cast ctx_arg to i8* if it isn't already
if str(ctx_arg.type) != str(i8_ptr_type):
ctx_i8_ptr = builder.bitcast(ctx_arg, i8_ptr_type)
else:
ctx_i8_ptr = ctx_arg
# GEP with explicit type - this is the key fix
field_ptr = builder.gep(
ctx_i8_ptr,
[offset],
inbounds=False,
)
# Get or declare the BPF passthrough intrinsic
module = builder.function.module
try:
passthrough_fn = module.globals.get("llvm.bpf.passthrough.p0.p0")
if passthrough_fn is None:
raise KeyError
except (KeyError, AttributeError):
passthrough_type = ir.FunctionType(
i8_ptr_type,
[ir.IntType(32), i8_ptr_type],
)
passthrough_fn = ir.Function(
module,
passthrough_type,
name="llvm.bpf.passthrough.p0.p0",
)
# Call passthrough to satisfy BPF verifier
verified_ptr = builder.call(
passthrough_fn, [ir.Constant(ir.IntType(32), 0), field_ptr], tail=True
)
# Bitcast to i64* (assuming field is 64-bit, adjust if needed)
i64_ptr_type = ir.PointerType(ir.IntType(64))
typed_ptr = builder.bitcast(verified_ptr, i64_ptr_type)
# Load and return the value
value = builder.load(typed_ptr)
return value
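# Hedged sketch of the IR shape load_ctx_field aims for, taken from the
# c-form example later in this diff (register names are illustrative):
#
#   %off = load i64, ptr @"llvm.trace_event_raw_sys_enter:0:16$0:2:0"
#   %gep = getelementptr i8, ptr %ctx, i64 %off
#   %ok  = call ptr @llvm.bpf.passthrough.p0.p0(i32 0, ptr %gep)
#   %val = load i64, ptr %ok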
def has_field(self, struct_name, field_name):
"""Check if a vmlinux struct has a specific field"""
if self.is_vmlinux_struct(struct_name):
python_type = self.vmlinux_symtab[struct_name].python_type
return hasattr(python_type, field_name)
return False
def get_field_type(self, vmlinux_struct_name, field_name):
"""Get the type of a field in a vmlinux struct"""
if self.is_vmlinux_struct(vmlinux_struct_name):
python_type = self.vmlinux_symtab[vmlinux_struct_name].python_type
if hasattr(python_type, field_name):
return self.vmlinux_symtab[vmlinux_struct_name].members[field_name]
else:
raise ValueError(
f"Field {field_name} not found in vmlinux struct {vmlinux_struct_name}"
)
else:
raise ValueError(f"{vmlinux_struct_name} is not a vmlinux struct")
def get_field_index(self, vmlinux_struct_name, field_name):
"""Get the type of a field in a vmlinux struct"""
if self.is_vmlinux_struct(vmlinux_struct_name):
python_type = self.vmlinux_symtab[vmlinux_struct_name].python_type
if hasattr(python_type, field_name):
return list(
self.vmlinux_symtab[vmlinux_struct_name].members.keys()
).index(field_name)
else:
raise ValueError(
f"Field {field_name} not found in vmlinux struct {vmlinux_struct_name}"
)
else:
raise ValueError(f"{vmlinux_struct_name} is not a vmlinux struct")

View File

@ -1,5 +1,5 @@
BPF_CLANG := clang
CFLAGS := -O2 -emit-llvm -target bpf -c
CFLAGS := -O0 -emit-llvm -target bpf -c
SRC := $(wildcard *.bpf.c)
LL := $(SRC:.bpf.c=.bpf.ll)

View File

@ -1,25 +0,0 @@
#define __TARGET_ARCH_arm64
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
// Map: key = struct request*, value = u64 timestamp
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct request *);
__type(value, u64);
__uint(max_entries, 1024);
} start SEC(".maps");
// Attach to kprobe for blk_start_request
SEC("kprobe/blk_start_request")
int BPF_KPROBE(trace_start, struct request *req)
{
u64 ts = bpf_ktime_get_ns();
bpf_map_update_elem(&start, &req, &ts, BPF_ANY);
return 0;
}
char LICENSE[] SEC("license") = "GPL";

View File

@ -2,18 +2,75 @@
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
char LICENSE[] SEC("license") = "Dual BSD/GPL";
char LICENSE[] SEC("license") = "GPL";
SEC("kprobe/do_unlinkat")
int kprobe_execve(struct pt_regs *ctx)
{
bpf_printk("unlinkat created");
return 0;
}
SEC("kretprobe/do_unlinkat")
int kretprobe_execve(struct pt_regs *ctx)
{
bpf_printk("unlinkat returned\n");
unsigned long r15 = ctx->r15;
bpf_printk("r15: %lld", r15);
unsigned long r14 = ctx->r14;
bpf_printk("r14: %lld", r14);
unsigned long r13 = ctx->r13;
bpf_printk("r13: %lld", r13);
unsigned long r12 = ctx->r12;
bpf_printk("r12: %lld", r12);
unsigned long bp = ctx->bp;
bpf_printk("rbp: %lld", bp);
unsigned long bx = ctx->bx;
bpf_printk("rbx: %lld", bx);
unsigned long r11 = ctx->r11;
bpf_printk("r11: %lld", r11);
unsigned long r10 = ctx->r10;
bpf_printk("r10: %lld", r10);
unsigned long r9 = ctx->r9;
bpf_printk("r9: %lld", r9);
unsigned long r8 = ctx->r8;
bpf_printk("r8: %lld", r8);
unsigned long ax = ctx->ax;
bpf_printk("rax: %lld", ax);
unsigned long cx = ctx->cx;
bpf_printk("rcx: %lld", cx);
unsigned long dx = ctx->dx;
bpf_printk("rdx: %lld", dx);
unsigned long si = ctx->si;
bpf_printk("rsi: %lld", si);
unsigned long di = ctx->di;
bpf_printk("rdi: %lld", di);
unsigned long orig_ax = ctx->orig_ax;
bpf_printk("orig_rax: %lld", orig_ax);
unsigned long ip = ctx->ip;
bpf_printk("rip: %lld", ip);
unsigned long cs = ctx->cs;
bpf_printk("cs: %lld", cs);
unsigned long flags = ctx->flags;
bpf_printk("eflags: %lld", flags);
unsigned long sp = ctx->sp;
bpf_printk("rsp: %lld", sp);
unsigned long ss = ctx->ss;
bpf_printk("ss: %lld", ss);
return 0;
}

View File

@ -0,0 +1,42 @@
// SPDX-License-Identifier: GPL-2.0
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
/*
Information gained from reversing this (multiple kernel versions):
There is no point in emitting
```llvm
tail call void @llvm.dbg.value(metadata ptr %0, metadata !60, metadata !DIExpression()), !dbg !70
```
and the first argument of passthrough is effectively unused. It just needs to be a distinct integer:
```llvm
%9 = tail call ptr @llvm.bpf.passthrough.p0.p0(i32 3, ptr %8)
```
*/
SEC("tp/syscalls/sys_enter_execve")
int handle_setuid_entry(struct trace_event_raw_sys_enter *ctx) {
// Access each argument separately with clear variable assignments
long int id = ctx->id;
bpf_printk("This is context field %d", id);
/*
* the IR to aim for is
* %2 = alloca ptr, align 8
* store ptr %0, ptr %2, align 8
* Above, %0 is the arg pointer
* %5 = load ptr, ptr %2, align 8
* %6 = getelementptr inbounds %struct.trace_event_raw_sys_enter, ptr %5, i32 0, i32 2
* %7 = load i64, ptr @"llvm.trace_event_raw_sys_enter:0:16$0:2:0", align 8
* %8 = bitcast ptr %5 to ptr
* %9 = getelementptr i8, ptr %8, i64 %7
* %10 = bitcast ptr %9 to ptr
* %11 = call ptr @llvm.bpf.passthrough.p0.p0(i32 0, ptr %10)
* %12 = load i64, ptr %11, align 8, !dbg !101
*
*/
return 0;
}
char LICENSE[] SEC("license") = "GPL";

121617
tests/c-form/vmlinux.h vendored

File diff suppressed because it is too large

View File

@ -0,0 +1,21 @@
// xdp_rewrite.c
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
#include <linux/if_ether.h>
SEC("xdp")
int xdp_rewrite_mac(struct xdp_md *ctx)
{
void *data_end = (void *)(long)ctx->data_end;
void *data = (void *)(long)ctx->data;
struct ethhdr *eth = data;
if ((void*)(eth + 1) > data_end)
return XDP_PASS;
__u8 new_src[ETH_ALEN] = {0x02,0x00,0x00,0x00,0x00,0x02};
for (int i = 0; i < ETH_ALEN; i++) eth->h_source[i] = new_src[i];
return XDP_PASS;
}
char _license[] SEC("license") = "GPL";

View File

@ -0,0 +1,22 @@
from vmlinux import XDP_PASS
from pythonbpf import bpf, section, bpfglobal, compile_to_ir
import logging
from ctypes import c_int64, c_void_p
@bpf
@section("kprobe/blk_mq_start_request")
def example(ctx: c_void_p) -> c_int64:
d = XDP_PASS # This gives an error, but
e = XDP_PASS + 0 # this does not
print(f"test1 {e} test2 {d}")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("assignment_handling.py", "assignment_handling.ll", loglevel=logging.INFO)

View File

@ -0,0 +1,22 @@
from vmlinux import struct_request, struct_pt_regs
from pythonbpf import bpf, section, bpfglobal, compile_to_ir
import logging
from ctypes import c_int64
@bpf
@section("kprobe/blk_mq_start_request")
def example(ctx: struct_pt_regs) -> c_int64:
req = struct_request(ctx.di)
c = req.__data_len
print(f"data length {c}")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("requests.py", "requests.ll", loglevel=logging.INFO)

View File

@ -0,0 +1,21 @@
from vmlinux import struct_pt_regs, struct_request
from pythonbpf import bpf, section, bpfglobal, compile_to_ir
import logging
from ctypes import c_int64
@bpf
@section("kprobe/blk_mq_start_request")
def example(ctx: struct_pt_regs) -> c_int64:
req = ctx.di
print(f"data length {req}")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("requests2.py", "requests2.ll", loglevel=logging.INFO)

View File

@ -1,4 +1,4 @@
from pythonbpf import bpf, map, section, bpfglobal, compile, struct
from pythonbpf import bpf, map, section, bpfglobal, compile, struct, compile_to_ir
from ctypes import c_void_p, c_int64, c_int32, c_uint64
from pythonbpf.maps import HashMap
from pythonbpf.helper import ktime
@ -71,4 +71,5 @@ def LICENSE() -> str:
return "GPL"
compile_to_ir("comprehensive.py", "comprehensive.ll")
compile()

View File

@ -0,0 +1,53 @@
from pythonbpf import bpf, section, bpfglobal, BPF, trace_pipe
from pythonbpf import compile # noqa: F401
from vmlinux import struct_pt_regs
from ctypes import c_int64, c_int32, c_void_p # noqa: F401
@bpf
@section("kprobe/do_unlinkat")
def kprobe_execve(ctx: struct_pt_regs) -> c_int64:
r15 = ctx.r15
r14 = ctx.r14
r13 = ctx.r13
r12 = ctx.r12
bp = ctx.bp
bx = ctx.bx
r11 = ctx.r11
r10 = ctx.r10
r9 = ctx.r9
r8 = ctx.r8
ax = ctx.ax
cx = ctx.cx
dx = ctx.dx
si = ctx.si
di = ctx.di
orig_ax = ctx.orig_ax
ip = ctx.ip
cs = ctx.cs
flags = ctx.flags
sp = ctx.sp
ss = ctx.ss
print(f"r15={r15} r14={r14} r13={r13}")
print(f"r12={r12} rbp={bp} rbx={bx}")
print(f"r11={r11} r10={r10} r9={r9}")
print(f"r8={r8} rax={ax} rcx={cx}")
print(f"rdx={dx} rsi={si} rdi={di}")
print(f"orig_rax={orig_ax} rip={ip} cs={cs}")
print(f"eflags={flags} rsp={sp} ss={ss}")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
b = BPF()
b.load()
b.attach_all()
trace_pipe()

View File

@ -0,0 +1,29 @@
import logging
from pythonbpf import bpf, section, bpfglobal, compile_to_ir
from pythonbpf import compile # noqa: F401
from vmlinux import TASK_COMM_LEN # noqa: F401
from vmlinux import struct_trace_event_raw_sys_enter # noqa: F401
from ctypes import c_int64, c_int32, c_void_p # noqa: F401
# from vmlinux import struct_uinput_device
# from vmlinux import struct_blk_integrity_iter
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: struct_trace_event_raw_sys_enter) -> c_int64:
b = ctx.id
print(f"This is context field {b}")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile_to_ir("struct_field_access.py", "struct_field_access.ll", loglevel=logging.INFO)
compile()