rerunner: Use luci-scheduler to launch builds

Use luci-scheduler instead of buildbucket to launch builds. This will
limit most builders to at most one concurrent build since luci-scheduler
manages concurrency and buildbucket does not.

Bug: b/288464961
Change-Id: I1006553167759fd112224dcfb527ee0a5c6f5c25
Reviewed-on: https://pigweed-review.googlesource.com/c/infra/recipes/+/156790
Commit-Queue: Rob Mohr <mohrr@google.com>
Presubmit-Verified: CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Oliver Newman <olivernewman@google.com>
diff --git a/recipes/rerunner.py b/recipes/rerunner.py
index bbc6eae..114adb3 100644
--- a/recipes/rerunner.py
+++ b/recipes/rerunner.py
@@ -31,12 +31,16 @@
 be bumped from the 10 most recent builds before long.
 """
 
+import collections
 import fnmatch
 from typing import Sequence, Tuple
 
 from PB.recipe_engine import result as result_pb2
 from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb2
 from PB.go.chromium.org.luci.buildbucket.proto import project_config as bb_cfg
+from PB.go.chromium.org.luci.scheduler.appengine.messages import (
+    config as scheduler_pb2,
+)
 from PB.recipes.pigweed.rerunner import InputProperties
 from PB.recipe_engine import result
 from recipe_engine import post_process
@@ -46,6 +50,7 @@
     'fuchsia/luci_config',
     'recipe_engine/buildbucket',
     'recipe_engine/properties',
+    'recipe_engine/scheduler',
     'recipe_engine/step',
     'recipe_engine/time',
 ]
@@ -70,11 +75,11 @@
 
 
 def RunSteps(api, props):  # pylint: disable=invalid-name
-    cfg = api.luci_config.buildbucket()
+    bb_cfg = api.luci_config.buildbucket()
 
-    schedule_requests = []
+    builds_to_launch = []
 
-    for bucket in cfg.buckets:
+    for bucket in bb_cfg.buckets:
         if not include_bucket(props, bucket.name):
             api.step(
                 f'excluding {len(bucket.swarming.builders)} builders in '
@@ -116,22 +121,43 @@
 
                     api.step('scheduling', None)
 
-                    schedule_requests.append(
-                        api.buildbucket.schedule_request(
-                            bucket=bucket.name, builder=builder.name,
-                        )
-                    )
+                    builds_to_launch.append((bucket.name, builder.name))
 
-    if not schedule_requests:
+    if not builds_to_launch:
         api.step('nothing to launch', None)
         return result_pb2.RawResult(
             summary_markdown='nothing to launch', status=common_pb2.SUCCESS,
         )
 
+    sched_cfg = api.luci_config.scheduler()
+    jobs = collections.defaultdict(list)
+    for job in sched_cfg.job:
+        bucket_builder = (job.buildbucket.bucket, job.buildbucket.builder)
+        jobs[bucket_builder].append(job.id)
+
+    bb_requests = []
+    sched_ids = []
+
+    for bucket_builder in builds_to_launch:
+        if bucket_builder in jobs and len(jobs[bucket_builder]) == 1:
+            sched_ids.append(jobs[bucket_builder][0])
+
+        else:
+            bb_requests.append(
+                api.buildbucket.schedule_request(
+                    bucket=bucket_builder[0], builder=bucket_builder[1],
+                )
+            )
+
+    def scheduler_link(job_id):
+        project = api.buildbucket.build.builder.project
+        return f'https://luci-scheduler.appspot.com/jobs/{project}/{job_id}'
+
     if props.dry_run:
         with api.step.nest('dry-run, not launching builds'):
-            links = []
-            for req in schedule_requests:
+            links = [(x, scheduler_link(x)) for x in sched_ids]
+
+            for req in bb_requests:
                 bucket_builder = f'{req.builder.bucket}/{req.builder.builder}'
                 api.step(bucket_builder, None)
                 links.append(
@@ -152,13 +178,25 @@
         )
 
     with api.step.nest('launch') as pres:
-        builds = api.buildbucket.schedule(schedule_requests)
         links = []
-        for build in builds:
-            bucket_builder = f'{build.builder.bucket}/{build.builder.builder}'
-            link = api.buildbucket.build_url(build_id=build.id)
-            pres.links[bucket_builder] = link
-            links.append((bucket_builder, link))
+
+        if sched_ids:
+            api.scheduler.emit_trigger(
+                api.scheduler.BuildbucketTrigger(),
+                api.buildbucket.build.builder.project,
+                sched_ids,
+            )
+            links.extend((x, scheduler_link(x)) for x in sched_ids)
+
+        if bb_requests:
+            builds = api.buildbucket.schedule(bb_requests)
+            for build in builds:
+                bucket_builder = (
+                    f'{build.builder.bucket}/{build.builder.builder}'
+                )
+                link = api.buildbucket.build_url(build_id=build.id)
+                pres.links[bucket_builder] = link
+                links.append((bucket_builder, link))
 
         links_combined = ''.join(
             f'<br/>[{name}]({link})' for name, link in links
@@ -200,7 +238,9 @@
     def builder_config(name: str):
         return bb_cfg.BuilderConfig(name=name)
 
-    def mock_config(*buckets_builders: Sequence[Tuple[str, Sequence[str]]]):
+    def mock_buildbucket_config(
+        *buckets_builders: Sequence[Tuple[str, Sequence[str]]],
+    ):
         buckets: List[bb_cfg.Bucket] = []
         for bucket_name, builder_names in buckets_builders:
             builders: List[bb_cfg.BuilderConfig] = []
@@ -213,6 +253,24 @@
             data=buildbucket_config(buckets),
         )
 
+    def mock_scheduler_config(
+        *buckets_builders: Sequence[Tuple[str, Sequence[str]]],
+    ):
+        cfg = scheduler_pb2.ProjectConfig()
+        for bucket_name, builder_names in buckets_builders:
+            for builder_name in builder_names:
+                cfg.job.append(
+                    scheduler_pb2.Job(
+                        id=f'{bucket_name}-{builder_name}',
+                        buildbucket=scheduler_pb2.BuildbucketTask(
+                            bucket=bucket_name, builder=builder_name,
+                        ),
+                    )
+                )
+        return api.luci_config.mock_config(
+            project='pigweed', config_name='luci-scheduler.cfg', data=cfg,
+        )
+
     def excluding(bucket, num):
         return api.post_process(
             post_process.MustRun,
@@ -245,11 +303,16 @@
     def assert_scheduling(prefix):
         return api.post_process(post_process.MustRun, f'{prefix}.scheduling')
 
-    def assert_launched():
+    def assert_buildbucket_scheduled():
         return api.post_process(
             post_process.MustRun, f'launch.buildbucket.schedule',
         )
 
+    def assert_scheduler_triggered():
+        return api.post_process(
+            post_process.MustRun, 'launch.luci-scheduler.EmitTriggers',
+        )
+
     def assert_nothing_to_launch():
         return api.post_process(post_process.MustRun, f'nothing to launch',)
 
@@ -265,7 +328,7 @@
 
     yield (
         test('default-ci-only')
-        + mock_config(('try', ('foo', 'bar', 'baz')))
+        + mock_buildbucket_config(('try', ('foo', 'bar', 'baz')))
         + excluding('try', 3)
         + assert_nothing_to_launch()
         + drop_expectations_must_be_last()
@@ -274,7 +337,7 @@
     yield (
         test('ci-only')
         + properties(included_buckets=("*.ci", "ci"))
-        + mock_config(('try', ('foo', 'bar', 'baz')))
+        + mock_buildbucket_config(('try', ('foo', 'bar', 'baz')))
         + excluding('try', 3)
         + assert_nothing_to_launch()
         + drop_expectations_must_be_last()
@@ -285,7 +348,7 @@
         + properties(
             included_buckets=("*.ci", "ci"), excluded_buckets=("abc.*"),
         )
-        + mock_config(('abc.ci', ('foo', 'bar', 'baz')))
+        + mock_buildbucket_config(('abc.ci', ('foo', 'bar', 'baz')))
         + excluding('abc.ci', 3)
         + assert_nothing_to_launch()
         + drop_expectations_must_be_last()
@@ -293,7 +356,7 @@
 
     yield (
         test('scheduled')
-        + mock_config(('abc.ci', ('foo',)))
+        + mock_buildbucket_config(('abc.ci', ('foo',)))
         + including('abc.ci')
         + build_status('scheduled', prefix='abc.ci.foo')
         + assert_skip_is_incomplete('abc.ci.foo')
@@ -303,7 +366,7 @@
 
     yield (
         test('running')
-        + mock_config(('abc.ci', ('foo',)))
+        + mock_buildbucket_config(('abc.ci', ('foo',)))
         + including('abc.ci')
         + build_status('running', prefix='abc.ci.foo')
         + assert_skip_is_incomplete('abc.ci.foo')
@@ -313,7 +376,7 @@
 
     yield (
         test('passed')
-        + mock_config(('abc.ci', ('foo',)))
+        + mock_buildbucket_config(('abc.ci', ('foo',)))
         + including('abc.ci')
         + build_status('passed', prefix='abc.ci.foo')
         + assert_skip_is_passing('abc.ci.foo')
@@ -323,7 +386,7 @@
 
     yield (
         test('no_recent_passes')
-        + mock_config(('abc.ci', ('foo',)))
+        + mock_buildbucket_config(('abc.ci', ('foo',)))
         + including('abc.ci')
         + build_status('failure', 'failure', 'failure', prefix='abc.ci.foo')
         + assert_skip_no_recent_passes('abc.ci.foo')
@@ -333,21 +396,28 @@
 
     yield (
         test('recent_passes')
-        + mock_config(('abc.ci', ('foo',)))
+        + mock_buildbucket_config(('abc.ci', ('foo', 'bar')))
+        + mock_scheduler_config(('abc.ci', ('foo',)))
         + including('abc.ci')
         + build_status('failure', 'failure', 'passed', prefix='abc.ci.foo')
+        + build_status('failure', 'failure', 'passed', prefix='abc.ci.bar')
         + assert_scheduling('abc.ci.foo')
-        + assert_launched()
+        + assert_scheduling('abc.ci.bar')
+        + assert_buildbucket_scheduled()
+        + assert_scheduler_triggered()
         + drop_expectations_must_be_last()
     )
 
     yield (
         test('dry_run')
         + properties(dry_run=True)
-        + mock_config(('abc.ci', ('foo',)))
+        + mock_buildbucket_config(('abc.ci', ('foo', 'bar')))
+        + mock_scheduler_config(('abc.ci', ('foo',)))
         + including('abc.ci')
         + build_status('failure', 'failure', 'passed', prefix='abc.ci.foo')
+        + build_status('failure', 'failure', 'passed', prefix='abc.ci.bar')
         + assert_scheduling('abc.ci.foo')
+        + assert_scheduling('abc.ci.bar')
         + assert_dry_run()
         + drop_expectations_must_be_last()
     )