package main import ( "context" "fmt" "log/slog" "slices" "sync" "time" "gitea.dwysokinski.me/twhelp/core/internal/adapter" "gitea.dwysokinski.me/twhelp/core/internal/app" "gitea.dwysokinski.me/twhelp/core/internal/health" "gitea.dwysokinski.me/twhelp/core/internal/health/healthfile" "gitea.dwysokinski.me/twhelp/core/internal/port" "gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/ettle/strcase" "github.com/uptrace/bun" "github.com/urfave/cli/v2" ) var cmdConsumer = &cli.Command{ Name: "consumer", Usage: "Run one of the workers", Subcommands: []*cli.Command{ { Name: "server", Usage: "Run the worker responsible for consuming server-related messages", Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags), Action: func(c *cli.Context) error { return runConsumer( c, "ServerConsumer", func( c *cli.Context, router *message.Router, logger watermill.LoggerAdapter, publisher *amqp.Publisher, subscriber *amqp.Subscriber, marshaler watermillmsg.Marshaler, db *bun.DB, ) error { serverPublisher := adapter.NewServerWatermillPublisher( publisher, marshaler, c.String(rmqFlagTopicSyncServersCmd.Name), c.String(rmqFlagTopicServerSyncedEvent.Name), ) serverSnapshotPublisher := adapter.NewSnapshotWatermillPublisher( publisher, marshaler, c.String(rmqFlagTopicCreateServerSnapshotCmd.Name), "", ) twSvc, err := newTWServiceFromFlags(c) if err != nil { return err } serverSvc := app.NewServerService(adapter.NewServerBunRepository(db), twSvc, serverPublisher) serverSnapshotSvc := app.NewServerSnapshotService( adapter.NewServerSnapshotBunRepository(db), serverSvc, serverSnapshotPublisher, ) consumer := port.NewServerWatermillConsumer( serverSvc, serverSnapshotSvc, subscriber, logger, marshaler, c.String(rmqFlagTopicSyncServersCmd.Name), c.String(rmqFlagTopicCreateServerSnapshotCmd.Name), c.String(rmqFlagTopicServerSyncedEvent.Name), c.String(rmqFlagTopicTribesSyncedEvent.Name), c.String(rmqFlagTopicPlayersSyncedEvent.Name), c.String(rmqFlagTopicVillagesSyncedEvent.Name), c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name), c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name), c.String(rmqFlagTopicPlayerSnapshotsCreatedEvent.Name), ) consumer.Register(router) return nil }, ) }, }, { Name: "tribe", Usage: "Run the worker responsible for consuming tribe-related messages", Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags), Action: func(c *cli.Context) error { return runConsumer( c, "TribeConsumer", func( c *cli.Context, router *message.Router, logger watermill.LoggerAdapter, publisher *amqp.Publisher, subscriber *amqp.Subscriber, marshaler watermillmsg.Marshaler, db *bun.DB, ) error { tribePublisher := adapter.NewTribeWatermillPublisher( publisher, marshaler, c.String(rmqFlagTopicTribesSyncedEvent.Name), ) tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher( publisher, newWatermillMarshaler(), c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name), c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name), ) twSvc, err := newTWServiceFromFlags(c) if err != nil { return err } tribeSvc := app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc, tribePublisher) tribeSnapshotSvc := app.NewTribeSnapshotService( adapter.NewTribeSnapshotBunRepository(db), tribeSvc, tribeSnapshotPublisher, ) consumer := port.NewTribeWatermillConsumer( tribeSvc, tribeSnapshotSvc, subscriber, logger, marshaler, c.String(rmqFlagTopicServerSyncedEvent.Name), c.String(rmqFlagTopicVillagesSyncedEvent.Name), c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name), c.String(rmqFlagTopicCleanUpDataCmd.Name), ) consumer.Register(router) return nil }, ) }, }, { Name: "player", Usage: "Run the worker responsible for consuming player-related messages", Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags), Action: func(c *cli.Context) error { return runConsumer( c, "PlayerConsumer", func( c *cli.Context, router *message.Router, logger watermill.LoggerAdapter, publisher *amqp.Publisher, subscriber *amqp.Subscriber, marshaler watermillmsg.Marshaler, db *bun.DB, ) error { playerPublisher := adapter.NewPlayerWatermillPublisher( publisher, marshaler, c.String(rmqFlagTopicPlayersSyncedEvent.Name), ) playerSnapshotPublisher := adapter.NewSnapshotWatermillPublisher( publisher, newWatermillMarshaler(), c.String(rmqFlagTopicCreatePlayerSnapshotsCmd.Name), c.String(rmqFlagTopicPlayerSnapshotsCreatedEvent.Name), ) twSvc, err := newTWServiceFromFlags(c) if err != nil { return err } tribeChangeSvc := app.NewTribeChangeService(adapter.NewTribeChangeBunRepository(db)) playerSvc := app.NewPlayerService( adapter.NewPlayerBunRepository(db), tribeChangeSvc, twSvc, playerPublisher, ) playerSnapshotSvc := app.NewPlayerSnapshotService( adapter.NewPlayerSnapshotBunRepository(db), playerSvc, playerSnapshotPublisher, ) consumer := port.NewPlayerWatermillConsumer( playerSvc, playerSnapshotSvc, subscriber, logger, marshaler, c.String(rmqFlagTopicServerSyncedEvent.Name), c.String(rmqFlagTopicCreatePlayerSnapshotsCmd.Name), c.String(rmqFlagTopicCleanUpDataCmd.Name), ) consumer.Register(router) return nil }, ) }, }, { Name: "village", Usage: "Run the worker responsible for consuming village-related messages", Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags), Action: func(c *cli.Context) error { return runConsumer( c, "VillageConsumer", func( c *cli.Context, router *message.Router, logger watermill.LoggerAdapter, publisher *amqp.Publisher, subscriber *amqp.Subscriber, marshaler watermillmsg.Marshaler, db *bun.DB, ) error { villagePublisher := adapter.NewVillageWatermillPublisher( publisher, marshaler, c.String(rmqFlagTopicVillagesSyncedEvent.Name), ) twSvc, err := newTWServiceFromFlags(c) if err != nil { return err } consumer := port.NewVillageWatermillConsumer( app.NewVillageService(adapter.NewVillageBunRepository(db), twSvc, villagePublisher), subscriber, logger, marshaler, c.String(rmqFlagTopicServerSyncedEvent.Name), ) consumer.Register(router) return nil }, ) }, }, { Name: "ennoblement", Usage: "Run the worker responsible for consuming ennoblement-related messages", Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags), Action: func(c *cli.Context) error { return runConsumer( c, "EnnoblementConsumer", func( c *cli.Context, router *message.Router, logger watermill.LoggerAdapter, publisher *amqp.Publisher, subscriber *amqp.Subscriber, marshaler watermillmsg.Marshaler, db *bun.DB, ) error { ennoblementPublisher := adapter.NewEnnoblementWatermillPublisher( publisher, marshaler, c.String(rmqFlagTopicSyncEnnoblementsCmd.Name), c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name), ) twSvc, err := newTWServiceFromFlags(c) if err != nil { return err } consumer := port.NewEnnoblementWatermillConsumer( app.NewEnnoblementService(adapter.NewEnnoblementBunRepository(db), twSvc, ennoblementPublisher), subscriber, logger, marshaler, c.String(rmqFlagTopicSyncEnnoblementsCmd.Name), c.String(rmqFlagTopicCleanUpDataCmd.Name), ) consumer.Register(router) return nil }, ) }, }, }, } type registerConsumerHandlersFunc func( c *cli.Context, router *message.Router, logger watermill.LoggerAdapter, publisher *amqp.Publisher, subscriber *amqp.Subscriber, marshaler watermillmsg.Marshaler, db *bun.DB, ) error //nolint:gocyclo func runConsumer(c *cli.Context, name string, registerHandlers registerConsumerHandlersFunc) error { logger := loggerFromCtx(c.Context) watermillLogger := newWatermillLogger(logger) amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger) if err != nil { return err } defer func() { if closeErr := amqpConn.Close(); closeErr != nil { logger.Warn("couldn't close amqp connection", slog.Any("error", err)) } }() publisher, err := newAMQPPublisher(amqpConn, watermillLogger) if err != nil { return err } defer func() { if closeErr := publisher.Close(); closeErr != nil { logger.Warn("couldn't close amqp publisher", slog.Any("error", err)) } }() subscriber, err := newAMQPSubscriber(amqpConn, watermillLogger, name) if err != nil { return err } defer func() { if closeErr := subscriber.Close(); closeErr != nil { logger.Warn("couldn't close amqp subscriber", slog.Any("error", err)) } }() bunDB, err := newBunDBFromFlags(c, strcase.ToSnake(name)) if err != nil { return err } defer closeBunDB(bunDB, logger) router, err := newWatermillRouter(watermillLogger) if err != nil { return err } if err = registerHandlers( c, router, watermillLogger, publisher, subscriber, newWatermillMarshaler(), bunDB, ); err != nil { return err } ctx, cancel := context.WithCancel(c.Context) defer cancel() var wg sync.WaitGroup defer func() { // it's required for the graceful shutdown wg.Wait() }() healthObserver := healthfile.LiveObserver(health.New(), "/tmp/live") defer func() { if closeErr := healthObserver.Close(); closeErr != nil { logger.Warn("couldn't close health observer", slog.Any("error", closeErr)) } }() wg.Add(1) go func() { defer wg.Done() if runErr := healthObserver.Run(ctx); runErr != nil { logger.Warn("couldn't run health observer", slog.Any("error", runErr)) } }() wg.Add(1) go func() { defer wg.Done() select { case <-ctx.Done(): return case <-router.Running(): logger.Info("consumer is up and running", slog.String("name", name)) } waitForShutdownSignal(ctx, logger) if closeErr := router.Close(); closeErr != nil { logger.Warn("couldn't close router", slog.Any("error", err)) } }() if err = router.Run(ctx); err != nil { cancel() return fmt.Errorf("couldn't run router: %w", err) } return nil } const ( watermillRouterCloseTimeout = 30 * time.Second watermillRetryMWMaxRetries = 3 watermillRetryMWMInitialInterval = 3 * time.Second watermillRetryMWMMaxInterval = 10 * time.Second watermillRetryMWMIntervalMultiplier = 1.5 watermillTimeout = 2 * time.Minute ) func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) { router, err := message.NewRouter(message.RouterConfig{ CloseTimeout: watermillRouterCloseTimeout, }, logger) if err != nil { return nil, err } router.AddMiddleware(middleware.Recoverer) router.AddMiddleware(middleware.Timeout(watermillTimeout)) router.AddMiddleware(middleware.Retry{ MaxRetries: watermillRetryMWMaxRetries, InitialInterval: watermillRetryMWMInitialInterval, MaxInterval: watermillRetryMWMMaxInterval, Multiplier: watermillRetryMWMIntervalMultiplier, Logger: logger, }.Middleware) return router, nil }