move queue names to constants, add the Queue interface, implement the Queue interface
This commit is contained in:
parent
55d2a5b0ab
commit
4e051b0a5f
|
@ -1,54 +1,114 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/pkg/errors"
|
||||
"time"
|
||||
|
||||
"github.com/go-pg/pg/v10"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/vmihailenco/taskq/v3"
|
||||
"github.com/vmihailenco/taskq/v3/redisq"
|
||||
)
|
||||
|
||||
type QueueName string
|
||||
|
||||
const (
|
||||
MainQueue QueueName = "main"
|
||||
EnnoblementsQueue QueueName = "ennoblements"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
DB *pg.DB
|
||||
Redis redis.UniversalClient
|
||||
WorkerLimit int
|
||||
}
|
||||
|
||||
type Queue interface {
|
||||
Start(ctx context.Context) error
|
||||
Close() error
|
||||
Add(name QueueName, msg *taskq.Message) error
|
||||
}
|
||||
|
||||
type queue struct {
|
||||
db *pg.DB
|
||||
redis redis.UniversalClient
|
||||
mainQueue taskq.Queue
|
||||
ennoblementsQueue taskq.Queue
|
||||
factory taskq.Factory
|
||||
}
|
||||
|
||||
func New(cfg *Config) error {
|
||||
func New(cfg *Config) (Queue, error) {
|
||||
if err := validateConfig(cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
q := &queue{
|
||||
db: cfg.DB,
|
||||
redis: cfg.Redis,
|
||||
}
|
||||
|
||||
if err := q.init(cfg); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (q *queue) init(cfg *Config) error {
|
||||
q.factory = redisq.NewFactory()
|
||||
q.mainQueue = q.registerQueue("main", cfg.WorkerLimit)
|
||||
q.ennoblementsQueue = q.registerQueue("ennoblements", cfg.WorkerLimit*2)
|
||||
q.mainQueue = q.registerQueue(MainQueue, cfg.WorkerLimit)
|
||||
q.ennoblementsQueue = q.registerQueue(EnnoblementsQueue, cfg.WorkerLimit*2)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *queue) registerQueue(name string, limit int) taskq.Queue {
|
||||
func (q *queue) registerQueue(name QueueName, limit int) taskq.Queue {
|
||||
return q.factory.RegisterQueue(&taskq.QueueOptions{
|
||||
Name: name,
|
||||
Name: string(name),
|
||||
ReservationTimeout: time.Minute * 2,
|
||||
Redis: q.redis,
|
||||
MaxNumWorker: int32(limit),
|
||||
})
|
||||
}
|
||||
|
||||
func (q *queue) Start(ctx context.Context) error {
|
||||
if err := q.factory.StartConsumers(ctx); err != nil {
|
||||
return errors.Wrap(err, "Couldn't start the queue")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *queue) Close() error {
|
||||
if err := q.factory.Close(); err != nil {
|
||||
return errors.Wrap(err, "Couldn't close the queue")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *queue) Add(name QueueName, msg *taskq.Message) error {
|
||||
queue := q.getQueueByName(name)
|
||||
if queue == nil {
|
||||
return errors.Errorf("Couldn't add the message to the queue: unknown queue name '%s'", name)
|
||||
}
|
||||
if err := queue.Add(msg); err != nil {
|
||||
return errors.Wrap(err, "Couldn't add the message to the queue")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *queue) getQueueByName(name QueueName) taskq.Queue {
|
||||
switch name {
|
||||
case MainQueue:
|
||||
return q.mainQueue
|
||||
case EnnoblementsQueue:
|
||||
return q.ennoblementsQueue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateConfig(cfg *Config) error {
|
||||
if cfg == nil {
|
||||
return errors.New("Config hasn't been provided")
|
||||
}
|
||||
if cfg.Redis == nil {
|
||||
return errors.New("cfg.Redis is a required field")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Reference in New Issue
Block a user