From 8bec38dae5e4b9df407f93d4bafaa01758d6dfea Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sat, 10 Oct 2020 09:38:46 +0200 Subject: [PATCH] improve handler.vacuumDatabase, handler.updateServersStats, handler.updateHistory performance --- cron/handler.go | 57 +++++++++++++++++++------------------------------ cron/pool.go | 26 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 35 deletions(-) create mode 100644 cron/pool.go diff --git a/cron/handler.go b/cron/handler.go index da2dcf2..67bde8d 100644 --- a/cron/handler.go +++ b/cron/handler.go @@ -40,7 +40,7 @@ func Attach(c *cron.Cron, cfg Config) error { if _, err := c.AddFunc("0 * * * *", h.updateServersData); err != nil { return err } - if _, err := c.AddFunc("30 0 * * *", h.updateServerHistory); err != nil { + if _, err := c.AddFunc("30 0 * * *", h.updateHistory); err != nil { return err } if _, err := c.AddFunc("30 1 * * *", h.vacuumDatabase); err != nil { @@ -52,7 +52,7 @@ func Attach(c *cron.Cron, cfg Config) error { go func() { h.updateServersData() h.vacuumDatabase() - h.updateServerHistory() + h.updateHistory() h.updateStats() }() @@ -244,7 +244,7 @@ func (h *handler) updateServersData() { } } -func (h *handler) updateServerHistory() { +func (h *handler) updateHistory() { servers := []*models.Server{} now := time.Now() t1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 30, 0, 0, time.UTC) @@ -253,34 +253,30 @@ func (h *handler) updateServerHistory() { Where("status = ? AND (history_updated_at < ? OR history_updated_at IS NULL)", models.ServerStatusOpen, t1). Select() if err != nil { - log.Println(errors.Wrap(err, "updateServerHistory")) + log.Println(errors.Wrap(err, "updateHistory")) return } var wg sync.WaitGroup - max := runtime.NumCPU() * 5 - count := 0 + p := newPool(runtime.NumCPU()) + defer p.close() for _, server := range servers { - if count >= max { - wg.Wait() - count = 0 - } sh := &updateServerHistoryWorker{ db: h.db.WithParam("SERVER", pg.Safe(server.Key)), server: server, } - count++ + p.waitForWorker() wg.Add(1) go func(server *models.Server, sh *updateServerHistoryWorker) { + defer p.releaseWorker() defer wg.Done() log.Printf("%s: updating history", server.Key) if err := sh.update(); err != nil { log.Println(errors.Wrap(err, server.Key)) return - } else { - log.Printf("%s: history updated", server.Key) } + log.Printf("%s: history updated", server.Key) }(server, sh) } @@ -298,29 +294,25 @@ func (h *handler) updateServersStats(t time.Time) error { } var wg sync.WaitGroup - max := runtime.NumCPU() * 5 - count := 0 + p := newPool(runtime.NumCPU()) + defer p.close() for _, server := range servers { - if count >= max { - wg.Wait() - count = 0 - } sh := &updateServerStatsWorker{ db: h.db.WithParam("SERVER", pg.Safe(server.Key)), server: server, } - count++ + p.waitForWorker() wg.Add(1) go func(server *models.Server, sh *updateServerStatsWorker) { + defer p.releaseWorker() defer wg.Done() log.Printf("%s: updating stats", server.Key) if err := sh.update(); err != nil { log.Println(errors.Wrap(err, server.Key)) return - } else { - log.Printf("%s: stats updated", server.Key) } + log.Printf("%s: stats updated", server.Key) }(server, sh) } @@ -333,7 +325,6 @@ func (h *handler) updateStats() { t1 := time.Date(now.Year(), now.Month(), now.Day(), 1, 30, 0, 0, time.UTC) if err := h.updateServersStats(t1); err != nil { log.Println(err) - return } } @@ -343,34 +334,30 @@ func (h *handler) vacuumDatabase() { Model(&servers). Select() if err != nil { - log.Fatal(errors.Wrap(err, "vacuumDatabase")) + log.Println(errors.Wrap(err, "vacuumDatabase")) return } var wg sync.WaitGroup - max := runtime.NumCPU() * 5 - count := 0 + p := newPool(runtime.NumCPU()) + defer p.close() for _, server := range servers { - if count >= max { - wg.Wait() - count = 0 - } sh := &vacuumServerDBWorker{ db: h.db.WithParam("SERVER", pg.Safe(server.Key)), } - count++ + p.waitForWorker() wg.Add(1) - go func(server *models.Server, sh *vacuumServerDBWorker) { + go func(server *models.Server, sh *vacuumServerDBWorker, p *pool) { + defer p.releaseWorker() defer wg.Done() log.Printf("%s: vacuuming database", server.Key) if err := sh.vacuum(); err != nil { log.Println(errors.Wrap(err, server.Key)) return - } else { - log.Printf("%s: database vacuumed", server.Key) } - }(server, sh) + log.Printf("%s: database vacuumed", server.Key) + }(server, sh, p) } wg.Wait() diff --git a/cron/pool.go b/cron/pool.go new file mode 100644 index 0000000..22068e2 --- /dev/null +++ b/cron/pool.go @@ -0,0 +1,26 @@ +package cron + +type pool struct { + workers chan bool +} + +func newPool(capacity int) *pool { + p := &pool{} + p.workers = make(chan bool, capacity) + for i := 0; i < capacity; i++ { + p.releaseWorker() + } + return p +} + +func (p *pool) releaseWorker() { + p.workers <- true +} + +func (p *pool) waitForWorker() bool { + return <-p.workers +} + +func (p *pool) close() { + close(p.workers) +}