diff --git a/README.md b/README.md index 3fded54..a1f0268 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ REDIS_DB=redis_db REDIS_USER=redis_user REDIS_PASSWORD=redis_password -RUN_ON_STARTUP=true|false +RUN_ON_INIT=true|false LOG_DB_QUERIES=true|false WORKER_LIMIT=1 #how many servers should update at the same time diff --git a/cron/config.go b/cron/config.go new file mode 100644 index 0000000..069724a --- /dev/null +++ b/cron/config.go @@ -0,0 +1,26 @@ +package cron + +import ( + "github.com/go-pg/pg/v10" + "github.com/go-redis/redis/v8" + "github.com/pkg/errors" + "github.com/robfig/cron/v3" +) + +type Config struct { + DB *pg.DB + Redis redis.UniversalClient + RunOnInit bool + Opts []cron.Option + WorkerLimit int +} + +func validateConfig(cfg *Config) error { + if cfg == nil || cfg.DB == nil { + return errors.New("validateConfig: cfg.DB is required") + } + if cfg.Redis == nil { + return errors.New("validateConfig: cfg.Redis is required") + } + return nil +} diff --git a/cron/cron.go b/cron/cron.go index b693373..8da6e1a 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -1,45 +1,58 @@ package cron import ( + "context" "fmt" - "github.com/go-pg/pg/v10" + "github.com/pkg/errors" + "github.com/robfig/cron/v3" "github.com/tribalwarshelp/shared/models" "github.com/tribalwarshelp/cron/cron/queue" + "github.com/tribalwarshelp/cron/cron/tasks" ) -type Config struct { - DB *pg.DB - RunOnStartup bool - Queue queue.Queue +type Cron struct { + *cron.Cron + queue queue.Queue + db *pg.DB + runOnInit bool } -func Attach(c *cron.Cron, cfg Config) error { - if cfg.DB == nil { - return fmt.Errorf("cfg.DB cannot be nil, expected *pg.DB") +func New(cfg *Config) (*Cron, error) { + if err := validateConfig(cfg); err != nil { + return nil, err } - if cfg.Queue == nil { - return fmt.Errorf("cfg.Queue cannot be nil, expected queue.Queue") - } - - h := &handler{ - queue: cfg.Queue, + q, err := initializeQueue(cfg) + if err != nil { + return nil, err } + c := &Cron{ + Cron: cron.New(cfg.Opts...), + queue: q, + db: cfg.DB, + runOnInit: cfg.RunOnInit, + } + if err := c.init(); err != nil { + return nil, err + } + return c, nil +} +func (c *Cron) init() error { var versions []*models.Version - if err := cfg.DB.Model(&versions).DistinctOn("timezone").Select(); err != nil { - return err + if err := c.db.Model(&versions).DistinctOn("timezone").Select(); err != nil { + return errors.Wrap(err, "Cron.init: couldn't load versions") } var updateHistoryFuncs []func() var updateStatsFuncs []func() for _, version := range versions { - updateHistory := createFnWithTimezone(version.Timezone, h.updateHistory) + updateHistory := createFnWithTimezone(version.Timezone, c.updateHistory) updateHistoryFuncs = append(updateHistoryFuncs, updateHistory) - updateStats := createFnWithTimezone(version.Timezone, h.updateStats) + updateStats := createFnWithTimezone(version.Timezone, c.updateStats) updateStatsFuncs = append(updateStatsFuncs, updateStats) if _, err := c.AddFunc(fmt.Sprintf("CRON_TZ=%s 30 1 * * *", version.Timezone), updateHistory); err != nil { @@ -49,19 +62,19 @@ func Attach(c *cron.Cron, cfg Config) error { return err } } - if _, err := c.AddFunc("0 * * * *", h.updateServerData); err != nil { + if _, err := c.AddFunc("0 * * * *", c.updateServerData); err != nil { return err } - if _, err := c.AddFunc("20 1 * * *", h.vacuumDatabase); err != nil { + if _, err := c.AddFunc("20 1 * * *", c.vacuumDatabase); err != nil { return err } - if _, err := c.AddFunc("@every 1m", h.updateEnnoblements); err != nil { + if _, err := c.AddFunc("@every 1m", c.updateEnnoblements); err != nil { return err } - if cfg.RunOnStartup { + if c.runOnInit { go func() { - h.updateServerData() - h.vacuumDatabase() + c.updateServerData() + c.vacuumDatabase() for _, fn := range updateHistoryFuncs { go fn() } @@ -70,10 +83,60 @@ func Attach(c *cron.Cron, cfg Config) error { } }() } - return nil } +func (c *Cron) Start(ctx context.Context) error { + if err := c.queue.Start(ctx); err != nil { + return errors.Wrap(err, "Cron.Start") + } + c.Cron.Start() + return nil +} + +func (c *Cron) Stop() error { + c.Cron.Stop() + if err := c.queue.Close(); err != nil { + return errors.Wrap(err, "Cron.Stop") + } + return nil +} + +func (c *Cron) updateServerData() { + c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background())) +} + +func (c *Cron) updateEnnoblements() { + c.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background())) +} + +func (c *Cron) updateHistory(timezone string) { + c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone)) +} + +func (c *Cron) updateStats(timezone string) { + c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone)) +} + +func (c *Cron) vacuumDatabase() { + c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background())) +} + +func initializeQueue(cfg *Config) (queue.Queue, error) { + q, err := queue.New(&queue.Config{ + WorkerLimit: cfg.WorkerLimit, + Redis: cfg.Redis, + }) + if err != nil { + return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task q") + } + tasks.RegisterTasks(&tasks.Config{ + DB: cfg.DB, + Queue: q, + }) + return q, nil +} + func createFnWithTimezone(timezone string, fn func(timezone string)) func() { return func() { fn(timezone) diff --git a/cron/handler.go b/cron/handler.go deleted file mode 100644 index d783b75..0000000 --- a/cron/handler.go +++ /dev/null @@ -1,32 +0,0 @@ -package cron - -import ( - "context" - - "github.com/tribalwarshelp/cron/cron/queue" - "github.com/tribalwarshelp/cron/cron/tasks" -) - -type handler struct { - queue queue.Queue -} - -func (h *handler) updateServerData() { - h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background())) -} - -func (h *handler) updateEnnoblements() { - h.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background())) -} - -func (h *handler) updateHistory(timezone string) { - h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone)) -} - -func (h *handler) updateStats(timezone string) { - h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone)) -} - -func (h *handler) vacuumDatabase() { - h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background())) -} diff --git a/cron/queue/config.go b/cron/queue/config.go index 52d1543..09e8f00 100644 --- a/cron/queue/config.go +++ b/cron/queue/config.go @@ -12,7 +12,7 @@ type Config struct { func validateConfig(cfg *Config) error { if cfg == nil || cfg.Redis == nil { - return errors.New("cfg.Redis is required") + return errors.New("validateConfig: cfg.Redis is required") } return nil } diff --git a/cron/tasks/helpers.go b/cron/tasks/helpers.go index 89cf285..4d3c437 100644 --- a/cron/tasks/helpers.go +++ b/cron/tasks/helpers.go @@ -19,8 +19,11 @@ func countPlayerVillages(villages []*models.Village) int { } func getDateDifferenceInDays(t1, t2 time.Time) int { - diff := t1.Sub(t2) - return int(diff.Hours() / 24) + hours := t1.Sub(t2).Hours() + if hours == 0 { + return 0 + } + return int(hours / 24) } func calcPlayerDailyGrowth(diffInDays, points int) int { diff --git a/cron/tasks/task_load_servers_and_update_data.go b/cron/tasks/task_load_servers_and_update_data.go index 8107a6a..ec7aed0 100644 --- a/cron/tasks/task_load_servers_and_update_data.go +++ b/cron/tasks/task_load_servers_and_update_data.go @@ -29,7 +29,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *models.Version) error { return nil } entry := log.WithField("host", version.Host) - entry.Infof("%s: Loading servers", version.Host) + entry.Infof("taskLoadServersAndUpdateData.execute: %s: Loading servers", version.Host) data, err := t.getServers(version) if err != nil { log.Errorln(err) @@ -49,7 +49,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *models.Version) error { Version: version, } if err := db.CreateSchema(t.db, server); err != nil { - logrus.Warn(errors.Wrapf(err, "%s: couldn't create the schema", server.Key)) + logrus.Warn(errors.Wrapf(err, "taskLoadServersAndUpdateData.execute: %s: couldn't create the schema", server.Key)) continue } servers = append(servers, server) @@ -64,7 +64,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *models.Version) error { Returning("*"). Insert(); err != nil { err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: couldn't insert/update servers") - logrus.Fatal(err) + logrus.Error(err) return err } } @@ -74,7 +74,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *models.Version) error { Where("key NOT IN (?) AND version_code = ?", pg.In(serverKeys), version.Code). Update(); err != nil { err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: couldn't update server statuses") - logrus.Fatal(err) + logrus.Error(err) return err } diff --git a/main.go b/main.go index f46d489..c50a321 100644 --- a/main.go +++ b/main.go @@ -13,8 +13,6 @@ import ( "github.com/tribalwarshelp/shared/mode" twhelpcron "github.com/tribalwarshelp/cron/cron" - "github.com/tribalwarshelp/cron/cron/queue" - "github.com/tribalwarshelp/cron/cron/tasks" "github.com/tribalwarshelp/cron/db" envutils "github.com/tribalwarshelp/cron/utils/env" @@ -33,46 +31,38 @@ func init() { } func main() { - client, err := initializeRedis() + redisClient, err := initializeRedis() if err != nil { logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis")) } - defer client.Close() + defer redisClient.Close() - conn, err := db.New(&db.Config{LogQueries: envutils.GetenvBool("LOG_DB_QUERIES")}) + dbConn, err := db.New(&db.Config{LogQueries: envutils.GetenvBool("LOG_DB_QUERIES")}) if err != nil { logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db")) } - defer conn.Close() + defer dbConn.Close() logrus.Info("Connection with the database has been established") - queue, err := queue.New(&queue.Config{ + c, err := twhelpcron.New(&twhelpcron.Config{ + DB: dbConn, + RunOnInit: envutils.GetenvBool("RUN_ON_INIT"), + Redis: redisClient, WorkerLimit: envutils.GetenvInt("WORKER_LIMIT"), - Redis: client, + Opts: []cron.Option{ + cron.WithChain( + cron.SkipIfStillRunning( + cron.PrintfLogger(logrus.WithField("package", "cron")), + ), + ), + }, }) if err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't create the task queue")) - } - tasks.RegisterTasks(&tasks.Config{ - DB: conn, - Queue: queue, - }) - if err := queue.Start(context.Background()); err != nil { logrus.Fatal(err) } - - c := cron.New(cron.WithChain( - cron.SkipIfStillRunning(cron.PrintfLogger(logrus.WithField("package", "cron"))), - )) - if err := twhelpcron.Attach(c, twhelpcron.Config{ - DB: conn, - RunOnStartup: envutils.GetenvBool("RUN_ON_STARTUP"), - Queue: queue, - }); err != nil { + if err := c.Start(context.Background()); err != nil { logrus.Fatal(err) } - c.Start() - defer c.Stop() logrus.Info("Cron is running!") @@ -81,7 +71,7 @@ func main() { <-channel logrus.Info("shutting down") - if err := queue.Close(); err != nil { + if err := c.Stop(); err != nil { logrus.Fatal(err) } }