From e05f5990626bced1278aa7651e0c433bd9aa1ce2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20Wysoki=C5=84ski?= Date: Wed, 3 May 2023 07:48:39 +0200 Subject: [PATCH] add client streaming example --- cmd/client/main.go | 38 +++++++++++++++++++++++++++++++++++++- cmd/server/main.go | 23 ++++++++++++++++++++++- proto/hello_world.proto | 1 + 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index 0751b0a..a380715 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -27,6 +27,7 @@ const defaultAddress = "localhost:50001" const ( exampleUnary = "unary" + exampleClient = "client" exampleBidirectional = "bidirectional" ) @@ -62,6 +63,8 @@ func main() { switch example { case exampleUnary: sayHello(ctx, client, logger) + case exampleClient: + sayHelloClientStream(ctx, client, logger) case exampleBidirectional: sayHelloBidirectional(ctx, client, logger) default: @@ -75,7 +78,7 @@ func parseExample() string { &example, "example", exampleUnary, - fmt.Sprintf("%s or %s (default: %s)", exampleUnary, exampleBidirectional, exampleUnary), + fmt.Sprintf("%s, %s or %s (default: %s)", exampleUnary, exampleClient, exampleBidirectional, exampleUnary), ) flag.Parse() return example @@ -124,6 +127,39 @@ func sayHello(ctx context.Context, client proto.GreeterClient, logger *zap.Logge logger.Info("reply received", zap.String("message", reply.GetMessage())) } +func sayHelloClientStream(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) { + streamClient, err := client.SayHelloClientStream(context.Background()) + if err != nil { + logger.Fatal("something went wrong while calling SayHelloClientStream", zap.Error(err)) + } + defer func() { + reply, err := streamClient.CloseAndRecv() + if err != nil { + logger.Fatal("couldn't receive reply", zap.Error(err)) + } + logger.Info("reply received", zap.String("message", reply.GetMessage())) + }() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + name := uuid.NewString() + if err := streamClient.Send(&proto.HelloRequest{ + Name: name, + }); err != nil { + logger.Warn("couldn't send request", zap.Error(err)) + } else { + logger.Info("request sent", zap.String("name", name)) + } + case <-ctx.Done(): + return + } + } +} + func sayHelloBidirectional(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) { // ctx is used in the 2nd goroutine to close the stream client //nolint:contextcheck diff --git a/cmd/server/main.go b/cmd/server/main.go index 78dc0bd..657945e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -11,6 +11,7 @@ import ( "net" "os" "os/signal" + "strings" "syscall" "gitea.dwysokinski.me/Kichiyaki/grpc-g2a/cmd/internal" @@ -155,6 +156,26 @@ func (g *greeterServer) SayHello(_ context.Context, r *proto.HelloRequest) (*pro }, nil } +func (g *greeterServer) SayHelloClientStream(stream proto.Greeter_SayHelloClientStreamServer) error { + var b strings.Builder + for { + r, err := stream.Recv() + if errors.Is(err, io.EOF) { + return stream.SendAndClose(&proto.HelloReply{ + Message: b.String(), + }) + } + if err != nil { + return fmt.Errorf("stream returned error: %w", err) + } + + if b.Len() > 0 { + b.WriteString(" ") + } + b.WriteString(buildHelloMsg(r.GetName())) + } +} + func (g *greeterServer) SayHelloBidirectionalStream(stream proto.Greeter_SayHelloBidirectionalStreamServer) error { for { r, err := stream.Recv() @@ -172,5 +193,5 @@ func (g *greeterServer) SayHelloBidirectionalStream(stream proto.Greeter_SayHell } func buildHelloMsg(name string) string { - return "Hello " + name + return "Hello " + name + "!" } diff --git a/proto/hello_world.proto b/proto/hello_world.proto index 55357c4..9802120 100644 --- a/proto/hello_world.proto +++ b/proto/hello_world.proto @@ -4,6 +4,7 @@ option go_package = "gitea.dwysokinski.me/Kichiyaki/grpc-g2a/proto"; service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} + rpc SayHelloClientStream (stream HelloRequest) returns (HelloReply) {} rpc SayHelloBidirectionalStream (stream HelloRequest) returns (stream HelloReply) {} }