feat: add a new job - sync ennoblements (#27)
Reviewed-on: twhelp/corev3#27
This commit is contained in:
parent
a44391c06f
commit
1c6624fa2d
4
Makefile
4
Makefile
|
@ -32,3 +32,7 @@ generate:
|
||||||
.PHONY: create-job-sync-data
|
.PHONY: create-job-sync-data
|
||||||
create-job-sync-data:
|
create-job-sync-data:
|
||||||
kubectl create job --from=cronjob/twhelp-job-sync-data-dev "twhelp-job-sync-data-$(shell openssl rand -hex 10)"
|
kubectl create job --from=cronjob/twhelp-job-sync-data-dev "twhelp-job-sync-data-$(shell openssl rand -hex 10)"
|
||||||
|
|
||||||
|
.PHONY: create-job-sync-ennoblements
|
||||||
|
create-job-sync-ennoblements:
|
||||||
|
kubectl create job --from=cronjob/twhelp-job-sync-ennoblements-dev "twhelp-job-sync-ennoblements-$(shell openssl rand -hex 10)"
|
||||||
|
|
|
@ -67,16 +67,81 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
||||||
|
dataSyncSvc := app.NewDataSyncService(versionSvc, nil, serverPublisher, nil)
|
||||||
|
|
||||||
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||||
defer stop()
|
defer stop()
|
||||||
|
|
||||||
if err = app.NewDataSyncService(versionSvc, serverPublisher).TriggerDataSync(shutdownSignalCtx); err != nil {
|
if err = dataSyncSvc.TriggerDataSync(shutdownSignalCtx); err != nil {
|
||||||
return fmt.Errorf("couldn't trigger data sync: %w", err)
|
return fmt.Errorf("couldn't trigger data sync: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("data sync triggered")
|
logger.Info("data sync triggered")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "ennoblements",
|
||||||
|
Usage: "Trigger ennoblement sync",
|
||||||
|
Flags: concatSlices(dbFlags, rmqFlags),
|
||||||
|
Action: func(c *cli.Context) error {
|
||||||
|
logger := loggerFromCtx(c.Context)
|
||||||
|
watermillLogger := newWatermillLogger(logger)
|
||||||
|
|
||||||
|
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if closeErr := amqpConn.Close(); closeErr != nil {
|
||||||
|
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if closeErr := publisher.Close(); closeErr != nil {
|
||||||
|
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
bunDB, err := newBunDBFromFlags(c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
|
||||||
|
if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
|
||||||
|
logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
|
||||||
|
} else {
|
||||||
|
logger.Debug("db connections closed")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
ennoblementPublisher := adapter.NewEnnoblementWatermillPublisher(
|
||||||
|
publisher,
|
||||||
|
newWatermillMarshaler(),
|
||||||
|
c.String(rmqFlagTopicSyncEnnoblementsCmd.Name),
|
||||||
|
c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name),
|
||||||
|
)
|
||||||
|
|
||||||
|
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
||||||
|
serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil)
|
||||||
|
dataSyncSvc := app.NewDataSyncService(versionSvc, serverSvc, nil, ennoblementPublisher)
|
||||||
|
|
||||||
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
if err = dataSyncSvc.TriggerEnnoblementSync(shutdownSignalCtx); err != nil {
|
||||||
|
return fmt.Errorf("couldn't trigger ennoblement sync: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("ennoblement sync triggered")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
@ -41,6 +41,16 @@ var (
|
||||||
Value: "villages.event.synced",
|
Value: "villages.event.synced",
|
||||||
EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"},
|
EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"},
|
||||||
}
|
}
|
||||||
|
rmqFlagTopicSyncEnnoblementsCmd = &cli.StringFlag{
|
||||||
|
Name: "rabbitmq.topic.syncEnnoblementsCmd",
|
||||||
|
Value: "ennoblements.cmd.sync",
|
||||||
|
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_ENNOBLEMENTS_CMD"},
|
||||||
|
}
|
||||||
|
rmqFlagTopicEnnoblementsSyncedEvent = &cli.StringFlag{
|
||||||
|
Name: "rabbitmq.topic.ennoblementsSyncedEvent",
|
||||||
|
Value: "ennoblements.event.synced",
|
||||||
|
EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"},
|
||||||
|
}
|
||||||
rmqFlags = []cli.Flag{
|
rmqFlags = []cli.Flag{
|
||||||
rmqFlagConnectionString,
|
rmqFlagConnectionString,
|
||||||
rmqFlagTopicSyncServersCmd,
|
rmqFlagTopicSyncServersCmd,
|
||||||
|
@ -48,6 +58,8 @@ var (
|
||||||
rmqFlagTopicTribesSyncedEvent,
|
rmqFlagTopicTribesSyncedEvent,
|
||||||
rmqFlagTopicPlayersSyncedEvent,
|
rmqFlagTopicPlayersSyncedEvent,
|
||||||
rmqFlagTopicVillagesSyncedEvent,
|
rmqFlagTopicVillagesSyncedEvent,
|
||||||
|
rmqFlagTopicSyncEnnoblementsCmd,
|
||||||
|
rmqFlagTopicEnnoblementsSyncedEvent,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
package adapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
||||||
|
"github.com/ThreeDotsLabs/watermill/message"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EnnoblementWatermillPublisher struct {
|
||||||
|
marshaler watermillmsg.Marshaler
|
||||||
|
publisher message.Publisher
|
||||||
|
cmdSyncTopic string
|
||||||
|
eventSyncedTopic string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEnnoblementWatermillPublisher(
|
||||||
|
publisher message.Publisher,
|
||||||
|
marshaler watermillmsg.Marshaler,
|
||||||
|
cmdSyncTopic string,
|
||||||
|
eventSyncedTopic string,
|
||||||
|
) *EnnoblementWatermillPublisher {
|
||||||
|
return &EnnoblementWatermillPublisher{
|
||||||
|
publisher: publisher,
|
||||||
|
marshaler: marshaler,
|
||||||
|
cmdSyncTopic: cmdSyncTopic,
|
||||||
|
eventSyncedTopic: eventSyncedTopic,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *EnnoblementWatermillPublisher) CmdSync(
|
||||||
|
ctx context.Context,
|
||||||
|
payloads ...domain.SyncEnnoblementsCmdPayload,
|
||||||
|
) error {
|
||||||
|
msgs := make([]*message.Message, 0, len(payloads))
|
||||||
|
|
||||||
|
for _, p := range payloads {
|
||||||
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SyncEnnoblementsCmdPayload{
|
||||||
|
ServerKey: p.ServerKey(),
|
||||||
|
ServerURL: p.ServerURL(),
|
||||||
|
VersionCode: p.VersionCode(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: couldn't marshal SyncEnnoblementsCmdPayload: %w", p.ServerKey(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pub.publisher.Publish(pub.cmdSyncTopic, msgs...); err != nil {
|
||||||
|
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *EnnoblementWatermillPublisher) EventSynced(
|
||||||
|
ctx context.Context,
|
||||||
|
payloads ...domain.EnnoblementsSyncedEventPayload,
|
||||||
|
) error {
|
||||||
|
msgs := make([]*message.Message, 0, len(payloads))
|
||||||
|
|
||||||
|
for _, p := range payloads {
|
||||||
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.EnnoblementsSyncedEventPayload{
|
||||||
|
ServerKey: p.ServerKey(),
|
||||||
|
ServerURL: p.ServerURL(),
|
||||||
|
VersionCode: p.VersionCode(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: couldn't marshal EnnoblementsSyncedEventPayload: %w", p.ServerKey(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil {
|
||||||
|
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -22,3 +22,8 @@ type PlayerPublisher interface {
|
||||||
type VillagePublisher interface {
|
type VillagePublisher interface {
|
||||||
EventSynced(ctx context.Context, payloads ...domain.VillagesSyncedEventPayload) error
|
EventSynced(ctx context.Context, payloads ...domain.VillagesSyncedEventPayload) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type EnnoblementPublisher interface {
|
||||||
|
CmdSync(ctx context.Context, payloads ...domain.SyncEnnoblementsCmdPayload) error
|
||||||
|
EventSynced(ctx context.Context, payloads ...domain.EnnoblementsSyncedEventPayload) error
|
||||||
|
}
|
||||||
|
|
|
@ -8,12 +8,24 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type DataSyncService struct {
|
type DataSyncService struct {
|
||||||
versionSvc *VersionService
|
versionSvc *VersionService
|
||||||
serverPub ServerPublisher
|
serverSvc *ServerService
|
||||||
|
serverPub ServerPublisher
|
||||||
|
ennoblementPub EnnoblementPublisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDataSyncService(versionSvc *VersionService, serverPub ServerPublisher) *DataSyncService {
|
func NewDataSyncService(
|
||||||
return &DataSyncService{versionSvc: versionSvc, serverPub: serverPub}
|
versionSvc *VersionService,
|
||||||
|
serverSvc *ServerService,
|
||||||
|
serverPub ServerPublisher,
|
||||||
|
ennoblementPub EnnoblementPublisher,
|
||||||
|
) *DataSyncService {
|
||||||
|
return &DataSyncService{
|
||||||
|
versionSvc: versionSvc,
|
||||||
|
serverSvc: serverSvc,
|
||||||
|
serverPub: serverPub,
|
||||||
|
ennoblementPub: ennoblementPub,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
||||||
|
@ -36,6 +48,37 @@ func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
||||||
return svc.serverPub.CmdSync(ctx, payloads...)
|
return svc.serverPub.CmdSync(ctx, payloads...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error {
|
func (svc *DataSyncService) TriggerEnnoblementSync(ctx context.Context) error {
|
||||||
panic("unimplemented")
|
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range versions {
|
||||||
|
if err = svc.triggerEnnoblementSyncForVersion(ctx, v); err != nil {
|
||||||
|
return fmt.Errorf("%s: %w", v.Code(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (svc *DataSyncService) triggerEnnoblementSyncForVersion(ctx context.Context, v domain.Version) error {
|
||||||
|
servers, err := svc.serverSvc.ListAllOpen(ctx, v.Code())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloads := make([]domain.SyncEnnoblementsCmdPayload, 0, len(servers))
|
||||||
|
|
||||||
|
for _, s := range servers {
|
||||||
|
p, payloadErr := domain.NewSyncEnnoblementsCmdPayload(s.Key(), s.URL(), s.VersionCode())
|
||||||
|
if payloadErr != nil {
|
||||||
|
return fmt.Errorf("%s: couldn't construct domain.SyncEnnoblementsCmdPayload: %w", s.Key(), payloadErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
payloads = append(payloads, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return svc.ennoblementPub.CmdSync(ctx, payloads...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCm
|
||||||
return fmt.Errorf("%s: couldn't list special servers: %w", versionCode, err)
|
return fmt.Errorf("%s: couldn't list special servers: %w", versionCode, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
currentlyStoredOpenServers, err := svc.listAllOpen(ctx, versionCode)
|
currentlyStoredOpenServers, err := svc.ListAllOpen(ctx, versionCode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: couldn't list open servers: %w", versionCode, err)
|
return fmt.Errorf("%s: couldn't list open servers: %w", versionCode, err)
|
||||||
}
|
}
|
||||||
|
@ -84,7 +84,7 @@ func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string
|
||||||
return svc.ListAll(ctx, params)
|
return svc.ListAll(ctx, params)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *ServerService) listAllOpen(ctx context.Context, versionCode string) (domain.Servers, error) {
|
func (svc *ServerService) ListAllOpen(ctx context.Context, versionCode string) (domain.Servers, error) {
|
||||||
params := domain.NewListServersParams()
|
params := domain.NewListServersParams()
|
||||||
if err := params.SetVersionCodes([]string{versionCode}); err != nil {
|
if err := params.SetVersionCodes([]string{versionCode}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -0,0 +1,117 @@
|
||||||
|
package domain
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyncEnnoblementsCmdPayload struct {
|
||||||
|
serverKey string
|
||||||
|
serverURL *url.URL
|
||||||
|
versionCode string
|
||||||
|
}
|
||||||
|
|
||||||
|
const syncEnnoblementsCmdPayloadModelName = "SyncEnnoblementsCmdPayload"
|
||||||
|
|
||||||
|
func NewSyncEnnoblementsCmdPayload(
|
||||||
|
serverKey string,
|
||||||
|
serverURL *url.URL,
|
||||||
|
versionCode string,
|
||||||
|
) (SyncEnnoblementsCmdPayload, error) {
|
||||||
|
if serverKey == "" {
|
||||||
|
return SyncEnnoblementsCmdPayload{}, ValidationError{
|
||||||
|
Model: syncEnnoblementsCmdPayloadModelName,
|
||||||
|
Field: "serverKey",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if serverURL == nil {
|
||||||
|
return SyncEnnoblementsCmdPayload{}, ValidationError{
|
||||||
|
Model: syncEnnoblementsCmdPayloadModelName,
|
||||||
|
Field: "serverURL",
|
||||||
|
Err: ErrNil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if versionCode == "" {
|
||||||
|
return SyncEnnoblementsCmdPayload{}, ValidationError{
|
||||||
|
Model: syncEnnoblementsCmdPayloadModelName,
|
||||||
|
Field: "versionCode",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return SyncEnnoblementsCmdPayload{
|
||||||
|
serverKey: serverKey,
|
||||||
|
serverURL: serverURL,
|
||||||
|
versionCode: versionCode,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p SyncEnnoblementsCmdPayload) ServerKey() string {
|
||||||
|
return p.serverKey
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p SyncEnnoblementsCmdPayload) ServerURL() *url.URL {
|
||||||
|
return p.serverURL
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p SyncEnnoblementsCmdPayload) VersionCode() string {
|
||||||
|
return p.versionCode
|
||||||
|
}
|
||||||
|
|
||||||
|
type EnnoblementsSyncedEventPayload struct {
|
||||||
|
serverKey string
|
||||||
|
serverURL *url.URL
|
||||||
|
versionCode string
|
||||||
|
}
|
||||||
|
|
||||||
|
const ennoblementsSyncedEventPayloadModelName = "EnnoblementsSyncedEventPayload"
|
||||||
|
|
||||||
|
func NewEnnoblementsSyncedEventPayload(
|
||||||
|
serverKey string,
|
||||||
|
serverURL *url.URL,
|
||||||
|
versionCode string,
|
||||||
|
) (EnnoblementsSyncedEventPayload, error) {
|
||||||
|
if serverKey == "" {
|
||||||
|
return EnnoblementsSyncedEventPayload{}, ValidationError{
|
||||||
|
Model: ennoblementsSyncedEventPayloadModelName,
|
||||||
|
Field: "serverKey",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if serverURL == nil {
|
||||||
|
return EnnoblementsSyncedEventPayload{}, ValidationError{
|
||||||
|
Model: ennoblementsSyncedEventPayloadModelName,
|
||||||
|
Field: "serverURL",
|
||||||
|
Err: ErrNil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if versionCode == "" {
|
||||||
|
return EnnoblementsSyncedEventPayload{}, ValidationError{
|
||||||
|
Model: ennoblementsSyncedEventPayloadModelName,
|
||||||
|
Field: "versionCode",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return EnnoblementsSyncedEventPayload{
|
||||||
|
serverKey: serverKey,
|
||||||
|
serverURL: serverURL,
|
||||||
|
versionCode: versionCode,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p EnnoblementsSyncedEventPayload) ServerKey() string {
|
||||||
|
return p.serverKey
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p EnnoblementsSyncedEventPayload) ServerURL() *url.URL {
|
||||||
|
return p.serverURL
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p EnnoblementsSyncedEventPayload) VersionCode() string {
|
||||||
|
return p.versionCode
|
||||||
|
}
|
|
@ -0,0 +1,42 @@
|
||||||
|
package domain_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewSyncEnnoblementsCmdPayload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
server := domaintest.NewServer(t)
|
||||||
|
|
||||||
|
payload, err := domain.NewSyncEnnoblementsCmdPayload(
|
||||||
|
server.Key(),
|
||||||
|
server.URL(),
|
||||||
|
server.VersionCode(),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, server.Key(), payload.ServerKey())
|
||||||
|
assert.Equal(t, server.URL(), payload.ServerURL())
|
||||||
|
assert.Equal(t, server.VersionCode(), payload.VersionCode())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewEnnoblementsSyncedEventPayload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
server := domaintest.NewServer(t)
|
||||||
|
|
||||||
|
payload, err := domain.NewEnnoblementsSyncedEventPayload(
|
||||||
|
server.Key(),
|
||||||
|
server.URL(),
|
||||||
|
server.VersionCode(),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, server.Key(), payload.ServerKey())
|
||||||
|
assert.Equal(t, server.URL(), payload.ServerURL())
|
||||||
|
assert.Equal(t, server.VersionCode(), payload.VersionCode())
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
package watermillmsg
|
||||||
|
|
||||||
|
import "net/url"
|
||||||
|
|
||||||
|
type SyncEnnoblementsCmdPayload struct {
|
||||||
|
ServerKey string `json:"serverKey"`
|
||||||
|
ServerURL *url.URL `json:"serverUrl"`
|
||||||
|
VersionCode string `json:"versionCode"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type EnnoblementsSyncedEventPayload struct {
|
||||||
|
ServerKey string `json:"serverKey"`
|
||||||
|
ServerURL *url.URL `json:"serverUrl"`
|
||||||
|
VersionCode string `json:"versionCode"`
|
||||||
|
}
|
|
@ -45,6 +45,52 @@ spec:
|
||||||
memory: 128Mi
|
memory: 128Mi
|
||||||
---
|
---
|
||||||
apiVersion: batch/v1
|
apiVersion: batch/v1
|
||||||
|
kind: CronJob
|
||||||
|
metadata:
|
||||||
|
name: twhelp-job-sync-ennoblements
|
||||||
|
spec:
|
||||||
|
schedule: "* * * * *"
|
||||||
|
successfulJobsHistoryLimit: 3
|
||||||
|
failedJobsHistoryLimit: 3
|
||||||
|
concurrencyPolicy: Forbid
|
||||||
|
jobTemplate:
|
||||||
|
spec:
|
||||||
|
parallelism: 1
|
||||||
|
template:
|
||||||
|
spec:
|
||||||
|
restartPolicy: Never
|
||||||
|
containers:
|
||||||
|
- name: twhelp-job-sync-ennoblements
|
||||||
|
image: twhelp
|
||||||
|
args: [job, sync, ennoblements]
|
||||||
|
env:
|
||||||
|
- name: APP_MODE
|
||||||
|
value: development
|
||||||
|
- name: LOG_LEVEL
|
||||||
|
value: debug
|
||||||
|
- name: DB_CONNECTION_STRING
|
||||||
|
valueFrom:
|
||||||
|
secretKeyRef:
|
||||||
|
name: twhelp-secret
|
||||||
|
key: db-connection-string
|
||||||
|
- name: DB_MAX_OPEN_CONNS
|
||||||
|
value: "1"
|
||||||
|
- name: DB_MAX_IDLE_CONNS
|
||||||
|
value: "1"
|
||||||
|
- name: RABBITMQ_CONNECTION_STRING
|
||||||
|
valueFrom:
|
||||||
|
secretKeyRef:
|
||||||
|
name: twhelp-secret
|
||||||
|
key: rabbitmq-connection-string
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
cpu: 50m
|
||||||
|
memory: 64Mi
|
||||||
|
limits:
|
||||||
|
cpu: 100m
|
||||||
|
memory: 128Mi
|
||||||
|
---
|
||||||
|
apiVersion: batch/v1
|
||||||
kind: Job
|
kind: Job
|
||||||
metadata:
|
metadata:
|
||||||
name: twhelp-migrations-job
|
name: twhelp-migrations-job
|
||||||
|
|
|
@ -4,3 +4,10 @@ metadata:
|
||||||
name: twhelp-job-sync-data
|
name: twhelp-job-sync-data
|
||||||
spec:
|
spec:
|
||||||
schedule: 0 0 * * *
|
schedule: 0 0 * * *
|
||||||
|
---
|
||||||
|
apiVersion: batch/v1
|
||||||
|
kind: CronJob
|
||||||
|
metadata:
|
||||||
|
name: twhelp-job-sync-ennoblements
|
||||||
|
spec:
|
||||||
|
schedule: 0 0 * * *
|
||||||
|
|
Loading…
Reference in New Issue