package main import ( "context" "encoding/json" "errors" "fmt" "log" "net/http" "os" "os/signal" "sync" "syscall" "time" "github.com/google/uuid" "github.com/nats-io/nats.go" ) const subject = "foo" func main() { nc, err := nats.Connect("nats://192.168.39.175:4222") if err != nil { log.Fatal(err) } defer func() { nc.Close() }() srv := newServer(nc) go func() { if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatal(err) } }() log.Println("Server is listening on the port 8096") ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: id := uuid.NewString() m := Message{ ID: id, Message: fmt.Sprintf("Sending you \"%s\"", id), } b, _ := json.Marshal(m) log.Println("publishing message") if err := nc.Publish(subject, b); err != nil { log.Println("publish", err) } } } }() defer func() { wg.Wait() }() <-ctx.Done() ctxShutdown, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second) defer cancelShutdown() if err = srv.Shutdown(ctxShutdown); err != nil { log.Fatalln("shutdown", err) } } func newServer(nc *nats.Conn) *http.Server { return &http.Server{ Addr: ":8096", Handler: &httpHandler{nc: nc}, } } type Message struct { ID string `json:"id"` Message string `json:"message"` } type httpHandler struct { nc *nats.Conn } func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet || r.URL.Path != "/" { http.NotFound(w, r) return } flusher, ok := w.(http.Flusher) if !ok { http.NotFound(w, r) return } // Send the initial headers saying we're gonna stream the response. w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) ctx := r.Context() ch := make(chan *nats.Msg) defer func() { close(ch) }() sub, err := h.nc.ChanSubscribe(subject, ch) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } defer func() { _ = sub.Unsubscribe() }() for { select { case <-ctx.Done(): log.Println("Client stopped listening") return case msg := <-ch: log.Println("message") _, _ = w.Write(msg.Data) flusher.Flush() _ = msg.Ack() } } }