feat: add a new job - create snapshots (#43)
Reviewed-on: twhelp/corev3#43
This commit is contained in:
parent
85b3f90447
commit
00cee5a181
4
Makefile
4
Makefile
|
@ -36,3 +36,7 @@ create-job-sync-data:
|
|||
.PHONY: create-job-sync-ennoblements
|
||||
create-job-sync-ennoblements:
|
||||
kubectl create job --from=cronjob/twhelp-job-sync-ennoblements-dev "twhelp-job-sync-ennoblements-$(shell openssl rand -hex 10)"
|
||||
|
||||
.PHONY: create-job-create-snapshots
|
||||
create-job-create-snapshots:
|
||||
kubectl create job --from=cronjob/twhelp-job-create-snapshots-dev "twhelp-job-create-snapshots-$(shell openssl rand -hex 10)"
|
||||
|
|
|
@ -72,7 +72,7 @@ var (
|
|||
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||
defer stop()
|
||||
|
||||
if err = dataSyncSvc.TriggerDataSync(shutdownSignalCtx); err != nil {
|
||||
if err = dataSyncSvc.Sync(shutdownSignalCtx); err != nil {
|
||||
return fmt.Errorf("couldn't trigger data sync: %w", err)
|
||||
}
|
||||
|
||||
|
@ -136,12 +136,81 @@ var (
|
|||
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||
defer stop()
|
||||
|
||||
if err = dataSyncSvc.TriggerEnnoblementSync(shutdownSignalCtx); err != nil {
|
||||
if err = dataSyncSvc.SyncEnnoblements(shutdownSignalCtx); err != nil {
|
||||
return fmt.Errorf("couldn't trigger ennoblement sync: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("ennoblement sync triggered")
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "create",
|
||||
Subcommands: []*cli.Command{
|
||||
{
|
||||
Name: "snapshots",
|
||||
Usage: "Trigger snapshot creation (players/tribes)",
|
||||
Flags: concatSlices(dbFlags, rmqFlags),
|
||||
Action: func(c *cli.Context) error {
|
||||
logger := loggerFromCtx(c.Context)
|
||||
watermillLogger := newWatermillLogger(logger)
|
||||
|
||||
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := amqpConn.Close(); closeErr != nil {
|
||||
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
||||
}
|
||||
}()
|
||||
|
||||
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := publisher.Close(); closeErr != nil {
|
||||
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
||||
}
|
||||
}()
|
||||
|
||||
bunDB, err := newBunDBFromFlags(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
|
||||
if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
|
||||
logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
|
||||
} else {
|
||||
logger.Debug("db connections closed")
|
||||
}
|
||||
}()
|
||||
|
||||
tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
|
||||
publisher,
|
||||
newWatermillMarshaler(),
|
||||
c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name),
|
||||
c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
|
||||
)
|
||||
|
||||
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
||||
serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil)
|
||||
snapshotSvc := app.NewSnapshotService(versionSvc, serverSvc, tribeSnapshotPublisher)
|
||||
|
||||
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||
defer stop()
|
||||
|
||||
if err = snapshotSvc.Create(shutdownSignalCtx); err != nil {
|
||||
return fmt.Errorf("couldn't trigger snapshot creation: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("snapshot creation triggered")
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
|
|
|
@ -51,6 +51,16 @@ var (
|
|||
Value: "ennoblements.event.synced",
|
||||
EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"},
|
||||
}
|
||||
rmqFlagTopicCreateTribeSnapshotsCmd = &cli.StringFlag{
|
||||
Name: "rabbitmq.topic.createTribeSnapshotsCmd",
|
||||
Value: "tribes.cmd.create_snapshots",
|
||||
EnvVars: []string{"RABBITMQ_TOPIC_CREATE_TRIBE_SNAPSHOTS_CMD"},
|
||||
}
|
||||
rmqFlagTopicTribeSnapshotsCreatedEvent = &cli.StringFlag{
|
||||
Name: "rabbitmq.topic.tribeSnapshotsCreatedEvent",
|
||||
Value: "tribes.event.snapshots_created",
|
||||
EnvVars: []string{"RABBITMQ_TOPIC_TRIBE_SNAPSHOTS_CREATED_EVENT"},
|
||||
}
|
||||
rmqFlags = []cli.Flag{
|
||||
rmqFlagConnectionString,
|
||||
rmqFlagTopicSyncServersCmd,
|
||||
|
@ -60,6 +70,8 @@ var (
|
|||
rmqFlagTopicVillagesSyncedEvent,
|
||||
rmqFlagTopicSyncEnnoblementsCmd,
|
||||
rmqFlagTopicEnnoblementsSyncedEvent,
|
||||
rmqFlagTopicCreateTribeSnapshotsCmd,
|
||||
rmqFlagTopicTribeSnapshotsCreatedEvent,
|
||||
}
|
||||
)
|
||||
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
package adapter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
)
|
||||
|
||||
type SnapshotWatermillPublisher struct {
|
||||
marshaler watermillmsg.Marshaler
|
||||
publisher message.Publisher
|
||||
cmdCreateTopic string
|
||||
eventCreatedTopic string
|
||||
}
|
||||
|
||||
func NewSnapshotWatermillPublisher(
|
||||
publisher message.Publisher,
|
||||
marshaler watermillmsg.Marshaler,
|
||||
cmdCreateTopic string,
|
||||
eventCreatedTopic string,
|
||||
) *SnapshotWatermillPublisher {
|
||||
return &SnapshotWatermillPublisher{
|
||||
publisher: publisher,
|
||||
marshaler: marshaler,
|
||||
cmdCreateTopic: cmdCreateTopic,
|
||||
eventCreatedTopic: eventCreatedTopic,
|
||||
}
|
||||
}
|
||||
|
||||
func (pub *SnapshotWatermillPublisher) CmdCreate(
|
||||
ctx context.Context,
|
||||
payloads ...domain.CreateSnapshotsCmdPayload,
|
||||
) error {
|
||||
msgs := make([]*message.Message, 0, len(payloads))
|
||||
|
||||
for _, p := range payloads {
|
||||
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.CreateSnapshotsCmdPayload{
|
||||
ServerKey: p.ServerKey(),
|
||||
VersionCode: p.VersionCode(),
|
||||
VersionTimezone: p.VersionTimezone(),
|
||||
Date: p.Date(),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: couldn't marshal CreateSnapshotsCmdPayload: %w", p.ServerKey(), err)
|
||||
}
|
||||
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
|
||||
if err := pub.publisher.Publish(pub.cmdCreateTopic, msgs...); err != nil {
|
||||
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdCreateTopic, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pub *SnapshotWatermillPublisher) EventCreated(
|
||||
ctx context.Context,
|
||||
payloads ...domain.SnapshotsCreatedEventPayload,
|
||||
) error {
|
||||
msgs := make([]*message.Message, 0, len(payloads))
|
||||
|
||||
for _, p := range payloads {
|
||||
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SnapshotsCreatedEventPayload{
|
||||
ServerKey: p.ServerKey(),
|
||||
VersionCode: p.VersionCode(),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: couldn't marshal SnapshotsCreatedEventPayload: %w", p.ServerKey(), err)
|
||||
}
|
||||
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
|
||||
if err := pub.publisher.Publish(pub.eventCreatedTopic, msgs...); err != nil {
|
||||
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventCreatedTopic, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -192,6 +192,20 @@ func (a listServersParamsApplier) apply(q *bun.SelectQuery) *bun.SelectQuery {
|
|||
q = q.Where("server.special = ?", special.Value)
|
||||
}
|
||||
|
||||
if tribeSnapshotsCreatedAtLT := a.params.TribeSnapshotsCreatedAtLT(); tribeSnapshotsCreatedAtLT.Valid {
|
||||
q = q.Where(
|
||||
"server.tribe_snapshots_created_at < ? OR server.tribe_snapshots_created_at is null",
|
||||
tribeSnapshotsCreatedAtLT.Value,
|
||||
)
|
||||
}
|
||||
|
||||
if playerSnapshotsCreatedAtLT := a.params.PlayerSnapshotsCreatedAtLT(); playerSnapshotsCreatedAtLT.Valid {
|
||||
q = q.Where(
|
||||
"server.player_snapshots_created_at < ? OR server.player_snapshots_created_at is null",
|
||||
playerSnapshotsCreatedAtLT.Value,
|
||||
)
|
||||
}
|
||||
|
||||
for _, s := range a.params.Sort() {
|
||||
switch s {
|
||||
case domain.ServerSortKeyASC:
|
||||
|
|
|
@ -104,6 +104,17 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
|
|||
require.NotEmpty(t, servers)
|
||||
randServer := servers[0]
|
||||
|
||||
snapshotsCreatedAtLT := time.Date(
|
||||
2022,
|
||||
time.March,
|
||||
19,
|
||||
12,
|
||||
0,
|
||||
54,
|
||||
0,
|
||||
time.UTC,
|
||||
)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
params func(t *testing.T) domain.ListServersParams
|
||||
|
@ -336,6 +347,60 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
|
|||
assert.NotEmpty(t, total)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "OK: player snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339),
|
||||
params: func(t *testing.T) domain.ListServersParams {
|
||||
t.Helper()
|
||||
params := domain.NewListServersParams()
|
||||
require.NoError(t, params.SetPlayerSnapshotsCreatedAtLT(domain.NullTime{
|
||||
Value: snapshotsCreatedAtLT,
|
||||
Valid: true,
|
||||
}))
|
||||
return params
|
||||
},
|
||||
assertServers: func(t *testing.T, params domain.ListServersParams, servers domain.Servers) {
|
||||
t.Helper()
|
||||
assert.NotEmpty(t, len(servers))
|
||||
for _, s := range servers {
|
||||
assert.True(t, s.PlayerSnapshotsCreatedAt().Before(snapshotsCreatedAtLT))
|
||||
}
|
||||
},
|
||||
assertError: func(t *testing.T, err error) {
|
||||
t.Helper()
|
||||
require.NoError(t, err)
|
||||
},
|
||||
assertTotal: func(t *testing.T, params domain.ListServersParams, total int) {
|
||||
t.Helper()
|
||||
assert.NotEmpty(t, total)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "OK: tribe snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339),
|
||||
params: func(t *testing.T) domain.ListServersParams {
|
||||
t.Helper()
|
||||
params := domain.NewListServersParams()
|
||||
require.NoError(t, params.SetTribeSnapshotsCreatedAtLT(domain.NullTime{
|
||||
Value: snapshotsCreatedAtLT,
|
||||
Valid: true,
|
||||
}))
|
||||
return params
|
||||
},
|
||||
assertServers: func(t *testing.T, params domain.ListServersParams, servers domain.Servers) {
|
||||
t.Helper()
|
||||
assert.NotEmpty(t, len(servers))
|
||||
for _, s := range servers {
|
||||
assert.True(t, s.TribeSnapshotsCreatedAt().Before(snapshotsCreatedAtLT))
|
||||
}
|
||||
},
|
||||
assertError: func(t *testing.T, err error) {
|
||||
t.Helper()
|
||||
require.NoError(t, err)
|
||||
},
|
||||
assertTotal: func(t *testing.T, params domain.ListServersParams, total int) {
|
||||
t.Helper()
|
||||
assert.NotEmpty(t, total)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "OK: offset=1 limit=2",
|
||||
params: func(t *testing.T) domain.ListServersParams {
|
||||
|
|
|
@ -27,3 +27,8 @@ type EnnoblementPublisher interface {
|
|||
CmdSync(ctx context.Context, payloads ...domain.SyncEnnoblementsCmdPayload) error
|
||||
EventSynced(ctx context.Context, payloads ...domain.EnnoblementsSyncedEventPayload) error
|
||||
}
|
||||
|
||||
type SnapshotPublisher interface {
|
||||
CmdCreate(ctx context.Context, payloads ...domain.CreateSnapshotsCmdPayload) error
|
||||
EventCreated(ctx context.Context, payloads ...domain.SnapshotsCreatedEventPayload) error
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ func NewDataSyncService(
|
|||
}
|
||||
}
|
||||
|
||||
func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
||||
func (svc *DataSyncService) Sync(ctx context.Context) error {
|
||||
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -48,14 +48,14 @@ func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
|||
return svc.serverPub.CmdSync(ctx, payloads...)
|
||||
}
|
||||
|
||||
func (svc *DataSyncService) TriggerEnnoblementSync(ctx context.Context) error {
|
||||
func (svc *DataSyncService) SyncEnnoblements(ctx context.Context) error {
|
||||
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, v := range versions {
|
||||
if err = svc.triggerEnnoblementSyncForVersion(ctx, v); err != nil {
|
||||
if err = svc.syncEnnoblementsForVersion(ctx, v); err != nil {
|
||||
return fmt.Errorf("%s: %w", v.Code(), err)
|
||||
}
|
||||
}
|
||||
|
@ -63,7 +63,7 @@ func (svc *DataSyncService) TriggerEnnoblementSync(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (svc *DataSyncService) triggerEnnoblementSyncForVersion(ctx context.Context, v domain.Version) error {
|
||||
func (svc *DataSyncService) syncEnnoblementsForVersion(ctx context.Context, v domain.Version) error {
|
||||
servers, err := svc.serverSvc.ListAllOpen(ctx, v.Code())
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -75,9 +75,6 @@ func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string
|
|||
if err := params.SetVersionCodes([]string{versionCode}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := params.SetSort([]domain.ServerSort{domain.ServerSortKeyASC}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := params.SetSpecial(domain.NullBool{
|
||||
Value: true,
|
||||
Valid: true,
|
||||
|
@ -93,9 +90,6 @@ func (svc *ServerService) ListAllOpen(ctx context.Context, versionCode string) (
|
|||
if err := params.SetVersionCodes([]string{versionCode}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := params.SetSort([]domain.ServerSort{domain.ServerSortKeyASC}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := params.SetOpen(domain.NullBool{
|
||||
Value: true,
|
||||
Valid: true,
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
)
|
||||
|
||||
type SnapshotService struct {
|
||||
versionSvc *VersionService
|
||||
serverSvc *ServerService
|
||||
tribeSnapshotPublisher SnapshotPublisher
|
||||
}
|
||||
|
||||
func NewSnapshotService(
|
||||
versionSvc *VersionService,
|
||||
serverSvc *ServerService,
|
||||
tribeSnapshotPublisher SnapshotPublisher,
|
||||
) *SnapshotService {
|
||||
return &SnapshotService{
|
||||
versionSvc: versionSvc,
|
||||
serverSvc: serverSvc,
|
||||
tribeSnapshotPublisher: tribeSnapshotPublisher,
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *SnapshotService) Create(ctx context.Context) error {
|
||||
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, v := range versions {
|
||||
loc, loopErr := time.LoadLocation(v.Timezone())
|
||||
if loopErr != nil {
|
||||
return fmt.Errorf("%s: %w", v.Code(), err)
|
||||
}
|
||||
|
||||
year, month, day := time.Now().In(loc).Date()
|
||||
snapshotsCreatedAtLT := time.Date(year, month, day, 0, 0, 0, 0, loc)
|
||||
date := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
if loopErr = svc.createTribe(ctx, v, snapshotsCreatedAtLT, date); loopErr != nil {
|
||||
return loopErr
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *SnapshotService) createTribe(
|
||||
ctx context.Context,
|
||||
v domain.Version,
|
||||
snapshotsCreatedAtLT time.Time,
|
||||
date time.Time,
|
||||
) error {
|
||||
params := domain.NewListServersParams()
|
||||
if err := params.SetVersionCodes([]string{v.Code()}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := params.SetOpen(domain.NullBool{
|
||||
Value: true,
|
||||
Valid: true,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := params.SetTribeSnapshotsCreatedAtLT(domain.NullTime{
|
||||
Value: snapshotsCreatedAtLT,
|
||||
Valid: true,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
servers, err := svc.serverSvc.ListAll(ctx, params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
payloads, err := svc.toPayload(v, servers, date)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return svc.tribeSnapshotPublisher.CmdCreate(ctx, payloads...)
|
||||
}
|
||||
|
||||
func (svc *SnapshotService) toPayload(
|
||||
v domain.Version,
|
||||
servers domain.Servers,
|
||||
date time.Time,
|
||||
) ([]domain.CreateSnapshotsCmdPayload, error) {
|
||||
payloads := make([]domain.CreateSnapshotsCmdPayload, 0, len(servers))
|
||||
|
||||
for _, s := range servers {
|
||||
p, err := domain.NewCreateSnapshotsCmdPayload(s.Key(), v.Code(), v.Timezone(), date)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%s: couldn't construct domain.CreateSnapshotsCmdPayload: %w", s.Key(), err)
|
||||
}
|
||||
|
||||
payloads = append(payloads, p)
|
||||
}
|
||||
|
||||
return payloads, nil
|
||||
}
|
|
@ -444,14 +444,16 @@ const (
|
|||
const ServerListMaxLimit = 500
|
||||
|
||||
type ListServersParams struct {
|
||||
keys []string
|
||||
keyGT NullString
|
||||
versionCodes []string
|
||||
open NullBool
|
||||
special NullBool
|
||||
sort []ServerSort
|
||||
limit int
|
||||
offset int
|
||||
keys []string
|
||||
keyGT NullString
|
||||
versionCodes []string
|
||||
open NullBool
|
||||
special NullBool
|
||||
tribeSnapshotsCreatedAtLT NullTime
|
||||
playerSnapshotsCreatedAtLT NullTime
|
||||
sort []ServerSort
|
||||
limit int
|
||||
offset int
|
||||
}
|
||||
|
||||
const listServersParamsModelName = "ListServersParams"
|
||||
|
@ -512,6 +514,24 @@ func (params *ListServersParams) SetSpecial(special NullBool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (params *ListServersParams) TribeSnapshotsCreatedAtLT() NullTime {
|
||||
return params.tribeSnapshotsCreatedAtLT
|
||||
}
|
||||
|
||||
func (params *ListServersParams) SetTribeSnapshotsCreatedAtLT(tribeSnapshotsCreatedAtLT NullTime) error {
|
||||
params.tribeSnapshotsCreatedAtLT = tribeSnapshotsCreatedAtLT
|
||||
return nil
|
||||
}
|
||||
|
||||
func (params *ListServersParams) PlayerSnapshotsCreatedAtLT() NullTime {
|
||||
return params.playerSnapshotsCreatedAtLT
|
||||
}
|
||||
|
||||
func (params *ListServersParams) SetPlayerSnapshotsCreatedAtLT(playerSnapshotsCreatedAtLT NullTime) error {
|
||||
params.playerSnapshotsCreatedAtLT = playerSnapshotsCreatedAtLT
|
||||
return nil
|
||||
}
|
||||
|
||||
func (params *ListServersParams) Sort() []ServerSort {
|
||||
return params.sort
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
package domain
|
||||
|
||||
import "time"
|
||||
|
||||
type CreateSnapshotsCmdPayload struct {
|
||||
serverKey string
|
||||
versionCode string
|
||||
versionTimezone string
|
||||
date time.Time
|
||||
}
|
||||
|
||||
const createSnapshotsCmdPayloadModelName = "CreateSnapshotsCmdPayload"
|
||||
|
||||
func NewCreateSnapshotsCmdPayload(
|
||||
serverKey string,
|
||||
versionCode string,
|
||||
versionTimezone string,
|
||||
date time.Time,
|
||||
) (CreateSnapshotsCmdPayload, error) {
|
||||
if serverKey == "" {
|
||||
return CreateSnapshotsCmdPayload{}, ValidationError{
|
||||
Model: createSnapshotsCmdPayloadModelName,
|
||||
Field: "serverKey",
|
||||
Err: ErrRequired,
|
||||
}
|
||||
}
|
||||
|
||||
if versionCode == "" {
|
||||
return CreateSnapshotsCmdPayload{}, ValidationError{
|
||||
Model: createSnapshotsCmdPayloadModelName,
|
||||
Field: "versionCode",
|
||||
Err: ErrRequired,
|
||||
}
|
||||
}
|
||||
|
||||
if versionTimezone == "" {
|
||||
return CreateSnapshotsCmdPayload{}, ValidationError{
|
||||
Model: createSnapshotsCmdPayloadModelName,
|
||||
Field: "versionTimezone",
|
||||
Err: ErrRequired,
|
||||
}
|
||||
}
|
||||
|
||||
return CreateSnapshotsCmdPayload{
|
||||
serverKey: serverKey,
|
||||
versionCode: versionCode,
|
||||
versionTimezone: versionTimezone,
|
||||
date: date,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p CreateSnapshotsCmdPayload) ServerKey() string {
|
||||
return p.serverKey
|
||||
}
|
||||
|
||||
func (p CreateSnapshotsCmdPayload) VersionCode() string {
|
||||
return p.versionCode
|
||||
}
|
||||
|
||||
func (p CreateSnapshotsCmdPayload) VersionTimezone() string {
|
||||
return p.versionTimezone
|
||||
}
|
||||
|
||||
func (p CreateSnapshotsCmdPayload) Date() time.Time {
|
||||
return p.date
|
||||
}
|
||||
|
||||
type SnapshotsCreatedEventPayload struct {
|
||||
serverKey string
|
||||
versionCode string
|
||||
}
|
||||
|
||||
const snapshotsCreatedEventPayloadModelName = "SnapshotsCreatedEventPayload"
|
||||
|
||||
func NewSnapshotsCreatedEventPayload(serverKey string, versionCode string) (SnapshotsCreatedEventPayload, error) {
|
||||
if serverKey == "" {
|
||||
return SnapshotsCreatedEventPayload{}, ValidationError{
|
||||
Model: snapshotsCreatedEventPayloadModelName,
|
||||
Field: "serverKey",
|
||||
Err: ErrRequired,
|
||||
}
|
||||
}
|
||||
|
||||
if versionCode == "" {
|
||||
return SnapshotsCreatedEventPayload{}, ValidationError{
|
||||
Model: snapshotsCreatedEventPayloadModelName,
|
||||
Field: "versionCode",
|
||||
Err: ErrRequired,
|
||||
}
|
||||
}
|
||||
|
||||
return SnapshotsCreatedEventPayload{serverKey: serverKey, versionCode: versionCode}, nil
|
||||
}
|
||||
|
||||
func (p SnapshotsCreatedEventPayload) ServerKey() string {
|
||||
return p.serverKey
|
||||
}
|
||||
|
||||
func (p SnapshotsCreatedEventPayload) VersionCode() string {
|
||||
return p.versionCode
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package domain_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewCreateSnapshotsCmdPayload(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
version := domaintest.NewVersion(t)
|
||||
server := domaintest.NewServer(t, func(cfg *domaintest.ServerConfig) {
|
||||
cfg.VersionCode = version.Code()
|
||||
})
|
||||
date := time.Now()
|
||||
|
||||
payload, err := domain.NewCreateSnapshotsCmdPayload(server.Key(), version.Code(), version.Timezone(), date)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, server.Key(), payload.ServerKey())
|
||||
assert.Equal(t, version.Code(), payload.VersionCode())
|
||||
assert.Equal(t, version.Timezone(), payload.VersionTimezone())
|
||||
assert.Equal(t, date, payload.Date())
|
||||
}
|
||||
|
||||
func TestNewSnapshotsCreatedEventPayload(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
server := domaintest.NewServer(t)
|
||||
|
||||
payload, err := domain.NewSnapshotsCreatedEventPayload(server.Key(), server.VersionCode())
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, server.Key(), payload.ServerKey())
|
||||
assert.Equal(t, server.VersionCode(), payload.VersionCode())
|
||||
}
|
|
@ -176,7 +176,7 @@ func TestDataSync(t *testing.T) {
|
|||
t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) {
|
||||
filesys = os.DirFS(fmt.Sprintf("./testdata/datasync/stage%d", stage))
|
||||
|
||||
require.NoError(t, dataSyncSvc.TriggerDataSync(ctx))
|
||||
require.NoError(t, dataSyncSvc.Sync(ctx))
|
||||
|
||||
var expectedServers []map[string]any
|
||||
readJSONFile(t, filesys, path.Join("expected", "servers.json"), &expectedServers)
|
||||
|
|
|
@ -155,7 +155,7 @@ func TestEnnoblementSync(t *testing.T) {
|
|||
t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) {
|
||||
filesys = os.DirFS(fmt.Sprintf("./testdata/ennoblementsync/stage%d", stage))
|
||||
|
||||
require.NoError(t, dataSyncSvc.TriggerEnnoblementSync(ctx))
|
||||
require.NoError(t, dataSyncSvc.SyncEnnoblements(ctx))
|
||||
|
||||
assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
|
||||
require.NoError(collect, ctx.Err())
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
package watermillmsg
|
||||
|
||||
import "time"
|
||||
|
||||
type CreateSnapshotsCmdPayload struct {
|
||||
ServerKey string `json:"serverKey"`
|
||||
VersionCode string `json:"versionCode"`
|
||||
VersionTimezone string `json:"versionTimezone"`
|
||||
Date time.Time `json:"date"`
|
||||
}
|
||||
|
||||
type SnapshotsCreatedEventPayload struct {
|
||||
ServerKey string `json:"serverKey"`
|
||||
VersionCode string `json:"versionCode"`
|
||||
}
|
|
@ -91,6 +91,52 @@ spec:
|
|||
memory: 128Mi
|
||||
---
|
||||
apiVersion: batch/v1
|
||||
kind: CronJob
|
||||
metadata:
|
||||
name: twhelp-job-create-snapshots
|
||||
spec:
|
||||
schedule: 30 * * * *
|
||||
successfulJobsHistoryLimit: 3
|
||||
failedJobsHistoryLimit: 3
|
||||
concurrencyPolicy: Forbid
|
||||
jobTemplate:
|
||||
spec:
|
||||
parallelism: 1
|
||||
template:
|
||||
spec:
|
||||
restartPolicy: Never
|
||||
containers:
|
||||
- name: twhelp-job-create-snapshots
|
||||
image: twhelp
|
||||
args: [job, create, snapshots]
|
||||
env:
|
||||
- name: APP_MODE
|
||||
value: development
|
||||
- name: LOG_LEVEL
|
||||
value: debug
|
||||
- name: DB_CONNECTION_STRING
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: twhelp-secret
|
||||
key: db-connection-string
|
||||
- name: DB_MAX_OPEN_CONNS
|
||||
value: "1"
|
||||
- name: DB_MAX_IDLE_CONNS
|
||||
value: "1"
|
||||
- name: RABBITMQ_CONNECTION_STRING
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: twhelp-secret
|
||||
key: rabbitmq-connection-string
|
||||
resources:
|
||||
requests:
|
||||
cpu: 50m
|
||||
memory: 64Mi
|
||||
limits:
|
||||
cpu: 100m
|
||||
memory: 128Mi
|
||||
---
|
||||
apiVersion: batch/v1
|
||||
kind: Job
|
||||
metadata:
|
||||
name: twhelp-migrations-job
|
||||
|
|
|
@ -11,3 +11,10 @@ metadata:
|
|||
name: twhelp-job-sync-ennoblements
|
||||
spec:
|
||||
schedule: 0 0 * * *
|
||||
---
|
||||
apiVersion: batch/v1
|
||||
kind: CronJob
|
||||
metadata:
|
||||
name: twhelp-job-create-snapshots
|
||||
spec:
|
||||
schedule: 0 0 * * *
|
||||
|
|
Loading…
Reference in New Issue