mirror of
https://github.com/varun-r-mallya/Python-BPF.git
synced 2025-12-31 21:06:25 +00:00
55 lines
1.7 KiB
Python
55 lines
1.7 KiB
Python
class TraceEvent:
|
|
def __init__(self, timestamp, comm, pid, cpu, flags, message):
|
|
"""Represents a parsed trace pipe event"""
|
|
self.timestamp = timestamp # float: timestamp in seconds
|
|
self.comm = comm # str: command name
|
|
self.pid = pid # int: process ID
|
|
self.cpu = cpu # int: CPU number
|
|
self.flags = flags # str: trace flags
|
|
self.message = message # str: the actual message
|
|
|
|
def __iter__(self):
|
|
"""Allow unpacking like the original BCC tuple"""
|
|
yield self.comm
|
|
yield self.pid
|
|
yield self.cpu
|
|
yield self.flags
|
|
yield self.timestamp
|
|
yield self.message
|
|
|
|
|
|
class TraceReader:
|
|
def __init__(self, trace_pipe_path="/sys/kernel/debug/tracing/trace_pipe"):
|
|
self.trace_pipe_path = trace_pipe_path
|
|
self.file = None
|
|
|
|
def __enter__(self):
|
|
self.file = open(self.trace_pipe_path, "r")
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
if self.file:
|
|
self.file.close()
|
|
|
|
def __iter__(self):
|
|
while True:
|
|
event = self.trace_fields()
|
|
if event:
|
|
yield event
|
|
|
|
def trace_fields(self):
|
|
"""Read and parse one line from the trace pipe"""
|
|
if not self.file:
|
|
self.file = open(self.trace_pipe_path, "r")
|
|
line = self.file.readline()
|
|
if not line:
|
|
return None
|
|
# Parse the line into components (simplified)
|
|
# Real implementation would need more robust parsing
|
|
parts = self._parse_trace_line(line)
|
|
return TraceEvent(*parts)
|
|
|
|
def _parse_trace_line(self, line):
|
|
# TODO: Implement
|
|
pass
|