make messages more clear, fix most of the unhandled error

This commit is contained in:
Dawid Wysokiński 2021-04-27 20:06:38 +02:00
parent e29807e077
commit add1379e0b
13 changed files with 165 additions and 50 deletions

View File

@ -26,17 +26,18 @@ REDIS_PASSWORD=redis_password
RUN_ON_INIT=true|false
LOG_DB_QUERIES=true|false
WORKER_LIMIT=1 #how many servers should update at the same time
WORKER_LIMIT=1
```
### Prerequisites
1. Golang
2. PostgreSQL database
2. PostgreSQL
3. Redis
### Installing
1. Clone this repo.
2. Navigate to the directory where you have cloned this repo.
3. Set the required env variables directly in your system or create .env.development file.
3. Set the required env variables directly in your system or create .env.local file.
4. go run main.go

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/robfig/cron/v3"
"github.com/tribalwarshelp/shared/models"
@ -103,23 +104,49 @@ func (c *Cron) Stop() error {
}
func (c *Cron) updateServerData() {
c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background()))
err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background()))
if err != nil {
c.logError("Cron.updateServerData", tasks.TaskNameLoadVersionsAndUpdateServerData, err)
}
}
func (c *Cron) updateEnnoblements() {
c.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background()))
err := c.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background()))
if err != nil {
c.logError("Cron.updateEnnoblements", tasks.TaskUpdateEnnoblements, err)
}
}
func (c *Cron) updateHistory(timezone string) {
c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone))
err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone))
if err != nil {
c.logError("Cron.updateHistory", tasks.TaskUpdateHistory, err)
}
}
func (c *Cron) updateStats(timezone string) {
c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone))
err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone))
if err != nil {
c.logError("Cron.updateStats", tasks.TaskUpdateStats, err)
}
}
func (c *Cron) vacuumDatabase() {
c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background()))
err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background()))
if err != nil {
c.logError("Cron.vacuumDatabase", tasks.TaskNameVacuum, err)
}
}
func (c *Cron) logError(prefix string, taskName string, err error) {
logrus.Error(
errors.Wrapf(
err,
"%s: Couldn't add the task '%s' to the queue",
prefix,
taskName,
),
)
}
func initializeQueue(cfg *Config) (queue.Queue, error) {
@ -128,12 +155,15 @@ func initializeQueue(cfg *Config) (queue.Queue, error) {
Redis: cfg.Redis,
})
if err != nil {
return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task q")
return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task queue")
}
tasks.RegisterTasks(&tasks.Config{
err = tasks.RegisterTasks(&tasks.Config{
DB: cfg.DB,
Queue: q,
})
if err != nil {
return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task queue")
}
return q, nil
}

View File

@ -10,17 +10,17 @@ import (
"github.com/vmihailenco/taskq/v3/redisq"
)
type QueueName string
type Name string
const (
MainQueue QueueName = "main"
EnnoblementsQueue QueueName = "ennoblements"
MainQueue Name = "main"
EnnoblementsQueue Name = "ennoblements"
)
type Queue interface {
Start(ctx context.Context) error
Close() error
Add(name QueueName, msg *taskq.Message) error
Add(name Name, msg *taskq.Message) error
}
type queue struct {
@ -54,7 +54,7 @@ func (q *queue) init(cfg *Config) error {
return nil
}
func (q *queue) registerQueue(name QueueName, limit int) taskq.Queue {
func (q *queue) registerQueue(name Name, limit int) taskq.Queue {
return q.factory.RegisterQueue(&taskq.QueueOptions{
Name: string(name),
ReservationTimeout: time.Minute * 2,
@ -64,7 +64,7 @@ func (q *queue) registerQueue(name QueueName, limit int) taskq.Queue {
})
}
func (q *queue) getQueueByName(name QueueName) taskq.Queue {
func (q *queue) getQueueByName(name Name) taskq.Queue {
switch name {
case MainQueue:
return q.mainQueue
@ -88,7 +88,7 @@ func (q *queue) Close() error {
return nil
}
func (q *queue) Add(name QueueName, msg *taskq.Message) error {
func (q *queue) Add(name Name, msg *taskq.Message) error {
queue := q.getQueueByName(name)
if queue == nil {
return errors.Errorf("Couldn't add the message to the queue: unknown queue name '%s'", name)

View File

@ -25,14 +25,24 @@ func (t *taskUpdateEnnoblements) execute() error {
log.Errorln(err)
return err
}
log.WithField("numberOfServers", len(servers)).Info("Update of the ennoblements has started...")
log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...")
for _, server := range servers {
s := server
t.queue.Add(
err := t.queue.Add(
queue.EnnoblementsQueue,
Get(TaskUpdateServerEnnoblements).
WithArgs(context.Background(), fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host), s),
)
if err != nil {
log.Warn(
errors.Wrapf(
err,
"taskUpdateEnnoblements.execute: %s: Couldn't add the task '%s' for this server",
server.Key,
TaskUpdateServerEnnoblements,
),
)
}
}
return nil
}

View File

@ -41,10 +41,19 @@ func (t *taskUpdateHistory) execute(timezone string) error {
}
entry.
WithField("numberOfServers", len(servers)).
Info("Update of the history has started")
Info("taskUpdateHistory.execute: Update of the history has started")
for _, server := range servers {
s := server
t.queue.Add(queue.MainQueue, Get(TaskUpdateServerHistory).WithArgs(context.Background(), timezone, s))
err := t.queue.Add(queue.MainQueue, Get(TaskUpdateServerHistory).WithArgs(context.Background(), timezone, s))
log.Warn(
errors.Wrapf(
err,
"taskUpdateHistory.execute: %s: Couldn't add the task '%s' for this server",
server.Key,
TaskUpdateServerHistory,
),
)
}
return nil
}

View File

@ -20,7 +20,7 @@ func (t *taskUpdateServerData) execute(url string, server *models.Server) error
}
now := time.Now()
entry := log.WithField("key", server.Key)
entry.Infof("%s: updating data...", server.Key)
entry.Infof("taskUpdateServerData.execute: %s: Update of the server data has started...", server.Key)
err := (&workerUpdateServerData{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
dataloader: newDataloader(url),
@ -37,13 +37,13 @@ func (t *taskUpdateServerData) execute(url string, server *models.Server) error
"duration": duration.Nanoseconds(),
"durationPretty": duration.String(),
}).
Infof("%s has been updated", server.Key)
Infof("taskUpdateServerData.execute: %s: data has been updated", server.Key)
return nil
}
func (t *taskUpdateServerData) validatePayload(server *models.Server) error {
if server == nil {
return errors.Errorf("taskLoadServersAndUpdateData.validatePayload: Expected *models.Server, got nil")
return errors.Errorf("taskUpdateServerData.validatePayload: Expected *models.Server, got nil")
}
return nil
@ -59,7 +59,7 @@ func (w *workerUpdateServerData) loadPlayers(od map[int]*models.OpponentsDefeate
var ennoblements = []*models.Ennoblement{}
err := w.db.Model(&ennoblements).DistinctOn("new_owner_id").Order("new_owner_id ASC", "ennobled_at ASC").Select()
if err != nil {
return nil, errors.Wrap(err, "loadPlayers: couldn't load ennoblements")
return nil, errors.Wrap(err, "workerUpdateServerData.loadPlayers: couldn't load ennoblements")
}
players, err := w.dataloader.LoadPlayers()
@ -210,7 +210,11 @@ func (w *workerUpdateServerData) update() error {
if err != nil {
return err
}
defer tx.Close()
defer func(s *models.Server) {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key))
}
}(w.server)
if len(tribes) > 0 {
ids := []int{}

View File

@ -23,7 +23,7 @@ func (t *taskUpdateServerHistory) execute(timezone string, server *models.Server
return err
}
entry := log.WithField("key", server.Key)
entry.Infof("%s: update of the history has started...", server.Key)
entry.Infof("taskUpdateServerHistory.execute: %s: update of the history has started...", server.Key)
err = (&workerUpdateServerHistory{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
@ -34,7 +34,7 @@ func (t *taskUpdateServerHistory) execute(timezone string, server *models.Server
entry.Error(err)
return err
}
entry.Infof("%s: history has been updated", server.Key)
entry.Infof("taskUpdateServerHistory.execute: %s: history has been updated", server.Key)
return nil
}
@ -97,7 +97,11 @@ func (w *workerUpdateServerHistory) update() error {
if err != nil {
return err
}
defer tx.Close()
defer func(s *models.Server) {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key))
}
}(w.server)
if len(ph) > 0 {
if _, err := w.db.Model(&ph).Insert(); err != nil {

View File

@ -23,7 +23,7 @@ func (t *taskUpdateServerStats) execute(timezone string, server *models.Server)
return err
}
entry := log.WithField("key", server.Key)
entry.Infof("%s: update of the stats has started...", server.Key)
entry.Infof("taskUpdateServerStats.execute: %s: update of the stats has started...", server.Key)
err = (&workerUpdateServerStats{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
@ -34,7 +34,7 @@ func (t *taskUpdateServerStats) execute(timezone string, server *models.Server)
entry.Error(err)
return err
}
entry.Infof("%s: stats have been updated", server.Key)
entry.Infof("taskUpdateServerStats.execute: %s: stats have been updated", server.Key)
return nil
}
@ -120,7 +120,11 @@ func (w *workerUpdateServerStats) update() error {
if err != nil {
return err
}
defer tx.Close()
defer func(s *models.Server) {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key))
}
}(w.server)
if _, err := tx.Model(stats).Insert(); err != nil {
return errors.Wrap(err, "cannot insert server stats")

View File

@ -41,10 +41,18 @@ func (t *taskUpdateStats) execute(timezone string) error {
}
entry.
WithField("numberOfServers", len(servers)).
Info("Update of the stats has started")
Info("taskUpdateStats.execute: Update of the stats has started")
for _, server := range servers {
s := server
t.queue.Add(queue.MainQueue, Get(TaskUpdateServerStats).WithArgs(context.Background(), timezone, s))
err := t.queue.Add(queue.MainQueue, Get(TaskUpdateServerStats).WithArgs(context.Background(), timezone, s))
log.Warn(
errors.Wrapf(
err,
"taskUpdateStats.execute: %s: Couldn't add the task '%s' for this server",
server.Key,
TaskUpdateServerStats,
),
)
}
return nil
}

View File

@ -22,10 +22,20 @@ func (t *taskVacuum) execute() error {
log.Errorln(err)
return err
}
log.Infof("Start database vacuuming...")
log.Infof("taskVacuum.execute: Start database vacumming...")
for _, server := range servers {
s := server
t.queue.Add(queue.MainQueue, Get(TaskNameVacuumServerDB).WithArgs(context.Background(), s))
err := t.queue.Add(queue.MainQueue, Get(TaskNameVacuumServerDB).WithArgs(context.Background(), s))
if err != nil {
log.Warn(
errors.Wrapf(
err,
"taskVacuum.execute: %s: Couldn't add the task '%s' for this server",
server.Key,
TaskUpdateServerEnnoblements,
),
)
}
}
return nil
}

View File

@ -21,16 +21,17 @@ func (t *taskVacuumServerDB) execute(server *models.Server) error {
return nil
}
entry := log.WithField("key", server.Key)
entry.Infof("%s: vacumming the database...", server.Key)
entry.Infof("taskVacuumServerDB.execute: %s: vacumming the database...", server.Key)
err := (&workerVacuumServerDB{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
}).vacuum()
if err != nil {
err = errors.Wrap(err, "taskVacuumServerDB.execute")
entry.Error(err)
return err
}
entry.Infof("%s: the database has been vacummed", server.Key)
entry.Infof("taskVacuumServerDB.execute: %s: the database has been vacummed", server.Key)
return nil
}
@ -44,7 +45,8 @@ func (t *taskVacuumServerDB) validatePayload(server *models.Server) error {
}
type workerVacuumServerDB struct {
db *pg.DB
db *pg.DB
server *models.Server
}
func (w *workerVacuumServerDB) vacuum() error {
@ -52,7 +54,11 @@ func (w *workerVacuumServerDB) vacuum() error {
if err != nil {
return err
}
defer tx.Close()
defer func(s *models.Server) {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key))
}
}(w.server)
withNonExistentPlayers := w.db.Model(&models.Player{}).Column("id").Where("exists = false and NOW() - deleted_at > '14 days'")
withNonExistentTribes := w.db.Model(&models.Tribe{}).Column("id").Where("exists = false and NOW() - deleted_at > '1 days'")

View File

@ -50,7 +50,11 @@ func prepareDB(db *pg.DB) error {
if err != nil {
return errors.Wrap(err, "Couldn't start a transaction")
}
defer tx.Close()
defer func() {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrap(err, "prepareDB: Couldn't rollback the transaction"))
}
}()
dbModels := []interface{}{
(*models.SpecialServer)(nil),
@ -140,7 +144,11 @@ func createSchema(db *pg.DB, server *models.Server, init bool) error {
if err != nil {
return errors.Wrap(err, "CreateSchema: couldn't start a transaction")
}
defer tx.Close()
defer func() {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrap(err, "createSchema: Couldn't rollback the transaction"))
}
}()
if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil {
return errors.Wrap(err, "CreateSchema: couldn't create the schema")

35
main.go
View File

@ -21,12 +21,9 @@ import (
)
func init() {
os.Setenv("TZ", "UTC")
if mode.Get() == mode.DevelopmentMode {
godotenv.Load(".env.local")
if err := setupENVs(); err != nil {
logrus.Fatal(err)
}
setupLogger()
}
@ -35,13 +32,21 @@ func main() {
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis"))
}
defer redisClient.Close()
defer func() {
if err := redisClient.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "Couldn't close the Redis connection"))
}
}()
dbConn, err := db.New(&db.Config{LogQueries: envutils.GetenvBool("LOG_DB_QUERIES")})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db"))
}
defer dbConn.Close()
defer func() {
if err := dbConn.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "Couldn't close the db connection"))
}
}()
logrus.Info("Connection with the database has been established")
c, err := twhelpcron.New(&twhelpcron.Config{
@ -94,6 +99,22 @@ func setupLogger() {
}
}
func setupENVs() error {
err := os.Setenv("TZ", "UTC")
if err != nil {
return errors.Wrap(err, "setupENVs")
}
if mode.Get() == mode.DevelopmentMode {
err := godotenv.Load(".env.local")
if err != nil {
return errors.Wrap(err, "setupENVs")
}
}
return nil
}
func initializeRedis() (redis.UniversalClient, error) {
client := redis.NewClient(&redis.Options{
Addr: envutils.GetenvString("REDIS_ADDR"),