change queue initialization in main.go
This commit is contained in:
parent
bea9d67446
commit
5e1041503c
20
main.go
20
main.go
|
@ -15,6 +15,7 @@ import (
|
|||
|
||||
twhelpcron "github.com/tribalwarshelp/cron/pkg/cron"
|
||||
"github.com/tribalwarshelp/cron/pkg/postgres"
|
||||
"github.com/tribalwarshelp/cron/pkg/queue"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
)
|
||||
|
@ -47,18 +48,29 @@ func main() {
|
|||
}
|
||||
}()
|
||||
|
||||
c, err := twhelpcron.New(&twhelpcron.Config{
|
||||
q, err := queue.New(&queue.Config{
|
||||
DB: dbConn,
|
||||
RunOnInit: envutil.GetenvBool("RUN_ON_INIT"),
|
||||
Redis: redisClient,
|
||||
WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"),
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Fatal(errors.Wrap(err, "Couldn't initialize a queue"))
|
||||
}
|
||||
if err := q.Start(context.Background()); err != nil {
|
||||
logrus.Fatal(errors.Wrap(err, "Couldn't start the queue"))
|
||||
}
|
||||
|
||||
c, err := twhelpcron.New(&twhelpcron.Config{
|
||||
DB: dbConn,
|
||||
RunOnInit: envutil.GetenvBool("RUN_ON_INIT"),
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Fatal(errors.Wrap(err, "Couldn't initialize a cron instance"))
|
||||
}
|
||||
if err := c.Start(context.Background()); err != nil {
|
||||
if err := c.Start(); err != nil {
|
||||
logrus.Fatal(errors.Wrap(err, "Couldn't start the cron"))
|
||||
}
|
||||
defer c.Stop()
|
||||
|
||||
logrus.Info("Cron is up and running!")
|
||||
|
||||
|
@ -67,7 +79,7 @@ func main() {
|
|||
<-channel
|
||||
|
||||
logrus.Info("shutting down")
|
||||
if err := c.Stop(); err != nil {
|
||||
if err := q.Close(); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,23 +2,23 @@ package cron
|
|||
|
||||
import (
|
||||
"github.com/go-pg/pg/v10"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/tribalwarshelp/cron/pkg/queue"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
DB *pg.DB
|
||||
Redis redis.UniversalClient
|
||||
RunOnInit bool
|
||||
WorkerLimit int
|
||||
DB *pg.DB
|
||||
Queue *queue.Queue
|
||||
RunOnInit bool
|
||||
}
|
||||
|
||||
func validateConfig(cfg *Config) error {
|
||||
if cfg == nil || cfg.DB == nil {
|
||||
return errors.New("cfg.DB is required")
|
||||
}
|
||||
if cfg.Redis == nil {
|
||||
return errors.New("cfg.Redis is required")
|
||||
if cfg.Queue == nil {
|
||||
return errors.New("cfg.Queue is required")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -25,10 +25,6 @@ func New(cfg *Config) (*Cron, error) {
|
|||
if err := validateConfig(cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q, err := initializeQueue(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log := logrus.WithField("package", "pkg/cron")
|
||||
c := &Cron{
|
||||
Cron: cron.New(cron.WithChain(
|
||||
|
@ -36,7 +32,7 @@ func New(cfg *Config) (*Cron, error) {
|
|||
cron.PrintfLogger(log),
|
||||
),
|
||||
)),
|
||||
queue: q,
|
||||
queue: cfg.Queue,
|
||||
db: cfg.DB,
|
||||
runOnInit: cfg.RunOnInit,
|
||||
log: log,
|
||||
|
@ -96,19 +92,13 @@ func (c *Cron) init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Cron) Start(ctx context.Context) error {
|
||||
if err := c.queue.Start(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
func (c *Cron) Start() error {
|
||||
c.Cron.Start()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cron) Stop() error {
|
||||
c.Cron.Stop()
|
||||
if err := c.queue.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -165,15 +155,6 @@ func (c *Cron) logError(prefix string, taskName string, err error) {
|
|||
)
|
||||
}
|
||||
|
||||
func initializeQueue(cfg *Config) (*queue.Queue, error) {
|
||||
q, err := queue.New(&queue.Config{
|
||||
WorkerLimit: cfg.WorkerLimit,
|
||||
Redis: cfg.Redis,
|
||||
DB: cfg.DB,
|
||||
})
|
||||
return q, errors.Wrap(err, "couldn't initialize a queue")
|
||||
}
|
||||
|
||||
func createFnWithTimezone(timezone string, fn func(timezone string)) func() {
|
||||
return func() {
|
||||
fn(timezone)
|
||||
|
|
|
@ -41,11 +41,10 @@ func (q *Queue) init(cfg *Config) error {
|
|||
q.main = q.registerQueue("main", cfg.WorkerLimit)
|
||||
q.ennoblements = q.registerQueue("ennoblements", cfg.WorkerLimit)
|
||||
|
||||
err := registerTasks(®isterTasksConfig{
|
||||
if err := registerTasks(®isterTasksConfig{
|
||||
DB: cfg.DB,
|
||||
Queue: q,
|
||||
})
|
||||
if err != nil {
|
||||
}); err != nil {
|
||||
return errors.Wrapf(err, "couldn't register tasks")
|
||||
}
|
||||
|
||||
|
|
Reference in New Issue