nats-playground/main.go

140 lines
2.5 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
// subject is the NATS subject that main publishes to and that every HTTP
// client stream subscribes to.
const subject = "foo"
// main runs the program and exits with a non-zero status if run fails.
// All cleanup lives in run so that deferred calls are executed before the
// process exits (log.Fatal skips defers).
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to NATS, starts the HTTP streaming server and publishes a
// JSON-encoded Message on subject once per second. It returns when SIGINT or
// SIGTERM is received (after shutting the HTTP server down) or when the HTTP
// server fails to serve.
func run() error {
	nc, err := nats.Connect("nats://192.168.39.175:4222")
	if err != nil {
		return err
	}
	defer nc.Close()

	srv := newServer(nc)

	// Serve failures (e.g. the port is already in use) are reported through
	// serveErr instead of calling log.Fatal inside the goroutine, which would
	// exit without closing the NATS connection.
	serveErr := make(chan error, 1)
	go func() {
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			serveErr <- err
		}
	}()
	log.Println("Server is listening on the port 8096")

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()

		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				id := uuid.NewString()
				m := Message{
					ID:      id,
					Message: fmt.Sprintf("Sending you \"%s\"", id),
				}
				b, err := json.Marshal(m)
				if err != nil {
					log.Println("marshal", err)
					continue
				}
				log.Println("publishing message")
				if err := nc.Publish(subject, b); err != nil {
					log.Println("publish", err)
				}
			}
		}
	}()
	// Deferred after nc.Close so the publisher has stopped before the
	// connection is closed (defers run in reverse order).
	defer wg.Wait()

	select {
	case <-ctx.Done():
	case err := <-serveErr:
		// Cancel ctx explicitly: the deferred wg.Wait runs before the
		// deferred stop and would otherwise block on the publisher forever.
		stop()
		return fmt.Errorf("listen: %w", err)
	}

	ctxShutdown, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancelShutdown()

	if err := srv.Shutdown(ctxShutdown); err != nil {
		// Shutdown does not interrupt active connections, and the streaming
		// handler only returns once its request is cancelled, so force-close
		// whatever is still open after the grace period.
		if cerr := srv.Close(); cerr != nil {
			log.Println("close", cerr)
		}
		return fmt.Errorf("shutdown: %w", err)
	}
	return nil
}
// newServer builds the HTTP server that listens on :8096 and streams NATS
// messages received through nc to its clients.
//
// ReadHeaderTimeout bounds how long a client may take to send its request
// headers, so idle or deliberately slow connections cannot be held open
// forever. WriteTimeout is intentionally left unset because responses are
// long-lived streams.
func newServer(nc *nats.Conn) *http.Server {
	return &http.Server{
		Addr:              ":8096",
		Handler:           &httpHandler{nc: nc},
		ReadHeaderTimeout: 10 * time.Second,
	}
}
// Message is the JSON payload published on the NATS subject.
type Message struct {
	// ID is encoded under the JSON key "id".
	ID string `json:"id"`
	// Message is encoded under the JSON key "message".
	Message string `json:"message"`
}
// httpHandler is the http.Handler installed by newServer. It holds the NATS
// connection that ServeHTTP subscribes on.
type httpHandler struct {
	// nc is the NATS connection used to create per-request subscriptions.
	nc *nats.Conn
}
// ServeHTTP streams every message received on subject to the client as a
// chunked response until the request context is cancelled or a write fails.
//
// Only GET / is served; any other method or path gets 404. A ResponseWriter
// that cannot flush, or a failed subscription, is reported as 500.
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet || r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}

	flusher, ok := w.(http.Flusher)
	if !ok {
		// The route exists; the server simply cannot stream on this writer.
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	// Buffer the delivery channel so a message that arrives while the
	// previous one is still being written is queued.
	// NOTE(review): nats.go is understood to drop messages (slow consumer)
	// when the channel cannot accept them — confirm against the client docs.
	const msgBuffer = 64
	ch := make(chan *nats.Msg, msgBuffer)

	// Subscribe before committing the status line so that a subscription
	// failure can still be reported as a 500.
	sub, err := h.nc.ChanSubscribe(subject, ch)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	// ch is deliberately never closed here: this handler is the receiver,
	// and nobody waits on a close signal.
	defer func() {
		if err := sub.Unsubscribe(); err != nil {
			log.Println("unsubscribe", err)
		}
	}()

	// Send the initial headers saying we're gonna stream the response.
	w.Header().Set("Transfer-Encoding", "chunked")
	w.WriteHeader(http.StatusOK)
	flusher.Flush()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			log.Println("Client stopped listening")
			return
		case msg := <-ch:
			log.Println("message")
			if _, err := w.Write(msg.Data); err != nil {
				// The connection is gone; stop instead of writing into the void.
				log.Println("write", err)
				return
			}
			flusher.Flush()
			// Best-effort ack: messages published without a reply subject
			// (as main does via Publish) presumably cannot be acked, so the
			// error is intentionally ignored.
			_ = msg.Ack()
		}
	}
}