Files
python-bpf/pythonbpf/trace.py
2025-09-07 13:31:38 +05:30

55 lines
1.7 KiB
Python

class TraceEvent:
def __init__(self, timestamp, comm, pid, cpu, flags, message):
"""Represents a parsed trace pipe event"""
self.timestamp = timestamp # float: timestamp in seconds
self.comm = comm # str: command name
self.pid = pid # int: process ID
self.cpu = cpu # int: CPU number
self.flags = flags # str: trace flags
self.message = message # str: the actual message
def __iter__(self):
"""Allow unpacking like the original BCC tuple"""
yield self.comm
yield self.pid
yield self.cpu
yield self.flags
yield self.timestamp
yield self.message
class TraceReader:
def __init__(self, trace_pipe_path="/sys/kernel/debug/tracing/trace_pipe"):
self.trace_pipe_path = trace_pipe_path
self.file = None
def __enter__(self):
self.file = open(self.trace_pipe_path, "r")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.file:
self.file.close()
def __iter__(self):
while True:
event = self.trace_fields()
if event:
yield event
def trace_fields(self):
"""Read and parse one line from the trace pipe"""
if not self.file:
self.file = open(self.trace_pipe_path, "r")
line = self.file.readline()
if not line:
return None
# Parse the line into components (simplified)
# Real implementation would need more robust parsing
parts = self._parse_trace_line(line)
return TraceEvent(*parts)
def _parse_trace_line(self, line):
# TODO: Implement
pass