|  | # Copyright (c) 2022 Nordic Semiconductor ASA | 
|  | # Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved. | 
|  | # | 
|  | # SPDX-License-Identifier: Apache-2.0 | 
|  |  | 
|  | from __future__ import annotations | 
|  |  | 
|  | import logging | 
|  | import re | 
|  | from dataclasses import dataclass, field | 
|  | from pathlib import Path | 
|  |  | 
|  | import yaml | 
|  |  | 
|  | try: | 
|  | from yaml import CSafeLoader as SafeLoader | 
|  | except ImportError: | 
|  | from yaml import SafeLoader | 
|  |  | 
|  |  | 
|  | logger = logging.getLogger(__name__) | 
|  |  | 
|  |  | 
|  | class QuarantineException(Exception): | 
|  | pass | 
|  |  | 
|  |  | 
|  | class Quarantine: | 
|  | """Handle tests under quarantine.""" | 
|  |  | 
|  | def __init__(self, quarantine_list=None) -> None: | 
|  | if quarantine_list is None: | 
|  | quarantine_list = [] | 
|  | self.quarantine = QuarantineData() | 
|  | for quarantine_file in quarantine_list: | 
|  | self.quarantine.extend(QuarantineData.load_data_from_yaml(quarantine_file)) | 
|  |  | 
|  | def get_matched_quarantine(self, testname, platform, architecture, simulator): | 
|  | qelem = self.quarantine.get_matched_quarantine(testname, platform, architecture, simulator) | 
|  | if qelem: | 
|  | logger.debug(f'{testname} quarantined with reason: {qelem.comment}') | 
|  | return qelem.comment | 
|  | return None | 
|  |  | 
|  |  | 
|  | @dataclass | 
|  | class QuarantineElement: | 
|  | scenarios: list[str] = field(default_factory=list) | 
|  | platforms: list[str] = field(default_factory=list) | 
|  | architectures: list[str] = field(default_factory=list) | 
|  | simulations: list[str] = field(default_factory=list) | 
|  | comment: str = 'NA' | 
|  | re_scenarios: list = field(default_factory=list) | 
|  | re_platforms: list = field(default_factory=list) | 
|  | re_architectures: list = field(default_factory=list) | 
|  | re_simulations: list = field(default_factory=list) | 
|  |  | 
|  | def __post_init__(self): | 
|  | # If there is no entry in filters then take all possible values. | 
|  | # To keep backward compatibility, 'all' keyword might be still used. | 
|  | if 'all' in self.scenarios: | 
|  | self.scenarios = [] | 
|  | if 'all' in self.platforms: | 
|  | self.platforms = [] | 
|  | if 'all' in self.architectures: | 
|  | self.architectures = [] | 
|  | if 'all' in self.simulations: | 
|  | self.simulations = [] | 
|  | # keep precompiled regexp entiries to speed-up matching | 
|  | self.re_scenarios = [re.compile(pat) for pat in self.scenarios] | 
|  | self.re_platforms = [re.compile(pat) for pat in self.platforms] | 
|  | self.re_architectures = [re.compile(pat) for pat in self.architectures] | 
|  | self.re_simulations = [re.compile(pat) for pat in self.simulations] | 
|  |  | 
|  | # However, at least one of the filters ('scenarios', platforms' ...) | 
|  | # must be given (there is no sense to put all possible configuration | 
|  | # into quarantine) | 
|  | if not any([self.scenarios, self.platforms, self.architectures, self.simulations]): | 
|  | raise QuarantineException("At least one of filters ('scenarios', 'platforms' ...) " | 
|  | "must be specified") | 
|  |  | 
|  |  | 
|  | @dataclass | 
|  | class QuarantineData: | 
|  | qlist: list[QuarantineElement] = field(default_factory=list) | 
|  |  | 
|  | def __post_init__(self): | 
|  | qelements = [] | 
|  | for qelem in self.qlist: | 
|  | if isinstance(qelem, QuarantineElement): | 
|  | qelements.append(qelem) | 
|  | else: | 
|  | qelements.append(QuarantineElement(**qelem)) | 
|  | self.qlist = qelements | 
|  |  | 
|  | @classmethod | 
|  | def load_data_from_yaml(cls, filename: str | Path) -> QuarantineData: | 
|  | """Load quarantine from yaml file.""" | 
|  | with open(filename, encoding='UTF-8') as yaml_fd: | 
|  | qlist_raw_data: list[dict] = yaml.load(yaml_fd, Loader=SafeLoader) | 
|  | try: | 
|  | if not qlist_raw_data: | 
|  | # in case of loading empty quarantine file | 
|  | return cls() | 
|  | return cls(qlist_raw_data) | 
|  |  | 
|  | except Exception as e: | 
|  | logger.error(f'When loading {filename} received error: {e}') | 
|  | raise QuarantineException('Cannot load Quarantine data') from e | 
|  |  | 
|  | def extend(self, qdata: QuarantineData) -> None: | 
|  | self.qlist.extend(qdata.qlist) | 
|  |  | 
|  | def get_matched_quarantine(self, | 
|  | scenario: str, | 
|  | platform: str, | 
|  | architecture: str, | 
|  | simulator_name: str) -> QuarantineElement | None: | 
|  | """Return quarantine element if test is matched to quarantine rules""" | 
|  | for qelem in self.qlist: | 
|  | matched: bool = False | 
|  | if (qelem.scenarios | 
|  | and (matched := _is_element_matched(scenario, qelem.re_scenarios)) is False): | 
|  | continue | 
|  | if (qelem.platforms | 
|  | and (matched := _is_element_matched(platform, qelem.re_platforms)) is False): | 
|  | continue | 
|  | if ( | 
|  | qelem.architectures | 
|  | and (matched := _is_element_matched(architecture, qelem.re_architectures)) is False | 
|  | ): | 
|  | continue | 
|  | if ( | 
|  | qelem.simulations | 
|  | and (matched := _is_element_matched(simulator_name, qelem.re_simulations)) is False | 
|  | ): | 
|  | continue | 
|  |  | 
|  | if matched: | 
|  | return qelem | 
|  | return None | 
|  |  | 
|  |  | 
|  | def _is_element_matched(element: str, list_of_elements: list[re.Pattern]) -> bool: | 
|  | """Return True if given element is matching to any of elements from the list""" | 
|  | return any(pattern.fullmatch(element) for pattern in list_of_elements) |