add server streaming example

This commit is contained in:
Dawid Wysokiński 2023-05-03 08:11:10 +02:00
parent 58634b3761
commit 9069494e7f
Signed by: Kichiyaki
GPG Key ID: B5445E357FB8B892
3 changed files with 65 additions and 12 deletions

View File

@ -27,6 +27,7 @@ const defaultAddress = "localhost:50001"
const (
exampleUnary = "unary"
exampleServer = "server"
exampleClient = "client"
exampleBidirectional = "bidirectional"
)
@ -63,6 +64,8 @@ func main() {
switch example {
case exampleUnary:
sayHello(ctx, client, logger)
case exampleServer:
sayHelloServerStream(ctx, client, logger)
case exampleClient:
sayHelloClientStream(ctx, client, logger)
case exampleBidirectional:
@ -78,7 +81,7 @@ func parseExample() string {
&example,
"example",
exampleUnary,
fmt.Sprintf("%s, %s or %s (default: %s)", exampleUnary, exampleClient, exampleBidirectional, exampleUnary),
fmt.Sprintf("%s, %s, %s or %s (default: %s)", exampleUnary, exampleServer, exampleClient, exampleBidirectional, exampleUnary),
)
flag.Parse()
return example
@ -121,13 +124,47 @@ func newTransportCredentials() (credentials.TransportCredentials, error) {
}
func sayHello(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
reply, err := client.SayHello(ctx, &proto.HelloRequest{Name: uuid.NewString()})
name := uuid.NewString()
logger.Info("sending request", zap.String("name", name))
reply, err := client.SayHello(ctx, &proto.HelloRequest{Name: name})
if err != nil {
logger.Fatal("something went wrong while calling SayHello", zap.Error(err))
}
logger.Info("reply received", zap.String("message", reply.GetMessage()))
}
const (
namesLen = 20
)
func sayHelloServerStream(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
names := make([]string, namesLen)
for i := range names {
names[i] = uuid.NewString()
}
logger.Info("sending request", zap.Strings("names", names))
streamClient, err := client.SayHelloServerStream(ctx, &proto.MultiHelloRequest{Names: names})
if err != nil {
logger.Fatal("something went wrong while calling SayHelloServerStream", zap.Error(err))
}
defer func() {
_ = streamClient.CloseSend()
}()
for {
reply, err := streamClient.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
logger.Fatal("couldn't receive reply", zap.Error(err))
}
logger.Info("reply received", zap.String("message", reply.GetMessage()))
}
}
func sayHelloClientStream(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
// ctx is used later to close the stream client
//nolint:contextcheck
@ -140,7 +177,7 @@ func sayHelloClientStream(ctx context.Context, client proto.GreeterClient, logge
if err != nil {
logger.Fatal("couldn't receive reply", zap.Error(err))
}
logger.Info("reply received", zap.String("message", reply.GetMessage()))
logger.Info("reply received", zap.Strings("messages", reply.GetMessages()))
}()
ticker := time.NewTicker(time.Second)

View File

@ -11,7 +11,6 @@ import (
"net"
"os"
"os/signal"
"strings"
"syscall"
"gitea.dwysokinski.me/Kichiyaki/grpc-g2a/cmd/internal"
@ -157,23 +156,31 @@ func (g *greeterServer) SayHello(_ context.Context, r *proto.HelloRequest) (*pro
}, nil
}
func (g *greeterServer) SayHelloServerStream(r *proto.MultiHelloRequest, stream proto.Greeter_SayHelloServerStreamServer) error {
for _, n := range r.GetNames() {
if err := stream.Send(&proto.HelloReply{
Message: buildHelloMsg(n),
}); err != nil {
return err
}
}
return nil
}
func (g *greeterServer) SayHelloClientStream(stream proto.Greeter_SayHelloClientStreamServer) error {
var b strings.Builder
var messages []string
for {
r, err := stream.Recv()
if errors.Is(err, io.EOF) {
return stream.SendAndClose(&proto.HelloReply{
Message: b.String(),
return stream.SendAndClose(&proto.MultiHelloReply{
Messages: messages,
})
}
if err != nil {
return fmt.Errorf("stream returned error: %w", err)
}
if b.Len() > 0 {
b.WriteString(" ")
}
b.WriteString(buildHelloMsg(r.GetName()))
messages = append(messages, buildHelloMsg(r.GetName()))
}
}

View File

@ -4,7 +4,8 @@ option go_package = "gitea.dwysokinski.me/Kichiyaki/grpc-g2a/proto";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHelloClientStream (stream HelloRequest) returns (HelloReply) {}
rpc SayHelloClientStream (stream HelloRequest) returns (MultiHelloReply) {}
rpc SayHelloServerStream (MultiHelloRequest) returns (stream HelloReply) {}
rpc SayHelloBidirectionalStream (stream HelloRequest) returns (stream HelloReply) {}
}
@ -12,6 +13,14 @@ message HelloRequest {
string name = 1;
}
message MultiHelloRequest {
repeated string names = 1;
}
message HelloReply {
string message = 1;
}
message MultiHelloReply {
repeated string messages = 1;
}