pw_web_ui: Improve error handling of WebSerialTransport

Don't throw an error if a user tries to reconnect to a port. Pass errors
in an 'errors' observable that consumers can subscribe to. Don't put the
'chunks' observable into an errored state, since that prevents it from
ever sending anything else again, and we often want to continue reading
despite an error.

Change-Id: I114240e1c3c0556fd88b7d4ec1b40fec34a8ccb3
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/17167
Reviewed-by: Max Koopman <koopman@google.com>
Reviewed-by: Paul Mathieu <paulmathieu@google.com>
Reviewed-by: Matthew Soulanille <msoulanille@google.com>
Commit-Queue: Matthew Soulanille <msoulanille@google.com>
diff --git a/pw_web_ui/src/transport/web_serial_transport.ts b/pw_web_ui/src/transport/web_serial_transport.ts
index 5150eb7..fca0a71 100644
--- a/pw_web_ui/src/transport/web_serial_transport.ts
+++ b/pw_web_ui/src/transport/web_serial_transport.ts
@@ -13,7 +13,7 @@
 // the License.
 
 /* eslint-env browser */
-import {BehaviorSubject, Subject} from 'rxjs';
+import {BehaviorSubject, Subject, Subscription, Observable} from 'rxjs';
 import DeviceTransport from './device_transport';
 
 const DEFAULT_SERIAL_OPTIONS: SerialOptions = {
@@ -23,14 +23,36 @@
   stopbits: 1,
 };
 
+interface PortReadConnection {
+  chunks: Observable<Uint8Array>;
+  errors: Observable<Error>;
+}
+
+interface PortConnection extends PortReadConnection {
+  sendChunk: (chunk: Uint8Array) => Promise<void>;
+}
+
+export class DeviceLostError extends Error {
+  message = 'The device has been lost';
+}
+
+export class DeviceLockedError extends Error {
+  message =
+    "The device's port is locked. Try unplugging it" +
+    ' and plugging it back in.';
+}
+
 /**
  * WebSerialTransport sends and receives UInt8Arrays to and
  * from a serial device connected over USB.
  */
 export class WebSerialTransport implements DeviceTransport {
   chunks = new Subject<Uint8Array>();
+  errors = new Subject<Error>();
   connected = new BehaviorSubject<boolean>(false);
-  private writer?: WritableStreamDefaultWriter<Uint8Array>;
+  private portConnections: Map<SerialPort, PortConnection> = new Map();
+  private activePortConnectionConnection: PortConnection | undefined;
+  private rxSubscriptions: Subscription[] = [];
 
   constructor(
     private serial: Serial = navigator.serial,
@@ -43,9 +65,8 @@
    * @param {Uint8Array} chunk The chunk to send
    */
   async sendChunk(chunk: Uint8Array): Promise<void> {
-    if (this.writer !== undefined && this.connected.getValue()) {
-      await this.writer.ready;
-      return this.writer.write(chunk);
+    if (this.activePortConnectionConnection) {
+      return this.activePortConnectionConnection.sendChunk(chunk);
     }
     throw new Error('Device not connected');
   }
@@ -56,41 +77,112 @@
    * be called in response to user interaction.
    */
   async connect(): Promise<void> {
-    let port: SerialPort;
-    try {
-      port = await this.serial.requestPort({filters: this.filters});
-    } catch (e) {
-      // Ignore errors where the user did not select a port.
-      if (!(e instanceof DOMException)) {
-        throw e;
-      }
-      return;
-    }
-
-    await port.open(this.serialOptions);
-    this.writer = port.writable.getWriter();
-
-    this.getChunks(port);
+    const port = await this.serial.requestPort({filters: this.filters});
+    await this.connectPort(port);
   }
 
-  private getChunks(port: SerialPort) {
-    port.readable.pipeTo(
-      new WritableStream({
-        write: chunk => {
+  private disconnect() {
+    for (const subscription of this.rxSubscriptions) {
+      subscription.unsubscribe();
+    }
+    this.rxSubscriptions = [];
+
+    this.activePortConnectionConnection = undefined;
+    this.connected.next(false);
+  }
+
+  private async connectPort(port: SerialPort): Promise<void> {
+    this.disconnect();
+
+    this.activePortConnectionConnection =
+      this.portConnections.get(port) ?? (await this.conectNewPort(port));
+
+    this.connected.next(true);
+
+    this.rxSubscriptions.push(
+      this.activePortConnectionConnection.chunks.subscribe(
+        chunk => {
           this.chunks.next(chunk);
         },
-        close: () => {
-          port.close();
-          this.writer?.releaseLock();
-          this.connected.next(false);
+        err => {
+          throw new Error(`Chunks observable had an unexpeted error ${err}`);
         },
-        abort: () => {
-          // Reconnect to the port
+        () => {
           this.connected.next(false);
-          this.getChunks(port);
-        },
+          this.portConnections.delete(port);
+          // Don't complete the chunks observable because then it would not
+          // be able to forward any future chunks.
+        }
+      )
+    );
+
+    this.rxSubscriptions.push(
+      this.activePortConnectionConnection.errors.subscribe(error => {
+        this.errors.next(error);
+        if (error instanceof DeviceLostError) {
+          // The device has been lost
+          this.connected.next(false);
+        }
       })
     );
-    this.connected.next(true);
+  }
+
+  private async conectNewPort(port: SerialPort): Promise<PortConnection> {
+    await port.open(this.serialOptions);
+    const writer = port.writable.getWriter();
+
+    async function sendChunk(chunk: Uint8Array) {
+      await writer.ready;
+      await writer.write(chunk);
+    }
+
+    const {chunks, errors} = this.getChunks(port);
+
+    const connection: PortConnection = {sendChunk, chunks, errors};
+    this.portConnections.set(port, connection);
+    return connection;
+  }
+
+  private getChunks(port: SerialPort): PortReadConnection {
+    const chunks = new Subject<Uint8Array>();
+    const errors = new Subject<Error>();
+
+    async function read() {
+      if (!port.readable) {
+        throw new DeviceLostError();
+      }
+      if (port.readable.locked) {
+        throw new DeviceLockedError();
+      }
+      await port.readable.pipeTo(
+        new WritableStream({
+          write: chunk => {
+            chunks.next(chunk);
+          },
+          close: () => {
+            chunks.complete();
+            errors.complete();
+          },
+          abort: () => {
+            // Reconnect to the port.
+            connect();
+          },
+        })
+      );
+    }
+
+    function connect() {
+      read().catch(err => {
+        // Don't error the chunks observable since that stops it from
+        // reading any more packets, and we often want to continue
+        // despite an error. Instead, push errors to the 'errors'
+        // observable.
+        errors.next(err);
+      });
+    }
+
+    connect();
+
+    return {chunks, errors};
   }
 }
diff --git a/pw_web_ui/src/transport/web_serial_transport_test.ts b/pw_web_ui/src/transport/web_serial_transport_test.ts
index a194015..ee57058 100644
--- a/pw_web_ui/src/transport/web_serial_transport_test.ts
+++ b/pw_web_ui/src/transport/web_serial_transport_test.ts
@@ -15,7 +15,7 @@
 /* eslint-env browser, jasmine */
 import {last, take} from 'rxjs/operators';
 import {SerialMock} from './serial_mock';
-import {WebSerialTransport} from './web_serial_transport';
+import {WebSerialTransport, DeviceLockedError} from './web_serial_transport';
 
 describe('WebSerialTransport', () => {
   let serialMock: SerialMock;
@@ -48,13 +48,15 @@
     expect(serialMock.serialPort.writable.locked).toBeTrue();
   });
 
-  it('stops reading when it reaches the final chunk', async () => {
+  it('is disconnected when it reaches the final chunk', async () => {
     const transport = new WebSerialTransport(serialMock as Serial);
     await transport.connect();
-    const closePromise = transport.connected.pipe(take(2), last()).toPromise();
+    const disconnectPromise = transport.connected
+      .pipe(take(2), last())
+      .toPromise();
     serialMock.closeFromDevice();
 
-    expect(await closePromise).toBeFalse();
+    expect(await disconnectPromise).toBeFalse();
   });
 
   it('waits for the writer to be ready', async () => {
@@ -85,4 +87,21 @@
     await transport.sendChunk(data);
     expect(await dataToDevice).toEqual(data);
   });
+
+  it('throws an error on failing to connect', async () => {
+    const connectError = new Error('Example connection error');
+    spyOn(serialMock, 'requestPort').and.throwError(connectError);
+    const transport = new WebSerialTransport(serialMock as Serial);
+    await expectAsync(transport.connect()).toBeRejectedWith(connectError);
+  });
+
+  it("emits connection errors in the 'errors' observable", async () => {
+    const transport = new WebSerialTransport(serialMock as Serial);
+    await transport.connect();
+
+    const reportedErrorPromise = transport.errors.pipe(take(1)).toPromise();
+    serialMock.serialPort.errorFromDevice(new Error());
+
+    expect(await reportedErrorPromise).toEqual(new DeviceLockedError());
+  });
 });