add client streaming example
This commit is contained in:
parent
0ef6d7f682
commit
e05f599062
|
@ -27,6 +27,7 @@ const defaultAddress = "localhost:50001"
|
|||
|
||||
const (
|
||||
exampleUnary = "unary"
|
||||
exampleClient = "client"
|
||||
exampleBidirectional = "bidirectional"
|
||||
)
|
||||
|
||||
|
@ -62,6 +63,8 @@ func main() {
|
|||
switch example {
|
||||
case exampleUnary:
|
||||
sayHello(ctx, client, logger)
|
||||
case exampleClient:
|
||||
sayHelloClientStream(ctx, client, logger)
|
||||
case exampleBidirectional:
|
||||
sayHelloBidirectional(ctx, client, logger)
|
||||
default:
|
||||
|
@ -75,7 +78,7 @@ func parseExample() string {
|
|||
&example,
|
||||
"example",
|
||||
exampleUnary,
|
||||
fmt.Sprintf("%s or %s (default: %s)", exampleUnary, exampleBidirectional, exampleUnary),
|
||||
fmt.Sprintf("%s, %s or %s (default: %s)", exampleUnary, exampleClient, exampleBidirectional, exampleUnary),
|
||||
)
|
||||
flag.Parse()
|
||||
return example
|
||||
|
@ -124,6 +127,39 @@ func sayHello(ctx context.Context, client proto.GreeterClient, logger *zap.Logge
|
|||
logger.Info("reply received", zap.String("message", reply.GetMessage()))
|
||||
}
|
||||
|
||||
func sayHelloClientStream(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
|
||||
streamClient, err := client.SayHelloClientStream(context.Background())
|
||||
if err != nil {
|
||||
logger.Fatal("something went wrong while calling SayHelloClientStream", zap.Error(err))
|
||||
}
|
||||
defer func() {
|
||||
reply, err := streamClient.CloseAndRecv()
|
||||
if err != nil {
|
||||
logger.Fatal("couldn't receive reply", zap.Error(err))
|
||||
}
|
||||
logger.Info("reply received", zap.String("message", reply.GetMessage()))
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
name := uuid.NewString()
|
||||
if err := streamClient.Send(&proto.HelloRequest{
|
||||
Name: name,
|
||||
}); err != nil {
|
||||
logger.Warn("couldn't send request", zap.Error(err))
|
||||
} else {
|
||||
logger.Info("request sent", zap.String("name", name))
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sayHelloBidirectional(ctx context.Context, client proto.GreeterClient, logger *zap.Logger) {
|
||||
// ctx is used in the 2nd goroutine to close the stream client
|
||||
//nolint:contextcheck
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"gitea.dwysokinski.me/Kichiyaki/grpc-g2a/cmd/internal"
|
||||
|
@ -155,6 +156,26 @@ func (g *greeterServer) SayHello(_ context.Context, r *proto.HelloRequest) (*pro
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (g *greeterServer) SayHelloClientStream(stream proto.Greeter_SayHelloClientStreamServer) error {
|
||||
var b strings.Builder
|
||||
for {
|
||||
r, err := stream.Recv()
|
||||
if errors.Is(err, io.EOF) {
|
||||
return stream.SendAndClose(&proto.HelloReply{
|
||||
Message: b.String(),
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("stream returned error: %w", err)
|
||||
}
|
||||
|
||||
if b.Len() > 0 {
|
||||
b.WriteString(" ")
|
||||
}
|
||||
b.WriteString(buildHelloMsg(r.GetName()))
|
||||
}
|
||||
}
|
||||
|
||||
func (g *greeterServer) SayHelloBidirectionalStream(stream proto.Greeter_SayHelloBidirectionalStreamServer) error {
|
||||
for {
|
||||
r, err := stream.Recv()
|
||||
|
@ -172,5 +193,5 @@ func (g *greeterServer) SayHelloBidirectionalStream(stream proto.Greeter_SayHell
|
|||
}
|
||||
|
||||
func buildHelloMsg(name string) string {
|
||||
return "Hello " + name
|
||||
return "Hello " + name + "!"
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ option go_package = "gitea.dwysokinski.me/Kichiyaki/grpc-g2a/proto";
|
|||
|
||||
service Greeter {
|
||||
rpc SayHello (HelloRequest) returns (HelloReply) {}
|
||||
rpc SayHelloClientStream (stream HelloRequest) returns (HelloReply) {}
|
||||
rpc SayHelloBidirectionalStream (stream HelloRequest) returns (stream HelloReply) {}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue