blob: f40043e153f6ddb39cf5fefee2d7433faee29a50 [file] [log] [blame]
// Copyright 2019 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
// Package pw_target_runner implements a target runner gRPC server which queues
// and distributes executables among a group of worker routines.
package pw_target_runner
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"sync/atomic"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/reflection"
	"google.golang.org/grpc/status"

	pb "pigweed.dev/proto/pw_target_runner/target_runner_pb"
)
// Sentinel errors reported by Server when it is used out of sequence.
var (
	// errServerNotRunning is returned by Server.RunBinary while the server
	// has not been marked active by Serve.
	errServerNotRunning = errors.New("Server is not running")

	// errServerNotBound is returned by Server.Serve when no listener has
	// been set up with Bind.
	errServerNotBound = errors.New("Server not bound to a port")
)
// Server is a gRPC server that runs a TargetRunner service.
//
// A Server is created with NewServer, attached to a TCP port with Bind and
// started with Serve. Executables submitted through RunBinary are handed to
// the server's worker pool.
type Server struct {
	// grpcServer is the gRPC server the TargetRunner service is registered on.
	grpcServer *grpc.Server
	// listener is the TCP listener created by Bind. It is nil until Bind
	// succeeds, and Serve refuses to start while it is nil.
	listener net.Listener
	// tasksPassed counts runs that RunBinary completed with
	// pb.RunStatus_SUCCESS.
	tasksPassed uint32
	// tasksFailed counts runs that RunBinary completed with any other status.
	// Runs that return an error are counted in neither field.
	tasksFailed uint32
	// startTime is recorded when Serve is called and is used by the Status
	// RPC to report uptime.
	startTime time.Time
	// active is set to true by Serve; RunBinary rejects requests while it is
	// false.
	active bool
	// workerPool receives the executables queued by RunBinary and the workers
	// added through RegisterWorker.
	workerPool *WorkerPool
}
// NewServer builds a Server with a fresh gRPC server and worker pool, and
// registers both the gRPC reflection service and the TargetRunner service on
// it. The returned server still has to be bound with Bind and started with
// Serve.
func NewServer() *Server {
	server := new(Server)
	server.grpcServer = grpc.NewServer()
	server.workerPool = newWorkerPool("ServerWorkerPool")

	// The service keeps a back-pointer to the server so its RPC handlers can
	// reach the worker pool and the counters.
	service := &pwTargetRunnerService{server: server}

	reflection.Register(server.grpcServer)
	pb.RegisterTargetRunnerServer(server.grpcServer, service)

	return server
}
// Bind opens a TCP listener on the given port (no host is specified in the
// listen address) and stores it on the server for a later call to Serve.
//
// If the listener cannot be created the error from net.Listen is returned
// and the server's current listener is left untouched.
func (s *Server) Bind(port int) error {
	addr := fmt.Sprintf(":%d", port)

	listener, err := net.Listen("tcp", addr)
	if err == nil {
		s.listener = listener
	}
	return err
}
// RegisterWorker hands worker to the server's worker pool by forwarding it to
// the pool's own RegisterWorker method.
func (s *Server) RegisterWorker(worker DeviceRunner) {
	pool := s.workerPool
	pool.RegisterWorker(worker)
}
// RunBinary runs an executable through a worker in the server, returning
// the worker's response. The function blocks until the executable has been
// processed.
//
// It returns errServerNotRunning if the server has not been started with
// Serve. If the worker's response carries an error, that error is returned
// and the run is counted as neither passed nor failed. Otherwise the run is
// added to the passed counter when its status is pb.RunStatus_SUCCESS and to
// the failed counter for any other status.
//
// RunBinary may be called from several goroutines at once (the gRPC service
// invokes it from its request handlers), so the pass/fail counters are
// updated atomically.
func (s *Server) RunBinary(path string) (*RunResponse, error) {
	if !s.active {
		return nil, errServerNotRunning
	}

	// One slot of buffering lets a single response be delivered without the
	// sender having to wait for this goroutine to reach the receive below.
	resChan := make(chan *RunResponse, 1)
	defer close(resChan)

	s.workerPool.QueueExecutable(&RunRequest{
		Path:            path,
		ResponseChannel: resChan,
	})

	// Block until the request has been processed.
	res := <-resChan
	if res.Err != nil {
		return nil, res.Err
	}

	// A plain ++ is a non-atomic read-modify-write; concurrent callers would
	// race on the counter and could lose increments.
	if res.Status == pb.RunStatus_SUCCESS {
		atomic.AddUint32(&s.tasksPassed, 1)
	} else {
		atomic.AddUint32(&s.tasksFailed, 1)
	}

	return res, nil
}
// Serve runs the gRPC server on the listener created by Bind. If Bind has not
// been called successfully, errServerNotBound is returned and nothing is
// started.
//
// Before serving, it records the start time, marks the server active so that
// RunBinary accepts work, and starts the worker pool. The call then blocks
// inside the gRPC server and returns whatever error that server returns.
func (s *Server) Serve() error {
	lis := s.listener
	if lis == nil {
		return errServerNotBound
	}

	log.Printf("Starting gRPC server on %v\n", lis.Addr())

	s.startTime, s.active = time.Now(), true
	s.workerPool.Start()

	return s.grpcServer.Serve(lis)
}
// pwTargetRunnerService implements the pw.target_runner.TargetRunner gRPC
// service. Its RPC handlers delegate to the Server it wraps.
type pwTargetRunnerService struct {
	// server is the Server whose worker pool runs the binaries and whose
	// counters and start time are reported by the Status RPC.
	server *Server
}
// RunBinary handles the RunBinary RPC: it runs the executable named by the
// request's FilePath through the server and reports the outcome.
//
// On success the response carries the run status, the queue and run times
// converted to uint64, and the captured output. If the server fails to run
// the binary, the underlying error is not forwarded; the caller receives a
// gRPC status with code Internal and a generic message.
func (s *pwTargetRunnerService) RunBinary(
	ctx context.Context,
	desc *pb.RunBinaryRequest,
) (*pb.RunBinaryResponse, error) {
	result, runErr := s.server.RunBinary(desc.FilePath)
	if runErr != nil {
		return nil, status.Error(codes.Internal, "Internal server error")
	}

	return &pb.RunBinaryResponse{
		Result:      result.Status,
		QueueTimeNs: uint64(result.QueueTime),
		RunTimeNs:   uint64(result.RunTime),
		Output:      result.Output,
	}, nil
}
// Status handles the Status RPC and returns information about the server:
// the time elapsed since the server's recorded start time, and the number of
// runs that have passed and failed so far.
//
// The counters are read with atomic loads because RunBinary handlers running
// on other goroutines may be updating them at the same time. The two counters
// are loaded independently, so they are not guaranteed to form a single
// consistent snapshot. This handler never returns an error.
func (s *pwTargetRunnerService) Status(
	ctx context.Context,
	_ *pb.Empty,
) (*pb.ServerStatus, error) {
	resp := &pb.ServerStatus{
		UptimeNs:    uint64(time.Since(s.server.startTime)),
		TasksPassed: atomic.LoadUint32(&s.server.tasksPassed),
		TasksFailed: atomic.LoadUint32(&s.server.tasksFailed),
	}
	return resp, nil
}