Merge pull request #2 from tribalwarshelp/taskq

feat: taskq
This commit is contained in:
Dawid Wysokiński 2021-05-01 08:41:31 +02:00 committed by GitHub
commit ff6ad10c27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1951 additions and 1043 deletions

View File

@ -1,4 +1,4 @@
# tribalwarshelp.com cron
# TWHelp cron
Features:
@ -18,17 +18,26 @@ DB_PORT=5432
DB_HOST=your_db_host
DB_PASSWORD=your_db_pass
MAX_CONCURRENT_WORKERS=1 #how many servers should update at the same time
REDIS_ADDR=redis_addr
REDIS_DB=redis_db
REDIS_USER=redis_user
REDIS_PASSWORD=redis_password
RUN_ON_INIT=true|false
LOG_DB_QUERIES=true|false
WORKER_LIMIT=1
```
### Prerequisites
1. Golang
2. PostgreSQL database
2. PostgreSQL
3. Redis
### Installing
1. Clone this repo.
2. Navigate to the directory where you have cloned this repo.
3. Set the required env variables directly in your system or create .env.development file.
3. Set the required env variables directly in your system or create .env.local file.
4. go run main.go

View File

@ -1,83 +0,0 @@
package cron
import (
"fmt"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/shared/utils"
"github.com/go-pg/pg/v10"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("package", "cron")
type Config struct {
DB *pg.DB
MaxConcurrentWorkers int
RunOnStartup bool
}
func Attach(c *cron.Cron, cfg Config) error {
if cfg.DB == nil {
return fmt.Errorf("cfg.DB cannot be nil, expected *pg.DB")
}
h := &handler{db: cfg.DB, maxConcurrentWorkers: cfg.MaxConcurrentWorkers}
if err := h.init(); err != nil {
return err
}
versions := []*models.Version{}
if err := cfg.DB.Model(&versions).DistinctOn("timezone").Select(); err != nil {
return err
}
updateServerData := utils.TrackExecutionTime(log, h.updateServerData, "updateServerData")
vacuumDatabase := utils.TrackExecutionTime(log, h.vacuumDatabase, "vacuumDatabase")
updateServerEnnoblements := utils.TrackExecutionTime(log, h.updateServerEnnoblements, "updateServerEnnoblements")
updateHistoryFuncs := []func(){}
updateStatsFuncs := []func(){}
for _, version := range versions {
updateHistory := utils.TrackExecutionTime(log,
createFnWithTimezone(version.Timezone, h.updateHistory),
fmt.Sprintf("%s: updateHistory", version.Timezone))
updateHistoryFuncs = append(updateHistoryFuncs, updateHistory)
updateStats := utils.TrackExecutionTime(log,
createFnWithTimezone(version.Timezone, h.updateStats),
fmt.Sprintf("%s: updateStats", version.Timezone))
updateStatsFuncs = append(updateStatsFuncs, updateStats)
if _, err := c.AddFunc(fmt.Sprintf("CRON_TZ=%s 30 1 * * *", version.Timezone), updateHistory); err != nil {
return err
}
if _, err := c.AddFunc(fmt.Sprintf("CRON_TZ=%s 45 1 * * *", version.Timezone), updateStats); err != nil {
return err
}
}
if _, err := c.AddFunc("0 * * * *", updateServerData); err != nil {
return err
}
if _, err := c.AddFunc("20 1 * * *", vacuumDatabase); err != nil {
return err
}
if _, err := c.AddFunc("@every 1m", updateServerEnnoblements); err != nil {
return err
}
if cfg.RunOnStartup {
go func() {
updateServerData()
vacuumDatabase()
for _, fn := range updateHistoryFuncs {
go fn()
}
for _, fn := range updateStatsFuncs {
go fn()
}
}()
}
return nil
}

View File

@ -1,463 +0,0 @@
package cron
import (
"fmt"
"io/ioutil"
"net/http"
"runtime"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/shared/tw/dataloader"
phpserialize "github.com/Kichiyaki/go-php-serialize"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/orm"
"github.com/pkg/errors"
)
const (
endpointGetServers = "/backend/get_servers.php"
)
type handler struct {
db *pg.DB
maxConcurrentWorkers int
pool *pool
}
func (h *handler) init() error {
if h.maxConcurrentWorkers <= 0 {
h.maxConcurrentWorkers = runtime.NumCPU()
}
if h.pool == nil {
h.pool = newPool(h.maxConcurrentWorkers)
}
tx, err := h.db.Begin()
if err != nil {
return err
}
defer tx.Close()
dbModels := []interface{}{
(*models.SpecialServer)(nil),
(*models.Server)(nil),
(*models.Version)(nil),
(*models.PlayerToServer)(nil),
(*models.PlayerNameChange)(nil),
}
for _, model := range dbModels {
err := tx.Model(model).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return err
}
}
type statementWithParams struct {
statement string
params []interface{}
}
for _, s := range []statementWithParams{
{
statement: pgDefaultValues,
},
{
statement: allVersionsPGInsertStatements,
},
{
statement: allSpecialServersPGInsertStatements,
},
{
statement: pgDropSchemaFunctions,
params: []interface{}{pg.Safe("public")},
},
{
statement: pgFunctions,
},
} {
if _, err := tx.Exec(s.statement, s.params...); err != nil {
return err
}
}
if err := tx.Commit(); err != nil {
return err
}
servers := []*models.Server{}
if err := h.db.Model(&servers).Select(); err != nil {
return err
}
for _, server := range servers {
if err := h.createSchema(server, true); err != nil {
return err
}
}
return nil
}
func (h *handler) createSchema(server *models.Server, init bool) error {
if !init {
exists, err := h.db.Model().Table("information_schema.schemata").Where("schema_name = ?", server.Key).Exists()
if err != nil {
return err
}
if exists {
return nil
}
}
tx, err := h.db.WithParam("SERVER", pg.Safe(server.Key)).Begin()
if err != nil {
return err
}
defer tx.Close()
if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil {
return err
}
dbModels := []interface{}{
(*models.Tribe)(nil),
(*models.Player)(nil),
(*models.Village)(nil),
(*models.Ennoblement)(nil),
(*models.ServerStats)(nil),
(*models.TribeHistory)(nil),
(*models.PlayerHistory)(nil),
(*models.TribeChange)(nil),
(*models.DailyPlayerStats)(nil),
(*models.DailyTribeStats)(nil),
}
for _, model := range dbModels {
err := tx.Model(model).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return err
}
}
statements := []string{
serverPGFunctions,
serverPGTriggers,
serverPGDefaultValues,
}
if init {
statements = append([]string{pgDropSchemaFunctions}, statements...)
}
for _, statement := range statements {
if _, err := tx.Exec(statement, pg.Safe(server.Key), server.VersionCode); err != nil {
return err
}
}
return tx.Commit()
}
func (h *handler) getServers() ([]*models.Server, map[string]string, error) {
versions := []*models.Version{}
if err := h.db.Model(&versions).Relation("SpecialServers").Order("code ASC").Select(); err != nil {
return nil, nil, errors.Wrap(err, "getServers")
}
serverKeys := []string{}
servers := []*models.Server{}
urls := make(map[string]string)
loadedVersions := []models.VersionCode{}
for _, version := range versions {
log := log.WithField("host", version.Host)
log.Infof("Loading servers from %s", version.Host)
resp, err := http.Get(fmt.Sprintf("https://%s%s", version.Host, endpointGetServers))
if err != nil {
log.Errorln(errors.Wrapf(err, "fetching servers from %s", version.Host))
continue
}
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Errorln(errors.Wrapf(err, "reading response body from %s", version.Host))
continue
}
body, err := phpserialize.Decode(string(bodyBytes))
if err != nil {
log.Errorln(errors.Wrapf(err, "serializing body from %s into go value", version.Host))
continue
}
for serverKey, url := range body.(map[interface{}]interface{}) {
serverKeyStr := serverKey.(string)
if version.SpecialServers.Contains(serverKeyStr) {
continue
}
server := &models.Server{
Key: serverKeyStr,
Status: models.ServerStatusOpen,
VersionCode: version.Code,
Version: version,
}
if err := h.createSchema(server, false); err != nil {
log.WithField("serverKey", serverKey).Errorln(errors.Wrapf(err, "cannot create schema for %s", serverKey))
continue
}
serverKeys = append(serverKeys, serverKeyStr)
urls[serverKeyStr] = url.(string)
servers = append(servers, server)
}
loadedVersions = append(loadedVersions, version.Code)
}
if len(servers) > 0 {
if _, err := h.db.Model(&servers).
OnConflict("(key) DO UPDATE").
Set("status = ?", models.ServerStatusOpen).
Set("version_code = EXCLUDED.version_code").
Returning("*").
Insert(); err != nil {
return nil, nil, err
}
}
if _, err := h.db.Model(&models.Server{}).
Set("status = ?", models.ServerStatusClosed).
Where("key NOT IN (?) AND version_code IN (?)", pg.In(serverKeys), pg.In(loadedVersions)).
Update(); err != nil {
return nil, nil, err
}
return servers, urls, nil
}
func (h *handler) updateServerData() {
servers, urls, err := h.getServers()
if err != nil {
log.Errorln("updateServerData:", err.Error())
return
}
log.
WithField("numberOfServers", len(servers)).
Info("updateServerData: servers loaded")
var wg sync.WaitGroup
for _, server := range servers {
log := log.WithField("serverKey", server.Key)
url, ok := urls[server.Key]
if !ok {
log.Warnf("No one URL associated with key: %s, skipping...", server.Key)
continue
}
h.pool.waitForWorker()
wg.Add(1)
sh := &updateServerDataWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
dataloader: dataloader.New(&dataloader.Config{
BaseURL: url,
Client: &http.Client{
Timeout: 10 * time.Second,
},
}),
}
go func(worker *updateServerDataWorker, server *models.Server, url string, log *logrus.Entry) {
defer func() {
h.pool.releaseWorker()
wg.Done()
}()
log.Infof("updateServerData: %s: updating data", server.Key)
err := sh.update()
if err != nil {
log.Errorln("updateServerData:", errors.Wrap(err, server.Key))
return
}
log.Infof("updateServerData: %s: data updated", server.Key)
}(sh, server, url, log)
}
wg.Wait()
}
func (h *handler) updateServerEnnoblements() {
servers := []*models.Server{}
if err := h.db.Model(&servers).Relation("Version").Where("status = ?", models.ServerStatusOpen).Select(); err != nil {
log.Error(errors.Wrap(err, "updateServerEnnoblements: cannot load ennoblements"))
}
log.
WithField("numberOfServers", len(servers)).
Info("updateServerEnnoblements: servers loaded")
var wg sync.WaitGroup
pool := newPool(h.maxConcurrentWorkers)
for _, server := range servers {
pool.waitForWorker()
wg.Add(1)
sh := &updateServerEnnoblementsWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
dataloader: dataloader.New(&dataloader.Config{
BaseURL: fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host),
}),
}
go func(worker *updateServerEnnoblementsWorker, server *models.Server) {
defer func() {
pool.releaseWorker()
wg.Done()
}()
log := log.WithField("serverKey", server.Key)
err := sh.update()
if err != nil {
log.Errorln("updateServerEnnoblements:", errors.Wrap(err, server.Key))
return
}
}(sh, server)
}
wg.Wait()
}
func (h *handler) updateHistory(location *time.Location) {
servers := []*models.Server{}
log := log.WithField("timezone", location.String())
year, month, day := time.Now().In(location).Date()
t := time.Date(year, month, day, 1, 30, 0, 0, location)
err := h.db.
Model(&servers).
Where(
"status = ? AND (history_updated_at IS NULL OR history_updated_at < ?) AND timezone = ?",
models.ServerStatusOpen,
t,
location.String(),
).
Relation("Version").
Select()
if err != nil {
log.Errorln(errors.Wrap(err, "updateHistory"))
return
}
log.
WithField("numberOfServers", len(servers)).
Info("updateHistory: servers loaded")
var wg sync.WaitGroup
for _, server := range servers {
h.pool.waitForWorker()
wg.Add(1)
worker := &updateServerHistoryWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
location: location,
}
go func(server *models.Server, worker *updateServerHistoryWorker) {
defer func() {
h.pool.releaseWorker()
wg.Done()
}()
log := log.WithField("serverKey", server.Key)
log.Infof("updateHistory: %s: updating history", server.Key)
if err := worker.update(); err != nil {
log.Errorln("updateHistory:", errors.Wrap(err, server.Key))
return
}
log.Infof("updateHistory: %s: history updated", server.Key)
}(server, worker)
}
wg.Wait()
}
func (h *handler) updateStats(location *time.Location) {
servers := []*models.Server{}
log := log.WithField("timezone", location.String())
year, month, day := time.Now().In(location).Date()
t := time.Date(year, month, day, 1, 45, 0, 0, location)
err := h.db.
Model(&servers).
Where(
"status = ? AND (stats_updated_at IS NULL OR stats_updated_at < ?) AND timezone = ?",
models.ServerStatusOpen,
t,
location.String(),
).
Relation("Version").
Select()
if err != nil {
log.Errorf(errors.Wrap(err, "updateServerStats").Error())
return
}
log.WithField("numberOfServers", len(servers)).Info("updateServerStats: servers loaded")
var wg sync.WaitGroup
for _, server := range servers {
h.pool.waitForWorker()
wg.Add(1)
worker := &updateServerStatsWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
location: location,
}
go func(server *models.Server, worker *updateServerStatsWorker) {
defer func() {
h.pool.releaseWorker()
wg.Done()
}()
log := log.WithField("serverKey", server.Key)
log.Infof("updateServerStats: %s: updating stats", server.Key)
if err := worker.update(); err != nil {
log.Errorln("updateServerStats:", errors.Wrap(err, server.Key))
return
}
log.Infof("updateServerStats: %s: stats updated", server.Key)
}(server, worker)
}
wg.Wait()
}
func (h *handler) vacuumDatabase() {
servers := []*models.Server{}
err := h.db.
Model(&servers).
Select()
if err != nil {
log.Errorln(errors.Wrap(err, "vacuumDatabase"))
return
}
var wg sync.WaitGroup
for _, server := range servers {
h.pool.waitForWorker()
wg.Add(1)
worker := &vacuumServerDBWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
}
go func(server *models.Server, worker *vacuumServerDBWorker) {
defer func() {
h.pool.releaseWorker()
wg.Done()
}()
log := log.WithField("serverKey", server.Key)
log.Infof("vacuumDatabase: %s: vacuuming database", server.Key)
if err := worker.vacuum(); err != nil {
log.Errorln("vacuumDatabase:", errors.Wrap(err, server.Key))
return
}
log.Infof("vacuumDatabase: %s: database vacuumed", server.Key)
}(server, worker)
}
wg.Wait()
}

View File

@ -1,90 +0,0 @@
package cron
import (
"compress/gzip"
"encoding/csv"
"encoding/xml"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/tribalwarshelp/shared/models"
)
var client = &http.Client{
Timeout: 20 * time.Second,
}
func newCsvReader(r io.Reader) *csv.Reader {
csvReader := csv.NewReader(r)
csvReader.LazyQuotes = true
return csvReader
}
func uncompressAndReadCsvLines(r io.Reader) ([][]string, error) {
uncompressedStream, err := gzip.NewReader(r)
if err != nil {
return [][]string{}, err
}
defer uncompressedStream.Close()
return newCsvReader(uncompressedStream).ReadAll()
}
func getCSVData(url string, compressed bool) ([][]string, error) {
resp, err := client.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if !compressed {
return newCsvReader(resp.Body).ReadAll()
}
return uncompressAndReadCsvLines(resp.Body)
}
func getXML(url string, decode interface{}) error {
resp, err := client.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return xml.Unmarshal(bytes, decode)
}
func countPlayerVillages(villages []*models.Village) int {
count := 0
for _, village := range villages {
if village.PlayerID != 0 {
count++
}
}
return count
}
func getDateDifferenceInDays(t1, t2 time.Time) int {
diff := t1.Sub(t2)
return int(diff.Hours() / 24)
}
func calcPlayerDailyGrowth(diffInDays, points int) int {
if diffInDays > 0 {
return points / diffInDays
}
return 0
}
func createFnWithTimezone(timezone string, fn func(location *time.Location)) func() {
tz, err := time.LoadLocation(timezone)
if err != nil {
tz = time.UTC
}
return func() {
fn(tz)
}
}

View File

@ -1,22 +0,0 @@
package cron
type pool struct {
workers chan bool
}
func newPool(capacity int) *pool {
p := &pool{}
p.workers = make(chan bool, capacity)
for i := 0; i < capacity; i++ {
p.releaseWorker()
}
return p
}
func (p *pool) releaseWorker() {
p.workers <- true
}
func (p *pool) waitForWorker() bool {
return <-p.workers
}

View File

@ -1,77 +0,0 @@
package cron
import "github.com/tribalwarshelp/shared/models"
type tribesSearchableByID struct {
tribes []*models.Tribe
}
func (searchable tribesSearchableByID) GetID(index int) int {
return searchable.tribes[index].ID
}
func (searchable tribesSearchableByID) Len() int {
return len(searchable.tribes)
}
type playersSearchableByID struct {
players []*models.Player
}
func (searchable playersSearchableByID) GetID(index int) int {
return searchable.players[index].ID
}
func (searchable playersSearchableByID) Len() int {
return len(searchable.players)
}
type ennoblementsSearchableByNewOwnerID struct {
ennoblements []*models.Ennoblement
}
func (searchable ennoblementsSearchableByNewOwnerID) GetID(index int) int {
return searchable.ennoblements[index].NewOwnerID
}
func (searchable ennoblementsSearchableByNewOwnerID) Len() int {
return len(searchable.ennoblements)
}
type searchableByID interface {
GetID(index int) int
Len() int
}
func makePlayersSearchable(players []*models.Player) searchableByID {
return playersSearchableByID{
players: players,
}
}
func makeTribesSearchable(tribes []*models.Tribe) searchableByID {
return tribesSearchableByID{
tribes: tribes,
}
}
func searchByID(haystack searchableByID, id int) int {
low := 0
high := haystack.Len() - 1
for low <= high {
median := (low + high) / 2
if haystack.GetID(median) < id {
low = median + 1
} else {
high = median - 1
}
}
if low == haystack.Len() || haystack.GetID(low) != id {
return -1
}
return low
}

View File

@ -1,44 +0,0 @@
package cron
import (
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/shared/tw/dataloader"
)
type updateServerEnnoblementsWorker struct {
db *pg.DB
dataloader dataloader.DataLoader
server *models.Server
}
func (w *updateServerEnnoblementsWorker) loadEnnoblements() ([]*models.Ennoblement, error) {
lastEnnoblement := &models.Ennoblement{}
if err := w.db.
Model(lastEnnoblement).
Limit(1).
Order("ennobled_at DESC").
Select(); err != nil && err != pg.ErrNoRows {
return nil, errors.Wrapf(err, "cannot load last ennoblement")
}
return w.dataloader.LoadEnnoblements(&dataloader.LoadEnnoblementsConfig{
EnnobledAtGT: lastEnnoblement.EnnobledAt,
})
}
func (w *updateServerEnnoblementsWorker) update() error {
ennoblements, err := w.loadEnnoblements()
if err != nil {
return err
}
if len(ennoblements) > 0 {
if _, err := w.db.Model(&ennoblements).Insert(); err != nil {
return errors.Wrap(err, "cannot insert ennoblements")
}
}
return nil
}

View File

@ -1,85 +0,0 @@
package cron
import (
"time"
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
)
type updateServerHistoryWorker struct {
db *pg.DB
server *models.Server
location *time.Location
}
func (w *updateServerHistoryWorker) update() error {
players := []*models.Player{}
if err := w.db.Model(&players).Where("exists = true").Select(); err != nil {
return errors.Wrap(err, "cannot load players")
}
now := time.Now().In(w.location)
createDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
ph := []*models.PlayerHistory{}
for _, player := range players {
ph = append(ph, &models.PlayerHistory{
OpponentsDefeated: player.OpponentsDefeated,
PlayerID: player.ID,
TotalVillages: player.TotalVillages,
Points: player.Points,
Rank: player.Rank,
TribeID: player.TribeID,
CreateDate: createDate,
})
}
tribes := []*models.Tribe{}
if err := w.db.Model(&tribes).Where("exists = true").Select(); err != nil {
return errors.Wrap(err, "cannot load tribes")
}
th := []*models.TribeHistory{}
for _, tribe := range tribes {
th = append(th, &models.TribeHistory{
OpponentsDefeated: tribe.OpponentsDefeated,
TribeID: tribe.ID,
TotalMembers: tribe.TotalMembers,
TotalVillages: tribe.TotalVillages,
Points: tribe.Points,
AllPoints: tribe.AllPoints,
Rank: tribe.Rank,
Dominance: tribe.Dominance,
CreateDate: createDate,
})
}
tx, err := w.db.Begin()
if err != nil {
return err
}
defer tx.Close()
if len(ph) > 0 {
if _, err := w.db.Model(&ph).Insert(); err != nil {
return errors.Wrap(err, "cannot insert players history")
}
}
if len(th) > 0 {
if _, err := w.db.Model(&th).Insert(); err != nil {
return errors.Wrap(err, "cannot insert tribes history")
}
}
if _, err := tx.Model(w.server).
Set("history_updated_at = ?", time.Now()).
WherePK().
Returning("*").
Update(); err != nil {
return errors.Wrap(err, "cannot update server")
}
return tx.Commit()
}

View File

@ -1,62 +0,0 @@
package cron
import (
"time"
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
)
const (
day = 24 * time.Hour
)
type vacuumServerDBWorker struct {
db *pg.DB
}
func (w *vacuumServerDBWorker) vacuum() error {
tx, err := w.db.Begin()
if err != nil {
return err
}
defer tx.Close()
withNonExistentPlayers := w.db.Model(&models.Player{}).Column("id").Where("exists = false and NOW() - deleted_at > '14 days'")
withNonExistentTribes := w.db.Model(&models.Tribe{}).Column("id").Where("exists = false and NOW() - deleted_at > '1 days'")
_, err = tx.Model(&models.PlayerHistory{}).
With("players", withNonExistentPlayers).
Where("player_id IN (Select id FROM players) OR player_history.create_date < ?", time.Now().Add(-1*day*180)).
Delete()
if err != nil {
return errors.Wrap(err, "cannot delete old player history records")
}
_, err = tx.Model(&models.TribeHistory{}).
With("tribes", withNonExistentTribes).
Where("tribe_id IN (Select id FROM tribes) OR tribe_history.create_date < ?", time.Now().Add(-1*day*180)).
Delete()
if err != nil {
return errors.Wrap(err, "cannot delete old tribe history records")
}
_, err = tx.Model(&models.DailyPlayerStats{}).
With("players", withNonExistentPlayers).
Where("player_id IN (Select id FROM players) OR daily_player_stats.create_date < ?", time.Now().Add(-1*day*180)).
Delete()
if err != nil {
return errors.Wrap(err, "cannot delete old player stats records")
}
_, err = tx.Model(&models.DailyTribeStats{}).
With("tribes", withNonExistentTribes).
Where("tribe_id IN (Select id FROM tribes) OR daily_tribe_stats.create_date < ?", time.Now().Add(-1*day*180)).
Delete()
if err != nil {
return errors.Wrap(err, "cannot delete old tribe stats records")
}
return tx.Commit()
}

3
dev.sh
View File

@ -1,3 +0,0 @@
#!/bin/sh
export MODE=development
go run main.go

6
go.mod
View File

@ -3,13 +3,17 @@ module github.com/tribalwarshelp/cron
go 1.16
require (
github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210423175217-c83fa01c60d7
github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210428180109-fb97298564d9
github.com/Kichiyaki/go-php-serialize v0.0.0-20200601110855-47b6982acf83
github.com/go-pg/pg/v10 v10.9.1
github.com/go-redis/redis/v8 v8.8.2
github.com/go-redis/redis_rate/v9 v9.1.1 // indirect
github.com/joho/godotenv v1.3.0
github.com/klauspost/compress v1.12.1 // indirect
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.1
github.com/sirupsen/logrus v1.8.1
github.com/tribalwarshelp/shared v0.0.0-20210423190057-03d8445d35dc
github.com/vmihailenco/taskq/v3 v3.2.3
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 // indirect
)

89
go.sum
View File

@ -1,23 +1,46 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210423175217-c83fa01c60d7 h1:7IdSzhdupqm4AC3UDH9b5gdCDE2SlX6qkVC0zwqAuLA=
github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210423175217-c83fa01c60d7/go.mod h1:ADHVWnGlWcRn1aGthuh7I1Lrn6zzsjkVJju151dXyDw=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210428180109-fb97298564d9 h1:S/08K0AD4bXYeSPJKei8ZbumDy1JNARZsgYbNZgr9Tk=
github.com/Kichiyaki/go-pg-logrus-query-logger/v10 v10.0.0-20210428180109-fb97298564d9/go.mod h1:ADHVWnGlWcRn1aGthuh7I1Lrn6zzsjkVJju151dXyDw=
github.com/Kichiyaki/go-php-serialize v0.0.0-20200601110855-47b6982acf83 h1:Oa8Bk4LNcknxw50gZOlvPwEreOlAbOnu7V82lUYNbOM=
github.com/Kichiyaki/go-php-serialize v0.0.0-20200601110855-47b6982acf83/go.mod h1:+iGkf5HfOVeRVd9K7qQDucIl+/Kt3MyenMa90b/O/c4=
github.com/aws/aws-sdk-go v1.35.28 h1:S2LuRnfC8X05zgZLC8gy/Sb82TGv2Cpytzbzz7tkeHc=
github.com/aws/aws-sdk-go v1.35.28/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/bsm/redislock v0.7.0 h1:RL7aZJhCKkuBjQbnSTKCeedTRifBWxd/ffP+GZ599Mo=
github.com/bsm/redislock v0.7.0/go.mod h1:3Kgu+cXw0JrkZ5pmY/JbcFpixGZ5M9v9G2PGWYqku+k=
github.com/capnm/sysinfo v0.0.0-20130621111458-5909a53897f3 h1:IHZ1Le1ejzkmS7Si7dIzJvYDWe+BIoNmqMnfWHBZSVw=
github.com/capnm/sysinfo v0.0.0-20130621111458-5909a53897f3/go.mod h1:M5XHQLu90v2JNm/bW2tdsYar+5vhV0gEcBcmDBNAN1Y=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-pg/pg/v10 v10.9.1 h1:kU4t84zWGGaU0Qsu49FbNtToUVrlSTkNOngW8aQmwvk=
github.com/go-pg/pg/v10 v10.9.1/go.mod h1:rgmTPgHgl5EN2CNKKoMwC7QT62t8BqsdpEkUQuiZMQs=
github.com/go-pg/zerochecker v0.2.0 h1:pp7f72c3DobMWOb2ErtZsnrPaSvHd2W4o9//8HtF4mU=
github.com/go-pg/zerochecker v0.2.0/go.mod h1:NJZ4wKL0NmTtz0GKCoJ8kym6Xn/EQzXRl2OnAe7MmDo=
github.com/go-redis/redis/v8 v8.1.0/go.mod h1:isLoQT/NFSP7V67lyvM9GmdvLdyZ7pEhsXvvyQtnQTo=
github.com/go-redis/redis/v8 v8.3.4/go.mod h1:jszGxBCez8QA1HWSmQxJO9Y82kNibbUmeYhKWrBejTU=
github.com/go-redis/redis/v8 v8.4.0/go.mod h1:A1tbYoHSa1fXwN+//ljcCYYJeLmVrwL9hbQN45Jdy0M=
github.com/go-redis/redis/v8 v8.8.2 h1:O/NcHqobw7SEptA0yA6up6spZVFtwE06SXM8rgLtsP8=
github.com/go-redis/redis/v8 v8.8.2/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
github.com/go-redis/redis_rate/v9 v9.1.0/go.mod h1:jjU9YxOSZ3cz0yj1QJVAJiy5ueKmL9o4AySJHcKyTSE=
github.com/go-redis/redis_rate/v9 v9.1.1 h1:7SIrbnhQ7zsTNEgIvprFhJf7/+l3wSpZc2iRVwUmaq8=
github.com/go-redis/redis_rate/v9 v9.1.1/go.mod h1:jjU9YxOSZ3cz0yj1QJVAJiy5ueKmL9o4AySJHcKyTSE=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -30,18 +53,36 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iron-io/iron_go3 v0.0.0-20190916120531-a4a7f74b73ac h1:w5wltlINIIqRTqQ64dASrCo0fM7k9nosPbKCZnkL0W0=
github.com/iron-io/iron_go3 v0.0.0-20190916120531-a4a7f74b73ac/go.mod h1:gyMTRVO+ZkEy7wQDyD++okPsBN2q127EpuShhHMWG54=
github.com/jeffh/go.bdd v0.0.0-20120717032931-88f798ee0c74/go.mod h1:qNa9FlAfO0U/qNkzYBMH1JKYRMzC+sP9IcyV4U18l98=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.12.1 h1:/+xsCsk06wE38cyiqOR/o7U2fSftcH72xD+BQXmja/g=
github.com/klauspost/compress v1.12.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
@ -52,12 +93,16 @@ github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M=
github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4=
github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA=
github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@ -65,6 +110,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
@ -80,6 +126,7 @@ github.com/tribalwarshelp/shared v0.0.0-20210423190057-03d8445d35dc h1:giWPsD/6W
github.com/tribalwarshelp/shared v0.0.0-20210423190057-03d8445d35dc/go.mod h1:CDQvesBYmSyGDl5X37xfa+ub55ZbikrHDuIZ4AcfM8I=
github.com/vmihailenco/bufpool v0.1.11 h1:gOq2WmBrq0i2yW5QJ16ykccQ4wH9UyEsgLm6czKAd94=
github.com/vmihailenco/bufpool v0.1.11/go.mod h1:AFf/MOy3l2CFTKbxwt0mp2MwnqjNEs5H/UxrkA5jxTQ=
github.com/vmihailenco/msgpack/v5 v5.0.0/go.mod h1:HVxBVPUK/+fZMonk4bi1islLa8V3cfnBug0+4dykPzo=
github.com/vmihailenco/msgpack/v5 v5.3.0/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/msgpack/v5 v5.3.1 h1:0i85a4dsZh8mC//wmyyTEzidDLPQfQAxZIOLtafGbFY=
github.com/vmihailenco/msgpack/v5 v5.3.1/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
@ -87,6 +134,12 @@ github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vb
github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/vmihailenco/taskq/v3 v3.2.3 h1:6VeFiv0o9iXbKULgPFy4JzWzFZvrEjifF/lLMLUNbm0=
github.com/vmihailenco/taskq/v3 v3.2.3/go.mod h1:RQHAGqec6G4RcjZ5sLNqkWWjhLddBQPLxU/x1+RLsCA=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/otel v0.11.0/go.mod h1:G8UCk+KooF2HLkgo8RHX9epABH/aRGYET7gQOqBVdB0=
go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY=
go.opentelemetry.io/otel v0.14.0/go.mod h1:vH5xEuwy7Rts0GNtsCW3HYQoZDY+OmBJ6t1bFGGlxgw=
go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg=
go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
@ -101,40 +154,59 @@ go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52l
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b h1:7mWr3k41Qtv8XlltBkDkl8LoP3mpSgBW8BUoxtEdbXg=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925/go.mod h1:1phAWC201xIgDyaFpmDeZkgf70Q4Pd/CNqfRtVPtxNw=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201027133719-8eef5233e2a1/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
@ -149,8 +221,13 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
@ -170,6 +247,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
@ -177,6 +255,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=

26
internal/cron/config.go Normal file
View File

@ -0,0 +1,26 @@
package cron
import (
"github.com/go-pg/pg/v10"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
)
type Config struct {
DB *pg.DB
Redis redis.UniversalClient
RunOnInit bool
Opts []cron.Option
WorkerLimit int
}
func validateConfig(cfg *Config) error {
if cfg == nil || cfg.DB == nil {
return errors.New("validateConfig: cfg.DB is required")
}
if cfg.Redis == nil {
return errors.New("validateConfig: cfg.Redis is required")
}
return nil
}

186
internal/cron/cron.go Normal file
View File

@ -0,0 +1,186 @@
package cron
import (
"context"
"fmt"
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/robfig/cron/v3"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/cron/internal/cron/queue"
"github.com/tribalwarshelp/cron/internal/cron/tasks"
)
type Cron struct {
*cron.Cron
queue queue.Queue
db *pg.DB
runOnInit bool
log logrus.FieldLogger
}
func New(cfg *Config) (*Cron, error) {
if err := validateConfig(cfg); err != nil {
return nil, err
}
q, err := initializeQueue(cfg)
if err != nil {
return nil, err
}
c := &Cron{
Cron: cron.New(cfg.Opts...),
queue: q,
db: cfg.DB,
runOnInit: cfg.RunOnInit,
log: logrus.WithField("package", "cron"),
}
if err := c.init(); err != nil {
return nil, err
}
return c, nil
}
func (c *Cron) init() error {
var versions []*models.Version
if err := c.db.Model(&versions).DistinctOn("timezone").Select(); err != nil {
return errors.Wrap(err, "Cron.init: couldn't load versions")
}
var updateHistoryFuncs []func()
var updateStatsFuncs []func()
for _, version := range versions {
updateHistory := createFnWithTimezone(version.Timezone, c.updateHistory)
updateHistoryFuncs = append(updateHistoryFuncs, updateHistory)
updateStats := createFnWithTimezone(version.Timezone, c.updateStats)
updateStatsFuncs = append(updateStatsFuncs, updateStats)
if _, err := c.AddFunc(fmt.Sprintf("CRON_TZ=%s 30 1 * * *", version.Timezone), updateHistory); err != nil {
return err
}
if _, err := c.AddFunc(fmt.Sprintf("CRON_TZ=%s 45 1 * * *", version.Timezone), updateStats); err != nil {
return err
}
}
if _, err := c.AddFunc("0 * * * *", c.updateServerData); err != nil {
return err
}
if _, err := c.AddFunc("20 1 * * *", c.vacuumDatabase); err != nil {
return err
}
if _, err := c.AddFunc("10 1 * * *", c.deleteNonExistentVillages); err != nil {
return err
}
if _, err := c.AddFunc("@every 1m", c.updateEnnoblements); err != nil {
return err
}
if c.runOnInit {
go func() {
c.updateServerData()
c.vacuumDatabase()
for _, fn := range updateHistoryFuncs {
fn()
}
for _, fn := range updateStatsFuncs {
fn()
}
}()
}
return nil
}
func (c *Cron) Start(ctx context.Context) error {
if err := c.queue.Start(ctx); err != nil {
return errors.Wrap(err, "Cron.Start")
}
c.Cron.Start()
return nil
}
func (c *Cron) Stop() error {
c.Cron.Stop()
if err := c.queue.Close(); err != nil {
return errors.Wrap(err, "Cron.Stop")
}
return nil
}
func (c *Cron) updateServerData() {
err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameLoadVersionsAndUpdateServerData).WithArgs(context.Background()))
if err != nil {
c.logError("Cron.updateServerData", tasks.TaskNameLoadVersionsAndUpdateServerData, err)
}
}
func (c *Cron) updateEnnoblements() {
err := c.queue.Add(queue.EnnoblementsQueue, tasks.Get(tasks.TaskUpdateEnnoblements).WithArgs(context.Background()))
if err != nil {
c.logError("Cron.updateEnnoblements", tasks.TaskUpdateEnnoblements, err)
}
}
func (c *Cron) updateHistory(timezone string) {
err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateHistory).WithArgs(context.Background(), timezone))
if err != nil {
c.logError("Cron.updateHistory", tasks.TaskUpdateHistory, err)
}
}
func (c *Cron) updateStats(timezone string) {
err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskUpdateStats).WithArgs(context.Background(), timezone))
if err != nil {
c.logError("Cron.updateStats", tasks.TaskUpdateStats, err)
}
}
func (c *Cron) vacuumDatabase() {
err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameVacuum).WithArgs(context.Background()))
if err != nil {
c.logError("Cron.vacuumDatabase", tasks.TaskNameVacuum, err)
}
}
func (c *Cron) deleteNonExistentVillages() {
err := c.queue.Add(queue.MainQueue, tasks.Get(tasks.TaskNameDeleteNonExistentVillages).WithArgs(context.Background()))
if err != nil {
c.logError("Cron.deleteNonExistentVillages", tasks.TaskNameDeleteNonExistentVillages, err)
}
}
func (c *Cron) logError(prefix string, taskName string, err error) {
c.log.Error(
errors.Wrapf(
err,
"%s: Couldn't add the task '%s' to the queue",
prefix,
taskName,
),
)
}
func initializeQueue(cfg *Config) (queue.Queue, error) {
q, err := queue.New(&queue.Config{
WorkerLimit: cfg.WorkerLimit,
Redis: cfg.Redis,
})
if err != nil {
return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task queue")
}
err = tasks.RegisterTasks(&tasks.Config{
DB: cfg.DB,
Queue: q,
})
if err != nil {
return nil, errors.Wrap(err, "initializeQueue: Couldn't create the task queue")
}
return q, nil
}
func createFnWithTimezone(timezone string, fn func(timezone string)) func() {
return func() {
fn(timezone)
}
}

View File

@ -0,0 +1,18 @@
package queue
import (
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
)
type Config struct {
Redis redis.UniversalClient
WorkerLimit int
}
func validateConfig(cfg *Config) error {
if cfg == nil || cfg.Redis == nil {
return errors.New("validateConfig: cfg.Redis is required")
}
return nil
}

View File

@ -0,0 +1,98 @@
package queue
import (
"context"
"github.com/pkg/errors"
"time"
"github.com/go-redis/redis/v8"
"github.com/vmihailenco/taskq/v3"
"github.com/vmihailenco/taskq/v3/redisq"
)
const (
MainQueue = "main"
EnnoblementsQueue = "ennoblements"
)
type Queue interface {
Start(ctx context.Context) error
Close() error
Add(name string, msg *taskq.Message) error
}
type queue struct {
redis redis.UniversalClient
mainQueue taskq.Queue
ennoblementsQueue taskq.Queue
factory taskq.Factory
}
func New(cfg *Config) (Queue, error) {
if err := validateConfig(cfg); err != nil {
return nil, err
}
q := &queue{
redis: cfg.Redis,
}
if err := q.init(cfg); err != nil {
return nil, err
}
return q, nil
}
func (q *queue) init(cfg *Config) error {
q.factory = redisq.NewFactory()
q.mainQueue = q.registerQueue(MainQueue, cfg.WorkerLimit)
q.ennoblementsQueue = q.registerQueue(EnnoblementsQueue, cfg.WorkerLimit*2)
return nil
}
func (q *queue) registerQueue(name string, limit int) taskq.Queue {
return q.factory.RegisterQueue(&taskq.QueueOptions{
Name: name,
ReservationTimeout: time.Minute * 2,
Redis: q.redis,
MinNumWorker: int32(limit),
MaxNumWorker: int32(limit),
})
}
func (q *queue) getQueueByName(name string) taskq.Queue {
switch name {
case MainQueue:
return q.mainQueue
case EnnoblementsQueue:
return q.ennoblementsQueue
}
return nil
}
func (q *queue) Start(ctx context.Context) error {
if err := q.factory.StartConsumers(ctx); err != nil {
return errors.Wrap(err, "Couldn't start the queue")
}
return nil
}
func (q *queue) Close() error {
if err := q.factory.Close(); err != nil {
return errors.Wrap(err, "Couldn't close the queue")
}
return nil
}
func (q *queue) Add(name string, msg *taskq.Message) error {
queue := q.getQueueByName(name)
if queue == nil {
return errors.Errorf("Couldn't add the message to the queue: unknown queue name '%s'", name)
}
if err := queue.Add(msg); err != nil {
return errors.Wrap(err, "Couldn't add the message to the queue")
}
return nil
}

View File

@ -0,0 +1,133 @@
package tasks
import (
"github.com/tribalwarshelp/shared/tw/dataloader"
"net/http"
"time"
"github.com/tribalwarshelp/shared/models"
)
func countPlayerVillages(villages []*models.Village) int {
count := 0
for _, village := range villages {
if village.PlayerID != 0 {
count++
}
}
return count
}
func getDateDifferenceInDays(t1, t2 time.Time) int {
hours := t1.Sub(t2).Hours()
if hours == 0 {
return 0
}
return int(hours / 24)
}
func calcPlayerDailyGrowth(diffInDays, points int) int {
if diffInDays > 0 {
return points / diffInDays
}
return 0
}
func newHTTPClient() *http.Client {
return &http.Client{
Timeout: 10 * time.Second,
}
}
func newDataloader(url string) dataloader.DataLoader {
return dataloader.New(&dataloader.Config{
BaseURL: url,
Client: newHTTPClient(),
})
}
type playersSearchableByID struct {
players []*models.Player
}
func (searchable playersSearchableByID) getID(index int) int {
return searchable.players[index].ID
}
func (searchable playersSearchableByID) len() int {
return len(searchable.players)
}
type tribesSearchableByID struct {
tribes []*models.Tribe
}
func (searchable tribesSearchableByID) getID(index int) int {
return searchable.tribes[index].ID
}
func (searchable tribesSearchableByID) len() int {
return len(searchable.tribes)
}
type villagesSearchableByID struct {
villages []*models.Village
}
func (searchable villagesSearchableByID) getID(index int) int {
return searchable.villages[index].ID
}
func (searchable villagesSearchableByID) len() int {
return len(searchable.villages)
}
type ennoblementsSearchableByNewOwnerID struct {
ennoblements []*models.Ennoblement
}
func (searchable ennoblementsSearchableByNewOwnerID) getID(index int) int {
return searchable.ennoblements[index].NewOwnerID
}
func (searchable ennoblementsSearchableByNewOwnerID) len() int {
return len(searchable.ennoblements)
}
type searchableByID interface {
getID(index int) int
len() int
}
func makePlayersSearchable(players []*models.Player) searchableByID {
return playersSearchableByID{
players: players,
}
}
func makeTribesSearchable(tribes []*models.Tribe) searchableByID {
return tribesSearchableByID{
tribes: tribes,
}
}
func searchByID(haystack searchableByID, id int) int {
low := 0
high := haystack.len() - 1
for low <= high {
median := (low + high) / 2
if haystack.getID(median) < id {
low = median + 1
} else {
high = median - 1
}
}
if low == haystack.len() || haystack.getID(low) != id {
return -1
}
return low
}

View File

@ -0,0 +1,29 @@
package tasks
import (
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"sync"
"time"
"github.com/tribalwarshelp/cron/internal/cron/queue"
)
type task struct {
db *pg.DB
queue queue.Queue
cachedLocations sync.Map
}
func (t *task) loadLocation(timezone string) (*time.Location, error) {
val, ok := t.cachedLocations.Load(timezone)
if ok {
return val.(*time.Location), nil
}
location, err := time.LoadLocation(timezone)
if err != nil {
return nil, errors.Wrap(err, "task.loadLocation")
}
t.cachedLocations.Store(timezone, location)
return location, nil
}

View File

@ -0,0 +1,55 @@
package tasks
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/cron/internal/cron/queue"
)
type taskDeleteNonExistentVillages struct {
*task
}
func (t *taskDeleteNonExistentVillages) execute() error {
var servers []*models.Server
err := t.db.
Model(&servers).
Relation("Version").
Where("status = ?", models.ServerStatusOpen).
Relation("Version").
Select()
if err != nil {
err = errors.Wrap(err, "taskDeleteNonExistentVillages.execute")
log.Errorln(err)
return err
}
log.
WithField("numberOfServers", len(servers)).
Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue")
for _, server := range servers {
s := server
err := t.queue.Add(
queue.MainQueue,
Get(TaskNameServerDeleteNonExistentVillages).
WithArgs(
context.Background(),
fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host),
s,
),
)
if err != nil {
log.Warn(
errors.Wrapf(
err,
"taskDeleteNonExistentVillages.execute: %s: Couldn't add the task '%s' for this server",
server.Key,
TaskNameServerDeleteNonExistentVillages,
),
)
}
}
return nil
}

View File

@ -0,0 +1,122 @@
package tasks
import (
"context"
"fmt"
phpserialize "github.com/Kichiyaki/go-php-serialize"
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tribalwarshelp/shared/models"
"io/ioutil"
"net/http"
"github.com/tribalwarshelp/cron/internal/cron/queue"
"github.com/tribalwarshelp/cron/internal/db"
)
const (
endpointGetServers = "/backend/get_servers.php"
)
type taskLoadServersAndUpdateData struct {
*task
}
func (t *taskLoadServersAndUpdateData) execute(version *models.Version) error {
if err := t.validatePayload(version); err != nil {
log.Debug(err)
return nil
}
entry := log.WithField("host", version.Host)
entry.Infof("taskLoadServersAndUpdateData.execute: %s: Loading servers", version.Host)
data, err := t.getServers(version)
if err != nil {
log.Errorln(err)
return err
}
var serverKeys []string
var servers []*models.Server
for serverKey := range data {
if version.SpecialServers.Contains(serverKey) {
continue
}
server := &models.Server{
Key: serverKey,
Status: models.ServerStatusOpen,
VersionCode: version.Code,
Version: version,
}
if err := db.CreateSchema(t.db, server); err != nil {
logrus.Warn(errors.Wrapf(err, "taskLoadServersAndUpdateData.execute: %s: couldn't create the schema", server.Key))
continue
}
servers = append(servers, server)
serverKeys = append(serverKeys, serverKey)
}
if len(servers) > 0 {
if _, err := t.db.Model(&servers).
OnConflict("(key) DO UPDATE").
Set("status = ?", models.ServerStatusOpen).
Set("version_code = EXCLUDED.version_code").
Returning("*").
Insert(); err != nil {
err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: couldn't insert/update servers")
logrus.Error(err)
return err
}
}
if _, err := t.db.Model(&models.Server{}).
Set("status = ?", models.ServerStatusClosed).
Where("key NOT IN (?) AND version_code = ?", pg.In(serverKeys), version.Code).
Update(); err != nil {
err = errors.Wrap(err, "taskLoadServersAndUpdateData.execute: couldn't update server statuses")
logrus.Error(err)
return err
}
for _, server := range servers {
s := server
t.queue.Add(queue.MainQueue, Get(TaskNameUpdateServerData).WithArgs(context.Background(), data[s.Key], s))
}
entry.Infof("%s: Servers have been loaded", version.Host)
return nil
}
func (t *taskLoadServersAndUpdateData) validatePayload(version *models.Version) error {
if version == nil {
return errors.New("taskLoadServersAndUpdateData.validatePayload: Expected *models.Version, got nil")
}
return nil
}
func (t *taskLoadServersAndUpdateData) getServers(version *models.Version) (map[string]string, error) {
resp, err := http.Get(fmt.Sprintf("https://%s%s", version.Host, endpointGetServers))
if err != nil {
return nil, errors.Wrapf(err, "%s: taskLoadServersAndUpdateData.loadServers couldn't load servers", version.Host)
}
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrapf(err, "%s: taskLoadServersAndUpdateData.loadServers couldn't read the response body", version.Host)
}
body, err := phpserialize.Decode(string(bodyBytes))
if err != nil {
return nil, errors.Wrapf(err, "%s: taskLoadServersAndUpdateData.loadServers couldn't decode the response body into the go value", version.Host)
}
result := make(map[string]string)
for serverKey, url := range body.(map[interface{}]interface{}) {
serverKeyStr := serverKey.(string)
urlStr := url.(string)
if serverKeyStr != "" && urlStr != "" {
result[serverKeyStr] = urlStr
}
}
return result, nil
}

View File

@ -0,0 +1,28 @@
package tasks
import (
"context"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/cron/internal/cron/queue"
)
type taskLoadVersionsAndUpdateServerData struct {
*task
}
func (t *taskLoadVersionsAndUpdateServerData) execute() error {
var versions []*models.Version
log.Debug("taskLoadVersionsAndUpdateServerData.execute: Loading versions...")
if err := t.db.Model(&versions).Relation("SpecialServers").Select(); err != nil {
err = errors.Wrap(err, "taskLoadVersionsAndUpdateServerData.execute: couldn't load versions")
log.Fatal(err)
return err
}
for _, version := range versions {
t.queue.Add(queue.MainQueue, Get(TaskNameLoadServersAndUpdateData).WithArgs(context.Background(), version))
}
log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded")
return nil
}

View File

@ -0,0 +1,77 @@
package tasks
import (
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/shared/tw/dataloader"
)
type taskServerDeleteNonExistentVillages struct {
*task
}
func (t *taskServerDeleteNonExistentVillages) execute(url string, server *models.Server) error {
if err := t.validatePayload(server); err != nil {
log.Debug(err)
return nil
}
entry := log.WithField("key", server.Key)
entry.Infof("taskServerDeleteNonExistentVillages.execute: %s: Deleting non-existent villages...", server.Key)
err := (&workerDeleteNonExistentVillages{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
dataloader: newDataloader(url),
server: server,
}).delete()
if err != nil {
err = errors.Wrap(err, "taskServerDeleteNonExistentVillages.execute")
entry.Error(err)
return err
}
entry.Infof("taskServerDeleteNonExistentVillages.execute: %s: Non-existent villages have been deleted", server.Key)
return nil
}
func (t *taskServerDeleteNonExistentVillages) validatePayload(server *models.Server) error {
if server == nil {
return errors.New("taskUpdateServerData.validatePayload: Expected *models.Server, got nil")
}
return nil
}
type workerDeleteNonExistentVillages struct {
db *pg.DB
dataloader dataloader.DataLoader
server *models.Server
}
func (w *workerDeleteNonExistentVillages) delete() error {
var villagesFromDB []*models.Village
if err := w.db.Model(&villagesFromDB).Column("id").Select(); err != nil {
return errors.Wrap(err, "workerDeleteNonExistentVillages.delete")
}
villages, err := w.dataloader.LoadVillages()
if err != nil {
return errors.Wrap(err, "workerDeleteNonExistentVillages.delete")
}
var idsToDelete []int
searchableByVillageID := &villagesSearchableByID{villages}
for _, village := range villagesFromDB {
index := searchByID(searchableByVillageID, village.ID)
if index < 0 {
idsToDelete = append(idsToDelete, village.ID)
}
}
totalDeleted := 0
if len(idsToDelete) > 0 {
result, err := w.db.Model(&models.Village{}).Where("id = ANY(?)", pg.Array(idsToDelete)).Delete()
if err != nil {
return errors.Wrap(err, "workerDeleteNonExistentVillages.delete")
}
totalDeleted = result.RowsAffected()
}
log.Debugf("%s: deleted %d villages", w.server.Key, totalDeleted)
return nil
}

View File

@ -0,0 +1,48 @@
package tasks
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/cron/internal/cron/queue"
)
type taskUpdateEnnoblements struct {
*task
}
func (t *taskUpdateEnnoblements) execute() error {
var servers []*models.Server
err := t.db.
Model(&servers).
Relation("Version").
Where("status = ?", models.ServerStatusOpen).
Select()
if err != nil {
err = errors.Wrap(err, "taskUpdateEnnoblements.execute")
log.Errorln(err)
return err
}
log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...")
for _, server := range servers {
s := server
err := t.queue.Add(
queue.EnnoblementsQueue,
Get(TaskUpdateServerEnnoblements).
WithArgs(context.Background(), fmt.Sprintf("https://%s.%s", server.Key, server.Version.Host), s),
)
if err != nil {
log.Warn(
errors.Wrapf(
err,
"taskUpdateEnnoblements.execute: %s: Couldn't add the task '%s' for this server",
server.Key,
TaskUpdateServerEnnoblements,
),
)
}
}
return nil
}

View File

@ -0,0 +1,60 @@
package tasks
import (
"context"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"time"
"github.com/tribalwarshelp/cron/internal/cron/queue"
)
type taskUpdateHistory struct {
*task
}
func (t *taskUpdateHistory) execute(timezone string) error {
entry := log.WithField("timezone", timezone)
location, err := t.loadLocation(timezone)
if err != nil {
err = errors.Wrap(err, "taskUpdateHistory.execute")
entry.Error(err)
return err
}
year, month, day := time.Now().In(location).Date()
date := time.Date(year, month, day, 1, 30, 0, 0, location)
var servers []*models.Server
err = t.db.
Model(&servers).
Where(
"status = ? AND (history_updated_at IS NULL OR history_updated_at < ?) AND timezone = ?",
models.ServerStatusOpen,
date,
timezone,
).
Relation("Version").
Select()
if err != nil {
err = errors.Wrap(err, "taskUpdateHistory.execute")
entry.Errorln(err)
return err
}
entry.
WithField("numberOfServers", len(servers)).
Info("taskUpdateHistory.execute: Update of the history has started")
for _, server := range servers {
s := server
err := t.queue.Add(queue.MainQueue, Get(TaskUpdateServerHistory).WithArgs(context.Background(), timezone, s))
if err != nil {
log.Warn(
errors.Wrapf(
err,
"taskUpdateHistory.execute: %s: Couldn't add the task '%s' for this server",
server.Key,
TaskUpdateServerHistory,
),
)
}
}
return nil
}

View File

@ -1,27 +1,65 @@
package cron
package tasks
import (
"time"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/shared/tw/dataloader"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/orm"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/shared/tw/dataloader"
"time"
)
type updateServerDataWorker struct {
type taskUpdateServerData struct {
*task
}
func (t *taskUpdateServerData) execute(url string, server *models.Server) error {
if err := t.validatePayload(server); err != nil {
log.Debug(err)
return nil
}
now := time.Now()
entry := log.WithField("key", server.Key)
entry.Infof("taskUpdateServerData.execute: %s: Update of the server data has started...", server.Key)
err := (&workerUpdateServerData{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
dataloader: newDataloader(url),
server: server,
}).update()
if err != nil {
err = errors.Wrap(err, "taskUpdateServerData.execute")
entry.Error(err)
return err
}
duration := time.Since(now)
entry.
WithFields(map[string]interface{}{
"duration": duration.Nanoseconds(),
"durationPretty": duration.String(),
}).
Infof("taskUpdateServerData.execute: %s: data has been updated", server.Key)
return nil
}
func (t *taskUpdateServerData) validatePayload(server *models.Server) error {
if server == nil {
return errors.New("taskUpdateServerData.validatePayload: Expected *models.Server, got nil")
}
return nil
}
type workerUpdateServerData struct {
db *pg.DB
dataloader dataloader.DataLoader
server *models.Server
}
func (w *updateServerDataWorker) loadPlayers(od map[int]*models.OpponentsDefeated) ([]*models.Player, error) {
ennoblements := []*models.Ennoblement{}
func (w *workerUpdateServerData) loadPlayers(od map[int]*models.OpponentsDefeated) ([]*models.Player, error) {
var ennoblements []*models.Ennoblement
err := w.db.Model(&ennoblements).DistinctOn("new_owner_id").Order("new_owner_id ASC", "ennobled_at ASC").Select()
if err != nil {
return nil, errors.Wrap(err, "loadPlayers: cannot load ennoblements")
return nil, errors.Wrap(err, "workerUpdateServerData.loadPlayers: couldn't load ennoblements")
}
players, err := w.dataloader.LoadPlayers()
@ -46,7 +84,7 @@ func (w *updateServerDataWorker) loadPlayers(od map[int]*models.OpponentsDefeate
return players, nil
}
func (w *updateServerDataWorker) loadTribes(od map[int]*models.OpponentsDefeated, numberOfVillages int) ([]*models.Tribe, error) {
func (w *workerUpdateServerData) loadTribes(od map[int]*models.OpponentsDefeated, numberOfVillages int) ([]*models.Tribe, error) {
tribes, err := w.dataloader.LoadTribes()
if err != nil {
return nil, err
@ -65,7 +103,7 @@ func (w *updateServerDataWorker) loadTribes(od map[int]*models.OpponentsDefeated
return tribes, nil
}
func (w *updateServerDataWorker) calculateODifference(od1 models.OpponentsDefeated, od2 models.OpponentsDefeated) models.OpponentsDefeated {
func (w *workerUpdateServerData) calculateODifference(od1 models.OpponentsDefeated, od2 models.OpponentsDefeated) models.OpponentsDefeated {
return models.OpponentsDefeated{
RankAtt: (od1.RankAtt - od2.RankAtt) * -1,
ScoreAtt: od1.ScoreAtt - od2.ScoreAtt,
@ -78,9 +116,11 @@ func (w *updateServerDataWorker) calculateODifference(od1 models.OpponentsDefeat
}
}
func (w *updateServerDataWorker) calculateTodaysTribeStats(tribes []*models.Tribe,
history []*models.TribeHistory) []*models.DailyTribeStats {
todaysStats := []*models.DailyTribeStats{}
func (w *workerUpdateServerData) calculateTodaysTribeStats(
tribes []*models.Tribe,
history []*models.TribeHistory,
) []*models.DailyTribeStats {
var todaysStats []*models.DailyTribeStats
searchableTribes := makeTribesSearchable(tribes)
for _, historyRecord := range history {
@ -103,9 +143,9 @@ func (w *updateServerDataWorker) calculateTodaysTribeStats(tribes []*models.Trib
return todaysStats
}
func (w *updateServerDataWorker) calculateDailyPlayerStats(players []*models.Player,
func (w *workerUpdateServerData) calculateDailyPlayerStats(players []*models.Player,
history []*models.PlayerHistory) []*models.DailyPlayerStats {
todaysStats := []*models.DailyPlayerStats{}
var todaysStats []*models.DailyPlayerStats
searchablePlayers := makePlayersSearchable(players)
for _, historyRecord := range history {
@ -125,55 +165,62 @@ func (w *updateServerDataWorker) calculateDailyPlayerStats(players []*models.Pla
return todaysStats
}
func (w *updateServerDataWorker) update() error {
func (w *workerUpdateServerData) update() error {
pod, err := w.dataloader.LoadOD(false)
if err != nil {
return err
return errors.Wrap(err, "workerUpdateServerData.update")
}
tod, err := w.dataloader.LoadOD(true)
if err != nil {
return err
return errors.Wrap(err, "workerUpdateServerData.update")
}
villages, err := w.dataloader.LoadVillages()
if err != nil {
return err
return errors.Wrap(err, "workerUpdateServerData.update")
}
numberOfVillages := len(villages)
tribes, err := w.loadTribes(tod, countPlayerVillages(villages))
if err != nil {
return err
return errors.Wrap(err, "workerUpdateServerData.update")
}
numberOfTribes := len(tribes)
players, err := w.loadPlayers(pod)
if err != nil {
return err
return errors.Wrap(err, "workerUpdateServerData.update")
}
numberOfPlayers := len(players)
cfg, err := w.dataloader.GetConfig()
if err != nil {
return err
return errors.Wrap(err, "workerUpdateServerData.update")
}
buildingCfg, err := w.dataloader.GetBuildingConfig()
if err != nil {
return err
return errors.Wrap(err, "workerUpdateServerData.update")
}
unitCfg, err := w.dataloader.GetUnitConfig()
if err != nil {
return err
return errors.Wrap(err, "workerUpdateServerData.update")
}
tx, err := w.db.Begin()
if err != nil {
return err
}
defer tx.Close()
defer func(s *models.Server) {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key))
}
}(w.server)
if len(tribes) > 0 {
ids := []int{}
var ids []int
for _, tribe := range tribes {
ids = append(ids, tribe.ID)
}
@ -191,23 +238,23 @@ func (w *updateServerDataWorker) update() error {
Set("dominance = EXCLUDED.dominance").
Apply(appendODSetClauses).
Insert(); err != nil {
return errors.Wrap(err, "cannot insert tribes")
return errors.Wrap(err, "couldn't insert tribes")
}
if _, err := tx.Model(&tribes).
Where("NOT (tribe.id = ANY (?))", pg.Array(ids)).
Set("exists = false").
Update(); err != nil && err != pg.ErrNoRows {
return errors.Wrap(err, "cannot update nonexistent tribes")
return errors.Wrap(err, "couldn't update non-existent tribes")
}
tribesHistory := []*models.TribeHistory{}
var tribesHistory []*models.TribeHistory
if err := w.db.Model(&tribesHistory).
DistinctOn("tribe_id").
Column("*").
Where("tribe_id = ANY (?)", pg.Array(ids)).
Order("tribe_id DESC", "create_date DESC").
Select(); err != nil && err != pg.ErrNoRows {
return errors.Wrap(err, "cannot select tribe history records")
return errors.Wrap(err, "couldn't select tribe history records")
}
todaysTribeStats := w.calculateTodaysTribeStats(tribes, tribesHistory)
if len(todaysTribeStats) > 0 {
@ -222,13 +269,13 @@ func (w *updateServerDataWorker) update() error {
Set("dominance = EXCLUDED.dominance").
Apply(appendODSetClauses).
Insert(); err != nil {
return errors.Wrap(err, "cannot insert today's tribe stats")
return errors.Wrap(err, "couldn't insert today's tribe stats")
}
}
}
if len(players) > 0 {
ids := []int{}
var ids []int
for _, player := range players {
ids = append(ids, player.ID)
}
@ -243,23 +290,23 @@ func (w *updateServerDataWorker) update() error {
Set("daily_growth = EXCLUDED.daily_growth").
Apply(appendODSetClauses).
Insert(); err != nil {
return errors.Wrap(err, "cannot insert players")
return errors.Wrap(err, "couldn't insert players")
}
if _, err := tx.Model(&models.Player{}).
Where("NOT (player.id = ANY (?))", pg.Array(ids)).
Set("exists = false").
Set("tribe_id = 0").
Update(); err != nil && err != pg.ErrNoRows {
return errors.Wrap(err, "cannot update nonexistent players")
return errors.Wrap(err, "couldn't update non-existent players")
}
playerHistory := []*models.PlayerHistory{}
var playerHistory []*models.PlayerHistory
if err := w.db.Model(&playerHistory).
DistinctOn("player_id").
Column("*").
Where("player_id = ANY (?)", pg.Array(ids)).
Order("player_id DESC", "create_date DESC").Select(); err != nil && err != pg.ErrNoRows {
return errors.Wrap(err, "cannot select player history records")
return errors.Wrap(err, "couldn't select player history records")
}
todaysPlayerStats := w.calculateDailyPlayerStats(players, playerHistory)
if len(todaysPlayerStats) > 0 {
@ -271,17 +318,12 @@ func (w *updateServerDataWorker) update() error {
Set("rank = EXCLUDED.rank").
Apply(appendODSetClauses).
Insert(); err != nil {
return errors.Wrap(err, "cannot insert today's player stats")
return errors.Wrap(err, "couldn't insert today's player stats")
}
}
}
if len(villages) > 0 {
if _, err := tx.Model(&models.Village{}).
Where("true").
Delete(); err != nil && err != pg.ErrNoRows {
return errors.Wrap(err, "cannot delete villages")
}
if _, err := tx.Model(&villages).
OnConflict("(id) DO UPDATE").
Set("name = EXCLUDED.name").
@ -291,7 +333,7 @@ func (w *updateServerDataWorker) update() error {
Set("bonus = EXCLUDED.bonus").
Set("player_id = EXCLUDED.player_id").
Insert(); err != nil {
return errors.Wrap(err, "cannot insert villages")
return errors.Wrap(err, "couldn't insert villages")
}
}
@ -306,7 +348,7 @@ func (w *updateServerDataWorker) update() error {
Returning("*").
WherePK().
Update(); err != nil {
return errors.Wrap(err, "cannot update server")
return errors.Wrap(err, "couldn't update server")
}
if err := tx.Commit(); err != nil {

View File

@ -0,0 +1,76 @@
package tasks
import (
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/shared/tw/dataloader"
)
type taskUpdateServerEnnoblements struct {
*task
}
func (t *taskUpdateServerEnnoblements) execute(url string, server *models.Server) error {
if err := t.validatePayload(server); err != nil {
log.Debug(err)
return nil
}
entry := log.WithField("key", server.Key)
entry.Debugf("%s: update of the ennoblements has started...", server.Key)
err := (&workerUpdateServerEnnoblements{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
dataloader: newDataloader(url),
}).update()
if err != nil {
err = errors.Wrap(err, "taskUpdateServerEnnoblements.execute")
entry.Error(err)
return err
}
entry.Debugf("%s: ennoblements have been updated", server.Key)
return nil
}
func (t *taskUpdateServerEnnoblements) validatePayload(server *models.Server) error {
if server == nil {
return errors.Errorf("taskUpdateServerEnnoblements.validatePayload: Expected *models.Server, got nil")
}
return nil
}
type workerUpdateServerEnnoblements struct {
db *pg.DB
dataloader dataloader.DataLoader
}
func (w *workerUpdateServerEnnoblements) loadEnnoblements() ([]*models.Ennoblement, error) {
lastEnnoblement := &models.Ennoblement{}
if err := w.db.
Model(lastEnnoblement).
Limit(1).
Order("ennobled_at DESC").
Select(); err != nil && err != pg.ErrNoRows {
return nil, errors.Wrapf(err, "couldn't load last ennoblement")
}
return w.dataloader.LoadEnnoblements(&dataloader.LoadEnnoblementsConfig{
EnnobledAtGT: lastEnnoblement.EnnobledAt,
})
}
func (w *workerUpdateServerEnnoblements) update() error {
ennoblements, err := w.loadEnnoblements()
if err != nil {
return err
}
if len(ennoblements) > 0 {
if _, err := w.db.Model(&ennoblements).Insert(); err != nil {
return errors.Wrap(err, "couldn't insert ennoblements")
}
}
return nil
}

View File

@ -0,0 +1,128 @@
package tasks
import (
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"time"
)
type taskUpdateServerHistory struct {
*task
}
func (t *taskUpdateServerHistory) execute(timezone string, server *models.Server) error {
if err := t.validatePayload(server); err != nil {
log.Debug(err)
return nil
}
location, err := t.loadLocation(timezone)
if err != nil {
err = errors.Wrap(err, "taskUpdateServerHistory.execute")
log.Error(err)
return err
}
entry := log.WithField("key", server.Key)
entry.Infof("taskUpdateServerHistory.execute: %s: update of the history has started...", server.Key)
err = (&workerUpdateServerHistory{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
location: location,
}).update()
if err != nil {
err = errors.Wrap(err, "taskUpdateServerHistory.execute")
entry.Error(err)
return err
}
entry.Infof("taskUpdateServerHistory.execute: %s: history has been updated", server.Key)
return nil
}
func (t *taskUpdateServerHistory) validatePayload(server *models.Server) error {
if server == nil {
return errors.New("taskUpdateServerHistory.validatePayload: Expected *models.Server, got nil")
}
return nil
}
type workerUpdateServerHistory struct {
db *pg.DB
server *models.Server
location *time.Location
}
func (w *workerUpdateServerHistory) update() error {
var players []*models.Player
if err := w.db.Model(&players).Where("exists = true").Select(); err != nil {
return errors.Wrap(err, "couldn't load players")
}
now := time.Now().In(w.location)
createDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
var ph []*models.PlayerHistory
for _, player := range players {
ph = append(ph, &models.PlayerHistory{
OpponentsDefeated: player.OpponentsDefeated,
PlayerID: player.ID,
TotalVillages: player.TotalVillages,
Points: player.Points,
Rank: player.Rank,
TribeID: player.TribeID,
CreateDate: createDate,
})
}
var tribes []*models.Tribe
if err := w.db.Model(&tribes).Where("exists = true").Select(); err != nil {
return errors.Wrap(err, "couldn't load tribes")
}
var th []*models.TribeHistory
for _, tribe := range tribes {
th = append(th, &models.TribeHistory{
OpponentsDefeated: tribe.OpponentsDefeated,
TribeID: tribe.ID,
TotalMembers: tribe.TotalMembers,
TotalVillages: tribe.TotalVillages,
Points: tribe.Points,
AllPoints: tribe.AllPoints,
Rank: tribe.Rank,
Dominance: tribe.Dominance,
CreateDate: createDate,
})
}
tx, err := w.db.Begin()
if err != nil {
return err
}
defer func(s *models.Server) {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key))
}
}(w.server)
if len(ph) > 0 {
if _, err := w.db.Model(&ph).Insert(); err != nil {
return errors.Wrap(err, "couldn't insert players history")
}
}
if len(th) > 0 {
if _, err := w.db.Model(&th).Insert(); err != nil {
return errors.Wrap(err, "couldn't insert tribes history")
}
}
if _, err := tx.Model(w.server).
Set("history_updated_at = ?", time.Now()).
WherePK().
Returning("*").
Update(); err != nil {
return errors.Wrap(err, "couldn't update server")
}
return tx.Commit()
}

View File

@ -1,20 +1,59 @@
package cron
package tasks
import (
"time"
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"time"
)
type updateServerStatsWorker struct {
type taskUpdateServerStats struct {
*task
}
func (t *taskUpdateServerStats) execute(timezone string, server *models.Server) error {
if err := t.validatePayload(server); err != nil {
log.Debug(err)
return nil
}
location, err := t.loadLocation(timezone)
if err != nil {
err = errors.Wrap(err, "taskUpdateServerStats.execute")
log.Error(err)
return err
}
entry := log.WithField("key", server.Key)
entry.Infof("taskUpdateServerStats.execute: %s: update of the stats has started...", server.Key)
err = (&workerUpdateServerStats{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
location: location,
}).update()
if err != nil {
err = errors.Wrap(err, "taskUpdateServerStats.execute")
entry.Error(err)
return err
}
entry.Infof("taskUpdateServerStats.execute: %s: stats have been updated", server.Key)
return nil
}
func (t *taskUpdateServerStats) validatePayload(server *models.Server) error {
if server == nil {
return errors.New("taskUpdateServerStats.validatePayload: Expected *models.Server, got nil")
}
return nil
}
type workerUpdateServerStats struct {
db *pg.DB
server *models.Server
location *time.Location
}
func (w *updateServerStatsWorker) prepare() (*models.ServerStats, error) {
func (w *workerUpdateServerStats) prepare() (*models.ServerStats, error) {
activePlayers, err := w.db.Model(&models.Player{}).Where("exists = true").Count()
if err != nil {
return nil, errors.Wrap(err, "cannot count active players")
@ -71,7 +110,7 @@ func (w *updateServerStatsWorker) prepare() (*models.ServerStats, error) {
}, nil
}
func (w *updateServerStatsWorker) update() error {
func (w *workerUpdateServerStats) update() error {
stats, err := w.prepare()
if err != nil {
return err
@ -81,10 +120,14 @@ func (w *updateServerStatsWorker) update() error {
if err != nil {
return err
}
defer tx.Close()
defer func(s *models.Server) {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key))
}
}(w.server)
if _, err := tx.Model(stats).Insert(); err != nil {
return errors.Wrap(err, "cannot insert server stats")
return errors.Wrap(err, "couldn't insert server stats")
}
_, err = tx.Model(w.server).
@ -93,7 +136,7 @@ func (w *updateServerStatsWorker) update() error {
Returning("*").
Update()
if err != nil {
return errors.Wrap(err, "cannot update server")
return errors.Wrap(err, "couldn't update the server")
}
return tx.Commit()

View File

@ -0,0 +1,60 @@
package tasks
import (
"context"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"time"
"github.com/tribalwarshelp/cron/internal/cron/queue"
)
type taskUpdateStats struct {
*task
}
func (t *taskUpdateStats) execute(timezone string) error {
entry := log.WithField("timezone", timezone)
location, err := t.loadLocation(timezone)
if err != nil {
err = errors.Wrap(err, "taskUpdateStats.execute")
entry.Error(err)
return err
}
year, month, day := time.Now().In(location).Date()
date := time.Date(year, month, day, 1, 45, 0, 0, location)
var servers []*models.Server
err = t.db.
Model(&servers).
Where(
"status = ? AND (stats_updated_at IS NULL OR stats_updated_at < ?) AND timezone = ?",
models.ServerStatusOpen,
date,
location.String(),
).
Relation("Version").
Select()
if err != nil {
err = errors.Wrap(err, "taskUpdateStats.execute")
entry.Errorln(err)
return err
}
entry.
WithField("numberOfServers", len(servers)).
Info("taskUpdateStats.execute: Update of the stats has started")
for _, server := range servers {
s := server
err := t.queue.Add(queue.MainQueue, Get(TaskUpdateServerStats).WithArgs(context.Background(), timezone, s))
if err != nil {
log.Warn(
errors.Wrapf(
err,
"taskUpdateStats.execute: %s: Couldn't add the task '%s' for this server",
server.Key,
TaskUpdateServerStats,
),
)
}
}
return nil
}

View File

@ -0,0 +1,41 @@
package tasks
import (
"context"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"github.com/tribalwarshelp/cron/internal/cron/queue"
)
type taskVacuum struct {
*task
}
func (t *taskVacuum) execute() error {
var servers []*models.Server
err := t.db.
Model(&servers).
Select()
if err != nil {
err = errors.Wrap(err, "taskVacuum.execute")
log.Errorln(err)
return err
}
log.Infof("taskVacuum.execute: Start database vacumming...")
for _, server := range servers {
s := server
err := t.queue.Add(queue.MainQueue, Get(TaskNameVacuumServerDB).WithArgs(context.Background(), s))
if err != nil {
log.Warn(
errors.Wrapf(
err,
"taskVacuum.execute: %s: Couldn't add the task '%s' for this server",
server.Key,
TaskUpdateServerEnnoblements,
),
)
}
}
return nil
}

View File

@ -0,0 +1,99 @@
package tasks
import (
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/tribalwarshelp/shared/models"
"time"
)
const (
day = 24 * time.Hour
)
type taskVacuumServerDB struct {
*task
}
func (t *taskVacuumServerDB) execute(server *models.Server) error {
if err := t.validatePayload(server); err != nil {
log.Debug(err)
return nil
}
entry := log.WithField("key", server.Key)
entry.Infof("taskVacuumServerDB.execute: %s: vacumming the database...", server.Key)
err := (&workerVacuumServerDB{
db: t.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
}).vacuum()
if err != nil {
err = errors.Wrap(err, "taskVacuumServerDB.execute")
entry.Error(err)
return err
}
entry.Infof("taskVacuumServerDB.execute: %s: the database has been vacummed", server.Key)
return nil
}
func (t *taskVacuumServerDB) validatePayload(server *models.Server) error {
if server == nil {
return errors.New("taskVacuumServerDB.validatePayload: Expected *models.Server, got nil")
}
return nil
}
type workerVacuumServerDB struct {
db *pg.DB
server *models.Server
}
func (w *workerVacuumServerDB) vacuum() error {
tx, err := w.db.Begin()
if err != nil {
return err
}
defer func(s *models.Server) {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrapf(err, "%s: Couldn't rollback the transaction", s.Key))
}
}(w.server)
withNonExistentPlayers := w.db.Model(&models.Player{}).Column("id").Where("exists = false and NOW() - deleted_at > '14 days'")
withNonExistentTribes := w.db.Model(&models.Tribe{}).Column("id").Where("exists = false and NOW() - deleted_at > '1 days'")
_, err = tx.Model(&models.PlayerHistory{}).
With("players", withNonExistentPlayers).
Where("player_id IN (Select id FROM players) OR player_history.create_date < ?", time.Now().Add(-1*day*180)).
Delete()
if err != nil {
return errors.Wrap(err, "couldn't delete the old player history records")
}
_, err = tx.Model(&models.TribeHistory{}).
With("tribes", withNonExistentTribes).
Where("tribe_id IN (Select id FROM tribes) OR tribe_history.create_date < ?", time.Now().Add(-1*day*180)).
Delete()
if err != nil {
return errors.Wrap(err, "couldn't delete the old tribe history records")
}
_, err = tx.Model(&models.DailyPlayerStats{}).
With("players", withNonExistentPlayers).
Where("player_id IN (Select id FROM players) OR daily_player_stats.create_date < ?", time.Now().Add(-1*day*180)).
Delete()
if err != nil {
return errors.Wrap(err, "couldn't delete the old player stats records")
}
_, err = tx.Model(&models.DailyTribeStats{}).
With("tribes", withNonExistentTribes).
Where("tribe_id IN (Select id FROM tribes) OR daily_tribe_stats.create_date < ?", time.Now().Add(-1*day*180)).
Delete()
if err != nil {
return errors.Wrap(err, "couldn't delete the old tribe stats records")
}
return tx.Commit()
}

View File

@ -0,0 +1,126 @@
package tasks
import (
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vmihailenco/taskq/v3"
"github.com/tribalwarshelp/cron/internal/cron/queue"
)
const (
TaskNameLoadVersionsAndUpdateServerData = "loadVersionsAndUpdateServerData"
TaskNameLoadServersAndUpdateData = "loadServersAndUpdateData"
TaskNameUpdateServerData = "updateServerData"
TaskNameVacuum = "vacuum"
TaskNameVacuumServerDB = "vacuumServerDB"
TaskUpdateEnnoblements = "updateEnnoblements"
TaskUpdateServerEnnoblements = "updateServerEnnoblements"
TaskUpdateHistory = "updateHistory"
TaskUpdateServerHistory = "updateServerHistory"
TaskUpdateStats = "updateStats"
TaskUpdateServerStats = "updateServerStats"
TaskNameDeleteNonExistentVillages = "deleteNonExistentVillages"
TaskNameServerDeleteNonExistentVillages = "serverDeleteNonExistentVillages"
defaultRetryLimit = 3
)
var log = logrus.WithField("package", "cron/tasks")
type Config struct {
DB *pg.DB
Queue queue.Queue
}
func RegisterTasks(cfg *Config) error {
if err := validateConfig(cfg); err != nil {
return errors.Wrap(err, "RegisterTasks")
}
t := &task{
db: cfg.DB,
queue: cfg.Queue,
}
options := []*taskq.TaskOptions{
{
Name: TaskNameLoadVersionsAndUpdateServerData,
Handler: (&taskLoadVersionsAndUpdateServerData{t}).execute,
},
{
Name: TaskNameLoadServersAndUpdateData,
Handler: (&taskLoadServersAndUpdateData{t}).execute,
},
{
Name: TaskNameUpdateServerData,
Handler: (&taskUpdateServerData{t}).execute,
},
{
Name: TaskNameVacuum,
Handler: (&taskVacuum{t}).execute,
},
{
Name: TaskNameVacuumServerDB,
Handler: (&taskVacuumServerDB{t}).execute,
},
{
Name: TaskUpdateEnnoblements,
Handler: (&taskUpdateEnnoblements{t}).execute,
},
{
Name: TaskUpdateServerEnnoblements,
Handler: (&taskUpdateServerEnnoblements{t}).execute,
},
{
Name: TaskUpdateHistory,
Handler: (&taskUpdateHistory{t}).execute,
},
{
Name: TaskUpdateServerHistory,
RetryLimit: defaultRetryLimit,
Handler: (&taskUpdateServerHistory{t}).execute,
},
{
Name: TaskUpdateStats,
Handler: (&taskUpdateStats{t}).execute,
},
{
Name: TaskUpdateServerStats,
Handler: (&taskUpdateServerStats{t}).execute,
},
{
Name: TaskNameDeleteNonExistentVillages,
Handler: (&taskDeleteNonExistentVillages{t}).execute,
},
{
Name: TaskNameServerDeleteNonExistentVillages,
Handler: (&taskServerDeleteNonExistentVillages{t}).execute,
},
}
for _, taskOptions := range options {
opts := taskOptions
if opts.RetryLimit == 0 {
opts.RetryLimit = defaultRetryLimit
}
taskq.RegisterTask(opts)
}
return nil
}
func Get(taskName string) *taskq.Task {
return taskq.Tasks.Get(taskName)
}
func validateConfig(cfg *Config) error {
if cfg == nil {
return errors.New("Config hasn't been provided")
}
if cfg.DB == nil {
return errors.New("cfg.DB is required")
}
if cfg.Queue == nil {
return errors.New("cfg.Queue is required")
}
return nil
}

197
internal/db/db.go Normal file
View File

@ -0,0 +1,197 @@
package db
import (
"fmt"
gopglogrusquerylogger "github.com/Kichiyaki/go-pg-logrus-query-logger/v10"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/orm"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tribalwarshelp/shared/models"
envutils "github.com/tribalwarshelp/cron/internal/utils/env"
)
var log = logrus.WithField("package", "db")
type Config struct {
LogQueries bool
}
func New(cfg *Config) (*pg.DB, error) {
db := pg.Connect(prepareOptions())
if cfg != nil && cfg.LogQueries {
db.AddQueryHook(gopglogrusquerylogger.QueryLogger{
Log: log,
MaxQueryLength: 5000,
})
}
if err := prepareDB(db); err != nil {
return nil, errors.Wrap(err, "New")
}
return db, nil
}
func prepareOptions() *pg.Options {
return &pg.Options{
User: envutils.GetenvString("DB_USER"),
Password: envutils.GetenvString("DB_PASSWORD"),
Database: envutils.GetenvString("DB_NAME"),
Addr: envutils.GetenvString("DB_HOST") + ":" + envutils.GetenvString("DB_PORT"),
PoolSize: envutils.GetenvInt("DB_POOL_SIZE"),
}
}
func prepareDB(db *pg.DB) error {
tx, err := db.Begin()
if err != nil {
return errors.Wrap(err, "Couldn't start a transaction")
}
defer func() {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrap(err, "prepareDB: Couldn't rollback the transaction"))
}
}()
dbModels := []interface{}{
(*models.SpecialServer)(nil),
(*models.Server)(nil),
(*models.Version)(nil),
(*models.PlayerToServer)(nil),
(*models.PlayerNameChange)(nil),
}
for _, model := range dbModels {
err := tx.Model(model).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return errors.Wrap(err, "Couldn't create the table")
}
}
type statementWithParams struct {
statement string
params []interface{}
}
for _, s := range []statementWithParams{
{
statement: pgDefaultValues,
},
{
statement: allVersionsPGInsertStatements,
},
{
statement: allSpecialServersPGInsertStatements,
},
{
statement: pgDropSchemaFunctions,
params: []interface{}{pg.Safe("public")},
},
{
statement: pgFunctions,
},
} {
if _, err := tx.Exec(s.statement, s.params...); err != nil {
return errors.Wrap(err, "Couldn't initialize the db")
}
}
if err := tx.Commit(); err != nil {
return errors.Wrap(err, "Couldn't commit changes")
}
var servers []*models.Server
if err := db.Model(&servers).Select(); err != nil {
return errors.Wrap(err, "Couldn't load servers")
}
for _, server := range servers {
if err := createSchema(db, server, true); err != nil {
return err
}
}
return nil
}
func CreateSchema(db *pg.DB, server *models.Server) error {
return createSchema(db, server, false)
}
func SchemaExists(db *pg.DB, schemaName string) bool {
exists, err := db.
Model().
Table("information_schema.schemata").
Where("schema_name = ?", schemaName).
Exists()
if err != nil {
return false
}
return exists
}
func createSchema(db *pg.DB, server *models.Server, init bool) error {
if !init && SchemaExists(db, server.Key) {
return nil
}
tx, err := db.WithParam("SERVER", pg.Safe(server.Key)).Begin()
if err != nil {
return errors.Wrap(err, "CreateSchema: couldn't start a transaction")
}
defer func() {
if err := tx.Close(); err != nil {
log.Warn(errors.Wrap(err, "createSchema: Couldn't rollback the transaction"))
}
}()
if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil {
return errors.Wrap(err, "CreateSchema: couldn't create the schema")
}
dbModels := []interface{}{
(*models.Tribe)(nil),
(*models.Player)(nil),
(*models.Village)(nil),
(*models.Ennoblement)(nil),
(*models.ServerStats)(nil),
(*models.TribeHistory)(nil),
(*models.PlayerHistory)(nil),
(*models.TribeChange)(nil),
(*models.DailyPlayerStats)(nil),
(*models.DailyTribeStats)(nil),
}
for _, model := range dbModels {
err := tx.Model(model).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return err
}
}
statements := []string{
serverPGFunctions,
serverPGTriggers,
serverPGDefaultValues,
}
if init {
statements = append([]string{pgDropSchemaFunctions}, statements...)
}
for _, statement := range statements {
if _, err := tx.Exec(statement, pg.Safe(server.Key), server.VersionCode); err != nil {
return errors.Wrap(err, "CreateSchema: couldn't initialize the schema")
}
}
if err := tx.Commit(); err != nil {
return errors.Wrap(err, "CreateSchema: couldn't commit changes")
}
return nil
}

View File

@ -1,4 +1,4 @@
package cron
package db
const (
allSpecialServersPGInsertStatements = `

30
internal/utils/env/env.go vendored Normal file
View File

@ -0,0 +1,30 @@
package envutils
import (
"os"
"strconv"
)
func GetenvInt(key string) int {
str := GetenvString(key)
if str == "" {
return 0
}
i, err := strconv.Atoi(str)
if err != nil {
return 0
}
return i
}
func GetenvBool(key string) bool {
str := GetenvString(key)
if str == "" {
return false
}
return str == "true" || str == "1"
}
func GetenvString(key string) string {
return os.Getenv(key)
}

119
main.go
View File

@ -1,71 +1,72 @@
package main
import (
"context"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/sirupsen/logrus"
"github.com/tribalwarshelp/shared/mode"
_cron "github.com/tribalwarshelp/cron/cron"
twhelpcron "github.com/tribalwarshelp/cron/internal/cron"
"github.com/tribalwarshelp/cron/internal/db"
envutils "github.com/tribalwarshelp/cron/internal/utils/env"
gopglogrusquerylogger "github.com/Kichiyaki/go-pg-logrus-query-logger/v10"
"github.com/go-pg/pg/v10"
"github.com/joho/godotenv"
"github.com/robfig/cron/v3"
)
func init() {
os.Setenv("TZ", "UTC")
if mode.Get() == mode.DevelopmentMode {
godotenv.Load(".env.local")
if err := setupENVs(); err != nil {
logrus.Fatal(err)
}
setupLogger()
}
func main() {
dbOptions := &pg.Options{
User: os.Getenv("DB_USER"),
Password: os.Getenv("DB_PASSWORD"),
Database: os.Getenv("DB_NAME"),
Addr: os.Getenv("DB_HOST") + ":" + os.Getenv("DB_PORT"),
PoolSize: mustParseEnvToInt("DB_POOL_SIZE"),
redisClient, err := initializeRedis()
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis"))
}
dbFields := logrus.Fields{
"user": dbOptions.User,
"database": dbOptions.Database,
"addr": dbOptions.Addr,
}
db := pg.Connect(dbOptions)
defer func() {
if err := db.Close(); err != nil {
logrus.WithFields(dbFields).Fatalln(err)
if err := redisClient.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "Couldn't close the Redis connection"))
}
}()
if strings.ToUpper(os.Getenv("LOG_DB_QUERIES")) == "TRUE" {
db.AddQueryHook(gopglogrusquerylogger.QueryLogger{
Entry: logrus.NewEntry(logrus.StandardLogger()),
})
}
logrus.WithFields(dbFields).Info("Connection with the database has been established")
c := cron.New(cron.WithChain(
cron.SkipIfStillRunning(cron.PrintfLogger(logrus.WithField("package", "cron"))),
))
if err := _cron.Attach(c, _cron.Config{
DB: db,
MaxConcurrentWorkers: mustParseEnvToInt("MAX_CONCURRENT_WORKERS"),
RunOnStartup: os.Getenv("RUN_ON_STARTUP") == "true",
}); err != nil {
dbConn, err := db.New(&db.Config{LogQueries: envutils.GetenvBool("LOG_DB_QUERIES")})
if err != nil {
logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db"))
}
defer func() {
if err := dbConn.Close(); err != nil {
logrus.Warn(errors.Wrap(err, "Couldn't close the db connection"))
}
}()
c, err := twhelpcron.New(&twhelpcron.Config{
DB: dbConn,
RunOnInit: envutils.GetenvBool("RUN_ON_INIT"),
Redis: redisClient,
WorkerLimit: envutils.GetenvInt("WORKER_LIMIT"),
Opts: []cron.Option{
cron.WithChain(
cron.SkipIfStillRunning(
cron.PrintfLogger(logrus.WithField("package", "cron")),
),
),
},
})
if err != nil {
logrus.Fatal(err)
}
if err := c.Start(context.Background()); err != nil {
logrus.Fatal(err)
}
c.Start()
defer c.Stop()
logrus.Info("Cron is running!")
@ -74,18 +75,25 @@ func main() {
<-channel
logrus.Info("shutting down")
if err := c.Stop(); err != nil {
logrus.Fatal(err)
}
}
func mustParseEnvToInt(key string) int {
str := os.Getenv(key)
if str == "" {
return 0
}
i, err := strconv.Atoi(str)
func setupENVs() error {
err := os.Setenv("TZ", "UTC")
if err != nil {
return 0
return errors.Wrap(err, "setupENVs")
}
return i
if mode.Get() == mode.DevelopmentMode {
err := godotenv.Load(".env.local")
if err != nil {
return errors.Wrap(err, "setupENVs")
}
}
return nil
}
func setupLogger() {
@ -105,3 +113,18 @@ func setupLogger() {
logrus.SetFormatter(customFormatter)
}
}
func initializeRedis() (redis.UniversalClient, error) {
client := redis.NewClient(&redis.Options{
Addr: envutils.GetenvString("REDIS_ADDR"),
Username: envutils.GetenvString("REDIS_USERNAME"),
Password: envutils.GetenvString("REDIS_PASSWORD"),
DB: envutils.GetenvInt("REDIS_DB"),
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
return nil, errors.Wrap(err, "initializeRedis")
}
return client, nil
}