huge refactor

This commit is contained in:
Dawid Wysokiński 2021-04-25 17:02:58 +02:00
parent 4e051b0a5f
commit dea3d4616a
11 changed files with 787 additions and 310 deletions

View File

@ -9,6 +9,8 @@ import (
"github.com/go-pg/pg/v10"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"github.com/tribalwarshelp/cron/cron/queue"
)
var log = logrus.WithField("package", "cron")
@ -17,19 +19,23 @@ type Config struct {
DB *pg.DB
MaxConcurrentWorkers int
RunOnStartup bool
Queue queue.Queue
}
func Attach(c *cron.Cron, cfg Config) error {
if cfg.DB == nil {
return fmt.Errorf("cfg.DB cannot be nil, expected *pg.DB")
}
if cfg.Queue == nil {
return fmt.Errorf("cfg.Queue cannot be nil, expected queue.Queue")
}
h := &handler{db: cfg.DB, maxConcurrentWorkers: cfg.MaxConcurrentWorkers}
if err := h.init(); err != nil {
return err
}
versions := []*models.Version{}
var versions []*models.Version
if err := cfg.DB.Model(&versions).DistinctOn("timezone").Select(); err != nil {
return err
}
@ -37,8 +43,8 @@ func Attach(c *cron.Cron, cfg Config) error {
updateServerData := utils.TrackExecutionTime(log, h.updateServerData, "updateServerData")
vacuumDatabase := utils.TrackExecutionTime(log, h.vacuumDatabase, "vacuumDatabase")
updateServerEnnoblements := utils.TrackExecutionTime(log, h.updateServerEnnoblements, "updateServerEnnoblements")
updateHistoryFuncs := []func(){}
updateStatsFuncs := []func(){}
var updateHistoryFuncs []func()
var updateStatsFuncs []func()
for _, version := range versions {
updateHistory := utils.TrackExecutionTime(log,
createFnWithTimezone(version.Timezone, h.updateHistory),
@ -69,13 +75,13 @@ func Attach(c *cron.Cron, cfg Config) error {
if cfg.RunOnStartup {
go func() {
updateServerData()
vacuumDatabase()
for _, fn := range updateHistoryFuncs {
go fn()
}
for _, fn := range updateStatsFuncs {
go fn()
}
//vacuumDatabase()
//for _, fn := range updateHistoryFuncs {
// go fn()
//}
//for _, fn := range updateStatsFuncs {
// go fn()
//}
}()
}

View File

@ -1,32 +1,27 @@
package cron
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"runtime"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/shared/tw/dataloader"
phpserialize "github.com/Kichiyaki/go-php-serialize"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/orm"
"github.com/pkg/errors"
)
const (
endpointGetServers = "/backend/get_servers.php"
"github.com/tribalwarshelp/cron/cron/queue"
"github.com/tribalwarshelp/cron/cron/tasks"
)
type handler struct {
db *pg.DB
maxConcurrentWorkers int
pool *pool
queue queue.Queue
}
func (h *handler) init() error {
@ -38,255 +33,11 @@ func (h *handler) init() error {
h.pool = newPool(h.maxConcurrentWorkers)
}
tx, err := h.db.Begin()
if err != nil {
return err
}
defer tx.Close()
dbModels := []interface{}{
(*models.SpecialServer)(nil),
(*models.Server)(nil),
(*models.Version)(nil),
(*models.PlayerToServer)(nil),
(*models.PlayerNameChange)(nil),
}
for _, model := range dbModels {
err := tx.Model(model).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return err
}
}
type statementWithParams struct {
statement string
params []interface{}
}
for _, s := range []statementWithParams{
{
statement: pgDefaultValues,
},
{
statement: allVersionsPGInsertStatements,
},
{
statement: allSpecialServersPGInsertStatements,
},
{
statement: pgDropSchemaFunctions,
params: []interface{}{pg.Safe("public")},
},
{
statement: pgFunctions,
},
} {
if _, err := tx.Exec(s.statement, s.params...); err != nil {
return err
}
}
if err := tx.Commit(); err != nil {
return err
}
servers := []*models.Server{}
if err := h.db.Model(&servers).Select(); err != nil {
return err
}
for _, server := range servers {
if err := h.createSchema(server, true); err != nil {
return err
}
}
return nil
}
func (h *handler) createSchema(server *models.Server, init bool) error {
if !init {
exists, err := h.db.Model().Table("information_schema.schemata").Where("schema_name = ?", server.Key).Exists()
if err != nil {
return err
}
if exists {
return nil
}
}
tx, err := h.db.WithParam("SERVER", pg.Safe(server.Key)).Begin()
if err != nil {
return err
}
defer tx.Close()
if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil {
return err
}
dbModels := []interface{}{
(*models.Tribe)(nil),
(*models.Player)(nil),
(*models.Village)(nil),
(*models.Ennoblement)(nil),
(*models.ServerStats)(nil),
(*models.TribeHistory)(nil),
(*models.PlayerHistory)(nil),
(*models.TribeChange)(nil),
(*models.DailyPlayerStats)(nil),
(*models.DailyTribeStats)(nil),
}
for _, model := range dbModels {
err := tx.Model(model).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return err
}
}
statements := []string{
serverPGFunctions,
serverPGTriggers,
serverPGDefaultValues,
}
if init {
statements = append([]string{pgDropSchemaFunctions}, statements...)
}
for _, statement := range statements {
if _, err := tx.Exec(statement, pg.Safe(server.Key), server.VersionCode); err != nil {
return err
}
}
return tx.Commit()
}
func (h *handler) getServers() ([]*models.Server, map[string]string, error) {
versions := []*models.Version{}
if err := h.db.Model(&versions).Relation("SpecialServers").Order("code ASC").Select(); err != nil {
return nil, nil, errors.Wrap(err, "getServers")
}
serverKeys := []string{}
servers := []*models.Server{}
urls := make(map[string]string)
loadedVersions := []models.VersionCode{}
for _, version := range versions {
log := log.WithField("host", version.Host)
log.Infof("Loading servers from %s", version.Host)
resp, err := http.Get(fmt.Sprintf("https://%s%s", version.Host, endpointGetServers))
if err != nil {
log.Errorln(errors.Wrapf(err, "fetching servers from %s", version.Host))
continue
}
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Errorln(errors.Wrapf(err, "reading response body from %s", version.Host))
continue
}
body, err := phpserialize.Decode(string(bodyBytes))
if err != nil {
log.Errorln(errors.Wrapf(err, "serializing body from %s into go value", version.Host))
continue
}
for serverKey, url := range body.(map[interface{}]interface{}) {
serverKeyStr := serverKey.(string)
if version.SpecialServers.Contains(serverKeyStr) {
continue
}
server := &models.Server{
Key: serverKeyStr,
Status: models.ServerStatusOpen,
VersionCode: version.Code,
Version: version,
}
if err := h.createSchema(server, false); err != nil {
log.WithField("serverKey", serverKey).Errorln(errors.Wrapf(err, "cannot create schema for %s", serverKey))
continue
}
serverKeys = append(serverKeys, serverKeyStr)
urls[serverKeyStr] = url.(string)
servers = append(servers, server)
}
loadedVersions = append(loadedVersions, version.Code)
}
if len(servers) > 0 {
if _, err := h.db.Model(&servers).
OnConflict("(key) DO UPDATE").
Set("status = ?", models.ServerStatusOpen).
Set("version_code = EXCLUDED.version_code").
Returning("*").
Insert(); err != nil {
return nil, nil, err
}
}
if _, err := h.db.Model(&models.Server{}).
Set("status = ?", models.ServerStatusClosed).
Where("key NOT IN (?) AND version_code IN (?)", pg.In(serverKeys), pg.In(loadedVersions)).
Update(); err != nil {
return nil, nil, err
}
return servers, urls, nil
}
func (h *handler) updateServerData() {
servers, urls, err := h.getServers()
if err != nil {
log.Errorln("updateServerData:", err.Error())
return
}
log.
WithField("numberOfServers", len(servers)).
Info("updateServerData: servers loaded")
var wg sync.WaitGroup
for _, server := range servers {
log := log.WithField("serverKey", server.Key)
url, ok := urls[server.Key]
if !ok {
log.Warnf("No one URL associated with key: %s, skipping...", server.Key)
continue
}
h.pool.waitForWorker()
wg.Add(1)
sh := &updateServerDataWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
dataloader: dataloader.New(&dataloader.Config{
BaseURL: url,
Client: &http.Client{
Timeout: 10 * time.Second,
},
}),
}
go func(worker *updateServerDataWorker, server *models.Server, url string, log *logrus.Entry) {
defer func() {
h.pool.releaseWorker()
wg.Done()
}()
log.Infof("updateServerData: %s: updating data", server.Key)
err := sh.update()
if err != nil {
log.Errorln("updateServerData:", errors.Wrap(err, server.Key))
return
}
log.Infof("updateServerData: %s: data updated", server.Key)
}(sh, server, url, log)
}
wg.Wait()
h.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersions).WithArgs(context.Background()))
}
func (h *handler) updateServerEnnoblements() {

12
cron/tasks/task.go Normal file
View File

@ -0,0 +1,12 @@
package tasks
import (
"github.com/go-pg/pg/v10"
"github.com/tribalwarshelp/cron/cron/queue"
)
type task struct {
db *pg.DB
queue queue.Queue
}

View File

@ -0,0 +1,104 @@
package tasks
import (
"fmt"
phpserialize "github.com/Kichiyaki/go-php-serialize"
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tribalwarshelp/shared/models"
"io/ioutil"
"net/http"
"github.com/tribalwarshelp/cron/db"
)
const (
endpointGetServers = "/backend/get_servers.php"
)
type taskLoadServers struct {
*task
}
func (t *taskLoadServers) execute(version *models.Version) error {
if err := t.validatePayload(version); err != nil {
return nil
}
entry := log.WithField("host", version.Host)
entry.Infof("%s: Loading servers", version.Host)
data, err := t.getServers(version)
if err != nil {
log.Errorln(err)
return err
}
var serverKeys []string
var servers []*models.Server
for serverKey := range data {
if version.SpecialServers.Contains(serverKey) {
continue
}
server := &models.Server{
Key: serverKey,
Status: models.ServerStatusOpen,
VersionCode: version.Code,
Version: version,
}
if err := db.CreateSchema(t.db, server); err != nil {
logrus.Warn(errors.Wrapf(err, "%s: couldn't create the schema", server.Key))
continue
}
servers = append(servers, server)
serverKeys = append(serverKeys, serverKey)
}
if len(servers) > 0 {
if _, err := t.db.Model(&servers).
OnConflict("(key) DO UPDATE").
Set("status = ?", models.ServerStatusOpen).
Set("version_code = EXCLUDED.version_code").
Returning("*").
Insert(); err != nil {
err = errors.Wrap(err, "couldn't insert/update servers")
logrus.Fatal(err)
return err
}
}
if _, err := t.db.Model(&models.Server{}).
Set("status = ?", models.ServerStatusClosed).
Where("key NOT IN (?) AND version_code =", pg.In(serverKeys), version.Code).
Update(); err != nil {
err = errors.Wrap(err, "couldn't update server statuses")
logrus.Fatal(err)
return err
}
entry.Infof("%s: Servers have been loaded", version.Host)
return nil
}
func (t *taskLoadServers) validatePayload(version *models.Version) error {
if version == nil {
return errors.Errorf("taskLoadServers.validatePayload: Expected *models.Version, got nil")
}
return nil
}
func (t *taskLoadServers) getServers(version *models.Version) (map[string]string, error) {
resp, err := http.Get(fmt.Sprintf("https://%s%s", version.Host, endpointGetServers))
if err != nil {
return nil, errors.Wrapf(err, "%s: taskLoadServers.loadServers couldn't load servers", version.Host)
}
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrapf(err, "%s: taskLoadServers.loadServers couldn't read the body", version.Host)
}
body, err := phpserialize.Decode(string(bodyBytes))
if err != nil {
return nil, errors.Wrapf(err, "%s: taskLoadServers.loadServers couldn't decode the body into the go value", version.Host)
}
return body.(map[string]string), nil
}

59
cron/tasks/tasks.go Normal file
View File

@ -0,0 +1,59 @@
package tasks
import (
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vmihailenco/taskq/v3"
"github.com/tribalwarshelp/cron/cron/queue"
)
const (
TaskNameLoadVersions = "loadVersions"
TaskNameLoadServers = "loadServers"
TaskNameUpdateServerData = "updateServerData"
retryLimitLoadServers = 3
)
var log = logrus.WithField("package", "tasks")
type Config struct {
DB *pg.DB
Queue queue.Queue
}
func RegisterTasks(cfg *Config) error {
if err := validateConfig(cfg); err != nil {
return errors.Wrap(err, "RegisterTasks")
}
t := &task{
db: cfg.DB,
queue: cfg.Queue,
}
taskq.RegisterTask(&taskq.TaskOptions{
Name: TaskNameLoadServers,
RetryLimit: retryLimitLoadServers,
Handler: (&taskLoadServers{t}).execute,
})
return nil
}
func Get(taskName string) *taskq.Task {
return taskq.Tasks.Get(taskName)
}
func validateConfig(cfg *Config) error {
if cfg == nil {
return errors.New("Config hasn't been provided")
}
if cfg.DB == nil {
return errors.New("cfg.DB is required")
}
if cfg.Queue == nil {
return errors.New("cfg.Queue is required")
}
return nil
}

189
db/db.go Normal file
View File

@ -0,0 +1,189 @@
package db
import (
"fmt"
gopglogrusquerylogger "github.com/Kichiyaki/go-pg-logrus-query-logger/v10"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/orm"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tribalwarshelp/shared/models"
envutils "github.com/tribalwarshelp/cron/utils/env"
)
var log = logrus.WithField("package", "db")
type Config struct {
LogQueries bool
}
func New(cfg *Config) (*pg.DB, error) {
db := pg.Connect(prepareOptions())
if cfg != nil && cfg.LogQueries {
db.AddQueryHook(gopglogrusquerylogger.QueryLogger{
Entry: log,
MaxQueryLength: 5000,
})
}
if err := prepareDB(db); err != nil {
return nil, errors.Wrap(err, "New")
}
return db, nil
}
func prepareOptions() *pg.Options {
return &pg.Options{
User: envutils.GetenvString("DB_USER"),
Password: envutils.GetenvString("DB_PASSWORD"),
Database: envutils.GetenvString("DB_NAME"),
Addr: envutils.GetenvString("DB_HOST") + ":" + envutils.GetenvString("DB_PORT"),
PoolSize: envutils.GetenvInt("DB_POOL_SIZE"),
}
}
func prepareDB(db *pg.DB) error {
tx, err := db.Begin()
if err != nil {
return errors.Wrap(err, "Couldn't start a transaction")
}
defer tx.Close()
dbModels := []interface{}{
(*models.SpecialServer)(nil),
(*models.Server)(nil),
(*models.Version)(nil),
(*models.PlayerToServer)(nil),
(*models.PlayerNameChange)(nil),
}
for _, model := range dbModels {
err := tx.Model(model).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return errors.Wrap(err, "Couldn't create the table")
}
}
type statementWithParams struct {
statement string
params []interface{}
}
for _, s := range []statementWithParams{
{
statement: pgDefaultValues,
},
{
statement: allVersionsPGInsertStatements,
},
{
statement: allSpecialServersPGInsertStatements,
},
{
statement: pgDropSchemaFunctions,
params: []interface{}{pg.Safe("public")},
},
{
statement: pgFunctions,
},
} {
if _, err := tx.Exec(s.statement, s.params...); err != nil {
return errors.Wrap(err, "Couldn't initialize the db")
}
}
if err := tx.Commit(); err != nil {
return errors.Wrap(err, "Couldn't commit changes")
}
var servers []*models.Server
if err := db.Model(&servers).Select(); err != nil {
return errors.Wrap(err, "Couldn't load servers")
}
for _, server := range servers {
if err := createSchema(db, server, true); err != nil {
return err
}
}
return nil
}
func CreateSchema(db *pg.DB, server *models.Server) error {
return createSchema(db, server, false)
}
func SchemaExists(db *pg.DB, schemaName string) bool {
exists, err := db.
Model().
Table("information_schema.schemata").
Where("schema_name = ?", schemaName).
Exists()
if err != nil {
return false
}
return exists
}
func createSchema(db *pg.DB, server *models.Server, init bool) error {
if !init && SchemaExists(db, server.Key) {
return nil
}
tx, err := db.WithParam("SERVER", pg.Safe(server.Key)).Begin()
if err != nil {
return errors.Wrap(err, "CreateSchema: couldn't start a transaction")
}
defer tx.Close()
if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil {
return errors.Wrap(err, "CreateSchema: couldn't create the schema")
}
dbModels := []interface{}{
(*models.Tribe)(nil),
(*models.Player)(nil),
(*models.Village)(nil),
(*models.Ennoblement)(nil),
(*models.ServerStats)(nil),
(*models.TribeHistory)(nil),
(*models.PlayerHistory)(nil),
(*models.TribeChange)(nil),
(*models.DailyPlayerStats)(nil),
(*models.DailyTribeStats)(nil),
}
for _, model := range dbModels {
err := tx.Model(model).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return err
}
}
statements := []string{
serverPGFunctions,
serverPGTriggers,
serverPGDefaultValues,
}
if init {
statements = append([]string{pgDropSchemaFunctions}, statements...)
}
for _, statement := range statements {
if _, err := tx.Exec(statement, pg.Safe(server.Key), server.VersionCode); err != nil {
return errors.Wrap(err, "CreateSchema: couldn't initialize the schema")
}
}
if err := tx.Commit(); err != nil {
return errors.Wrap(err, "CreateSchema: couldn't commit changes")
}
return nil
}

306
db/sql_statements.go Normal file
View File

@ -0,0 +1,306 @@
package db
const (
allSpecialServersPGInsertStatements = `
INSERT INTO public.special_servers (version_code, key) VALUES ('pl', 'pls1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('en', 'ens1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('uk', 'uks1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('uk', 'master') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('it', 'its1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('hu', 'hus1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('fr', 'frs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('us', 'uss1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('nl', 'nls1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('es', 'ess1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('ro', 'ros1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('gr', 'grs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('br', 'brs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('tr', 'trs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('cs', 'css1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('de', 'des1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('ru', 'rus1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('ch', 'chs1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('pt', 'pts1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
INSERT INTO public.special_servers (version_code, key) VALUES ('sk', 'sks1') ON CONFLICT ON CONSTRAINT special_servers_version_code_key_key DO NOTHING;
`
allVersionsPGInsertStatements = `
INSERT INTO public.versions (code, name, host, timezone) VALUES ('pl', 'Polska', 'plemiona.pl', 'Europe/Warsaw') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('uk', 'United Kingdom', 'tribalwars.co.uk', 'Europe/London') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('hu', 'Hungary', 'klanhaboru.hu', 'Europe/Budapest') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('it', 'Italy', 'tribals.it', 'Europe/Rome') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('fr', 'France', 'guerretribale.fr', 'Europe/Paris') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('us', 'United States', 'tribalwars.us', 'America/New_York') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('nl', 'The Netherlands', 'tribalwars.nl', 'Europe/Amsterdam') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('es', 'Spain', 'guerrastribales.es', 'Europe/Madrid') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('ro', 'Romania', 'triburile.ro', 'Europe/Bucharest') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('gr', 'Greece', 'fyletikesmaxes.gr', 'Europe/Athens') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('br', 'Brazil', 'tribalwars.com.br', 'America/Sao_Paulo') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('tr', 'Turkey', 'klanlar.org', 'Europe/Istanbul') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('cs', 'Czech Republic', 'divokekmeny.cz', 'Europe/Prague') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('ru', 'Russia', 'voyna-plemyon.ru', 'Europe/Moscow') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('ch', 'Switerzland', 'staemme.ch', 'Europe/Zurich') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('pt', 'Portugal', 'tribalwars.com.pt', 'Europe/Lisbon') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('en', 'International', 'tribalwars.net', 'Europe/London') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('de', 'Germany', 'die-staemme.de', 'Europe/Berlin') ON CONFLICT (code) DO NOTHING;
INSERT INTO public.versions (code, name, host, timezone) VALUES ('sk', 'Slovakia', 'divoke-kmene.sk', 'Europe/Bratislava') ON CONFLICT (code) DO NOTHING;
`
pgDropSchemaFunctions = `
DO
$do$
DECLARE
funcTableRecord RECORD;
BEGIN
FOR funcTableRecord IN SELECT routine_schema, routine_name from information_schema.routines where routine_type = 'FUNCTION' AND specific_schema = '?0' LOOP
EXECUTE 'DROP FUNCTION IF EXISTS ' || funcTableRecord.routine_schema || '.' || funcTableRecord.routine_name || ' CASCADE;';
END LOOP;
END
$do$;
`
pgFunctions = `
CREATE OR REPLACE FUNCTION check_daily_growth()
RETURNS trigger AS
$BODY$
BEGIN
IF NEW.exists = false THEN
NEW.daily_growth = 0;
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION check_existence()
RETURNS trigger AS
$BODY$
BEGIN
IF NEW.exists = false AND OLD.exists = true THEN
NEW.deleted_at = now();
END IF;
IF NEW.exists = true THEN
NEW.deleted_at = null;
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION check_dominance()
RETURNS trigger AS
$BODY$
BEGIN
IF NEW.exists = false THEN
NEW.dominance = 0;
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION update_most_points_most_villages_best_rank_last_activity()
RETURNS trigger AS
$BODY$
BEGIN
IF TG_OP = 'INSERT' THEN
IF NEW.most_points IS null OR NEW.points > NEW.most_points THEN
NEW.most_points = NEW.points;
NEW.most_points_at = now();
END IF;
IF NEW.most_villages IS null OR NEW.total_villages > NEW.most_villages THEN
NEW.most_villages = NEW.total_villages;
NEW.most_villages_at = now();
END IF;
IF NEW.best_rank IS null OR NEW.rank < NEW.best_rank OR NEW.best_rank = 0 THEN
NEW.best_rank = NEW.rank;
NEW.best_rank_at = now();
END IF;
END IF;
IF TG_OP = 'UPDATE' THEN
IF NEW.most_points IS null OR NEW.points > OLD.most_points THEN
NEW.most_points = NEW.points;
NEW.most_points_at = now();
END IF;
IF NEW.most_villages IS null OR NEW.total_villages > OLD.most_villages THEN
NEW.most_villages = NEW.total_villages;
NEW.most_villages_at = now();
END IF;
IF NEW.best_rank IS null OR NEW.rank < OLD.best_rank OR OLD.best_rank = 0 THEN
NEW.best_rank = NEW.rank;
NEW.best_rank_at = now();
END IF;
if TG_TABLE_NAME = 'players' THEN
IF NEW.points > OLD.points OR NEW.score_att > OLD.score_att THEN
NEW.last_activity_at = now();
END IF;
END IF;
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;
`
serverPGFunctions = `
CREATE OR REPLACE FUNCTION ?0.log_tribe_change()
RETURNS trigger AS
$BODY$
BEGIN
IF TG_OP = 'INSERT' THEN
IF NEW.tribe_id <> 0 THEN
INSERT INTO ?0.tribe_changes(player_id,old_tribe_id,new_tribe_id,created_at)
VALUES(NEW.id,0,NEW.tribe_id,now());
END IF;
END IF;
IF TG_OP = 'UPDATE' THEN
IF NEW.tribe_id <> OLD.tribe_id THEN
INSERT INTO ?0.tribe_changes(player_id,old_tribe_id,new_tribe_id,created_at)
VALUES(OLD.id,OLD.tribe_id,NEW.tribe_id,now());
END IF;
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;
CREATE OR REPLACE FUNCTION ?0.log_player_name_change()
RETURNS trigger AS
$BODY$
BEGIN
IF NEW.name <> OLD.name AND old.exists = true THEN
INSERT INTO player_name_changes(version_code,player_id,old_name,new_name,change_date)
VALUES(?1,NEW.id,OLD.name,NEW.name,CURRENT_DATE)
ON CONFLICT DO NOTHING;
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;
CREATE OR REPLACE FUNCTION ?0.get_old_and_new_owner_tribe_id()
RETURNS trigger AS
$BODY$
BEGIN
IF NEW.old_owner_id <> 0 THEN
SELECT tribe_id INTO NEW.old_owner_tribe_id
FROM ?0.players
WHERE id = NEW.old_owner_id;
END IF;
IF NEW.old_owner_tribe_id IS NULL THEN
NEW.old_owner_tribe_id = 0;
END IF;
IF NEW.new_owner_id <> 0 THEN
SELECT tribe_id INTO NEW.new_owner_tribe_id
FROM ?0.players
WHERE id = NEW.new_owner_id;
END IF;
IF NEW.new_owner_tribe_id IS NULL THEN
NEW.new_owner_tribe_id = 0;
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;
CREATE OR REPLACE FUNCTION ?0.insert_into_player_to_servers()
RETURNS trigger AS
$BODY$
BEGIN
INSERT INTO player_to_servers(server_key,player_id)
VALUES('?0', NEW.id)
ON CONFLICT DO NOTHING;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;
`
serverPGTriggers = `
CREATE TRIGGER ?0_log_tribe_change_on_insert
AFTER INSERT
ON ?0.players
FOR EACH ROW
EXECUTE PROCEDURE ?0.log_tribe_change();
CREATE TRIGGER ?0_log_tribe_change_on_update
AFTER UPDATE
ON ?0.players
FOR EACH ROW
EXECUTE PROCEDURE ?0.log_tribe_change();
CREATE TRIGGER ?0_name_change
AFTER UPDATE
ON ?0.players
FOR EACH ROW
EXECUTE PROCEDURE ?0.log_player_name_change();
CREATE TRIGGER ?0_check_daily_growth
BEFORE UPDATE
ON ?0.players
FOR EACH ROW
EXECUTE PROCEDURE check_daily_growth();
CREATE TRIGGER ?0_check_player_existence
BEFORE UPDATE
ON ?0.players
FOR EACH ROW
EXECUTE PROCEDURE check_existence();
CREATE TRIGGER ?0_check_tribe_existence
BEFORE UPDATE
ON ?0.tribes
FOR EACH ROW
EXECUTE PROCEDURE check_existence();
CREATE TRIGGER ?0_check_dominance
BEFORE UPDATE
ON ?0.tribes
FOR EACH ROW
EXECUTE PROCEDURE check_dominance();
CREATE TRIGGER ?0_update_ennoblement_old_and_new_owner_tribe_id
BEFORE INSERT
ON ?0.ennoblements
FOR EACH ROW
EXECUTE PROCEDURE ?0.get_old_and_new_owner_tribe_id();
CREATE TRIGGER ?0_insert_into_player_to_servers
AFTER INSERT
ON ?0.players
FOR EACH ROW
EXECUTE PROCEDURE ?0.insert_into_player_to_servers();
CREATE TRIGGER ?0_update_most_points_most_villages_best_rank_last_activity
BEFORE INSERT OR UPDATE
ON ?0.players
FOR EACH ROW
EXECUTE PROCEDURE update_most_points_most_villages_best_rank_last_activity();
CREATE TRIGGER ?0_update_most_points_most_villages_best_rank_last_activity
BEFORE INSERT OR UPDATE
ON ?0.tribes
FOR EACH ROW
EXECUTE PROCEDURE update_most_points_most_villages_best_rank_last_activity();
`
serverPGDefaultValues = `
ALTER TABLE ?0.daily_player_stats ALTER COLUMN create_date set default CURRENT_DATE;
ALTER TABLE ?0.daily_tribe_stats ALTER COLUMN create_date set default CURRENT_DATE;
ALTER TABLE ?0.player_history ALTER COLUMN create_date set default CURRENT_DATE;
ALTER TABLE ?0.tribe_history ALTER COLUMN create_date set default CURRENT_DATE;
ALTER TABLE ?0.stats ALTER COLUMN create_date set default CURRENT_DATE;
`
pgDefaultValues = `
ALTER TABLE player_name_changes ALTER COLUMN change_date set default CURRENT_DATE;
`
)

4
go.mod
View File

@ -6,7 +6,7 @@ require (
github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210423175217-c83fa01c60d7
github.com/Kichiyaki/go-php-serialize v0.0.0-20200601110855-47b6982acf83
github.com/go-pg/pg/v10 v10.9.1
github.com/go-redis/redis/v8 v8.8.2 // indirect
github.com/go-redis/redis/v8 v8.8.2
github.com/go-redis/redis_rate/v9 v9.1.1 // indirect
github.com/joho/godotenv v1.3.0
github.com/klauspost/compress v1.12.1 // indirect
@ -14,6 +14,6 @@ require (
github.com/robfig/cron/v3 v3.0.1
github.com/sirupsen/logrus v1.8.1
github.com/tribalwarshelp/shared v0.0.0-20210423190057-03d8445d35dc
github.com/vmihailenco/taskq/v3 v3.2.3 // indirect
github.com/vmihailenco/taskq/v3 v3.2.3
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 // indirect
)

10
go.sum
View File

@ -6,6 +6,7 @@ github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210423175217-c83fa0
github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210423175217-c83fa01c60d7/go.mod h1:ADHVWnGlWcRn1aGthuh7I1Lrn6zzsjkVJju151dXyDw=
github.com/Kichiyaki/go-php-serialize v0.0.0-20200601110855-47b6982acf83 h1:Oa8Bk4LNcknxw50gZOlvPwEreOlAbOnu7V82lUYNbOM=
github.com/Kichiyaki/go-php-serialize v0.0.0-20200601110855-47b6982acf83/go.mod h1:+iGkf5HfOVeRVd9K7qQDucIl+/Kt3MyenMa90b/O/c4=
github.com/aws/aws-sdk-go v1.35.28 h1:S2LuRnfC8X05zgZLC8gy/Sb82TGv2Cpytzbzz7tkeHc=
github.com/aws/aws-sdk-go v1.35.28/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/bsm/redislock v0.7.0 h1:RL7aZJhCKkuBjQbnSTKCeedTRifBWxd/ffP+GZ599Mo=
github.com/bsm/redislock v0.7.0/go.mod h1:3Kgu+cXw0JrkZ5pmY/JbcFpixGZ5M9v9G2PGWYqku+k=
@ -18,6 +19,7 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
@ -63,14 +65,17 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iron-io/iron_go3 v0.0.0-20190916120531-a4a7f74b73ac h1:w5wltlINIIqRTqQ64dASrCo0fM7k9nosPbKCZnkL0W0=
github.com/iron-io/iron_go3 v0.0.0-20190916120531-a4a7f74b73ac/go.mod h1:gyMTRVO+ZkEy7wQDyD++okPsBN2q127EpuShhHMWG54=
github.com/jeffh/go.bdd v0.0.0-20120717032931-88f798ee0c74/go.mod h1:qNa9FlAfO0U/qNkzYBMH1JKYRMzC+sP9IcyV4U18l98=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
@ -89,14 +94,14 @@ github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M=
github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4=
github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA=
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -221,7 +226,6 @@ golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=

98
main.go
View File

@ -1,19 +1,23 @@
package main
import (
"context"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/sirupsen/logrus"
"github.com/tribalwarshelp/shared/mode"
_cron "github.com/tribalwarshelp/cron/cron"
twhelpcron "github.com/tribalwarshelp/cron/cron"
"github.com/tribalwarshelp/cron/cron/queue"
"github.com/tribalwarshelp/cron/cron/tasks"
"github.com/tribalwarshelp/cron/db"
envutils "github.com/tribalwarshelp/cron/utils/env"
gopglogrusquerylogger "github.com/Kichiyaki/go-pg-logrus-query-logger/v10"
"github.com/go-pg/pg/v10"
"github.com/joho/godotenv"
"github.com/robfig/cron/v3"
)
@ -29,38 +33,44 @@ func init() {
}
func main() {
dbOptions := &pg.Options{
User: os.Getenv("DB_USER"),
Password: os.Getenv("DB_PASSWORD"),
Database: os.Getenv("DB_NAME"),
Addr: os.Getenv("DB_HOST") + ":" + os.Getenv("DB_PORT"),
PoolSize: mustParseEnvToInt("DB_POOL_SIZE"),
client, err := initializeRedis()
if err != nil {
logrus.Fatal(errors.Wrap(err, "Error establishing a redis connection"))
}
dbFields := logrus.Fields{
"user": dbOptions.User,
"database": dbOptions.Database,
"addr": dbOptions.Addr,
defer client.Close()
conn, err := db.New(&db.Config{LogQueries: envutils.GetenvBool("LOG_DB_QUERIES")})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Error establishing a database connection"))
}
db := pg.Connect(dbOptions)
defer func() {
if err := db.Close(); err != nil {
logrus.WithFields(dbFields).Fatalln(err)
}
}()
if strings.ToUpper(os.Getenv("LOG_DB_QUERIES")) == "TRUE" {
db.AddQueryHook(gopglogrusquerylogger.QueryLogger{
Entry: logrus.NewEntry(logrus.StandardLogger()),
})
defer conn.Close()
logrus.Info("Connection with the database has been established")
queue, err := queue.New(&queue.Config{
WorkerLimit: envutils.GetenvInt("MAX_CONCURRENT_WORKERS"),
Redis: client,
})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't create the task queue"))
}
tasks.RegisterTasks(&tasks.Config{
DB: conn,
Queue: queue,
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := queue.Start(ctx); err != nil {
logrus.Fatal(err)
}
logrus.WithFields(dbFields).Info("Connection with the database has been established")
c := cron.New(cron.WithChain(
cron.SkipIfStillRunning(cron.PrintfLogger(logrus.WithField("package", "cron"))),
))
if err := _cron.Attach(c, _cron.Config{
DB: db,
MaxConcurrentWorkers: mustParseEnvToInt("MAX_CONCURRENT_WORKERS"),
RunOnStartup: os.Getenv("RUN_ON_STARTUP") == "true",
if err := twhelpcron.Attach(c, twhelpcron.Config{
DB: conn,
MaxConcurrentWorkers: envutils.GetenvInt("MAX_CONCURRENT_WORKERS"),
RunOnStartup: envutils.GetenvBool("RUN_ON_STARTUP"),
Queue: queue,
}); err != nil {
logrus.Fatal(err)
}
@ -74,18 +84,9 @@ func main() {
<-channel
logrus.Info("shutting down")
}
func mustParseEnvToInt(key string) int {
str := os.Getenv(key)
if str == "" {
return 0
if err := queue.Close(); err != nil {
logrus.Fatal(err)
}
i, err := strconv.Atoi(str)
if err != nil {
return 0
}
return i
}
func setupLogger() {
@ -105,3 +106,18 @@ func setupLogger() {
logrus.SetFormatter(customFormatter)
}
}
func initializeRedis() (redis.UniversalClient, error) {
client := redis.NewClient(&redis.Options{
Addr: envutils.GetenvString("REDIS_ADDR"),
Username: envutils.GetenvString("REDIS_USERNAME"),
Password: envutils.GetenvString("REDIS_PASSWORD"),
DB: envutils.GetenvInt("REDIS_DB"),
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
return nil, errors.Wrap(err, "initializeRedis")
}
return client, nil
}

30
utils/env/env.go vendored Normal file
View File

@ -0,0 +1,30 @@
package envutils
import (
"os"
"strconv"
)
func GetenvInt(key string) int {
str := GetenvString(key)
if str == "" {
return 0
}
i, err := strconv.Atoi(str)
if err != nil {
return 0
}
return i
}
func GetenvBool(key string) bool {
str := GetenvString(key)
if str == "" {
return false
}
return str == "true" || str == "1"
}
func GetenvString(key string) string {
return os.Getenv(key)
}