add taskLodVersions

This commit is contained in:
Dawid Wysokiński 2021-04-25 17:21:32 +02:00
parent dea3d4616a
commit cdd8c9b03f
5 changed files with 53 additions and 8 deletions

View File

@ -30,7 +30,11 @@ func Attach(c *cron.Cron, cfg Config) error {
return fmt.Errorf("cfg.Queue cannot be nil, expected queue.Queue")
}
h := &handler{db: cfg.DB, maxConcurrentWorkers: cfg.MaxConcurrentWorkers}
h := &handler{
db: cfg.DB,
maxConcurrentWorkers: cfg.MaxConcurrentWorkers,
queue: cfg.Queue,
}
if err := h.init(); err != nil {
return err
}

View File

@ -68,7 +68,7 @@ func (t *taskLoadServers) execute(version *models.Version) error {
if _, err := t.db.Model(&models.Server{}).
Set("status = ?", models.ServerStatusClosed).
Where("key NOT IN (?) AND version_code =", pg.In(serverKeys), version.Code).
Where("key NOT IN (?) AND version_code = ?", pg.In(serverKeys), version.Code).
Update(); err != nil {
err = errors.Wrap(err, "couldn't update server statuses")
logrus.Fatal(err)
@ -92,6 +92,7 @@ func (t *taskLoadServers) getServers(version *models.Version) (map[string]string
return nil, errors.Wrapf(err, "%s: taskLoadServers.loadServers couldn't load servers", version.Host)
}
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrapf(err, "%s: taskLoadServers.loadServers couldn't read the body", version.Host)
@ -100,5 +101,14 @@ func (t *taskLoadServers) getServers(version *models.Version) (map[string]string
if err != nil {
return nil, errors.Wrapf(err, "%s: taskLoadServers.loadServers couldn't decode the body into the go value", version.Host)
}
return body.(map[string]string), nil
result := make(map[string]string)
for serverKey, url := range body.(map[interface{}]interface{}) {
serverKeyStr := serverKey.(string)
urlStr := url.(string)
if serverKeyStr != "" && urlStr != "" {
result[serverKeyStr] = urlStr
}
}
return result, nil
}

View File

@ -0,0 +1,28 @@
package tasks
import (
"context"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/cron/cron/queue"
)
type taskLoadVersions struct {
*task
}
func (t *taskLoadVersions) execute() error {
var versions []*models.Version
log.Debug("taskLoadVersions.execute: Loading versions...")
if err := t.db.Model(&versions).Relation("SpecialServers").Select(); err != nil {
err = errors.Wrap(err, "taskLoadVersions.execute: couldn't load versions")
log.Fatal(err)
return err
}
for _, version := range versions {
t.queue.Add(queue.MainQueue, Get(TaskNameLoadServers).WithArgs(context.Background(), version))
}
log.Debug("taskLoadVersions.execute: Versions have been loaded")
return nil
}

View File

@ -13,7 +13,7 @@ const (
TaskNameLoadVersions = "loadVersions"
TaskNameLoadServers = "loadServers"
TaskNameUpdateServerData = "updateServerData"
retryLimitLoadServers = 3
defaultRetryLimit = 3
)
var log = logrus.WithField("package", "tasks")
@ -32,9 +32,14 @@ func RegisterTasks(cfg *Config) error {
db: cfg.DB,
queue: cfg.Queue,
}
taskq.RegisterTask(&taskq.TaskOptions{
Name: TaskNameLoadVersions,
RetryLimit: defaultRetryLimit,
Handler: (&taskLoadVersions{t}).execute,
})
taskq.RegisterTask(&taskq.TaskOptions{
Name: TaskNameLoadServers,
RetryLimit: retryLimitLoadServers,
RetryLimit: defaultRetryLimit,
Handler: (&taskLoadServers{t}).execute,
})

View File

@ -57,9 +57,7 @@ func main() {
DB: conn,
Queue: queue,
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := queue.Start(ctx); err != nil {
if err := queue.Start(context.Background()); err != nil {
logrus.Fatal(err)
}