blob: ff16f3e12fb587b9a8f7be27883e85353955661c [file]
# Licensed under the Apache-2.0 license
# SPDX-License-Identifier: Apache-2.0
"""Utility to invoke the caliptra emulator and pipe the output through the detokenizer."""
import argparse
import logging
import subprocess
import sys
import tempfile
import threading
import time
import pathlib
from pathlib import Path
from pw_tokenizer import detokenize
_LOG = logging.getLogger(__name__)
_LOG.setLevel(logging.INFO)
try:
import caliptra.emulator_cptra_rom # type: ignore
import caliptra.emulator_cptra_firmware # type: ignore
import caliptra.emulator_mcu_rom # type: ignore
import caliptra.emulator_exe # type: ignore
from python.runfiles import runfiles # type: ignore
r = runfiles.Create()
_CPTRA_ROM = r.Rlocation(*caliptra.emulator_cptra_rom.RLOCATION)
_CPTRA_FIRMWARE = r.Rlocation(*caliptra.emulator_cptra_firmware.RLOCATION)
_MCU_ROM = r.Rlocation(*caliptra.emulator_mcu_rom.RLOCATION)
_EMULATOR = r.Rlocation(*caliptra.emulator_exe.RLOCATION)
except ImportError as e:
_LOG.fatal("runfiles could not open resources: %r", e)
def _parse_args():
"""Parse and return command line arguments."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--interface",
type=str,
help="interface type",
)
parser.add_argument(
"--elf",
type=pathlib.Path,
help="elf file ",
)
parser.add_argument(
"--bin",
type=pathlib.Path,
help="bin file",
)
parser.add_argument(
"--manifest",
type=pathlib.Path,
help="authorization manifest",
)
parser.add_argument(
"--vendor-pk-hash",
type=str,
help="SHA384 of public keys",
)
return parser.parse_args()
def _detokenizer(image: Path, tokenized_file: Path, finished: threading.Event):
try:
detokenizer = detokenize.Detokenizer(image)
line_buffer = ""
with open(tokenized_file, "r", buffering=1) as f:
while not finished.is_set():
try:
chunk = f.readline()
if chunk:
# qemu may not write a complete line, so buffer
# the chunks until there is a complete line to
# pass to the detokenizer.
line_buffer += chunk
# Use a while loop, as there could also potentially
# be multiple lines printed in-between iterations.
while "\n" in line_buffer:
newline_pos = line_buffer.find("\n") + 1
complete_line = line_buffer[:newline_pos]
if not complete_line.endswith("\r\n"):
complete_line = complete_line.replace("\n", "\r\n")
detokenizer.detokenize_text_to_file(
complete_line, sys.stdout.buffer
)
sys.stdout.flush()
line_buffer = line_buffer[newline_pos:]
except BlockingIOError:
# If writing to stdout too fast, it's sometimes possible
# to get BlockingIOError due to the stdout buffer being
# full, so sleep and try again.
time.sleep(0.1)
# detokenize any remaining data in the buffer.
if line_buffer:
detokenizer.detokenize_text_to_file(complete_line, sys.stdout.buffer)
sys.stdout.flush()
except OSError as e:
print(f"Exception opening file {e}", file=sys.stderr)
def load_and_run(
image: Path,
interface: str,
manifest: str,
vendor_pk_hash: str,
) -> list[str]:
"""Prepare arguments to load an image into a board and spawn a console."""
if interface == "emulator":
cmd = [
_EMULATOR,
f"--rom={_MCU_ROM}",
f"--firmware={image}",
f"--caliptra-rom={_CPTRA_ROM}",
f"--caliptra-firmware={_CPTRA_FIRMWARE}",
"--i3c-port=65534",
"--rom-offset=0x80000000",
"--rom-size=0x8000",
"--dccm-offset=0x50000000",
"--dccm-size=0x4000",
"--sram-offset=0x40000000",
"--sram-size=0x80000",
"--pic-offset=0x60000000",
"--i3c-offset=0x20004000",
"--i3c-size=0x1000",
"--mci-offset=0x21000000",
"--mci-size=0xe00000",
"--mbox-offset=0x30020000",
"--mbox-size=0x28",
"--soc-offset=0x30030000",
"--soc-size=0x5e0",
"--otp-offset=0x70000000",
"--otp-size=0x140",
"--lc-offset=0x70000400",
"--lc-size=0x8c",
]
if manifest and str(manifest) != "None":
cmd.append(f"--soc-manifest={manifest}")
if vendor_pk_hash and str(vendor_pk_hash) != "None":
cmd.append(f"--vendor-pk-hash={vendor_pk_hash}")
return cmd
else:
raise Exception("unknown mechanism", mechanism)
def simple_console(cmd: list[str]):
"""Invoke for a simple (non-tokenized) console."""
_LOG.info("Invoking mcu emulator: %s", cmd)
return subprocess.run(cmd, check=False).returncode
def tokenized_console(cmd: list[str]):
"""Invoke for a tokenized console."""
_LOG.info("Invoking mcu emulator: %s", cmd)
with tempfile.NamedTemporaryFile() as f:
with subprocess.Popen(
args=cmd,
stdout=f,
) as proc:
# Capturing the sub process stdout or stderr and then writing to
# stdout can cause deadlocks (see
# https://docs.python.org/3/library/subprocess.html#subprocess.Popen.stderr)
# due to a write buffer (child process) filling up the pipe
# buffer before the parent process can consume it.
# To work around this, write to a temp file, and have the
# detokenizer poll and detokenize the temp file.
finished_event = threading.Event()
stdout_thread = threading.Thread(
target=_detokenizer,
args=(Path(args.elf), Path(f.name), finished_event),
daemon=True,
)
stdout_thread.start()
out, err = proc.communicate()
finished_event.set()
if out:
print(out)
if err:
print(err)
return_code = proc.returncode
stdout_thread.join()
return return_code
def _main(args) -> int:
cmd = load_and_run(
args.bin,
args.interface,
args.manifest,
args.vendor_pk_hash,
)
# TODO(cfrantz): add support for the tokenized console.
return_code = simple_console(cmd)
sys.exit(return_code)
if __name__ == "__main__":
logging.basicConfig()
_main(_parse_args())