mirror of
https://github.com/varun-r-mallya/pylibbpf.git
synced 2026-02-12 16:11:00 +00:00
Add PerfEventArray and BpfObject wrappers
This commit is contained in:
80
wrappers.py
Normal file
80
wrappers.py
Normal file
@ -0,0 +1,80 @@
|
||||
from typing import Callable, Optional
|
||||
|
||||
|
||||
class PerfEventArrayHelper:
|
||||
"""Fluent wrapper for PERF_EVENT_ARRAY maps."""
|
||||
|
||||
def __init__(self, bpf_map):
|
||||
self._map = bpf_map
|
||||
self._perf_buffer = None
|
||||
|
||||
def open_perf_buffer(
|
||||
self,
|
||||
callback: Callable,
|
||||
struct_name: str = "",
|
||||
page_cnt: int = 8,
|
||||
lost_callback: Optional[Callable] = None
|
||||
):
|
||||
"""Open perf buffer with auto-deserialization."""
|
||||
from .pylibbpf import PerfEventArray
|
||||
|
||||
if struct_name:
|
||||
self._perf_buffer = PerfEventArray(
|
||||
self._map,
|
||||
page_cnt,
|
||||
callback,
|
||||
struct_name,
|
||||
lost_callback or (lambda cpu, cnt: None)
|
||||
)
|
||||
else:
|
||||
self._perf_buffer = PerfEventArray(
|
||||
self._map,
|
||||
page_cnt,
|
||||
callback,
|
||||
lost_callback or (lambda cpu, cnt: None)
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
def poll(self, timeout_ms: int = -1) -> int:
|
||||
if not self._perf_buffer:
|
||||
raise RuntimeError("Call open_perf_buffer() first")
|
||||
return self._perf_buffer.poll(timeout_ms)
|
||||
|
||||
def consume(self) -> int:
|
||||
if not self._perf_buffer:
|
||||
raise RuntimeError("Call open_perf_buffer() first")
|
||||
return self._perf_buffer.consume()
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._map, name)
|
||||
|
||||
|
||||
class BpfObjectWrapper:
|
||||
"""Smart wrapper that returns map-specific helpers."""
|
||||
|
||||
BPF_MAP_TYPE_PERF_EVENT_ARRAY = 4
|
||||
BPF_MAP_TYPE_RINGBUF = 27
|
||||
|
||||
def __init__(self, bpf_object):
|
||||
self._obj = bpf_object
|
||||
self._map_helpers = {}
|
||||
|
||||
def __getitem__(self, name: str):
|
||||
"""Return appropriate helper based on map type."""
|
||||
if name in self._map_helpers:
|
||||
return self._map_helpers[name]
|
||||
|
||||
map_obj = self._obj[name]
|
||||
map_type = map_obj.get_type()
|
||||
|
||||
if map_type == self.BPF_MAP_TYPE_PERF_EVENT_ARRAY:
|
||||
helper = PerfEventArrayHelper(map_obj)
|
||||
else:
|
||||
helper = map_obj
|
||||
|
||||
self._map_helpers[name] = helper
|
||||
return helper
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._obj, name)
|
||||
Reference in New Issue
Block a user