# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Wrapper for 'pw presubmit' in the project source tree."""

import collections
import collections.abc
import dataclasses
import shlex
from typing import Sequence

from RECIPE_MODULES.pigweed.checkout import api as checkout_api
from PB.recipe_modules.pigweed.pw_presubmit import options as options_pb2
from recipe_engine import config_types, recipe_api, step_data

# Canned contents for the per-step metadata JSON files. Each key names a
# '<key>.json' file that PwPresubmitApi._process_metadata looks for in a step's
# export directory; the value, converted with dict(), is passed as test_data
# when that file is read.
METADATA = {
    'binary_sizes': (('target', 12345), ('target.budget', 12346)),
    'test_runtimes': (('target', 200), ('target.max', 250)),
    'output_properties': (),
}


@dataclasses.dataclass
class Step:
    """One presubmit step and the directories associated with it."""

    # Name of the step; passed to 'pw presubmit' via '--step'.
    name: str
    # Directory for this step (ctx.root / name when created by
    # PwPresubmitApi._step).
    dir: config_types.Path
    # Substep names; when non-empty, each one is run separately with
    # '--substep'.
    substeps: Sequence = dataclasses.field(default_factory=tuple)
    # Name of the export subdirectory inside 'dir'; None or empty for none.
    export_dir_name: str | None = dataclasses.field(default=None)
    # Metadata read from JSON files in the export directory, keyed by the
    # names in METADATA.
    metadata: dict = dataclasses.field(default_factory=dict)

    @property
    def export_dir(self) -> config_types.Path | None:
        """Return dir / export_dir_name, or None if no export dir is named."""
        if not self.export_dir_name:
            return None  # pragma: no cover
        return self.dir / self.export_dir_name


@dataclasses.dataclass
class PresubmitContext:
    """State shared by the calls that make up one presubmit run."""

    # Options controlling the 'pw presubmit' invocations.
    options: options_pb2.Options
    # Directory passed to 'pw presubmit' as '--output-directory'.
    root: config_types.Path
    # Checkout the presubmit runs against.
    checkout: checkout_api.CheckoutContext
    # Value passed as '--rng-seed' when options.use_time_for_rng_seed is set.
    time_rng_seed: int
    # Steps by name, in insertion order. Access through add_step() and steps.
    _step_objects: dict[str, Step] = dataclasses.field(
        default_factory=collections.OrderedDict
    )
    # JSON file passed as '--list-steps-file', once it has been written.
    list_steps_file: config_types.Path | None = dataclasses.field(default=None)
    # Passed to cipd_upload.manifest as 'cas_digests'.
    cas_digests: dict[str, str] = dataclasses.field(default_factory=dict)

    def add_step(self, name: str, step: Step) -> None:
        """Register step under name, replacing any step with that name."""
        self._step_objects[name] = step

    @property
    def steps(self) -> collections.abc.ValuesView[Step]:
        """Return a view of the registered steps, in insertion order."""
        return self._step_objects.values()


class PwPresubmitApi(recipe_api.RecipeApi):
    """Wrapper API for running 'pw presubmit' from a recipe."""

    def _step(self, ctx: PresubmitContext, step) -> Step:
        """Build a Step object from one listed step entry.

        Args:
            ctx: Presubmit context supplying the root directory and the
                export directory name.
            step: Mapping with a 'name' key and an optional 'substeps' key.

        Returns:
            A Step whose directory is ctx.root / <name>.
        """
        step_name = step['name']
        step_dir = ctx.root / step['name']
        substep_names = step.get('substeps', ())
        export_name = ctx.options.export_dir_name
        return Step(
            name=step_name,
            dir=step_dir,
            substeps=substep_names,
            export_dir_name=export_name,
        )

    def init(
        self,
        checkout: checkout_api.CheckoutContext,
        options: options_pb2.Options | None = None,
        root: config_types.Path | None = None,
    ) -> PresubmitContext:
        """Create a PresubmitContext and populate it with the steps to run.

        Runs 'pw presubmit --only-list-steps' for the configured programs and
        steps and records each listed step on the returned context. When the
        JSON form of that output is available it is also written to a 'list
        steps file', which later invocations receive via '--list-steps-file'.

        Args:
            checkout: Checkout the presubmit runs against.
            options: Presubmit options; falls back to self._options when not
                given. An empty command_name is replaced, in place, with
                'python -m pw_cli'.
            root: Output directory; defaults to checkout.root / 'p'.

        Returns:
            The populated PresubmitContext.

        Raises:
            StepFailure: If neither options.step nor options.program is set.
        """
        # Resolve the fallback before dereferencing: reading
        # options.command_name first would raise AttributeError whenever the
        # caller relies on the default of None.
        # NOTE(review): self._options is not assigned anywhere in the visible
        # part of this class -- confirm it is set elsewhere.
        options = options or self._options
        options.command_name = options.command_name or 'python -m pw_cli'

        ctx = PresubmitContext(
            options=options,
            checkout=checkout,
            root=root or (checkout.root / 'p'),
            time_rng_seed=self.m.time.ms_since_epoch(),
        )

        if not ctx.options.step and not ctx.options.program:
            raise self.m.step.StepFailure('no step or program properties')

        with self.m.step.nest('get steps from programs') as pres:
            args: list[str] = ['--only-list-steps']
            for program in ctx.options.program:
                args.extend(('--program', program))
            for step in ctx.options.step:
                args.extend(('--step', step))

            # Step names used as simulated output in recipe tests: two per
            # program, plus every explicitly requested step.
            test_steps: list[str] = []
            for program in ctx.options.program:
                test_steps.append(f'{program}_0')
                test_steps.append(f'{program}_1')
            test_steps.extend(ctx.options.step)

            list_steps_data: dict | list | None = self._run(
                ctx,
                args,
                name='get steps',
                use_debug_log=False,
                stdout=self.m.json.output(),
                step_test_data=lambda: self.m.json.test_api.output_stream(
                    {
                        'all_files': ['foo.cc', 'foo.h'],
                        'steps': [{'name': x} for x in test_steps],
                    },
                ),
            ).stdout
            # The JSON output is either a dict with a 'steps' entry or the
            # list of steps itself.
            program_steps: list[dict] | None
            if isinstance(list_steps_data, dict):
                program_steps = list_steps_data['steps']
            else:
                program_steps = list_steps_data

            # The 'list steps file' is only written if the above run command
            # was successful. Whether 'pw presubmit' supports JSON output for
            # '--only-list-steps' correlates with whether it supports the
            # '--list-steps-file' argument (pwrev/116576).
            if program_steps is not None:
                ctx.list_steps_file = ctx.root / 'list_steps_file.json'
                self.m.file.write_json(
                    'write list steps file',
                    ctx.list_steps_file,
                    list_steps_data,
                )

            # TODO: b/234874288 - Remove this block. It's here until all
            # projects use the new output format with --only-list-steps.
            if program_steps is None:
                raw_steps: list[str] = (
                    self._run(
                        ctx,
                        args,
                        name='get steps text',
                        stdout=self.m.raw_io.output_text(),
                        step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                            '\n'.join(test_steps) + '\n'
                        ),
                        use_debug_log=False,
                    )
                    .stdout.strip()
                    .splitlines()
                )
                program_steps = [{'name': x} for x in raw_steps]

            for step in program_steps:
                ctx.add_step(
                    step['name'],
                    self._step(ctx, step),
                )

            pres.step_summary_text = '\n'.join(x['name'] for x in program_steps)

        return ctx

    @recipe_api.ignore_warnings('recipe_engine/PYTHON2_DEPRECATED')
    def _run(
        self,
        ctx: PresubmitContext,
        args: Sequence[str],
        name: str = 'run',
        use_debug_log: bool = True,
        substep: str | None = None,
        **kwargs,
    ) -> step_data.StepData:
        """Run one 'pw presubmit' invocation as a recipe step.

        When ResultDB is enabled the command is wrapped with resultdb.wrap()
        and, unless this is an '--only-list-steps' query, an extra
        'easy rerun cmd' step shows a short command for rerunning locally.

        Args:
            ctx: Presubmit context; supplies the command name, checkout root,
                output directory and option flags.
            args: Extra arguments appended to the command line and to the
                easy rerun command.
            name: Name of the recipe step.
            use_debug_log: If False, or if ctx.options.do_not_use_debug_log is
                set, pass '--loglevel debug' instead of '--debug-log <file>'.
            substep: If set, passed with '--substep'.
            **kwargs: Forwarded to self.m.step().

        Returns:
            The StepData returned by self.m.step().
        """
        # The second element is a placeholder object in the '--debug-log'
        # case, so this is not a tuple of plain strings.
        logging_args: tuple
        if ctx.options.do_not_use_debug_log or not use_debug_log:
            logging_args = ('--loglevel', 'debug')
        else:
            logging_args = (
                '--debug-log',
                self.m.raw_io.output_text(
                    name='debug.log',
                    add_output_log=True,
                ),
            )

        # Holds strings, paths, possibly a placeholder and an integer seed.
        cmd: list = ctx.options.command_name.split()
        cmd += [
            '--directory',
            ctx.checkout.root,
            *logging_args,
            'presubmit',
            '--output-directory',
            ctx.root,
        ]

        # Shortened command shown to users; it omits the builder-specific
        # directory, logging, list-steps-file, seed, base and substep
        # arguments added to 'cmd'.
        easy_rerun_cmd: list[str] = ['pw', 'presubmit']

        if ctx.list_steps_file:
            cmd += ['--list-steps-file', ctx.list_steps_file]

        if ctx.options.continue_after_build_error:
            cmd.append('--continue-after-build-error')
            easy_rerun_cmd.append('--continue-after-build-error')

        if ctx.options.use_time_for_rng_seed:
            cmd.extend(('--rng-seed', ctx.time_rng_seed))

        cmd.extend(args)
        easy_rerun_cmd.extend(args)

        if ctx.options.only_on_changed_files:
            cmd.extend(('--base', 'HEAD~1'))
        elif not ctx.options.do_not_use_full_argument:
            cmd.append('--full')
            easy_rerun_cmd.append('--full')

        if substep:
            cmd.extend(('--substep', substep))

        def easy_rerun_step(status):
            # raise_on_failure=False: with the default, a FAILURE status makes
            # step.empty() raise before the summary text is set, which would
            # also replace the original exception re-raised by the caller.
            rerun_step = self.m.step.empty(
                'easy rerun cmd',
                status='SUCCESS' if status == 'SUCCESS' else 'FAILURE',
                raise_on_failure=False,
            )
            rerun_step.presentation.step_summary_text = shlex.join(
                easy_rerun_cmd
            )

        with self.m.default_timeout():
            if self.m.resultdb.enabled:
                try:
                    result = self.m.step(
                        name,
                        self.m.resultdb.wrap(
                            cmd,
                            base_variant={
                                'builder': self.m.buildbucket.builder_name,
                                'step': name,
                            },
                            include=True,
                        ),
                        **kwargs,
                    )

                    if '--only-list-steps' not in easy_rerun_cmd:
                        easy_rerun_step(result.presentation.status)

                    return result

                except Exception:  # pragma: no cover
                    easy_rerun_step('FAILURE')
                    raise

            else:
                return self.m.step(name, cmd, **kwargs)

    def _process_metadata(self, step: Step) -> None:
        """Load the step's metadata JSON files into step.metadata.

        For every name in METADATA, step.metadata gets an entry (an empty
        dict unless one is already present) that is replaced by the contents
        of '<export_dir>/<name>.json' when that file exists.

        Args:
            step: Step whose export directory is inspected and whose metadata
                dict is updated in place.
        """
        export_dir = step.export_dir
        if not export_dir:
            return  # pragma: no cover

        for key, canned in METADATA.items():
            step.metadata.setdefault(key, {})

            metadata_file: config_types.Path = export_dir / f'{key}.json'

            # Presumably only affects recipe tests, making the isfile() check
            # below succeed there -- confirm against the path module.
            self.m.path.mock_add_file(metadata_file)
            if self.m.path.isfile(metadata_file):
                nest_name = self.m.path.basename(metadata_file)
                with self.m.step.nest(nest_name):
                    contents = self.m.file.read_json(
                        'read', metadata_file, test_data=dict(canned)
                    )
                    step.metadata[key] = contents

    def run(
        self,
        ctx: PresubmitContext,
        step: Step,
        env: dict[str, str] | None = None,
        log_dir: config_types.Path | None = None,
    ) -> None:
        """Run one presubmit step, then upload and save its outputs.

        The step (or each of its substeps, stopping at the first failure) is
        run through a defer context so that the follow-up work still happens
        after a failure: CIPD manifest uploads, the optional Gerrit comment,
        log saving, a directory listing and metadata processing.

        Args:
            ctx: Presubmit context created by init().
            step: Step to run.
            env: Optional environment; only its 'override_gn_args' attribute
                is read here.
            log_dir: Directory to export logs to; defaults to the step's
                export directory.
        """
        with self.m.step.nest(step.name) as pres:
            args: list[str] = ['--step', step.name]

            # NOTE(review): 'env' is annotated as a dict but is read through
            # the attribute 'override_gn_args' -- confirm the intended type.
            if env and env.override_gn_args:
                for key, value in env.override_gn_args.items():
                    args.append('--override-gn-arg')
                    # String values are wrapped in double quotes; anything
                    # else is written with its repr().
                    if isinstance(value, str):
                        args.append(f'{key}="{value}"')
                    else:
                        args.append(f'{key}={value!r}')

            for gn_arg in ctx.options.override_gn_arg:
                args.extend(('--override-gn-arg', gn_arg))

            with self.m.defer.context() as defer:
                if step.substeps:
                    for substep in step.substeps:
                        result = defer(
                            self._run, ctx, args, name=substep, substep=substep
                        )
                        if not result.is_ok():
                            break
                else:
                    result = defer(self._run, ctx, args, name=step.name)

                # If the step left a 'bazel.output.base' file naming an
                # existing directory, use that directory as the build
                # directory for CIPD uploads instead of the step directory.
                base_dir = step.dir
                bazel_output_base = step.dir / 'bazel.output.base'
                self.m.path.mock_add_file(bazel_output_base)
                if self.m.path.isfile(bazel_output_base):
                    new_base_dir = self.m.path.abs_to_path(
                        self.m.file.read_text(
                            f'read {bazel_output_base.name}',
                            bazel_output_base,
                            test_data=str(
                                self.m.path.tmp_base_dir / 'output-base'
                            ),
                        ).strip()
                    )
                    self.m.path.mock_add_directory(new_base_dir)
                    if self.m.path.isdir(new_base_dir):
                        base_dir = new_base_dir

                builder_manifest = step.dir / 'builder_manifest.json'
                if self.m.path.isfile(builder_manifest):
                    for cipd_manifest in self.m.file.read_json(
                        f'read {builder_manifest.name}',
                        builder_manifest,
                        test_data={'cipd_manifests': ['cipd-manifest.json']},
                    ).get('cipd_manifests', []):
                        cipd_manifest_path = step.dir / cipd_manifest

                        with (
                            self.m.default_timeout(),
                            self.m.step.nest(cipd_manifest_path.stem),
                        ):
                            defer(
                                self.m.cipd_upload.manifest,
                                manifest_path=cipd_manifest_path,
                                build_dir=base_dir,
                                checkout=ctx.checkout,
                                upload_to_cipd=(
                                    not self.m.buildbucket_util.is_dev_or_try
                                ),
                                cas_digests=ctx.cas_digests,
                            )

                defer(
                    self.m.gerrit_comment.maybe_post,
                    ctx.options.gerrit_comment,
                    result,
                )

                # Without an explicit log_dir, logs are exported to the
                # step's export directory.
                # NOTE(review): logs for every step go directly into log_dir;
                # no per-step subdirectory (log_dir / step.name) is used --
                # confirm that is intended.
                if not log_dir:
                    log_dir = step.export_dir

                if step.export_dir:
                    defer(
                        self.m.file.ensure_directory,
                        f'mkdir {ctx.options.export_dir_name}',
                        step.export_dir,
                    )
                if log_dir and log_dir != step.export_dir:
                    defer(
                        self.m.file.ensure_directory,
                        'create log dir',
                        log_dir,
                    )

                # Suppress any above errors when exiting the context, unless
                # there are no errors below. (The save_logs module often
                # produces clearer error messages after parsing logs than those
                # produced by the failing step itself.)
                defer.suppress()

                defer(
                    self.m.save_logs,
                    dirs=(step.dir,),
                    export_dir=log_dir,
                    pres=pres,
                    step_passed=result.is_ok(),
                    step_name=step.name,
                )

                defer(self.m.file.listdir, 'ls out', step.dir, recursive=True)

                defer(self._process_metadata, step)

    @recipe_api.ignore_warnings('recipe_engine/PYTHON2_DEPRECATED')
    def build_id(self, ctx: PresubmitContext) -> str | None:
        """Return the build id printed by the 'build-id' subcommand, if any.

        Args:
            ctx: Presubmit context supplying the command name and checkout
                root.

        Returns:
            The stripped stdout of the command, or None when the command
            exits with a nonzero code or prints '0'.
        """
        command: list[str | config_types.Path] = [
            *ctx.options.command_name.split(),
            '--directory',
            ctx.checkout.root,
            'build-id',
        ]
        # Named 'result' rather than 'step_data' so the local does not shadow
        # the imported step_data module.
        result: step_data.StepData = self.m.step(
            'get build id',
            command,
            stdout=self.m.raw_io.output_text(),
            step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                '123-1234567890'
            ),
            # Any return code is accepted; it is checked explicitly below.
            ok_ret='any',
        )

        namespace: str | None = None
        if result.exc_result.retcode == 0:
            namespace = result.stdout.strip()
            if namespace == '0':
                namespace = None

        return namespace
