Files
python-bpf/examples/anomaly-detection/lib/probe.py

118 lines
3.0 KiB
Python

"""
PythonBPF eBPF Probe for Syscall Histogram Collection
"""
from vmlinux import struct_trace_event_raw_sys_enter
from pythonbpf import bpf, map, section, bpfglobal, BPF
from pythonbpf.helper import pid
from pythonbpf.maps import HashMap
from ctypes import c_int64
from lib import MAX_SYSCALLS, comm_for_pid
@bpf
@map
def histogram() -> HashMap:
return HashMap(key=c_int64, value=c_int64, max_entries=1024)
@bpf
@map
def target_pid_map() -> HashMap:
return HashMap(key=c_int64, value=c_int64, max_entries=1)
@bpf
@section("tracepoint/raw_syscalls/sys_enter")
def trace_syscall(ctx: struct_trace_event_raw_sys_enter) -> c_int64:
syscall_id = ctx.id
current_pid = pid()
target = target_pid_map.lookup(0)
if target:
if current_pid != target:
return 0 # type: ignore
if syscall_id < 0 or syscall_id >= 548:
return 0 # type: ignore
count = histogram.lookup(syscall_id)
if count:
histogram.update(syscall_id, count + 1)
else:
histogram.update(syscall_id, 1)
return 0 # type: ignore
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
ebpf_prog = BPF()
class Probe:
"""
Syscall histogram probe for a target process.
Usage:
probe = Probe(target_pid=1234)
probe.start()
histogram = probe.get_histogram()
"""
def __init__(self, target_pid: int, max_syscalls: int = MAX_SYSCALLS):
self.target_pid = target_pid
self.max_syscalls = max_syscalls
self.comm = comm_for_pid(target_pid)
if self.comm is None:
raise ValueError(f"Cannot find process with PID {target_pid}")
self._bpf = None
self._histogram_map = None
self._target_map = None
def start(self):
"""Compile, load, and attach the BPF probe."""
# Compile and load
self._bpf = ebpf_prog
self._bpf.load()
self._bpf.attach_all()
# Get map references
self._histogram_map = self._bpf["histogram"]
self._target_map = self._bpf["target_pid_map"]
# Set target PID in the map
self._target_map.update(0, self.target_pid)
return self
def get_histogram(self) -> list:
"""Read current histogram values as a list."""
if self._histogram_map is None:
raise RuntimeError("Probe not started. Call start() first.")
result = [0] * self.max_syscalls
for syscall_id in range(self.max_syscalls):
try:
count = self._histogram_map.lookup(syscall_id)
if count is not None:
result[syscall_id] = int(count)
except Exception:
pass
return result
def __getitem__(self, syscall_id: int) -> int:
"""Allow indexing: probe[syscall_id]"""
if self._histogram_map is None:
raise RuntimeError("Probe not started")
try:
count = self._histogram_map.lookup(syscall_id)
return int(count) if count is not None else 0
except Exception:
return 0