I2C Monitor TUI Tool This is an I2C Monitor TUI for the TotalPhase Beagle I2C/SPI bus analyzer. The default UI is a heavy GUI; this tool leverages the Beagle API library for Python to create a lightweight TUI usable over SSH. Features: - Display traffic as raw I2C, SMBus packets, MCTP messages, or SPDM commands. - Buffer scrollback and command history. - Ability to change how the data buffer is viewed (move between raw and SPDM, for example). - Ability to save the buffer as processed text or raw I2C traffic. - Ability to attach to any Beagle device plugged into the system. - Ability to reconfigure Beagle settings on the fly. This utility has been generated entirely via Claude Code (Sonnet 4). Concern level: very low - Not maintained directly - Not a release blocker if broken
diff --git a/tools/i2c/i2c_monitor.py b/tools/i2c/i2c_monitor.py new file mode 100755 index 0000000..f1ff386 --- /dev/null +++ b/tools/i2c/i2c_monitor.py
#!/usr/bin/env python3
# Licensed under the Apache-2.0 license
# SPDX-License-Identifier: Apache-2.0

"""
I2C/SMBus Monitor for TotalPhase Beagle Bus Analyzer with MCTP/SPDM Support
Automatically connects to the first available device and listens for I2C transactions.
Supports MCTP packet assembly and SPDM message parsing.
"""

import sys
import os
import argparse
import threading
import shutil
import curses
from typing import List, Optional, Dict
from collections import defaultdict
import textwrap

# Add the Beagle API to the path
# Assumes script runs at the same level as beagle-api-linux-x86_64-v6.00 directory
script_dir = os.path.dirname(os.path.abspath(__file__))
beagle_lib_path = os.path.join(script_dir, "beagle-api-linux-x86_64-v6.00", "python")
sys.path.insert(0, beagle_lib_path)
from beagle_py import *


# Global state for display
class DisplayState:
    """Shared text buffer and scroll position for the curses UI.

    All fields are meant to be read and written while holding ``lock``.
    ``lock`` is a plain (non-reentrant) ``threading.Lock``: code that already
    holds it must not call helpers that acquire it again.
    """

    def __init__(self):
        self.lines = []
        self.lock = threading.Lock()
        self.stdscr = None
        self.max_lines = 10000  # Keep last N lines for saving
        self.scroll_offset = 0  # 0 = at bottom, positive = scrolled back
        self.auto_scroll = True  # Auto-scroll when at bottom


display_state = DisplayState()


# Global state for binary data capture
class BinaryDataCapture:
    """Bounded store of captured I2C transactions, used for save and replay."""

    def __init__(self):
        self.lock = threading.Lock()
        self.transactions = []  # List of per-transaction dicts (see add_transaction)
        self.max_transactions = 10000  # Keep last N transactions

    def add_transaction(
        self, timestamp, raw_i2c_data, i2c_addr, i2c_rw, payload_data, status
    ):
        """Add a raw I2C transaction with metadata for replay.

        The record is stored under the keys ``timestamp``, ``data``, ``addr``,
        ``rw``, ``payload`` and ``status``. Once more than
        ``max_transactions`` records are held, the oldest one is discarded.
        """
        with self.lock:
            self.transactions.append(
                {
                    "timestamp": timestamp,
                    "data": raw_i2c_data,  # Full transaction including address
                    "addr": i2c_addr,
                    "rw": i2c_rw,
                    "payload": payload_data,  # Just the data bytes (no address)
                    "status": status,
                }
            )
            if len(self.transactions) > self.max_transactions:
                self.transactions.pop(0)

    def clear(self):
        """Clear all captured data"""
        with self.lock:
            self.transactions.clear()


binary_capture = BinaryDataCapture()


# Global state for statistics
class Statistics:
    """Counters shown in the status bar; guard access with ``lock``."""

    def __init__(self):
        self.lock = threading.Lock()
        self.commands_sent = 0
        self.acks_received = 0
        self.naks_received = 0
        self.total_bytes = 0

    def reset(self):
        """Zero every counter."""
        with self.lock:
            self.commands_sent = 0
            self.acks_received = 0
            self.naks_received = 0
            self.total_bytes = 0


statistics = Statistics()


# Global state for device management
class DeviceState:
    """Known Beagle devices, the active handle and its capture settings."""

    def __init__(self):
        self.lock = threading.Lock()
        # List of dicts with keys "port", "unique_id", "in_use", "raw_port"
        self.available_devices = []
        self.current_device_index = None
        self.beagle_handle = None
        self.samplerate_khz = 10000
        self.timeout_ms = 500
        self.latency_ms = 200

    def update_device_list(self):
        """Refresh the list of available devices.

        The new list is built locally and published in a single assignment,
        so a failure while querying leaves the previous list untouched.
        """
        with self.lock:
            (num, ports, unique_ids) = bg_find_devices_ext(16, 16)
            # The reported count may be larger than the arrays that were
            # actually filled (or negative on error); never index past them.
            count = max(0, min(num, len(ports), len(unique_ids)))
            devices = []
            for i in range(count):
                port = ports[i]
                devices.append(
                    {
                        "port": port & ~BG_PORT_NOT_FREE,
                        "unique_id": unique_ids[i],
                        "in_use": bool(port & BG_PORT_NOT_FREE),
                        "raw_port": port,
                    }
                )
            self.available_devices = devices


device_state = DeviceState()


# Global state for display mode
class DisplayMode:
    """Currently selected decoding mode for the monitor output."""

    def __init__(self):
        self.lock = threading.Lock()
        self.mode = "raw"  # 'raw', 'smbus', 'mctp', 'spdm'
        self.validate_pec = False

    def set_mode(self, mode):
        """Set the display mode"""
        with self.lock:
            self.mode = mode

    def get_mode(self):
        """Get the current display mode"""
        with self.lock:
            return self.mode


display_mode = DisplayMode()

# About text
ABOUT_TEXT = [
    "I2C Monitor TUI for TotalPhase Beagle, © 2026 AMD, Inc.",
    "Made available under the Apache 2 License as part of the OpenPRoT Project.",
]

# Command definitions with help text (alphabetically ordered)
COMMANDS = {
    "about": {
        "description": "Display program information",
        "usage": "about",
        "args": [],
    },
    "clear": {
        "description": "Clear the screen and reset statistics",
        "usage": "clear",
        "args": [],
    },
    "device": {
        "description": "Show or change the active device",
        "usage": "device [list|use <index|id>]",
        "args": [
            ("list", "Show all available devices"),
            ("use <index>", "Switch to device by index from list"),
            ("use <id>", "Switch to device by serial number"),
        ],
    },
    "display": {
        "description": "Change display mode and replay buffer",
        "usage": "display [raw|smbus|mctp|spdm]",
        "args": [
            ("raw", "Show raw I2C transactions"),
            ("smbus", "Show SMBus packets"),
            ("mctp", "Show MCTP packets (hides non-MCTP traffic)"),
            ("spdm", "Show SPDM messages (hides non-SPDM traffic)"),
        ],
    },
    "exit": {"description": "Exit the analyzer", "usage": "exit", "args": []},
    "help": {
        "description": "Show available commands or help for a specific command",
        "usage": "help [command]",
        "args": [("command", "Optional. The command to get help for")],
    },
    "quit": {"description": "Exit the analyzer", "usage": "quit", "args": []},
    "save": {
        "description": "Save captured data to a file",
        "usage": "save [-b] <filename>",
        "args": [
            ("-b", "Save binary I2C data instead of text output"),
            ("filename", "Path to output file"),
        ],
    },
    "settings": {
        "description": "Show or modify device settings",
        "usage": "settings [samplerate=KHZ] [timeout=MS] [latency=MS]",
        "args": [
            ("samplerate", "Sample rate in kHz (e.g., samplerate=10000)"),
            ("timeout", "Idle timeout in milliseconds (e.g., timeout=500)"),
            ("latency", "Latency in milliseconds (e.g., latency=200)"),
        ],
    },
}


# ==========================================================================
# COMMAND INTERFACE
# ==========================================================================


def add_display_line(text):
    """Add a line to the display buffer.

    Acquires ``display_state.lock`` (a non-reentrant lock), so this must never
    be called by code that is already holding that lock.
    """
    with display_state.lock:
        display_state.lines.append(text)
        if len(display_state.lines) > display_state.max_lines:
            display_state.lines.pop(0)
        # Reset scroll if auto-scrolling
        if display_state.auto_scroll:
            display_state.scroll_offset = 0


def print_help(command=None):
    """Print the command overview, or detailed help for ``command`` if given."""
    if command:
        if command in COMMANDS:
            cmd_info = COMMANDS[command]
            add_display_line("")
            add_display_line(f"Command: {command}")
            add_display_line(f"Description: {cmd_info['description']}")
            add_display_line(f"Usage: {cmd_info['usage']}")
            if cmd_info["args"]:
                add_display_line("")
                add_display_line("Arguments:")
                for arg_name, arg_desc in cmd_info["args"]:
                    add_display_line(f" {arg_name}: {arg_desc}")
        else:
            add_display_line("")
            add_display_line(f"Unknown command: {command}")
    else:
        add_display_line("")
        add_display_line("Available commands:")
        for cmd, info in COMMANDS.items():
            add_display_line(f" {cmd:15} - {info['description']}")
        add_display_line("")
        add_display_line(
            "Type 'help <command>' for more information on a specific command"
        )
        add_display_line("")
        add_display_line("Keyboard shortcuts:")
        add_display_line(" Up/Down - Navigate command history")
        add_display_line(" Page Up/Down - Scroll through capture buffer")
        add_display_line(" Home/End - Jump to top/bottom of buffer")
        add_display_line(" Ctrl+C - Exit the analyzer")


def handle_settings_command(args):
    """Show the current device settings, or apply ``key=value`` settings.

    Recognised keys are ``samplerate`` (kHz), ``timeout`` (ms) and ``latency``
    (ms). A setting is pushed to the open device when there is one; the cached
    value is only updated when the device accepted it (or no device is open).
    """
    if not args:
        # Show current settings
        with device_state.lock:
            add_display_line("")
            add_display_line("Current Device Settings:")
            add_display_line(f" Sample Rate: {device_state.samplerate_khz} kHz")
            add_display_line(f" Timeout: {device_state.timeout_ms} ms")
            add_display_line(f" Latency: {device_state.latency_ms} ms")
        return

    # Parse and apply settings
    for arg in args:
        if "=" not in arg:
            add_display_line(f"Invalid setting format: {arg} (use key=value)")
            continue

        key, value = arg.split("=", 1)
        key = key.lower()

        try:
            value_int = int(value)
        except ValueError:
            add_display_line(f"Invalid value for {key}: {value} (must be integer)")
            continue

        if key == "samplerate":
            if value_int <= 0:
                add_display_line("Sample rate must be positive")
                continue
            with device_state.lock:
                if device_state.beagle_handle:
                    result = bg_samplerate(device_state.beagle_handle, value_int)
                    if result < 0:
                        add_display_line(
                            f"Error setting sample rate: {bg_status_string(result)}"
                        )
                    else:
                        # The device reports the rate it actually selected
                        device_state.samplerate_khz = result
                        add_display_line(f"Sample rate set to {result} kHz")
                else:
                    device_state.samplerate_khz = value_int
                    add_display_line(
                        f"Sample rate will be set to {value_int} kHz on next device connection"
                    )

        elif key == "timeout":
            if value_int < 0:
                add_display_line("Timeout must be non-negative")
                continue
            with device_state.lock:
                if device_state.beagle_handle:
                    result = bg_timeout(device_state.beagle_handle, value_int)
                    if result < 0:
                        # Do not record a value the device rejected
                        add_display_line(
                            f"Error setting timeout: {bg_status_string(result)}"
                        )
                        continue
                device_state.timeout_ms = value_int
                add_display_line(f"Timeout set to {value_int} ms")

        elif key == "latency":
            if value_int < 0:
                add_display_line("Latency must be non-negative")
                continue
            with device_state.lock:
                if device_state.beagle_handle:
                    result = bg_latency(device_state.beagle_handle, value_int)
                    if result < 0:
                        # Do not record a value the device rejected
                        add_display_line(
                            f"Error setting latency: {bg_status_string(result)}"
                        )
                        continue
                device_state.latency_ms = value_int
                add_display_line(f"Latency set to {value_int} ms")

        else:
            add_display_line(f"Unknown setting: {key}")


def format_serial_number(unique_id):
    """Format a numeric device id as ``NNNN-NNNNNN``.

    The first group is the id divided by one million, the second group is the
    remainder, e.g. ``1234567890`` -> ``"1234-567890"``.
    """
    return f"{unique_id // 1000000:04d}-{unique_id % 1000000:06d}"


def handle_device_command(args, stop_event):
    """Handle the ``device`` command: show, list, or select a device.

    ``stop_event`` is accepted for interface compatibility; it is not used
    because switching devices is not implemented yet.
    """
    if not args:
        # Show current device
        with device_state.lock:
            add_display_line("")
            if (
                device_state.current_device_index is not None
                and device_state.current_device_index
                < len(device_state.available_devices)
            ):
                dev = device_state.available_devices[device_state.current_device_index]
                add_display_line("Current Device:")
                add_display_line(f" Port: {dev['port']}")
                add_display_line(f" S/N: {format_serial_number(dev['unique_id'])}")
            else:
                add_display_line("No device currently selected")
        return

    subcmd = args[0].lower()

    if subcmd == "list":
        # Show all available devices
        device_state.update_device_list()
        with device_state.lock:
            add_display_line("")
            add_display_line("Available Devices:")
            if not device_state.available_devices:
                add_display_line(" No devices found")
            else:
                for i, dev in enumerate(device_state.available_devices):
                    marker = ">" if i == device_state.current_device_index else " "
                    status = (
                        "(in use)"
                        if dev["in_use"] and i != device_state.current_device_index
                        else ""
                    )
                    add_display_line(
                        f"{marker} {i}: Port {dev['port']}, S/N {format_serial_number(dev['unique_id'])} {status}"
                    )

    elif subcmd == "use":
        if len(args) < 2:
            add_display_line("Usage: device use <index|serial>")
            return

        identifier = args[1]
        device_state.update_device_list()

        new_index = None
        with device_state.lock:
            # Try as index first
            try:
                idx = int(identifier)
            except ValueError:
                idx = None
            if idx is not None and 0 <= idx < len(device_state.available_devices):
                new_index = idx
            else:
                # Not a valid index: try as serial number. This also covers
                # all-digit serials, which parse as an out-of-range index.
                for i, dev in enumerate(device_state.available_devices):
                    sn_full = format_serial_number(dev["unique_id"])
                    if str(dev["unique_id"]) == identifier or sn_full == identifier:
                        new_index = i
                        break

        if new_index is None:
            add_display_line(f"Device not found: {identifier}")
            return

        # Check if device is in use
        with device_state.lock:
            if (
                device_state.available_devices[new_index]["in_use"]
                and new_index != device_state.current_device_index
            ):
                add_display_line(
                    f"Device {new_index} is already in use by another process"
                )
                return

        add_display_line(f"Switching to device {new_index}...")
        add_display_line(
            "Note: Device switching requires restart - feature not yet fully implemented"
        )
        # TODO: Implement device switching by stopping monitor thread, closing device, opening new one

    else:
        add_display_line(f"Unknown device subcommand: {subcmd}")
        add_display_line("Usage: device [list|use <index|id>]")


def handle_save_command(args):
    """Handle the ``save [-b] <filename>`` command.

    Without ``-b`` the text display buffer is written, one line per row. With
    ``-b`` the raw bytes of every captured transaction are concatenated.

    The data is copied while the owning lock is held and written afterwards:
    ``add_display_line`` takes ``display_state.lock`` itself, so reporting a
    result while still holding that lock would deadlock.
    """
    if not args:
        add_display_line("Usage: save [-b] <filename>")
        return

    # Check for -b flag; the first non-flag argument is the filename
    binary_mode = False
    filename = None

    for arg in args:
        if arg == "-b":
            binary_mode = True
        elif not filename:
            filename = arg

    if not filename:
        add_display_line("Error: filename required")
        add_display_line("Usage: save [-b] <filename>")
        return

    try:
        if binary_mode:
            # Save raw binary I2C data
            with binary_capture.lock:
                chunks = [bytes(trans["data"]) for trans in binary_capture.transactions]

            if not chunks:
                add_display_line("No binary data captured")
                return

            with open(filename, "wb") as f:
                # Write raw I2C data only
                f.write(b"".join(chunks))

            total_bytes = sum(len(chunk) for chunk in chunks)
            add_display_line(
                f"Saved {len(chunks)} transactions ({total_bytes} bytes) to {filename}"
            )
        else:
            # Save text output
            with display_state.lock:
                snapshot = list(display_state.lines)

            if not snapshot:
                add_display_line("No text data to save")
                return

            # Explicit encoding: the buffer can contain non-ASCII text
            with open(filename, "w", encoding="utf-8") as f:
                f.writelines(line + "\n" for line in snapshot)

            add_display_line(f"Saved {len(snapshot)} lines to {filename}")

    except IOError as e:
        add_display_line(f"Error saving file: {e}")
    except Exception as e:
        add_display_line(f"Unexpected error: {e}")


def replay_buffer(mode):
    """Replay captured data in specified display mode.

    Clears the text buffer and re-renders every captured transaction that has
    a payload. ``binary_capture.lock`` is held for the whole replay.
    """
    with binary_capture.lock:
        if not binary_capture.transactions:
            add_display_line("No data to replay")
            return

        # Clear display and reset scroll
        with display_state.lock:
            display_state.lines.clear()
            display_state.scroll_offset = 0
            display_state.auto_scroll = True

        add_display_line("")
        add_display_line("=" * 80)
        add_display_line(
            f"{mode.upper()} Replay - {len(binary_capture.transactions)} transactions"
        )
        add_display_line("=" * 80)
        add_display_line("")

        # Create new MCTP assembler for replay
        mctp_assembler = MCTPAssembler()

        # Replay each transaction
        for trans in binary_capture.transactions:
            timestamp = trans["timestamp"]
            addr = trans["addr"]
            rw = trans["rw"]
            raw_data = trans["payload"]
            status = trans["status"]

            if not raw_data:
                continue

            # Process based on mode
            if mode == "raw":
                print_raw_i2c(timestamp, addr, rw, raw_data, status)

            elif mode in ["smbus", "mctp", "spdm"]:
                # Try to parse as SMBus with PEC
                smbus_result = parse_smbus_header(raw_data, display_mode.validate_pec)

                if smbus_result and mode in ["mctp", "spdm"]:
                    try:
                        # Create synthetic MCTP packet
                        synthetic_mctp = [
                            smbus_result["dest_eid"],
                            (smbus_result["mctp_hdr_version"] << 4)
                            | smbus_result["mctp_reserved"],
                            smbus_result["src_eid"],
                        ] + smbus_result["mctp_data"]

                        mctp_parsed = MCTPParser.parse(synthetic_mctp)

                        # Assemble fragments
                        complete_msg = mctp_assembler.add_fragment(mctp_parsed)

                        if complete_msg is not None:
                            # Complete message assembled
                            if mode == "spdm" and "spdm" in complete_msg:
                                print_spdm_message(timestamp, addr, rw, complete_msg)
                            elif mode == "mctp":
                                print_mctp_message(
                                    timestamp, addr, rw, smbus_result, complete_msg
                                )

                    except Exception as e:
                        add_display_line(f"[{timestamp:12d} ns] MCTP Parse Error: {e}")

                elif smbus_result and mode == "smbus":
                    print_smbus_message(timestamp, addr, rw, smbus_result, raw_data)

        add_display_line("")
        add_display_line("=" * 80)
        add_display_line("Replay Complete")
        add_display_line("=" * 80)


def handle_display_command(args):
    """Show the current display mode, or switch mode and replay the buffer."""
    if not args:
        # Show current mode
        mode = display_mode.get_mode()
        add_display_line("")
        add_display_line(f"Current display mode: {mode}")
        add_display_line("Available modes: raw, smbus, mctp, spdm")
        return

    new_mode = args[0].lower()
    if new_mode not in ["raw", "smbus", "mctp", "spdm"]:
        add_display_line(f"Unknown mode: {new_mode}")
        add_display_line("Available modes: raw, smbus, mctp, spdm")
        return

    # Set the new mode
    display_mode.set_mode(new_mode)
    add_display_line(f"Display mode changed to: {new_mode}")

    # Replay the buffer in the new mode
    replay_buffer(new_mode)


def handle_command(cmd_line, stop_event):
    """Handle a command input. Returns True to continue, False to stop."""
    parts = cmd_line.strip().split()
    if not parts:
        return True

    cmd = parts[0].lower()
    args = parts[1:]

    if cmd == "about":
        add_display_line("")
        for line in ABOUT_TEXT:
            add_display_line(line)
        add_display_line("")
    elif cmd == "clear":
        with display_state.lock:
            display_state.lines.clear()
            # A scroll position into the old buffer is meaningless now
            display_state.scroll_offset = 0
            display_state.auto_scroll = True
        binary_capture.clear()
        statistics.reset()
    elif cmd == "help":
        if args:
            print_help(args[0])
        else:
            print_help()
    elif cmd == "settings":
        handle_settings_command(args)
    elif cmd == "device":
        handle_device_command(args, stop_event)
    elif cmd == "save":
        handle_save_command(args)
    elif cmd == "display":
        handle_display_command(args)
    elif cmd == "quit" or cmd == "exit":
        add_display_line("")
        add_display_line("Stopping monitoring...")
        stop_event.set()
        return False
    else:
        add_display_line("")
        add_display_line(f"Unknown command: {cmd}. Type 'help' for available commands.")

    return True


def draw_screen(stdscr, input_buffer, last_line_count, force_redraw=False):
    """Draw the screen with monitoring output, command prompt, and status bar at bottom.

    The monitor area is only repainted when ``force_redraw`` is set, when the
    number of buffered lines differs from ``last_line_count``, or when the
    buffer is full (at that point new lines replace old ones, so the count no
    longer changes). The prompt and status bar are always repainted.

    Returns the current number of buffered lines, to be passed back in as
    ``last_line_count`` on the next call.
    """
    height, width = stdscr.getmaxyx()

    # Calculate areas (3 lines for prompt + 1 status bar at bottom = 4 lines)
    monitor_height = height - 4

    # Snapshot what we need under the lock, then draw without holding it
    with display_state.lock:
        current_line_count = len(display_state.lines)
        buffer_full = current_line_count >= display_state.max_lines
        # scroll_offset = 0 means show the last monitor_height lines
        # scroll_offset > 0 means scroll back that many lines
        end_idx = min(current_line_count - display_state.scroll_offset, current_line_count)
        start_idx = max(0, end_idx - monitor_height)
        visible_lines = display_state.lines[start_idx:end_idx]

    needs_redraw = (
        force_redraw or buffer_full or (current_line_count != last_line_count)
    )

    if needs_redraw:
        # Only erase, don't clear (reduces flicker)
        stdscr.erase()

        # Expand long lines into screen rows first, so a wrapped line pushes
        # the following lines down instead of being overwritten by them.
        wrap_width = max(1, width - 1)
        rows = []
        for line in visible_lines:
            if len(line) > wrap_width:
                rows.extend(textwrap.wrap(line, wrap_width) or [""])
            else:
                rows.append(line)

        # Keep the newest rows when wrapping produced more than fit
        rows = rows[-monitor_height:] if monitor_height > 0 else []

        for row_y, row_text in enumerate(rows):
            try:
                stdscr.addstr(row_y, 0, row_text[: width - 1])
            except curses.error:
                pass

    # Always redraw prompt area (minimal flicker since it's small)
    prompt_y = height - 4
    try:
        # Top line of prompt
        stdscr.addstr(prompt_y, 0, "-" * (width - 1))
        # Prompt line with >
        prompt_line = "> " + input_buffer
        # Clear the rest of the line
        stdscr.addstr(prompt_y + 1, 0, (prompt_line + " " * width)[: width - 1])
        # Bottom line of prompt
        stdscr.addstr(prompt_y + 2, 0, "-" * (width - 1))
        # Position cursor after prompt
        cursor_x = min(2 + len(input_buffer), width - 1)
        stdscr.move(prompt_y + 1, cursor_x)
    except curses.error:
        pass

    # Draw status bar (always redraw) - below the prompt
    status_y = height - 1
    try:
        with statistics.lock:
            cmds = statistics.commands_sent
            acks = statistics.acks_received
            naks = statistics.naks_received
            bytes_total = statistics.total_bytes

        # Get current display mode
        mode = display_mode.get_mode()

        # Build status bar with color highlighting
        x_pos = 0

        # Clear the line first
        stdscr.addstr(status_y, 0, " " * (width - 1), curses.A_REVERSE)

        # "Cmds: X | "
        text = f"Cmds: {cmds} | "
        stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE)
        x_pos += len(text)

        # "ACKs: " (normal)
        text = "ACKs: "
        stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE)
        x_pos += len(text)

        # ACK number (green)
        text = str(acks)
        stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE | curses.color_pair(1))
        x_pos += len(text)

        # " | "
        text = " | "
        stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE)
        x_pos += len(text)

        # "NAKs: " (normal)
        text = "NAKs: "
        stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE)
        x_pos += len(text)

        # NAK number (red)
        text = str(naks)
        stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE | curses.color_pair(2))
        x_pos += len(text)

        # " | Bytes: X"
        text = f" | Bytes: {bytes_total}"
        if x_pos + len(text) < width:
            stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE)
            x_pos += len(text)

        # Mode (right-justified)
        mode_text = mode
        mode_x = width - len(mode_text) - 1
        if mode_x > x_pos + 1:  # Make sure there's space
            stdscr.addstr(status_y, mode_x, mode_text, curses.A_REVERSE)

    except curses.error:
        pass

    stdscr.refresh()
    return current_line_count


def command_loop_curses(stdscr, stop_event):
    """Command input loop with curses UI.

    Runs until ``stop_event`` is set, a command asks to stop, or Ctrl+C is
    pressed. Handles line editing, command history, buffer scrolling (keys and
    mouse wheel) and a fallback decoder for raw escape sequences.
    """
    display_state.stdscr = stdscr

    # Configure curses
    curses.curs_set(1)  # Show cursor
    stdscr.nodelay(True)  # Non-blocking input
    stdscr.timeout(100)  # 100ms timeout for getch
    stdscr.keypad(True)  # Enable keypad mode for special keys

    # Enable mouse support
    try:
        curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION)
    except curses.error:
        pass  # Mouse support not available

    # Wheel-down is reported as button 5, a constant that older Python
    # versions do not define; fall back to 0 (never matches) instead of failing.
    wheel_up_mask = getattr(curses, "BUTTON4_PRESSED", 0)
    wheel_down_mask = getattr(curses, "BUTTON5_PRESSED", 0)

    # Initialize colors
    if curses.has_colors():
        curses.start_color()
        curses.use_default_colors()
        # Color pair 1: Green for ACKs
        curses.init_pair(1, curses.COLOR_GREEN, -1)
        # Color pair 2: Red for NAKs
        curses.init_pair(2, curses.COLOR_RED, -1)

    input_buffer = ""
    last_line_count = 0
    force_redraw = True

    # Command history
    command_history = []
    history_index = -1  # -1 = not browsing history
    saved_input = ""  # Save current input when browsing history

    # For handling escape sequences
    escape_sequence = ""
    in_escape = False

    try:
        while not stop_event.is_set():
            # Draw the screen (only redraws if content changed or forced)
            last_line_count = draw_screen(
                stdscr, input_buffer, last_line_count, force_redraw
            )
            force_redraw = False

            # Get input
            try:
                ch = stdscr.getch()

                if ch == -1:  # No input
                    continue

                # Handle mouse events
                if ch == curses.KEY_MOUSE:
                    try:
                        _, mx, my, _, bstate = curses.getmouse()
                        height, width = stdscr.getmaxyx()
                        monitor_height = height - 4

                        # Mouse wheel up (scroll up in buffer)
                        if bstate & wheel_up_mask:
                            with display_state.lock:
                                max_scroll = max(
                                    0, len(display_state.lines) - monitor_height
                                )
                                display_state.scroll_offset = min(
                                    display_state.scroll_offset + 3, max_scroll
                                )
                                if display_state.scroll_offset > 0:
                                    display_state.auto_scroll = False
                            force_redraw = True
                            continue

                        # Mouse wheel down (scroll down in buffer)
                        elif bstate & wheel_down_mask:
                            with display_state.lock:
                                display_state.scroll_offset = max(
                                    0, display_state.scroll_offset - 3
                                )
                                if display_state.scroll_offset == 0:
                                    display_state.auto_scroll = True
                            force_redraw = True
                            continue

                    except curses.error:
                        pass
                    continue

                # Handle escape sequences for arrow keys (if curses keys don't work)
                if ch == 27:  # ESC
                    in_escape = True
                    escape_sequence = ""
                    continue
                elif in_escape:
                    escape_sequence += chr(ch) if ch < 256 else ""

                    # Check if we have a complete escape sequence
                    if escape_sequence == "[A":  # Up arrow
                        ch = curses.KEY_UP
                        in_escape = False
                    elif escape_sequence == "[B":  # Down arrow
                        ch = curses.KEY_DOWN
                        in_escape = False
                    elif escape_sequence == "[5~":  # Page Up
                        ch = curses.KEY_PPAGE
                        in_escape = False
                    elif escape_sequence == "[6~":  # Page Down
                        ch = curses.KEY_NPAGE
                        in_escape = False
                    elif escape_sequence == "[H":  # Home
                        ch = curses.KEY_HOME
                        in_escape = False
                    elif escape_sequence == "[F":  # End
                        ch = curses.KEY_END
                        in_escape = False
                    elif len(escape_sequence) >= 3:  # Unknown sequence, give up
                        in_escape = False
                        continue
                    else:
                        continue  # Need more characters

                if ch == ord("\n"):  # Enter
                    if input_buffer.strip():
                        # Add to history if not duplicate of last command
                        if not command_history or command_history[-1] != input_buffer:
                            command_history.append(input_buffer)
                            # Limit history to 100 commands
                            if len(command_history) > 100:
                                command_history.pop(0)

                        if not handle_command(input_buffer, stop_event):
                            break
                    input_buffer = ""
                    history_index = -1
                    saved_input = ""
                    force_redraw = True  # Redraw after command

                elif ch == curses.KEY_BACKSPACE or ch == 127 or ch == 8:  # Backspace
                    if input_buffer:
                        input_buffer = input_buffer[:-1]
                        history_index = -1  # Exit history browsing

                elif ch == curses.KEY_UP:  # Up arrow - previous command
                    if command_history:
                        if history_index == -1:
                            # Start browsing history, save current input
                            saved_input = input_buffer
                            history_index = len(command_history) - 1
                        elif history_index > 0:
                            history_index -= 1

                        if history_index >= 0:
                            input_buffer = command_history[history_index]

                elif ch == curses.KEY_DOWN:  # Down arrow - next command
                    if history_index != -1:
                        if history_index < len(command_history) - 1:
                            history_index += 1
                            input_buffer = command_history[history_index]
                        else:
                            # Reached end of history, restore saved input
                            history_index = -1
                            input_buffer = saved_input

                elif ch == curses.KEY_PPAGE:  # Page Up - scroll up
                    height, width = stdscr.getmaxyx()
                    monitor_height = height - 4
                    with display_state.lock:
                        # Scroll up one page
                        max_scroll = max(0, len(display_state.lines) - monitor_height)
                        display_state.scroll_offset = min(
                            display_state.scroll_offset + monitor_height, max_scroll
                        )
                        # Disable auto-scroll when manually scrolling
                        if display_state.scroll_offset > 0:
                            display_state.auto_scroll = False
                    force_redraw = True

                elif ch == curses.KEY_NPAGE:  # Page Down - scroll down
                    # Query the size here: no earlier branch is guaranteed to
                    # have run, so a cached height may not exist yet.
                    height, width = stdscr.getmaxyx()
                    monitor_height = height - 4
                    with display_state.lock:
                        display_state.scroll_offset = max(
                            0, display_state.scroll_offset - monitor_height
                        )
                        # Re-enable auto-scroll when scrolled to bottom
                        if display_state.scroll_offset == 0:
                            display_state.auto_scroll = True
                    force_redraw = True

                elif ch == curses.KEY_HOME:  # Home - scroll to top
                    height, width = stdscr.getmaxyx()
                    monitor_height = height - 4
                    with display_state.lock:
                        display_state.scroll_offset = max(
                            0, len(display_state.lines) - monitor_height
                        )
                        display_state.auto_scroll = False
                    force_redraw = True

                elif ch == curses.KEY_END:  # End - scroll to bottom
                    with display_state.lock:
                        display_state.scroll_offset = 0
                        display_state.auto_scroll = True
                    force_redraw = True

                elif ch == 3:  # Ctrl+C
                    stop_event.set()
                    break
                elif ch == curses.KEY_RESIZE:  # Terminal resized
                    force_redraw = True
                elif 32 <= ch <= 126:  # Printable characters
                    input_buffer += chr(ch)
                    history_index = -1  # Exit history browsing on new input
            except curses.error:
                pass

    except KeyboardInterrupt:
        stop_event.set()
    finally:
        display_state.stdscr = None


# ==========================================================================
# MCTP/SPDM PARSING CODE (from mctp-parse)
# ==========================================================================


class SPDMParser:
    """Parser for SPDM messages (DSP0274)"""

    # Request codes (DSP0274 Table 4)
    REQUEST_CODES = {
        0x81: "GET_DIGESTS",
        0x82: "GET_CERTIFICATE",
        0x83: "CHALLENGE",
        0x84: "GET_VERSION",
        0x85: "CHUNK_SEND",
        0x86: "CHUNK_GET",
        0x87: "GET_ENDPOINT_INFO",
        0xE0: "GET_MEASUREMENTS",
        0xE1: "GET_CAPABILITIES",
        0xE2: "GET_SUPPORTED_EVENT_TYPES",
        0xE3: "NEGOTIATE_ALGORITHMS",
        0xE4: "KEY_EXCHANGE",
        0xE5: "FINISH",
        0xE6: "PSK_EXCHANGE",
        0xE7: "PSK_FINISH",
        0xE8: "HEARTBEAT",
        0xE9: "KEY_UPDATE",
        0xEA: "GET_ENCAPSULATED_REQUEST",
        0xEB: "DELIVER_ENCAPSULATED_RESPONSE",
        0xEC: "END_SESSION",
        0xED: "GET_CSR",
        0xEE: "SET_CERTIFICATE",
        0xEF: "GET_MEASUREMENT_EXTENSION_LOG",
        0xF0: "SUBSCRIBE_EVENT_TYPES",
        0xF1: "SEND_EVENT",
        0xFC: "GET_KEY_PAIR_INFO",
        0xFD: "SET_KEY_PAIR_INFO",
        0xFE: "VENDOR_DEFINED_REQUEST",
        0xFF: "RESPOND_IF_READY",
    }

    # Response codes (DSP0274 Table 5)
    RESPONSE_CODES = {
        0x01: "DIGESTS",
        0x02: "CERTIFICATE",
        0x03: "CHALLENGE_AUTH",
        0x04: "VERSION",
        0x05: "CHUNK_SEND_ACK",
        0x06: "CHUNK_RESPONSE",
        0x07: "ENDPOINT_INFO",
        0x60: "MEASUREMENTS",
        0x61: "CAPABILITIES",
        0x62: "SUPPORTED_EVENT_TYPES",
        0x63: "ALGORITHMS",
        0x64: "KEY_EXCHANGE_RSP",
        0x65: "FINISH_RSP",
        0x66: "PSK_EXCHANGE_RSP",
        0x67: "PSK_FINISH_RSP",
        0x68: "HEARTBEAT_ACK",
        0x69: "KEY_UPDATE_ACK",
        0x6A: "ENCAPSULATED_REQUEST",
        0x6B: "ENCAPSULATED_RESPONSE_ACK",
        0x6C: "END_SESSION_ACK",
        0x6D: "CSR",
        0x6E: "SET_CERTIFICATE_RSP",
        0x6F: "MEASUREMENT_EXTENSION_LOG",
        0x70: "SUBSCRIBE_EVENT_TYPES_ACK",
        0x71: "EVENT_ACK",
        0x7C: "KEY_PAIR_INFO",
        0x7D: "SET_KEY_PAIR_INFO_ACK",
        0x7E: "VENDOR_DEFINED_RESPONSE",
        0x7F: "ERROR",
    }

    @staticmethod
    def parse(payload: List[int]) -> Optional[Dict]:
        """Parse SPDM message from payload.

        Returns ``None`` when fewer than two bytes are available. Otherwise
        returns a dict with ``version_major``, ``version_minor``, ``version``,
        ``code``, ``msg_direction``, ``msg_name``, ``data`` and ``data_len``,
        plus ``param1`` / ``param2`` when those bytes are present.
        """
        if len(payload) < 2:
            return None

        result = {}

        # Byte 0: SPDM version (major [7:4], minor [3:0])
        version_byte = payload[0]
        result["version_major"] = (version_byte >> 4) & 0x0F
        result["version_minor"] = version_byte & 0x0F
        result["version"] = f"{result['version_major']}.{result['version_minor']}"

        # Byte 1: Request/Response code
        code = payload[1]
        result["code"] = code

        # Determine if request or response
        if code in SPDMParser.REQUEST_CODES:
            result["msg_direction"] = "Request"
            result["msg_name"] = SPDMParser.REQUEST_CODES[code]
        elif code in SPDMParser.RESPONSE_CODES:
            result["msg_direction"] = "Response"
            result["msg_name"] = SPDMParser.RESPONSE_CODES[code]
        else:
            result["msg_direction"] = "Unknown"
            result["msg_name"] = f"Unknown (0x{code:02X})"

        # Byte 2: Param1 (if present)
        if len(payload) > 2:
            result["param1"] = payload[2]

        # Byte 3: Param2 (if present)
        if len(payload) > 3:
            result["param2"] = payload[3]

        # Remaining data
        if len(payload) > 4:
            result["data"] = payload[4:]
            result["data_len"] = len(result["data"])
        else:
            result["data"] = []
            result["data_len"] = 0

        return result


class MCTPParser:
    """Parser for MCTP packets in the monitor's internal byte layout.

    The input is the "synthetic" packet that ``monitor_i2c`` rebuilds from the
    SMBus transport header, not the raw wire bytes: byte 0 is the destination
    EID, byte 1 the version byte (version in the high nibble, as produced by
    ``monitor_i2c``), byte 2 the source EID, byte 3 the flags byte, and the
    message body starts at byte 4.
    """

    # Message type definitions (DSP0236 Table 3)
    MESSAGE_TYPES = {
        0x00: "MCTP Control Message",
        0x01: "PLDM",
        0x02: "NC-SI over MCTP",
        0x03: "Ethernet over MCTP",
        0x04: "NVMe-MI over MCTP",
        0x05: "SPDM over MCTP",
        0x06: "SECDED over MCTP",
        0x07: "CXL FM API over MCTP",
        0x08: "CXL CCI over MCTP",
    }

    @staticmethod
    def parse(data: List[int]) -> dict:
        """Parse one MCTP packet and return its fields as a dict.

        Args:
            data: Packet bytes in the synthetic layout described on the class.

        Returns:
            A dict with ``dest_eid``, ``header_version``, ``rsvd``,
            ``src_eid``, ``som``, ``eom``, ``pkt_seq``, ``to``, ``msg_tag``,
            ``body`` (every byte after the 4-byte header), ``payload`` and
            ``payload_len``.  When a fifth byte exists, ``ic``, ``msg_type``
            and ``msg_type_name`` are added; these are only meaningful on a
            start-of-message packet, because continuation packets carry no
            message-type byte.  ``spdm`` is added for SOM packets of type
            0x05 that have a payload.

        Raises:
            ValueError: If fewer than 4 bytes are supplied.
        """
        if len(data) < 4:
            raise ValueError(
                "Packet too short - minimum 4 bytes required for MCTP header"
            )

        version_byte = data[1]
        flags = data[3]

        result = {
            "dest_eid": data[0],
            "header_version": (version_byte >> 4) & 0x0F,
            "rsvd": version_byte & 0x0F,
            "src_eid": data[2],
            "som": bool(flags & 0x80),  # Start of Message
            "eom": bool(flags & 0x40),  # End of Message
            "pkt_seq": (flags >> 4) & 0x03,  # Packet sequence number
            "to": bool(flags & 0x08),  # Tag Owner
            "msg_tag": flags & 0x07,  # Message Tag
            # Untouched message body; the assembler needs this for
            # continuation fragments, whose first byte is data, not a type.
            "body": data[4:],
            "payload": [],
            "payload_len": 0,
        }

        if len(data) > 4:
            # Byte 4: IC [7], Message Type [6:0]
            type_byte = data[4]
            msg_type = type_byte & 0x7F
            if msg_type >= 0x7E:
                fallback_name = f"Vendor Defined (0x{msg_type:02X})"
            else:
                fallback_name = f"Reserved (0x{msg_type:02X})"

            result["ic"] = bool(type_byte & 0x80)  # Integrity Check
            result["msg_type"] = msg_type
            result["msg_type_name"] = MCTPParser.MESSAGE_TYPES.get(
                msg_type, fallback_name
            )

            # Payload starts at byte 5
            result["payload"] = data[5:]
            result["payload_len"] = len(result["payload"])

            # Only a SOM packet really starts with a message-type byte, so
            # only then does an SPDM header follow it.
            if result["som"] and msg_type == 0x05 and result["payload_len"] > 0:
                result["spdm"] = SPDMParser.parse(result["payload"])

        return result


def calculate_pec(data: List[int]) -> int:
    """
    Calculate SMBus PEC (Packet Error Code) using CRC-8.

    SMBus uses CRC-8 with polynomial 0x07 (x^8 + x^2 + x + 1).
    Initial value is 0x00.

    Args:
        data: Bytes to run the CRC over, in bus order.

    Returns:
        The 8-bit CRC; 0 for empty input.
    """
    polynomial = 0x07
    crc = 0x00

    for byte in data:
        crc ^= byte
        for _ in range(8):
            shifted = crc << 1
            if crc & 0x80:
                shifted ^= polynomial
            crc = shifted & 0xFF

    return crc


def parse_smbus_header(
    data: List[int],
    expect_pec: bool = False,
    pec_prefix: Optional[List[int]] = None,
) -> Optional[Dict]:
    """
    Parse SMBus/I2C transport binding header (DSP0237).

    Args:
        data: Transaction bytes after the I2C address, starting at the
            command code.
        expect_pec: When True, a trailing PEC byte is required and checked.
        pec_prefix: Bytes that precede ``data`` on the bus (the address byte
            including its R/W bit).  SMBus computes the PEC over these too,
            so without them the check cannot match a real packet.  Defaults
            to no prefix, which keeps the previous behaviour.

    Returns:
        Parsed transport info, or None if the data is not a well-formed MCTP
        over SMBus packet (wrong command code, truncated, or a byte count too
        small to hold the transport header).
    """
    # Check for MCTP over SMBus command code (0x0F)
    if len(data) < 3 or data[0] != 0x0F:
        return None

    byte_count = data[1]
    expected_total = 2 + byte_count + (1 if expect_pec else 0)
    if len(data) < expected_total:
        return None

    result = {"cmd_code": data[0], "byte_count": byte_count}

    if expect_pec:
        data_end = expected_total - 1
        covered = list(pec_prefix or []) + list(data[:data_end])
        result["pec_received"] = data[data_end]
        result["pec_calculated"] = calculate_pec(covered)
        result["pec_valid"] = result["pec_received"] == result["pec_calculated"]
    else:
        data_end = expected_total
        result["pec_received"] = None
        result["pec_calculated"] = None
        result["pec_valid"] = None

    # Source address, version byte, two EIDs and the flags byte must all fit.
    if byte_count < 5:
        return None

    # SMBus-specific headers start at byte 2
    result["source_slave_addr"] = data[2]

    # MCTP reserved + header version
    version_byte = data[3]
    result["mctp_reserved"] = (version_byte >> 4) & 0x0F
    result["mctp_hdr_version"] = version_byte & 0x0F

    # Destination and Source EIDs
    result["dest_eid"] = data[4]
    result["src_eid"] = data[5]

    # MCTP packet starts here (SOM/EOM byte)
    result["mctp_offset"] = 6
    result["mctp_data"] = data[6:data_end]

    return result


# ==========================================================================
# MCTP MESSAGE ASSEMBLER
# ==========================================================================


class MCTPAssembler:
    """Assembles fragmented MCTP messages"""

    def __init__(self):
        # Key: (src_eid, dest_eid, msg_tag)
        # Value: session dict holding the fragments seen so far, the next
        # expected sequence number and the header fields of the SOM packet.
        self.sessions = {}

    def add_fragment(self, mctp_data: Dict) -> Optional[Dict]:
        """
        Add a fragment and return the complete message once EOM is reached.

        Args:
            mctp_data: A packet dict as produced by ``MCTPParser.parse``.

        Returns:
            The assembled message dict, or None while the message is still
            incomplete, when a fragment arrives without a preceding SOM, or
            when a sequence error forces the session to be dropped.
        """
        key = (mctp_data["src_eid"], mctp_data["dest_eid"], mctp_data["msg_tag"])
        next_seq = (mctp_data["pkt_seq"] + 1) % 4

        # SOM - start new session (replacing any stale one with this key)
        if mctp_data["som"]:
            self.sessions[key] = {
                "fragments": [mctp_data],
                "expected_seq": next_seq,
                "src_eid": mctp_data["src_eid"],
                "dest_eid": mctp_data["dest_eid"],
                "msg_tag": mctp_data["msg_tag"],
                "msg_type": mctp_data.get("msg_type"),
                "msg_type_name": mctp_data.get("msg_type_name"),
            }

            # Single packet message (SOM+EOM)
            if mctp_data["eom"]:
                return self._assemble_session(self.sessions.pop(key))
            return None

        # Middle or end fragment without a SOM - nothing to attach it to
        session = self.sessions.get(key)
        if session is None:
            return None

        # Sequence error - drop session
        if mctp_data["pkt_seq"] != session["expected_seq"]:
            del self.sessions[key]
            return None

        session["fragments"].append(mctp_data)
        session["expected_seq"] = next_seq

        # EOM - assemble complete message
        if mctp_data["eom"]:
            return self._assemble_session(self.sessions.pop(key))

        return None

    def _assemble_session(self, session: Dict) -> Dict:
        """Assemble a session's fragments into one complete message.

        The first fragment contributes its ``payload`` (body minus the
        message-type byte).  Later fragments contribute their whole ``body``,
        because only the first packet of a message carries a message-type
        byte; fragments lacking ``body`` fall back to ``payload``.
        """
        complete_payload = []
        for index, frag in enumerate(session["fragments"]):
            if index == 0:
                chunk = frag.get("payload")
            else:
                chunk = frag.get("body", frag.get("payload"))
            if chunk:
                complete_payload.extend(chunk)

        result = {
            "src_eid": session["src_eid"],
            "dest_eid": session["dest_eid"],
            "msg_tag": session["msg_tag"],
            "msg_type": session["msg_type"],
            "msg_type_name": session["msg_type_name"],
            "payload": complete_payload,
            "payload_len": len(complete_payload),
            "fragment_count": len(session["fragments"]),
        }

        # Parse SPDM if applicable
        if session["msg_type"] == 0x05 and complete_payload:
            result["spdm"] = SPDMParser.parse(complete_payload)

        return result


# ==========================================================================
# BEAGLE DEVICE FUNCTIONS
# ==========================================================================


def _format_serial(unique_id):
    """Format a Beagle unique id as the NNNN-NNNNNN serial number string."""
    return f"{unique_id // 1000000:04d}-{unique_id % 1000000:06d}"


def find_and_connect():
    """Find and connect to the first available Beagle device.

    Refreshes the device list, opens the first device not marked in use and
    records the chosen index and handle in ``device_state``.

    Returns:
        The handle returned by ``bg_open``.

    Exits the process with status 1 when no device is found, all devices are
    in use, or the device cannot be opened.
    """
    print("Searching for Beagle devices...")

    # Update device list
    device_state.update_device_list()

    with device_state.lock:
        devices = device_state.available_devices
        if not devices:
            print("Error: No Beagle devices found!")
            sys.exit(1)

        print(f"Found {len(devices)} device(s)")

        # Find the first available (not in-use) device
        device_idx = None
        for i, dev in enumerate(devices):
            if dev["in_use"]:
                print(f"Port {dev['port']} is in use")
                continue
            device_idx = i
            print(
                f"Connecting to device on port {dev['port']} "
                f"(S/N: {_format_serial(dev['unique_id'])})"
            )
            break

        if device_idx is None:
            print("Error: All devices are in use!")
            sys.exit(1)

        device_port = devices[device_idx]["port"]

    # Open the device
    beagle = bg_open(device_port)
    if beagle <= 0:
        print(f"Error: Unable to open Beagle device on port {device_port}")
        print(f"Error code = {beagle}")
        sys.exit(1)

    print(f"Successfully opened Beagle device on port {device_port}")

    # Store device state
    with device_state.lock:
        device_state.current_device_index = device_idx
        device_state.beagle_handle = beagle

    return beagle


def configure_device(beagle, samplerate_khz=10000, timeout_ms=500, latency_ms=200):
    """Configure the Beagle device for I2C monitoring.

    Args:
        beagle: Open device handle.
        samplerate_khz: Requested sample rate in kHz.
        timeout_ms: Idle timeout in milliseconds.
        latency_ms: Latency in milliseconds.

    The requested settings are stored in ``device_state``; the sample rate is
    then overwritten with the value the device reports back.  Exits the
    process with status 1 if the sample rate cannot be set.
    """
    # Store settings in device_state
    with device_state.lock:
        device_state.samplerate_khz = samplerate_khz
        device_state.timeout_ms = timeout_ms
        device_state.latency_ms = latency_ms

    # Set sampling rate
    actual_khz = bg_samplerate(beagle, samplerate_khz)
    if actual_khz < 0:
        print(f"Error setting sample rate: {bg_status_string(actual_khz)}")
        sys.exit(1)
    print(f"Sample rate set to {actual_khz} kHz")

    with device_state.lock:
        device_state.samplerate_khz = actual_khz

    # Set idle timeout
    bg_timeout(beagle, timeout_ms)
    print(f"Idle timeout set to {timeout_ms} ms")

    # Set latency
    bg_latency(beagle, latency_ms)
    print(f"Latency set to {latency_ms} ms")

    # Disable pullups and target power (passive monitoring)
    bg_i2c_pullup(beagle, BG_I2C_PULLUP_OFF)
    bg_target_power(beagle, BG_TARGET_POWER_OFF)

    # Get host interface speed
    if bg_host_ifce_speed(beagle):
        print("Host interface: high speed")
    else:
        print("Host interface: full speed")


# ==========================================================================
# MONITORING FUNCTIONS
# ==========================================================================


def _ticks_to_ns(ticks, samplerate_khz):
    """Convert a sample-clock tick count to nanoseconds.

    One tick lasts 1e6 / samplerate_khz nanoseconds.  Dividing by the kHz
    value directly (rather than by whole MHz) stays exact for rates that are
    not a multiple of 1000 and cannot divide by zero below 1000 kHz.
    Returns 0 when the sample rate is not positive.
    """
    if samplerate_khz <= 0:
        return 0
    return (ticks * 1000000) // samplerate_khz


def monitor_i2c(beagle, args, stop_event, max_packet_len=256):
    """Monitor and print I2C transactions with optional MCTP/SPDM parsing.

    Args:
        beagle: Open device handle.
        args: Parsed command-line arguments (not read by this function).
        stop_event: Event polled once per read; capture ends when it is set.
        max_packet_len: Size of the read buffer, in I2C words.

    Every transaction is counted in ``statistics`` and stored in
    ``binary_capture`` regardless of mode.  What is displayed depends on the
    current ``display_mode``: raw mode shows everything, SMBus mode shows
    packets that parse as MCTP-over-SMBus, and MCTP/SPDM modes show only fully
    reassembled messages.
    """
    # Calculate timing size
    timing_size = bg_bit_timing_size(BG_PROTOCOL_I2C, max_packet_len)

    # Get sample rate for timestamp conversion
    samplerate_khz = bg_samplerate(beagle, 0)

    # Enable I2C capture
    if bg_enable(beagle, BG_PROTOCOL_I2C) != BG_OK:
        add_display_line("Error: Could not enable I2C capture!")
        # NOTE: when run in a worker thread this only ends that thread.
        sys.exit(1)

    if samplerate_khz <= 0:
        add_display_line(
            f"Warning: could not read sample rate ({samplerate_khz}); "
            "timestamps will read 0"
        )

    # Display about text
    add_display_line("")
    for line in ABOUT_TEXT:
        add_display_line(line)
    add_display_line("")

    add_display_line("=" * 80)
    add_display_line("I2C Monitoring Started - Type 'help' for commands")
    add_display_line("=" * 80)

    mode = display_mode.get_mode()
    if mode == "spdm":
        add_display_line("Mode: SPDM")
        add_display_line("  Hides non-SPDM traffic")
    elif mode == "mctp":
        add_display_line("Mode: MCTP")
        add_display_line("  Hides non-MCTP traffic")
    elif mode == "smbus":
        add_display_line("Mode: SMBus")
        if display_mode.validate_pec:
            add_display_line("  PEC validation enabled")
    else:
        add_display_line("Mode: Raw I2C")

    add_display_line("=" * 80)
    add_display_line("")

    # Allocate buffers
    data_in = array_u16(max_packet_len)
    timing = array_u32(timing_size)

    packet_count = 0
    mctp_assembler = MCTPAssembler()

    try:
        while not stop_event.is_set():
            # Read I2C transaction
            (
                count,
                status,
                time_sop,
                time_duration,
                time_dataoffset,
                data_in,
                timing,
            ) = bg_i2c_read_bit_timing(beagle, data_in, timing)

            # Skip if no data; a negative count is a read error
            if count < 0:
                add_display_line(f"Error reading I2C data: {count}")
                break
            if count == 0:
                continue

            # Convert timestamp to nanoseconds
            time_sop_ns = _ticks_to_ns(time_sop, samplerate_khz)

            packet_count += 1

            # Extract raw data bytes (strip NACK bits)
            i2c_addr = None
            i2c_rw = None
            offset = 0
            has_nack = False

            # Get address if present
            if not (status & BG_READ_ERR_MIDDLE_OF_PACKET) and count >= 1:
                nack = data_in[0] & BG_I2C_MONITOR_NACK
                has_nack = bool(nack)
                i2c_rw = "R" if (data_in[0] & 0x01) else "W"
                if count == 1 or (data_in[0] & 0xF9) != 0xF0 or nack:
                    # 7-bit address
                    i2c_addr = int(data_in[0] & 0xFF) >> 1
                    offset = 1
                else:
                    # 10-bit address
                    i2c_addr = ((data_in[0] << 7) & 0x300) | (data_in[1] & 0xFF)
                    offset = 2

            # Count ACKs/NAKs in data bytes
            if any(data_in[i] & BG_I2C_MONITOR_NACK for i in range(offset, count)):
                has_nack = True

            # Full transaction including address, then payload only
            full_transaction = [int(data_in[i] & 0xFF) for i in range(count)]
            raw_data = full_transaction[offset:]

            # Update statistics - each transaction counts as one command
            with statistics.lock:
                statistics.commands_sent += 1
                if has_nack:
                    statistics.naks_received += 1
                else:
                    statistics.acks_received += 1
                statistics.total_bytes += len(raw_data)

            # Capture raw binary data with metadata for replay
            binary_capture.add_transaction(
                time_sop_ns, full_transaction, i2c_addr, i2c_rw, raw_data, status
            )

            # Get current display mode
            mode = display_mode.get_mode()

            # Process based on mode
            if mode in ["smbus", "mctp", "spdm"] and raw_data:
                # The PEC covers the address byte(s) as well, so hand them over.
                smbus_result = parse_smbus_header(
                    raw_data, display_mode.validate_pec, full_transaction[:offset]
                )

                if smbus_result and mode in ["mctp", "spdm"]:
                    # Parse MCTP
                    try:
                        # Create synthetic MCTP packet in MCTPParser's layout
                        synthetic_mctp = [
                            smbus_result["dest_eid"],
                            (smbus_result["mctp_hdr_version"] << 4)
                            | smbus_result["mctp_reserved"],
                            smbus_result["src_eid"],
                        ] + smbus_result["mctp_data"]

                        mctp_parsed = MCTPParser.parse(synthetic_mctp)

                        # Handle MCTP assembly
                        complete_msg = mctp_assembler.add_fragment(mctp_parsed)

                        if complete_msg is None:
                            # Incomplete fragment - hide in mctp/spdm mode
                            continue

                        # Complete message assembled
                        mctp_parsed = complete_msg

                        # Display based on mode
                        if mode == "spdm" and "spdm" in mctp_parsed:
                            print_spdm_message(
                                time_sop_ns, i2c_addr, i2c_rw, mctp_parsed
                            )
                        elif mode == "mctp":
                            print_mctp_message(
                                time_sop_ns, i2c_addr, i2c_rw, smbus_result, mctp_parsed
                            )

                    except Exception as e:
                        # Keep capturing; one bad packet must not end the loop.
                        add_display_line(
                            f"[{time_sop_ns:12d} ns] MCTP Parse Error: {e}"
                        )

                elif smbus_result and mode == "smbus":
                    # SMBus mode - show traffic that parsed as SMBus
                    print_smbus_message(
                        time_sop_ns, i2c_addr, i2c_rw, smbus_result, raw_data
                    )

                # In mctp/spdm mode, hide non-MCTP traffic

            elif mode == "raw":
                # Raw I2C mode
                print_raw_i2c(time_sop_ns, i2c_addr, i2c_rw, raw_data, status)

    except KeyboardInterrupt:
        stop_event.set()
        add_display_line("")
        add_display_line("Capture stopped by user")
        # The packet total is reported once, by the finally clause below.

    finally:
        # Disable capture
        bg_disable(beagle)
        add_display_line(f"Total packets captured: {packet_count}")


def print_raw_i2c(timestamp, addr, rw, data, status):
    """Print raw I2C transaction.

    Shows the timestamp, a start marker with address and direction when an
    address was captured, the data bytes in hex, and a stop marker unless the
    status flags report that no stop condition was seen.
    """
    parts = [f"[{timestamp:12d} ns] "]

    if addr is not None:
        parts.append(f"[S] <0x{addr:02X}:{rw}> ")

    if data:
        parts.append(" ".join(f"0x{b:02X}" for b in data) + " ")

    if not (status & BG_READ_I2C_NO_STOP):
        parts.append("[P]")

    add_display_line("".join(parts).rstrip())


def print_smbus_message(timestamp, addr, rw, smbus_info, raw_data):
    """Print SMBus message with PEC info.

    ``addr`` and ``rw`` are accepted for signature parity with the other
    printers but are not shown.  At most the first 16 data bytes are listed.
    """
    pec_valid = smbus_info["pec_valid"]

    # Add X prefix if PEC validation is enabled and invalid
    prefix = "X " if pec_valid is not None and not pec_valid else ""

    line = f"[{timestamp:12d} ns] {prefix}SMBus: "
    line += f"Cmd=0x{smbus_info['cmd_code']:02X} Len={smbus_info['byte_count']} "

    if pec_valid is not None:
        line += "PEC=✓ " if pec_valid else "PEC=✗ "

    # Show data
    hex_str = " ".join(f"{b:02X}" for b in raw_data[:16])
    if len(raw_data) > 16:
        hex_str += "..."
    line += f"[{hex_str}]"

    add_display_line(line)


def print_mctp_message(timestamp, addr, rw, smbus_info, mctp_parsed):
    """Print MCTP message with highlighting.

    Accepts either a single parsed packet or an assembled message; fields the
    dict does not carry are shown as ``?`` (sequence, tag) or fall back to
    type 0x00 / ``Unknown``.
    """
    som_marker = "🟢 SOM" if mctp_parsed.get("som") else ""
    eom_marker = "🔴 EOM" if mctp_parsed.get("eom") else ""
    markers = f"{som_marker} {eom_marker}".strip()

    # An assembled message stores None when its first packet had no type
    # byte; treat that like a missing key instead of failing in the format.
    msg_type = mctp_parsed.get("msg_type")
    if msg_type is None:
        msg_type = 0
    msg_type_name = mctp_parsed.get("msg_type_name")
    if msg_type_name is None:
        msg_type_name = "Unknown"

    line = f"[{timestamp:12d} ns] MCTP: "
    if markers:
        line += f"{markers} "

    line += f"EID {mctp_parsed['src_eid']:02X}→{mctp_parsed['dest_eid']:02X} "
    line += (
        f"Seq={mctp_parsed.get('pkt_seq', '?')} Tag={mctp_parsed.get('msg_tag', '?')} "
    )
    line += f"Type=0x{msg_type:02X} ({msg_type_name})"

    # Show fragment count if assembled
    if mctp_parsed.get("fragment_count", 0) > 1:
        line += f" [{mctp_parsed['fragment_count']} fragments]"

    add_display_line(line)


def print_spdm_message(timestamp, addr, rw, mctp_parsed):
    """Print SPDM message with request/response highlighting.

    Does nothing when ``mctp_parsed`` carries no parsed SPDM data.
    """
    spdm = mctp_parsed.get("spdm")
    if not spdm:
        return

    # Highlight request vs response
    direction = spdm["msg_direction"]
    if direction == "Request":
        direction_marker = "📤 REQ"
    elif direction == "Response":
        direction_marker = "📥 RSP"
    else:
        direction_marker = "❓"

    line = f"[{timestamp:12d} ns] SPDM: {direction_marker} "

    line += f"EID {mctp_parsed['src_eid']:02X}→{mctp_parsed['dest_eid']:02X} "
    if mctp_parsed.get("fragment_count", 0) > 1:
        line += f"[{mctp_parsed['fragment_count']} frags] "

    line += f"v{spdm['version']} {spdm['msg_name']} "

    if "param1" in spdm:
        line += f"P1=0x{spdm['param1']:02X} "
    if "param2" in spdm:
        line += f"P2=0x{spdm['param2']:02X} "

    if spdm.get("data_len", 0) > 0:
        line += f"+{spdm['data_len']}B"

    add_display_line(line.rstrip())


# ==========================================================================
# MAIN
# ==========================================================================


def main():
    """Main entry point.

    Parses the command line, sets the initial display mode, connects to and
    configures the first free Beagle device, then runs the capture loop in a
    background thread while the curses command loop owns the main thread.
    On the way out the capture thread is stopped and joined before the device
    is closed, whether the command loop returned normally or raised.
    """
    parser = argparse.ArgumentParser(
        description="I2C/SMBus Monitor with MCTP/SPDM Support",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                                    # Raw I2C monitoring
  %(prog)s --smbus --validate-pec             # SMBus with PEC validation
  %(prog)s --mctp                             # MCTP packet monitoring
  %(prog)s --spdm                             # SPDM message monitoring
  %(prog)s --samplerate 5000 --timeout 1000   # Custom sample rate and timeout
        """,
    )

    # Protocol mode arguments
    parser.add_argument(
        "--smbus", action="store_true", help="Treat all packets as having SMBus header"
    )
    parser.add_argument(
        "--validate-pec",
        action="store_true",
        help="Validate PEC and mark invalid packets with X (requires --smbus)",
    )
    parser.add_argument(
        "--mctp",
        action="store_true",
        help="MCTP mode (implies --smbus --validate-pec, hides non-MCTP traffic)",
    )
    parser.add_argument(
        "--mctp-no-partial",
        action="store_true",
        help="Hide MCTP fragments until EOM (requires --mctp or --spdm)",
    )
    parser.add_argument(
        "--spdm",
        action="store_true",
        help="SPDM mode (implies --mctp, hides non-SPDM traffic)",
    )

    # Device configuration arguments
    parser.add_argument(
        "--samplerate",
        type=int,
        default=10000,
        metavar="KHZ",
        help="Sample rate in kHz (default: 10000)",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=500,
        metavar="MS",
        help="Idle timeout in milliseconds (default: 500)",
    )
    parser.add_argument(
        "--latency",
        type=int,
        default=200,
        metavar="MS",
        help="Latency in milliseconds (default: 200)",
    )

    args = parser.parse_args()

    # Handle argument implications and set initial display mode
    if args.spdm:
        args.mctp = True
        args.mctp_no_partial = True  # SPDM mode always waits for complete messages
        display_mode.set_mode("spdm")

    if args.mctp:
        args.smbus = True
        args.validate_pec = True
        if not args.spdm:
            display_mode.set_mode("mctp")

    if args.smbus and not args.mctp:
        display_mode.set_mode("smbus")

    # Store validate_pec in display_mode
    display_mode.validate_pec = args.validate_pec

    # Validate argument combinations (after implications, so that --mctp and
    # --spdm count as having enabled --smbus)
    if args.validate_pec and not args.smbus:
        parser.error("--validate-pec requires --smbus")

    if args.mctp_no_partial and not (args.mctp or args.spdm):
        parser.error("--mctp-no-partial requires --mctp or --spdm")

    # Validate configuration arguments
    if args.samplerate <= 0:
        parser.error("Sample rate must be positive")
    if args.timeout < 0:
        parser.error("Timeout must be non-negative")
    if args.latency < 0:
        parser.error("Latency must be non-negative")

    # Print about text to console before starting TUI
    print()
    for line in ABOUT_TEXT:
        print(line)
    print()

    # Find and connect to device
    beagle = find_and_connect()

    # Create stop event for coordinating threads
    stop_event = threading.Event()
    monitor_thread = None

    try:
        # Configure device
        print()
        configure_device(beagle, args.samplerate, args.timeout, args.latency)

        # Start monitoring in a separate thread
        monitor_thread = threading.Thread(
            target=monitor_i2c, args=(beagle, args, stop_event), daemon=True
        )
        monitor_thread.start()

        # Run command loop in main thread with curses
        curses.wrapper(command_loop_curses, stop_event)

    finally:
        # Ask the capture thread to stop first, then give it a moment to
        # leave its read loop, so the device is not closed underneath it.
        stop_event.set()
        if monitor_thread is not None:
            monitor_thread.join(timeout=2.0)

        # Clean up
        bg_close(beagle)
        print("Beagle device closed")


if __name__ == "__main__":
    main()