grpc-demo/cmd/client/main.go

187 lines
4.0 KiB
Go

package main
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"gitea.dwysokinski.me/Kichiyaki/grpc-g2a/cmd/internal"
"gitea.dwysokinski.me/Kichiyaki/grpc-g2a/proto"
"github.com/google/uuid"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const defaultAddress = "localhost:50001"
const (
exampleUnary = "unary"
exampleBidirectional = "bidirectional"
)
func main() {
logger, err := internal.NewLogger()
if err != nil {
log.Fatalln(err)
}
defer func() {
_ = logger.Sync()
}()
example := parseExample()
conn, err := newConn()
if err != nil {
logger.Fatal("couldn't connect to grpc server", zap.Error(err), zap.String("address", defaultAddress))
}
defer func() {
_ = conn.Close()
}()
client := proto.NewGreeterClient(conn)
ctx, stop := signal.NotifyContext(
context.Background(),
os.Interrupt,
syscall.SIGTERM,
syscall.SIGQUIT,
)
defer stop()
switch example {
case exampleUnary:
sayHello(ctx, client, logger)
case exampleBidirectional:
sayHelloBidirectional(ctx, client, logger)
default:
logger.Error("example doesn't exist", zap.String("example", example))
}
}
func parseExample() string {
example := ""
flag.StringVar(
&example,
"example",
exampleUnary,
fmt.Sprintf("%s or %s (default: %s)", exampleUnary, exampleBidirectional, exampleUnary),
)
flag.Parse()
return example
}
func newConn() (*grpc.ClientConn, error) {
transport, err := newTransportCredentials()
if err != nil {
return nil, err
}
return grpc.Dial(
"localhost:50001",
grpc.WithTransportCredentials(transport),
grpc.WithBlock(),
)
}
func newTransportCredentials() (credentials.TransportCredentials, error) {
cert, err := tls.LoadX509KeyPair("./certs/client.pem", "./certs/client-key.pem")
if err != nil {
return nil, fmt.Errorf("couldn't load client cert: %w", err)
}
data, err := os.ReadFile("./certs/ca.pem")
if err != nil {
return nil, fmt.Errorf("couldn't load CA file: %w", err)
}
capool := x509.NewCertPool()
if !capool.AppendCertsFromPEM(data) {
return nil, errors.New("couldn't add ca cart to cert pool")
}
return credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: capool,
}), nil
}
func sayHello(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
reply, err := client.SayHello(ctx, &proto.HelloRequest{Name: uuid.NewString()})
if err != nil {
logger.Fatal("something went wrong while calling SayHello", zap.Error(err))
}
logger.Info("reply received", zap.String("message", reply.GetMessage()))
}
func sayHelloBidirectional(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
// ctx is used in the 2nd goroutine to close the stream client
//nolint:contextcheck
streamClient, err := client.SayHelloBidirectionalStream(context.Background())
if err != nil {
logger.Fatal("something went wrong while calling SayHelloBidirectionalStream", zap.Error(err))
}
// streamClient is closed later in the 2nd goroutine
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
reply, err := streamClient.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
logger.Warn("couldn't receive reply", zap.Error(err))
return
}
logger.Info("reply received", zap.String("message", reply.GetMessage()))
}
}()
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
name := uuid.NewString()
err := streamClient.Send(&proto.HelloRequest{
Name: name,
})
if err != nil {
logger.Warn("couldn't send request", zap.Error(err))
} else {
logger.Info("request sent", zap.String("name", name))
}
case <-ctx.Done():
if err := streamClient.CloseSend(); err != nil {
logger.Warn("couldn't close stream", zap.Error(err))
} else {
logger.Info("stream client closed")
}
return
}
}
}()
wg.Wait()
}