mirror of
https://github.com/varun-r-mallya/Python-BPF.git
synced 2025-12-31 21:06:25 +00:00
97 lines
2.9 KiB
Python
"""Data collection and management."""
|
|
|
|
import threading
|
|
import time
|
|
import numpy as np
|
|
from collections import deque
|
|
from dataclasses import dataclass
|
|
from typing import List, Dict
|
|
|
|
|
|
@dataclass
class LatencyStats:
    """Summary statistics computed from a collection of latency samples.

    Every statistic is expressed in the same unit as the input samples.
    A default-constructed instance (all zeros) represents "no data".
    """

    total: int = 0  # number of samples
    mean: float = 0.0
    median: float = 0.0
    # ``min``/``max`` shadow builtins but are part of the public interface.
    min: float = 0.0
    max: float = 0.0
    p95: float = 0.0  # 95th percentile (numpy's default linear interpolation)
    p99: float = 0.0  # 99th percentile (numpy's default linear interpolation)

    @classmethod
    def from_array(cls, data: np.ndarray) -> "LatencyStats":
        """Compute statistics from latency samples.

        Args:
            data: Samples as a numpy array or any numeric array-like
                (list, tuple, deque). Multi-dimensional input is flattened,
                so ``total`` always equals the number of samples that the
                other statistics are computed over.

        Returns:
            A populated ``LatencyStats``, or an all-zero instance when
            ``data`` contains no samples.
        """
        # Flatten so that ``total`` (previously ``len``, i.e. row count for
        # 2-D input) agrees with mean/median/percentiles, which flatten too.
        samples = np.asarray(data, dtype=float).ravel()
        if samples.size == 0:
            return cls()

        # One call for both tail percentiles instead of two separate passes.
        p95, p99 = np.percentile(samples, [95, 99])
        return cls(
            total=int(samples.size),
            mean=float(np.mean(samples)),
            median=float(np.median(samples)),
            min=float(np.min(samples)),
            max=float(np.max(samples)),
            p95=float(p95),
            p99=float(p99),
        )
|
|
|
|
|
|
class LatencyCollector:
|
|
"""Collects and manages latency data from BPF."""
|
|
|
|
def __init__(self, bpf_object, buffer_size: int = 10000):
|
|
self.bpf = bpf_object
|
|
self.all_latencies: List[float] = []
|
|
self.recent_latencies = deque(maxlen=buffer_size) # type: ignore [var-annotated]
|
|
self.start_time = time.time()
|
|
self._lock = threading.Lock()
|
|
self._poll_thread = None
|
|
|
|
def callback(self, cpu: int, event):
|
|
"""Callback for BPF events."""
|
|
with self._lock:
|
|
self.all_latencies.append(event.delta_us)
|
|
self.recent_latencies.append(
|
|
{"time": time.time() - self.start_time, "latency": event.delta_us}
|
|
)
|
|
|
|
def start(self):
|
|
"""Start collecting data."""
|
|
self.bpf["events"].open_perf_buffer(self.callback, struct_name="latency_event")
|
|
|
|
def poll_loop():
|
|
while True:
|
|
self.bpf["events"].poll(100)
|
|
|
|
self._poll_thread = threading.Thread(target=poll_loop, daemon=True)
|
|
self._poll_thread.start()
|
|
print("✅ Data collection started")
|
|
|
|
def get_all_latencies(self) -> np.ndarray:
|
|
"""Get all latencies as numpy array."""
|
|
with self._lock:
|
|
return np.array(self.all_latencies) if self.all_latencies else np.array([])
|
|
|
|
def get_recent_latencies(self) -> List[Dict]:
|
|
"""Get recent latencies with timestamps."""
|
|
with self._lock:
|
|
return list(self.recent_latencies)
|
|
|
|
def get_stats(self) -> LatencyStats:
|
|
"""Compute current statistics."""
|
|
return LatencyStats.from_array(self.get_all_latencies())
|
|
|
|
def get_histogram_buckets(self) -> Dict[int, int]:
|
|
"""Get log2 histogram buckets."""
|
|
latencies = self.get_all_latencies()
|
|
if len(latencies) == 0:
|
|
return {}
|
|
|
|
log_buckets = np.floor(np.log2(latencies + 1)).astype(int)
|
|
buckets = {} # type: ignore [var-annotated]
|
|
for bucket in log_buckets:
|
|
buckets[bucket] = buckets.get(bucket, 0) + 1
|
|
|
|
return buckets
|