blob: 27b91617768793e2adc01809e0204168c95243d1 [file] [log] [blame]
// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
//
// Package integration_test implements a client to exercise the pw_grpc server implementation.
package integration_test
import (
"bufio"
"context"
"fmt"
"hash/crc32"
"io"
"os/exec"
"strconv"
"strings"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/status"
)
// port is the port number, kept as a string, that launchServer passes to the
// test server binary on its command line and that connectServer dials on
// localhost.
const port = "3402"
// TestUnaryEcho launches the test server for a single connection and drives
// the testRPC call sequence through the UnaryEcho RPC. Every successful reply
// must echo the request message. The only tolerated error is a Canceled status
// on the "quiet" request, which testRPC cancels on purpose.
func TestUnaryEcho(t *testing.T) {
	const numConnections = 1
	cmd, reader, err := launchServer(t, numConnections)
	if err != nil {
		// Fatalf rather than Errorf: launchServer returns a nil cmd and reader
		// on failure, so continuing would panic in the deferred cmd.Wait.
		t.Fatalf("Failed to launch %v", err)
	}
	defer cmd.Wait()

	conn, echoClient, err := connectServer()
	if err != nil {
		// Fatalf rather than Errorf: conn and echoClient are nil on failure and
		// the RPC below would dereference them.
		t.Fatalf("Failed to connect %v", err)
	}
	defer conn.Close()

	go logServer(t, reader)

	testRPC(t, func(t *testing.T, ctx context.Context, msg string) {
		t.Logf("call UnaryEcho(%v)", msg)
		resp, err := echoClient.UnaryEcho(ctx, &pb.EchoRequest{Message: msg})
		if err != nil {
			t.Logf("... failed with error: %v", err.Error())
			// A cancelled "quiet" call is the expected outcome; anything else
			// is a real failure.
			if msg != "quiet" || status.Convert(err).Code() != codes.Canceled {
				t.Errorf("Error unexpected %v", err)
			}
			return
		}
		t.Logf("... Recv %v", resp)
		if resp.Message != msg {
			t.Errorf("Unexpected response %v", resp)
		}
	})
}
// TestFragmentedMessage sends successively larger messages, each larger than
// the maximum HTTP2 data frame size (16384), ensuring messages are fragmented
// across frames. Each request is "crc32:" followed by a repeated payload, and
// the reply is expected to be the decimal IEEE CRC-32 of the whole request
// string.
func TestFragmentedMessage(t *testing.T) {
	const numConnections = 1
	cmd, reader, err := launchServer(t, numConnections)
	if err != nil {
		// Fatalf rather than Errorf: launchServer returns a nil cmd and reader
		// on failure, so continuing would panic in the deferred cmd.Wait.
		t.Fatalf("Failed to launch %v", err)
	}
	defer cmd.Wait()

	conn, echoClient, err := connectServer()
	if err != nil {
		// Fatalf rather than Errorf: conn and echoClient are nil on failure and
		// the RPC below would dereference them.
		t.Fatalf("Failed to connect %v", err)
	}
	defer conn.Close()

	go logServer(t, reader)

	const numCalls = 4
	for i := 0; i < numCalls; i++ {
		t.Run(fmt.Sprintf("%d of %d", i+1, numCalls), func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			// 12 bytes * 1500 = 18000 bytes per step, so even the first
			// message exceeds a single 16384-byte data frame.
			msg := "crc32:" + strings.Repeat("testmessage!", 1500*(i+1))
			checksum := strconv.FormatUint(uint64(crc32.ChecksumIEEE([]byte(msg))), 10)

			// The call is made synchronously: nothing here cancels the
			// request, so no helper goroutine is needed.
			t.Logf("call UnaryChecksum")
			resp, err := echoClient.UnaryEcho(ctx, &pb.EchoRequest{Message: msg})
			if err != nil {
				t.Logf("... failed with error: %v", err.Error())
				// No request in this test is ever cancelled, so every error
				// is unexpected.
				t.Errorf("Error unexpected %v", err)
				return
			}
			t.Logf("... Recv %v", resp)
			if resp.Message != checksum {
				t.Errorf("Unexpected response %v", resp)
			}
		})
	}
}
// TestMultipleConnections launches the test server configured for three
// connections, then opens the connections one after another. Each connection
// issues a single UnaryEcho call that must echo "message0", and is closed
// before the next one is opened.
func TestMultipleConnections(t *testing.T) {
	const numConnections = 3
	cmd, reader, err := launchServer(t, numConnections)
	if err != nil {
		// Fatalf rather than Errorf: launchServer returns a nil cmd and reader
		// on failure, so continuing would panic in the deferred cmd.Wait.
		t.Fatalf("Failed to launch %v", err)
	}
	defer cmd.Wait()

	go logServer(t, reader)

	for i := 0; i < numConnections; i++ {
		t.Run(fmt.Sprintf("connection %d of %d", i+1, numConnections), func(t *testing.T) {
			conn, echoClient, err := connectServer()
			if err != nil {
				// Abort only this subtest: conn and echoClient are nil and the
				// RPC below would dereference them.
				t.Fatalf("Failed to connect %v", err)
			}
			// Deferred so the connection is released on every exit path.
			defer conn.Close()

			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			resp, err := echoClient.UnaryEcho(ctx, &pb.EchoRequest{Message: "message0"})
			if err != nil {
				t.Errorf("... failed with error: %v", err.Error())
				return
			}
			t.Logf("... Recv %v", resp)
			if resp.Message != "message0" {
				t.Errorf("Unexpected response %v", resp)
			}
		})
	}
}
// TestServerStreamingEcho launches the test server for a single connection and
// drives the testRPC call sequence through the ServerStreamingEcho RPC. The
// reply stream is read until io.EOF, and every streamed reply must be either
// the request message or the literal "done". The only tolerated receive error
// is a Canceled status on the "quiet" request, which testRPC cancels on
// purpose.
func TestServerStreamingEcho(t *testing.T) {
	const numConnections = 1
	cmd, reader, err := launchServer(t, numConnections)
	if err != nil {
		// Fatalf rather than Errorf: launchServer returns a nil cmd and reader
		// on failure, so continuing would panic in the deferred cmd.Wait.
		t.Fatalf("Failed to launch %v", err)
	}
	defer cmd.Wait()

	conn, echoClient, err := connectServer()
	if err != nil {
		// Fatalf rather than Errorf: conn and echoClient are nil on failure and
		// the RPC below would dereference them.
		t.Fatalf("Failed to connect %v", err)
	}
	defer conn.Close()

	go logServer(t, reader)

	testRPC(t, func(t *testing.T, ctx context.Context, msg string) {
		t.Logf("call ServerStreamingEcho(%v)", msg)
		stream, err := echoClient.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: msg})
		if err != nil {
			t.Errorf("... failed with error: %v", err)
			return
		}
		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				t.Logf("... completed")
				return
			}
			if err != nil {
				t.Logf("... Recv failed with error: %v", err)
				// A cancelled "quiet" call is the expected outcome; anything
				// else is a real failure.
				if msg != "quiet" || status.Convert(err).Code() != codes.Canceled {
					t.Errorf("Error unexpected %v", err)
				}
				return
			}
			t.Logf("... Recv %v", resp)
			if resp.Message != msg && resp.Message != "done" {
				t.Errorf("Unexpected response %v", resp)
			}
		}
	})
}
// TestClientStreamingEcho launches the test server for a single connection and
// drives the testRPC call sequence through the ClientStreamingEcho RPC. Each
// call sends the message three times, closes the send side, and expects a
// single reply whose message is "done". The only tolerated error from
// CloseAndRecv is a Canceled status on the "quiet" request, which testRPC
// cancels on purpose.
func TestClientStreamingEcho(t *testing.T) {
	const numConnections = 1
	cmd, reader, err := launchServer(t, numConnections)
	if err != nil {
		// Fatalf rather than Errorf: launchServer returns a nil cmd and reader
		// on failure, so continuing would panic in the deferred cmd.Wait.
		t.Fatalf("Failed to launch %v", err)
	}
	defer cmd.Wait()

	conn, echoClient, err := connectServer()
	if err != nil {
		// Fatalf rather than Errorf: conn and echoClient are nil on failure and
		// the RPC below would dereference them.
		t.Fatalf("Failed to connect %v", err)
	}
	defer conn.Close()

	go logServer(t, reader)

	testRPC(t, func(t *testing.T, ctx context.Context, msg string) {
		t.Logf("call ClientStreamingEcho()")
		stream, err := echoClient.ClientStreamingEcho(ctx)
		if err != nil {
			t.Errorf("... failed with error: %v", err)
			return
		}
		for n := 0; n < 3; n++ {
			t.Logf("... Send %v", msg)
			if err := stream.Send(&pb.EchoRequest{Message: msg}); err != nil {
				t.Errorf("... Send failed with error: %v", err)
				return
			}
		}
		if err := stream.CloseSend(); err != nil {
			t.Errorf("... CloseSend failed with error: %v", err)
			return
		}
		resp, err := stream.CloseAndRecv()
		if err != nil {
			t.Logf("... CloseAndRecv failed with error: %v", err)
			// A cancelled "quiet" call is the expected outcome; anything else
			// is a real failure.
			if msg != "quiet" || status.Convert(err).Code() != codes.Canceled {
				t.Errorf("Error unexpected %v", err)
			}
			return
		}
		t.Logf("... CloseAndRecv %v", resp)
		if resp.Message != "done" {
			t.Errorf("Unexpected response %v", resp)
		}
	})
}
// TestBidirectionalStreamingEcho launches the test server for a single
// connection and drives the testRPC call sequence through the
// BidirectionalStreamingEcho RPC. Each call sends the message three times,
// closes the send side, then reads replies until io.EOF; every reply must
// equal the request message. The only tolerated receive error is a Canceled
// status on the "quiet" request, which testRPC cancels on purpose.
func TestBidirectionalStreamingEcho(t *testing.T) {
	const numConnections = 1
	cmd, reader, err := launchServer(t, numConnections)
	if err != nil {
		// Fatalf rather than Errorf: launchServer returns a nil cmd and reader
		// on failure, so continuing would panic in the deferred cmd.Wait.
		t.Fatalf("Failed to launch %v", err)
	}
	defer cmd.Wait()

	conn, echoClient, err := connectServer()
	if err != nil {
		// Fatalf rather than Errorf: conn and echoClient are nil on failure and
		// the RPC below would dereference them.
		t.Fatalf("Failed to connect %v", err)
	}
	defer conn.Close()

	go logServer(t, reader)

	testRPC(t, func(t *testing.T, ctx context.Context, msg string) {
		t.Logf("call BidirectionalStreamingEcho()")
		stream, err := echoClient.BidirectionalStreamingEcho(ctx)
		if err != nil {
			// Reported as a failure, matching the client-streaming test; a
			// stream that cannot be opened must not pass silently.
			t.Errorf("... failed with error: %v", err)
			return
		}
		for n := 0; n < 3; n++ {
			t.Logf("... Send %v", msg)
			if err := stream.Send(&pb.EchoRequest{Message: msg}); err != nil {
				t.Errorf("... Send failed with error: %v", err)
				return
			}
		}
		if err := stream.CloseSend(); err != nil {
			// Reported as a failure, matching the client-streaming test.
			t.Errorf("... CloseSend failed with error: %v", err)
			return
		}
		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				t.Logf("... completed")
				return
			}
			if err != nil {
				t.Logf("... Recv failed with error: %v", err)
				// A cancelled "quiet" call is the expected outcome; anything
				// else is a real failure.
				if msg != "quiet" || status.Convert(err).Code() != codes.Canceled {
					t.Errorf("Error unexpected %v", err)
				}
				return
			}
			t.Logf("... Recv %v", resp)
			if resp.Message != msg {
				t.Errorf("Unexpected response %v", resp)
			}
		}
	})
}
// logServer forwards the server's output to the test log. It reads
// newline-terminated lines from reader and logs each one with a "SERVER: "
// prefix, returning as soon as a read reports an error (including io.EOF).
// Data returned together with that error, such as a final unterminated line,
// is not logged.
func logServer(t *testing.T, reader *bufio.Reader) {
	for text, readErr := reader.ReadString('\n'); readErr == nil; text, readErr = reader.ReadString('\n') {
		t.Logf("SERVER: %v", text)
	}
}
// launchServer starts the ./test_pw_rpc_server binary, passing it the port and
// num_connections as command-line arguments, and blocks until the server
// prints a stdout line containing "Accept".
//
// On success it returns the running command and the buffered reader over the
// server's stdout, positioned just after the "Accept" line, so the caller can
// keep consuming server output. The caller is responsible for calling Wait on
// the returned command.
//
// On failure it records a test error on t and returns nil, nil and a non-nil
// error. This includes the case where the server's stdout ends before
// "Accept" is seen, which would otherwise leave the caller waiting forever.
func launchServer(t *testing.T, num_connections int) (*exec.Cmd, *bufio.Reader, error) {
	cmd := exec.Command("./test_pw_rpc_server", port, strconv.Itoa(num_connections))
	output, err := cmd.StdoutPipe()
	if err != nil {
		t.Errorf("Failed to get stdout of server %v", err)
		return nil, nil, err
	}
	if err := cmd.Start(); err != nil {
		t.Errorf("Failed to launch server %v", err)
		return nil, nil, err
	}

	reader := bufio.NewReader(output)
	for {
		line, err := reader.ReadString('\n')
		// Check the text first: ReadString can return the final line together
		// with an error.
		if strings.Contains(line, "Accept") {
			return cmd, reader, nil
		}
		if err != nil {
			// stdout ended (or failed) before the server reported readiness.
			// Kill and reap the process so it is not leaked; both calls are
			// best effort because the process may already have exited.
			_ = cmd.Process.Kill()
			_ = cmd.Wait()
			t.Errorf("Server exited before becoming ready %v", err)
			return nil, nil, fmt.Errorf("server output ended before \"Accept\": %w", err)
		}
	}
}
// connectServer dials the test server at localhost on the package-level port
// using insecure (no transport security) credentials. It returns the
// connection together with an Echo service client bound to it; the caller owns
// the connection and must close it. On a dial error both return values are nil
// and the error is returned unchanged.
func connectServer() (*grpc.ClientConn, pb.EchoClient, error) {
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, err := grpc.Dial("localhost:"+port, creds)
	if err != nil {
		return nil, nil, err
	}
	return conn, pb.NewEchoClient(conn), nil
}
// testRPC runs call 30 times, each as its own subtest named "<n> of 30" with a
// fresh context that times out after 5 seconds. The first 29 calls receive the
// messages "message0" through "message28"; the last call receives "quiet".
//
// Each call runs on its own goroutine and the subtest waits for it to finish.
// For the "quiet" call the subtest sleeps 100ms and then cancels the context,
// so call is expected to return once it observes the cancellation.
func testRPC(t *testing.T, call func(t *testing.T, ctx context.Context, msg string)) {
	const numCalls = 30
	for i := 0; i < numCalls; i++ {
		t.Run(fmt.Sprintf("%d of %d", i+1, numCalls), func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			msg := fmt.Sprintf("message%d", i)
			if i == numCalls-1 {
				msg = "quiet"
			}

			done := make(chan struct{})
			go func() {
				// Deferred so the subtest is released even when call leaves
				// early via runtime.Goexit (t.FailNow, t.Fatal, t.Skip) instead
				// of returning normally; otherwise <-done would block forever.
				defer close(done)
				call(t, ctx, msg)
			}()

			// Test cancellation. When we send "quiet", the server won't echo
			// anything back and instead will hold onto the request. Sleep a
			// bit to make sure the server doesn't respond. Then cancel the
			// request, which should complete the RPC.
			if msg == "quiet" {
				time.Sleep(100 * time.Millisecond)
				cancel()
			}
			<-done
		})
	}
}