package main

import (
	"fmt"
	"log/slog"

	"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
	"gitea.dwysokinski.me/twhelp/corev3/internal/app"
	"github.com/urfave/cli/v2"
)

var (
	// cmdJob is the "job" CLI command. It groups one-shot subcommands that
	// trigger background work:
	//
	//	job sync data          - trigger data sync
	//	job sync ennoblements  - trigger ennoblement sync
	//	job create snapshots   - trigger snapshot creation
	//
	// Every action follows the same shape: open an AMQP connection and a
	// publisher on top of it, open the database, wire the services, call the
	// service method with a context produced by newShutdownSignalContext and
	// return. Resources are released by deferred functions, which run in
	// reverse order of registration: database first, then the publisher, then
	// the AMQP connection. Failures while closing are only logged as warnings
	// and never change the action's result.
	cmdJob = &cli.Command{
		Name:  "job",
		Usage: "Run various jobs (e.g. data sync)",
		Subcommands: []*cli.Command{
			{
				Name:  "sync",
				Usage: "Trigger various data syncs",
				Subcommands: []*cli.Command{
					{
						Name:  "data",
						Usage: "Trigger data sync (servers, players, tribes, villages)",
						Flags: concatSlices(dbFlags, rmqFlags),
						Action: func(c *cli.Context) error {
							logger := loggerFromCtx(c.Context)
							watermillLogger := newWatermillLogger(logger)

							amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
							if err != nil {
								return err
							}
							defer func() {
								// Log closeErr, not the outer err: by the time this runs the
								// captured err holds an unrelated (usually nil) value.
								if closeErr := amqpConn.Close(); closeErr != nil {
									logger.Warn("couldn't close amqp connection", slog.Any("error", closeErr))
								}
							}()

							publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
							if err != nil {
								return err
							}
							defer func() {
								if closeErr := publisher.Close(); closeErr != nil {
									logger.Warn("couldn't close amqp publisher", slog.Any("error", closeErr))
								}
							}()

							bunDB, err := newBunDBFromFlags(c)
							if err != nil {
								return err
							}
							defer func() {
								logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
								if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
									logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
								} else {
									logger.Debug("db connections closed")
								}
							}()

							serverPublisher := adapter.NewServerWatermillPublisher(
								publisher,
								newWatermillMarshaler(),
								c.String(rmqFlagTopicSyncServersCmd.Name),
								c.String(rmqFlagTopicServerSyncedEvent.Name),
							)

							versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
							// NOTE(review): the nil arguments are presumably dependencies that
							// Sync never touches - confirm against app.NewDataSyncService.
							dataSyncSvc := app.NewDataSyncService(versionSvc, nil, serverPublisher, nil)

							shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
							defer stop()

							if err = dataSyncSvc.Sync(shutdownSignalCtx); err != nil {
								return fmt.Errorf("couldn't trigger data sync: %w", err)
							}

							logger.Info("data sync triggered")

							return nil
						},
					},
					{
						Name:  "ennoblements",
						Usage: "Trigger ennoblement sync",
						Flags: concatSlices(dbFlags, rmqFlags),
						Action: func(c *cli.Context) error {
							logger := loggerFromCtx(c.Context)
							watermillLogger := newWatermillLogger(logger)

							amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
							if err != nil {
								return err
							}
							defer func() {
								// Log closeErr, not the outer err: by the time this runs the
								// captured err holds an unrelated (usually nil) value.
								if closeErr := amqpConn.Close(); closeErr != nil {
									logger.Warn("couldn't close amqp connection", slog.Any("error", closeErr))
								}
							}()

							publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
							if err != nil {
								return err
							}
							defer func() {
								if closeErr := publisher.Close(); closeErr != nil {
									logger.Warn("couldn't close amqp publisher", slog.Any("error", closeErr))
								}
							}()

							bunDB, err := newBunDBFromFlags(c)
							if err != nil {
								return err
							}
							defer func() {
								logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
								if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
									logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
								} else {
									logger.Debug("db connections closed")
								}
							}()

							ennoblementPublisher := adapter.NewEnnoblementWatermillPublisher(
								publisher,
								newWatermillMarshaler(),
								c.String(rmqFlagTopicSyncEnnoblementsCmd.Name),
								c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name),
							)

							versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
							// NOTE(review): nil arguments are presumably dependencies unused on
							// this code path - confirm against the app constructors.
							serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil)
							dataSyncSvc := app.NewDataSyncService(versionSvc, serverSvc, nil, ennoblementPublisher)

							shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
							defer stop()

							if err = dataSyncSvc.SyncEnnoblements(shutdownSignalCtx); err != nil {
								return fmt.Errorf("couldn't trigger ennoblement sync: %w", err)
							}

							logger.Info("ennoblement sync triggered")

							return nil
						},
					},
				},
			},
			{
				Name: "create",
				Subcommands: []*cli.Command{
					{
						Name:  "snapshots",
						Usage: "Trigger snapshot creation (players/tribes)",
						Flags: concatSlices(dbFlags, rmqFlags),
						Action: func(c *cli.Context) error {
							logger := loggerFromCtx(c.Context)
							watermillLogger := newWatermillLogger(logger)

							amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
							if err != nil {
								return err
							}
							defer func() {
								// Log closeErr, not the outer err: by the time this runs the
								// captured err holds an unrelated (usually nil) value.
								if closeErr := amqpConn.Close(); closeErr != nil {
									logger.Warn("couldn't close amqp connection", slog.Any("error", closeErr))
								}
							}()

							publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
							if err != nil {
								return err
							}
							defer func() {
								if closeErr := publisher.Close(); closeErr != nil {
									logger.Warn("couldn't close amqp publisher", slog.Any("error", closeErr))
								}
							}()

							bunDB, err := newBunDBFromFlags(c)
							if err != nil {
								return err
							}
							defer func() {
								logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
								if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
									logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
								} else {
									logger.Debug("db connections closed")
								}
							}()

							// Both snapshot publishers share the same underlying AMQP
							// publisher and differ only in the topic names read from flags.
							tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
								publisher,
								newWatermillMarshaler(),
								c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name),
								c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
							)
							playerSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
								publisher,
								newWatermillMarshaler(),
								c.String(rmqFlagTopicCreatePlayerSnapshotsCmd.Name),
								c.String(rmqFlagTopicPlayerSnapshotsCreatedEvent.Name),
							)

							versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
							// NOTE(review): nil arguments are presumably dependencies unused on
							// this code path - confirm against app.NewServerService.
							serverSvc := app.NewServerService(adapter.NewServerBunRepository(bunDB), nil, nil)
							snapshotSvc := app.NewSnapshotService(versionSvc, serverSvc, tribeSnapshotPublisher, playerSnapshotPublisher)

							shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
							defer stop()

							if err = snapshotSvc.Create(shutdownSignalCtx); err != nil {
								return fmt.Errorf("couldn't trigger snapshot creation: %w", err)
							}

							logger.Info("snapshot creation triggered")

							return nil
						},
					},
				},
			},
		},
	}
)