This commit is contained in:
Dawid Wysokiński 2023-04-11 08:08:02 +02:00
commit 9433d58828
Signed by: Kichiyaki
GPG Key ID: B5445E357FB8B892
5 changed files with 193 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.idea

8
docker-compose.yml Normal file
View File

@ -0,0 +1,8 @@
version: "3.6"
services:
nats:
image: nats:2.9.15
ports:
- "8222:8222"
- "4222:4222"
command: "--http_port 8222"

18
go.mod Normal file
View File

@ -0,0 +1,18 @@
module gitea.dwysokinski.me/Kichiyaki/nats-playground
go 1.20
require (
github.com/google/uuid v1.3.0
github.com/nats-io/nats.go v1.25.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/nats-io/nats-server/v2 v2.9.15 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

27
go.sum Normal file
View File

@ -0,0 +1,27 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/nats-server/v2 v2.9.15 h1:MuwEJheIwpvFgqvbs20W8Ish2azcygjf4Z0liVu2I4c=
github.com/nats-io/nats-server/v2 v2.9.15/go.mod h1:QlCTy115fqpx4KSOPFIxSV7DdI6OxtZsGOL1JLdeRlE=
github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE=
github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=

139
main.go Normal file
View File

@ -0,0 +1,139 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
const subject = "foo"
func main() {
nc, err := nats.Connect("nats://192.168.39.175:4222")
if err != nil {
log.Fatal(err)
}
defer func() {
nc.Close()
}()
srv := newServer(nc)
go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()
log.Println("Server is listening on the port 8096")
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
id := uuid.NewString()
m := Message{
ID: id,
Message: fmt.Sprintf("Sending you \"%s\"", id),
}
b, _ := json.Marshal(m)
log.Println("publishing message")
if err := nc.Publish(subject, b); err != nil {
log.Println("publish", err)
}
}
}
}()
defer func() {
wg.Wait()
}()
<-ctx.Done()
ctxShutdown, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelShutdown()
if err = srv.Shutdown(ctxShutdown); err != nil {
log.Fatalln("shutdown", err)
}
}
func newServer(nc *nats.Conn) *http.Server {
return &http.Server{
Addr: ":8096",
Handler: &httpHandler{nc: nc},
}
}
type Message struct {
ID string `json:"id"`
Message string `json:"message"`
}
type httpHandler struct {
nc *nats.Conn
}
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet || r.URL.Path != "/" {
http.NotFound(w, r)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
http.NotFound(w, r)
return
}
// Send the initial headers saying we're gonna stream the response.
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
ctx := r.Context()
ch := make(chan *nats.Msg)
defer func() {
close(ch)
}()
sub, err := h.nc.ChanSubscribe(subject, ch)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer func() {
_ = sub.Unsubscribe()
}()
for {
select {
case <-ctx.Done():
log.Println("Client stopped listening")
return
case msg := <-ch:
log.Println("message")
_, _ = w.Write(msg.Data)
flusher.Flush()
_ = msg.Ack()
}
}
}