blob: 1e95efea4451f76e7854b9bde51c655e8c9ab55b [file]
#!/usr/bin/env python3
#
# Copyright (c) 2026 Nordic Semiconductor
# SPDX-License-Identifier: Apache-2.0
#
"""
Zephyr QUIC Kconfig Optimizer
==============================
Computes optimal Kconfig values for the QUIC subsystem balancing
memory usage and throughput, given user constraints.
Usage (interactive):
python quic-kconfig-optimizer.py
Usage (CLI):
python quic-kconfig-optimizer.py \
--contexts 2 --streams-bidi 3 --streams-uni 0 \
--mtu 1500 --ram-budget 65536 \
--role client --ip-version ipv4 \
--rtt-ms 50 --loss-rate 0 \
--app-message-size 4096 \
--ooo-expected no --network-type lan
"""
import argparse
import math
import sys
# Constants / fixed overheads (bytes)
SENT_PKT_ENTRY_BYTES = 24 # per entry in sent-packet history ring buffer
TLS_CONTEXT_OVERHEAD = 8192 # rough mbedTLS per-connection overhead
STREAM_STATE_OVERHEAD = 128 # misc per-stream state (not buffers)
CONN_STATE_OVERHEAD = 512 # misc per-connection state
def ask(prompt, default=None, cast=str, choices=None, min_val=None, max_val=None):
hint = f" [{default}]" if default is not None else ""
if choices:
hint += f" ({'/'.join(str(c) for c in choices)})"
while True:
raw = input(f"{prompt}{hint}: ").strip()
if raw == "" and default is not None:
return cast(default)
try:
val = cast(raw)
except (ValueError, TypeError):
print(f" x Expected {cast.__name__}. Try again.")
continue
if choices and val not in choices:
print(f" x Must be one of {choices}.")
continue
if min_val is not None and val < min_val:
print(f" x Must be >= {min_val}.")
continue
if max_val is not None and val > max_val:
print(f" x Must be <= {max_val}.")
continue
return val
def next_power_of_two(n):
return 1 << math.ceil(math.log2(max(n, 1)))
def align_up(n, align):
return ((n + align - 1) // align) * align
def human(n):
for unit in ("B", "KiB", "MiB", "GiB"):
if n < 1024:
return f"{n:.0f} {unit}"
n /= 1024
return f"{n:.1f} GiB"
def compute_config(p):
cfg, mem = {}, {}
mtu = p["mtu"]
contexts = p["contexts"]
streams_bidi = p["streams_bidi"]
streams_uni = p["streams_uni"]
role = p["role"]
ip_ver = p["ip_version"]
rtt_ms = p["rtt_ms"]
loss_rate = p["loss_rate"]
msg_size = p["app_message_size"]
ooo = p["ooo_expected"]
net_type = p["network_type"]
ram_budget = p["ram_budget"]
total_streams = contexts * (streams_bidi + streams_uni)
# Endpoints
endpoints = 3 if (ip_ver == "both" and role in ("server", "both")) else 2
cfg["QUIC_MAX_ENDPOINTS"] = endpoints
cfg["QUIC_PKT_COUNT"] = endpoints
# TX / endpoint buffers, must be at least MTU, clamped to [1280, 1500]
tx_buf = max(1280, min(mtu, 1500))
cfg["QUIC_TX_BUFFER_SIZE"] = tx_buf
cfg["QUIC_ENDPOINT_PENDING_DATA_LEN"] = tx_buf
# Stream buffers sized to bandwidth-delay product or 2x message size
bdp_estimate = max(tx_buf, msg_size * 2)
stream_buf = max(1500, min(align_up(bdp_estimate, 512), 65535))
if net_type == "wan" or loss_rate >= 2: # keep pipeline full on lossy/slow links
stream_buf = min(stream_buf * 2, 65535)
cfg["QUIC_STREAM_TX_BUFFER_SIZE"] = stream_buf
cfg["QUIC_STREAM_RX_BUFFER_SIZE"] = stream_buf
# OOO slots
ooo_slots = (8 if net_type == "wan" else 4) if ooo else 2
ooo_seg = max(512, min(mtu, 1280)) if ooo else 512
cfg["QUIC_STREAM_OOO_SLOTS"] = ooo_slots
cfg["QUIC_STREAM_OOO_SEG_SIZE"] = ooo_seg
# CRYPTO stream reassembly buffer is shared across all handshake levels per endpoint.
# Servers and WAN clients (e.g. interoperating with browsers) need at least 4096 B
# because Chrome/Firefox may split a ClientHello into 10+ CRYPTO frame fragments.
# Embedded-to-embedded clients with simple certificates can use 2048 B.
# Clamped to Kconfig range [1024, 8192].
if role in ("server", "both") or net_type == "wan":
crypto_rx_buf = 4096
elif net_type == "loopback":
crypto_rx_buf = 1024
else:
crypto_rx_buf = 2048
cfg["QUIC_CRYPTO_RX_BUFFER_SIZE"] = crypto_rx_buf
# CRYPTO OOO slots tracks metadata for out-of-order CRYPTO frame fragments.
# Servers and WAN deployments need 8 slots (Chrome sends up to 10+ fragments).
# Embedded clients talking to embedded peers only need the minimum of 4.
# Clamped to Kconfig range [4, 16].
if role in ("server", "both") or net_type == "wan":
crypto_ooo_slots = 8
else:
crypto_ooo_slots = 4
cfg["QUIC_CRYPTO_OOO_SLOTS"] = crypto_ooo_slots
# Sent-packet history must cover at least BDP/MTU worth of in-flight packets
min_hist = max(16, math.ceil(stream_buf / tx_buf) * 2)
if net_type == "wan":
min_hist = max(min_hist, 64)
if loss_rate >= 5:
min_hist = max(min_hist, 128)
cfg["QUIC_SENT_PKT_HISTORY_SIZE"] = min(next_power_of_two(min_hist), 256)
# Flow-control transport parameters match advertised window to our buffer capacity
cfg["QUIC_INITIAL_MAX_STREAM_DATA_BIDI_LOCAL"] = stream_buf
cfg["QUIC_INITIAL_MAX_STREAM_DATA_BIDI_REMOTE"] = stream_buf
cfg["QUIC_INITIAL_MAX_STREAM_DATA_UNI"] = stream_buf if streams_uni > 0 else 16384
cfg["QUIC_INITIAL_MAX_DATA"] = max(
streams_bidi * stream_buf + streams_uni * stream_buf, stream_buf
)
cfg["QUIC_INITIAL_MAX_STREAMS_BIDI"] = streams_bidi
cfg["QUIC_INITIAL_MAX_STREAMS_UNI"] = streams_uni
# Window update, more frequent on high-latency WAN to refill windows sooner
cfg["QUIC_STREAM_RX_WINDOW_UPDATE_THRESHOLD"] = (
15 if (net_type == "wan" and rtt_ms > 100) else 25
)
# Timeouts
idle = {"wan": 30000, "lan": 10000, "loopback": 5000}[net_type]
cfg["QUIC_MAX_IDLE_TIMEOUT"] = idle
cfg["QUIC_CONNECT_TIMEOUT"] = min(max(3000, rtt_ms * 10), 60000)
cfg["QUIC_MAX_PTO_TIMEOUT_MS"] = max(1000, min(idle // 2, 10000))
# TLS transcript buffer will default 4096 in Kconfig.
# 4096 B covers typical TLS 1.3 handshakes with RSA-2048 / EC P-256 certs.
# Increase to 6144-8192 for long certificate chains or RSA-4096 certificates.
cfg["QUIC_TLS_TRANSCRIPT_BUF_LEN"] = 4096
# Pass-through constraints
cfg["QUIC_MAX_CONTEXTS"] = contexts
cfg["QUIC_MAX_STREAMS_BIDI"] = streams_bidi
cfg["QUIC_MAX_STREAMS_UNI"] = streams_uni
# Memory accounting
mem["Endpoint RX staging"] = endpoints * tx_buf * endpoints # pkts * buf
mem["Endpoint TX buffers"] = endpoints * tx_buf
mem["Crypto RX buffers"] = endpoints * crypto_rx_buf
mem["Crypto OOO slots"] = endpoints * crypto_ooo_slots * 8 # ~8 B per slot
mem["Stream TX buffers"] = total_streams * stream_buf
mem["Stream RX buffers"] = total_streams * stream_buf
mem["Stream OOO buffers"] = total_streams * ooo_slots * ooo_seg
mem["Stream state overhead"] = total_streams * STREAM_STATE_OVERHEAD
mem["Sent-packet history"] = contexts * cfg["QUIC_SENT_PKT_HISTORY_SIZE"] * SENT_PKT_ENTRY_BYTES
mem["TLS context overhead"] = contexts * TLS_CONTEXT_OVERHEAD
mem["Connection state overhead"] = contexts * CONN_STATE_OVERHEAD
mem["TLS transcript buffers"] = contexts * cfg["QUIC_TLS_TRANSCRIPT_BUF_LEN"]
mem["TOTAL"] = sum(mem.values())
if 0 < ram_budget < mem["TOTAL"]:
raise ValueError(
f"Estimated memory {human(mem['TOTAL'])} exceeds budget {human(ram_budget)}.\n"
"Reduce: contexts, streams, stream_buf sizes, or OOO slots."
)
return cfg, mem
def print_report(p, cfg, mem):
SEP = "-" * 72
print(f"\n{'=' * 72}")
print(f"{'QUIC Kconfig Optimizer -- Recommended Settings':^72}")
print(f"{'=' * 72}")
print(f"\n{SEP}\n INPUT PARAMETERS\n{SEP}")
rows = [
("Max connections (QUIC_MAX_CONTEXTS)", p["contexts"]),
("Bidi streams per connection", p["streams_bidi"]),
("Uni streams per connection", p["streams_uni"]),
("MTU (bytes)", p["mtu"]),
("RAM budget", human(p["ram_budget"]) if p["ram_budget"] else "unlimited"),
("Device role", p["role"]),
("IP version", p["ip_version"]),
("Expected RTT (ms)", p["rtt_ms"]),
("Expected packet loss (%)", p["loss_rate"]),
("Typical message size (bytes)", p["app_message_size"]),
("Out-of-order delivery expected", "yes" if p["ooo_expected"] else "no"),
("Network type", p["network_type"]),
]
for label, val in rows:
print(f" {label:<46} {val}")
sections = [
(
"Connection / stream counts",
[
"QUIC_MAX_CONTEXTS",
"QUIC_MAX_STREAMS_BIDI",
"QUIC_MAX_STREAMS_UNI",
"QUIC_MAX_ENDPOINTS",
"QUIC_PKT_COUNT",
],
),
(
"QUIC transport parameters (flow control)",
[
"QUIC_INITIAL_MAX_DATA",
"QUIC_INITIAL_MAX_STREAM_DATA_BIDI_LOCAL",
"QUIC_INITIAL_MAX_STREAM_DATA_BIDI_REMOTE",
"QUIC_INITIAL_MAX_STREAM_DATA_UNI",
"QUIC_INITIAL_MAX_STREAMS_BIDI",
"QUIC_INITIAL_MAX_STREAMS_UNI",
"QUIC_STREAM_RX_WINDOW_UPDATE_THRESHOLD",
],
),
(
"Buffer sizes (RAM)",
[
"QUIC_ENDPOINT_PENDING_DATA_LEN",
"QUIC_TX_BUFFER_SIZE",
"QUIC_CRYPTO_RX_BUFFER_SIZE",
"QUIC_CRYPTO_OOO_SLOTS",
"QUIC_STREAM_TX_BUFFER_SIZE",
"QUIC_STREAM_RX_BUFFER_SIZE",
"QUIC_STREAM_OOO_SLOTS",
"QUIC_STREAM_OOO_SEG_SIZE",
"QUIC_SENT_PKT_HISTORY_SIZE",
"QUIC_TLS_TRANSCRIPT_BUF_LEN",
],
),
("Timeouts", ["QUIC_MAX_IDLE_TIMEOUT", "QUIC_CONNECT_TIMEOUT", "QUIC_MAX_PTO_TIMEOUT_MS"]),
]
print(f"\n{SEP}\n RECOMMENDED Kconfig VALUES\n{SEP}")
for title, keys in sections:
print(f"\n [{title}]")
for k in keys:
print(f" CONFIG_{k:<50} {cfg.get(k, 'n/a')}")
print(f"\n{SEP}\n ESTIMATED RAM USAGE\n{SEP}")
total = mem["TOTAL"]
for label, n in mem.items():
bar = "#" * int(n / total * 36)
marker = " <-- TOTAL" if label == "TOTAL" else ""
print(f" {label:<35} {human(n):>10} {bar}{marker}")
if p["ram_budget"]:
pct = total / p["ram_budget"] * 100
print(f"\n Budget utilisation: {pct:.1f}% ({human(total)} / {human(p['ram_budget'])})")
print(f"\n{SEP}\n prj.conf SNIPPET\n{SEP}\n")
for _, keys in sections:
for k in keys:
print(f"CONFIG_{k}={cfg.get(k, 'n/a')}")
print(f"""
{SEP}
ADDITIONAL NOTES
{SEP}""")
notes = [
"QUIC_INITIAL_MAX_DATA must be >= any single stream's INITIAL_MAX_STREAM_DATA"
" or the connection window becomes the bottleneck.",
"Set QUIC_STREAM_RX_BUFFER_SIZE = QUIC_STREAM_TX_BUFFER_SIZE for"
" symmetric request-response / echo patterns.",
f"TLS transcript total: {cfg['QUIC_TLS_TRANSCRIPT_BUF_LEN']} x"
f" {p['contexts']} contexts = {human(cfg['QUIC_TLS_TRANSCRIPT_BUF_LEN'] * p['contexts'])}.",
f"Crypto RX buffer total: {cfg['QUIC_CRYPTO_RX_BUFFER_SIZE']} x"
f" {cfg['QUIC_MAX_ENDPOINTS']} endpoints ="
f" {human(cfg['QUIC_CRYPTO_RX_BUFFER_SIZE'] * cfg['QUIC_MAX_ENDPOINTS'])}."
" Increase for browser interoperability (Chrome/Firefox).",
]
for note in notes:
print(f" * {note}")
print()
def gather_interactive():
print("\n" + "=" * 60)
print(" Zephyr QUIC Kconfig Optimizer -- Interactive Mode")
print("=" * 60)
print(" Press Enter to accept [default] values.\n")
p = {}
p["contexts"] = ask("Max QUIC connections (QUIC_MAX_CONTEXTS)", 1, int, min_val=1, max_val=255)
p["streams_bidi"] = ask("Bidirectional streams per connection", 3, int, min_val=1, max_val=255)
p["streams_uni"] = ask("Unidirectional streams per connection", 0, int, min_val=0, max_val=255)
p["mtu"] = ask("Network MTU in bytes", 1500, int, min_val=1280, max_val=9000)
p["ram_budget"] = ask("RAM budget in bytes (0 = no limit)", 0, int, min_val=0)
p["role"] = ask("Device role", "client", str, choices=["client", "server", "both"])
p["ip_version"] = ask("IP version(s)", "ipv4", str, choices=["ipv4", "ipv6", "both"])
p["rtt_ms"] = ask("Expected RTT in ms", 10, int, min_val=1)
p["loss_rate"] = ask("Expected packet loss rate (%)", 0.0, float, min_val=0.0, max_val=100.0)
p["app_message_size"] = ask("Typical application message size (bytes)", 1024, int, min_val=1)
p["ooo_expected"] = (
ask("Expect out-of-order delivery?", "no", str, choices=["yes", "no"]) == "yes"
)
p["network_type"] = ask("Network type", "lan", str, choices=["lan", "wan", "loopback"])
return p
def main():
ap = argparse.ArgumentParser(
description="Zephyr QUIC Kconfig Optimizer",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
allow_abbrev=False,
)
ap.add_argument("--contexts", type=int)
ap.add_argument("--streams-bidi", type=int)
ap.add_argument("--streams-uni", type=int)
ap.add_argument("--mtu", type=int)
ap.add_argument("--ram-budget", type=int, default=0, help="bytes; 0 = unlimited")
ap.add_argument("--role", choices=["client", "server", "both"])
ap.add_argument("--ip-version", choices=["ipv4", "ipv6", "both"])
ap.add_argument("--rtt-ms", type=int)
ap.add_argument("--loss-rate", type=float)
ap.add_argument("--app-message-size", type=int)
ap.add_argument("--ooo-expected", choices=["yes", "no"])
ap.add_argument("--network-type", choices=["lan", "wan", "loopback"])
args = ap.parse_args()
required = [
"contexts",
"streams_bidi",
"streams_uni",
"mtu",
"role",
"ip_version",
"rtt_ms",
"loss_rate",
"app_message_size",
"ooo_expected",
"network_type",
]
cli = {
"contexts": args.contexts,
"streams_bidi": args.streams_bidi,
"streams_uni": args.streams_uni,
"mtu": args.mtu,
"ram_budget": args.ram_budget,
"role": args.role,
"ip_version": args.ip_version,
"rtt_ms": args.rtt_ms,
"loss_rate": args.loss_rate,
"app_message_size": args.app_message_size,
"ooo_expected": args.ooo_expected,
"network_type": args.network_type,
}
if all(cli.get(k) is not None for k in required):
p = cli
p["ooo_expected"] = p["ooo_expected"] == "yes"
else:
p = gather_interactive()
try:
cfg, mem = compute_config(p)
except ValueError as e:
print(f"\nERROR: {e}\n", file=sys.stderr)
sys.exit(1)
print_report(p, cfg, mem)
if __name__ == "__main__":
main()