pw_web: Add Device helper for calling ServerStreamingMethodStub RPCs

This makes listening to RPC logs easier:
```
device.rpcs.pw.log.Logs.Listen((msg) => {
  ...
});
```

Change-Id: I1b8f23c5a0bbc26b8896fad7edc77d92eba5e029
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/109010
Reviewed-by: Anthony DiGirolamo <tonymd@google.com>
Commit-Queue: Asad Memon <asadmemon@google.com>
diff --git a/pw_web/webconsole/common/logService.ts b/pw_web/webconsole/common/logService.ts
index 2d49380..106520c 100644
--- a/pw_web/webconsole/common/logService.ts
+++ b/pw_web/webconsole/common/logService.ts
@@ -12,29 +12,14 @@
 // License for the specific language governing permissions and limitations under
 // the License.
 
-import {Device, pw_rpc} from "pigweedjs";
-type Client = pw_rpc.Client;
-
-function createDefaultRPCLogService(client: Client) {
-  const logService = client.channel()!
-    .methodStub('pw.log.Logs.Listen');
-
-  return logService;
-}
+import {Device} from "pigweedjs";
 
 export async function listenToDefaultLogService(
   device: Device,
   onFrame: (frame: Uint8Array) => void) {
-  const client = device.client;
-  // @ts-ignore
-  const logService: pw_rpc.ServerStreamingMethodStub = (createDefaultRPCLogService(client))!;
-  const request = new logService.method.responseType();
-  // @ts-ignore
-  const call = logService.invoke(request, (msg) => {
-    // @ts-ignore
-    msg.getEntriesList().forEach(entry => onFrame(entry.getMessage()));
-  });
-
+  const call = device.rpcs.pw.log.Logs.Listen((msg: any) => {
+    msg.getEntriesList().forEach((entry: any) => onFrame(entry.getMessage()));
+  })
   return () => {
     call.cancel();
   };
diff --git a/ts/device/index.ts b/ts/device/index.ts
index cb68c7f..3250070 100644
--- a/ts/device/index.ts
+++ b/ts/device/index.ts
@@ -14,10 +14,20 @@
 
 import objectPath from 'object-path';
 import {Decoder, Encoder} from 'pigweedjs/pw_hdlc';
-import {Client, Channel, ServiceClient, UnaryMethodStub, MethodStub} from 'pigweedjs/pw_rpc';
+import {
+  Client,
+  Channel,
+  ServiceClient,
+  UnaryMethodStub,
+  MethodStub,
+  ServerStreamingMethodStub
+} from 'pigweedjs/pw_rpc';
 import {WebSerialTransport} from '../transport/web_serial_transport';
 import {ProtoCollection} from 'pigweedjs/pw_protobuf_compiler';
 
+function protoFieldToMethodName(string) {
+  return string.split("_").map(titleCase).join("");
+}
 function titleCase(string) {
   return string.charAt(0).toUpperCase() + string.slice(1);
 }
@@ -85,7 +95,9 @@
     let methodMap = {};
     let methodKeys = Array.from(service.methodsByName.keys());
     methodKeys
-      .filter((method: any) => service.methodsByName.get(method) instanceof UnaryMethodStub)
+      .filter((method: any) =>
+        service.methodsByName.get(method) instanceof UnaryMethodStub
+        || service.methodsByName.get(method) instanceof ServerStreamingMethodStub)
       .forEach(key => {
         let fn = this.createMethodWrapper(
           service.methodsByName.get(key),
@@ -97,9 +109,32 @@
     return methodMap;
   }
 
-  private createMethodWrapper(realMethod: MethodStub, methodName: string, fullMethodPath: string) {
-    const requestType = realMethod.method.descriptor.getInputType().replace(/^\./, '');
-    const requestProtoDescriptor = this.protoCollection.getDescriptorProto(requestType);
+  private createMethodWrapper(
+    realMethod: MethodStub,
+    methodName: string,
+    fullMethodPath: string) {
+    if (realMethod instanceof UnaryMethodStub) {
+      return this.createUnaryMethodWrapper(
+        realMethod,
+        methodName,
+        fullMethodPath);
+    }
+    else if (realMethod instanceof ServerStreamingMethodStub) {
+      return this.createServerStreamingMethodWrapper(
+        realMethod,
+        methodName,
+        fullMethodPath);
+    }
+  }
+
+  private createUnaryMethodWrapper(
+    realMethod: UnaryMethodStub,
+    methodName: string,
+    fullMethodPath: string) {
+    const requestType =
+      realMethod.method.descriptor.getInputType().replace(/^\./, '');
+    const requestProtoDescriptor =
+      this.protoCollection.getDescriptorProto(requestType);
     const requestFields = requestProtoDescriptor.getFieldList();
     const functionArguments = requestFields
       .map(field => field.getName())
@@ -116,12 +151,46 @@
     let fn = new Function(...functionArguments).bind((args) => {
       const request = new realMethod.method.requestType();
       requestFields.forEach((field, index) => {
-        console.log("setting", `set${titleCase(field.getName())}`, args[index]);
         request[`set${titleCase(field.getName())}`](args[index]);
       })
-      if (realMethod instanceof UnaryMethodStub) {
-        return realMethod.call(request);
-      }
+      return realMethod.call(request);
+    });
+    return fn;
+  }
+
+  private createServerStreamingMethodWrapper(
+    realMethod: ServerStreamingMethodStub,
+    methodName: string,
+    fullMethodPath: string) {
+    const requestType = realMethod.method.descriptor.getInputType().replace(/^\./, '');
+    const requestProtoDescriptor =
+      this.protoCollection.getDescriptorProto(requestType);
+    const requestFields = requestProtoDescriptor.getFieldList();
+    const functionArguments = requestFields
+      .map(field => field.getName())
+      .concat(
+        [
+          'onNext',
+          'onComplete',
+          'onError',
+          'return this(arguments);'
+        ]
+      );
+
+    // We store field names so REPL can show hints in autocomplete using these.
+    this.nameToMethodArgumentsMap[fullMethodPath] = requestFields
+      .map(field => field.getName());
+
+    // We create a new JS function dynamically here that takes
+    // proto message fields as arguments and calls the actual RPC method.
+    let fn = new Function(...functionArguments).bind((args) => {
+      const request = new realMethod.method.requestType();
+      requestFields.forEach((field, index) => {
+        request[`set${protoFieldToMethodName(field.getName())}`](args[index]);
+      })
+      const callbacks = Array.from(args).slice(requestFields.length);
+      // @ts-ignore
+      return realMethod.invoke(request, callbacks[0], callbacks[1], callbacks[2]);
     });
     return fn;
   }
diff --git a/ts/device/index_test.ts b/ts/device/index_test.ts
index 1218dbb..3a383bf 100644
--- a/ts/device/index_test.ts
+++ b/ts/device/index_test.ts
@@ -18,10 +18,60 @@
 import {ProtoCollection} from 'pigweedjs/protos/collection';
 import {WebSerialTransport} from '../transport/web_serial_transport';
 import {Serial} from 'pigweedjs/types/serial';
+import {Message} from 'google-protobuf';
+import {RpcPacket, PacketType} from 'pigweedjs/protos/pw_rpc/internal/packet_pb';
+import {Method, ServerStreamingMethodStub} from 'pigweedjs/pw_rpc';
+import {Status} from 'pigweedjs/pw_status';
+import {
+  Response,
+} from 'pigweedjs/protos/pw_rpc/ts/test_pb';
 
 describe('WebSerialTransport', () => {
   let device: Device;
   let serialMock: SerialMock;
+
+  function newResponse(payload = '._.'): Message {
+    const response = new Response();
+    response.setPayload(payload);
+    return response;
+  }
+
+  function generateResponsePacket(
+    channelId: number,
+    method: Method,
+    status: Status,
+    response?: Message
+  ) {
+    const packet = new RpcPacket();
+    packet.setType(PacketType.RESPONSE);
+    packet.setChannelId(channelId);
+    packet.setServiceId(method.service.id);
+    packet.setMethodId(method.id);
+    packet.setStatus(status);
+    if (response === undefined) {
+      packet.setPayload(new Uint8Array());
+    } else {
+      packet.setPayload(response.serializeBinary());
+    }
+    return packet.serializeBinary();
+  }
+
+  function generateStreamingPacket(
+    channelId: number,
+    method: Method,
+    response: Message,
+    status: Status = Status.OK
+  ) {
+    const packet = new RpcPacket();
+    packet.setType(PacketType.SERVER_STREAM);
+    packet.setChannelId(channelId);
+    packet.setServiceId(method.service.id);
+    packet.setMethodId(method.id);
+    packet.setPayload(response.serializeBinary());
+    packet.setStatus(status);
+    return packet.serializeBinary();
+  }
+
   beforeEach(() => {
     serialMock = new SerialMock();
     device = new Device(new ProtoCollection(), new WebSerialTransport(serialMock as Serial));
@@ -45,10 +95,32 @@
       71, 139, 109, 127, 108, 165, 126]);
 
     await device.connect();
-    console.log(device.rpcs.pw.rpc.EchoService.Echo);
     serialMock.dataFromDevice(helloResponse);
     const [status, response] = await device.rpcs.pw.rpc.EchoService.Echo("hello");
     expect(response.getMsg()).toBe("hello");
     expect(status).toBe(0);
   });
+
+  it('server streaming rpc sends response', async () => {
+    await device.connect();
+    const response1 = newResponse('!!!');
+    const response2 = newResponse('?');
+    const serverStreaming = device.client
+      .channel()
+      ?.methodStub(
+        'pw.rpc.test1.TheTestService.SomeServerStreaming'
+      )! as ServerStreamingMethodStub;
+    const onNext = jest.fn();
+    const onCompleted = jest.fn();
+    const onError = jest.fn();
+
+    device.rpcs.pw.rpc.test1.TheTestService.SomeServerStreaming(4, onNext, onCompleted, onError);
+    device.client.processPacket(generateStreamingPacket(1, serverStreaming.method, response1));
+    device.client.processPacket(generateStreamingPacket(1, serverStreaming.method, response2));
+    device.client.processPacket(generateResponsePacket(1, serverStreaming.method, Status.ABORTED));
+
+    expect(onNext).toBeCalledWith(response1);
+    expect(onNext).toBeCalledWith(response2);
+    expect(onCompleted).toBeCalledWith(Status.ABORTED);
+  });
 });