diff --git a/internal/cron/cron.go b/internal/cron/cron.go index c2bf548..c9d754f 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -11,7 +11,6 @@ import ( "github.com/robfig/cron/v3" "github.com/tribalwarshelp/cron/internal/cron/queue" - "github.com/tribalwarshelp/cron/internal/cron/task" ) type Cron struct { @@ -114,44 +113,44 @@ func (c *Cron) Stop() error { } func (c *Cron) updateServerData() { - err := c.queue.Add(queue.Main, task.Get(task.LoadVersionsAndUpdateServerData).WithArgs(context.Background())) + err := c.queue.Add(queue.Main, queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background())) if err != nil { - c.logError("Cron.updateServerData", task.LoadVersionsAndUpdateServerData, err) + c.logError("Cron.updateServerData", queue.LoadVersionsAndUpdateServerData, err) } } func (c *Cron) updateEnnoblements() { - err := c.queue.Add(queue.Ennoblements, task.Get(task.UpdateEnnoblements).WithArgs(context.Background())) + err := c.queue.Add(queue.Ennoblements, queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background())) if err != nil { - c.logError("Cron.updateEnnoblements", task.UpdateEnnoblements, err) + c.logError("Cron.updateEnnoblements", queue.UpdateEnnoblements, err) } } func (c *Cron) updateHistory(timezone string) { - err := c.queue.Add(queue.Main, task.Get(task.UpdateHistory).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone)) if err != nil { - c.logError("Cron.updateHistory", task.UpdateHistory, err) + c.logError("Cron.updateHistory", queue.UpdateHistory, err) } } func (c *Cron) updateStats(timezone string) { - err := c.queue.Add(queue.Main, task.Get(task.UpdateStats).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone)) if err != nil { - c.logError("Cron.updateStats", task.UpdateStats, err) + c.logError("Cron.updateStats", queue.UpdateStats, err) } } func (c *Cron) vacuumDatabase() { - err := c.queue.Add(queue.Main, task.Get(task.Vacuum).WithArgs(context.Background())) + err := c.queue.Add(queue.Main, queue.GetTask(queue.Vacuum).WithArgs(context.Background())) if err != nil { - c.logError("Cron.vacuumDatabase", task.Vacuum, err) + c.logError("Cron.vacuumDatabase", queue.Vacuum, err) } } func (c *Cron) deleteNonExistentVillages() { - err := c.queue.Add(queue.Main, task.Get(task.DeleteNonExistentVillages).WithArgs(context.Background())) + err := c.queue.Add(queue.Main, queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background())) if err != nil { - c.logError("Cron.deleteNonExistentVillages", task.DeleteNonExistentVillages, err) + c.logError("Cron.deleteNonExistentVillages", queue.DeleteNonExistentVillages, err) } } @@ -170,18 +169,9 @@ func initializeQueue(cfg *Config) (queue.Queue, error) { q, err := queue.New(&queue.Config{ WorkerLimit: cfg.WorkerLimit, Redis: cfg.Redis, + DB: cfg.DB, }) - if err != nil { - return nil, errors.Wrap(err, "couldn't initialize a queue") - } - err = task.RegisterTasks(&task.Config{ - DB: cfg.DB, - Queue: q, - }) - if err != nil { - return nil, errors.Wrap(err, "couldn't register tasks") - } - return q, nil + return q, errors.Wrap(err, "couldn't initialize a queue") } func createFnWithTimezone(timezone string, fn func(timezone string)) func() { diff --git a/internal/cron/queue/config.go b/internal/cron/queue/config.go index 52d1543..99a88ba 100644 --- a/internal/cron/queue/config.go +++ b/internal/cron/queue/config.go @@ -1,6 +1,7 @@ package queue import ( + "github.com/go-pg/pg/v10" "github.com/go-redis/redis/v8" "github.com/pkg/errors" ) @@ -8,6 +9,7 @@ import ( type Config struct { Redis redis.UniversalClient WorkerLimit int + DB *pg.DB } func validateConfig(cfg *Config) error { @@ -16,3 +18,18 @@ func validateConfig(cfg *Config) error { } return nil } + +type registerTasksConfig struct { + DB *pg.DB + Queue Queue +} + +func validateRegisterTasksConfig(cfg *registerTasksConfig) error { + if cfg == nil || cfg.DB == nil { + return errors.New("cfg.DB is required") + } + if cfg.Queue == nil { + return errors.New("cfg.Queue is required") + } + return nil +} diff --git a/internal/cron/task/helpers.go b/internal/cron/queue/helpers.go similarity index 99% rename from internal/cron/task/helpers.go rename to internal/cron/queue/helpers.go index 4981c47..73a4b15 100644 --- a/internal/cron/task/helpers.go +++ b/internal/cron/queue/helpers.go @@ -1,4 +1,4 @@ -package task +package queue import ( "github.com/tribalwarshelp/shared/tw/twdataloader" diff --git a/internal/cron/queue/queue.go b/internal/cron/queue/queue.go index 98acb34..e021a78 100644 --- a/internal/cron/queue/queue.go +++ b/internal/cron/queue/queue.go @@ -3,6 +3,7 @@ package queue import ( "context" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "time" "github.com/go-redis/redis/v8" @@ -15,6 +16,8 @@ const ( Ennoblements = "ennoblements" ) +var log = logrus.WithField("package", "internal/cron/queue") + type Queue interface { Start(ctx context.Context) error Close() error @@ -22,10 +25,10 @@ type Queue interface { } type queue struct { - redis redis.UniversalClient - mainQueue taskq.Queue - ennoblementsQueue taskq.Queue - factory taskq.Factory + redis redis.UniversalClient + main taskq.Queue + ennoblements taskq.Queue + factory taskq.Factory } func New(cfg *Config) (Queue, error) { @@ -46,8 +49,16 @@ func New(cfg *Config) (Queue, error) { func (q *queue) init(cfg *Config) error { q.factory = redisq.NewFactory() - q.mainQueue = q.registerQueue(Main, cfg.WorkerLimit) - q.ennoblementsQueue = q.registerQueue(Ennoblements, cfg.WorkerLimit) + q.main = q.registerQueue(Main, cfg.WorkerLimit) + q.ennoblements = q.registerQueue(Ennoblements, cfg.WorkerLimit) + + err := registerTasks(®isterTasksConfig{ + DB: cfg.DB, + Queue: q, + }) + if err != nil { + return errors.Wrapf(err, "couldn't register tasks") + } return nil } @@ -65,9 +76,9 @@ func (q *queue) registerQueue(name string, limit int) taskq.Queue { func (q *queue) getQueueByName(name string) taskq.Queue { switch name { case Main: - return q.mainQueue + return q.main case Ennoblements: - return q.ennoblementsQueue + return q.ennoblements } return nil } diff --git a/internal/cron/task/task.go b/internal/cron/queue/task.go similarity index 90% rename from internal/cron/task/task.go rename to internal/cron/queue/task.go index 7fb809a..ae92bd7 100644 --- a/internal/cron/task/task.go +++ b/internal/cron/queue/task.go @@ -1,14 +1,11 @@ -package task +package queue import ( "github.com/go-pg/pg/v10" "github.com/pkg/errors" - "github.com/sirupsen/logrus" "github.com/vmihailenco/taskq/v3" "sync" "time" - - "github.com/tribalwarshelp/cron/internal/cron/queue" ) const ( @@ -28,11 +25,9 @@ const ( defaultRetryLimit = 3 ) -var log = logrus.WithField("package", "internal/cron/task") - type task struct { db *pg.DB - queue queue.Queue + queue Queue cachedLocations sync.Map } @@ -49,8 +44,8 @@ func (t *task) loadLocation(timezone string) (*time.Location, error) { return location, nil } -func RegisterTasks(cfg *Config) error { - if err := validateConfig(cfg); err != nil { +func registerTasks(cfg *registerTasksConfig) error { + if err := validateRegisterTasksConfig(cfg); err != nil { return errors.Wrap(err, "config is invalid") } @@ -124,6 +119,6 @@ func RegisterTasks(cfg *Config) error { return nil } -func Get(taskName string) *taskq.Task { +func GetTask(taskName string) *taskq.Task { return taskq.Tasks.Get(taskName) } diff --git a/internal/cron/task/task_delete_non_existent_villages.go b/internal/cron/queue/task_delete_non_existent_villages.go similarity index 90% rename from internal/cron/task/task_delete_non_existent_villages.go rename to internal/cron/queue/task_delete_non_existent_villages.go index 47dc9a1..63da676 100644 --- a/internal/cron/task/task_delete_non_existent_villages.go +++ b/internal/cron/queue/task_delete_non_existent_villages.go @@ -1,12 +1,10 @@ -package task +package queue import ( "context" "github.com/pkg/errors" "github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twurlbuilder" - - "github.com/tribalwarshelp/cron/internal/cron/queue" ) type taskDeleteNonExistentVillages struct { @@ -31,8 +29,8 @@ func (t *taskDeleteNonExistentVillages) execute() error { Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue") for _, server := range servers { err := t.queue.Add( - queue.Main, - Get(ServerDeleteNonExistentVillages). + Main, + GetTask(ServerDeleteNonExistentVillages). WithArgs( context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), diff --git a/internal/cron/task/task_load_servers_and_update_data.go b/internal/cron/queue/task_load_servers_and_update_data.go similarity index 94% rename from internal/cron/task/task_load_servers_and_update_data.go rename to internal/cron/queue/task_load_servers_and_update_data.go index 5d203ae..924fdc2 100644 --- a/internal/cron/task/task_load_servers_and_update_data.go +++ b/internal/cron/queue/task_load_servers_and_update_data.go @@ -1,4 +1,4 @@ -package task +package queue import ( "context" @@ -8,7 +8,6 @@ import ( "github.com/tribalwarshelp/shared/tw/twdataloader" "github.com/tribalwarshelp/shared/tw/twmodel" - "github.com/tribalwarshelp/cron/internal/cron/queue" "github.com/tribalwarshelp/cron/internal/postgres" ) @@ -86,7 +85,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error { entry.Infof("%s: Servers have been loaded", version.Host) for _, server := range servers { - err := t.queue.Add(queue.Main, Get(UpdateServerData).WithArgs(context.Background(), server.url, server.Server)) + err := t.queue.Add(Main, GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server)) if err != nil { log. WithField("key", server.Key). diff --git a/internal/cron/task/task_load_versions_and_update_server_data.go b/internal/cron/queue/task_load_versions_and_update_server_data.go similarity index 84% rename from internal/cron/task/task_load_versions_and_update_server_data.go rename to internal/cron/queue/task_load_versions_and_update_server_data.go index 5447dfc..9e598a6 100644 --- a/internal/cron/task/task_load_versions_and_update_server_data.go +++ b/internal/cron/queue/task_load_versions_and_update_server_data.go @@ -1,11 +1,9 @@ -package task +package queue import ( "context" "github.com/pkg/errors" "github.com/tribalwarshelp/shared/tw/twmodel" - - "github.com/tribalwarshelp/cron/internal/cron/queue" ) type taskLoadVersionsAndUpdateServerData struct { @@ -22,7 +20,7 @@ func (t *taskLoadVersionsAndUpdateServerData) execute() error { } log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded") for _, version := range versions { - err := t.queue.Add(queue.Main, Get(LoadServersAndUpdateData).WithArgs(context.Background(), version)) + err := t.queue.Add(Main, GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version)) if err != nil { log. WithField("code", version.Code). diff --git a/internal/cron/task/task_server_delete_non_existent_villages.go b/internal/cron/queue/task_server_delete_non_existent_villages.go similarity index 99% rename from internal/cron/task/task_server_delete_non_existent_villages.go rename to internal/cron/queue/task_server_delete_non_existent_villages.go index 737a156..cc65ef6 100644 --- a/internal/cron/task/task_server_delete_non_existent_villages.go +++ b/internal/cron/queue/task_server_delete_non_existent_villages.go @@ -1,4 +1,4 @@ -package task +package queue import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/task/task_update_ennoblements.go b/internal/cron/queue/task_update_ennoblements.go similarity index 89% rename from internal/cron/task/task_update_ennoblements.go rename to internal/cron/queue/task_update_ennoblements.go index 0cae60d..56695eb 100644 --- a/internal/cron/task/task_update_ennoblements.go +++ b/internal/cron/queue/task_update_ennoblements.go @@ -1,12 +1,10 @@ -package task +package queue import ( "context" "github.com/pkg/errors" "github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twurlbuilder" - - "github.com/tribalwarshelp/cron/internal/cron/queue" ) type taskUpdateEnnoblements struct { @@ -28,8 +26,8 @@ func (t *taskUpdateEnnoblements) execute() error { log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...") for _, server := range servers { err := t.queue.Add( - queue.Ennoblements, - Get(UpdateServerEnnoblements). + Ennoblements, + GetTask(UpdateServerEnnoblements). WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), server), ) if err != nil { diff --git a/internal/cron/task/task_update_history.go b/internal/cron/queue/task_update_history.go similarity index 88% rename from internal/cron/task/task_update_history.go rename to internal/cron/queue/task_update_history.go index e9c6ef2..84d9611 100644 --- a/internal/cron/task/task_update_history.go +++ b/internal/cron/queue/task_update_history.go @@ -1,12 +1,10 @@ -package task +package queue import ( "context" "github.com/pkg/errors" "github.com/tribalwarshelp/shared/tw/twmodel" "time" - - "github.com/tribalwarshelp/cron/internal/cron/queue" ) type taskUpdateHistory struct { @@ -43,7 +41,7 @@ func (t *taskUpdateHistory) execute(timezone string) error { WithField("numberOfServers", len(servers)). Info("taskUpdateHistory.execute: Update of the history has started") for _, server := range servers { - err := t.queue.Add(queue.Main, Get(UpdateServerHistory).WithArgs(context.Background(), timezone, server)) + err := t.queue.Add(Main, GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server)) if err != nil { log. WithField("key", server.Key). diff --git a/internal/cron/task/task_update_server_data.go b/internal/cron/queue/task_update_server_data.go similarity index 99% rename from internal/cron/task/task_update_server_data.go rename to internal/cron/queue/task_update_server_data.go index 81b4254..384b8c6 100644 --- a/internal/cron/task/task_update_server_data.go +++ b/internal/cron/queue/task_update_server_data.go @@ -1,4 +1,4 @@ -package task +package queue import ( "context" diff --git a/internal/cron/task/task_update_server_ennoblements.go b/internal/cron/queue/task_update_server_ennoblements.go similarity index 99% rename from internal/cron/task/task_update_server_ennoblements.go rename to internal/cron/queue/task_update_server_ennoblements.go index fb71bad..16ecf5b 100644 --- a/internal/cron/task/task_update_server_ennoblements.go +++ b/internal/cron/queue/task_update_server_ennoblements.go @@ -1,4 +1,4 @@ -package task +package queue import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/task/task_update_server_history.go b/internal/cron/queue/task_update_server_history.go similarity index 99% rename from internal/cron/task/task_update_server_history.go rename to internal/cron/queue/task_update_server_history.go index 98734d7..979fa5c 100644 --- a/internal/cron/task/task_update_server_history.go +++ b/internal/cron/queue/task_update_server_history.go @@ -1,4 +1,4 @@ -package task +package queue import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/task/task_update_server_stats.go b/internal/cron/queue/task_update_server_stats.go similarity index 99% rename from internal/cron/task/task_update_server_stats.go rename to internal/cron/queue/task_update_server_stats.go index 6c7b243..c202592 100644 --- a/internal/cron/task/task_update_server_stats.go +++ b/internal/cron/queue/task_update_server_stats.go @@ -1,4 +1,4 @@ -package task +package queue import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/task/task_update_stats.go b/internal/cron/queue/task_update_stats.go similarity index 88% rename from internal/cron/task/task_update_stats.go rename to internal/cron/queue/task_update_stats.go index e6eef3f..6a74df6 100644 --- a/internal/cron/task/task_update_stats.go +++ b/internal/cron/queue/task_update_stats.go @@ -1,12 +1,10 @@ -package task +package queue import ( "context" "github.com/pkg/errors" "github.com/tribalwarshelp/shared/tw/twmodel" "time" - - "github.com/tribalwarshelp/cron/internal/cron/queue" ) type taskUpdateStats struct { @@ -43,7 +41,7 @@ func (t *taskUpdateStats) execute(timezone string) error { WithField("numberOfServers", len(servers)). Info("taskUpdateStats.execute: Update of the stats has started") for _, server := range servers { - err := t.queue.Add(queue.Main, Get(UpdateServerStats).WithArgs(context.Background(), timezone, server)) + err := t.queue.Add(Main, GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server)) if err != nil { log. WithField("key", server.Key). diff --git a/internal/cron/task/task_vacuum.go b/internal/cron/queue/task_vacuum.go similarity index 81% rename from internal/cron/task/task_vacuum.go rename to internal/cron/queue/task_vacuum.go index 7da217b..0a229fd 100644 --- a/internal/cron/task/task_vacuum.go +++ b/internal/cron/queue/task_vacuum.go @@ -1,11 +1,9 @@ -package task +package queue import ( "context" "github.com/pkg/errors" "github.com/tribalwarshelp/shared/tw/twmodel" - - "github.com/tribalwarshelp/cron/internal/cron/queue" ) type taskVacuum struct { @@ -24,7 +22,7 @@ func (t *taskVacuum) execute() error { } log.Infof("taskVacuum.execute: The database vacumming process has started...") for _, server := range servers { - err := t.queue.Add(queue.Main, Get(VacuumServerDB).WithArgs(context.Background(), server)) + err := t.queue.Add(Main, GetTask(VacuumServerDB).WithArgs(context.Background(), server)) if err != nil { log. WithField("key", server.Key). diff --git a/internal/cron/task/task_vacuum_server_db.go b/internal/cron/queue/task_vacuum_server_db.go similarity index 99% rename from internal/cron/task/task_vacuum_server_db.go rename to internal/cron/queue/task_vacuum_server_db.go index a7b5625..cac4866 100644 --- a/internal/cron/task/task_vacuum_server_db.go +++ b/internal/cron/queue/task_vacuum_server_db.go @@ -1,4 +1,4 @@ -package task +package queue import ( "github.com/go-pg/pg/v10" diff --git a/internal/cron/task/config.go b/internal/cron/task/config.go deleted file mode 100644 index ea60f52..0000000 --- a/internal/cron/task/config.go +++ /dev/null @@ -1,22 +0,0 @@ -package task - -import ( - "github.com/go-pg/pg/v10" - "github.com/pkg/errors" - "github.com/tribalwarshelp/cron/internal/cron/queue" -) - -type Config struct { - DB *pg.DB - Queue queue.Queue -} - -func validateConfig(cfg *Config) error { - if cfg == nil || cfg.DB == nil { - return errors.New("cfg.DB is required") - } - if cfg.Queue == nil { - return errors.New("cfg.Queue is required") - } - return nil -}