264 lines
7.7 KiB
Go
264 lines
7.7 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log/slog"
|
|
"slices"
|
|
|
|
"gitea.dwysokinski.me/twhelp/core/internal/adapter"
|
|
"gitea.dwysokinski.me/twhelp/core/internal/app"
|
|
"github.com/urfave/cli/v2"
|
|
)
|
|
|
|
var (
|
|
cmdJob = &cli.Command{
|
|
Name: "job",
|
|
Usage: "Run various jobs (e.g. data sync)",
|
|
Subcommands: []*cli.Command{
|
|
{
|
|
Name: "sync",
|
|
Usage: "Trigger various data syncs",
|
|
Subcommands: []*cli.Command{
|
|
{
|
|
Name: "data",
|
|
Usage: "Trigger data sync (servers, players, tribes, villages)",
|
|
Flags: slices.Concat(dbFlags, rmqFlags),
|
|
Action: func(c *cli.Context) error {
|
|
logger := loggerFromCtx(c.Context)
|
|
watermillLogger := newWatermillLogger(logger)
|
|
|
|
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := amqpConn.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := publisher.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
bunDB, err := newBunDBFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer closeBunDB(bunDB, logger)
|
|
|
|
serverPublisher := adapter.NewServerWatermillPublisher(
|
|
publisher,
|
|
newWatermillMarshaler(),
|
|
c.String(rmqFlagTopicSyncServersCmd.Name),
|
|
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
|
)
|
|
|
|
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
|
dataSyncSvc := app.NewDataSyncService(versionSvc, nil, serverPublisher, nil)
|
|
|
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
|
defer stop()
|
|
|
|
if err = dataSyncSvc.Sync(shutdownSignalCtx); err != nil {
|
|
return fmt.Errorf("couldn't trigger data sync: %w", err)
|
|
}
|
|
|
|
logger.Info("data sync triggered")
|
|
|
|
return nil
|
|
},
|
|
},
|
|
{
|
|
Name: "ennoblements",
|
|
Usage: "Trigger ennoblement sync",
|
|
Flags: slices.Concat(dbFlags, rmqFlags),
|
|
Action: func(c *cli.Context) error {
|
|
logger := loggerFromCtx(c.Context)
|
|
watermillLogger := newWatermillLogger(logger)
|
|
|
|
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := amqpConn.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := publisher.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
bunDB, err := newBunDBFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer closeBunDB(bunDB, logger)
|
|
|
|
ennoblementPublisher := adapter.NewEnnoblementWatermillPublisher(
|
|
publisher,
|
|
newWatermillMarshaler(),
|
|
c.String(rmqFlagTopicSyncEnnoblementsCmd.Name),
|
|
c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name),
|
|
)
|
|
|
|
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
|
serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil)
|
|
dataSyncSvc := app.NewDataSyncService(versionSvc, serverSvc, nil, ennoblementPublisher)
|
|
|
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
|
defer stop()
|
|
|
|
if err = dataSyncSvc.SyncEnnoblements(shutdownSignalCtx); err != nil {
|
|
return fmt.Errorf("couldn't trigger ennoblement sync: %w", err)
|
|
}
|
|
|
|
logger.Info("ennoblement sync triggered")
|
|
|
|
return nil
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "create",
|
|
Subcommands: []*cli.Command{
|
|
{
|
|
Name: "snapshots",
|
|
Usage: "Trigger snapshot creation (players/tribes)",
|
|
Flags: slices.Concat(dbFlags, rmqFlags),
|
|
Action: func(c *cli.Context) error {
|
|
logger := loggerFromCtx(c.Context)
|
|
watermillLogger := newWatermillLogger(logger)
|
|
|
|
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := amqpConn.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := publisher.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
bunDB, err := newBunDBFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer closeBunDB(bunDB, logger)
|
|
|
|
tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
|
|
publisher,
|
|
newWatermillMarshaler(),
|
|
c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name),
|
|
c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
|
|
)
|
|
|
|
playerSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
|
|
publisher,
|
|
newWatermillMarshaler(),
|
|
c.String(rmqFlagTopicCreatePlayerSnapshotsCmd.Name),
|
|
c.String(rmqFlagTopicPlayerSnapshotsCreatedEvent.Name),
|
|
)
|
|
|
|
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
|
serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil)
|
|
snapshotSvc := app.NewSnapshotService(versionSvc, serverSvc, tribeSnapshotPublisher, playerSnapshotPublisher)
|
|
|
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
|
defer stop()
|
|
|
|
if err = snapshotSvc.Create(shutdownSignalCtx); err != nil {
|
|
return fmt.Errorf("couldn't trigger snapshot creation: %w", err)
|
|
}
|
|
|
|
logger.Info("snapshot creation triggered")
|
|
|
|
return nil
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "cleanup",
|
|
Description: "Trigger data cleanup",
|
|
Flags: slices.Concat(dbFlags, rmqFlags),
|
|
Action: func(c *cli.Context) error {
|
|
logger := loggerFromCtx(c.Context)
|
|
watermillLogger := newWatermillLogger(logger)
|
|
|
|
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := amqpConn.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := publisher.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
bunDB, err := newBunDBFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer closeBunDB(bunDB, logger)
|
|
|
|
dataCleanupPublisher := adapter.NewDataCleanupPublisher(
|
|
publisher,
|
|
newWatermillMarshaler(),
|
|
c.String(rmqFlagTopicCleanUpDataCmd.Name),
|
|
)
|
|
|
|
serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil)
|
|
dataCleanupSvc := app.NewDataCleanupService(serverSvc, dataCleanupPublisher)
|
|
|
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
|
defer stop()
|
|
|
|
if err = dataCleanupSvc.CleanUp(shutdownSignalCtx); err != nil {
|
|
return fmt.Errorf("couldn't trigger data cleanup: %w", err)
|
|
}
|
|
|
|
logger.Info("data cleanup triggered")
|
|
|
|
return nil
|
|
},
|
|
},
|
|
},
|
|
}
|
|
)
|