remove one arg from q.Add (name), rename taskVacuumServerDB -> taskVacuumServerData
This commit is contained in:
parent
4154d6c133
commit
91bf75b43b
|
@ -113,42 +113,42 @@ func (c *Cron) Stop() error {
|
|||
}
|
||||
|
||||
func (c *Cron) updateServerData() {
|
||||
err := c.queue.Add(queue.Main, queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background()))
|
||||
err := c.queue.Add(queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background()))
|
||||
if err != nil {
|
||||
c.logError("Cron.updateServerData", queue.LoadVersionsAndUpdateServerData, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cron) updateEnnoblements() {
|
||||
err := c.queue.Add(queue.Ennoblements, queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background()))
|
||||
err := c.queue.Add(queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background()))
|
||||
if err != nil {
|
||||
c.logError("Cron.updateEnnoblements", queue.UpdateEnnoblements, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cron) updateHistory(timezone string) {
|
||||
err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone))
|
||||
err := c.queue.Add(queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone))
|
||||
if err != nil {
|
||||
c.logError("Cron.updateHistory", queue.UpdateHistory, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cron) updateStats(timezone string) {
|
||||
err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone))
|
||||
err := c.queue.Add(queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone))
|
||||
if err != nil {
|
||||
c.logError("Cron.updateStats", queue.UpdateStats, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cron) vacuumDatabase() {
|
||||
err := c.queue.Add(queue.Main, queue.GetTask(queue.Vacuum).WithArgs(context.Background()))
|
||||
err := c.queue.Add(queue.GetTask(queue.Vacuum).WithArgs(context.Background()))
|
||||
if err != nil {
|
||||
c.logError("Cron.vacuumDatabase", queue.Vacuum, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cron) deleteNonExistentVillages() {
|
||||
err := c.queue.Add(queue.Main, queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background()))
|
||||
err := c.queue.Add(queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background()))
|
||||
if err != nil {
|
||||
c.logError("Cron.deleteNonExistentVillages", queue.DeleteNonExistentVillages, err)
|
||||
}
|
||||
|
|
|
@ -11,17 +11,12 @@ import (
|
|||
"github.com/vmihailenco/taskq/v3/redisq"
|
||||
)
|
||||
|
||||
const (
|
||||
Main = "main"
|
||||
Ennoblements = "ennoblements"
|
||||
)
|
||||
|
||||
var log = logrus.WithField("package", "pkg/cron/queue")
|
||||
|
||||
type Queue interface {
|
||||
Start(ctx context.Context) error
|
||||
Close() error
|
||||
Add(name string, msg *taskq.Message) error
|
||||
Add(msg *taskq.Message) error
|
||||
}
|
||||
|
||||
type queue struct {
|
||||
|
@ -49,8 +44,8 @@ func New(cfg *Config) (Queue, error) {
|
|||
|
||||
func (q *queue) init(cfg *Config) error {
|
||||
q.factory = redisq.NewFactory()
|
||||
q.main = q.registerQueue(Main, cfg.WorkerLimit)
|
||||
q.ennoblements = q.registerQueue(Ennoblements, cfg.WorkerLimit)
|
||||
q.main = q.registerQueue("main", cfg.WorkerLimit)
|
||||
q.ennoblements = q.registerQueue("ennoblements", cfg.WorkerLimit)
|
||||
|
||||
err := registerTasks(®isterTasksConfig{
|
||||
DB: cfg.DB,
|
||||
|
@ -73,11 +68,22 @@ func (q *queue) registerQueue(name string, limit int) taskq.Queue {
|
|||
})
|
||||
}
|
||||
|
||||
func (q *queue) getQueueByName(name string) taskq.Queue {
|
||||
func (q *queue) getQueueByTaskName(name string) taskq.Queue {
|
||||
switch name {
|
||||
case Main:
|
||||
case LoadVersionsAndUpdateServerData,
|
||||
LoadServersAndUpdateData,
|
||||
UpdateServerData,
|
||||
Vacuum,
|
||||
VacuumServerData,
|
||||
UpdateHistory,
|
||||
UpdateServerHistory,
|
||||
UpdateStats,
|
||||
UpdateServerStats,
|
||||
DeleteNonExistentVillages,
|
||||
ServerDeleteNonExistentVillages:
|
||||
return q.main
|
||||
case Ennoblements:
|
||||
case UpdateEnnoblements,
|
||||
UpdateServerEnnoblements:
|
||||
return q.ennoblements
|
||||
}
|
||||
return nil
|
||||
|
@ -97,10 +103,10 @@ func (q *queue) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (q *queue) Add(name string, msg *taskq.Message) error {
|
||||
queue := q.getQueueByName(name)
|
||||
func (q *queue) Add(msg *taskq.Message) error {
|
||||
queue := q.getQueueByTaskName(msg.TaskName)
|
||||
if queue == nil {
|
||||
return errors.Errorf("couldn't add the message to the queue: unknown queue name '%s'", name)
|
||||
return errors.Errorf("couldn't add the message to the queue: unknown task name '%s'", msg.TaskName)
|
||||
}
|
||||
if err := queue.Add(msg); err != nil {
|
||||
return errors.Wrap(err, "couldn't add the message to the queue")
|
||||
|
|
|
@ -13,7 +13,7 @@ const (
|
|||
LoadServersAndUpdateData = "loadServersAndUpdateData"
|
||||
UpdateServerData = "updateServerData"
|
||||
Vacuum = "vacuum"
|
||||
VacuumServerDB = "vacuumServerDB"
|
||||
VacuumServerData = "vacuumServerData"
|
||||
UpdateEnnoblements = "updateEnnoblements"
|
||||
UpdateServerEnnoblements = "updateServerEnnoblements"
|
||||
UpdateHistory = "updateHistory"
|
||||
|
@ -71,8 +71,8 @@ func registerTasks(cfg *registerTasksConfig) error {
|
|||
Handler: (&taskVacuum{t}).execute,
|
||||
},
|
||||
{
|
||||
Name: VacuumServerDB,
|
||||
Handler: (&taskVacuumServerDB{t}).execute,
|
||||
Name: VacuumServerData,
|
||||
Handler: (&taskVacuumServerData{t}).execute,
|
||||
},
|
||||
{
|
||||
Name: UpdateEnnoblements,
|
||||
|
|
|
@ -29,7 +29,6 @@ func (t *taskDeleteNonExistentVillages) execute() error {
|
|||
Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue")
|
||||
for _, server := range servers {
|
||||
err := t.queue.Add(
|
||||
Main,
|
||||
GetTask(ServerDeleteNonExistentVillages).
|
||||
WithArgs(
|
||||
context.Background(),
|
||||
|
|
|
@ -85,7 +85,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error {
|
|||
|
||||
entry.Infof("%s: Servers have been loaded", version.Host)
|
||||
for _, server := range servers {
|
||||
err := t.queue.Add(Main, GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server))
|
||||
err := t.queue.Add(GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server))
|
||||
if err != nil {
|
||||
log.
|
||||
WithField("key", server.Key).
|
||||
|
|
|
@ -20,7 +20,7 @@ func (t *taskLoadVersionsAndUpdateServerData) execute() error {
|
|||
}
|
||||
log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded")
|
||||
for _, version := range versions {
|
||||
err := t.queue.Add(Main, GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version))
|
||||
err := t.queue.Add(GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version))
|
||||
if err != nil {
|
||||
log.
|
||||
WithField("code", version.Code).
|
||||
|
|
|
@ -26,7 +26,6 @@ func (t *taskUpdateEnnoblements) execute() error {
|
|||
log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...")
|
||||
for _, server := range servers {
|
||||
err := t.queue.Add(
|
||||
Ennoblements,
|
||||
GetTask(UpdateServerEnnoblements).
|
||||
WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), server),
|
||||
)
|
||||
|
|
|
@ -41,7 +41,7 @@ func (t *taskUpdateHistory) execute(timezone string) error {
|
|||
WithField("numberOfServers", len(servers)).
|
||||
Info("taskUpdateHistory.execute: Update of the history has started")
|
||||
for _, server := range servers {
|
||||
err := t.queue.Add(Main, GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server))
|
||||
err := t.queue.Add(GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server))
|
||||
if err != nil {
|
||||
log.
|
||||
WithField("key", server.Key).
|
||||
|
|
|
@ -41,7 +41,7 @@ func (t *taskUpdateStats) execute(timezone string) error {
|
|||
WithField("numberOfServers", len(servers)).
|
||||
Info("taskUpdateStats.execute: Update of the stats has started")
|
||||
for _, server := range servers {
|
||||
err := t.queue.Add(Main, GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server))
|
||||
err := t.queue.Add(GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server))
|
||||
if err != nil {
|
||||
log.
|
||||
WithField("key", server.Key).
|
||||
|
|
|
@ -22,7 +22,7 @@ func (t *taskVacuum) execute() error {
|
|||
}
|
||||
log.Infof("taskVacuum.execute: The database vacumming process has started...")
|
||||
for _, server := range servers {
|
||||
err := t.queue.Add(Main, GetTask(VacuumServerDB).WithArgs(context.Background(), server))
|
||||
err := t.queue.Add(GetTask(VacuumServerData).WithArgs(context.Background(), server))
|
||||
if err != nil {
|
||||
log.
|
||||
WithField("key", server.Key).
|
||||
|
|
|
@ -11,32 +11,32 @@ const (
|
|||
day = 24 * time.Hour
|
||||
)
|
||||
|
||||
type taskVacuumServerDB struct {
|
||||
type taskVacuumServerData struct {
|
||||
*task
|
||||
}
|
||||
|
||||
func (t *taskVacuumServerDB) execute(server *twmodel.Server) error {
|
||||
func (t *taskVacuumServerData) execute(server *twmodel.Server) error {
|
||||
if err := t.validatePayload(server); err != nil {
|
||||
log.Debug(errors.Wrap(err, "taskVacuumServerDB.execute"))
|
||||
log.Debug(errors.Wrap(err, "taskVacuumServerData.execute"))
|
||||
return nil
|
||||
}
|
||||
entry := log.WithField("key", server.Key)
|
||||
entry.Infof("taskVacuumServerDB.execute: %s: Vacumming the database...", server.Key)
|
||||
entry.Infof("taskVacuumServerData.execute: %s: Vacumming the database...", server.Key)
|
||||
err := (&workerVacuumServerDB{
|
||||
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
|
||||
server: server,
|
||||
}).vacuum()
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "taskVacuumServerDB.execute")
|
||||
err = errors.Wrap(err, "taskVacuumServerData.execute")
|
||||
entry.Error(err)
|
||||
return err
|
||||
}
|
||||
entry.Infof("taskVacuumServerDB.execute: %s: The database has been vacummed", server.Key)
|
||||
entry.Infof("taskVacuumServerData.execute: %s: The database has been vacummed", server.Key)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *taskVacuumServerDB) validatePayload(server *twmodel.Server) error {
|
||||
func (t *taskVacuumServerData) validatePayload(server *twmodel.Server) error {
|
||||
if server == nil {
|
||||
return errors.New("expected *twmodel.Server, got nil")
|
||||
}
|
Reference in New Issue