feat: add a new job - sync ennoblements (#27)

Reviewed-on: twhelp/corev3#27
This commit is contained in:
Dawid Wysokiński 2024-01-03 08:59:24 +00:00
parent a44391c06f
commit 1c6624fa2d
12 changed files with 448 additions and 9 deletions

View File

@ -32,3 +32,7 @@ generate:
.PHONY: create-job-sync-data
create-job-sync-data:
kubectl create job --from=cronjob/twhelp-job-sync-data-dev "twhelp-job-sync-data-$(shell openssl rand -hex 10)"
.PHONY: create-job-sync-ennoblements
create-job-sync-ennoblements:
kubectl create job --from=cronjob/twhelp-job-sync-ennoblements-dev "twhelp-job-sync-ennoblements-$(shell openssl rand -hex 10)"

View File

@ -67,16 +67,81 @@ var (
)
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
dataSyncSvc := app.NewDataSyncService(versionSvc, nil, serverPublisher, nil)
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
defer stop()
if err = app.NewDataSyncService(versionSvc, serverPublisher).TriggerDataSync(shutdownSignalCtx); err != nil {
if err = dataSyncSvc.TriggerDataSync(shutdownSignalCtx); err != nil {
return fmt.Errorf("couldn't trigger data sync: %w", err)
}
logger.Info("data sync triggered")
return nil
},
},
{
Name: "ennoblements",
Usage: "Trigger ennoblement sync",
Flags: concatSlices(dbFlags, rmqFlags),
Action: func(c *cli.Context) error {
logger := loggerFromCtx(c.Context)
watermillLogger := newWatermillLogger(logger)
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
if err != nil {
return err
}
defer func() {
if closeErr := amqpConn.Close(); closeErr != nil {
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
}
}()
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
if err != nil {
return err
}
defer func() {
if closeErr := publisher.Close(); closeErr != nil {
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
}
}()
bunDB, err := newBunDBFromFlags(c)
if err != nil {
return err
}
defer func() {
logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
} else {
logger.Debug("db connections closed")
}
}()
ennoblementPublisher := adapter.NewEnnoblementWatermillPublisher(
publisher,
newWatermillMarshaler(),
c.String(rmqFlagTopicSyncEnnoblementsCmd.Name),
c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name),
)
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil)
dataSyncSvc := app.NewDataSyncService(versionSvc, serverSvc, nil, ennoblementPublisher)
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
defer stop()
if err = dataSyncSvc.TriggerEnnoblementSync(shutdownSignalCtx); err != nil {
return fmt.Errorf("couldn't trigger ennoblement sync: %w", err)
}
logger.Info("ennoblement sync triggered")
return nil
},
},

View File

@ -41,6 +41,16 @@ var (
Value: "villages.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"},
}
rmqFlagTopicSyncEnnoblementsCmd = &cli.StringFlag{
Name: "rabbitmq.topic.syncEnnoblementsCmd",
Value: "ennoblements.cmd.sync",
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_ENNOBLEMENTS_CMD"},
}
rmqFlagTopicEnnoblementsSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.ennoblementsSyncedEvent",
Value: "ennoblements.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"},
}
rmqFlags = []cli.Flag{
rmqFlagConnectionString,
rmqFlagTopicSyncServersCmd,
@ -48,6 +58,8 @@ var (
rmqFlagTopicTribesSyncedEvent,
rmqFlagTopicPlayersSyncedEvent,
rmqFlagTopicVillagesSyncedEvent,
rmqFlagTopicSyncEnnoblementsCmd,
rmqFlagTopicEnnoblementsSyncedEvent,
}
)

View File

@ -0,0 +1,83 @@
package adapter
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill/message"
)
type EnnoblementWatermillPublisher struct {
marshaler watermillmsg.Marshaler
publisher message.Publisher
cmdSyncTopic string
eventSyncedTopic string
}
func NewEnnoblementWatermillPublisher(
publisher message.Publisher,
marshaler watermillmsg.Marshaler,
cmdSyncTopic string,
eventSyncedTopic string,
) *EnnoblementWatermillPublisher {
return &EnnoblementWatermillPublisher{
publisher: publisher,
marshaler: marshaler,
cmdSyncTopic: cmdSyncTopic,
eventSyncedTopic: eventSyncedTopic,
}
}
func (pub *EnnoblementWatermillPublisher) CmdSync(
ctx context.Context,
payloads ...domain.SyncEnnoblementsCmdPayload,
) error {
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SyncEnnoblementsCmdPayload{
ServerKey: p.ServerKey(),
ServerURL: p.ServerURL(),
VersionCode: p.VersionCode(),
})
if err != nil {
return fmt.Errorf("%s: couldn't marshal SyncEnnoblementsCmdPayload: %w", p.ServerKey(), err)
}
msgs = append(msgs, msg)
}
if err := pub.publisher.Publish(pub.cmdSyncTopic, msgs...); err != nil {
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, err)
}
return nil
}
func (pub *EnnoblementWatermillPublisher) EventSynced(
ctx context.Context,
payloads ...domain.EnnoblementsSyncedEventPayload,
) error {
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.EnnoblementsSyncedEventPayload{
ServerKey: p.ServerKey(),
ServerURL: p.ServerURL(),
VersionCode: p.VersionCode(),
})
if err != nil {
return fmt.Errorf("%s: couldn't marshal EnnoblementsSyncedEventPayload: %w", p.ServerKey(), err)
}
msgs = append(msgs, msg)
}
if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil {
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err)
}
return nil
}

View File

@ -22,3 +22,8 @@ type PlayerPublisher interface {
type VillagePublisher interface {
EventSynced(ctx context.Context, payloads ...domain.VillagesSyncedEventPayload) error
}
type EnnoblementPublisher interface {
CmdSync(ctx context.Context, payloads ...domain.SyncEnnoblementsCmdPayload) error
EventSynced(ctx context.Context, payloads ...domain.EnnoblementsSyncedEventPayload) error
}

View File

@ -8,12 +8,24 @@ import (
)
type DataSyncService struct {
versionSvc *VersionService
serverPub ServerPublisher
versionSvc *VersionService
serverSvc *ServerService
serverPub ServerPublisher
ennoblementPub EnnoblementPublisher
}
func NewDataSyncService(versionSvc *VersionService, serverPub ServerPublisher) *DataSyncService {
return &DataSyncService{versionSvc: versionSvc, serverPub: serverPub}
func NewDataSyncService(
versionSvc *VersionService,
serverSvc *ServerService,
serverPub ServerPublisher,
ennoblementPub EnnoblementPublisher,
) *DataSyncService {
return &DataSyncService{
versionSvc: versionSvc,
serverSvc: serverSvc,
serverPub: serverPub,
ennoblementPub: ennoblementPub,
}
}
func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
@ -36,6 +48,37 @@ func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
return svc.serverPub.CmdSync(ctx, payloads...)
}
func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error {
panic("unimplemented")
func (svc *DataSyncService) TriggerEnnoblementSync(ctx context.Context) error {
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
if err != nil {
return err
}
for _, v := range versions {
if err = svc.triggerEnnoblementSyncForVersion(ctx, v); err != nil {
return fmt.Errorf("%s: %w", v.Code(), err)
}
}
return nil
}
func (svc *DataSyncService) triggerEnnoblementSyncForVersion(ctx context.Context, v domain.Version) error {
servers, err := svc.serverSvc.ListAllOpen(ctx, v.Code())
if err != nil {
return err
}
payloads := make([]domain.SyncEnnoblementsCmdPayload, 0, len(servers))
for _, s := range servers {
p, payloadErr := domain.NewSyncEnnoblementsCmdPayload(s.Key(), s.URL(), s.VersionCode())
if payloadErr != nil {
return fmt.Errorf("%s: couldn't construct domain.SyncEnnoblementsCmdPayload: %w", s.Key(), payloadErr)
}
payloads = append(payloads, p)
}
return svc.ennoblementPub.CmdSync(ctx, payloads...)
}

View File

@ -37,7 +37,7 @@ func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCm
return fmt.Errorf("%s: couldn't list special servers: %w", versionCode, err)
}
currentlyStoredOpenServers, err := svc.listAllOpen(ctx, versionCode)
currentlyStoredOpenServers, err := svc.ListAllOpen(ctx, versionCode)
if err != nil {
return fmt.Errorf("%s: couldn't list open servers: %w", versionCode, err)
}
@ -84,7 +84,7 @@ func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string
return svc.ListAll(ctx, params)
}
func (svc *ServerService) listAllOpen(ctx context.Context, versionCode string) (domain.Servers, error) {
func (svc *ServerService) ListAllOpen(ctx context.Context, versionCode string) (domain.Servers, error) {
params := domain.NewListServersParams()
if err := params.SetVersionCodes([]string{versionCode}); err != nil {
return nil, err

View File

@ -0,0 +1,117 @@
package domain
import (
"net/url"
)
type SyncEnnoblementsCmdPayload struct {
serverKey string
serverURL *url.URL
versionCode string
}
const syncEnnoblementsCmdPayloadModelName = "SyncEnnoblementsCmdPayload"
func NewSyncEnnoblementsCmdPayload(
serverKey string,
serverURL *url.URL,
versionCode string,
) (SyncEnnoblementsCmdPayload, error) {
if serverKey == "" {
return SyncEnnoblementsCmdPayload{}, ValidationError{
Model: syncEnnoblementsCmdPayloadModelName,
Field: "serverKey",
Err: ErrRequired,
}
}
if serverURL == nil {
return SyncEnnoblementsCmdPayload{}, ValidationError{
Model: syncEnnoblementsCmdPayloadModelName,
Field: "serverURL",
Err: ErrNil,
}
}
if versionCode == "" {
return SyncEnnoblementsCmdPayload{}, ValidationError{
Model: syncEnnoblementsCmdPayloadModelName,
Field: "versionCode",
Err: ErrRequired,
}
}
return SyncEnnoblementsCmdPayload{
serverKey: serverKey,
serverURL: serverURL,
versionCode: versionCode,
}, nil
}
func (p SyncEnnoblementsCmdPayload) ServerKey() string {
return p.serverKey
}
func (p SyncEnnoblementsCmdPayload) ServerURL() *url.URL {
return p.serverURL
}
func (p SyncEnnoblementsCmdPayload) VersionCode() string {
return p.versionCode
}
type EnnoblementsSyncedEventPayload struct {
serverKey string
serverURL *url.URL
versionCode string
}
const ennoblementsSyncedEventPayloadModelName = "EnnoblementsSyncedEventPayload"
func NewEnnoblementsSyncedEventPayload(
serverKey string,
serverURL *url.URL,
versionCode string,
) (EnnoblementsSyncedEventPayload, error) {
if serverKey == "" {
return EnnoblementsSyncedEventPayload{}, ValidationError{
Model: ennoblementsSyncedEventPayloadModelName,
Field: "serverKey",
Err: ErrRequired,
}
}
if serverURL == nil {
return EnnoblementsSyncedEventPayload{}, ValidationError{
Model: ennoblementsSyncedEventPayloadModelName,
Field: "serverURL",
Err: ErrNil,
}
}
if versionCode == "" {
return EnnoblementsSyncedEventPayload{}, ValidationError{
Model: ennoblementsSyncedEventPayloadModelName,
Field: "versionCode",
Err: ErrRequired,
}
}
return EnnoblementsSyncedEventPayload{
serverKey: serverKey,
serverURL: serverURL,
versionCode: versionCode,
}, nil
}
func (p EnnoblementsSyncedEventPayload) ServerKey() string {
return p.serverKey
}
func (p EnnoblementsSyncedEventPayload) ServerURL() *url.URL {
return p.serverURL
}
func (p EnnoblementsSyncedEventPayload) VersionCode() string {
return p.versionCode
}

View File

@ -0,0 +1,42 @@
package domain_test
import (
"testing"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewSyncEnnoblementsCmdPayload(t *testing.T) {
t.Parallel()
server := domaintest.NewServer(t)
payload, err := domain.NewSyncEnnoblementsCmdPayload(
server.Key(),
server.URL(),
server.VersionCode(),
)
require.NoError(t, err)
assert.Equal(t, server.Key(), payload.ServerKey())
assert.Equal(t, server.URL(), payload.ServerURL())
assert.Equal(t, server.VersionCode(), payload.VersionCode())
}
func TestNewEnnoblementsSyncedEventPayload(t *testing.T) {
t.Parallel()
server := domaintest.NewServer(t)
payload, err := domain.NewEnnoblementsSyncedEventPayload(
server.Key(),
server.URL(),
server.VersionCode(),
)
require.NoError(t, err)
assert.Equal(t, server.Key(), payload.ServerKey())
assert.Equal(t, server.URL(), payload.ServerURL())
assert.Equal(t, server.VersionCode(), payload.VersionCode())
}

View File

@ -0,0 +1,15 @@
package watermillmsg
import "net/url"
type SyncEnnoblementsCmdPayload struct {
ServerKey string `json:"serverKey"`
ServerURL *url.URL `json:"serverUrl"`
VersionCode string `json:"versionCode"`
}
type EnnoblementsSyncedEventPayload struct {
ServerKey string `json:"serverKey"`
ServerURL *url.URL `json:"serverUrl"`
VersionCode string `json:"versionCode"`
}

View File

@ -45,6 +45,52 @@ spec:
memory: 128Mi
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: twhelp-job-sync-ennoblements
spec:
schedule: "* * * * *"
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
concurrencyPolicy: Forbid
jobTemplate:
spec:
parallelism: 1
template:
spec:
restartPolicy: Never
containers:
- name: twhelp-job-sync-ennoblements
image: twhelp
args: [job, sync, ennoblements]
env:
- name: APP_MODE
value: development
- name: LOG_LEVEL
value: debug
- name: DB_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: twhelp-secret
key: db-connection-string
- name: DB_MAX_OPEN_CONNS
value: "1"
- name: DB_MAX_IDLE_CONNS
value: "1"
- name: RABBITMQ_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: twhelp-secret
key: rabbitmq-connection-string
resources:
requests:
cpu: 50m
memory: 64Mi
limits:
cpu: 100m
memory: 128Mi
---
apiVersion: batch/v1
kind: Job
metadata:
name: twhelp-migrations-job

View File

@ -4,3 +4,10 @@ metadata:
name: twhelp-job-sync-data
spec:
schedule: 0 0 * * *
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: twhelp-job-sync-ennoblements
spec:
schedule: 0 0 * * *