grpc-demo/cmd/client/main.go

330 lines
8.0 KiB
Go

package main
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"gitea.dwysokinski.me/Kichiyaki/grpc-demo/cmd/internal"
"gitea.dwysokinski.me/Kichiyaki/grpc-demo/proto"
"github.com/google/uuid"
"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// defaultAddress is the host:port of the gRPC server this client talks to.
// main attaches it to the log entry written when the connection fails.
const defaultAddress = "localhost:50001"
// Values accepted by the -example flag. main dispatches on them to pick the
// demo to run; parseExample lists them in the flag's usage text.
const (
// exampleUnary runs sayHello (the flag's default value).
exampleUnary = "unary"
// exampleServer runs sayHelloServerStream.
exampleServer = "server"
// exampleClient runs sayHelloClientStream.
exampleClient = "client"
// exampleBidirectional runs sayHelloBidirectional.
exampleBidirectional = "bidirectional"
// exampleJames runs sayHelloToJames.
exampleJames = "james"
// exampleJamesError runs sayHelloToJamesError.
exampleJamesError = "jameserror"
)
// main builds the logger, opens the gRPC connection, derives a context that
// is cancelled on SIGINT/SIGTERM/SIGQUIT and then runs the demo selected with
// the -example flag. An unknown example name is logged as an error.
func main() {
	logger, err := internal.NewLogger()
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		// Best effort: nothing useful can be done if flushing the logger fails.
		_ = logger.Sync()
	}()

	selected := parseExample()

	conn, err := newConn()
	if err != nil {
		logger.Fatal("couldn't connect to grpc server", zap.Error(err), zap.String("address", defaultAddress))
	}
	defer func() {
		_ = conn.Close()
	}()

	greeter := proto.NewGreeterClient(conn)

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
	defer stop()

	// Every demo shares the same signature, so a lookup table replaces the switch.
	demos := map[string]func(context.Context, proto.GreeterClient, *zap.Logger){
		exampleUnary:         sayHello,
		exampleJames:         sayHelloToJames,
		exampleJamesError:    sayHelloToJamesError,
		exampleServer:        sayHelloServerStream,
		exampleClient:        sayHelloClientStream,
		exampleBidirectional: sayHelloBidirectional,
	}

	run, known := demos[selected]
	if !known {
		logger.Error("example doesn't exist", zap.String("example", selected))
		return
	}
	run(ctx, greeter, logger)
}
// parseExample registers the -example flag on the default flag set, parses
// the command line and returns the chosen example name. When the flag is not
// given the result is exampleUnary.
func parseExample() string {
	usage := fmt.Sprintf(
		"%s, %s, %s, %s, %s or %s (default: %s)",
		exampleUnary,
		exampleJames,
		exampleJamesError,
		exampleServer,
		exampleClient,
		exampleBidirectional,
		exampleUnary,
	)
	selected := flag.String("example", exampleUnary, usage)
	flag.Parse()
	return *selected
}
// newConn dials the gRPC server at defaultAddress using the TLS credentials
// produced by newTransportCredentials.
//
// The dial is blocking (grpc.WithBlock), so the call returns only once the
// connection is established or dialing fails. An error from building the
// credentials is returned unchanged.
func newConn() (*grpc.ClientConn, error) {
	transport, err := newTransportCredentials()
	if err != nil {
		return nil, err
	}

	return grpc.Dial(
		// Use the shared constant so the dialed address always matches the
		// one main reports when the connection fails.
		defaultAddress,
		grpc.WithTransportCredentials(transport),
		grpc.WithBlock(),
	)
}
// newTransportCredentials builds the TLS transport credentials for the client.
//
// It loads the client certificate/key pair from ./certs/client.pem and
// ./certs/client-key.pem, and trusts only the CA found in ./certs/ca.pem.
// The resulting configuration requires TLS 1.3 or newer.
//
// An error is returned when the key pair or the CA file cannot be read, or
// when the CA file contains no usable PEM certificate.
func newTransportCredentials() (credentials.TransportCredentials, error) {
	cert, err := tls.LoadX509KeyPair("./certs/client.pem", "./certs/client-key.pem")
	if err != nil {
		return nil, fmt.Errorf("couldn't load client cert: %w", err)
	}

	data, err := os.ReadFile("./certs/ca.pem")
	if err != nil {
		return nil, fmt.Errorf("couldn't load CA file: %w", err)
	}

	capool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when no certificate could be parsed.
	if !capool.AppendCertsFromPEM(data) {
		return nil, errors.New("couldn't add ca cert to cert pool")
	}

	return credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      capool,
		MinVersion:   tls.VersionTLS13,
	}), nil
}
// sayHello performs one unary SayHello call with a freshly generated UUID as
// the name. The request carries the outgoing metadata pair k1=v1 and is sent
// gzip-compressed; the response headers and trailers are captured and logged
// together with the reply message. A failed call terminates the process.
func sayHello(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
	who := uuid.NewString()
	logger.Info("sending request", zap.String("name", who))

	respHeaders := metadata.New(nil)
	respTrailer := metadata.New(nil)

	outCtx := metadata.NewOutgoingContext(ctx, metadata.Pairs("k1", "v1"))
	req := &proto.HelloRequest{Name: who}
	callOpts := []grpc.CallOption{
		grpc.UseCompressor(gzip.Name),
		// Header and Trailer fill the given maps once the call has completed.
		grpc.Header(&respHeaders),
		grpc.Trailer(&respTrailer),
	}

	reply, err := client.SayHello(outCtx, req, callOpts...)
	if err != nil {
		logger.Fatal("something went wrong while calling SayHello", zap.Error(err))
	}

	logger.Info(
		"reply received",
		zap.String("message", reply.GetMessage()),
		zap.Any("headers", respHeaders),
		zap.Any("trailer", respTrailer),
	)
}
// sayHelloToJames calls the SayHelloToJames RPC with the name "James" and
// logs the reply message. A failed call terminates the process.
func sayHelloToJames(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
	name := "James"
	logger.Info("sending request", zap.String("name", name))

	reply, err := client.SayHelloToJames(ctx, &proto.HelloRequest{Name: name})
	if err != nil {
		// Name the RPC that was actually invoked so the log is not mistaken
		// for a failure of the plain SayHello call.
		logger.Fatal("something went wrong while calling SayHelloToJames", zap.Error(err))
	}

	logger.Info("reply received", zap.String("message", reply.GetMessage()))
}
// sayHelloToJamesError calls SayHelloToJames with the name "SomeoneElse" and
// inspects the resulting error. Field violations found in BadRequest details
// of the gRPC status are collected as "field: description" strings and logged
// next to the error; other detail types are only logged at debug level.
// A reply without an error is logged as unexpected, and an error that cannot
// be converted to a gRPC status terminates the process.
func sayHelloToJamesError(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
	who := "SomeoneElse"
	logger.Info("sending request", zap.String("name", who))

	reply, err := client.SayHelloToJames(ctx, &proto.HelloRequest{Name: who})
	if err == nil {
		logger.Info("unexpected reply received", zap.String("message", reply.GetMessage()))
		return
	}

	st, isStatus := status.FromError(err)
	if !isStatus {
		logger.Fatal("received unexpected error", zap.Error(err))
	}

	var violations []string
	for _, detail := range st.Details() {
		badRequest, isBadRequest := detail.(*errdetails.BadRequest)
		if !isBadRequest {
			logger.Debug("unhandled detail", zap.Any("detail", detail))
			continue
		}
		for _, fv := range badRequest.GetFieldViolations() {
			violations = append(violations, fv.GetField()+": "+fv.GetDescription())
		}
	}

	logger.Info("received error", zap.Error(err), zap.Strings("violations", violations))
}
const (
// namesLen is the number of generated names sayHelloServerStream sends in
// its single request.
namesLen = 20
)
// sayHelloServerStream sends one request holding namesLen generated UUID
// names and logs every reply received on the server stream until the stream
// ends with io.EOF. Failing to open the stream, or any other receive error,
// terminates the process.
func sayHelloServerStream(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
	names := make([]string, 0, namesLen)
	for i := 0; i < namesLen; i++ {
		names = append(names, uuid.NewString())
	}
	logger.Info("sending request", zap.Strings("names", names))

	req := &proto.MultiHelloRequest{Names: names}
	stream, err := client.SayHelloServerStream(ctx, req)
	if err != nil {
		logger.Fatal("something went wrong while calling SayHelloServerStream", zap.Error(err))
	}
	defer func() {
		_ = stream.CloseSend()
	}()

	for {
		reply, recvErr := stream.Recv()
		switch {
		case errors.Is(recvErr, io.EOF):
			// The stream ended normally.
			return
		case recvErr != nil:
			logger.Fatal("couldn't receive reply", zap.Error(recvErr))
		}
		logger.Info("reply received", zap.String("message", reply.GetMessage()))
	}
}
// sayHelloClientStream opens a client stream and sends one request with a
// generated UUID name every second. Sending stops when ctx is cancelled or as
// soon as a send fails; the stream is then closed and the aggregated reply is
// logged. Failing to open the stream or to receive the final reply terminates
// the process.
func sayHelloClientStream(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
	// ctx is used later to close the stream client
	//nolint:contextcheck
	streamClient, err := client.SayHelloClientStream(context.Background())
	if err != nil {
		logger.Fatal("something went wrong while calling SayHelloClientStream", zap.Error(err))
	}
	defer func() {
		// Runs on every exit path: half-closes the stream and waits for the
		// server's reply (or the status explaining why the stream failed).
		reply, err := streamClient.CloseAndRecv()
		if err != nil {
			logger.Fatal("couldn't receive reply", zap.Error(err))
		}
		logger.Info("reply received", zap.Strings("messages", reply.GetMessages()))
	}()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			name := uuid.NewString()
			if err := streamClient.Send(&proto.HelloRequest{Name: name}); err != nil {
				// A failed send aborts the stream, so every later send would
				// fail as well. Stop here instead of warning once per second
				// until a signal arrives; the deferred CloseAndRecv reports
				// the stream's final status.
				logger.Warn("couldn't send request", zap.Error(err))
				return
			}
			logger.Info("request sent", zap.String("name", name))
		case <-ctx.Done():
			return
		}
	}
}
// sayHelloBidirectional opens a bidirectional stream and runs two goroutines:
// a receiver that logs every reply until the stream ends or fails, and a
// sender that sends a generated UUID name every second. The sender stops when
// ctx is cancelled (after half-closing the stream) or as soon as a send
// fails. The function returns once both goroutines have finished. Failing to
// open the stream terminates the process.
func sayHelloBidirectional(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
	// ctx is used in the 2nd goroutine to close the stream client
	//nolint:contextcheck
	streamClient, err := client.SayHelloBidirectionalStream(context.Background())
	if err != nil {
		logger.Fatal("something went wrong while calling SayHelloBidirectionalStream", zap.Error(err))
	}

	// streamClient is closed later in the 2nd goroutine
	var wg sync.WaitGroup

	// Receiver: logs replies until io.EOF or the first receive error.
	wg.Add(1)
	go func() {
		defer wg.Done()

		for {
			reply, err := streamClient.Recv()
			if errors.Is(err, io.EOF) {
				return
			}
			if err != nil {
				logger.Warn("couldn't receive reply", zap.Error(err))
				return
			}
			logger.Info("reply received", zap.String("message", reply.GetMessage()))
		}
	}()

	// Sender: one request per second until cancellation or a send failure.
	wg.Add(1)
	go func() {
		defer wg.Done()

		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				name := uuid.NewString()
				if err := streamClient.Send(&proto.HelloRequest{Name: name}); err != nil {
					// A failed send aborts the stream, so retrying every
					// second would only repeat the warning and keep wg.Wait
					// blocked until a signal arrives.
					logger.Warn("couldn't send request", zap.Error(err))
					return
				}
				logger.Info("request sent", zap.String("name", name))
			case <-ctx.Done():
				if err := streamClient.CloseSend(); err != nil {
					logger.Warn("couldn't close stream", zap.Error(err))
				} else {
					logger.Info("stream client closed")
				}
				return
			}
		}
	}()

	wg.Wait()
}