From a4da69e08520c7dcfb0a2b1fd2b0b713df019f8b Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Fri, 15 Jan 2021 18:25:13 +0100 Subject: [PATCH] - update github.com/tribalwarshelp/shared - add a new worker - updateServerEnnoblementsWorker - delete the updateServerDataWorker.loadEnnoblements method --- cron/cron.go | 4 ++ cron/handler.go | 42 +++++++++++++ cron/update_server_data_worker.go | 76 ++++++++--------------- cron/update_server_ennoblements_worker.go | 44 +++++++++++++ cron/update_server_history_worker.go | 16 ++--- cron/update_server_stats_worker.go | 28 ++++----- cron/vacuum_server_db_worker.go | 8 +-- go.mod | 2 +- go.sum | 4 +- main.go | 3 +- 10 files changed, 146 insertions(+), 81 deletions(-) create mode 100644 cron/update_server_ennoblements_worker.go diff --git a/cron/cron.go b/cron/cron.go index d0147eb..d9f8023 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -36,6 +36,7 @@ func Attach(c *cron.Cron, cfg Config) error { updateServerData := utils.TrackExecutionTime(log, h.updateServerData, "updateServerData") vacuumDatabase := utils.TrackExecutionTime(log, h.vacuumDatabase, "vacuumDatabase") + updateServerEnnoblements := utils.TrackExecutionTime(log, h.updateServerEnnoblements, "updateServerEnnoblements") updateHistoryFuncs := []func(){} updateStatsFuncs := []func(){} for _, version := range versions { @@ -62,6 +63,9 @@ func Attach(c *cron.Cron, cfg Config) error { if _, err := c.AddFunc("20 1 * * *", vacuumDatabase); err != nil { return err } + if _, err := c.AddFunc("@every 1m", updateServerEnnoblements); err != nil { + return err + } if cfg.RunOnStartup { go func() { updateServerData() diff --git a/cron/handler.go b/cron/handler.go index fe91083..be898a6 100644 --- a/cron/handler.go +++ b/cron/handler.go @@ -266,6 +266,9 @@ func (h *handler) updateServerData() { server: server, dataloader: dataloader.New(&dataloader.Config{ BaseURL: url, + Client: &http.Client{ + Timeout: 10 * time.Second, + }, }), } go func(worker *updateServerDataWorker, server *models.Server, url string, log *logrus.Entry) { @@ -286,6 +289,45 @@ func (h *handler) updateServerData() { wg.Wait() } +func (h *handler) updateServerEnnoblements() { + servers := []*models.Server{} + if err := h.db.Model(&servers).Relation("Version").Where("status = ?", models.ServerStatusOpen).Select(); err != nil { + log.Error(errors.Wrap(err, "updateServerEnnoblements: cannot load ennoblements")) + } + log. + WithField("numberOfServers", len(servers)). + Info("updateServerEnnoblements: servers loaded") + var wg sync.WaitGroup + + for _, server := range servers { + h.pool.waitForWorker() + wg.Add(1) + sh := &updateServerEnnoblementsWorker{ + db: h.db.WithParam("SERVER", pg.Safe(server.Key)), + server: server, + dataloader: dataloader.New(&dataloader.Config{ + BaseURL: fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host), + }), + } + go func(worker *updateServerEnnoblementsWorker, server *models.Server) { + defer func() { + h.pool.releaseWorker() + wg.Done() + }() + log := log.WithField("serverKey", server.Key) + log.Infof("updateServerEnnoblements: %s: updating ennoblements", server.Key) + err := sh.update() + if err != nil { + log.Errorln("updateServerEnnoblements:", errors.Wrap(err, server.Key)) + return + } + log.Infof("updateServerEnnoblements: %s: ennoblements updated", server.Key) + }(sh, server) + } + + wg.Wait() +} + func (h *handler) updateHistory(location *time.Location) { servers := []*models.Server{} log := log.WithField("timezone", location.String()) diff --git a/cron/update_server_data_worker.go b/cron/update_server_data_worker.go index a100f92..a759411 100644 --- a/cron/update_server_data_worker.go +++ b/cron/update_server_data_worker.go @@ -17,14 +17,14 @@ type updateServerDataWorker struct { server *models.Server } -func (h *updateServerDataWorker) loadPlayers(od map[int]*models.OpponentsDefeated) ([]*models.Player, error) { +func (w *updateServerDataWorker) loadPlayers(od map[int]*models.OpponentsDefeated) ([]*models.Player, error) { ennoblements := []*models.Ennoblement{} - err := h.db.Model(&ennoblements).DistinctOn("new_owner_id").Order("new_owner_id ASC", "ennobled_at ASC").Select() + err := w.db.Model(&ennoblements).DistinctOn("new_owner_id").Order("new_owner_id ASC", "ennobled_at ASC").Select() if err != nil { return nil, errors.Wrap(err, "loadPlayers: cannot load ennoblements") } - players, err := h.dataloader.LoadPlayers() + players, err := w.dataloader.LoadPlayers() if err != nil { return nil, err } @@ -46,8 +46,8 @@ func (h *updateServerDataWorker) loadPlayers(od map[int]*models.OpponentsDefeate return players, nil } -func (h *updateServerDataWorker) loadTribes(od map[int]*models.OpponentsDefeated, numberOfVillages int) ([]*models.Tribe, error) { - tribes, err := h.dataloader.LoadTribes() +func (w *updateServerDataWorker) loadTribes(od map[int]*models.OpponentsDefeated, numberOfVillages int) ([]*models.Tribe, error) { + tribes, err := w.dataloader.LoadTribes() if err != nil { return nil, err } @@ -65,22 +65,7 @@ func (h *updateServerDataWorker) loadTribes(od map[int]*models.OpponentsDefeated return tribes, nil } -func (h *updateServerDataWorker) loadEnnoblements() ([]*models.Ennoblement, error) { - lastEnnoblement := &models.Ennoblement{} - if err := h.db. - Model(lastEnnoblement). - Limit(1). - Order("ennobled_at DESC"). - Select(); err != nil && err != pg.ErrNoRows { - return nil, errors.Wrapf(err, "cannot load last ennoblement") - } - - return h.dataloader.LoadEnnoblements(&dataloader.LoadEnnoblementsConfig{ - EnnobledAtGTE: lastEnnoblement.EnnobledAt, - }) -} - -func (h *updateServerDataWorker) calculateODifference(od1 models.OpponentsDefeated, od2 models.OpponentsDefeated) models.OpponentsDefeated { +func (w *updateServerDataWorker) calculateODifference(od1 models.OpponentsDefeated, od2 models.OpponentsDefeated) models.OpponentsDefeated { return models.OpponentsDefeated{ RankAtt: (od1.RankAtt - od2.RankAtt) * -1, ScoreAtt: od1.ScoreAtt - od2.ScoreAtt, @@ -93,7 +78,7 @@ func (h *updateServerDataWorker) calculateODifference(od1 models.OpponentsDefeat } } -func (h *updateServerDataWorker) calculateTodaysTribeStats(tribes []*models.Tribe, +func (w *updateServerDataWorker) calculateTodaysTribeStats(tribes []*models.Tribe, history []*models.TribeHistory) []*models.DailyTribeStats { todaysStats := []*models.DailyTribeStats{} searchableTribes := makeTribesSearchable(tribes) @@ -110,7 +95,7 @@ func (h *updateServerDataWorker) calculateTodaysTribeStats(tribes []*models.Trib Rank: (tribe.Rank - historyRecord.Rank) * -1, Dominance: tribe.Dominance - historyRecord.Dominance, CreateDate: historyRecord.CreateDate, - OpponentsDefeated: h.calculateODifference(tribe.OpponentsDefeated, historyRecord.OpponentsDefeated), + OpponentsDefeated: w.calculateODifference(tribe.OpponentsDefeated, historyRecord.OpponentsDefeated), }) } } @@ -118,7 +103,7 @@ func (h *updateServerDataWorker) calculateTodaysTribeStats(tribes []*models.Trib return todaysStats } -func (h *updateServerDataWorker) calculateDailyPlayerStats(players []*models.Player, +func (w *updateServerDataWorker) calculateDailyPlayerStats(players []*models.Player, history []*models.PlayerHistory) []*models.DailyPlayerStats { todaysStats := []*models.DailyPlayerStats{} searchablePlayers := makePlayersSearchable(players) @@ -132,7 +117,7 @@ func (h *updateServerDataWorker) calculateDailyPlayerStats(players []*models.Pla Points: player.Points - historyRecord.Points, Rank: (player.Rank - historyRecord.Rank) * -1, CreateDate: historyRecord.CreateDate, - OpponentsDefeated: h.calculateODifference(player.OpponentsDefeated, historyRecord.OpponentsDefeated), + OpponentsDefeated: w.calculateODifference(player.OpponentsDefeated, historyRecord.OpponentsDefeated), }) } } @@ -140,53 +125,48 @@ func (h *updateServerDataWorker) calculateDailyPlayerStats(players []*models.Pla return todaysStats } -func (h *updateServerDataWorker) update() error { - pod, err := h.dataloader.LoadOD(false) +func (w *updateServerDataWorker) update() error { + pod, err := w.dataloader.LoadOD(false) if err != nil { return err } - tod, err := h.dataloader.LoadOD(true) + tod, err := w.dataloader.LoadOD(true) if err != nil { return err } - ennoblements, err := h.loadEnnoblements() - if err != nil { - return err - } - - villages, err := h.dataloader.LoadVillages() + villages, err := w.dataloader.LoadVillages() if err != nil { return err } numberOfVillages := len(villages) - tribes, err := h.loadTribes(tod, countPlayerVillages(villages)) + tribes, err := w.loadTribes(tod, countPlayerVillages(villages)) if err != nil { return err } numberOfTribes := len(tribes) - players, err := h.loadPlayers(pod) + players, err := w.loadPlayers(pod) if err != nil { return err } numberOfPlayers := len(players) - cfg, err := h.dataloader.GetConfig() + cfg, err := w.dataloader.GetConfig() if err != nil { return err } - buildingCfg, err := h.dataloader.GetBuildingConfig() + buildingCfg, err := w.dataloader.GetBuildingConfig() if err != nil { return err } - unitCfg, err := h.dataloader.GetUnitConfig() + unitCfg, err := w.dataloader.GetUnitConfig() if err != nil { return err } - tx, err := h.db.Begin() + tx, err := w.db.Begin() if err != nil { return err } @@ -221,7 +201,7 @@ func (h *updateServerDataWorker) update() error { } tribesHistory := []*models.TribeHistory{} - if err := h.db.Model(&tribesHistory). + if err := w.db.Model(&tribesHistory). DistinctOn("tribe_id"). Column("*"). Where("tribe_id = ANY (?)", pg.Array(ids)). @@ -229,7 +209,7 @@ func (h *updateServerDataWorker) update() error { Select(); err != nil && err != pg.ErrNoRows { return errors.Wrap(err, "cannot select tribe history records") } - todaysTribeStats := h.calculateTodaysTribeStats(tribes, tribesHistory) + todaysTribeStats := w.calculateTodaysTribeStats(tribes, tribesHistory) if len(todaysTribeStats) > 0 { if _, err := tx. Model(&todaysTribeStats). @@ -246,6 +226,7 @@ func (h *updateServerDataWorker) update() error { } } } + if len(players) > 0 { ids := []int{} for _, player := range players { @@ -273,14 +254,14 @@ func (h *updateServerDataWorker) update() error { } playerHistory := []*models.PlayerHistory{} - if err := h.db.Model(&playerHistory). + if err := w.db.Model(&playerHistory). DistinctOn("player_id"). Column("*"). Where("player_id = ANY (?)", pg.Array(ids)). Order("player_id DESC", "create_date DESC").Select(); err != nil && err != pg.ErrNoRows { return errors.Wrap(err, "cannot select player history records") } - todaysPlayerStats := h.calculateDailyPlayerStats(players, playerHistory) + todaysPlayerStats := w.calculateDailyPlayerStats(players, playerHistory) if len(todaysPlayerStats) > 0 { if _, err := tx. Model(&todaysPlayerStats). @@ -313,13 +294,8 @@ func (h *updateServerDataWorker) update() error { return errors.Wrap(err, "cannot insert villages") } } - if len(ennoblements) > 0 { - if _, err := tx.Model(&ennoblements).Insert(); err != nil { - return errors.Wrap(err, "cannot insert ennoblements") - } - } - if _, err := tx.Model(h.server). + if _, err := tx.Model(w.server). Set("data_updated_at = ?", time.Now()). Set("unit_config = ?", unitCfg). Set("building_config = ?", buildingCfg). diff --git a/cron/update_server_ennoblements_worker.go b/cron/update_server_ennoblements_worker.go new file mode 100644 index 0000000..32984da --- /dev/null +++ b/cron/update_server_ennoblements_worker.go @@ -0,0 +1,44 @@ +package cron + +import ( + "github.com/go-pg/pg/v10" + "github.com/pkg/errors" + "github.com/tribalwarshelp/shared/models" + "github.com/tribalwarshelp/shared/tw/dataloader" +) + +type updateServerEnnoblementsWorker struct { + db *pg.DB + dataloader dataloader.DataLoader + server *models.Server +} + +func (w *updateServerEnnoblementsWorker) loadEnnoblements() ([]*models.Ennoblement, error) { + lastEnnoblement := &models.Ennoblement{} + if err := w.db. + Model(lastEnnoblement). + Limit(1). + Order("ennobled_at DESC"). + Select(); err != nil && err != pg.ErrNoRows { + return nil, errors.Wrapf(err, "cannot load last ennoblement") + } + + return w.dataloader.LoadEnnoblements(&dataloader.LoadEnnoblementsConfig{ + EnnobledAtGT: lastEnnoblement.EnnobledAt, + }) +} + +func (w *updateServerEnnoblementsWorker) update() error { + ennoblements, err := w.loadEnnoblements() + if err != nil { + return err + } + + if len(ennoblements) > 0 { + if _, err := w.db.Model(&ennoblements).Insert(); err != nil { + return errors.Wrap(err, "cannot insert ennoblements") + } + } + + return nil +} diff --git a/cron/update_server_history_worker.go b/cron/update_server_history_worker.go index 77f5c67..9bb9711 100644 --- a/cron/update_server_history_worker.go +++ b/cron/update_server_history_worker.go @@ -14,13 +14,13 @@ type updateServerHistoryWorker struct { location *time.Location } -func (h *updateServerHistoryWorker) update() error { +func (w *updateServerHistoryWorker) update() error { players := []*models.Player{} - if err := h.db.Model(&players).Where("exists = true").Select(); err != nil { + if err := w.db.Model(&players).Where("exists = true").Select(); err != nil { return errors.Wrap(err, "cannot load players") } - now := time.Now().In(h.location) + now := time.Now().In(w.location) createDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) ph := []*models.PlayerHistory{} for _, player := range players { @@ -36,7 +36,7 @@ func (h *updateServerHistoryWorker) update() error { } tribes := []*models.Tribe{} - if err := h.db.Model(&tribes).Where("exists = true").Select(); err != nil { + if err := w.db.Model(&tribes).Where("exists = true").Select(); err != nil { return errors.Wrap(err, "cannot load tribes") } th := []*models.TribeHistory{} @@ -54,25 +54,25 @@ func (h *updateServerHistoryWorker) update() error { }) } - tx, err := h.db.Begin() + tx, err := w.db.Begin() if err != nil { return err } defer tx.Close() if len(ph) > 0 { - if _, err := h.db.Model(&ph).Insert(); err != nil { + if _, err := w.db.Model(&ph).Insert(); err != nil { return errors.Wrap(err, "cannot insert players history") } } if len(th) > 0 { - if _, err := h.db.Model(&th).Insert(); err != nil { + if _, err := w.db.Model(&th).Insert(); err != nil { return errors.Wrap(err, "cannot insert tribes history") } } - if _, err := tx.Model(h.server). + if _, err := tx.Model(w.server). Set("history_updated_at = ?", time.Now()). WherePK(). Returning("*"). diff --git a/cron/update_server_stats_worker.go b/cron/update_server_stats_worker.go index 59543b0..c97e4aa 100644 --- a/cron/update_server_stats_worker.go +++ b/cron/update_server_stats_worker.go @@ -14,45 +14,45 @@ type updateServerStatsWorker struct { location *time.Location } -func (h *updateServerStatsWorker) prepare() (*models.ServerStats, error) { - activePlayers, err := h.db.Model(&models.Player{}).Where("exists = true").Count() +func (w *updateServerStatsWorker) prepare() (*models.ServerStats, error) { + activePlayers, err := w.db.Model(&models.Player{}).Where("exists = true").Count() if err != nil { return nil, errors.Wrap(err, "cannot count active players") } - inactivePlayers, err := h.db.Model(&models.Player{}).Where("exists = false").Count() + inactivePlayers, err := w.db.Model(&models.Player{}).Where("exists = false").Count() if err != nil { return nil, errors.Wrap(err, "cannot count inactive players") } players := activePlayers + inactivePlayers - activeTribes, err := h.db.Model(&models.Tribe{}).Where("exists = true").Count() + activeTribes, err := w.db.Model(&models.Tribe{}).Where("exists = true").Count() if err != nil { return nil, errors.Wrap(err, "cannot count active tribes") } - inactiveTribes, err := h.db.Model(&models.Tribe{}).Where("exists = false").Count() + inactiveTribes, err := w.db.Model(&models.Tribe{}).Where("exists = false").Count() if err != nil { return nil, errors.Wrap(err, "cannot count inactive tribes") } tribes := activeTribes + inactiveTribes - barbarianVillages, err := h.db.Model(&models.Village{}).Where("player_id = 0").Count() + barbarianVillages, err := w.db.Model(&models.Village{}).Where("player_id = 0").Count() if err != nil { return nil, errors.Wrap(err, "cannot count barbarian villages") } - bonusVillages, err := h.db.Model(&models.Village{}).Where("bonus <> 0").Count() + bonusVillages, err := w.db.Model(&models.Village{}).Where("bonus <> 0").Count() if err != nil { return nil, errors.Wrap(err, "cannot count bonus villages") } - playerVillages, err := h.db.Model(&models.Village{}).Where("player_id <> 0").Count() + playerVillages, err := w.db.Model(&models.Village{}).Where("player_id <> 0").Count() if err != nil { return nil, errors.Wrap(err, "cannot count player villages") } - villages, err := h.db.Model(&models.Village{}).Count() + villages, err := w.db.Model(&models.Village{}).Count() if err != nil { return nil, errors.Wrap(err, "cannot count villages") } - now := time.Now().In(h.location) + now := time.Now().In(w.location) createDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) return &models.ServerStats{ ActivePlayers: activePlayers, @@ -71,13 +71,13 @@ func (h *updateServerStatsWorker) prepare() (*models.ServerStats, error) { }, nil } -func (h *updateServerStatsWorker) update() error { - stats, err := h.prepare() +func (w *updateServerStatsWorker) update() error { + stats, err := w.prepare() if err != nil { return err } - tx, err := h.db.Begin() + tx, err := w.db.Begin() if err != nil { return err } @@ -87,7 +87,7 @@ func (h *updateServerStatsWorker) update() error { return errors.Wrap(err, "cannot insert server stats") } - _, err = tx.Model(h.server). + _, err = tx.Model(w.server). Set("stats_updated_at = ?", time.Now()). WherePK(). Returning("*"). diff --git a/cron/vacuum_server_db_worker.go b/cron/vacuum_server_db_worker.go index 472f6b3..07588ab 100644 --- a/cron/vacuum_server_db_worker.go +++ b/cron/vacuum_server_db_worker.go @@ -16,15 +16,15 @@ type vacuumServerDBWorker struct { db *pg.DB } -func (h *vacuumServerDBWorker) vacuum() error { - tx, err := h.db.Begin() +func (w *vacuumServerDBWorker) vacuum() error { + tx, err := w.db.Begin() if err != nil { return err } defer tx.Close() - withNonExistentPlayers := h.db.Model(&models.Player{}).Column("id").Where("exists = false and NOW() - deleted_at > '14 days'") - withNonExistentTribes := h.db.Model(&models.Tribe{}).Column("id").Where("exists = false and NOW() - deleted_at > '1 days'") + withNonExistentPlayers := w.db.Model(&models.Player{}).Column("id").Where("exists = false and NOW() - deleted_at > '14 days'") + withNonExistentTribes := w.db.Model(&models.Tribe{}).Column("id").Where("exists = false and NOW() - deleted_at > '1 days'") _, err = tx.Model(&models.PlayerHistory{}). With("players", withNonExistentPlayers). diff --git a/go.mod b/go.mod index d5d4c8b..44e292d 100644 --- a/go.mod +++ b/go.mod @@ -10,5 +10,5 @@ require ( github.com/pkg/errors v0.9.1 github.com/robfig/cron/v3 v3.0.1 github.com/sirupsen/logrus v1.7.0 - github.com/tribalwarshelp/shared v0.0.0-20210113192528-c747617519f2 + github.com/tribalwarshelp/shared v0.0.0-20210115163415-972e2df3f7db ) diff --git a/go.sum b/go.sum index bbbe2fd..4beb8ad 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,8 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo= github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs= -github.com/tribalwarshelp/shared v0.0.0-20210113192528-c747617519f2 h1:qBJGQiXhhKox/E4Bt+8uYhsOic9QcBLNMYO7LekCudg= -github.com/tribalwarshelp/shared v0.0.0-20210113192528-c747617519f2/go.mod h1:vtDs7EjEysk4dpFPfu3c4lkICXZYIRV0BrT7rsqG0dw= +github.com/tribalwarshelp/shared v0.0.0-20210115163415-972e2df3f7db h1:RzMEF4BP3YJ7jJOyYO00TdeB6DY7g0Dp/8cQ8t6WKzw= +github.com/tribalwarshelp/shared v0.0.0-20210115163415-972e2df3f7db/go.mod h1:vtDs7EjEysk4dpFPfu3c4lkICXZYIRV0BrT7rsqG0dw= github.com/vmihailenco/bufpool v0.1.11 h1:gOq2WmBrq0i2yW5QJ16ykccQ4wH9UyEsgLm6czKAd94= github.com/vmihailenco/bufpool v0.1.11/go.mod h1:AFf/MOy3l2CFTKbxwt0mp2MwnqjNEs5H/UxrkA5jxTQ= github.com/vmihailenco/msgpack/v4 v4.3.11 h1:Q47CePddpNGNhk4GCnAx9DDtASi2rasatE0cd26cZoE= diff --git a/main.go b/main.go index b3d6f3e..4fc7c48 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package main import ( - "log" "os" "os/signal" "strconv" @@ -59,7 +58,7 @@ func main() { logrus.WithFields(dbFields).Info("Connected to the database") c := cron.New(cron.WithChain( - cron.SkipIfStillRunning(cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))), + cron.SkipIfStillRunning(cron.PrintfLogger(logrus.WithField("package", "cron"))), )) if err := _cron.Attach(c, _cron.Config{ DB: db,