Merge pull request #16 from tribalwarshelp/feat/separate-binaries

feat: separate binaries (cron/dataupdater)
This commit is contained in:
Dawid Wysokiński 2021-07-11 11:29:16 +02:00 committed by GitHub
commit 5bbe01fff6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 334 additions and 234 deletions

View File

@ -11,10 +11,13 @@ jobs:
# with credentials stored in the UI # with credentials stored in the UI
- run: | - run: |
docker login -u $DOCKER_LOGIN -p $DOCKER_PASSWORD docker login -u $DOCKER_LOGIN -p $DOCKER_PASSWORD
# build the application image # build the cron image
- run: docker build -t $DOCKER_LOGIN/twhelp-cron:latest . - run: docker build -f cmd/cron/Dockerfile -t $DOCKER_LOGIN/twhelp-cron:latest .
# deploy the image # build the data updater image
- run: docker build -f cmd/dataupdater/Dockerfile -t $DOCKER_LOGIN/twhelp-dataupdater:latest .
# deploy the images
- run: docker push $DOCKER_LOGIN/twhelp-cron:latest - run: docker push $DOCKER_LOGIN/twhelp-cron:latest
- run: docker push $DOCKER_LOGIN/twhelp-dataupdater:latest
build_version: build_version:
docker: docker:
- image: circleci/golang:1.16 - image: circleci/golang:1.16
@ -26,10 +29,13 @@ jobs:
# with credentials stored in the UI # with credentials stored in the UI
- run: | - run: |
docker login -u $DOCKER_LOGIN -p $DOCKER_PASSWORD docker login -u $DOCKER_LOGIN -p $DOCKER_PASSWORD
# build the application image # build the cron image
- run: docker build -t $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') . - run: docker build -f cmd/cron/Dockerfile -t $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') .
# deploy the image # build the data updater image
- run: docker build -f cmd/dataupdater/Dockerfile -t $DOCKER_LOGIN/twhelp-dataupdater:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') .
# deploy the images
- run: docker push $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') - run: docker push $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//')
- run: docker push $DOCKER_LOGIN/twhelp-dataupdater::$(echo $CIRCLE_TAG | sed -r 's/^.{1}//')
workflows: workflows:
version: 2 version: 2
deploy_latest: deploy_latest:

1
.dockerignore Normal file
View File

@ -0,0 +1 @@
.env.local

View File

@ -1,4 +1,4 @@
# tribalwarshelp.com cron # datafetcher
- Adds automatically new servers. - Adds automatically new servers.
- Fetches and updates server data (players, tribes, ODA, ODD, ODS, OD, conquers, configs). - Fetches and updates server data (players, tribes, ODA, ODD, ODS, OD, conquers, configs).
@ -40,9 +40,13 @@ git clone git@github.com:tribalwarshelp/cron.git
``` ```
2. Open the folder with this project in a terminal. 2. Open the folder with this project in a terminal.
3. Set the required env variables directly in your system or create .env.local file. 3. Set the required env variables directly in your system or create .env.local file.
4. Run the app. 4. Run the cron.
``` ```
go run main.go go run ./cmd/cron/main.go
```
5. Run the data updater in a new tab.
```
go run ./cmd/dataupdater/main.go
``` ```
## License ## License

View File

@ -12,7 +12,7 @@ RUN go mod download
# Copy the source from the current directory to the Working Directory inside the container # Copy the source from the current directory to the Working Directory inside the container
COPY . . COPY . .
RUN go build -o twcron . RUN go build -o twcron ./cmd/cron
######## Start a new stage from scratch ####### ######## Start a new stage from scratch #######
FROM alpine:latest FROM alpine:latest
@ -28,7 +28,5 @@ ENV APP_MODE=production
EXPOSE 8080 EXPOSE 8080
RUN apk add --no-cache tzdata RUN apk add --no-cache tzdata
ADD https://github.com/ufoscout/docker-compose-wait/releases/download/2.2.1/wait ./wait
RUN chmod +x ./wait
CMD ./wait && ./twcron CMD ./twcron

67
cmd/cron/main.go Normal file
View File

@ -0,0 +1,67 @@
package main
import (
"github.com/Kichiyaki/goutil/envutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"os"
"os/signal"
"syscall"
"github.com/tribalwarshelp/cron/cmd/internal"
twhelpcron "github.com/tribalwarshelp/cron/pkg/cron"
"github.com/tribalwarshelp/cron/pkg/postgres"
"github.com/tribalwarshelp/cron/pkg/queue"
)
func main() {
redisClient, err := internal.NewRedisClient()
if err != nil {
logrus.Fatal(errors.Wrap(err, "couldn't connect to Redis"))
}
defer func() {
if err := redisClient.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "couldn't close the Redis connection"))
}
}()
dbConn, err := postgres.Connect(nil)
if err != nil {
logrus.Fatal(errors.Wrap(err, "couldn't connect to the db"))
}
defer func() {
if err := dbConn.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "couldn't close the db connection"))
}
}()
q, err := queue.New(&queue.Config{
DB: dbConn,
Redis: redisClient,
WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"),
})
if err != nil {
logrus.Fatal(errors.Wrap(err, "couldn't initialize a queue"))
}
c, err := twhelpcron.New(&twhelpcron.Config{
DB: dbConn,
RunOnInit: envutil.GetenvBool("RUN_ON_INIT"),
Queue: q,
})
if err != nil {
logrus.Fatal(errors.Wrap(err, "couldn't initialize a cron instance"))
}
if err := c.Start(); err != nil {
logrus.Fatal(errors.Wrap(err, "couldn't start the cron"))
}
defer c.Stop()
logrus.Info("Cron is up and running!")
channel := make(chan os.Signal, 1)
signal.Notify(channel, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT)
<-channel
logrus.Info("shutting down")
}

View File

@ -0,0 +1,32 @@
FROM golang:alpine as builder
# Set the Current Working Directory inside the container
WORKDIR /app
# Copy go mod and sum files
COPY go.mod go.sum ./
# Download all dependencies. Dependencies will be cached if the go.mod and go.sum files are not changed
RUN go mod download
# Copy the source from the current directory to the Working Directory inside the container
COPY . .
RUN go build -o twdataupdater ./cmd/dataupdater
######## Start a new stage from scratch #######
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
# Copy the Pre-built binary file from the previous stage
COPY --from=builder /app/twdataupdater .
ENV APP_MODE=production
EXPOSE 8080
RUN apk add --no-cache tzdata
CMD ./twdataupdater

60
cmd/dataupdater/main.go Normal file
View File

@ -0,0 +1,60 @@
package main
import (
"context"
"github.com/Kichiyaki/goutil/envutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"os"
"os/signal"
"syscall"
"github.com/tribalwarshelp/cron/cmd/internal"
"github.com/tribalwarshelp/cron/pkg/postgres"
"github.com/tribalwarshelp/cron/pkg/queue"
)
func main() {
redisClient, err := internal.NewRedisClient()
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis"))
}
defer func() {
if err := redisClient.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "Couldn't close the Redis connection"))
}
}()
dbConn, err := postgres.Connect(&postgres.Config{SkipDBInitialization: true})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db"))
}
defer func() {
if err := dbConn.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "Couldn't close the db connection"))
}
}()
q, err := queue.New(&queue.Config{
DB: dbConn,
Redis: redisClient,
WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"),
})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't initialize a queue"))
}
if err := q.Start(context.Background()); err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't start the queue"))
}
logrus.Info("Data updater is up and running!")
channel := make(chan os.Signal, 1)
signal.Notify(channel, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT)
<-channel
logrus.Info("shutting down")
if err := q.Close(); err != nil {
logrus.Fatal(err)
}
}

50
cmd/internal/init.go Normal file
View File

@ -0,0 +1,50 @@
package internal
import (
"github.com/Kichiyaki/appmode"
"github.com/joho/godotenv"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"os"
)
func init() {
if err := setENVs(); err != nil {
logrus.Fatal(err)
}
prepareLogger()
}
func setENVs() error {
err := os.Setenv("TZ", "UTC")
if err != nil {
return errors.Wrap(err, "setENVs")
}
if appmode.Equals(appmode.DevelopmentMode) {
err := godotenv.Load(".env.local")
if err != nil {
return errors.Wrap(err, "setENVs")
}
}
return nil
}
func prepareLogger() {
if appmode.Equals(appmode.DevelopmentMode) {
logrus.SetLevel(logrus.DebugLevel)
}
timestampFormat := "2006-01-02 15:04:05"
if appmode.Equals(appmode.ProductionMode) {
customFormatter := new(logrus.JSONFormatter)
customFormatter.TimestampFormat = timestampFormat
logrus.SetFormatter(customFormatter)
} else {
customFormatter := new(logrus.TextFormatter)
customFormatter.TimestampFormat = timestampFormat
customFormatter.FullTimestamp = true
logrus.SetFormatter(customFormatter)
}
}

24
cmd/internal/redis.go Normal file
View File

@ -0,0 +1,24 @@
package internal
import (
"context"
"github.com/Kichiyaki/goutil/envutil"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"time"
)
func NewRedisClient() (redis.UniversalClient, error) {
client := redis.NewClient(&redis.Options{
Addr: envutil.GetenvString("REDIS_ADDR"),
Username: envutil.GetenvString("REDIS_USERNAME"),
Password: envutil.GetenvString("REDIS_PASSWORD"),
DB: envutil.GetenvInt("REDIS_DB"),
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
return nil, errors.Wrap(err, "NewRedisClient")
}
return client, nil
}

122
main.go
View File

@ -1,122 +0,0 @@
package main
import (
"context"
"github.com/Kichiyaki/appmode"
"github.com/Kichiyaki/goutil/envutil"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"os"
"os/signal"
"syscall"
"time"
"github.com/sirupsen/logrus"
twhelpcron "github.com/tribalwarshelp/cron/internal/cron"
"github.com/tribalwarshelp/cron/internal/postgres"
"github.com/joho/godotenv"
)
func init() {
if err := setupENVs(); err != nil {
logrus.Fatal(err)
}
setupLogger()
}
func main() {
redisClient, err := initializeRedis()
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis"))
}
defer func() {
if err := redisClient.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "Couldn't close the Redis connection"))
}
}()
dbConn, err := postgres.Connect(&postgres.Config{LogQueries: envutil.GetenvBool("LOG_DB_QUERIES")})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db"))
}
defer func() {
if err := dbConn.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "Couldn't close the db connection"))
}
}()
c, err := twhelpcron.New(&twhelpcron.Config{
DB: dbConn,
RunOnInit: envutil.GetenvBool("RUN_ON_INIT"),
Redis: redisClient,
WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"),
})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't initialize a cron instance"))
}
if err := c.Start(context.Background()); err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't start the cron"))
}
logrus.Info("Cron is up and running!")
channel := make(chan os.Signal, 1)
signal.Notify(channel, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT)
<-channel
logrus.Info("shutting down")
if err := c.Stop(); err != nil {
logrus.Fatal(err)
}
}
func setupENVs() error {
err := os.Setenv("TZ", "UTC")
if err != nil {
return errors.Wrap(err, "setupENVs")
}
if appmode.Equals(appmode.DevelopmentMode) {
err := godotenv.Load(".env.local")
if err != nil {
return errors.Wrap(err, "setupENVs")
}
}
return nil
}
func setupLogger() {
if appmode.Equals(appmode.DevelopmentMode) {
logrus.SetLevel(logrus.DebugLevel)
}
timestampFormat := "2006-01-02 15:04:05"
if appmode.Equals(appmode.ProductionMode) {
customFormatter := new(logrus.JSONFormatter)
customFormatter.TimestampFormat = timestampFormat
logrus.SetFormatter(customFormatter)
} else {
customFormatter := new(logrus.TextFormatter)
customFormatter.TimestampFormat = timestampFormat
customFormatter.FullTimestamp = true
logrus.SetFormatter(customFormatter)
}
}
func initializeRedis() (redis.UniversalClient, error) {
client := redis.NewClient(&redis.Options{
Addr: envutil.GetenvString("REDIS_ADDR"),
Username: envutil.GetenvString("REDIS_USERNAME"),
Password: envutil.GetenvString("REDIS_PASSWORD"),
DB: envutil.GetenvInt("REDIS_DB"),
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
return nil, errors.Wrap(err, "initializeRedis")
}
return client, nil
}

View File

@ -2,23 +2,23 @@ package cron
import ( import (
"github.com/go-pg/pg/v10" "github.com/go-pg/pg/v10"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/tribalwarshelp/cron/pkg/queue"
) )
type Config struct { type Config struct {
DB *pg.DB DB *pg.DB
Redis redis.UniversalClient Queue *queue.Queue
RunOnInit bool RunOnInit bool
WorkerLimit int
} }
func validateConfig(cfg *Config) error { func validateConfig(cfg *Config) error {
if cfg == nil || cfg.DB == nil { if cfg == nil || cfg.DB == nil {
return errors.New("cfg.DB is required") return errors.New("cfg.DB is required")
} }
if cfg.Redis == nil { if cfg.Queue == nil {
return errors.New("cfg.Redis is required") return errors.New("cfg.Queue is required")
} }
return nil return nil
} }

View File

@ -10,12 +10,12 @@ import (
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/tribalwarshelp/cron/internal/cron/queue" "github.com/tribalwarshelp/cron/pkg/queue"
) )
type Cron struct { type Cron struct {
*cron.Cron *cron.Cron
queue queue.Queue queue *queue.Queue
db *pg.DB db *pg.DB
runOnInit bool runOnInit bool
log logrus.FieldLogger log logrus.FieldLogger
@ -25,18 +25,14 @@ func New(cfg *Config) (*Cron, error) {
if err := validateConfig(cfg); err != nil { if err := validateConfig(cfg); err != nil {
return nil, err return nil, err
} }
q, err := initializeQueue(cfg) log := logrus.WithField("package", "pkg/cron")
if err != nil {
return nil, err
}
log := logrus.WithField("package", "internal/cron")
c := &Cron{ c := &Cron{
Cron: cron.New(cron.WithChain( Cron: cron.New(cron.WithChain(
cron.SkipIfStillRunning( cron.SkipIfStillRunning(
cron.PrintfLogger(log), cron.PrintfLogger(log),
), ),
)), )),
queue: q, queue: cfg.Queue,
db: cfg.DB, db: cfg.DB,
runOnInit: cfg.RunOnInit, runOnInit: cfg.RunOnInit,
log: log, log: log,
@ -96,59 +92,53 @@ func (c *Cron) init() error {
return nil return nil
} }
func (c *Cron) Start(ctx context.Context) error { func (c *Cron) Start() error {
if err := c.queue.Start(ctx); err != nil {
return err
}
c.Cron.Start() c.Cron.Start()
return nil return nil
} }
func (c *Cron) Stop() error { func (c *Cron) Stop() error {
c.Cron.Stop() c.Cron.Stop()
if err := c.queue.Close(); err != nil {
return err
}
return nil return nil
} }
func (c *Cron) updateServerData() { func (c *Cron) updateServerData() {
err := c.queue.Add(queue.Main, queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background())) err := c.queue.Add(queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background()))
if err != nil { if err != nil {
c.logError("Cron.updateServerData", queue.LoadVersionsAndUpdateServerData, err) c.logError("Cron.updateServerData", queue.LoadVersionsAndUpdateServerData, err)
} }
} }
func (c *Cron) updateEnnoblements() { func (c *Cron) updateEnnoblements() {
err := c.queue.Add(queue.Ennoblements, queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background())) err := c.queue.Add(queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background()))
if err != nil { if err != nil {
c.logError("Cron.updateEnnoblements", queue.UpdateEnnoblements, err) c.logError("Cron.updateEnnoblements", queue.UpdateEnnoblements, err)
} }
} }
func (c *Cron) updateHistory(timezone string) { func (c *Cron) updateHistory(timezone string) {
err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone)) err := c.queue.Add(queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone))
if err != nil { if err != nil {
c.logError("Cron.updateHistory", queue.UpdateHistory, err) c.logError("Cron.updateHistory", queue.UpdateHistory, err)
} }
} }
func (c *Cron) updateStats(timezone string) { func (c *Cron) updateStats(timezone string) {
err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone)) err := c.queue.Add(queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone))
if err != nil { if err != nil {
c.logError("Cron.updateStats", queue.UpdateStats, err) c.logError("Cron.updateStats", queue.UpdateStats, err)
} }
} }
func (c *Cron) vacuumDatabase() { func (c *Cron) vacuumDatabase() {
err := c.queue.Add(queue.Main, queue.GetTask(queue.Vacuum).WithArgs(context.Background())) err := c.queue.Add(queue.GetTask(queue.Vacuum).WithArgs(context.Background()))
if err != nil { if err != nil {
c.logError("Cron.vacuumDatabase", queue.Vacuum, err) c.logError("Cron.vacuumDatabase", queue.Vacuum, err)
} }
} }
func (c *Cron) deleteNonExistentVillages() { func (c *Cron) deleteNonExistentVillages() {
err := c.queue.Add(queue.Main, queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background())) err := c.queue.Add(queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background()))
if err != nil { if err != nil {
c.logError("Cron.deleteNonExistentVillages", queue.DeleteNonExistentVillages, err) c.logError("Cron.deleteNonExistentVillages", queue.DeleteNonExistentVillages, err)
} }
@ -165,15 +155,6 @@ func (c *Cron) logError(prefix string, taskName string, err error) {
) )
} }
func initializeQueue(cfg *Config) (queue.Queue, error) {
q, err := queue.New(&queue.Config{
WorkerLimit: cfg.WorkerLimit,
Redis: cfg.Redis,
DB: cfg.DB,
})
return q, errors.Wrap(err, "couldn't initialize a queue")
}
func createFnWithTimezone(timezone string, fn func(timezone string)) func() { func createFnWithTimezone(timezone string, fn func(timezone string)) func() {
return func() { return func() {
fn(timezone) fn(timezone)

View File

@ -11,24 +11,26 @@ import (
"github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twmodel"
) )
var log = logrus.WithField("package", "internal/postgres") var log = logrus.WithField("package", "pkg/postgres")
type Config struct { type Config struct {
LogQueries bool SkipDBInitialization bool
} }
func Connect(cfg *Config) (*pg.DB, error) { func Connect(cfg *Config) (*pg.DB, error) {
db := pg.Connect(prepareOptions()) db := pg.Connect(prepareOptions())
if cfg != nil && cfg.LogQueries { if envutil.GetenvBool("LOG_DB_QUERIES") {
db.AddQueryHook(querylogger.Logger{ db.AddQueryHook(querylogger.Logger{
Log: log, Log: log,
MaxQueryLength: 2000, MaxQueryLength: 2000,
}) })
} }
if err := prepareDB(db); err != nil { if cfg == nil || !cfg.SkipDBInitialization {
return nil, err if err := prepareDB(db); err != nil {
return nil, err
}
} }
return db, nil return db, nil
@ -110,7 +112,7 @@ func prepareDB(db *pg.DB) error {
} }
for _, server := range servers { for _, server := range servers {
if err := createSchema(db, server, true); err != nil { if err := createServerSchema(db, server, true); err != nil {
return err return err
} }
} }
@ -118,11 +120,11 @@ func prepareDB(db *pg.DB) error {
return nil return nil
} }
func CreateSchema(db *pg.DB, server *twmodel.Server) error { func CreateServerSchema(db *pg.DB, server *twmodel.Server) error {
return createSchema(db, server, false) return createServerSchema(db, server, false)
} }
func SchemaExists(db *pg.DB, schemaName string) bool { func SchemaExists(db pg.DBI, schemaName string) bool {
exists, err := db. exists, err := db.
Model(). Model().
Table("information_schema.schemata"). Table("information_schema.schemata").
@ -134,7 +136,7 @@ func SchemaExists(db *pg.DB, schemaName string) bool {
return exists return exists
} }
func createSchema(db *pg.DB, server *twmodel.Server, init bool) error { func createServerSchema(db *pg.DB, server *twmodel.Server, init bool) error {
if !init && SchemaExists(db, server.Key) { if !init && SchemaExists(db, server.Key) {
return nil return nil
} }
@ -145,7 +147,7 @@ func createSchema(db *pg.DB, server *twmodel.Server, init bool) error {
} }
defer func() { defer func() {
if err := tx.Close(); err != nil { if err := tx.Close(); err != nil {
log.Warn(errors.Wrap(err, "createSchema: Couldn't rollback the transaction")) log.Warn(errors.Wrap(err, "createServerSchema: Couldn't rollback the transaction"))
} }
}() }()

View File

@ -21,7 +21,7 @@ func validateConfig(cfg *Config) error {
type registerTasksConfig struct { type registerTasksConfig struct {
DB *pg.DB DB *pg.DB
Queue Queue Queue *Queue
} }
func validateRegisterTasksConfig(cfg *registerTasksConfig) error { func validateRegisterTasksConfig(cfg *registerTasksConfig) error {

View File

@ -11,32 +11,21 @@ import (
"github.com/vmihailenco/taskq/v3/redisq" "github.com/vmihailenco/taskq/v3/redisq"
) )
const ( var log = logrus.WithField("package", "pkg/queue")
Main = "main"
Ennoblements = "ennoblements"
)
var log = logrus.WithField("package", "internal/cron/queue") type Queue struct {
type Queue interface {
Start(ctx context.Context) error
Close() error
Add(name string, msg *taskq.Message) error
}
type queue struct {
redis redis.UniversalClient redis redis.UniversalClient
main taskq.Queue main taskq.Queue
ennoblements taskq.Queue ennoblements taskq.Queue
factory taskq.Factory factory taskq.Factory
} }
func New(cfg *Config) (Queue, error) { func New(cfg *Config) (*Queue, error) {
if err := validateConfig(cfg); err != nil { if err := validateConfig(cfg); err != nil {
return nil, err return nil, err
} }
q := &queue{ q := &Queue{
redis: cfg.Redis, redis: cfg.Redis,
} }
@ -47,23 +36,22 @@ func New(cfg *Config) (Queue, error) {
return q, nil return q, nil
} }
func (q *queue) init(cfg *Config) error { func (q *Queue) init(cfg *Config) error {
q.factory = redisq.NewFactory() q.factory = redisq.NewFactory()
q.main = q.registerQueue(Main, cfg.WorkerLimit) q.main = q.registerQueue("main", cfg.WorkerLimit)
q.ennoblements = q.registerQueue(Ennoblements, cfg.WorkerLimit) q.ennoblements = q.registerQueue("ennoblements", cfg.WorkerLimit)
err := registerTasks(&registerTasksConfig{ if err := registerTasks(&registerTasksConfig{
DB: cfg.DB, DB: cfg.DB,
Queue: q, Queue: q,
}) }); err != nil {
if err != nil {
return errors.Wrapf(err, "couldn't register tasks") return errors.Wrapf(err, "couldn't register tasks")
} }
return nil return nil
} }
func (q *queue) registerQueue(name string, limit int) taskq.Queue { func (q *Queue) registerQueue(name string, limit int) taskq.Queue {
return q.factory.RegisterQueue(&taskq.QueueOptions{ return q.factory.RegisterQueue(&taskq.QueueOptions{
Name: name, Name: name,
ReservationTimeout: time.Minute * 2, ReservationTimeout: time.Minute * 2,
@ -73,34 +61,45 @@ func (q *queue) registerQueue(name string, limit int) taskq.Queue {
}) })
} }
func (q *queue) getQueueByName(name string) taskq.Queue { func (q *Queue) getQueueByTaskName(name string) taskq.Queue {
switch name { switch name {
case Main: case LoadVersionsAndUpdateServerData,
LoadServersAndUpdateData,
UpdateServerData,
Vacuum,
VacuumServerData,
UpdateHistory,
UpdateServerHistory,
UpdateStats,
UpdateServerStats,
DeleteNonExistentVillages,
ServerDeleteNonExistentVillages:
return q.main return q.main
case Ennoblements: case UpdateEnnoblements,
UpdateServerEnnoblements:
return q.ennoblements return q.ennoblements
} }
return nil return nil
} }
func (q *queue) Start(ctx context.Context) error { func (q *Queue) Start(ctx context.Context) error {
if err := q.factory.StartConsumers(ctx); err != nil { if err := q.factory.StartConsumers(ctx); err != nil {
return errors.Wrap(err, "couldn't start the queue") return errors.Wrap(err, "couldn't start the queue")
} }
return nil return nil
} }
func (q *queue) Close() error { func (q *Queue) Close() error {
if err := q.factory.Close(); err != nil { if err := q.factory.Close(); err != nil {
return errors.Wrap(err, "couldn't close the queue") return errors.Wrap(err, "couldn't close the queue")
} }
return nil return nil
} }
func (q *queue) Add(name string, msg *taskq.Message) error { func (q *Queue) Add(msg *taskq.Message) error {
queue := q.getQueueByName(name) queue := q.getQueueByTaskName(msg.TaskName)
if queue == nil { if queue == nil {
return errors.Errorf("couldn't add the message to the queue: unknown queue name '%s'", name) return errors.Errorf("couldn't add the message to the queue: unknown task name '%s'", msg.TaskName)
} }
if err := queue.Add(msg); err != nil { if err := queue.Add(msg); err != nil {
return errors.Wrap(err, "couldn't add the message to the queue") return errors.Wrap(err, "couldn't add the message to the queue")

View File

@ -13,7 +13,7 @@ const (
LoadServersAndUpdateData = "loadServersAndUpdateData" LoadServersAndUpdateData = "loadServersAndUpdateData"
UpdateServerData = "updateServerData" UpdateServerData = "updateServerData"
Vacuum = "vacuum" Vacuum = "vacuum"
VacuumServerDB = "vacuumServerDB" VacuumServerData = "vacuumServerData"
UpdateEnnoblements = "updateEnnoblements" UpdateEnnoblements = "updateEnnoblements"
UpdateServerEnnoblements = "updateServerEnnoblements" UpdateServerEnnoblements = "updateServerEnnoblements"
UpdateHistory = "updateHistory" UpdateHistory = "updateHistory"
@ -27,7 +27,7 @@ const (
type task struct { type task struct {
db *pg.DB db *pg.DB
queue Queue queue *Queue
cachedLocations sync.Map cachedLocations sync.Map
} }
@ -71,8 +71,8 @@ func registerTasks(cfg *registerTasksConfig) error {
Handler: (&taskVacuum{t}).execute, Handler: (&taskVacuum{t}).execute,
}, },
{ {
Name: VacuumServerDB, Name: VacuumServerData,
Handler: (&taskVacuumServerDB{t}).execute, Handler: (&taskVacuumServerData{t}).execute,
}, },
{ {
Name: UpdateEnnoblements, Name: UpdateEnnoblements,

View File

@ -29,7 +29,6 @@ func (t *taskDeleteNonExistentVillages) execute() error {
Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue") Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add( err := t.queue.Add(
Main,
GetTask(ServerDeleteNonExistentVillages). GetTask(ServerDeleteNonExistentVillages).
WithArgs( WithArgs(
context.Background(), context.Background(),

View File

@ -8,7 +8,7 @@ import (
"github.com/tribalwarshelp/shared/tw/twdataloader" "github.com/tribalwarshelp/shared/tw/twdataloader"
"github.com/tribalwarshelp/shared/tw/twmodel" "github.com/tribalwarshelp/shared/tw/twmodel"
"github.com/tribalwarshelp/cron/internal/postgres" "github.com/tribalwarshelp/cron/pkg/postgres"
) )
type taskLoadServersAndUpdateData struct { type taskLoadServersAndUpdateData struct {
@ -50,7 +50,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error {
VersionCode: version.Code, VersionCode: version.Code,
Version: version, Version: version,
} }
if err := postgres.CreateSchema(t.db, server); err != nil { if err := postgres.CreateServerSchema(t.db, server); err != nil {
logrus.Warn(errors.Wrapf(err, "taskLoadServersAndUpdateData.execute: %s: Couldn't create the schema", server.Key)) logrus.Warn(errors.Wrapf(err, "taskLoadServersAndUpdateData.execute: %s: Couldn't create the schema", server.Key))
continue continue
} }
@ -85,7 +85,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error {
entry.Infof("%s: Servers have been loaded", version.Host) entry.Infof("%s: Servers have been loaded", version.Host)
for _, server := range servers { for _, server := range servers {
err := t.queue.Add(Main, GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server)) err := t.queue.Add(GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server))
if err != nil { if err != nil {
log. log.
WithField("key", server.Key). WithField("key", server.Key).

View File

@ -20,7 +20,7 @@ func (t *taskLoadVersionsAndUpdateServerData) execute() error {
} }
log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded") log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded")
for _, version := range versions { for _, version := range versions {
err := t.queue.Add(Main, GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version)) err := t.queue.Add(GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version))
if err != nil { if err != nil {
log. log.
WithField("code", version.Code). WithField("code", version.Code).

View File

@ -26,7 +26,6 @@ func (t *taskUpdateEnnoblements) execute() error {
log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...") log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add( err := t.queue.Add(
Ennoblements,
GetTask(UpdateServerEnnoblements). GetTask(UpdateServerEnnoblements).
WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), server), WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), server),
) )

View File

@ -41,7 +41,7 @@ func (t *taskUpdateHistory) execute(timezone string) error {
WithField("numberOfServers", len(servers)). WithField("numberOfServers", len(servers)).
Info("taskUpdateHistory.execute: Update of the history has started") Info("taskUpdateHistory.execute: Update of the history has started")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add(Main, GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server)) err := t.queue.Add(GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server))
if err != nil { if err != nil {
log. log.
WithField("key", server.Key). WithField("key", server.Key).

View File

@ -41,7 +41,7 @@ func (t *taskUpdateStats) execute(timezone string) error {
WithField("numberOfServers", len(servers)). WithField("numberOfServers", len(servers)).
Info("taskUpdateStats.execute: Update of the stats has started") Info("taskUpdateStats.execute: Update of the stats has started")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add(Main, GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server)) err := t.queue.Add(GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server))
if err != nil { if err != nil {
log. log.
WithField("key", server.Key). WithField("key", server.Key).

View File

@ -22,7 +22,7 @@ func (t *taskVacuum) execute() error {
} }
log.Infof("taskVacuum.execute: The database vacumming process has started...") log.Infof("taskVacuum.execute: The database vacumming process has started...")
for _, server := range servers { for _, server := range servers {
err := t.queue.Add(Main, GetTask(VacuumServerDB).WithArgs(context.Background(), server)) err := t.queue.Add(GetTask(VacuumServerData).WithArgs(context.Background(), server))
if err != nil { if err != nil {
log. log.
WithField("key", server.Key). WithField("key", server.Key).

View File

@ -11,32 +11,32 @@ const (
day = 24 * time.Hour day = 24 * time.Hour
) )
type taskVacuumServerDB struct { type taskVacuumServerData struct {
*task *task
} }
func (t *taskVacuumServerDB) execute(server *twmodel.Server) error { func (t *taskVacuumServerData) execute(server *twmodel.Server) error {
if err := t.validatePayload(server); err != nil { if err := t.validatePayload(server); err != nil {
log.Debug(errors.Wrap(err, "taskVacuumServerDB.execute")) log.Debug(errors.Wrap(err, "taskVacuumServerData.execute"))
return nil return nil
} }
entry := log.WithField("key", server.Key) entry := log.WithField("key", server.Key)
entry.Infof("taskVacuumServerDB.execute: %s: Vacumming the database...", server.Key) entry.Infof("taskVacuumServerData.execute: %s: Vacumming the database...", server.Key)
err := (&workerVacuumServerDB{ err := (&workerVacuumServerDB{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)), db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server, server: server,
}).vacuum() }).vacuum()
if err != nil { if err != nil {
err = errors.Wrap(err, "taskVacuumServerDB.execute") err = errors.Wrap(err, "taskVacuumServerData.execute")
entry.Error(err) entry.Error(err)
return err return err
} }
entry.Infof("taskVacuumServerDB.execute: %s: The database has been vacummed", server.Key) entry.Infof("taskVacuumServerData.execute: %s: The database has been vacummed", server.Key)
return nil return nil
} }
func (t *taskVacuumServerDB) validatePayload(server *twmodel.Server) error { func (t *taskVacuumServerData) validatePayload(server *twmodel.Server) error {
if server == nil { if server == nil {
return errors.New("expected *twmodel.Server, got nil") return errors.New("expected *twmodel.Server, got nil")
} }