| // Copyright 2022 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| package dev.pigweed.pw_rpc; |
| |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.SettableFuture; |
| import com.google.protobuf.MessageLite; |
| import dev.pigweed.pw_log.Logger; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.function.Consumer; |
| import javax.annotation.Nullable; |
| |
| /** |
| * Call implementation that represents the call as a ListenableFuture. |
| */ |
| abstract class FutureCall<RequestT extends MessageLite, ResponseT extends MessageLite, ResultT> |
| extends AbstractCall<RequestT, ResponseT> implements ListenableFuture<ResultT> { |
| private static final Logger logger = Logger.forClass(FutureCall.class); |
| |
| private final SettableFuture<ResultT> future = SettableFuture.create(); |
| |
| private FutureCall(Endpoint rpcs, PendingRpc rpc) { |
| super(rpcs, rpc); |
| } |
| |
| // Implement the ListenableFuture interface by forwarding the internal SettableFuture. |
| |
| @Override |
| public final void addListener(Runnable runnable, Executor executor) { |
| future.addListener(runnable, executor); |
| } |
| |
| /** Cancellation means that a cancel() or cancel(boolean) call succeeded. */ |
| @Override |
| public final boolean isCancelled() { |
| return error() == Status.CANCELLED; |
| } |
| |
| @Override |
| public final boolean isDone() { |
| return future.isDone(); |
| } |
| |
| @Override |
| public final ResultT get() throws InterruptedException, ExecutionException { |
| return future.get(); |
| } |
| |
| @Override |
| public final ResultT get(long timeout, TimeUnit unit) |
| throws InterruptedException, ExecutionException, TimeoutException { |
| return future.get(timeout, unit); |
| } |
| |
| @Override |
| public final boolean cancel(boolean mayInterruptIfRunning) { |
| try { |
| return this.cancel(); |
| } catch (ChannelOutputException e) { |
| logger.atWarning().withCause(e).log("Failed to send cancellation packet for %s", rpc()); |
| return true; // handleError() was already called, so the future was cancelled |
| } |
| } |
| |
| /** Used by derived classes to access the future instance. */ |
| final SettableFuture<ResultT> future() { |
| return future; |
| } |
| |
| void start(@Nullable RequestT request) { |
| try { |
| rpcManager().start(this, request); |
| } catch (ChannelOutputException e) { |
| // Stash the exception in the future and abort the call. |
| // |
| // In this instance, it's okay to modify the call's state from whichever thread is starting |
| // the call. This call never started, and so other threads have access to the object. |
| future.setException(e); |
| |
| // Set the status to mark the call completed. doHandleError() will have no effect since the |
| // exception was already set. |
| handleError(Status.ABORTED); |
| } |
| } |
| |
| @Override |
| public void doHandleError() { |
| future.setException(new RpcError(rpc(), error())); |
| } |
| |
| /** Future-based Call class for unary and client streaming RPCs. */ |
| static class UnaryResponseFuture<RequestT extends MessageLite, ResponseT extends MessageLite> |
| extends FutureCall<RequestT, ResponseT, UnaryResult<ResponseT>> |
| implements ClientStreamingFuture<RequestT, ResponseT> { |
| @Nullable ResponseT response = null; |
| |
| UnaryResponseFuture(Endpoint rpcs, PendingRpc rpc, @Nullable RequestT request) { |
| super(rpcs, rpc); |
| start(request); |
| } |
| |
| @Override |
| public void doHandleNext(ResponseT value) { |
| if (response == null) { |
| response = value; |
| } else { |
| future().setException(new IllegalStateException("Unary RPC received multiple responses.")); |
| } |
| } |
| |
| @Override |
| public void doHandleCompleted() { |
| if (response == null) { |
| future().setException( |
| new IllegalStateException("Unary RPC completed without a response payload")); |
| } else { |
| future().set(UnaryResult.create(response, status())); |
| } |
| } |
| } |
| |
| /** Future-based Call class for server and bidirectional streaming RPCs. */ |
| static class StreamResponseFuture<RequestT extends MessageLite, ResponseT extends MessageLite> |
| extends FutureCall<RequestT, ResponseT, Status> |
| implements BidirectionalStreamingFuture<RequestT> { |
| private final Consumer<ResponseT> onNext; |
| |
| StreamResponseFuture( |
| Endpoint rpcs, PendingRpc rpc, Consumer<ResponseT> onNext, @Nullable RequestT request) { |
| super(rpcs, rpc); |
| this.onNext = onNext; |
| start(request); |
| } |
| |
| @Override |
| public void doHandleNext(ResponseT value) { |
| onNext.accept(value); |
| } |
| |
| @Override |
| public void doHandleCompleted() { |
| future().set(status()); |
| } |
| } |
| } |