chore: move all files from the task package to the queue package

This commit is contained in:
Dawid Wysokiński 2021-06-05 09:14:17 +02:00
parent 1aa45851dc
commit 643a50171b
19 changed files with 78 additions and 100 deletions

View File

@ -11,7 +11,6 @@ import (
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/tribalwarshelp/cron/internal/cron/queue" "github.com/tribalwarshelp/cron/internal/cron/queue"
"github.com/tribalwarshelp/cron/internal/cron/task"
) )
type Cron struct { type Cron struct {
@ -114,44 +113,44 @@ func (c *Cron) Stop() error {
} }
func (c *Cron) updateServerData() { func (c *Cron) updateServerData() {
err := c.queue.Add(queue.Main, task.Get(task.LoadVersionsAndUpdateServerData).WithArgs(context.Background())) err := c.queue.Add(queue.Main, queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background()))
if err != nil { if err != nil {
c.logError("Cron.updateServerData", task.LoadVersionsAndUpdateServerData, err) c.logError("Cron.updateServerData", queue.LoadVersionsAndUpdateServerData, err)
} }
} }
func (c *Cron) updateEnnoblements() { func (c *Cron) updateEnnoblements() {
err := c.queue.Add(queue.Ennoblements, task.Get(task.UpdateEnnoblements).WithArgs(context.Background())) err := c.queue.Add(queue.Ennoblements, queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background()))
if err != nil { if err != nil {
c.logError("Cron.updateEnnoblements", task.UpdateEnnoblements, err) c.logError("Cron.updateEnnoblements", queue.UpdateEnnoblements, err)
} }
} }
func (c *Cron) updateHistory(timezone string) { func (c *Cron) updateHistory(timezone string) {
err := c.queue.Add(queue.Main, task.Get(task.UpdateHistory).WithArgs(context.Background(), timezone)) err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone))
if err != nil { if err != nil {
c.logError("Cron.updateHistory", task.UpdateHistory, err) c.logError("Cron.updateHistory", queue.UpdateHistory, err)
} }
} }
func (c *Cron) updateStats(timezone string) { func (c *Cron) updateStats(timezone string) {
err := c.queue.Add(queue.Main, task.Get(task.UpdateStats).WithArgs(context.Background(), timezone)) err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone))
if err != nil { if err != nil {
c.logError("Cron.updateStats", task.UpdateStats, err) c.logError("Cron.updateStats", queue.UpdateStats, err)
} }
} }
func (c *Cron) vacuumDatabase() { func (c *Cron) vacuumDatabase() {
err := c.queue.Add(queue.Main, task.Get(task.Vacuum).WithArgs(context.Background())) err := c.queue.Add(queue.Main, queue.GetTask(queue.Vacuum).WithArgs(context.Background()))
if err != nil { if err != nil {
c.logError("Cron.vacuumDatabase", task.Vacuum, err) c.logError("Cron.vacuumDatabase", queue.Vacuum, err)
} }
} }
func (c *Cron) deleteNonExistentVillages() { func (c *Cron) deleteNonExistentVillages() {
err := c.queue.Add(queue.Main, task.Get(task.DeleteNonExistentVillages).WithArgs(context.Background())) err := c.queue.Add(queue.Main, queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background()))
if err != nil { if err != nil {
c.logError("Cron.deleteNonExistentVillages", task.DeleteNonExistentVillages, err) c.logError("Cron.deleteNonExistentVillages", queue.DeleteNonExistentVillages, err)
} }
} }
@ -170,18 +169,9 @@ func initializeQueue(cfg *Config) (queue.Queue, error) {
q, err := queue.New(&queue.Config{ q, err := queue.New(&queue.Config{
WorkerLimit: cfg.WorkerLimit, WorkerLimit: cfg.WorkerLimit,
Redis: cfg.Redis, Redis: cfg.Redis,
DB: cfg.DB,
}) })
if err != nil { return q, errors.Wrap(err, "couldn't initialize a queue")
return nil, errors.Wrap(err, "couldn't initialize a queue")
}
err = task.RegisterTasks(&task.Config{
DB: cfg.DB,
Queue: q,
})
if err != nil {
return nil, errors.Wrap(err, "couldn't register tasks")
}
return q, nil
} }
func createFnWithTimezone(timezone string, fn func(timezone string)) func() { func createFnWithTimezone(timezone string, fn func(timezone string)) func() {

View File

@ -1,6 +1,7 @@
package queue package queue
import ( import (
"github.com/go-pg/pg/v10"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -8,6 +9,7 @@ import (
type Config struct { type Config struct {
Redis redis.UniversalClient Redis redis.UniversalClient
WorkerLimit int WorkerLimit int
DB *pg.DB
} }
func validateConfig(cfg *Config) error { func validateConfig(cfg *Config) error {
@ -16,3 +18,18 @@ func validateConfig(cfg *Config) error {
} }
return nil return nil
} }
type registerTasksConfig struct {
DB *pg.DB
Queue Queue
}
func validateRegisterTasksConfig(cfg *registerTasksConfig) error {
if cfg == nil || cfg.DB == nil {
return errors.New("cfg.DB is required")
}
if cfg.Queue == nil {
return errors.New("cfg.Queue is required")
}
return nil
}

View File

@ -1,4 +1,4 @@
package task package queue
import ( import (
"github.com/tribalwarshelp/shared/tw/twdataloader" "github.com/tribalwarshelp/shared/tw/twdataloader"

View File

@ -3,6 +3,7 @@ package queue
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus"
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
@ -15,6 +16,8 @@ const (
Ennoblements = "ennoblements" Ennoblements = "ennoblements"
) )
var log = logrus.WithField("package", "internal/cron/queue")
type Queue interface { type Queue interface {
Start(ctx context.Context) error Start(ctx context.Context) error
Close() error Close() error
@ -22,10 +25,10 @@ type Queue interface {
} }
type queue struct { type queue struct {
redis redis.UniversalClient redis redis.UniversalClient
mainQueue taskq.Queue main taskq.Queue
ennoblementsQueue taskq.Queue ennoblements taskq.Queue
factory taskq.Factory factory taskq.Factory
} }
func New(cfg *Config) (Queue, error) { func New(cfg *Config) (Queue, error) {
@ -46,8 +49,16 @@ func New(cfg *Config) (Queue, error) {
func (q *queue) init(cfg *Config) error { func (q *queue) init(cfg *Config) error {
q.factory = redisq.NewFactory() q.factory = redisq.NewFactory()
q.mainQueue = q.registerQueue(Main, cfg.WorkerLimit) q.main = q.registerQueue(Main, cfg.WorkerLimit)
q.ennoblementsQueue = q.registerQueue(Ennoblements, cfg.WorkerLimit) q.ennoblements = q.registerQueue(Ennoblements, cfg.WorkerLimit)
err := registerTasks(&registerTasksConfig{
DB: cfg.DB,
Queue: q,
})
if err != nil {
return errors.Wrapf(err, "couldn't register tasks")
}
return nil return nil
} }
@ -65,9 +76,9 @@ func (q *queue) registerQueue(name string, limit int) taskq.Queue {
func (q *queue) getQueueByName(name string) taskq.Queue { func (q *queue) getQueueByName(name string) taskq.Queue {
switch name { switch name {
case Main: case Main:
return q.mainQueue return q.main
case Ennoblements: case Ennoblements:
return q.ennoblementsQueue return q.ennoblements
} }
return nil return nil
} }

View File

@ -1,14 +1,11 @@
package task package queue
import ( import (
"github.com/go-pg/pg/v10" "github.com/go-pg/pg/v10"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vmihailenco/taskq/v3" "github.com/vmihailenco/taskq/v3"
"sync" "sync"
"time" "time"
"github.com/tribalwarshelp/cron/internal/cron/queue"
) )
const ( const (
@ -28,11 +25,9 @@ const (
defaultRetryLimit = 3 defaultRetryLimit = 3
) )
var log = logrus.WithField("package", "internal/cron/task")
type task struct { type task struct {
db *pg.DB db *pg.DB
queue queue.Queue queue Queue
cachedLocations sync.Map cachedLocations sync.Map
} }
@ -49,8 +44,8 @@ func (t *task) loadLocation(timezone string) (*time.Location, error) {
return location, nil return location, nil
} }
func RegisterTasks(cfg *Config) error { func registerTasks(cfg *registerTasksConfig) error {
if err := validateConfig(cfg); err != nil { if err := validateRegisterTasksConfig(cfg); err != nil {
return errors.Wrap(err, "config is invalid") return errors.Wrap(err, "config is invalid")
} }
@ -124,6 +119,6 @@ func RegisterTasks(cfg *Config) error {
return nil return nil
} }
func Get(taskName string) *taskq.Task { func GetTask(taskName string) *taskq.Task {
return taskq.Tasks.Get(taskName) return taskq.Tasks.Get(taskName)
} }

View File

@ -1,12 +1,10 @@
package task package queue
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twmodel"
"github.com/tribalwarshelp/shared/tw/twurlbuilder" "github.com/tribalwarshelp/shared/tw/twurlbuilder"
"github.com/tribalwarshelp/cron/internal/cron/queue"
) )
type taskDeleteNonExistentVillages struct { type taskDeleteNonExistentVillages struct {
@ -31,8 +29,8 @@ func (t *taskDeleteNonExistentVillages) execute() error {
Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue") Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add( err := t.queue.Add(
queue.Main, Main,
Get(ServerDeleteNonExistentVillages). GetTask(ServerDeleteNonExistentVillages).
WithArgs( WithArgs(
context.Background(), context.Background(),
twurlbuilder.BuildServerURL(server.Key, server.Version.Host), twurlbuilder.BuildServerURL(server.Key, server.Version.Host),

View File

@ -1,4 +1,4 @@
package task package queue
import ( import (
"context" "context"
@ -8,7 +8,6 @@ import (
"github.com/tribalwarshelp/shared/tw/twdataloader" "github.com/tribalwarshelp/shared/tw/twdataloader"
"github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twmodel"
"github.com/tribalwarshelp/cron/internal/cron/queue"
"github.com/tribalwarshelp/cron/internal/postgres" "github.com/tribalwarshelp/cron/internal/postgres"
) )
@ -86,7 +85,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error {
entry.Infof("%s: Servers have been loaded", version.Host) entry.Infof("%s: Servers have been loaded", version.Host)
for _, server := range servers { for _, server := range servers {
err := t.queue.Add(queue.Main, Get(UpdateServerData).WithArgs(context.Background(), server.url, server.Server)) err := t.queue.Add(Main, GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server))
if err != nil { if err != nil {
log. log.
WithField("key", server.Key). WithField("key", server.Key).

View File

@ -1,11 +1,9 @@
package task package queue
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twmodel"
"github.com/tribalwarshelp/cron/internal/cron/queue"
) )
type taskLoadVersionsAndUpdateServerData struct { type taskLoadVersionsAndUpdateServerData struct {
@ -22,7 +20,7 @@ func (t *taskLoadVersionsAndUpdateServerData) execute() error {
} }
log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded") log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded")
for _, version := range versions { for _, version := range versions {
err := t.queue.Add(queue.Main, Get(LoadServersAndUpdateData).WithArgs(context.Background(), version)) err := t.queue.Add(Main, GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version))
if err != nil { if err != nil {
log. log.
WithField("code", version.Code). WithField("code", version.Code).

View File

@ -1,4 +1,4 @@
package task package queue
import ( import (
"github.com/go-pg/pg/v10" "github.com/go-pg/pg/v10"

View File

@ -1,12 +1,10 @@
package task package queue
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twmodel"
"github.com/tribalwarshelp/shared/tw/twurlbuilder" "github.com/tribalwarshelp/shared/tw/twurlbuilder"
"github.com/tribalwarshelp/cron/internal/cron/queue"
) )
type taskUpdateEnnoblements struct { type taskUpdateEnnoblements struct {
@ -28,8 +26,8 @@ func (t *taskUpdateEnnoblements) execute() error {
log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...") log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add( err := t.queue.Add(
queue.Ennoblements, Ennoblements,
Get(UpdateServerEnnoblements). GetTask(UpdateServerEnnoblements).
WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), server), WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), server),
) )
if err != nil { if err != nil {

View File

@ -1,12 +1,10 @@
package task package queue
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twmodel"
"time" "time"
"github.com/tribalwarshelp/cron/internal/cron/queue"
) )
type taskUpdateHistory struct { type taskUpdateHistory struct {
@ -43,7 +41,7 @@ func (t *taskUpdateHistory) execute(timezone string) error {
WithField("numberOfServers", len(servers)). WithField("numberOfServers", len(servers)).
Info("taskUpdateHistory.execute: Update of the history has started") Info("taskUpdateHistory.execute: Update of the history has started")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add(queue.Main, Get(UpdateServerHistory).WithArgs(context.Background(), timezone, server)) err := t.queue.Add(Main, GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server))
if err != nil { if err != nil {
log. log.
WithField("key", server.Key). WithField("key", server.Key).

View File

@ -1,4 +1,4 @@
package task package queue
import ( import (
"context" "context"

View File

@ -1,4 +1,4 @@
package task package queue
import ( import (
"github.com/go-pg/pg/v10" "github.com/go-pg/pg/v10"

View File

@ -1,4 +1,4 @@
package task package queue
import ( import (
"github.com/go-pg/pg/v10" "github.com/go-pg/pg/v10"

View File

@ -1,4 +1,4 @@
package task package queue
import ( import (
"github.com/go-pg/pg/v10" "github.com/go-pg/pg/v10"

View File

@ -1,12 +1,10 @@
package task package queue
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twmodel"
"time" "time"
"github.com/tribalwarshelp/cron/internal/cron/queue"
) )
type taskUpdateStats struct { type taskUpdateStats struct {
@ -43,7 +41,7 @@ func (t *taskUpdateStats) execute(timezone string) error {
WithField("numberOfServers", len(servers)). WithField("numberOfServers", len(servers)).
Info("taskUpdateStats.execute: Update of the stats has started") Info("taskUpdateStats.execute: Update of the stats has started")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add(queue.Main, Get(UpdateServerStats).WithArgs(context.Background(), timezone, server)) err := t.queue.Add(Main, GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server))
if err != nil { if err != nil {
log. log.
WithField("key", server.Key). WithField("key", server.Key).

View File

@ -1,11 +1,9 @@
package task package queue
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twmodel"
"github.com/tribalwarshelp/cron/internal/cron/queue"
) )
type taskVacuum struct { type taskVacuum struct {
@ -24,7 +22,7 @@ func (t *taskVacuum) execute() error {
} }
log.Infof("taskVacuum.execute: The database vacumming process has started...") log.Infof("taskVacuum.execute: The database vacumming process has started...")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add(queue.Main, Get(VacuumServerDB).WithArgs(context.Background(), server)) err := t.queue.Add(Main, GetTask(VacuumServerDB).WithArgs(context.Background(), server))
if err != nil { if err != nil {
log. log.
WithField("key", server.Key). WithField("key", server.Key).

View File

@ -1,4 +1,4 @@
package task package queue
import ( import (
"github.com/go-pg/pg/v10" "github.com/go-pg/pg/v10"

View File

@ -1,22 +0,0 @@
package task
import (
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/tribalwarshelp/cron/internal/cron/queue"
)
type Config struct {
DB *pg.DB
Queue queue.Queue
}
func validateConfig(cfg *Config) error {
if cfg == nil || cfg.DB == nil {
return errors.New("cfg.DB is required")
}
if cfg.Queue == nil {
return errors.New("cfg.Queue is required")
}
return nil
}