3 Commits

6 changed files with 787 additions and 3 deletions

View File

@ -1,14 +1,14 @@
from pythonbpf import bpf, map, struct, section, bpfglobal, BPF
from pythonbpf.helper import ktime, pid, comm
from pythonbpf.maps import PerfEventArray
from ctypes import c_void_p, c_int64, c_uint64
from ctypes import c_void_p, c_int64
@bpf
@struct
class data_t:
pid: c_uint64
ts: c_uint64
pid: c_int64
ts: c_int64
comm: str(16) # type: ignore [valid-type]

127
BCC-Examples/vfsreadlat.py Normal file
View File

@ -0,0 +1,127 @@
from pythonbpf import bpf, map, struct, section, bpfglobal, BPF
from pythonbpf.helper import ktime, pid
from pythonbpf.maps import HashMap, PerfEventArray
from ctypes import c_void_p, c_uint64
import matplotlib.pyplot as plt
import numpy as np
@bpf
@struct
class latency_event:
pid: c_uint64
delta_us: c_uint64 # Latency in microseconds
@bpf
@map
def start() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=10240)
@bpf
@map
def events() -> PerfEventArray:
return PerfEventArray(key_size=c_uint64, value_size=c_uint64)
@bpf
@section("kprobe/vfs_read")
def do_entry(ctx: c_void_p) -> c_uint64:
p, ts = pid(), ktime()
start.update(p, ts)
return 0 # type: ignore [return-value]
@bpf
@section("kretprobe/vfs_read")
def do_return(ctx: c_void_p) -> c_uint64:
p = pid()
tsp = start.lookup(p)
if tsp:
delta_ns = ktime() - tsp
# Only track if latency > 1 microsecond
if delta_ns > 1000:
evt = latency_event()
evt.pid, evt.delta_us = p, delta_ns // 1000
events.output(evt)
start.delete(p)
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
# Load BPF
print("Loading BPF program...")
b = BPF()
b.load()
b.attach_all()
# Collect latencies
latencies = []
def callback(cpu, event):
latencies.append(event.delta_us)
b["events"].open_perf_buffer(callback, struct_name="latency_event")
print("Tracing vfs_read latency... Hit Ctrl-C to end.")
try:
while True:
b["events"].poll(1000)
if len(latencies) > 0 and len(latencies) % 1000 == 0:
print(f"Collected {len(latencies)} samples...")
except KeyboardInterrupt:
print(f"Collected {len(latencies)} samples. Generating histogram...")
# Create histogram with matplotlib
if latencies:
# Use log scale for better visualization
log_latencies = np.log2(latencies)
plt.figure(figsize=(12, 6))
# Plot 1: Linear histogram
plt.subplot(1, 2, 1)
plt.hist(latencies, bins=50, edgecolor="black", alpha=0.7)
plt.xlabel("Latency (microseconds)")
plt.ylabel("Count")
plt.title("VFS Read Latency Distribution (Linear)")
plt.grid(True, alpha=0.3)
# Plot 2: Log2 histogram (like BCC)
plt.subplot(1, 2, 2)
plt.hist(log_latencies, bins=50, edgecolor="black", alpha=0.7, color="orange")
plt.xlabel("log2(Latency in µs)")
plt.ylabel("Count")
plt.title("VFS Read Latency Distribution (Log2)")
plt.grid(True, alpha=0.3)
# Add statistics
print("Statistics:")
print(f" Count: {len(latencies)}")
print(f" Min: {min(latencies)} µs")
print(f" Max: {max(latencies)} µs")
print(f" Mean: {np.mean(latencies):.2f} µs")
print(f" Median: {np.median(latencies):.2f} µs")
print(f" P95: {np.percentile(latencies, 95):.2f} µs")
print(f" P99: {np.percentile(latencies, 99):.2f} µs")
plt.tight_layout()
plt.savefig("vfs_read_latency.png", dpi=150)
print("Histogram saved to vfs_read_latency.png")
plt.show()
else:
print("No samples collected!")

View File

@ -0,0 +1,101 @@
"""BPF program for tracing VFS read latency."""
from pythonbpf import bpf, map, struct, section, bpfglobal, BPF
from pythonbpf.helper import ktime, pid
from pythonbpf.maps import HashMap, PerfEventArray
from ctypes import c_void_p, c_uint64
import argparse
from data_collector import LatencyCollector
from dashboard import LatencyDashboard
@bpf
@struct
class latency_event:
pid: c_uint64
delta_us: c_uint64
@bpf
@map
def start() -> HashMap:
"""Map to store start timestamps by PID."""
return HashMap(key=c_uint64, value=c_uint64, max_entries=10240)
@bpf
@map
def events() -> PerfEventArray:
"""Perf event array for sending latency events to userspace."""
return PerfEventArray(key_size=c_uint64, value_size=c_uint64)
@bpf
@section("kprobe/vfs_read")
def do_entry(ctx: c_void_p) -> c_uint64:
"""Record start time when vfs_read is called."""
p, ts = pid(), ktime()
start.update(p, ts)
return 0 # type: ignore [return-value]
@bpf
@section("kretprobe/vfs_read")
def do_return(ctx: c_void_p) -> c_uint64:
"""Calculate and record latency when vfs_read returns."""
p = pid()
tsp = start.lookup(p)
if tsp:
delta_ns = ktime() - tsp
# Only track latencies > 1 microsecond
if delta_ns > 1000:
evt = latency_event()
evt.pid, evt.delta_us = p, delta_ns // 1000
events.output(evt)
start.delete(p)
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Monitor VFS read latency with live dashboard"
)
parser.add_argument(
"--host", default="0.0.0.0", help="Dashboard host (default: 0.0.0.0)"
)
parser.add_argument(
"--port", type=int, default=8050, help="Dashboard port (default: 8050)"
)
parser.add_argument(
"--buffer", type=int, default=10000, help="Recent data buffer size"
)
return parser.parse_args()
args = parse_args()
# Load BPF program
print("Loading BPF program...")
b = BPF()
b.load()
b.attach_all()
print("✅ BPF program loaded and attached")
# Setup data collector
collector = LatencyCollector(b, buffer_size=args.buffer)
collector.start()
# Create and run dashboard
dashboard = LatencyDashboard(collector)
dashboard.run(host=args.host, port=args.port)

View File

@ -0,0 +1,282 @@
"""Plotly Dash dashboard for visualizing latency data."""
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
class LatencyDashboard:
"""Interactive dashboard for latency visualization."""
def __init__(self, collector, title: str = "VFS Read Latency Monitor"):
self.collector = collector
self.app = dash.Dash(__name__)
self.app.title = title
self._setup_layout()
self._setup_callbacks()
def _setup_layout(self):
"""Create dashboard layout."""
self.app.layout = html.Div(
[
html.H1(
"🔥 VFS Read Latency Dashboard",
style={
"textAlign": "center",
"color": "#2c3e50",
"marginBottom": 20,
},
),
# Stats cards
html.Div(
[
self._create_stat_card(
"total-samples", "📊 Total Samples", "#3498db"
),
self._create_stat_card(
"mean-latency", "⚡ Mean Latency", "#e74c3c"
),
self._create_stat_card(
"p99-latency", "🔥 P99 Latency", "#f39c12"
),
],
style={
"display": "flex",
"justifyContent": "space-around",
"marginBottom": 30,
},
),
# Graphs - ✅ Make sure these IDs match the callback outputs
dcc.Graph(id="dual-histogram", style={"height": "450px"}),
dcc.Graph(id="log2-buckets", style={"height": "350px"}),
dcc.Graph(id="timeseries-graph", style={"height": "300px"}),
# Auto-update
dcc.Interval(id="interval-component", interval=1000, n_intervals=0),
],
style={"padding": 20, "fontFamily": "Arial, sans-serif"},
)
def _create_stat_card(self, id_name: str, title: str, color: str):
"""Create a statistics card."""
return html.Div(
[
html.H3(title, style={"color": color}),
html.H2(id=id_name, style={"fontSize": 48, "color": "#2c3e50"}),
],
className="stat-box",
style={
"background": "white",
"padding": 20,
"borderRadius": 10,
"boxShadow": "0 4px 6px rgba(0,0,0,0.1)",
"textAlign": "center",
"flex": 1,
"margin": "0 10px",
},
)
def _setup_callbacks(self):
"""Setup dashboard callbacks."""
@self.app.callback(
[
Output("total-samples", "children"),
Output("mean-latency", "children"),
Output("p99-latency", "children"),
Output("dual-histogram", "figure"), # ✅ Match layout IDs
Output("log2-buckets", "figure"), # ✅ Match layout IDs
Output("timeseries-graph", "figure"), # ✅ Match layout IDs
],
[Input("interval-component", "n_intervals")],
)
def update_dashboard(n):
stats = self.collector.get_stats()
if stats.total == 0:
return self._empty_state()
return (
f"{stats.total:,}",
f"{stats.mean:.1f} µs",
f"{stats.p99:.1f} µs",
self._create_dual_histogram(),
self._create_log2_buckets(),
self._create_timeseries(),
)
def _empty_state(self):
"""Return empty state for dashboard."""
empty_fig = go.Figure()
empty_fig.update_layout(
title="Waiting for data... Generate some disk I/O!", template="plotly_white"
)
# ✅ Return 6 values (3 stats + 3 figures)
return "0", "0 µs", "0 µs", empty_fig, empty_fig, empty_fig
def _create_dual_histogram(self) -> go.Figure:
"""Create side-by-side linear and log2 histograms."""
latencies = self.collector.get_all_latencies()
# Create subplots
fig = make_subplots(
rows=1,
cols=2,
subplot_titles=("Linear Scale", "Log2 Scale"),
horizontal_spacing=0.12,
)
# Linear histogram
fig.add_trace(
go.Histogram(
x=latencies,
nbinsx=50,
marker_color="rgb(55, 83, 109)",
opacity=0.75,
name="Linear",
),
row=1,
col=1,
)
# Log2 histogram
log2_latencies = np.log2(latencies + 1) # +1 to avoid log2(0)
fig.add_trace(
go.Histogram(
x=log2_latencies,
nbinsx=30,
marker_color="rgb(243, 156, 18)",
opacity=0.75,
name="Log2",
),
row=1,
col=2,
)
# Update axes
fig.update_xaxes(title_text="Latency (µs)", row=1, col=1)
fig.update_xaxes(title_text="log2(Latency in µs)", row=1, col=2)
fig.update_yaxes(title_text="Count", row=1, col=1)
fig.update_yaxes(title_text="Count", row=1, col=2)
fig.update_layout(
title_text="📊 Latency Distribution (Linear vs Log2)",
template="plotly_white",
showlegend=False,
height=450,
)
return fig
def _create_log2_buckets(self) -> go.Figure:
"""Create bar chart of log2 buckets (like BCC histogram)."""
buckets = self.collector.get_histogram_buckets()
if not buckets:
fig = go.Figure()
fig.update_layout(
title="🔥 Log2 Histogram - Waiting for data...", template="plotly_white"
)
return fig
# Sort buckets
sorted_buckets = sorted(buckets.keys())
counts = [buckets[b] for b in sorted_buckets]
# Create labels (e.g., "8-16µs", "16-32µs")
labels = []
hover_text = []
for bucket in sorted_buckets:
lower = 2**bucket
upper = 2 ** (bucket + 1)
labels.append(f"{lower}-{upper}")
# Calculate percentage
total = sum(counts)
pct = (buckets[bucket] / total) * 100 if total > 0 else 0
hover_text.append(
f"Range: {lower}-{upper} µs<br>"
f"Count: {buckets[bucket]:,}<br>"
f"Percentage: {pct:.2f}%"
)
# Create bar chart
fig = go.Figure()
fig.add_trace(
go.Bar(
x=labels,
y=counts,
marker=dict(
color=counts,
colorscale="YlOrRd",
showscale=True,
colorbar=dict(title="Count"),
),
text=counts,
textposition="outside",
hovertext=hover_text,
hoverinfo="text",
)
)
fig.update_layout(
title="🔥 Log2 Histogram (BCC-style buckets)",
xaxis_title="Latency Range (µs)",
yaxis_title="Count",
template="plotly_white",
height=350,
xaxis=dict(tickangle=-45),
)
return fig
def _create_timeseries(self) -> go.Figure:
"""Create time series figure."""
recent = self.collector.get_recent_latencies()
if not recent:
fig = go.Figure()
fig.update_layout(
title="⏱️ Real-time Latency - Waiting for data...",
template="plotly_white",
)
return fig
times = [d["time"] for d in recent]
lats = [d["latency"] for d in recent]
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=times,
y=lats,
mode="lines",
line=dict(color="rgb(231, 76, 60)", width=2),
fill="tozeroy",
fillcolor="rgba(231, 76, 60, 0.2)",
)
)
fig.update_layout(
title="⏱️ Real-time Latency (Last 10,000 samples)",
xaxis_title="Time (seconds)",
yaxis_title="Latency (µs)",
template="plotly_white",
height=300,
)
return fig
def run(self, host: str = "0.0.0.0", port: int = 8050, debug: bool = False):
"""Run the dashboard server."""
print(f"\n{'=' * 60}")
print(f"🚀 Dashboard running at: http://{host}:{port}")
print(" Access from your browser to see live graphs")
print(
" Generate disk I/O to see data: dd if=/dev/zero of=/tmp/test bs=1M count=100"
)
print(f"{'=' * 60}\n")
self.app.run(debug=debug, host=host, port=port)

View File

@ -0,0 +1,96 @@
"""Data collection and management."""
import threading
import time
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import List, Dict
@dataclass
class LatencyStats:
"""Statistics computed from latency data."""
total: int = 0
mean: float = 0.0
median: float = 0.0
min: float = 0.0
max: float = 0.0
p95: float = 0.0
p99: float = 0.0
@classmethod
def from_array(cls, data: np.ndarray) -> "LatencyStats":
"""Compute stats from numpy array."""
if len(data) == 0:
return cls()
return cls(
total=len(data),
mean=float(np.mean(data)),
median=float(np.median(data)),
min=float(np.min(data)),
max=float(np.max(data)),
p95=float(np.percentile(data, 95)),
p99=float(np.percentile(data, 99)),
)
class LatencyCollector:
"""Collects and manages latency data from BPF."""
def __init__(self, bpf_object, buffer_size: int = 10000):
self.bpf = bpf_object
self.all_latencies: List[float] = []
self.recent_latencies = deque(maxlen=buffer_size) # type: ignore [var-annotated]
self.start_time = time.time()
self._lock = threading.Lock()
self._poll_thread = None
def callback(self, cpu: int, event):
"""Callback for BPF events."""
with self._lock:
self.all_latencies.append(event.delta_us)
self.recent_latencies.append(
{"time": time.time() - self.start_time, "latency": event.delta_us}
)
def start(self):
"""Start collecting data."""
self.bpf["events"].open_perf_buffer(self.callback, struct_name="latency_event")
def poll_loop():
while True:
self.bpf["events"].poll(100)
self._poll_thread = threading.Thread(target=poll_loop, daemon=True)
self._poll_thread.start()
print("✅ Data collection started")
def get_all_latencies(self) -> np.ndarray:
"""Get all latencies as numpy array."""
with self._lock:
return np.array(self.all_latencies) if self.all_latencies else np.array([])
def get_recent_latencies(self) -> List[Dict]:
"""Get recent latencies with timestamps."""
with self._lock:
return list(self.recent_latencies)
def get_stats(self) -> LatencyStats:
"""Compute current statistics."""
return LatencyStats.from_array(self.get_all_latencies())
def get_histogram_buckets(self) -> Dict[int, int]:
"""Get log2 histogram buckets."""
latencies = self.get_all_latencies()
if len(latencies) == 0:
return {}
log_buckets = np.floor(np.log2(latencies + 1)).astype(int)
buckets = {} # type: ignore [var-annotated]
for bucket in log_buckets:
buckets[bucket] = buckets.get(bucket, 0) + 1
return buckets

View File

@ -0,0 +1,178 @@
from pythonbpf import bpf, map, struct, section, bpfglobal, BPF
from pythonbpf.helper import ktime, pid
from pythonbpf.maps import HashMap, PerfEventArray
from ctypes import c_void_p, c_uint64
from rich.console import Console
from rich.live import Live
from rich.table import Table
from rich.panel import Panel
from rich.layout import Layout
import numpy as np
import threading
import time
from collections import Counter
# ==================== BPF Setup ====================
@bpf
@struct
class latency_event:
pid: c_uint64
delta_us: c_uint64
@bpf
@map
def start() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=10240)
@bpf
@map
def events() -> PerfEventArray:
return PerfEventArray(key_size=c_uint64, value_size=c_uint64)
@bpf
@section("kprobe/vfs_read")
def do_entry(ctx: c_void_p) -> c_uint64:
p, ts = pid(), ktime()
start.update(p, ts)
return 0 # type: ignore [return-value]
@bpf
@section("kretprobe/vfs_read")
def do_return(ctx: c_void_p) -> c_uint64:
p = pid()
tsp = start.lookup(p)
if tsp:
delta_ns = ktime() - tsp
if delta_ns > 1000:
evt = latency_event()
evt.pid, evt.delta_us = p, delta_ns // 1000
events.output(evt)
start.delete(p)
return 0 # type: ignore [return-value]
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
console = Console()
console.print("[bold green]Loading BPF program...[/]")
b = BPF()
b.load()
b.attach_all()
# ==================== Data Collection ====================
all_latencies = []
histogram_buckets = Counter() # type: ignore [var-annotated]
def callback(cpu, event):
all_latencies.append(event.delta_us)
# Create log2 bucket
bucket = int(np.floor(np.log2(event.delta_us + 1)))
histogram_buckets[bucket] += 1
b["events"].open_perf_buffer(callback, struct_name="latency_event")
def poll_events():
while True:
b["events"].poll(100)
poll_thread = threading.Thread(target=poll_events, daemon=True)
poll_thread.start()
# ==================== Live Display ====================
def generate_display():
layout = Layout()
layout.split_column(
Layout(name="header", size=3),
Layout(name="stats", size=8),
Layout(name="histogram", size=20),
)
# Header
layout["header"].update(
Panel("[bold cyan]🔥 VFS Read Latency Monitor[/]", style="bold white on blue")
)
# Stats
if len(all_latencies) > 0:
lats = np.array(all_latencies)
stats_table = Table(show_header=False, box=None, padding=(0, 2))
stats_table.add_column(style="bold cyan")
stats_table.add_column(style="bold yellow")
stats_table.add_row("📊 Total Samples:", f"{len(lats):,}")
stats_table.add_row("⚡ Mean Latency:", f"{np.mean(lats):.2f} µs")
stats_table.add_row("📉 Min Latency:", f"{np.min(lats):.2f} µs")
stats_table.add_row("📈 Max Latency:", f"{np.max(lats):.2f} µs")
stats_table.add_row("🎯 P95 Latency:", f"{np.percentile(lats, 95):.2f} µs")
stats_table.add_row("🔥 P99 Latency:", f"{np.percentile(lats, 99):.2f} µs")
layout["stats"].update(
Panel(stats_table, title="Statistics", border_style="green")
)
else:
layout["stats"].update(
Panel("[yellow]Waiting for data...[/]", border_style="yellow")
)
# Histogram
if histogram_buckets:
hist_table = Table(title="Latency Distribution", box=None)
hist_table.add_column("Range", style="cyan", no_wrap=True)
hist_table.add_column("Count", justify="right", style="yellow")
hist_table.add_column("Distribution", style="green")
max_count = max(histogram_buckets.values())
for bucket in sorted(histogram_buckets.keys()):
count = histogram_buckets[bucket]
lower = 2**bucket
upper = 2 ** (bucket + 1)
# Create bar
bar_width = int((count / max_count) * 40)
bar = "" * bar_width
hist_table.add_row(
f"{lower:5d}-{upper:5d} µs",
f"{count:6d}",
f"[green]{bar}[/] {count / len(all_latencies) * 100:.1f}%",
)
layout["histogram"].update(Panel(hist_table, border_style="green"))
return layout
try:
with Live(generate_display(), refresh_per_second=2, console=console) as live:
while True:
time.sleep(0.5)
live.update(generate_display())
except KeyboardInterrupt:
console.print("\n[bold red]Stopping...[/]")
if all_latencies:
console.print(f"\n[bold green]✅ Collected {len(all_latencies):,} samples[/]")