From 4154d6c133eee2b2582264b98097c3db2c80bfc7 Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 11 Jul 2021 07:43:44 +0200 Subject: [PATCH 1/9] rename one of the directories (internal -> pkg) --- main.go | 4 ++-- {internal => pkg}/cron/config.go | 0 {internal => pkg}/cron/cron.go | 4 ++-- {internal => pkg}/cron/queue/config.go | 0 {internal => pkg}/cron/queue/helpers.go | 0 {internal => pkg}/cron/queue/queue.go | 2 +- {internal => pkg}/cron/queue/task.go | 0 .../cron/queue/task_delete_non_existent_villages.go | 0 .../cron/queue/task_load_servers_and_update_data.go | 2 +- .../cron/queue/task_load_versions_and_update_server_data.go | 0 .../cron/queue/task_server_delete_non_existent_villages.go | 0 {internal => pkg}/cron/queue/task_update_ennoblements.go | 0 {internal => pkg}/cron/queue/task_update_history.go | 0 {internal => pkg}/cron/queue/task_update_server_data.go | 0 .../cron/queue/task_update_server_ennoblements.go | 0 {internal => pkg}/cron/queue/task_update_server_history.go | 0 {internal => pkg}/cron/queue/task_update_server_stats.go | 0 {internal => pkg}/cron/queue/task_update_stats.go | 0 {internal => pkg}/cron/queue/task_vacuum.go | 0 {internal => pkg}/cron/queue/task_vacuum_server_db.go | 0 {internal => pkg}/postgres/postgres.go | 2 +- {internal => pkg}/postgres/sql_statements.go | 0 22 files changed, 7 insertions(+), 7 deletions(-) rename {internal => pkg}/cron/config.go (100%) rename {internal => pkg}/cron/cron.go (97%) rename {internal => pkg}/cron/queue/config.go (100%) rename {internal => pkg}/cron/queue/helpers.go (100%) rename {internal => pkg}/cron/queue/queue.go (97%) rename {internal => pkg}/cron/queue/task.go (100%) rename {internal => pkg}/cron/queue/task_delete_non_existent_villages.go (100%) rename {internal => pkg}/cron/queue/task_load_servers_and_update_data.go (98%) rename {internal => pkg}/cron/queue/task_load_versions_and_update_server_data.go (100%) rename {internal => pkg}/cron/queue/task_server_delete_non_existent_villages.go (100%) rename {internal => pkg}/cron/queue/task_update_ennoblements.go (100%) rename {internal => pkg}/cron/queue/task_update_history.go (100%) rename {internal => pkg}/cron/queue/task_update_server_data.go (100%) rename {internal => pkg}/cron/queue/task_update_server_ennoblements.go (100%) rename {internal => pkg}/cron/queue/task_update_server_history.go (100%) rename {internal => pkg}/cron/queue/task_update_server_stats.go (100%) rename {internal => pkg}/cron/queue/task_update_stats.go (100%) rename {internal => pkg}/cron/queue/task_vacuum.go (100%) rename {internal => pkg}/cron/queue/task_vacuum_server_db.go (100%) rename {internal => pkg}/postgres/postgres.go (98%) rename {internal => pkg}/postgres/sql_statements.go (100%) diff --git a/main.go b/main.go index d9d3745..2178e23 100644 --- a/main.go +++ b/main.go @@ -13,8 +13,8 @@ import ( "github.com/sirupsen/logrus" - twhelpcron "github.com/tribalwarshelp/cron/internal/cron" - "github.com/tribalwarshelp/cron/internal/postgres" + twhelpcron "github.com/tribalwarshelp/cron/pkg/cron" + "github.com/tribalwarshelp/cron/pkg/postgres" "github.com/joho/godotenv" ) diff --git a/internal/cron/config.go b/pkg/cron/config.go similarity index 100% rename from internal/cron/config.go rename to pkg/cron/config.go diff --git a/internal/cron/cron.go b/pkg/cron/cron.go similarity index 97% rename from internal/cron/cron.go rename to pkg/cron/cron.go index c9d754f..a23d1e9 100644 --- a/internal/cron/cron.go +++ b/pkg/cron/cron.go @@ -10,7 +10,7 @@ import ( "github.com/robfig/cron/v3" - "github.com/tribalwarshelp/cron/internal/cron/queue" + "github.com/tribalwarshelp/cron/pkg/cron/queue" ) type Cron struct { @@ -29,7 +29,7 @@ func New(cfg *Config) (*Cron, error) { if err != nil { return nil, err } - log := logrus.WithField("package", "internal/cron") + log := logrus.WithField("package", "pkg/cron") c := &Cron{ Cron: cron.New(cron.WithChain( cron.SkipIfStillRunning( diff --git a/internal/cron/queue/config.go b/pkg/cron/queue/config.go similarity index 100% rename from internal/cron/queue/config.go rename to pkg/cron/queue/config.go diff --git a/internal/cron/queue/helpers.go b/pkg/cron/queue/helpers.go similarity index 100% rename from internal/cron/queue/helpers.go rename to pkg/cron/queue/helpers.go diff --git a/internal/cron/queue/queue.go b/pkg/cron/queue/queue.go similarity index 97% rename from internal/cron/queue/queue.go rename to pkg/cron/queue/queue.go index e021a78..60ffaa5 100644 --- a/internal/cron/queue/queue.go +++ b/pkg/cron/queue/queue.go @@ -16,7 +16,7 @@ const ( Ennoblements = "ennoblements" ) -var log = logrus.WithField("package", "internal/cron/queue") +var log = logrus.WithField("package", "pkg/cron/queue") type Queue interface { Start(ctx context.Context) error diff --git a/internal/cron/queue/task.go b/pkg/cron/queue/task.go similarity index 100% rename from internal/cron/queue/task.go rename to pkg/cron/queue/task.go diff --git a/internal/cron/queue/task_delete_non_existent_villages.go b/pkg/cron/queue/task_delete_non_existent_villages.go similarity index 100% rename from internal/cron/queue/task_delete_non_existent_villages.go rename to pkg/cron/queue/task_delete_non_existent_villages.go diff --git a/internal/cron/queue/task_load_servers_and_update_data.go b/pkg/cron/queue/task_load_servers_and_update_data.go similarity index 98% rename from internal/cron/queue/task_load_servers_and_update_data.go rename to pkg/cron/queue/task_load_servers_and_update_data.go index 924fdc2..65453e1 100644 --- a/internal/cron/queue/task_load_servers_and_update_data.go +++ b/pkg/cron/queue/task_load_servers_and_update_data.go @@ -8,7 +8,7 @@ import ( "github.com/tribalwarshelp/shared/tw/twdataloader" "github.com/tribalwarshelp/shared/tw/twmodel" - "github.com/tribalwarshelp/cron/internal/postgres" + "github.com/tribalwarshelp/cron/pkg/postgres" ) type taskLoadServersAndUpdateData struct { diff --git a/internal/cron/queue/task_load_versions_and_update_server_data.go b/pkg/cron/queue/task_load_versions_and_update_server_data.go similarity index 100% rename from internal/cron/queue/task_load_versions_and_update_server_data.go rename to pkg/cron/queue/task_load_versions_and_update_server_data.go diff --git a/internal/cron/queue/task_server_delete_non_existent_villages.go b/pkg/cron/queue/task_server_delete_non_existent_villages.go similarity index 100% rename from internal/cron/queue/task_server_delete_non_existent_villages.go rename to pkg/cron/queue/task_server_delete_non_existent_villages.go diff --git a/internal/cron/queue/task_update_ennoblements.go b/pkg/cron/queue/task_update_ennoblements.go similarity index 100% rename from internal/cron/queue/task_update_ennoblements.go rename to pkg/cron/queue/task_update_ennoblements.go diff --git a/internal/cron/queue/task_update_history.go b/pkg/cron/queue/task_update_history.go similarity index 100% rename from internal/cron/queue/task_update_history.go rename to pkg/cron/queue/task_update_history.go diff --git a/internal/cron/queue/task_update_server_data.go b/pkg/cron/queue/task_update_server_data.go similarity index 100% rename from internal/cron/queue/task_update_server_data.go rename to pkg/cron/queue/task_update_server_data.go diff --git a/internal/cron/queue/task_update_server_ennoblements.go b/pkg/cron/queue/task_update_server_ennoblements.go similarity index 100% rename from internal/cron/queue/task_update_server_ennoblements.go rename to pkg/cron/queue/task_update_server_ennoblements.go diff --git a/internal/cron/queue/task_update_server_history.go b/pkg/cron/queue/task_update_server_history.go similarity index 100% rename from internal/cron/queue/task_update_server_history.go rename to pkg/cron/queue/task_update_server_history.go diff --git a/internal/cron/queue/task_update_server_stats.go b/pkg/cron/queue/task_update_server_stats.go similarity index 100% rename from internal/cron/queue/task_update_server_stats.go rename to pkg/cron/queue/task_update_server_stats.go diff --git a/internal/cron/queue/task_update_stats.go b/pkg/cron/queue/task_update_stats.go similarity index 100% rename from internal/cron/queue/task_update_stats.go rename to pkg/cron/queue/task_update_stats.go diff --git a/internal/cron/queue/task_vacuum.go b/pkg/cron/queue/task_vacuum.go similarity index 100% rename from internal/cron/queue/task_vacuum.go rename to pkg/cron/queue/task_vacuum.go diff --git a/internal/cron/queue/task_vacuum_server_db.go b/pkg/cron/queue/task_vacuum_server_db.go similarity index 100% rename from internal/cron/queue/task_vacuum_server_db.go rename to pkg/cron/queue/task_vacuum_server_db.go diff --git a/internal/postgres/postgres.go b/pkg/postgres/postgres.go similarity index 98% rename from internal/postgres/postgres.go rename to pkg/postgres/postgres.go index b48a018..c7622a7 100644 --- a/internal/postgres/postgres.go +++ b/pkg/postgres/postgres.go @@ -11,7 +11,7 @@ import ( "github.com/tribalwarshelp/shared/tw/twmodel" ) -var log = logrus.WithField("package", "internal/postgres") +var log = logrus.WithField("package", "pkg/postgres") type Config struct { LogQueries bool diff --git a/internal/postgres/sql_statements.go b/pkg/postgres/sql_statements.go similarity index 100% rename from internal/postgres/sql_statements.go rename to pkg/postgres/sql_statements.go From 91bf75b43b1d3c21abe7cf7f3d81549e5ed88f9c Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 11 Jul 2021 08:09:15 +0200 Subject: [PATCH 2/9] remove one arg from q.Add (name), rename taskVacuumServerDB -> taskVacuumServerData --- pkg/cron/cron.go | 12 +++---- pkg/cron/queue/queue.go | 34 +++++++++++-------- pkg/cron/queue/task.go | 6 ++-- .../task_delete_non_existent_villages.go | 1 - .../task_load_servers_and_update_data.go | 2 +- ...sk_load_versions_and_update_server_data.go | 2 +- pkg/cron/queue/task_update_ennoblements.go | 1 - pkg/cron/queue/task_update_history.go | 2 +- pkg/cron/queue/task_update_stats.go | 2 +- pkg/cron/queue/task_vacuum.go | 2 +- ...erver_db.go => task_vacuum_server_data.go} | 14 ++++---- 11 files changed, 41 insertions(+), 37 deletions(-) rename pkg/cron/queue/{task_vacuum_server_db.go => task_vacuum_server_data.go} (83%) diff --git a/pkg/cron/cron.go b/pkg/cron/cron.go index a23d1e9..665c2e0 100644 --- a/pkg/cron/cron.go +++ b/pkg/cron/cron.go @@ -113,42 +113,42 @@ func (c *Cron) Stop() error { } func (c *Cron) updateServerData() { - err := c.queue.Add(queue.Main, queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background())) + err := c.queue.Add(queue.GetTask(queue.LoadVersionsAndUpdateServerData).WithArgs(context.Background())) if err != nil { c.logError("Cron.updateServerData", queue.LoadVersionsAndUpdateServerData, err) } } func (c *Cron) updateEnnoblements() { - err := c.queue.Add(queue.Ennoblements, queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background())) + err := c.queue.Add(queue.GetTask(queue.UpdateEnnoblements).WithArgs(context.Background())) if err != nil { c.logError("Cron.updateEnnoblements", queue.UpdateEnnoblements, err) } } func (c *Cron) updateHistory(timezone string) { - err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.GetTask(queue.UpdateHistory).WithArgs(context.Background(), timezone)) if err != nil { c.logError("Cron.updateHistory", queue.UpdateHistory, err) } } func (c *Cron) updateStats(timezone string) { - err := c.queue.Add(queue.Main, queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone)) + err := c.queue.Add(queue.GetTask(queue.UpdateStats).WithArgs(context.Background(), timezone)) if err != nil { c.logError("Cron.updateStats", queue.UpdateStats, err) } } func (c *Cron) vacuumDatabase() { - err := c.queue.Add(queue.Main, queue.GetTask(queue.Vacuum).WithArgs(context.Background())) + err := c.queue.Add(queue.GetTask(queue.Vacuum).WithArgs(context.Background())) if err != nil { c.logError("Cron.vacuumDatabase", queue.Vacuum, err) } } func (c *Cron) deleteNonExistentVillages() { - err := c.queue.Add(queue.Main, queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background())) + err := c.queue.Add(queue.GetTask(queue.DeleteNonExistentVillages).WithArgs(context.Background())) if err != nil { c.logError("Cron.deleteNonExistentVillages", queue.DeleteNonExistentVillages, err) } diff --git a/pkg/cron/queue/queue.go b/pkg/cron/queue/queue.go index 60ffaa5..aca16b3 100644 --- a/pkg/cron/queue/queue.go +++ b/pkg/cron/queue/queue.go @@ -11,17 +11,12 @@ import ( "github.com/vmihailenco/taskq/v3/redisq" ) -const ( - Main = "main" - Ennoblements = "ennoblements" -) - var log = logrus.WithField("package", "pkg/cron/queue") type Queue interface { Start(ctx context.Context) error Close() error - Add(name string, msg *taskq.Message) error + Add(msg *taskq.Message) error } type queue struct { @@ -49,8 +44,8 @@ func New(cfg *Config) (Queue, error) { func (q *queue) init(cfg *Config) error { q.factory = redisq.NewFactory() - q.main = q.registerQueue(Main, cfg.WorkerLimit) - q.ennoblements = q.registerQueue(Ennoblements, cfg.WorkerLimit) + q.main = q.registerQueue("main", cfg.WorkerLimit) + q.ennoblements = q.registerQueue("ennoblements", cfg.WorkerLimit) err := registerTasks(®isterTasksConfig{ DB: cfg.DB, @@ -73,11 +68,22 @@ func (q *queue) registerQueue(name string, limit int) taskq.Queue { }) } -func (q *queue) getQueueByName(name string) taskq.Queue { +func (q *queue) getQueueByTaskName(name string) taskq.Queue { switch name { - case Main: + case LoadVersionsAndUpdateServerData, + LoadServersAndUpdateData, + UpdateServerData, + Vacuum, + VacuumServerData, + UpdateHistory, + UpdateServerHistory, + UpdateStats, + UpdateServerStats, + DeleteNonExistentVillages, + ServerDeleteNonExistentVillages: return q.main - case Ennoblements: + case UpdateEnnoblements, + UpdateServerEnnoblements: return q.ennoblements } return nil @@ -97,10 +103,10 @@ func (q *queue) Close() error { return nil } -func (q *queue) Add(name string, msg *taskq.Message) error { - queue := q.getQueueByName(name) +func (q *queue) Add(msg *taskq.Message) error { + queue := q.getQueueByTaskName(msg.TaskName) if queue == nil { - return errors.Errorf("couldn't add the message to the queue: unknown queue name '%s'", name) + return errors.Errorf("couldn't add the message to the queue: unknown task name '%s'", msg.TaskName) } if err := queue.Add(msg); err != nil { return errors.Wrap(err, "couldn't add the message to the queue") diff --git a/pkg/cron/queue/task.go b/pkg/cron/queue/task.go index ae92bd7..7e6890a 100644 --- a/pkg/cron/queue/task.go +++ b/pkg/cron/queue/task.go @@ -13,7 +13,7 @@ const ( LoadServersAndUpdateData = "loadServersAndUpdateData" UpdateServerData = "updateServerData" Vacuum = "vacuum" - VacuumServerDB = "vacuumServerDB" + VacuumServerData = "vacuumServerData" UpdateEnnoblements = "updateEnnoblements" UpdateServerEnnoblements = "updateServerEnnoblements" UpdateHistory = "updateHistory" @@ -71,8 +71,8 @@ func registerTasks(cfg *registerTasksConfig) error { Handler: (&taskVacuum{t}).execute, }, { - Name: VacuumServerDB, - Handler: (&taskVacuumServerDB{t}).execute, + Name: VacuumServerData, + Handler: (&taskVacuumServerData{t}).execute, }, { Name: UpdateEnnoblements, diff --git a/pkg/cron/queue/task_delete_non_existent_villages.go b/pkg/cron/queue/task_delete_non_existent_villages.go index 63da676..3af9b5b 100644 --- a/pkg/cron/queue/task_delete_non_existent_villages.go +++ b/pkg/cron/queue/task_delete_non_existent_villages.go @@ -29,7 +29,6 @@ func (t *taskDeleteNonExistentVillages) execute() error { Info("taskDeleteNonExistentVillages.execute: Servers have been loaded and added to the queue") for _, server := range servers { err := t.queue.Add( - Main, GetTask(ServerDeleteNonExistentVillages). WithArgs( context.Background(), diff --git a/pkg/cron/queue/task_load_servers_and_update_data.go b/pkg/cron/queue/task_load_servers_and_update_data.go index 65453e1..838f562 100644 --- a/pkg/cron/queue/task_load_servers_and_update_data.go +++ b/pkg/cron/queue/task_load_servers_and_update_data.go @@ -85,7 +85,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error { entry.Infof("%s: Servers have been loaded", version.Host) for _, server := range servers { - err := t.queue.Add(Main, GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server)) + err := t.queue.Add(GetTask(UpdateServerData).WithArgs(context.Background(), server.url, server.Server)) if err != nil { log. WithField("key", server.Key). diff --git a/pkg/cron/queue/task_load_versions_and_update_server_data.go b/pkg/cron/queue/task_load_versions_and_update_server_data.go index 9e598a6..80d5fe3 100644 --- a/pkg/cron/queue/task_load_versions_and_update_server_data.go +++ b/pkg/cron/queue/task_load_versions_and_update_server_data.go @@ -20,7 +20,7 @@ func (t *taskLoadVersionsAndUpdateServerData) execute() error { } log.Debug("taskLoadVersionsAndUpdateServerData.execute: Versions have been loaded") for _, version := range versions { - err := t.queue.Add(Main, GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version)) + err := t.queue.Add(GetTask(LoadServersAndUpdateData).WithArgs(context.Background(), version)) if err != nil { log. WithField("code", version.Code). diff --git a/pkg/cron/queue/task_update_ennoblements.go b/pkg/cron/queue/task_update_ennoblements.go index 56695eb..f5103b8 100644 --- a/pkg/cron/queue/task_update_ennoblements.go +++ b/pkg/cron/queue/task_update_ennoblements.go @@ -26,7 +26,6 @@ func (t *taskUpdateEnnoblements) execute() error { log.WithField("numberOfServers", len(servers)).Info("taskUpdateEnnoblements.execute: Update of the ennoblements has started...") for _, server := range servers { err := t.queue.Add( - Ennoblements, GetTask(UpdateServerEnnoblements). WithArgs(context.Background(), twurlbuilder.BuildServerURL(server.Key, server.Version.Host), server), ) diff --git a/pkg/cron/queue/task_update_history.go b/pkg/cron/queue/task_update_history.go index 84d9611..9be1623 100644 --- a/pkg/cron/queue/task_update_history.go +++ b/pkg/cron/queue/task_update_history.go @@ -41,7 +41,7 @@ func (t *taskUpdateHistory) execute(timezone string) error { WithField("numberOfServers", len(servers)). Info("taskUpdateHistory.execute: Update of the history has started") for _, server := range servers { - err := t.queue.Add(Main, GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server)) + err := t.queue.Add(GetTask(UpdateServerHistory).WithArgs(context.Background(), timezone, server)) if err != nil { log. WithField("key", server.Key). diff --git a/pkg/cron/queue/task_update_stats.go b/pkg/cron/queue/task_update_stats.go index 6a74df6..9628bd9 100644 --- a/pkg/cron/queue/task_update_stats.go +++ b/pkg/cron/queue/task_update_stats.go @@ -41,7 +41,7 @@ func (t *taskUpdateStats) execute(timezone string) error { WithField("numberOfServers", len(servers)). Info("taskUpdateStats.execute: Update of the stats has started") for _, server := range servers { - err := t.queue.Add(Main, GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server)) + err := t.queue.Add(GetTask(UpdateServerStats).WithArgs(context.Background(), timezone, server)) if err != nil { log. WithField("key", server.Key). diff --git a/pkg/cron/queue/task_vacuum.go b/pkg/cron/queue/task_vacuum.go index 0a229fd..7e9903d 100644 --- a/pkg/cron/queue/task_vacuum.go +++ b/pkg/cron/queue/task_vacuum.go @@ -22,7 +22,7 @@ func (t *taskVacuum) execute() error { } log.Infof("taskVacuum.execute: The database vacumming process has started...") for _, server := range servers { - err := t.queue.Add(Main, GetTask(VacuumServerDB).WithArgs(context.Background(), server)) + err := t.queue.Add(GetTask(VacuumServerData).WithArgs(context.Background(), server)) if err != nil { log. WithField("key", server.Key). diff --git a/pkg/cron/queue/task_vacuum_server_db.go b/pkg/cron/queue/task_vacuum_server_data.go similarity index 83% rename from pkg/cron/queue/task_vacuum_server_db.go rename to pkg/cron/queue/task_vacuum_server_data.go index cac4866..ed229c8 100644 --- a/pkg/cron/queue/task_vacuum_server_db.go +++ b/pkg/cron/queue/task_vacuum_server_data.go @@ -11,32 +11,32 @@ const ( day = 24 * time.Hour ) -type taskVacuumServerDB struct { +type taskVacuumServerData struct { *task } -func (t *taskVacuumServerDB) execute(server *twmodel.Server) error { +func (t *taskVacuumServerData) execute(server *twmodel.Server) error { if err := t.validatePayload(server); err != nil { - log.Debug(errors.Wrap(err, "taskVacuumServerDB.execute")) + log.Debug(errors.Wrap(err, "taskVacuumServerData.execute")) return nil } entry := log.WithField("key", server.Key) - entry.Infof("taskVacuumServerDB.execute: %s: Vacumming the database...", server.Key) + entry.Infof("taskVacuumServerData.execute: %s: Vacumming the database...", server.Key) err := (&workerVacuumServerDB{ db: t.db.WithParam("SERVER", pg.Safe(server.Key)), server: server, }).vacuum() if err != nil { - err = errors.Wrap(err, "taskVacuumServerDB.execute") + err = errors.Wrap(err, "taskVacuumServerData.execute") entry.Error(err) return err } - entry.Infof("taskVacuumServerDB.execute: %s: The database has been vacummed", server.Key) + entry.Infof("taskVacuumServerData.execute: %s: The database has been vacummed", server.Key) return nil } -func (t *taskVacuumServerDB) validatePayload(server *twmodel.Server) error { +func (t *taskVacuumServerData) validatePayload(server *twmodel.Server) error { if server == nil { return errors.New("expected *twmodel.Server, got nil") } From 37955cfebf5ca007ae789f8363eaeaaac61400af Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 11 Jul 2021 08:16:46 +0200 Subject: [PATCH 3/9] remove the Queue interface --- pkg/cron/cron.go | 4 ++-- pkg/cron/queue/config.go | 2 +- pkg/cron/queue/queue.go | 24 +++++++++--------------- pkg/cron/queue/task.go | 2 +- 4 files changed, 13 insertions(+), 19 deletions(-) diff --git a/pkg/cron/cron.go b/pkg/cron/cron.go index 665c2e0..9c124f3 100644 --- a/pkg/cron/cron.go +++ b/pkg/cron/cron.go @@ -15,7 +15,7 @@ import ( type Cron struct { *cron.Cron - queue queue.Queue + queue *queue.Queue db *pg.DB runOnInit bool log logrus.FieldLogger @@ -165,7 +165,7 @@ func (c *Cron) logError(prefix string, taskName string, err error) { ) } -func initializeQueue(cfg *Config) (queue.Queue, error) { +func initializeQueue(cfg *Config) (*queue.Queue, error) { q, err := queue.New(&queue.Config{ WorkerLimit: cfg.WorkerLimit, Redis: cfg.Redis, diff --git a/pkg/cron/queue/config.go b/pkg/cron/queue/config.go index 99a88ba..7a32759 100644 --- a/pkg/cron/queue/config.go +++ b/pkg/cron/queue/config.go @@ -21,7 +21,7 @@ func validateConfig(cfg *Config) error { type registerTasksConfig struct { DB *pg.DB - Queue Queue + Queue *Queue } func validateRegisterTasksConfig(cfg *registerTasksConfig) error { diff --git a/pkg/cron/queue/queue.go b/pkg/cron/queue/queue.go index aca16b3..9cb6c94 100644 --- a/pkg/cron/queue/queue.go +++ b/pkg/cron/queue/queue.go @@ -13,25 +13,19 @@ import ( var log = logrus.WithField("package", "pkg/cron/queue") -type Queue interface { - Start(ctx context.Context) error - Close() error - Add(msg *taskq.Message) error -} - -type queue struct { +type Queue struct { redis redis.UniversalClient main taskq.Queue ennoblements taskq.Queue factory taskq.Factory } -func New(cfg *Config) (Queue, error) { +func New(cfg *Config) (*Queue, error) { if err := validateConfig(cfg); err != nil { return nil, err } - q := &queue{ + q := &Queue{ redis: cfg.Redis, } @@ -42,7 +36,7 @@ func New(cfg *Config) (Queue, error) { return q, nil } -func (q *queue) init(cfg *Config) error { +func (q *Queue) init(cfg *Config) error { q.factory = redisq.NewFactory() q.main = q.registerQueue("main", cfg.WorkerLimit) q.ennoblements = q.registerQueue("ennoblements", cfg.WorkerLimit) @@ -58,7 +52,7 @@ func (q *queue) init(cfg *Config) error { return nil } -func (q *queue) registerQueue(name string, limit int) taskq.Queue { +func (q *Queue) registerQueue(name string, limit int) taskq.Queue { return q.factory.RegisterQueue(&taskq.QueueOptions{ Name: name, ReservationTimeout: time.Minute * 2, @@ -68,7 +62,7 @@ func (q *queue) registerQueue(name string, limit int) taskq.Queue { }) } -func (q *queue) getQueueByTaskName(name string) taskq.Queue { +func (q *Queue) getQueueByTaskName(name string) taskq.Queue { switch name { case LoadVersionsAndUpdateServerData, LoadServersAndUpdateData, @@ -89,21 +83,21 @@ func (q *queue) getQueueByTaskName(name string) taskq.Queue { return nil } -func (q *queue) Start(ctx context.Context) error { +func (q *Queue) Start(ctx context.Context) error { if err := q.factory.StartConsumers(ctx); err != nil { return errors.Wrap(err, "couldn't start the queue") } return nil } -func (q *queue) Close() error { +func (q *Queue) Close() error { if err := q.factory.Close(); err != nil { return errors.Wrap(err, "couldn't close the queue") } return nil } -func (q *queue) Add(msg *taskq.Message) error { +func (q *Queue) Add(msg *taskq.Message) error { queue := q.getQueueByTaskName(msg.TaskName) if queue == nil { return errors.Errorf("couldn't add the message to the queue: unknown task name '%s'", msg.TaskName) diff --git a/pkg/cron/queue/task.go b/pkg/cron/queue/task.go index 7e6890a..77c5335 100644 --- a/pkg/cron/queue/task.go +++ b/pkg/cron/queue/task.go @@ -27,7 +27,7 @@ const ( type task struct { db *pg.DB - queue Queue + queue *Queue cachedLocations sync.Map } From f2b6bbcbc47a52e543a8b7b70f2bcf7b5566bb09 Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 11 Jul 2021 08:27:31 +0200 Subject: [PATCH 4/9] rename two functions in the postgres package (CreateSchema -> CreateServerSchema, createSchema -> createServerSchema) --- pkg/cron/queue/task_load_servers_and_update_data.go | 2 +- pkg/postgres/postgres.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/cron/queue/task_load_servers_and_update_data.go b/pkg/cron/queue/task_load_servers_and_update_data.go index 838f562..13a71c8 100644 --- a/pkg/cron/queue/task_load_servers_and_update_data.go +++ b/pkg/cron/queue/task_load_servers_and_update_data.go @@ -50,7 +50,7 @@ func (t *taskLoadServersAndUpdateData) execute(version *twmodel.Version) error { VersionCode: version.Code, Version: version, } - if err := postgres.CreateSchema(t.db, server); err != nil { + if err := postgres.CreateServerSchema(t.db, server); err != nil { logrus.Warn(errors.Wrapf(err, "taskLoadServersAndUpdateData.execute: %s: Couldn't create the schema", server.Key)) continue } diff --git a/pkg/postgres/postgres.go b/pkg/postgres/postgres.go index c7622a7..b1c5c11 100644 --- a/pkg/postgres/postgres.go +++ b/pkg/postgres/postgres.go @@ -110,7 +110,7 @@ func prepareDB(db *pg.DB) error { } for _, server := range servers { - if err := createSchema(db, server, true); err != nil { + if err := createServerSchema(db, server, true); err != nil { return err } } @@ -118,11 +118,11 @@ func prepareDB(db *pg.DB) error { return nil } -func CreateSchema(db *pg.DB, server *twmodel.Server) error { - return createSchema(db, server, false) +func CreateServerSchema(db *pg.DB, server *twmodel.Server) error { + return createServerSchema(db, server, false) } -func SchemaExists(db *pg.DB, schemaName string) bool { +func SchemaExists(db pg.DBI, schemaName string) bool { exists, err := db. Model(). Table("information_schema.schemata"). @@ -134,7 +134,7 @@ func SchemaExists(db *pg.DB, schemaName string) bool { return exists } -func createSchema(db *pg.DB, server *twmodel.Server, init bool) error { +func createServerSchema(db *pg.DB, server *twmodel.Server, init bool) error { if !init && SchemaExists(db, server.Key) { return nil } @@ -145,7 +145,7 @@ func createSchema(db *pg.DB, server *twmodel.Server, init bool) error { } defer func() { if err := tx.Close(); err != nil { - log.Warn(errors.Wrap(err, "createSchema: Couldn't rollback the transaction")) + log.Warn(errors.Wrap(err, "createServerSchema: Couldn't rollback the transaction")) } }() From bea9d67446fb06fbfa254da6a33934d8d0ff48c7 Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 11 Jul 2021 08:33:39 +0200 Subject: [PATCH 5/9] one more opt to postgres.Config (SkipDBInitialization), move the queue package folder up --- pkg/cron/cron.go | 2 +- pkg/postgres/postgres.go | 9 ++++++--- pkg/{cron => }/queue/config.go | 0 pkg/{cron => }/queue/helpers.go | 0 pkg/{cron => }/queue/queue.go | 2 +- pkg/{cron => }/queue/task.go | 0 .../queue/task_delete_non_existent_villages.go | 0 .../queue/task_load_servers_and_update_data.go | 0 .../queue/task_load_versions_and_update_server_data.go | 0 .../queue/task_server_delete_non_existent_villages.go | 0 pkg/{cron => }/queue/task_update_ennoblements.go | 0 pkg/{cron => }/queue/task_update_history.go | 0 pkg/{cron => }/queue/task_update_server_data.go | 0 pkg/{cron => }/queue/task_update_server_ennoblements.go | 0 pkg/{cron => }/queue/task_update_server_history.go | 0 pkg/{cron => }/queue/task_update_server_stats.go | 0 pkg/{cron => }/queue/task_update_stats.go | 0 pkg/{cron => }/queue/task_vacuum.go | 0 pkg/{cron => }/queue/task_vacuum_server_data.go | 0 19 files changed, 8 insertions(+), 5 deletions(-) rename pkg/{cron => }/queue/config.go (100%) rename pkg/{cron => }/queue/helpers.go (100%) rename pkg/{cron => }/queue/queue.go (97%) rename pkg/{cron => }/queue/task.go (100%) rename pkg/{cron => }/queue/task_delete_non_existent_villages.go (100%) rename pkg/{cron => }/queue/task_load_servers_and_update_data.go (100%) rename pkg/{cron => }/queue/task_load_versions_and_update_server_data.go (100%) rename pkg/{cron => }/queue/task_server_delete_non_existent_villages.go (100%) rename pkg/{cron => }/queue/task_update_ennoblements.go (100%) rename pkg/{cron => }/queue/task_update_history.go (100%) rename pkg/{cron => }/queue/task_update_server_data.go (100%) rename pkg/{cron => }/queue/task_update_server_ennoblements.go (100%) rename pkg/{cron => }/queue/task_update_server_history.go (100%) rename pkg/{cron => }/queue/task_update_server_stats.go (100%) rename pkg/{cron => }/queue/task_update_stats.go (100%) rename pkg/{cron => }/queue/task_vacuum.go (100%) rename pkg/{cron => }/queue/task_vacuum_server_data.go (100%) diff --git a/pkg/cron/cron.go b/pkg/cron/cron.go index 9c124f3..02e5dd4 100644 --- a/pkg/cron/cron.go +++ b/pkg/cron/cron.go @@ -10,7 +10,7 @@ import ( "github.com/robfig/cron/v3" - "github.com/tribalwarshelp/cron/pkg/cron/queue" + "github.com/tribalwarshelp/cron/pkg/queue" ) type Cron struct { diff --git a/pkg/postgres/postgres.go b/pkg/postgres/postgres.go index b1c5c11..e0b7e89 100644 --- a/pkg/postgres/postgres.go +++ b/pkg/postgres/postgres.go @@ -14,7 +14,8 @@ import ( var log = logrus.WithField("package", "pkg/postgres") type Config struct { - LogQueries bool + LogQueries bool + SkipDBInitialization bool } func Connect(cfg *Config) (*pg.DB, error) { @@ -27,8 +28,10 @@ func Connect(cfg *Config) (*pg.DB, error) { }) } - if err := prepareDB(db); err != nil { - return nil, err + if cfg == nil || !cfg.SkipDBInitialization { + if err := prepareDB(db); err != nil { + return nil, err + } } return db, nil diff --git a/pkg/cron/queue/config.go b/pkg/queue/config.go similarity index 100% rename from pkg/cron/queue/config.go rename to pkg/queue/config.go diff --git a/pkg/cron/queue/helpers.go b/pkg/queue/helpers.go similarity index 100% rename from pkg/cron/queue/helpers.go rename to pkg/queue/helpers.go diff --git a/pkg/cron/queue/queue.go b/pkg/queue/queue.go similarity index 97% rename from pkg/cron/queue/queue.go rename to pkg/queue/queue.go index 9cb6c94..9f1d9c4 100644 --- a/pkg/cron/queue/queue.go +++ b/pkg/queue/queue.go @@ -11,7 +11,7 @@ import ( "github.com/vmihailenco/taskq/v3/redisq" ) -var log = logrus.WithField("package", "pkg/cron/queue") +var log = logrus.WithField("package", "pkg/queue") type Queue struct { redis redis.UniversalClient diff --git a/pkg/cron/queue/task.go b/pkg/queue/task.go similarity index 100% rename from pkg/cron/queue/task.go rename to pkg/queue/task.go diff --git a/pkg/cron/queue/task_delete_non_existent_villages.go b/pkg/queue/task_delete_non_existent_villages.go similarity index 100% rename from pkg/cron/queue/task_delete_non_existent_villages.go rename to pkg/queue/task_delete_non_existent_villages.go diff --git a/pkg/cron/queue/task_load_servers_and_update_data.go b/pkg/queue/task_load_servers_and_update_data.go similarity index 100% rename from pkg/cron/queue/task_load_servers_and_update_data.go rename to pkg/queue/task_load_servers_and_update_data.go diff --git a/pkg/cron/queue/task_load_versions_and_update_server_data.go b/pkg/queue/task_load_versions_and_update_server_data.go similarity index 100% rename from pkg/cron/queue/task_load_versions_and_update_server_data.go rename to pkg/queue/task_load_versions_and_update_server_data.go diff --git a/pkg/cron/queue/task_server_delete_non_existent_villages.go b/pkg/queue/task_server_delete_non_existent_villages.go similarity index 100% rename from pkg/cron/queue/task_server_delete_non_existent_villages.go rename to pkg/queue/task_server_delete_non_existent_villages.go diff --git a/pkg/cron/queue/task_update_ennoblements.go b/pkg/queue/task_update_ennoblements.go similarity index 100% rename from pkg/cron/queue/task_update_ennoblements.go rename to pkg/queue/task_update_ennoblements.go diff --git a/pkg/cron/queue/task_update_history.go b/pkg/queue/task_update_history.go similarity index 100% rename from pkg/cron/queue/task_update_history.go rename to pkg/queue/task_update_history.go diff --git a/pkg/cron/queue/task_update_server_data.go b/pkg/queue/task_update_server_data.go similarity index 100% rename from pkg/cron/queue/task_update_server_data.go rename to pkg/queue/task_update_server_data.go diff --git a/pkg/cron/queue/task_update_server_ennoblements.go b/pkg/queue/task_update_server_ennoblements.go similarity index 100% rename from pkg/cron/queue/task_update_server_ennoblements.go rename to pkg/queue/task_update_server_ennoblements.go diff --git a/pkg/cron/queue/task_update_server_history.go b/pkg/queue/task_update_server_history.go similarity index 100% rename from pkg/cron/queue/task_update_server_history.go rename to pkg/queue/task_update_server_history.go diff --git a/pkg/cron/queue/task_update_server_stats.go b/pkg/queue/task_update_server_stats.go similarity index 100% rename from pkg/cron/queue/task_update_server_stats.go rename to pkg/queue/task_update_server_stats.go diff --git a/pkg/cron/queue/task_update_stats.go b/pkg/queue/task_update_stats.go similarity index 100% rename from pkg/cron/queue/task_update_stats.go rename to pkg/queue/task_update_stats.go diff --git a/pkg/cron/queue/task_vacuum.go b/pkg/queue/task_vacuum.go similarity index 100% rename from pkg/cron/queue/task_vacuum.go rename to pkg/queue/task_vacuum.go diff --git a/pkg/cron/queue/task_vacuum_server_data.go b/pkg/queue/task_vacuum_server_data.go similarity index 100% rename from pkg/cron/queue/task_vacuum_server_data.go rename to pkg/queue/task_vacuum_server_data.go From 5e1041503c1cf73cd72a656b8100bf51fb0c9c36 Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 11 Jul 2021 09:25:12 +0200 Subject: [PATCH 6/9] change queue initialization in main.go --- main.go | 20 ++++++++++++++++---- pkg/cron/config.go | 14 +++++++------- pkg/cron/cron.go | 23 ++--------------------- pkg/queue/queue.go | 5 ++--- 4 files changed, 27 insertions(+), 35 deletions(-) diff --git a/main.go b/main.go index 2178e23..32ab641 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( twhelpcron "github.com/tribalwarshelp/cron/pkg/cron" "github.com/tribalwarshelp/cron/pkg/postgres" + "github.com/tribalwarshelp/cron/pkg/queue" "github.com/joho/godotenv" ) @@ -47,18 +48,29 @@ func main() { } }() - c, err := twhelpcron.New(&twhelpcron.Config{ + q, err := queue.New(&queue.Config{ DB: dbConn, - RunOnInit: envutil.GetenvBool("RUN_ON_INIT"), Redis: redisClient, WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"), }) + if err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't initialize a queue")) + } + if err := q.Start(context.Background()); err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't start the queue")) + } + + c, err := twhelpcron.New(&twhelpcron.Config{ + DB: dbConn, + RunOnInit: envutil.GetenvBool("RUN_ON_INIT"), + }) if err != nil { logrus.Fatal(errors.Wrap(err, "Couldn't initialize a cron instance")) } - if err := c.Start(context.Background()); err != nil { + if err := c.Start(); err != nil { logrus.Fatal(errors.Wrap(err, "Couldn't start the cron")) } + defer c.Stop() logrus.Info("Cron is up and running!") @@ -67,7 +79,7 @@ func main() { <-channel logrus.Info("shutting down") - if err := c.Stop(); err != nil { + if err := q.Close(); err != nil { logrus.Fatal(err) } } diff --git a/pkg/cron/config.go b/pkg/cron/config.go index ce0b61e..0ef790a 100644 --- a/pkg/cron/config.go +++ b/pkg/cron/config.go @@ -2,23 +2,23 @@ package cron import ( "github.com/go-pg/pg/v10" - "github.com/go-redis/redis/v8" "github.com/pkg/errors" + + "github.com/tribalwarshelp/cron/pkg/queue" ) type Config struct { - DB *pg.DB - Redis redis.UniversalClient - RunOnInit bool - WorkerLimit int + DB *pg.DB + Queue *queue.Queue + RunOnInit bool } func validateConfig(cfg *Config) error { if cfg == nil || cfg.DB == nil { return errors.New("cfg.DB is required") } - if cfg.Redis == nil { - return errors.New("cfg.Redis is required") + if cfg.Queue == nil { + return errors.New("cfg.Queue is required") } return nil } diff --git a/pkg/cron/cron.go b/pkg/cron/cron.go index 02e5dd4..8a69cb5 100644 --- a/pkg/cron/cron.go +++ b/pkg/cron/cron.go @@ -25,10 +25,6 @@ func New(cfg *Config) (*Cron, error) { if err := validateConfig(cfg); err != nil { return nil, err } - q, err := initializeQueue(cfg) - if err != nil { - return nil, err - } log := logrus.WithField("package", "pkg/cron") c := &Cron{ Cron: cron.New(cron.WithChain( @@ -36,7 +32,7 @@ func New(cfg *Config) (*Cron, error) { cron.PrintfLogger(log), ), )), - queue: q, + queue: cfg.Queue, db: cfg.DB, runOnInit: cfg.RunOnInit, log: log, @@ -96,19 +92,13 @@ func (c *Cron) init() error { return nil } -func (c *Cron) Start(ctx context.Context) error { - if err := c.queue.Start(ctx); err != nil { - return err - } +func (c *Cron) Start() error { c.Cron.Start() return nil } func (c *Cron) Stop() error { c.Cron.Stop() - if err := c.queue.Close(); err != nil { - return err - } return nil } @@ -165,15 +155,6 @@ func (c *Cron) logError(prefix string, taskName string, err error) { ) } -func initializeQueue(cfg *Config) (*queue.Queue, error) { - q, err := queue.New(&queue.Config{ - WorkerLimit: cfg.WorkerLimit, - Redis: cfg.Redis, - DB: cfg.DB, - }) - return q, errors.Wrap(err, "couldn't initialize a queue") -} - func createFnWithTimezone(timezone string, fn func(timezone string)) func() { return func() { fn(timezone) diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 9f1d9c4..32607d7 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -41,11 +41,10 @@ func (q *Queue) init(cfg *Config) error { q.main = q.registerQueue("main", cfg.WorkerLimit) q.ennoblements = q.registerQueue("ennoblements", cfg.WorkerLimit) - err := registerTasks(®isterTasksConfig{ + if err := registerTasks(®isterTasksConfig{ DB: cfg.DB, Queue: q, - }) - if err != nil { + }); err != nil { return errors.Wrapf(err, "couldn't register tasks") } From 7a67a9c048c3767ca5ddde877e90d0c8ce9ed88a Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 11 Jul 2021 10:56:44 +0200 Subject: [PATCH 7/9] add a new directory - cmd --- cmd/cron/main.go | 67 ++++++++++++++++++++++++++++++++++++++++ cmd/dataupdater/main.go | 60 +++++++++++++++++++++++++++++++++++ cmd/internal/init.go | 50 ++++++++++++++++++++++++++++++ cmd/internal/redis.go | 24 ++++++++++++++ main.go | 3 +- pkg/postgres/postgres.go | 3 +- 6 files changed, 204 insertions(+), 3 deletions(-) create mode 100644 cmd/cron/main.go create mode 100644 cmd/dataupdater/main.go create mode 100644 cmd/internal/init.go create mode 100644 cmd/internal/redis.go diff --git a/cmd/cron/main.go b/cmd/cron/main.go new file mode 100644 index 0000000..8116715 --- /dev/null +++ b/cmd/cron/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "github.com/Kichiyaki/goutil/envutil" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "os" + "os/signal" + "syscall" + + "github.com/tribalwarshelp/cron/cmd/internal" + twhelpcron "github.com/tribalwarshelp/cron/pkg/cron" + "github.com/tribalwarshelp/cron/pkg/postgres" + "github.com/tribalwarshelp/cron/pkg/queue" +) + +func main() { + redisClient, err := internal.NewRedisClient() + if err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't connect to Redis")) + } + defer func() { + if err := redisClient.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "couldn't close the Redis connection")) + } + }() + + dbConn, err := postgres.Connect(nil) + if err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't connect to the db")) + } + defer func() { + if err := dbConn.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "couldn't close the db connection")) + } + }() + + q, err := queue.New(&queue.Config{ + DB: dbConn, + Redis: redisClient, + WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"), + }) + if err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't initialize a queue")) + } + + c, err := twhelpcron.New(&twhelpcron.Config{ + DB: dbConn, + RunOnInit: envutil.GetenvBool("RUN_ON_INIT"), + Queue: q, + }) + if err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't initialize a cron instance")) + } + if err := c.Start(); err != nil { + logrus.Fatal(errors.Wrap(err, "couldn't start the cron")) + } + defer c.Stop() + + logrus.Info("Cron is up and running!") + + channel := make(chan os.Signal, 1) + signal.Notify(channel, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) + <-channel + + logrus.Info("shutting down") +} diff --git a/cmd/dataupdater/main.go b/cmd/dataupdater/main.go new file mode 100644 index 0000000..48837f0 --- /dev/null +++ b/cmd/dataupdater/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "context" + "github.com/Kichiyaki/goutil/envutil" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "os" + "os/signal" + "syscall" + + "github.com/tribalwarshelp/cron/cmd/internal" + "github.com/tribalwarshelp/cron/pkg/postgres" + "github.com/tribalwarshelp/cron/pkg/queue" +) + +func main() { + redisClient, err := internal.NewRedisClient() + if err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis")) + } + defer func() { + if err := redisClient.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "Couldn't close the Redis connection")) + } + }() + + dbConn, err := postgres.Connect(&postgres.Config{SkipDBInitialization: true}) + if err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db")) + } + defer func() { + if err := dbConn.Close(); err != nil { + logrus.Warn(errors.Wrap(err, "Couldn't close the db connection")) + } + }() + + q, err := queue.New(&queue.Config{ + DB: dbConn, + Redis: redisClient, + WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"), + }) + if err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't initialize a queue")) + } + if err := q.Start(context.Background()); err != nil { + logrus.Fatal(errors.Wrap(err, "Couldn't start the queue")) + } + + logrus.Info("Data updater is up and running!") + + channel := make(chan os.Signal, 1) + signal.Notify(channel, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) + <-channel + + logrus.Info("shutting down") + if err := q.Close(); err != nil { + logrus.Fatal(err) + } +} diff --git a/cmd/internal/init.go b/cmd/internal/init.go new file mode 100644 index 0000000..91d373e --- /dev/null +++ b/cmd/internal/init.go @@ -0,0 +1,50 @@ +package internal + +import ( + "github.com/Kichiyaki/appmode" + "github.com/joho/godotenv" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "os" +) + +func init() { + if err := setENVs(); err != nil { + logrus.Fatal(err) + } + prepareLogger() +} + +func setENVs() error { + err := os.Setenv("TZ", "UTC") + if err != nil { + return errors.Wrap(err, "setENVs") + } + + if appmode.Equals(appmode.DevelopmentMode) { + err := godotenv.Load(".env.local") + if err != nil { + return errors.Wrap(err, "setENVs") + } + } + + return nil +} + +func prepareLogger() { + if appmode.Equals(appmode.DevelopmentMode) { + logrus.SetLevel(logrus.DebugLevel) + } + + timestampFormat := "2006-01-02 15:04:05" + if appmode.Equals(appmode.ProductionMode) { + customFormatter := new(logrus.JSONFormatter) + customFormatter.TimestampFormat = timestampFormat + logrus.SetFormatter(customFormatter) + } else { + customFormatter := new(logrus.TextFormatter) + customFormatter.TimestampFormat = timestampFormat + customFormatter.FullTimestamp = true + logrus.SetFormatter(customFormatter) + } +} diff --git a/cmd/internal/redis.go b/cmd/internal/redis.go new file mode 100644 index 0000000..443ea2c --- /dev/null +++ b/cmd/internal/redis.go @@ -0,0 +1,24 @@ +package internal + +import ( + "context" + "github.com/Kichiyaki/goutil/envutil" + "github.com/go-redis/redis/v8" + "github.com/pkg/errors" + "time" +) + +func NewRedisClient() (redis.UniversalClient, error) { + client := redis.NewClient(&redis.Options{ + Addr: envutil.GetenvString("REDIS_ADDR"), + Username: envutil.GetenvString("REDIS_USERNAME"), + Password: envutil.GetenvString("REDIS_PASSWORD"), + DB: envutil.GetenvInt("REDIS_DB"), + }) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := client.Ping(ctx).Err(); err != nil { + return nil, errors.Wrap(err, "NewRedisClient") + } + return client, nil +} diff --git a/main.go b/main.go index 32ab641..93cfacd 100644 --- a/main.go +++ b/main.go @@ -38,7 +38,7 @@ func main() { } }() - dbConn, err := postgres.Connect(&postgres.Config{LogQueries: envutil.GetenvBool("LOG_DB_QUERIES")}) + dbConn, err := postgres.Connect(nil) if err != nil { logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db")) } @@ -62,6 +62,7 @@ func main() { c, err := twhelpcron.New(&twhelpcron.Config{ DB: dbConn, + Queue: q, RunOnInit: envutil.GetenvBool("RUN_ON_INIT"), }) if err != nil { diff --git a/pkg/postgres/postgres.go b/pkg/postgres/postgres.go index e0b7e89..87e3a30 100644 --- a/pkg/postgres/postgres.go +++ b/pkg/postgres/postgres.go @@ -14,14 +14,13 @@ import ( var log = logrus.WithField("package", "pkg/postgres") type Config struct { - LogQueries bool SkipDBInitialization bool } func Connect(cfg *Config) (*pg.DB, error) { db := pg.Connect(prepareOptions()) - if cfg != nil && cfg.LogQueries { + if envutil.GetenvBool("LOG_DB_QUERIES") { db.AddQueryHook(querylogger.Logger{ Log: log, MaxQueryLength: 2000, From 7b23e21ddfc6fc51673a06cee4e909d924eccbbf Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 11 Jul 2021 11:04:51 +0200 Subject: [PATCH 8/9] update README.md, add dockerfiles --- .dockerignore | 1 + README.md | 10 ++- Dockerfile => cmd/cron/Dockerfile | 6 +- cmd/dataupdater/Dockerfile | 32 +++++++ main.go | 135 ------------------------------ 5 files changed, 42 insertions(+), 142 deletions(-) create mode 100644 .dockerignore rename Dockerfile => cmd/cron/Dockerfile (80%) create mode 100644 cmd/dataupdater/Dockerfile delete mode 100644 main.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..11ee758 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +.env.local diff --git a/README.md b/README.md index 324994e..60e0f7d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# tribalwarshelp.com cron +# datafetcher - Adds automatically new servers. - Fetches and updates server data (players, tribes, ODA, ODD, ODS, OD, conquers, configs). @@ -40,9 +40,13 @@ git clone git@github.com:tribalwarshelp/cron.git ``` 2. Open the folder with this project in a terminal. 3. Set the required env variables directly in your system or create .env.local file. -4. Run the app. +4. Run the cron. ``` -go run main.go +go run ./cmd/cron/main.go +``` +5. Run the data updater in a new tab. +``` +go run ./cmd/dataupdater/main.go ``` ## License diff --git a/Dockerfile b/cmd/cron/Dockerfile similarity index 80% rename from Dockerfile rename to cmd/cron/Dockerfile index 82d02a3..be47ad2 100644 --- a/Dockerfile +++ b/cmd/cron/Dockerfile @@ -12,7 +12,7 @@ RUN go mod download # Copy the source from the current directory to the Working Directory inside the container COPY . . -RUN go build -o twcron . +RUN go build -o twcron ./cmd/cron ######## Start a new stage from scratch ####### FROM alpine:latest @@ -28,7 +28,5 @@ ENV APP_MODE=production EXPOSE 8080 RUN apk add --no-cache tzdata -ADD https://github.com/ufoscout/docker-compose-wait/releases/download/2.2.1/wait ./wait -RUN chmod +x ./wait -CMD ./wait && ./twcron +CMD ./twcron diff --git a/cmd/dataupdater/Dockerfile b/cmd/dataupdater/Dockerfile new file mode 100644 index 0000000..fde64d0 --- /dev/null +++ b/cmd/dataupdater/Dockerfile @@ -0,0 +1,32 @@ +FROM golang:alpine as builder + +# Set the Current Working Directory inside the container +WORKDIR /app + +# Copy go mod and sum files +COPY go.mod go.sum ./ + +# Download all dependencies. Dependencies will be cached if the go.mod and go.sum files are not changed +RUN go mod download + +# Copy the source from the current directory to the Working Directory inside the container +COPY . . + +RUN go build -o twdataupdater ./cmd/dataupdater + +######## Start a new stage from scratch ####### +FROM alpine:latest + +RUN apk --no-cache add ca-certificates + +WORKDIR /root/ + +# Copy the Pre-built binary file from the previous stage +COPY --from=builder /app/twdataupdater . + +ENV APP_MODE=production +EXPOSE 8080 + +RUN apk add --no-cache tzdata + +CMD ./twdataupdater diff --git a/main.go b/main.go deleted file mode 100644 index 93cfacd..0000000 --- a/main.go +++ /dev/null @@ -1,135 +0,0 @@ -package main - -import ( - "context" - "github.com/Kichiyaki/appmode" - "github.com/Kichiyaki/goutil/envutil" - "github.com/go-redis/redis/v8" - "github.com/pkg/errors" - "os" - "os/signal" - "syscall" - "time" - - "github.com/sirupsen/logrus" - - twhelpcron "github.com/tribalwarshelp/cron/pkg/cron" - "github.com/tribalwarshelp/cron/pkg/postgres" - "github.com/tribalwarshelp/cron/pkg/queue" - - "github.com/joho/godotenv" -) - -func init() { - if err := setupENVs(); err != nil { - logrus.Fatal(err) - } - setupLogger() -} - -func main() { - redisClient, err := initializeRedis() - if err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't connect to Redis")) - } - defer func() { - if err := redisClient.Close(); err != nil { - logrus.Warn(errors.Wrap(err, "Couldn't close the Redis connection")) - } - }() - - dbConn, err := postgres.Connect(nil) - if err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't connect to the db")) - } - defer func() { - if err := dbConn.Close(); err != nil { - logrus.Warn(errors.Wrap(err, "Couldn't close the db connection")) - } - }() - - q, err := queue.New(&queue.Config{ - DB: dbConn, - Redis: redisClient, - WorkerLimit: envutil.GetenvInt("WORKER_LIMIT"), - }) - if err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't initialize a queue")) - } - if err := q.Start(context.Background()); err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't start the queue")) - } - - c, err := twhelpcron.New(&twhelpcron.Config{ - DB: dbConn, - Queue: q, - RunOnInit: envutil.GetenvBool("RUN_ON_INIT"), - }) - if err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't initialize a cron instance")) - } - if err := c.Start(); err != nil { - logrus.Fatal(errors.Wrap(err, "Couldn't start the cron")) - } - defer c.Stop() - - logrus.Info("Cron is up and running!") - - channel := make(chan os.Signal, 1) - signal.Notify(channel, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) - <-channel - - logrus.Info("shutting down") - if err := q.Close(); err != nil { - logrus.Fatal(err) - } -} - -func setupENVs() error { - err := os.Setenv("TZ", "UTC") - if err != nil { - return errors.Wrap(err, "setupENVs") - } - - if appmode.Equals(appmode.DevelopmentMode) { - err := godotenv.Load(".env.local") - if err != nil { - return errors.Wrap(err, "setupENVs") - } - } - - return nil -} - -func setupLogger() { - if appmode.Equals(appmode.DevelopmentMode) { - logrus.SetLevel(logrus.DebugLevel) - } - - timestampFormat := "2006-01-02 15:04:05" - if appmode.Equals(appmode.ProductionMode) { - customFormatter := new(logrus.JSONFormatter) - customFormatter.TimestampFormat = timestampFormat - logrus.SetFormatter(customFormatter) - } else { - customFormatter := new(logrus.TextFormatter) - customFormatter.TimestampFormat = timestampFormat - customFormatter.FullTimestamp = true - logrus.SetFormatter(customFormatter) - } -} - -func initializeRedis() (redis.UniversalClient, error) { - client := redis.NewClient(&redis.Options{ - Addr: envutil.GetenvString("REDIS_ADDR"), - Username: envutil.GetenvString("REDIS_USERNAME"), - Password: envutil.GetenvString("REDIS_PASSWORD"), - DB: envutil.GetenvInt("REDIS_DB"), - }) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := client.Ping(ctx).Err(); err != nil { - return nil, errors.Wrap(err, "initializeRedis") - } - return client, nil -} From 7e179276f7096e933d6a16bfe59e8d036d47c8ef Mon Sep 17 00:00:00 2001 From: Kichiyaki Date: Sun, 11 Jul 2021 11:22:31 +0200 Subject: [PATCH 9/9] update .circleci/config.yml --- .circleci/config.yml | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 3f00161..9a638e3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -11,10 +11,13 @@ jobs: # with credentials stored in the UI - run: | docker login -u $DOCKER_LOGIN -p $DOCKER_PASSWORD - # build the application image - - run: docker build -t $DOCKER_LOGIN/twhelp-cron:latest . - # deploy the image + # build the cron image + - run: docker build -f cmd/cron/Dockerfile -t $DOCKER_LOGIN/twhelp-cron:latest . + # build the data updater image + - run: docker build -f cmd/dataupdater/Dockerfile -t $DOCKER_LOGIN/twhelp-dataupdater:latest . + # deploy the images - run: docker push $DOCKER_LOGIN/twhelp-cron:latest + - run: docker push $DOCKER_LOGIN/twhelp-dataupdater:latest build_version: docker: - image: circleci/golang:1.16 @@ -26,10 +29,13 @@ jobs: # with credentials stored in the UI - run: | docker login -u $DOCKER_LOGIN -p $DOCKER_PASSWORD - # build the application image - - run: docker build -t $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') . - # deploy the image + # build the cron image + - run: docker build -f cmd/cron/Dockerfile -t $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') . + # build the data updater image + - run: docker build -f cmd/dataupdater/Dockerfile -t $DOCKER_LOGIN/twhelp-dataupdater:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') . + # deploy the images - run: docker push $DOCKER_LOGIN/twhelp-cron:$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') + - run: docker push $DOCKER_LOGIN/twhelp-dataupdater::$(echo $CIRCLE_TAG | sed -r 's/^.{1}//') workflows: version: 2 deploy_latest: