grpc-demo/cmd/server/main.go

150 lines
3.5 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"syscall"
"gitea.dwysokinski.me/Kichiyaki/grpc-g2a/cmd/internal"
"gitea.dwysokinski.me/Kichiyaki/grpc-g2a/proto"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const defaultAddress = "localhost:50001"
func main() {
logger, err := internal.NewLogger()
if err != nil {
log.Fatalln(err)
}
defer func() {
_ = logger.Sync()
}()
srv, err := newServer(defaultAddress, logger)
if err != nil {
logger.Fatal("couldn't initialize server", zap.Error(err))
}
go func(srv *server, logger *zap.Logger) {
ctx, stop := signal.NotifyContext(
context.Background(),
os.Interrupt,
syscall.SIGTERM,
syscall.SIGQUIT,
)
defer stop()
<-ctx.Done()
logger.Info("shutdown signal received")
srv.GracefulStop()
}(srv, logger)
logger.Info(
"listening and serving",
zap.String("address", srv.lis.Addr().String()),
)
if err = srv.Serve(); err != nil {
logger.Fatal("something went wrong while serving", zap.Error(err))
}
logger.Info("shutdown completed")
}
type server struct {
lis net.Listener
grpcSrv *grpc.Server
}
func newServer(address string, logger *zap.Logger) (*server, error) {
lis, err := net.Listen("tcp", address)
if err != nil {
return nil, err
}
grpc_zap.ReplaceGrpcLoggerV2(logger)
srv := grpc.NewServer(
grpc.ChainUnaryInterceptor(
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(createPanicHandler(logger))),
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_zap.UnaryServerInterceptor(logger),
),
grpc.ChainStreamInterceptor(
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(createPanicHandler(logger))),
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_zap.StreamServerInterceptor(logger),
),
)
proto.RegisterGreeterServer(srv, &greeterServer{})
return &server{
lis: lis,
grpcSrv: srv,
}, nil
}
func (s *server) Serve() error {
return s.grpcSrv.Serve(s.lis)
}
func (s *server) GracefulStop() {
// GracefulStop also closes the listener.
s.grpcSrv.GracefulStop()
}
func createPanicHandler(logger *zap.Logger) grpc_recovery.RecoveryHandlerFunc {
return func(p interface{}) error {
logger.Panic(fmt.Sprintf("%v", p), zap.Stack("stack"))
return status.Errorf(codes.Internal, "internal server error")
}
}
type greeterServer struct {
proto.UnimplementedGreeterServer
}
var _ proto.GreeterServer = (*greeterServer)(nil)
func (g *greeterServer) SayHello(_ context.Context, r *proto.HelloRequest) (*proto.HelloReply, error) {
return &proto.HelloReply{
Message: buildHelloMsg(r.GetName()),
}, nil
}
func (g *greeterServer) SayHelloBidirectionalStream(stream proto.Greeter_SayHelloBidirectionalStreamServer) error {
for {
r, err := stream.Recv()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
return fmt.Errorf("stream returned error: %w", err)
}
if err = stream.Send(&proto.HelloReply{Message: buildHelloMsg(r.GetName())}); err != nil {
return fmt.Errorf("couldn't send reply: %w", err)
}
}
}
func buildHelloMsg(name string) string {
return "Hello " + name
}