From 6994607e0c6600c7be8a2b744ea1ff6b6688afab Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 25 Apr 2021 19:06:17 +0200 Subject: [PATCH] add taskUpdateEnnoblements --- cron/cron.go | 4 +- cron/handler.go | 42 ++--------------- cron/tasks/task_update_ennoblements.go | 38 ++++++++++++++++ .../task_vacuum_server_db.go} | 45 ++++++++++++++++--- cron/tasks/tasks.go | 11 ++++- 5 files changed, 90 insertions(+), 50 deletions(-) create mode 100644 cron/tasks/task_update_ennoblements.go rename cron/{vacuum_server_db_worker.go => tasks/task_vacuum_server_db.go} (58%) diff --git a/cron/cron.go b/cron/cron.go index c3e3ec5..ee4d901 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -64,13 +64,13 @@ func Attach(c *cron.Cron, cfg Config) error { if _, err := c.AddFunc("20 1 * * *", h.vacuumDatabase); err != nil { return err } - if _, err := c.AddFunc("@every 1m", h.updateServerEnnoblements); err != nil { + if _, err := c.AddFunc("@every 1m", h.updateEnnoblements); err != nil { return err } if cfg.RunOnStartup { go func() { //h.updateServerData() - h.vacuumDatabase() + //h.vacuumDatabase() //for _, fn := range updateHistoryFuncs { // go fn() //} diff --git a/cron/handler.go b/cron/handler.go index e7789d2..0025070 100644 --- a/cron/handler.go +++ b/cron/handler.go @@ -2,16 +2,13 @@ package cron import ( "context" - "fmt" "runtime" "sync" "time" - "github.com/tribalwarshelp/shared/models" - "github.com/tribalwarshelp/shared/tw/dataloader" - "github.com/go-pg/pg/v10" "github.com/pkg/errors" + "github.com/tribalwarshelp/shared/models" "github.com/tribalwarshelp/cron/cron/queue" "github.com/tribalwarshelp/cron/cron/tasks" @@ -40,41 +37,8 @@ func (h *handler) updateServerData() { h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background())) } -func (h *handler) updateServerEnnoblements() { - servers := []*models.Server{} - if err := h.db.Model(&servers).Relation("Version").Where("status = ?", models.ServerStatusOpen).Select(); err != nil { - log.Error(errors.Wrap(err, "updateServerEnnoblements: cannot load ennoblements")) - } - log. - WithField("numberOfServers", len(servers)). - Info("updateServerEnnoblements: servers loaded") - - var wg sync.WaitGroup - pool := newPool(h.maxConcurrentWorkers) - for _, server := range servers { - pool.waitForWorker() - wg.Add(1) - sh := &updateServerEnnoblementsWorker{ - db: h.db.WithParam("SERVER", pg.Safe(server.Key)), - server: server, - dataloader: dataloader.New(&dataloader.Config{ - BaseURL: fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host), - }), - } - go func(worker *updateServerEnnoblementsWorker, server *models.Server) { - defer func() { - pool.releaseWorker() - wg.Done() - }() - log := log.WithField("serverKey", server.Key) - err := sh.update() - if err != nil { - log.Errorln("updateServerEnnoblements:", errors.Wrap(err, server.Key)) - return - } - }(sh, server) - } - wg.Wait() +func (h *handler) updateEnnoblements() { + h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background())) } func (h *handler) updateHistory(location *time.Location) { diff --git a/cron/tasks/task_update_ennoblements.go b/cron/tasks/task_update_ennoblements.go new file mode 100644 index 0000000..4f212a6 --- /dev/null +++ b/cron/tasks/task_update_ennoblements.go @@ -0,0 +1,38 @@ +package tasks + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/tribalwarshelp/shared/models" + + "github.com/tribalwarshelp/cron/cron/queue" +) + +type taskUpdateEnnoblements struct { + *task +} + +func (t *taskUpdateEnnoblements) execute() error { + var servers []*models.Server + err := t.db. + Model(&servers). + Relation("Version"). + Where("status = ?", models.ServerStatusOpen). + Select() + if err != nil { + err = errors.Wrap(err, "taskUpdateEnnoblements.execute") + log.Errorln(err) + return err + } + log.Debug("Updating ennoblements...") + for _, server := range servers { + s := server + t.queue.Add( + queue.MainQueue, + Get(TaskNameVacuumServerDB). + WithArgs(context.Background(), fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host), s), + ) + } + return nil +} diff --git a/cron/vacuum_server_db_worker.go b/cron/tasks/task_vacuum_server_db.go similarity index 58% rename from cron/vacuum_server_db_worker.go rename to cron/tasks/task_vacuum_server_db.go index 07588ab..dbef830 100644 --- a/cron/vacuum_server_db_worker.go +++ b/cron/tasks/task_vacuum_server_db.go @@ -1,17 +1,48 @@ -package cron +package tasks import ( - "time" - "github.com/go-pg/pg/v10" "github.com/pkg/errors" "github.com/tribalwarshelp/shared/models" + "time" ) const ( day = 24 * time.Hour ) +type taskVacuumServerDB struct { + *task +} + +func (t *taskVacuumServerDB) execute(server *models.Server) error { + if err := t.validatePayload(server); err != nil { + log.Debug(err) + return nil + } + entry := log.WithField("key", server.Key) + entry.Infof("%s: vacumming the database...", server.Key) + err := (&vacuumServerDBWorker{ + db: t.db.WithParam("SERVER", pg.Safe(server.Key)), + }).vacuum() + if err != nil { + err = errors.Wrap(err, "taskVacuumServerDB.execute") + entry.Error(err) + return err + } + entry.Infof("%s: the database has been vacummed", server.Key) + + return nil +} + +func (t *taskVacuumServerDB) validatePayload(server *models.Server) error { + if server == nil { + return errors.Errorf("taskVacuumServerDB.validatePayload: Expected *models.Server, got nil") + } + + return nil +} + type vacuumServerDBWorker struct { db *pg.DB } @@ -31,7 +62,7 @@ func (w *vacuumServerDBWorker) vacuum() error { Where("player_id IN (Select id FROM players) OR player_history.create_date < ?", time.Now().Add(-1*day*180)). Delete() if err != nil { - return errors.Wrap(err, "cannot delete old player history records") + return errors.Wrap(err, "couldn't delete the old player history records") } _, err = tx.Model(&models.TribeHistory{}). @@ -39,7 +70,7 @@ func (w *vacuumServerDBWorker) vacuum() error { Where("tribe_id IN (Select id FROM tribes) OR tribe_history.create_date < ?", time.Now().Add(-1*day*180)). Delete() if err != nil { - return errors.Wrap(err, "cannot delete old tribe history records") + return errors.Wrap(err, "couldn't delete the old tribe history records") } _, err = tx.Model(&models.DailyPlayerStats{}). @@ -47,7 +78,7 @@ func (w *vacuumServerDBWorker) vacuum() error { Where("player_id IN (Select id FROM players) OR daily_player_stats.create_date < ?", time.Now().Add(-1*day*180)). Delete() if err != nil { - return errors.Wrap(err, "cannot delete old player stats records") + return errors.Wrap(err, "couldn't delete the old player stats records") } _, err = tx.Model(&models.DailyTribeStats{}). @@ -55,7 +86,7 @@ func (w *vacuumServerDBWorker) vacuum() error { Where("tribe_id IN (Select id FROM tribes) OR daily_tribe_stats.create_date < ?", time.Now().Add(-1*day*180)). Delete() if err != nil { - return errors.Wrap(err, "cannot delete old tribe stats records") + return errors.Wrap(err, "couldn't delete the old tribe stats records") } return tx.Commit() diff --git a/cron/tasks/tasks.go b/cron/tasks/tasks.go index fbe0262..17783c5 100644 --- a/cron/tasks/tasks.go +++ b/cron/tasks/tasks.go @@ -10,11 +10,13 @@ import ( ) const ( - TaskNameLoadVersionsAndUpdateServerData = "loadVersions" - TaskNameLoadServersAndUpdateData = "loadServers" + TaskNameLoadVersionsAndUpdateServerData = "loadVersionsAndUpdateServerData" + TaskNameLoadServersAndUpdateData = "loadServersAndUpdateData" TaskNameUpdateServerData = "updateServerData" TaskNameVacuum = "vacuum" TaskNameVacuumServerDB = "vacuumServerDB" + TaskUpdateEnnoblements = "updateEnnoblements" + TaskServerUpdateEnnoblements = "serverUpdateEnnoblements" defaultRetryLimit = 3 ) @@ -59,6 +61,11 @@ func RegisterTasks(cfg *Config) error { RetryLimit: defaultRetryLimit, Handler: (&taskVacuumServerDB{t}).execute, }) + taskq.RegisterTask(&taskq.TaskOptions{ + Name: TaskUpdateEnnoblements, + RetryLimit: defaultRetryLimit, + Handler: (&taskUpdateEnnoblements{t}).execute, + }) return nil }