From dea3d4616a1f8e435ab2b0350715000b0d5d85dd Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 25 Apr 2021 17:02:58 +0200 Subject: [PATCH] huge refactor --- cron/cron.go | 26 +-- cron/handler.go | 259 +-------------------------- cron/tasks/task.go | 12 ++ cron/tasks/task_load_servers.go | 104 +++++++++++ cron/tasks/tasks.go | 59 ++++++ db/db.go | 189 ++++++++++++++++++++ db/sql_statements.go | 306 ++++++++++++++++++++++++++++++++ go.mod | 4 +- go.sum | 10 +- main.go | 98 +++++----- utils/env/env.go | 30 ++++ 11 files changed, 787 insertions(+), 310 deletions(-) create mode 100644 cron/tasks/task.go create mode 100644 cron/tasks/task_load_servers.go create mode 100644 cron/tasks/tasks.go create mode 100644 db/db.go create mode 100644 db/sql_statements.go create mode 100644 utils/env/env.go diff --git a/cron/cron.go b/cron/cron.go index d9f8023..38b5788 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -9,6 +9,8 @@ import ( "github.com/go-pg/pg/v10" "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" + + "github.com/tribalwarshelp/cron/cron/queue" ) var log = logrus.WithField("package", "cron") @@ -17,19 +19,23 @@ type Config struct { DB *pg.DB MaxConcurrentWorkers int RunOnStartup bool + Queue queue.Queue } func Attach(c *cron.Cron, cfg Config) error { if cfg.DB == nil { return fmt.Errorf("cfg.DB cannot be nil, expected *pg.DB") } + if cfg.Queue == nil { + return fmt.Errorf("cfg.Queue cannot be nil, expected queue.Queue") + } h := &handler{db: cfg.DB, maxConcurrentWorkers: cfg.MaxConcurrentWorkers} if err := h.init(); err != nil { return err } - versions := []*models.Version{} + var versions []*models.Version if err := cfg.DB.Model(&versions).DistinctOn("timezone").Select(); err != nil { return err } @@ -37,8 +43,8 @@ func Attach(c *cron.Cron, cfg Config) error { updateServerData := utils.TrackExecutionTime(log, h.updateServerData, "updateServerData") vacuumDatabase := utils.TrackExecutionTime(log, h.vacuumDatabase, "vacuumDatabase") updateServerEnnoblements := utils.TrackExecutionTime(log, h.updateServerEnnoblements, "updateServerEnnoblements") - updateHistoryFuncs := []func(){} - updateStatsFuncs := []func(){} + var updateHistoryFuncs []func() + var updateStatsFuncs []func() for _, version := range versions { updateHistory := utils.TrackExecutionTime(log, createFnWithTimezone(version.Timezone, h.updateHistory), @@ -69,13 +75,13 @@ func Attach(c *cron.Cron, cfg Config) error { if cfg.RunOnStartup { go func() { updateServerData() - vacuumDatabase() - for _, fn := range updateHistoryFuncs { - go fn() - } - for _, fn := range updateStatsFuncs { - go fn() - } + //vacuumDatabase() + //for _, fn := range updateHistoryFuncs { + // go fn() + //} + //for _, fn := range updateStatsFuncs { + // go fn() + //} }() } diff --git a/cron/handler.go b/cron/handler.go index 5376b66..d264201 100644 --- a/cron/handler.go +++ b/cron/handler.go @@ -1,32 +1,27 @@ package cron import ( + "context" "fmt" - "io/ioutil" - "net/http" "runtime" "sync" "time" - "github.com/sirupsen/logrus" "github.com/tribalwarshelp/shared/models" "github.com/tribalwarshelp/shared/tw/dataloader" - phpserialize "github.com/Kichiyaki/go-php-serialize" - "github.com/go-pg/pg/v10" - "github.com/go-pg/pg/v10/orm" "github.com/pkg/errors" -) -const ( - endpointGetServers = "/backend/get_servers.php" + "github.com/tribalwarshelp/cron/cron/queue" + "github.com/tribalwarshelp/cron/cron/tasks" ) type handler struct { db *pg.DB maxConcurrentWorkers int pool *pool + queue queue.Queue } func (h *handler) init() error { @@ -38,255 +33,11 @@ func (h *handler) init() error { h.pool = newPool(h.maxConcurrentWorkers) } - tx, err := h.db.Begin() - if err != nil { - return err - } - defer tx.Close() - - dbModels := []interface{}{ - (*models.SpecialServer)(nil), - (*models.Server)(nil), - (*models.Version)(nil), - (*models.PlayerToServer)(nil), - (*models.PlayerNameChange)(nil), - } - - for _, model := range dbModels { - err := tx.Model(model).CreateTable(&orm.CreateTableOptions{ - IfNotExists: true, - }) - if err != nil { - return err - } - } - - type statementWithParams struct { - statement string - params []interface{} - } - - for _, s := range []statementWithParams{ - { - statement: pgDefaultValues, - }, - { - statement: allVersionsPGInsertStatements, - }, - { - statement: allSpecialServersPGInsertStatements, - }, - { - statement: pgDropSchemaFunctions, - params: []interface{}{pg.Safe("public")}, - }, - { - statement: pgFunctions, - }, - } { - if _, err := tx.Exec(s.statement, s.params...); err != nil { - return err - } - } - - if err := tx.Commit(); err != nil { - return err - } - - servers := []*models.Server{} - if err := h.db.Model(&servers).Select(); err != nil { - return err - } - - for _, server := range servers { - if err := h.createSchema(server, true); err != nil { - return err - } - } - return nil } -func (h *handler) createSchema(server *models.Server, init bool) error { - if !init { - exists, err := h.db.Model().Table("information_schema.schemata").Where("schema_name = ?", server.Key).Exists() - if err != nil { - return err - } - - if exists { - return nil - } - } - - tx, err := h.db.WithParam("SERVER", pg.Safe(server.Key)).Begin() - if err != nil { - return err - } - defer tx.Close() - - if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil { - return err - } - - dbModels := []interface{}{ - (*models.Tribe)(nil), - (*models.Player)(nil), - (*models.Village)(nil), - (*models.Ennoblement)(nil), - (*models.ServerStats)(nil), - (*models.TribeHistory)(nil), - (*models.PlayerHistory)(nil), - (*models.TribeChange)(nil), - (*models.DailyPlayerStats)(nil), - (*models.DailyTribeStats)(nil), - } - - for _, model := range dbModels { - err := tx.Model(model).CreateTable(&orm.CreateTableOptions{ - IfNotExists: true, - }) - if err != nil { - return err - } - } - - statements := []string{ - serverPGFunctions, - serverPGTriggers, - serverPGDefaultValues, - } - if init { - statements = append([]string{pgDropSchemaFunctions}, statements...) - } - for _, statement := range statements { - if _, err := tx.Exec(statement, pg.Safe(server.Key), server.VersionCode); err != nil { - return err - } - } - - return tx.Commit() -} - -func (h *handler) getServers() ([]*models.Server, map[string]string, error) { - versions := []*models.Version{} - if err := h.db.Model(&versions).Relation("SpecialServers").Order("code ASC").Select(); err != nil { - return nil, nil, errors.Wrap(err, "getServers") - } - - serverKeys := []string{} - servers := []*models.Server{} - urls := make(map[string]string) - loadedVersions := []models.VersionCode{} - for _, version := range versions { - log := log.WithField("host", version.Host) - log.Infof("Loading servers from %s", version.Host) - resp, err := http.Get(fmt.Sprintf("https://%s%s", version.Host, endpointGetServers)) - if err != nil { - log.Errorln(errors.Wrapf(err, "fetching servers from %s", version.Host)) - continue - } - defer resp.Body.Close() - bodyBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Errorln(errors.Wrapf(err, "reading response body from %s", version.Host)) - continue - } - body, err := phpserialize.Decode(string(bodyBytes)) - if err != nil { - log.Errorln(errors.Wrapf(err, "serializing body from %s into go value", version.Host)) - continue - } - for serverKey, url := range body.(map[interface{}]interface{}) { - serverKeyStr := serverKey.(string) - if version.SpecialServers.Contains(serverKeyStr) { - continue - } - server := &models.Server{ - Key: serverKeyStr, - Status: models.ServerStatusOpen, - VersionCode: version.Code, - Version: version, - } - if err := h.createSchema(server, false); err != nil { - log.WithField("serverKey", serverKey).Errorln(errors.Wrapf(err, "cannot create schema for %s", serverKey)) - continue - } - serverKeys = append(serverKeys, serverKeyStr) - urls[serverKeyStr] = url.(string) - servers = append(servers, server) - } - loadedVersions = append(loadedVersions, version.Code) - } - - if len(servers) > 0 { - if _, err := h.db.Model(&servers). - OnConflict("(key) DO UPDATE"). - Set("status = ?", models.ServerStatusOpen). - Set("version_code = EXCLUDED.version_code"). - Returning("*"). - Insert(); err != nil { - return nil, nil, err - } - } - - if _, err := h.db.Model(&models.Server{}). - Set("status = ?", models.ServerStatusClosed). - Where("key NOT IN (?) AND version_code IN (?)", pg.In(serverKeys), pg.In(loadedVersions)). - Update(); err != nil { - return nil, nil, err - } - - return servers, urls, nil -} - func (h *handler) updateServerData() { - servers, urls, err := h.getServers() - if err != nil { - log.Errorln("updateServerData:", err.Error()) - return - } - log. - WithField("numberOfServers", len(servers)). - Info("updateServerData: servers loaded") - - var wg sync.WaitGroup - - for _, server := range servers { - log := log.WithField("serverKey", server.Key) - url, ok := urls[server.Key] - if !ok { - log.Warnf("No one URL associated with key: %s, skipping...", server.Key) - continue - } - h.pool.waitForWorker() - wg.Add(1) - sh := &updateServerDataWorker{ - db: h.db.WithParam("SERVER", pg.Safe(server.Key)), - server: server, - dataloader: dataloader.New(&dataloader.Config{ - BaseURL: url, - Client: &http.Client{ - Timeout: 10 * time.Second, - }, - }), - } - go func(worker *updateServerDataWorker, server *models.Server, url string, log *logrus.Entry) { - defer func() { - h.pool.releaseWorker() - wg.Done() - }() - log.Infof("updateServerData: %s: updating data", server.Key) - err := sh.update() - if err != nil { - log.Errorln("updateServerData:", errors.Wrap(err, server.Key)) - return - } - log.Infof("updateServerData: %s: data updated", server.Key) - }(sh, server, url, log) - } - - wg.Wait() + h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersions).WithArgs(context.Background())) } func (h *handler) updateServerEnnoblements() { diff --git a/cron/tasks/task.go b/cron/tasks/task.go new file mode 100644 index 0000000..03e6d84 --- /dev/null +++ b/cron/tasks/task.go @@ -0,0 +1,12 @@ +package tasks + +import ( + "github.com/go-pg/pg/v10" + + "github.com/tribalwarshelp/cron/cron/queue" +) + +type task struct { + db *pg.DB + queue queue.Queue +} diff --git a/cron/tasks/task_load_servers.go b/cron/tasks/task_load_servers.go new file mode 100644 index 0000000..8190f7f --- /dev/null +++ b/cron/tasks/task_load_servers.go @@ -0,0 +1,104 @@ +package tasks + +import ( + "fmt" + phpserialize "github.com/Kichiyaki/go-php-serialize" + "github.com/go-pg/pg/v10" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/tribalwarshelp/shared/models" + "io/ioutil" + "net/http" + + "github.com/tribalwarshelp/cron/db" +) + +const ( + endpointGetServers = "/backend/get_servers.php" +) + +type taskLoadServers struct { + *task +} + +func (t *taskLoadServers) execute(version *models.Version) error { + if err := t.validatePayload(version); err != nil { + return nil + } + entry := log.WithField("host", version.Host) + entry.Infof("%s: Loading servers", version.Host) + data, err := t.getServers(version) + if err != nil { + log.Errorln(err) + return err + } + + var serverKeys []string + var servers []*models.Server + for serverKey := range data { + if version.SpecialServers.Contains(serverKey) { + continue + } + server := &models.Server{ + Key: serverKey, + Status: models.ServerStatusOpen, + VersionCode: version.Code, + Version: version, + } + if err := db.CreateSchema(t.db, server); err != nil { + logrus.Warn(errors.Wrapf(err, "%s: couldn't create the schema", server.Key)) + continue + } + servers = append(servers, server) + serverKeys = append(serverKeys, serverKey) + } + + if len(servers) > 0 { + if _, err := t.db.Model(&servers). + OnConflict("(key) DO UPDATE"). + Set("status = ?", models.ServerStatusOpen). + Set("version_code = EXCLUDED.version_code"). + Returning("*"). + Insert(); err != nil { + err = errors.Wrap(err, "couldn't insert/update servers") + logrus.Fatal(err) + return err + } + } + + if _, err := t.db.Model(&models.Server{}). + Set("status = ?", models.ServerStatusClosed). + Where("key NOT IN (?) AND version_code =", pg.In(serverKeys), version.Code). + Update(); err != nil { + err = errors.Wrap(err, "couldn't update server statuses") + logrus.Fatal(err) + return err + } + + entry.Infof("%s: Servers have been loaded", version.Host) + return nil +} + +func (t *taskLoadServers) validatePayload(version *models.Version) error { + if version == nil { + return errors.Errorf("taskLoadServers.validatePayload: Expected *models.Version, got nil") + } + return nil +} + +func (t *taskLoadServers) getServers(version *models.Version) (map[string]string, error) { + resp, err := http.Get(fmt.Sprintf("https://%s%s", version.Host, endpointGetServers)) + if err != nil { + return nil, errors.Wrapf(err, "%s: taskLoadServers.loadServers couldn't load servers", version.Host) + } + defer resp.Body.Close() + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrapf(err, "%s: taskLoadServers.loadServers couldn't read the body", version.Host) + } + body, err := phpserialize.Decode(string(bodyBytes)) + if err != nil { + return nil, errors.Wrapf(err, "%s: taskLoadServers.loadServers couldn't decode the body into the go value", version.Host) + } + return body.(map[string]string), nil +} diff --git a/cron/tasks/tasks.go b/cron/tasks/tasks.go new file mode 100644 index 0000000..10bda4e --- /dev/null +++ b/cron/tasks/tasks.go @@ -0,0 +1,59 @@ +package tasks + +import ( + "github.com/go-pg/pg/v10" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/vmihailenco/taskq/v3" + + "github.com/tribalwarshelp/cron/cron/queue" +) + +const ( + TaskNameLoadVersions = "loadVersions" + TaskNameLoadServers = "loadServers" + TaskNameUpdateServerData = "updateServerData" + retryLimitLoadServers = 3 +) + +var log = logrus.WithField("package", "tasks") + +type Config struct { + DB *pg.DB + Queue queue.Queue +} + +func RegisterTasks(cfg *Config) error { + if err := validateConfig(cfg); err != nil { + return errors.Wrap(err, "RegisterTasks") + } + + t := &task{ + db: cfg.DB, + queue: cfg.Queue, + } + taskq.RegisterTask(&taskq.TaskOptions{ + Name: TaskNameLoadServers, + RetryLimit: retryLimitLoadServers, + Handler: (&taskLoadServers{t}).execute, + }) + + return nil +} + +func Get(taskName string) *taskq.Task { + return taskq.Tasks.Get(taskName) +} + +func validateConfig(cfg *Config) error { + if cfg == nil { + return errors.New("Config hasn't been provided") + } + if cfg.DB == nil { + return errors.New("cfg.DB is required") + } + if cfg.Queue == nil { + return errors.New("cfg.Queue is required") + } + return nil +} diff --git a/db/db.go b/db/db.go new file mode 100644 index 0000000..d946e48 --- /dev/null +++ b/db/db.go @@ -0,0 +1,189 @@ +package db + +import ( + "fmt" + gopglogrusquerylogger "github.com/Kichiyaki/go-pg-logrus-query-logger/v10" + "github.com/go-pg/pg/v10" + "github.com/go-pg/pg/v10/orm" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/tribalwarshelp/shared/models" + + envutils "github.com/tribalwarshelp/cron/utils/env" +) + +var log = logrus.WithField("package", "db") + +type Config struct { + LogQueries bool +} + +func New(cfg *Config) (*pg.DB, error) { + db := pg.Connect(prepareOptions()) + + if cfg != nil && cfg.LogQueries { + db.AddQueryHook(gopglogrusquerylogger.QueryLogger{ + Entry: log, + MaxQueryLength: 5000, + }) + } + + if err := prepareDB(db); err != nil { + return nil, errors.Wrap(err, "New") + } + + return db, nil +} + +func prepareOptions() *pg.Options { + return &pg.Options{ + User: envutils.GetenvString("DB_USER"), + Password: envutils.GetenvString("DB_PASSWORD"), + Database: envutils.GetenvString("DB_NAME"), + Addr: envutils.GetenvString("DB_HOST") + ":" + envutils.GetenvString("DB_PORT"), + PoolSize: envutils.GetenvInt("DB_POOL_SIZE"), + } +} + +func prepareDB(db *pg.DB) error { + tx, err := db.Begin() + if err != nil { + return errors.Wrap(err, "Couldn't start a transaction") + } + defer tx.Close() + + dbModels := []interface{}{ + (*models.SpecialServer)(nil), + (*models.Server)(nil), + (*models.Version)(nil), + (*models.PlayerToServer)(nil), + (*models.PlayerNameChange)(nil), + } + + for _, model := range dbModels { + err := tx.Model(model).CreateTable(&orm.CreateTableOptions{ + IfNotExists: true, + }) + if err != nil { + return errors.Wrap(err, "Couldn't create the table") + } + } + + type statementWithParams struct { + statement string + params []interface{} + } + + for _, s := range []statementWithParams{ + { + statement: pgDefaultValues, + }, + { + statement: allVersionsPGInsertStatements, + }, + { + statement: allSpecialServersPGInsertStatements, + }, + { + statement: pgDropSchemaFunctions, + params: []interface{}{pg.Safe("public")}, + }, + { + statement: pgFunctions, + }, + } { + if _, err := tx.Exec(s.statement, s.params...); err != nil { + return errors.Wrap(err, "Couldn't initialize the db") + } + } + + if err := tx.Commit(); err != nil { + return errors.Wrap(err, "Couldn't commit changes") + } + + var servers []*models.Server + if err := db.Model(&servers).Select(); err != nil { + return errors.Wrap(err, "Couldn't load servers") + } + + for _, server := range servers { + if err := createSchema(db, server, true); err != nil { + return err + } + } + + return nil +} + +func CreateSchema(db *pg.DB, server *models.Server) error { + return createSchema(db, server, false) +} + +func SchemaExists(db *pg.DB, schemaName string) bool { + exists, err := db. + Model(). + Table("information_schema.schemata"). + Where("schema_name = ?", schemaName). + Exists() + if err != nil { + return false + } + return exists +} + +func createSchema(db *pg.DB, server *models.Server, init bool) error { + if !init && SchemaExists(db, server.Key) { + return nil + } + + tx, err := db.WithParam("SERVER", pg.Safe(server.Key)).Begin() + if err != nil { + return errors.Wrap(err, "CreateSchema: couldn't start a transaction") + } + defer tx.Close() + + if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil { + return errors.Wrap(err, "CreateSchema: couldn't create the schema") + } + + dbModels := []interface{}{ + (*models.Tribe)(nil), + (*models.Player)(nil), + (*models.Village)(nil), + (*models.Ennoblement)(nil), + (*models.ServerStats)(nil), + (*models.TribeHistory)(nil), + (*models.PlayerHistory)(nil), + (*models.TribeChange)(nil), + (*models.DailyPlayerStats)(nil), + (*models.DailyTribeStats)(nil), + } + + for _, model := range dbModels { + err := tx.Model(model).CreateTable(&orm.CreateTableOptions{ + IfNotExists: true, + }) + if err != nil { + return err + } + } + + statements := []string{ + serverPGFunctions, + serverPGTriggers, + serverPGDefaultValues, + } + if init { + statements = append([]string{pgDropSchemaFunctions}, statements...) + } + for _, statement := range statements { + if _, err := tx.Exec(statement, pg.Safe(server.Key), server.VersionCode); err != nil { + return errors.Wrap(err, "CreateSchema: couldn't initialize the schema") + } + } + + if err := tx.Commit(); err != nil { + return errors.Wrap(err, "CreateSchema: couldn't commit changes") + } + return nil +} diff --git a/db/sql_statements.go b/db/sql_statements.go new file mode 100644 index 0000000..583d57e --- /dev/null +++ b/db/sql_statements.go @@ -0,0 +1,306 @@ +package db + +const ( + allSpecialServersPGInsertStatements = ` + INSERT INTO public.special_servers (version_code, key) VALUES ('pl', 'pls1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('en', 'ens1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('uk', 'uks1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('uk', 'master') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('it', 'its1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('hu', 'hus1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('fr', 'frs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('us', 'uss1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('nl', 'nls1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('es', 'ess1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('ro', 'ros1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('gr', 'grs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('br', 'brs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('tr', 'trs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('cs', 'css1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('de', 'des1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('ru', 'rus1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('ch', 'chs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('pt', 'pts1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + INSERT INTO public.special_servers (version_code, key) VALUES ('sk', 'sks1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING; + ` + allVersionsPGInsertStatements = ` + INSERT INTO public.versions (code, name, host, timezone) VALUES ('pl', 'Polska', 'plemiona.pl', 'Europe/Warsaw') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('uk', 'United Kingdom', 'tribalwars.co.uk', 'Europe/London') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('hu', 'Hungary', 'klanhaboru.hu', 'Europe/Budapest') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('it', 'Italy', 'tribals.it', 'Europe/Rome') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('fr', 'France', 'guerretribale.fr', 'Europe/Paris') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('us', 'United States', 'tribalwars.us', 'America/New_York') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('nl', 'The Netherlands', 'tribalwars.nl', 'Europe/Amsterdam') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('es', 'Spain', 'guerrastribales.es', 'Europe/Madrid') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('ro', 'Romania', 'triburile.ro', 'Europe/Bucharest') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('gr', 'Greece', 'fyletikesmaxes.gr', 'Europe/Athens') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('br', 'Brazil', 'tribalwars.com.br', 'America/Sao_Paulo') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('tr', 'Turkey', 'klanlar.org', 'Europe/Istanbul') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('cs', 'Czech Republic', 'divokekmeny.cz', 'Europe/Prague') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('ru', 'Russia', 'voyna-plemyon.ru', 'Europe/Moscow') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('ch', 'Switerzland', 'staemme.ch', 'Europe/Zurich') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('pt', 'Portugal', 'tribalwars.com.pt', 'Europe/Lisbon') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('en', 'International', 'tribalwars.net', 'Europe/London') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('de', 'Germany', 'die-staemme.de', 'Europe/Berlin') ON CONFLICT (code) DO NOTHING; + INSERT INTO public.versions (code, name, host, timezone) VALUES ('sk', 'Slovakia', 'divoke-kmene.sk', 'Europe/Bratislava') ON CONFLICT (code) DO NOTHING; + ` + + pgDropSchemaFunctions = ` + DO + $do$ + DECLARE + funcTableRecord RECORD; + BEGIN + FOR funcTableRecord IN SELECT routine_schema, routine_name from information_schema.routines where routine_type = 'FUNCTION' AND specific_schema = '?0' LOOP + EXECUTE 'DROP FUNCTION IF EXISTS ' || funcTableRecord.routine_schema || '.' || funcTableRecord.routine_name || ' CASCADE;'; + END LOOP; + END + $do$; + ` + + pgFunctions = ` + CREATE OR REPLACE FUNCTION check_daily_growth() + RETURNS trigger AS + $BODY$ + BEGIN + IF NEW.exists = false THEN + NEW.daily_growth = 0; + END IF; + + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql; + + CREATE OR REPLACE FUNCTION check_existence() + RETURNS trigger AS + $BODY$ + BEGIN + IF NEW.exists = false AND OLD.exists = true THEN + NEW.deleted_at = now(); + END IF; + IF NEW.exists = true THEN + NEW.deleted_at = null; + END IF; + + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql; + + CREATE OR REPLACE FUNCTION check_dominance() + RETURNS trigger AS + $BODY$ + BEGIN + IF NEW.exists = false THEN + NEW.dominance = 0; + END IF; + + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql; + + CREATE OR REPLACE FUNCTION update_most_points_most_villages_best_rank_last_activity() + RETURNS trigger AS + $BODY$ + BEGIN + IF TG_OP = 'INSERT' THEN + IF NEW.most_points IS null OR NEW.points > NEW.most_points THEN + NEW.most_points = NEW.points; + NEW.most_points_at = now(); + END IF; + IF NEW.most_villages IS null OR NEW.total_villages > NEW.most_villages THEN + NEW.most_villages = NEW.total_villages; + NEW.most_villages_at = now(); + END IF; + IF NEW.best_rank IS null OR NEW.rank < NEW.best_rank OR NEW.best_rank = 0 THEN + NEW.best_rank = NEW.rank; + NEW.best_rank_at = now(); + END IF; + END IF; + + IF TG_OP = 'UPDATE' THEN + IF NEW.most_points IS null OR NEW.points > OLD.most_points THEN + NEW.most_points = NEW.points; + NEW.most_points_at = now(); + END IF; + IF NEW.most_villages IS null OR NEW.total_villages > OLD.most_villages THEN + NEW.most_villages = NEW.total_villages; + NEW.most_villages_at = now(); + END IF; + IF NEW.best_rank IS null OR NEW.rank < OLD.best_rank OR OLD.best_rank = 0 THEN + NEW.best_rank = NEW.rank; + NEW.best_rank_at = now(); + END IF; + if TG_TABLE_NAME = 'players' THEN + IF NEW.points > OLD.points OR NEW.score_att > OLD.score_att THEN + NEW.last_activity_at = now(); + END IF; + END IF; + END IF; + + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql; + ` + + serverPGFunctions = ` + CREATE OR REPLACE FUNCTION ?0.log_tribe_change() + RETURNS trigger AS + $BODY$ + BEGIN + IF TG_OP = 'INSERT' THEN + IF NEW.tribe_id <> 0 THEN + INSERT INTO ?0.tribe_changes(player_id,old_tribe_id,new_tribe_id,created_at) + VALUES(NEW.id,0,NEW.tribe_id,now()); + END IF; + END IF; + + IF TG_OP = 'UPDATE' THEN + IF NEW.tribe_id <> OLD.tribe_id THEN + INSERT INTO ?0.tribe_changes(player_id,old_tribe_id,new_tribe_id,created_at) + VALUES(OLD.id,OLD.tribe_id,NEW.tribe_id,now()); + END IF; + END IF; + + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql VOLATILE; + + CREATE OR REPLACE FUNCTION ?0.log_player_name_change() + RETURNS trigger AS + $BODY$ + BEGIN + IF NEW.name <> OLD.name AND old.exists = true THEN + INSERT INTO player_name_changes(version_code,player_id,old_name,new_name,change_date) + VALUES(?1,NEW.id,OLD.name,NEW.name,CURRENT_DATE) + ON CONFLICT DO NOTHING; + END IF; + + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql VOLATILE; + + CREATE OR REPLACE FUNCTION ?0.get_old_and_new_owner_tribe_id() + RETURNS trigger AS + $BODY$ + BEGIN + IF NEW.old_owner_id <> 0 THEN + SELECT tribe_id INTO NEW.old_owner_tribe_id + FROM ?0.players + WHERE id = NEW.old_owner_id; + END IF; + IF NEW.old_owner_tribe_id IS NULL THEN + NEW.old_owner_tribe_id = 0; + END IF; + IF NEW.new_owner_id <> 0 THEN + SELECT tribe_id INTO NEW.new_owner_tribe_id + FROM ?0.players + WHERE id = NEW.new_owner_id; + END IF; + IF NEW.new_owner_tribe_id IS NULL THEN + NEW.new_owner_tribe_id = 0; + END IF; + + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql VOLATILE; + + CREATE OR REPLACE FUNCTION ?0.insert_into_player_to_servers() + RETURNS trigger AS + $BODY$ + BEGIN + INSERT INTO player_to_servers(server_key,player_id) + VALUES('?0', NEW.id) + ON CONFLICT DO NOTHING; + + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql; + ` + serverPGTriggers = ` + CREATE TRIGGER ?0_log_tribe_change_on_insert + AFTER INSERT + ON ?0.players + FOR EACH ROW + EXECUTE PROCEDURE ?0.log_tribe_change(); + + CREATE TRIGGER ?0_log_tribe_change_on_update + AFTER UPDATE + ON ?0.players + FOR EACH ROW + EXECUTE PROCEDURE ?0.log_tribe_change(); + + CREATE TRIGGER ?0_name_change + AFTER UPDATE + ON ?0.players + FOR EACH ROW + EXECUTE PROCEDURE ?0.log_player_name_change(); + + CREATE TRIGGER ?0_check_daily_growth + BEFORE UPDATE + ON ?0.players + FOR EACH ROW + EXECUTE PROCEDURE check_daily_growth(); + + CREATE TRIGGER ?0_check_player_existence + BEFORE UPDATE + ON ?0.players + FOR EACH ROW + EXECUTE PROCEDURE check_existence(); + + CREATE TRIGGER ?0_check_tribe_existence + BEFORE UPDATE + ON ?0.tribes + FOR EACH ROW + EXECUTE PROCEDURE check_existence(); + + CREATE TRIGGER ?0_check_dominance + BEFORE UPDATE + ON ?0.tribes + FOR EACH ROW + EXECUTE PROCEDURE check_dominance(); + + CREATE TRIGGER ?0_update_ennoblement_old_and_new_owner_tribe_id + BEFORE INSERT + ON ?0.ennoblements + FOR EACH ROW + EXECUTE PROCEDURE ?0.get_old_and_new_owner_tribe_id(); + + CREATE TRIGGER ?0_insert_into_player_to_servers + AFTER INSERT + ON ?0.players + FOR EACH ROW + EXECUTE PROCEDURE ?0.insert_into_player_to_servers(); + + CREATE TRIGGER ?0_update_most_points_most_villages_best_rank_last_activity + BEFORE INSERT OR UPDATE + ON ?0.players + FOR EACH ROW + EXECUTE PROCEDURE update_most_points_most_villages_best_rank_last_activity(); + + CREATE TRIGGER ?0_update_most_points_most_villages_best_rank_last_activity + BEFORE INSERT OR UPDATE + ON ?0.tribes + FOR EACH ROW + EXECUTE PROCEDURE update_most_points_most_villages_best_rank_last_activity(); + ` + + serverPGDefaultValues = ` + ALTER TABLE ?0.daily_player_stats ALTER COLUMN create_date set default CURRENT_DATE; + ALTER TABLE ?0.daily_tribe_stats ALTER COLUMN create_date set default CURRENT_DATE; + ALTER TABLE ?0.player_history ALTER COLUMN create_date set default CURRENT_DATE; + ALTER TABLE ?0.tribe_history ALTER COLUMN create_date set default CURRENT_DATE; + ALTER TABLE ?0.stats ALTER COLUMN create_date set default CURRENT_DATE; + ` + + pgDefaultValues = ` + ALTER TABLE player_name_changes ALTER COLUMN change_date set default CURRENT_DATE; + ` +) diff --git a/go.mod b/go.mod index d1a3015..ef15513 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210423175217-c83fa01c60d7 github.com/Kichiyaki/go-php-serialize v0.0.0-20200601110855-47b6982acf83 github.com/go-pg/pg/v10 v10.9.1 - github.com/go-redis/redis/v8 v8.8.2 // indirect + github.com/go-redis/redis/v8 v8.8.2 github.com/go-redis/redis_rate/v9 v9.1.1 // indirect github.com/joho/godotenv v1.3.0 github.com/klauspost/compress v1.12.1 // indirect @@ -14,6 +14,6 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/sirupsen/logrus v1.8.1 github.com/tribalwarshelp/shared v0.0.0-20210423190057-03d8445d35dc - github.com/vmihailenco/taskq/v3 v3.2.3 // indirect + github.com/vmihailenco/taskq/v3 v3.2.3 golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 // indirect ) diff --git a/go.sum b/go.sum index 6de7c76..65be078 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,7 @@ github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210423175217-c83fa0 github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210423175217-c83fa01c60d7/go.mod h1:ADHVWnGlWcRn1aGthuh7I1Lrn6zzsjkVJju151dXyDw= github.com/Kichiyaki/go-php-serialize v0.0.0-20200601110855-47b6982acf83 h1:Oa8Bk4LNcknxw50gZOlvPwEreOlAbOnu7V82lUYNbOM= github.com/Kichiyaki/go-php-serialize v0.0.0-20200601110855-47b6982acf83/go.mod h1:+iGkf5HfOVeRVd9K7qQDucIl+/Kt3MyenMa90b/O/c4= +github.com/aws/aws-sdk-go v1.35.28 h1:S2LuRnfC8X05zgZLC8gy/Sb82TGv2Cpytzbzz7tkeHc= github.com/aws/aws-sdk-go v1.35.28/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k= github.com/bsm/redislock v0.7.0 h1:RL7aZJhCKkuBjQbnSTKCeedTRifBWxd/ffP+GZ599Mo= github.com/bsm/redislock v0.7.0/go.mod h1:3Kgu+cXw0JrkZ5pmY/JbcFpixGZ5M9v9G2PGWYqku+k= @@ -18,6 +19,7 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= @@ -63,14 +65,17 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/iron-io/iron_go3 v0.0.0-20190916120531-a4a7f74b73ac h1:w5wltlINIIqRTqQ64dASrCo0fM7k9nosPbKCZnkL0W0= github.com/iron-io/iron_go3 v0.0.0-20190916120531-a4a7f74b73ac/go.mod h1:gyMTRVO+ZkEy7wQDyD++okPsBN2q127EpuShhHMWG54= github.com/jeffh/go.bdd v0.0.0-20120717032931-88f798ee0c74/go.mod h1:qNa9FlAfO0U/qNkzYBMH1JKYRMzC+sP9IcyV4U18l98= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= @@ -89,14 +94,14 @@ github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= -github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M= github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4= github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= +github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -221,7 +226,6 @@ golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= diff --git a/main.go b/main.go index ae6ca10..4d35874 100644 --- a/main.go +++ b/main.go @@ -1,19 +1,23 @@ package main import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/pkg/errors" "os" "os/signal" - "strconv" - "strings" "syscall" + "time" "github.com/sirupsen/logrus" "github.com/tribalwarshelp/shared/mode" - _cron "github.com/tribalwarshelp/cron/cron" + twhelpcron "github.com/tribalwarshelp/cron/cron" + "github.com/tribalwarshelp/cron/cron/queue" + "github.com/tribalwarshelp/cron/cron/tasks" + "github.com/tribalwarshelp/cron/db" + envutils "github.com/tribalwarshelp/cron/utils/env" - gopglogrusquerylogger "github.com/Kichiyaki/go-pg-logrus-query-logger/v10" - "github.com/go-pg/pg/v10" "github.com/joho/godotenv" "github.com/robfig/cron/v3" ) @@ -29,38 +33,44 @@ func init() { } func main() { - dbOptions := &pg.Options{ - User: os.Getenv("DB_USER"), - Password: os.Getenv("DB_PASSWORD"), - Database: os.Getenv("DB_NAME"), - Addr: os.Getenv("DB_HOST") + ":" + os.Getenv("DB_PORT"), - PoolSize: mustParseEnvToInt("DB_POOL_SIZE"), + client, err := initializeRedis() + if err != nil { + logrus.Fatal(errors.Wrap(err, "Error establishing a redis connection")) } - dbFields := logrus.Fields{ - "user": dbOptions.User, - "database": dbOptions.Database, - "addr": dbOptions.Addr, + defer client.Close() + + conn, err := db.New(&db.Config{LogQueries: envutils.GetenvBool("LOG_DB_QUERIES")}) + if err != nil { + logrus.Fatal(errors.Wrap(err, "Error establishing a database connection")) } - db := pg.Connect(dbOptions) - defer func() { - if err := db.Close(); err != nil { - logrus.WithFields(dbFields).Fatalln(err) - } - }() - if strings.ToUpper(os.Getenv("LOG_DB_QUERIES")) == "TRUE" { - db.AddQueryHook(gopglogrusquerylogger.QueryLogger{ - Entry: logrus.NewEntry(logrus.StandardLogger()), - }) + defer conn.Close() + logrus.Info("Connection with the database has been established") + + queue, err := queue.New(&queue.Config{ + WorkerLimit: envutils.GetenvInt("MAX_CONCURRENT_WORKERS"), + Redis: client, + }) + if err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't create the task queue")) + } + tasks.RegisterTasks(&tasks.Config{ + DB: conn, + Queue: queue, + }) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := queue.Start(ctx); err != nil { + logrus.Fatal(err) } - logrus.WithFields(dbFields).Info("Connection with the database has been established") c := cron.New(cron.WithChain( cron.SkipIfStillRunning(cron.PrintfLogger(logrus.WithField("package", "cron"))), )) - if err := _cron.Attach(c, _cron.Config{ - DB: db, - MaxConcurrentWorkers: mustParseEnvToInt("MAX_CONCURRENT_WORKERS"), - RunOnStartup: os.Getenv("RUN_ON_STARTUP") == "true", + if err := twhelpcron.Attach(c, twhelpcron.Config{ + DB: conn, + MaxConcurrentWorkers: envutils.GetenvInt("MAX_CONCURRENT_WORKERS"), + RunOnStartup: envutils.GetenvBool("RUN_ON_STARTUP"), + Queue: queue, }); err != nil { logrus.Fatal(err) } @@ -74,18 +84,9 @@ func main() { <-channel logrus.Info("shutting down") -} - -func mustParseEnvToInt(key string) int { - str := os.Getenv(key) - if str == "" { - return 0 + if err := queue.Close(); err != nil { + logrus.Fatal(err) } - i, err := strconv.Atoi(str) - if err != nil { - return 0 - } - return i } func setupLogger() { @@ -105,3 +106,18 @@ func setupLogger() { logrus.SetFormatter(customFormatter) } } + +func initializeRedis() (redis.UniversalClient, error) { + client := redis.NewClient(&redis.Options{ + Addr: envutils.GetenvString("REDIS_ADDR"), + Username: envutils.GetenvString("REDIS_USERNAME"), + Password: envutils.GetenvString("REDIS_PASSWORD"), + DB: envutils.GetenvInt("REDIS_DB"), + }) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := client.Ping(ctx).Err(); err != nil { + return nil, errors.Wrap(err, "initializeRedis") + } + return client, nil +} diff --git a/utils/env/env.go b/utils/env/env.go new file mode 100644 index 0000000..626395c --- /dev/null +++ b/utils/env/env.go @@ -0,0 +1,30 @@ +package envutils + +import ( + "os" + "strconv" +) + +func GetenvInt(key string) int { + str := GetenvString(key) + if str == "" { + return 0 + } + i, err := strconv.Atoi(str) + if err != nil { + return 0 + } + return i +} + +func GetenvBool(key string) bool { + str := GetenvString(key) + if str == "" { + return false + } + return str == "true" || str == "1" +} + +func GetenvString(key string) string { + return os.Getenv(key) +}