blob: 010b8eaea4bbfb93058ad6d9f1bde8f4528223a4 [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_rpc;
import com.google.protobuf.MessageLite;
import dev.pigweed.pw_rpc.StreamObserverCall.StreamResponseFuture;
import dev.pigweed.pw_rpc.StreamObserverCall.UnaryResponseFuture;
import java.util.function.Consumer;
/**
* Represents a method ready to be invoked on a particular RPC channel.
*
* <p>The Client has the concrete MethodClient as a type parameter. This allows implementations to
* fully define the interface and semantics for RPC calls.
*/
public class MethodClient {
protected final StreamObserver<? extends MessageLite> defaultObserver;
private final RpcManager rpcs;
private final PendingRpc rpc;
MethodClient(RpcManager rpcs, PendingRpc rpc, StreamObserver<MessageLite> defaultObserver) {
this.rpcs = rpcs;
this.rpc = rpc;
this.defaultObserver = defaultObserver;
}
public final Method method() {
return rpc.method();
}
/** Gives implementations access to the RpcManager shared with the Client. */
protected final RpcManager rpcs() {
return rpcs;
}
/** Gives implementations access to the PendingRpc this MethodClient represents. */
protected final PendingRpc rpc() {
return rpc;
}
/** Invokes a unary RPC. Uses the default StreamObserver for RPC events. */
public Call invokeUnary(MessageLite request) throws ChannelOutputException {
return invokeUnary(request, defaultObserver());
}
/** Invokes a unary RPC. Uses the provided StreamObserver for RPC events. */
public Call invokeUnary(MessageLite request, StreamObserver<? extends MessageLite> observer)
throws ChannelOutputException {
checkCallType(Method.Type.UNARY);
return StreamObserverCall.start(rpcs(), rpc(), observer, request);
}
/** Invokes a unary RPC with a future that collects the response. */
public <ResponseT extends MessageLite> Call.UnaryFuture<ResponseT> invokeUnaryFuture(
MessageLite request) {
checkCallType(Method.Type.UNARY);
return new UnaryResponseFuture<>(rpcs(), rpc(), request);
}
/**
* Starts a unary RPC, ignoring any errors that occur when opening. This can be used to start
* listening to responses to an RPC before the RPC server is available.
*
* <p>The RPC remains open until it is completed by the server with a response or error packet or
* cancelled.
*/
public Call openUnary(MessageLite request, StreamObserver<? extends MessageLite> observer) {
checkCallType(Method.Type.UNARY);
return StreamObserverCall.open(rpcs(), rpc(), observer, request);
}
/** Invokes a server streaming RPC. Uses the default StreamObserver for RPC events. */
public Call invokeServerStreaming(MessageLite request) throws ChannelOutputException {
return invokeServerStreaming(request, defaultObserver());
}
/** Invokes a server streaming RPC. Uses the provided StreamObserver for RPC events. */
public Call invokeServerStreaming(MessageLite request,
StreamObserver<? extends MessageLite> observer) throws ChannelOutputException {
checkCallType(Method.Type.SERVER_STREAMING);
return StreamObserverCall.start(rpcs(), rpc(), observer, request);
}
/** Invokes a server streaming RPC with a future that collects the responses. */
public Call.ServerStreamingFuture invokeServerStreamingFuture(
MessageLite request, Consumer<? extends MessageLite> onNext) {
checkCallType(Method.Type.SERVER_STREAMING);
return new StreamResponseFuture<>(rpcs(), rpc(), onNext, request);
}
/**
* Starts a server streaming RPC, ignoring any errors that occur when opening. This can be used to
* start listening to responses to an RPC before the RPC server is available.
*
* <p>The RPC remains open until it is completed by the server with a response or error packet or
* cancelled.
*/
public Call openServerStreaming(
MessageLite request, StreamObserver<? extends MessageLite> observer) {
checkCallType(Method.Type.SERVER_STREAMING);
return StreamObserverCall.open(rpcs(), rpc(), observer, request);
}
/** Invokes a client streaming RPC. Uses the default StreamObserver for RPC events. */
public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeClientStreaming()
throws ChannelOutputException {
return invokeClientStreaming(defaultObserver());
}
/** Invokes a client streaming RPC. Uses the provided StreamObserver for RPC events. */
public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeClientStreaming(
StreamObserver<? extends MessageLite> observer) throws ChannelOutputException {
checkCallType(Method.Type.CLIENT_STREAMING);
return StreamObserverCall.start(rpcs(), rpc(), observer, null);
}
/** Invokes a client streaming RPC with a future that collects the response. */
public <RequestT extends MessageLite> Call.ClientStreaming<RequestT>
invokeClientStreamingFuture() {
checkCallType(Method.Type.CLIENT_STREAMING);
return new UnaryResponseFuture<>(rpcs(), rpc(), null);
}
/**
* Starts a client streaming RPC, ignoring any errors that occur when opening. This can be used to
* start listening to responses to an RPC before the RPC server is available.
*
* <p>The RPC remains open until it is completed by the server with a response or error packet or
* cancelled.
*/
public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> openClientStreaming(
StreamObserver<? extends MessageLite> observer) {
checkCallType(Method.Type.CLIENT_STREAMING);
return StreamObserverCall.open(rpcs(), rpc(), observer, null);
}
/** Invokes a bidirectional streaming RPC. Uses the default StreamObserver for RPC events. */
public <RequestT extends MessageLite> Call.ClientStreaming<RequestT>
invokeBidirectionalStreaming() throws ChannelOutputException {
return invokeBidirectionalStreaming(defaultObserver());
}
/** Invokes a bidirectional streaming RPC. Uses the provided StreamObserver for RPC events. */
public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeBidirectionalStreaming(
StreamObserver<? extends MessageLite> observer) throws ChannelOutputException {
checkCallType(Method.Type.BIDIRECTIONAL_STREAMING);
return StreamObserverCall.start(rpcs(), rpc(), observer, null);
}
/** Invokes a bidirectional streaming RPC with a future that finishes when the RPC finishes. */
public <RequestT extends MessageLite, ResponseT extends MessageLite>
Call.BidirectionalStreamingFuture<RequestT> invokeBidirectionalStreamingFuture(
Consumer<ResponseT> onNext) {
checkCallType(Method.Type.BIDIRECTIONAL_STREAMING);
return new StreamResponseFuture<>(rpcs(), rpc(), onNext, null);
}
/**
* Starts a bidirectional streaming RPC, ignoring any errors that occur when opening. This can be
* used to start listening to responses to an RPC before the RPC server is available.
*
* <p>The RPC remains open until it is completed by the server with a response or error packet or
* cancelled.
*/
public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> openBidirectionalStreaming(
StreamObserver<? extends MessageLite> observer) {
checkCallType(Method.Type.BIDIRECTIONAL_STREAMING);
return StreamObserverCall.open(rpcs(), rpc(), observer, null);
}
@SuppressWarnings("unchecked")
private <ResponseT extends MessageLite> StreamObserver<ResponseT> defaultObserver() {
return (StreamObserver<ResponseT>) defaultObserver;
}
private void checkCallType(Method.Type expected) {
if (!rpc().method().type().equals(expected)) {
throw new UnsupportedOperationException(String.format(
"%s is a %s method, but it was invoked as a %s method. RPCs must be invoked by the"
+ " appropriate invoke function.",
method().fullName(),
method().type().sentenceName(),
expected.sentenceName()));
}
}
}