blob: cd21b7600561e6c709ecca085572d38204e399c9 [file] [log] [blame]
# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from __future__ import annotations
import configparser
import dataclasses
import io
import re
from typing import TYPE_CHECKING
import urllib.parse
from PB.recipe_modules.pigweed.checkout.options import (
Options as CheckoutOptions,
)
from recipe_engine import recipe_api
from RECIPE_MODULES.fuchsia.roll_commit_message import (
api as roll_commit_message_api,
)
if TYPE_CHECKING: # pragma: no cover
from typing import Generator
from PB.recipe_modules.pigweed.txt_roll.txt_entry import TxtEntry
from RECIPE_MODULES.pigweed.checkout import api as checkout_api
class CopyRoll(roll_commit_message_api.BaseRoll):
def __init__(
self,
destination_path: str,
old_value: str,
new_value: str,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.destination_path = destination_path
self.old_value = old_value
self.new_value = new_value
def short_name(self) -> str:
return self.destination_path
def message_header(self, force_summary_version: bool = False) -> str:
header = self.destination_path
if force_summary_version:
return header
try:
value_string = self.new_value.strip()
if '\n' not in value_string:
new_header = f'{header}: {value_string}'
if len(new_header) <= 62:
header = new_header
except UnicodeDecodeError: # pragma: no cover
pass
return header
def message_body(
self,
*,
force_summary_version: bool = False,
escape_tags: Sequence[str] = (),
filter_tags: Sequence[str] = (),
) -> str:
result = []
if (
'\n' not in str(self.old_value).strip()
and '\n' not in self.new_value.strip()
):
if self.old_value is None:
result.append('Initialized to') # pragma: no cover
else:
result.append('From')
result.append(f' {self.old_value.strip()}')
result.append('To')
result.append(f' {self.new_value.strip()}')
result.append('')
return '\n'.join(result)
def message_footer(self, *, send_comment: bool) -> str:
return ''
def output_property(self) -> dict[str, str]:
return {
"path": self.destination_path,
"old": self.old_value,
"new": self.new_value,
}
class CopyRollApi(recipe_api.RecipeApi):
CopyRoll = CopyRoll
def update(
self,
checkout: checkout_api.CheckoutContext,
copy_entry: CopyEntry,
) -> list[CopyRoll]:
with self.m.step.nest(copy_entry.destination_path):
new_revision = self.m.git_roll_util.resolve_new_revision(
copy_entry.remote,
copy_entry.branch or 'main',
checkout.remotes_equivalent,
)
parsed_remote = urllib.parse.urlparse(copy_entry.remote)
destination = checkout.root / copy_entry.destination_path
self.m.file.ensure_directory(
f'ensure directory {destination.parent}',
destination.parent,
)
old_value: str | None = None
self.m.path.mock_add_file(destination)
if self.m.path.isfile(destination):
old_value = self.m.file.read_text(
f'read destination {copy_entry.destination_path}',
destination,
)
new_value = self.m.gitiles.fetch(
host=parsed_remote.netloc,
project=parsed_remote.path.lstrip('/'),
ref=new_revision,
path=copy_entry.source_path,
step_name=f'read source {copy_entry.source_path}',
).decode()
new = self.m.step.empty(f'new {copy_entry.source_path}')
new.presentation.step_summary_text = repr(new_value)
if old_value == new_value:
pres = self.m.step.empty('values are identical').presentation
pres.step_summary_text = repr(old_value)
return []
self.m.file.write_text(
f'write destination {copy_entry.destination_path}',
destination,
new_value,
)
return [
CopyRoll(
destination_path=copy_entry.destination_path,
old_value=old_value,
new_value=new_value,
),
]