This repository has been archived on 2022-09-04. You can view files and clone it, but cannot push or open issues or pull requests.
dataupdater/cron/handler.go

378 lines
8.5 KiB
Go

package cron
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"runtime"
"sync"
"time"
"github.com/tribalwarshelp/shared/models"
phpserialize "github.com/Kichiyaki/go-php-serialize"
"github.com/robfig/cron/v3"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/orm"
"github.com/pkg/errors"
)
const (
endpointGetServers = "/backend/get_servers.php"
)
type handler struct {
db *pg.DB
}
type Config struct {
DB *pg.DB
}
func Attach(c *cron.Cron, cfg Config) error {
h := &handler{cfg.DB}
if err := h.init(); err != nil {
return err
}
if _, err := c.AddFunc("0 * * * *", h.updateServersData); err != nil {
return err
}
if _, err := c.AddFunc("30 0 * * *", h.updateServerHistory); err != nil {
return err
}
if _, err := c.AddFunc("30 1 * * *", h.vacuumDatabase); err != nil {
return err
}
if _, err := c.AddFunc("30 2 * * *", h.updateStats); err != nil {
return err
}
go func() {
h.updateServersData()
h.vacuumDatabase()
h.updateServerHistory()
h.updateStats()
}()
return nil
}
func (h *handler) init() error {
tx, err := h.db.Begin()
if err != nil {
return err
}
defer tx.Close()
models := []interface{}{
(*models.SpecialServer)(nil),
(*models.Server)(nil),
(*models.LangVersion)(nil),
(*models.PlayerToServer)(nil),
(*models.PlayerNameChange)(nil),
}
for _, model := range models {
err := tx.CreateTable(model, &orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return err
}
}
for _, statement := range []string{
pgDefaultValues,
allVersionsPGInsertStatements,
allSpecialServersPGInsertStatements,
} {
if _, err := tx.Exec(statement); err != nil {
return err
}
}
return tx.Commit()
}
func (h *handler) createSchema(server *models.Server) error {
tx, err := h.db.WithParam("SERVER", pg.Safe(server.Key)).Begin()
if err != nil {
return err
}
defer tx.Close()
if _, err := tx.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", server.Key)); err != nil {
return err
}
models := []interface{}{
(*models.Tribe)(nil),
(*models.Player)(nil),
(*models.Village)(nil),
(*models.Ennoblement)(nil),
(*models.ServerStats)(nil),
(*models.TribeHistory)(nil),
(*models.PlayerHistory)(nil),
(*models.TribeChange)(nil),
(*models.DailyPlayerStats)(nil),
(*models.DailyTribeStats)(nil),
}
for _, model := range models {
err := tx.CreateTable(model, &orm.CreateTableOptions{
IfNotExists: true,
})
if err != nil {
return err
}
}
for _, statement := range []string{
serverPGFunctions,
serverPGTriggers,
serverPGDefaultValues,
} {
if _, err := tx.Exec(statement, pg.Safe(server.Key), server.LangVersionTag); err != nil {
return err
}
}
return tx.Commit()
}
func (h *handler) getServers() ([]*models.Server, map[string]string, error) {
log.Print("Loading servers...")
langVersions := []*models.LangVersion{}
if err := h.db.Model(&langVersions).Relation("SpecialServers").Select(); err != nil {
return nil, nil, errors.Wrap(err, "getServers")
}
serverKeys := []string{}
servers := []*models.Server{}
urls := make(map[string]string)
loadedLangVersions := []models.LanguageTag{}
for _, langVersion := range langVersions {
resp, err := http.Get(fmt.Sprintf("https://%s%s", langVersion.Host, endpointGetServers))
if err != nil {
log.Print(errors.Wrapf(err, "Cannot fetch servers from %s", langVersion.Host))
continue
}
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Print(errors.Wrapf(err, "Cannot read response body from %s", langVersion.Host))
continue
}
body, err := phpserialize.Decode(string(bodyBytes))
if err != nil {
log.Print(errors.Wrapf(err, "Cannot serialize body from %s into go value", langVersion.Host))
continue
}
for serverKey, url := range body.(map[interface{}]interface{}) {
serverKeyStr := serverKey.(string)
if langVersion.SpecialServers.Contains(serverKeyStr) {
continue
}
server := &models.Server{
Key: serverKeyStr,
Status: models.ServerStatusOpen,
LangVersionTag: langVersion.Tag,
LangVersion: langVersion,
}
if err := h.createSchema(server); err != nil {
log.Print(errors.Wrapf(err, "Cannot create schema for %s", serverKey))
continue
}
serverKeys = append(serverKeys, serverKeyStr)
urls[serverKeyStr] = url.(string)
servers = append(servers, server)
}
loadedLangVersions = append(loadedLangVersions, langVersion.Tag)
}
if len(servers) > 0 {
if _, err := h.db.Model(&servers).
OnConflict("(key) DO UPDATE").
Set("status = ?", models.ServerStatusOpen).
Set("lang_version_tag = EXCLUDED.lang_version_tag").
Returning("*").
Insert(); err != nil {
return nil, nil, err
}
}
if _, err := h.db.Model(&models.Server{}).
Set("status = ?", models.ServerStatusClosed).
Where("key NOT IN (?) AND lang_version_tag IN (?)", pg.In(serverKeys), pg.In(loadedLangVersions)).
Update(); err != nil {
return nil, nil, err
}
log.Print("Servers loaded!")
return servers, urls, nil
}
func (h *handler) updateServersData() {
servers, urls, err := h.getServers()
if err != nil {
log.Println(err.Error())
return
}
for _, server := range servers {
url, ok := urls[server.Key]
if !ok {
log.Printf("No one URL associated with key: %s, skipping...", server.Key)
continue
}
sh := &updateServerDataHandler{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
baseURL: url,
server: server,
}
log.Printf("%s: updating data", server.Key)
if err := sh.update(); err != nil {
log.Println(errors.Wrap(err, server.Key))
} else {
log.Printf("%s: data updated", server.Key)
}
}
}
func (h *handler) updateServerHistory() {
servers := []*models.Server{}
now := time.Now()
t1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 30, 0, 0, time.UTC)
err := h.db.
Model(&servers).
Where("status = ? AND (history_updated_at < ? OR history_updated_at IS NULL)", models.ServerStatusOpen, t1).
Select()
if err != nil {
log.Println(errors.Wrap(err, "updateServerHistory"))
return
}
var wg sync.WaitGroup
max := runtime.NumCPU() * 5
count := 0
for _, server := range servers {
if count >= max {
wg.Wait()
count = 0
}
sh := &updateServerHistoryHandler{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
}
count++
wg.Add(1)
go func(server *models.Server, sh *updateServerHistoryHandler) {
defer wg.Done()
log.Printf("%s: updating history", server.Key)
if err := sh.update(); err != nil {
log.Println(errors.Wrap(err, server.Key))
return
} else {
log.Printf("%s: history updated", server.Key)
}
}(server, sh)
}
wg.Wait()
}
func (h *handler) updateServersStats(t time.Time) error {
servers := []*models.Server{}
err := h.db.
Model(&servers).
Where("status = ? AND (stats_updated_at < ? OR stats_updated_at IS NULL)", models.ServerStatusOpen, t).
Select()
if err != nil {
return errors.Wrap(err, "updateServersStats")
}
var wg sync.WaitGroup
max := runtime.NumCPU() * 5
count := 0
for _, server := range servers {
if count >= max {
wg.Wait()
count = 0
}
sh := &updateServerStatsHandler{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
}
count++
wg.Add(1)
go func(server *models.Server, sh *updateServerStatsHandler) {
defer wg.Done()
log.Printf("%s: updating stats", server.Key)
if err := sh.update(); err != nil {
log.Println(errors.Wrap(err, server.Key))
return
} else {
log.Printf("%s: stats updated", server.Key)
}
}(server, sh)
}
wg.Wait()
return nil
}
func (h *handler) updateStats() {
now := time.Now()
t1 := time.Date(now.Year(), now.Month(), now.Day(), 1, 30, 0, 0, time.UTC)
if err := h.updateServersStats(t1); err != nil {
log.Println(err)
return
}
}
func (h *handler) vacuumDatabase() {
servers := []*models.Server{}
err := h.db.
Model(&servers).
Select()
if err != nil {
log.Fatal(errors.Wrap(err, "vacuumDatabase"))
return
}
var wg sync.WaitGroup
max := runtime.NumCPU() * 5
count := 0
for _, server := range servers {
if count >= max {
wg.Wait()
count = 0
}
sh := &vacuumServerDBHandler{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
}
count++
wg.Add(1)
go func(server *models.Server, sh *vacuumServerDBHandler) {
defer wg.Done()
log.Printf("%s: vacuuming database", server.Key)
if err := sh.vacuum(); err != nil {
log.Println(errors.Wrap(err, server.Key))
return
} else {
log.Printf("%s: database vacuumed", server.Key)
}
}(server, sh)
}
wg.Wait()
}