From 9433d58828f66e6c6bfa8bb658752772601730dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20Wysoki=C5=84ski?= Date: Tue, 11 Apr 2023 08:08:02 +0200 Subject: [PATCH] init --- .gitignore | 1 + docker-compose.yml | 8 +++ go.mod | 18 ++++++ go.sum | 27 +++++++++ main.go | 139 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 193 insertions(+) create mode 100644 .gitignore create mode 100644 docker-compose.yml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..0d98d54 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,8 @@ +version: "3.6" +services: + nats: + image: nats:2.9.15 + ports: + - "8222:8222" + - "4222:4222" + command: "--http_port 8222" diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ba6d883 --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module gitea.dwysokinski.me/Kichiyaki/nats-playground + +go 1.20 + +require ( + github.com/google/uuid v1.3.0 + github.com/nats-io/nats.go v1.25.0 +) + +require ( + github.com/golang/protobuf v1.5.3 // indirect + github.com/nats-io/nats-server/v2 v2.9.15 // indirect + github.com/nats-io/nkeys v0.4.4 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.6.0 // indirect + golang.org/x/sys v0.5.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..bb4bd29 --- /dev/null +++ b/go.sum @@ -0,0 +1,27 @@ +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/nats-server/v2 v2.9.15 h1:MuwEJheIwpvFgqvbs20W8Ish2azcygjf4Z0liVu2I4c= +github.com/nats-io/nats-server/v2 v2.9.15/go.mod h1:QlCTy115fqpx4KSOPFIxSV7DdI6OxtZsGOL1JLdeRlE= +github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE= +github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg= +github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= +github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/main.go b/main.go new file mode 100644 index 0000000..3f0588d --- /dev/null +++ b/main.go @@ -0,0 +1,139 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/google/uuid" + "github.com/nats-io/nats.go" +) + +const subject = "foo" + +func main() { + nc, err := nats.Connect("nats://192.168.39.175:4222") + if err != nil { + log.Fatal(err) + } + defer func() { + nc.Close() + }() + + srv := newServer(nc) + go func() { + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatal(err) + } + }() + + log.Println("Server is listening on the port 8096") + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + id := uuid.NewString() + m := Message{ + ID: id, + Message: fmt.Sprintf("Sending you \"%s\"", id), + } + b, _ := json.Marshal(m) + log.Println("publishing message") + if err := nc.Publish(subject, b); err != nil { + log.Println("publish", err) + } + } + } + }() + defer func() { + wg.Wait() + }() + + <-ctx.Done() + + ctxShutdown, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelShutdown() + if err = srv.Shutdown(ctxShutdown); err != nil { + log.Fatalln("shutdown", err) + } +} + +func newServer(nc *nats.Conn) *http.Server { + return &http.Server{ + Addr: ":8096", + Handler: &httpHandler{nc: nc}, + } +} + +type Message struct { + ID string `json:"id"` + Message string `json:"message"` +} + +type httpHandler struct { + nc *nats.Conn +} + +func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet || r.URL.Path != "/" { + http.NotFound(w, r) + return + } + + flusher, ok := w.(http.Flusher) + if !ok { + http.NotFound(w, r) + return + } + + // Send the initial headers saying we're gonna stream the response. + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(http.StatusOK) + + ctx := r.Context() + + ch := make(chan *nats.Msg) + defer func() { + close(ch) + }() + sub, err := h.nc.ChanSubscribe(subject, ch) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer func() { + _ = sub.Unsubscribe() + }() + + for { + select { + case <-ctx.Done(): + log.Println("Client stopped listening") + return + case msg := <-ch: + log.Println("message") + _, _ = w.Write(msg.Data) + flusher.Flush() + _ = msg.Ack() + } + } +}