blob: c1bc8316fe945cd16fcdc228920dd4ac514ecada [file] [log] [blame]
# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Abandon old changes owned by the current service account."""
from __future__ import annotations
from typing import TYPE_CHECKING
from recipe_engine import recipe_api
if TYPE_CHECKING: # pragma: no cover
from recipe_engine import config_types, engine_types
class AbandonOldChangesApi(recipe_api.RecipeApi):
    """Recipe module that abandons stale open changes owned by this builder.

    The owner used in the Gerrit query is
    `buildbucket.swarming_task_service_account`, i.e. the account the current
    task runs as.
    """

    def __call__(
        self,
        *,
        host: str,
        age_days: int = 30,
        continue_on_failure: bool = True,
        max_to_abandon: int = 10,
    ):
        """Query for old changes on `host` and abandon them.

        All work happens inside a nested step named 'abandon old changes',
        which ends with an empty 'success' or 'failure' marker step.

        Args:
            host: Passed through `gerrit.host_from_remote_url()` before use.
            age_days: Value placed in the `age:<N>d` term of the query.
            continue_on_failure: When true, a `StepFailure` is swallowed after
                the 'failure' marker step is emitted; when false it is
                re-raised to the caller.
            max_to_abandon: Upper bound on how many of the returned changes
                are abandoned in this invocation.

        Raises:
            StepFailure: Only when `continue_on_failure` is false.
        """
        with self.m.step.nest('abandon old changes'):
            try:
                gerrit_host = self.m.gerrit.host_from_remote_url(host)
                self._abandon_old_changes(
                    host=gerrit_host,
                    age_days=age_days,
                    max_to_abandon=max_to_abandon,
                )
                self.m.step.empty('success')

            except self.m.step.StepFailure:
                self.m.step.empty('failure')
                if continue_on_failure:
                    return
                raise  # pragma: no cover

    def _abandon_old_changes(
        self,
        *,
        host: str,
        age_days: int,
        max_to_abandon: int,
    ):
        """Find open changes owned by the task's account and abandon them.

        Args:
            host: Gerrit host handed to both the query and the abandon calls.
            age_days: Value placed in the `age:<N>d` term of the query.
            max_to_abandon: Only the first `max_to_abandon` query results are
                abandoned.
        """
        service_account = self.m.buildbucket.swarming_task_service_account
        # The trailing space is part of the query string as it is sent.
        query = f'is:open owner:{service_account} age:{age_days}d '

        # Canned query result used when the recipe runs under test.
        fake_changes = [
            {'_number': 1001},
            {'_number': 1002},
        ]

        query_step = self.m.gerrit.change_query(
            name='get old changes',
            query_string=query,
            host=host,
            max_attempts=1,
            timeout=30,
            test_data=self.m.json.test_api.output(fake_changes),
        )
        old_changes = query_step.json.output

        # Spawn one future per change (capped at max_to_abandon), then wait on
        # all of them before returning.
        pending = [
            self.m.futures.spawn(
                self.m.gerrit.abandon,
                f'abandon {change["_number"]}',
                change_id=change['_number'],
                message='Abandoning orphaned roll change.',
                host=host,
            )
            for change in old_changes[:max_to_abandon]
        ]
        self.m.futures.wait(pending)