correct errors / logged messages
This commit is contained in:
parent
818ee39bd7
commit
2ba7adcd20
|
@ -15,10 +15,10 @@ type Config struct {
|
||||||
|
|
||||||
func validateConfig(cfg *Config) error {
|
func validateConfig(cfg *Config) error {
|
||||||
if cfg == nil || cfg.DB == nil {
|
if cfg == nil || cfg.DB == nil {
|
||||||
return errors.New("validateConfig: cfg.DB is required")
|
return errors.New("cfg.DB is required")
|
||||||
}
|
}
|
||||||
if cfg.Redis == nil {
|
if cfg.Redis == nil {
|
||||||
return errors.New("validateConfig: cfg.Redis is required")
|
return errors.New("cfg.Redis is required")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ func New(cfg *Config) (*Cron, error) {
|
||||||
func (c *Cron) init() error {
|
func (c *Cron) init() error {
|
||||||
var versions []*twmodel.Version
|
var versions []*twmodel.Version
|
||||||
if err := c.db.Model(&versions).DistinctOn("timezone").Select(); err != nil {
|
if err := c.db.Model(&versions).DistinctOn("timezone").Select(); err != nil {
|
||||||
return errors.Wrap(err, "Cron.init: couldn't load versions")
|
return errors.Wrap(err, "couldn't load versions")
|
||||||
}
|
}
|
||||||
|
|
||||||
var updateHistoryFuncs []func()
|
var updateHistoryFuncs []func()
|
||||||
|
@ -99,7 +99,7 @@ func (c *Cron) init() error {
|
||||||
|
|
||||||
func (c *Cron) Start(ctx context.Context) error {
|
func (c *Cron) Start(ctx context.Context) error {
|
||||||
if err := c.queue.Start(ctx); err != nil {
|
if err := c.queue.Start(ctx); err != nil {
|
||||||
return errors.Wrap(err, "Cron.Start")
|
return err
|
||||||
}
|
}
|
||||||
c.Cron.Start()
|
c.Cron.Start()
|
||||||
return nil
|
return nil
|
||||||
|
@ -108,7 +108,7 @@ func (c *Cron) Start(ctx context.Context) error {
|
||||||
func (c *Cron) Stop() error {
|
func (c *Cron) Stop() error {
|
||||||
c.Cron.Stop()
|
c.Cron.Stop()
|
||||||
if err := c.queue.Close(); err != nil {
|
if err := c.queue.Close(); err != nil {
|
||||||
return errors.Wrap(err, "Cron.Stop")
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -172,14 +172,14 @@ func initializeQueue(cfg *Config) (queue.Queue, error) {
|
||||||
Redis: cfg.Redis,
|
Redis: cfg.Redis,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task queue")
|
return nil, errors.Wrap(err, "couldn't initialize a queue")
|
||||||
}
|
}
|
||||||
err = task.RegisterTasks(&task.Config{
|
err = task.RegisterTasks(&task.Config{
|
||||||
DB: cfg.DB,
|
DB: cfg.DB,
|
||||||
Queue: q,
|
Queue: q,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task queue")
|
return nil, errors.Wrap(err, "couldn't register tasks")
|
||||||
}
|
}
|
||||||
return q, nil
|
return q, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ type Config struct {
|
||||||
|
|
||||||
func validateConfig(cfg *Config) error {
|
func validateConfig(cfg *Config) error {
|
||||||
if cfg == nil || cfg.Redis == nil {
|
if cfg == nil || cfg.Redis == nil {
|
||||||
return errors.New("validateConfig: cfg.Redis is required")
|
return errors.New("cfg.Redis is required")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,14 +74,14 @@ func (q *queue) getQueueByName(name string) taskq.Queue {
|
||||||
|
|
||||||
func (q *queue) Start(ctx context.Context) error {
|
func (q *queue) Start(ctx context.Context) error {
|
||||||
if err := q.factory.StartConsumers(ctx); err != nil {
|
if err := q.factory.StartConsumers(ctx); err != nil {
|
||||||
return errors.Wrap(err, "Couldn't start the queue")
|
return errors.Wrap(err, "couldn't start the queue")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *queue) Close() error {
|
func (q *queue) Close() error {
|
||||||
if err := q.factory.Close(); err != nil {
|
if err := q.factory.Close(); err != nil {
|
||||||
return errors.Wrap(err, "Couldn't close the queue")
|
return errors.Wrap(err, "couldn't close the queue")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -89,10 +89,10 @@ func (q *queue) Close() error {
|
||||||
func (q *queue) Add(name string, msg *taskq.Message) error {
|
func (q *queue) Add(name string, msg *taskq.Message) error {
|
||||||
queue := q.getQueueByName(name)
|
queue := q.getQueueByName(name)
|
||||||
if queue == nil {
|
if queue == nil {
|
||||||
return errors.Errorf("Couldn't add the message to the queue: unknown queue name '%s'", name)
|
return errors.Errorf("couldn't add the message to the queue: unknown queue name '%s'", name)
|
||||||
}
|
}
|
||||||
if err := queue.Add(msg); err != nil {
|
if err := queue.Add(msg); err != nil {
|
||||||
return errors.Wrap(err, "Couldn't add the message to the queue")
|
return errors.Wrap(err, "couldn't add the message to the queue")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ func (t *task) loadLocation(timezone string) (*time.Location, error) {
|
||||||
}
|
}
|
||||||
location, err := time.LoadLocation(timezone)
|
location, err := time.LoadLocation(timezone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "task.loadLocation")
|
return nil, errors.Wrap(err, "couldn't load location for the timezone '"+timezone+"'")
|
||||||
}
|
}
|
||||||
t.cachedLocations.Store(timezone, location)
|
t.cachedLocations.Store(timezone, location)
|
||||||
return location, nil
|
return location, nil
|
||||||
|
@ -51,7 +51,7 @@ func (t *task) loadLocation(timezone string) (*time.Location, error) {
|
||||||
|
|
||||||
func RegisterTasks(cfg *Config) error {
|
func RegisterTasks(cfg *Config) error {
|
||||||
if err := validateConfig(cfg); err != nil {
|
if err := validateConfig(cfg); err != nil {
|
||||||
return errors.Wrap(err, "RegisterTasks")
|
return errors.Wrap(err, "config is invalid")
|
||||||
}
|
}
|
||||||
|
|
||||||
t := &task{
|
t := &task{
|
||||||
|
|
|
@ -30,18 +30,19 @@ func (t *taskDeleteNonExistentVillages) execute() error {
|
||||||
WithField("numberOfServers", len(servers)).
|
WithField("numberOfServers", len(servers)).
|
||||||
Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue")
|
Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue")
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
s := server
|
|
||||||
err := t.queue.Add(
|
err := t.queue.Add(
|
||||||
queue.Main,
|
queue.Main,
|
||||||
Get(ServerDeleteNonExistentVillages).
|
Get(ServerDeleteNonExistentVillages).
|
||||||
WithArgs(
|
WithArgs(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
twurlbuilder.BuildServerURL(server.Key, server.Version.Host),
|
twurlbuilder.BuildServerURL(server.Key, server.Version.Host),
|
||||||
s,
|
server,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(
|
log.
|
||||||
|
WithField("key", server.Key).
|
||||||
|
Warn(
|
||||||
errors.Wrapf(
|
errors.Wrapf(
|
||||||
err,
|
err,
|
||||||
"taskDeleteNonExistentVillages.execute: %s: Couldn't add the task '%s' for this server",
|
"taskDeleteNonExistentVillages.execute: %s: Couldn't add the task '%s' for this server",
|
||||||
|
|
|
@ -23,7 +23,7 @@ type serverWithURL struct {
|
||||||
|
|
||||||
func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error {
|
func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error {
|
||||||
if err := t.validatePayload(version); err != nil {
|
if err := t.validatePayload(version); err != nil {
|
||||||
log.Debug(err)
|
log.Debug(errors.Wrap(err, "taskLoadServersAndUpdateData.execute"))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
entry := log.WithField("host", version.Host)
|
entry := log.WithField("host", version.Host)
|
||||||
|
@ -52,7 +52,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error {
|
||||||
Version: version,
|
Version: version,
|
||||||
}
|
}
|
||||||
if err := postgres.CreateSchema(t.db, server); err != nil {
|
if err := postgres.CreateSchema(t.db, server); err != nil {
|
||||||
logrus.Warn(errors.Wrapf(err, "taskLoadServersAndUpdateData.execute: %s: couldn't create the schema", server.Key))
|
logrus.Warn(errors.Wrapf(err, "taskLoadServersAndUpdateData.execute: %s: Couldn't create the schema", server.Key))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
servers = append(servers, &serverWithURL{
|
servers = append(servers, &serverWithURL{
|
||||||
|
@ -69,7 +69,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error {
|
||||||
Set("version_code = EXCLUDED.version_code").
|
Set("version_code = EXCLUDED.version_code").
|
||||||
Returning("*").
|
Returning("*").
|
||||||
Insert(); err != nil {
|
Insert(); err != nil {
|
||||||
err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: couldn't insert/update servers")
|
err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: Couldn't insert/update servers")
|
||||||
logrus.Error(err)
|
logrus.Error(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -79,22 +79,34 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error {
|
||||||
Set("status = ?", twmodel.ServerStatusClosed).
|
Set("status = ?", twmodel.ServerStatusClosed).
|
||||||
Where("key NOT IN (?) AND version_code = ?", pg.In(serverKeys), version.Code).
|
Where("key NOT IN (?) AND version_code = ?", pg.In(serverKeys), version.Code).
|
||||||
Update(); err != nil {
|
Update(); err != nil {
|
||||||
err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: couldn't update server statuses")
|
err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: Couldn't update server statuses")
|
||||||
logrus.Error(err)
|
logrus.Error(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
entry.Infof("%s: Servers have been loaded", version.Host)
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
t.queue.Add(queue.Main, Get(UpdateServerData).WithArgs(context.Background(), server.url, server.Server))
|
err := t.queue.Add(queue.Main, Get(UpdateServerData).WithArgs(context.Background(), server.url, server.Server))
|
||||||
|
if err != nil {
|
||||||
|
log.
|
||||||
|
WithField("key", server.Key).
|
||||||
|
Warn(
|
||||||
|
errors.Wrapf(
|
||||||
|
err,
|
||||||
|
"taskLoadServersAndUpdateData.execute: %s: Couldn't add the task '%s' for this server",
|
||||||
|
server.Key,
|
||||||
|
UpdateServerData,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
entry.Infof("%s: Servers have been loaded", version.Host)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *taskLoadServersAndUpdateData) validatePayload(version *twmodel.Version) error {
|
func (t *taskLoadServersAndUpdateData) validatePayload(version *twmodel.Version) error {
|
||||||
if version == nil {
|
if version == nil {
|
||||||
return errors.New("taskLoadServersAndUpdateData.validatePayload: Expected *twmodel.Version, got nil")
|
return errors.New("expected *twmodel.Version, got nil")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,13 +16,25 @@ func (t *taskLoadVersionsAndUpdateServerData) execute() error {
|
||||||
var versions []*twmodel.Version
|
var versions []*twmodel.Version
|
||||||
log.Debug("taskLoadVersionsAndUpdateServerData.execute: Loading versions...")
|
log.Debug("taskLoadVersionsAndUpdateServerData.execute: Loading versions...")
|
||||||
if err := t.db.Model(&versions).Relation("SpecialServers").Select(); err != nil {
|
if err := t.db.Model(&versions).Relation("SpecialServers").Select(); err != nil {
|
||||||
err = errors.Wrap(err, "taskLoadVersionsAndUpdateServerData.execute: couldn't load versions")
|
err = errors.Wrap(err, "taskLoadVersionsAndUpdateServerData.execute: Couldn't load versions")
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, version := range versions {
|
|
||||||
t.queue.Add(queue.Main, Get(LoadServersAndUpdateData).WithArgs(context.Background(), version))
|
|
||||||
}
|
|
||||||
log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded")
|
log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded")
|
||||||
|
for _, version := range versions {
|
||||||
|
err := t.queue.Add(queue.Main, Get(LoadServersAndUpdateData).WithArgs(context.Background(), version))
|
||||||
|
if err != nil {
|
||||||
|
log.
|
||||||
|
WithField("code", version.Code).
|
||||||
|
Warn(
|
||||||
|
errors.Wrapf(
|
||||||
|
err,
|
||||||
|
"taskLoadVersionsAndUpdateServerData.execute: %s: Couldn't add the task '%s' for this version",
|
||||||
|
version.Code,
|
||||||
|
LoadServersAndUpdateData,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ type taskServerDeleteNonExistentVillages struct {
|
||||||
|
|
||||||
func (t *taskServerDeleteNonExistentVillages) execute(url string, server *twmodel.Server) error {
|
func (t *taskServerDeleteNonExistentVillages) execute(url string, server *twmodel.Server) error {
|
||||||
if err := t.validatePayload(server); err != nil {
|
if err := t.validatePayload(server); err != nil {
|
||||||
log.Debug(err)
|
log.Debug(errors.Wrap(err, "taskServerDeleteNonExistentVillages.execute"))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
entry := log.WithField("key", server.Key)
|
entry := log.WithField("key", server.Key)
|
||||||
|
@ -34,7 +34,7 @@ func (t *taskServerDeleteNonExistentVillages) execute(url string, server *twmode
|
||||||
|
|
||||||
func (t *taskServerDeleteNonExistentVillages) validatePayload(server *twmodel.Server) error {
|
func (t *taskServerDeleteNonExistentVillages) validatePayload(server *twmodel.Server) error {
|
||||||
if server == nil {
|
if server == nil {
|
||||||
return errors.New("taskUpdateServerData.validatePayload: Expected *twmodel.Server, got nil")
|
return errors.New("expected *twmodel.Server, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -49,7 +49,7 @@ type workerDeleteNonExistentVillages struct {
|
||||||
func (w *workerDeleteNonExistentVillages) delete() error {
|
func (w *workerDeleteNonExistentVillages) delete() error {
|
||||||
villages, err := w.dataloader.LoadVillages()
|
villages, err := w.dataloader.LoadVillages()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerDeleteNonExistentVillages.delete")
|
return errors.Wrap(err, "couldn't load villages")
|
||||||
}
|
}
|
||||||
var idsToDelete []int
|
var idsToDelete []int
|
||||||
searchableByVillageID := &villagesSearchableByID{villages}
|
searchableByVillageID := &villagesSearchableByID{villages}
|
||||||
|
@ -63,17 +63,17 @@ func (w *workerDeleteNonExistentVillages) delete() error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return errors.Wrap(err, "workerDeleteNonExistentVillages.delete")
|
return errors.Wrap(err, "couldn't determine which villages should be deleted")
|
||||||
}
|
}
|
||||||
|
|
||||||
totalDeleted := 0
|
totalDeleted := 0
|
||||||
if len(idsToDelete) > 0 {
|
if len(idsToDelete) > 0 {
|
||||||
result, err := w.db.Model(&twmodel.Village{}).Where("id = ANY(?)", pg.Array(idsToDelete)).Delete()
|
result, err := w.db.Model(&twmodel.Village{}).Where("id = ANY(?)", pg.Array(idsToDelete)).Delete()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerDeleteNonExistentVillages.delete")
|
return errors.Wrap(err, "couldn't delete villages that don't exist")
|
||||||
}
|
}
|
||||||
totalDeleted = result.RowsAffected()
|
totalDeleted = result.RowsAffected()
|
||||||
}
|
}
|
||||||
log.Debugf("%s: deleted %d villages", w.server.Key, totalDeleted)
|
log.WithField("key", w.server.Key).Debugf("%s: deleted %d villages", w.server.Key, totalDeleted)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,14 +27,15 @@ func (t *taskUpdateEnnoblements) execute() error {
|
||||||
}
|
}
|
||||||
log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...")
|
log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...")
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
s := server
|
|
||||||
err := t.queue.Add(
|
err := t.queue.Add(
|
||||||
queue.Ennoblements,
|
queue.Ennoblements,
|
||||||
Get(UpdateServerEnnoblements).
|
Get(UpdateServerEnnoblements).
|
||||||
WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), s),
|
WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), server),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(
|
log.
|
||||||
|
WithField("key", server.Key).
|
||||||
|
Warn(
|
||||||
errors.Wrapf(
|
errors.Wrapf(
|
||||||
err,
|
err,
|
||||||
"taskUpdateEnnoblements.execute: %s: Couldn't add the task '%s' for this server",
|
"taskUpdateEnnoblements.execute: %s: Couldn't add the task '%s' for this server",
|
||||||
|
|
|
@ -43,10 +43,11 @@ func (t *taskUpdateHistory) execute(timezone string) error {
|
||||||
WithField("numberOfServers", len(servers)).
|
WithField("numberOfServers", len(servers)).
|
||||||
Info("taskUpdateHistory.execute: Update of the history has started")
|
Info("taskUpdateHistory.execute: Update of the history has started")
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
s := server
|
err := t.queue.Add(queue.Main, Get(UpdateServerHistory).WithArgs(context.Background(), timezone, server))
|
||||||
err := t.queue.Add(queue.Main, Get(UpdateServerHistory).WithArgs(context.Background(), timezone, s))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(
|
log.
|
||||||
|
WithField("key", server.Key).
|
||||||
|
Warn(
|
||||||
errors.Wrapf(
|
errors.Wrapf(
|
||||||
err,
|
err,
|
||||||
"taskUpdateHistory.execute: %s: Couldn't add the task '%s' for this server",
|
"taskUpdateHistory.execute: %s: Couldn't add the task '%s' for this server",
|
||||||
|
|
|
@ -16,7 +16,7 @@ type taskUpdateServerData struct {
|
||||||
|
|
||||||
func (t *taskUpdateServerData) execute(url string, server *twmodel.Server) error {
|
func (t *taskUpdateServerData) execute(url string, server *twmodel.Server) error {
|
||||||
if err := t.validatePayload(server); err != nil {
|
if err := t.validatePayload(server); err != nil {
|
||||||
log.Debug(err)
|
log.Debug(errors.Wrap(err, "taskUpdateServerData.execute"))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -38,13 +38,13 @@ func (t *taskUpdateServerData) execute(url string, server *twmodel.Server) error
|
||||||
"duration": duration.Nanoseconds(),
|
"duration": duration.Nanoseconds(),
|
||||||
"durationPretty": duration.String(),
|
"durationPretty": duration.String(),
|
||||||
}).
|
}).
|
||||||
Infof("taskUpdateServerData.execute: %s: data has been updated", server.Key)
|
Infof("taskUpdateServerData.execute: %s: the server data has been updated", server.Key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *taskUpdateServerData) validatePayload(server *twmodel.Server) error {
|
func (t *taskUpdateServerData) validatePayload(server *twmodel.Server) error {
|
||||||
if server == nil {
|
if server == nil {
|
||||||
return errors.New("taskUpdateServerData.validatePayload: Expected *twmodel.Server, got nil")
|
return errors.New("expected *twmodel.Server, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -71,8 +71,9 @@ func (w *workerUpdateServerData) loadPlayers(od map[int]*twmodel.OpponentsDefeat
|
||||||
Model(&ennoblements).
|
Model(&ennoblements).
|
||||||
DistinctOn("new_owner_id").
|
DistinctOn("new_owner_id").
|
||||||
Order("new_owner_id ASC", "ennobled_at ASC").
|
Order("new_owner_id ASC", "ennobled_at ASC").
|
||||||
|
Column("ennobled_at", "new_owner_id").
|
||||||
Select(); err != nil {
|
Select(); err != nil {
|
||||||
return result, errors.Wrap(err, "workerUpdateServerData.loadPlayers: couldn't load ennoblements")
|
return result, errors.Wrap(err, "couldn't load ennoblements")
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
@ -118,7 +119,7 @@ func (w *workerUpdateServerData) loadPlayers(od map[int]*twmodel.OpponentsDefeat
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return result, errors.Wrap(err, "workerUpdateServerData.loadPlayers: Players that have been deleted couldn't be detected")
|
return result, errors.Wrap(err, "couldn't determine which players should be deleted")
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
|
@ -136,7 +137,7 @@ func (w *workerUpdateServerData) loadTribes(od map[int]*twmodel.OpponentsDefeate
|
||||||
result := loadTribesResult{}
|
result := loadTribesResult{}
|
||||||
result.tribes, err = w.dataloader.LoadTribes()
|
result.tribes, err = w.dataloader.LoadTribes()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, errors.Wrap(err, "workerUpdateServerData.loadTribes")
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result.numberOfTribes = len(result.tribes)
|
result.numberOfTribes = len(result.tribes)
|
||||||
|
@ -165,7 +166,7 @@ func (w *workerUpdateServerData) loadTribes(od map[int]*twmodel.OpponentsDefeate
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return result, errors.Wrap(err, "workerUpdateServerData.loadTribes: Tribes that have been deleted couldn't be detected")
|
return result, errors.Wrap(err, "couldn't determine which tribes should be deleted")
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
|
@ -238,43 +239,43 @@ func (w *workerUpdateServerData) calculateDailyPlayerStats(
|
||||||
func (w *workerUpdateServerData) update() error {
|
func (w *workerUpdateServerData) update() error {
|
||||||
pod, err := w.dataloader.LoadOD(false)
|
pod, err := w.dataloader.LoadOD(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerUpdateServerData.update")
|
return errors.Wrap(err, "couldn't load players OD")
|
||||||
}
|
}
|
||||||
|
|
||||||
tod, err := w.dataloader.LoadOD(true)
|
tod, err := w.dataloader.LoadOD(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerUpdateServerData.update")
|
return errors.Wrap(err, "couldn't load tribes OD")
|
||||||
}
|
}
|
||||||
|
|
||||||
villages, err := w.dataloader.LoadVillages()
|
villages, err := w.dataloader.LoadVillages()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerUpdateServerData.update")
|
return errors.Wrap(err, "couldn't load villages")
|
||||||
}
|
}
|
||||||
numberOfVillages := len(villages)
|
numberOfVillages := len(villages)
|
||||||
|
|
||||||
tribesResult, err := w.loadTribes(tod, countPlayerVillages(villages))
|
tribesResult, err := w.loadTribes(tod, countPlayerVillages(villages))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerUpdateServerData.update")
|
return errors.Wrap(err, "couldn't load tribes")
|
||||||
}
|
}
|
||||||
|
|
||||||
playersResult, err := w.loadPlayers(pod)
|
playersResult, err := w.loadPlayers(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerUpdateServerData.update")
|
return errors.Wrap(err, "couldn't load players")
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg, err := w.dataloader.GetConfig()
|
cfg, err := w.dataloader.GetConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerUpdateServerData.update")
|
return errors.Wrap(err, "couldn't load server config")
|
||||||
}
|
}
|
||||||
|
|
||||||
buildingCfg, err := w.dataloader.GetBuildingConfig()
|
buildingCfg, err := w.dataloader.GetBuildingConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerUpdateServerData.update")
|
return errors.Wrap(err, "couldn't load building config")
|
||||||
}
|
}
|
||||||
|
|
||||||
unitCfg, err := w.dataloader.GetUnitConfig()
|
unitCfg, err := w.dataloader.GetUnitConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "workerUpdateServerData.update")
|
return errors.Wrap(err, "couldn't load unit config")
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||||
|
|
|
@ -13,7 +13,7 @@ type taskUpdateServerEnnoblements struct {
|
||||||
|
|
||||||
func (t *taskUpdateServerEnnoblements) execute(url string, server *twmodel.Server) error {
|
func (t *taskUpdateServerEnnoblements) execute(url string, server *twmodel.Server) error {
|
||||||
if err := t.validatePayload(server); err != nil {
|
if err := t.validatePayload(server); err != nil {
|
||||||
log.Debug(err)
|
log.Debug(errors.Wrap(err, "taskUpdateServerEnnoblements.execute"))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
entry := log.WithField("key", server.Key)
|
entry := log.WithField("key", server.Key)
|
||||||
|
@ -27,14 +27,14 @@ func (t *taskUpdateServerEnnoblements) execute(url string, server *twmodel.Serve
|
||||||
entry.Error(err)
|
entry.Error(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
entry.Debugf("%s: ennoblements have been updated", server.Key)
|
entry.Debugf("%s: the ennoblements have been updated", server.Key)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *taskUpdateServerEnnoblements) validatePayload(server *twmodel.Server) error {
|
func (t *taskUpdateServerEnnoblements) validatePayload(server *twmodel.Server) error {
|
||||||
if server == nil {
|
if server == nil {
|
||||||
return errors.Errorf("taskUpdateServerEnnoblements.validatePayload: Expected *twmodel.Server, got nil")
|
return errors.Errorf("expected *twmodel.Server, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -13,7 +13,7 @@ type taskUpdateServerHistory struct {
|
||||||
|
|
||||||
func (t *taskUpdateServerHistory) execute(timezone string, server *twmodel.Server) error {
|
func (t *taskUpdateServerHistory) execute(timezone string, server *twmodel.Server) error {
|
||||||
if err := t.validatePayload(server); err != nil {
|
if err := t.validatePayload(server); err != nil {
|
||||||
log.Debug(err)
|
log.Debug(errors.Wrap(err, "taskUpdateServerHistory.execute"))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
location, err := t.loadLocation(timezone)
|
location, err := t.loadLocation(timezone)
|
||||||
|
@ -23,7 +23,7 @@ func (t *taskUpdateServerHistory) execute(timezone string, server *twmodel.Serve
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
entry := log.WithField("key", server.Key)
|
entry := log.WithField("key", server.Key)
|
||||||
entry.Infof("taskUpdateServerHistory.execute: %s: update of the history has started...", server.Key)
|
entry.Infof("taskUpdateServerHistory.execute: %s: Update of the server history has started...", server.Key)
|
||||||
err = (&workerUpdateServerHistory{
|
err = (&workerUpdateServerHistory{
|
||||||
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
|
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
|
||||||
server: server,
|
server: server,
|
||||||
|
@ -34,14 +34,14 @@ func (t *taskUpdateServerHistory) execute(timezone string, server *twmodel.Serve
|
||||||
entry.Error(err)
|
entry.Error(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
entry.Infof("taskUpdateServerHistory.execute: %s: history has been updated", server.Key)
|
entry.Infof("taskUpdateServerHistory.execute: %s: The server history has been updated", server.Key)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *taskUpdateServerHistory) validatePayload(server *twmodel.Server) error {
|
func (t *taskUpdateServerHistory) validatePayload(server *twmodel.Server) error {
|
||||||
if server == nil {
|
if server == nil {
|
||||||
return errors.New("taskUpdateServerHistory.validatePayload: Expected *twmodel.Server, got nil")
|
return errors.New("expected *twmodel.Server, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -13,7 +13,7 @@ type taskUpdateServerStats struct {
|
||||||
|
|
||||||
func (t *taskUpdateServerStats) execute(timezone string, server *twmodel.Server) error {
|
func (t *taskUpdateServerStats) execute(timezone string, server *twmodel.Server) error {
|
||||||
if err := t.validatePayload(server); err != nil {
|
if err := t.validatePayload(server); err != nil {
|
||||||
log.Debug(err)
|
log.Debug(errors.Wrap(err, "taskUpdateServerStats.execute"))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
location, err := t.loadLocation(timezone)
|
location, err := t.loadLocation(timezone)
|
||||||
|
@ -23,7 +23,7 @@ func (t *taskUpdateServerStats) execute(timezone string, server *twmodel.Server)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
entry := log.WithField("key", server.Key)
|
entry := log.WithField("key", server.Key)
|
||||||
entry.Infof("taskUpdateServerStats.execute: %s: update of the stats has started...", server.Key)
|
entry.Infof("taskUpdateServerStats.execute: %s: Update of the server stats has started...", server.Key)
|
||||||
err = (&workerUpdateServerStats{
|
err = (&workerUpdateServerStats{
|
||||||
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
|
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
|
||||||
server: server,
|
server: server,
|
||||||
|
@ -34,14 +34,14 @@ func (t *taskUpdateServerStats) execute(timezone string, server *twmodel.Server)
|
||||||
entry.Error(err)
|
entry.Error(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
entry.Infof("taskUpdateServerStats.execute: %s: stats have been updated", server.Key)
|
entry.Infof("taskUpdateServerStats.execute: %s: The server stats have been updated", server.Key)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *taskUpdateServerStats) validatePayload(server *twmodel.Server) error {
|
func (t *taskUpdateServerStats) validatePayload(server *twmodel.Server) error {
|
||||||
if server == nil {
|
if server == nil {
|
||||||
return errors.New("taskUpdateServerStats.validatePayload: Expected *twmodel.Server, got nil")
|
return errors.New("expected *twmodel.Server, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -56,39 +56,39 @@ type workerUpdateServerStats struct {
|
||||||
func (w *workerUpdateServerStats) prepare() (*twmodel.ServerStats, error) {
|
func (w *workerUpdateServerStats) prepare() (*twmodel.ServerStats, error) {
|
||||||
activePlayers, err := w.db.Model(&twmodel.Player{}).Where("exists = true").Count()
|
activePlayers, err := w.db.Model(&twmodel.Player{}).Where("exists = true").Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot count active players")
|
return nil, errors.Wrap(err, "couldn't count active players")
|
||||||
}
|
}
|
||||||
inactivePlayers, err := w.db.Model(&twmodel.Player{}).Where("exists = false").Count()
|
inactivePlayers, err := w.db.Model(&twmodel.Player{}).Where("exists = false").Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot count inactive players")
|
return nil, errors.Wrap(err, "couldn't count inactive players")
|
||||||
}
|
}
|
||||||
players := activePlayers + inactivePlayers
|
players := activePlayers + inactivePlayers
|
||||||
|
|
||||||
activeTribes, err := w.db.Model(&twmodel.Tribe{}).Where("exists = true").Count()
|
activeTribes, err := w.db.Model(&twmodel.Tribe{}).Where("exists = true").Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot count active tribes")
|
return nil, errors.Wrap(err, "couldn't count active tribes")
|
||||||
}
|
}
|
||||||
inactiveTribes, err := w.db.Model(&twmodel.Tribe{}).Where("exists = false").Count()
|
inactiveTribes, err := w.db.Model(&twmodel.Tribe{}).Where("exists = false").Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot count inactive tribes")
|
return nil, errors.Wrap(err, "couldn't count inactive tribes")
|
||||||
}
|
}
|
||||||
tribes := activeTribes + inactiveTribes
|
tribes := activeTribes + inactiveTribes
|
||||||
|
|
||||||
barbarianVillages, err := w.db.Model(&twmodel.Village{}).Where("player_id = 0").Count()
|
barbarianVillages, err := w.db.Model(&twmodel.Village{}).Where("player_id = 0").Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot count barbarian villages")
|
return nil, errors.Wrap(err, "couldn't count barbarian villages")
|
||||||
}
|
}
|
||||||
bonusVillages, err := w.db.Model(&twmodel.Village{}).Where("bonus <> 0").Count()
|
bonusVillages, err := w.db.Model(&twmodel.Village{}).Where("bonus <> 0").Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot count bonus villages")
|
return nil, errors.Wrap(err, "couldn't count bonus villages")
|
||||||
}
|
}
|
||||||
playerVillages, err := w.db.Model(&twmodel.Village{}).Where("player_id <> 0").Count()
|
playerVillages, err := w.db.Model(&twmodel.Village{}).Where("player_id <> 0").Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot count player villages")
|
return nil, errors.Wrap(err, "couldn't count player villages")
|
||||||
}
|
}
|
||||||
villages, err := w.db.Model(&twmodel.Village{}).Count()
|
villages, err := w.db.Model(&twmodel.Village{}).Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot count villages")
|
return nil, errors.Wrap(err, "couldn't count villages")
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now().In(w.location)
|
now := time.Now().In(w.location)
|
||||||
|
|
|
@ -43,10 +43,11 @@ func (t *taskUpdateStats) execute(timezone string) error {
|
||||||
WithField("numberOfServers", len(servers)).
|
WithField("numberOfServers", len(servers)).
|
||||||
Info("taskUpdateStats.execute: Update of the stats has started")
|
Info("taskUpdateStats.execute: Update of the stats has started")
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
s := server
|
err := t.queue.Add(queue.Main, Get(UpdateServerStats).WithArgs(context.Background(), timezone, server))
|
||||||
err := t.queue.Add(queue.Main, Get(UpdateServerStats).WithArgs(context.Background(), timezone, s))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(
|
log.
|
||||||
|
WithField("key", server.Key).
|
||||||
|
Warn(
|
||||||
errors.Wrapf(
|
errors.Wrapf(
|
||||||
err,
|
err,
|
||||||
"taskUpdateStats.execute: %s: Couldn't add the task '%s' for this server",
|
"taskUpdateStats.execute: %s: Couldn't add the task '%s' for this server",
|
||||||
|
|
|
@ -22,12 +22,13 @@ func (t *taskVacuum) execute() error {
|
||||||
log.Errorln(err)
|
log.Errorln(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Infof("taskVacuum.execute: Start database vacumming...")
|
log.Infof("taskVacuum.execute: The database vacumming process has started...")
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
s := server
|
err := t.queue.Add(queue.Main, Get(VacuumServerDB).WithArgs(context.Background(), server))
|
||||||
err := t.queue.Add(queue.Main, Get(VacuumServerDB).WithArgs(context.Background(), s))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(
|
log.
|
||||||
|
WithField("key", server.Key).
|
||||||
|
Warn(
|
||||||
errors.Wrapf(
|
errors.Wrapf(
|
||||||
err,
|
err,
|
||||||
"taskVacuum.execute: %s: Couldn't add the task '%s' for this server",
|
"taskVacuum.execute: %s: Couldn't add the task '%s' for this server",
|
||||||
|
|
|
@ -17,11 +17,11 @@ type taskVacuumServerDB struct {
|
||||||
|
|
||||||
func (t *taskVacuumServerDB) execute(server *twmodel.Server) error {
|
func (t *taskVacuumServerDB) execute(server *twmodel.Server) error {
|
||||||
if err := t.validatePayload(server); err != nil {
|
if err := t.validatePayload(server); err != nil {
|
||||||
log.Debug(err)
|
log.Debug(errors.Wrap(err, "taskVacuumServerDB.execute"))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
entry := log.WithField("key", server.Key)
|
entry := log.WithField("key", server.Key)
|
||||||
entry.Infof("taskVacuumServerDB.execute: %s: vacumming the database...", server.Key)
|
entry.Infof("taskVacuumServerDB.execute: %s: Vacumming the database...", server.Key)
|
||||||
err := (&workerVacuumServerDB{
|
err := (&workerVacuumServerDB{
|
||||||
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
|
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
|
||||||
server: server,
|
server: server,
|
||||||
|
@ -31,14 +31,14 @@ func (t *taskVacuumServerDB) execute(server *twmodel.Server) error {
|
||||||
entry.Error(err)
|
entry.Error(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
entry.Infof("taskVacuumServerDB.execute: %s: the database has been vacummed", server.Key)
|
entry.Infof("taskVacuumServerDB.execute: %s: The database has been vacummed", server.Key)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *taskVacuumServerDB) validatePayload(server *twmodel.Server) error {
|
func (t *taskVacuumServerDB) validatePayload(server *twmodel.Server) error {
|
||||||
if server == nil {
|
if server == nil {
|
||||||
return errors.New("taskVacuumServerDB.validatePayload: Expected *twmodel.Server, got nil")
|
return errors.New("expected *twmodel.Server, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -28,7 +28,7 @@ func Connect(cfg *Config) (*pg.DB, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := prepareDB(db); err != nil {
|
if err := prepareDB(db); err != nil {
|
||||||
return nil, errors.Wrap(err, "Connect")
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return db, nil
|
return db, nil
|
||||||
|
@ -47,11 +47,11 @@ func prepareOptions() *pg.Options {
|
||||||
func prepareDB(db *pg.DB) error {
|
func prepareDB(db *pg.DB) error {
|
||||||
tx, err := db.Begin()
|
tx, err := db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Couldn't start a transaction")
|
return errors.Wrap(err, "couldn't start a transaction")
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := tx.Close(); err != nil {
|
if err := tx.Close(); err != nil {
|
||||||
log.Warn(errors.Wrap(err, "prepareDB: Couldn't rollback the transaction"))
|
log.Warn(errors.Wrap(err, "prepareDB: couldn't rollback the transaction"))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ func prepareDB(db *pg.DB) error {
|
||||||
IfNotExists: true,
|
IfNotExists: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Couldn't create the table")
|
return errors.Wrap(err, "couldn't create the table")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,17 +96,17 @@ func prepareDB(db *pg.DB) error {
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
if _, err := tx.Exec(s.statement, s.params...); err != nil {
|
if _, err := tx.Exec(s.statement, s.params...); err != nil {
|
||||||
return errors.Wrap(err, "Couldn't initialize the db")
|
return errors.Wrap(err, "couldn't prepare the db")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
if err := tx.Commit(); err != nil {
|
||||||
return errors.Wrap(err, "Couldn't commit changes")
|
return errors.Wrap(err, "couldn't commit changes")
|
||||||
}
|
}
|
||||||
|
|
||||||
var servers []*twmodel.Server
|
var servers []*twmodel.Server
|
||||||
if err := db.Model(&servers).Select(); err != nil {
|
if err := db.Model(&servers).Select(); err != nil {
|
||||||
return errors.Wrap(err, "Couldn't load servers")
|
return errors.Wrap(err, "couldn't load servers")
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
|
@ -141,7 +141,7 @@ func createSchema(db *pg.DB, server *twmodel.Server, init bool) error {
|
||||||
|
|
||||||
tx, err := db.WithParam("SERVER", pg.Safe(server.Key)).Begin()
|
tx, err := db.WithParam("SERVER", pg.Safe(server.Key)).Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "CreateSchema: couldn't start a transaction")
|
return errors.Wrap(err, "couldn't start a transaction")
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := tx.Close(); err != nil {
|
if err := tx.Close(); err != nil {
|
||||||
|
@ -150,7 +150,7 @@ func createSchema(db *pg.DB, server *twmodel.Server, init bool) error {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil {
|
if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil {
|
||||||
return errors.Wrap(err, "CreateSchema: couldn't create the schema")
|
return errors.Wrap(err, "couldn't create for the server '"+server.Key+"'")
|
||||||
}
|
}
|
||||||
|
|
||||||
dbModels := []interface{}{
|
dbModels := []interface{}{
|
||||||
|
@ -185,12 +185,12 @@ func createSchema(db *pg.DB, server *twmodel.Server, init bool) error {
|
||||||
}
|
}
|
||||||
for _, statement := range statements {
|
for _, statement := range statements {
|
||||||
if _, err := tx.Exec(statement, pg.Safe(server.Key), server.VersionCode); err != nil {
|
if _, err := tx.Exec(statement, pg.Safe(server.Key), server.VersionCode); err != nil {
|
||||||
return errors.Wrap(err, "CreateSchema: couldn't initialize the schema")
|
return errors.Wrap(err, "couldn't initialize the schema")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
if err := tx.Commit(); err != nil {
|
||||||
return errors.Wrap(err, "CreateSchema: couldn't commit changes")
|
return errors.Wrap(err, "couldn't commit changes")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
4
main.go
4
main.go
|
@ -54,10 +54,10 @@ func main() {
|
||||||
WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"),
|
WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Fatal(err)
|
logrus.Fatal(errors.Wrap(err, "Couldn't initialize a cron instance"))
|
||||||
}
|
}
|
||||||
if err := c.Start(context.Background()); err != nil {
|
if err := c.Start(context.Background()); err != nil {
|
||||||
logrus.Fatal(err)
|
logrus.Fatal(errors.Wrap(err, "Couldn't start the cron"))
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Info("Cron is up and running!")
|
logrus.Info("Cron is up and running!")
|
||||||
|
|
Reference in New Issue
Block a user