| # Copyright 2024 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| |
| from __future__ import annotations |
| |
| import configparser |
| import dataclasses |
| import io |
| import re |
| from typing import TYPE_CHECKING |
| import urllib.parse |
| |
| from PB.recipe_modules.pigweed.checkout.options import ( |
| Options as CheckoutOptions, |
| ) |
| from recipe_engine import recipe_api |
| from RECIPE_MODULES.fuchsia.roll_commit_message import ( |
| api as roll_commit_message_api, |
| ) |
| |
| if TYPE_CHECKING: # pragma: no cover |
| from typing import Generator |
| from PB.recipe_modules.pigweed.txt_roll.txt_entry import TxtEntry |
| from RECIPE_MODULES.pigweed.checkout import api as checkout_api |
| |
| |
class CopyRoll(roll_commit_message_api.BaseRoll):
    """Roll record for one file whose contents were copied into the checkout.

    Stores the destination path together with the file's previous and new
    contents, and renders them into the header, body, footer and output
    property of a roll.
    """

    def __init__(
        self,
        destination_path: str,
        old_value: str | None,
        new_value: str,
        *args,
        **kwargs,
    ):
        """Record the roll details; extra arguments go to the base class.

        Args:
            destination_path: Path of the rolled file; also used as the
                roll's short name and as the start of the message header.
            old_value: Previous contents of the file, or None when there
                were no previous contents.
            new_value: New contents of the file.
            *args: Forwarded unchanged to the base class initializer.
            **kwargs: Forwarded unchanged to the base class initializer.
        """
        super().__init__(*args, **kwargs)
        self.destination_path = destination_path
        self.old_value = old_value
        self.new_value = new_value

    def short_name(self) -> str:
        """Return the destination path, which identifies this roll."""
        return self.destination_path

    def message_header(self, force_summary_version: bool = False) -> str:
        """Return the header line for this roll.

        The header is the destination path. When the stripped new value is a
        single line and the combined text fits in 62 characters, the value is
        appended as ``<path>: <value>``.

        Args:
            force_summary_version: If true, always return just the
                destination path.
        """
        header = self.destination_path
        if force_summary_version:
            return header

        value_string = self.new_value.strip()
        if '\n' in value_string:
            return header

        # Only use the detailed form when it stays within the length limit;
        # otherwise fall back to the bare path.
        detailed = f'{header}: {value_string}'
        return detailed if len(detailed) <= 62 else header

    def message_body(
        self,
        *,
        force_summary_version: bool = False,
        escape_tags: Sequence[str] = (),
        filter_tags: Sequence[str] = (),
    ) -> str:
        """Return the message body describing the old and new values.

        When both stripped values are single-line, the body lists them under
        ``From``/``To`` (or under ``Initialized to`` when there is no old
        value). If either value spans multiple lines the body is empty.

        Args:
            force_summary_version: Accepted for interface compatibility;
                not used here.
            escape_tags: Accepted for interface compatibility; not used here.
            filter_tags: Accepted for interface compatibility; not used here.
        """
        new_text = self.new_value.strip()
        old_text = None if self.old_value is None else self.old_value.strip()

        # Multi-line contents are not reproduced in the message.
        if '\n' in new_text or (old_text is not None and '\n' in old_text):
            return ''

        # NOTE(review): the value lines are indented with the whitespace
        # visible in the reviewed source (one space); confirm the intended
        # indent width.
        if old_text is None:
            lines = ['Initialized to']  # pragma: no cover
        else:
            lines = ['From', f' {old_text}', 'To']
        # The trailing empty entry makes the joined text end with a newline.
        lines += [f' {new_text}', '']

        return '\n'.join(lines)

    def message_footer(self, *, send_comment: bool) -> str:
        """Return the footer for this roll, which is always empty."""
        return ''

    def output_property(self) -> dict[str, str | None]:
        """Return the path and old/new values as a dictionary.

        The ``"old"`` entry is None when there was no previous value.
        """
        return {
            "path": self.destination_path,
            "old": self.old_value,
            "new": self.new_value,
        }
| |
| |
class CopyRollApi(recipe_api.RecipeApi):
    """Recipe API that rolls a single file copied from a remote repository."""

    # Make the roll class reachable through the API object.
    CopyRoll = CopyRoll

    def update(
        self,
        checkout: checkout_api.CheckoutContext,
        copy_entry: CopyEntry,
    ) -> list[CopyRoll]:
        """Refresh one copied file in the checkout from its remote source.

        Resolves a revision for ``copy_entry.remote`` on ``copy_entry.branch``
        (``'main'`` when no branch is set), fetches ``copy_entry.source_path``
        at that revision through gitiles, and writes the decoded contents to
        ``copy_entry.destination_path`` under ``checkout.root``. All steps run
        nested under a step named after the destination path.

        Args:
            checkout: Checkout context; supplies ``root`` and
                ``remotes_equivalent``.
            copy_entry: Entry naming the remote, branch, source path and
                destination path.

        Returns:
            An empty list when the destination already holds the fetched
            contents; otherwise a one-element list with the ``CopyRoll``
            describing the change.
        """
        dest_rel = copy_entry.destination_path
        src_rel = copy_entry.source_path

        with self.m.step.nest(dest_rel):
            revision = self.m.git_roll_util.resolve_new_revision(
                copy_entry.remote,
                copy_entry.branch or 'main',
                checkout.remotes_equivalent,
            )

            # Split the remote URL into the host and project that the
            # gitiles fetch below takes as separate arguments.
            remote_url = urllib.parse.urlparse(copy_entry.remote)
            gitiles_host = remote_url.netloc
            gitiles_project = remote_url.path.lstrip('/')

            target = checkout.root / dest_rel
            target_dir = target.parent
            self.m.file.ensure_directory(
                f'ensure directory {target_dir}',
                target_dir,
            )

            # NOTE(review): presumably this only affects the mocked
            # filesystem used in recipe tests -- confirm against the
            # recipe_engine path module.
            self.m.path.mock_add_file(target)

            previous: str | None = None
            if self.m.path.isfile(target):
                previous = self.m.file.read_text(
                    f'read destination {dest_rel}',
                    target,
                )

            fetched = self.m.gitiles.fetch(
                host=gitiles_host,
                project=gitiles_project,
                ref=revision,
                path=src_rel,
                step_name=f'read source {src_rel}',
            )
            latest = fetched.decode()

            # Surface the fetched contents in the step summary.
            new_step = self.m.step.empty(f'new {src_rel}')
            new_step.presentation.step_summary_text = repr(latest)

            # Nothing to roll when the destination is already up to date.
            if previous == latest:
                same_step = self.m.step.empty('values are identical')
                same_step.presentation.step_summary_text = repr(previous)
                return []

            self.m.file.write_text(
                f'write destination {dest_rel}',
                target,
                latest,
            )

            roll = CopyRoll(
                destination_path=dest_rel,
                old_value=previous,
                new_value=latest,
            )
            return [roll]