blob: ce43abd33a8f94870b985ce9c9864d1634a5bd7b [file] [log] [blame]
# Copyright 2019 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Test API for cq_deps."""
import base64
import json
from typing import Sequence
from PB.recipe_modules.pigweed.cq_deps.properties import InputProperties
from recipe_engine import post_process, recipe_test_api
class CqDepsTestApi(recipe_test_api.RecipeTestApi):
def properties(self, **kwargs):
return {'$pigweed/cq_deps': InputProperties(**kwargs)}
def lookup_cl(self, commit_hash, number=None, prefix='', found=True):
if not found:
return self.override_step_data(
f'{prefix}resolve CL deps.number {commit_hash}',
self.m.json.output(None),
)
return self.override_step_data(
f'{prefix}resolve CL deps.number {commit_hash}',
self.m.json.output([{'_number': number}] if number else []),
)
def _step_name(self, prefix, name, n):
return '{}resolve CL deps.details {}{}'.format(
prefix,
name,
' ({})'.format(n) if n and n > 1 else '',
)
def patches_json(
self,
origin: str,
*reqs: Sequence[str],
prefix: str = '',
):
"""Test data for the retrieval of patches.json.
Args:
origin: The Gerrit name plus the change number, separated by a colon
(e.g., "pigweed:12345")
reqs: A list of requirements of the origin, in the same format as
the origin.
prefix: Prefix for the step name.
"""
if prefix and not prefix.endswith('.'):
# Next line is really only here for tests outside this module.
prefix += '.' # pragma: no cover
patches = []
for req in reqs:
host, number = req.split(':')
number = int(number)
patches.append({'gerrit_name': host, 'number': number})
return self.m.gitiles.fetch(
f'{prefix}resolve CL deps.resolve deps for {origin}.'
'fetch patches.json',
json.dumps(patches).encode(),
)
def patches_json_error(self, origin, status_code=404, **kwargs):
"""Test data for the unsuccessful retrieval of patches.json.
Args:
origin: The Gerrit name plus the change number, separated by a colon
(e.g., "pigweed:12345")
status_code: The HTTP error code from the attempt to read
patches.json.
prefix: Prefix for the step name.
"""
prefix = kwargs.pop('prefix', '')
assert not kwargs
if prefix and not prefix.endswith('.'):
# Next line is really only here for tests outside this module.
prefix += '.' # pragma: no cover
return self.m.url.error(
f'{prefix}resolve CL deps.resolve deps for {origin}.'
'fetch patches.json',
status_code,
'error',
**kwargs,
)
def details(
self,
name,
message='',
status='NEW',
project='project',
patchset=1,
prefix='',
commit_hash='HASH',
parent=None,
n=None,
patches_json=False,
topic=None,
):
"""Test data for the retrieval of change details from Gerrit."""
number = int(name.split(':')[1])
if not parent:
parent = f'parent-{number}'
assert status in ('NEW', 'MERGED', 'ABANDONED')
if patches_json:
files = {'patches.json': {'status': 'A'}}
else:
files = {'foo.py': {'status': 'A'}}
return self.override_step_data(
self._step_name(prefix, name, n),
self.m.json.output(
{
'_number': number,
'current_revision': commit_hash,
'project': project,
'revisions': {
commit_hash: {
'_number': patchset,
'commit': {
'message': message,
'parents': [
{
'commit': parent,
}
],
},
'files': files,
},
},
'status': status,
'topic': topic or '',
}
),
)
def transient(self, name, prefix='', n=None):
return self.override_step_data(
self._step_name(prefix, name, n),
self.m.json.output({'transient': True}),
retcode=1,
)
def forbidden(self, name, prefix='', attempts=2):
data = []
for i in range(0, attempts):
data.append(
self.override_step_data(
self._step_name(prefix, name, i + 1),
self.m.json.output({}),
retcode=1,
)
)
return sum(data[1:], data[0])
def has_deps(self, *names):
results = []
for name in names:
results.append(
self.post_process(
post_process.MustRun,
f'final deps.{name}',
)
)
return sum(results[1:], results[0])
def lacks_deps(self, *names):
results = []
for name in names:
results.append(
self.post_process(
post_process.DoesNotRun,
f'final deps.{name}',
)
)
return sum(results[1:], results[0])