delete readme.md, add the option to download data for more than one server at the same time

This commit is contained in:
Dawid Wysokiński 2020-10-10 11:45:52 +02:00
parent fc0af6a9f3
commit ad8cdc6926
6 changed files with 86 additions and 54 deletions

View File

@ -17,6 +17,8 @@ DB_NAME=your_db_name
DB_PORT=5432
DB_HOST=your_db_host
DB_PASSWORD=your_db_pass
MAX_CONCURRENT_WORKERS=1 #how many servers should update at the same time
```
### Prerequisites

View File

@ -24,15 +24,21 @@ const (
)
type handler struct {
db *pg.DB
db *pg.DB
maxConcurrentWorkers int
}
type Config struct {
DB *pg.DB
DB *pg.DB
MaxConcurrentWorkers int
}
func Attach(c *cron.Cron, cfg Config) error {
h := &handler{cfg.DB}
if cfg.DB == nil {
return fmt.Errorf("cfg.DB cannot be nil, expected go-pg database")
}
h := &handler{cfg.DB, cfg.MaxConcurrentWorkers}
if err := h.init(); err != nil {
return err
}
@ -60,6 +66,10 @@ func Attach(c *cron.Cron, cfg Config) error {
}
func (h *handler) init() error {
if h.maxConcurrentWorkers <= 0 {
h.maxConcurrentWorkers = 1
}
tx, err := h.db.Begin()
if err != nil {
return err
@ -223,25 +233,36 @@ func (h *handler) updateServersData() {
return
}
var wg sync.WaitGroup
p := newPool(h.maxConcurrentWorkers)
defer p.close()
for _, server := range servers {
url, ok := urls[server.Key]
if !ok {
log.Printf("No one URL associated with key: %s, skipping...", server.Key)
continue
}
p.waitForWorker()
wg.Add(1)
sh := &updateServerDataWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
baseURL: url,
server: server,
}
log.Printf("%s: updating data", server.Key)
if err := sh.update(); err != nil {
log.Println(errors.Wrap(err, server.Key))
} else {
log.Printf("%s: data updated", server.Key)
}
go func(worker *updateServerDataWorker, server *models.Server, url string) {
defer p.releaseWorker()
defer wg.Done()
log.Printf("%s: updating data", server.Key)
if err := sh.update(); err != nil {
log.Println(errors.Wrap(err, server.Key))
} else {
log.Printf("%s: data updated", server.Key)
}
}(sh, server, url)
}
wg.Wait()
}
func (h *handler) updateHistory() {
@ -262,22 +283,22 @@ func (h *handler) updateHistory() {
defer p.close()
for _, server := range servers {
sh := &updateServerHistoryWorker{
p.waitForWorker()
wg.Add(1)
worker := &updateServerHistoryWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
}
p.waitForWorker()
wg.Add(1)
go func(server *models.Server, sh *updateServerHistoryWorker) {
go func(server *models.Server, worker *updateServerHistoryWorker) {
defer p.releaseWorker()
defer wg.Done()
log.Printf("%s: updating history", server.Key)
if err := sh.update(); err != nil {
if err := worker.update(); err != nil {
log.Println(errors.Wrap(err, server.Key))
return
}
log.Printf("%s: history updated", server.Key)
}(server, sh)
}(server, worker)
}
wg.Wait()
@ -298,22 +319,22 @@ func (h *handler) updateServersStats(t time.Time) error {
defer p.close()
for _, server := range servers {
sh := &updateServerStatsWorker{
p.waitForWorker()
wg.Add(1)
worker := &updateServerStatsWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
server: server,
}
p.waitForWorker()
wg.Add(1)
go func(server *models.Server, sh *updateServerStatsWorker) {
go func(server *models.Server, worker *updateServerStatsWorker) {
defer p.releaseWorker()
defer wg.Done()
log.Printf("%s: updating stats", server.Key)
if err := sh.update(); err != nil {
if err := worker.update(); err != nil {
log.Println(errors.Wrap(err, server.Key))
return
}
log.Printf("%s: stats updated", server.Key)
}(server, sh)
}(server, worker)
}
wg.Wait()
@ -343,21 +364,21 @@ func (h *handler) vacuumDatabase() {
defer p.close()
for _, server := range servers {
sh := &vacuumServerDBWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
}
p.waitForWorker()
wg.Add(1)
go func(server *models.Server, sh *vacuumServerDBWorker, p *pool) {
worker := &vacuumServerDBWorker{
db: h.db.WithParam("SERVER", pg.Safe(server.Key)),
}
go func(server *models.Server, worker *vacuumServerDBWorker, p *pool) {
defer p.releaseWorker()
defer wg.Done()
log.Printf("%s: vacuuming database", server.Key)
if err := sh.vacuum(); err != nil {
if err := worker.vacuum(); err != nil {
log.Println(errors.Wrap(err, server.Key))
return
}
log.Printf("%s: database vacuumed", server.Key)
}(server, sh, p)
}(server, worker, p)
}
wg.Wait()

View File

@ -585,7 +585,7 @@ func (h *updateServerDataWorker) update() error {
Where("tribe.id NOT IN (?)", pg.In(ids)).
Set("exists = false").
Update(); err != nil && err != pg.ErrNoRows {
return errors.Wrap(err, "cannot update not existed tribes")
return errors.Wrap(err, "cannot update nonexistent tribes")
}
return tx.Commit()
})
@ -647,7 +647,7 @@ func (h *updateServerDataWorker) update() error {
Where("id NOT IN (?)", pg.In(ids)).
Set("exists = false").
Update(); err != nil && err != pg.ErrNoRows {
return errors.Wrap(err, "cannot update not existed players")
return errors.Wrap(err, "cannot update nonexistent players")
}
return tx.Commit()
})

View File

@ -8,6 +8,10 @@ import (
"github.com/tribalwarshelp/shared/models"
)
const (
day = 24 * time.Hour
)
type vacuumServerDBWorker struct {
db *pg.DB
}
@ -19,39 +23,39 @@ func (h *vacuumServerDBWorker) vacuum() error {
}
defer tx.Close()
withNotExistedPlayers := h.db.Model(&models.Player{}).Where("exists = false")
withNotExistedTribes := h.db.Model(&models.Tribe{}).Where("exists = false")
withNonExistentPlayers := h.db.Model(&models.Player{}).Where("exists = false")
withNonExistentTribes := h.db.Model(&models.Tribe{}).Where("exists = false")
_, err = tx.Model(&models.PlayerHistory{}).
With("players", withNotExistedPlayers).
Where("player_id IN (Select id FROM players) OR player_history.create_date < ?", time.Now().Add(-1*24*time.Hour*90)).
With("players", withNonExistentPlayers).
Where("player_id IN (Select id FROM players) OR player_history.create_date < ?", time.Now().Add(-1*day*90)).
Delete()
if err != nil {
return errors.Wrap(err, "cannot delete old player history")
return errors.Wrap(err, "cannot delete old player history records")
}
_, err = tx.Model(&models.TribeHistory{}).
With("tribes", withNotExistedTribes).
Where("tribe_id IN (Select id FROM tribes) OR tribe_history.create_date < ?", time.Now().Add(-1*24*time.Hour*90)).
With("tribes", withNonExistentTribes).
Where("tribe_id IN (Select id FROM tribes) OR tribe_history.create_date < ?", time.Now().Add(-1*day*90)).
Delete()
if err != nil {
return errors.Wrap(err, "cannot delete old tribe history")
return errors.Wrap(err, "cannot delete old tribe history records")
}
_, err = tx.Model(&models.DailyPlayerStats{}).
With("players", withNotExistedPlayers).
Where("player_id IN (Select id FROM players) OR daily_player_stats.create_date < ?", time.Now().Add(-1*24*time.Hour*90)).
With("players", withNonExistentPlayers).
Where("player_id IN (Select id FROM players) OR daily_player_stats.create_date < ?", time.Now().Add(-1*day*90)).
Delete()
if err != nil {
return errors.Wrap(err, "cannot delete old player stats")
return errors.Wrap(err, "cannot delete old player stats records")
}
_, err = tx.Model(&models.DailyTribeStats{}).
With("tribes", withNotExistedTribes).
Where("tribe_id IN (Select id FROM tribes) OR daily_tribe_stats.create_date < ?", time.Now().Add(-1*24*time.Hour*90)).
With("tribes", withNonExistentTribes).
Where("tribe_id IN (Select id FROM tribes) OR daily_tribe_stats.create_date < ?", time.Now().Add(-1*day*90)).
Delete()
if err != nil {
return errors.Wrap(err, "cannot delete old tribe stats")
return errors.Wrap(err, "cannot delete old tribe stats records")
}
return tx.Commit()

16
main.go
View File

@ -5,6 +5,7 @@ import (
"os"
"os/signal"
"runtime"
"strconv"
"syscall"
"github.com/tribalwarshelp/shared/mode"
@ -42,7 +43,8 @@ func main() {
cron.SkipIfStillRunning(cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))),
))
if err := _cron.Attach(c, _cron.Config{
DB: db,
DB: db,
MaxConcurrentWorkers: mustParseEnvToInt("MAX_CONCURRENT_WORKERS"),
}); err != nil {
log.Fatal(err)
}
@ -57,3 +59,15 @@ func main() {
log.Print("shutting down")
}
func mustParseEnvToInt(key string) int {
str := os.Getenv(key)
if str == "" {
return 0
}
i, err := strconv.Atoi(str)
if err != nil {
return 0
}
return i
}

View File

@ -1,9 +0,0 @@
# Cron
Required ENV Variables:
- DB_USER=
- DB_NAME=
- DB_PORT=
- DB_HOST=
- DB_PASSWORD=