ui: Switch to a more "OO" implementation for `Dataset`

- Datasets objects now have `.optimize()`, `query()`, etc methods on
  the dataset objects themselves instead of having to use standalone
  functions. This negates the need for the namespace so this has been
  removed.
- Datasets are now defined using classes, so we must use `new
  XYZDataset()` to create datasets, rather than POJOs.
  I.e.: return new SourceDataset({...})

Change-Id: Ie0d8d4d9d1e06088d02b62c2d218d3da879c5bfe
diff --git a/ui/src/frontend/base_slice_track.ts b/ui/src/frontend/base_slice_track.ts
index 45cbc9e..0c18970 100644
--- a/ui/src/frontend/base_slice_track.ts
+++ b/ui/src/frontend/base_slice_track.ts
@@ -35,7 +35,7 @@
 import {TrackMouseEvent, TrackRenderContext} from '../public/track';
 import {Point2D, VerticalBounds} from '../base/geom';
 import {Trace} from '../public/trace';
-import {Ds} from '../trace_processor/dataset';
+import {SourceDataset, Dataset} from '../trace_processor/dataset';
 
 // The common class that underpins all tracks drawing slices.
 
@@ -974,15 +974,15 @@
     return {ts: Time.fromRaw(row.ts), dur: Duration.fromRaw(row.dur)};
   }
 
-  getDataset(): Ds.Dataset | undefined {
-    return {
+  getDataset(): Dataset | undefined {
+    return new SourceDataset({
       src: this.getSqlSource(),
       schema: {
         id: NUM,
         ts: LONG,
         dur: LONG,
       },
-    };
+    });
   }
 }
 
diff --git a/ui/src/frontend/named_slice_track.ts b/ui/src/frontend/named_slice_track.ts
index edb7ec9..7a23285 100644
--- a/ui/src/frontend/named_slice_track.ts
+++ b/ui/src/frontend/named_slice_track.ts
@@ -30,7 +30,7 @@
 import {renderDuration} from './widgets/duration';
 import {TraceImpl} from '../core/trace_impl';
 import {assertIsInstance} from '../base/logging';
-import {Ds} from '../trace_processor/dataset';
+import {SourceDataset, Dataset} from '../trace_processor/dataset';
 
 export const NAMED_ROW = {
   // Base columns (tsq, ts, dur, id, depth).
@@ -82,8 +82,8 @@
     return new ThreadSliceDetailsPanel(assertIsInstance(this.trace, TraceImpl));
   }
 
-  override getDataset(): Ds.Dataset | undefined {
-    return {
+  override getDataset(): Dataset | undefined {
+    return new SourceDataset({
       src: this.getSqlSource(),
       schema: {
         id: NUM,
@@ -91,6 +91,6 @@
         ts: LONG,
         dur: LONG,
       },
-    };
+    });
   }
 }
diff --git a/ui/src/plugins/dev.perfetto.AsyncSlices/async_slice_track.ts b/ui/src/plugins/dev.perfetto.AsyncSlices/async_slice_track.ts
index 4c8898b..0486e6e 100644
--- a/ui/src/plugins/dev.perfetto.AsyncSlices/async_slice_track.ts
+++ b/ui/src/plugins/dev.perfetto.AsyncSlices/async_slice_track.ts
@@ -14,12 +14,12 @@
 
 import {BigintMath as BIMath} from '../../base/bigint_math';
 import {clamp} from '../../base/math_utils';
-import {Ds} from '../../trace_processor/dataset';
 import {NAMED_ROW, NamedSliceTrack} from '../../frontend/named_slice_track';
 import {SLICE_LAYOUT_FIT_CONTENT_DEFAULTS} from '../../frontend/slice_layout';
 import {NewTrackArgs} from '../../frontend/track';
 import {TrackEventDetails} from '../../public/selection';
 import {Slice} from '../../public/track';
+import {SourceDataset, Dataset} from '../../trace_processor/dataset';
 import {
   LONG,
   LONG_NULL,
@@ -112,8 +112,8 @@
     };
   }
 
-  override getDataset(): Ds.Dataset {
-    return {
+  override getDataset(): Dataset {
+    return new SourceDataset({
       src: `slice`,
       filter: {
         col: 'track_id',
@@ -126,6 +126,6 @@
         dur: LONG,
         parent_id: NUM_NULL,
       },
-    };
+    });
   }
 }
diff --git a/ui/src/plugins/dev.perfetto.AsyncSlices/slice_selection_aggregator.ts b/ui/src/plugins/dev.perfetto.AsyncSlices/slice_selection_aggregator.ts
index 0364152..23226bc 100644
--- a/ui/src/plugins/dev.perfetto.AsyncSlices/slice_selection_aggregator.ts
+++ b/ui/src/plugins/dev.perfetto.AsyncSlices/slice_selection_aggregator.ts
@@ -16,7 +16,7 @@
 import {AreaSelection} from '../../public/selection';
 import {Engine} from '../../trace_processor/engine';
 import {AreaSelectionAggregator} from '../../public/selection';
-import {Ds} from '../../trace_processor/dataset';
+import {UnionDataset} from '../../trace_processor/dataset';
 import {LONG, NUM, STR} from '../../trace_processor/query_result';
 
 export class SliceSelectionAggregator implements AreaSelectionAggregator {
@@ -30,13 +30,13 @@
       dur: LONG,
     };
     const validDatasets = area.tracks
-      .map((t) => t.track.getDataset?.())
-      .filter((d) => d !== undefined)
-      .filter((d) => Ds.doesImplement(d, desiredSchema));
+      .map((track) => track.track.getDataset?.())
+      .filter((ds) => ds !== undefined)
+      .filter((ds) => ds.implements(desiredSchema));
     if (validDatasets.length === 0) {
       return false;
     }
-    const optimizedDataset = Ds.optimize({union: validDatasets});
+    const unionDataset = new UnionDataset(validDatasets);
     await engine.query(`
       create or replace perfetto table ${this.id} as
       select
@@ -44,7 +44,7 @@
         sum(dur) AS total_dur,
         sum(dur)/count() as avg_dur,
         count() as occurrences
-        from (${Ds.query(optimizedDataset)})
+        from (${unionDataset.optimize().query()})
       where
         ts + dur > ${area.start}
         and ts < ${area.end}
diff --git a/ui/src/plugins/dev.perfetto.Ftrace/ftrace_track.ts b/ui/src/plugins/dev.perfetto.Ftrace/ftrace_track.ts
index d056a8d..4ef7793 100644
--- a/ui/src/plugins/dev.perfetto.Ftrace/ftrace_track.ts
+++ b/ui/src/plugins/dev.perfetto.Ftrace/ftrace_track.ts
@@ -24,7 +24,7 @@
 import {FtraceFilter} from './common';
 import {Monitor} from '../../base/monitor';
 import {TrackRenderContext} from '../../public/track';
-import {Ds} from '../../trace_processor/dataset';
+import {SourceDataset, Dataset} from '../../trace_processor/dataset';
 
 const MARGIN = 2;
 const RECT_HEIGHT = 18;
@@ -57,8 +57,8 @@
     this.monitor = new Monitor([() => store.state]);
   }
 
-  getDataset(): Ds.Dataset {
-    return {
+  getDataset(): Dataset {
+    return new SourceDataset({
       // 'ftrace_event' doesn't have a dur column, but injecting dur=0 (all
       // ftrace events are effectively 'instant') allows us to participate in
       // generic slice aggregations
@@ -73,7 +73,7 @@
         col: 'cpu',
         eq: this.cpu,
       },
-    };
+    });
   }
 
   async onUpdate({
diff --git a/ui/src/public/track.ts b/ui/src/public/track.ts
index 93d493e..6d1b1dc 100644
--- a/ui/src/public/track.ts
+++ b/ui/src/public/track.ts
@@ -20,7 +20,7 @@
 import {ColorScheme} from './color_scheme';
 import {TrackEventDetailsPanel} from './details_panel';
 import {TrackEventDetails, TrackEventSelection} from './selection';
-import {Ds} from '../trace_processor/dataset';
+import {Dataset} from '../trace_processor/dataset';
 
 export interface TrackManager {
   /**
@@ -179,7 +179,7 @@
    * Optional: Returns a dataset that represents the events displayed on this
    * track.
    */
-  getDataset?(): Ds.Dataset | undefined;
+  getDataset?(): Dataset | undefined;
 
   /**
    * Optional: Get details of a track event given by eventId on this track.
diff --git a/ui/src/trace_processor/dataset.ts b/ui/src/trace_processor/dataset.ts
index 6863570..25c64cb 100644
--- a/ui/src/trace_processor/dataset.ts
+++ b/ui/src/trace_processor/dataset.ts
@@ -16,71 +16,43 @@
 import {getOrCreate} from '../base/utils';
 import {ColumnType, SqlValue} from './query_result';
 
-export namespace Ds {
-  export type Dataset = UnionDataset | SourceDataset;
-  export type Schema = Record<string, ColumnType>;
-
+/**
+ * A dataset defines a set of rows in TraceProcessor and a schema of the
+ * resultant columns. Dataset implementations describe how to get the data in
+ * different ways - e.g. 'source' datasets define a dataset as a table name (or
+ * select statement) + filters, whereas a 'union' dataset defines a dataset as
+ * the union of other datasets.
+ *
+ * The idea is that users can build arbitrarily complex trees of datasets, then
+ * at any point call `optimize()` to create the smallest possible tree that
+ * represents the same dataset, and `query()` which produces a select statement
+ * for the resultant dataset.
+ *
+ * Users can also use the `schema` property and `implements()` to get and test
+ * the schema of a given dataset.
+ */
+export interface Dataset {
   /**
-   * Defines a dataset with a source SQL select statement of table name, a
-   * schema describing the columns, and an optional filter.
+   * Get or calculate the resultant schema of this dataset.
    */
-  export interface SourceDataset {
-    readonly src: string;
-    readonly schema: Schema;
-    readonly filter?: EqFilter | InFilter;
-  }
+  readonly schema: DatasetSchema;
 
   /**
-   * A dataset that represents the union of multiple datasets.
-   */
-  export interface UnionDataset {
-    readonly union: ReadonlyArray<Dataset>;
-  }
-
-  /**
-   * Generic filter type.
-   */
-  export type Filter = EqFilter | InFilter;
-
-  /**
-   * A filter used to express that a column must equal a value.
-   */
-  export interface EqFilter {
-    readonly col: string;
-    readonly eq: SqlValue;
-  }
-
-  /**
-   * A filter used to express that column must be one of a set of values.
-   */
-  export interface InFilter {
-    readonly col: string;
-    readonly in: ReadonlyArray<SqlValue>;
-  }
-
-  /**
-   * Returns true if the dataset implements a given schema.
+   * Produce a query for this dataset.
    *
-   * Note: `implements` is a reserved keyword in TS so we can't call this
-   * function `implements`.
-   *
-   * @param dataset - The dataset to test.
-   * @param testSchema - The schema to test against.
+   * @param schema - The schema to use for extracting columns - if undefined,
+   * the most specific possible schema is evaluated from the dataset first and
+   * used instead.
    */
-  export function doesImplement(dataset: Dataset, testSchema: Schema): boolean {
-    const datasetSchema = schema(dataset);
-    return Object.entries(testSchema).every(([name, kind]) => {
-      return name in datasetSchema && datasetSchema[name] === kind;
-    });
-  }
+  query(schema?: DatasetSchema): string;
 
   /**
-   * This function optimizes a dataset into the smallest possible expression.
+   * Optimizes a dataset into the smallest possible expression.
    *
    * For example by combining elements of union data sets that have the same src
    * and similar filters into a single set.
    *
-   * For example, the following union data set...
+   * For example, the following 'union' dataset...
    *
    * ```
    * {
@@ -105,7 +77,7 @@
    * }
    * ```
    *
-   * ...will be combined into a single set...
+   * ...will be combined into a single 'source' dataset...
    *
    * ```
    * {
@@ -117,142 +89,202 @@
    *   filter: {col: 'a', in: [1, 2]},
    * },
    * ```
-   *
-   * @param dataset - The dataset to optimize.
    */
-  export function optimize(dataset: Dataset): Dataset {
-    if ('src' in dataset) {
-      // No optimization possible for individual datasets
-      return dataset;
-    } else if ('union' in dataset) {
-      // Recursively optimize each dataset of this union
-      const optimizedUnion = dataset.union.map(optimize);
-
-      // Find all source datasets and combine then based on src
-      const combinedSrcSets = new Map<string, SourceDataset[]>();
-      const otherDatasets: Dataset[] = [];
-      for (const e of optimizedUnion) {
-        if ('src' in e) {
-          const set = getOrCreate(combinedSrcSets, e.src, () => []);
-          set.push(e);
-        } else {
-          otherDatasets.push(e);
-        }
-      }
-
-      const mergedSrcSets = Array.from(combinedSrcSets.values()).map(
-        (srcGroup) => {
-          if (srcGroup.length === 1) return srcGroup[0];
-
-          // Combine schema across all members in the union
-          const combinedSchema = srcGroup.reduce((acc, e) => {
-            Object.assign(acc, e.schema);
-            return acc;
-          }, {} as Schema);
-
-          // Merge filters for the same src
-          const inFilters: InFilter[] = [];
-          for (const {filter} of srcGroup) {
-            if (filter) {
-              if ('eq' in filter) {
-                inFilters.push({col: filter.col, in: [filter.eq]});
-              } else {
-                inFilters.push(filter);
-              }
-            }
-          }
-
-          const mergedFilter = mergeFilters(inFilters);
-          return {
-            src: srcGroup[0].src,
-            schema: combinedSchema,
-            filter: mergedFilter,
-          };
-        },
-      );
-
-      const finalUnion = [...mergedSrcSets, ...otherDatasets];
-
-      if (finalUnion.length === 1) {
-        return finalUnion[0];
-      } else {
-        return {union: finalUnion};
-      }
-    } else {
-      assertUnreachable(dataset);
-    }
-  }
-
-  function mergeFilters(filters: InFilter[]): InFilter | undefined {
-    if (filters.length === 0) return undefined;
-    const col = filters[0].col;
-    const values = new Set(filters.flatMap((filter) => filter.in));
-    return {col, in: Array.from(values)};
-  }
+  optimize(): Dataset;
 
   /**
-   * Get the schema of an dataset.
+   * Returns true if this dataset implements a given schema.
    *
-   * @param dataset - The dataset to get the schema of.
+   * @param schema - The schema to test against.
    */
-  export function schema(dataset: Dataset): Schema {
-    if ('src' in dataset) {
-      return dataset.schema;
-    } else if ('union' in dataset) {
-      // Find the minimal set of columns that are supported by all datasets of
-      // the union
-      let sch: Record<string, ColumnType> | undefined = undefined;
-      dataset.union.forEach((e) => {
-        const eSchema = schema(e);
-        if (sch === undefined) {
-          // First time just use this one
-          sch = eSchema;
-        } else {
-          const newSch: Record<string, ColumnType> = {};
-          for (const [key, kind] of Object.entries(sch)) {
-            if (key in eSchema && eSchema[key] === kind) {
-              newSch[key] = kind;
-            }
-          }
-          sch = newSch;
-        }
-      });
-      return sch ?? {};
-    } else {
-      assertUnreachable(dataset);
-    }
+  implements(schema: DatasetSchema): boolean;
+}
+
+/**
+ * Defines a list of columns and types that define the shape of the data
+ * represented by a dataset.
+ */
+export type DatasetSchema = Record<string, ColumnType>;
+
+/**
+ * A filter used to express that a column must equal a value.
+ */
+interface EqFilter {
+  readonly col: string;
+  readonly eq: SqlValue;
+}
+
+/**
+ * A filter used to express that column must be one of a set of values.
+ */
+interface InFilter {
+  readonly col: string;
+  readonly in: ReadonlyArray<SqlValue>;
+}
+
+/**
+ * Union of all filter types.
+ */
+type Filter = EqFilter | InFilter;
+
+/**
+ * Named arguments for a SourceDataset.
+ */
+interface SourceDatasetConfig {
+  readonly src: string;
+  readonly schema: DatasetSchema;
+  readonly filter?: Filter;
+}
+
+/**
+ * Defines a dataset with a source SQL select statement of table name, a
+ * schema describing the columns, and an optional filter.
+ */
+export class SourceDataset implements Dataset {
+  readonly src: string;
+  readonly schema: DatasetSchema;
+  readonly filter?: Filter;
+
+  constructor(config: SourceDatasetConfig) {
+    this.src = config.src;
+    this.schema = config.schema;
+    this.filter = config.filter;
   }
 
-  /**
-   * Produce a query for this dataset.
-   *
-   * @param dataset - The dataset to get the query for.
-   * @param sch - The schema to use for extracting columns - if undefined, the
-   * most specific possible schema is evaluated from the dataset first and used
-   * instead.
-   */
-  export function query(dataset: Dataset, sch?: Schema): string {
-    function filterToQuery(filter: Filter) {
-      if ('eq' in filter) {
-        return `where ${filter.col} = ${filter.eq}`;
-      } else if ('in' in filter) {
-        return `where ${filter.col} in (${filter.in.join(',')})`;
+  query(schema?: DatasetSchema) {
+    schema = schema ?? this.schema;
+    const cols = Object.keys(schema);
+    const whereClause = this.filterToQuery();
+    return `select ${cols.join(', ')} from (${this.src}) ${whereClause}`.trim();
+  }
+
+  optimize() {
+    // Cannot optimize SourceDataset
+    return this;
+  }
+
+  implements(schema: DatasetSchema) {
+    return Object.entries(schema).every(([name, kind]) => {
+      return name in this.schema && this.schema[name] === kind;
+    });
+  }
+
+  private filterToQuery() {
+    const filter = this.filter;
+    if (filter === undefined) {
+      return '';
+    }
+    if ('eq' in filter) {
+      return `where ${filter.col} = ${filter.eq}`;
+    } else if ('in' in filter) {
+      return `where ${filter.col} in (${filter.in.join(',')})`;
+    } else {
+      assertUnreachable(filter);
+    }
+  }
+}
+
+/**
+ * A dataset that represents the union of multiple datasets.
+ */
+export class UnionDataset implements Dataset {
+  constructor(readonly union: ReadonlyArray<Dataset>) {}
+
+  get schema(): DatasetSchema {
+    // Find the minimal set of columns that are supported by all datasets of
+    // the union
+    let sch: Record<string, ColumnType> | undefined = undefined;
+    this.union.forEach((ds) => {
+      const dsSchema = ds.schema;
+      if (sch === undefined) {
+        // First time just use this one
+        sch = dsSchema;
       } else {
-        assertUnreachable(filter);
+        const newSch: Record<string, ColumnType> = {};
+        for (const [key, kind] of Object.entries(sch)) {
+          if (key in dsSchema && dsSchema[key] === kind) {
+            newSch[key] = kind;
+          }
+        }
+        sch = newSch;
+      }
+    });
+    return sch ?? {};
+  }
+
+  query(schema?: DatasetSchema): string {
+    schema = schema ?? this.schema;
+    return this.union
+      .map((dataset) => dataset.query(schema))
+      .join(' union all ');
+  }
+
+  optimize(): Dataset {
+    // Recursively optimize each dataset of this union
+    const optimizedUnion = this.union.map((ds) => ds.optimize());
+
+    // Find all source datasets and combine then based on src
+    const combinedSrcSets = new Map<string, SourceDataset[]>();
+    const otherDatasets: Dataset[] = [];
+    for (const e of optimizedUnion) {
+      if (e instanceof SourceDataset) {
+        const set = getOrCreate(combinedSrcSets, e.src, () => []);
+        set.push(e);
+      } else {
+        otherDatasets.push(e);
       }
     }
 
-    sch = sch ?? schema(dataset);
-    if ('src' in dataset) {
-      const whereClause = dataset.filter ? filterToQuery(dataset.filter) : '';
-      const cols = Object.keys(sch);
-      return `select ${cols.join(', ')} from (${dataset.src}) ${whereClause}`.trim();
-    } else if ('union' in dataset) {
-      return dataset.union
-        .map((dataset) => query(dataset, sch))
-        .join(' union all ');
+    const mergedSrcSets = Array.from(combinedSrcSets.values()).map(
+      (srcGroup) => {
+        if (srcGroup.length === 1) return srcGroup[0];
+
+        // Combine schema across all members in the union
+        const combinedSchema = srcGroup.reduce((acc, e) => {
+          Object.assign(acc, e.schema);
+          return acc;
+        }, {} as DatasetSchema);
+
+        // Merge filters for the same src
+        const inFilters: InFilter[] = [];
+        for (const {filter} of srcGroup) {
+          if (filter) {
+            if ('eq' in filter) {
+              inFilters.push({col: filter.col, in: [filter.eq]});
+            } else {
+              inFilters.push(filter);
+            }
+          }
+        }
+
+        const mergedFilter = mergeFilters(inFilters);
+        return new SourceDataset({
+          src: srcGroup[0].src,
+          schema: combinedSchema,
+          filter: mergedFilter,
+        });
+      },
+    );
+
+    const finalUnion = [...mergedSrcSets, ...otherDatasets];
+
+    if (finalUnion.length === 1) {
+      return finalUnion[0];
     } else {
-      assertUnreachable(dataset);
+      return new UnionDataset(finalUnion);
     }
   }
+
+  implements(schema: DatasetSchema) {
+    return Object.entries(schema).every(([name, kind]) => {
+      return name in this.schema && this.schema[name] === kind;
+    });
+  }
+}
+
+function mergeFilters(filters: InFilter[]): InFilter | undefined {
+  if (filters.length === 0) return undefined;
+  const col = filters[0].col;
+  const values = new Set(filters.flatMap((filter) => filter.in));
+  return {col, in: Array.from(values)};
 }
diff --git a/ui/src/trace_processor/dataset_unittest.ts b/ui/src/trace_processor/dataset_unittest.ts
index e354b54..2bd4e53 100644
--- a/ui/src/trace_processor/dataset_unittest.ts
+++ b/ui/src/trace_processor/dataset_unittest.ts
@@ -12,168 +12,158 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-import {Ds} from './dataset';
+import {SourceDataset, UnionDataset} from './dataset';
 import {LONG, NUM, STR} from './query_result';
 
 test('get query for simple dataset', () => {
-  const dataset: Ds.Dataset = {
+  const dataset = new SourceDataset({
     src: 'slice',
     schema: {id: NUM},
-  };
+  });
 
-  expect(Ds.query(dataset)).toEqual('select id from (slice)');
+  expect(dataset.query()).toEqual('select id from (slice)');
 });
 
 test("get query for simple dataset with 'eq' filter", () => {
-  const dataset: Ds.Dataset = {
+  const dataset = new SourceDataset({
     src: 'slice',
     schema: {id: NUM},
     filter: {
       col: 'id',
       eq: 123,
     },
-  };
+  });
 
-  expect(Ds.query(dataset)).toEqual('select id from (slice) where id = 123');
+  expect(dataset.query()).toEqual('select id from (slice) where id = 123');
 });
 
 test("get query for simple dataset with an 'in' filter", () => {
-  const dataset: Ds.Dataset = {
+  const dataset = new SourceDataset({
     src: 'slice',
     schema: {id: NUM},
     filter: {
       col: 'id',
       in: [123, 456],
     },
-  };
+  });
 
-  expect(Ds.query(dataset)).toEqual(
+  expect(dataset.query()).toEqual(
     'select id from (slice) where id in (123,456)',
   );
 });
 
 test('get query for union dataset', () => {
-  const dataset: Ds.Dataset = {
-    union: [
-      {
-        src: 'slice',
-        schema: {id: NUM},
-        filter: {
-          col: 'id',
-          eq: 123,
-        },
+  const dataset = new UnionDataset([
+    new SourceDataset({
+      src: 'slice',
+      schema: {id: NUM},
+      filter: {
+        col: 'id',
+        eq: 123,
       },
-      {
-        src: 'slice',
-        schema: {id: NUM},
-        filter: {
-          col: 'id',
-          eq: 456,
-        },
+    }),
+    new SourceDataset({
+      src: 'slice',
+      schema: {id: NUM},
+      filter: {
+        col: 'id',
+        eq: 456,
       },
-    ],
-  };
+    }),
+  ]);
 
-  expect(Ds.query(dataset)).toEqual(
+  expect(dataset.query()).toEqual(
     'select id from (slice) where id = 123 union all select id from (slice) where id = 456',
   );
 });
 
 test('doesImplement', () => {
-  const dataset = {
+  const dataset = new SourceDataset({
     src: 'slice',
     schema: {id: NUM, ts: LONG},
-  };
+  });
 
-  expect(Ds.doesImplement(dataset, {id: NUM})).toBe(true);
-  expect(Ds.doesImplement(dataset, {id: NUM, ts: LONG})).toBe(true);
-  expect(Ds.doesImplement(dataset, {id: NUM, ts: LONG, name: STR})).toBe(false);
-  expect(Ds.doesImplement(dataset, {id: LONG})).toBe(false);
+  expect(dataset.implements({id: NUM})).toBe(true);
+  expect(dataset.implements({id: NUM, ts: LONG})).toBe(true);
+  expect(dataset.implements({id: NUM, ts: LONG, name: STR})).toBe(false);
+  expect(dataset.implements({id: LONG})).toBe(false);
 });
 
 test('find the schema of a simple dataset', () => {
-  const dataset: Ds.Dataset = {
+  const dataset = new SourceDataset({
     src: 'slice',
     schema: {id: NUM, ts: LONG},
-  };
+  });
 
-  expect(Ds.schema(dataset)).toMatchObject({id: NUM, ts: LONG});
+  expect(dataset.schema).toMatchObject({id: NUM, ts: LONG});
 });
 
 test('find the schema of a union where source sets differ in their names', () => {
-  const dataset: Ds.Dataset = {
-    union: [
-      {
-        src: 'slice',
-        schema: {foo: NUM},
-      },
-      {
-        src: 'slice',
-        schema: {bar: NUM},
-      },
-    ],
-  };
+  const dataset = new UnionDataset([
+    new SourceDataset({
+      src: 'slice',
+      schema: {foo: NUM},
+    }),
+    new SourceDataset({
+      src: 'slice',
+      schema: {bar: NUM},
+    }),
+  ]);
 
-  expect(Ds.schema(dataset)).toMatchObject({});
+  expect(dataset.schema).toMatchObject({});
 });
 
 test('find the schema of a union with differing source sets', () => {
-  const dataset: Ds.Dataset = {
-    union: [
-      {
-        src: 'slice',
-        schema: {foo: NUM},
-      },
-      {
-        src: 'slice',
-        schema: {foo: LONG},
-      },
-    ],
-  };
+  const dataset = new UnionDataset([
+    new SourceDataset({
+      src: 'slice',
+      schema: {foo: NUM},
+    }),
+    new SourceDataset({
+      src: 'slice',
+      schema: {foo: LONG},
+    }),
+  ]);
 
-  expect(Ds.schema(dataset)).toMatchObject({});
+  expect(dataset.schema).toMatchObject({});
 });
 
 test('find the schema of a union with one column in common', () => {
-  const dataset: Ds.Dataset = {
-    union: [
-      {
-        src: 'slice',
-        schema: {foo: NUM, bar: NUM},
-      },
-      {
-        src: 'slice',
-        schema: {foo: NUM, baz: NUM},
-      },
-    ],
-  };
+  const dataset = new UnionDataset([
+    new SourceDataset({
+      src: 'slice',
+      schema: {foo: NUM, bar: NUM},
+    }),
+    new SourceDataset({
+      src: 'slice',
+      schema: {foo: NUM, baz: NUM},
+    }),
+  ]);
 
-  expect(Ds.schema(dataset)).toMatchObject({foo: NUM});
+  expect(dataset.schema).toMatchObject({foo: NUM});
 });
 
 test('optimize a union dataset', () => {
-  const dataset: Ds.Dataset = {
-    union: [
-      {
-        src: 'slice',
-        schema: {},
-        filter: {
-          col: 'track_id',
-          eq: 123,
-        },
+  const dataset = new UnionDataset([
+    new SourceDataset({
+      src: 'slice',
+      schema: {},
+      filter: {
+        col: 'track_id',
+        eq: 123,
       },
-      {
-        src: 'slice',
-        schema: {},
-        filter: {
-          col: 'track_id',
-          eq: 456,
-        },
+    }),
+    new SourceDataset({
+      src: 'slice',
+      schema: {},
+      filter: {
+        col: 'track_id',
+        eq: 456,
       },
-    ],
-  };
+    }),
+  ]);
 
-  expect(Ds.optimize(dataset)).toEqual({
+  expect(dataset.optimize()).toEqual({
     src: 'slice',
     schema: {},
     filter: {
@@ -184,28 +174,26 @@
 });
 
 test('optimize a union dataset with different types of filters', () => {
-  const dataset: Ds.Dataset = {
-    union: [
-      {
-        src: 'slice',
-        schema: {},
-        filter: {
-          col: 'track_id',
-          eq: 123,
-        },
+  const dataset = new UnionDataset([
+    new SourceDataset({
+      src: 'slice',
+      schema: {},
+      filter: {
+        col: 'track_id',
+        eq: 123,
       },
-      {
-        src: 'slice',
-        schema: {},
-        filter: {
-          col: 'track_id',
-          in: [456, 789],
-        },
+    }),
+    new SourceDataset({
+      src: 'slice',
+      schema: {},
+      filter: {
+        col: 'track_id',
+        in: [456, 789],
       },
-    ],
-  };
+    }),
+  ]);
 
-  expect(Ds.optimize(dataset)).toEqual({
+  expect(dataset.optimize()).toEqual({
     src: 'slice',
     schema: {},
     filter: {
@@ -216,20 +204,18 @@
 });
 
 test('optimize a union dataset with different schemas', () => {
-  const dataset: Ds.Dataset = {
-    union: [
-      {
-        src: 'slice',
-        schema: {foo: NUM},
-      },
-      {
-        src: 'slice',
-        schema: {bar: NUM},
-      },
-    ],
-  };
+  const dataset = new UnionDataset([
+    new SourceDataset({
+      src: 'slice',
+      schema: {foo: NUM},
+    }),
+    new SourceDataset({
+      src: 'slice',
+      schema: {bar: NUM},
+    }),
+  ]);
 
-  expect(Ds.optimize(dataset)).toEqual({
+  expect(dataset.optimize()).toEqual({
     src: 'slice',
     // The resultant schema is the combination of the union's member's schemas,
     // as we know the source is the same as we know we can get all of the 'seen'