change web version

This commit is contained in:
2025-11-28 22:11:41 +05:30
parent 76c982e15e
commit c97efb2570
2 changed files with 1061 additions and 600 deletions

View File

@ -2,13 +2,289 @@
import time
import curses
import webbrowser
import threading
from typing import Optional, List
from data_collection import ContainerDataCollector
from web_dashboard import WebDashboard
def _safe_addstr(stdscr, y: int, x: int, text: str, *args):
"""Safely add string to screen with bounds checking."""
try:
height, width = stdscr.getmaxyx()
if 0 <= y < height and 0 <= x < width:
# Truncate text to fit
max_len = width - x - 1
if max_len > 0:
stdscr.addstr(y, x, text[:max_len], *args)
except curses.error:
pass
def _draw_fancy_header(stdscr, title: str, subtitle: str):
"""Draw a fancy header with title and subtitle."""
height, width = stdscr.getmaxyx()
# Top border
_safe_addstr(stdscr, 0, 0, "" * width, curses.color_pair(6) | curses.A_BOLD)
# Title
_safe_addstr(
stdscr,
0,
max(0, (width - len(title)) // 2),
f" {title} ",
curses.color_pair(6) | curses.A_BOLD,
)
# Subtitle
_safe_addstr(
stdscr,
1,
max(0, (width - len(subtitle)) // 2),
subtitle,
curses.color_pair(1),
)
# Bottom border
_safe_addstr(stdscr, 2, 0, "" * width, curses.color_pair(6))
def _draw_metric_box(
stdscr,
y: int,
x: int,
width: int,
label: str,
value: str,
detail: str,
color_pair: int,
):
"""Draw a fancy box for displaying a metric."""
height, _ = stdscr.getmaxyx()
if y + 4 >= height:
return
# Top border
_safe_addstr(
stdscr, y, x, "" + "" * (width - 2) + "", color_pair | curses.A_BOLD
)
# Label
_safe_addstr(stdscr, y + 1, x, "", color_pair | curses.A_BOLD)
_safe_addstr(stdscr, y + 1, x + 2, label, color_pair | curses.A_BOLD)
_safe_addstr(stdscr, y + 1, x + width - 1, "", color_pair | curses.A_BOLD)
# Value
_safe_addstr(stdscr, y + 2, x, "", color_pair | curses.A_BOLD)
_safe_addstr(stdscr, y + 2, x + 4, value, curses.color_pair(2) | curses.A_BOLD)
_safe_addstr(
stdscr,
y + 2,
min(x + width - len(detail) - 3, x + width - 2),
detail,
color_pair | curses.A_BOLD,
)
_safe_addstr(stdscr, y + 2, x + width - 1, "", color_pair | curses.A_BOLD)
# Bottom border
_safe_addstr(
stdscr, y + 3, x, "" + "" * (width - 2) + "", color_pair | curses.A_BOLD
)
def _draw_section_header(stdscr, y: int, title: str, color_pair: int):
"""Draw a section header."""
height, width = stdscr.getmaxyx()
if y >= height:
return
_safe_addstr(stdscr, y, 2, title, curses.color_pair(color_pair) | curses.A_BOLD)
_safe_addstr(
stdscr,
y,
len(title) + 3,
"" * (width - len(title) - 5),
curses.color_pair(color_pair) | curses.A_BOLD,
)
def _calculate_rates(history: List) -> dict:
"""Calculate per-second rates from history."""
if len(history) < 2:
return {
"syscalls_per_sec": 0.0,
"rx_bytes_per_sec": 0.0,
"tx_bytes_per_sec": 0.0,
"rx_pkts_per_sec": 0.0,
"tx_pkts_per_sec": 0.0,
"read_bytes_per_sec": 0.0,
"write_bytes_per_sec": 0.0,
"read_ops_per_sec": 0.0,
"write_ops_per_sec": 0.0,
}
# Calculate delta between last two samples
recent = history[-1]
previous = history[-2]
time_delta = recent.timestamp - previous.timestamp
if time_delta <= 0:
time_delta = 1.0
return {
"syscalls_per_sec": (recent.syscall_count - previous.syscall_count)
/ time_delta,
"rx_bytes_per_sec": (recent.rx_bytes - previous.rx_bytes) / time_delta,
"tx_bytes_per_sec": (recent.tx_bytes - previous.tx_bytes) / time_delta,
"rx_pkts_per_sec": (recent.rx_packets - previous.rx_packets) / time_delta,
"tx_pkts_per_sec": (recent.tx_packets - previous.tx_packets) / time_delta,
"read_bytes_per_sec": (recent.read_bytes - previous.read_bytes) / time_delta,
"write_bytes_per_sec": (recent.write_bytes - previous.write_bytes) / time_delta,
"read_ops_per_sec": (recent.read_ops - previous.read_ops) / time_delta,
"write_ops_per_sec": (recent.write_ops - previous.write_ops) / time_delta,
}
def _format_bytes(bytes_val: float) -> str:
"""Format bytes into human-readable string."""
if bytes_val < 0:
bytes_val = 0
for unit in ["B", "KB", "MB", "GB", "TB"]:
if bytes_val < 1024.0:
return f"{bytes_val:.1f}{unit}"
bytes_val /= 1024.0
return f"{bytes_val:.1f}PB"
def _draw_bar_graph_enhanced(
stdscr,
y: int,
x: int,
width: int,
height: int,
data: List[float],
color_pair: int,
):
"""Draw an enhanced bar graph with axis and scale."""
screen_height, screen_width = stdscr.getmaxyx()
if not data or width < 2 or y + height >= screen_height:
return
# Calculate statistics
max_val = max(data) if max(data) > 0 else 1
min_val = min(data)
avg_val = sum(data) / len(data)
# Take last 'width - 12' data points (leave room for Y-axis)
graph_width = max(1, width - 12)
recent_data = data[-graph_width:] if len(data) > graph_width else data
# Draw Y-axis labels (with bounds checking)
if y < screen_height:
_safe_addstr(
stdscr, y, x, f"{_format_bytes(max_val):>9}", curses.color_pair(7)
)
if y + height // 2 < screen_height:
_safe_addstr(
stdscr,
y + height // 2,
x,
f"{_format_bytes(avg_val):>9}",
curses.color_pair(7),
)
if y + height - 1 < screen_height:
_safe_addstr(
stdscr,
y + height - 1,
x,
f"{_format_bytes(min_val):>9}",
curses.color_pair(7),
)
# Draw bars
for row in range(height):
if y + row >= screen_height:
break
threshold = (height - row) / height
bar_line = ""
for val in recent_data:
normalized = val / max_val if max_val > 0 else 0
if normalized >= threshold:
bar_line += ""
elif normalized >= threshold - 0.15:
bar_line += ""
elif normalized >= threshold - 0.35:
bar_line += ""
elif normalized >= threshold - 0.5:
bar_line += ""
else:
bar_line += " "
_safe_addstr(stdscr, y + row, x + 11, bar_line, color_pair)
# Draw X-axis
if y + height < screen_height:
_safe_addstr(
stdscr,
y + height,
x + 10,
"" + "" * len(recent_data),
curses.color_pair(7),
)
_safe_addstr(
stdscr,
y + height,
x + 10 + len(recent_data),
"→ time",
curses.color_pair(7),
)
def _draw_labeled_graph(
stdscr,
y: int,
x: int,
width: int,
height: int,
label: str,
rate: str,
detail: str,
data: List[float],
color_pair: int,
description: str,
):
"""Draw a graph with labels and legend."""
screen_height, screen_width = stdscr.getmaxyx()
if y >= screen_height or y + height + 2 >= screen_height:
return
# Header with metrics
_safe_addstr(stdscr, y, x, label, curses.color_pair(1) | curses.A_BOLD)
_safe_addstr(stdscr, y, x + len(label) + 2, rate, curses.color_pair(2))
_safe_addstr(
stdscr, y, x + len(label) + len(rate) + 4, detail, curses.color_pair(7)
)
# Draw the graph
if len(data) > 1:
_draw_bar_graph_enhanced(stdscr, y + 1, x, width, height, data, color_pair)
else:
_safe_addstr(stdscr, y + 2, x + 2, "Collecting data...", curses.color_pair(7))
# Graph legend
if y + height + 1 < screen_height:
_safe_addstr(
stdscr, y + height + 1, x, f"└─ {description}", curses.color_pair(7)
)
class ContainerMonitorTUI:
"""TUI for container monitoring with cgroup selection and live graphs."""
@ -51,13 +327,15 @@ class ContainerMonitorTUI:
# Check minimum terminal size
if height < 25 or width < 80:
msg = "Terminal too small! Minimum: 80x25"
msg = "Terminal too small! Minimum: 80x25"
stdscr.attron(curses.color_pair(4) | curses.A_BOLD)
stdscr.addstr(height // 2, max(0, (width - len(msg)) // 2), msg[:width-1])
stdscr.addstr(
height // 2, max(0, (width - len(msg)) // 2), msg[: width - 1]
)
stdscr.attroff(curses.color_pair(4) | curses.A_BOLD)
stdscr.refresh()
key = stdscr.getch()
if key == ord('q') or key == ord('Q'):
if key == ord("q") or key == ord("Q"):
break
continue
@ -76,51 +354,48 @@ class ContainerMonitorTUI:
except KeyboardInterrupt:
break
except curses.error as e:
except curses.error:
# Curses error - likely terminal too small, just continue
pass
except Exception as e:
# Show error briefly
try:
height, width = stdscr.getmaxyx()
error_msg = f"Error: {str(e)[:width-10]}"
stdscr.addstr(0, 0, error_msg[:width-1])
stdscr.refresh()
time.sleep(1)
except:
pass
def _safe_addstr(self, stdscr, y: int, x: int, text: str, *args):
"""Safely add string to screen with bounds checking."""
try:
height, width = stdscr.getmaxyx()
if 0 <= y < height and 0 <= x < width:
# Truncate text to fit
max_len = width - x - 1
if max_len > 0:
stdscr.addstr(y, x, text[:max_len], *args)
except curses.error:
pass
height, width = stdscr.getmaxyx()
error_msg = f"Error: {str(e)[: width - 10]}"
stdscr.addstr(0, 0, error_msg[: width - 1])
stdscr.refresh()
time.sleep(1)
def _draw_selection_screen(self, stdscr):
"""Draw the cgroup selection screen."""
height, width = stdscr.getmaxyx()
# Draw fancy header box
self._draw_fancy_header(stdscr, "🐳 CONTAINER MONITOR", "Select a Cgroup to Monitor")
_draw_fancy_header(stdscr, "🐳 CONTAINER MONITOR", "Select a Cgroup to Monitor")
# Instructions
instructions = "↑↓: Navigate | ENTER: Select | w: Web Mode | q: Quit | r: Refresh"
self._safe_addstr(stdscr, 3, max(0, (width - len(instructions)) // 2),
instructions, curses.color_pair(3))
instructions = (
"↑↓: Navigate | ENTER: Select | w: Web Mode | q: Quit | r: Refresh"
)
_safe_addstr(
stdscr,
3,
max(0, (width - len(instructions)) // 2),
instructions,
curses.color_pair(3),
)
# Get cgroups
cgroups = self.collector.get_all_cgroups()
if not cgroups:
msg = "No cgroups found. Waiting for activity..."
self._safe_addstr(stdscr, height // 2, max(0, (width - len(msg)) // 2),
msg, curses.color_pair(4))
_safe_addstr(
stdscr,
height // 2,
max(0, (width - len(msg)) // 2),
msg,
curses.color_pair(4),
)
return
# Sort cgroups by name
@ -139,46 +414,76 @@ class ContainerMonitorTUI:
elif self.selected_index >= self.scroll_offset + list_height:
self.scroll_offset = self.selected_index - list_height + 1
# Calculate max name length and ID width for alignment
max_name_len = min(50, max(len(cg.name) for cg in cgroups))
max_id_len = max(len(str(cg.id)) for cg in cgroups)
# Draw cgroup list with fancy borders
start_y = 5
self._safe_addstr(stdscr, start_y, 2, "" + "" * (width - 6) + "",
curses.color_pair(1))
_safe_addstr(
stdscr, start_y, 2, "" + "" * (width - 6) + "", curses.color_pair(1)
)
# Header row
header = f" {'CGROUP NAME':<{max_name_len}}{'ID':>{max_id_len}} "
_safe_addstr(stdscr, start_y + 1, 2, "", curses.color_pair(1))
_safe_addstr(
stdscr, start_y + 1, 3, header, curses.color_pair(1) | curses.A_BOLD
)
_safe_addstr(stdscr, start_y + 1, width - 3, "", curses.color_pair(1))
# Separator
_safe_addstr(
stdscr, start_y + 2, 2, "" + "" * (width - 6) + "", curses.color_pair(1)
)
for i in range(list_height):
idx = self.scroll_offset + i
y = start_y + 1 + i
y = start_y + 3 + i
if y >= height - 2:
break
self._safe_addstr(stdscr, y, 2, "", curses.color_pair(1))
self._safe_addstr(stdscr, y, width - 3, "", curses.color_pair(1))
_safe_addstr(stdscr, y, 2, "", curses.color_pair(1))
_safe_addstr(stdscr, y, width - 3, "", curses.color_pair(1))
if idx >= len(cgroups):
continue
cgroup = cgroups[idx]
# Truncate name if too long
display_name = (
cgroup.name
if len(cgroup.name) <= max_name_len
else cgroup.name[: max_name_len - 3] + "..."
)
if idx == self.selected_index:
# Highlight selected
line = f"{cgroup.name:<35} ID: {cgroup.id} "
self._safe_addstr(stdscr, y, 3, line,
curses.color_pair(8) | curses.A_BOLD)
# Highlight selected with proper alignment
line = f"{display_name:<{max_name_len}}{cgroup.id:>{max_id_len}} "
_safe_addstr(stdscr, y, 3, line, curses.color_pair(8) | curses.A_BOLD)
else:
line = f" {cgroup.name:<35} ID: {cgroup.id} "
self._safe_addstr(stdscr, y, 3, line, curses.color_pair(7))
line = f" {display_name:<{max_name_len}}{cgroup.id:>{max_id_len}} "
_safe_addstr(stdscr, y, 3, line, curses.color_pair(7))
# Bottom border
bottom_y = min(start_y + 1 + list_height, height - 3)
self._safe_addstr(stdscr, bottom_y, 2, "" + "" * (width - 6) + "",
curses.color_pair(1))
bottom_y = min(start_y + 3 + list_height, height - 3)
_safe_addstr(
stdscr, bottom_y, 2, "" + "" * (width - 6) + "", curses.color_pair(1)
)
# Footer
footer = f"Total: {len(cgroups)} cgroups"
if len(cgroups) > list_height:
footer += f" │ Showing {self.scroll_offset + 1}-{min(self.scroll_offset + list_height, len(cgroups))}"
self._safe_addstr(stdscr, height - 2, max(0, (width - len(footer)) // 2),
footer, curses.color_pair(1))
_safe_addstr(
stdscr,
height - 2,
max(0, (width - len(footer)) // 2),
footer,
curses.color_pair(1),
)
def _draw_monitoring_screen(self, stdscr):
"""Draw the monitoring screen for selected cgroup."""
@ -192,296 +497,130 @@ class ContainerMonitorTUI:
history = self.collector.get_history(self.selected_cgroup)
# Draw fancy header
self._draw_fancy_header(stdscr, f"📊 {stats.cgroup_name[:40]}", "Live Performance Metrics")
_draw_fancy_header(
stdscr, f"📊 {stats.cgroup_name[:40]}", "Live Performance Metrics"
)
# Instructions
instructions = "ESC/b: Back to List | w: Web Mode | q: Quit"
self._safe_addstr(stdscr, 3, max(0, (width - len(instructions)) // 2),
instructions, curses.color_pair(3))
_safe_addstr(
stdscr,
3,
max(0, (width - len(instructions)) // 2),
instructions,
curses.color_pair(3),
)
# Calculate metrics for rate display
rates = self._calculate_rates(history)
rates = _calculate_rates(history)
y = 5
# Syscall count in a fancy box
if y + 4 < height:
self._draw_metric_box(
stdscr, y, 2, min(width - 4, 80),
_draw_metric_box(
stdscr,
y,
2,
min(width - 4, 80),
"⚡ SYSTEM CALLS",
f"{stats.syscall_count:,}",
f"Rate: {rates['syscalls_per_sec']:.1f}/sec",
curses.color_pair(5)
curses.color_pair(5),
)
y += 4
# Network I/O Section
if y + 8 < height:
self._draw_section_header(stdscr, y, "🌐 NETWORK I/O", 1)
_draw_section_header(stdscr, y, "🌐 NETWORK I/O", 1)
y += 1
# RX graph
rx_label = f"RX: {self._format_bytes(stats.rx_bytes)}"
rx_rate = f"{self._format_bytes(rates['rx_bytes_per_sec'])}/s"
rx_label = f"RX: {_format_bytes(stats.rx_bytes)}"
rx_rate = f"{_format_bytes(rates['rx_bytes_per_sec'])}/s"
rx_pkts = f"{stats.rx_packets:,} pkts ({rates['rx_pkts_per_sec']:.1f}/s)"
self._draw_labeled_graph(
stdscr, y, 2, width - 4, 4,
rx_label, rx_rate, rx_pkts,
_draw_labeled_graph(
stdscr,
y,
2,
width - 4,
4,
rx_label,
rx_rate,
rx_pkts,
[s.rx_bytes for s in history],
curses.color_pair(2),
"Received Traffic (last 100 samples)"
"Received Traffic (last 100 samples)",
)
y += 6
# TX graph
if y + 8 < height:
tx_label = f"TX: {self._format_bytes(stats.tx_bytes)}"
tx_rate = f"{self._format_bytes(rates['tx_bytes_per_sec'])}/s"
tx_label = f"TX: {_format_bytes(stats.tx_bytes)}"
tx_rate = f"{_format_bytes(rates['tx_bytes_per_sec'])}/s"
tx_pkts = f"{stats.tx_packets:,} pkts ({rates['tx_pkts_per_sec']:.1f}/s)"
self._draw_labeled_graph(
stdscr, y, 2, width - 4, 4,
tx_label, tx_rate, tx_pkts,
_draw_labeled_graph(
stdscr,
y,
2,
width - 4,
4,
tx_label,
tx_rate,
tx_pkts,
[s.tx_bytes for s in history],
curses.color_pair(3),
"Transmitted Traffic (last 100 samples)"
"Transmitted Traffic (last 100 samples)",
)
y += 6
# File I/O Section
if y + 8 < height:
self._draw_section_header(stdscr, y, "💾 FILE I/O", 1)
_draw_section_header(stdscr, y, "💾 FILE I/O", 1)
y += 1
# Read graph
read_label = f"READ: {self._format_bytes(stats.read_bytes)}"
read_rate = f"{self._format_bytes(rates['read_bytes_per_sec'])}/s"
read_label = f"READ: {_format_bytes(stats.read_bytes)}"
read_rate = f"{_format_bytes(rates['read_bytes_per_sec'])}/s"
read_ops = f"{stats.read_ops:,} ops ({rates['read_ops_per_sec']:.1f}/s)"
self._draw_labeled_graph(
stdscr, y, 2, width - 4, 4,
read_label, read_rate, read_ops,
_draw_labeled_graph(
stdscr,
y,
2,
width - 4,
4,
read_label,
read_rate,
read_ops,
[s.read_bytes for s in history],
curses.color_pair(4),
"Read Operations (last 100 samples)"
"Read Operations (last 100 samples)",
)
y += 6
# Write graph
if y + 8 < height:
write_label = f"WRITE: {self._format_bytes(stats.write_bytes)}"
write_rate = f"{self._format_bytes(rates['write_bytes_per_sec'])}/s"
write_label = f"WRITE: {_format_bytes(stats.write_bytes)}"
write_rate = f"{_format_bytes(rates['write_bytes_per_sec'])}/s"
write_ops = f"{stats.write_ops:,} ops ({rates['write_ops_per_sec']:.1f}/s)"
self._draw_labeled_graph(
stdscr, y, 2, width - 4, 4,
write_label, write_rate, write_ops,
_draw_labeled_graph(
stdscr,
y,
2,
width - 4,
4,
write_label,
write_rate,
write_ops,
[s.write_bytes for s in history],
curses.color_pair(5),
"Write Operations (last 100 samples)"
"Write Operations (last 100 samples)",
)
def _draw_fancy_header(self, stdscr, title: str, subtitle: str):
"""Draw a fancy header with title and subtitle."""
height, width = stdscr.getmaxyx()
# Top border
self._safe_addstr(stdscr, 0, 0, "" * width, curses.color_pair(6) | curses.A_BOLD)
# Title
self._safe_addstr(stdscr, 0, max(0, (width - len(title)) // 2), f" {title} ",
curses.color_pair(6) | curses.A_BOLD)
# Subtitle
self._safe_addstr(stdscr, 1, max(0, (width - len(subtitle)) // 2), subtitle,
curses.color_pair(1))
# Bottom border
self._safe_addstr(stdscr, 2, 0, "" * width, curses.color_pair(6))
def _draw_metric_box(self, stdscr, y: int, x: int, width: int,
label: str, value: str, detail: str, color_pair: int):
"""Draw a fancy box for displaying a metric."""
height, _ = stdscr.getmaxyx()
if y + 4 >= height:
return
# Top border
self._safe_addstr(stdscr, y, x, "" + "" * (width - 2) + "",
color_pair | curses.A_BOLD)
# Label
self._safe_addstr(stdscr, y + 1, x, "", color_pair | curses.A_BOLD)
self._safe_addstr(stdscr, y + 1, x + 2, label, color_pair | curses.A_BOLD)
self._safe_addstr(stdscr, y + 1, x + width - 1, "", color_pair | curses.A_BOLD)
# Value
self._safe_addstr(stdscr, y + 2, x, "", color_pair | curses.A_BOLD)
self._safe_addstr(stdscr, y + 2, x + 4, value, curses.color_pair(2) | curses.A_BOLD)
self._safe_addstr(stdscr, y + 2, min(x + width - len(detail) - 3, x + width - 2), detail,
color_pair | curses.A_BOLD)
self._safe_addstr(stdscr, y + 2, x + width - 1, "", color_pair | curses.A_BOLD)
# Bottom border
self._safe_addstr(stdscr, y + 3, x, "" + "" * (width - 2) + "",
color_pair | curses.A_BOLD)
def _draw_section_header(self, stdscr, y: int, title: str, color_pair: int):
"""Draw a section header."""
height, width = stdscr.getmaxyx()
if y >= height:
return
self._safe_addstr(stdscr, y, 2, title, curses.color_pair(color_pair) | curses.A_BOLD)
self._safe_addstr(stdscr, y, len(title) + 3, "" * (width - len(title) - 5),
curses.color_pair(color_pair) | curses.A_BOLD)
def _draw_labeled_graph(
self, stdscr, y: int, x: int, width: int, height: int,
label: str, rate: str, detail: str,
data: List[float], color_pair: int, description: str
):
"""Draw a graph with labels and legend."""
screen_height, screen_width = stdscr.getmaxyx()
if y >= screen_height or y + height + 2 >= screen_height:
return
# Header with metrics
self._safe_addstr(stdscr, y, x, label, curses.color_pair(1) | curses.A_BOLD)
self._safe_addstr(stdscr, y, x + len(label) + 2, rate, curses.color_pair(2))
self._safe_addstr(stdscr, y, x + len(label) + len(rate) + 4, detail,
curses.color_pair(7))
# Draw the graph
if len(data) > 1:
self._draw_bar_graph_enhanced(
stdscr, y + 1, x, width, height,
data, color_pair
)
else:
self._safe_addstr(stdscr, y + 2, x + 2, "Collecting data...",
curses.color_pair(7))
# Graph legend
if y + height + 1 < screen_height:
self._safe_addstr(stdscr, y + height + 1, x, f"└─ {description}",
curses.color_pair(7))
def _draw_bar_graph_enhanced(
self,
stdscr,
y: int,
x: int,
width: int,
height: int,
data: List[float],
color_pair: int,
):
"""Draw an enhanced bar graph with axis and scale."""
screen_height, screen_width = stdscr.getmaxyx()
if not data or width < 2 or y + height >= screen_height:
return
# Calculate statistics
max_val = max(data) if max(data) > 0 else 1
min_val = min(data)
avg_val = sum(data) / len(data)
# Take last 'width - 12' data points (leave room for Y-axis)
graph_width = max(1, width - 12)
recent_data = data[-graph_width:] if len(data) > graph_width else data
# Draw Y-axis labels (with bounds checking)
if y < screen_height:
self._safe_addstr(stdscr, y, x, f"{self._format_bytes(max_val):>9}",
curses.color_pair(7))
if y + height // 2 < screen_height:
self._safe_addstr(stdscr, y + height // 2, x, f"{self._format_bytes(avg_val):>9}",
curses.color_pair(7))
if y + height - 1 < screen_height:
self._safe_addstr(stdscr, y + height - 1, x, f"{self._format_bytes(min_val):>9}",
curses.color_pair(7))
# Draw bars
for row in range(height):
if y + row >= screen_height:
break
threshold = (height - row) / height
bar_line = ""
for val in recent_data:
normalized = val / max_val if max_val > 0 else 0
if normalized >= threshold:
bar_line += ""
elif normalized >= threshold - 0.15:
bar_line += ""
elif normalized >= threshold - 0.35:
bar_line += ""
elif normalized >= threshold - 0.5:
bar_line += ""
else:
bar_line += " "
self._safe_addstr(stdscr, y + row, x + 11, bar_line, color_pair)
# Draw X-axis
if y + height < screen_height:
self._safe_addstr(stdscr, y + height, x + 10, "" + "" * len(recent_data),
curses.color_pair(7))
self._safe_addstr(stdscr, y + height, x + 10 + len(recent_data), "→ time",
curses.color_pair(7))
def _calculate_rates(self, history: List) -> dict:
"""Calculate per-second rates from history."""
if len(history) < 2:
return {
'syscalls_per_sec': 0.0,
'rx_bytes_per_sec': 0.0,
'tx_bytes_per_sec': 0.0,
'rx_pkts_per_sec': 0.0,
'tx_pkts_per_sec': 0.0,
'read_bytes_per_sec': 0.0,
'write_bytes_per_sec': 0.0,
'read_ops_per_sec': 0.0,
'write_ops_per_sec': 0.0,
}
# Calculate delta between last two samples
recent = history[-1]
previous = history[-2]
time_delta = recent.timestamp - previous.timestamp
if time_delta <= 0:
time_delta = 1.0
return {
'syscalls_per_sec': (recent.syscall_count - previous.syscall_count) / time_delta,
'rx_bytes_per_sec': (recent.rx_bytes - previous.rx_bytes) / time_delta,
'tx_bytes_per_sec': (recent.tx_bytes - previous.tx_bytes) / time_delta,
'rx_pkts_per_sec': (recent.rx_packets - previous.rx_packets) / time_delta,
'tx_pkts_per_sec': (recent.tx_packets - previous.tx_packets) / time_delta,
'read_bytes_per_sec': (recent.read_bytes - previous.read_bytes) / time_delta,
'write_bytes_per_sec': (recent.write_bytes - previous.write_bytes) / time_delta,
'read_ops_per_sec': (recent.read_ops - previous.read_ops) / time_delta,
'write_ops_per_sec': (recent.write_ops - previous.write_ops) / time_delta,
}
def _format_bytes(self, bytes_val: float) -> str:
"""Format bytes into human-readable string."""
if bytes_val < 0:
bytes_val = 0
for unit in ["B", "KB", "MB", "GB", "TB"]:
if bytes_val < 1024.0:
return f"{bytes_val:.1f}{unit}"
bytes_val /= 1024.0
return f"{bytes_val:.1f}PB"
def _launch_web_mode(self, stdscr):
"""Launch web dashboard mode."""
height, width = stdscr.getmaxyx()
@ -490,50 +629,72 @@ class ContainerMonitorTUI:
stdscr.clear()
msg1 = "🌐 LAUNCHING WEB DASHBOARD"
self._safe_addstr(stdscr, height // 2 - 2, max(0, (width - len(msg1)) // 2), msg1,
curses.color_pair(6) | curses.A_BOLD)
_safe_addstr(
stdscr,
height // 2 - 2,
max(0, (width - len(msg1)) // 2),
msg1,
curses.color_pair(6) | curses.A_BOLD,
)
msg2 = "Opening browser at http://localhost:8050"
self._safe_addstr(stdscr, height // 2, max(0, (width - len(msg2)) // 2), msg2,
curses.color_pair(2))
msg2 = "Server starting at http://localhost:8050"
_safe_addstr(
stdscr,
height // 2,
max(0, (width - len(msg2)) // 2),
msg2,
curses.color_pair(2),
)
msg3 = "Press 'q' to stop web server and return to TUI"
self._safe_addstr(stdscr, height // 2 + 2, max(0, (width - len(msg3)) // 2), msg3,
curses.color_pair(3))
_safe_addstr(
stdscr,
height // 2 + 2,
max(0, (width - len(msg3)) // 2),
msg3,
curses.color_pair(3),
)
stdscr.refresh()
time.sleep(1)
try:
# Create and start web dashboard
self.web_dashboard = WebDashboard(self.collector, selected_cgroup=self.selected_cgroup)
self.web_dashboard = WebDashboard(
self.collector, selected_cgroup=self.selected_cgroup
)
# Start in background thread
self.web_thread = threading.Thread(target=self.web_dashboard.run, daemon=True)
self.web_thread = threading.Thread(
target=self.web_dashboard.run, daemon=True
)
self.web_thread.start()
time.sleep(2) # Give server more time to start
# Open browser
try:
webbrowser.open('http://localhost:8050')
except Exception as e:
error_msg = f"Could not open browser: {str(e)}"
self._safe_addstr(stdscr, height // 2 + 4, max(0, (width - len(error_msg)) // 2),
error_msg, curses.color_pair(4))
stdscr.refresh()
time.sleep(2)
time.sleep(2) # Give server time to start
# Wait for user to press 'q' to return
msg4 = "Web dashboard running! Press 'q' to return to TUI"
self._safe_addstr(stdscr, height // 2 + 4, max(0, (width - len(msg4)) // 2), msg4,
curses.color_pair(1) | curses.A_BOLD)
msg4 = "Web dashboard running at http://localhost:8050"
msg5 = "Press 'q' to return to TUI"
_safe_addstr(
stdscr,
height // 2 + 4,
max(0, (width - len(msg4)) // 2),
msg4,
curses.color_pair(1) | curses.A_BOLD,
)
_safe_addstr(
stdscr,
height // 2 + 5,
max(0, (width - len(msg5)) // 2),
msg5,
curses.color_pair(3) | curses.A_BOLD,
)
stdscr.refresh()
stdscr.nodelay(False) # Blocking mode
while True:
key = stdscr.getch()
if key == ord('q') or key == ord('Q'):
if key == ord("q") or key == ord("Q"):
break
# Stop web server
@ -542,8 +703,13 @@ class ContainerMonitorTUI:
except Exception as e:
error_msg = f"Error starting web dashboard: {str(e)}"
self._safe_addstr(stdscr, height // 2 + 4, max(0, (width - len(error_msg)) // 2),
error_msg, curses.color_pair(4))
_safe_addstr(
stdscr,
height // 2 + 4,
max(0, (width - len(error_msg)) // 2),
error_msg,
curses.color_pair(4),
)
stdscr.refresh()
time.sleep(3)

File diff suppressed because it is too large Load Diff