blob: 7543e046b194d28353ae8bc53dd066f546fe9330 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright (c) 2023 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import json
import logging
import os
import shlex
import subprocess
import sys
import textwrap
import typing
from pathlib import Path
import click
import paths
from lxml import etree
from matter.testing.conformance import EMPTY_CLUSTER_GLOBAL_ATTRIBUTES, ConformanceDecision
from matter.testing.spec_parsing import build_xml_clusters, build_xml_device_types
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG, format="[%(levelname)s] %(message)s")
# Replace hardcoded paths with dynamic paths using paths.py functions
DEFAULT_CHIP_ROOT = paths.get_chip_root()
try:
import generate_data_model_xmls_gni
except ModuleNotFoundError:
sys.path.append(os.path.join(paths.get_python_testing_path(), 'matter_testing_infrastructure'))
import generate_data_model_xmls_gni
CURRENT_IN_PROGRESS_DEFINES = [
"ambient-context-sensor",
"ambientsensing",
"cameras",
"cameras-image-rotation",
"groupcast",
"hrap-pdc",
"hrap-tbrd",
"hvac-thermostat-events",
"hvac-thermostat-suggestions",
"improved-capabilities-tcr",
"partitioned-crl",
"security-sensor-events-tcr",
"tcr-smokeco-unmounted-state",
"temperature-alarm",
"thread-commissioning",
]
def get_xml_path(filename, output_dir):
xml = os.path.basename(filename).replace('.adoc', '.xml')
return os.path.abspath(os.path.join(output_dir, xml))
def make_asciidoc(target: str, include_in_progress: str, spec_dir: str, dry_run: bool) -> str:
cmd = ['make', 'PRINT_FILENAMES=1']
if include_in_progress == 'All':
cmd.append('INCLUDE_IN_PROGRESS=1')
elif include_in_progress == 'Current':
cmd.append(f'INCLUDE_IN_PROGRESS={" ".join(CURRENT_IN_PROGRESS_DEFINES)}')
cmd.append(target)
if dry_run:
log.info("Executing: %s", shlex.join(cmd))
return ''
ret = subprocess.check_output(cmd, cwd=spec_dir).decode('UTF-8').rstrip()
log.info(ret)
return ret
@click.command()
@click.option(
'--scraper',
required=False, # Required when --skip-scrape is not set. Checked in main().
type=str,
help='Path to the location of the scraper tool')
@click.option(
'--spec-root',
required=False, # Required when --skip-scrape is not set. Checked in main().
type=str,
help='Path to the spec root')
@click.option(
'--output-dir',
required=True,
help='Path to output xml files')
@click.option(
'--dry-run',
default=False,
is_flag=True,
help='Flag for dry run')
@click.option(
'--include-in-progress',
type=click.Choice(['All', 'None', 'Current']), default='All')
@click.option(
'--skip-scrape',
help='Runs only the updates that happen after the scrape from the spec repo',
is_flag=True,
default=False,
)
@click.option(
'--pre-1-2',
help='Flag to set when generating spec for pre 1.2 spec versions',
is_flag=True,
default=False,
)
def main(scraper, spec_root, output_dir, dry_run, include_in_progress, skip_scrape, pre_1_2):
'''
This script is used to generate the data model XML files found in the SDK
data_model directory.
This script runs against a checked out version of the specification
repo (https://github.com/CHIP-Specifications/connectedhomeip-spec).
Check out the desired spec revision, then pass the root of
the checkout to the script in the --spec-root parameter.
This script can set the desired include flags for the data model
files. For in-progress (ballot) specification revisions, the
include level will be "Current". This sets the include flags to
the set of flags at the top of this file. When generating files for
an in-progress spec, ensure the include flags match the set of
includes noted in the ballot. For specifications that are being
finalized, where the includes have been removed from the spec in
preparation for release, use "None". "All" can be used to get a
full tip-of-tree spec data model. None of the files checked into
the repo currently use "All".
The output directory will be data_model/<desired_revision>, passed
in through the --output-dir flag.
This script uses alchemy (https://github.com/project-chip/alchemy) for
creating the data model XML files. Download the desired release package
and pass the location of the alchemy executable in the --scraper parameter.
Information on structuring PRs for data model files and adding new files
to the supporting test script can be found in the README.md file in
the data_model folder.
'''
if not skip_scrape:
if not scraper:
log.error("Missing --scraper option. It is required when --skip-scrape is not used.")
return
if not spec_root:
log.error('Missing --spec-root option. It is required when --skip-scrape is not used.')
return
scrape_all(scraper, spec_root, output_dir, dry_run, include_in_progress, pre_1_2)
if not dry_run:
dump_versions(scraper, spec_root, output_dir)
if not dry_run:
cleanup_old_spec_dms(output_dir)
dump_json_ids(output_dir)
dump_ids_from_data_model_dirs()
# Update all the files in the python wheels
generate_data_model_xmls_gni.generate_dm_gni_file()
def scrape_all(scraper, spec_root, output_dir, dry_run, include_in_progress, pre_1_2):
log.info('Scraping main spec - this may take several seconds')
cmd = [scraper, 'dm', '--dmRoot', output_dir, '--specRoot', spec_root, '--force']
if include_in_progress == 'All':
cmd.extend(['-a', 'in-progress'])
elif include_in_progress == 'Current':
for d in CURRENT_IN_PROGRESS_DEFINES:
cmd.extend(['-a'])
cmd.extend([d])
if dry_run:
log.info("Executing: %s", shlex.join(cmd))
return
subprocess.run(cmd, check=True)
# Versions prior to 1.2 do not support file exclusion based on the pdf make
if pre_1_2:
return
log.info('Generating main spec to get file include list - this may take a few minutes')
main_out = make_asciidoc('pdf', include_in_progress, spec_root, dry_run)
log.info("Generating cluster spec to get file include list - this may take a few minutes")
cluster_out = make_asciidoc('pdf-appclusters-book', include_in_progress, spec_root, dry_run)
log.info("Generating device type library to get file include list - this may take a few minutes")
device_type_files = make_asciidoc('pdf-devicelibrary-book', include_in_progress, spec_root, dry_run)
log.info("Generating namespaces to get file include list - this may take a few minutes")
namespace_files = make_asciidoc('pdf-standardnamespaces-book', include_in_progress, spec_root, dry_run)
cluster_files = main_out + cluster_out
# Remove all the files that weren't compiled into the spec
clusters_output_dir = os.path.join(output_dir, 'clusters')
device_types_output_dir = os.path.abspath(os.path.join(output_dir, 'device_types'))
namespaces_output_dir = os.path.abspath(os.path.join(output_dir, 'namespaces'))
for filename in os.listdir(clusters_output_dir):
# There are a couple of clusters that appear in the same adoc file and they have prefixes.
# Look for these specifically.
# For 1.5 onward, we should separate these.
if "Label-Cluster" in filename or "bridge-clusters" in filename:
continue
adoc = os.path.basename(filename).replace('.xml', '.adoc')
if adoc not in cluster_files:
log.info("Removing '%s' as it was not in the generated spec document", adoc)
os.remove(os.path.join(clusters_output_dir, filename))
for filename in os.listdir(device_types_output_dir):
# Closures also has multiple device types in one file, should fix this, but for now a workaround:
if filename.startswith('ClosureBaseDeviceTypes'):
continue
adoc = os.path.basename(filename).replace('.xml', '.adoc')
if adoc not in device_type_files:
log.info("Removing '%s' as it was not in the generated spec document", adoc)
os.remove(os.path.join(device_types_output_dir, filename))
for filename in os.listdir(namespaces_output_dir):
adoc = os.path.basename(filename).replace('.xml', '.adoc')
if adoc not in namespace_files:
log.info("Removing '%s' as it was not in the generated spec document", adoc)
os.remove(os.path.join(namespaces_output_dir, filename))
def cleanup_old_spec_dms(output_dir):
''' There are a few old spec versions that have the same longstanding problems.
Unfortunately, we don't have specific branches for all these spec versions
since some were only tagged, and some were re-used for dot releases.
Applying the fixes here. Alchemy will continue to provide a faithful
representation of the spec, and going forward, we should correct these
types of problems in the spec where possible.
'''
def _fix_door_lock_device_type_features():
'''Door lock device type has a typo in the required features
for the user feature in the door lock cluster element requirements.
- FPG should be FGP (fingerprint)
'''
changed = False
filename = os.path.join(output_dir, 'device_types', 'DoorLock.xml')
if not os.path.exists(filename):
return
with open(filename, 'rt+') as file:
tree = etree.parse(file)
root = tree.getroot()
for feature_tag in root.iter('feature'):
if feature_tag.attrib.get('name', '') == 'FPG':
feature_tag.attrib['name'] = 'FGP'
changed = True
for condition_tag in root.iter('condition'):
if condition_tag.attrib.get('name', '') == 'FPG':
condition_tag.attrib['name'] = 'FGP'
condition_tag.tag = 'feature'
changed = True
if changed:
tree.write(filename, pretty_print=True, xml_declaration=True, encoding='utf-8')
def _fix_pre_1_3_base_device_type():
''' The 1.2 spec tables in the base device type are all weird because they
include separate tables for matter and zigbee still. Given that those
specs are frozen, add in the information from the missing table.
This is taken directly from the 1.3 base device type specification.
There were no changes between 1.0 and 1.3 to this device type.
'''
missing_pre_1_3_base_device_type_clusters = '''
<clusters>
<cluster id="0x001D" name="Descriptor" side="server">
<mandatoryConform/>
<features>
<feature code="" name="TagList">
<mandatoryConform>
<condition name="Duplicate"/>
</mandatoryConform>
</feature>
</features>
</cluster>
<cluster id="0x001E" name="Binding" side="server">
<mandatoryConform>
<andTerm>
<condition name="Simple"/>
<condition name="Client"/>
</andTerm>
</mandatoryConform>
</cluster>
<cluster id="0x0040" name="Fixed Label" side="server">
<optionalConform/>
</cluster>
<cluster id="0x0041" name="User Label" side="server">
<optionalConform/>
</cluster>
</clusters>
'''
changed = False
filename = os.path.join(output_dir, 'device_types', 'BaseDeviceType.xml')
if not os.path.exists(filename):
return
with open(filename, 'rt+') as file:
tree = etree.parse(file)
root = tree.getroot()
if root.find('clusters') is None:
# Cluster info is missing and we need to add it
new_xml = etree.fromstring(missing_pre_1_3_base_device_type_clusters)
device_type = root.find('deviceType')
if device_type is None:
log.error("Unable to locate device type tag in BaseDeviceType")
return
device_type.append(new_xml)
changed = True
if changed:
tree.write(filename, pretty_print=True, xml_declaration=True, encoding='utf-8')
def _fix_sit_lit_type():
# In some spec revisions, the SIT and LIT conditions are listed as features.
# This is probably fixable in alchemy, but let's just fix here until that
# gets pushed through.
changed = False
filename = os.path.join(output_dir, 'device_types', 'RootNodeDeviceType.xml')
if not os.path.exists(filename):
return
with open(filename, 'rt+') as file:
tree = etree.parse(file)
root = tree.getroot()
for tag in root.iter('feature'):
if tag.attrib.get('name', '') in ['SIT', 'LIT']:
# These should be conditions
tag.tag = 'condition'
changed = True
if changed:
tree.write(filename, pretty_print=True, xml_declaration=True, encoding='utf-8')
_fix_door_lock_device_type_features()
_fix_pre_1_3_base_device_type()
_fix_sit_lit_type()
def dump_versions(scraper, spec_root, output_dir):
sha_file = os.path.abspath(os.path.join(output_dir, 'spec_sha'))
tag_file = os.path.abspath(os.path.join(output_dir, 'spec_tag'))
out = subprocess.run(['git', 'rev-parse', 'HEAD'],
capture_output=True, encoding="utf8", cwd=spec_root)
sha = out.stdout
with open(sha_file, 'wt', encoding='utf8') as output:
output.write(sha)
cmd = ['git', 'show-ref', '--tags']
out = subprocess.run(cmd, capture_output=True, encoding="utf-8", cwd=spec_root)
tags = out.stdout.splitlines()
tag = [t for t in tags if sha.strip() in t]
if tag:
with open(tag_file, 'wt', encoding='utf8') as output:
output.write(f'{tag[0].split("/")[-1]}\n')
else:
log.warning("WARNING: no tag found for sha '%s'", sha)
with contextlib.suppress(FileNotFoundError):
os.remove(tag_file)
scraper_file = os.path.abspath(os.path.join(output_dir, 'scraper_version'))
version_cmd = 'version'
out = subprocess.run([scraper, version_cmd],
capture_output=True, encoding="utf8")
version = out.stdout
with open(scraper_file, "wt", encoding='utf8') as output:
output.write(f'alchemy {version}')
def dump_json_ids(output_dir):
clusters_output_dir = os.path.abspath(
os.path.join(output_dir, 'clusters'))
device_types_output_dir = os.path.abspath(
os.path.join(output_dir, 'device_types'))
clusters_json_file = os.path.join(clusters_output_dir, 'cluster_ids.json')
device_type_json_file = os.path.join(device_types_output_dir, 'device_type_ids.json')
clusters, _ = build_xml_clusters(Path(clusters_output_dir))
device_types, _ = build_xml_device_types(Path(device_types_output_dir))
def write_json(elements: dict, path: str) -> None:
json_dict = {id: c.name for id, c in sorted(elements.items())}
with open(path, "wt+") as outfile:
json.dump(json_dict, outfile, indent=4)
outfile.write('\n')
write_json(clusters, clusters_json_file)
write_json(device_types, device_type_json_file)
IdToNameDict = dict[int, str]
IdToPICSCodeDict = dict[int, str]
IdToSupportedDict = dict[int, str]
def dump_ids_from_data_model_dirs():
all_clusters: IdToNameDict = {}
all_device_types: IdToNameDict = {}
version_clusters: dict[str, IdToSupportedDict] = {}
version_device_types: dict[str, IdToSupportedDict] = {}
pics_code_clusters: IdToPICSCodeDict = {}
data_model_path = paths.get_data_model_path()
data_model_dirs = [d for d in os.listdir(data_model_path) if os.path.isdir(os.path.join(data_model_path, d))]
# Don't include master right now - it's in progress, not official
data_model_dirs = sorted([d for d in data_model_dirs if 'master' not in d])
for dir in data_model_dirs:
clusters, _ = build_xml_clusters(Path(os.path.join(data_model_path, dir, 'clusters')))
device_types, _ = build_xml_device_types(Path(os.path.join(data_model_path, dir, 'device_types')))
def cluster_support_str(c):
return "P" if c.is_provisional else "C"
version_clusters[dir] = {id: cluster_support_str(c) for id, c in clusters.items()}
# Update the PICS Codes
for id, c in clusters.items():
if id not in pics_code_clusters:
pics_code_clusters[id] = c.pics
elif pics_code_clusters[id] != c.pics:
log.warning("PICS Code is inconsistent among different versions of the spec for cluster ID #%s!", id)
# Device types don't currently have provisional markings in the spec
# But a device type can't be certified if it has mandatory clusters that are provisional
# TODO: create provisional
def device_type_support_str(d):
log.info("checking device type for '%s' for '%s'", d.name, dir)
dt_server_mandatory = [id for id, requirement in d.server_clusters.items() if requirement.conformance(
EMPTY_CLUSTER_GLOBAL_ATTRIBUTES).decision == ConformanceDecision.MANDATORY]
server_provisional = [clusters[c].name for c in dt_server_mandatory if clusters[c].is_provisional]
dt_client_mandatory = [id for id, requirement in d.client_clusters.items(
) if requirement.conformance(EMPTY_CLUSTER_GLOBAL_ATTRIBUTES).decision == ConformanceDecision.MANDATORY]
client_provisional = [clusters[c].name for c in dt_client_mandatory if clusters[c].is_provisional]
if server_provisional or client_provisional:
log.info("Found provisional mandatory clusters server:'%s' "
"client: '%s' in device type '%s' for revision '%s'",
server_provisional, client_provisional, d.name, dir)
return "P"
return "C"
version_device_types[dir] = {id: device_type_support_str(d) for id, d in device_types.items()}
all_clusters.update({id: c.name for id, c in clusters.items()})
all_device_types.update({id: d.name for id, d in device_types.items()})
def print_out_ids(filename: Path,
spec_section: str,
id_to_name: IdToNameDict,
supported: dict[str, IdToSupportedDict],
pics_codes: typing.Optional[IdToPICSCodeDict] = None):
header = f'# List of currently defined spec {spec_section}\n'
header += 'This file was **AUTOMATICALLY** generated by `python scripts/generate_spec_xml.py`. DO NOT EDIT BY HAND!\n\n'
table = '''
The following markers are used in this document (matches the ID master list):
| State | Description |
|-------|--------------------------------|
| blank | Not supported in this revision |
| C | Certifiable |
| P | Provisional |
'''
header = header + textwrap.dedent(table)
# Name Column
all_name_lens = [len(n) for n in id_to_name.values()]
name_len = max(all_name_lens)
title_name_raw = ' Name '
title_name = f'{title_name_raw:<{name_len}}'
# ID Columns
title_id_decimal = ' ID (Decimal) '
dec_len = len(title_id_decimal)
title_id_hex = ' ID (hex) '
hex_len = len(title_id_hex)
# Create title and hashes
title = f'|{title_id_decimal}|{title_id_hex}|{title_name}|'
hashes = f'|{"-" * dec_len}|{"-" * hex_len}|{"-" * name_len}|'
# PICS code column (only for clusters)
if pics_codes is not None:
all_pics_code_lens = [len(p) for p in pics_code_clusters.values()]
pics_code_len = max(all_pics_code_lens)
title_pics_code_raw = ' PICS Code '
title_pics_code = f'{title_pics_code_raw:<{pics_code_len}}'
# Update title and hashes
title += f'{title_pics_code}|'
hashes += f'{"-" * pics_code_len}|'
version_len: dict[str, int] = {}
for dir in supported:
tag_path = os.path.join(paths.get_data_model_path(), dir, 'spec_tag')
try:
with open(tag_path, "r") as tag_file:
version = tag_file.read().strip()
except FileNotFoundError:
version = dir
title += f'{version}|'
version_len[dir] = len(version)
hashes += f'{"-" * version_len[dir]}|'
table_header = f'{title}\n{hashes}\n'
lines = []
for id, name in sorted(id_to_name.items()):
hex_id = f'0x{id:04X}'
line = f'|{id:<{dec_len}}|{hex_id:<{hex_len}}|{name:<{name_len}}|'
# Add PICS code (only for clusters)
if pics_codes is not None:
line += f'{pics_codes[id]:<{pics_code_len}}|'
for dir in supported:
try:
line += f'{supported[dir][id]:<{version_len[dir]}}|'
except KeyError:
line += f'{" " * version_len[dir]}|'
lines.append(line)
with open(filename, 'w') as fout:
fout.write(header)
fout.write(table_header)
fout.write("\n".join(lines))
fout.write("\n")
print_out_ids(paths.get_cluster_documentation_file_path(), "clusters", all_clusters, version_clusters, pics_code_clusters)
print_out_ids(paths.get_device_types_documentation_file_path(), "device types", all_device_types, version_device_types)
if __name__ == '__main__':
main()