diff --git a/pythonbpf/vmlinux_parser/class_handler.py b/pythonbpf/vmlinux_parser/class_handler.py index 74a09bc..111b8e0 100644 --- a/pythonbpf/vmlinux_parser/class_handler.py +++ b/pythonbpf/vmlinux_parser/class_handler.py @@ -62,23 +62,41 @@ def process_vmlinux_post_ast( class_obj = getattr(imported_module, current_symbol_name) # Inspect the class fields if hasattr(class_obj, "_fields_"): - for field_name, field_type in class_obj._fields_: - field_table[field_name] = field_type + for field_elem in class_obj._fields_: + field_name = None + field_type = None + bitfield_size = None + if len(field_elem) == 2: + field_name, field_type = field_elem + elif len(field_elem) == 3: + field_name, field_type, bitfield_size = field_elem + field_table[field_name] = [field_type, bitfield_size] elif hasattr(class_obj, "__annotations__"): - for field_name, field_type in class_obj.__annotations__.items(): - field_table[field_name] = field_type + for field_elem in class_obj.__annotations__.items(): + field_name = None + field_type = None + bitfield_size = None + if len(field_elem) == 2: + field_name, field_type = field_elem + elif len(field_elem) == 3: + field_name, field_type, bitfield_size = field_elem + field_table[field_name] = [field_type, bitfield_size] else: raise TypeError("Could not get required class and definition") - logger.info(f"Extracted fields for {current_symbol_name}: {field_table}") - - for elem_name, elem_type in field_table.items(): + logger.debug(f"Extracted fields for {current_symbol_name}: {field_table}") + for elem in field_table.items(): + elem_name, elem_temp_list = elem + [elem_type, elem_bitfield_size] = elem_temp_list local_module_name = getattr(elem_type, "__module__", None) if local_module_name == ctypes.__name__: - new_dep_node.add_field(elem_name, elem_type, ready=True) + new_dep_node.add_field(elem_name, elem_type, ready=False) + new_dep_node.set_field_bitfield_size(elem_name, elem_bitfield_size) + new_dep_node.set_field_ready(elem_name, is_ready=True) logger.info(f"Field {elem_name} is direct ctypes type: {elem_type}") elif local_module_name == "vmlinux": new_dep_node.add_field(elem_name, elem_type, ready=False) + new_dep_node.set_field_bitfield_size(elem_name, elem_bitfield_size) logger.debug( f"Processing vmlinux field: {elem_name}, type: {elem_type}" ) diff --git a/pythonbpf/vmlinux_parser/dependency_node.py b/pythonbpf/vmlinux_parser/dependency_node.py index 6f6d909..1b4a3bd 100644 --- a/pythonbpf/vmlinux_parser/dependency_node.py +++ b/pythonbpf/vmlinux_parser/dependency_node.py @@ -12,6 +12,7 @@ class Field: ctype_complex_type: Optional[Any] containing_type: Optional[Any] type_size: Optional[int] + bitfield_size: Optional[int] value: Any = None ready: bool = False @@ -51,6 +52,12 @@ class Field: if mark_ready: self.ready = True + def set_bitfield_size(self, bitfield_size: Any, mark_ready: bool = True) -> None: + """Set the bitfield_size of this field and optionally mark it as ready.""" + self.bitfield_size = bitfield_size + if mark_ready: + self.ready = True + @dataclass class DependencyNode: @@ -108,6 +115,7 @@ class DependencyNode: containing_type: Optional[Any] = None, type_size: Optional[int] = None, ctype_complex_type: Optional[int] = None, + bitfield_size: Optional[int] = None, ready: bool = False, ) -> None: """Add a field to the node with an optional initial value and readiness state.""" @@ -118,7 +126,8 @@ class DependencyNode: ready=ready, containing_type=containing_type, type_size=type_size, - ctype_complex_type=ctype_complex_type + ctype_complex_type=ctype_complex_type, + bitfield_size=bitfield_size ) # Invalidate readiness cache self._ready_cache = None @@ -178,6 +187,17 @@ class DependencyNode: # Invalidate readiness cache self._ready_cache = None + def set_field_bitfield_size( + self, name: str, bitfield_size: Any, mark_ready: bool = True + ) -> None: + """Set a field's bitfield_size and optionally mark it as ready.""" + if name not in self.fields: + raise KeyError(f"Field '{name}' does not exist in node '{self.name}'") + + self.fields[name].set_bitfield_size(bitfield_size, mark_ready) + # Invalidate readiness cache + self._ready_cache = None + def set_field_ready(self, name: str, is_ready: bool = True) -> None: """Mark a field as ready or not ready.""" if name not in self.fields: