| #!/usr/bin/env python3 |
| # Licensed under the Apache-2.0 license |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| """ |
| I2C/SMBus Monitor for TotalPhase Beagle Bus Analyzer with MCTP/SPDM Support |
| Automatically connects to the first available device and listens for I2C transactions. |
| Supports MCTP packet assembly and SPDM message parsing. |
| """ |
| |
| import sys |
| import os |
| import argparse |
| import threading |
| import shutil |
| import curses |
| from typing import List, Optional, Dict |
| from collections import defaultdict |
| import textwrap |
| |
| # Add the Beagle API to the path |
| # Assumes script runs at the same level as beagle-api-linux-x86_64-v6.00 directory |
| script_dir = os.path.dirname(os.path.abspath(__file__)) |
| beagle_lib_path = os.path.join(script_dir, "beagle-api-linux-x86_64-v6.00", "python") |
| sys.path.insert(0, beagle_lib_path) |
| from beagle_py import * |
| |
| |
| # Global state for display |
| class DisplayState: |
| def __init__(self): |
| self.lines = [] |
| self.lock = threading.Lock() |
| self.stdscr = None |
| self.max_lines = 10000 # Keep last N lines for saving |
| self.scroll_offset = 0 # 0 = at bottom, positive = scrolled back |
| self.auto_scroll = True # Auto-scroll when at bottom |
| |
| |
| display_state = DisplayState() |
| |
| |
| # Global state for binary data capture |
| class BinaryDataCapture: |
| def __init__(self): |
| self.lock = threading.Lock() |
| self.transactions = [] # List of raw transaction data with metadata |
| self.max_transactions = 10000 # Keep last N transactions |
| |
| def add_transaction( |
| self, timestamp, raw_i2c_data, i2c_addr, i2c_rw, payload_data, status |
| ): |
| """Add a raw I2C transaction with metadata for replay""" |
| with self.lock: |
| self.transactions.append( |
| { |
| "timestamp": timestamp, |
| "data": raw_i2c_data, # Full transaction including address |
| "addr": i2c_addr, |
| "rw": i2c_rw, |
| "payload": payload_data, # Just the data bytes (no address) |
| "status": status, |
| } |
| ) |
| if len(self.transactions) > self.max_transactions: |
| self.transactions.pop(0) |
| |
| def clear(self): |
| """Clear all captured data""" |
| with self.lock: |
| self.transactions.clear() |
| |
| |
| binary_capture = BinaryDataCapture() |
| |
| |
| # Global state for statistics |
| class Statistics: |
| def __init__(self): |
| self.lock = threading.Lock() |
| self.commands_sent = 0 |
| self.acks_received = 0 |
| self.naks_received = 0 |
| self.total_bytes = 0 |
| |
| def reset(self): |
| with self.lock: |
| self.commands_sent = 0 |
| self.acks_received = 0 |
| self.naks_received = 0 |
| self.total_bytes = 0 |
| |
| |
| statistics = Statistics() |
| |
| |
| # Global state for device management |
| class DeviceState: |
| def __init__(self): |
| self.lock = threading.Lock() |
| self.available_devices = [] # List of (port, unique_id) tuples |
| self.current_device_index = None |
| self.beagle_handle = None |
| self.samplerate_khz = 10000 |
| self.timeout_ms = 500 |
| self.latency_ms = 200 |
| |
| def update_device_list(self): |
| """Refresh the list of available devices""" |
| with self.lock: |
| (num, ports, unique_ids) = bg_find_devices_ext(16, 16) |
| self.available_devices = [] |
| for i in range(num): |
| port = ports[i] |
| unique_id = unique_ids[i] |
| in_use = bool(port & BG_PORT_NOT_FREE) |
| actual_port = port & ~BG_PORT_NOT_FREE |
| self.available_devices.append( |
| { |
| "port": actual_port, |
| "unique_id": unique_id, |
| "in_use": in_use, |
| "raw_port": port, |
| } |
| ) |
| |
| |
| device_state = DeviceState() |
| |
| |
| # Global state for display mode |
| class DisplayMode: |
| def __init__(self): |
| self.lock = threading.Lock() |
| self.mode = "raw" # 'raw', 'smbus', 'mctp', 'spdm' |
| self.validate_pec = False |
| |
| def set_mode(self, mode): |
| """Set the display mode""" |
| with self.lock: |
| self.mode = mode |
| |
| def get_mode(self): |
| """Get the current display mode""" |
| with self.lock: |
| return self.mode |
| |
| |
| display_mode = DisplayMode() |
| |
| # About text |
| ABOUT_TEXT = [ |
| "I2C Monitor TUI for TotalPhase Beagle, © 2026 AMD, Inc.", |
| "Made available under the Apache 2 License as part of the OpenPRoT Project.", |
| ] |
| |
| # Command definitions with help text (alphabetically ordered) |
| COMMANDS = { |
| "about": { |
| "description": "Display program information", |
| "usage": "about", |
| "args": [], |
| }, |
| "clear": { |
| "description": "Clear the screen and reset statistics", |
| "usage": "clear", |
| "args": [], |
| }, |
| "device": { |
| "description": "Show or change the active device", |
| "usage": "device [list|use <index|id>]", |
| "args": [ |
| ("list", "Show all available devices"), |
| ("use <index>", "Switch to device by index from list"), |
| ("use <id>", "Switch to device by serial number"), |
| ], |
| }, |
| "display": { |
| "description": "Change display mode and replay buffer", |
| "usage": "display [raw|smbus|mctp|spdm]", |
| "args": [ |
| ("raw", "Show raw I2C transactions"), |
| ("smbus", "Show SMBus packets"), |
| ("mctp", "Show MCTP packets (hides non-MCTP traffic)"), |
| ("spdm", "Show SPDM messages (hides non-SPDM traffic)"), |
| ], |
| }, |
| "exit": {"description": "Exit the analyzer", "usage": "exit", "args": []}, |
| "help": { |
| "description": "Show available commands or help for a specific command", |
| "usage": "help [command]", |
| "args": [("command", "Optional. The command to get help for")], |
| }, |
| "quit": {"description": "Exit the analyzer", "usage": "quit", "args": []}, |
| "save": { |
| "description": "Save captured data to a file", |
| "usage": "save [-b] <filename>", |
| "args": [ |
| ("-b", "Save binary I2C data instead of text output"), |
| ("filename", "Path to output file"), |
| ], |
| }, |
| "settings": { |
| "description": "Show or modify device settings", |
| "usage": "settings [samplerate=KHZ] [timeout=MS] [latency=MS]", |
| "args": [ |
| ("samplerate", "Sample rate in kHz (e.g., samplerate=10000)"), |
| ("timeout", "Idle timeout in milliseconds (e.g., timeout=500)"), |
| ("latency", "Latency in milliseconds (e.g., latency=200)"), |
| ], |
| }, |
| } |
| |
| |
| # ========================================================================== |
| # COMMAND INTERFACE |
| # ========================================================================== |
| |
| |
| def add_display_line(text): |
| """Add a line to the display buffer""" |
| with display_state.lock: |
| display_state.lines.append(text) |
| if len(display_state.lines) > display_state.max_lines: |
| display_state.lines.pop(0) |
| # Reset scroll if auto-scrolling |
| if display_state.auto_scroll: |
| display_state.scroll_offset = 0 |
| |
| |
| def print_help(command=None): |
| """Print help information""" |
| if command: |
| if command in COMMANDS: |
| cmd_info = COMMANDS[command] |
| add_display_line("") |
| add_display_line(f"Command: {command}") |
| add_display_line(f"Description: {cmd_info['description']}") |
| add_display_line(f"Usage: {cmd_info['usage']}") |
| if cmd_info["args"]: |
| add_display_line("") |
| add_display_line("Arguments:") |
| for arg_name, arg_desc in cmd_info["args"]: |
| add_display_line(f" {arg_name}: {arg_desc}") |
| else: |
| add_display_line("") |
| add_display_line(f"Unknown command: {command}") |
| else: |
| add_display_line("") |
| add_display_line("Available commands:") |
| for cmd, info in COMMANDS.items(): |
| add_display_line(f" {cmd:15} - {info['description']}") |
| add_display_line("") |
| add_display_line( |
| "Type 'help <command>' for more information on a specific command" |
| ) |
| add_display_line("") |
| add_display_line("Keyboard shortcuts:") |
| add_display_line(" Up/Down - Navigate command history") |
| add_display_line(" Page Up/Down - Scroll through capture buffer") |
| add_display_line(" Home/End - Jump to top/bottom of buffer") |
| add_display_line(" Ctrl+C - Exit the analyzer") |
| |
| |
| def handle_settings_command(args): |
| """Handle settings command""" |
| if not args: |
| # Show current settings |
| with device_state.lock: |
| add_display_line("") |
| add_display_line("Current Device Settings:") |
| add_display_line(f" Sample Rate: {device_state.samplerate_khz} kHz") |
| add_display_line(f" Timeout: {device_state.timeout_ms} ms") |
| add_display_line(f" Latency: {device_state.latency_ms} ms") |
| return |
| |
| # Parse and apply settings |
| for arg in args: |
| if "=" not in arg: |
| add_display_line(f"Invalid setting format: {arg} (use key=value)") |
| continue |
| |
| key, value = arg.split("=", 1) |
| key = key.lower() |
| |
| try: |
| value_int = int(value) |
| except ValueError: |
| add_display_line(f"Invalid value for {key}: {value} (must be integer)") |
| continue |
| |
| if key == "samplerate": |
| if value_int <= 0: |
| add_display_line(f"Sample rate must be positive") |
| continue |
| with device_state.lock: |
| if device_state.beagle_handle: |
| result = bg_samplerate(device_state.beagle_handle, value_int) |
| if result < 0: |
| add_display_line( |
| f"Error setting sample rate: {bg_status_string(result)}" |
| ) |
| else: |
| device_state.samplerate_khz = result |
| add_display_line(f"Sample rate set to {result} kHz") |
| else: |
| device_state.samplerate_khz = value_int |
| add_display_line( |
| f"Sample rate will be set to {value_int} kHz on next device connection" |
| ) |
| |
| elif key == "timeout": |
| if value_int < 0: |
| add_display_line(f"Timeout must be non-negative") |
| continue |
| with device_state.lock: |
| if device_state.beagle_handle: |
| bg_timeout(device_state.beagle_handle, value_int) |
| device_state.timeout_ms = value_int |
| add_display_line(f"Timeout set to {value_int} ms") |
| |
| elif key == "latency": |
| if value_int < 0: |
| add_display_line(f"Latency must be non-negative") |
| continue |
| with device_state.lock: |
| if device_state.beagle_handle: |
| bg_latency(device_state.beagle_handle, value_int) |
| device_state.latency_ms = value_int |
| add_display_line(f"Latency set to {value_int} ms") |
| |
| else: |
| add_display_line(f"Unknown setting: {key}") |
| |
| |
| def handle_device_command(args, stop_event): |
| """Handle device command""" |
| if not args: |
| # Show current device |
| with device_state.lock: |
| add_display_line("") |
| if ( |
| device_state.current_device_index is not None |
| and device_state.current_device_index |
| < len(device_state.available_devices) |
| ): |
| dev = device_state.available_devices[device_state.current_device_index] |
| add_display_line(f"Current Device:") |
| add_display_line(f" Port: {dev['port']}") |
| add_display_line( |
| f" S/N: {dev['unique_id']:04d}-{dev['unique_id'] % 1000000:06d}" |
| ) |
| else: |
| add_display_line("No device currently selected") |
| return |
| |
| subcmd = args[0].lower() |
| |
| if subcmd == "list": |
| # Show all available devices |
| device_state.update_device_list() |
| with device_state.lock: |
| add_display_line("") |
| add_display_line("Available Devices:") |
| if not device_state.available_devices: |
| add_display_line(" No devices found") |
| else: |
| for i, dev in enumerate(device_state.available_devices): |
| marker = ">" if i == device_state.current_device_index else " " |
| status = ( |
| "(in use)" |
| if dev["in_use"] and i != device_state.current_device_index |
| else "" |
| ) |
| add_display_line( |
| f"{marker} {i}: Port {dev['port']}, S/N {dev['unique_id']:04d}-{dev['unique_id'] % 1000000:06d} {status}" |
| ) |
| |
| elif subcmd == "use": |
| if len(args) < 2: |
| add_display_line("Usage: device use <index|serial>") |
| return |
| |
| identifier = args[1] |
| device_state.update_device_list() |
| |
| new_index = None |
| with device_state.lock: |
| # Try as index first |
| try: |
| idx = int(identifier) |
| if 0 <= idx < len(device_state.available_devices): |
| new_index = idx |
| except ValueError: |
| # Try as serial number |
| for i, dev in enumerate(device_state.available_devices): |
| sn_full = f"{dev['unique_id']:04d}-{dev['unique_id'] % 1000000:06d}" |
| if str(dev["unique_id"]) == identifier or sn_full == identifier: |
| new_index = i |
| break |
| |
| if new_index is None: |
| add_display_line(f"Device not found: {identifier}") |
| return |
| |
| # Check if device is in use |
| with device_state.lock: |
| if ( |
| device_state.available_devices[new_index]["in_use"] |
| and new_index != device_state.current_device_index |
| ): |
| add_display_line( |
| f"Device {new_index} is already in use by another process" |
| ) |
| return |
| |
| add_display_line(f"Switching to device {new_index}...") |
| add_display_line( |
| "Note: Device switching requires restart - feature not yet fully implemented" |
| ) |
| # TODO: Implement device switching by stopping monitor thread, closing device, opening new one |
| |
| else: |
| add_display_line(f"Unknown device subcommand: {subcmd}") |
| add_display_line("Usage: device [list|use <index|id>]") |
| |
| |
| def handle_save_command(args): |
| """Handle save command""" |
| if not args: |
| add_display_line("Usage: save [-b] <filename>") |
| return |
| |
| # Check for -b flag |
| binary_mode = False |
| filename = None |
| |
| for arg in args: |
| if arg == "-b": |
| binary_mode = True |
| elif not filename: |
| filename = arg |
| |
| if not filename: |
| add_display_line("Error: filename required") |
| add_display_line("Usage: save [-b] <filename>") |
| return |
| |
| try: |
| if binary_mode: |
| # Save raw binary I2C data |
| with binary_capture.lock: |
| if not binary_capture.transactions: |
| add_display_line("No binary data captured") |
| return |
| |
| total_bytes = 0 |
| with open(filename, "wb") as f: |
| for trans in binary_capture.transactions: |
| # Write raw I2C data only |
| f.write(bytes(trans["data"])) |
| total_bytes += len(trans["data"]) |
| |
| add_display_line( |
| f"Saved {len(binary_capture.transactions)} transactions ({total_bytes} bytes) to {filename}" |
| ) |
| else: |
| # Save text output |
| with display_state.lock: |
| if not display_state.lines: |
| add_display_line("No text data to save") |
| return |
| |
| with open(filename, "w") as f: |
| for line in display_state.lines: |
| f.write(line + "\n") |
| |
| add_display_line( |
| f"Saved {len(display_state.lines)} lines to {filename}" |
| ) |
| |
| except IOError as e: |
| add_display_line(f"Error saving file: {e}") |
| except Exception as e: |
| add_display_line(f"Unexpected error: {e}") |
| |
| |
| def replay_buffer(mode): |
| """Replay captured data in specified display mode""" |
| with binary_capture.lock: |
| if not binary_capture.transactions: |
| add_display_line("No data to replay") |
| return |
| |
| # Clear display and reset scroll |
| with display_state.lock: |
| display_state.lines.clear() |
| display_state.scroll_offset = 0 |
| display_state.auto_scroll = True |
| |
| add_display_line("") |
| add_display_line("=" * 80) |
| add_display_line( |
| f"{mode.upper()} Replay - {len(binary_capture.transactions)} transactions" |
| ) |
| add_display_line("=" * 80) |
| add_display_line("") |
| |
| # Create new MCTP assembler for replay |
| mctp_assembler = MCTPAssembler() |
| |
| # Replay each transaction |
| for trans in binary_capture.transactions: |
| timestamp = trans["timestamp"] |
| addr = trans["addr"] |
| rw = trans["rw"] |
| raw_data = trans["payload"] |
| status = trans["status"] |
| |
| if not raw_data: |
| continue |
| |
| # Process based on mode |
| if mode == "raw": |
| print_raw_i2c(timestamp, addr, rw, raw_data, status) |
| |
| elif mode in ["smbus", "mctp", "spdm"]: |
| # Try to parse as SMBus with PEC |
| smbus_result = parse_smbus_header(raw_data, display_mode.validate_pec) |
| |
| if smbus_result and mode in ["mctp", "spdm"]: |
| try: |
| # Create synthetic MCTP packet |
| synthetic_mctp = [ |
| smbus_result["dest_eid"], |
| (smbus_result["mctp_hdr_version"] << 4) |
| | smbus_result["mctp_reserved"], |
| smbus_result["src_eid"], |
| ] + smbus_result["mctp_data"] |
| |
| mctp_parsed = MCTPParser.parse(synthetic_mctp) |
| |
| # Assemble fragments |
| complete_msg = mctp_assembler.add_fragment(mctp_parsed) |
| |
| if complete_msg is not None: |
| # Complete message assembled |
| if mode == "spdm" and "spdm" in complete_msg: |
| print_spdm_message(timestamp, addr, rw, complete_msg) |
| elif mode == "mctp": |
| print_mctp_message( |
| timestamp, addr, rw, smbus_result, complete_msg |
| ) |
| |
| except Exception as e: |
| add_display_line(f"[{timestamp:12d} ns] MCTP Parse Error: {e}") |
| |
| elif smbus_result and mode == "smbus": |
| print_smbus_message(timestamp, addr, rw, smbus_result, raw_data) |
| |
| add_display_line("") |
| add_display_line("=" * 80) |
| add_display_line("Replay Complete") |
| add_display_line("=" * 80) |
| |
| |
| def handle_display_command(args): |
| """Handle display mode command""" |
| if not args: |
| # Show current mode |
| mode = display_mode.get_mode() |
| add_display_line("") |
| add_display_line(f"Current display mode: {mode}") |
| add_display_line("Available modes: raw, smbus, mctp, spdm") |
| return |
| |
| new_mode = args[0].lower() |
| if new_mode not in ["raw", "smbus", "mctp", "spdm"]: |
| add_display_line(f"Unknown mode: {new_mode}") |
| add_display_line("Available modes: raw, smbus, mctp, spdm") |
| return |
| |
| # Set the new mode |
| display_mode.set_mode(new_mode) |
| add_display_line(f"Display mode changed to: {new_mode}") |
| |
| # Replay the buffer in the new mode |
| replay_buffer(new_mode) |
| |
| |
| def handle_command(cmd_line, stop_event): |
| """Handle a command input. Returns True to continue, False to stop.""" |
| parts = cmd_line.strip().split() |
| if not parts: |
| return True |
| |
| cmd = parts[0].lower() |
| args = parts[1:] |
| |
| if cmd == "about": |
| add_display_line("") |
| for line in ABOUT_TEXT: |
| add_display_line(line) |
| add_display_line("") |
| elif cmd == "clear": |
| with display_state.lock: |
| display_state.lines.clear() |
| binary_capture.clear() |
| statistics.reset() |
| elif cmd == "help": |
| if args: |
| print_help(args[0]) |
| else: |
| print_help() |
| elif cmd == "settings": |
| handle_settings_command(args) |
| elif cmd == "device": |
| handle_device_command(args, stop_event) |
| elif cmd == "save": |
| handle_save_command(args) |
| elif cmd == "display": |
| handle_display_command(args) |
| elif cmd == "quit" or cmd == "exit": |
| add_display_line("") |
| add_display_line("Stopping monitoring...") |
| stop_event.set() |
| return False |
| else: |
| add_display_line("") |
| add_display_line(f"Unknown command: {cmd}. Type 'help' for available commands.") |
| |
| return True |
| |
| |
| def draw_screen(stdscr, input_buffer, last_line_count, force_redraw=False): |
| """Draw the screen with monitoring output, command prompt, and status bar at bottom""" |
| height, width = stdscr.getmaxyx() |
| |
| # Calculate areas (3 lines for prompt + 1 status bar at bottom = 4 lines) |
| monitor_height = height - 4 |
| |
| # Check if we need to redraw |
| current_line_count = len(display_state.lines) |
| needs_redraw = force_redraw or (current_line_count != last_line_count) |
| |
| if needs_redraw: |
| # Only erase, don't clear (reduces flicker) |
| stdscr.erase() |
| |
| # Draw monitoring output |
| with display_state.lock: |
| # Calculate which lines to show based on scroll offset |
| total_lines = len(display_state.lines) |
| # scroll_offset = 0 means show the last monitor_height lines |
| # scroll_offset > 0 means scroll back that many lines |
| end_idx = total_lines - display_state.scroll_offset |
| start_idx = max(0, end_idx - monitor_height) |
| |
| # Ensure we don't go past the end |
| end_idx = min(end_idx, total_lines) |
| |
| visible_lines = display_state.lines[start_idx:end_idx] |
| |
| for i, line in enumerate(visible_lines): |
| if i >= monitor_height: |
| break |
| try: |
| # Wrap line if too long |
| if len(line) > width: |
| wrapped = textwrap.wrap(line, width) |
| for j, wrapped_line in enumerate(wrapped): |
| if i + j < monitor_height: |
| stdscr.addstr(i + j, 0, wrapped_line[: width - 1]) |
| else: |
| stdscr.addstr(i, 0, line[: width - 1]) |
| except curses.error: |
| pass |
| |
| # Always redraw prompt area (minimal flicker since it's small) |
| prompt_y = height - 4 |
| try: |
| # Top line of prompt |
| stdscr.addstr(prompt_y, 0, "-" * (width - 1)) |
| # Prompt line with > |
| prompt_line = "> " + input_buffer |
| # Clear the rest of the line |
| stdscr.addstr(prompt_y + 1, 0, (prompt_line + " " * width)[: width - 1]) |
| # Bottom line of prompt |
| stdscr.addstr(prompt_y + 2, 0, "-" * (width - 1)) |
| # Position cursor after prompt |
| cursor_x = min(2 + len(input_buffer), width - 1) |
| stdscr.move(prompt_y + 1, cursor_x) |
| except curses.error: |
| pass |
| |
| # Draw status bar (always redraw) - below the prompt |
| status_y = height - 1 |
| try: |
| with statistics.lock: |
| cmds = statistics.commands_sent |
| acks = statistics.acks_received |
| naks = statistics.naks_received |
| bytes_total = statistics.total_bytes |
| |
| # Get current display mode |
| mode = display_mode.get_mode() |
| |
| # Build status bar with color highlighting |
| x_pos = 0 |
| |
| # Clear the line first |
| stdscr.addstr(status_y, 0, " " * (width - 1), curses.A_REVERSE) |
| |
| # "Cmds: X | " |
| text = f"Cmds: {cmds} | " |
| stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE) |
| x_pos += len(text) |
| |
| # "ACKs: " (normal) |
| text = "ACKs: " |
| stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE) |
| x_pos += len(text) |
| |
| # ACK number (green) |
| text = str(acks) |
| stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE | curses.color_pair(1)) |
| x_pos += len(text) |
| |
| # " | " |
| text = " | " |
| stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE) |
| x_pos += len(text) |
| |
| # "NAKs: " (normal) |
| text = "NAKs: " |
| stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE) |
| x_pos += len(text) |
| |
| # NAK number (red) |
| text = str(naks) |
| stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE | curses.color_pair(2)) |
| x_pos += len(text) |
| |
| # " | Bytes: X" |
| text = f" | Bytes: {bytes_total}" |
| if x_pos + len(text) < width: |
| stdscr.addstr(status_y, x_pos, text, curses.A_REVERSE) |
| x_pos += len(text) |
| |
| # Mode (right-justified) |
| mode_text = mode |
| mode_x = width - len(mode_text) - 1 |
| if mode_x > x_pos + 1: # Make sure there's space |
| stdscr.addstr(status_y, mode_x, mode_text, curses.A_REVERSE) |
| |
| except curses.error: |
| pass |
| |
| stdscr.refresh() |
| return current_line_count |
| |
| |
| def command_loop_curses(stdscr, stop_event): |
| """Command input loop with curses UI""" |
| display_state.stdscr = stdscr |
| |
| # Configure curses |
| curses.curs_set(1) # Show cursor |
| stdscr.nodelay(True) # Non-blocking input |
| stdscr.timeout(100) # 100ms timeout for getch |
| stdscr.keypad(True) # Enable keypad mode for special keys |
| |
| # Enable mouse support |
| try: |
| curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) |
| except: |
| pass # Mouse support not available |
| |
| # Initialize colors |
| if curses.has_colors(): |
| curses.start_color() |
| curses.use_default_colors() |
| # Color pair 1: Green for ACKs |
| curses.init_pair(1, curses.COLOR_GREEN, -1) |
| # Color pair 2: Red for NAKs |
| curses.init_pair(2, curses.COLOR_RED, -1) |
| |
| input_buffer = "" |
| last_line_count = 0 |
| force_redraw = True |
| |
| # Command history |
| command_history = [] |
| history_index = -1 # -1 = not browsing history |
| saved_input = "" # Save current input when browsing history |
| |
| # For handling escape sequences |
| escape_sequence = "" |
| in_escape = False |
| |
| try: |
| while not stop_event.is_set(): |
| # Draw the screen (only redraws if content changed or forced) |
| last_line_count = draw_screen( |
| stdscr, input_buffer, last_line_count, force_redraw |
| ) |
| force_redraw = False |
| |
| # Get input |
| try: |
| ch = stdscr.getch() |
| |
| if ch == -1: # No input |
| continue |
| |
| # Handle mouse events |
| if ch == curses.KEY_MOUSE: |
| try: |
| _, mx, my, _, bstate = curses.getmouse() |
| height, width = stdscr.getmaxyx() |
| monitor_height = height - 4 |
| |
| # Mouse wheel up (scroll up in buffer) |
| if bstate & curses.BUTTON4_PRESSED: |
| with display_state.lock: |
| max_scroll = max( |
| 0, len(display_state.lines) - monitor_height |
| ) |
| display_state.scroll_offset = min( |
| display_state.scroll_offset + 3, max_scroll |
| ) |
| if display_state.scroll_offset > 0: |
| display_state.auto_scroll = False |
| force_redraw = True |
| continue |
| |
| # Mouse wheel down (scroll down in buffer) |
| elif bstate & curses.BUTTON5_PRESSED: |
| with display_state.lock: |
| display_state.scroll_offset = max( |
| 0, display_state.scroll_offset - 3 |
| ) |
| if display_state.scroll_offset == 0: |
| display_state.auto_scroll = True |
| force_redraw = True |
| continue |
| |
| except curses.error: |
| pass |
| continue |
| |
| # Handle escape sequences for arrow keys (if curses keys don't work) |
| if ch == 27: # ESC |
| in_escape = True |
| escape_sequence = "" |
| continue |
| elif in_escape: |
| escape_sequence += chr(ch) if ch < 256 else "" |
| |
| # Check if we have a complete escape sequence |
| if escape_sequence == "[A": # Up arrow |
| ch = curses.KEY_UP |
| in_escape = False |
| elif escape_sequence == "[B": # Down arrow |
| ch = curses.KEY_DOWN |
| in_escape = False |
| elif escape_sequence == "[5~": # Page Up |
| ch = curses.KEY_PPAGE |
| in_escape = False |
| elif escape_sequence == "[6~": # Page Down |
| ch = curses.KEY_NPAGE |
| in_escape = False |
| elif escape_sequence == "[H": # Home |
| ch = curses.KEY_HOME |
| in_escape = False |
| elif escape_sequence == "[F": # End |
| ch = curses.KEY_END |
| in_escape = False |
| elif len(escape_sequence) >= 3: # Unknown sequence, give up |
| in_escape = False |
| continue |
| else: |
| continue # Need more characters |
| |
| if ch == ord("\n"): # Enter |
| if input_buffer.strip(): |
| # Add to history if not duplicate of last command |
| if not command_history or command_history[-1] != input_buffer: |
| command_history.append(input_buffer) |
| # Limit history to 100 commands |
| if len(command_history) > 100: |
| command_history.pop(0) |
| |
| if not handle_command(input_buffer, stop_event): |
| break |
| input_buffer = "" |
| history_index = -1 |
| saved_input = "" |
| force_redraw = True # Redraw after command |
| |
| elif ch == curses.KEY_BACKSPACE or ch == 127 or ch == 8: # Backspace |
| if input_buffer: |
| input_buffer = input_buffer[:-1] |
| history_index = -1 # Exit history browsing |
| |
| elif ch == curses.KEY_UP: # Up arrow - previous command |
| if command_history: |
| if history_index == -1: |
| # Start browsing history, save current input |
| saved_input = input_buffer |
| history_index = len(command_history) - 1 |
| elif history_index > 0: |
| history_index -= 1 |
| |
| if history_index >= 0: |
| input_buffer = command_history[history_index] |
| |
| elif ch == curses.KEY_DOWN: # Down arrow - next command |
| if history_index != -1: |
| if history_index < len(command_history) - 1: |
| history_index += 1 |
| input_buffer = command_history[history_index] |
| else: |
| # Reached end of history, restore saved input |
| history_index = -1 |
| input_buffer = saved_input |
| |
| elif ch == curses.KEY_PPAGE: # Page Up - scroll up |
| height, width = stdscr.getmaxyx() |
| monitor_height = height - 4 |
| with display_state.lock: |
| # Scroll up one page |
| max_scroll = max(0, len(display_state.lines) - monitor_height) |
| display_state.scroll_offset = min( |
| display_state.scroll_offset + monitor_height, max_scroll |
| ) |
| # Disable auto-scroll when manually scrolling |
| if display_state.scroll_offset > 0: |
| display_state.auto_scroll = False |
| force_redraw = True |
| |
| elif ch == curses.KEY_NPAGE: # Page Down - scroll down |
| with display_state.lock: |
| display_state.scroll_offset = max( |
| 0, display_state.scroll_offset - (height - 4) |
| ) |
| # Re-enable auto-scroll when scrolled to bottom |
| if display_state.scroll_offset == 0: |
| display_state.auto_scroll = True |
| force_redraw = True |
| |
| elif ch == curses.KEY_HOME: # Home - scroll to top |
| height, width = stdscr.getmaxyx() |
| monitor_height = height - 4 |
| with display_state.lock: |
| display_state.scroll_offset = max( |
| 0, len(display_state.lines) - monitor_height |
| ) |
| display_state.auto_scroll = False |
| force_redraw = True |
| |
| elif ch == curses.KEY_END: # End - scroll to bottom |
| with display_state.lock: |
| display_state.scroll_offset = 0 |
| display_state.auto_scroll = True |
| force_redraw = True |
| |
| elif ch == 3: # Ctrl+C |
| stop_event.set() |
| break |
| elif ch == curses.KEY_RESIZE: # Terminal resized |
| force_redraw = True |
| elif 32 <= ch <= 126: # Printable characters |
| input_buffer += chr(ch) |
| history_index = -1 # Exit history browsing on new input |
| except curses.error: |
| pass |
| |
| except KeyboardInterrupt: |
| stop_event.set() |
| finally: |
| display_state.stdscr = None |
| |
| |
| # ========================================================================== |
| # MCTP/SPDM PARSING CODE (from mctp-parse) |
| # ========================================================================== |
| |
| |
| class SPDMParser: |
| """Parser for SPDM messages (DSP0274)""" |
| |
| # Request codes (DSP0274 Table 4) |
| REQUEST_CODES = { |
| 0x81: "GET_DIGESTS", |
| 0x82: "GET_CERTIFICATE", |
| 0x83: "CHALLENGE", |
| 0x84: "GET_VERSION", |
| 0x85: "CHUNK_SEND", |
| 0x86: "CHUNK_GET", |
| 0x87: "GET_ENDPOINT_INFO", |
| 0xE0: "GET_MEASUREMENTS", |
| 0xE1: "GET_CAPABILITIES", |
| 0xE2: "GET_SUPPORTED_EVENT_TYPES", |
| 0xE3: "NEGOTIATE_ALGORITHMS", |
| 0xE4: "KEY_EXCHANGE", |
| 0xE5: "FINISH", |
| 0xE6: "PSK_EXCHANGE", |
| 0xE7: "PSK_FINISH", |
| 0xE8: "HEARTBEAT", |
| 0xE9: "KEY_UPDATE", |
| 0xEA: "GET_ENCAPSULATED_REQUEST", |
| 0xEB: "DELIVER_ENCAPSULATED_RESPONSE", |
| 0xEC: "END_SESSION", |
| 0xED: "GET_CSR", |
| 0xEE: "SET_CERTIFICATE", |
| 0xEF: "GET_MEASUREMENT_EXTENSION_LOG", |
| 0xF0: "SUBSCRIBE_EVENT_TYPES", |
| 0xF1: "SEND_EVENT", |
| 0xFC: "GET_KEY_PAIR_INFO", |
| 0xFD: "SET_KEY_PAIR_INFO", |
| 0xFE: "VENDOR_DEFINED_REQUEST", |
| 0xFF: "RESPOND_IF_READY", |
| } |
| |
| # Response codes (DSP0274 Table 5) |
| RESPONSE_CODES = { |
| 0x01: "DIGESTS", |
| 0x02: "CERTIFICATE", |
| 0x03: "CHALLENGE_AUTH", |
| 0x04: "VERSION", |
| 0x05: "CHUNK_SEND_ACK", |
| 0x06: "CHUNK_RESPONSE", |
| 0x07: "ENDPOINT_INFO", |
| 0x60: "MEASUREMENTS", |
| 0x61: "CAPABILITIES", |
| 0x62: "SUPPORTED_EVENT_TYPES", |
| 0x63: "ALGORITHMS", |
| 0x64: "KEY_EXCHANGE_RSP", |
| 0x65: "FINISH_RSP", |
| 0x66: "PSK_EXCHANGE_RSP", |
| 0x67: "PSK_FINISH_RSP", |
| 0x68: "HEARTBEAT_ACK", |
| 0x69: "KEY_UPDATE_ACK", |
| 0x6A: "ENCAPSULATED_REQUEST", |
| 0x6B: "ENCAPSULATED_RESPONSE_ACK", |
| 0x6C: "END_SESSION_ACK", |
| 0x6D: "CSR", |
| 0x6E: "SET_CERTIFICATE_RSP", |
| 0x6F: "MEASUREMENT_EXTENSION_LOG", |
| 0x70: "SUBSCRIBE_EVENT_TYPES_ACK", |
| 0x71: "EVENT_ACK", |
| 0x7C: "KEY_PAIR_INFO", |
| 0x7D: "SET_KEY_PAIR_INFO_ACK", |
| 0x7E: "VENDOR_DEFINED_RESPONSE", |
| 0x7F: "ERROR", |
| } |
| |
| @staticmethod |
| def parse(payload: List[int]) -> Optional[Dict]: |
| """Parse SPDM message from payload""" |
| if len(payload) < 2: |
| return None |
| |
| result = {} |
| |
| # Byte 0: SPDM version (major [7:4], minor [3:0]) |
| version_byte = payload[0] |
| result["version_major"] = (version_byte >> 4) & 0x0F |
| result["version_minor"] = version_byte & 0x0F |
| result["version"] = f"{result['version_major']}.{result['version_minor']}" |
| |
| # Byte 1: Request/Response code |
| code = payload[1] |
| result["code"] = code |
| |
| # Determine if request or response |
| if code in SPDMParser.REQUEST_CODES: |
| result["msg_direction"] = "Request" |
| result["msg_name"] = SPDMParser.REQUEST_CODES[code] |
| elif code in SPDMParser.RESPONSE_CODES: |
| result["msg_direction"] = "Response" |
| result["msg_name"] = SPDMParser.RESPONSE_CODES[code] |
| else: |
| result["msg_direction"] = "Unknown" |
| result["msg_name"] = f"Unknown (0x{code:02X})" |
| |
| # Byte 2: Param1 (if present) |
| if len(payload) > 2: |
| result["param1"] = payload[2] |
| |
| # Byte 3: Param2 (if present) |
| if len(payload) > 3: |
| result["param2"] = payload[3] |
| |
| # Remaining data |
| if len(payload) > 4: |
| result["data"] = payload[4:] |
| result["data_len"] = len(result["data"]) |
| else: |
| result["data"] = [] |
| result["data_len"] = 0 |
| |
| return result |
| |
| |
| class MCTPParser: |
| """Parser for MCTP packets""" |
| |
| # Message type definitions (DSP0236 Table 3) |
| MESSAGE_TYPES = { |
| 0x00: "MCTP Control Message", |
| 0x01: "PLDM", |
| 0x02: "NC-SI over MCTP", |
| 0x03: "Ethernet over MCTP", |
| 0x04: "NVMe-MI over MCTP", |
| 0x05: "SPDM over MCTP", |
| 0x06: "SECDED over MCTP", |
| 0x07: "CXL FM API over MCTP", |
| 0x08: "CXL CCI over MCTP", |
| } |
| |
| @staticmethod |
| def parse(data: List[int]) -> dict: |
| """Parse MCTP packet and return structured data""" |
| if len(data) < 4: |
| raise ValueError( |
| "Packet too short - minimum 4 bytes required for MCTP header" |
| ) |
| |
| result = {} |
| |
| # Byte 0: Destination EID |
| result["dest_eid"] = data[0] |
| |
| # Byte 1: Header Version [7:4], Reserved [3:0] |
| byte1 = data[1] |
| result["header_version"] = (byte1 >> 4) & 0x0F |
| result["rsvd"] = byte1 & 0x0F |
| |
| # Byte 2: Source EID |
| result["src_eid"] = data[2] |
| |
| # Byte 3: SOM, EOM, Pkt_Seq, TO, Msg_Tag |
| byte3 = data[3] |
| result["som"] = bool(byte3 & 0x80) # Start of Message |
| result["eom"] = bool(byte3 & 0x40) # End of Message |
| result["pkt_seq"] = (byte3 >> 4) & 0x03 # Packet sequence number |
| result["to"] = bool(byte3 & 0x08) # Tag Owner |
| result["msg_tag"] = byte3 & 0x07 # Message Tag |
| |
| # Message body starts at byte 4 |
| if len(data) > 4: |
| # Byte 4: IC [7], Message Type [6:0] |
| byte4 = data[4] |
| result["ic"] = bool(byte4 & 0x80) # Integrity Check |
| result["msg_type"] = byte4 & 0x7F |
| result["msg_type_name"] = MCTPParser.MESSAGE_TYPES.get( |
| result["msg_type"], |
| ( |
| f"Vendor Defined (0x{result['msg_type']:02X})" |
| if result["msg_type"] >= 0x7E |
| else f"Reserved (0x{result['msg_type']:02X})" |
| ), |
| ) |
| |
| # Payload starts at byte 5 |
| if len(data) > 5: |
| result["payload"] = data[5:] |
| result["payload_len"] = len(result["payload"]) |
| |
| # Parse SPDM if message type is 0x05 |
| if result["msg_type"] == 0x05 and result["payload_len"] > 0: |
| result["spdm"] = SPDMParser.parse(result["payload"]) |
| else: |
| result["payload"] = [] |
| result["payload_len"] = 0 |
| |
| return result |
| |
| |
| def calculate_pec(data: List[int]) -> int: |
| """ |
| Calculate SMBus PEC (Packet Error Code) using CRC-8. |
| |
| SMBus uses CRC-8 with polynomial 0x07 (x^8 + x^2 + x + 1). |
| Initial value is 0x00. |
| """ |
| crc = 0x00 |
| polynomial = 0x07 |
| |
| for byte in data: |
| crc ^= byte |
| for _ in range(8): |
| if crc & 0x80: |
| crc = (crc << 1) ^ polynomial |
| else: |
| crc = crc << 1 |
| crc &= 0xFF |
| |
| return crc |
| |
| |
| def parse_smbus_header(data: List[int], expect_pec: bool = False) -> Optional[Dict]: |
| """ |
| Parse SMBus/I2C transport binding header (DSP0237). |
| Returns parsed transport info or None if not valid SMBus format. |
| """ |
| if len(data) < 3: |
| return None |
| |
| # Check for MCTP over SMBus command code (0x0F) |
| if data[0] != 0x0F: |
| return None |
| |
| result = {} |
| result["cmd_code"] = data[0] |
| result["byte_count"] = data[1] |
| |
| expected_total = 2 + result["byte_count"] |
| if expect_pec: |
| expected_total += 1 |
| |
| if len(data) < expected_total: |
| return None |
| |
| if expect_pec and len(data) >= expected_total: |
| result["pec_received"] = data[expected_total - 1] |
| # Calculate expected PEC |
| pec_data = data[: expected_total - 1] |
| result["pec_calculated"] = calculate_pec(pec_data) |
| result["pec_valid"] = result["pec_received"] == result["pec_calculated"] |
| data_end = expected_total - 1 |
| else: |
| result["pec_received"] = None |
| result["pec_calculated"] = None |
| result["pec_valid"] = None |
| data_end = min(len(data), expected_total) |
| |
| # SMBus-specific headers start at byte 2 |
| if result["byte_count"] < 5: |
| return None |
| |
| offset = 2 |
| result["source_slave_addr"] = data[offset] |
| offset += 1 |
| |
| # MCTP reserved + header version |
| byte_hdr = data[offset] |
| result["mctp_reserved"] = (byte_hdr >> 4) & 0x0F |
| result["mctp_hdr_version"] = byte_hdr & 0x0F |
| offset += 1 |
| |
| # Destination and Source EIDs |
| result["dest_eid"] = data[offset] |
| offset += 1 |
| result["src_eid"] = data[offset] |
| offset += 1 |
| |
| # MCTP packet starts here (SOM/EOM byte) |
| result["mctp_offset"] = offset |
| result["mctp_data"] = data[offset:data_end] |
| |
| return result |
| |
| |
| # ========================================================================== |
| # MCTP MESSAGE ASSEMBLER |
| # ========================================================================== |
| |
| |
| class MCTPAssembler: |
| """Assembles fragmented MCTP messages""" |
| |
| def __init__(self): |
| # Key: (src_eid, dest_eid, msg_tag) |
| # Value: {'fragments': [], 'expected_seq': int, 'complete': bool} |
| self.sessions = {} |
| |
| def add_fragment(self, mctp_data: Dict) -> Optional[Dict]: |
| """ |
| Add a fragment and return complete message if EOM is reached. |
| Returns None if message is incomplete. |
| """ |
| key = (mctp_data["src_eid"], mctp_data["dest_eid"], mctp_data["msg_tag"]) |
| |
| # SOM - start new session |
| if mctp_data["som"]: |
| self.sessions[key] = { |
| "fragments": [mctp_data], |
| "expected_seq": (mctp_data["pkt_seq"] + 1) % 4, |
| "src_eid": mctp_data["src_eid"], |
| "dest_eid": mctp_data["dest_eid"], |
| "msg_tag": mctp_data["msg_tag"], |
| "msg_type": mctp_data.get("msg_type"), |
| "msg_type_name": mctp_data.get("msg_type_name"), |
| } |
| |
| # Single packet message (SOM+EOM) |
| if mctp_data["eom"]: |
| session = self.sessions.pop(key) |
| return self._assemble_session(session) |
| |
| return None |
| |
| # Middle or end fragment |
| if key not in self.sessions: |
| # Fragment without SOM - ignore or could be error |
| return None |
| |
| session = self.sessions[key] |
| |
| # Check sequence number |
| if mctp_data["pkt_seq"] != session["expected_seq"]: |
| # Sequence error - drop session |
| del self.sessions[key] |
| return None |
| |
| session["fragments"].append(mctp_data) |
| session["expected_seq"] = (mctp_data["pkt_seq"] + 1) % 4 |
| |
| # EOM - assemble complete message |
| if mctp_data["eom"]: |
| complete_session = self.sessions.pop(key) |
| return self._assemble_session(complete_session) |
| |
| return None |
| |
| def _assemble_session(self, session: Dict) -> Dict: |
| """Assemble fragments into complete message""" |
| # Combine all payloads |
| complete_payload = [] |
| for frag in session["fragments"]: |
| if "payload" in frag and frag["payload"]: |
| complete_payload.extend(frag["payload"]) |
| |
| result = { |
| "src_eid": session["src_eid"], |
| "dest_eid": session["dest_eid"], |
| "msg_tag": session["msg_tag"], |
| "msg_type": session["msg_type"], |
| "msg_type_name": session["msg_type_name"], |
| "payload": complete_payload, |
| "payload_len": len(complete_payload), |
| "fragment_count": len(session["fragments"]), |
| } |
| |
| # Parse SPDM if applicable |
| if session["msg_type"] == 0x05 and complete_payload: |
| result["spdm"] = SPDMParser.parse(complete_payload) |
| |
| return result |
| |
| |
| # ========================================================================== |
| # BEAGLE DEVICE FUNCTIONS |
| # ========================================================================== |
| |
| |
| def find_and_connect(): |
| """Find and connect to the first available Beagle device.""" |
| print("Searching for Beagle devices...") |
| |
| # Update device list |
| device_state.update_device_list() |
| |
| with device_state.lock: |
| if not device_state.available_devices: |
| print("Error: No Beagle devices found!") |
| sys.exit(1) |
| |
| print(f"Found {len(device_state.available_devices)} device(s)") |
| |
| # Find the first available (not in-use) device |
| device_idx = None |
| for i, dev in enumerate(device_state.available_devices): |
| if not dev["in_use"]: |
| device_idx = i |
| print( |
| f"Connecting to device on port {dev['port']} (S/N: {dev['unique_id']:04d}-{dev['unique_id'] % 1000000:06d})" |
| ) |
| break |
| else: |
| print(f"Port {dev['port']} is in use") |
| |
| if device_idx is None: |
| print("Error: All devices are in use!") |
| sys.exit(1) |
| |
| # Open the device |
| device_port = device_state.available_devices[device_idx]["port"] |
| beagle = bg_open(device_port) |
| if beagle <= 0: |
| print(f"Error: Unable to open Beagle device on port {device_port}") |
| print(f"Error code = {beagle}") |
| sys.exit(1) |
| |
| print(f"Successfully opened Beagle device on port {device_port}") |
| |
| # Store device state |
| device_state.current_device_index = device_idx |
| device_state.beagle_handle = beagle |
| |
| return beagle |
| |
| |
| def configure_device(beagle, samplerate_khz=10000, timeout_ms=500, latency_ms=200): |
| """Configure the Beagle device for I2C monitoring.""" |
| # Store settings in device_state |
| with device_state.lock: |
| device_state.samplerate_khz = samplerate_khz |
| device_state.timeout_ms = timeout_ms |
| device_state.latency_ms = latency_ms |
| |
| # Set sampling rate |
| samplerate = bg_samplerate(beagle, samplerate_khz) |
| if samplerate < 0: |
| print(f"Error setting sample rate: {bg_status_string(samplerate)}") |
| sys.exit(1) |
| print(f"Sample rate set to {samplerate} kHz") |
| |
| with device_state.lock: |
| device_state.samplerate_khz = samplerate |
| |
| # Set idle timeout |
| bg_timeout(beagle, timeout_ms) |
| print(f"Idle timeout set to {timeout_ms} ms") |
| |
| # Set latency |
| bg_latency(beagle, latency_ms) |
| print(f"Latency set to {latency_ms} ms") |
| |
| # Disable pullups and target power (passive monitoring) |
| bg_i2c_pullup(beagle, BG_I2C_PULLUP_OFF) |
| bg_target_power(beagle, BG_TARGET_POWER_OFF) |
| |
| # Get host interface speed |
| if bg_host_ifce_speed(beagle): |
| print("Host interface: high speed") |
| else: |
| print("Host interface: full speed") |
| |
| |
| # ========================================================================== |
| # MONITORING FUNCTIONS |
| # ========================================================================== |
| |
| |
| def monitor_i2c(beagle, args, stop_event, max_packet_len=256): |
| """Monitor and print I2C transactions with optional MCTP/SPDM parsing.""" |
| # Calculate timing size |
| timing_size = bg_bit_timing_size(BG_PROTOCOL_I2C, max_packet_len) |
| |
| # Get sample rate for timestamp conversion |
| samplerate_khz = bg_samplerate(beagle, 0) |
| |
| # Enable I2C capture |
| if bg_enable(beagle, BG_PROTOCOL_I2C) != BG_OK: |
| add_display_line("Error: Could not enable I2C capture!") |
| sys.exit(1) |
| |
| # Display about text |
| add_display_line("") |
| for line in ABOUT_TEXT: |
| add_display_line(line) |
| add_display_line("") |
| |
| add_display_line("=" * 80) |
| add_display_line("I2C Monitoring Started - Type 'help' for commands") |
| add_display_line("=" * 80) |
| |
| mode = display_mode.get_mode() |
| if mode == "spdm": |
| add_display_line("Mode: SPDM") |
| add_display_line(" Hides non-SPDM traffic") |
| elif mode == "mctp": |
| add_display_line("Mode: MCTP") |
| add_display_line(" Hides non-MCTP traffic") |
| elif mode == "smbus": |
| add_display_line("Mode: SMBus") |
| if display_mode.validate_pec: |
| add_display_line(" PEC validation enabled") |
| else: |
| add_display_line("Mode: Raw I2C") |
| |
| add_display_line("=" * 80) |
| add_display_line("") |
| |
| # Allocate buffers |
| data_in = array_u16(max_packet_len) |
| timing = array_u32(timing_size) |
| |
| packet_count = 0 |
| mctp_assembler = MCTPAssembler() |
| |
| try: |
| while not stop_event.is_set(): |
| # Read I2C transaction |
| ( |
| count, |
| status, |
| time_sop, |
| time_duration, |
| time_dataoffset, |
| data_in, |
| timing, |
| ) = bg_i2c_read_bit_timing(beagle, data_in, timing) |
| |
| # Convert timestamp to nanoseconds |
| time_sop_ns = (time_sop * 1000) // (samplerate_khz // 1000) |
| |
| # Skip if no data |
| if count <= 0: |
| if count < 0: |
| add_display_line(f"Error reading I2C data: {count}") |
| break |
| continue |
| |
| packet_count += 1 |
| |
| # Update statistics - count this as a command |
| with statistics.lock: |
| statistics.commands_sent += 1 |
| |
| # Extract raw data bytes (strip NACK bits) |
| i2c_addr = None |
| i2c_rw = None |
| offset = 0 |
| has_nack = False |
| |
| # Get address if present |
| if not (status & BG_READ_ERR_MIDDLE_OF_PACKET) and count >= 1: |
| nack = data_in[0] & BG_I2C_MONITOR_NACK |
| has_nack = bool(nack) |
| if count == 1 or (data_in[0] & 0xF9) != 0xF0 or nack: |
| # 7-bit address |
| i2c_addr = int(data_in[0] & 0xFF) >> 1 |
| i2c_rw = "R" if (data_in[0] & 0x01) else "W" |
| offset = 1 |
| else: |
| # 10-bit address |
| i2c_addr = ((data_in[0] << 7) & 0x300) | (data_in[1] & 0xFF) |
| i2c_rw = "R" if (data_in[0] & 0x01) else "W" |
| offset = 2 |
| |
| # Count ACKs/NAKs in data bytes |
| for i in range(offset, count): |
| if data_in[i] & BG_I2C_MONITOR_NACK: |
| has_nack = True |
| |
| # Update ACK/NAK statistics |
| with statistics.lock: |
| if has_nack: |
| statistics.naks_received += 1 |
| else: |
| statistics.acks_received += 1 |
| |
| # Extract payload bytes |
| raw_data = [int(data_in[i] & 0xFF) for i in range(offset, count)] |
| |
| # Update byte count |
| with statistics.lock: |
| statistics.total_bytes += len(raw_data) |
| |
| # Capture raw binary data with metadata for replay |
| full_transaction = [int(data_in[i] & 0xFF) for i in range(count)] |
| binary_capture.add_transaction( |
| time_sop_ns, full_transaction, i2c_addr, i2c_rw, raw_data, status |
| ) |
| |
| # Get current display mode |
| mode = display_mode.get_mode() |
| |
| # Process based on mode |
| if mode in ["smbus", "mctp", "spdm"] and raw_data: |
| smbus_result = parse_smbus_header(raw_data, display_mode.validate_pec) |
| |
| if smbus_result and mode in ["mctp", "spdm"]: |
| # Parse MCTP |
| try: |
| # Create synthetic MCTP packet |
| synthetic_mctp = [ |
| smbus_result["dest_eid"], |
| (smbus_result["mctp_hdr_version"] << 4) |
| | smbus_result["mctp_reserved"], |
| smbus_result["src_eid"], |
| ] + smbus_result["mctp_data"] |
| |
| mctp_parsed = MCTPParser.parse(synthetic_mctp) |
| |
| # Handle MCTP assembly |
| complete_msg = mctp_assembler.add_fragment(mctp_parsed) |
| |
| if complete_msg is None: |
| # Incomplete fragment - hide in mctp/spdm mode |
| continue |
| |
| # Complete message assembled |
| mctp_parsed = complete_msg |
| |
| # Display based on mode |
| if mode == "spdm" and "spdm" in mctp_parsed: |
| print_spdm_message( |
| time_sop_ns, i2c_addr, i2c_rw, mctp_parsed |
| ) |
| elif mode == "mctp": |
| print_mctp_message( |
| time_sop_ns, i2c_addr, i2c_rw, smbus_result, mctp_parsed |
| ) |
| |
| except Exception as e: |
| add_display_line( |
| f"[{time_sop_ns:12d} ns] MCTP Parse Error: {e}" |
| ) |
| |
| elif smbus_result and mode == "smbus": |
| # SMBus mode - show all SMBus traffic |
| print_smbus_message( |
| time_sop_ns, i2c_addr, i2c_rw, smbus_result, raw_data |
| ) |
| |
| # In mctp/spdm mode, hide non-MCTP traffic |
| |
| elif mode == "raw": |
| # Raw I2C mode |
| print_raw_i2c(time_sop_ns, i2c_addr, i2c_rw, raw_data, status) |
| |
| except KeyboardInterrupt: |
| stop_event.set() |
| add_display_line("") |
| add_display_line("Capture stopped by user") |
| add_display_line(f"Total packets captured: {packet_count}") |
| |
| finally: |
| # Disable capture |
| bg_disable(beagle) |
| add_display_line(f"Total packets captured: {packet_count}") |
| |
| |
| def print_raw_i2c(timestamp, addr, rw, data, status): |
| """Print raw I2C transaction""" |
| line = f"[{timestamp:12d} ns] " |
| |
| if addr is not None: |
| line += f"[S] <0x{addr:02X}:{rw}> " |
| |
| if data: |
| hex_str = " ".join(f"0x{b:02X}" for b in data) |
| line += hex_str + " " |
| |
| if not (status & BG_READ_I2C_NO_STOP): |
| line += "[P]" |
| |
| add_display_line(line.rstrip()) |
| |
| |
| def print_smbus_message(timestamp, addr, rw, smbus_info, raw_data): |
| """Print SMBus message with PEC info""" |
| # Add X prefix if PEC validation is enabled and invalid |
| prefix = "" |
| if smbus_info["pec_valid"] is not None and not smbus_info["pec_valid"]: |
| prefix = "X " |
| |
| line = f"[{timestamp:12d} ns] {prefix}SMBus: " |
| line += f"Cmd=0x{smbus_info['cmd_code']:02X} Len={smbus_info['byte_count']} " |
| |
| if smbus_info["pec_valid"] is not None: |
| if smbus_info["pec_valid"]: |
| line += f"PEC=✓ " |
| else: |
| line += f"PEC=✗ " |
| |
| # Show data |
| hex_str = " ".join(f"{b:02X}" for b in raw_data[:16]) |
| if len(raw_data) > 16: |
| hex_str += "..." |
| line += f"[{hex_str}]" |
| |
| add_display_line(line) |
| |
| |
| def print_mctp_message(timestamp, addr, rw, smbus_info, mctp_parsed): |
| """Print MCTP message with highlighting""" |
| som_marker = "🟢 SOM" if mctp_parsed.get("som") else "" |
| eom_marker = "🔴 EOM" if mctp_parsed.get("eom") else "" |
| markers = f"{som_marker} {eom_marker}".strip() |
| |
| line = f"[{timestamp:12d} ns] MCTP: " |
| if markers: |
| line += f"{markers} " |
| |
| line += f"EID {mctp_parsed['src_eid']:02X}→{mctp_parsed['dest_eid']:02X} " |
| line += ( |
| f"Seq={mctp_parsed.get('pkt_seq', '?')} Tag={mctp_parsed.get('msg_tag', '?')} " |
| ) |
| line += f"Type=0x{mctp_parsed.get('msg_type', 0):02X} ({mctp_parsed.get('msg_type_name', 'Unknown')})" |
| |
| # Show fragment count if assembled |
| if "fragment_count" in mctp_parsed and mctp_parsed["fragment_count"] > 1: |
| line += f" [{mctp_parsed['fragment_count']} fragments]" |
| |
| add_display_line(line) |
| |
| |
| def print_spdm_message(timestamp, addr, rw, mctp_parsed): |
| """Print SPDM message with request/response highlighting""" |
| spdm = mctp_parsed.get("spdm") |
| if not spdm: |
| return |
| |
| # Highlight request vs response |
| if spdm["msg_direction"] == "Request": |
| direction_marker = "📤 REQ" |
| elif spdm["msg_direction"] == "Response": |
| direction_marker = "📥 RSP" |
| else: |
| direction_marker = "❓" |
| |
| line = f"[{timestamp:12d} ns] SPDM: {direction_marker} " |
| |
| line += f"EID {mctp_parsed['src_eid']:02X}→{mctp_parsed['dest_eid']:02X} " |
| if "fragment_count" in mctp_parsed and mctp_parsed["fragment_count"] > 1: |
| line += f"[{mctp_parsed['fragment_count']} frags] " |
| |
| line += f"v{spdm['version']} {spdm['msg_name']} " |
| |
| if "param1" in spdm: |
| line += f"P1=0x{spdm['param1']:02X} " |
| if "param2" in spdm: |
| line += f"P2=0x{spdm['param2']:02X} " |
| |
| if spdm.get("data_len", 0) > 0: |
| line += f"+{spdm['data_len']}B" |
| |
| add_display_line(line.rstrip()) |
| |
| |
| # ========================================================================== |
| # MAIN |
| # ========================================================================== |
| |
| |
| def main(): |
| """Main entry point.""" |
| parser = argparse.ArgumentParser( |
| description="I2C/SMBus Monitor with MCTP/SPDM Support", |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| epilog=""" |
| Examples: |
| %(prog)s # Raw I2C monitoring |
| %(prog)s --smbus --validate-pec # SMBus with PEC validation |
| %(prog)s --mctp # MCTP packet monitoring |
| %(prog)s --spdm # SPDM message monitoring |
| %(prog)s --samplerate 5000 --timeout 1000 # Custom sample rate and timeout |
| """, |
| ) |
| |
| # Protocol mode arguments |
| parser.add_argument( |
| "--smbus", action="store_true", help="Treat all packets as having SMBus header" |
| ) |
| parser.add_argument( |
| "--validate-pec", |
| action="store_true", |
| help="Validate PEC and mark invalid packets with X (requires --smbus)", |
| ) |
| parser.add_argument( |
| "--mctp", |
| action="store_true", |
| help="MCTP mode (implies --smbus --validate-pec, hides non-MCTP traffic)", |
| ) |
| parser.add_argument( |
| "--mctp-no-partial", |
| action="store_true", |
| help="Hide MCTP fragments until EOM (requires --mctp or --spdm)", |
| ) |
| parser.add_argument( |
| "--spdm", |
| action="store_true", |
| help="SPDM mode (implies --mctp, hides non-SPDM traffic)", |
| ) |
| |
| # Device configuration arguments |
| parser.add_argument( |
| "--samplerate", |
| type=int, |
| default=10000, |
| metavar="KHZ", |
| help="Sample rate in kHz (default: 10000)", |
| ) |
| parser.add_argument( |
| "--timeout", |
| type=int, |
| default=500, |
| metavar="MS", |
| help="Idle timeout in milliseconds (default: 500)", |
| ) |
| parser.add_argument( |
| "--latency", |
| type=int, |
| default=200, |
| metavar="MS", |
| help="Latency in milliseconds (default: 200)", |
| ) |
| |
| args = parser.parse_args() |
| |
| # Handle argument implications and set initial display mode |
| if args.spdm: |
| args.mctp = True |
| args.mctp_no_partial = True # SPDM mode always waits for complete messages |
| display_mode.set_mode("spdm") |
| |
| if args.mctp: |
| args.smbus = True |
| args.validate_pec = True |
| if not args.spdm: |
| display_mode.set_mode("mctp") |
| |
| if args.smbus and not args.mctp: |
| display_mode.set_mode("smbus") |
| |
| # Store validate_pec in display_mode |
| display_mode.validate_pec = args.validate_pec |
| |
| # Validate argument combinations |
| if args.validate_pec and not args.smbus: |
| parser.error("--validate-pec requires --smbus") |
| |
| if args.mctp_no_partial and not (args.mctp or args.spdm): |
| parser.error("--mctp-no-partial requires --mctp or --spdm") |
| |
| # Validate configuration arguments |
| if args.samplerate <= 0: |
| parser.error("Sample rate must be positive") |
| if args.timeout < 0: |
| parser.error("Timeout must be non-negative") |
| if args.latency < 0: |
| parser.error("Latency must be non-negative") |
| |
| # Print about text to console before starting TUI |
| print() |
| for line in ABOUT_TEXT: |
| print(line) |
| print() |
| |
| # Find and connect to device |
| beagle = find_and_connect() |
| |
| # Create stop event for coordinating threads |
| stop_event = threading.Event() |
| |
| try: |
| # Configure device |
| print() |
| configure_device(beagle, args.samplerate, args.timeout, args.latency) |
| |
| # Start monitoring in a separate thread |
| monitor_thread = threading.Thread( |
| target=monitor_i2c, args=(beagle, args, stop_event), daemon=True |
| ) |
| monitor_thread.start() |
| |
| # Run command loop in main thread with curses |
| curses.wrapper(command_loop_curses, stop_event) |
| |
| # Wait for monitoring thread to finish |
| monitor_thread.join(timeout=2.0) |
| |
| finally: |
| # Clean up |
| stop_event.set() |
| bg_close(beagle) |
| print("Beagle device closed") |
| |
| |
| if __name__ == "__main__": |
| main() |