add taskVacuumServerDB
This commit is contained in:
parent
f058facc32
commit
1598b833bb
|
@ -70,7 +70,7 @@ func Attach(c *cron.Cron, cfg Config) error {
|
|||
if cfg.RunOnStartup {
|
||||
go func() {
|
||||
//h.updateServerData()
|
||||
//vacuumDatabase()
|
||||
h.vacuumDatabase()
|
||||
//for _, fn := range updateHistoryFuncs {
|
||||
// go fn()
|
||||
//}
|
||||
|
|
|
@ -178,37 +178,5 @@ func (h *handler) updateStats(location *time.Location) {
|
|||
}
|
||||
|
||||
func (h *handler) vacuumDatabase() {
|
||||
servers := []*models.Server{}
|
||||
err := h.db.
|
||||
Model(&servers).
|
||||
Select()
|
||||
if err != nil {
|
||||
log.Errorln(errors.Wrap(err, "vacuumDatabase"))
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, server := range servers {
|
||||
h.pool.waitForWorker()
|
||||
wg.Add(1)
|
||||
worker := &vacuumServerDBWorker{
|
||||
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
|
||||
}
|
||||
go func(server *models.Server, worker *vacuumServerDBWorker) {
|
||||
defer func() {
|
||||
h.pool.releaseWorker()
|
||||
wg.Done()
|
||||
}()
|
||||
log := log.WithField("serverKey", server.Key)
|
||||
log.Infof("vacuumDatabase: %s: vacuuming database", server.Key)
|
||||
if err := worker.vacuum(); err != nil {
|
||||
log.Errorln("vacuumDatabase:", errors.Wrap(err, server.Key))
|
||||
return
|
||||
}
|
||||
log.Infof("vacuumDatabase: %s: database vacuumed", server.Key)
|
||||
}(server, worker)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background()))
|
||||
}
|
||||
|
|
|
@ -54,6 +54,11 @@ func RegisterTasks(cfg *Config) error {
|
|||
RetryLimit: defaultRetryLimit,
|
||||
Handler: (&taskVacuum{t}).execute,
|
||||
})
|
||||
taskq.RegisterTask(&taskq.TaskOptions{
|
||||
Name: TaskNameVacuumServerDB,
|
||||
RetryLimit: defaultRetryLimit,
|
||||
Handler: (&taskVacuumServerDB{t}).execute,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Reference in New Issue
Block a user