From ad8cdc692608967f47fbdc75a0dd7965647bd7aa Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sat, 10 Oct 2020 11:45:52 +0200 Subject: [PATCH] delete readme.md, add the option to download data for more than one server at the same time --- README.md | 2 + cron/handler.go | 77 ++++++++++++++++++++----------- cron/update_server_data_worker.go | 4 +- cron/vacuum_server_db_worker.go | 32 +++++++------ main.go | 16 ++++++- readme.md | 9 ---- 6 files changed, 86 insertions(+), 54 deletions(-) delete mode 100644 readme.md diff --git a/README.md b/README.md index 1f4b9b4..4b6beb5 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ DB_NAME=your_db_name DB_PORT=5432 DB_HOST=your_db_host DB_PASSWORD=your_db_pass + +MAX_CONCURRENT_WORKERS=1 #how many servers should update at the same time ``` ### Prerequisites diff --git a/cron/handler.go b/cron/handler.go index 80d9113..122aee9 100644 --- a/cron/handler.go +++ b/cron/handler.go @@ -24,15 +24,21 @@ const ( ) type handler struct { - db *pg.DB + db *pg.DB + maxConcurrentWorkers int } type Config struct { - DB *pg.DB + DB *pg.DB + MaxConcurrentWorkers int } func Attach(c *cron.Cron, cfg Config) error { - h := &handler{cfg.DB} + if cfg.DB == nil { + return fmt.Errorf("cfg.DB cannot be nil, expected go-pg database") + } + + h := &handler{cfg.DB, cfg.MaxConcurrentWorkers} if err := h.init(); err != nil { return err } @@ -60,6 +66,10 @@ func Attach(c *cron.Cron, cfg Config) error { } func (h *handler) init() error { + if h.maxConcurrentWorkers <= 0 { + h.maxConcurrentWorkers = 1 + } + tx, err := h.db.Begin() if err != nil { return err @@ -223,25 +233,36 @@ func (h *handler) updateServersData() { return } + var wg sync.WaitGroup + p := newPool(h.maxConcurrentWorkers) + defer p.close() + for _, server := range servers { url, ok := urls[server.Key] if !ok { log.Printf("No one URL associated with key: %s, skipping...", server.Key) continue } - + p.waitForWorker() + wg.Add(1) sh := &updateServerDataWorker{ db: h.db.WithParam("SERVER", pg.Safe(server.Key)), baseURL: url, server: server, } - log.Printf("%s: updating data", server.Key) - if err := sh.update(); err != nil { - log.Println(errors.Wrap(err, server.Key)) - } else { - log.Printf("%s: data updated", server.Key) - } + go func(worker *updateServerDataWorker, server *models.Server, url string) { + defer p.releaseWorker() + defer wg.Done() + log.Printf("%s: updating data", server.Key) + if err := sh.update(); err != nil { + log.Println(errors.Wrap(err, server.Key)) + } else { + log.Printf("%s: data updated", server.Key) + } + }(sh, server, url) } + + wg.Wait() } func (h *handler) updateHistory() { @@ -262,22 +283,22 @@ func (h *handler) updateHistory() { defer p.close() for _, server := range servers { - sh := &updateServerHistoryWorker{ + p.waitForWorker() + wg.Add(1) + worker := &updateServerHistoryWorker{ db: h.db.WithParam("SERVER", pg.Safe(server.Key)), server: server, } - p.waitForWorker() - wg.Add(1) - go func(server *models.Server, sh *updateServerHistoryWorker) { + go func(server *models.Server, worker *updateServerHistoryWorker) { defer p.releaseWorker() defer wg.Done() log.Printf("%s: updating history", server.Key) - if err := sh.update(); err != nil { + if err := worker.update(); err != nil { log.Println(errors.Wrap(err, server.Key)) return } log.Printf("%s: history updated", server.Key) - }(server, sh) + }(server, worker) } wg.Wait() @@ -298,22 +319,22 @@ func (h *handler) updateServersStats(t time.Time) error { defer p.close() for _, server := range servers { - sh := &updateServerStatsWorker{ + p.waitForWorker() + wg.Add(1) + worker := &updateServerStatsWorker{ db: h.db.WithParam("SERVER", pg.Safe(server.Key)), server: server, } - p.waitForWorker() - wg.Add(1) - go func(server *models.Server, sh *updateServerStatsWorker) { + go func(server *models.Server, worker *updateServerStatsWorker) { defer p.releaseWorker() defer wg.Done() log.Printf("%s: updating stats", server.Key) - if err := sh.update(); err != nil { + if err := worker.update(); err != nil { log.Println(errors.Wrap(err, server.Key)) return } log.Printf("%s: stats updated", server.Key) - }(server, sh) + }(server, worker) } wg.Wait() @@ -343,21 +364,21 @@ func (h *handler) vacuumDatabase() { defer p.close() for _, server := range servers { - sh := &vacuumServerDBWorker{ - db: h.db.WithParam("SERVER", pg.Safe(server.Key)), - } p.waitForWorker() wg.Add(1) - go func(server *models.Server, sh *vacuumServerDBWorker, p *pool) { + worker := &vacuumServerDBWorker{ + db: h.db.WithParam("SERVER", pg.Safe(server.Key)), + } + go func(server *models.Server, worker *vacuumServerDBWorker, p *pool) { defer p.releaseWorker() defer wg.Done() log.Printf("%s: vacuuming database", server.Key) - if err := sh.vacuum(); err != nil { + if err := worker.vacuum(); err != nil { log.Println(errors.Wrap(err, server.Key)) return } log.Printf("%s: database vacuumed", server.Key) - }(server, sh, p) + }(server, worker, p) } wg.Wait() diff --git a/cron/update_server_data_worker.go b/cron/update_server_data_worker.go index 65e3bba..35f8aa5 100644 --- a/cron/update_server_data_worker.go +++ b/cron/update_server_data_worker.go @@ -585,7 +585,7 @@ func (h *updateServerDataWorker) update() error { Where("tribe.id NOT IN (?)", pg.In(ids)). Set("exists = false"). Update(); err != nil && err != pg.ErrNoRows { - return errors.Wrap(err, "cannot update not existed tribes") + return errors.Wrap(err, "cannot update nonexistent tribes") } return tx.Commit() }) @@ -647,7 +647,7 @@ func (h *updateServerDataWorker) update() error { Where("id NOT IN (?)", pg.In(ids)). Set("exists = false"). Update(); err != nil && err != pg.ErrNoRows { - return errors.Wrap(err, "cannot update not existed players") + return errors.Wrap(err, "cannot update nonexistent players") } return tx.Commit() }) diff --git a/cron/vacuum_server_db_worker.go b/cron/vacuum_server_db_worker.go index 8119aa3..d9ec885 100644 --- a/cron/vacuum_server_db_worker.go +++ b/cron/vacuum_server_db_worker.go @@ -8,6 +8,10 @@ import ( "github.com/tribalwarshelp/shared/models" ) +const ( + day = 24 * time.Hour +) + type vacuumServerDBWorker struct { db *pg.DB } @@ -19,39 +23,39 @@ func (h *vacuumServerDBWorker) vacuum() error { } defer tx.Close() - withNotExistedPlayers := h.db.Model(&models.Player{}).Where("exists = false") - withNotExistedTribes := h.db.Model(&models.Tribe{}).Where("exists = false") + withNonExistentPlayers := h.db.Model(&models.Player{}).Where("exists = false") + withNonExistentTribes := h.db.Model(&models.Tribe{}).Where("exists = false") _, err = tx.Model(&models.PlayerHistory{}). - With("players", withNotExistedPlayers). - Where("player_id IN (Select id FROM players) OR player_history.create_date < ?", time.Now().Add(-1*24*time.Hour*90)). + With("players", withNonExistentPlayers). + Where("player_id IN (Select id FROM players) OR player_history.create_date < ?", time.Now().Add(-1*day*90)). Delete() if err != nil { - return errors.Wrap(err, "cannot delete old player history") + return errors.Wrap(err, "cannot delete old player history records") } _, err = tx.Model(&models.TribeHistory{}). - With("tribes", withNotExistedTribes). - Where("tribe_id IN (Select id FROM tribes) OR tribe_history.create_date < ?", time.Now().Add(-1*24*time.Hour*90)). + With("tribes", withNonExistentTribes). + Where("tribe_id IN (Select id FROM tribes) OR tribe_history.create_date < ?", time.Now().Add(-1*day*90)). Delete() if err != nil { - return errors.Wrap(err, "cannot delete old tribe history") + return errors.Wrap(err, "cannot delete old tribe history records") } _, err = tx.Model(&models.DailyPlayerStats{}). - With("players", withNotExistedPlayers). - Where("player_id IN (Select id FROM players) OR daily_player_stats.create_date < ?", time.Now().Add(-1*24*time.Hour*90)). + With("players", withNonExistentPlayers). + Where("player_id IN (Select id FROM players) OR daily_player_stats.create_date < ?", time.Now().Add(-1*day*90)). Delete() if err != nil { - return errors.Wrap(err, "cannot delete old player stats") + return errors.Wrap(err, "cannot delete old player stats records") } _, err = tx.Model(&models.DailyTribeStats{}). - With("tribes", withNotExistedTribes). - Where("tribe_id IN (Select id FROM tribes) OR daily_tribe_stats.create_date < ?", time.Now().Add(-1*24*time.Hour*90)). + With("tribes", withNonExistentTribes). + Where("tribe_id IN (Select id FROM tribes) OR daily_tribe_stats.create_date < ?", time.Now().Add(-1*day*90)). Delete() if err != nil { - return errors.Wrap(err, "cannot delete old tribe stats") + return errors.Wrap(err, "cannot delete old tribe stats records") } return tx.Commit() diff --git a/main.go b/main.go index d440188..dce446c 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "os" "os/signal" "runtime" + "strconv" "syscall" "github.com/tribalwarshelp/shared/mode" @@ -42,7 +43,8 @@ func main() { cron.SkipIfStillRunning(cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))), )) if err := _cron.Attach(c, _cron.Config{ - DB: db, + DB: db, + MaxConcurrentWorkers: mustParseEnvToInt("MAX_CONCURRENT_WORKERS"), }); err != nil { log.Fatal(err) } @@ -57,3 +59,15 @@ func main() { log.Print("shutting down") } + +func mustParseEnvToInt(key string) int { + str := os.Getenv(key) + if str == "" { + return 0 + } + i, err := strconv.Atoi(str) + if err != nil { + return 0 + } + return i +} diff --git a/readme.md b/readme.md deleted file mode 100644 index 39ba302..0000000 --- a/readme.md +++ /dev/null @@ -1,9 +0,0 @@ -# Cron - -Required ENV Variables: - -- DB_USER= -- DB_NAME= -- DB_PORT= -- DB_HOST= -- DB_PASSWORD=