improve handler.vacuumDatabase, handler.updateServersStats, handler.updateHistory performance

This commit is contained in:
Dawid Wysokiński 2020-10-10 09:38:46 +02:00
parent 1fd2d805f9
commit 8bec38dae5
2 changed files with 48 additions and 35 deletions

View File

@ -40,7 +40,7 @@ func Attach(c *cron.Cron, cfg Config) error {
if _, err := c.AddFunc("0 * * * *", h.updateServersData); err != nil {
return err
}
if _, err := c.AddFunc("30 0 * * *", h.updateServerHistory); err != nil {
if _, err := c.AddFunc("30 0 * * *", h.updateHistory); err != nil {
return err
}
if _, err := c.AddFunc("30 1 * * *", h.vacuumDatabase); err != nil {
@ -52,7 +52,7 @@ func Attach(c *cron.Cron, cfg Config) error {
go func() {
h.updateServersData()
h.vacuumDatabase()
h.updateServerHistory()
h.updateHistory()
h.updateStats()
}()
@ -244,7 +244,7 @@ func (h *handler) updateServersData() {
}
}
func (h *handler) updateServerHistory() {
func (h *handler) updateHistory() {
servers := []*models.Server{}
now := time.Now()
t1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 30, 0, 0, time.UTC)
@ -253,34 +253,30 @@ func (h *handler) updateServerHistory() {
Where("status = ? AND (history_updated_at < ? OR history_updated_at IS NULL)", models.ServerStatusOpen, t1).
Select()
if err != nil {
log.Println(errors.Wrap(err, "updateServerHistory"))
log.Println(errors.Wrap(err, "updateHistory"))
return
}
var wg sync.WaitGroup
max := runtime.NumCPU() * 5
count := 0
p := newPool(runtime.NumCPU())
defer p.close()
for _, server := range servers {
if count >= max {
wg.Wait()
count = 0
}
sh := &updateServerHistoryWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
}
count++
p.waitForWorker()
wg.Add(1)
go func(server *models.Server, sh *updateServerHistoryWorker) {
defer p.releaseWorker()
defer wg.Done()
log.Printf("%s: updating history", server.Key)
if err := sh.update(); err != nil {
log.Println(errors.Wrap(err, server.Key))
return
} else {
log.Printf("%s: history updated", server.Key)
}
log.Printf("%s: history updated", server.Key)
}(server, sh)
}
@ -298,29 +294,25 @@ func (h *handler) updateServersStats(t time.Time) error {
}
var wg sync.WaitGroup
max := runtime.NumCPU() * 5
count := 0
p := newPool(runtime.NumCPU())
defer p.close()
for _, server := range servers {
if count >= max {
wg.Wait()
count = 0
}
sh := &updateServerStatsWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
}
count++
p.waitForWorker()
wg.Add(1)
go func(server *models.Server, sh *updateServerStatsWorker) {
defer p.releaseWorker()
defer wg.Done()
log.Printf("%s: updating stats", server.Key)
if err := sh.update(); err != nil {
log.Println(errors.Wrap(err, server.Key))
return
} else {
log.Printf("%s: stats updated", server.Key)
}
log.Printf("%s: stats updated", server.Key)
}(server, sh)
}
@ -333,7 +325,6 @@ func (h *handler) updateStats() {
t1 := time.Date(now.Year(), now.Month(), now.Day(), 1, 30, 0, 0, time.UTC)
if err := h.updateServersStats(t1); err != nil {
log.Println(err)
return
}
}
@ -343,34 +334,30 @@ func (h *handler) vacuumDatabase() {
Model(&servers).
Select()
if err != nil {
log.Fatal(errors.Wrap(err, "vacuumDatabase"))
log.Println(errors.Wrap(err, "vacuumDatabase"))
return
}
var wg sync.WaitGroup
max := runtime.NumCPU() * 5
count := 0
p := newPool(runtime.NumCPU())
defer p.close()
for _, server := range servers {
if count >= max {
wg.Wait()
count = 0
}
sh := &vacuumServerDBWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
}
count++
p.waitForWorker()
wg.Add(1)
go func(server *models.Server, sh *vacuumServerDBWorker) {
go func(server *models.Server, sh *vacuumServerDBWorker, p *pool) {
defer p.releaseWorker()
defer wg.Done()
log.Printf("%s: vacuuming database", server.Key)
if err := sh.vacuum(); err != nil {
log.Println(errors.Wrap(err, server.Key))
return
} else {
log.Printf("%s: database vacuumed", server.Key)
}
}(server, sh)
log.Printf("%s: database vacuumed", server.Key)
}(server, sh, p)
}
wg.Wait()

26
cron/pool.go Normal file
View File

@ -0,0 +1,26 @@
package cron
type pool struct {
workers chan bool
}
func newPool(capacity int) *pool {
p := &pool{}
p.workers = make(chan bool, capacity)
for i := 0; i < capacity; i++ {
p.releaseWorker()
}
return p
}
func (p *pool) releaseWorker() {
p.workers <- true
}
func (p *pool) waitForWorker() bool {
return <-p.workers
}
func (p *pool) close() {
close(p.workers)
}