diff --git a/Makefile b/Makefile index 5496f60..cf6f58c 100644 --- a/Makefile +++ b/Makefile @@ -36,3 +36,7 @@ create-job-sync-data: .PHONY: create-job-sync-ennoblements create-job-sync-ennoblements: kubectl create job --from=cronjob/twhelp-job-sync-ennoblements-dev "twhelp-job-sync-ennoblements-$(shell openssl rand -hex 10)" + +.PHONY: create-job-create-snapshots +create-job-create-snapshots: + kubectl create job --from=cronjob/twhelp-job-create-snapshots-dev "twhelp-job-create-snapshots-$(shell openssl rand -hex 10)" diff --git a/cmd/twhelp/cmd_job.go b/cmd/twhelp/cmd_job.go index cb0703a..0256d0f 100644 --- a/cmd/twhelp/cmd_job.go +++ b/cmd/twhelp/cmd_job.go @@ -72,7 +72,7 @@ var ( shutdownSignalCtx, stop := newShutdownSignalContext(c.Context) defer stop() - if err = dataSyncSvc.TriggerDataSync(shutdownSignalCtx); err != nil { + if err = dataSyncSvc.Sync(shutdownSignalCtx); err != nil { return fmt.Errorf("couldn't trigger data sync: %w", err) } @@ -136,12 +136,81 @@ var ( shutdownSignalCtx, stop := newShutdownSignalContext(c.Context) defer stop() - if err = dataSyncSvc.TriggerEnnoblementSync(shutdownSignalCtx); err != nil { + if err = dataSyncSvc.SyncEnnoblements(shutdownSignalCtx); err != nil { return fmt.Errorf("couldn't trigger ennoblement sync: %w", err) } logger.Info("ennoblement sync triggered") + return nil + }, + }, + }, + }, + { + Name: "create", + Subcommands: []*cli.Command{ + { + Name: "snapshots", + Usage: "Trigger snapshot creation (players/tribes)", + Flags: concatSlices(dbFlags, rmqFlags), + Action: func(c *cli.Context) error { + logger := loggerFromCtx(c.Context) + watermillLogger := newWatermillLogger(logger) + + amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger) + if err != nil { + return err + } + defer func() { + if closeErr := amqpConn.Close(); closeErr != nil { + logger.Warn("couldn't close amqp connection", slog.Any("error", err)) + } + }() + + publisher, err := newAMQPPublisher(amqpConn, watermillLogger) + if err != nil { + return err + } + defer func() { + if closeErr := publisher.Close(); closeErr != nil { + logger.Warn("couldn't close amqp publisher", slog.Any("error", err)) + } + }() + + bunDB, err := newBunDBFromFlags(c) + if err != nil { + return err + } + defer func() { + logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections)) + if dbCloseErr := bunDB.Close(); dbCloseErr != nil { + logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr)) + } else { + logger.Debug("db connections closed") + } + }() + + tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher( + publisher, + newWatermillMarshaler(), + c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name), + c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name), + ) + + versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB)) + serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil) + snapshotSvc := app.NewSnapshotService(versionSvc, serverSvc, tribeSnapshotPublisher) + + shutdownSignalCtx, stop := newShutdownSignalContext(c.Context) + defer stop() + + if err = snapshotSvc.Create(shutdownSignalCtx); err != nil { + return fmt.Errorf("couldn't trigger snapshot creation: %w", err) + } + + logger.Info("snapshot creation triggered") + return nil }, }, diff --git a/cmd/twhelp/rabbitmq.go b/cmd/twhelp/rabbitmq.go index 2818e06..47482cb 100644 --- a/cmd/twhelp/rabbitmq.go +++ b/cmd/twhelp/rabbitmq.go @@ -51,6 +51,16 @@ var ( Value: "ennoblements.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"}, } + rmqFlagTopicCreateTribeSnapshotsCmd = &cli.StringFlag{ + Name: "rabbitmq.topic.createTribeSnapshotsCmd", + Value: "tribes.cmd.create_snapshots", + EnvVars: []string{"RABBITMQ_TOPIC_CREATE_TRIBE_SNAPSHOTS_CMD"}, + } + rmqFlagTopicTribeSnapshotsCreatedEvent = &cli.StringFlag{ + Name: "rabbitmq.topic.tribeSnapshotsCreatedEvent", + Value: "tribes.event.snapshots_created", + EnvVars: []string{"RABBITMQ_TOPIC_TRIBE_SNAPSHOTS_CREATED_EVENT"}, + } rmqFlags = []cli.Flag{ rmqFlagConnectionString, rmqFlagTopicSyncServersCmd, @@ -60,6 +70,8 @@ var ( rmqFlagTopicVillagesSyncedEvent, rmqFlagTopicSyncEnnoblementsCmd, rmqFlagTopicEnnoblementsSyncedEvent, + rmqFlagTopicCreateTribeSnapshotsCmd, + rmqFlagTopicTribeSnapshotsCreatedEvent, } ) diff --git a/internal/adapter/publisher_watermill_snapshot.go b/internal/adapter/publisher_watermill_snapshot.go new file mode 100644 index 0000000..8a32840 --- /dev/null +++ b/internal/adapter/publisher_watermill_snapshot.go @@ -0,0 +1,83 @@ +package adapter + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg" + "github.com/ThreeDotsLabs/watermill/message" +) + +type SnapshotWatermillPublisher struct { + marshaler watermillmsg.Marshaler + publisher message.Publisher + cmdCreateTopic string + eventCreatedTopic string +} + +func NewSnapshotWatermillPublisher( + publisher message.Publisher, + marshaler watermillmsg.Marshaler, + cmdCreateTopic string, + eventCreatedTopic string, +) *SnapshotWatermillPublisher { + return &SnapshotWatermillPublisher{ + publisher: publisher, + marshaler: marshaler, + cmdCreateTopic: cmdCreateTopic, + eventCreatedTopic: eventCreatedTopic, + } +} + +func (pub *SnapshotWatermillPublisher) CmdCreate( + ctx context.Context, + payloads ...domain.CreateSnapshotsCmdPayload, +) error { + msgs := make([]*message.Message, 0, len(payloads)) + + for _, p := range payloads { + msg, err := pub.marshaler.Marshal(ctx, watermillmsg.CreateSnapshotsCmdPayload{ + ServerKey: p.ServerKey(), + VersionCode: p.VersionCode(), + VersionTimezone: p.VersionTimezone(), + Date: p.Date(), + }) + if err != nil { + return fmt.Errorf("%s: couldn't marshal CreateSnapshotsCmdPayload: %w", p.ServerKey(), err) + } + + msgs = append(msgs, msg) + } + + if err := pub.publisher.Publish(pub.cmdCreateTopic, msgs...); err != nil { + return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdCreateTopic, err) + } + + return nil +} + +func (pub *SnapshotWatermillPublisher) EventCreated( + ctx context.Context, + payloads ...domain.SnapshotsCreatedEventPayload, +) error { + msgs := make([]*message.Message, 0, len(payloads)) + + for _, p := range payloads { + msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SnapshotsCreatedEventPayload{ + ServerKey: p.ServerKey(), + VersionCode: p.VersionCode(), + }) + if err != nil { + return fmt.Errorf("%s: couldn't marshal SnapshotsCreatedEventPayload: %w", p.ServerKey(), err) + } + + msgs = append(msgs, msg) + } + + if err := pub.publisher.Publish(pub.eventCreatedTopic, msgs...); err != nil { + return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventCreatedTopic, err) + } + + return nil +} diff --git a/internal/adapter/repository_bun_server.go b/internal/adapter/repository_bun_server.go index 981bb9b..cbe8e1f 100644 --- a/internal/adapter/repository_bun_server.go +++ b/internal/adapter/repository_bun_server.go @@ -192,6 +192,20 @@ func (a listServersParamsApplier) apply(q *bun.SelectQuery) *bun.SelectQuery { q = q.Where("server.special = ?", special.Value) } + if tribeSnapshotsCreatedAtLT := a.params.TribeSnapshotsCreatedAtLT(); tribeSnapshotsCreatedAtLT.Valid { + q = q.Where( + "server.tribe_snapshots_created_at < ? OR server.tribe_snapshots_created_at is null", + tribeSnapshotsCreatedAtLT.Value, + ) + } + + if playerSnapshotsCreatedAtLT := a.params.PlayerSnapshotsCreatedAtLT(); playerSnapshotsCreatedAtLT.Valid { + q = q.Where( + "server.player_snapshots_created_at < ? OR server.player_snapshots_created_at is null", + playerSnapshotsCreatedAtLT.Value, + ) + } + for _, s := range a.params.Sort() { switch s { case domain.ServerSortKeyASC: diff --git a/internal/adapter/repository_server_test.go b/internal/adapter/repository_server_test.go index 9594670..d8ca1c2 100644 --- a/internal/adapter/repository_server_test.go +++ b/internal/adapter/repository_server_test.go @@ -104,6 +104,17 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories require.NotEmpty(t, servers) randServer := servers[0] + snapshotsCreatedAtLT := time.Date( + 2022, + time.March, + 19, + 12, + 0, + 54, + 0, + time.UTC, + ) + tests := []struct { name string params func(t *testing.T) domain.ListServersParams @@ -336,6 +347,60 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories assert.NotEmpty(t, total) }, }, + { + name: "OK: player snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339), + params: func(t *testing.T) domain.ListServersParams { + t.Helper() + params := domain.NewListServersParams() + require.NoError(t, params.SetPlayerSnapshotsCreatedAtLT(domain.NullTime{ + Value: snapshotsCreatedAtLT, + Valid: true, + })) + return params + }, + assertServers: func(t *testing.T, params domain.ListServersParams, servers domain.Servers) { + t.Helper() + assert.NotEmpty(t, len(servers)) + for _, s := range servers { + assert.True(t, s.PlayerSnapshotsCreatedAt().Before(snapshotsCreatedAtLT)) + } + }, + assertError: func(t *testing.T, err error) { + t.Helper() + require.NoError(t, err) + }, + assertTotal: func(t *testing.T, params domain.ListServersParams, total int) { + t.Helper() + assert.NotEmpty(t, total) + }, + }, + { + name: "OK: tribe snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339), + params: func(t *testing.T) domain.ListServersParams { + t.Helper() + params := domain.NewListServersParams() + require.NoError(t, params.SetTribeSnapshotsCreatedAtLT(domain.NullTime{ + Value: snapshotsCreatedAtLT, + Valid: true, + })) + return params + }, + assertServers: func(t *testing.T, params domain.ListServersParams, servers domain.Servers) { + t.Helper() + assert.NotEmpty(t, len(servers)) + for _, s := range servers { + assert.True(t, s.TribeSnapshotsCreatedAt().Before(snapshotsCreatedAtLT)) + } + }, + assertError: func(t *testing.T, err error) { + t.Helper() + require.NoError(t, err) + }, + assertTotal: func(t *testing.T, params domain.ListServersParams, total int) { + t.Helper() + assert.NotEmpty(t, total) + }, + }, { name: "OK: offset=1 limit=2", params: func(t *testing.T) domain.ListServersParams { diff --git a/internal/app/publishers.go b/internal/app/publishers.go index c35736c..22792d1 100644 --- a/internal/app/publishers.go +++ b/internal/app/publishers.go @@ -27,3 +27,8 @@ type EnnoblementPublisher interface { CmdSync(ctx context.Context, payloads ...domain.SyncEnnoblementsCmdPayload) error EventSynced(ctx context.Context, payloads ...domain.EnnoblementsSyncedEventPayload) error } + +type SnapshotPublisher interface { + CmdCreate(ctx context.Context, payloads ...domain.CreateSnapshotsCmdPayload) error + EventCreated(ctx context.Context, payloads ...domain.SnapshotsCreatedEventPayload) error +} diff --git a/internal/app/service_data_sync.go b/internal/app/service_data_sync.go index ef26b64..3e88ceb 100644 --- a/internal/app/service_data_sync.go +++ b/internal/app/service_data_sync.go @@ -28,7 +28,7 @@ func NewDataSyncService( } } -func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error { +func (svc *DataSyncService) Sync(ctx context.Context) error { versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams()) if err != nil { return err @@ -48,14 +48,14 @@ func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error { return svc.serverPub.CmdSync(ctx, payloads...) } -func (svc *DataSyncService) TriggerEnnoblementSync(ctx context.Context) error { +func (svc *DataSyncService) SyncEnnoblements(ctx context.Context) error { versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams()) if err != nil { return err } for _, v := range versions { - if err = svc.triggerEnnoblementSyncForVersion(ctx, v); err != nil { + if err = svc.syncEnnoblementsForVersion(ctx, v); err != nil { return fmt.Errorf("%s: %w", v.Code(), err) } } @@ -63,7 +63,7 @@ func (svc *DataSyncService) TriggerEnnoblementSync(ctx context.Context) error { return nil } -func (svc *DataSyncService) triggerEnnoblementSyncForVersion(ctx context.Context, v domain.Version) error { +func (svc *DataSyncService) syncEnnoblementsForVersion(ctx context.Context, v domain.Version) error { servers, err := svc.serverSvc.ListAllOpen(ctx, v.Code()) if err != nil { return err diff --git a/internal/app/service_server.go b/internal/app/service_server.go index 9de0400..fa520dd 100644 --- a/internal/app/service_server.go +++ b/internal/app/service_server.go @@ -75,9 +75,6 @@ func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string if err := params.SetVersionCodes([]string{versionCode}); err != nil { return nil, err } - if err := params.SetSort([]domain.ServerSort{domain.ServerSortKeyASC}); err != nil { - return nil, err - } if err := params.SetSpecial(domain.NullBool{ Value: true, Valid: true, @@ -93,9 +90,6 @@ func (svc *ServerService) ListAllOpen(ctx context.Context, versionCode string) ( if err := params.SetVersionCodes([]string{versionCode}); err != nil { return nil, err } - if err := params.SetSort([]domain.ServerSort{domain.ServerSortKeyASC}); err != nil { - return nil, err - } if err := params.SetOpen(domain.NullBool{ Value: true, Valid: true, diff --git a/internal/app/service_snapshot.go b/internal/app/service_snapshot.go new file mode 100644 index 0000000..1897cc9 --- /dev/null +++ b/internal/app/service_snapshot.go @@ -0,0 +1,106 @@ +package app + +import ( + "context" + "fmt" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" +) + +type SnapshotService struct { + versionSvc *VersionService + serverSvc *ServerService + tribeSnapshotPublisher SnapshotPublisher +} + +func NewSnapshotService( + versionSvc *VersionService, + serverSvc *ServerService, + tribeSnapshotPublisher SnapshotPublisher, +) *SnapshotService { + return &SnapshotService{ + versionSvc: versionSvc, + serverSvc: serverSvc, + tribeSnapshotPublisher: tribeSnapshotPublisher, + } +} + +func (svc *SnapshotService) Create(ctx context.Context) error { + versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams()) + if err != nil { + return err + } + + for _, v := range versions { + loc, loopErr := time.LoadLocation(v.Timezone()) + if loopErr != nil { + return fmt.Errorf("%s: %w", v.Code(), err) + } + + year, month, day := time.Now().In(loc).Date() + snapshotsCreatedAtLT := time.Date(year, month, day, 0, 0, 0, 0, loc) + date := time.Date(year, month, day, 0, 0, 0, 0, time.UTC) + + if loopErr = svc.createTribe(ctx, v, snapshotsCreatedAtLT, date); loopErr != nil { + return loopErr + } + } + + return nil +} + +func (svc *SnapshotService) createTribe( + ctx context.Context, + v domain.Version, + snapshotsCreatedAtLT time.Time, + date time.Time, +) error { + params := domain.NewListServersParams() + if err := params.SetVersionCodes([]string{v.Code()}); err != nil { + return err + } + if err := params.SetOpen(domain.NullBool{ + Value: true, + Valid: true, + }); err != nil { + return err + } + if err := params.SetTribeSnapshotsCreatedAtLT(domain.NullTime{ + Value: snapshotsCreatedAtLT, + Valid: true, + }); err != nil { + return err + } + + servers, err := svc.serverSvc.ListAll(ctx, params) + if err != nil { + return err + } + + payloads, err := svc.toPayload(v, servers, date) + if err != nil { + return err + } + + return svc.tribeSnapshotPublisher.CmdCreate(ctx, payloads...) +} + +func (svc *SnapshotService) toPayload( + v domain.Version, + servers domain.Servers, + date time.Time, +) ([]domain.CreateSnapshotsCmdPayload, error) { + payloads := make([]domain.CreateSnapshotsCmdPayload, 0, len(servers)) + + for _, s := range servers { + p, err := domain.NewCreateSnapshotsCmdPayload(s.Key(), v.Code(), v.Timezone(), date) + if err != nil { + return nil, fmt.Errorf("%s: couldn't construct domain.CreateSnapshotsCmdPayload: %w", s.Key(), err) + } + + payloads = append(payloads, p) + } + + return payloads, nil +} diff --git a/internal/domain/server.go b/internal/domain/server.go index d1bca21..2431ec4 100644 --- a/internal/domain/server.go +++ b/internal/domain/server.go @@ -444,14 +444,16 @@ const ( const ServerListMaxLimit = 500 type ListServersParams struct { - keys []string - keyGT NullString - versionCodes []string - open NullBool - special NullBool - sort []ServerSort - limit int - offset int + keys []string + keyGT NullString + versionCodes []string + open NullBool + special NullBool + tribeSnapshotsCreatedAtLT NullTime + playerSnapshotsCreatedAtLT NullTime + sort []ServerSort + limit int + offset int } const listServersParamsModelName = "ListServersParams" @@ -512,6 +514,24 @@ func (params *ListServersParams) SetSpecial(special NullBool) error { return nil } +func (params *ListServersParams) TribeSnapshotsCreatedAtLT() NullTime { + return params.tribeSnapshotsCreatedAtLT +} + +func (params *ListServersParams) SetTribeSnapshotsCreatedAtLT(tribeSnapshotsCreatedAtLT NullTime) error { + params.tribeSnapshotsCreatedAtLT = tribeSnapshotsCreatedAtLT + return nil +} + +func (params *ListServersParams) PlayerSnapshotsCreatedAtLT() NullTime { + return params.playerSnapshotsCreatedAtLT +} + +func (params *ListServersParams) SetPlayerSnapshotsCreatedAtLT(playerSnapshotsCreatedAtLT NullTime) error { + params.playerSnapshotsCreatedAtLT = playerSnapshotsCreatedAtLT + return nil +} + func (params *ListServersParams) Sort() []ServerSort { return params.sort } diff --git a/internal/domain/snapshot_message_payloads.go b/internal/domain/snapshot_message_payloads.go new file mode 100644 index 0000000..3058c99 --- /dev/null +++ b/internal/domain/snapshot_message_payloads.go @@ -0,0 +1,101 @@ +package domain + +import "time" + +type CreateSnapshotsCmdPayload struct { + serverKey string + versionCode string + versionTimezone string + date time.Time +} + +const createSnapshotsCmdPayloadModelName = "CreateSnapshotsCmdPayload" + +func NewCreateSnapshotsCmdPayload( + serverKey string, + versionCode string, + versionTimezone string, + date time.Time, +) (CreateSnapshotsCmdPayload, error) { + if serverKey == "" { + return CreateSnapshotsCmdPayload{}, ValidationError{ + Model: createSnapshotsCmdPayloadModelName, + Field: "serverKey", + Err: ErrRequired, + } + } + + if versionCode == "" { + return CreateSnapshotsCmdPayload{}, ValidationError{ + Model: createSnapshotsCmdPayloadModelName, + Field: "versionCode", + Err: ErrRequired, + } + } + + if versionTimezone == "" { + return CreateSnapshotsCmdPayload{}, ValidationError{ + Model: createSnapshotsCmdPayloadModelName, + Field: "versionTimezone", + Err: ErrRequired, + } + } + + return CreateSnapshotsCmdPayload{ + serverKey: serverKey, + versionCode: versionCode, + versionTimezone: versionTimezone, + date: date, + }, nil +} + +func (p CreateSnapshotsCmdPayload) ServerKey() string { + return p.serverKey +} + +func (p CreateSnapshotsCmdPayload) VersionCode() string { + return p.versionCode +} + +func (p CreateSnapshotsCmdPayload) VersionTimezone() string { + return p.versionTimezone +} + +func (p CreateSnapshotsCmdPayload) Date() time.Time { + return p.date +} + +type SnapshotsCreatedEventPayload struct { + serverKey string + versionCode string +} + +const snapshotsCreatedEventPayloadModelName = "SnapshotsCreatedEventPayload" + +func NewSnapshotsCreatedEventPayload(serverKey string, versionCode string) (SnapshotsCreatedEventPayload, error) { + if serverKey == "" { + return SnapshotsCreatedEventPayload{}, ValidationError{ + Model: snapshotsCreatedEventPayloadModelName, + Field: "serverKey", + Err: ErrRequired, + } + } + + if versionCode == "" { + return SnapshotsCreatedEventPayload{}, ValidationError{ + Model: snapshotsCreatedEventPayloadModelName, + Field: "versionCode", + Err: ErrRequired, + } + } + + return SnapshotsCreatedEventPayload{serverKey: serverKey, versionCode: versionCode}, nil +} + +func (p SnapshotsCreatedEventPayload) ServerKey() string { + return p.serverKey +} + +func (p SnapshotsCreatedEventPayload) VersionCode() string { + return p.versionCode +} diff --git a/internal/domain/snapshot_message_payloads_test.go b/internal/domain/snapshot_message_payloads_test.go new file mode 100644 index 0000000..e3f344f --- /dev/null +++ b/internal/domain/snapshot_message_payloads_test.go @@ -0,0 +1,39 @@ +package domain_test + +import ( + "testing" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewCreateSnapshotsCmdPayload(t *testing.T) { + t.Parallel() + + version := domaintest.NewVersion(t) + server := domaintest.NewServer(t, func(cfg *domaintest.ServerConfig) { + cfg.VersionCode = version.Code() + }) + date := time.Now() + + payload, err := domain.NewCreateSnapshotsCmdPayload(server.Key(), version.Code(), version.Timezone(), date) + require.NoError(t, err) + assert.Equal(t, server.Key(), payload.ServerKey()) + assert.Equal(t, version.Code(), payload.VersionCode()) + assert.Equal(t, version.Timezone(), payload.VersionTimezone()) + assert.Equal(t, date, payload.Date()) +} + +func TestNewSnapshotsCreatedEventPayload(t *testing.T) { + t.Parallel() + + server := domaintest.NewServer(t) + + payload, err := domain.NewSnapshotsCreatedEventPayload(server.Key(), server.VersionCode()) + require.NoError(t, err) + assert.Equal(t, server.Key(), payload.ServerKey()) + assert.Equal(t, server.VersionCode(), payload.VersionCode()) +} diff --git a/internal/service/data_sync_test.go b/internal/service/data_sync_test.go index 7c02f6d..43463b9 100644 --- a/internal/service/data_sync_test.go +++ b/internal/service/data_sync_test.go @@ -176,7 +176,7 @@ func TestDataSync(t *testing.T) { t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) { filesys = os.DirFS(fmt.Sprintf("./testdata/datasync/stage%d", stage)) - require.NoError(t, dataSyncSvc.TriggerDataSync(ctx)) + require.NoError(t, dataSyncSvc.Sync(ctx)) var expectedServers []map[string]any readJSONFile(t, filesys, path.Join("expected", "servers.json"), &expectedServers) diff --git a/internal/service/ennoblement_sync_test.go b/internal/service/ennoblement_sync_test.go index 2c57001..41a70a2 100644 --- a/internal/service/ennoblement_sync_test.go +++ b/internal/service/ennoblement_sync_test.go @@ -155,7 +155,7 @@ func TestEnnoblementSync(t *testing.T) { t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) { filesys = os.DirFS(fmt.Sprintf("./testdata/ennoblementsync/stage%d", stage)) - require.NoError(t, dataSyncSvc.TriggerEnnoblementSync(ctx)) + require.NoError(t, dataSyncSvc.SyncEnnoblements(ctx)) assert.EventuallyWithTf(t, func(collect *assert.CollectT) { require.NoError(collect, ctx.Err()) diff --git a/internal/watermill/watermillmsg/snapshot.go b/internal/watermill/watermillmsg/snapshot.go new file mode 100644 index 0000000..b1f78d3 --- /dev/null +++ b/internal/watermill/watermillmsg/snapshot.go @@ -0,0 +1,15 @@ +package watermillmsg + +import "time" + +type CreateSnapshotsCmdPayload struct { + ServerKey string `json:"serverKey"` + VersionCode string `json:"versionCode"` + VersionTimezone string `json:"versionTimezone"` + Date time.Time `json:"date"` +} + +type SnapshotsCreatedEventPayload struct { + ServerKey string `json:"serverKey"` + VersionCode string `json:"versionCode"` +} diff --git a/k8s/base/jobs.yml b/k8s/base/jobs.yml index 4920f57..bed6fea 100644 --- a/k8s/base/jobs.yml +++ b/k8s/base/jobs.yml @@ -91,6 +91,52 @@ spec: memory: 128Mi --- apiVersion: batch/v1 +kind: CronJob +metadata: + name: twhelp-job-create-snapshots +spec: + schedule: 30 * * * * + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 3 + concurrencyPolicy: Forbid + jobTemplate: + spec: + parallelism: 1 + template: + spec: + restartPolicy: Never + containers: + - name: twhelp-job-create-snapshots + image: twhelp + args: [job, create, snapshots] + env: + - name: APP_MODE + value: development + - name: LOG_LEVEL + value: debug + - name: DB_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: db-connection-string + - name: DB_MAX_OPEN_CONNS + value: "1" + - name: DB_MAX_IDLE_CONNS + value: "1" + - name: RABBITMQ_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: rabbitmq-connection-string + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + cpu: 100m + memory: 128Mi +--- +apiVersion: batch/v1 kind: Job metadata: name: twhelp-migrations-job diff --git a/k8s/overlays/dev/jobs.yml b/k8s/overlays/dev/jobs.yml index e1ea334..18e7045 100644 --- a/k8s/overlays/dev/jobs.yml +++ b/k8s/overlays/dev/jobs.yml @@ -11,3 +11,10 @@ metadata: name: twhelp-job-sync-ennoblements spec: schedule: 0 0 * * * +--- +apiVersion: batch/v1 +kind: CronJob +metadata: + name: twhelp-job-create-snapshots +spec: + schedule: 0 0 * * *