2021-02-14 18:19:30 +00:00
|
|
|
package queue
|
|
|
|
|
|
|
|
import (
|
2021-04-25 12:51:41 +00:00
|
|
|
"context"
|
|
|
|
"github.com/pkg/errors"
|
2021-02-14 18:19:30 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
|
|
"github.com/vmihailenco/taskq/v3"
|
|
|
|
"github.com/vmihailenco/taskq/v3/redisq"
|
|
|
|
)
|
|
|
|
|
2021-04-25 12:51:41 +00:00
|
|
|
// Names of the queues managed by this package. They are the values accepted
// by Queue.Add to select the destination queue.
const (
	// MainQueue is the name of the main queue.
	MainQueue = "main"
	// EnnoblementsQueue is the name of the ennoblements queue.
	EnnoblementsQueue = "ennoblements"
)
|
|
|
|
|
|
|
|
type Queue interface {
|
|
|
|
Start(ctx context.Context) error
|
|
|
|
Close() error
|
2021-04-27 18:10:14 +00:00
|
|
|
Add(name string, msg *taskq.Message) error
|
2021-04-25 12:51:41 +00:00
|
|
|
}
|
|
|
|
|
2021-02-14 18:19:30 +00:00
|
|
|
type queue struct {
|
|
|
|
redis redis.UniversalClient
|
|
|
|
mainQueue taskq.Queue
|
|
|
|
ennoblementsQueue taskq.Queue
|
|
|
|
factory taskq.Factory
|
|
|
|
}
|
|
|
|
|
2021-04-25 12:51:41 +00:00
|
|
|
func New(cfg *Config) (Queue, error) {
|
|
|
|
if err := validateConfig(cfg); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2021-02-14 18:19:30 +00:00
|
|
|
q := &queue{
|
|
|
|
redis: cfg.Redis,
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := q.init(cfg); err != nil {
|
2021-04-25 12:51:41 +00:00
|
|
|
return nil, err
|
2021-02-14 18:19:30 +00:00
|
|
|
}
|
|
|
|
|
2021-04-25 12:51:41 +00:00
|
|
|
return q, nil
|
2021-02-14 18:19:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (q *queue) init(cfg *Config) error {
|
|
|
|
q.factory = redisq.NewFactory()
|
2021-04-25 12:51:41 +00:00
|
|
|
q.mainQueue = q.registerQueue(MainQueue, cfg.WorkerLimit)
|
|
|
|
q.ennoblementsQueue = q.registerQueue(EnnoblementsQueue, cfg.WorkerLimit*2)
|
2021-02-14 18:19:30 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-04-27 18:10:14 +00:00
|
|
|
func (q *queue) registerQueue(name string, limit int) taskq.Queue {
|
2021-02-14 18:19:30 +00:00
|
|
|
return q.factory.RegisterQueue(&taskq.QueueOptions{
|
2021-04-27 18:10:14 +00:00
|
|
|
Name: name,
|
2021-02-14 18:19:30 +00:00
|
|
|
ReservationTimeout: time.Minute * 2,
|
|
|
|
Redis: q.redis,
|
2021-04-25 16:21:34 +00:00
|
|
|
MinNumWorker: int32(limit),
|
2021-02-14 18:19:30 +00:00
|
|
|
MaxNumWorker: int32(limit),
|
|
|
|
})
|
|
|
|
}
|
2021-04-25 12:51:41 +00:00
|
|
|
|
2021-04-27 18:10:14 +00:00
|
|
|
func (q *queue) getQueueByName(name string) taskq.Queue {
|
2021-04-25 18:24:26 +00:00
|
|
|
switch name {
|
|
|
|
case MainQueue:
|
|
|
|
return q.mainQueue
|
|
|
|
case EnnoblementsQueue:
|
|
|
|
return q.ennoblementsQueue
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-04-25 12:51:41 +00:00
|
|
|
func (q *queue) Start(ctx context.Context) error {
|
|
|
|
if err := q.factory.StartConsumers(ctx); err != nil {
|
|
|
|
return errors.Wrap(err, "Couldn't start the queue")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *queue) Close() error {
|
|
|
|
if err := q.factory.Close(); err != nil {
|
|
|
|
return errors.Wrap(err, "Couldn't close the queue")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-04-27 18:10:14 +00:00
|
|
|
func (q *queue) Add(name string, msg *taskq.Message) error {
|
2021-04-25 12:51:41 +00:00
|
|
|
queue := q.getQueueByName(name)
|
|
|
|
if queue == nil {
|
|
|
|
return errors.Errorf("Couldn't add the message to the queue: unknown queue name '%s'", name)
|
|
|
|
}
|
|
|
|
if err := queue.Add(msg); err != nil {
|
|
|
|
return errors.Wrap(err, "Couldn't add the message to the queue")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|