remove the Queue interface

This commit is contained in:
Dawid Wysokiński 2021-07-11 08:16:46 +02:00
parent 91bf75b43b
commit 37955cfebf
4 changed files with 13 additions and 19 deletions

View File

@ -15,7 +15,7 @@ import (
type Cron struct {
*cron.Cron
queue queue.Queue
queue *queue.Queue
db *pg.DB
runOnInit bool
log logrus.FieldLogger
@ -165,7 +165,7 @@ func (c *Cron) logError(prefix string, taskName string, err error) {
)
}
func initializeQueue(cfg *Config) (queue.Queue, error) {
func initializeQueue(cfg *Config) (*queue.Queue, error) {
q, err := queue.New(&queue.Config{
WorkerLimit: cfg.WorkerLimit,
Redis: cfg.Redis,

View File

@ -21,7 +21,7 @@ func validateConfig(cfg *Config) error {
type registerTasksConfig struct {
DB *pg.DB
Queue Queue
Queue *Queue
}
func validateRegisterTasksConfig(cfg *registerTasksConfig) error {

View File

@ -13,25 +13,19 @@ import (
var log = logrus.WithField("package", "pkg/cron/queue")
type Queue interface {
Start(ctx context.Context) error
Close() error
Add(msg *taskq.Message) error
}
type queue struct {
type Queue struct {
redis redis.UniversalClient
main taskq.Queue
ennoblements taskq.Queue
factory taskq.Factory
}
func New(cfg *Config) (Queue, error) {
func New(cfg *Config) (*Queue, error) {
if err := validateConfig(cfg); err != nil {
return nil, err
}
q := &queue{
q := &Queue{
redis: cfg.Redis,
}
@ -42,7 +36,7 @@ func New(cfg *Config) (Queue, error) {
return q, nil
}
func (q *queue) init(cfg *Config) error {
func (q *Queue) init(cfg *Config) error {
q.factory = redisq.NewFactory()
q.main = q.registerQueue("main", cfg.WorkerLimit)
q.ennoblements = q.registerQueue("ennoblements", cfg.WorkerLimit)
@ -58,7 +52,7 @@ func (q *queue) init(cfg *Config) error {
return nil
}
func (q *queue) registerQueue(name string, limit int) taskq.Queue {
func (q *Queue) registerQueue(name string, limit int) taskq.Queue {
return q.factory.RegisterQueue(&taskq.QueueOptions{
Name: name,
ReservationTimeout: time.Minute * 2,
@ -68,7 +62,7 @@ func (q *queue) registerQueue(name string, limit int) taskq.Queue {
})
}
func (q *queue) getQueueByTaskName(name string) taskq.Queue {
func (q *Queue) getQueueByTaskName(name string) taskq.Queue {
switch name {
case LoadVersionsAndUpdateServerData,
LoadServersAndUpdateData,
@ -89,21 +83,21 @@ func (q *queue) getQueueByTaskName(name string) taskq.Queue {
return nil
}
func (q *queue) Start(ctx context.Context) error {
func (q *Queue) Start(ctx context.Context) error {
if err := q.factory.StartConsumers(ctx); err != nil {
return errors.Wrap(err, "couldn't start the queue")
}
return nil
}
func (q *queue) Close() error {
func (q *Queue) Close() error {
if err := q.factory.Close(); err != nil {
return errors.Wrap(err, "couldn't close the queue")
}
return nil
}
func (q *queue) Add(msg *taskq.Message) error {
func (q *Queue) Add(msg *taskq.Message) error {
queue := q.getQueueByTaskName(msg.TaskName)
if queue == nil {
return errors.Errorf("couldn't add the message to the queue: unknown task name '%s'", msg.TaskName)

View File

@ -27,7 +27,7 @@ const (
type task struct {
db *pg.DB
queue Queue
queue *Queue
cachedLocations sync.Map
}