| // Copyright 2021 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| package dev.pigweed.pw_rpc; |
| |
| import com.google.protobuf.MessageLite; |
| import dev.pigweed.pw_rpc.StreamObserverCall.StreamResponseFuture; |
| import dev.pigweed.pw_rpc.StreamObserverCall.UnaryResponseFuture; |
| import java.util.function.Consumer; |
| |
| /** |
| * Represents a method ready to be invoked on a particular RPC channel. |
| * |
| * <p>The Client has the concrete MethodClient as a type parameter. This allows implementations to |
| * fully define the interface and semantics for RPC calls. |
| */ |
| public class MethodClient { |
| protected final StreamObserver<? extends MessageLite> defaultObserver; |
| private final RpcManager rpcs; |
| private final PendingRpc rpc; |
| |
| MethodClient(RpcManager rpcs, PendingRpc rpc, StreamObserver<MessageLite> defaultObserver) { |
| this.rpcs = rpcs; |
| this.rpc = rpc; |
| this.defaultObserver = defaultObserver; |
| } |
| |
| public final Method method() { |
| return rpc.method(); |
| } |
| |
| /** Gives implementations access to the RpcManager shared with the Client. */ |
| protected final RpcManager rpcs() { |
| return rpcs; |
| } |
| |
| /** Gives implementations access to the PendingRpc this MethodClient represents. */ |
| protected final PendingRpc rpc() { |
| return rpc; |
| } |
| |
| /** Invokes a unary RPC. Uses the default StreamObserver for RPC events. */ |
| public Call invokeUnary(MessageLite request) throws ChannelOutputException { |
| return invokeUnary(request, defaultObserver()); |
| } |
| |
| /** Invokes a unary RPC. Uses the provided StreamObserver for RPC events. */ |
| public Call invokeUnary(MessageLite request, StreamObserver<? extends MessageLite> observer) |
| throws ChannelOutputException { |
| checkCallType(Method.Type.UNARY); |
| return StreamObserverCall.start(rpcs(), rpc(), observer, request); |
| } |
| |
| /** Invokes a unary RPC with a future that collects the response. */ |
| public <ResponseT extends MessageLite> Call.UnaryFuture<ResponseT> invokeUnaryFuture( |
| MessageLite request) { |
| checkCallType(Method.Type.UNARY); |
| return new UnaryResponseFuture<>(rpcs(), rpc(), request); |
| } |
| |
| /** |
| * Starts a unary RPC, ignoring any errors that occur when opening. This can be used to start |
| * listening to responses to an RPC before the RPC server is available. |
| * |
| * <p>The RPC remains open until it is completed by the server with a response or error packet or |
| * cancelled. |
| */ |
| public Call openUnary(MessageLite request, StreamObserver<? extends MessageLite> observer) { |
| checkCallType(Method.Type.UNARY); |
| return StreamObserverCall.open(rpcs(), rpc(), observer, request); |
| } |
| |
| /** Invokes a server streaming RPC. Uses the default StreamObserver for RPC events. */ |
| public Call invokeServerStreaming(MessageLite request) throws ChannelOutputException { |
| return invokeServerStreaming(request, defaultObserver()); |
| } |
| |
| /** Invokes a server streaming RPC. Uses the provided StreamObserver for RPC events. */ |
| public Call invokeServerStreaming(MessageLite request, |
| StreamObserver<? extends MessageLite> observer) throws ChannelOutputException { |
| checkCallType(Method.Type.SERVER_STREAMING); |
| return StreamObserverCall.start(rpcs(), rpc(), observer, request); |
| } |
| |
| /** Invokes a server streaming RPC with a future that collects the responses. */ |
| public Call.ServerStreamingFuture invokeServerStreamingFuture( |
| MessageLite request, Consumer<? extends MessageLite> onNext) { |
| checkCallType(Method.Type.SERVER_STREAMING); |
| return new StreamResponseFuture<>(rpcs(), rpc(), onNext, request); |
| } |
| |
| /** |
| * Starts a server streaming RPC, ignoring any errors that occur when opening. This can be used to |
| * start listening to responses to an RPC before the RPC server is available. |
| * |
| * <p>The RPC remains open until it is completed by the server with a response or error packet or |
| * cancelled. |
| */ |
| public Call openServerStreaming( |
| MessageLite request, StreamObserver<? extends MessageLite> observer) { |
| checkCallType(Method.Type.SERVER_STREAMING); |
| return StreamObserverCall.open(rpcs(), rpc(), observer, request); |
| } |
| |
| /** Invokes a client streaming RPC. Uses the default StreamObserver for RPC events. */ |
| public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeClientStreaming() |
| throws ChannelOutputException { |
| return invokeClientStreaming(defaultObserver()); |
| } |
| |
| /** Invokes a client streaming RPC. Uses the provided StreamObserver for RPC events. */ |
| public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeClientStreaming( |
| StreamObserver<? extends MessageLite> observer) throws ChannelOutputException { |
| checkCallType(Method.Type.CLIENT_STREAMING); |
| return StreamObserverCall.start(rpcs(), rpc(), observer, null); |
| } |
| |
| /** Invokes a client streaming RPC with a future that collects the response. */ |
| public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> |
| invokeClientStreamingFuture() { |
| checkCallType(Method.Type.CLIENT_STREAMING); |
| return new UnaryResponseFuture<>(rpcs(), rpc(), null); |
| } |
| |
| /** |
| * Starts a client streaming RPC, ignoring any errors that occur when opening. This can be used to |
| * start listening to responses to an RPC before the RPC server is available. |
| * |
| * <p>The RPC remains open until it is completed by the server with a response or error packet or |
| * cancelled. |
| */ |
| public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> openClientStreaming( |
| StreamObserver<? extends MessageLite> observer) { |
| checkCallType(Method.Type.CLIENT_STREAMING); |
| return StreamObserverCall.open(rpcs(), rpc(), observer, null); |
| } |
| |
| /** Invokes a bidirectional streaming RPC. Uses the default StreamObserver for RPC events. */ |
| public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> |
| invokeBidirectionalStreaming() throws ChannelOutputException { |
| return invokeBidirectionalStreaming(defaultObserver()); |
| } |
| |
| /** Invokes a bidirectional streaming RPC. Uses the provided StreamObserver for RPC events. */ |
| public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeBidirectionalStreaming( |
| StreamObserver<? extends MessageLite> observer) throws ChannelOutputException { |
| checkCallType(Method.Type.BIDIRECTIONAL_STREAMING); |
| return StreamObserverCall.start(rpcs(), rpc(), observer, null); |
| } |
| |
| /** Invokes a bidirectional streaming RPC with a future that finishes when the RPC finishes. */ |
| public <RequestT extends MessageLite, ResponseT extends MessageLite> |
| Call.BidirectionalStreamingFuture<RequestT> invokeBidirectionalStreamingFuture( |
| Consumer<ResponseT> onNext) { |
| checkCallType(Method.Type.BIDIRECTIONAL_STREAMING); |
| return new StreamResponseFuture<>(rpcs(), rpc(), onNext, null); |
| } |
| |
| /** |
| * Starts a bidirectional streaming RPC, ignoring any errors that occur when opening. This can be |
| * used to start listening to responses to an RPC before the RPC server is available. |
| * |
| * <p>The RPC remains open until it is completed by the server with a response or error packet or |
| * cancelled. |
| */ |
| public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> openBidirectionalStreaming( |
| StreamObserver<? extends MessageLite> observer) { |
| checkCallType(Method.Type.BIDIRECTIONAL_STREAMING); |
| return StreamObserverCall.open(rpcs(), rpc(), observer, null); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <ResponseT extends MessageLite> StreamObserver<ResponseT> defaultObserver() { |
| return (StreamObserver<ResponseT>) defaultObserver; |
| } |
| |
| private void checkCallType(Method.Type expected) { |
| if (!rpc().method().type().equals(expected)) { |
| throw new UnsupportedOperationException(String.format( |
| "%s is a %s method, but it was invoked as a %s method. RPCs must be invoked by the" |
| + " appropriate invoke function.", |
| method().fullName(), |
| method().type().sentenceName(), |
| expected.sentenceName())); |
| } |
| } |
| } |