From add1379e0b7417cc5479c7340808a30963855acc Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Tue, 27 Apr 2021 20:06:38 +0200 Subject: [PATCH] make messages more clear, fix most of the unhandled error --- README.md | 7 ++-- cron/cron.go | 44 ++++++++++++++++++++---- cron/queue/queue.go | 14 ++++---- cron/tasks/task_update_ennoblements.go | 14 ++++++-- cron/tasks/task_update_history.go | 13 +++++-- cron/tasks/task_update_server_data.go | 14 +++++--- cron/tasks/task_update_server_history.go | 10 ++++-- cron/tasks/task_update_server_stats.go | 10 ++++-- cron/tasks/task_update_stats.go | 12 +++++-- cron/tasks/task_vacuum.go | 14 ++++++-- cron/tasks/task_vacuum_server_db.go | 16 ++++++--- db/db.go | 12 +++++-- main.go | 35 +++++++++++++++---- 13 files changed, 165 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index a1f0268..d9e8d6d 100644 --- a/README.md +++ b/README.md @@ -26,17 +26,18 @@ REDIS_PASSWORD=redis_password RUN_ON_INIT=true|false LOG_DB_QUERIES=true|false -WORKER_LIMIT=1 #how many servers should update at the same time +WORKER_LIMIT=1 ``` ### Prerequisites 1. Golang -2. PostgreSQL database +2. PostgreSQL +3. Redis ### Installing 1. Clone this repo. 2. Navigate to the directory where you have cloned this repo. -3. Set the required env variables directly in your system or create .env.development file. +3. Set the required env variables directly in your system or create .env.local file. 4. go run main.go diff --git a/cron/cron.go b/cron/cron.go index 8da6e1a..b82cf04 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/go-pg/pg/v10" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "github.com/robfig/cron/v3" "github.com/tribalwarshelp/shared/models" @@ -103,23 +104,49 @@ func (c *Cron) Stop() error { } func (c *Cron) updateServerData() { - c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background())) + err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background())) + if err != nil { + c.logError("Cron.updateServerData", tasks.TaskNameLoadVersionsAndUpdateServerData, err) + } } func (c *Cron) updateEnnoblements() { - c.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background())) + err := c.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background())) + if err != nil { + c.logError("Cron.updateEnnoblements", tasks.TaskUpdateEnnoblements, err) + } } func (c *Cron) updateHistory(timezone string) { - c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone)) + if err != nil { + c.logError("Cron.updateHistory", tasks.TaskUpdateHistory, err) + } } func (c *Cron) updateStats(timezone string) { - c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone)) + if err != nil { + c.logError("Cron.updateStats", tasks.TaskUpdateStats, err) + } } func (c *Cron) vacuumDatabase() { - c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background())) + err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background())) + if err != nil { + c.logError("Cron.vacuumDatabase", tasks.TaskNameVacuum, err) + } +} + +func (c *Cron) logError(prefix string, taskName string, err error) { + logrus.Error( + errors.Wrapf( + err, + "%s: Couldn't add the task '%s' to the queue", + prefix, + taskName, + ), + ) } func initializeQueue(cfg *Config) (queue.Queue, error) { @@ -128,12 +155,15 @@ func initializeQueue(cfg *Config) (queue.Queue, error) { Redis: cfg.Redis, }) if err != nil { - return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task q") + return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task queue") } - tasks.RegisterTasks(&tasks.Config{ + err = tasks.RegisterTasks(&tasks.Config{ DB: cfg.DB, Queue: q, }) + if err != nil { + return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task queue") + } return q, nil } diff --git a/cron/queue/queue.go b/cron/queue/queue.go index 4656537..197d076 100644 --- a/cron/queue/queue.go +++ b/cron/queue/queue.go @@ -10,17 +10,17 @@ import ( "github.com/vmihailenco/taskq/v3/redisq" ) -type QueueName string +type Name string const ( - MainQueue QueueName = "main" - EnnoblementsQueue QueueName = "ennoblements" + MainQueue Name = "main" + EnnoblementsQueue Name = "ennoblements" ) type Queue interface { Start(ctx context.Context) error Close() error - Add(name QueueName, msg *taskq.Message) error + Add(name Name, msg *taskq.Message) error } type queue struct { @@ -54,7 +54,7 @@ func (q *queue) init(cfg *Config) error { return nil } -func (q *queue) registerQueue(name QueueName, limit int) taskq.Queue { +func (q *queue) registerQueue(name Name, limit int) taskq.Queue { return q.factory.RegisterQueue(&taskq.QueueOptions{ Name: string(name), ReservationTimeout: time.Minute * 2, @@ -64,7 +64,7 @@ func (q *queue) registerQueue(name QueueName, limit int) taskq.Queue { }) } -func (q *queue) getQueueByName(name QueueName) taskq.Queue { +func (q *queue) getQueueByName(name Name) taskq.Queue { switch name { case MainQueue: return q.mainQueue @@ -88,7 +88,7 @@ func (q *queue) Close() error { return nil } -func (q *queue) Add(name QueueName, msg *taskq.Message) error { +func (q *queue) Add(name Name, msg *taskq.Message) error { queue := q.getQueueByName(name) if queue == nil { return errors.Errorf("Couldn't add the message to the queue: unknown queue name '%s'", name) diff --git a/cron/tasks/task_update_ennoblements.go b/cron/tasks/task_update_ennoblements.go index af3c3ea..8f08e91 100644 --- a/cron/tasks/task_update_ennoblements.go +++ b/cron/tasks/task_update_ennoblements.go @@ -25,14 +25,24 @@ func (t *taskUpdateEnnoblements) execute() error { log.Errorln(err) return err } - log.WithField("numberOfServers", len(servers)).Info("Update of the ennoblements has started...") + log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...") for _, server := range servers { s := server - t.queue.Add( + err := t.queue.Add( queue.EnnoblementsQueue, Get(TaskUpdateServerEnnoblements). WithArgs(context.Background(), fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host), s), ) + if err != nil { + log.Warn( + errors.Wrapf( + err, + "taskUpdateEnnoblements.execute: %s: Couldn't add the task '%s' for this server", + server.Key, + TaskUpdateServerEnnoblements, + ), + ) + } } return nil } diff --git a/cron/tasks/task_update_history.go b/cron/tasks/task_update_history.go index 02b2b32..c5e115f 100644 --- a/cron/tasks/task_update_history.go +++ b/cron/tasks/task_update_history.go @@ -41,10 +41,19 @@ func (t *taskUpdateHistory) execute(timezone string) error { } entry. WithField("numberOfServers", len(servers)). - Info("Update of the history has started") + Info("taskUpdateHistory.execute: Update of the history has started") for _, server := range servers { s := server - t.queue.Add(queue.MainQueue, Get(TaskUpdateServerHistory).WithArgs(context.Background(), timezone, s)) + err := t.queue.Add(queue.MainQueue, Get(TaskUpdateServerHistory).WithArgs(context.Background(), timezone, s)) + log.Warn( + errors.Wrapf( + err, + "taskUpdateHistory.execute: %s: Couldn't add the task '%s' for this server", + server.Key, + TaskUpdateServerHistory, + ), + ) + } return nil } diff --git a/cron/tasks/task_update_server_data.go b/cron/tasks/task_update_server_data.go index 9c7b074..04c8b17 100644 --- a/cron/tasks/task_update_server_data.go +++ b/cron/tasks/task_update_server_data.go @@ -20,7 +20,7 @@ func (t *taskUpdateServerData) execute(url string, server *models.Server) error } now := time.Now() entry := log.WithField("key", server.Key) - entry.Infof("%s: updating data...", server.Key) + entry.Infof("taskUpdateServerData.execute: %s: Update of the server data has started...", server.Key) err := (&workerUpdateServerData{ db: t.db.WithParam("SERVER", pg.Safe(server.Key)), dataloader: newDataloader(url), @@ -37,13 +37,13 @@ func (t *taskUpdateServerData) execute(url string, server *models.Server) error "duration": duration.Nanoseconds(), "durationPretty": duration.String(), }). - Infof("%s has been updated", server.Key) + Infof("taskUpdateServerData.execute: %s: data has been updated", server.Key) return nil } func (t *taskUpdateServerData) validatePayload(server *models.Server) error { if server == nil { - return errors.Errorf("taskLoadServersAndUpdateData.validatePayload: Expected *models.Server, got nil") + return errors.Errorf("taskUpdateServerData.validatePayload: Expected *models.Server, got nil") } return nil @@ -59,7 +59,7 @@ func (w *workerUpdateServerData) loadPlayers(od map[int]*models.OpponentsDefeate var ennoblements = []*models.Ennoblement{} err := w.db.Model(&ennoblements).DistinctOn("new_owner_id").Order("new_owner_id ASC", "ennobled_at ASC").Select() if err != nil { - return nil, errors.Wrap(err, "loadPlayers: couldn't load ennoblements") + return nil, errors.Wrap(err, "workerUpdateServerData.loadPlayers: couldn't load ennoblements") } players, err := w.dataloader.LoadPlayers() @@ -210,7 +210,11 @@ func (w *workerUpdateServerData) update() error { if err != nil { return err } - defer tx.Close() + defer func(s *models.Server) { + if err := tx.Close(); err != nil { + log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key)) + } + }(w.server) if len(tribes) > 0 { ids := []int{} diff --git a/cron/tasks/task_update_server_history.go b/cron/tasks/task_update_server_history.go index 28a8859..df01bdc 100644 --- a/cron/tasks/task_update_server_history.go +++ b/cron/tasks/task_update_server_history.go @@ -23,7 +23,7 @@ func (t *taskUpdateServerHistory) execute(timezone string, server *models.Server return err } entry := log.WithField("key", server.Key) - entry.Infof("%s: update of the history has started...", server.Key) + entry.Infof("taskUpdateServerHistory.execute: %s: update of the history has started...", server.Key) err = (&workerUpdateServerHistory{ db: t.db.WithParam("SERVER", pg.Safe(server.Key)), server: server, @@ -34,7 +34,7 @@ func (t *taskUpdateServerHistory) execute(timezone string, server *models.Server entry.Error(err) return err } - entry.Infof("%s: history has been updated", server.Key) + entry.Infof("taskUpdateServerHistory.execute: %s: history has been updated", server.Key) return nil } @@ -97,7 +97,11 @@ func (w *workerUpdateServerHistory) update() error { if err != nil { return err } - defer tx.Close() + defer func(s *models.Server) { + if err := tx.Close(); err != nil { + log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key)) + } + }(w.server) if len(ph) > 0 { if _, err := w.db.Model(&ph).Insert(); err != nil { diff --git a/cron/tasks/task_update_server_stats.go b/cron/tasks/task_update_server_stats.go index 797e17f..2c77307 100644 --- a/cron/tasks/task_update_server_stats.go +++ b/cron/tasks/task_update_server_stats.go @@ -23,7 +23,7 @@ func (t *taskUpdateServerStats) execute(timezone string, server *models.Server) return err } entry := log.WithField("key", server.Key) - entry.Infof("%s: update of the stats has started...", server.Key) + entry.Infof("taskUpdateServerStats.execute: %s: update of the stats has started...", server.Key) err = (&workerUpdateServerStats{ db: t.db.WithParam("SERVER", pg.Safe(server.Key)), server: server, @@ -34,7 +34,7 @@ func (t *taskUpdateServerStats) execute(timezone string, server *models.Server) entry.Error(err) return err } - entry.Infof("%s: stats have been updated", server.Key) + entry.Infof("taskUpdateServerStats.execute: %s: stats have been updated", server.Key) return nil } @@ -120,7 +120,11 @@ func (w *workerUpdateServerStats) update() error { if err != nil { return err } - defer tx.Close() + defer func(s *models.Server) { + if err := tx.Close(); err != nil { + log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key)) + } + }(w.server) if _, err := tx.Model(stats).Insert(); err != nil { return errors.Wrap(err, "cannot insert server stats") diff --git a/cron/tasks/task_update_stats.go b/cron/tasks/task_update_stats.go index 5020443..5a4c43b 100644 --- a/cron/tasks/task_update_stats.go +++ b/cron/tasks/task_update_stats.go @@ -41,10 +41,18 @@ func (t *taskUpdateStats) execute(timezone string) error { } entry. WithField("numberOfServers", len(servers)). - Info("Update of the stats has started") + Info("taskUpdateStats.execute: Update of the stats has started") for _, server := range servers { s := server - t.queue.Add(queue.MainQueue, Get(TaskUpdateServerStats).WithArgs(context.Background(), timezone, s)) + err := t.queue.Add(queue.MainQueue, Get(TaskUpdateServerStats).WithArgs(context.Background(), timezone, s)) + log.Warn( + errors.Wrapf( + err, + "taskUpdateStats.execute: %s: Couldn't add the task '%s' for this server", + server.Key, + TaskUpdateServerStats, + ), + ) } return nil } diff --git a/cron/tasks/task_vacuum.go b/cron/tasks/task_vacuum.go index d83d8bb..0dda3e7 100644 --- a/cron/tasks/task_vacuum.go +++ b/cron/tasks/task_vacuum.go @@ -22,10 +22,20 @@ func (t *taskVacuum) execute() error { log.Errorln(err) return err } - log.Infof("Start database vacuuming...") + log.Infof("taskVacuum.execute: Start database vacumming...") for _, server := range servers { s := server - t.queue.Add(queue.MainQueue, Get(TaskNameVacuumServerDB).WithArgs(context.Background(), s)) + err := t.queue.Add(queue.MainQueue, Get(TaskNameVacuumServerDB).WithArgs(context.Background(), s)) + if err != nil { + log.Warn( + errors.Wrapf( + err, + "taskVacuum.execute: %s: Couldn't add the task '%s' for this server", + server.Key, + TaskUpdateServerEnnoblements, + ), + ) + } } return nil } diff --git a/cron/tasks/task_vacuum_server_db.go b/cron/tasks/task_vacuum_server_db.go index 1a04e88..719f2e8 100644 --- a/cron/tasks/task_vacuum_server_db.go +++ b/cron/tasks/task_vacuum_server_db.go @@ -21,16 +21,17 @@ func (t *taskVacuumServerDB) execute(server *models.Server) error { return nil } entry := log.WithField("key", server.Key) - entry.Infof("%s: vacumming the database...", server.Key) + entry.Infof("taskVacuumServerDB.execute: %s: vacumming the database...", server.Key) err := (&workerVacuumServerDB{ - db: t.db.WithParam("SERVER", pg.Safe(server.Key)), + db: t.db.WithParam("SERVER", pg.Safe(server.Key)), + server: server, }).vacuum() if err != nil { err = errors.Wrap(err, "taskVacuumServerDB.execute") entry.Error(err) return err } - entry.Infof("%s: the database has been vacummed", server.Key) + entry.Infof("taskVacuumServerDB.execute: %s: the database has been vacummed", server.Key) return nil } @@ -44,7 +45,8 @@ func (t *taskVacuumServerDB) validatePayload(server *models.Server) error { } type workerVacuumServerDB struct { - db *pg.DB + db *pg.DB + server *models.Server } func (w *workerVacuumServerDB) vacuum() error { @@ -52,7 +54,11 @@ func (w *workerVacuumServerDB) vacuum() error { if err != nil { return err } - defer tx.Close() + defer func(s *models.Server) { + if err := tx.Close(); err != nil { + log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key)) + } + }(w.server) withNonExistentPlayers := w.db.Model(&models.Player{}).Column("id").Where("exists = false and NOW() - deleted_at > '14 days'") withNonExistentTribes := w.db.Model(&models.Tribe{}).Column("id").Where("exists = false and NOW() - deleted_at > '1 days'") diff --git a/db/db.go b/db/db.go index d946e48..59eb452 100644 --- a/db/db.go +++ b/db/db.go @@ -50,7 +50,11 @@ func prepareDB(db *pg.DB) error { if err != nil { return errors.Wrap(err, "Couldn't start a transaction") } - defer tx.Close() + defer func() { + if err := tx.Close(); err != nil { + log.Warn(errors.Wrap(err, "prepareDB: Couldn't rollback the transaction")) + } + }() dbModels := []interface{}{ (*models.SpecialServer)(nil), @@ -140,7 +144,11 @@ func createSchema(db *pg.DB, server *models.Server, init bool) error { if err != nil { return errors.Wrap(err, "CreateSchema: couldn't start a transaction") } - defer tx.Close() + defer func() { + if err := tx.Close(); err != nil { + log.Warn(errors.Wrap(err, "createSchema: Couldn't rollback the transaction")) + } + }() if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil { return errors.Wrap(err, "CreateSchema: couldn't create the schema") diff --git a/main.go b/main.go index c50a321..2f355d6 100644 --- a/main.go +++ b/main.go @@ -21,12 +21,9 @@ import ( ) func init() { - os.Setenv("TZ", "UTC") - - if mode.Get() == mode.DevelopmentMode { - godotenv.Load(".env.local") + if err := setupENVs(); err != nil { + logrus.Fatal(err) } - setupLogger() } @@ -35,13 +32,21 @@ func main() { if err != nil { logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis")) } - defer redisClient.Close() + defer func() { + if err := redisClient.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "Couldn't close the Redis connection")) + } + }() dbConn, err := db.New(&db.Config{LogQueries: envutils.GetenvBool("LOG_DB_QUERIES")}) if err != nil { logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db")) } - defer dbConn.Close() + defer func() { + if err := dbConn.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "Couldn't close the db connection")) + } + }() logrus.Info("Connection with the database has been established") c, err := twhelpcron.New(&twhelpcron.Config{ @@ -94,6 +99,22 @@ func setupLogger() { } } +func setupENVs() error { + err := os.Setenv("TZ", "UTC") + if err != nil { + return errors.Wrap(err, "setupENVs") + } + + if mode.Get() == mode.DevelopmentMode { + err := godotenv.Load(".env.local") + if err != nil { + return errors.Wrap(err, "setupENVs") + } + } + + return nil +} + func initializeRedis() (redis.UniversalClient, error) { client := redis.NewClient(&redis.Options{ Addr: envutils.GetenvString("REDIS_ADDR"),