#!/usr/bin/env python3 # SPDX-License-Identifier: GPL-2.0-only # Copyright(c) 2026 Meta Platforms, Inc. and affiliates. # # cdat_dump.py - Dump and decode CDAT (Coherent Device Attribute Table) # from CXL devices via sysfs # # Usage: # cdat_dump.py # dump all CXL devices with CDAT # cdat_dump.py /sys/bus/cxl/devices/endpoint0/CDAT # cdat_dump.py --raw cdat_binary.bin # decode from raw file # cdat_dump.py --hex # include hex dump of each entry import argparse import glob import os import struct import sys # CDAT Header: u32 length, u8 revision, u8 checksum, u8 reserved[6], u32 sequence CDAT_HDR_FMT = "= (1 << 40): return f"{size / (1 << 40):.2f} TiB" if size >= (1 << 30): return f"{size / (1 << 30):.2f} GiB" if size >= (1 << 20): return f"{size / (1 << 20):.2f} MiB" if size >= (1 << 10): return f"{size / (1 << 10):.2f} KiB" return f"{size} B" def fmt_port(port_id): if port_id == 0xFFFF: return "ANY" if port_id == 0x0100: return "USP (upstream)" return f"DSP {port_id}" def decode_latency_bandwidth(entry_val, base_unit, data_type): """Decode a DSLBIS/SSLBIS entry value into human-readable form.""" if entry_val == 0xFFFF or entry_val == 0: return "N/A" raw = entry_val * base_unit if data_type <= 2: # latency types (picoeconds -> nanoseconds) ns = raw / 1000.0 if ns >= 1000: return f"{ns/1000:.2f} us ({raw} ps)" return f"{ns:.2f} ns ({raw} ps)" else: # bandwidth types (MB/s) if raw >= 1024: return f"{raw/1024:.2f} GB/s ({raw} MB/s)" return f"{raw} MB/s" def decode_dsmas(data, show_hex): handle, flags, _, dpa_base, dpa_length = struct.unpack_from(DSMAS_FMT, data) flag_strs = [] if flags & (1 << 2): flag_strs.append("NonVolatile") if flags & (1 << 3): flag_strs.append("Shareable") if flags & (1 << 6): flag_strs.append("ReadOnly") flag_desc = ", ".join(flag_strs) if flag_strs else "None" print(f" DSMAD Handle: {handle}") print(f" Flags: 0x{flags:02x} ({flag_desc})") print(f" DPA Base: 0x{dpa_base:016x}") print(f" DPA Length: 0x{dpa_length:016x} ({fmt_size(dpa_length)})") def decode_dslbis(data, show_hex): handle, flags, data_type, _, base_unit, e0, e1, e2, _ = \ struct.unpack_from(DSLBIS_FMT, data) dt_name = HMAT_DATA_TYPES.get(data_type, f"Unknown ({data_type})") print(f" Handle: {handle}") print(f" Flags: 0x{flags:02x}") print(f" Data Type: {data_type} ({dt_name})") print(f" Base Unit: {base_unit}") print(f" Entry[0]: {e0} -> {decode_latency_bandwidth(e0, base_unit, data_type)}") if e1: print(f" Entry[1]: {e1} -> {decode_latency_bandwidth(e1, base_unit, data_type)}") if e2: print(f" Entry[2]: {e2} -> {decode_latency_bandwidth(e2, base_unit, data_type)}") def decode_dsmscis(data, show_hex): dsmas_handle, _, _, _, cache_size, cache_attr = \ struct.unpack_from(DSMSCIS_FMT, data) total_levels = cache_attr & 0xF cache_level = (cache_attr >> 4) & 0xF assoc = (cache_attr >> 8) & 0xF write_pol = (cache_attr >> 12) & 0xF line_size = (cache_attr >> 16) & 0xFFFF assoc_str = CACHE_ASSOCIATIVITY.get(assoc, f"Unknown ({assoc})") wp_str = CACHE_WRITE_POLICY.get(write_pol, f"Unknown ({write_pol})") print(f" DSMAS Handle: {dsmas_handle}") print(f" Cache Size: 0x{cache_size:016x} ({fmt_size(cache_size)})") print(f" Cache Attrs: 0x{cache_attr:08x}") print(f" Total Levels: {total_levels}") print(f" Cache Level: {cache_level}") print(f" Associativity: {assoc_str}") print(f" Write Policy: {wp_str}") print(f" Line Size: {line_size} bytes") def decode_dsis(data, show_hex): flags, handle, _ = struct.unpack_from(DSIS_FMT, data) mem_attached = bool(flags & 1) if mem_attached: handle_desc = f"DSMAS handle {handle}" else: handle_desc = f"Initiator handle {handle} (no memory)" print(f" Flags: 0x{flags:02x} (Memory Attached: {mem_attached})") print(f" Handle: {handle} ({handle_desc})") def decode_dsemts(data, show_hex): dsmas_handle, mem_type, _, dpa_offset, range_length = \ struct.unpack_from(DSEMTS_FMT, data) mt_str = EFI_MEM_TYPES.get(mem_type, f"Reserved ({mem_type})") print(f" DSMAS Handle: {dsmas_handle}") print(f" Memory Type: {mem_type} ({mt_str})") print(f" DPA Offset: 0x{dpa_offset:016x}") print(f" Range Length: 0x{range_length:016x} ({fmt_size(range_length)})") def decode_sslbis(data, total_len, show_hex): dt, _, _, _, base_unit = struct.unpack_from(SSLBIS_FMT, data) dt_name = HMAT_DATA_TYPES.get(dt, f"Unknown ({dt})") print(f" Data Type: {dt} ({dt_name})") print(f" Base Unit: {base_unit}") # Variable number of SSLBE entries after the fixed header entries_data = data[SSLBIS_SIZE:] n_entries = len(entries_data) // SSLBE_SIZE for i in range(n_entries): off = i * SSLBE_SIZE px, py, val, _ = struct.unpack_from(SSLBE_FMT, entries_data, off) decoded = decode_latency_bandwidth(val, base_unit, dt) print(f" Entry[{i}]: {fmt_port(px)} <-> {fmt_port(py)}: " f"{val} -> {decoded}") DECODERS = { 0: decode_dsmas, 1: decode_dslbis, 2: decode_dsmscis, 3: decode_dsis, 4: decode_dsemts, } def decode_cdat(data, source="", show_hex=False): if len(data) < CDAT_HDR_SIZE: print(f"Error: data too short for CDAT header ({len(data)} < {CDAT_HDR_SIZE})") return False # Parse header vals = struct.unpack_from(CDAT_HDR_FMT, data) length = vals[0] revision = vals[1] checksum = vals[2] # vals[3:9] are the 6 reserved bytes sequence = vals[9] # Verify checksum cksum = sum(data[:length]) & 0xFF cksum_ok = "OK" if cksum == 0 else f"FAIL (sum=0x{cksum:02x})" if source: print(f"=== CDAT from {source} ===") print(f"CDAT Header:") print(f" Length: {length} bytes") print(f" Revision: {revision}") print(f" Checksum: 0x{checksum:02x} ({cksum_ok})") print(f" Sequence: {sequence}") if show_hex: print(f" Raw header:") print(hexdump(data[:CDAT_HDR_SIZE], " ")) if length > len(data): print(f"Warning: CDAT length ({length}) > available data ({len(data)})") length = len(data) # Parse subtables offset = CDAT_HDR_SIZE entry_num = 0 counts = {} while offset + CDAT_SUBTBL_HDR_SIZE <= length: stype, _, slen = struct.unpack_from(CDAT_SUBTBL_HDR_FMT, data, offset) if slen < CDAT_SUBTBL_HDR_SIZE: print(f"\nError: subtable at offset {offset} has invalid length {slen}") break if offset + slen > length: print(f"\nError: subtable at offset {offset} extends past end " f"(offset+len={offset+slen} > {length})") break counts[stype] = counts.get(stype, 0) + 1 type_name = CDAT_TYPE_NAMES.get(stype, f"Unknown (type={stype})") print(f"\n [{entry_num}] {type_name}") print(f" Offset: {offset}, Length: {slen}") if show_hex: print(hexdump(data[offset:offset+slen], " ")) # Decode the subtable body (skip the 4-byte common header) body = data[offset + CDAT_SUBTBL_HDR_SIZE:offset + slen] if stype == 5: # SSLBIS has variable length, pass total subtable body length decode_sslbis(body, slen - CDAT_SUBTBL_HDR_SIZE, show_hex) elif stype in DECODERS: DECODERS[stype](body, show_hex) else: print(f" (unknown type, raw data follows)") print(hexdump(body, " ")) offset += slen entry_num += 1 # Summary print(f"\nSummary: {entry_num} entries") for t in sorted(counts): name = CDAT_TYPE_NAMES.get(t, f"Unknown ({t})") print(f" {name}: {counts[t]}") if offset < length: trailing = length - offset print(f"\nWarning: {trailing} trailing bytes after last subtable") print() return True def find_cdat_sysfs(): """Find all CXL devices with CDAT attributes in sysfs.""" paths = [] for dev_path in sorted(glob.glob("/sys/bus/cxl/devices/*")): cdat_path = os.path.join(dev_path, "CDAT") if os.path.exists(cdat_path): paths.append(cdat_path) return paths def read_cdat(path): """Read binary CDAT data from a sysfs attribute or file.""" try: with open(path, "rb") as f: return f.read() except PermissionError: print(f"Error: permission denied reading {path} (need root?)") return None except OSError as e: print(f"Error reading {path}: {e}") return None def main(): parser = argparse.ArgumentParser( description="Dump and decode CXL CDAT (Coherent Device Attribute Table)", epilog="Without arguments, discovers and dumps CDAT from all CXL devices.\n" "Requires root access to read sysfs CDAT attributes.", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "path", nargs="*", help="Path to sysfs CDAT attribute or raw CDAT binary file", ) parser.add_argument( "--raw", action="store_true", help="Treat input as raw CDAT binary file (not sysfs)", ) parser.add_argument( "--hex", action="store_true", help="Include hex dump of each entry", ) args = parser.parse_args() paths = args.path # Read from stdin if piped or explicitly given "-" if not sys.stdin.isatty() and not paths: data = sys.stdin.buffer.read() if not data: print("Error: no data on stdin") return 1 return 0 if decode_cdat(data, "stdin", show_hex=args.hex) else 1 if paths == ["-"]: data = sys.stdin.buffer.read() if not data: print("Error: no data on stdin") return 1 return 0 if decode_cdat(data, "stdin", show_hex=args.hex) else 1 if not paths: paths = find_cdat_sysfs() if not paths: print("No CXL devices with CDAT found in sysfs.") print("Check that CXL devices are present and the cxl_port driver is loaded.") return 1 ok = True for path in paths: data = read_cdat(path) if data is None: ok = False continue if not data: dev = os.path.basename(os.path.dirname(path)) if not args.raw else path print(f"{dev}: CDAT is empty (read from device failed at probe time)") ok = False continue source = path if not args.raw and "/sys/" in path: source = os.path.basename(os.path.dirname(path)) if not decode_cdat(data, source, show_hex=args.hex): ok = False return 0 if ok else 1 if __name__ == "__main__": sys.exit(main())