blob: 0a3e50cb8019d2e331b432edf896e1a74c987835 [file] [edit]
#!/usr/bin/env python3
"""
MCTP Packet Parser
Parses MCTP packets based on DSP0236 v1.3.1 and SPDM over MCTP (DSP0275 v1.0.2)
Parses SPDM messages based on DSP0274 v1.3.0
"""
import sys
import argparse
from typing import List, Optional, Dict
class SPDMParser:
"""Parser for SPDM messages (DSP0274)"""
# Request codes (DSP0274 Table 4)
REQUEST_CODES = {
0x81: "GET_DIGESTS",
0x82: "GET_CERTIFICATE",
0x83: "CHALLENGE",
0x84: "GET_VERSION",
0x85: "CHUNK_SEND",
0x86: "CHUNK_GET",
0x87: "GET_ENDPOINT_INFO",
0xE0: "GET_MEASUREMENTS",
0xE1: "GET_CAPABILITIES",
0xE2: "GET_SUPPORTED_EVENT_TYPES",
0xE3: "NEGOTIATE_ALGORITHMS",
0xE4: "KEY_EXCHANGE",
0xE5: "FINISH",
0xE6: "PSK_EXCHANGE",
0xE7: "PSK_FINISH",
0xE8: "HEARTBEAT",
0xE9: "KEY_UPDATE",
0xEA: "GET_ENCAPSULATED_REQUEST",
0xEB: "DELIVER_ENCAPSULATED_RESPONSE",
0xEC: "END_SESSION",
0xED: "GET_CSR",
0xEE: "SET_CERTIFICATE",
0xEF: "GET_MEASUREMENT_EXTENSION_LOG",
0xF0: "SUBSCRIBE_EVENT_TYPES",
0xF1: "SEND_EVENT",
0xFC: "GET_KEY_PAIR_INFO",
0xFD: "SET_KEY_PAIR_INFO",
0xFE: "VENDOR_DEFINED_REQUEST",
0xFF: "RESPOND_IF_READY",
}
# Response codes (DSP0274 Table 5)
RESPONSE_CODES = {
0x01: "DIGESTS",
0x02: "CERTIFICATE",
0x03: "CHALLENGE_AUTH",
0x04: "VERSION",
0x05: "CHUNK_SEND_ACK",
0x06: "CHUNK_RESPONSE",
0x07: "ENDPOINT_INFO",
0x60: "MEASUREMENTS",
0x61: "CAPABILITIES",
0x62: "SUPPORTED_EVENT_TYPES",
0x63: "ALGORITHMS",
0x64: "KEY_EXCHANGE_RSP",
0x65: "FINISH_RSP",
0x66: "PSK_EXCHANGE_RSP",
0x67: "PSK_FINISH_RSP",
0x68: "HEARTBEAT_ACK",
0x69: "KEY_UPDATE_ACK",
0x6A: "ENCAPSULATED_REQUEST",
0x6B: "ENCAPSULATED_RESPONSE_ACK",
0x6C: "END_SESSION_ACK",
0x6D: "CSR",
0x6E: "SET_CERTIFICATE_RSP",
0x6F: "MEASUREMENT_EXTENSION_LOG",
0x70: "SUBSCRIBE_EVENT_TYPES_ACK",
0x71: "EVENT_ACK",
0x7C: "KEY_PAIR_INFO",
0x7D: "SET_KEY_PAIR_INFO_ACK",
0x7E: "VENDOR_DEFINED_RESPONSE",
0x7F: "ERROR",
}
@staticmethod
def parse(payload: List[int]) -> Optional[Dict]:
"""Parse SPDM message from payload"""
if len(payload) < 2:
return None
result = {}
# Byte 0: SPDM version (major [7:4], minor [3:0])
version_byte = payload[0]
result['version_major'] = (version_byte >> 4) & 0x0F
result['version_minor'] = version_byte & 0x0F
result['version'] = f"{result['version_major']}.{result['version_minor']}"
# Byte 1: Request/Response code
code = payload[1]
result['code'] = code
# Determine if request or response
if code in SPDMParser.REQUEST_CODES:
result['msg_direction'] = "Request"
result['msg_name'] = SPDMParser.REQUEST_CODES[code]
elif code in SPDMParser.RESPONSE_CODES:
result['msg_direction'] = "Response"
result['msg_name'] = SPDMParser.RESPONSE_CODES[code]
else:
result['msg_direction'] = "Unknown"
result['msg_name'] = f"Unknown (0x{code:02X})"
# Byte 2: Param1 (if present)
if len(payload) > 2:
result['param1'] = payload[2]
# Byte 3: Param2 (if present)
if len(payload) > 3:
result['param2'] = payload[3]
# Remaining data
if len(payload) > 4:
result['data'] = payload[4:]
result['data_len'] = len(result['data'])
else:
result['data'] = []
result['data_len'] = 0
return result
class MCTPParser:
"""Parser for MCTP packets"""
# Message type definitions (DSP0236 Table 3)
MESSAGE_TYPES = {
0x00: "MCTP Control Message",
0x01: "PLDM",
0x02: "NC-SI over MCTP",
0x03: "Ethernet over MCTP",
0x04: "NVMe-MI over MCTP",
0x05: "SPDM over MCTP",
0x06: "SECDED over MCTP",
0x07: "CXL FM API over MCTP",
0x08: "CXL CCI over MCTP",
}
def __init__(self, data: List[int]):
"""Initialize parser with packet data"""
self.data = data
self.offset = 0
def parse(self) -> dict:
"""Parse MCTP packet and return structured data"""
if len(self.data) < 4:
raise ValueError("Packet too short - minimum 4 bytes required for MCTP header")
result = {}
# Byte 0: Destination EID
result['dest_eid'] = self.data[0]
# Byte 1: Header Version [7:4], Reserved [3:0]
byte1 = self.data[1]
result['header_version'] = (byte1 >> 4) & 0x0F
result['rsvd'] = byte1 & 0x0F
# Byte 2: Source EID
result['src_eid'] = self.data[2]
# Byte 3: SOM, EOM, Pkt_Seq, TO, Msg_Tag
byte3 = self.data[3]
result['som'] = bool(byte3 & 0x80) # Start of Message
result['eom'] = bool(byte3 & 0x40) # End of Message
result['pkt_seq'] = (byte3 >> 4) & 0x03 # Packet sequence number
result['to'] = bool(byte3 & 0x08) # Tag Owner
result['msg_tag'] = byte3 & 0x07 # Message Tag
# Message body starts at byte 4
if len(self.data) > 4:
# Byte 4: IC [7], Message Type [6:0]
byte4 = self.data[4]
result['ic'] = bool(byte4 & 0x80) # Integrity Check
result['msg_type'] = byte4 & 0x7F
result['msg_type_name'] = self.MESSAGE_TYPES.get(
result['msg_type'],
f"Vendor Defined (0x{result['msg_type']:02X})" if result['msg_type'] >= 0x7E
else f"Reserved (0x{result['msg_type']:02X})"
)
# Payload starts at byte 5
if len(self.data) > 5:
result['payload'] = self.data[5:]
result['payload_len'] = len(result['payload'])
# Parse SPDM if message type is 0x05
if result['msg_type'] == 0x05 and result['payload_len'] > 0:
result['spdm'] = SPDMParser.parse(result['payload'])
else:
result['payload'] = []
result['payload_len'] = 0
return result
def format_output(self, parsed: dict) -> str:
"""Format parsed data as human-readable output"""
lines = []
lines.append("=" * 70)
lines.append("MCTP Packet Analysis")
lines.append("=" * 70)
lines.append("\nTransport Header:")
lines.append(f" Destination EID: 0x{parsed['dest_eid']:02X} ({parsed['dest_eid']})")
lines.append(f" Source EID: 0x{parsed['src_eid']:02X} ({parsed['src_eid']})")
lines.append(f" Header Version: {parsed['header_version']}")
lines.append("\nMessage Framing:")
lines.append(f" SOM (Start): {'Yes' if parsed['som'] else 'No'}")
lines.append(f" EOM (End): {'Yes' if parsed['eom'] else 'No'}")
lines.append(f" Packet Sequence: {parsed['pkt_seq']}")
lines.append(f" Tag Owner (TO): {parsed['to']}")
lines.append(f" Message Tag: {parsed['msg_tag']}")
if 'msg_type' in parsed:
lines.append("\nMessage Body:")
lines.append(f" Integrity Check: {'Yes' if parsed['ic'] else 'No'}")
lines.append(f" Message Type: 0x{parsed['msg_type']:02X} - {parsed['msg_type_name']}")
# SPDM message parsing
if 'spdm' in parsed and parsed['spdm']:
spdm = parsed['spdm']
lines.append("\n" + "=" * 70)
lines.append("SPDM Message (DSP0274)")
lines.append("=" * 70)
lines.append(f" SPDM Version: {spdm['version']}")
lines.append(f" Message Direction: {spdm['msg_direction']}")
lines.append(f" Message Code: 0x{spdm['code']:02X}")
lines.append(f" Message Name: {spdm['msg_name']}")
if 'param1' in spdm:
lines.append(f" Param1: 0x{spdm['param1']:02X} ({spdm['param1']})")
if 'param2' in spdm:
lines.append(f" Param2: 0x{spdm['param2']:02X} ({spdm['param2']})")
if spdm['data_len'] > 0:
lines.append(f"\n SPDM Data ({spdm['data_len']} bytes):")
data = spdm['data']
for i in range(0, len(data), 16):
chunk = data[i:i+16]
hex_str = ' '.join(f'{b:02X}' for b in chunk)
ascii_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
lines.append(f" {i:04X}: {hex_str:<48} {ascii_str}")
elif parsed['payload_len'] > 0:
lines.append(f"\nPayload ({parsed['payload_len']} bytes):")
# Format payload in hex dump style
payload = parsed['payload']
for i in range(0, len(payload), 16):
chunk = payload[i:i+16]
hex_str = ' '.join(f'{b:02X}' for b in chunk)
ascii_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
lines.append(f" {i:04X}: {hex_str:<48} {ascii_str}")
lines.append("=" * 70)
return '\n'.join(lines)
def calculate_pec(data: List[int]) -> int:
"""
Calculate SMBus PEC (Packet Error Code) using CRC-8.
SMBus uses CRC-8 with polynomial 0x07 (x^8 + x^2 + x + 1).
Initial value is 0x00.
Args:
data: List of bytes to calculate PEC for
Returns:
PEC value (0-255)
"""
crc = 0x00
polynomial = 0x07
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ polynomial
else:
crc = crc << 1
crc &= 0xFF
return crc
def parse_hex_bytes(hex_string: str) -> List[int]:
"""Parse space-separated hex bytes without 0x prefix"""
hex_string = hex_string.strip()
if not hex_string:
return []
# Split on whitespace and filter empty strings
parts = [p.strip() for p in hex_string.split() if p.strip()]
bytes_list = []
for part in parts:
# Remove any 0x prefix if present
if part.lower().startswith('0x'):
part = part[2:]
try:
bytes_list.append(int(part, 16))
except ValueError:
raise ValueError(f"Invalid hex byte: {part}")
return bytes_list
def parse_smbus_transport(data: List[int]) -> Optional[Dict]:
"""
Parse SMBus/I2C transport binding header (DSP0237).
Returns parsed transport info or None if not SMBus format.
"""
if len(data) < 3:
return None
# Check for MCTP over SMBus command code (0x0F)
if data[0] != 0x0F:
return None
result = {}
result['cmd_code'] = data[0]
result['byte_count'] = data[1]
expected_total = 2 + result['byte_count'] + 1 # cmd + count + data + PEC
has_pec = (len(data) == expected_total)
if has_pec:
result['pec_received'] = data[-1]
# Calculate expected PEC over all bytes except the PEC itself
# For MCTP over SMBus, PEC is calculated over: cmd_code + byte_count + all data bytes
pec_data = data[:-1]
result['pec_calculated'] = calculate_pec(pec_data)
result['pec_valid'] = (result['pec_received'] == result['pec_calculated'])
data_end = -1
else:
result['pec_received'] = None
result['pec_calculated'] = None
result['pec_valid'] = None
data_end = len(data)
# SMBus-specific headers start at byte 2
if result['byte_count'] < 5:
return None
offset = 2
result['source_slave_addr'] = data[offset]
offset += 1
# MCTP reserved + header version
byte_hdr = data[offset]
result['mctp_reserved'] = (byte_hdr >> 4) & 0x0F
result['mctp_hdr_version'] = byte_hdr & 0x0F
offset += 1
# Destination and Source EIDs are swapped in SMBus binding
result['dest_eid'] = data[offset]
offset += 1
result['src_eid'] = data[offset]
offset += 1
# MCTP packet starts here (SOM/EOM byte)
result['mctp_offset'] = offset
return result
def main():
parser = argparse.ArgumentParser(
description='Parse MCTP packets (DSP0236 v1.3.1)',
epilog='Data format: Space-separated hex bytes without 0x prefix (e.g., "01 02 03 04")'
)
parser.add_argument(
'-i', '--input',
type=str,
metavar='FILE',
help='Read packet data from file instead of command line'
)
parser.add_argument(
'--no-transport',
action='store_true',
help='Skip I2C transport binding header detection, treat as raw MCTP'
)
parser.add_argument(
'--append-pec',
action='store_true',
help='Calculate PEC for all input bytes (default: assumes last byte is PEC)'
)
parser.add_argument(
'--dest-addr',
type=str,
metavar='ADDR',
help='I2C destination address (7-bit) to prepend in PEC calculation, like libmctp does (e.g., "0x10" or "10")'
)
parser.add_argument(
'data',
nargs='*',
help='Hex bytes to parse (space-separated, no 0x prefix)'
)
args = parser.parse_args()
# Get data from file or command line
if args.input:
try:
with open(args.input, 'r') as f:
data_str = f.read()
except FileNotFoundError:
print(f"Error: File not found: {args.input}", file=sys.stderr)
return 1
except IOError as e:
print(f"Error reading file: {e}", file=sys.stderr)
return 1
elif args.data:
data_str = ' '.join(args.data)
else:
parser.print_help()
return 1
# Parse hex bytes
try:
packet_bytes = parse_hex_bytes(data_str)
except ValueError as e:
print(f"Error parsing hex data: {e}", file=sys.stderr)
return 1
if not packet_bytes:
print("Error: No data provided", file=sys.stderr)
return 1
# Parse destination address if provided
dest_addr_byte = None
if args.dest_addr:
try:
addr_str = args.dest_addr.strip()
if addr_str.lower().startswith('0x'):
addr_str = addr_str[2:]
dest_addr_7bit = int(addr_str, 16)
if dest_addr_7bit > 0x7F:
print(f"Error: Destination address must be 7-bit (0x00-0x7F), got 0x{dest_addr_7bit:02X}", file=sys.stderr)
return 1
# Convert to I2C write address (7-bit address << 1, write bit = 0)
dest_addr_byte = (dest_addr_7bit << 1) | 0
except ValueError:
print(f"Error: Invalid destination address: {args.dest_addr}", file=sys.stderr)
return 1
# Detect and handle SMBus/I2C transport binding
smbus_info = None
if not args.no_transport:
smbus_info = parse_smbus_transport(packet_bytes)
if smbus_info:
# Print SMBus transport header
print("=" * 70)
print("SMBus/I2C Transport Binding (DSP0237)")
print("=" * 70)
print(f" Command Code: 0x{smbus_info['cmd_code']:02X}")
print(f" Byte Count: {smbus_info['byte_count']}")
print(f" Source Slave Addr: 0x{smbus_info['source_slave_addr']:02X}")
print(f" MCTP Reserved: 0x{smbus_info['mctp_reserved']:X}")
print(f" MCTP Header Ver: {smbus_info['mctp_hdr_version']}")
print(f" Destination EID: 0x{smbus_info['dest_eid']:02X} ({smbus_info['dest_eid']})")
print(f" Source EID: 0x{smbus_info['src_eid']:02X} ({smbus_info['src_eid']})")
if smbus_info['pec_received'] is not None:
print(f" PEC (Received): 0x{smbus_info['pec_received']:02X}")
print(f" PEC (Calculated): 0x{smbus_info['pec_calculated']:02X}")
if smbus_info['pec_valid']:
print(f" PEC Status: ✓ VALID")
else:
print(f" PEC Status: ✗ INVALID (mismatch)")
print()
# Extract MCTP packet portion (starting from SOM/EOM byte)
mctp_start = smbus_info['mctp_offset']
if smbus_info['pec_received'] is not None:
mctp_end = 2 + smbus_info['byte_count'] # Exclude PEC
else:
mctp_end = len(packet_bytes)
mctp_bytes = packet_bytes[mctp_start:mctp_end]
# Parse MCTP framing and message
try:
# Create a synthetic MCTP packet with EIDs for the parser
# MCTPParser expects: dest_eid, hdr_ver, src_eid, som/eom, ic/msg_type, payload
synthetic_packet = [
smbus_info['dest_eid'],
(smbus_info['mctp_hdr_version'] << 4) | smbus_info['mctp_reserved'],
smbus_info['src_eid']
] + mctp_bytes
mctp = MCTPParser(synthetic_packet)
parsed = mctp.parse()
output = mctp.format_output(parsed)
print(output)
# Verify all bytes accounted for
total_bytes = len(packet_bytes)
accounted = 2 + smbus_info['byte_count'] # cmd + count + data
if smbus_info['pec_received'] is not None:
accounted += 1
if accounted != total_bytes:
print(f"\nWarning: {total_bytes - accounted} byte(s) unaccounted for")
except Exception as e:
print(f"Error parsing MCTP packet: {e}", file=sys.stderr)
return 1
else:
# Parse as raw MCTP packet
try:
mctp = MCTPParser(packet_bytes)
parsed = mctp.parse()
output = mctp.format_output(parsed)
print(output)
except Exception as e:
print(f"Error parsing packet: {e}", file=sys.stderr)
return 1
# Always calculate and display PEC for the input data
print("\n" + "=" * 70)
print("PEC Calculation")
print("=" * 70)
if args.append_pec:
# Calculate PEC for all input bytes (user wants to append PEC)
if dest_addr_byte is not None:
# libmctp style: include destination address in PEC
print(f"Using libmctp-style PEC (includes I2C destination address)")
print(f" Destination address (7-bit): 0x{dest_addr_byte >> 1:02X}")
print(f" Destination address (8-bit with W bit): 0x{dest_addr_byte:02X}")
pec = calculate_pec([dest_addr_byte] + packet_bytes)
else:
# mctp-estack style: no destination address
print(f"Using mctp-estack-style PEC (no destination address)")
pec = calculate_pec(packet_bytes)
print(f"\nInput data ({len(packet_bytes)} bytes):")
for i in range(0, len(packet_bytes), 16):
chunk = packet_bytes[i:i+16]
hex_str = ' '.join(f'{b:02X}' for b in chunk)
print(f" {hex_str}")
print(f"\nCalculated PEC: 0x{pec:02X} ({pec})")
print(f"\nComplete message with PEC ({len(packet_bytes) + 1} bytes):")
complete = packet_bytes + [pec]
for i in range(0, len(complete), 16):
chunk = complete[i:i+16]
hex_str = ' '.join(f'{b:02X}' for b in chunk)
print(f" {hex_str}")
else:
# Default: assume last byte is PEC, verify it
if len(packet_bytes) < 2:
print("Warning: Message too short to have PEC (need at least 2 bytes)")
else:
received_pec = packet_bytes[-1]
data_bytes = packet_bytes[:-1]
if dest_addr_byte is not None:
# libmctp style: include destination address in PEC
print(f"Using libmctp-style PEC (includes I2C destination address)")
print(f" Destination address (7-bit): 0x{dest_addr_byte >> 1:02X}")
print(f" Destination address (8-bit with W bit): 0x{dest_addr_byte:02X}")
calculated_pec = calculate_pec([dest_addr_byte] + data_bytes)
else:
# mctp-estack style: no destination address
print(f"Using mctp-estack-style PEC (no destination address)")
calculated_pec = calculate_pec(data_bytes)
print(f"\nMessage data ({len(data_bytes)} bytes):")
for i in range(0, len(data_bytes), 16):
chunk = data_bytes[i:i+16]
hex_str = ' '.join(f'{b:02X}' for b in chunk)
print(f" {hex_str}")
print(f"\nPEC (Received): 0x{received_pec:02X} ({received_pec})")
print(f"PEC (Calculated): 0x{calculated_pec:02X} ({calculated_pec})")
if received_pec == calculated_pec:
print(f"PEC Status: ✓ VALID")
else:
print(f"PEC Status: ✗ INVALID (mismatch)")
print(f"\nCorrect message should be ({len(packet_bytes)} bytes):")
correct = data_bytes + [calculated_pec]
for i in range(0, len(correct), 16):
chunk = correct[i:i+16]
hex_str = ' '.join(f'{b:02X}' for b in chunk)
print(f" {hex_str}")
print("=" * 70)
return 0
if __name__ == '__main__':
sys.exit(main())