blob: 55fd88aa545fa86189be827606aab56cbc815eec [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_rpc;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import com.google.protobuf.MessageLite;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
public final class StreamObserverMethodClientTest {
private static final Service SERVICE = new Service("pw.rpc.test1.TheTestService",
Service.unaryMethod("SomeUnary", SomeMessage.class, AnotherMessage.class),
Service.serverStreamingMethod("SomeServerStreaming", SomeMessage.class, AnotherMessage.class),
Service.clientStreamingMethod("SomeClientStreaming", SomeMessage.class, AnotherMessage.class),
Service.bidirectionalStreamingMethod(
"SomeBidirectionalStreaming", SomeMessage.class, AnotherMessage.class));
private static final Channel CHANNEL = new Channel(1, (bytes) -> {});
private static final PendingRpc UNARY_RPC =
PendingRpc.create(CHANNEL, SERVICE, SERVICE.method("SomeUnary"));
private static final PendingRpc SERVER_STREAMING_RPC =
PendingRpc.create(CHANNEL, SERVICE, SERVICE.method("SomeServerStreaming"));
private static final PendingRpc CLIENT_STREAMING_RPC =
PendingRpc.create(CHANNEL, SERVICE, SERVICE.method("SomeClientStreaming"));
private static final PendingRpc BIDIRECTIONAL_STREAMING_RPC =
PendingRpc.create(CHANNEL, SERVICE, SERVICE.method("SomeBidirectionalStreaming"));
@Rule public final MockitoRule mockito = MockitoJUnit.rule();
@Mock private StreamObserver<MessageLite> defaultObserver;
private final RpcManager rpcManager = new RpcManager();
private MethodClient unaryMethodClient;
private MethodClient serverStreamingMethodClient;
private MethodClient clientStreamingMethodClient;
private MethodClient bidirectionalStreamingMethodClient;
@Before
public void createMethodClient() {
unaryMethodClient = new MethodClient(rpcManager, UNARY_RPC, defaultObserver);
serverStreamingMethodClient =
new MethodClient(rpcManager, SERVER_STREAMING_RPC, defaultObserver);
clientStreamingMethodClient =
new MethodClient(rpcManager, CLIENT_STREAMING_RPC, defaultObserver);
bidirectionalStreamingMethodClient =
new MethodClient(rpcManager, BIDIRECTIONAL_STREAMING_RPC, defaultObserver);
}
@Test
public void invokeWithNoObserver_usesDefaultObserver() throws Exception {
unaryMethodClient.invokeUnary(SomeMessage.getDefaultInstance());
AnotherMessage reply = AnotherMessage.newBuilder().setPayload("yo").build();
rpcManager.getPending(UNARY_RPC).onNext(reply.toByteString());
verify(defaultObserver).onNext(reply);
}
@Test
public void invoke_usesProvidedObserver() throws Exception {
@SuppressWarnings("unchecked")
StreamObserver<AnotherMessage> observer =
(StreamObserver<AnotherMessage>) mock(StreamObserver.class);
unaryMethodClient.invokeUnary(SomeMessage.getDefaultInstance(), observer);
AnotherMessage reply = AnotherMessage.newBuilder().setPayload("yo").build();
rpcManager.getPending(UNARY_RPC).onNext(reply.toByteString());
verify(observer).onNext(reply);
}
@Test
public void invokeUnary_startsRpc() throws Exception {
Call call = unaryMethodClient.invokeUnary(SomeMessage.getDefaultInstance());
assertThat(rpcManager.getPending(UNARY_RPC)).isSameInstanceAs(call);
}
@Test
public void openUnary_startsRpc() {
Call call = unaryMethodClient.openUnary(SomeMessage.getDefaultInstance(), defaultObserver);
assertThat(rpcManager.getPending(UNARY_RPC)).isSameInstanceAs(call);
}
@Test
public void invokeServerStreaming_startsRpc() throws Exception {
Call call = serverStreamingMethodClient.invokeServerStreaming(SomeMessage.getDefaultInstance());
assertThat(rpcManager.getPending(SERVER_STREAMING_RPC)).isSameInstanceAs(call);
}
@Test
public void openServerStreaming_startsRpc() {
Call call = serverStreamingMethodClient.openServerStreaming(
SomeMessage.getDefaultInstance(), defaultObserver);
assertThat(rpcManager.getPending(SERVER_STREAMING_RPC)).isSameInstanceAs(call);
}
@Test
public void invokeClientStreaming_startsRpc() throws Exception {
Call call = clientStreamingMethodClient.invokeClientStreaming();
assertThat(rpcManager.getPending(CLIENT_STREAMING_RPC)).isSameInstanceAs(call);
}
@Test
public void openClientStreaming_startsRpc() {
Call call = clientStreamingMethodClient.openClientStreaming(defaultObserver);
assertThat(rpcManager.getPending(CLIENT_STREAMING_RPC)).isSameInstanceAs(call);
}
@Test
public void invokeBidirectionalStreaming_startsRpc() throws Exception {
Call call = bidirectionalStreamingMethodClient.invokeBidirectionalStreaming();
assertThat(rpcManager.getPending(BIDIRECTIONAL_STREAMING_RPC)).isSameInstanceAs(call);
}
@Test
public void openBidirectionalStreaming_startsRpc() {
Call call = bidirectionalStreamingMethodClient.openBidirectionalStreaming(defaultObserver);
assertThat(rpcManager.getPending(BIDIRECTIONAL_STREAMING_RPC)).isSameInstanceAs(call);
}
@Test
public void invokeUnaryFuture_startsRpc() {
unaryMethodClient.invokeUnaryFuture(SomeMessage.getDefaultInstance());
assertThat(rpcManager.getPending(UNARY_RPC)).isNotNull();
}
@Test
public void invokeServerStreamingFuture_startsRpc() {
serverStreamingMethodClient.invokeServerStreamingFuture(
SomeMessage.getDefaultInstance(), (msg) -> {});
assertThat(rpcManager.getPending(SERVER_STREAMING_RPC)).isNotNull();
}
@Test
public void invokeClientStreamingFuture_startsRpc() {
clientStreamingMethodClient.invokeClientStreamingFuture();
assertThat(rpcManager.getPending(CLIENT_STREAMING_RPC)).isNotNull();
}
@Test
public void invokeBidirectionalStreamingFuture_startsRpc() {
bidirectionalStreamingMethodClient.invokeBidirectionalStreamingFuture((msg) -> {});
assertThat(rpcManager.getPending(BIDIRECTIONAL_STREAMING_RPC)).isNotNull();
}
@Test
public void invokeUnary_serverStreamingRpc_throwsException() {
assertThrows(UnsupportedOperationException.class,
() -> serverStreamingMethodClient.invokeUnary(SomeMessage.getDefaultInstance()));
}
@Test
public void invokeServerStreaming_unaryRpc_throwsException() {
assertThrows(UnsupportedOperationException.class,
() -> unaryMethodClient.invokeServerStreaming(SomeMessage.getDefaultInstance()));
}
@Test
public void invokeClientStreaming_bidirectionalStreamingRpc_throwsException() {
assertThrows(UnsupportedOperationException.class,
() -> bidirectionalStreamingMethodClient.invokeUnary(SomeMessage.getDefaultInstance()));
}
@Test
public void invokeBidirectionalStreaming_clientStreamingRpc_throwsException() {
assertThrows(UnsupportedOperationException.class,
() -> clientStreamingMethodClient.invokeBidirectionalStreaming());
}
}