feat: add a new job - create snapshots (#43)
Reviewed-on: twhelp/corev3#43
This commit is contained in:
parent
85b3f90447
commit
00cee5a181
4
Makefile
4
Makefile
|
@ -36,3 +36,7 @@ create-job-sync-data:
|
||||||
.PHONY: create-job-sync-ennoblements
|
.PHONY: create-job-sync-ennoblements
|
||||||
create-job-sync-ennoblements:
|
create-job-sync-ennoblements:
|
||||||
kubectl create job --from=cronjob/twhelp-job-sync-ennoblements-dev "twhelp-job-sync-ennoblements-$(shell openssl rand -hex 10)"
|
kubectl create job --from=cronjob/twhelp-job-sync-ennoblements-dev "twhelp-job-sync-ennoblements-$(shell openssl rand -hex 10)"
|
||||||
|
|
||||||
|
.PHONY: create-job-create-snapshots
|
||||||
|
create-job-create-snapshots:
|
||||||
|
kubectl create job --from=cronjob/twhelp-job-create-snapshots-dev "twhelp-job-create-snapshots-$(shell openssl rand -hex 10)"
|
||||||
|
|
|
@ -72,7 +72,7 @@ var (
|
||||||
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||||
defer stop()
|
defer stop()
|
||||||
|
|
||||||
if err = dataSyncSvc.TriggerDataSync(shutdownSignalCtx); err != nil {
|
if err = dataSyncSvc.Sync(shutdownSignalCtx); err != nil {
|
||||||
return fmt.Errorf("couldn't trigger data sync: %w", err)
|
return fmt.Errorf("couldn't trigger data sync: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,12 +136,81 @@ var (
|
||||||
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||||
defer stop()
|
defer stop()
|
||||||
|
|
||||||
if err = dataSyncSvc.TriggerEnnoblementSync(shutdownSignalCtx); err != nil {
|
if err = dataSyncSvc.SyncEnnoblements(shutdownSignalCtx); err != nil {
|
||||||
return fmt.Errorf("couldn't trigger ennoblement sync: %w", err)
|
return fmt.Errorf("couldn't trigger ennoblement sync: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("ennoblement sync triggered")
|
logger.Info("ennoblement sync triggered")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "create",
|
||||||
|
Subcommands: []*cli.Command{
|
||||||
|
{
|
||||||
|
Name: "snapshots",
|
||||||
|
Usage: "Trigger snapshot creation (players/tribes)",
|
||||||
|
Flags: concatSlices(dbFlags, rmqFlags),
|
||||||
|
Action: func(c *cli.Context) error {
|
||||||
|
logger := loggerFromCtx(c.Context)
|
||||||
|
watermillLogger := newWatermillLogger(logger)
|
||||||
|
|
||||||
|
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if closeErr := amqpConn.Close(); closeErr != nil {
|
||||||
|
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if closeErr := publisher.Close(); closeErr != nil {
|
||||||
|
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
bunDB, err := newBunDBFromFlags(c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
|
||||||
|
if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
|
||||||
|
logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
|
||||||
|
} else {
|
||||||
|
logger.Debug("db connections closed")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
|
||||||
|
publisher,
|
||||||
|
newWatermillMarshaler(),
|
||||||
|
c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name),
|
||||||
|
c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
|
||||||
|
)
|
||||||
|
|
||||||
|
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
||||||
|
serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil)
|
||||||
|
snapshotSvc := app.NewSnapshotService(versionSvc, serverSvc, tribeSnapshotPublisher)
|
||||||
|
|
||||||
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
if err = snapshotSvc.Create(shutdownSignalCtx); err != nil {
|
||||||
|
return fmt.Errorf("couldn't trigger snapshot creation: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("snapshot creation triggered")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
@ -51,6 +51,16 @@ var (
|
||||||
Value: "ennoblements.event.synced",
|
Value: "ennoblements.event.synced",
|
||||||
EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"},
|
EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"},
|
||||||
}
|
}
|
||||||
|
rmqFlagTopicCreateTribeSnapshotsCmd = &cli.StringFlag{
|
||||||
|
Name: "rabbitmq.topic.createTribeSnapshotsCmd",
|
||||||
|
Value: "tribes.cmd.create_snapshots",
|
||||||
|
EnvVars: []string{"RABBITMQ_TOPIC_CREATE_TRIBE_SNAPSHOTS_CMD"},
|
||||||
|
}
|
||||||
|
rmqFlagTopicTribeSnapshotsCreatedEvent = &cli.StringFlag{
|
||||||
|
Name: "rabbitmq.topic.tribeSnapshotsCreatedEvent",
|
||||||
|
Value: "tribes.event.snapshots_created",
|
||||||
|
EnvVars: []string{"RABBITMQ_TOPIC_TRIBE_SNAPSHOTS_CREATED_EVENT"},
|
||||||
|
}
|
||||||
rmqFlags = []cli.Flag{
|
rmqFlags = []cli.Flag{
|
||||||
rmqFlagConnectionString,
|
rmqFlagConnectionString,
|
||||||
rmqFlagTopicSyncServersCmd,
|
rmqFlagTopicSyncServersCmd,
|
||||||
|
@ -60,6 +70,8 @@ var (
|
||||||
rmqFlagTopicVillagesSyncedEvent,
|
rmqFlagTopicVillagesSyncedEvent,
|
||||||
rmqFlagTopicSyncEnnoblementsCmd,
|
rmqFlagTopicSyncEnnoblementsCmd,
|
||||||
rmqFlagTopicEnnoblementsSyncedEvent,
|
rmqFlagTopicEnnoblementsSyncedEvent,
|
||||||
|
rmqFlagTopicCreateTribeSnapshotsCmd,
|
||||||
|
rmqFlagTopicTribeSnapshotsCreatedEvent,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
package adapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
|
||||||
|
"github.com/ThreeDotsLabs/watermill/message"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SnapshotWatermillPublisher struct {
|
||||||
|
marshaler watermillmsg.Marshaler
|
||||||
|
publisher message.Publisher
|
||||||
|
cmdCreateTopic string
|
||||||
|
eventCreatedTopic string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSnapshotWatermillPublisher(
|
||||||
|
publisher message.Publisher,
|
||||||
|
marshaler watermillmsg.Marshaler,
|
||||||
|
cmdCreateTopic string,
|
||||||
|
eventCreatedTopic string,
|
||||||
|
) *SnapshotWatermillPublisher {
|
||||||
|
return &SnapshotWatermillPublisher{
|
||||||
|
publisher: publisher,
|
||||||
|
marshaler: marshaler,
|
||||||
|
cmdCreateTopic: cmdCreateTopic,
|
||||||
|
eventCreatedTopic: eventCreatedTopic,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *SnapshotWatermillPublisher) CmdCreate(
|
||||||
|
ctx context.Context,
|
||||||
|
payloads ...domain.CreateSnapshotsCmdPayload,
|
||||||
|
) error {
|
||||||
|
msgs := make([]*message.Message, 0, len(payloads))
|
||||||
|
|
||||||
|
for _, p := range payloads {
|
||||||
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.CreateSnapshotsCmdPayload{
|
||||||
|
ServerKey: p.ServerKey(),
|
||||||
|
VersionCode: p.VersionCode(),
|
||||||
|
VersionTimezone: p.VersionTimezone(),
|
||||||
|
Date: p.Date(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: couldn't marshal CreateSnapshotsCmdPayload: %w", p.ServerKey(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pub.publisher.Publish(pub.cmdCreateTopic, msgs...); err != nil {
|
||||||
|
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdCreateTopic, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *SnapshotWatermillPublisher) EventCreated(
|
||||||
|
ctx context.Context,
|
||||||
|
payloads ...domain.SnapshotsCreatedEventPayload,
|
||||||
|
) error {
|
||||||
|
msgs := make([]*message.Message, 0, len(payloads))
|
||||||
|
|
||||||
|
for _, p := range payloads {
|
||||||
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SnapshotsCreatedEventPayload{
|
||||||
|
ServerKey: p.ServerKey(),
|
||||||
|
VersionCode: p.VersionCode(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: couldn't marshal SnapshotsCreatedEventPayload: %w", p.ServerKey(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pub.publisher.Publish(pub.eventCreatedTopic, msgs...); err != nil {
|
||||||
|
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventCreatedTopic, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -192,6 +192,20 @@ func (a listServersParamsApplier) apply(q *bun.SelectQuery) *bun.SelectQuery {
|
||||||
q = q.Where("server.special = ?", special.Value)
|
q = q.Where("server.special = ?", special.Value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tribeSnapshotsCreatedAtLT := a.params.TribeSnapshotsCreatedAtLT(); tribeSnapshotsCreatedAtLT.Valid {
|
||||||
|
q = q.Where(
|
||||||
|
"server.tribe_snapshots_created_at < ? OR server.tribe_snapshots_created_at is null",
|
||||||
|
tribeSnapshotsCreatedAtLT.Value,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if playerSnapshotsCreatedAtLT := a.params.PlayerSnapshotsCreatedAtLT(); playerSnapshotsCreatedAtLT.Valid {
|
||||||
|
q = q.Where(
|
||||||
|
"server.player_snapshots_created_at < ? OR server.player_snapshots_created_at is null",
|
||||||
|
playerSnapshotsCreatedAtLT.Value,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
for _, s := range a.params.Sort() {
|
for _, s := range a.params.Sort() {
|
||||||
switch s {
|
switch s {
|
||||||
case domain.ServerSortKeyASC:
|
case domain.ServerSortKeyASC:
|
||||||
|
|
|
@ -104,6 +104,17 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
|
||||||
require.NotEmpty(t, servers)
|
require.NotEmpty(t, servers)
|
||||||
randServer := servers[0]
|
randServer := servers[0]
|
||||||
|
|
||||||
|
snapshotsCreatedAtLT := time.Date(
|
||||||
|
2022,
|
||||||
|
time.March,
|
||||||
|
19,
|
||||||
|
12,
|
||||||
|
0,
|
||||||
|
54,
|
||||||
|
0,
|
||||||
|
time.UTC,
|
||||||
|
)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
params func(t *testing.T) domain.ListServersParams
|
params func(t *testing.T) domain.ListServersParams
|
||||||
|
@ -336,6 +347,60 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
|
||||||
assert.NotEmpty(t, total)
|
assert.NotEmpty(t, total)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "OK: player snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339),
|
||||||
|
params: func(t *testing.T) domain.ListServersParams {
|
||||||
|
t.Helper()
|
||||||
|
params := domain.NewListServersParams()
|
||||||
|
require.NoError(t, params.SetPlayerSnapshotsCreatedAtLT(domain.NullTime{
|
||||||
|
Value: snapshotsCreatedAtLT,
|
||||||
|
Valid: true,
|
||||||
|
}))
|
||||||
|
return params
|
||||||
|
},
|
||||||
|
assertServers: func(t *testing.T, params domain.ListServersParams, servers domain.Servers) {
|
||||||
|
t.Helper()
|
||||||
|
assert.NotEmpty(t, len(servers))
|
||||||
|
for _, s := range servers {
|
||||||
|
assert.True(t, s.PlayerSnapshotsCreatedAt().Before(snapshotsCreatedAtLT))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
assertError: func(t *testing.T, err error) {
|
||||||
|
t.Helper()
|
||||||
|
require.NoError(t, err)
|
||||||
|
},
|
||||||
|
assertTotal: func(t *testing.T, params domain.ListServersParams, total int) {
|
||||||
|
t.Helper()
|
||||||
|
assert.NotEmpty(t, total)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "OK: tribe snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339),
|
||||||
|
params: func(t *testing.T) domain.ListServersParams {
|
||||||
|
t.Helper()
|
||||||
|
params := domain.NewListServersParams()
|
||||||
|
require.NoError(t, params.SetTribeSnapshotsCreatedAtLT(domain.NullTime{
|
||||||
|
Value: snapshotsCreatedAtLT,
|
||||||
|
Valid: true,
|
||||||
|
}))
|
||||||
|
return params
|
||||||
|
},
|
||||||
|
assertServers: func(t *testing.T, params domain.ListServersParams, servers domain.Servers) {
|
||||||
|
t.Helper()
|
||||||
|
assert.NotEmpty(t, len(servers))
|
||||||
|
for _, s := range servers {
|
||||||
|
assert.True(t, s.TribeSnapshotsCreatedAt().Before(snapshotsCreatedAtLT))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
assertError: func(t *testing.T, err error) {
|
||||||
|
t.Helper()
|
||||||
|
require.NoError(t, err)
|
||||||
|
},
|
||||||
|
assertTotal: func(t *testing.T, params domain.ListServersParams, total int) {
|
||||||
|
t.Helper()
|
||||||
|
assert.NotEmpty(t, total)
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "OK: offset=1 limit=2",
|
name: "OK: offset=1 limit=2",
|
||||||
params: func(t *testing.T) domain.ListServersParams {
|
params: func(t *testing.T) domain.ListServersParams {
|
||||||
|
|
|
@ -27,3 +27,8 @@ type EnnoblementPublisher interface {
|
||||||
CmdSync(ctx context.Context, payloads ...domain.SyncEnnoblementsCmdPayload) error
|
CmdSync(ctx context.Context, payloads ...domain.SyncEnnoblementsCmdPayload) error
|
||||||
EventSynced(ctx context.Context, payloads ...domain.EnnoblementsSyncedEventPayload) error
|
EventSynced(ctx context.Context, payloads ...domain.EnnoblementsSyncedEventPayload) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SnapshotPublisher interface {
|
||||||
|
CmdCreate(ctx context.Context, payloads ...domain.CreateSnapshotsCmdPayload) error
|
||||||
|
EventCreated(ctx context.Context, payloads ...domain.SnapshotsCreatedEventPayload) error
|
||||||
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ func NewDataSyncService(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
func (svc *DataSyncService) Sync(ctx context.Context) error {
|
||||||
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -48,14 +48,14 @@ func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
||||||
return svc.serverPub.CmdSync(ctx, payloads...)
|
return svc.serverPub.CmdSync(ctx, payloads...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *DataSyncService) TriggerEnnoblementSync(ctx context.Context) error {
|
func (svc *DataSyncService) SyncEnnoblements(ctx context.Context) error {
|
||||||
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, v := range versions {
|
for _, v := range versions {
|
||||||
if err = svc.triggerEnnoblementSyncForVersion(ctx, v); err != nil {
|
if err = svc.syncEnnoblementsForVersion(ctx, v); err != nil {
|
||||||
return fmt.Errorf("%s: %w", v.Code(), err)
|
return fmt.Errorf("%s: %w", v.Code(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -63,7 +63,7 @@ func (svc *DataSyncService) TriggerEnnoblementSync(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *DataSyncService) triggerEnnoblementSyncForVersion(ctx context.Context, v domain.Version) error {
|
func (svc *DataSyncService) syncEnnoblementsForVersion(ctx context.Context, v domain.Version) error {
|
||||||
servers, err := svc.serverSvc.ListAllOpen(ctx, v.Code())
|
servers, err := svc.serverSvc.ListAllOpen(ctx, v.Code())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -75,9 +75,6 @@ func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string
|
||||||
if err := params.SetVersionCodes([]string{versionCode}); err != nil {
|
if err := params.SetVersionCodes([]string{versionCode}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := params.SetSort([]domain.ServerSort{domain.ServerSortKeyASC}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := params.SetSpecial(domain.NullBool{
|
if err := params.SetSpecial(domain.NullBool{
|
||||||
Value: true,
|
Value: true,
|
||||||
Valid: true,
|
Valid: true,
|
||||||
|
@ -93,9 +90,6 @@ func (svc *ServerService) ListAllOpen(ctx context.Context, versionCode string) (
|
||||||
if err := params.SetVersionCodes([]string{versionCode}); err != nil {
|
if err := params.SetVersionCodes([]string{versionCode}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := params.SetSort([]domain.ServerSort{domain.ServerSortKeyASC}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := params.SetOpen(domain.NullBool{
|
if err := params.SetOpen(domain.NullBool{
|
||||||
Value: true,
|
Value: true,
|
||||||
Valid: true,
|
Valid: true,
|
||||||
|
|
|
@ -0,0 +1,106 @@
|
||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SnapshotService struct {
|
||||||
|
versionSvc *VersionService
|
||||||
|
serverSvc *ServerService
|
||||||
|
tribeSnapshotPublisher SnapshotPublisher
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSnapshotService(
|
||||||
|
versionSvc *VersionService,
|
||||||
|
serverSvc *ServerService,
|
||||||
|
tribeSnapshotPublisher SnapshotPublisher,
|
||||||
|
) *SnapshotService {
|
||||||
|
return &SnapshotService{
|
||||||
|
versionSvc: versionSvc,
|
||||||
|
serverSvc: serverSvc,
|
||||||
|
tribeSnapshotPublisher: tribeSnapshotPublisher,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (svc *SnapshotService) Create(ctx context.Context) error {
|
||||||
|
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range versions {
|
||||||
|
loc, loopErr := time.LoadLocation(v.Timezone())
|
||||||
|
if loopErr != nil {
|
||||||
|
return fmt.Errorf("%s: %w", v.Code(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
year, month, day := time.Now().In(loc).Date()
|
||||||
|
snapshotsCreatedAtLT := time.Date(year, month, day, 0, 0, 0, 0, loc)
|
||||||
|
date := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
|
||||||
|
|
||||||
|
if loopErr = svc.createTribe(ctx, v, snapshotsCreatedAtLT, date); loopErr != nil {
|
||||||
|
return loopErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (svc *SnapshotService) createTribe(
|
||||||
|
ctx context.Context,
|
||||||
|
v domain.Version,
|
||||||
|
snapshotsCreatedAtLT time.Time,
|
||||||
|
date time.Time,
|
||||||
|
) error {
|
||||||
|
params := domain.NewListServersParams()
|
||||||
|
if err := params.SetVersionCodes([]string{v.Code()}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := params.SetOpen(domain.NullBool{
|
||||||
|
Value: true,
|
||||||
|
Valid: true,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := params.SetTribeSnapshotsCreatedAtLT(domain.NullTime{
|
||||||
|
Value: snapshotsCreatedAtLT,
|
||||||
|
Valid: true,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
servers, err := svc.serverSvc.ListAll(ctx, params)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloads, err := svc.toPayload(v, servers, date)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return svc.tribeSnapshotPublisher.CmdCreate(ctx, payloads...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (svc *SnapshotService) toPayload(
|
||||||
|
v domain.Version,
|
||||||
|
servers domain.Servers,
|
||||||
|
date time.Time,
|
||||||
|
) ([]domain.CreateSnapshotsCmdPayload, error) {
|
||||||
|
payloads := make([]domain.CreateSnapshotsCmdPayload, 0, len(servers))
|
||||||
|
|
||||||
|
for _, s := range servers {
|
||||||
|
p, err := domain.NewCreateSnapshotsCmdPayload(s.Key(), v.Code(), v.Timezone(), date)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("%s: couldn't construct domain.CreateSnapshotsCmdPayload: %w", s.Key(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
payloads = append(payloads, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return payloads, nil
|
||||||
|
}
|
|
@ -444,14 +444,16 @@ const (
|
||||||
const ServerListMaxLimit = 500
|
const ServerListMaxLimit = 500
|
||||||
|
|
||||||
type ListServersParams struct {
|
type ListServersParams struct {
|
||||||
keys []string
|
keys []string
|
||||||
keyGT NullString
|
keyGT NullString
|
||||||
versionCodes []string
|
versionCodes []string
|
||||||
open NullBool
|
open NullBool
|
||||||
special NullBool
|
special NullBool
|
||||||
sort []ServerSort
|
tribeSnapshotsCreatedAtLT NullTime
|
||||||
limit int
|
playerSnapshotsCreatedAtLT NullTime
|
||||||
offset int
|
sort []ServerSort
|
||||||
|
limit int
|
||||||
|
offset int
|
||||||
}
|
}
|
||||||
|
|
||||||
const listServersParamsModelName = "ListServersParams"
|
const listServersParamsModelName = "ListServersParams"
|
||||||
|
@ -512,6 +514,24 @@ func (params *ListServersParams) SetSpecial(special NullBool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (params *ListServersParams) TribeSnapshotsCreatedAtLT() NullTime {
|
||||||
|
return params.tribeSnapshotsCreatedAtLT
|
||||||
|
}
|
||||||
|
|
||||||
|
func (params *ListServersParams) SetTribeSnapshotsCreatedAtLT(tribeSnapshotsCreatedAtLT NullTime) error {
|
||||||
|
params.tribeSnapshotsCreatedAtLT = tribeSnapshotsCreatedAtLT
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (params *ListServersParams) PlayerSnapshotsCreatedAtLT() NullTime {
|
||||||
|
return params.playerSnapshotsCreatedAtLT
|
||||||
|
}
|
||||||
|
|
||||||
|
func (params *ListServersParams) SetPlayerSnapshotsCreatedAtLT(playerSnapshotsCreatedAtLT NullTime) error {
|
||||||
|
params.playerSnapshotsCreatedAtLT = playerSnapshotsCreatedAtLT
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (params *ListServersParams) Sort() []ServerSort {
|
func (params *ListServersParams) Sort() []ServerSort {
|
||||||
return params.sort
|
return params.sort
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
package domain
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type CreateSnapshotsCmdPayload struct {
|
||||||
|
serverKey string
|
||||||
|
versionCode string
|
||||||
|
versionTimezone string
|
||||||
|
date time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
const createSnapshotsCmdPayloadModelName = "CreateSnapshotsCmdPayload"
|
||||||
|
|
||||||
|
func NewCreateSnapshotsCmdPayload(
|
||||||
|
serverKey string,
|
||||||
|
versionCode string,
|
||||||
|
versionTimezone string,
|
||||||
|
date time.Time,
|
||||||
|
) (CreateSnapshotsCmdPayload, error) {
|
||||||
|
if serverKey == "" {
|
||||||
|
return CreateSnapshotsCmdPayload{}, ValidationError{
|
||||||
|
Model: createSnapshotsCmdPayloadModelName,
|
||||||
|
Field: "serverKey",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if versionCode == "" {
|
||||||
|
return CreateSnapshotsCmdPayload{}, ValidationError{
|
||||||
|
Model: createSnapshotsCmdPayloadModelName,
|
||||||
|
Field: "versionCode",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if versionTimezone == "" {
|
||||||
|
return CreateSnapshotsCmdPayload{}, ValidationError{
|
||||||
|
Model: createSnapshotsCmdPayloadModelName,
|
||||||
|
Field: "versionTimezone",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return CreateSnapshotsCmdPayload{
|
||||||
|
serverKey: serverKey,
|
||||||
|
versionCode: versionCode,
|
||||||
|
versionTimezone: versionTimezone,
|
||||||
|
date: date,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p CreateSnapshotsCmdPayload) ServerKey() string {
|
||||||
|
return p.serverKey
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p CreateSnapshotsCmdPayload) VersionCode() string {
|
||||||
|
return p.versionCode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p CreateSnapshotsCmdPayload) VersionTimezone() string {
|
||||||
|
return p.versionTimezone
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p CreateSnapshotsCmdPayload) Date() time.Time {
|
||||||
|
return p.date
|
||||||
|
}
|
||||||
|
|
||||||
|
type SnapshotsCreatedEventPayload struct {
|
||||||
|
serverKey string
|
||||||
|
versionCode string
|
||||||
|
}
|
||||||
|
|
||||||
|
const snapshotsCreatedEventPayloadModelName = "SnapshotsCreatedEventPayload"
|
||||||
|
|
||||||
|
func NewSnapshotsCreatedEventPayload(serverKey string, versionCode string) (SnapshotsCreatedEventPayload, error) {
|
||||||
|
if serverKey == "" {
|
||||||
|
return SnapshotsCreatedEventPayload{}, ValidationError{
|
||||||
|
Model: snapshotsCreatedEventPayloadModelName,
|
||||||
|
Field: "serverKey",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if versionCode == "" {
|
||||||
|
return SnapshotsCreatedEventPayload{}, ValidationError{
|
||||||
|
Model: snapshotsCreatedEventPayloadModelName,
|
||||||
|
Field: "versionCode",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return SnapshotsCreatedEventPayload{serverKey: serverKey, versionCode: versionCode}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p SnapshotsCreatedEventPayload) ServerKey() string {
|
||||||
|
return p.serverKey
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p SnapshotsCreatedEventPayload) VersionCode() string {
|
||||||
|
return p.versionCode
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
package domain_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewCreateSnapshotsCmdPayload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
version := domaintest.NewVersion(t)
|
||||||
|
server := domaintest.NewServer(t, func(cfg *domaintest.ServerConfig) {
|
||||||
|
cfg.VersionCode = version.Code()
|
||||||
|
})
|
||||||
|
date := time.Now()
|
||||||
|
|
||||||
|
payload, err := domain.NewCreateSnapshotsCmdPayload(server.Key(), version.Code(), version.Timezone(), date)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, server.Key(), payload.ServerKey())
|
||||||
|
assert.Equal(t, version.Code(), payload.VersionCode())
|
||||||
|
assert.Equal(t, version.Timezone(), payload.VersionTimezone())
|
||||||
|
assert.Equal(t, date, payload.Date())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewSnapshotsCreatedEventPayload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
server := domaintest.NewServer(t)
|
||||||
|
|
||||||
|
payload, err := domain.NewSnapshotsCreatedEventPayload(server.Key(), server.VersionCode())
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, server.Key(), payload.ServerKey())
|
||||||
|
assert.Equal(t, server.VersionCode(), payload.VersionCode())
|
||||||
|
}
|
|
@ -176,7 +176,7 @@ func TestDataSync(t *testing.T) {
|
||||||
t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) {
|
t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) {
|
||||||
filesys = os.DirFS(fmt.Sprintf("./testdata/datasync/stage%d", stage))
|
filesys = os.DirFS(fmt.Sprintf("./testdata/datasync/stage%d", stage))
|
||||||
|
|
||||||
require.NoError(t, dataSyncSvc.TriggerDataSync(ctx))
|
require.NoError(t, dataSyncSvc.Sync(ctx))
|
||||||
|
|
||||||
var expectedServers []map[string]any
|
var expectedServers []map[string]any
|
||||||
readJSONFile(t, filesys, path.Join("expected", "servers.json"), &expectedServers)
|
readJSONFile(t, filesys, path.Join("expected", "servers.json"), &expectedServers)
|
||||||
|
|
|
@ -155,7 +155,7 @@ func TestEnnoblementSync(t *testing.T) {
|
||||||
t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) {
|
t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) {
|
||||||
filesys = os.DirFS(fmt.Sprintf("./testdata/ennoblementsync/stage%d", stage))
|
filesys = os.DirFS(fmt.Sprintf("./testdata/ennoblementsync/stage%d", stage))
|
||||||
|
|
||||||
require.NoError(t, dataSyncSvc.TriggerEnnoblementSync(ctx))
|
require.NoError(t, dataSyncSvc.SyncEnnoblements(ctx))
|
||||||
|
|
||||||
assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
|
assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
|
||||||
require.NoError(collect, ctx.Err())
|
require.NoError(collect, ctx.Err())
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
package watermillmsg
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type CreateSnapshotsCmdPayload struct {
|
||||||
|
ServerKey string `json:"serverKey"`
|
||||||
|
VersionCode string `json:"versionCode"`
|
||||||
|
VersionTimezone string `json:"versionTimezone"`
|
||||||
|
Date time.Time `json:"date"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SnapshotsCreatedEventPayload struct {
|
||||||
|
ServerKey string `json:"serverKey"`
|
||||||
|
VersionCode string `json:"versionCode"`
|
||||||
|
}
|
|
@ -91,6 +91,52 @@ spec:
|
||||||
memory: 128Mi
|
memory: 128Mi
|
||||||
---
|
---
|
||||||
apiVersion: batch/v1
|
apiVersion: batch/v1
|
||||||
|
kind: CronJob
|
||||||
|
metadata:
|
||||||
|
name: twhelp-job-create-snapshots
|
||||||
|
spec:
|
||||||
|
schedule: 30 * * * *
|
||||||
|
successfulJobsHistoryLimit: 3
|
||||||
|
failedJobsHistoryLimit: 3
|
||||||
|
concurrencyPolicy: Forbid
|
||||||
|
jobTemplate:
|
||||||
|
spec:
|
||||||
|
parallelism: 1
|
||||||
|
template:
|
||||||
|
spec:
|
||||||
|
restartPolicy: Never
|
||||||
|
containers:
|
||||||
|
- name: twhelp-job-create-snapshots
|
||||||
|
image: twhelp
|
||||||
|
args: [job, create, snapshots]
|
||||||
|
env:
|
||||||
|
- name: APP_MODE
|
||||||
|
value: development
|
||||||
|
- name: LOG_LEVEL
|
||||||
|
value: debug
|
||||||
|
- name: DB_CONNECTION_STRING
|
||||||
|
valueFrom:
|
||||||
|
secretKeyRef:
|
||||||
|
name: twhelp-secret
|
||||||
|
key: db-connection-string
|
||||||
|
- name: DB_MAX_OPEN_CONNS
|
||||||
|
value: "1"
|
||||||
|
- name: DB_MAX_IDLE_CONNS
|
||||||
|
value: "1"
|
||||||
|
- name: RABBITMQ_CONNECTION_STRING
|
||||||
|
valueFrom:
|
||||||
|
secretKeyRef:
|
||||||
|
name: twhelp-secret
|
||||||
|
key: rabbitmq-connection-string
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
cpu: 50m
|
||||||
|
memory: 64Mi
|
||||||
|
limits:
|
||||||
|
cpu: 100m
|
||||||
|
memory: 128Mi
|
||||||
|
---
|
||||||
|
apiVersion: batch/v1
|
||||||
kind: Job
|
kind: Job
|
||||||
metadata:
|
metadata:
|
||||||
name: twhelp-migrations-job
|
name: twhelp-migrations-job
|
||||||
|
|
|
@ -11,3 +11,10 @@ metadata:
|
||||||
name: twhelp-job-sync-ennoblements
|
name: twhelp-job-sync-ennoblements
|
||||||
spec:
|
spec:
|
||||||
schedule: 0 0 * * *
|
schedule: 0 0 * * *
|
||||||
|
---
|
||||||
|
apiVersion: batch/v1
|
||||||
|
kind: CronJob
|
||||||
|
metadata:
|
||||||
|
name: twhelp-job-create-snapshots
|
||||||
|
spec:
|
||||||
|
schedule: 0 0 * * *
|
||||||
|
|
Loading…
Reference in New Issue