Merge pull request #28 from pythonbpf/vmlinux-working

Add compilation mechanism from vmlinux
Still  does not compile to IR. only does semantic analysis.
Another PR will be opened soon for IR generation.
This commit is contained in:
varunrmallya
2025-10-13 19:08:41 +05:30
committed by GitHub
23 changed files with 847 additions and 248520 deletions

View File

@ -4,6 +4,7 @@ from .license_pass import license_processing
from .functions import func_proc
from .maps import maps_proc
from .structs import structs_proc
from .vmlinux_parser import vmlinux_proc
from .globals_pass import (
globals_list_creation,
globals_processing,
@ -44,6 +45,7 @@ def processor(source_code, filename, module):
for func_node in bpf_chunks:
logger.info(f"Found BPF function/struct: {func_node.name}")
vmlinux_proc(tree, module)
populate_global_symbol_table(tree, module)
license_processing(tree, module)
globals_processing(tree, module)

View File

@ -15,5 +15,8 @@ def deref(ptr):
return result if result is not None else 0
XDP_ABORTED = ctypes.c_int64(0)
XDP_DROP = ctypes.c_int64(1)
XDP_PASS = ctypes.c_int64(2)
XDP_TX = ctypes.c_int64(3)
XDP_REDIRECT = ctypes.c_int64(4)

View File

View File

@ -0,0 +1,3 @@
from .import_detector import vmlinux_proc
__all__ = ["vmlinux_proc"]

View File

@ -0,0 +1,167 @@
import logging
from functools import lru_cache
import importlib
from .dependency_handler import DependencyHandler
from .dependency_node import DependencyNode
import ctypes
from typing import Optional, Any, Dict
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def get_module_symbols(module_name: str):
imported_module = importlib.import_module(module_name)
return [name for name in dir(imported_module)], imported_module
def process_vmlinux_class(node, llvm_module, handler: DependencyHandler):
symbols_in_module, imported_module = get_module_symbols("vmlinux")
if node.name in symbols_in_module:
vmlinux_type = getattr(imported_module, node.name)
process_vmlinux_post_ast(vmlinux_type, llvm_module, handler)
else:
raise ImportError(f"{node.name} not in vmlinux")
def process_vmlinux_post_ast(
elem_type_class, llvm_handler, handler: DependencyHandler, processing_stack=None
):
# Initialize processing stack on first call
if processing_stack is None:
processing_stack = set()
symbols_in_module, imported_module = get_module_symbols("vmlinux")
current_symbol_name = elem_type_class.__name__
logger.info(f"Begin {current_symbol_name} Processing")
field_table: Dict[str, list] = {}
is_complex_type = False
containing_type: Optional[Any] = None
ctype_complex_type: Optional[Any] = None
type_length: Optional[int] = None
module_name = getattr(elem_type_class, "__module__", None)
# Check if already processed
if handler.has_node(current_symbol_name):
logger.debug(f"Node {current_symbol_name} already processed and ready")
return True
# XXX:Check it's use. It's probably not being used.
if current_symbol_name in processing_stack:
logger.debug(
f"Dependency already in processing stack for {current_symbol_name}, skipping"
)
return True
processing_stack.add(current_symbol_name)
if module_name == "vmlinux":
if hasattr(elem_type_class, "_type_"):
pass
else:
new_dep_node = DependencyNode(name=current_symbol_name)
handler.add_node(new_dep_node)
class_obj = getattr(imported_module, current_symbol_name)
# Inspect the class fields
if hasattr(class_obj, "_fields_"):
for field_elem in class_obj._fields_:
field_name: str = ""
field_type: Optional[Any] = None
bitfield_size: Optional[int] = None
if len(field_elem) == 2:
field_name, field_type = field_elem
elif len(field_elem) == 3:
field_name, field_type, bitfield_size = field_elem
field_table[field_name] = [field_type, bitfield_size]
elif hasattr(class_obj, "__annotations__"):
for field_elem in class_obj.__annotations__.items():
if len(field_elem) == 2:
field_name, field_type = field_elem
bitfield_size = None
elif len(field_elem) == 3:
field_name, field_type, bitfield_size = field_elem
else:
raise ValueError(
"Number of fields in items() of class object unexpected"
)
field_table[field_name] = [field_type, bitfield_size]
else:
raise TypeError("Could not get required class and definition")
logger.debug(f"Extracted fields for {current_symbol_name}: {field_table}")
for elem in field_table.items():
elem_name, elem_temp_list = elem
[elem_type, elem_bitfield_size] = elem_temp_list
local_module_name = getattr(elem_type, "__module__", None)
new_dep_node.add_field(elem_name, elem_type, ready=False)
if local_module_name == ctypes.__name__:
new_dep_node.set_field_bitfield_size(elem_name, elem_bitfield_size)
new_dep_node.set_field_ready(elem_name, is_ready=True)
logger.debug(
f"Field {elem_name} is direct ctypes type: {elem_type}"
)
elif local_module_name == "vmlinux":
new_dep_node.set_field_bitfield_size(elem_name, elem_bitfield_size)
logger.debug(
f"Processing vmlinux field: {elem_name}, type: {elem_type}"
)
if hasattr(elem_type, "_type_"):
is_complex_type = True
containing_type = elem_type._type_
if hasattr(elem_type, "_length_") and is_complex_type:
type_length = elem_type._length_
if containing_type.__module__ == "vmlinux":
pass
elif containing_type.__module__ == ctypes.__name__:
if isinstance(elem_type, type):
if issubclass(elem_type, ctypes.Array):
ctype_complex_type = ctypes.Array
elif issubclass(elem_type, ctypes._Pointer):
ctype_complex_type = ctypes._Pointer
else:
raise TypeError("Unsupported ctypes subclass")
else:
raise ImportError(
f"Unsupported module of {containing_type}"
)
logger.debug(
f"{containing_type} containing type of parent {elem_name} with {elem_type} and ctype {ctype_complex_type} and length {type_length}"
)
new_dep_node.set_field_containing_type(
elem_name, containing_type
)
new_dep_node.set_field_type_size(elem_name, type_length)
new_dep_node.set_field_ctype_complex_type(
elem_name, ctype_complex_type
)
new_dep_node.set_field_type(elem_name, elem_type)
if containing_type.__module__ == "vmlinux":
process_vmlinux_post_ast(
containing_type, llvm_handler, handler, processing_stack
)
new_dep_node.set_field_ready(elem_name, True)
elif containing_type.__module__ == ctypes.__name__:
logger.debug(f"Processing ctype internal{containing_type}")
new_dep_node.set_field_ready(elem_name, True)
else:
raise TypeError(
"Module not supported in recursive resolution"
)
else:
process_vmlinux_post_ast(
elem_type, llvm_handler, handler, processing_stack
)
new_dep_node.set_field_ready(elem_name, True)
else:
raise ValueError(
f"{elem_name} with type {elem_type} from module {module_name} not supported in recursive resolver"
)
else:
raise ImportError("UNSUPPORTED Module")
logging.info(
f"{current_symbol_name} processed and handler readiness {handler.is_ready}"
)
return True

View File

@ -0,0 +1,149 @@
from typing import Optional, Dict, List, Iterator
from .dependency_node import DependencyNode
class DependencyHandler:
"""
Manages a collection of DependencyNode objects with no duplicates.
Ensures that no two nodes with the same name can be added and provides
methods to check readiness and retrieve specific nodes.
Example usage:
# Create a handler
handler = DependencyHandler()
# Create some dependency nodes
node1 = DependencyNode(name="node1")
node1.add_field("field1", str)
node1.set_field_value("field1", "value1")
node2 = DependencyNode(name="node2")
node2.add_field("field1", int)
# Add nodes to the handler
handler.add_node(node1)
handler.add_node(node2)
# Check if a specific node exists
print(handler.has_node("node1")) # True
# Get a reference to a node and modify it
node = handler.get_node("node2")
node.set_field_value("field1", 42)
# Check if all nodes are ready
print(handler.is_ready) # False (node2 is ready, but node1 isn't)
"""
def __init__(self):
# Using a dictionary with node names as keys ensures name uniqueness
# and provides efficient lookups
self._nodes: Dict[str, DependencyNode] = {}
def add_node(self, node: DependencyNode) -> bool:
"""
Add a dependency node to the handler.
Args:
node: The DependencyNode to add
Returns:
bool: True if the node was added, False if a node with the same name already exists
Raises:
TypeError: If the provided object is not a DependencyNode
"""
if not isinstance(node, DependencyNode):
raise TypeError(f"Expected DependencyNode, got {type(node).__name__}")
# Check if a node with this name already exists
if node.name in self._nodes:
return False
self._nodes[node.name] = node
return True
@property
def is_ready(self) -> bool:
"""
Check if all nodes are ready.
Returns:
bool: True if all nodes are ready (or if there are no nodes), False otherwise
"""
if not self._nodes:
return True
return all(node.is_ready for node in self._nodes.values())
def has_node(self, name: str) -> bool:
"""
Check if a node with the given name exists.
Args:
name: The name to check
Returns:
bool: True if a node with the given name exists, False otherwise
"""
return name in self._nodes
def get_node(self, name: str) -> Optional[DependencyNode]:
"""
Get a node by name for manipulation.
Args:
name: The name of the node to retrieve
Returns:
Optional[DependencyNode]: The node with the given name, or None if not found
"""
return self._nodes.get(name)
def remove_node(self, node_or_name) -> bool:
"""
Remove a node by name or reference.
Args:
node_or_name: The node to remove or its name
Returns:
bool: True if the node was removed, False if not found
"""
if isinstance(node_or_name, DependencyNode):
name = node_or_name.name
else:
name = node_or_name
if name in self._nodes:
del self._nodes[name]
return True
return False
def get_all_nodes(self) -> List[DependencyNode]:
"""
Get all nodes stored in the handler.
Returns:
List[DependencyNode]: List of all nodes
"""
return list(self._nodes.values())
def __iter__(self) -> Iterator[DependencyNode]:
"""
Iterate over all nodes.
Returns:
Iterator[DependencyNode]: Iterator over all nodes
"""
return iter(self._nodes.values())
def __len__(self) -> int:
"""
Get the number of nodes in the handler.
Returns:
int: The number of nodes
"""
return len(self._nodes)

View File

@ -0,0 +1,237 @@
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
# TODO: FIX THE FUCKING TYPE NAME CONVENTION.
@dataclass
class Field:
"""Represents a field in a dependency node with its type and readiness state."""
name: str
type: type
ctype_complex_type: Optional[Any]
containing_type: Optional[Any]
type_size: Optional[int]
bitfield_size: Optional[int]
value: Any = None
ready: bool = False
def set_ready(self, is_ready: bool = True) -> None:
"""Set the readiness state of this field."""
self.ready = is_ready
def set_value(self, value: Any, mark_ready: bool = False) -> None:
"""Set the value of this field and optionally mark it as ready."""
self.value = value
if mark_ready:
self.ready = True
def set_type(self, given_type, mark_ready: bool = False) -> None:
"""Set value of the type field and mark as ready"""
self.type = given_type
if mark_ready:
self.ready = True
def set_containing_type(
self, containing_type: Optional[Any], mark_ready: bool = False
) -> None:
"""Set the containing_type of this field and optionally mark it as ready."""
self.containing_type = containing_type
if mark_ready:
self.ready = True
def set_type_size(self, type_size: Any, mark_ready: bool = False) -> None:
"""Set the type_size of this field and optionally mark it as ready."""
self.type_size = type_size
if mark_ready:
self.ready = True
def set_ctype_complex_type(
self, ctype_complex_type: Any, mark_ready: bool = False
) -> None:
"""Set the ctype_complex_type of this field and optionally mark it as ready."""
self.ctype_complex_type = ctype_complex_type
if mark_ready:
self.ready = True
def set_bitfield_size(self, bitfield_size: Any, mark_ready: bool = False) -> None:
"""Set the bitfield_size of this field and optionally mark it as ready."""
self.bitfield_size = bitfield_size
if mark_ready:
self.ready = True
@dataclass
class DependencyNode:
"""
A node with typed fields and readiness tracking.
Example usage:
# Create a dependency node for a Person
somestruct = DependencyNode(name="struct_1")
# Add fields with their types
somestruct.add_field("field_1", str)
somestruct.add_field("field_2", int)
somestruct.add_field("field_3", str)
# Check if the node is ready (should be False initially)
print(f"Is node ready? {somestruct.is_ready}") # False
# Set some field values
somestruct.set_field_value("field_1", "someproperty")
somestruct.set_field_value("field_2", 30)
# Check if the node is ready (still False because email is not ready)
print(f"Is node ready? {somestruct.is_ready}") # False
# Set the last field and make the node ready
somestruct.set_field_value("field_3", "anotherproperty")
# Now the node should be ready
print(f"Is node ready? {somestruct.is_ready}") # True
# You can also mark a field as not ready
somestruct.set_field_ready("field_3", False)
# Now the node is not ready again
print(f"Is node ready? {somestruct.is_ready}") # False
# Get all field values
print(somestruct.get_field_values()) # {'field_1': 'someproperty', 'field_2': 30, 'field_3': 'anotherproperty'}
# Get only ready fields
ready_fields = somestruct.get_ready_fields()
print(f"Ready fields: {[field.name for field in ready_fields.values()]}") # ['field_1', 'field_2']
"""
name: str
fields: Dict[str, Field] = field(default_factory=dict)
_ready_cache: Optional[bool] = field(default=None, repr=False)
def add_field(
self,
name: str,
field_type: type,
initial_value: Any = None,
containing_type: Optional[Any] = None,
type_size: Optional[int] = None,
ctype_complex_type: Optional[int] = None,
bitfield_size: Optional[int] = None,
ready: bool = False,
) -> None:
"""Add a field to the node with an optional initial value and readiness state."""
self.fields[name] = Field(
name=name,
type=field_type,
value=initial_value,
ready=ready,
containing_type=containing_type,
type_size=type_size,
ctype_complex_type=ctype_complex_type,
bitfield_size=bitfield_size,
)
# Invalidate readiness cache
self._ready_cache = None
def get_field(self, name: str) -> Field:
"""Get a field by name."""
return self.fields[name]
def set_field_value(self, name: str, value: Any, mark_ready: bool = False) -> None:
"""Set a field's value and optionally mark it as ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
self.fields[name].set_value(value, mark_ready)
# Invalidate readiness cache
self._ready_cache = None
def set_field_type(self, name: str, type: Any, mark_ready: bool = False) -> None:
"""Set a field's type and optionally mark it as ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
self.fields[name].set_type(type, mark_ready)
# Invalidate readiness cache
self._ready_cache = None
def set_field_containing_type(
self, name: str, containing_type: Any, mark_ready: bool = False
) -> None:
"""Set a field's containing_type and optionally mark it as ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
self.fields[name].set_containing_type(containing_type, mark_ready)
# Invalidate readiness cache
self._ready_cache = None
def set_field_type_size(
self, name: str, type_size: Any, mark_ready: bool = False
) -> None:
"""Set a field's type_size and optionally mark it as ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
self.fields[name].set_type_size(type_size, mark_ready)
# Invalidate readiness cache
self._ready_cache = None
def set_field_ctype_complex_type(
self, name: str, ctype_complex_type: Any, mark_ready: bool = False
) -> None:
"""Set a field's ctype_complex_type and optionally mark it as ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
self.fields[name].set_ctype_complex_type(ctype_complex_type, mark_ready)
# Invalidate readiness cache
self._ready_cache = None
def set_field_bitfield_size(
self, name: str, bitfield_size: Any, mark_ready: bool = False
) -> None:
"""Set a field's bitfield_size and optionally mark it as ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
self.fields[name].set_bitfield_size(bitfield_size, mark_ready)
# Invalidate readiness cache
self._ready_cache = None
def set_field_ready(self, name: str, is_ready: bool = False) -> None:
"""Mark a field as ready or not ready."""
if name not in self.fields:
raise KeyError(f"Field '{name}' does not exist in node '{self.name}'")
self.fields[name].set_ready(is_ready)
# Invalidate readiness cache
self._ready_cache = None
@property
def is_ready(self) -> bool:
"""Check if the node is ready (all fields are ready)."""
# Use cached value if available
if self._ready_cache is not None:
return self._ready_cache
# Calculate readiness only when needed
if not self.fields:
self._ready_cache = True
return True
self._ready_cache = all(elem.ready for elem in self.fields.values())
return self._ready_cache
def get_field_values(self) -> Dict[str, Any]:
"""Get a dictionary of field names to their values."""
return {name: elem.value for name, elem in self.fields.items()}
def get_ready_fields(self) -> Dict[str, Field]:
"""Get all fields that are marked as ready."""
return {name: elem for name, elem in self.fields.items() if elem.ready}
def get_not_ready_fields(self) -> Dict[str, Field]:
"""Get all fields that are marked as not ready."""
return {name: elem for name, elem in self.fields.items() if not elem.ready}

View File

@ -0,0 +1,135 @@
import ast
import logging
from typing import List, Tuple, Dict
import importlib
import inspect
from .dependency_handler import DependencyHandler
from .ir_generation import IRGenerator
from .class_handler import process_vmlinux_class
logger = logging.getLogger(__name__)
def detect_import_statement(tree: ast.AST) -> List[Tuple[str, ast.ImportFrom]]:
"""
Parse AST and detect import statements from vmlinux.
Returns a list of tuples (module_name, imported_item) for vmlinux imports.
Raises SyntaxError for invalid import patterns.
Args:
tree: The AST to parse
Returns:
List of tuples containing (module_name, imported_item) for each vmlinux import
Raises:
SyntaxError: If multiple imports from vmlinux are attempted or import * is used
"""
vmlinux_imports = []
for node in ast.walk(tree):
# Handle "from vmlinux import ..." statements
if isinstance(node, ast.ImportFrom):
if node.module == "vmlinux":
# Check for wildcard import: from vmlinux import *
if any(alias.name == "*" for alias in node.names):
raise SyntaxError(
"Wildcard imports from vmlinux are not supported. "
"Please import specific types explicitly."
)
# Check for multiple imports: from vmlinux import A, B, C
if len(node.names) > 1:
imported_names = [alias.name for alias in node.names]
raise SyntaxError(
f"Multiple imports from vmlinux are not supported. "
f"Found: {', '.join(imported_names)}. "
f"Please use separate import statements for each type."
)
# Check if no specific import is specified (should not happen with valid Python)
if len(node.names) == 0:
raise SyntaxError(
"Import from vmlinux must specify at least one type."
)
# Valid single import
for alias in node.names:
import_name = alias.name
# Use alias if provided, otherwise use the original name (commented)
# as_name = alias.asname if alias.asname else alias.name
vmlinux_imports.append(("vmlinux", node))
logger.info(f"Found vmlinux import: {import_name}")
# Handle "import vmlinux" statements (not typical but should be rejected)
elif isinstance(node, ast.Import):
for alias in node.names:
if alias.name == "vmlinux" or alias.name.startswith("vmlinux."):
raise SyntaxError(
"Direct import of vmlinux module is not supported. "
"Use 'from vmlinux import <type>' instead."
)
logger.info(f"Total vmlinux imports detected: {len(vmlinux_imports)}")
return vmlinux_imports
def vmlinux_proc(tree: ast.AST, module):
import_statements = detect_import_statement(tree)
# initialise dependency handler
handler = DependencyHandler()
# initialise assignment dictionary of name to type
assignments: Dict[str, type] = {}
if not import_statements:
logger.info("No vmlinux imports found")
return
# Import vmlinux module directly
try:
vmlinux_mod = importlib.import_module("vmlinux")
except ImportError:
logger.warning("Could not import vmlinux module")
return
source_file = inspect.getsourcefile(vmlinux_mod)
if source_file is None:
logger.warning("Cannot find source for vmlinux module")
return
with open(source_file, "r") as f:
mod_ast = ast.parse(f.read(), filename=source_file)
for import_mod, import_node in import_statements:
for alias in import_node.names:
imported_name = alias.name
found = False
for mod_node in mod_ast.body:
if (
isinstance(mod_node, ast.ClassDef)
and mod_node.name == imported_name
):
process_vmlinux_class(mod_node, module, handler)
found = True
break
if isinstance(mod_node, ast.Assign):
for target in mod_node.targets:
if isinstance(target, ast.Name) and target.id == imported_name:
process_vmlinux_assign(mod_node, module, assignments)
found = True
break
if found:
break
if not found:
logger.info(
f"{imported_name} not found as ClassDef or Assign in vmlinux"
)
IRGenerator(module, handler)
def process_vmlinux_assign(node, module, assignments: Dict[str, type]):
raise NotImplementedError("Assignment handling has not been implemented yet")

View File

@ -0,0 +1,14 @@
import logging
from .dependency_handler import DependencyHandler
logger = logging.getLogger(__name__)
class IRGenerator:
def __init__(self, module, handler: DependencyHandler):
self.module = module
self.handler: DependencyHandler = handler
if not handler.is_ready:
raise ImportError(
"Semantic analysis of vmlinux imports failed. Cannot generate IR"
)