87 Commits

SHA1 Message Date
7ae629e8f7 bump version to v0.1.5 2025-10-16 19:04:04 +05:30
dd734ea2aa Merge pull request #56 from pythonbpf/vmlinux-ir-gen
Adds IR and debug info generation capabilities for vmlinux imported structs
2025-10-16 18:59:32 +05:30
71d005b6b1 complete vmlinux struct name generation in IR.
* Breaks when it finds unions.
* Still does not support function pointers.
2025-10-16 18:58:28 +05:30
5d9a29ee8e format chore 2025-10-16 18:22:25 +05:30
041e538b53 fix errors. Does not support union name resolution yet. 2025-10-16 18:21:14 +05:30
5413cc793b something fixed itself. 2025-10-16 18:06:36 +05:30
f21837aefe support most bitfields 2025-10-16 04:13:04 +05:30
0f5c1fa752 format chore 2025-10-16 04:10:24 +05:30
de02731ea1 add offset support via ctypes getattr. Also supports bitfields.
* breaks on struct_ring_buffer_per_cpu
2025-10-16 04:08:06 +05:30
c22d85ceb8 add array field generation support 2025-10-15 23:56:04 +05:30
2b3c81affa TODO added for llvmlite attribute issue
* Refer: https://github.com/numba/llvmlite/issues/1331

Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
2025-10-15 21:35:28 +05:30
8372111616 add basic IR gen strategy 2025-10-15 21:25:53 +05:30
eb4ee64ee5 Revert "float vmlinux_assignments_symtab"
This reverts commit ce7b170fea.
2025-10-15 19:11:53 +05:30
ce7b170fea float vmlinux_assignments_symtab 2025-10-15 18:19:51 +05:30
9a60dd87e3 Merge pull request #55 from pythonbpf/vmlinux-ir-gen
remove bitfield support and add assignment support
2025-10-15 18:07:27 +05:30
c499fe7421 solve static typing issues 2025-10-15 18:05:57 +05:30
8239097fbb format chore 2025-10-15 17:49:38 +05:30
a4cfc2b7aa add assignments table and offset handler 2025-10-15 17:49:20 +05:30
69b73003ca setup skeleton for offset calculation 2025-10-15 04:42:38 +05:30
11e8e72188 add base for ir gen 2025-10-15 02:00:23 +05:30
d3f0e3b2ef remove tbaa_gen and make IR generator module 2025-10-14 03:09:18 +05:30
09ba749b46 Merge pull request #52 from pythonbpf/vmlinux-ir-gen
Dependency tree functionality to semantic analyser
2025-10-14 02:37:43 +05:30
a03d3e5d4c format chore 2025-10-14 02:36:04 +05:30
e1f9ac6ba0 add dependency tree functionality 2025-10-14 02:35:49 +05:30
27ab3aaf1e Cleanup codegen.py 2025-10-13 19:17:58 +05:30
b34f7dd68a format chore 2025-10-13 19:11:59 +05:30
69d8669e44 Merge pull request #28 from pythonbpf/vmlinux-working
Add compilation mechanism from vmlinux
Still does not compile to IR; only does semantic analysis.
Another PR will be opened soon for IR generation.
2025-10-13 19:08:41 +05:30
d4f5a9c36e fix static typing errors 2025-10-13 19:07:06 +05:30
b2a57edf11 Simplify maps_pass 2025-10-13 19:01:01 +05:30
20ec307288 format chore 2025-10-13 19:00:59 +05:30
0b4c6264a8 complete dependency tree readiness resolution 2025-10-13 19:00:28 +05:30
6345fcdeff Remove unused is_helper_call from allocation_pass 2025-10-13 18:38:07 +05:30
6b41f1fb84 Move print logic to helper/printk_emitter.py 2025-10-13 18:32:51 +05:30
74d8014ade Move HelperHandlerRegistry to helper_registry.py 2025-10-13 18:21:50 +05:30
5d0a888542 Remove deadcode and separate modules for pythonbpf.functions 2025-10-13 04:41:46 +05:30
0042280ff1 Rename public API and remove deadcode in return_utils 2025-10-13 04:23:58 +05:30
7a67041ea3 Move CallHandlerRegistry to expr/call_registry.py, annotate eval_expr 2025-10-13 04:16:22 +05:30
45e6ce5e5c Move deref_to_depth to expr/ir_ops.py 2025-10-13 04:01:27 +05:30
c5f0a2806f Make printk_emitter return True to prevent bogus logger warnings in eval_expr 2025-10-13 02:40:34 +05:30
b0ea93a786 Merge pull request #50 from pythonbpf/dep_inv
Use dependency inversion to remove handler delayed import in eval_expr
2025-10-13 02:32:18 +05:30
fc058c4341 Use dependency inversion to remove handler delayed import in eval_expr 2025-10-13 02:28:00 +05:30
158cc42e1e Move binop handling logic to expr_pass, remove delayed imports of get_operand_value 2025-10-13 00:36:42 +05:30
2a1eabc10d Fix regression in struct_perf_output 2025-10-13 00:00:43 +05:30
31645f0316 Merge pull request #40 from pythonbpf/refactor_assign
Refactor assignment statement handling and the typing mechanism around it
2025-10-12 12:19:51 +05:30
e0ad1bfb0f Move bulk of allocation logic to allocation_pass 2025-10-12 12:14:46 +05:30
69bee5fee9 Separate LocalSymbol from functions 2025-10-12 12:10:09 +05:30
2f1aaa4834 Fix typos 2025-10-12 11:41:01 +05:30
0f6971bcc2 Refactor allocate_mem 2025-10-12 11:34:40 +05:30
08c0ccf0ac Pass map_sym_tab to handle_struct_field_assign 2025-10-12 10:37:20 +05:30
64e44d0d58 Use handle_struct_field_assignment in handle_assign 2025-10-12 10:30:46 +05:30
3ad1b73c5a Add handle_struct_field_assignment to assign_pass 2025-10-12 10:19:52 +05:30
105c5a7bd0 Cleanup handle_assign 2025-10-12 10:12:45 +05:30
933d2a5c77 Fix comprehensive assignment test 2025-10-12 09:47:57 +05:30
b93f704eb8 Tweak the comprehensive assignment test 2025-10-12 09:46:16 +05:30
fa82dc7ebd Add comprehensive passing test for assignment 2025-10-12 09:39:33 +05:30
e8026a13bf Allow helpers to be called within themselves 2025-10-12 09:30:37 +05:30
a3b4d09652 Fix errorstring in _handle_unary_op 2025-10-12 09:13:04 +05:30
4e33fd4a32 Add negation UnaryOp 2025-10-12 09:11:56 +05:30
2cf68f6473 Allow map-based helpers to be used as helper args / within binops which are helper args 2025-10-12 07:57:55 +05:30
d66e6a6aff Allow struct members as helper args 2025-10-12 06:00:50 +05:30
cd74e896cf Allow binops as args to helpers accepting int* 2025-10-12 04:20:46 +05:30
207f714027 Use scratch space to store consts passed to helpers 2025-10-12 04:17:37 +05:30
5dcf670f49 Add ScratchPoolManager and its singleton 2025-10-12 01:47:11 +05:30
6bce29b90f Allocate scratch space for temp vars at the end of allocate_mem 2025-10-12 00:37:57 +05:30
321415fa28 Add update_max_temps_for_stmt in allocate_mem 2025-10-12 00:33:07 +05:30
8776d7607f Add count_temps_in_call to count scratch space needed in a helper call 2025-10-12 00:17:10 +05:30
8b7b1c08a5 Add struct_and_helper_binops passing test for assignments 2025-10-11 22:03:32 +05:30
c9bbe1ffd8 Call eval_expr properly within get_operand_value 2025-10-11 03:21:09 +05:30
91a3fe140d Remove unnecessary return artifacts from get_operand_value 2025-10-11 03:06:24 +05:30
c2c17741e5 Remove store_through_chain 2025-10-11 03:04:26 +05:30
cac88d1560 Allow different int widths in binops 2025-10-11 02:44:08 +05:30
317575644f Interpret bools as ints in binops 2025-10-11 00:18:11 +05:30
a756f5e4b7 Add passing helper test for assignment 2025-10-10 23:55:12 +05:30
7529820c0b Allow int** pointers to store binops of type int** op int 2025-10-10 20:36:37 +05:30
9febadffd3 Add pointer handling to helper_utils, finish pointer assignment 2025-10-10 15:01:15 +05:30
99aacca94b WIP: allow pointer assignments to var 2025-10-10 13:48:40 +05:30
1d517d4e09 Add double_alloc in alloc_mem 2025-10-10 12:28:45 +05:30
047f361ea9 Allocate twice for map lookups 2025-10-10 06:09:46 +05:30
489244a015 Add store_through_chain 2025-10-10 02:56:11 +05:30
8bab07ed72 Remove recursive_dereferencer 2025-10-10 00:13:35 +05:30
1253f51ff3 Use deref_to_val instead of recursive_dereferencer in get_operand value 2025-10-09 23:11:06 +05:30
23afb0bd33 Add deref_to_val to deref into final value and return the chain as well in binops 2025-10-09 21:47:28 +05:30
c596213b2a Add cst_var_binop.py as passing assign test 2025-10-09 03:42:25 +05:30
054a834464 Add failing assign test retype.py, with explanation 2025-10-09 03:28:07 +05:30
d7bfe86524 Add handle_variable_assignment to assign_pass 2025-10-09 03:09:10 +05:30
84ed27f222 Add handle_variable_assignment stub and boilerplate in handle_assign 2025-10-08 22:55:03 +05:30
6008d9841f Change loglevel of multi-assignment warning in handle_assign 2025-10-08 22:45:09 +05:30
39 changed files with 2102 additions and 1128 deletions

.gitignore vendored (1 line changed)
View File

@ -9,3 +9,4 @@ __pycache__/
.ipynb_checkpoints/
vmlinux.py
~*
vmlinux.h

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "pythonbpf"
version = "0.1.4"
version = "0.1.5"
description = "Reduced Python frontend for eBPF"
authors = [
{ name = "r41k0u", email="pragyanshchaturvedi18@gmail.com" },

pythonbpf/allocation_pass.py (new file, 178 lines)
View File

@ -0,0 +1,178 @@
import ast
import logging
from llvmlite import ir
from dataclasses import dataclass
from typing import Any
from pythonbpf.helper import HelperHandlerRegistry
from pythonbpf.type_deducer import ctypes_to_ir
logger = logging.getLogger(__name__)
@dataclass
class LocalSymbol:
var: ir.AllocaInstr
ir_type: ir.Type
metadata: Any = None
def __iter__(self):
yield self.var
yield self.ir_type
yield self.metadata
def handle_assign_allocation(builder, stmt, local_sym_tab, structs_sym_tab):
"""Handle memory allocation for assignment statements."""
# Validate assignment
if len(stmt.targets) != 1:
logger.warning("Multi-target assignment not supported, skipping allocation")
return
target = stmt.targets[0]
# Skip non-name targets (e.g., struct field assignments)
if isinstance(target, ast.Attribute):
logger.debug(f"Struct field assignment to {target.attr}, no allocation needed")
return
if not isinstance(target, ast.Name):
logger.warning(f"Unsupported assignment target type: {type(target).__name__}")
return
var_name = target.id
rval = stmt.value
# Skip if already allocated
if var_name in local_sym_tab:
logger.debug(f"Variable {var_name} already allocated, skipping")
return
# Determine type and allocate based on rval
if isinstance(rval, ast.Call):
_allocate_for_call(builder, var_name, rval, local_sym_tab, structs_sym_tab)
elif isinstance(rval, ast.Constant):
_allocate_for_constant(builder, var_name, rval, local_sym_tab)
elif isinstance(rval, ast.BinOp):
_allocate_for_binop(builder, var_name, local_sym_tab)
else:
logger.warning(
f"Unsupported assignment value type for {var_name}: {type(rval).__name__}"
)
def _allocate_for_call(builder, var_name, rval, local_sym_tab, structs_sym_tab):
"""Allocate memory for variable assigned from a call."""
if isinstance(rval.func, ast.Name):
call_type = rval.func.id
# C type constructors
if call_type in ("c_int32", "c_int64", "c_uint32", "c_uint64"):
ir_type = ctypes_to_ir(call_type)
var = builder.alloca(ir_type, name=var_name)
var.align = ir_type.width // 8
local_sym_tab[var_name] = LocalSymbol(var, ir_type)
logger.info(f"Pre-allocated {var_name} as {call_type}")
# Helper functions
elif HelperHandlerRegistry.has_handler(call_type):
ir_type = ir.IntType(64) # Assume i64 return type
var = builder.alloca(ir_type, name=var_name)
var.align = 8
local_sym_tab[var_name] = LocalSymbol(var, ir_type)
logger.info(f"Pre-allocated {var_name} for helper {call_type}")
# Deref function
elif call_type == "deref":
ir_type = ir.IntType(64) # Assume i64 return type
var = builder.alloca(ir_type, name=var_name)
var.align = 8
local_sym_tab[var_name] = LocalSymbol(var, ir_type)
logger.info(f"Pre-allocated {var_name} for deref")
# Struct constructors
elif call_type in structs_sym_tab:
struct_info = structs_sym_tab[call_type]
var = builder.alloca(struct_info.ir_type, name=var_name)
local_sym_tab[var_name] = LocalSymbol(var, struct_info.ir_type, call_type)
logger.info(f"Pre-allocated {var_name} for struct {call_type}")
else:
logger.warning(f"Unknown call type for allocation: {call_type}")
elif isinstance(rval.func, ast.Attribute):
# Map method calls - need double allocation for ptr handling
_allocate_for_map_method(builder, var_name, local_sym_tab)
else:
logger.warning(f"Unsupported call function type for {var_name}")
def _allocate_for_map_method(builder, var_name, local_sym_tab):
"""Allocate memory for variable assigned from map method (double alloc)."""
# Main variable (pointer to pointer)
ir_type = ir.PointerType(ir.IntType(64))
var = builder.alloca(ir_type, name=var_name)
local_sym_tab[var_name] = LocalSymbol(var, ir_type)
# Temporary variable for computed values
tmp_ir_type = ir.IntType(64)
var_tmp = builder.alloca(tmp_ir_type, name=f"{var_name}_tmp")
local_sym_tab[f"{var_name}_tmp"] = LocalSymbol(var_tmp, tmp_ir_type)
logger.info(f"Pre-allocated {var_name} and {var_name}_tmp for map method")
def _allocate_for_constant(builder, var_name, rval, local_sym_tab):
"""Allocate memory for variable assigned from a constant."""
if isinstance(rval.value, bool):
ir_type = ir.IntType(1)
var = builder.alloca(ir_type, name=var_name)
var.align = 1
local_sym_tab[var_name] = LocalSymbol(var, ir_type)
logger.info(f"Pre-allocated {var_name} as bool")
elif isinstance(rval.value, int):
ir_type = ir.IntType(64)
var = builder.alloca(ir_type, name=var_name)
var.align = 8
local_sym_tab[var_name] = LocalSymbol(var, ir_type)
logger.info(f"Pre-allocated {var_name} as i64")
elif isinstance(rval.value, str):
ir_type = ir.PointerType(ir.IntType(8))
var = builder.alloca(ir_type, name=var_name)
var.align = 8
local_sym_tab[var_name] = LocalSymbol(var, ir_type)
logger.info(f"Pre-allocated {var_name} as string")
else:
logger.warning(
f"Unsupported constant type for {var_name}: {type(rval.value).__name__}"
)
def _allocate_for_binop(builder, var_name, local_sym_tab):
"""Allocate memory for variable assigned from a binary operation."""
ir_type = ir.IntType(64) # Assume i64 result
var = builder.alloca(ir_type, name=var_name)
var.align = 8
local_sym_tab[var_name] = LocalSymbol(var, ir_type)
logger.info(f"Pre-allocated {var_name} for binop result")
def allocate_temp_pool(builder, max_temps, local_sym_tab):
"""Allocate the temporary scratch space pool for helper arguments."""
if max_temps == 0:
return
logger.info(f"Allocating temp pool of {max_temps} variables")
for i in range(max_temps):
temp_name = f"__helper_temp_{i}"
temp_var = builder.alloca(ir.IntType(64), name=temp_name)
temp_var.align = 8
local_sym_tab[temp_name] = LocalSymbol(temp_var, ir.IntType(64))
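
A minimal standalone sketch of the pre-allocation strategy this pass implements, using plain llvmlite: every assignment target gets an up-front alloca (type deduction simplified to i64 here), followed by the scratch pool reserved by allocate_temp_pool. The statement text and pool size are illustrative.

import ast
from llvmlite import ir

module = ir.Module(name="alloc_sketch")
func = ir.Function(module, ir.FunctionType(ir.IntType(64), []), name="prog")
builder = ir.IRBuilder(func.append_basic_block("entry"))

local_sym_tab = {}
for stmt in ast.parse("x = 5\ny = x + 1").body:
    if isinstance(stmt, ast.Assign) and isinstance(stmt.targets[0], ast.Name):
        name = stmt.targets[0].id
        if name not in local_sym_tab:
            slot = builder.alloca(ir.IntType(64), name=name)  # assume i64 for the sketch
            slot.align = 8
            local_sym_tab[name] = slot

# Scratch pool, mirroring allocate_temp_pool (pool size picked arbitrarily here)
for i in range(2):
    tmp = builder.alloca(ir.IntType(64), name=f"__helper_temp_{i}")
    tmp.align = 8
    local_sym_tab[f"__helper_temp_{i}"] = tmp

builder.ret(ir.Constant(ir.IntType(64), 0))
print(module)  # entry block holds x, y and the two scratch slots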

pythonbpf/assign_pass.py (new file, 108 lines)
View File

@ -0,0 +1,108 @@
import ast
import logging
from llvmlite import ir
from pythonbpf.expr import eval_expr
logger = logging.getLogger(__name__)
def handle_struct_field_assignment(
func, module, builder, target, rval, local_sym_tab, map_sym_tab, structs_sym_tab
):
"""Handle struct field assignment (obj.field = value)."""
var_name = target.value.id
field_name = target.attr
if var_name not in local_sym_tab:
logger.error(f"Variable '{var_name}' not found in symbol table")
return
struct_type = local_sym_tab[var_name].metadata
struct_info = structs_sym_tab[struct_type]
if field_name not in struct_info.fields:
logger.error(f"Field '{field_name}' not found in struct '{struct_type}'")
return
# Get field pointer and evaluate value
field_ptr = struct_info.gep(builder, local_sym_tab[var_name].var, field_name)
val = eval_expr(
func, module, builder, rval, local_sym_tab, map_sym_tab, structs_sym_tab
)
if val is None:
logger.error(f"Failed to evaluate value for {var_name}.{field_name}")
return
# TODO: Handle string assignment to char array (not a priority)
field_type = struct_info.field_type(field_name)
if isinstance(field_type, ir.ArrayType) and val[1] == ir.PointerType(ir.IntType(8)):
logger.warning(
f"String to char array assignment not implemented for {var_name}.{field_name}"
)
return
# Store the value
builder.store(val[0], field_ptr)
logger.info(f"Assigned to struct field {var_name}.{field_name}")
def handle_variable_assignment(
func, module, builder, var_name, rval, local_sym_tab, map_sym_tab, structs_sym_tab
):
"""Handle single named variable assignment."""
if var_name not in local_sym_tab:
logger.error(f"Variable {var_name} not declared.")
return False
var_ptr = local_sym_tab[var_name].var
var_type = local_sym_tab[var_name].ir_type
# NOTE: Special case for struct initialization
if isinstance(rval, ast.Call) and isinstance(rval.func, ast.Name):
struct_name = rval.func.id
if struct_name in structs_sym_tab and len(rval.args) == 0:
struct_info = structs_sym_tab[struct_name]
ir_struct = struct_info.ir_type
builder.store(ir.Constant(ir_struct, None), var_ptr)
logger.info(f"Initialized struct {struct_name} for variable {var_name}")
return True
val_result = eval_expr(
func, module, builder, rval, local_sym_tab, map_sym_tab, structs_sym_tab
)
if val_result is None:
logger.error(f"Failed to evaluate value for {var_name}")
return False
val, val_type = val_result
logger.info(f"Evaluated value for {var_name}: {val} of type {val_type}, {var_type}")
if val_type != var_type:
if isinstance(val_type, ir.IntType) and isinstance(var_type, ir.IntType):
# Allow implicit int widening
if val_type.width < var_type.width:
val = builder.sext(val, var_type)
logger.info(f"Implicitly widened int for variable {var_name}")
elif val_type.width > var_type.width:
val = builder.trunc(val, var_type)
logger.info(f"Implicitly truncated int for variable {var_name}")
elif isinstance(val_type, ir.IntType) and isinstance(var_type, ir.PointerType):
# NOTE: This is assignment to a PTR_TO_MAP_VALUE_OR_NULL
logger.info(
f"Creating temporary variable for pointer assignment to {var_name}"
)
var_ptr_tmp = local_sym_tab[f"{var_name}_tmp"].var
builder.store(val, var_ptr_tmp)
val = var_ptr_tmp
else:
logger.error(
f"Type mismatch for variable {var_name}: {val_type} vs {var_type}"
)
return False
builder.store(val, var_ptr)
logger.info(f"Assigned value to variable {var_name}")
return True
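
The width-coercion branch above can be exercised in isolation; a small llvmlite sketch of the same sext/trunc handling, with the value and slot widths chosen only for illustration:

from llvmlite import ir

module = ir.Module(name="coerce_sketch")
func = ir.Function(module, ir.FunctionType(ir.VoidType(), []), name="demo")
builder = ir.IRBuilder(func.append_basic_block("entry"))

slot_i64 = builder.alloca(ir.IntType(64), name="x")
builder.store(builder.sext(ir.Constant(ir.IntType(32), 7), ir.IntType(64)), slot_i64)   # widen i32 -> i64

slot_i32 = builder.alloca(ir.IntType(32), name="y")
builder.store(builder.trunc(ir.Constant(ir.IntType(64), 7), ir.IntType(32)), slot_i32)  # truncate i64 -> i32

builder.ret_void()
print(module)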

View File

@ -1,72 +0,0 @@
import ast
from llvmlite import ir
from logging import Logger
import logging
logger: Logger = logging.getLogger(__name__)
def recursive_dereferencer(var, builder):
"""dereference until primitive type comes out"""
# TODO: Not worrying about stack overflow for now
logger.info(f"Dereferencing {var}, type is {var.type}")
if isinstance(var.type, ir.PointerType):
a = builder.load(var)
return recursive_dereferencer(a, builder)
elif isinstance(var.type, ir.IntType):
return var
else:
raise TypeError(f"Unsupported type for dereferencing: {var.type}")
def get_operand_value(operand, builder, local_sym_tab):
"""Extract the value from an operand, handling variables and constants."""
if isinstance(operand, ast.Name):
if operand.id in local_sym_tab:
return recursive_dereferencer(local_sym_tab[operand.id].var, builder)
raise ValueError(f"Undefined variable: {operand.id}")
elif isinstance(operand, ast.Constant):
if isinstance(operand.value, int):
return ir.Constant(ir.IntType(64), operand.value)
raise TypeError(f"Unsupported constant type: {type(operand.value)}")
elif isinstance(operand, ast.BinOp):
return handle_binary_op_impl(operand, builder, local_sym_tab)
raise TypeError(f"Unsupported operand type: {type(operand)}")
def handle_binary_op_impl(rval, builder, local_sym_tab):
op = rval.op
left = get_operand_value(rval.left, builder, local_sym_tab)
right = get_operand_value(rval.right, builder, local_sym_tab)
logger.info(f"left is {left}, right is {right}, op is {op}")
# Map AST operation nodes to LLVM IR builder methods
op_map = {
ast.Add: builder.add,
ast.Sub: builder.sub,
ast.Mult: builder.mul,
ast.Div: builder.sdiv,
ast.Mod: builder.srem,
ast.LShift: builder.shl,
ast.RShift: builder.lshr,
ast.BitOr: builder.or_,
ast.BitXor: builder.xor,
ast.BitAnd: builder.and_,
ast.FloorDiv: builder.udiv,
}
if type(op) in op_map:
result = op_map[type(op)](left, right)
return result
else:
raise SyntaxError("Unsupported binary operation")
def handle_binary_op(rval, builder, var_name, local_sym_tab):
result = handle_binary_op_impl(rval, builder, local_sym_tab)
if var_name and var_name in local_sym_tab:
logger.info(
f"Storing result {result} into variable {local_sym_tab[var_name].var}"
)
builder.store(result, local_sym_tab[var_name].var)
return result, result.type

View File

@ -19,10 +19,20 @@ from pylibbpf import BpfProgram
import tempfile
from logging import Logger
import logging
import re
logger: Logger = logging.getLogger(__name__)
VERSION = "v0.1.4"
VERSION = "v0.1.5"
def finalize_module(original_str):
"""After all IR generation is complete, we monkey patch btf_ama attribute"""
# Append the "btf_ama" attribute to BTF struct field access globals in the IR string.
pattern = r'(@"llvm\.[^"]+:[^"]*" = external global i64, !llvm\.preserve\.access\.index ![0-9]+)'
replacement = r'\1 "btf_ama"'
return re.sub(pattern, replacement, original_str)
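
A quick standalone check of this rewrite; the global name below is invented, only its shape (an llvm.* BTF access global carrying a preserve.access.index annotation) matters:

import re

pattern = r'(@"llvm\.[^"]+:[^"]*" = external global i64, !llvm\.preserve\.access\.index ![0-9]+)'
replacement = r'\1 "btf_ama"'
sample = '@"llvm.task_struct:0:1234$0:2" = external global i64, !llvm.preserve.access.index !42'
print(re.sub(pattern, replacement, sample))  # prints the declaration with "btf_ama" appended after !42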
def find_bpf_chunks(tree):
@ -121,15 +131,44 @@ def compile_to_ir(filename: str, output: str, loglevel=logging.INFO):
module.add_named_metadata("llvm.ident", [f"PythonBPF {VERSION}"])
module_string = finalize_module(str(module))
logger.info(f"IR written to {output}")
with open(output, "w") as f:
f.write(f'source_filename = "{filename}"\n')
f.write(str(module))
f.write(module_string)
f.write("\n")
return output
def _run_llc(ll_file, obj_file):
"""Compile LLVM IR to BPF object file using llc."""
logger.info(f"Compiling IR to object: {ll_file} -> {obj_file}")
result = subprocess.run(
[
"llc",
"-march=bpf",
"-filetype=obj",
"-O2",
str(ll_file),
"-o",
str(obj_file),
],
check=True,
capture_output=True,
text=True,
)
if result.returncode == 0:
logger.info(f"Object file written to {obj_file}")
return True
else:
logger.error(f"llc compilation failed: {result.stderr}")
return False
def compile(loglevel=logging.INFO) -> bool:
# Look one level up the stack to the caller of this function
caller_frame = inspect.stack()[1]
@ -143,21 +182,7 @@ def compile(loglevel=logging.INFO) -> bool:
compile_to_ir(str(caller_file), str(ll_file), loglevel=loglevel) and success
)
success = bool(
subprocess.run(
[
"llc",
"-march=bpf",
"-filetype=obj",
"-O2",
str(ll_file),
"-o",
str(o_file),
],
check=True,
)
and success
)
success = _run_llc(ll_file, o_file) and success
logger.info(f"Object written to {o_file}")
return success
@ -177,17 +202,6 @@ def BPF(loglevel=logging.INFO) -> BpfProgram:
f.flush()
source = f.name
compile_to_ir(source, str(inter.name), loglevel=loglevel)
subprocess.run(
[
"llc",
"-march=bpf",
"-filetype=obj",
"-O2",
str(inter.name),
"-o",
str(obj_file.name),
],
check=True,
)
_run_llc(str(inter.name), str(obj_file.name))
return BpfProgram(str(obj_file.name))

View File

@ -1,4 +1,14 @@
from .expr_pass import eval_expr, handle_expr
from .type_normalization import convert_to_bool
from .expr_pass import eval_expr, handle_expr, get_operand_value
from .type_normalization import convert_to_bool, get_base_type_and_depth
from .ir_ops import deref_to_depth
from .call_registry import CallHandlerRegistry
__all__ = ["eval_expr", "handle_expr", "convert_to_bool"]
__all__ = [
"eval_expr",
"handle_expr",
"convert_to_bool",
"get_base_type_and_depth",
"deref_to_depth",
"get_operand_value",
"CallHandlerRegistry",
]

View File

@ -0,0 +1,20 @@
class CallHandlerRegistry:
"""Registry for handling different types of calls (helpers, etc.)"""
_handler = None
@classmethod
def set_handler(cls, handler):
"""Set the handler for unknown calls"""
cls._handler = handler
@classmethod
def handle_call(
cls, call, module, builder, func, local_sym_tab, map_sym_tab, structs_sym_tab
):
"""Handle a call using the registered handler"""
if cls._handler is None:
return None
return cls._handler(
call, module, builder, func, local_sym_tab, map_sym_tab, structs_sym_tab
)
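
The point of this registry is dependency inversion: eval_expr only knows the registry, and the helper package plugs its handler in at import time, so the delayed import disappears. A toy sketch of the pattern, with names and signatures simplified:

class Registry:
    _handler = None

    @classmethod
    def set_handler(cls, handler):
        cls._handler = handler

    @classmethod
    def handle(cls, name):
        # Low-level code dispatches through the registry without importing the provider
        return cls._handler(name) if cls._handler else None

def eval_call(name):
    # Stand-in for eval_expr's unknown-call path
    return Registry.handle(name)

# Provider side (the helper package) registers its handler once, on import
Registry.set_handler(lambda name: f"emit call to {name}" if name == "ktime" else None)

print(eval_call("ktime"))     # handled by the registered handler
print(eval_call("nonsense"))  # falls through to None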

View File

@ -5,10 +5,20 @@ import logging
from typing import Dict
from pythonbpf.type_deducer import ctypes_to_ir, is_ctypes
from .type_normalization import convert_to_bool, handle_comparator
from .call_registry import CallHandlerRegistry
from .type_normalization import (
convert_to_bool,
handle_comparator,
get_base_type_and_depth,
deref_to_depth,
)
logger: Logger = logging.getLogger(__name__)
# ============================================================================
# Leaf Handlers (No Recursive eval_expr calls)
# ============================================================================
def _handle_name_expr(expr: ast.Name, local_sym_tab: Dict, builder: ir.IRBuilder):
"""Handle ast.Name expressions."""
@ -21,12 +31,26 @@ def _handle_name_expr(expr: ast.Name, local_sym_tab: Dict, builder: ir.IRBuilder
return None
def _handle_constant_expr(expr: ast.Constant):
def _handle_constant_expr(module, builder, expr: ast.Constant):
"""Handle ast.Constant expressions."""
if isinstance(expr.value, int) or isinstance(expr.value, bool):
return ir.Constant(ir.IntType(64), int(expr.value)), ir.IntType(64)
elif isinstance(expr.value, str):
str_name = f".str.{id(expr)}"
str_bytes = expr.value.encode("utf-8") + b"\x00"
str_type = ir.ArrayType(ir.IntType(8), len(str_bytes))
str_constant = ir.Constant(str_type, bytearray(str_bytes))
# Create global variable
global_str = ir.GlobalVariable(module, str_type, name=str_name)
global_str.linkage = "internal"
global_str.global_constant = True
global_str.initializer = str_constant
str_ptr = builder.bitcast(global_str, ir.PointerType(ir.IntType(8)))
return str_ptr, ir.PointerType(ir.IntType(8))
else:
logger.error("Unsupported constant type")
logger.error(f"Unsupported constant type {ast.dump(expr)}")
return None
@ -88,6 +112,118 @@ def _handle_deref_call(expr: ast.Call, local_sym_tab: Dict, builder: ir.IRBuilde
return val, local_sym_tab[arg.id].ir_type
# ============================================================================
# Binary Operations
# ============================================================================
def get_operand_value(
func, module, operand, builder, local_sym_tab, map_sym_tab, structs_sym_tab=None
):
"""Extract the value from an operand, handling variables and constants."""
logger.info(f"Getting operand value for: {ast.dump(operand)}")
if isinstance(operand, ast.Name):
if operand.id in local_sym_tab:
var = local_sym_tab[operand.id].var
var_type = var.type
base_type, depth = get_base_type_and_depth(var_type)
logger.info(f"var is {var}, base_type is {base_type}, depth is {depth}")
val = deref_to_depth(func, builder, var, depth)
return val
raise ValueError(f"Undefined variable: {operand.id}")
elif isinstance(operand, ast.Constant):
if isinstance(operand.value, int):
cst = ir.Constant(ir.IntType(64), int(operand.value))
return cst
raise TypeError(f"Unsupported constant type: {type(operand.value)}")
elif isinstance(operand, ast.BinOp):
res = _handle_binary_op_impl(
func, module, operand, builder, local_sym_tab, map_sym_tab, structs_sym_tab
)
return res
else:
res = eval_expr(
func, module, builder, operand, local_sym_tab, map_sym_tab, structs_sym_tab
)
if res is None:
raise ValueError(f"Failed to evaluate call expression: {operand}")
val, _ = res
logger.info(f"Evaluated expr to {val} of type {val.type}")
base_type, depth = get_base_type_and_depth(val.type)
if depth > 0:
val = deref_to_depth(func, builder, val, depth)
return val
raise TypeError(f"Unsupported operand type: {type(operand)}")
def _handle_binary_op_impl(
func, module, rval, builder, local_sym_tab, map_sym_tab, structs_sym_tab=None
):
op = rval.op
left = get_operand_value(
func, module, rval.left, builder, local_sym_tab, map_sym_tab, structs_sym_tab
)
right = get_operand_value(
func, module, rval.right, builder, local_sym_tab, map_sym_tab, structs_sym_tab
)
logger.info(f"left is {left}, right is {right}, op is {op}")
# NOTE: Before doing the operation, if the operands are integers
# we always extend them to i64. The assignment to LHS will take
# care of truncation if needed.
if isinstance(left.type, ir.IntType) and left.type.width < 64:
left = builder.sext(left, ir.IntType(64))
if isinstance(right.type, ir.IntType) and right.type.width < 64:
right = builder.sext(right, ir.IntType(64))
# Map AST operation nodes to LLVM IR builder methods
op_map = {
ast.Add: builder.add,
ast.Sub: builder.sub,
ast.Mult: builder.mul,
ast.Div: builder.sdiv,
ast.Mod: builder.srem,
ast.LShift: builder.shl,
ast.RShift: builder.lshr,
ast.BitOr: builder.or_,
ast.BitXor: builder.xor,
ast.BitAnd: builder.and_,
ast.FloorDiv: builder.udiv,
}
if type(op) in op_map:
result = op_map[type(op)](left, right)
return result
else:
raise SyntaxError("Unsupported binary operation")
def _handle_binary_op(
func,
module,
rval,
builder,
var_name,
local_sym_tab,
map_sym_tab,
structs_sym_tab=None,
):
result = _handle_binary_op_impl(
func, module, rval, builder, local_sym_tab, map_sym_tab, structs_sym_tab
)
if var_name and var_name in local_sym_tab:
logger.info(
f"Storing result {result} into variable {local_sym_tab[var_name].var}"
)
builder.store(result, local_sym_tab[var_name].var)
return result, result.type
# ============================================================================
# Comparison and Unary Operations
# ============================================================================
def _handle_ctypes_call(
func,
module,
@ -176,21 +312,31 @@ def _handle_unary_op(
structs_sym_tab=None,
):
"""Handle ast.UnaryOp expressions."""
if not isinstance(expr.op, ast.Not):
logger.error("Only 'not' unary operator is supported")
if not isinstance(expr.op, ast.Not) and not isinstance(expr.op, ast.USub):
logger.error("Only 'not' and '-' unary operators are supported")
return None
operand = eval_expr(
func, module, builder, expr.operand, local_sym_tab, map_sym_tab, structs_sym_tab
operand = get_operand_value(
func, module, expr.operand, builder, local_sym_tab, map_sym_tab, structs_sym_tab
)
if operand is None:
logger.error("Failed to evaluate operand for unary operation")
return None
operand_val, operand_type = operand
true_const = ir.Constant(ir.IntType(1), 1)
result = builder.xor(convert_to_bool(builder, operand_val), true_const)
return result, ir.IntType(1)
if isinstance(expr.op, ast.Not):
true_const = ir.Constant(ir.IntType(1), 1)
result = builder.xor(convert_to_bool(builder, operand), true_const)
return result, ir.IntType(1)
elif isinstance(expr.op, ast.USub):
# Multiply by -1
neg_one = ir.Constant(ir.IntType(64), -1)
result = builder.mul(operand, neg_one)
return result, ir.IntType(64)
# ============================================================================
# Boolean Operations
# ============================================================================
def _handle_and_op(func, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab):
@ -323,6 +469,11 @@ def _handle_boolean_op(
return None
# ============================================================================
# Expression Dispatcher
# ============================================================================
def eval_expr(
func,
module,
@ -336,7 +487,7 @@ def eval_expr(
if isinstance(expr, ast.Name):
return _handle_name_expr(expr, local_sym_tab, builder)
elif isinstance(expr, ast.Constant):
return _handle_constant_expr(expr)
return _handle_constant_expr(module, builder, expr)
elif isinstance(expr, ast.Call):
if isinstance(expr.func, ast.Name) and expr.func.id == "deref":
return _handle_deref_call(expr, local_sym_tab, builder)
@ -352,57 +503,27 @@ def eval_expr(
structs_sym_tab,
)
# delayed import to avoid circular dependency
from pythonbpf.helper import HelperHandlerRegistry, handle_helper_call
result = CallHandlerRegistry.handle_call(
expr, module, builder, func, local_sym_tab, map_sym_tab, structs_sym_tab
)
if result is not None:
return result
if isinstance(expr.func, ast.Name) and HelperHandlerRegistry.has_handler(
expr.func.id
):
return handle_helper_call(
expr,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
elif isinstance(expr.func, ast.Attribute):
logger.info(f"Handling method call: {ast.dump(expr.func)}")
if isinstance(expr.func.value, ast.Call) and isinstance(
expr.func.value.func, ast.Name
):
method_name = expr.func.attr
if HelperHandlerRegistry.has_handler(method_name):
return handle_helper_call(
expr,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
elif isinstance(expr.func.value, ast.Name):
obj_name = expr.func.value.id
method_name = expr.func.attr
if obj_name in map_sym_tab:
if HelperHandlerRegistry.has_handler(method_name):
return handle_helper_call(
expr,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
logger.warning(f"Unknown call: {ast.dump(expr)}")
return None
elif isinstance(expr, ast.Attribute):
return _handle_attribute_expr(expr, local_sym_tab, structs_sym_tab, builder)
elif isinstance(expr, ast.BinOp):
from pythonbpf.binary_ops import handle_binary_op
return handle_binary_op(expr, builder, None, local_sym_tab)
return _handle_binary_op(
func,
module,
expr,
builder,
None,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
elif isinstance(expr, ast.Compare):
return _handle_compare(
func, module, builder, expr, local_sym_tab, map_sym_tab, structs_sym_tab

pythonbpf/expr/ir_ops.py (new file, 50 lines)
View File

@ -0,0 +1,50 @@
import logging
from llvmlite import ir
logger = logging.getLogger(__name__)
def deref_to_depth(func, builder, val, target_depth):
"""Dereference a pointer to a certain depth."""
cur_val = val
cur_type = val.type
for depth in range(target_depth):
if not isinstance(val.type, ir.PointerType):
logger.error("Cannot dereference further, non-pointer type")
return None
# dereference with null check
pointee_type = cur_type.pointee
null_check_block = builder.block
not_null_block = func.append_basic_block(name=f"deref_not_null_{depth}")
merge_block = func.append_basic_block(name=f"deref_merge_{depth}")
null_ptr = ir.Constant(cur_type, None)
is_not_null = builder.icmp_signed("!=", cur_val, null_ptr)
logger.debug(f"Inserted null check for pointer at depth {depth}")
builder.cbranch(is_not_null, not_null_block, merge_block)
builder.position_at_end(not_null_block)
dereferenced_val = builder.load(cur_val)
logger.debug(f"Dereferenced to depth {depth - 1}, type: {pointee_type}")
builder.branch(merge_block)
builder.position_at_end(merge_block)
phi = builder.phi(pointee_type, name=f"deref_result_{depth}")
zero_value = (
ir.Constant(pointee_type, 0)
if isinstance(pointee_type, ir.IntType)
else ir.Constant(pointee_type, None)
)
phi.add_incoming(zero_value, null_check_block)
phi.add_incoming(dereferenced_val, not_null_block)
# Continue with phi result
cur_val = phi
cur_type = pointee_type
return cur_val
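
A possible usage sketch for deref_to_depth, assuming it is imported from pythonbpf.expr as re-exported in expr/__init__.py; it loads through one pointer level with the null-guarded cbranch/phi shown above:

from llvmlite import ir
from pythonbpf.expr import deref_to_depth  # re-exported per expr/__init__.py

module = ir.Module(name="deref_sketch")
func = ir.Function(module, ir.FunctionType(ir.IntType(64), []), name="demo")
builder = ir.IRBuilder(func.append_basic_block("entry"))

cell = builder.alloca(ir.IntType(64), name="cell")   # an i64* value
builder.store(ir.Constant(ir.IntType(64), 41), cell)
value = deref_to_depth(func, builder, cell, 1)       # null-checked load: i64* -> i64
builder.ret(value)
print(module)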

View File

@ -1,6 +1,7 @@
from llvmlite import ir
import logging
import ast
from llvmlite import ir
from .ir_ops import deref_to_depth
logger = logging.getLogger(__name__)
@ -16,7 +17,7 @@ COMPARISON_OPS = {
}
def _get_base_type_and_depth(ir_type):
def get_base_type_and_depth(ir_type):
"""Get the base type for pointer types."""
cur_type = ir_type
depth = 0
@ -26,52 +27,6 @@ def _get_base_type_and_depth(ir_type):
return cur_type, depth
def _deref_to_depth(func, builder, val, target_depth):
"""Dereference a pointer to a certain depth."""
cur_val = val
cur_type = val.type
for depth in range(target_depth):
if not isinstance(val.type, ir.PointerType):
logger.error("Cannot dereference further, non-pointer type")
return None
# dereference with null check
pointee_type = cur_type.pointee
null_check_block = builder.block
not_null_block = func.append_basic_block(name=f"deref_not_null_{depth}")
merge_block = func.append_basic_block(name=f"deref_merge_{depth}")
null_ptr = ir.Constant(cur_type, None)
is_not_null = builder.icmp_signed("!=", cur_val, null_ptr)
logger.debug(f"Inserted null check for pointer at depth {depth}")
builder.cbranch(is_not_null, not_null_block, merge_block)
builder.position_at_end(not_null_block)
dereferenced_val = builder.load(cur_val)
logger.debug(f"Dereferenced to depth {depth - 1}, type: {pointee_type}")
builder.branch(merge_block)
builder.position_at_end(merge_block)
phi = builder.phi(pointee_type, name=f"deref_result_{depth}")
zero_value = (
ir.Constant(pointee_type, 0)
if isinstance(pointee_type, ir.IntType)
else ir.Constant(pointee_type, None)
)
phi.add_incoming(zero_value, null_check_block)
phi.add_incoming(dereferenced_val, not_null_block)
# Continue with phi result
cur_val = phi
cur_type = pointee_type
return cur_val
def _normalize_types(func, builder, lhs, rhs):
"""Normalize types for comparison."""
@ -88,13 +43,13 @@ def _normalize_types(func, builder, lhs, rhs):
logger.error(f"Type mismatch: {lhs.type} vs {rhs.type}")
return None, None
else:
lhs_base, lhs_depth = _get_base_type_and_depth(lhs.type)
rhs_base, rhs_depth = _get_base_type_and_depth(rhs.type)
lhs_base, lhs_depth = get_base_type_and_depth(lhs.type)
rhs_base, rhs_depth = get_base_type_and_depth(rhs.type)
if lhs_base == rhs_base:
if lhs_depth < rhs_depth:
rhs = _deref_to_depth(func, builder, rhs, rhs_depth - lhs_depth)
rhs = deref_to_depth(func, builder, rhs, rhs_depth - lhs_depth)
elif rhs_depth < lhs_depth:
lhs = _deref_to_depth(func, builder, lhs, lhs_depth - rhs_depth)
lhs = deref_to_depth(func, builder, lhs, lhs_depth - rhs_depth)
return _normalize_types(func, builder, lhs, rhs)

View File

@ -1,22 +0,0 @@
from typing import Dict
class StatementHandlerRegistry:
"""Registry for statement handlers."""
_handlers: Dict = {}
@classmethod
def register(cls, stmt_type):
"""Register a handler for a specific statement type."""
def decorator(handler):
cls._handlers[stmt_type] = handler
return handler
return decorator
@classmethod
def __getitem__(cls, stmt_type):
"""Get the handler for a specific statement type."""
return cls._handlers.get(stmt_type, None)

View File

@ -0,0 +1,88 @@
import ast
def get_probe_string(func_node):
"""Extract the probe string from the decorator of the function node"""
# TODO: right now we have the whole string in the section decorator
# But later we can implement typed tuples for tracepoints and kprobes
# For helper functions, we return "helper"
for decorator in func_node.decorator_list:
if isinstance(decorator, ast.Name) and decorator.id == "bpfglobal":
return None
if isinstance(decorator, ast.Call) and isinstance(decorator.func, ast.Name):
if decorator.func.id == "section" and len(decorator.args) == 1:
arg = decorator.args[0]
if isinstance(arg, ast.Constant) and isinstance(arg.value, str):
return arg.value
return "helper"
def is_global_function(func_node):
"""Check if the function is a global"""
for decorator in func_node.decorator_list:
if isinstance(decorator, ast.Name) and decorator.id in (
"map",
"bpfglobal",
"struct",
):
return True
return False
def infer_return_type(func_node: ast.FunctionDef):
if not isinstance(func_node, (ast.FunctionDef, ast.AsyncFunctionDef)):
raise TypeError("Expected ast.FunctionDef")
if func_node.returns is not None:
try:
return ast.unparse(func_node.returns)
except Exception:
node = func_node.returns
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
return getattr(node, "attr", type(node).__name__)
try:
return str(node)
except Exception:
return type(node).__name__
found_type = None
def _expr_type(e):
if e is None:
return "None"
if isinstance(e, ast.Constant):
return type(e.value).__name__
if isinstance(e, ast.Name):
return e.id
if isinstance(e, ast.Call):
f = e.func
if isinstance(f, ast.Name):
return f.id
if isinstance(f, ast.Attribute):
try:
return ast.unparse(f)
except Exception:
return getattr(f, "attr", type(f).__name__)
try:
return ast.unparse(f)
except Exception:
return type(f).__name__
if isinstance(e, ast.Attribute):
try:
return ast.unparse(e)
except Exception:
return getattr(e, "attr", type(e).__name__)
try:
return ast.unparse(e)
except Exception:
return type(e).__name__
for walked_node in ast.walk(func_node):
if isinstance(walked_node, ast.Return):
t = _expr_type(walked_node.value)
if found_type is None:
found_type = t
elif found_type != t:
raise ValueError(f"Conflicting return types: {found_type} vs {t}")
return found_type or "None"
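
A usage sketch for these metadata helpers, assuming they live under pythonbpf.functions.function_metadata as imported by the functions pass below; the section string is just a plausible example:

import ast
from pythonbpf.functions.function_metadata import get_probe_string, infer_return_type

src = '''
@section("tracepoint/syscalls/sys_enter_execve")
def on_execve() -> c_int64:
    return c_int64(0)
'''
fn = ast.parse(src).body[0]
print(get_probe_string(fn))   # the decorator's section string
print(infer_return_type(fn))  # "c_int64", taken from the annotation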

View File

@ -1,243 +1,187 @@
from llvmlite import ir
import ast
import logging
from typing import Any
from dataclasses import dataclass
from pythonbpf.helper import HelperHandlerRegistry, handle_helper_call
from pythonbpf.helper import (
HelperHandlerRegistry,
reset_scratch_pool,
)
from pythonbpf.type_deducer import ctypes_to_ir
from pythonbpf.binary_ops import handle_binary_op
from pythonbpf.expr import eval_expr, handle_expr, convert_to_bool
from pythonbpf.assign_pass import (
handle_variable_assignment,
handle_struct_field_assignment,
)
from pythonbpf.allocation_pass import handle_assign_allocation, allocate_temp_pool
from .return_utils import _handle_none_return, _handle_xdp_return, _is_xdp_name
from .return_utils import handle_none_return, handle_xdp_return, is_xdp_name
from .function_metadata import get_probe_string, is_global_function, infer_return_type
logger = logging.getLogger(__name__)
@dataclass
class LocalSymbol:
var: ir.AllocaInstr
ir_type: ir.Type
metadata: Any = None
def __iter__(self):
yield self.var
yield self.ir_type
yield self.metadata
# ============================================================================
# SECTION 1: Memory Allocation
# ============================================================================
def get_probe_string(func_node):
"""Extract the probe string from the decorator of the function node."""
# TODO: right now we have the whole string in the section decorator
# But later we can implement typed tuples for tracepoints and kprobes
# For helper functions, we return "helper"
def count_temps_in_call(call_node, local_sym_tab):
"""Count the number of temporary variables needed for a function call."""
for decorator in func_node.decorator_list:
if isinstance(decorator, ast.Name) and decorator.id == "bpfglobal":
return None
if isinstance(decorator, ast.Call) and isinstance(decorator.func, ast.Name):
if decorator.func.id == "section" and len(decorator.args) == 1:
arg = decorator.args[0]
if isinstance(arg, ast.Constant) and isinstance(arg.value, str):
return arg.value
return "helper"
count = 0
is_helper = False
# NOTE: We exclude print calls for now
if isinstance(call_node.func, ast.Name):
if (
HelperHandlerRegistry.has_handler(call_node.func.id)
and call_node.func.id != "print"
):
is_helper = True
elif isinstance(call_node.func, ast.Attribute):
if HelperHandlerRegistry.has_handler(call_node.func.attr):
is_helper = True
if not is_helper:
return 0
for arg in call_node.args:
# NOTE: Count all non-name arguments
# For struct fields, if it is being passed as an argument,
# The struct object should already exist in the local_sym_tab
if not isinstance(arg, ast.Name) and not (
isinstance(arg, ast.Attribute) and arg.value.id in local_sym_tab
):
count += 1
return count
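
A toy version of the counting rule above: any helper-call argument that is not a bare name (or a field of an already-allocated struct) needs one scratch slot, since its value must be materialised at an addressable location before the call. The call text and symbol names are made up:

import ast

def needs_scratch(arg, local_names):
    if isinstance(arg, ast.Name):
        return False
    if isinstance(arg, ast.Attribute) and isinstance(arg.value, ast.Name):
        return arg.value.id not in local_names
    return True

call = ast.parse("counter.update(key, val + 1)", mode="eval").body
print(sum(needs_scratch(a, {"key", "val", "counter"}) for a in call.args))  # 1, for val + 1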
def handle_if_allocation(
module, builder, stmt, func, ret_type, map_sym_tab, local_sym_tab, structs_sym_tab
):
"""Recursively handle allocations in if/else branches."""
if stmt.body:
allocate_mem(
module,
builder,
stmt.body,
func,
ret_type,
map_sym_tab,
local_sym_tab,
structs_sym_tab,
)
if stmt.orelse:
allocate_mem(
module,
builder,
stmt.orelse,
func,
ret_type,
map_sym_tab,
local_sym_tab,
structs_sym_tab,
)
def allocate_mem(
module, builder, body, func, ret_type, map_sym_tab, local_sym_tab, structs_sym_tab
):
max_temps_needed = 0
def update_max_temps_for_stmt(stmt):
nonlocal max_temps_needed
temps_needed = 0
if isinstance(stmt, ast.If):
for s in stmt.body:
update_max_temps_for_stmt(s)
for s in stmt.orelse:
update_max_temps_for_stmt(s)
return
for node in ast.walk(stmt):
if isinstance(node, ast.Call):
temps_needed += count_temps_in_call(node, local_sym_tab)
max_temps_needed = max(max_temps_needed, temps_needed)
for stmt in body:
update_max_temps_for_stmt(stmt)
# Handle allocations
if isinstance(stmt, ast.If):
handle_if_allocation(
module,
builder,
stmt,
func,
ret_type,
map_sym_tab,
local_sym_tab,
structs_sym_tab,
)
elif isinstance(stmt, ast.Assign):
handle_assign_allocation(builder, stmt, local_sym_tab, structs_sym_tab)
allocate_temp_pool(builder, max_temps_needed, local_sym_tab)
return local_sym_tab
# ============================================================================
# SECTION 2: Statement Handlers
# ============================================================================
def handle_assign(
func, module, builder, stmt, map_sym_tab, local_sym_tab, structs_sym_tab
):
"""Handle assignment statements in the function body."""
if len(stmt.targets) != 1:
logger.info("Unsupported multiassignment")
return
num_types = ("c_int32", "c_int64", "c_uint32", "c_uint64")
# TODO: Support this later
# GH #37
if len(stmt.targets) != 1:
logger.error("Multi-target assignment is not supported for now")
return
target = stmt.targets[0]
logger.info(f"Handling assignment to {ast.dump(target)}")
if not isinstance(target, ast.Name) and not isinstance(target, ast.Attribute):
logger.info("Unsupported assignment target")
return
var_name = target.id if isinstance(target, ast.Name) else target.value.id
rval = stmt.value
if isinstance(target, ast.Name):
# NOTE: Simple variable assignment case: x = 5
var_name = target.id
result = handle_variable_assignment(
func,
module,
builder,
var_name,
rval,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
if not result:
logger.error(f"Failed to handle assignment to {var_name}")
return
if isinstance(target, ast.Attribute):
# struct field assignment
field_name = target.attr
if var_name in local_sym_tab:
struct_type = local_sym_tab[var_name].metadata
struct_info = structs_sym_tab[struct_type]
if field_name in struct_info.fields:
field_ptr = struct_info.gep(
builder, local_sym_tab[var_name].var, field_name
)
val = eval_expr(
func,
module,
builder,
rval,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
if isinstance(struct_info.field_type(field_name), ir.ArrayType) and val[
1
] == ir.PointerType(ir.IntType(8)):
# TODO: Figure it out, not a priority rn
# Special case for string assignment to char array
# str_len = struct_info["field_types"][field_idx].count
# assign_string_to_array(builder, field_ptr, val[0], str_len)
# print(f"Assigned to struct field {var_name}.{field_name}")
pass
if val is None:
logger.info("Failed to evaluate struct field assignment")
return
logger.info(field_ptr)
builder.store(val[0], field_ptr)
logger.info(f"Assigned to struct field {var_name}.{field_name}")
return
elif isinstance(rval, ast.Constant):
if isinstance(rval.value, bool):
if rval.value:
builder.store(
ir.Constant(ir.IntType(1), 1), local_sym_tab[var_name].var
)
else:
builder.store(
ir.Constant(ir.IntType(1), 0), local_sym_tab[var_name].var
)
logger.info(f"Assigned constant {rval.value} to {var_name}")
elif isinstance(rval.value, int):
# Assume c_int64 for now
# var = builder.alloca(ir.IntType(64), name=var_name)
# var.align = 8
builder.store(
ir.Constant(ir.IntType(64), rval.value), local_sym_tab[var_name].var
)
logger.info(f"Assigned constant {rval.value} to {var_name}")
elif isinstance(rval.value, str):
str_val = rval.value.encode("utf-8") + b"\x00"
str_const = ir.Constant(
ir.ArrayType(ir.IntType(8), len(str_val)), bytearray(str_val)
)
global_str = ir.GlobalVariable(
module, str_const.type, name=f"{var_name}_str"
)
global_str.linkage = "internal"
global_str.global_constant = True
global_str.initializer = str_const
str_ptr = builder.bitcast(global_str, ir.PointerType(ir.IntType(8)))
builder.store(str_ptr, local_sym_tab[var_name].var)
logger.info(f"Assigned string constant '{rval.value}' to {var_name}")
else:
logger.info("Unsupported constant type")
elif isinstance(rval, ast.Call):
if isinstance(rval.func, ast.Name):
call_type = rval.func.id
logger.info(f"Assignment call type: {call_type}")
if (
call_type in num_types
and len(rval.args) == 1
and isinstance(rval.args[0], ast.Constant)
and isinstance(rval.args[0].value, int)
):
ir_type = ctypes_to_ir(call_type)
# var = builder.alloca(ir_type, name=var_name)
# var.align = ir_type.width // 8
builder.store(
ir.Constant(ir_type, rval.args[0].value),
local_sym_tab[var_name].var,
)
logger.info(
f"Assigned {call_type} constant {rval.args[0].value} to {var_name}"
)
elif HelperHandlerRegistry.has_handler(call_type):
# var = builder.alloca(ir.IntType(64), name=var_name)
# var.align = 8
val = handle_helper_call(
rval,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
builder.store(val[0], local_sym_tab[var_name].var)
logger.info(f"Assigned constant {rval.func.id} to {var_name}")
elif call_type == "deref" and len(rval.args) == 1:
logger.info(f"Handling deref assignment {ast.dump(rval)}")
val = eval_expr(
func,
module,
builder,
rval,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
if val is None:
logger.info("Failed to evaluate deref argument")
return
logger.info(f"Dereferenced value: {val}, storing in {var_name}")
builder.store(val[0], local_sym_tab[var_name].var)
logger.info(f"Dereferenced and assigned to {var_name}")
elif call_type in structs_sym_tab and len(rval.args) == 0:
struct_info = structs_sym_tab[call_type]
ir_type = struct_info.ir_type
# var = builder.alloca(ir_type, name=var_name)
# Null init
builder.store(ir.Constant(ir_type, None), local_sym_tab[var_name].var)
logger.info(f"Assigned struct {call_type} to {var_name}")
else:
logger.info(f"Unsupported assignment call type: {call_type}")
elif isinstance(rval.func, ast.Attribute):
logger.info(f"Assignment call attribute: {ast.dump(rval.func)}")
if isinstance(rval.func.value, ast.Name):
if rval.func.value.id in map_sym_tab:
map_name = rval.func.value.id
method_name = rval.func.attr
if HelperHandlerRegistry.has_handler(method_name):
val = handle_helper_call(
rval,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
builder.store(val[0], local_sym_tab[var_name].var)
else:
# TODO: probably a struct access
logger.info(f"TODO STRUCT ACCESS {ast.dump(rval)}")
elif isinstance(rval.func.value, ast.Call) and isinstance(
rval.func.value.func, ast.Name
):
map_name = rval.func.value.func.id
method_name = rval.func.attr
if map_name in map_sym_tab:
if HelperHandlerRegistry.has_handler(method_name):
val = handle_helper_call(
rval,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
# var = builder.alloca(ir.IntType(64), name=var_name)
# var.align = 8
builder.store(val[0], local_sym_tab[var_name].var)
else:
logger.info("Unsupported assignment call structure")
else:
logger.info("Unsupported assignment call function type")
elif isinstance(rval, ast.BinOp):
handle_binary_op(rval, builder, var_name, local_sym_tab)
else:
logger.info("Unsupported assignment value type")
# NOTE: Struct field assignment case: pkt.field = value
handle_struct_field_assignment(
func,
module,
builder,
target,
rval,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
return
# Unsupported target type
logger.error(f"Unsupported assignment target: {ast.dump(target)}")
def handle_cond(
@ -300,9 +244,9 @@ def handle_if(
def handle_return(builder, stmt, local_sym_tab, ret_type):
logger.info(f"Handling return statement: {ast.dump(stmt)}")
if stmt.value is None:
return _handle_none_return(builder)
elif isinstance(stmt.value, ast.Name) and _is_xdp_name(stmt.value.id):
return _handle_xdp_return(stmt, builder, ret_type)
return handle_none_return(builder)
elif isinstance(stmt.value, ast.Name) and is_xdp_name(stmt.value.id):
return handle_xdp_return(stmt, builder, ret_type)
else:
val = eval_expr(
func=None,
@ -330,6 +274,7 @@ def process_stmt(
ret_type=ir.IntType(64),
):
logger.info(f"Processing statement: {ast.dump(stmt)}")
reset_scratch_pool()
if isinstance(stmt, ast.Expr):
handle_expr(
func,
@ -360,120 +305,9 @@ def process_stmt(
return did_return
def allocate_mem(
module, builder, body, func, ret_type, map_sym_tab, local_sym_tab, structs_sym_tab
):
for stmt in body:
has_metadata = False
if isinstance(stmt, ast.If):
if stmt.body:
local_sym_tab = allocate_mem(
module,
builder,
stmt.body,
func,
ret_type,
map_sym_tab,
local_sym_tab,
structs_sym_tab,
)
if stmt.orelse:
local_sym_tab = allocate_mem(
module,
builder,
stmt.orelse,
func,
ret_type,
map_sym_tab,
local_sym_tab,
structs_sym_tab,
)
elif isinstance(stmt, ast.Assign):
if len(stmt.targets) != 1:
logger.info("Unsupported multiassignment")
continue
target = stmt.targets[0]
if not isinstance(target, ast.Name):
logger.info("Unsupported assignment target")
continue
var_name = target.id
rval = stmt.value
if var_name in local_sym_tab:
logger.info(f"Variable {var_name} already allocated")
continue
if isinstance(rval, ast.Call):
if isinstance(rval.func, ast.Name):
call_type = rval.func.id
if call_type in ("c_int32", "c_int64", "c_uint32", "c_uint64"):
ir_type = ctypes_to_ir(call_type)
var = builder.alloca(ir_type, name=var_name)
var.align = ir_type.width // 8
logger.info(
f"Pre-allocated variable {var_name} of type {call_type}"
)
elif HelperHandlerRegistry.has_handler(call_type):
# Assume return type is int64 for now
ir_type = ir.IntType(64)
var = builder.alloca(ir_type, name=var_name)
var.align = ir_type.width // 8
logger.info(f"Pre-allocated variable {var_name} for helper")
elif call_type == "deref" and len(rval.args) == 1:
# Assume return type is int64 for now
ir_type = ir.IntType(64)
var = builder.alloca(ir_type, name=var_name)
var.align = ir_type.width // 8
logger.info(f"Pre-allocated variable {var_name} for deref")
elif call_type in structs_sym_tab:
struct_info = structs_sym_tab[call_type]
ir_type = struct_info.ir_type
var = builder.alloca(ir_type, name=var_name)
has_metadata = True
logger.info(
f"Pre-allocated variable {var_name} for struct {call_type}"
)
elif isinstance(rval.func, ast.Attribute):
ir_type = ir.PointerType(ir.IntType(64))
var = builder.alloca(ir_type, name=var_name)
# var.align = ir_type.width // 8
logger.info(f"Pre-allocated variable {var_name} for map")
else:
logger.info("Unsupported assignment call function type")
continue
elif isinstance(rval, ast.Constant):
if isinstance(rval.value, bool):
ir_type = ir.IntType(1)
var = builder.alloca(ir_type, name=var_name)
var.align = 1
logger.info(f"Pre-allocated variable {var_name} of type c_bool")
elif isinstance(rval.value, int):
# Assume c_int64 for now
ir_type = ir.IntType(64)
var = builder.alloca(ir_type, name=var_name)
var.align = ir_type.width // 8
logger.info(f"Pre-allocated variable {var_name} of type c_int64")
elif isinstance(rval.value, str):
ir_type = ir.PointerType(ir.IntType(8))
var = builder.alloca(ir_type, name=var_name)
var.align = 8
logger.info(f"Pre-allocated variable {var_name} of type string")
else:
logger.info("Unsupported constant type")
continue
elif isinstance(rval, ast.BinOp):
# Assume c_int64 for now
ir_type = ir.IntType(64)
var = builder.alloca(ir_type, name=var_name)
var.align = ir_type.width // 8
logger.info(f"Pre-allocated variable {var_name} of type c_int64")
else:
logger.info("Unsupported assignment value type")
continue
if has_metadata:
local_sym_tab[var_name] = LocalSymbol(var, ir_type, call_type)
else:
local_sym_tab[var_name] = LocalSymbol(var, ir_type)
return local_sym_tab
# ============================================================================
# SECTION 3: Function Body Processing
# ============================================================================
def process_func_body(
@ -555,18 +389,14 @@ def process_bpf_chunk(func_node, module, return_type, map_sym_tab, structs_sym_t
return func
# ============================================================================
# SECTION 4: Top-Level Function Processor
# ============================================================================
def func_proc(tree, module, chunks, map_sym_tab, structs_sym_tab):
for func_node in chunks:
is_global = False
for decorator in func_node.decorator_list:
if isinstance(decorator, ast.Name) and decorator.id in (
"map",
"bpfglobal",
"struct",
):
is_global = True
break
if is_global:
if is_global_function(func_node):
continue
func_type = get_probe_string(func_node)
logger.info(f"Found probe_string of {func_node.name}: {func_type}")
@ -580,67 +410,7 @@ def func_proc(tree, module, chunks, map_sym_tab, structs_sym_tab):
)
def infer_return_type(func_node: ast.FunctionDef):
if not isinstance(func_node, (ast.FunctionDef, ast.AsyncFunctionDef)):
raise TypeError("Expected ast.FunctionDef")
if func_node.returns is not None:
try:
return ast.unparse(func_node.returns)
except Exception:
node = func_node.returns
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
return getattr(node, "attr", type(node).__name__)
try:
return str(node)
except Exception:
return type(node).__name__
found_type = None
def _expr_type(e):
if e is None:
return "None"
if isinstance(e, ast.Constant):
return type(e.value).__name__
if isinstance(e, ast.Name):
return e.id
if isinstance(e, ast.Call):
f = e.func
if isinstance(f, ast.Name):
return f.id
if isinstance(f, ast.Attribute):
try:
return ast.unparse(f)
except Exception:
return getattr(f, "attr", type(f).__name__)
try:
return ast.unparse(f)
except Exception:
return type(f).__name__
if isinstance(e, ast.Attribute):
try:
return ast.unparse(e)
except Exception:
return getattr(e, "attr", type(e).__name__)
try:
return ast.unparse(e)
except Exception:
return type(e).__name__
for walked_node in ast.walk(func_node):
if isinstance(walked_node, ast.Return):
t = _expr_type(walked_node.value)
if found_type is None:
found_type = t
elif found_type != t:
raise ValueError(f"Conflicting return types: {found_type} vs {t}")
return found_type or "None"
# For string assignment to fixed-size arrays
# TODO: WIP, for string assignment to fixed-size arrays
def assign_string_to_array(builder, target_array_ptr, source_string_ptr, array_length):
"""
Copy a string (i8*) to a fixed-size array ([N x i8]*)

View File

@ -14,19 +14,19 @@ XDP_ACTIONS = {
}
def _handle_none_return(builder) -> bool:
def handle_none_return(builder) -> bool:
"""Handle return or return None -> returns 0."""
builder.ret(ir.Constant(ir.IntType(64), 0))
logger.debug("Generated default return: 0")
return True
def _is_xdp_name(name: str) -> bool:
def is_xdp_name(name: str) -> bool:
"""Check if a name is an XDP action"""
return name in XDP_ACTIONS
def _handle_xdp_return(stmt: ast.Return, builder, ret_type) -> bool:
def handle_xdp_return(stmt: ast.Return, builder, ret_type) -> bool:
"""Handle XDP returns"""
if not isinstance(stmt.value, ast.Name):
return False
@ -37,7 +37,6 @@ def _handle_xdp_return(stmt: ast.Return, builder, ret_type) -> bool:
raise ValueError(
f"Unknown XDP action: {action_name}. Available: {XDP_ACTIONS.keys()}"
)
return False
value = XDP_ACTIONS[action_name]
builder.ret(ir.Constant(ret_type, value))
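For example (a hedged sketch; the numeric code is the standard kernel XDP action value assumed to populate XDP_ACTIONS above):

# `return XDP_PASS` inside an XDP-typed program resolves to
#     builder.ret(ir.Constant(ret_type, XDP_ACTIONS["XDP_PASS"]))
# i.e. `ret i64 2` when ret_type is i64.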

View File

@ -1,9 +1,63 @@
from .helper_utils import HelperHandlerRegistry
from .helper_registry import HelperHandlerRegistry
from .helper_utils import reset_scratch_pool
from .bpf_helper_handler import handle_helper_call
from .helpers import ktime, pid, deref, XDP_DROP, XDP_PASS
# Register the helper handler with expr module
def _register_helper_handler():
"""Register helper call handler with the expression evaluator"""
from pythonbpf.expr.expr_pass import CallHandlerRegistry
def helper_call_handler(
call, module, builder, func, local_sym_tab, map_sym_tab, structs_sym_tab
):
"""Check if call is a helper and handle it"""
import ast
# Check for direct helper calls (e.g., ktime(), print())
if isinstance(call.func, ast.Name):
if HelperHandlerRegistry.has_handler(call.func.id):
return handle_helper_call(
call,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
# Check for method calls (e.g., map.lookup())
elif isinstance(call.func, ast.Attribute):
method_name = call.func.attr
# Handle: my_map.lookup(key)
if isinstance(call.func.value, ast.Name):
obj_name = call.func.value.id
if map_sym_tab and obj_name in map_sym_tab:
if HelperHandlerRegistry.has_handler(method_name):
return handle_helper_call(
call,
module,
builder,
func,
local_sym_tab,
map_sym_tab,
structs_sym_tab,
)
return None
CallHandlerRegistry.set_handler(helper_call_handler)
# Register on module import
_register_helper_handler()
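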
__all__ = [
"HelperHandlerRegistry",
"reset_scratch_pool",
"handle_helper_call",
"ktime",
"pid",

View File

@ -1,14 +1,15 @@
import ast
from llvmlite import ir
from enum import Enum
from .helper_registry import HelperHandlerRegistry
from .helper_utils import (
HelperHandlerRegistry,
get_or_create_ptr_from_arg,
get_flags_val,
handle_fstring_print,
simple_string_print,
get_data_ptr_and_size,
)
from .printk_formatter import simple_string_print, handle_fstring_print
from logging import Logger
import logging
@ -34,6 +35,7 @@ def bpf_ktime_get_ns_emitter(
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_ktime_get_ns helper function call.
@ -56,6 +58,7 @@ def bpf_map_lookup_elem_emitter(
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_map_lookup_elem helper function call.
@ -64,11 +67,17 @@ def bpf_map_lookup_elem_emitter(
raise ValueError(
f"Map lookup expects exactly one argument (key), got {len(call.args)}"
)
key_ptr = get_or_create_ptr_from_arg(call.args[0], builder, local_sym_tab)
key_ptr = get_or_create_ptr_from_arg(
func, module, call.args[0], builder, local_sym_tab, map_sym_tab, struct_sym_tab
)
map_void_ptr = builder.bitcast(map_ptr, ir.PointerType())
# TODO: I have changed the return type to i64*, as we are
# allocating space for that type in allocate_mem. This is
# temporary, and we will honour other widths later. But this
# allows us to have cool binary ops on the returned value.
fn_type = ir.FunctionType(
ir.PointerType(), # Return type: void*
ir.PointerType(ir.IntType(64)),  # Return type: i64* (see TODO above)
[ir.PointerType(), ir.PointerType()], # Args: (void*, void*)
var_arg=False,
)
@ -91,6 +100,7 @@ def bpf_printk_emitter(
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""Emit LLVM IR for bpf_printk helper function call."""
if not hasattr(func, "_fmt_counter"):
@ -126,7 +136,7 @@ def bpf_printk_emitter(
fn_ptr = builder.inttoptr(fn_addr, fn_ptr_type)
builder.call(fn_ptr, args, tail=True)
return None
return True
@HelperHandlerRegistry.register("update")
@ -138,6 +148,7 @@ def bpf_map_update_elem_emitter(
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_map_update_elem helper function call.
@ -152,8 +163,12 @@ def bpf_map_update_elem_emitter(
value_arg = call.args[1]
flags_arg = call.args[2] if len(call.args) > 2 else None
key_ptr = get_or_create_ptr_from_arg(key_arg, builder, local_sym_tab)
value_ptr = get_or_create_ptr_from_arg(value_arg, builder, local_sym_tab)
key_ptr = get_or_create_ptr_from_arg(
func, module, key_arg, builder, local_sym_tab, map_sym_tab, struct_sym_tab
)
value_ptr = get_or_create_ptr_from_arg(
func, module, value_arg, builder, local_sym_tab, map_sym_tab, struct_sym_tab
)
flags_val = get_flags_val(flags_arg, builder, local_sym_tab)
map_void_ptr = builder.bitcast(map_ptr, ir.PointerType())
@ -188,6 +203,7 @@ def bpf_map_delete_elem_emitter(
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_map_delete_elem helper function call.
@ -197,7 +213,9 @@ def bpf_map_delete_elem_emitter(
raise ValueError(
f"Map delete expects exactly one argument (key), got {len(call.args)}"
)
key_ptr = get_or_create_ptr_from_arg(call.args[0], builder, local_sym_tab)
key_ptr = get_or_create_ptr_from_arg(
func, module, call.args[0], builder, local_sym_tab, map_sym_tab, struct_sym_tab
)
map_void_ptr = builder.bitcast(map_ptr, ir.PointerType())
# Define function type for bpf_map_delete_elem
@ -225,6 +243,7 @@ def bpf_get_current_pid_tgid_emitter(
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
"""
Emit LLVM IR for bpf_get_current_pid_tgid helper function call.
@ -251,6 +270,7 @@ def bpf_perf_event_output_handler(
func,
local_sym_tab=None,
struct_sym_tab=None,
map_sym_tab=None,
):
if len(call.args) != 1:
raise ValueError(
@ -315,6 +335,7 @@ def handle_helper_call(
func,
local_sym_tab,
struct_sym_tab,
map_sym_tab,
)
# Handle direct function calls (e.g., print(), ktime())

View File

@ -0,0 +1,27 @@
from typing import Callable
class HelperHandlerRegistry:
"""Registry for BPF helpers"""
_handlers: dict[str, Callable] = {}
@classmethod
def register(cls, helper_name):
"""Decorator to register a handler function for a helper"""
def decorator(func):
cls._handlers[helper_name] = func
return func
return decorator
@classmethod
def get_handler(cls, helper_name):
"""Get the handler function for a helper"""
return cls._handlers.get(helper_name)
@classmethod
def has_handler(cls, helper_name):
"""Check if a handler function is registered for a helper"""
return helper_name in cls._handlers
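A minimal usage sketch, assuming this registry module is importable (the helper name and emitter signature below are illustrative, not the project's real emitters):

@HelperHandlerRegistry.register("my_helper")
def my_helper_emitter(call, map_ptr, module, builder, func,
                      local_sym_tab=None, struct_sym_tab=None, map_sym_tab=None):
    ...  # emit IR for the call here

assert HelperHandlerRegistry.has_handler("my_helper")
emitter = HelperHandlerRegistry.get_handler("my_helper")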

View File

@ -1,37 +1,52 @@
import ast
import logging
from collections.abc import Callable
from llvmlite import ir
from pythonbpf.expr import eval_expr
from pythonbpf.expr import (
get_operand_value,
)
logger = logging.getLogger(__name__)
class HelperHandlerRegistry:
"""Registry for BPF helpers"""
class ScratchPoolManager:
"""Manage the temporary helper variables in local_sym_tab"""
_handlers: dict[str, Callable] = {}
def __init__(self):
self._counter = 0
@classmethod
def register(cls, helper_name):
"""Decorator to register a handler function for a helper"""
@property
def counter(self):
return self._counter
def decorator(func):
cls._handlers[helper_name] = func
return func
def reset(self):
self._counter = 0
logger.debug("Scratch pool counter reset to 0")
return decorator
def get_next_temp(self, local_sym_tab):
temp_name = f"__helper_temp_{self._counter}"
self._counter += 1
@classmethod
def get_handler(cls, helper_name):
"""Get the handler function for a helper"""
return cls._handlers.get(helper_name)
if temp_name not in local_sym_tab:
raise ValueError(
f"Scratch pool exhausted or inadequate: {temp_name}. "
f"Current counter: {self._counter}"
)
@classmethod
def has_handler(cls, helper_name):
"""Check if a handler function is registered for a helper"""
return helper_name in cls._handlers
return local_sym_tab[temp_name].var, temp_name
_temp_pool_manager = ScratchPoolManager() # Singleton instance
def reset_scratch_pool():
"""Reset the scratch pool counter"""
_temp_pool_manager.reset()
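A hedged sketch of the contract the pool assumes (slot names follow the pattern used above; the pre-allocation itself is expected to happen in the allocation pass):

# local_sym_tab must already contain entries named "__helper_temp_0",
# "__helper_temp_1", ...; get_next_temp only hands them out in order and
# raises if the pool is exhausted.
#
#   ptr, name = _temp_pool_manager.get_next_temp(local_sym_tab)
#   builder.store(ir.Constant(ir.IntType(64), 0), ptr)
#   ...
#   reset_scratch_pool()   # rewind the counter before the next function body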
# ============================================================================
# Argument Preparation
# ============================================================================
def get_var_ptr_from_name(var_name, local_sym_tab):
@ -41,27 +56,41 @@ def get_var_ptr_from_name(var_name, local_sym_tab):
raise ValueError(f"Variable '{var_name}' not found in local symbol table")
def create_int_constant_ptr(value, builder, int_width=64):
def create_int_constant_ptr(value, builder, local_sym_tab, int_width=64):
"""Create a pointer to an integer constant."""
# Default to 64-bit integer
int_type = ir.IntType(int_width)
ptr = builder.alloca(int_type)
ptr.align = int_type.width // 8
builder.store(ir.Constant(int_type, value), ptr)
ptr, temp_name = _temp_pool_manager.get_next_temp(local_sym_tab)
logger.info(f"Using temp variable '{temp_name}' for int constant {value}")
const_val = ir.Constant(ir.IntType(int_width), value)
builder.store(const_val, ptr)
return ptr
def get_or_create_ptr_from_arg(arg, builder, local_sym_tab):
def get_or_create_ptr_from_arg(
func, module, arg, builder, local_sym_tab, map_sym_tab, struct_sym_tab=None
):
"""Extract or create pointer from the call arguments."""
if isinstance(arg, ast.Name):
ptr = get_var_ptr_from_name(arg.id, local_sym_tab)
elif isinstance(arg, ast.Constant) and isinstance(arg.value, int):
ptr = create_int_constant_ptr(arg.value, builder)
ptr = create_int_constant_ptr(arg.value, builder, local_sym_tab)
else:
raise NotImplementedError(
"Only simple variable names are supported as args in map helpers."
# Evaluate the expression and store the result in a temp variable
val = get_operand_value(
func, module, arg, builder, local_sym_tab, map_sym_tab, struct_sym_tab
)
if val is None:
raise ValueError("Failed to evaluate expression for helper arg.")
# NOTE: We assume the result is an int64 for now
# if isinstance(arg, ast.Attribute):
# return val
ptr, temp_name = _temp_pool_manager.get_next_temp(local_sym_tab)
logger.info(f"Using temp variable '{temp_name}' for expression result")
builder.store(val, ptr)
return ptr
@ -84,204 +113,6 @@ def get_flags_val(arg, builder, local_sym_tab):
)
def simple_string_print(string_value, module, builder, func):
"""Prepare arguments for bpf_printk from a simple string value"""
fmt_str = string_value + "\n\0"
fmt_ptr = _create_format_string_global(fmt_str, func, module, builder)
args = [fmt_ptr, ir.Constant(ir.IntType(32), len(fmt_str))]
return args
def handle_fstring_print(
joined_str,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
):
"""Handle f-string formatting for bpf_printk emitter."""
fmt_parts = []
exprs = []
for value in joined_str.values:
logger.debug(f"Processing f-string value: {ast.dump(value)}")
if isinstance(value, ast.Constant):
_process_constant_in_fstring(value, fmt_parts, exprs)
elif isinstance(value, ast.FormattedValue):
_process_fval(
value,
fmt_parts,
exprs,
local_sym_tab,
struct_sym_tab,
)
else:
raise NotImplementedError(f"Unsupported f-string value type: {type(value)}")
fmt_str = "".join(fmt_parts)
args = simple_string_print(fmt_str, module, builder, func)
# NOTE: Process expressions (limited to 3 due to BPF constraints)
if len(exprs) > 3:
logger.warning("bpf_printk supports up to 3 args, extra args will be ignored.")
for expr in exprs[:3]:
arg_value = _prepare_expr_args(
expr,
func,
module,
builder,
local_sym_tab,
struct_sym_tab,
)
args.append(arg_value)
return args
def _process_constant_in_fstring(cst, fmt_parts, exprs):
"""Process constant values in f-string."""
if isinstance(cst.value, str):
fmt_parts.append(cst.value)
elif isinstance(cst.value, int):
fmt_parts.append("%lld")
exprs.append(ir.Constant(ir.IntType(64), cst.value))
else:
raise NotImplementedError(
f"Unsupported constant type in f-string: {type(cst.value)}"
)
def _process_fval(fval, fmt_parts, exprs, local_sym_tab, struct_sym_tab):
"""Process formatted values in f-string."""
logger.debug(f"Processing formatted value: {ast.dump(fval)}")
if isinstance(fval.value, ast.Name):
_process_name_in_fval(fval.value, fmt_parts, exprs, local_sym_tab)
elif isinstance(fval.value, ast.Attribute):
_process_attr_in_fval(
fval.value,
fmt_parts,
exprs,
local_sym_tab,
struct_sym_tab,
)
else:
raise NotImplementedError(
f"Unsupported formatted value in f-string: {type(fval.value)}"
)
def _process_name_in_fval(name_node, fmt_parts, exprs, local_sym_tab):
"""Process name nodes in formatted values."""
if local_sym_tab and name_node.id in local_sym_tab:
_, var_type, tmp = local_sym_tab[name_node.id]
_populate_fval(var_type, name_node, fmt_parts, exprs)
def _process_attr_in_fval(attr_node, fmt_parts, exprs, local_sym_tab, struct_sym_tab):
"""Process attribute nodes in formatted values."""
if (
isinstance(attr_node.value, ast.Name)
and local_sym_tab
and attr_node.value.id in local_sym_tab
):
var_name = attr_node.value.id
field_name = attr_node.attr
var_type = local_sym_tab[var_name].metadata
if var_type not in struct_sym_tab:
raise ValueError(
f"Struct '{var_type}' for '{var_name}' not in symbol table"
)
struct_info = struct_sym_tab[var_type]
if field_name not in struct_info.fields:
raise ValueError(f"Field '{field_name}' not found in struct '{var_type}'")
field_type = struct_info.field_type(field_name)
_populate_fval(field_type, attr_node, fmt_parts, exprs)
else:
raise NotImplementedError(
"Only simple attribute on local vars is supported in f-strings."
)
def _populate_fval(ftype, node, fmt_parts, exprs):
"""Populate format parts and expressions based on field type."""
if isinstance(ftype, ir.IntType):
# TODO: We print as signed integers only for now
if ftype.width == 64:
fmt_parts.append("%lld")
exprs.append(node)
elif ftype.width == 32:
fmt_parts.append("%d")
exprs.append(node)
else:
raise NotImplementedError(
f"Unsupported integer width in f-string: {ftype.width}"
)
elif ftype == ir.PointerType(ir.IntType(8)):
# NOTE: We assume i8* is a string
fmt_parts.append("%s")
exprs.append(node)
else:
raise NotImplementedError(f"Unsupported field type in f-string: {ftype}")
def _create_format_string_global(fmt_str, func, module, builder):
"""Create a global variable for the format string."""
fmt_name = f"{func.name}____fmt{func._fmt_counter}"
func._fmt_counter += 1
fmt_gvar = ir.GlobalVariable(
module, ir.ArrayType(ir.IntType(8), len(fmt_str)), name=fmt_name
)
fmt_gvar.global_constant = True
fmt_gvar.initializer = ir.Constant(
ir.ArrayType(ir.IntType(8), len(fmt_str)), bytearray(fmt_str.encode("utf8"))
)
fmt_gvar.linkage = "internal"
fmt_gvar.align = 1
return builder.bitcast(fmt_gvar, ir.PointerType())
def _prepare_expr_args(expr, func, module, builder, local_sym_tab, struct_sym_tab):
"""Evaluate and prepare an expression to use as an arg for bpf_printk."""
val, _ = eval_expr(
func,
module,
builder,
expr,
local_sym_tab,
None,
struct_sym_tab,
)
if val:
if isinstance(val.type, ir.PointerType):
val = builder.ptrtoint(val, ir.IntType(64))
elif isinstance(val.type, ir.IntType):
if val.type.width < 64:
val = builder.sext(val, ir.IntType(64))
else:
logger.warning(
"Only int and ptr supported in bpf_printk args. Others default to 0."
)
val = ir.Constant(ir.IntType(64), 0)
return val
else:
logger.warning(
"Failed to evaluate expression for bpf_printk argument. "
"It will be converted to 0."
)
return ir.Constant(ir.IntType(64), 0)
def get_data_ptr_and_size(data_arg, local_sym_tab, struct_sym_tab):
"""Extract data pointer and size information for perf event output."""
if isinstance(data_arg, ast.Name):

View File

@ -0,0 +1,240 @@
import ast
import logging
from llvmlite import ir
from pythonbpf.expr import eval_expr, get_base_type_and_depth, deref_to_depth
logger = logging.getLogger(__name__)
def simple_string_print(string_value, module, builder, func):
"""Prepare arguments for bpf_printk from a simple string value"""
fmt_str = string_value + "\n\0"
fmt_ptr = _create_format_string_global(fmt_str, func, module, builder)
args = [fmt_ptr, ir.Constant(ir.IntType(32), len(fmt_str))]
return args
def handle_fstring_print(
joined_str,
module,
builder,
func,
local_sym_tab=None,
struct_sym_tab=None,
):
"""Handle f-string formatting for bpf_printk emitter."""
fmt_parts = []
exprs = []
for value in joined_str.values:
logger.debug(f"Processing f-string value: {ast.dump(value)}")
if isinstance(value, ast.Constant):
_process_constant_in_fstring(value, fmt_parts, exprs)
elif isinstance(value, ast.FormattedValue):
_process_fval(
value,
fmt_parts,
exprs,
local_sym_tab,
struct_sym_tab,
)
else:
raise NotImplementedError(f"Unsupported f-string value type: {type(value)}")
fmt_str = "".join(fmt_parts)
args = simple_string_print(fmt_str, module, builder, func)
# NOTE: Process expressions (limited to 3 due to BPF constraints)
if len(exprs) > 3:
logger.warning("bpf_printk supports up to 3 args, extra args will be ignored.")
for expr in exprs[:3]:
arg_value = _prepare_expr_args(
expr,
func,
module,
builder,
local_sym_tab,
struct_sym_tab,
)
args.append(arg_value)
return args
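Roughly what this produces for a typical call, as a hedged sketch (dat.pid assumed to be a 64-bit integer struct field):

# print(f"pid is {dat.pid}")
#   fmt_parts -> ["pid is ", "%lld"]        => fmt_str "pid is %lld\n\0"
#   exprs     -> [<ast.Attribute dat.pid>]  evaluated later and widened to i64
#   args      -> [fmt_ptr, i32 len(fmt_str), i64 value of dat.pid]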
# ============================================================================
# Internal Helpers
# ============================================================================
def _process_constant_in_fstring(cst, fmt_parts, exprs):
"""Process constant values in f-string."""
if isinstance(cst.value, str):
fmt_parts.append(cst.value)
elif isinstance(cst.value, int):
fmt_parts.append("%lld")
exprs.append(ir.Constant(ir.IntType(64), cst.value))
else:
raise NotImplementedError(
f"Unsupported constant type in f-string: {type(cst.value)}"
)
def _process_fval(fval, fmt_parts, exprs, local_sym_tab, struct_sym_tab):
"""Process formatted values in f-string."""
logger.debug(f"Processing formatted value: {ast.dump(fval)}")
if isinstance(fval.value, ast.Name):
_process_name_in_fval(fval.value, fmt_parts, exprs, local_sym_tab)
elif isinstance(fval.value, ast.Attribute):
_process_attr_in_fval(
fval.value,
fmt_parts,
exprs,
local_sym_tab,
struct_sym_tab,
)
else:
raise NotImplementedError(
f"Unsupported formatted value in f-string: {type(fval.value)}"
)
def _process_name_in_fval(name_node, fmt_parts, exprs, local_sym_tab):
"""Process name nodes in formatted values."""
if local_sym_tab and name_node.id in local_sym_tab:
_, var_type, tmp = local_sym_tab[name_node.id]
_populate_fval(var_type, name_node, fmt_parts, exprs)
def _process_attr_in_fval(attr_node, fmt_parts, exprs, local_sym_tab, struct_sym_tab):
"""Process attribute nodes in formatted values."""
if (
isinstance(attr_node.value, ast.Name)
and local_sym_tab
and attr_node.value.id in local_sym_tab
):
var_name = attr_node.value.id
field_name = attr_node.attr
var_type = local_sym_tab[var_name].metadata
if var_type not in struct_sym_tab:
raise ValueError(
f"Struct '{var_type}' for '{var_name}' not in symbol table"
)
struct_info = struct_sym_tab[var_type]
if field_name not in struct_info.fields:
raise ValueError(f"Field '{field_name}' not found in struct '{var_type}'")
field_type = struct_info.field_type(field_name)
_populate_fval(field_type, attr_node, fmt_parts, exprs)
else:
raise NotImplementedError(
"Only simple attribute on local vars is supported in f-strings."
)
def _populate_fval(ftype, node, fmt_parts, exprs):
"""Populate format parts and expressions based on field type."""
if isinstance(ftype, ir.IntType):
# TODO: We print as signed integers only for now
if ftype.width == 64:
fmt_parts.append("%lld")
exprs.append(node)
elif ftype.width == 32:
fmt_parts.append("%d")
exprs.append(node)
else:
raise NotImplementedError(
f"Unsupported integer width in f-string: {ftype.width}"
)
elif isinstance(ftype, ir.PointerType):
target, depth = get_base_type_and_depth(ftype)
if isinstance(target, ir.IntType):
if target.width == 64:
fmt_parts.append("%lld")
exprs.append(node)
elif target.width == 32:
fmt_parts.append("%d")
exprs.append(node)
elif target.width == 8 and depth == 1:
# NOTE: Assume i8* is a string
fmt_parts.append("%s")
exprs.append(node)
else:
raise NotImplementedError(
f"Unsupported pointer target type in f-string: {target}"
)
else:
raise NotImplementedError(
f"Unsupported pointer target type in f-string: {target}"
)
else:
raise NotImplementedError(f"Unsupported field type in f-string: {ftype}")
def _create_format_string_global(fmt_str, func, module, builder):
"""Create a global variable for the format string."""
fmt_name = f"{func.name}____fmt{func._fmt_counter}"
func._fmt_counter += 1
fmt_gvar = ir.GlobalVariable(
module, ir.ArrayType(ir.IntType(8), len(fmt_str)), name=fmt_name
)
fmt_gvar.global_constant = True
fmt_gvar.initializer = ir.Constant(
ir.ArrayType(ir.IntType(8), len(fmt_str)), bytearray(fmt_str.encode("utf8"))
)
fmt_gvar.linkage = "internal"
fmt_gvar.align = 1
return builder.bitcast(fmt_gvar, ir.PointerType())
def _prepare_expr_args(expr, func, module, builder, local_sym_tab, struct_sym_tab):
"""Evaluate and prepare an expression to use as an arg for bpf_printk."""
val, _ = eval_expr(
func,
module,
builder,
expr,
local_sym_tab,
None,
struct_sym_tab,
)
if val:
if isinstance(val.type, ir.PointerType):
target, depth = get_base_type_and_depth(val.type)
if isinstance(target, ir.IntType):
if target.width >= 32:
val = deref_to_depth(func, builder, val, depth)
val = builder.sext(val, ir.IntType(64))
elif target.width == 8 and depth == 1:
# NOTE: i8* is string, no need to deref
pass
else:
logger.warning(
"Only int and ptr supported in bpf_printk args. Others default to 0."
)
val = ir.Constant(ir.IntType(64), 0)
elif isinstance(val.type, ir.IntType):
if val.type.width < 64:
val = builder.sext(val, ir.IntType(64))
else:
logger.warning(
"Only int and ptr supported in bpf_printk args. Others default to 0."
)
val = ir.Constant(ir.IntType(64), 0)
return val
else:
logger.warning(
"Failed to evaluate expression for bpf_printk argument. "
"It will be converted to 0."
)
return ir.Constant(ir.IntType(64), 0)

View File

@ -0,0 +1,93 @@
from pythonbpf.debuginfo import DebugInfoGenerator
from .map_types import BPFMapType
def create_map_debug_info(module, map_global, map_name, map_params):
"""Generate debug info metadata for BPF maps HASH and PERF_EVENT_ARRAY"""
generator = DebugInfoGenerator(module)
uint_type = generator.get_uint32_type()
ulong_type = generator.get_uint64_type()
array_type = generator.create_array_type(
uint_type, map_params.get("type", BPFMapType.UNSPEC).value
)
type_ptr = generator.create_pointer_type(array_type, 64)
key_ptr = generator.create_pointer_type(
array_type if "key_size" in map_params else ulong_type, 64
)
value_ptr = generator.create_pointer_type(
array_type if "value_size" in map_params else ulong_type, 64
)
elements_arr = []
# Create struct members
# scope field does not appear for some reason
cnt = 0
for elem in map_params:
if elem == "max_entries":
continue
if elem == "type":
ptr = type_ptr
elif "key" in elem:
ptr = key_ptr
else:
ptr = value_ptr
# TODO: do not hardcode 64 here; compute the size of each member instead. This will not work for structs.
member = generator.create_struct_member(elem, ptr, cnt * 64)
elements_arr.append(member)
cnt += 1
if "max_entries" in map_params:
max_entries_array = generator.create_array_type(
uint_type, map_params["max_entries"]
)
max_entries_ptr = generator.create_pointer_type(max_entries_array, 64)
max_entries_member = generator.create_struct_member(
"max_entries", max_entries_ptr, cnt * 64
)
elements_arr.append(max_entries_member)
# Create the struct type
struct_type = generator.create_struct_type(
elements_arr, 64 * len(elements_arr), is_distinct=True
)
# Create global variable debug info
global_var = generator.create_global_var_debug_info(
map_name, struct_type, is_local=False
)
# Attach debug info to the global variable
map_global.set_metadata("dbg", global_var)
return global_var
def create_ringbuf_debug_info(module, map_global, map_name, map_params):
"""Generate debug information metadata for BPF RINGBUF map"""
generator = DebugInfoGenerator(module)
int_type = generator.get_int32_type()
type_array = generator.create_array_type(
int_type, map_params.get("type", BPFMapType.RINGBUF).value
)
type_ptr = generator.create_pointer_type(type_array, 64)
type_member = generator.create_struct_member("type", type_ptr, 0)
max_entries_array = generator.create_array_type(int_type, map_params["max_entries"])
max_entries_ptr = generator.create_pointer_type(max_entries_array, 64)
max_entries_member = generator.create_struct_member(
"max_entries", max_entries_ptr, 64
)
elements_arr = [type_member, max_entries_member]
struct_type = generator.create_struct_type(elements_arr, 128, is_distinct=True)
global_var = generator.create_global_var_debug_info(
map_name, struct_type, is_local=False
)
map_global.set_metadata("dbg", global_var)
return global_var

View File

@ -0,0 +1,39 @@
from enum import Enum
class BPFMapType(Enum):
UNSPEC = 0
HASH = 1
ARRAY = 2
PROG_ARRAY = 3
PERF_EVENT_ARRAY = 4
PERCPU_HASH = 5
PERCPU_ARRAY = 6
STACK_TRACE = 7
CGROUP_ARRAY = 8
LRU_HASH = 9
LRU_PERCPU_HASH = 10
LPM_TRIE = 11
ARRAY_OF_MAPS = 12
HASH_OF_MAPS = 13
DEVMAP = 14
SOCKMAP = 15
CPUMAP = 16
XSKMAP = 17
SOCKHASH = 18
CGROUP_STORAGE_DEPRECATED = 19
CGROUP_STORAGE = 19
REUSEPORT_SOCKARRAY = 20
PERCPU_CGROUP_STORAGE_DEPRECATED = 21
PERCPU_CGROUP_STORAGE = 21
QUEUE = 22
STACK = 23
SK_STORAGE = 24
DEVMAP_HASH = 25
STRUCT_OPS = 26
RINGBUF = 27
INODE_STORAGE = 28
TASK_STORAGE = 29
BLOOM_FILTER = 30
USER_RINGBUF = 31
CGRP_STORAGE = 32
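For reference, the enum value is what the map emitters encode into the BTF-style debug info, as a hedged sketch:

# create_map_debug_info encodes the map type as the length of an unsigned-int
# array in the debug metadata, e.g. for a hash map:
#   BPFMapType.HASH.value == 1      -> "type" member points at a 1-element array type
#   BPFMapType.RINGBUF.value == 27  -> ringbuf maps use a 27-element array type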

View File

@ -1,10 +1,11 @@
import ast
import logging
from logging import Logger
from llvmlite import ir
from enum import Enum
from .maps_utils import MapProcessorRegistry
from pythonbpf.debuginfo import DebugInfoGenerator
import logging
from .map_types import BPFMapType
from .map_debug_info import create_map_debug_info, create_ringbuf_debug_info
logger: Logger = logging.getLogger(__name__)
@ -26,44 +27,6 @@ def is_map(func_node):
)
class BPFMapType(Enum):
UNSPEC = 0
HASH = 1
ARRAY = 2
PROG_ARRAY = 3
PERF_EVENT_ARRAY = 4
PERCPU_HASH = 5
PERCPU_ARRAY = 6
STACK_TRACE = 7
CGROUP_ARRAY = 8
LRU_HASH = 9
LRU_PERCPU_HASH = 10
LPM_TRIE = 11
ARRAY_OF_MAPS = 12
HASH_OF_MAPS = 13
DEVMAP = 14
SOCKMAP = 15
CPUMAP = 16
XSKMAP = 17
SOCKHASH = 18
CGROUP_STORAGE_DEPRECATED = 19
CGROUP_STORAGE = 19
REUSEPORT_SOCKARRAY = 20
PERCPU_CGROUP_STORAGE_DEPRECATED = 21
PERCPU_CGROUP_STORAGE = 21
QUEUE = 22
STACK = 23
SK_STORAGE = 24
DEVMAP_HASH = 25
STRUCT_OPS = 26
RINGBUF = 27
INODE_STORAGE = 28
TASK_STORAGE = 29
BLOOM_FILTER = 30
USER_RINGBUF = 31
CGRP_STORAGE = 32
def create_bpf_map(module, map_name, map_params):
"""Create a BPF map in the module with given parameters and debug info"""
@ -84,114 +47,37 @@ def create_bpf_map(module, map_name, map_params):
return map_global
def create_map_debug_info(module, map_global, map_name, map_params):
"""Generate debug info metadata for BPF maps HASH and PERF_EVENT_ARRAY"""
generator = DebugInfoGenerator(module)
def _parse_map_params(rval, expected_args=None):
"""Parse map parameters from call arguments and keywords."""
uint_type = generator.get_uint32_type()
ulong_type = generator.get_uint64_type()
array_type = generator.create_array_type(
uint_type, map_params.get("type", BPFMapType.UNSPEC).value
)
type_ptr = generator.create_pointer_type(array_type, 64)
key_ptr = generator.create_pointer_type(
array_type if "key_size" in map_params else ulong_type, 64
)
value_ptr = generator.create_pointer_type(
array_type if "value_size" in map_params else ulong_type, 64
)
params = {}
elements_arr = []
# Parse positional arguments
if expected_args:
for i, arg_name in enumerate(expected_args):
if i < len(rval.args):
arg = rval.args[i]
if isinstance(arg, ast.Name):
params[arg_name] = arg.id
elif isinstance(arg, ast.Constant):
params[arg_name] = arg.value
# Create struct members
# scope field does not appear for some reason
cnt = 0
for elem in map_params:
if elem == "max_entries":
continue
if elem == "type":
ptr = type_ptr
elif "key" in elem:
ptr = key_ptr
else:
ptr = value_ptr
# TODO: the best way to do this is not 64, but get the size each time. this will not work for structs.
member = generator.create_struct_member(elem, ptr, cnt * 64)
elements_arr.append(member)
cnt += 1
# Parse keyword arguments (override positional)
for keyword in rval.keywords:
if isinstance(keyword.value, ast.Name):
params[keyword.arg] = keyword.value.id
elif isinstance(keyword.value, ast.Constant):
params[keyword.arg] = keyword.value.value
if "max_entries" in map_params:
max_entries_array = generator.create_array_type(
uint_type, map_params["max_entries"]
)
max_entries_ptr = generator.create_pointer_type(max_entries_array, 64)
max_entries_member = generator.create_struct_member(
"max_entries", max_entries_ptr, cnt * 64
)
elements_arr.append(max_entries_member)
# Create the struct type
struct_type = generator.create_struct_type(
elements_arr, 64 * len(elements_arr), is_distinct=True
)
# Create global variable debug info
global_var = generator.create_global_var_debug_info(
map_name, struct_type, is_local=False
)
# Attach debug info to the global variable
map_global.set_metadata("dbg", global_var)
return global_var
def create_ringbuf_debug_info(module, map_global, map_name, map_params):
"""Generate debug information metadata for BPF RINGBUF map"""
generator = DebugInfoGenerator(module)
int_type = generator.get_int32_type()
type_array = generator.create_array_type(
int_type, map_params.get("type", BPFMapType.RINGBUF).value
)
type_ptr = generator.create_pointer_type(type_array, 64)
type_member = generator.create_struct_member("type", type_ptr, 0)
max_entries_array = generator.create_array_type(int_type, map_params["max_entries"])
max_entries_ptr = generator.create_pointer_type(max_entries_array, 64)
max_entries_member = generator.create_struct_member(
"max_entries", max_entries_ptr, 64
)
elements_arr = [type_member, max_entries_member]
struct_type = generator.create_struct_type(elements_arr, 128, is_distinct=True)
global_var = generator.create_global_var_debug_info(
map_name, struct_type, is_local=False
)
map_global.set_metadata("dbg", global_var)
return global_var
return params
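A hedged sketch of what the parser yields for a typical declaration (assuming _parse_map_params above is in scope; the AST is built by hand purely for illustration):

import ast

call = ast.parse("HashMap(c_uint64, c_uint64, max_entries=3)", mode="eval").body
params = _parse_map_params(call, expected_args=["key", "value", "max_entries"])
# params == {"key": "c_uint64", "value": "c_uint64", "max_entries": 3}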
@MapProcessorRegistry.register("RingBuf")
def process_ringbuf_map(map_name, rval, module):
"""Process a BPF_RINGBUF map declaration"""
logger.info(f"Processing Ringbuf: {map_name}")
map_params = {"type": BPFMapType.RINGBUF}
# Parse max_entries if present
if len(rval.args) >= 1 and isinstance(rval.args[0], ast.Constant):
const_val = rval.args[0].value
if isinstance(const_val, int):
map_params["max_entries"] = const_val
for keyword in rval.keywords:
if keyword.arg == "max_entries" and isinstance(keyword.value, ast.Constant):
const_val = keyword.value.value
if isinstance(const_val, int):
map_params["max_entries"] = const_val
map_params = _parse_map_params(rval, expected_args=["max_entries"])
map_params["type"] = BPFMapType.RINGBUF
logger.info(f"Ringbuf map parameters: {map_params}")
@ -204,27 +90,8 @@ def process_ringbuf_map(map_name, rval, module):
def process_hash_map(map_name, rval, module):
"""Process a BPF_HASH map declaration"""
logger.info(f"Processing HashMap: {map_name}")
map_params = {"type": BPFMapType.HASH}
# Assuming order: key_type, value_type, max_entries
if len(rval.args) >= 1 and isinstance(rval.args[0], ast.Name):
map_params["key"] = rval.args[0].id
if len(rval.args) >= 2 and isinstance(rval.args[1], ast.Name):
map_params["value"] = rval.args[1].id
if len(rval.args) >= 3 and isinstance(rval.args[2], ast.Constant):
const_val = rval.args[2].value
if isinstance(const_val, (int, str)): # safe check
map_params["max_entries"] = const_val
for keyword in rval.keywords:
if keyword.arg == "key" and isinstance(keyword.value, ast.Name):
map_params["key"] = keyword.value.id
elif keyword.arg == "value" and isinstance(keyword.value, ast.Name):
map_params["value"] = keyword.value.id
elif keyword.arg == "max_entries" and isinstance(keyword.value, ast.Constant):
const_val = keyword.value.value
if isinstance(const_val, (int, str)):
map_params["max_entries"] = const_val
map_params = _parse_map_params(rval, expected_args=["key", "value", "max_entries"])
map_params["type"] = BPFMapType.HASH
logger.info(f"Map parameters: {map_params}")
map_global = create_bpf_map(module, map_name, map_params)
@ -237,18 +104,8 @@ def process_hash_map(map_name, rval, module):
def process_perf_event_map(map_name, rval, module):
"""Process a BPF_PERF_EVENT_ARRAY map declaration"""
logger.info(f"Processing PerfEventArray: {map_name}")
map_params = {"type": BPFMapType.PERF_EVENT_ARRAY}
if len(rval.args) >= 1 and isinstance(rval.args[0], ast.Name):
map_params["key_size"] = rval.args[0].id
if len(rval.args) >= 2 and isinstance(rval.args[1], ast.Name):
map_params["value_size"] = rval.args[1].id
for keyword in rval.keywords:
if keyword.arg == "key_size" and isinstance(keyword.value, ast.Name):
map_params["key_size"] = keyword.value.id
elif keyword.arg == "value_size" and isinstance(keyword.value, ast.Name):
map_params["value_size"] = keyword.value.id
map_params = _parse_map_params(rval, expected_args=["key_size", "value_size"])
map_params["type"] = BPFMapType.PERF_EVENT_ARRAY
logger.info(f"Map parameters: {map_params}")
map_global = create_bpf_map(module, map_name, map_params)

View File

@ -4,7 +4,7 @@ import importlib
from .dependency_handler import DependencyHandler
from .dependency_node import DependencyNode
import ctypes
from typing import Optional, Any
from typing import Optional, Any, Dict
logger = logging.getLogger(__name__)
@ -33,20 +33,24 @@ def process_vmlinux_post_ast(
symbols_in_module, imported_module = get_module_symbols("vmlinux")
current_symbol_name = elem_type_class.__name__
field_table = {}
logger.info(f"Begin {current_symbol_name} Processing")
field_table: Dict[str, list] = {}
is_complex_type = False
containing_type: Optional[Any] = None
ctype_complex_type: Optional[Any] = None
type_length: Optional[int] = None
module_name = getattr(elem_type_class, "__module__", None)
if current_symbol_name in processing_stack:
logger.info(f"Circular dependency detected for {current_symbol_name}, skipping")
return True
# Check if already processed
if handler.has_node(current_symbol_name):
logger.info(f"Node {current_symbol_name} already processed and ready")
logger.debug(f"Node {current_symbol_name} already processed and ready")
return True
# XXX: Check its use. It's probably not being used.
if current_symbol_name in processing_stack:
logger.debug(
f"Dependency already in processing stack for {current_symbol_name}, skipping"
)
return True
processing_stack.add(current_symbol_name)
@ -56,14 +60,18 @@ def process_vmlinux_post_ast(
pass
else:
new_dep_node = DependencyNode(name=current_symbol_name)
# elem_type_class is the actual vmlinux struct/class
new_dep_node.set_ctype_struct(elem_type_class)
handler.add_node(new_dep_node)
class_obj = getattr(imported_module, current_symbol_name)
# Inspect the class fields
if hasattr(class_obj, "_fields_"):
for field_elem in class_obj._fields_:
field_name = None
field_type = None
bitfield_size = None
field_name: str = ""
field_type: Optional[Any] = None
bitfield_size: Optional[int] = None
if len(field_elem) == 2:
field_name, field_type = field_elem
elif len(field_elem) == 3:
@ -71,13 +79,15 @@ def process_vmlinux_post_ast(
field_table[field_name] = [field_type, bitfield_size]
elif hasattr(class_obj, "__annotations__"):
for field_elem in class_obj.__annotations__.items():
field_name = None
field_type = None
bitfield_size = None
if len(field_elem) == 2:
field_name, field_type = field_elem
bitfield_size = None
elif len(field_elem) == 3:
field_name, field_type, bitfield_size = field_elem
else:
raise ValueError(
"Number of fields in items() of class object unexpected"
)
field_table[field_name] = [field_type, bitfield_size]
else:
raise TypeError("Could not get required class and definition")
@ -87,13 +97,14 @@ def process_vmlinux_post_ast(
elem_name, elem_temp_list = elem
[elem_type, elem_bitfield_size] = elem_temp_list
local_module_name = getattr(elem_type, "__module__", None)
new_dep_node.add_field(elem_name, elem_type, ready=False)
if local_module_name == ctypes.__name__:
new_dep_node.add_field(elem_name, elem_type, ready=False)
new_dep_node.set_field_bitfield_size(elem_name, elem_bitfield_size)
new_dep_node.set_field_ready(elem_name, is_ready=True)
logger.info(f"Field {elem_name} is direct ctypes type: {elem_type}")
logger.debug(
f"Field {elem_name} is direct ctypes type: {elem_type}"
)
elif local_module_name == "vmlinux":
new_dep_node.add_field(elem_name, elem_type, ready=False)
new_dep_node.set_field_bitfield_size(elem_name, elem_bitfield_size)
logger.debug(
f"Processing vmlinux field: {elem_name}, type: {elem_type}"
@ -103,8 +114,13 @@ def process_vmlinux_post_ast(
containing_type = elem_type._type_
if hasattr(elem_type, "_length_") and is_complex_type:
type_length = elem_type._length_
if containing_type.__module__ == "vmlinux":
pass
new_dep_node.add_dependent(
elem_type._type_.__name__
if hasattr(elem_type._type_, "__name__")
else str(elem_type._type_)
)
elif containing_type.__module__ == ctypes.__name__:
if isinstance(elem_type, type):
if issubclass(elem_type, ctypes.Array):
@ -117,7 +133,7 @@ def process_vmlinux_post_ast(
raise ImportError(
f"Unsupported module of {containing_type}"
)
logger.info(
logger.debug(
f"{containing_type} containing type of parent {elem_name} with {elem_type} and ctype {ctype_complex_type} and length {type_length}"
)
new_dep_node.set_field_containing_type(
@ -129,20 +145,51 @@ def process_vmlinux_post_ast(
)
new_dep_node.set_field_type(elem_name, elem_type)
if containing_type.__module__ == "vmlinux":
if process_vmlinux_post_ast(
containing_type, llvm_handler, handler, processing_stack
):
containing_type_name = (
containing_type.__name__
if hasattr(containing_type, "__name__")
else str(containing_type)
)
# Check for self-reference or already processed
if containing_type_name == current_symbol_name:
# Self-referential pointer
logger.debug(
f"Self-referential pointer in {current_symbol_name}.{elem_name}"
)
new_dep_node.set_field_ready(elem_name, True)
elif handler.has_node(containing_type_name):
# Already processed
logger.debug(
f"Reusing already processed {containing_type_name}"
)
new_dep_node.set_field_ready(elem_name, True)
else:
# Process recursively - THIS WAS MISSING
new_dep_node.add_dependent(containing_type_name)
process_vmlinux_post_ast(
containing_type,
llvm_handler,
handler,
processing_stack,
)
new_dep_node.set_field_ready(elem_name, True)
elif containing_type.__module__ == ctypes.__name__:
logger.info(f"Processing ctype internal {containing_type}")
logger.debug(f"Processing ctype internal {containing_type}")
new_dep_node.set_field_ready(elem_name, True)
else:
raise TypeError(
"Module not supported in recursive resolution"
)
continue
if process_vmlinux_post_ast(
elem_type, llvm_handler, handler, processing_stack
):
else:
new_dep_node.add_dependent(
elem_type.__name__
if hasattr(elem_type, "__name__")
else str(elem_type)
)
process_vmlinux_post_ast(
elem_type, llvm_handler, handler, processing_stack
)
new_dep_node.set_field_ready(elem_name, True)
else:
raise ValueError(
@ -152,5 +199,7 @@ def process_vmlinux_post_ast(
else:
raise ImportError("UNSUPPORTED Module")
print(current_symbol_name, "DONE")
print(f"handler readiness {handler.is_ready}")
logger.info(
f"{current_symbol_name} processed and handler readiness {handler.is_ready}"
)
return True

View File

@ -147,3 +147,27 @@ class DependencyHandler:
int: The number of nodes
"""
return len(self._nodes)
def __getitem__(self, name: str) -> DependencyNode:
"""
Get a node by name using dictionary-style access.
Args:
name: The name of the node to retrieve
Returns:
DependencyNode: The node with the given name
Raises:
KeyError: If no node with the given name exists
Example:
node = handler["some-dep_node_name"]
"""
if name not in self._nodes:
raise KeyError(f"No node with name '{name}' found")
return self._nodes[name]
@property
def nodes(self):
return self._nodes

View File

@ -1,5 +1,6 @@
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
import ctypes
# TODO: Fix the type naming convention.
@ -13,6 +14,7 @@ class Field:
containing_type: Optional[Any]
type_size: Optional[int]
bitfield_size: Optional[int]
offset: int
value: Any = None
ready: bool = False
@ -20,46 +22,50 @@ class Field:
"""Set the readiness state of this field."""
self.ready = is_ready
def set_value(self, value: Any, mark_ready: bool = True) -> None:
def set_value(self, value: Any, mark_ready: bool = False) -> None:
"""Set the value of this field and optionally mark it as ready."""
self.value = value
if mark_ready:
self.ready = True
def set_type(self, given_type, mark_ready: bool = True) -> None:
def set_type(self, given_type, mark_ready: bool = False) -> None:
"""Set value of the type field and mark as ready"""
self.type = given_type
if mark_ready:
self.ready = True
def set_containing_type(
self, containing_type: Optional[Any], mark_ready: bool = True
self, containing_type: Optional[Any], mark_ready: bool = False
) -> None:
"""Set the containing_type of this field and optionally mark it as ready."""
self.containing_type = containing_type
if mark_ready:
self.ready = True
def set_type_size(self, type_size: Any, mark_ready: bool = True) -> None:
def set_type_size(self, type_size: Any, mark_ready: bool = False) -> None:
"""Set the type_size of this field and optionally mark it as ready."""
self.type_size = type_size
if mark_ready:
self.ready = True
def set_ctype_complex_type(
self, ctype_complex_type: Any, mark_ready: bool = True
self, ctype_complex_type: Any, mark_ready: bool = False
) -> None:
"""Set the ctype_complex_type of this field and optionally mark it as ready."""
self.ctype_complex_type = ctype_complex_type
if mark_ready:
self.ready = True
def set_bitfield_size(self, bitfield_size: Any, mark_ready: bool = True) -> None:
def set_bitfield_size(self, bitfield_size: Any, mark_ready: bool = False) -> None:
"""Set the bitfield_size of this field and optionally mark it as ready."""
self.bitfield_size = bitfield_size
if mark_ready:
self.ready = True
def set_offset(self, offset: int) -> None:
"""Set the offset of this field"""
self.offset = offset
@dataclass
class DependencyNode:
@ -106,8 +112,11 @@ class DependencyNode:
"""
name: str
depends_on: Optional[list[str]] = None
fields: Dict[str, Field] = field(default_factory=dict)
_ready_cache: Optional[bool] = field(default=None, repr=False)
current_offset: int = 0
ctype_struct: Optional[Any] = field(default=None, repr=False)
def add_field(
self,
@ -119,8 +128,11 @@ class DependencyNode:
ctype_complex_type: Optional[int] = None,
bitfield_size: Optional[int] = None,
ready: bool = False,
offset: int = 0,
) -> None:
"""Add a field to the node with an optional initial value and readiness state."""
if self.depends_on is None:
self.depends_on = []
self.fields[name] = Field(
name=name,
type=field_type,
@ -130,15 +142,26 @@ class DependencyNode:
type_size=type_size,
ctype_complex_type=ctype_complex_type,
bitfield_size=bitfield_size,
offset=offset,
)
# Invalidate readiness cache
self._ready_cache = None
def set_ctype_struct(self, ctype_struct: Any) -> None:
"""Set the ctypes structure for automatic offset calculation."""
self.ctype_struct = ctype_struct
def __sizeof__(self):
# If we have a ctype_struct, use its size
if self.ctype_struct is not None:
return ctypes.sizeof(self.ctype_struct)
return self.current_offset
def get_field(self, name: str) -> Field:
"""Get a field by name."""
return self.fields[name]
def set_field_value(self, name: str, value: Any, mark_ready: bool = True) -> None:
def set_field_value(self, name: str, value: Any, mark_ready: bool = False) -> None:
"""Set a field's value and optionally mark it as ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
@ -147,7 +170,7 @@ class DependencyNode:
# Invalidate readiness cache
self._ready_cache = None
def set_field_type(self, name: str, type: Any, mark_ready: bool = True) -> None:
def set_field_type(self, name: str, type: Any, mark_ready: bool = False) -> None:
"""Set a field's type and optionally mark it as ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
@ -157,7 +180,7 @@ class DependencyNode:
self._ready_cache = None
def set_field_containing_type(
self, name: str, containing_type: Any, mark_ready: bool = True
self, name: str, containing_type: Any, mark_ready: bool = False
) -> None:
"""Set a field's containing_type and optionally mark it as ready."""
if name not in self.fields:
@ -168,7 +191,7 @@ class DependencyNode:
self._ready_cache = None
def set_field_type_size(
self, name: str, type_size: Any, mark_ready: bool = True
self, name: str, type_size: Any, mark_ready: bool = False
) -> None:
"""Set a field's type_size and optionally mark it as ready."""
if name not in self.fields:
@ -179,7 +202,7 @@ class DependencyNode:
self._ready_cache = None
def set_field_ctype_complex_type(
self, name: str, ctype_complex_type: Any, mark_ready: bool = True
self, name: str, ctype_complex_type: Any, mark_ready: bool = False
) -> None:
"""Set a field's ctype_complex_type and optionally mark it as ready."""
if name not in self.fields:
@ -190,7 +213,7 @@ class DependencyNode:
self._ready_cache = None
def set_field_bitfield_size(
self, name: str, bitfield_size: Any, mark_ready: bool = True
self, name: str, bitfield_size: Any, mark_ready: bool = False
) -> None:
"""Set a field's bitfield_size and optionally mark it as ready."""
if name not in self.fields:
@ -200,15 +223,112 @@ class DependencyNode:
# Invalidate readiness cache
self._ready_cache = None
def set_field_ready(self, name: str, is_ready: bool = True) -> None:
def set_field_ready(
self,
name: str,
is_ready: bool = False,
size_of_containing_type: Optional[int] = None,
) -> None:
"""Mark a field as ready or not ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
self.fields[name].set_ready(is_ready)
# Use ctypes built-in offset if available
if self.ctype_struct is not None:
try:
self.fields[name].set_offset(getattr(self.ctype_struct, name).offset)
except AttributeError:
# Fallback to manual calculation if field not found in ctype_struct
self.fields[name].set_offset(self.current_offset)
self.current_offset += self._calculate_size(
name, size_of_containing_type
)
else:
# Manual offset calculation when no ctype_struct is available
self.fields[name].set_offset(self.current_offset)
self.current_offset += self._calculate_size(name, size_of_containing_type)
# Invalidate readiness cache
self._ready_cache = None
def _calculate_size(
self, name: str, size_of_containing_type: Optional[int] = None
) -> int:
processing_field = self.fields[name]
# size_of_field will be in bytes
if processing_field.type.__module__ == ctypes.__name__:
size_of_field = ctypes.sizeof(processing_field.type)
return size_of_field
elif processing_field.type.__module__ == "vmlinux":
if processing_field.ctype_complex_type is not None:
if issubclass(processing_field.ctype_complex_type, ctypes.Array):
if processing_field.containing_type.__module__ == ctypes.__name__:
if (
processing_field.containing_type is not None
and processing_field.type_size is not None
):
size_of_field = (
ctypes.sizeof(processing_field.containing_type)
* processing_field.type_size
)
else:
raise RuntimeError(
f"{processing_field} has no containing_type or type_size"
)
return size_of_field
elif processing_field.containing_type.__module__ == "vmlinux":
if (
size_of_containing_type is not None
and processing_field.type_size is not None
):
size_of_field = (
size_of_containing_type * processing_field.type_size
)
else:
raise RuntimeError(
f"{processing_field} has no containing_type or type_size"
)
return size_of_field
elif issubclass(processing_field.ctype_complex_type, ctypes._Pointer):
return ctypes.sizeof(ctypes.c_void_p)
else:
raise NotImplementedError(
"This subclass of ctype not supported yet"
)
elif processing_field.type_size is not None:
# Handle vmlinux types with type_size but no ctype_complex_type
# This means it's a direct vmlinux struct field (not array/pointer wrapped)
# The type_size should already contain the full size of the struct
# But if there's a containing_type from vmlinux, we need that size
if processing_field.containing_type is not None:
if processing_field.containing_type.__module__ == "vmlinux":
# For vmlinux containing types, we need the pre-calculated size
if size_of_containing_type is not None:
return size_of_containing_type * processing_field.type_size
else:
raise RuntimeError(
f"Field {name}: vmlinux containing_type requires size_of_containing_type"
)
else:
raise ModuleNotFoundError(
f"Containing type module {processing_field.containing_type.__module__} not supported"
)
else:
raise RuntimeError("Wrong type found with no containing type")
else:
# No ctype_complex_type and no type_size, must rely on size_of_containing_type
if size_of_containing_type is None:
raise RuntimeError(
f"Size of containing type {size_of_containing_type} is None"
)
return size_of_containing_type
else:
raise ModuleNotFoundError("Module is not supported for the operation")
raise RuntimeError("control should not reach here")
@property
def is_ready(self) -> bool:
"""Check if the node is ready (all fields are ready)."""
@ -218,8 +338,8 @@ class DependencyNode:
# Calculate readiness only when needed
if not self.fields:
self._ready_cache = False
return False
self._ready_cache = True
return True
self._ready_cache = all(elem.ready for elem in self.fields.values())
return self._ready_cache
@ -231,3 +351,13 @@ class DependencyNode:
def get_ready_fields(self) -> Dict[str, Field]:
"""Get all fields that are marked as ready."""
return {name: elem for name, elem in self.fields.items() if elem.ready}
def get_not_ready_fields(self) -> Dict[str, Field]:
"""Get all fields that are marked as not ready."""
return {name: elem for name, elem in self.fields.items() if not elem.ready}
def add_dependent(self, dep_type):
if dep_type in self.depends_on:
return
else:
self.depends_on.append(dep_type)

View File

@ -1,11 +1,11 @@
import ast
import logging
from typing import List, Tuple, Dict
from typing import List, Tuple, Any
import importlib
import inspect
from .dependency_handler import DependencyHandler
from .ir_generation import IRGenerator
from .ir_gen import IRGenerator
from .class_handler import process_vmlinux_class
logger = logging.getLogger(__name__)
@ -82,7 +82,7 @@ def vmlinux_proc(tree: ast.AST, module):
# initialise dependency handler
handler = DependencyHandler()
# initialise assignment dictionary of name to type
assignments: Dict[str, type] = {}
assignments: dict[str, tuple[type, Any]] = {}
if not import_statements:
logger.info("No vmlinux imports found")
@ -129,7 +129,19 @@ def vmlinux_proc(tree: ast.AST, module):
)
IRGenerator(module, handler)
return assignments
def process_vmlinux_assign(node, module, assignments: Dict[str, type]):
raise NotImplementedError("Assignment handling has not been implemented yet")
def process_vmlinux_assign(node, module, assignments: dict[str, tuple[type, Any]]):
# Check if this is a simple assignment with a constant value
if len(node.targets) == 1 and isinstance(node.targets[0], ast.Name):
target_name = node.targets[0].id
if isinstance(node.value, ast.Constant):
assignments[target_name] = (type(node.value.value), node.value.value)
logger.info(
f"Added assignment: {target_name} = {node.value.value!r} of type {type(node.value.value)}"
)
else:
raise ValueError(f"Unsupported assignment type for {target_name}")
else:
raise ValueError("Not a simple assignment")

View File

@ -0,0 +1,3 @@
from .ir_generation import IRGenerator
__all__ = ["IRGenerator"]

View File

@ -0,0 +1,15 @@
from pythonbpf.debuginfo import DebugInfoGenerator
def debug_info_generation(struct, llvm_module):
generator = DebugInfoGenerator(llvm_module)
# this is sample debug info generation
# i64type = generator.get_uint64_type()
struct_type = generator.create_struct_type([], 64 * 4, is_distinct=True)
global_var = generator.create_global_var_debug_info(
struct.name, struct_type, is_local=False
)
return global_var

View File

@ -0,0 +1,161 @@
import ctypes
import logging
from ..dependency_handler import DependencyHandler
from .debug_info_gen import debug_info_generation
from ..dependency_node import DependencyNode
import llvmlite.ir as ir
logger = logging.getLogger(__name__)
class IRGenerator:
# TODO: accept the assignments dict and add the generated CO-RE access names to it.
def __init__(self, llvm_module, handler: DependencyHandler, assignment=None):
self.llvm_module = llvm_module
self.handler: DependencyHandler = handler
self.generated: list[str] = []
if not handler.is_ready:
raise ImportError(
"Semantic analysis of vmlinux imports failed. Cannot generate IR"
)
for struct in handler:
self.struct_processor(struct)
def struct_processor(self, struct, processing_stack=None):
# Initialize processing stack on first call
if processing_stack is None:
processing_stack = set()
# If already generated, skip
if struct.name in self.generated:
return
# Detect circular dependency
if struct.name in processing_stack:
logger.info(
f"Circular dependency detected for {struct.name}, skipping recursive processing"
)
# For circular dependencies, we can either:
# 1. Use forward declarations (opaque pointers)
# 2. Mark as incomplete and process later
# 3. Generate a placeholder type
# Here we'll just skip and let it be processed in its own call
return
logger.info(f"IR generating for {struct.name}")
# Add to processing stack before processing dependencies
processing_stack.add(struct.name)
try:
# Process all dependencies first
if struct.depends_on is None:
pass
else:
for dependency in struct.depends_on:
if dependency not in self.generated:
# Check if dependency exists in handler
if dependency in self.handler.nodes:
dep_node_from_dependency = self.handler[dependency]
# Pass the processing_stack down to track circular refs
self.struct_processor(
dep_node_from_dependency, processing_stack
)
else:
raise RuntimeError(
f"Dependency {dependency} not found in handler"
)
# Actual processor logic here after dependencies are resolved
self.gen_ir(struct)
self.generated.append(struct.name)
finally:
# Remove from processing stack after we're done
processing_stack.discard(struct.name)
def gen_ir(self, struct):
# TODO: we add the btf_ama attribute by monkey patching at the end of compilation; once llvmlite
# accepts our issue, we will switch to adding the attribute directly on the accessed values.
# Currently we generate all possible field accesses for CO-RE and put them into the assignment table.
debug_info = debug_info_generation(struct, self.llvm_module)
field_index = 0
for field_name, field in struct.fields.items():
# does not take arrays and similar types into consideration yet.
if field.ctype_complex_type is not None and issubclass(
field.ctype_complex_type, ctypes.Array
):
array_size = field.type_size
containing_type = field.containing_type
if containing_type.__module__ == ctypes.__name__:
containing_type_size = ctypes.sizeof(containing_type)
for i in range(0, array_size):
field_co_re_name = self._struct_name_generator(
struct, field, field_index, True, i, containing_type_size
)
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
field_index += 1
elif field.type_size is not None:
array_size = field.type_size
containing_type = field.containing_type
if containing_type.__module__ == "vmlinux":
containing_type_size = self.handler[
containing_type.__name__
].current_offset
for i in range(0, array_size):
field_co_re_name = self._struct_name_generator(
struct, field, field_index, True, i, containing_type_size
)
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
field_index += 1
else:
field_co_re_name = self._struct_name_generator(
struct, field, field_index
)
field_index += 1
globvar = ir.GlobalVariable(
self.llvm_module, ir.IntType(64), name=field_co_re_name
)
globvar.linkage = "external"
globvar.set_metadata("llvm.preserve.access.index", debug_info)
def _struct_name_generator(
self,
struct: DependencyNode,
field,
field_index: int,
is_indexed: bool = False,
index: int = 0,
containing_type_size: int = 0,
) -> str:
if is_indexed:
name = (
"llvm."
+ struct.name.removeprefix("struct_")
+ f":0:{field.offset + index * containing_type_size}"
+ "$"
+ f"0:{field_index}:{index}"
)
return name
elif struct.name.startswith("struct_"):
name = (
"llvm."
+ struct.name.removeprefix("struct_")
+ f":0:{field.offset}"
+ "$"
+ f"0:{field_index}"
)
return name
else:
print(self.handler[struct.name])
raise TypeError(
"Name generation cannot occur due to type name not starting with struct"
)
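# Example of the generated names (hypothetical values): for struct_trace_entry,
# field_index=3 and a field at byte offset 8, the non-indexed branch above yields
#   "llvm.trace_entry:0:8$0:3"
# while the indexed branch with index=2 and 4-byte elements yields
#   "llvm.trace_entry:0:16$0:3:2"  # offset 8 + 2 * 4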

View File

@@ -1,8 +0,0 @@
# here, we will iterate through the dependencies and generate IR once dependencies are resolved fully
from .dependency_handler import DependencyHandler
class IRGenerator:
def __init__(self, module, handler):
self.module = module
self.handler: DependencyHandler = handler

View File

@@ -1,23 +1,9 @@
// SPDX-License-Identifier: GPL-2.0
#include <linux/bpf.h>
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
struct trace_entry {
short unsigned int type;
unsigned char flags;
unsigned char preempt_count;
int pid;
};
struct trace_event_raw_sys_enter {
struct trace_entry ent;
long int id;
long unsigned int args[6];
char __data[0];
};
struct event {
__u32 pid;
__u32 uid;

View File

@@ -0,0 +1,40 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
# NOTE: This example tries to reinterpret the variable `x` as a different type.
# We do not allow this for now, as stack allocations are typed and have to be
# done in the first basic block. Allowing re-interpretation would require
# re-allocation of stack space (possibly in a new basic block), which is not
# supported in eBPF yet.
# In the future we may allow bitcasts where the types have the same width, but for
# now we do not allow any re-interpretation of variables (a workaround is sketched below).
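# A minimal workaround sketch (assumption, not part of this example): bind the
# new value to a fresh name so each stack slot keeps a single type, e.g.
#   y = 20
#   if y == 2:
#       print("Hello, World!")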
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
x = last.lookup(0)
x = 20
if x == 2:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@@ -1,12 +1,17 @@
from pythonbpf import bpf, map, section, bpfglobal, compile_to_ir
from pythonbpf.maps import HashMap
from pythonbpf.helper import XDP_PASS
from vmlinux import TASK_COMM_LEN # noqa: F401
from vmlinux import struct_qspinlock # noqa: F401
# from vmlinux import struct_trace_event_raw_sys_enter # noqa: F401
# from vmlinux import struct_posix_cputimers # noqa: F401
from vmlinux import struct_xdp_md
from vmlinux import struct_ring_buffer_per_cpu # noqa: F401
from vmlinux import struct_xdp_buff # noqa: F401
# from vmlinux import struct_xdp_md
# from vmlinux import struct_trace_event_raw_sys_enter # noqa: F401
# from vmlinux import struct_ring_buffer_per_cpu # noqa: F401
# from vmlinux import struct_request # noqa: F401
from ctypes import c_int64
# Instructions on how to run this program

View File

@@ -0,0 +1,74 @@
from pythonbpf import bpf, map, section, bpfglobal, compile, struct
from ctypes import c_void_p, c_int64, c_int32, c_uint64
from pythonbpf.maps import HashMap
from pythonbpf.helper import ktime
# NOTE: This is a comprehensive test combining struct, helper, and map features.
# Although line 50 deliberately uses a convoluted expression to stress the compiler,
# named variables are recommended in real programs to reduce the amount of scratch
# space that needs to be allocated (see the sketch below).
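# A sketch of the recommended style (assumption, not part of the test below):
# hoist the repeated lookups into named variables instead of nesting them, e.g.
#   k = last.lookup(0)
#   v = last.lookup(k)
#   last.update(0, v + v + v)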
@bpf
@struct
class data_t:
pid: c_uint64
ts: c_uint64
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
dat = data_t()
dat.pid = 123
dat.pid = dat.pid + 1
print(f"pid is {dat.pid}")
tu = 9
last.update(0, tu)
last.update(1, -last.lookup(0))
x = last.lookup(0)
print(f"Map value at index 0: {x}")
x = x + c_int32(1)
print(f"x after adding 32-bit 1 is {x}")
x = ktime() - 121
print(f"ktime - 121 is {x}")
x = last.lookup(0)
x = x + 1
print(f"x is {x}")
if x == 10:
jat = data_t()
jat.ts = 456
print(f"Hello, World!, ts is {jat.ts}")
a = last.lookup(0)
print(f"a is {a}")
last.update(9, 9)
last.update(
0,
last.lookup(last.lookup(0))
+ last.lookup(last.lookup(0))
+ last.lookup(last.lookup(0)),
)
z = last.lookup(0)
print(f"new map val at index 0 is {z}")
else:
a = last.lookup(0)
print("Goodbye, World!")
c = last.lookup(1 - 1)
print(f"c is {c}")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@@ -0,0 +1,27 @@
from pythonbpf import bpf, section, bpfglobal, compile
from ctypes import c_void_p, c_int64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
x = 1
print(f"Initial x: {x}")
a = 20
x = a
print(f"Updated x with a: {x}")
x = (x + x) * 3
if x == 2:
print("Hello, World!")
else:
print(f"Goodbye, World! {x}")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@@ -0,0 +1,34 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.maps import HashMap
# NOTE: An example of i64** assignment with binops on the RHS
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
last.update(0, 1)
x = last.lookup(0)
print(f"{x}")
x = x + 1
if x == 2:
print("Hello, World!")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@@ -0,0 +1,40 @@
from pythonbpf import bpf, section, bpfglobal, compile, struct
from ctypes import c_void_p, c_int64, c_uint64
from pythonbpf.helper import ktime
@bpf
@struct
class data_t:
pid: c_uint64
ts: c_uint64
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello_world(ctx: c_void_p) -> c_int64:
dat = data_t()
dat.pid = 123
dat.pid = dat.pid + 1
print(f"pid is {dat.pid}")
x = ktime() - 121
print(f"ktime is {x}")
x = 1
x = x + 1
print(f"x is {x}")
if x == 2:
jat = data_t()
jat.ts = 456
print(f"Hello, World!, ts is {jat.ts}")
else:
print("Goodbye, World!")
return
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()