From 849ee6ae362a9d4a2c405a36061ccc94176fc553 Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Fri, 14 May 2021 14:02:51 +0200 Subject: [PATCH] rename one package (internal/cron/tasks -> internal/cron/task), correct 'package' field in logger initialization --- internal/cron/cron.go | 30 ++-- internal/cron/queue/queue.go | 12 +- internal/cron/task/config.go | 22 +++ internal/cron/{tasks => task}/helpers.go | 2 +- internal/cron/task/task.go | 129 ++++++++++++++++++ .../task_delete_non_existent_villages.go | 8 +- .../task_load_servers_and_update_data.go | 4 +- ...sk_load_versions_and_update_server_data.go | 4 +- ...ask_server_delete_non_existent_villages.go | 2 +- .../task_update_ennoblements.go | 8 +- .../{tasks => task}/task_update_history.go | 6 +- .../task_update_server_data.go | 2 +- .../task_update_server_ennoblements.go | 2 +- .../task_update_server_history.go | 2 +- .../task_update_server_stats.go | 2 +- .../cron/{tasks => task}/task_update_stats.go | 6 +- internal/cron/{tasks => task}/task_vacuum.go | 6 +- .../{tasks => task}/task_vacuum_server_db.go | 2 +- internal/cron/tasks/task.go | 29 ---- internal/cron/tasks/tasks.go | 123 ----------------- internal/postgres/postgres.go | 2 +- 21 files changed, 201 insertions(+), 202 deletions(-) create mode 100644 internal/cron/task/config.go rename internal/cron/{tasks => task}/helpers.go (99%) create mode 100644 internal/cron/task/task.go rename internal/cron/{tasks => task}/task_delete_non_existent_villages.go (89%) rename internal/cron/{tasks => task}/task_load_servers_and_update_data.go (95%) rename internal/cron/{tasks => task}/task_load_versions_and_update_server_data.go (85%) rename internal/cron/{tasks => task}/task_server_delete_non_existent_villages.go (99%) rename internal/cron/{tasks => task}/task_update_ennoblements.go (90%) rename internal/cron/{tasks => task}/task_update_history.go (89%) rename internal/cron/{tasks => task}/task_update_server_data.go (99%) rename internal/cron/{tasks => task}/task_update_server_ennoblements.go (99%) rename internal/cron/{tasks => task}/task_update_server_history.go (99%) rename internal/cron/{tasks => task}/task_update_server_stats.go (99%) rename internal/cron/{tasks => task}/task_update_stats.go (89%) rename internal/cron/{tasks => task}/task_vacuum.go (82%) rename internal/cron/{tasks => task}/task_vacuum_server_db.go (99%) delete mode 100644 internal/cron/tasks/task.go delete mode 100644 internal/cron/tasks/tasks.go diff --git a/internal/cron/cron.go b/internal/cron/cron.go index 9eb2be5..6ee17d8 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -11,7 +11,7 @@ import ( "github.com/robfig/cron/v3" "github.com/tribalwarshelp/cron/internal/cron/queue" - "github.com/tribalwarshelp/cron/internal/cron/tasks" + "github.com/tribalwarshelp/cron/internal/cron/task" ) type Cron struct { @@ -35,7 +35,7 @@ func New(cfg *Config) (*Cron, error) { queue: q, db: cfg.DB, runOnInit: cfg.RunOnInit, - log: logrus.WithField("package", "cron"), + log: logrus.WithField("package", "internal/cron"), } if err := c.init(); err != nil { return nil, err @@ -109,44 +109,44 @@ func (c *Cron) Stop() error { } func (c *Cron) updateServerData() { - err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background())) + err := c.queue.Add(queue.Main, task.Get(task.LoadVersionsAndUpdateServerData).WithArgs(context.Background())) if err != nil { - c.logError("Cron.updateServerData", tasks.TaskNameLoadVersionsAndUpdateServerData, err) + c.logError("Cron.updateServerData", task.LoadVersionsAndUpdateServerData, err) } } func (c *Cron) updateEnnoblements() { - err := c.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background())) + err := c.queue.Add(queue.Ennoblements, task.Get(task.UpdateEnnoblements).WithArgs(context.Background())) if err != nil { - c.logError("Cron.updateEnnoblements", tasks.TaskUpdateEnnoblements, err) + c.logError("Cron.updateEnnoblements", task.UpdateEnnoblements, err) } } func (c *Cron) updateHistory(timezone string) { - err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.Main, task.Get(task.UpdateHistory).WithArgs(context.Background(), timezone)) if err != nil { - c.logError("Cron.updateHistory", tasks.TaskUpdateHistory, err) + c.logError("Cron.updateHistory", task.UpdateHistory, err) } } func (c *Cron) updateStats(timezone string) { - err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.Main, task.Get(task.UpdateStats).WithArgs(context.Background(), timezone)) if err != nil { - c.logError("Cron.updateStats", tasks.TaskUpdateStats, err) + c.logError("Cron.updateStats", task.UpdateStats, err) } } func (c *Cron) vacuumDatabase() { - err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background())) + err := c.queue.Add(queue.Main, task.Get(task.Vacuum).WithArgs(context.Background())) if err != nil { - c.logError("Cron.vacuumDatabase", tasks.TaskNameVacuum, err) + c.logError("Cron.vacuumDatabase", task.Vacuum, err) } } func (c *Cron) deleteNonExistentVillages() { - err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameDeleteNonExistentVillages).WithArgs(context.Background())) + err := c.queue.Add(queue.Main, task.Get(task.DeleteNonExistentVillages).WithArgs(context.Background())) if err != nil { - c.logError("Cron.deleteNonExistentVillages", tasks.TaskNameDeleteNonExistentVillages, err) + c.logError("Cron.deleteNonExistentVillages", task.DeleteNonExistentVillages, err) } } @@ -169,7 +169,7 @@ func initializeQueue(cfg *Config) (queue.Queue, error) { if err != nil { return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task queue") } - err = tasks.RegisterTasks(&tasks.Config{ + err = task.RegisterTasks(&task.Config{ DB: cfg.DB, Queue: q, }) diff --git a/internal/cron/queue/queue.go b/internal/cron/queue/queue.go index deaf272..c7c0a43 100644 --- a/internal/cron/queue/queue.go +++ b/internal/cron/queue/queue.go @@ -11,8 +11,8 @@ import ( ) const ( - MainQueue = "main" - EnnoblementsQueue = "ennoblements" + Main = "main" + Ennoblements = "ennoblements" ) type Queue interface { @@ -46,8 +46,8 @@ func New(cfg *Config) (Queue, error) { func (q *queue) init(cfg *Config) error { q.factory = redisq.NewFactory() - q.mainQueue = q.registerQueue(MainQueue, cfg.WorkerLimit) - q.ennoblementsQueue = q.registerQueue(EnnoblementsQueue, cfg.WorkerLimit) + q.mainQueue = q.registerQueue(Main, cfg.WorkerLimit) + q.ennoblementsQueue = q.registerQueue(Ennoblements, cfg.WorkerLimit) return nil } @@ -64,9 +64,9 @@ func (q *queue) registerQueue(name string, limit int) taskq.Queue { func (q *queue) getQueueByName(name string) taskq.Queue { switch name { - case MainQueue: + case Main: return q.mainQueue - case EnnoblementsQueue: + case Ennoblements: return q.ennoblementsQueue } return nil diff --git a/internal/cron/task/config.go b/internal/cron/task/config.go new file mode 100644 index 0000000..ea60f52 --- /dev/null +++ b/internal/cron/task/config.go @@ -0,0 +1,22 @@ +package task + +import ( + "github.com/go-pg/pg/v10" + "github.com/pkg/errors" + "github.com/tribalwarshelp/cron/internal/cron/queue" +) + +type Config struct { + DB *pg.DB + Queue queue.Queue +} + +func validateConfig(cfg *Config) error { + if cfg == nil || cfg.DB == nil { + return errors.New("cfg.DB is required") + } + if cfg.Queue == nil { + return errors.New("cfg.Queue is required") + } + return nil +} diff --git a/internal/cron/tasks/helpers.go b/internal/cron/task/helpers.go similarity index 99% rename from internal/cron/tasks/helpers.go rename to internal/cron/task/helpers.go index eb51d18..4981c47 100644 --- a/internal/cron/tasks/helpers.go +++ b/internal/cron/task/helpers.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "github.com/tribalwarshelp/shared/tw/twdataloader" diff --git a/internal/cron/task/task.go b/internal/cron/task/task.go new file mode 100644 index 0000000..8c7a730 --- /dev/null +++ b/internal/cron/task/task.go @@ -0,0 +1,129 @@ +package task + +import ( + "github.com/go-pg/pg/v10" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/vmihailenco/taskq/v3" + "sync" + "time" + + "github.com/tribalwarshelp/cron/internal/cron/queue" +) + +const ( + LoadVersionsAndUpdateServerData = "loadVersionsAndUpdateServerData" + LoadServersAndUpdateData = "loadServersAndUpdateData" + UpdateServerData = "updateServerData" + Vacuum = "vacuum" + VacuumServerDB = "vacuumServerDB" + UpdateEnnoblements = "updateEnnoblements" + UpdateServerEnnoblements = "updateServerEnnoblements" + UpdateHistory = "updateHistory" + UpdateServerHistory = "updateServerHistory" + UpdateStats = "updateStats" + UpdateServerStats = "updateServerStats" + DeleteNonExistentVillages = "deleteNonExistentVillages" + ServerDeleteNonExistentVillages = "serverDeleteNonExistentVillages" + defaultRetryLimit = 3 +) + +var log = logrus.WithField("package", "internal/cron/task") + +type task struct { + db *pg.DB + queue queue.Queue + cachedLocations sync.Map +} + +func (t *task) loadLocation(timezone string) (*time.Location, error) { + val, ok := t.cachedLocations.Load(timezone) + if ok { + return val.(*time.Location), nil + } + location, err := time.LoadLocation(timezone) + if err != nil { + return nil, errors.Wrap(err, "task.loadLocation") + } + t.cachedLocations.Store(timezone, location) + return location, nil +} + +func RegisterTasks(cfg *Config) error { + if err := validateConfig(cfg); err != nil { + return errors.Wrap(err, "RegisterTasks") + } + + t := &task{ + db: cfg.DB, + queue: cfg.Queue, + } + options := []*taskq.TaskOptions{ + { + Name: LoadVersionsAndUpdateServerData, + Handler: (&taskLoadVersionsAndUpdateServerData{t}).execute, + }, + { + Name: LoadServersAndUpdateData, + Handler: (&taskLoadServersAndUpdateData{t}).execute, + }, + { + Name: UpdateServerData, + Handler: (&taskUpdateServerData{t}).execute, + }, + { + Name: Vacuum, + Handler: (&taskVacuum{t}).execute, + }, + { + Name: VacuumServerDB, + Handler: (&taskVacuumServerDB{t}).execute, + }, + { + Name: UpdateEnnoblements, + Handler: (&taskUpdateEnnoblements{t}).execute, + }, + { + Name: UpdateServerEnnoblements, + Handler: (&taskUpdateServerEnnoblements{t}).execute, + }, + { + Name: UpdateHistory, + Handler: (&taskUpdateHistory{t}).execute, + }, + { + Name: UpdateServerHistory, + RetryLimit: defaultRetryLimit, + Handler: (&taskUpdateServerHistory{t}).execute, + }, + { + Name: UpdateStats, + Handler: (&taskUpdateStats{t}).execute, + }, + { + Name: UpdateServerStats, + Handler: (&taskUpdateServerStats{t}).execute, + }, + { + Name: DeleteNonExistentVillages, + Handler: (&taskDeleteNonExistentVillages{t}).execute, + }, + { + Name: ServerDeleteNonExistentVillages, + Handler: (&taskServerDeleteNonExistentVillages{t}).execute, + }, + } + for _, taskOptions := range options { + opts := taskOptions + if opts.RetryLimit == 0 { + opts.RetryLimit = defaultRetryLimit + } + taskq.RegisterTask(opts) + } + + return nil +} + +func Get(taskName string) *taskq.Task { + return taskq.Tasks.Get(taskName) +} diff --git a/internal/cron/tasks/task_delete_non_existent_villages.go b/internal/cron/task/task_delete_non_existent_villages.go similarity index 89% rename from internal/cron/tasks/task_delete_non_existent_villages.go rename to internal/cron/task/task_delete_non_existent_villages.go index 9546c98..c85efb5 100644 --- a/internal/cron/tasks/task_delete_non_existent_villages.go +++ b/internal/cron/task/task_delete_non_existent_villages.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "context" @@ -32,8 +32,8 @@ func (t *taskDeleteNonExistentVillages) execute() error { for _, server := range servers { s := server err := t.queue.Add( - queue.MainQueue, - Get(TaskNameServerDeleteNonExistentVillages). + queue.Main, + Get(ServerDeleteNonExistentVillages). WithArgs( context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), @@ -46,7 +46,7 @@ func (t *taskDeleteNonExistentVillages) execute() error { err, "taskDeleteNonExistentVillages.execute: %s: Couldn't add the task '%s' for this server", server.Key, - TaskNameServerDeleteNonExistentVillages, + ServerDeleteNonExistentVillages, ), ) } diff --git a/internal/cron/tasks/task_load_servers_and_update_data.go b/internal/cron/task/task_load_servers_and_update_data.go similarity index 95% rename from internal/cron/tasks/task_load_servers_and_update_data.go rename to internal/cron/task/task_load_servers_and_update_data.go index e5e8afb..5137a1f 100644 --- a/internal/cron/tasks/task_load_servers_and_update_data.go +++ b/internal/cron/task/task_load_servers_and_update_data.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "context" @@ -85,7 +85,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error { } for _, server := range servers { - t.queue.Add(queue.MainQueue, Get(TaskNameUpdateServerData).WithArgs(context.Background(), server.url, server.Server)) + t.queue.Add(queue.Main, Get(UpdateServerData).WithArgs(context.Background(), server.url, server.Server)) } entry.Infof("%s: Servers have been loaded", version.Host) diff --git a/internal/cron/tasks/task_load_versions_and_update_server_data.go b/internal/cron/task/task_load_versions_and_update_server_data.go similarity index 85% rename from internal/cron/tasks/task_load_versions_and_update_server_data.go rename to internal/cron/task/task_load_versions_and_update_server_data.go index 7875866..b576c32 100644 --- a/internal/cron/tasks/task_load_versions_and_update_server_data.go +++ b/internal/cron/task/task_load_versions_and_update_server_data.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "context" @@ -21,7 +21,7 @@ func (t *taskLoadVersionsAndUpdateServerData) execute() error { return err } for _, version := range versions { - t.queue.Add(queue.MainQueue, Get(TaskNameLoadServersAndUpdateData).WithArgs(context.Background(), version)) + t.queue.Add(queue.Main, Get(LoadServersAndUpdateData).WithArgs(context.Background(), version)) } log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded") return nil diff --git a/internal/cron/tasks/task_server_delete_non_existent_villages.go b/internal/cron/task/task_server_delete_non_existent_villages.go similarity index 99% rename from internal/cron/tasks/task_server_delete_non_existent_villages.go rename to internal/cron/task/task_server_delete_non_existent_villages.go index d3985f5..65375fd 100644 --- a/internal/cron/tasks/task_server_delete_non_existent_villages.go +++ b/internal/cron/task/task_server_delete_non_existent_villages.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/tasks/task_update_ennoblements.go b/internal/cron/task/task_update_ennoblements.go similarity index 90% rename from internal/cron/tasks/task_update_ennoblements.go rename to internal/cron/task/task_update_ennoblements.go index f685e56..bd07739 100644 --- a/internal/cron/tasks/task_update_ennoblements.go +++ b/internal/cron/task/task_update_ennoblements.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "context" @@ -29,8 +29,8 @@ func (t *taskUpdateEnnoblements) execute() error { for _, server := range servers { s := server err := t.queue.Add( - queue.EnnoblementsQueue, - Get(TaskUpdateServerEnnoblements). + queue.Ennoblements, + Get(UpdateServerEnnoblements). WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), s), ) if err != nil { @@ -39,7 +39,7 @@ func (t *taskUpdateEnnoblements) execute() error { err, "taskUpdateEnnoblements.execute: %s: Couldn't add the task '%s' for this server", server.Key, - TaskUpdateServerEnnoblements, + UpdateServerEnnoblements, ), ) } diff --git a/internal/cron/tasks/task_update_history.go b/internal/cron/task/task_update_history.go similarity index 89% rename from internal/cron/tasks/task_update_history.go rename to internal/cron/task/task_update_history.go index bc88d02..1592164 100644 --- a/internal/cron/tasks/task_update_history.go +++ b/internal/cron/task/task_update_history.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "context" @@ -44,14 +44,14 @@ func (t *taskUpdateHistory) execute(timezone string) error { Info("taskUpdateHistory.execute: Update of the history has started") for _, server := range servers { s := server - err := t.queue.Add(queue.MainQueue, Get(TaskUpdateServerHistory).WithArgs(context.Background(), timezone, s)) + err := t.queue.Add(queue.Main, Get(UpdateServerHistory).WithArgs(context.Background(), timezone, s)) if err != nil { log.Warn( errors.Wrapf( err, "taskUpdateHistory.execute: %s: Couldn't add the task '%s' for this server", server.Key, - TaskUpdateServerHistory, + UpdateServerHistory, ), ) } diff --git a/internal/cron/tasks/task_update_server_data.go b/internal/cron/task/task_update_server_data.go similarity index 99% rename from internal/cron/tasks/task_update_server_data.go rename to internal/cron/task/task_update_server_data.go index fcb5af9..7ddfcdb 100644 --- a/internal/cron/tasks/task_update_server_data.go +++ b/internal/cron/task/task_update_server_data.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "context" diff --git a/internal/cron/tasks/task_update_server_ennoblements.go b/internal/cron/task/task_update_server_ennoblements.go similarity index 99% rename from internal/cron/tasks/task_update_server_ennoblements.go rename to internal/cron/task/task_update_server_ennoblements.go index a9e65af..5a5df8a 100644 --- a/internal/cron/tasks/task_update_server_ennoblements.go +++ b/internal/cron/task/task_update_server_ennoblements.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/tasks/task_update_server_history.go b/internal/cron/task/task_update_server_history.go similarity index 99% rename from internal/cron/tasks/task_update_server_history.go rename to internal/cron/task/task_update_server_history.go index 09d00bb..0328ae0 100644 --- a/internal/cron/tasks/task_update_server_history.go +++ b/internal/cron/task/task_update_server_history.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/tasks/task_update_server_stats.go b/internal/cron/task/task_update_server_stats.go similarity index 99% rename from internal/cron/tasks/task_update_server_stats.go rename to internal/cron/task/task_update_server_stats.go index e1f4e4b..261f5fe 100644 --- a/internal/cron/tasks/task_update_server_stats.go +++ b/internal/cron/task/task_update_server_stats.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/tasks/task_update_stats.go b/internal/cron/task/task_update_stats.go similarity index 89% rename from internal/cron/tasks/task_update_stats.go rename to internal/cron/task/task_update_stats.go index eb63f92..646fecd 100644 --- a/internal/cron/tasks/task_update_stats.go +++ b/internal/cron/task/task_update_stats.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "context" @@ -44,14 +44,14 @@ func (t *taskUpdateStats) execute(timezone string) error { Info("taskUpdateStats.execute: Update of the stats has started") for _, server := range servers { s := server - err := t.queue.Add(queue.MainQueue, Get(TaskUpdateServerStats).WithArgs(context.Background(), timezone, s)) + err := t.queue.Add(queue.Main, Get(UpdateServerStats).WithArgs(context.Background(), timezone, s)) if err != nil { log.Warn( errors.Wrapf( err, "taskUpdateStats.execute: %s: Couldn't add the task '%s' for this server", server.Key, - TaskUpdateServerStats, + UpdateServerStats, ), ) } diff --git a/internal/cron/tasks/task_vacuum.go b/internal/cron/task/task_vacuum.go similarity index 82% rename from internal/cron/tasks/task_vacuum.go rename to internal/cron/task/task_vacuum.go index 14ed57c..473e992 100644 --- a/internal/cron/tasks/task_vacuum.go +++ b/internal/cron/task/task_vacuum.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "context" @@ -25,14 +25,14 @@ func (t *taskVacuum) execute() error { log.Infof("taskVacuum.execute: Start database vacumming...") for _, server := range servers { s := server - err := t.queue.Add(queue.MainQueue, Get(TaskNameVacuumServerDB).WithArgs(context.Background(), s)) + err := t.queue.Add(queue.Main, Get(VacuumServerDB).WithArgs(context.Background(), s)) if err != nil { log.Warn( errors.Wrapf( err, "taskVacuum.execute: %s: Couldn't add the task '%s' for this server", server.Key, - TaskUpdateServerEnnoblements, + UpdateServerEnnoblements, ), ) } diff --git a/internal/cron/tasks/task_vacuum_server_db.go b/internal/cron/task/task_vacuum_server_db.go similarity index 99% rename from internal/cron/tasks/task_vacuum_server_db.go rename to internal/cron/task/task_vacuum_server_db.go index 32c8c4d..de1de78 100644 --- a/internal/cron/tasks/task_vacuum_server_db.go +++ b/internal/cron/task/task_vacuum_server_db.go @@ -1,4 +1,4 @@ -package tasks +package task import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/tasks/task.go b/internal/cron/tasks/task.go deleted file mode 100644 index 0d52cce..0000000 --- a/internal/cron/tasks/task.go +++ /dev/null @@ -1,29 +0,0 @@ -package tasks - -import ( - "github.com/go-pg/pg/v10" - "github.com/pkg/errors" - "sync" - "time" - - "github.com/tribalwarshelp/cron/internal/cron/queue" -) - -type task struct { - db *pg.DB - queue queue.Queue - cachedLocations sync.Map -} - -func (t *task) loadLocation(timezone string) (*time.Location, error) { - val, ok := t.cachedLocations.Load(timezone) - if ok { - return val.(*time.Location), nil - } - location, err := time.LoadLocation(timezone) - if err != nil { - return nil, errors.Wrap(err, "task.loadLocation") - } - t.cachedLocations.Store(timezone, location) - return location, nil -} diff --git a/internal/cron/tasks/tasks.go b/internal/cron/tasks/tasks.go deleted file mode 100644 index 6330ad0..0000000 --- a/internal/cron/tasks/tasks.go +++ /dev/null @@ -1,123 +0,0 @@ -package tasks - -import ( - "github.com/go-pg/pg/v10" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/vmihailenco/taskq/v3" - - "github.com/tribalwarshelp/cron/internal/cron/queue" -) - -const ( - TaskNameLoadVersionsAndUpdateServerData = "loadVersionsAndUpdateServerData" - TaskNameLoadServersAndUpdateData = "loadServersAndUpdateData" - TaskNameUpdateServerData = "updateServerData" - TaskNameVacuum = "vacuum" - TaskNameVacuumServerDB = "vacuumServerDB" - TaskUpdateEnnoblements = "updateEnnoblements" - TaskUpdateServerEnnoblements = "updateServerEnnoblements" - TaskUpdateHistory = "updateHistory" - TaskUpdateServerHistory = "updateServerHistory" - TaskUpdateStats = "updateStats" - TaskUpdateServerStats = "updateServerStats" - TaskNameDeleteNonExistentVillages = "deleteNonExistentVillages" - TaskNameServerDeleteNonExistentVillages = "serverDeleteNonExistentVillages" - defaultRetryLimit = 3 -) - -var log = logrus.WithField("package", "cron/tasks") - -type Config struct { - DB *pg.DB - Queue queue.Queue -} - -func RegisterTasks(cfg *Config) error { - if err := validateConfig(cfg); err != nil { - return errors.Wrap(err, "RegisterTasks") - } - - t := &task{ - db: cfg.DB, - queue: cfg.Queue, - } - options := []*taskq.TaskOptions{ - { - Name: TaskNameLoadVersionsAndUpdateServerData, - Handler: (&taskLoadVersionsAndUpdateServerData{t}).execute, - }, - { - Name: TaskNameLoadServersAndUpdateData, - Handler: (&taskLoadServersAndUpdateData{t}).execute, - }, - { - Name: TaskNameUpdateServerData, - Handler: (&taskUpdateServerData{t}).execute, - }, - { - Name: TaskNameVacuum, - Handler: (&taskVacuum{t}).execute, - }, - { - Name: TaskNameVacuumServerDB, - Handler: (&taskVacuumServerDB{t}).execute, - }, - { - Name: TaskUpdateEnnoblements, - Handler: (&taskUpdateEnnoblements{t}).execute, - }, - { - Name: TaskUpdateServerEnnoblements, - Handler: (&taskUpdateServerEnnoblements{t}).execute, - }, - { - Name: TaskUpdateHistory, - Handler: (&taskUpdateHistory{t}).execute, - }, - { - Name: TaskUpdateServerHistory, - RetryLimit: defaultRetryLimit, - Handler: (&taskUpdateServerHistory{t}).execute, - }, - { - Name: TaskUpdateStats, - Handler: (&taskUpdateStats{t}).execute, - }, - { - Name: TaskUpdateServerStats, - Handler: (&taskUpdateServerStats{t}).execute, - }, - { - Name: TaskNameDeleteNonExistentVillages, - Handler: (&taskDeleteNonExistentVillages{t}).execute, - }, - { - Name: TaskNameServerDeleteNonExistentVillages, - Handler: (&taskServerDeleteNonExistentVillages{t}).execute, - }, - } - for _, taskOptions := range options { - opts := taskOptions - if opts.RetryLimit == 0 { - opts.RetryLimit = defaultRetryLimit - } - taskq.RegisterTask(opts) - } - - return nil -} - -func Get(taskName string) *taskq.Task { - return taskq.Tasks.Get(taskName) -} - -func validateConfig(cfg *Config) error { - if cfg == nil || cfg.DB == nil { - return errors.New("cfg.DB is required") - } - if cfg.Queue == nil { - return errors.New("cfg.Queue is required") - } - return nil -} diff --git a/internal/postgres/postgres.go b/internal/postgres/postgres.go index 83a2ac8..96f733a 100644 --- a/internal/postgres/postgres.go +++ b/internal/postgres/postgres.go @@ -11,7 +11,7 @@ import ( "github.com/tribalwarshelp/shared/tw/twmodel" ) -var log = logrus.WithField("package", "postgres") +var log = logrus.WithField("package", "internal/postgres") type Config struct { LogQueries bool