move task registration/queue creation logic to the cron package

This commit is contained in:
Dawid Wysokiński 2021-04-27 18:30:12 +02:00
parent 35045af75f
commit e29807e077
8 changed files with 142 additions and 92 deletions

View File

@ -23,7 +23,7 @@ REDIS_DB=redis_db
REDIS_USER=redis_user
REDIS_PASSWORD=redis_password
RUN_ON_STARTUP=true|false
RUN_ON_INIT=true|false
LOG_DB_QUERIES=true|false
WORKER_LIMIT=1 #how many servers should update at the same time

26
cron/config.go Normal file
View File

@ -0,0 +1,26 @@
package cron
import (
"github.com/go-pg/pg/v10"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
)
type Config struct {
DB *pg.DB
Redis redis.UniversalClient
RunOnInit bool
Opts []cron.Option
WorkerLimit int
}
func validateConfig(cfg *Config) error {
if cfg == nil || cfg.DB == nil {
return errors.New("validateConfig: cfg.DB is required")
}
if cfg.Redis == nil {
return errors.New("validateConfig: cfg.Redis is required")
}
return nil
}

View File

@ -1,45 +1,58 @@
package cron
import (
"context"
"fmt"
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/cron/cron/queue"
"github.com/tribalwarshelp/cron/cron/tasks"
)
type Config struct {
DB *pg.DB
RunOnStartup bool
Queue queue.Queue
type Cron struct {
*cron.Cron
queue queue.Queue
db *pg.DB
runOnInit bool
}
func Attach(c *cron.Cron, cfg Config) error {
if cfg.DB == nil {
return fmt.Errorf("cfg.DB cannot be nil, expected *pg.DB")
func New(cfg *Config) (*Cron, error) {
if err := validateConfig(cfg); err != nil {
return nil, err
}
if cfg.Queue == nil {
return fmt.Errorf("cfg.Queue cannot be nil, expected queue.Queue")
}
h := &handler{
queue: cfg.Queue,
q, err := initializeQueue(cfg)
if err != nil {
return nil, err
}
c := &Cron{
Cron: cron.New(cfg.Opts...),
queue: q,
db: cfg.DB,
runOnInit: cfg.RunOnInit,
}
if err := c.init(); err != nil {
return nil, err
}
return c, nil
}
func (c *Cron) init() error {
var versions []*models.Version
if err := cfg.DB.Model(&versions).DistinctOn("timezone").Select(); err != nil {
return err
if err := c.db.Model(&versions).DistinctOn("timezone").Select(); err != nil {
return errors.Wrap(err, "Cron.init: couldn't load versions")
}
var updateHistoryFuncs []func()
var updateStatsFuncs []func()
for _, version := range versions {
updateHistory := createFnWithTimezone(version.Timezone, h.updateHistory)
updateHistory := createFnWithTimezone(version.Timezone, c.updateHistory)
updateHistoryFuncs = append(updateHistoryFuncs, updateHistory)
updateStats := createFnWithTimezone(version.Timezone, h.updateStats)
updateStats := createFnWithTimezone(version.Timezone, c.updateStats)
updateStatsFuncs = append(updateStatsFuncs, updateStats)
if _, err := c.AddFunc(fmt.Sprintf("CRON_TZ=%s 30 1 * * *", version.Timezone), updateHistory); err != nil {
@ -49,19 +62,19 @@ func Attach(c *cron.Cron, cfg Config) error {
return err
}
}
if _, err := c.AddFunc("0 * * * *", h.updateServerData); err != nil {
if _, err := c.AddFunc("0 * * * *", c.updateServerData); err != nil {
return err
}
if _, err := c.AddFunc("20 1 * * *", h.vacuumDatabase); err != nil {
if _, err := c.AddFunc("20 1 * * *", c.vacuumDatabase); err != nil {
return err
}
if _, err := c.AddFunc("@every 1m", h.updateEnnoblements); err != nil {
if _, err := c.AddFunc("@every 1m", c.updateEnnoblements); err != nil {
return err
}
if cfg.RunOnStartup {
if c.runOnInit {
go func() {
h.updateServerData()
h.vacuumDatabase()
c.updateServerData()
c.vacuumDatabase()
for _, fn := range updateHistoryFuncs {
go fn()
}
@ -70,10 +83,60 @@ func Attach(c *cron.Cron, cfg Config) error {
}
}()
}
return nil
}
func (c *Cron) Start(ctx context.Context) error {
if err := c.queue.Start(ctx); err != nil {
return errors.Wrap(err, "Cron.Start")
}
c.Cron.Start()
return nil
}
func (c *Cron) Stop() error {
c.Cron.Stop()
if err := c.queue.Close(); err != nil {
return errors.Wrap(err, "Cron.Stop")
}
return nil
}
func (c *Cron) updateServerData() {
c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background()))
}
func (c *Cron) updateEnnoblements() {
c.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background()))
}
func (c *Cron) updateHistory(timezone string) {
c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone))
}
func (c *Cron) updateStats(timezone string) {
c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone))
}
func (c *Cron) vacuumDatabase() {
c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background()))
}
func initializeQueue(cfg *Config) (queue.Queue, error) {
q, err := queue.New(&queue.Config{
WorkerLimit: cfg.WorkerLimit,
Redis: cfg.Redis,
})
if err != nil {
return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task q")
}
tasks.RegisterTasks(&tasks.Config{
DB: cfg.DB,
Queue: q,
})
return q, nil
}
func createFnWithTimezone(timezone string, fn func(timezone string)) func() {
return func() {
fn(timezone)

View File

@ -1,32 +0,0 @@
package cron
import (
"context"
"github.com/tribalwarshelp/cron/cron/queue"
"github.com/tribalwarshelp/cron/cron/tasks"
)
type handler struct {
queue queue.Queue
}
func (h *handler) updateServerData() {
h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background()))
}
func (h *handler) updateEnnoblements() {
h.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background()))
}
func (h *handler) updateHistory(timezone string) {
h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone))
}
func (h *handler) updateStats(timezone string) {
h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone))
}
func (h *handler) vacuumDatabase() {
h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background()))
}

View File

@ -12,7 +12,7 @@ type Config struct {
func validateConfig(cfg *Config) error {
if cfg == nil || cfg.Redis == nil {
return errors.New("cfg.Redis is required")
return errors.New("validateConfig: cfg.Redis is required")
}
return nil
}

View File

@ -19,8 +19,11 @@ func countPlayerVillages(villages []*models.Village) int {
}
func getDateDifferenceInDays(t1, t2 time.Time) int {
diff := t1.Sub(t2)
return int(diff.Hours() / 24)
hours := t1.Sub(t2).Hours()
if hours == 0 {
return 0
}
return int(hours / 24)
}
func calcPlayerDailyGrowth(diffInDays, points int) int {

View File

@ -29,7 +29,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *models.Version) error {
return nil
}
entry := log.WithField("host", version.Host)
entry.Infof("%s: Loading servers", version.Host)
entry.Infof("taskLoadServersAndUpdateData.execute: %s: Loading servers", version.Host)
data, err := t.getServers(version)
if err != nil {
log.Errorln(err)
@ -49,7 +49,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *models.Version) error {
Version: version,
}
if err := db.CreateSchema(t.db, server); err != nil {
logrus.Warn(errors.Wrapf(err, "%s: couldn't create the schema", server.Key))
logrus.Warn(errors.Wrapf(err, "taskLoadServersAndUpdateData.execute: %s: couldn't create the schema", server.Key))
continue
}
servers = append(servers, server)
@ -64,7 +64,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *models.Version) error {
Returning("*").
Insert(); err != nil {
err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: couldn't insert/update servers")
logrus.Fatal(err)
logrus.Error(err)
return err
}
}
@ -74,7 +74,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *models.Version) error {
Where("key NOT IN (?) AND version_code = ?", pg.In(serverKeys), version.Code).
Update(); err != nil {
err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: couldn't update server statuses")
logrus.Fatal(err)
logrus.Error(err)
return err
}

44
main.go
View File

@ -13,8 +13,6 @@ import (
"github.com/tribalwarshelp/shared/mode"
twhelpcron "github.com/tribalwarshelp/cron/cron"
"github.com/tribalwarshelp/cron/cron/queue"
"github.com/tribalwarshelp/cron/cron/tasks"
"github.com/tribalwarshelp/cron/db"
envutils "github.com/tribalwarshelp/cron/utils/env"
@ -33,46 +31,38 @@ func init() {
}
func main() {
client, err := initializeRedis()
redisClient, err := initializeRedis()
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis"))
}
defer client.Close()
defer redisClient.Close()
conn, err := db.New(&db.Config{LogQueries: envutils.GetenvBool("LOG_DB_QUERIES")})
dbConn, err := db.New(&db.Config{LogQueries: envutils.GetenvBool("LOG_DB_QUERIES")})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db"))
}
defer conn.Close()
defer dbConn.Close()
logrus.Info("Connection with the database has been established")
queue, err := queue.New(&queue.Config{
c, err := twhelpcron.New(&twhelpcron.Config{
DB: dbConn,
RunOnInit: envutils.GetenvBool("RUN_ON_INIT"),
Redis: redisClient,
WorkerLimit: envutils.GetenvInt("WORKER_LIMIT"),
Redis: client,
Opts: []cron.Option{
cron.WithChain(
cron.SkipIfStillRunning(
cron.PrintfLogger(logrus.WithField("package", "cron")),
),
),
},
})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't create the task queue"))
}
tasks.RegisterTasks(&tasks.Config{
DB: conn,
Queue: queue,
})
if err := queue.Start(context.Background()); err != nil {
logrus.Fatal(err)
}
c := cron.New(cron.WithChain(
cron.SkipIfStillRunning(cron.PrintfLogger(logrus.WithField("package", "cron"))),
))
if err := twhelpcron.Attach(c, twhelpcron.Config{
DB: conn,
RunOnStartup: envutils.GetenvBool("RUN_ON_STARTUP"),
Queue: queue,
}); err != nil {
if err := c.Start(context.Background()); err != nil {
logrus.Fatal(err)
}
c.Start()
defer c.Stop()
logrus.Info("Cron is running!")
@ -81,7 +71,7 @@ func main() {
<-channel
logrus.Info("shutting down")
if err := queue.Close(); err != nil {
if err := c.Stop(); err != nil {
logrus.Fatal(err)
}
}