Merge pull request #9 from pythonbpf/refactor

Cleanup and rename examples
This commit is contained in:
varunrmallya
2025-09-30 21:07:06 +05:30
committed by GitHub
24 changed files with 20 additions and 167 deletions

View File

@ -6,8 +6,8 @@ from ctypes import c_void_p, c_int64, c_uint64
# Instructions to how to run this program
# 1. Install PythonBPF: pip install pythonbpf
# 2. Run the program: python demo/pybpf3.py
# 3. Run the program with sudo: sudo examples/check.sh run demo/pybpf3.o
# 2. Run the program: python examples/binops_demo.py
# 3. Run the program with sudo: sudo tools/check.sh run examples/binops_demo.py
# 4. Start up any program and watch the output
@bpf

View File

@ -12,7 +12,7 @@ import matplotlib.pyplot as plt
# and then plots the distribution as a histogram using matplotlib.
# It provides a quick view of process creation activity over 10 seconds.
# Everything is done with Python only code and with the new pylibbpf library.
# Run `sudo /path/to/python/binary/ pybpf4.py`
# Run `sudo /path/to/python/binary/ clone_plot.py`
@bpf
@map

View File

@ -1,35 +0,0 @@
from pythonbpf.decorators import bpf, map, section, bpfglobal
from ctypes import c_void_p, c_int64, c_int32, c_uint64
from pythonbpf.helpers import ktime
from pythonbpf.maps import HashMap
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=1)
@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello(ctx: c_void_p) -> c_int32:
print("entered")
print("multi constant support")
return c_int32(0)
@bpf
@section("tracepoint/syscalls/sys_exit_execve")
def hello_again(ctx: c_void_p) -> c_int64:
print("exited")
key = 0
tsp = last().lookup(key)
print(tsp)
ts = ktime()
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"

View File

@ -1,45 +0,0 @@
from pythonbpf import bpf, map, section, bpfglobal, compile
from pythonbpf.helpers import ktime, deref
from pythonbpf.maps import HashMap
from ctypes import c_void_p, c_int64, c_int32, c_uint64
@bpf
@map
def last() -> HashMap:
return HashMap(key=c_uint64, value=c_uint64, max_entries=3)
@bpf
@section("tracepoint/syscalls/sys_exit_execve")
def hello_again(ctx: c_void_p) -> c_int64:
print("multi constant support")
print("exited")
key = 0
delta = 0
dddelta = 0
tsp = last().lookup(key)
if True:
delta = ktime()
ddelta = deref(delta)
ttsp = deref(deref(tsp))
dddelta = ddelta - ttsp
if dddelta < 1000000000:
print("execve called within last second")
last().delete(key)
ts = ktime()
last().update(key, ts)
va = 8
nm = 5 + va
al = 6 & 3
print(f"this is a variable {nm}")
return c_int64(0)
@bpf
@bpfglobal
def LICENSE() -> str:
return "GPL"
compile()

View File

@ -1,11 +1,9 @@
from pythonbpf import bpf, section, bpfglobal, compile
from pythonbpf import bpf, section, bpfglobal, compile, BPF
from ctypes import c_void_p, c_int64
# Instructions to how to run this program
# 1. Install PythonBPF: pip install pythonbpf
# 2. Run the program: python demo/pybpf0.py
# 3. Run the program with sudo: sudo examples/check.sh run demo/pybpf0.o
# 2. Run the program: sudo python examples/hello_world.py
# 4. Start up any program and watch the output
@ -20,4 +18,11 @@ def hello_world(ctx: c_void_p) -> c_int64:
def LICENSE() -> str:
return "GPL"
compile()
b = BPF()
b.load_and_attach()
if b.is_loaded() and b.is_attached():
print("Successfully loaded and attached")
else:
print("Could not load successfully")
# Now cat /sys/kernel/debug/tracing/trace_pipe to see results of the execve syscall.

View File

@ -6,8 +6,8 @@ from ctypes import c_void_p, c_int64, c_uint64
# Instructions to how to run this program
# 1. Install PythonBPF: pip install pythonbpf
# 2. Run the program: python demo/pybpf2.py
# 3. Run the program with sudo: sudo examples/check.sh run demo/pybpf2.o
# 2. Run the program: python examples/sys_sync.py
# 3. Run the program with sudo: sudo tools/check.sh run examples/sys_sync.o
# 4. Start a Python repl and `import os` and then keep entering `os.sync()` to see reponses.
@bpf

View File

@ -6,9 +6,9 @@ from ctypes import c_void_p, c_int64
# Instructions to how to run this program
# 1. Install PythonBPF: pip install pythonbpf
# 2. Run the program: python demo/pybpf1.py
# 3. Run the program with sudo: sudo examples/check.sh run demo/pybpf1.o
# 4. Attach object file to any network device with something like ./check.sh xdp ../demo/pybpf1.o tailscale0
# 2. Run the program: python examples/xdp_pass.py
# 3. Run the program with sudo: sudo tools/check.sh run examples/xdp_pass.o
# 4. Attach object file to any network device with something like ./check.sh xdp examples/xdp_pass.o tailscale0
# 5. send traffic through the device and observe effects
@bpf

View File

@ -219,6 +219,7 @@ def bpf_printk_emitter(call, map_ptr, module, builder, func, local_sym_tab=None,
builder.call(fn_ptr, [fmt_ptr, ir.Constant(
ir.IntType(32), len(fmt_str))], tail=True)
return None
def bpf_map_update_elem_emitter(call, map_ptr, module, builder, func, local_sym_tab=None, struct_sym_tab=None, local_var_metadata=None):
@ -496,3 +497,4 @@ def handle_helper_call(call, module, builder, func, local_sym_tab=None, map_sym_
else:
raise NotImplementedError(
"Attribute not supported for map method calls.")
return None

View File

@ -1,54 +0,0 @@
class TraceEvent:
def __init__(self, timestamp, comm, pid, cpu, flags, message):
"""Represents a parsed trace pipe event"""
self.timestamp = timestamp # float: timestamp in seconds
self.comm = comm # str: command name
self.pid = pid # int: process ID
self.cpu = cpu # int: CPU number
self.flags = flags # str: trace flags
self.message = message # str: the actual message
def __iter__(self):
"""Allow unpacking like the original BCC tuple"""
yield self.comm
yield self.pid
yield self.cpu
yield self.flags
yield self.timestamp
yield self.message
class TraceReader:
def __init__(self, trace_pipe_path="/sys/kernel/debug/tracing/trace_pipe"):
self.trace_pipe_path = trace_pipe_path
self.file = None
def __enter__(self):
self.file = open(self.trace_pipe_path, "r")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.file:
self.file.close()
def __iter__(self):
while True:
event = self.trace_fields()
if event:
yield event
def trace_fields(self):
"""Read and parse one line from the trace pipe"""
if not self.file:
self.file = open(self.trace_pipe_path, "r")
line = self.file.readline()
if not line:
return None
# Parse the line into components (simplified)
# Real implementation would need more robust parsing
parts = self._parse_trace_line(line)
return TraceEvent(*parts)
def _parse_trace_line(self, line):
# TODO: Implement
pass

View File

@ -1,20 +0,0 @@
#!/usr/bin/env python3
import argparse, subprocess, os
from pythonbpf import codegen
def main():
parser = argparse.ArgumentParser()
parser.add_argument("source", help="Python BPF program")
args = parser.parse_args()
ll_file = os.path.splitext(args.source)[0] + ".ll"
o_file = os.path.splitext(args.source)[0] + ".o"
print(f"[+] Compiling {args.source}{ll_file}")
codegen.compile_to_ir(args.source, ll_file)
print("[+] Running llc -march=bpf")
subprocess.run(["llc", "-march=bpf", "-filetype=obj", "-O2", ll_file, "-o", o_file], check=True)
if __name__ == "__main__":
main()