109 lines
2.3 KiB
Go
109 lines
2.3 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
"time"
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
"github.com/vmihailenco/taskq/v3"
|
|
"github.com/vmihailenco/taskq/v3/redisq"
|
|
)
|
|
|
|
var log = logrus.WithField("package", "pkg/queue")
|
|
|
|
type Queue struct {
|
|
redis redis.UniversalClient
|
|
main taskq.Queue
|
|
ennoblements taskq.Queue
|
|
factory taskq.Factory
|
|
}
|
|
|
|
func New(cfg *Config) (*Queue, error) {
|
|
if err := validateConfig(cfg); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
q := &Queue{
|
|
redis: cfg.Redis,
|
|
}
|
|
|
|
if err := q.init(cfg); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return q, nil
|
|
}
|
|
|
|
func (q *Queue) init(cfg *Config) error {
|
|
q.factory = redisq.NewFactory()
|
|
q.main = q.registerQueue("main", cfg.WorkerLimit)
|
|
q.ennoblements = q.registerQueue("ennoblements", cfg.WorkerLimit)
|
|
|
|
if err := registerTasks(®isterTasksConfig{
|
|
DB: cfg.DB,
|
|
Queue: q,
|
|
}); err != nil {
|
|
return errors.Wrapf(err, "couldn't register tasks")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) registerQueue(name string, limit int) taskq.Queue {
|
|
return q.factory.RegisterQueue(&taskq.QueueOptions{
|
|
Name: name,
|
|
ReservationTimeout: time.Minute * 2,
|
|
Redis: q.redis,
|
|
MinNumWorker: int32(limit),
|
|
MaxNumWorker: int32(limit),
|
|
})
|
|
}
|
|
|
|
func (q *Queue) getQueueByTaskName(name string) taskq.Queue {
|
|
switch name {
|
|
case LoadVersionsAndUpdateServerData,
|
|
LoadServersAndUpdateData,
|
|
UpdateServerData,
|
|
Vacuum,
|
|
VacuumServerData,
|
|
UpdateHistory,
|
|
UpdateServerHistory,
|
|
UpdateStats,
|
|
UpdateServerStats,
|
|
DeleteNonExistentVillages,
|
|
ServerDeleteNonExistentVillages:
|
|
return q.main
|
|
case UpdateEnnoblements,
|
|
UpdateServerEnnoblements:
|
|
return q.ennoblements
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) Start(ctx context.Context) error {
|
|
if err := q.factory.StartConsumers(ctx); err != nil {
|
|
return errors.Wrap(err, "couldn't start the queue")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) Close() error {
|
|
if err := q.factory.Close(); err != nil {
|
|
return errors.Wrap(err, "couldn't close the queue")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) Add(msg *taskq.Message) error {
|
|
queue := q.getQueueByTaskName(msg.TaskName)
|
|
if queue == nil {
|
|
return errors.Errorf("couldn't add the message to the queue: unknown task name '%s'", msg.TaskName)
|
|
}
|
|
if err := queue.Add(msg); err != nil {
|
|
return errors.Wrap(err, "couldn't add the message to the queue")
|
|
}
|
|
return nil
|
|
}
|