diff --git a/.circleci/config.yml b/.circleci/config.yml index 3f00161..9a638e3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -11,10 +11,13 @@ jobs: # with credentials stored in the UI - run: | docker login -u $DOCKER_LOGIN -p $DOCKER_PASSWORD - # build the application image - - run: docker build -t $DOCKER_LOGIN/twhelp-cron:latest . - # deploy the image + # build the cron image + - run: docker build -f cmd/cron/Dockerfile -t $DOCKER_LOGIN/twhelp-cron:latest . + # build the data updater image + - run: docker build -f cmd/dataupdater/Dockerfile -t $DOCKER_LOGIN/twhelp-dataupdater:latest . + # deploy the images - run: docker push $DOCKER_LOGIN/twhelp-cron:latest + - run: docker push $DOCKER_LOGIN/twhelp-dataupdater:latest build_version: docker: - image: circleci/golang:1.16 @@ -26,10 +29,13 @@ jobs: # with credentials stored in the UI - run: | docker login -u $DOCKER_LOGIN -p $DOCKER_PASSWORD - # build the application image - - run: docker build -t $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') . - # deploy the image + # build the cron image + - run: docker build -f cmd/cron/Dockerfile -t $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') . + # build the data updater image + - run: docker build -f cmd/dataupdater/Dockerfile -t $DOCKER_LOGIN/twhelp-dataupdater:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') . + # deploy the images - run: docker push $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') + - run: docker push $DOCKER_LOGIN/twhelp-dataupdater::$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') workflows: version: 2 deploy_latest: diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..11ee758 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +.env.local diff --git a/README.md b/README.md index 324994e..60e0f7d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# tribalwarshelp.com cron +# datafetcher - Adds automatically new servers. - Fetches and updates server data (players, tribes, ODA, ODD, ODS, OD, conquers, configs). @@ -40,9 +40,13 @@ git clone git@github.com:tribalwarshelp/cron.git ``` 2. Open the folder with this project in a terminal. 3. Set the required env variables directly in your system or create .env.local file. -4. Run the app. +4. Run the cron. ``` -go run main.go +go run ./cmd/cron/main.go +``` +5. Run the data updater in a new tab. +``` +go run ./cmd/dataupdater/main.go ``` ## License diff --git a/Dockerfile b/cmd/cron/Dockerfile similarity index 80% rename from Dockerfile rename to cmd/cron/Dockerfile index 82d02a3..be47ad2 100644 --- a/Dockerfile +++ b/cmd/cron/Dockerfile @@ -12,7 +12,7 @@ RUN go mod download # Copy the source from the current directory to the Working Directory inside the container COPY . . -RUN go build -o twcron . +RUN go build -o twcron ./cmd/cron ######## Start a new stage from scratch ####### FROM alpine:latest @@ -28,7 +28,5 @@ ENV APP_MODE=production EXPOSE 8080 RUN apk add --no-cache tzdata -ADD https://github.com/ufoscout/docker-compose-wait/releases/download/2.2.1/wait ./wait -RUN chmod +x ./wait -CMD ./wait && ./twcron +CMD ./twcron diff --git a/cmd/cron/main.go b/cmd/cron/main.go new file mode 100644 index 0000000..8116715 --- /dev/null +++ b/cmd/cron/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "github.com/Kichiyaki/goutil/envutil" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "os" + "os/signal" + "syscall" + + "github.com/tribalwarshelp/cron/cmd/internal" + twhelpcron "github.com/tribalwarshelp/cron/pkg/cron" + "github.com/tribalwarshelp/cron/pkg/postgres" + "github.com/tribalwarshelp/cron/pkg/queue" +) + +func main() { + redisClient, err := internal.NewRedisClient() + if err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't connect to Redis")) + } + defer func() { + if err := redisClient.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "couldn't close the Redis connection")) + } + }() + + dbConn, err := postgres.Connect(nil) + if err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't connect to the db")) + } + defer func() { + if err := dbConn.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "couldn't close the db connection")) + } + }() + + q, err := queue.New(&queue.Config{ + DB: dbConn, + Redis: redisClient, + WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"), + }) + if err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't initialize a queue")) + } + + c, err := twhelpcron.New(&twhelpcron.Config{ + DB: dbConn, + RunOnInit: envutil.GetenvBool("RUN_ON_INIT"), + Queue: q, + }) + if err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't initialize a cron instance")) + } + if err := c.Start(); err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't start the cron")) + } + defer c.Stop() + + logrus.Info("Cron is up and running!") + + channel := make(chan os.Signal, 1) + signal.Notify(channel, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) + <-channel + + logrus.Info("shutting down") +} diff --git a/cmd/dataupdater/Dockerfile b/cmd/dataupdater/Dockerfile new file mode 100644 index 0000000..fde64d0 --- /dev/null +++ b/cmd/dataupdater/Dockerfile @@ -0,0 +1,32 @@ +FROM golang:alpine as builder + +# Set the Current Working Directory inside the container +WORKDIR /app + +# Copy go mod and sum files +COPY go.mod go.sum ./ + +# Download all dependencies. Dependencies will be cached if the go.mod and go.sum files are not changed +RUN go mod download + +# Copy the source from the current directory to the Working Directory inside the container +COPY . . + +RUN go build -o twdataupdater ./cmd/dataupdater + +######## Start a new stage from scratch ####### +FROM alpine:latest + +RUN apk --no-cache add ca-certificates + +WORKDIR /root/ + +# Copy the Pre-built binary file from the previous stage +COPY --from=builder /app/twdataupdater . + +ENV APP_MODE=production +EXPOSE 8080 + +RUN apk add --no-cache tzdata + +CMD ./twdataupdater diff --git a/cmd/dataupdater/main.go b/cmd/dataupdater/main.go new file mode 100644 index 0000000..48837f0 --- /dev/null +++ b/cmd/dataupdater/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "context" + "github.com/Kichiyaki/goutil/envutil" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "os" + "os/signal" + "syscall" + + "github.com/tribalwarshelp/cron/cmd/internal" + "github.com/tribalwarshelp/cron/pkg/postgres" + "github.com/tribalwarshelp/cron/pkg/queue" +) + +func main() { + redisClient, err := internal.NewRedisClient() + if err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis")) + } + defer func() { + if err := redisClient.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "Couldn't close the Redis connection")) + } + }() + + dbConn, err := postgres.Connect(&postgres.Config{SkipDBInitialization: true}) + if err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db")) + } + defer func() { + if err := dbConn.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "Couldn't close the db connection")) + } + }() + + q, err := queue.New(&queue.Config{ + DB: dbConn, + Redis: redisClient, + WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"), + }) + if err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't initialize a queue")) + } + if err := q.Start(context.Background()); err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't start the queue")) + } + + logrus.Info("Data updater is up and running!") + + channel := make(chan os.Signal, 1) + signal.Notify(channel, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) + <-channel + + logrus.Info("shutting down") + if err := q.Close(); err != nil { + logrus.Fatal(err) + } +} diff --git a/cmd/internal/init.go b/cmd/internal/init.go new file mode 100644 index 0000000..91d373e --- /dev/null +++ b/cmd/internal/init.go @@ -0,0 +1,50 @@ +package internal + +import ( + "github.com/Kichiyaki/appmode" + "github.com/joho/godotenv" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "os" +) + +func init() { + if err := setENVs(); err != nil { + logrus.Fatal(err) + } + prepareLogger() +} + +func setENVs() error { + err := os.Setenv("TZ", "UTC") + if err != nil { + return errors.Wrap(err, "setENVs") + } + + if appmode.Equals(appmode.DevelopmentMode) { + err := godotenv.Load(".env.local") + if err != nil { + return errors.Wrap(err, "setENVs") + } + } + + return nil +} + +func prepareLogger() { + if appmode.Equals(appmode.DevelopmentMode) { + logrus.SetLevel(logrus.DebugLevel) + } + + timestampFormat := "2006-01-02 15:04:05" + if appmode.Equals(appmode.ProductionMode) { + customFormatter := new(logrus.JSONFormatter) + customFormatter.TimestampFormat = timestampFormat + logrus.SetFormatter(customFormatter) + } else { + customFormatter := new(logrus.TextFormatter) + customFormatter.TimestampFormat = timestampFormat + customFormatter.FullTimestamp = true + logrus.SetFormatter(customFormatter) + } +} diff --git a/cmd/internal/redis.go b/cmd/internal/redis.go new file mode 100644 index 0000000..443ea2c --- /dev/null +++ b/cmd/internal/redis.go @@ -0,0 +1,24 @@ +package internal + +import ( + "context" + "github.com/Kichiyaki/goutil/envutil" + "github.com/go-redis/redis/v8" + "github.com/pkg/errors" + "time" +) + +func NewRedisClient() (redis.UniversalClient, error) { + client := redis.NewClient(&redis.Options{ + Addr: envutil.GetenvString("REDIS_ADDR"), + Username: envutil.GetenvString("REDIS_USERNAME"), + Password: envutil.GetenvString("REDIS_PASSWORD"), + DB: envutil.GetenvInt("REDIS_DB"), + }) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := client.Ping(ctx).Err(); err != nil { + return nil, errors.Wrap(err, "NewRedisClient") + } + return client, nil +} diff --git a/main.go b/main.go deleted file mode 100644 index d9d3745..0000000 --- a/main.go +++ /dev/null @@ -1,122 +0,0 @@ -package main - -import ( - "context" - "github.com/Kichiyaki/appmode" - "github.com/Kichiyaki/goutil/envutil" - "github.com/go-redis/redis/v8" - "github.com/pkg/errors" - "os" - "os/signal" - "syscall" - "time" - - "github.com/sirupsen/logrus" - - twhelpcron "github.com/tribalwarshelp/cron/internal/cron" - "github.com/tribalwarshelp/cron/internal/postgres" - - "github.com/joho/godotenv" -) - -func init() { - if err := setupENVs(); err != nil { - logrus.Fatal(err) - } - setupLogger() -} - -func main() { - redisClient, err := initializeRedis() - if err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis")) - } - defer func() { - if err := redisClient.Close(); err != nil { - logrus.Warn(errors.Wrap(err, "Couldn't close the Redis connection")) - } - }() - - dbConn, err := postgres.Connect(&postgres.Config{LogQueries: envutil.GetenvBool("LOG_DB_QUERIES")}) - if err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db")) - } - defer func() { - if err := dbConn.Close(); err != nil { - logrus.Warn(errors.Wrap(err, "Couldn't close the db connection")) - } - }() - - c, err := twhelpcron.New(&twhelpcron.Config{ - DB: dbConn, - RunOnInit: envutil.GetenvBool("RUN_ON_INIT"), - Redis: redisClient, - WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"), - }) - if err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't initialize a cron instance")) - } - if err := c.Start(context.Background()); err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't start the cron")) - } - - logrus.Info("Cron is up and running!") - - channel := make(chan os.Signal, 1) - signal.Notify(channel, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) - <-channel - - logrus.Info("shutting down") - if err := c.Stop(); err != nil { - logrus.Fatal(err) - } -} - -func setupENVs() error { - err := os.Setenv("TZ", "UTC") - if err != nil { - return errors.Wrap(err, "setupENVs") - } - - if appmode.Equals(appmode.DevelopmentMode) { - err := godotenv.Load(".env.local") - if err != nil { - return errors.Wrap(err, "setupENVs") - } - } - - return nil -} - -func setupLogger() { - if appmode.Equals(appmode.DevelopmentMode) { - logrus.SetLevel(logrus.DebugLevel) - } - - timestampFormat := "2006-01-02 15:04:05" - if appmode.Equals(appmode.ProductionMode) { - customFormatter := new(logrus.JSONFormatter) - customFormatter.TimestampFormat = timestampFormat - logrus.SetFormatter(customFormatter) - } else { - customFormatter := new(logrus.TextFormatter) - customFormatter.TimestampFormat = timestampFormat - customFormatter.FullTimestamp = true - logrus.SetFormatter(customFormatter) - } -} - -func initializeRedis() (redis.UniversalClient, error) { - client := redis.NewClient(&redis.Options{ - Addr: envutil.GetenvString("REDIS_ADDR"), - Username: envutil.GetenvString("REDIS_USERNAME"), - Password: envutil.GetenvString("REDIS_PASSWORD"), - DB: envutil.GetenvInt("REDIS_DB"), - }) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := client.Ping(ctx).Err(); err != nil { - return nil, errors.Wrap(err, "initializeRedis") - } - return client, nil -} diff --git a/internal/cron/config.go b/pkg/cron/config.go similarity index 55% rename from internal/cron/config.go rename to pkg/cron/config.go index ce0b61e..0ef790a 100644 --- a/internal/cron/config.go +++ b/pkg/cron/config.go @@ -2,23 +2,23 @@ package cron import ( "github.com/go-pg/pg/v10" - "github.com/go-redis/redis/v8" "github.com/pkg/errors" + + "github.com/tribalwarshelp/cron/pkg/queue" ) type Config struct { - DB *pg.DB - Redis redis.UniversalClient - RunOnInit bool - WorkerLimit int + DB *pg.DB + Queue *queue.Queue + RunOnInit bool } func validateConfig(cfg *Config) error { if cfg == nil || cfg.DB == nil { return errors.New("cfg.DB is required") } - if cfg.Redis == nil { - return errors.New("cfg.Redis is required") + if cfg.Queue == nil { + return errors.New("cfg.Queue is required") } return nil } diff --git a/internal/cron/cron.go b/pkg/cron/cron.go similarity index 72% rename from internal/cron/cron.go rename to pkg/cron/cron.go index c9d754f..8a69cb5 100644 --- a/internal/cron/cron.go +++ b/pkg/cron/cron.go @@ -10,12 +10,12 @@ import ( "github.com/robfig/cron/v3" - "github.com/tribalwarshelp/cron/internal/cron/queue" + "github.com/tribalwarshelp/cron/pkg/queue" ) type Cron struct { *cron.Cron - queue queue.Queue + queue *queue.Queue db *pg.DB runOnInit bool log logrus.FieldLogger @@ -25,18 +25,14 @@ func New(cfg *Config) (*Cron, error) { if err := validateConfig(cfg); err != nil { return nil, err } - q, err := initializeQueue(cfg) - if err != nil { - return nil, err - } - log := logrus.WithField("package", "internal/cron") + log := logrus.WithField("package", "pkg/cron") c := &Cron{ Cron: cron.New(cron.WithChain( cron.SkipIfStillRunning( cron.PrintfLogger(log), ), )), - queue: q, + queue: cfg.Queue, db: cfg.DB, runOnInit: cfg.RunOnInit, log: log, @@ -96,59 +92,53 @@ func (c *Cron) init() error { return nil } -func (c *Cron) Start(ctx context.Context) error { - if err := c.queue.Start(ctx); err != nil { - return err - } +func (c *Cron) Start() error { c.Cron.Start() return nil } func (c *Cron) Stop() error { c.Cron.Stop() - if err := c.queue.Close(); err != nil { - return err - } return nil } func (c *Cron) updateServerData() { - err := c.queue.Add(queue.Main, queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background())) + err := c.queue.Add(queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background())) if err != nil { c.logError("Cron.updateServerData", queue.LoadVersionsAndUpdateServerData, err) } } func (c *Cron) updateEnnoblements() { - err := c.queue.Add(queue.Ennoblements, queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background())) + err := c.queue.Add(queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background())) if err != nil { c.logError("Cron.updateEnnoblements", queue.UpdateEnnoblements, err) } } func (c *Cron) updateHistory(timezone string) { - err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone)) if err != nil { c.logError("Cron.updateHistory", queue.UpdateHistory, err) } } func (c *Cron) updateStats(timezone string) { - err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone)) if err != nil { c.logError("Cron.updateStats", queue.UpdateStats, err) } } func (c *Cron) vacuumDatabase() { - err := c.queue.Add(queue.Main, queue.GetTask(queue.Vacuum).WithArgs(context.Background())) + err := c.queue.Add(queue.GetTask(queue.Vacuum).WithArgs(context.Background())) if err != nil { c.logError("Cron.vacuumDatabase", queue.Vacuum, err) } } func (c *Cron) deleteNonExistentVillages() { - err := c.queue.Add(queue.Main, queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background())) + err := c.queue.Add(queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background())) if err != nil { c.logError("Cron.deleteNonExistentVillages", queue.DeleteNonExistentVillages, err) } @@ -165,15 +155,6 @@ func (c *Cron) logError(prefix string, taskName string, err error) { ) } -func initializeQueue(cfg *Config) (queue.Queue, error) { - q, err := queue.New(&queue.Config{ - WorkerLimit: cfg.WorkerLimit, - Redis: cfg.Redis, - DB: cfg.DB, - }) - return q, errors.Wrap(err, "couldn't initialize a queue") -} - func createFnWithTimezone(timezone string, fn func(timezone string)) func() { return func() { fn(timezone) diff --git a/internal/postgres/postgres.go b/pkg/postgres/postgres.go similarity index 86% rename from internal/postgres/postgres.go rename to pkg/postgres/postgres.go index b48a018..87e3a30 100644 --- a/internal/postgres/postgres.go +++ b/pkg/postgres/postgres.go @@ -11,24 +11,26 @@ import ( "github.com/tribalwarshelp/shared/tw/twmodel" ) -var log = logrus.WithField("package", "internal/postgres") +var log = logrus.WithField("package", "pkg/postgres") type Config struct { - LogQueries bool + SkipDBInitialization bool } func Connect(cfg *Config) (*pg.DB, error) { db := pg.Connect(prepareOptions()) - if cfg != nil && cfg.LogQueries { + if envutil.GetenvBool("LOG_DB_QUERIES") { db.AddQueryHook(querylogger.Logger{ Log: log, MaxQueryLength: 2000, }) } - if err := prepareDB(db); err != nil { - return nil, err + if cfg == nil || !cfg.SkipDBInitialization { + if err := prepareDB(db); err != nil { + return nil, err + } } return db, nil @@ -110,7 +112,7 @@ func prepareDB(db *pg.DB) error { } for _, server := range servers { - if err := createSchema(db, server, true); err != nil { + if err := createServerSchema(db, server, true); err != nil { return err } } @@ -118,11 +120,11 @@ func prepareDB(db *pg.DB) error { return nil } -func CreateSchema(db *pg.DB, server *twmodel.Server) error { - return createSchema(db, server, false) +func CreateServerSchema(db *pg.DB, server *twmodel.Server) error { + return createServerSchema(db, server, false) } -func SchemaExists(db *pg.DB, schemaName string) bool { +func SchemaExists(db pg.DBI, schemaName string) bool { exists, err := db. Model(). Table("information_schema.schemata"). @@ -134,7 +136,7 @@ func SchemaExists(db *pg.DB, schemaName string) bool { return exists } -func createSchema(db *pg.DB, server *twmodel.Server, init bool) error { +func createServerSchema(db *pg.DB, server *twmodel.Server, init bool) error { if !init && SchemaExists(db, server.Key) { return nil } @@ -145,7 +147,7 @@ func createSchema(db *pg.DB, server *twmodel.Server, init bool) error { } defer func() { if err := tx.Close(); err != nil { - log.Warn(errors.Wrap(err, "createSchema: Couldn't rollback the transaction")) + log.Warn(errors.Wrap(err, "createServerSchema: Couldn't rollback the transaction")) } }() diff --git a/internal/postgres/sql_statements.go b/pkg/postgres/sql_statements.go similarity index 100% rename from internal/postgres/sql_statements.go rename to pkg/postgres/sql_statements.go diff --git a/internal/cron/queue/config.go b/pkg/queue/config.go similarity index 97% rename from internal/cron/queue/config.go rename to pkg/queue/config.go index 99a88ba..7a32759 100644 --- a/internal/cron/queue/config.go +++ b/pkg/queue/config.go @@ -21,7 +21,7 @@ func validateConfig(cfg *Config) error { type registerTasksConfig struct { DB *pg.DB - Queue Queue + Queue *Queue } func validateRegisterTasksConfig(cfg *registerTasksConfig) error { diff --git a/internal/cron/queue/helpers.go b/pkg/queue/helpers.go similarity index 100% rename from internal/cron/queue/helpers.go rename to pkg/queue/helpers.go diff --git a/internal/cron/queue/queue.go b/pkg/queue/queue.go similarity index 58% rename from internal/cron/queue/queue.go rename to pkg/queue/queue.go index e021a78..32607d7 100644 --- a/internal/cron/queue/queue.go +++ b/pkg/queue/queue.go @@ -11,32 +11,21 @@ import ( "github.com/vmihailenco/taskq/v3/redisq" ) -const ( - Main = "main" - Ennoblements = "ennoblements" -) +var log = logrus.WithField("package", "pkg/queue") -var log = logrus.WithField("package", "internal/cron/queue") - -type Queue interface { - Start(ctx context.Context) error - Close() error - Add(name string, msg *taskq.Message) error -} - -type queue struct { +type Queue struct { redis redis.UniversalClient main taskq.Queue ennoblements taskq.Queue factory taskq.Factory } -func New(cfg *Config) (Queue, error) { +func New(cfg *Config) (*Queue, error) { if err := validateConfig(cfg); err != nil { return nil, err } - q := &queue{ + q := &Queue{ redis: cfg.Redis, } @@ -47,23 +36,22 @@ func New(cfg *Config) (Queue, error) { return q, nil } -func (q *queue) init(cfg *Config) error { +func (q *Queue) init(cfg *Config) error { q.factory = redisq.NewFactory() - q.main = q.registerQueue(Main, cfg.WorkerLimit) - q.ennoblements = q.registerQueue(Ennoblements, cfg.WorkerLimit) + q.main = q.registerQueue("main", cfg.WorkerLimit) + q.ennoblements = q.registerQueue("ennoblements", cfg.WorkerLimit) - err := registerTasks(®isterTasksConfig{ + if err := registerTasks(®isterTasksConfig{ DB: cfg.DB, Queue: q, - }) - if err != nil { + }); err != nil { return errors.Wrapf(err, "couldn't register tasks") } return nil } -func (q *queue) registerQueue(name string, limit int) taskq.Queue { +func (q *Queue) registerQueue(name string, limit int) taskq.Queue { return q.factory.RegisterQueue(&taskq.QueueOptions{ Name: name, ReservationTimeout: time.Minute * 2, @@ -73,34 +61,45 @@ func (q *queue) registerQueue(name string, limit int) taskq.Queue { }) } -func (q *queue) getQueueByName(name string) taskq.Queue { +func (q *Queue) getQueueByTaskName(name string) taskq.Queue { switch name { - case Main: + case LoadVersionsAndUpdateServerData, + LoadServersAndUpdateData, + UpdateServerData, + Vacuum, + VacuumServerData, + UpdateHistory, + UpdateServerHistory, + UpdateStats, + UpdateServerStats, + DeleteNonExistentVillages, + ServerDeleteNonExistentVillages: return q.main - case Ennoblements: + case UpdateEnnoblements, + UpdateServerEnnoblements: return q.ennoblements } return nil } -func (q *queue) Start(ctx context.Context) error { +func (q *Queue) Start(ctx context.Context) error { if err := q.factory.StartConsumers(ctx); err != nil { return errors.Wrap(err, "couldn't start the queue") } return nil } -func (q *queue) Close() error { +func (q *Queue) Close() error { if err := q.factory.Close(); err != nil { return errors.Wrap(err, "couldn't close the queue") } return nil } -func (q *queue) Add(name string, msg *taskq.Message) error { - queue := q.getQueueByName(name) +func (q *Queue) Add(msg *taskq.Message) error { + queue := q.getQueueByTaskName(msg.TaskName) if queue == nil { - return errors.Errorf("couldn't add the message to the queue: unknown queue name '%s'", name) + return errors.Errorf("couldn't add the message to the queue: unknown task name '%s'", msg.TaskName) } if err := queue.Add(msg); err != nil { return errors.Wrap(err, "couldn't add the message to the queue") diff --git a/internal/cron/queue/task.go b/pkg/queue/task.go similarity index 95% rename from internal/cron/queue/task.go rename to pkg/queue/task.go index ae92bd7..77c5335 100644 --- a/internal/cron/queue/task.go +++ b/pkg/queue/task.go @@ -13,7 +13,7 @@ const ( LoadServersAndUpdateData = "loadServersAndUpdateData" UpdateServerData = "updateServerData" Vacuum = "vacuum" - VacuumServerDB = "vacuumServerDB" + VacuumServerData = "vacuumServerData" UpdateEnnoblements = "updateEnnoblements" UpdateServerEnnoblements = "updateServerEnnoblements" UpdateHistory = "updateHistory" @@ -27,7 +27,7 @@ const ( type task struct { db *pg.DB - queue Queue + queue *Queue cachedLocations sync.Map } @@ -71,8 +71,8 @@ func registerTasks(cfg *registerTasksConfig) error { Handler: (&taskVacuum{t}).execute, }, { - Name: VacuumServerDB, - Handler: (&taskVacuumServerDB{t}).execute, + Name: VacuumServerData, + Handler: (&taskVacuumServerData{t}).execute, }, { Name: UpdateEnnoblements, diff --git a/internal/cron/queue/task_delete_non_existent_villages.go b/pkg/queue/task_delete_non_existent_villages.go similarity index 99% rename from internal/cron/queue/task_delete_non_existent_villages.go rename to pkg/queue/task_delete_non_existent_villages.go index 63da676..3af9b5b 100644 --- a/internal/cron/queue/task_delete_non_existent_villages.go +++ b/pkg/queue/task_delete_non_existent_villages.go @@ -29,7 +29,6 @@ func (t *taskDeleteNonExistentVillages) execute() error { Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue") for _, server := range servers { err := t.queue.Add( - Main, GetTask(ServerDeleteNonExistentVillages). WithArgs( context.Background(), diff --git a/internal/cron/queue/task_load_servers_and_update_data.go b/pkg/queue/task_load_servers_and_update_data.go similarity index 92% rename from internal/cron/queue/task_load_servers_and_update_data.go rename to pkg/queue/task_load_servers_and_update_data.go index 924fdc2..13a71c8 100644 --- a/internal/cron/queue/task_load_servers_and_update_data.go +++ b/pkg/queue/task_load_servers_and_update_data.go @@ -8,7 +8,7 @@ import ( "github.com/tribalwarshelp/shared/tw/twdataloader" "github.com/tribalwarshelp/shared/tw/twmodel" - "github.com/tribalwarshelp/cron/internal/postgres" + "github.com/tribalwarshelp/cron/pkg/postgres" ) type taskLoadServersAndUpdateData struct { @@ -50,7 +50,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error { VersionCode: version.Code, Version: version, } - if err := postgres.CreateSchema(t.db, server); err != nil { + if err := postgres.CreateServerSchema(t.db, server); err != nil { logrus.Warn(errors.Wrapf(err, "taskLoadServersAndUpdateData.execute: %s: Couldn't create the schema", server.Key)) continue } @@ -85,7 +85,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error { entry.Infof("%s: Servers have been loaded", version.Host) for _, server := range servers { - err := t.queue.Add(Main, GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server)) + err := t.queue.Add(GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server)) if err != nil { log. WithField("key", server.Key). diff --git a/internal/cron/queue/task_load_versions_and_update_server_data.go b/pkg/queue/task_load_versions_and_update_server_data.go similarity index 90% rename from internal/cron/queue/task_load_versions_and_update_server_data.go rename to pkg/queue/task_load_versions_and_update_server_data.go index 9e598a6..80d5fe3 100644 --- a/internal/cron/queue/task_load_versions_and_update_server_data.go +++ b/pkg/queue/task_load_versions_and_update_server_data.go @@ -20,7 +20,7 @@ func (t *taskLoadVersionsAndUpdateServerData) execute() error { } log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded") for _, version := range versions { - err := t.queue.Add(Main, GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version)) + err := t.queue.Add(GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version)) if err != nil { log. WithField("code", version.Code). diff --git a/internal/cron/queue/task_server_delete_non_existent_villages.go b/pkg/queue/task_server_delete_non_existent_villages.go similarity index 100% rename from internal/cron/queue/task_server_delete_non_existent_villages.go rename to pkg/queue/task_server_delete_non_existent_villages.go diff --git a/internal/cron/queue/task_update_ennoblements.go b/pkg/queue/task_update_ennoblements.go similarity index 98% rename from internal/cron/queue/task_update_ennoblements.go rename to pkg/queue/task_update_ennoblements.go index 56695eb..f5103b8 100644 --- a/internal/cron/queue/task_update_ennoblements.go +++ b/pkg/queue/task_update_ennoblements.go @@ -26,7 +26,6 @@ func (t *taskUpdateEnnoblements) execute() error { log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...") for _, server := range servers { err := t.queue.Add( - Ennoblements, GetTask(UpdateServerEnnoblements). WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), server), ) diff --git a/internal/cron/queue/task_update_history.go b/pkg/queue/task_update_history.go similarity index 92% rename from internal/cron/queue/task_update_history.go rename to pkg/queue/task_update_history.go index 84d9611..9be1623 100644 --- a/internal/cron/queue/task_update_history.go +++ b/pkg/queue/task_update_history.go @@ -41,7 +41,7 @@ func (t *taskUpdateHistory) execute(timezone string) error { WithField("numberOfServers", len(servers)). Info("taskUpdateHistory.execute: Update of the history has started") for _, server := range servers { - err := t.queue.Add(Main, GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server)) + err := t.queue.Add(GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server)) if err != nil { log. WithField("key", server.Key). diff --git a/internal/cron/queue/task_update_server_data.go b/pkg/queue/task_update_server_data.go similarity index 100% rename from internal/cron/queue/task_update_server_data.go rename to pkg/queue/task_update_server_data.go diff --git a/internal/cron/queue/task_update_server_ennoblements.go b/pkg/queue/task_update_server_ennoblements.go similarity index 100% rename from internal/cron/queue/task_update_server_ennoblements.go rename to pkg/queue/task_update_server_ennoblements.go diff --git a/internal/cron/queue/task_update_server_history.go b/pkg/queue/task_update_server_history.go similarity index 100% rename from internal/cron/queue/task_update_server_history.go rename to pkg/queue/task_update_server_history.go diff --git a/internal/cron/queue/task_update_server_stats.go b/pkg/queue/task_update_server_stats.go similarity index 100% rename from internal/cron/queue/task_update_server_stats.go rename to pkg/queue/task_update_server_stats.go diff --git a/internal/cron/queue/task_update_stats.go b/pkg/queue/task_update_stats.go similarity index 92% rename from internal/cron/queue/task_update_stats.go rename to pkg/queue/task_update_stats.go index 6a74df6..9628bd9 100644 --- a/internal/cron/queue/task_update_stats.go +++ b/pkg/queue/task_update_stats.go @@ -41,7 +41,7 @@ func (t *taskUpdateStats) execute(timezone string) error { WithField("numberOfServers", len(servers)). Info("taskUpdateStats.execute: Update of the stats has started") for _, server := range servers { - err := t.queue.Add(Main, GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server)) + err := t.queue.Add(GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server)) if err != nil { log. WithField("key", server.Key). diff --git a/internal/cron/queue/task_vacuum.go b/pkg/queue/task_vacuum.go similarity index 88% rename from internal/cron/queue/task_vacuum.go rename to pkg/queue/task_vacuum.go index 0a229fd..7e9903d 100644 --- a/internal/cron/queue/task_vacuum.go +++ b/pkg/queue/task_vacuum.go @@ -22,7 +22,7 @@ func (t *taskVacuum) execute() error { } log.Infof("taskVacuum.execute: The database vacumming process has started...") for _, server := range servers { - err := t.queue.Add(Main, GetTask(VacuumServerDB).WithArgs(context.Background(), server)) + err := t.queue.Add(GetTask(VacuumServerData).WithArgs(context.Background(), server)) if err != nil { log. WithField("key", server.Key). diff --git a/internal/cron/queue/task_vacuum_server_db.go b/pkg/queue/task_vacuum_server_data.go similarity index 83% rename from internal/cron/queue/task_vacuum_server_db.go rename to pkg/queue/task_vacuum_server_data.go index cac4866..ed229c8 100644 --- a/internal/cron/queue/task_vacuum_server_db.go +++ b/pkg/queue/task_vacuum_server_data.go @@ -11,32 +11,32 @@ const ( day = 24 * time.Hour ) -type taskVacuumServerDB struct { +type taskVacuumServerData struct { *task } -func (t *taskVacuumServerDB) execute(server *twmodel.Server) error { +func (t *taskVacuumServerData) execute(server *twmodel.Server) error { if err := t.validatePayload(server); err != nil { - log.Debug(errors.Wrap(err, "taskVacuumServerDB.execute")) + log.Debug(errors.Wrap(err, "taskVacuumServerData.execute")) return nil } entry := log.WithField("key", server.Key) - entry.Infof("taskVacuumServerDB.execute: %s: Vacumming the database...", server.Key) + entry.Infof("taskVacuumServerData.execute: %s: Vacumming the database...", server.Key) err := (&workerVacuumServerDB{ db: t.db.WithParam("SERVER", pg.Safe(server.Key)), server: server, }).vacuum() if err != nil { - err = errors.Wrap(err, "taskVacuumServerDB.execute") + err = errors.Wrap(err, "taskVacuumServerData.execute") entry.Error(err) return err } - entry.Infof("taskVacuumServerDB.execute: %s: The database has been vacummed", server.Key) + entry.Infof("taskVacuumServerData.execute: %s: The database has been vacummed", server.Key) return nil } -func (t *taskVacuumServerDB) validatePayload(server *twmodel.Server) error { +func (t *taskVacuumServerData) validatePayload(server *twmodel.Server) error { if server == nil { return errors.New("expected *twmodel.Server, got nil") }