From 1c6624fa2d671d44ab3b0311dd196194a44b89d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20Wysoki=C5=84ski?= Date: Wed, 3 Jan 2024 08:59:24 +0000 Subject: [PATCH] feat: add a new job - sync ennoblements (#27) Reviewed-on: https://gitea.dwysokinski.me/twhelp/corev3/pulls/27 --- Makefile | 4 + cmd/twhelp/cmd_job.go | 67 +++++++++- cmd/twhelp/rabbitmq.go | 12 ++ .../publisher_watermill_ennoblement.go | 83 +++++++++++++ internal/app/publishers.go | 5 + internal/app/service_data_sync.go | 55 +++++++- internal/app/service_server.go | 4 +- .../domain/ennoblement_message_payloads.go | 117 ++++++++++++++++++ .../ennoblement_message_payloads_test.go | 42 +++++++ internal/watermillmsg/ennoblement.go | 15 +++ k8s/base/jobs.yml | 46 +++++++ k8s/overlays/dev/jobs.yml | 7 ++ 12 files changed, 448 insertions(+), 9 deletions(-) create mode 100644 internal/adapter/publisher_watermill_ennoblement.go create mode 100644 internal/domain/ennoblement_message_payloads.go create mode 100644 internal/domain/ennoblement_message_payloads_test.go create mode 100644 internal/watermillmsg/ennoblement.go diff --git a/Makefile b/Makefile index 80769cd..5496f60 100644 --- a/Makefile +++ b/Makefile @@ -32,3 +32,7 @@ generate: .PHONY: create-job-sync-data create-job-sync-data: kubectl create job --from=cronjob/twhelp-job-sync-data-dev "twhelp-job-sync-data-$(shell openssl rand -hex 10)" + +.PHONY: create-job-sync-ennoblements +create-job-sync-ennoblements: + kubectl create job --from=cronjob/twhelp-job-sync-ennoblements-dev "twhelp-job-sync-ennoblements-$(shell openssl rand -hex 10)" diff --git a/cmd/twhelp/cmd_job.go b/cmd/twhelp/cmd_job.go index 81267bc..cb0703a 100644 --- a/cmd/twhelp/cmd_job.go +++ b/cmd/twhelp/cmd_job.go @@ -67,16 +67,81 @@ var ( ) versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB)) + dataSyncSvc := app.NewDataSyncService(versionSvc, nil, serverPublisher, nil) shutdownSignalCtx, stop := newShutdownSignalContext(c.Context) defer stop() - if err = app.NewDataSyncService(versionSvc, serverPublisher).TriggerDataSync(shutdownSignalCtx); err != nil { + if err = dataSyncSvc.TriggerDataSync(shutdownSignalCtx); err != nil { return fmt.Errorf("couldn't trigger data sync: %w", err) } logger.Info("data sync triggered") + return nil + }, + }, + { + Name: "ennoblements", + Usage: "Trigger ennoblement sync", + Flags: concatSlices(dbFlags, rmqFlags), + Action: func(c *cli.Context) error { + logger := loggerFromCtx(c.Context) + watermillLogger := newWatermillLogger(logger) + + amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger) + if err != nil { + return err + } + defer func() { + if closeErr := amqpConn.Close(); closeErr != nil { + logger.Warn("couldn't close amqp connection", slog.Any("error", err)) + } + }() + + publisher, err := newAMQPPublisher(amqpConn, watermillLogger) + if err != nil { + return err + } + defer func() { + if closeErr := publisher.Close(); closeErr != nil { + logger.Warn("couldn't close amqp publisher", slog.Any("error", err)) + } + }() + + bunDB, err := newBunDBFromFlags(c) + if err != nil { + return err + } + defer func() { + logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections)) + if dbCloseErr := bunDB.Close(); dbCloseErr != nil { + logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr)) + } else { + logger.Debug("db connections closed") + } + }() + + ennoblementPublisher := adapter.NewEnnoblementWatermillPublisher( + publisher, + newWatermillMarshaler(), + c.String(rmqFlagTopicSyncEnnoblementsCmd.Name), + c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name), + ) + + versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB)) + serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil) + dataSyncSvc := app.NewDataSyncService(versionSvc, serverSvc, nil, ennoblementPublisher) + + shutdownSignalCtx, stop := newShutdownSignalContext(c.Context) + defer stop() + + if err = dataSyncSvc.TriggerEnnoblementSync(shutdownSignalCtx); err != nil { + return fmt.Errorf("couldn't trigger ennoblement sync: %w", err) + } + + logger.Info("ennoblement sync triggered") + return nil }, }, diff --git a/cmd/twhelp/rabbitmq.go b/cmd/twhelp/rabbitmq.go index fe6d2da..ef684ae 100644 --- a/cmd/twhelp/rabbitmq.go +++ b/cmd/twhelp/rabbitmq.go @@ -41,6 +41,16 @@ var ( Value: "villages.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"}, } + rmqFlagTopicSyncEnnoblementsCmd = &cli.StringFlag{ + Name: "rabbitmq.topic.syncEnnoblementsCmd", + Value: "ennoblements.cmd.sync", + EnvVars: []string{"RABBITMQ_TOPIC_SYNC_ENNOBLEMENTS_CMD"}, + } + rmqFlagTopicEnnoblementsSyncedEvent = &cli.StringFlag{ + Name: "rabbitmq.topic.ennoblementsSyncedEvent", + Value: "ennoblements.event.synced", + EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"}, + } rmqFlags = []cli.Flag{ rmqFlagConnectionString, rmqFlagTopicSyncServersCmd, @@ -48,6 +58,8 @@ var ( rmqFlagTopicTribesSyncedEvent, rmqFlagTopicPlayersSyncedEvent, rmqFlagTopicVillagesSyncedEvent, + rmqFlagTopicSyncEnnoblementsCmd, + rmqFlagTopicEnnoblementsSyncedEvent, } ) diff --git a/internal/adapter/publisher_watermill_ennoblement.go b/internal/adapter/publisher_watermill_ennoblement.go new file mode 100644 index 0000000..79ee1ea --- /dev/null +++ b/internal/adapter/publisher_watermill_ennoblement.go @@ -0,0 +1,83 @@ +package adapter + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" + "github.com/ThreeDotsLabs/watermill/message" +) + +type EnnoblementWatermillPublisher struct { + marshaler watermillmsg.Marshaler + publisher message.Publisher + cmdSyncTopic string + eventSyncedTopic string +} + +func NewEnnoblementWatermillPublisher( + publisher message.Publisher, + marshaler watermillmsg.Marshaler, + cmdSyncTopic string, + eventSyncedTopic string, +) *EnnoblementWatermillPublisher { + return &EnnoblementWatermillPublisher{ + publisher: publisher, + marshaler: marshaler, + cmdSyncTopic: cmdSyncTopic, + eventSyncedTopic: eventSyncedTopic, + } +} + +func (pub *EnnoblementWatermillPublisher) CmdSync( + ctx context.Context, + payloads ...domain.SyncEnnoblementsCmdPayload, +) error { + msgs := make([]*message.Message, 0, len(payloads)) + + for _, p := range payloads { + msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SyncEnnoblementsCmdPayload{ + ServerKey: p.ServerKey(), + ServerURL: p.ServerURL(), + VersionCode: p.VersionCode(), + }) + if err != nil { + return fmt.Errorf("%s: couldn't marshal SyncEnnoblementsCmdPayload: %w", p.ServerKey(), err) + } + + msgs = append(msgs, msg) + } + + if err := pub.publisher.Publish(pub.cmdSyncTopic, msgs...); err != nil { + return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, err) + } + + return nil +} + +func (pub *EnnoblementWatermillPublisher) EventSynced( + ctx context.Context, + payloads ...domain.EnnoblementsSyncedEventPayload, +) error { + msgs := make([]*message.Message, 0, len(payloads)) + + for _, p := range payloads { + msg, err := pub.marshaler.Marshal(ctx, watermillmsg.EnnoblementsSyncedEventPayload{ + ServerKey: p.ServerKey(), + ServerURL: p.ServerURL(), + VersionCode: p.VersionCode(), + }) + if err != nil { + return fmt.Errorf("%s: couldn't marshal EnnoblementsSyncedEventPayload: %w", p.ServerKey(), err) + } + + msgs = append(msgs, msg) + } + + if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil { + return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err) + } + + return nil +} diff --git a/internal/app/publishers.go b/internal/app/publishers.go index 166e90b..c35736c 100644 --- a/internal/app/publishers.go +++ b/internal/app/publishers.go @@ -22,3 +22,8 @@ type PlayerPublisher interface { type VillagePublisher interface { EventSynced(ctx context.Context, payloads ...domain.VillagesSyncedEventPayload) error } + +type EnnoblementPublisher interface { + CmdSync(ctx context.Context, payloads ...domain.SyncEnnoblementsCmdPayload) error + EventSynced(ctx context.Context, payloads ...domain.EnnoblementsSyncedEventPayload) error +} diff --git a/internal/app/service_data_sync.go b/internal/app/service_data_sync.go index 992c0ec..ef26b64 100644 --- a/internal/app/service_data_sync.go +++ b/internal/app/service_data_sync.go @@ -8,12 +8,24 @@ import ( ) type DataSyncService struct { - versionSvc *VersionService - serverPub ServerPublisher + versionSvc *VersionService + serverSvc *ServerService + serverPub ServerPublisher + ennoblementPub EnnoblementPublisher } -func NewDataSyncService(versionSvc *VersionService, serverPub ServerPublisher) *DataSyncService { - return &DataSyncService{versionSvc: versionSvc, serverPub: serverPub} +func NewDataSyncService( + versionSvc *VersionService, + serverSvc *ServerService, + serverPub ServerPublisher, + ennoblementPub EnnoblementPublisher, +) *DataSyncService { + return &DataSyncService{ + versionSvc: versionSvc, + serverSvc: serverSvc, + serverPub: serverPub, + ennoblementPub: ennoblementPub, + } } func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error { @@ -36,6 +48,37 @@ func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error { return svc.serverPub.CmdSync(ctx, payloads...) } -func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error { - panic("unimplemented") +func (svc *DataSyncService) TriggerEnnoblementSync(ctx context.Context) error { + versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams()) + if err != nil { + return err + } + + for _, v := range versions { + if err = svc.triggerEnnoblementSyncForVersion(ctx, v); err != nil { + return fmt.Errorf("%s: %w", v.Code(), err) + } + } + + return nil +} + +func (svc *DataSyncService) triggerEnnoblementSyncForVersion(ctx context.Context, v domain.Version) error { + servers, err := svc.serverSvc.ListAllOpen(ctx, v.Code()) + if err != nil { + return err + } + + payloads := make([]domain.SyncEnnoblementsCmdPayload, 0, len(servers)) + + for _, s := range servers { + p, payloadErr := domain.NewSyncEnnoblementsCmdPayload(s.Key(), s.URL(), s.VersionCode()) + if payloadErr != nil { + return fmt.Errorf("%s: couldn't construct domain.SyncEnnoblementsCmdPayload: %w", s.Key(), payloadErr) + } + + payloads = append(payloads, p) + } + + return svc.ennoblementPub.CmdSync(ctx, payloads...) } diff --git a/internal/app/service_server.go b/internal/app/service_server.go index 2a2433e..1d7694e 100644 --- a/internal/app/service_server.go +++ b/internal/app/service_server.go @@ -37,7 +37,7 @@ func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCm return fmt.Errorf("%s: couldn't list special servers: %w", versionCode, err) } - currentlyStoredOpenServers, err := svc.listAllOpen(ctx, versionCode) + currentlyStoredOpenServers, err := svc.ListAllOpen(ctx, versionCode) if err != nil { return fmt.Errorf("%s: couldn't list open servers: %w", versionCode, err) } @@ -84,7 +84,7 @@ func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string return svc.ListAll(ctx, params) } -func (svc *ServerService) listAllOpen(ctx context.Context, versionCode string) (domain.Servers, error) { +func (svc *ServerService) ListAllOpen(ctx context.Context, versionCode string) (domain.Servers, error) { params := domain.NewListServersParams() if err := params.SetVersionCodes([]string{versionCode}); err != nil { return nil, err diff --git a/internal/domain/ennoblement_message_payloads.go b/internal/domain/ennoblement_message_payloads.go new file mode 100644 index 0000000..bf4d88d --- /dev/null +++ b/internal/domain/ennoblement_message_payloads.go @@ -0,0 +1,117 @@ +package domain + +import ( + "net/url" +) + +type SyncEnnoblementsCmdPayload struct { + serverKey string + serverURL *url.URL + versionCode string +} + +const syncEnnoblementsCmdPayloadModelName = "SyncEnnoblementsCmdPayload" + +func NewSyncEnnoblementsCmdPayload( + serverKey string, + serverURL *url.URL, + versionCode string, +) (SyncEnnoblementsCmdPayload, error) { + if serverKey == "" { + return SyncEnnoblementsCmdPayload{}, ValidationError{ + Model: syncEnnoblementsCmdPayloadModelName, + Field: "serverKey", + Err: ErrRequired, + } + } + + if serverURL == nil { + return SyncEnnoblementsCmdPayload{}, ValidationError{ + Model: syncEnnoblementsCmdPayloadModelName, + Field: "serverURL", + Err: ErrNil, + } + } + + if versionCode == "" { + return SyncEnnoblementsCmdPayload{}, ValidationError{ + Model: syncEnnoblementsCmdPayloadModelName, + Field: "versionCode", + Err: ErrRequired, + } + } + + return SyncEnnoblementsCmdPayload{ + serverKey: serverKey, + serverURL: serverURL, + versionCode: versionCode, + }, nil +} + +func (p SyncEnnoblementsCmdPayload) ServerKey() string { + return p.serverKey +} + +func (p SyncEnnoblementsCmdPayload) ServerURL() *url.URL { + return p.serverURL +} + +func (p SyncEnnoblementsCmdPayload) VersionCode() string { + return p.versionCode +} + +type EnnoblementsSyncedEventPayload struct { + serverKey string + serverURL *url.URL + versionCode string +} + +const ennoblementsSyncedEventPayloadModelName = "EnnoblementsSyncedEventPayload" + +func NewEnnoblementsSyncedEventPayload( + serverKey string, + serverURL *url.URL, + versionCode string, +) (EnnoblementsSyncedEventPayload, error) { + if serverKey == "" { + return EnnoblementsSyncedEventPayload{}, ValidationError{ + Model: ennoblementsSyncedEventPayloadModelName, + Field: "serverKey", + Err: ErrRequired, + } + } + + if serverURL == nil { + return EnnoblementsSyncedEventPayload{}, ValidationError{ + Model: ennoblementsSyncedEventPayloadModelName, + Field: "serverURL", + Err: ErrNil, + } + } + + if versionCode == "" { + return EnnoblementsSyncedEventPayload{}, ValidationError{ + Model: ennoblementsSyncedEventPayloadModelName, + Field: "versionCode", + Err: ErrRequired, + } + } + + return EnnoblementsSyncedEventPayload{ + serverKey: serverKey, + serverURL: serverURL, + versionCode: versionCode, + }, nil +} + +func (p EnnoblementsSyncedEventPayload) ServerKey() string { + return p.serverKey +} + +func (p EnnoblementsSyncedEventPayload) ServerURL() *url.URL { + return p.serverURL +} + +func (p EnnoblementsSyncedEventPayload) VersionCode() string { + return p.versionCode +} diff --git a/internal/domain/ennoblement_message_payloads_test.go b/internal/domain/ennoblement_message_payloads_test.go new file mode 100644 index 0000000..1af82fd --- /dev/null +++ b/internal/domain/ennoblement_message_payloads_test.go @@ -0,0 +1,42 @@ +package domain_test + +import ( + "testing" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewSyncEnnoblementsCmdPayload(t *testing.T) { + t.Parallel() + + server := domaintest.NewServer(t) + + payload, err := domain.NewSyncEnnoblementsCmdPayload( + server.Key(), + server.URL(), + server.VersionCode(), + ) + require.NoError(t, err) + assert.Equal(t, server.Key(), payload.ServerKey()) + assert.Equal(t, server.URL(), payload.ServerURL()) + assert.Equal(t, server.VersionCode(), payload.VersionCode()) +} + +func TestNewEnnoblementsSyncedEventPayload(t *testing.T) { + t.Parallel() + + server := domaintest.NewServer(t) + + payload, err := domain.NewEnnoblementsSyncedEventPayload( + server.Key(), + server.URL(), + server.VersionCode(), + ) + require.NoError(t, err) + assert.Equal(t, server.Key(), payload.ServerKey()) + assert.Equal(t, server.URL(), payload.ServerURL()) + assert.Equal(t, server.VersionCode(), payload.VersionCode()) +} diff --git a/internal/watermillmsg/ennoblement.go b/internal/watermillmsg/ennoblement.go new file mode 100644 index 0000000..57fd5ed --- /dev/null +++ b/internal/watermillmsg/ennoblement.go @@ -0,0 +1,15 @@ +package watermillmsg + +import "net/url" + +type SyncEnnoblementsCmdPayload struct { + ServerKey string `json:"serverKey"` + ServerURL *url.URL `json:"serverUrl"` + VersionCode string `json:"versionCode"` +} + +type EnnoblementsSyncedEventPayload struct { + ServerKey string `json:"serverKey"` + ServerURL *url.URL `json:"serverUrl"` + VersionCode string `json:"versionCode"` +} diff --git a/k8s/base/jobs.yml b/k8s/base/jobs.yml index bec9706..4920f57 100644 --- a/k8s/base/jobs.yml +++ b/k8s/base/jobs.yml @@ -45,6 +45,52 @@ spec: memory: 128Mi --- apiVersion: batch/v1 +kind: CronJob +metadata: + name: twhelp-job-sync-ennoblements +spec: + schedule: "* * * * *" + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 3 + concurrencyPolicy: Forbid + jobTemplate: + spec: + parallelism: 1 + template: + spec: + restartPolicy: Never + containers: + - name: twhelp-job-sync-ennoblements + image: twhelp + args: [job, sync, ennoblements] + env: + - name: APP_MODE + value: development + - name: LOG_LEVEL + value: debug + - name: DB_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: db-connection-string + - name: DB_MAX_OPEN_CONNS + value: "1" + - name: DB_MAX_IDLE_CONNS + value: "1" + - name: RABBITMQ_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: rabbitmq-connection-string + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + cpu: 100m + memory: 128Mi +--- +apiVersion: batch/v1 kind: Job metadata: name: twhelp-migrations-job diff --git a/k8s/overlays/dev/jobs.yml b/k8s/overlays/dev/jobs.yml index 695028a..e1ea334 100644 --- a/k8s/overlays/dev/jobs.yml +++ b/k8s/overlays/dev/jobs.yml @@ -4,3 +4,10 @@ metadata: name: twhelp-job-sync-data spec: schedule: 0 0 * * * +--- +apiVersion: batch/v1 +kind: CronJob +metadata: + name: twhelp-job-sync-ennoblements +spec: + schedule: 0 0 * * *