core/cmd/twhelp/cmd_consumer.go

438 lines
11 KiB
Go

package main
import (
"context"
"fmt"
"log/slog"
"slices"
"sync"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/adapter"
"gitea.dwysokinski.me/twhelp/core/internal/app"
"gitea.dwysokinski.me/twhelp/core/internal/health"
"gitea.dwysokinski.me/twhelp/core/internal/health/healthfile"
"gitea.dwysokinski.me/twhelp/core/internal/port"
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/uptrace/bun"
"github.com/urfave/cli/v2"
)
// cmdConsumer is the "consumer" CLI command. Every subcommand starts one
// worker by calling runConsumer with a worker-specific name and a callback
// that builds the worker's publishers and services and registers its
// handlers on the router.
var cmdConsumer = &cli.Command{
	Name:  "consumer",
	Usage: "Run one of the workers",
	Subcommands: []*cli.Command{
		{
			Name:  "server",
			Usage: "Run the worker responsible for consuming server-related messages",
			Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
			Action: func(c *cli.Context) error {
				// register wires the server service and its consumer.
				register := func(
					cliCtx *cli.Context,
					r *message.Router,
					wmLogger watermill.LoggerAdapter,
					pub *amqp.Publisher,
					sub *amqp.Subscriber,
					m watermillmsg.Marshaler,
					database *bun.DB,
				) error {
					// Both topics are shared by the publisher and the consumer.
					syncServersTopic := cliCtx.String(rmqFlagTopicSyncServersCmd.Name)
					serverSyncedTopic := cliCtx.String(rmqFlagTopicServerSyncedEvent.Name)

					serverEvents := adapter.NewServerWatermillPublisher(
						pub,
						m,
						syncServersTopic,
						serverSyncedTopic,
					)

					tw, err := newTWServiceFromFlags(cliCtx)
					if err != nil {
						return err
					}

					servers := app.NewServerService(adapter.NewServerBunRepository(database), tw, serverEvents)

					worker := port.NewServerWatermillConsumer(
						servers,
						sub,
						wmLogger,
						m,
						syncServersTopic,
						serverSyncedTopic,
						cliCtx.String(rmqFlagTopicTribesSyncedEvent.Name),
						cliCtx.String(rmqFlagTopicPlayersSyncedEvent.Name),
						cliCtx.String(rmqFlagTopicVillagesSyncedEvent.Name),
						cliCtx.String(rmqFlagTopicEnnoblementsSyncedEvent.Name),
						cliCtx.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
						cliCtx.String(rmqFlagTopicPlayerSnapshotsCreatedEvent.Name),
					)
					worker.Register(r)

					return nil
				}

				return runConsumer(c, "ServerConsumer", register)
			},
		},
		{
			Name:  "tribe",
			Usage: "Run the worker responsible for consuming tribe-related messages",
			Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
			Action: func(c *cli.Context) error {
				// register wires the tribe and tribe snapshot services and their consumer.
				register := func(
					cliCtx *cli.Context,
					r *message.Router,
					wmLogger watermill.LoggerAdapter,
					pub *amqp.Publisher,
					sub *amqp.Subscriber,
					m watermillmsg.Marshaler,
					database *bun.DB,
				) error {
					// Used by both the snapshot publisher and the consumer.
					createSnapshotsTopic := cliCtx.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name)

					tribeEvents := adapter.NewTribeWatermillPublisher(
						pub,
						m,
						cliCtx.String(rmqFlagTopicTribesSyncedEvent.Name),
					)
					// The snapshot publisher is given a freshly created marshaler
					// instead of the one passed to this callback.
					snapshotEvents := adapter.NewSnapshotWatermillPublisher(
						pub,
						newWatermillMarshaler(),
						createSnapshotsTopic,
						cliCtx.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
					)

					tw, err := newTWServiceFromFlags(cliCtx)
					if err != nil {
						return err
					}

					tribes := app.NewTribeService(adapter.NewTribeBunRepository(database), tw, tribeEvents)
					snapshots := app.NewTribeSnapshotService(
						adapter.NewTribeSnapshotBunRepository(database),
						tribes,
						snapshotEvents,
					)

					worker := port.NewTribeWatermillConsumer(
						tribes,
						snapshots,
						sub,
						wmLogger,
						m,
						cliCtx.String(rmqFlagTopicServerSyncedEvent.Name),
						cliCtx.String(rmqFlagTopicVillagesSyncedEvent.Name),
						createSnapshotsTopic,
						cliCtx.String(rmqFlagTopicCleanUpDataCmd.Name),
					)
					worker.Register(r)

					return nil
				}

				return runConsumer(c, "TribeConsumer", register)
			},
		},
		{
			Name:  "player",
			Usage: "Run the worker responsible for consuming player-related messages",
			Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
			Action: func(c *cli.Context) error {
				// register wires the player, tribe change and player snapshot
				// services and their consumer.
				register := func(
					cliCtx *cli.Context,
					r *message.Router,
					wmLogger watermill.LoggerAdapter,
					pub *amqp.Publisher,
					sub *amqp.Subscriber,
					m watermillmsg.Marshaler,
					database *bun.DB,
				) error {
					// Used by both the snapshot publisher and the consumer.
					createSnapshotsTopic := cliCtx.String(rmqFlagTopicCreatePlayerSnapshotsCmd.Name)

					playerEvents := adapter.NewPlayerWatermillPublisher(
						pub,
						m,
						cliCtx.String(rmqFlagTopicPlayersSyncedEvent.Name),
					)
					// The snapshot publisher is given a freshly created marshaler
					// instead of the one passed to this callback.
					snapshotEvents := adapter.NewSnapshotWatermillPublisher(
						pub,
						newWatermillMarshaler(),
						createSnapshotsTopic,
						cliCtx.String(rmqFlagTopicPlayerSnapshotsCreatedEvent.Name),
					)

					tw, err := newTWServiceFromFlags(cliCtx)
					if err != nil {
						return err
					}

					tribeChanges := app.NewTribeChangeService(adapter.NewTribeChangeBunRepository(database))
					players := app.NewPlayerService(
						adapter.NewPlayerBunRepository(database),
						tribeChanges,
						tw,
						playerEvents,
					)
					snapshots := app.NewPlayerSnapshotService(
						adapter.NewPlayerSnapshotBunRepository(database),
						players,
						snapshotEvents,
					)

					worker := port.NewPlayerWatermillConsumer(
						players,
						snapshots,
						sub,
						wmLogger,
						m,
						cliCtx.String(rmqFlagTopicServerSyncedEvent.Name),
						createSnapshotsTopic,
						cliCtx.String(rmqFlagTopicCleanUpDataCmd.Name),
					)
					worker.Register(r)

					return nil
				}

				return runConsumer(c, "PlayerConsumer", register)
			},
		},
		{
			Name:  "village",
			Usage: "Run the worker responsible for consuming village-related messages",
			Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
			Action: func(c *cli.Context) error {
				// register wires the village service and its consumer.
				register := func(
					cliCtx *cli.Context,
					r *message.Router,
					wmLogger watermill.LoggerAdapter,
					pub *amqp.Publisher,
					sub *amqp.Subscriber,
					m watermillmsg.Marshaler,
					database *bun.DB,
				) error {
					villageEvents := adapter.NewVillageWatermillPublisher(
						pub,
						m,
						cliCtx.String(rmqFlagTopicVillagesSyncedEvent.Name),
					)

					tw, err := newTWServiceFromFlags(cliCtx)
					if err != nil {
						return err
					}

					villages := app.NewVillageService(adapter.NewVillageBunRepository(database), tw, villageEvents)

					worker := port.NewVillageWatermillConsumer(
						villages,
						sub,
						wmLogger,
						m,
						cliCtx.String(rmqFlagTopicServerSyncedEvent.Name),
					)
					worker.Register(r)

					return nil
				}

				return runConsumer(c, "VillageConsumer", register)
			},
		},
		{
			Name:  "ennoblement",
			Usage: "Run the worker responsible for consuming ennoblement-related messages",
			Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
			Action: func(c *cli.Context) error {
				// register wires the ennoblement service and its consumer.
				register := func(
					cliCtx *cli.Context,
					r *message.Router,
					wmLogger watermill.LoggerAdapter,
					pub *amqp.Publisher,
					sub *amqp.Subscriber,
					m watermillmsg.Marshaler,
					database *bun.DB,
				) error {
					// Used by both the publisher and the consumer.
					syncEnnoblementsTopic := cliCtx.String(rmqFlagTopicSyncEnnoblementsCmd.Name)

					ennoblementEvents := adapter.NewEnnoblementWatermillPublisher(
						pub,
						m,
						syncEnnoblementsTopic,
						cliCtx.String(rmqFlagTopicEnnoblementsSyncedEvent.Name),
					)

					tw, err := newTWServiceFromFlags(cliCtx)
					if err != nil {
						return err
					}

					ennoblements := app.NewEnnoblementService(
						adapter.NewEnnoblementBunRepository(database),
						tw,
						ennoblementEvents,
					)

					worker := port.NewEnnoblementWatermillConsumer(
						ennoblements,
						sub,
						wmLogger,
						m,
						syncEnnoblementsTopic,
						cliCtx.String(rmqFlagTopicCleanUpDataCmd.Name),
					)
					worker.Register(r)

					return nil
				}

				return runConsumer(c, "EnnoblementConsumer", register)
			},
		},
	},
}
// registerConsumerHandlersFunc is the callback runConsumer invokes once the
// router, publisher, subscriber, marshaler and database handle have been
// created. Implementations attach their message handlers to router; a non-nil
// error makes runConsumer return that error without running the router.
type registerConsumerHandlersFunc func(
c *cli.Context,
router *message.Router,
logger watermill.LoggerAdapter,
publisher *amqp.Publisher,
subscriber *amqp.Subscriber,
marshaler watermillmsg.Marshaler,
db *bun.DB,
) error
// runConsumer creates the AMQP connection, publisher and subscriber, the
// database handle and the Watermill router, lets registerHandlers attach the
// message handlers, and then blocks in router.Run until the router stops.
//
// name is passed to newAMQPSubscriber and is included in the startup log
// entry. Besides the router, two goroutines are started: one runs the health
// observer, the other waits for the router to report that it is running, then
// waits for a shutdown signal and closes the router.
//
// Any setup error is returned as is; an error from router.Run is wrapped.
// Failures while closing resources are only logged as warnings.
//
//nolint:gocyclo
func runConsumer(c *cli.Context, name string, registerHandlers registerConsumerHandlersFunc) error {
	logger := loggerFromCtx(c.Context)
	watermillLogger := newWatermillLogger(logger)

	amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := amqpConn.Close(); closeErr != nil {
			// Log closeErr: the outer err is unrelated to this failure.
			logger.Warn("couldn't close amqp connection", slog.Any("error", closeErr))
		}
	}()

	publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := publisher.Close(); closeErr != nil {
			logger.Warn("couldn't close amqp publisher", slog.Any("error", closeErr))
		}
	}()

	subscriber, err := newAMQPSubscriber(amqpConn, watermillLogger, name)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := subscriber.Close(); closeErr != nil {
			logger.Warn("couldn't close amqp subscriber", slog.Any("error", closeErr))
		}
	}()

	bunDB, err := newBunDBFromFlags(c)
	if err != nil {
		return err
	}
	defer closeBunDB(bunDB, logger)

	router, err := newWatermillRouter(watermillLogger)
	if err != nil {
		return err
	}

	if err = registerHandlers(
		c,
		router,
		watermillLogger,
		publisher,
		subscriber,
		newWatermillMarshaler(),
		bunDB,
	); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(c.Context)
	defer cancel()

	var wg sync.WaitGroup
	defer func() {
		// it's required for the graceful shutdown
		wg.Wait()
	}()

	healthObserver := healthfile.LiveObserver(health.New(), "/tmp/live")
	// Deferred calls run in reverse order, so the observer is closed before
	// wg.Wait() above is reached.
	defer func() {
		if closeErr := healthObserver.Close(); closeErr != nil {
			logger.Warn("couldn't close health observer", slog.Any("error", closeErr))
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		if runErr := healthObserver.Run(ctx); runErr != nil {
			logger.Warn("couldn't run health observer", slog.Any("error", runErr))
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		// Give up if the context ends before the router reports it is running.
		select {
		case <-ctx.Done():
			return
		case <-router.Running():
			logger.Info("consumer is up and running", slog.String("name", name))
		}

		waitForShutdownSignal(ctx, logger)

		if closeErr := router.Close(); closeErr != nil {
			// Log closeErr rather than the enclosing function's err, which this
			// goroutine must not read while router.Run may be assigning it.
			logger.Warn("couldn't close router", slog.Any("error", closeErr))
		}
	}()

	if err = router.Run(ctx); err != nil {
		// Cancel explicitly so both goroutines can finish before the deferred
		// wg.Wait() runs.
		cancel()
		return fmt.Errorf("couldn't run router: %w", err)
	}

	return nil
}
// Settings applied by newWatermillRouter to the router and its middlewares.
const (
// watermillRouterCloseTimeout is used as RouterConfig.CloseTimeout.
watermillRouterCloseTimeout = 30 * time.Second
// The watermillRetryMW* values populate the fields of middleware.Retry
// (MaxRetries, InitialInterval, MaxInterval and Multiplier respectively).
watermillRetryMWMaxRetries = 3
watermillRetryMWMInitialInterval = 3 * time.Second
watermillRetryMWMMaxInterval = 10 * time.Second
watermillRetryMWMIntervalMultiplier = 1.5
// watermillTimeout is the duration passed to middleware.Timeout.
watermillTimeout = 2 * time.Minute
)
// newWatermillRouter creates a message router that uses logger and the
// configured close timeout, and installs the Recoverer, Timeout and Retry
// middlewares on it, in that order. An error from message.NewRouter is
// returned unchanged.
func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) {
	cfg := message.RouterConfig{CloseTimeout: watermillRouterCloseTimeout}

	r, err := message.NewRouter(cfg, logger)
	if err != nil {
		return nil, err
	}

	retry := middleware.Retry{
		MaxRetries:      watermillRetryMWMaxRetries,
		InitialInterval: watermillRetryMWMInitialInterval,
		MaxInterval:     watermillRetryMWMMaxInterval,
		Multiplier:      watermillRetryMWMIntervalMultiplier,
		Logger:          logger,
	}

	// Each middleware is added with its own AddMiddleware call, in the listed order.
	for _, mw := range []message.HandlerMiddleware{
		middleware.Recoverer,
		middleware.Timeout(watermillTimeout),
		retry.Middleware,
	} {
		r.AddMiddleware(mw)
	}

	return r, nil
}