"""Terminal User Interface for container monitoring.""" import time import curses import threading from typing import Optional, List from data_collection import ContainerDataCollector from web_dashboard import WebDashboard def _safe_addstr(stdscr, y: int, x: int, text: str, *args): """Safely add string to screen with bounds checking.""" try: height, width = stdscr.getmaxyx() if 0 <= y < height and 0 <= x < width: # Truncate text to fit max_len = width - x - 1 if max_len > 0: stdscr.addstr(y, x, text[:max_len], *args) except curses.error: pass def _draw_fancy_header(stdscr, title: str, subtitle: str): """Draw a fancy header with title and subtitle.""" height, width = stdscr.getmaxyx() # Top border _safe_addstr(stdscr, 0, 0, "═" * width, curses.color_pair(6) | curses.A_BOLD) # Title _safe_addstr( stdscr, 0, max(0, (width - len(title)) // 2), f" {title} ", curses.color_pair(6) | curses.A_BOLD, ) # Subtitle _safe_addstr( stdscr, 1, max(0, (width - len(subtitle)) // 2), subtitle, curses.color_pair(1), ) # Bottom border _safe_addstr(stdscr, 2, 0, "═" * width, curses.color_pair(6)) def _draw_metric_box( stdscr, y: int, x: int, width: int, label: str, value: str, detail: str, color_pair: int, ): """Draw a fancy box for displaying a metric.""" height, _ = stdscr.getmaxyx() if y + 4 >= height: return # Top border _safe_addstr( stdscr, y, x, "┌" + "─" * (width - 2) + "┐", color_pair | curses.A_BOLD ) # Label _safe_addstr(stdscr, y + 1, x, "│", color_pair | curses.A_BOLD) _safe_addstr(stdscr, y + 1, x + 2, label, color_pair | curses.A_BOLD) _safe_addstr(stdscr, y + 1, x + width - 1, "│", color_pair | curses.A_BOLD) # Value _safe_addstr(stdscr, y + 2, x, "│", color_pair | curses.A_BOLD) _safe_addstr(stdscr, y + 2, x + 4, value, curses.color_pair(2) | curses.A_BOLD) _safe_addstr( stdscr, y + 2, min(x + width - len(detail) - 3, x + width - 2), detail, color_pair | curses.A_BOLD, ) _safe_addstr(stdscr, y + 2, x + width - 1, "│", color_pair | curses.A_BOLD) # Bottom border _safe_addstr( stdscr, y + 3, x, "└" + "─" * (width - 2) + "┘", color_pair | curses.A_BOLD ) def _draw_section_header(stdscr, y: int, title: str, color_pair: int): """Draw a section header.""" height, width = stdscr.getmaxyx() if y >= height: return _safe_addstr(stdscr, y, 2, title, curses.color_pair(color_pair) | curses.A_BOLD) _safe_addstr( stdscr, y, len(title) + 3, "─" * (width - len(title) - 5), curses.color_pair(color_pair) | curses.A_BOLD, ) def _calculate_rates(history: List) -> dict: """Calculate per-second rates from history.""" if len(history) < 2: return { "syscalls_per_sec": 0.0, "rx_bytes_per_sec": 0.0, "tx_bytes_per_sec": 0.0, "rx_pkts_per_sec": 0.0, "tx_pkts_per_sec": 0.0, "read_bytes_per_sec": 0.0, "write_bytes_per_sec": 0.0, "read_ops_per_sec": 0.0, "write_ops_per_sec": 0.0, } # Calculate delta between last two samples recent = history[-1] previous = history[-2] time_delta = recent.timestamp - previous.timestamp if time_delta <= 0: time_delta = 1.0 return { "syscalls_per_sec": (recent.syscall_count - previous.syscall_count) / time_delta, "rx_bytes_per_sec": (recent.rx_bytes - previous.rx_bytes) / time_delta, "tx_bytes_per_sec": (recent.tx_bytes - previous.tx_bytes) / time_delta, "rx_pkts_per_sec": (recent.rx_packets - previous.rx_packets) / time_delta, "tx_pkts_per_sec": (recent.tx_packets - previous.tx_packets) / time_delta, "read_bytes_per_sec": (recent.read_bytes - previous.read_bytes) / time_delta, "write_bytes_per_sec": (recent.write_bytes - previous.write_bytes) / time_delta, "read_ops_per_sec": (recent.read_ops - previous.read_ops) / time_delta, "write_ops_per_sec": (recent.write_ops - previous.write_ops) / time_delta, } def _format_bytes(bytes_val: float) -> str: """Format bytes into human-readable string.""" if bytes_val < 0: bytes_val = 0 for unit in ["B", "KB", "MB", "GB", "TB"]: if bytes_val < 1024.0: return f"{bytes_val:.1f}{unit}" bytes_val /= 1024.0 return f"{bytes_val:.1f}PB" def _draw_bar_graph_enhanced( stdscr, y: int, x: int, width: int, height: int, data: List[float], color_pair: int, ): """Draw an enhanced bar graph with axis and scale.""" screen_height, screen_width = stdscr.getmaxyx() if not data or width < 2 or y + height >= screen_height: return # Calculate statistics max_val = max(data) if max(data) > 0 else 1 min_val = min(data) avg_val = sum(data) / len(data) # Take last 'width - 12' data points (leave room for Y-axis) graph_width = max(1, width - 12) recent_data = data[-graph_width:] if len(data) > graph_width else data # Draw Y-axis labels (with bounds checking) if y < screen_height: _safe_addstr( stdscr, y, x, f"│{_format_bytes(max_val):>9}", curses.color_pair(7) ) if y + height // 2 < screen_height: _safe_addstr( stdscr, y + height // 2, x, f"│{_format_bytes(avg_val):>9}", curses.color_pair(7), ) if y + height - 1 < screen_height: _safe_addstr( stdscr, y + height - 1, x, f"│{_format_bytes(min_val):>9}", curses.color_pair(7), ) # Draw bars for row in range(height): if y + row >= screen_height: break threshold = (height - row) / height bar_line = "" for val in recent_data: normalized = val / max_val if max_val > 0 else 0 if normalized >= threshold: bar_line += "█" elif normalized >= threshold - 0.15: bar_line += "▓" elif normalized >= threshold - 0.35: bar_line += "▒" elif normalized >= threshold - 0.5: bar_line += "░" else: bar_line += " " _safe_addstr(stdscr, y + row, x + 11, bar_line, color_pair) # Draw X-axis if y + height < screen_height: _safe_addstr( stdscr, y + height, x + 10, "├" + "─" * len(recent_data), curses.color_pair(7), ) _safe_addstr( stdscr, y + height, x + 10 + len(recent_data), "→ time", curses.color_pair(7), ) def _draw_labeled_graph( stdscr, y: int, x: int, width: int, height: int, label: str, rate: str, detail: str, data: List[float], color_pair: int, description: str, ): """Draw a graph with labels and legend.""" screen_height, screen_width = stdscr.getmaxyx() if y >= screen_height or y + height + 2 >= screen_height: return # Header with metrics _safe_addstr(stdscr, y, x, label, curses.color_pair(1) | curses.A_BOLD) _safe_addstr(stdscr, y, x + len(label) + 2, rate, curses.color_pair(2)) _safe_addstr( stdscr, y, x + len(label) + len(rate) + 4, detail, curses.color_pair(7) ) # Draw the graph if len(data) > 1: _draw_bar_graph_enhanced(stdscr, y + 1, x, width, height, data, color_pair) else: _safe_addstr(stdscr, y + 2, x + 2, "Collecting data...", curses.color_pair(7)) # Graph legend if y + height + 1 < screen_height: _safe_addstr( stdscr, y + height + 1, x, f"└─ {description}", curses.color_pair(7) ) class ContainerMonitorTUI: """TUI for container monitoring with cgroup selection and live graphs.""" def __init__(self, collector: ContainerDataCollector): self.collector = collector self.selected_cgroup: Optional[int] = None self.current_screen = "selection" # "selection" or "monitoring" self.selected_index = 0 self.scroll_offset = 0 self.web_dashboard = None self.web_thread = None def run(self): """Run the TUI application.""" curses.wrapper(self._main_loop) def _main_loop(self, stdscr): """Main curses loop.""" # Configure curses curses.curs_set(0) # Hide cursor stdscr.nodelay(True) # Non-blocking input stdscr.timeout(100) # Refresh every 100ms # Initialize colors curses.start_color() curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK) curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) curses.init_pair(4, curses.COLOR_RED, curses.COLOR_BLACK) curses.init_pair(5, curses.COLOR_MAGENTA, curses.COLOR_BLACK) curses.init_pair(6, curses.COLOR_WHITE, curses.COLOR_BLUE) curses.init_pair(7, curses.COLOR_BLUE, curses.COLOR_BLACK) curses.init_pair(8, curses.COLOR_WHITE, curses.COLOR_CYAN) while True: stdscr.clear() try: height, width = stdscr.getmaxyx() # Check minimum terminal size if height < 25 or width < 80: msg = "Terminal too small! Minimum: 80x25" stdscr.attron(curses.color_pair(4) | curses.A_BOLD) stdscr.addstr( height // 2, max(0, (width - len(msg)) // 2), msg[: width - 1] ) stdscr.attroff(curses.color_pair(4) | curses.A_BOLD) stdscr.refresh() key = stdscr.getch() if key == ord("q") or key == ord("Q"): break continue if self.current_screen == "selection": self._draw_selection_screen(stdscr) elif self.current_screen == "monitoring": self._draw_monitoring_screen(stdscr) stdscr.refresh() # Handle input key = stdscr.getch() if key != -1: if not self._handle_input(key, stdscr): break # Exit requested except KeyboardInterrupt: break except curses.error: # Curses error - likely terminal too small, just continue pass except Exception as e: # Show error briefly height, width = stdscr.getmaxyx() error_msg = f"Error: {str(e)[: width - 10]}" stdscr.addstr(0, 0, error_msg[: width - 1]) stdscr.refresh() time.sleep(1) def _draw_selection_screen(self, stdscr): """Draw the cgroup selection screen.""" height, width = stdscr.getmaxyx() # Draw fancy header box _draw_fancy_header(stdscr, "🐳 CONTAINER MONITOR", "Select a Cgroup to Monitor") # Instructions instructions = ( "↑↓: Navigate | ENTER: Select | w: Web Mode | q: Quit | r: Refresh" ) _safe_addstr( stdscr, 3, max(0, (width - len(instructions)) // 2), instructions, curses.color_pair(3), ) # Get cgroups cgroups = self.collector.get_all_cgroups() if not cgroups: msg = "No cgroups found. Waiting for activity..." _safe_addstr( stdscr, height // 2, max(0, (width - len(msg)) // 2), msg, curses.color_pair(4), ) return # Sort cgroups by name cgroups.sort(key=lambda c: c.name) # Adjust selection bounds if self.selected_index >= len(cgroups): self.selected_index = len(cgroups) - 1 if self.selected_index < 0: self.selected_index = 0 # Calculate visible range list_height = max(1, height - 8) if self.selected_index < self.scroll_offset: self.scroll_offset = self.selected_index elif self.selected_index >= self.scroll_offset + list_height: self.scroll_offset = self.selected_index - list_height + 1 # Calculate max name length and ID width for alignment max_name_len = min(50, max(len(cg.name) for cg in cgroups)) max_id_len = max(len(str(cg.id)) for cg in cgroups) # Draw cgroup list with fancy borders start_y = 5 _safe_addstr( stdscr, start_y, 2, "╔" + "═" * (width - 6) + "╗", curses.color_pair(1) ) # Header row header = f" {'CGROUP NAME':<{max_name_len}} │ {'ID':>{max_id_len}} " _safe_addstr(stdscr, start_y + 1, 2, "║", curses.color_pair(1)) _safe_addstr( stdscr, start_y + 1, 3, header, curses.color_pair(1) | curses.A_BOLD ) _safe_addstr(stdscr, start_y + 1, width - 3, "║", curses.color_pair(1)) # Separator _safe_addstr( stdscr, start_y + 2, 2, "╟" + "─" * (width - 6) + "╢", curses.color_pair(1) ) for i in range(list_height): idx = self.scroll_offset + i y = start_y + 3 + i if y >= height - 2: break _safe_addstr(stdscr, y, 2, "║", curses.color_pair(1)) _safe_addstr(stdscr, y, width - 3, "║", curses.color_pair(1)) if idx >= len(cgroups): continue cgroup = cgroups[idx] # Truncate name if too long display_name = ( cgroup.name if len(cgroup.name) <= max_name_len else cgroup.name[: max_name_len - 3] + "..." ) if idx == self.selected_index: # Highlight selected with proper alignment line = f" ► {display_name:<{max_name_len}} │ {cgroup.id:>{max_id_len}} " _safe_addstr(stdscr, y, 3, line, curses.color_pair(8) | curses.A_BOLD) else: line = f" {display_name:<{max_name_len}} │ {cgroup.id:>{max_id_len}} " _safe_addstr(stdscr, y, 3, line, curses.color_pair(7)) # Bottom border bottom_y = min(start_y + 3 + list_height, height - 3) _safe_addstr( stdscr, bottom_y, 2, "╚" + "═" * (width - 6) + "╝", curses.color_pair(1) ) # Footer footer = f"Total: {len(cgroups)} cgroups" if len(cgroups) > list_height: footer += f" │ Showing {self.scroll_offset + 1}-{min(self.scroll_offset + list_height, len(cgroups))}" _safe_addstr( stdscr, height - 2, max(0, (width - len(footer)) // 2), footer, curses.color_pair(1), ) def _draw_monitoring_screen(self, stdscr): """Draw the monitoring screen for selected cgroup.""" height, width = stdscr.getmaxyx() if self.selected_cgroup is None: return # Get current stats stats = self.collector.get_stats_for_cgroup(self.selected_cgroup) history = self.collector.get_history(self.selected_cgroup) # Draw fancy header _draw_fancy_header( stdscr, f"📊 {stats.cgroup_name[:40]}", "Live Performance Metrics" ) # Instructions instructions = "ESC/b: Back to List | w: Web Mode | q: Quit" _safe_addstr( stdscr, 3, max(0, (width - len(instructions)) // 2), instructions, curses.color_pair(3), ) # Calculate metrics for rate display rates = _calculate_rates(history) y = 5 # Syscall count in a fancy box if y + 4 < height: _draw_metric_box( stdscr, y, 2, min(width - 4, 80), "⚡ SYSTEM CALLS", f"{stats.syscall_count:,}", f"Rate: {rates['syscalls_per_sec']:.1f}/sec", curses.color_pair(5), ) y += 4 # Network I/O Section if y + 8 < height: _draw_section_header(stdscr, y, "🌐 NETWORK I/O", 1) y += 1 # RX graph rx_label = f"RX: {_format_bytes(stats.rx_bytes)}" rx_rate = f"{_format_bytes(rates['rx_bytes_per_sec'])}/s" rx_pkts = f"{stats.rx_packets:,} pkts ({rates['rx_pkts_per_sec']:.1f}/s)" _draw_labeled_graph( stdscr, y, 2, width - 4, 4, rx_label, rx_rate, rx_pkts, [s.rx_bytes for s in history], curses.color_pair(2), "Received Traffic (last 100 samples)", ) y += 6 # TX graph if y + 8 < height: tx_label = f"TX: {_format_bytes(stats.tx_bytes)}" tx_rate = f"{_format_bytes(rates['tx_bytes_per_sec'])}/s" tx_pkts = f"{stats.tx_packets:,} pkts ({rates['tx_pkts_per_sec']:.1f}/s)" _draw_labeled_graph( stdscr, y, 2, width - 4, 4, tx_label, tx_rate, tx_pkts, [s.tx_bytes for s in history], curses.color_pair(3), "Transmitted Traffic (last 100 samples)", ) y += 6 # File I/O Section if y + 8 < height: _draw_section_header(stdscr, y, "💾 FILE I/O", 1) y += 1 # Read graph read_label = f"READ: {_format_bytes(stats.read_bytes)}" read_rate = f"{_format_bytes(rates['read_bytes_per_sec'])}/s" read_ops = f"{stats.read_ops:,} ops ({rates['read_ops_per_sec']:.1f}/s)" _draw_labeled_graph( stdscr, y, 2, width - 4, 4, read_label, read_rate, read_ops, [s.read_bytes for s in history], curses.color_pair(4), "Read Operations (last 100 samples)", ) y += 6 # Write graph if y + 8 < height: write_label = f"WRITE: {_format_bytes(stats.write_bytes)}" write_rate = f"{_format_bytes(rates['write_bytes_per_sec'])}/s" write_ops = f"{stats.write_ops:,} ops ({rates['write_ops_per_sec']:.1f}/s)" _draw_labeled_graph( stdscr, y, 2, width - 4, 4, write_label, write_rate, write_ops, [s.write_bytes for s in history], curses.color_pair(5), "Write Operations (last 100 samples)", ) def _launch_web_mode(self, stdscr): """Launch web dashboard mode.""" height, width = stdscr.getmaxyx() # Show transition message stdscr.clear() msg1 = "🌐 LAUNCHING WEB DASHBOARD" _safe_addstr( stdscr, height // 2 - 2, max(0, (width - len(msg1)) // 2), msg1, curses.color_pair(6) | curses.A_BOLD, ) msg2 = "Server starting at http://localhost:8050" _safe_addstr( stdscr, height // 2, max(0, (width - len(msg2)) // 2), msg2, curses.color_pair(2), ) msg3 = "Press 'q' to stop web server and return to TUI" _safe_addstr( stdscr, height // 2 + 2, max(0, (width - len(msg3)) // 2), msg3, curses.color_pair(3), ) stdscr.refresh() time.sleep(1) try: # Create and start web dashboard self.web_dashboard = WebDashboard( self.collector, selected_cgroup=self.selected_cgroup ) # Start in background thread self.web_thread = threading.Thread( target=self.web_dashboard.run, daemon=True ) self.web_thread.start() time.sleep(2) # Give server time to start # Wait for user to press 'q' to return msg4 = "Web dashboard running at http://localhost:8050" msg5 = "Press 'q' to return to TUI" _safe_addstr( stdscr, height // 2 + 4, max(0, (width - len(msg4)) // 2), msg4, curses.color_pair(1) | curses.A_BOLD, ) _safe_addstr( stdscr, height // 2 + 5, max(0, (width - len(msg5)) // 2), msg5, curses.color_pair(3) | curses.A_BOLD, ) stdscr.refresh() stdscr.nodelay(False) # Blocking mode while True: key = stdscr.getch() if key == ord("q") or key == ord("Q"): break # Stop web server if self.web_dashboard: self.web_dashboard.stop() except Exception as e: error_msg = f"Error starting web dashboard: {str(e)}" _safe_addstr( stdscr, height // 2 + 4, max(0, (width - len(error_msg)) // 2), error_msg, curses.color_pair(4), ) stdscr.refresh() time.sleep(3) # Restore TUI settings stdscr.nodelay(True) stdscr.timeout(100) def _handle_input(self, key: int, stdscr) -> bool: """Handle keyboard input. Returns False to exit.""" if key == ord("q") or key == ord("Q"): return False # Exit if key == ord("w") or key == ord("W"): # Launch web mode self._launch_web_mode(stdscr) return True if self.current_screen == "selection": if key == curses.KEY_UP: self.selected_index = max(0, self.selected_index - 1) elif key == curses.KEY_DOWN: cgroups = self.collector.get_all_cgroups() self.selected_index = min(len(cgroups) - 1, self.selected_index + 1) elif key == ord("\n") or key == curses.KEY_ENTER or key == 10: # Select cgroup cgroups = self.collector.get_all_cgroups() if cgroups and 0 <= self.selected_index < len(cgroups): cgroups.sort(key=lambda c: c.name) self.selected_cgroup = cgroups[self.selected_index].id self.current_screen = "monitoring" elif key == ord("r") or key == ord("R"): # Force refresh cache self.collector._cgroup_cache_time = 0 elif self.current_screen == "monitoring": if key == 27 or key == ord("b") or key == ord("B"): # ESC or 'b' self.current_screen = "selection" self.selected_cgroup = None return True # Continue running