add taskUpdateEnnoblements
This commit is contained in:
parent
1598b833bb
commit
6994607e0c
|
@ -64,13 +64,13 @@ func Attach(c *cron.Cron, cfg Config) error {
|
||||||
if _, err := c.AddFunc("20 1 * * *", h.vacuumDatabase); err != nil {
|
if _, err := c.AddFunc("20 1 * * *", h.vacuumDatabase); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err := c.AddFunc("@every 1m", h.updateServerEnnoblements); err != nil {
|
if _, err := c.AddFunc("@every 1m", h.updateEnnoblements); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if cfg.RunOnStartup {
|
if cfg.RunOnStartup {
|
||||||
go func() {
|
go func() {
|
||||||
//h.updateServerData()
|
//h.updateServerData()
|
||||||
h.vacuumDatabase()
|
//h.vacuumDatabase()
|
||||||
//for _, fn := range updateHistoryFuncs {
|
//for _, fn := range updateHistoryFuncs {
|
||||||
// go fn()
|
// go fn()
|
||||||
//}
|
//}
|
||||||
|
|
|
@ -2,16 +2,13 @@ package cron
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tribalwarshelp/shared/models"
|
|
||||||
"github.com/tribalwarshelp/shared/tw/dataloader"
|
|
||||||
|
|
||||||
"github.com/go-pg/pg/v10"
|
"github.com/go-pg/pg/v10"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"github.com/tribalwarshelp/shared/models"
|
||||||
|
|
||||||
"github.com/tribalwarshelp/cron/cron/queue"
|
"github.com/tribalwarshelp/cron/cron/queue"
|
||||||
"github.com/tribalwarshelp/cron/cron/tasks"
|
"github.com/tribalwarshelp/cron/cron/tasks"
|
||||||
|
@ -40,41 +37,8 @@ func (h *handler) updateServerData() {
|
||||||
h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background()))
|
h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) updateServerEnnoblements() {
|
func (h *handler) updateEnnoblements() {
|
||||||
servers := []*models.Server{}
|
h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background()))
|
||||||
if err := h.db.Model(&servers).Relation("Version").Where("status = ?", models.ServerStatusOpen).Select(); err != nil {
|
|
||||||
log.Error(errors.Wrap(err, "updateServerEnnoblements: cannot load ennoblements"))
|
|
||||||
}
|
|
||||||
log.
|
|
||||||
WithField("numberOfServers", len(servers)).
|
|
||||||
Info("updateServerEnnoblements: servers loaded")
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
pool := newPool(h.maxConcurrentWorkers)
|
|
||||||
for _, server := range servers {
|
|
||||||
pool.waitForWorker()
|
|
||||||
wg.Add(1)
|
|
||||||
sh := &updateServerEnnoblementsWorker{
|
|
||||||
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
|
|
||||||
server: server,
|
|
||||||
dataloader: dataloader.New(&dataloader.Config{
|
|
||||||
BaseURL: fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host),
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
go func(worker *updateServerEnnoblementsWorker, server *models.Server) {
|
|
||||||
defer func() {
|
|
||||||
pool.releaseWorker()
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
log := log.WithField("serverKey", server.Key)
|
|
||||||
err := sh.update()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorln("updateServerEnnoblements:", errors.Wrap(err, server.Key))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}(sh, server)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) updateHistory(location *time.Location) {
|
func (h *handler) updateHistory(location *time.Location) {
|
||||||
|
|
38
cron/tasks/task_update_ennoblements.go
Normal file
38
cron/tasks/task_update_ennoblements.go
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
package tasks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/tribalwarshelp/shared/models"
|
||||||
|
|
||||||
|
"github.com/tribalwarshelp/cron/cron/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
type taskUpdateEnnoblements struct {
|
||||||
|
*task
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *taskUpdateEnnoblements) execute() error {
|
||||||
|
var servers []*models.Server
|
||||||
|
err := t.db.
|
||||||
|
Model(&servers).
|
||||||
|
Relation("Version").
|
||||||
|
Where("status = ?", models.ServerStatusOpen).
|
||||||
|
Select()
|
||||||
|
if err != nil {
|
||||||
|
err = errors.Wrap(err, "taskUpdateEnnoblements.execute")
|
||||||
|
log.Errorln(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Debug("Updating ennoblements...")
|
||||||
|
for _, server := range servers {
|
||||||
|
s := server
|
||||||
|
t.queue.Add(
|
||||||
|
queue.MainQueue,
|
||||||
|
Get(TaskNameVacuumServerDB).
|
||||||
|
WithArgs(context.Background(), fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host), s),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -1,17 +1,48 @@
|
||||||
package cron
|
package tasks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/go-pg/pg/v10"
|
"github.com/go-pg/pg/v10"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/tribalwarshelp/shared/models"
|
"github.com/tribalwarshelp/shared/models"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
day = 24 * time.Hour
|
day = 24 * time.Hour
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type taskVacuumServerDB struct {
|
||||||
|
*task
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *taskVacuumServerDB) execute(server *models.Server) error {
|
||||||
|
if err := t.validatePayload(server); err != nil {
|
||||||
|
log.Debug(err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
entry := log.WithField("key", server.Key)
|
||||||
|
entry.Infof("%s: vacumming the database...", server.Key)
|
||||||
|
err := (&vacuumServerDBWorker{
|
||||||
|
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
|
||||||
|
}).vacuum()
|
||||||
|
if err != nil {
|
||||||
|
err = errors.Wrap(err, "taskVacuumServerDB.execute")
|
||||||
|
entry.Error(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
entry.Infof("%s: the database has been vacummed", server.Key)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *taskVacuumServerDB) validatePayload(server *models.Server) error {
|
||||||
|
if server == nil {
|
||||||
|
return errors.Errorf("taskVacuumServerDB.validatePayload: Expected *models.Server, got nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type vacuumServerDBWorker struct {
|
type vacuumServerDBWorker struct {
|
||||||
db *pg.DB
|
db *pg.DB
|
||||||
}
|
}
|
||||||
|
@ -31,7 +62,7 @@ func (w *vacuumServerDBWorker) vacuum() error {
|
||||||
Where("player_id IN (Select id FROM players) OR player_history.create_date < ?", time.Now().Add(-1*day*180)).
|
Where("player_id IN (Select id FROM players) OR player_history.create_date < ?", time.Now().Add(-1*day*180)).
|
||||||
Delete()
|
Delete()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "cannot delete old player history records")
|
return errors.Wrap(err, "couldn't delete the old player history records")
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Model(&models.TribeHistory{}).
|
_, err = tx.Model(&models.TribeHistory{}).
|
||||||
|
@ -39,7 +70,7 @@ func (w *vacuumServerDBWorker) vacuum() error {
|
||||||
Where("tribe_id IN (Select id FROM tribes) OR tribe_history.create_date < ?", time.Now().Add(-1*day*180)).
|
Where("tribe_id IN (Select id FROM tribes) OR tribe_history.create_date < ?", time.Now().Add(-1*day*180)).
|
||||||
Delete()
|
Delete()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "cannot delete old tribe history records")
|
return errors.Wrap(err, "couldn't delete the old tribe history records")
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Model(&models.DailyPlayerStats{}).
|
_, err = tx.Model(&models.DailyPlayerStats{}).
|
||||||
|
@ -47,7 +78,7 @@ func (w *vacuumServerDBWorker) vacuum() error {
|
||||||
Where("player_id IN (Select id FROM players) OR daily_player_stats.create_date < ?", time.Now().Add(-1*day*180)).
|
Where("player_id IN (Select id FROM players) OR daily_player_stats.create_date < ?", time.Now().Add(-1*day*180)).
|
||||||
Delete()
|
Delete()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "cannot delete old player stats records")
|
return errors.Wrap(err, "couldn't delete the old player stats records")
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Model(&models.DailyTribeStats{}).
|
_, err = tx.Model(&models.DailyTribeStats{}).
|
||||||
|
@ -55,7 +86,7 @@ func (w *vacuumServerDBWorker) vacuum() error {
|
||||||
Where("tribe_id IN (Select id FROM tribes) OR daily_tribe_stats.create_date < ?", time.Now().Add(-1*day*180)).
|
Where("tribe_id IN (Select id FROM tribes) OR daily_tribe_stats.create_date < ?", time.Now().Add(-1*day*180)).
|
||||||
Delete()
|
Delete()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "cannot delete old tribe stats records")
|
return errors.Wrap(err, "couldn't delete the old tribe stats records")
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
|
@ -10,11 +10,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
TaskNameLoadVersionsAndUpdateServerData = "loadVersions"
|
TaskNameLoadVersionsAndUpdateServerData = "loadVersionsAndUpdateServerData"
|
||||||
TaskNameLoadServersAndUpdateData = "loadServers"
|
TaskNameLoadServersAndUpdateData = "loadServersAndUpdateData"
|
||||||
TaskNameUpdateServerData = "updateServerData"
|
TaskNameUpdateServerData = "updateServerData"
|
||||||
TaskNameVacuum = "vacuum"
|
TaskNameVacuum = "vacuum"
|
||||||
TaskNameVacuumServerDB = "vacuumServerDB"
|
TaskNameVacuumServerDB = "vacuumServerDB"
|
||||||
|
TaskUpdateEnnoblements = "updateEnnoblements"
|
||||||
|
TaskServerUpdateEnnoblements = "serverUpdateEnnoblements"
|
||||||
defaultRetryLimit = 3
|
defaultRetryLimit = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -59,6 +61,11 @@ func RegisterTasks(cfg *Config) error {
|
||||||
RetryLimit: defaultRetryLimit,
|
RetryLimit: defaultRetryLimit,
|
||||||
Handler: (&taskVacuumServerDB{t}).execute,
|
Handler: (&taskVacuumServerDB{t}).execute,
|
||||||
})
|
})
|
||||||
|
taskq.RegisterTask(&taskq.TaskOptions{
|
||||||
|
Name: TaskUpdateEnnoblements,
|
||||||
|
RetryLimit: defaultRetryLimit,
|
||||||
|
Handler: (&taskUpdateEnnoblements{t}).execute,
|
||||||
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Reference in New Issue
Block a user