438 lines
11 KiB
Go
438 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitea.dwysokinski.me/twhelp/core/internal/adapter"
|
|
"gitea.dwysokinski.me/twhelp/core/internal/app"
|
|
"gitea.dwysokinski.me/twhelp/core/internal/health"
|
|
"gitea.dwysokinski.me/twhelp/core/internal/health/healthfile"
|
|
"gitea.dwysokinski.me/twhelp/core/internal/port"
|
|
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg"
|
|
"github.com/ThreeDotsLabs/watermill"
|
|
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
|
|
"github.com/uptrace/bun"
|
|
"github.com/urfave/cli/v2"
|
|
)
|
|
|
|
var cmdConsumer = &cli.Command{
|
|
Name: "consumer",
|
|
Usage: "Run one of the workers",
|
|
Subcommands: []*cli.Command{
|
|
{
|
|
Name: "server",
|
|
Usage: "Run the worker responsible for consuming server-related messages",
|
|
Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
|
|
Action: func(c *cli.Context) error {
|
|
return runConsumer(
|
|
c,
|
|
"ServerConsumer",
|
|
func(
|
|
c *cli.Context,
|
|
router *message.Router,
|
|
logger watermill.LoggerAdapter,
|
|
publisher *amqp.Publisher,
|
|
subscriber *amqp.Subscriber,
|
|
marshaler watermillmsg.Marshaler,
|
|
db *bun.DB,
|
|
) error {
|
|
serverPublisher := adapter.NewServerWatermillPublisher(
|
|
publisher,
|
|
marshaler,
|
|
c.String(rmqFlagTopicSyncServersCmd.Name),
|
|
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
|
)
|
|
|
|
twSvc, err := newTWServiceFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
consumer := port.NewServerWatermillConsumer(
|
|
app.NewServerService(adapter.NewServerBunRepository(db), twSvc, serverPublisher),
|
|
subscriber,
|
|
logger,
|
|
marshaler,
|
|
c.String(rmqFlagTopicSyncServersCmd.Name),
|
|
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
|
c.String(rmqFlagTopicTribesSyncedEvent.Name),
|
|
c.String(rmqFlagTopicPlayersSyncedEvent.Name),
|
|
c.String(rmqFlagTopicVillagesSyncedEvent.Name),
|
|
c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name),
|
|
c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
|
|
c.String(rmqFlagTopicPlayerSnapshotsCreatedEvent.Name),
|
|
)
|
|
consumer.Register(router)
|
|
|
|
return nil
|
|
},
|
|
)
|
|
},
|
|
},
|
|
{
|
|
Name: "tribe",
|
|
Usage: "Run the worker responsible for consuming tribe-related messages",
|
|
Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
|
|
Action: func(c *cli.Context) error {
|
|
return runConsumer(
|
|
c,
|
|
"TribeConsumer",
|
|
func(
|
|
c *cli.Context,
|
|
router *message.Router,
|
|
logger watermill.LoggerAdapter,
|
|
publisher *amqp.Publisher,
|
|
subscriber *amqp.Subscriber,
|
|
marshaler watermillmsg.Marshaler,
|
|
db *bun.DB,
|
|
) error {
|
|
tribePublisher := adapter.NewTribeWatermillPublisher(
|
|
publisher,
|
|
marshaler,
|
|
c.String(rmqFlagTopicTribesSyncedEvent.Name),
|
|
)
|
|
tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
|
|
publisher,
|
|
newWatermillMarshaler(),
|
|
c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name),
|
|
c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
|
|
)
|
|
|
|
twSvc, err := newTWServiceFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tribeSvc := app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc, tribePublisher)
|
|
tribeSnapshotSvc := app.NewTribeSnapshotService(
|
|
adapter.NewTribeSnapshotBunRepository(db),
|
|
tribeSvc,
|
|
tribeSnapshotPublisher,
|
|
)
|
|
|
|
consumer := port.NewTribeWatermillConsumer(
|
|
tribeSvc,
|
|
tribeSnapshotSvc,
|
|
subscriber,
|
|
logger,
|
|
marshaler,
|
|
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
|
c.String(rmqFlagTopicVillagesSyncedEvent.Name),
|
|
c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name),
|
|
c.String(rmqFlagTopicCleanUpDataCmd.Name),
|
|
)
|
|
consumer.Register(router)
|
|
|
|
return nil
|
|
},
|
|
)
|
|
},
|
|
},
|
|
{
|
|
Name: "player",
|
|
Usage: "Run the worker responsible for consuming player-related messages",
|
|
Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
|
|
Action: func(c *cli.Context) error {
|
|
return runConsumer(
|
|
c,
|
|
"PlayerConsumer",
|
|
func(
|
|
c *cli.Context,
|
|
router *message.Router,
|
|
logger watermill.LoggerAdapter,
|
|
publisher *amqp.Publisher,
|
|
subscriber *amqp.Subscriber,
|
|
marshaler watermillmsg.Marshaler,
|
|
db *bun.DB,
|
|
) error {
|
|
playerPublisher := adapter.NewPlayerWatermillPublisher(
|
|
publisher,
|
|
marshaler,
|
|
c.String(rmqFlagTopicPlayersSyncedEvent.Name),
|
|
)
|
|
playerSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
|
|
publisher,
|
|
newWatermillMarshaler(),
|
|
c.String(rmqFlagTopicCreatePlayerSnapshotsCmd.Name),
|
|
c.String(rmqFlagTopicPlayerSnapshotsCreatedEvent.Name),
|
|
)
|
|
|
|
twSvc, err := newTWServiceFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tribeChangeSvc := app.NewTribeChangeService(adapter.NewTribeChangeBunRepository(db))
|
|
playerSvc := app.NewPlayerService(
|
|
adapter.NewPlayerBunRepository(db),
|
|
tribeChangeSvc,
|
|
twSvc,
|
|
playerPublisher,
|
|
)
|
|
playerSnapshotSvc := app.NewPlayerSnapshotService(
|
|
adapter.NewPlayerSnapshotBunRepository(db),
|
|
playerSvc,
|
|
playerSnapshotPublisher,
|
|
)
|
|
|
|
consumer := port.NewPlayerWatermillConsumer(
|
|
playerSvc,
|
|
playerSnapshotSvc,
|
|
subscriber,
|
|
logger,
|
|
marshaler,
|
|
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
|
c.String(rmqFlagTopicCreatePlayerSnapshotsCmd.Name),
|
|
c.String(rmqFlagTopicCleanUpDataCmd.Name),
|
|
)
|
|
consumer.Register(router)
|
|
|
|
return nil
|
|
},
|
|
)
|
|
},
|
|
},
|
|
{
|
|
Name: "village",
|
|
Usage: "Run the worker responsible for consuming village-related messages",
|
|
Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
|
|
Action: func(c *cli.Context) error {
|
|
return runConsumer(
|
|
c,
|
|
"VillageConsumer",
|
|
func(
|
|
c *cli.Context,
|
|
router *message.Router,
|
|
logger watermill.LoggerAdapter,
|
|
publisher *amqp.Publisher,
|
|
subscriber *amqp.Subscriber,
|
|
marshaler watermillmsg.Marshaler,
|
|
db *bun.DB,
|
|
) error {
|
|
villagePublisher := adapter.NewVillageWatermillPublisher(
|
|
publisher,
|
|
marshaler,
|
|
c.String(rmqFlagTopicVillagesSyncedEvent.Name),
|
|
)
|
|
|
|
twSvc, err := newTWServiceFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
consumer := port.NewVillageWatermillConsumer(
|
|
app.NewVillageService(adapter.NewVillageBunRepository(db), twSvc, villagePublisher),
|
|
subscriber,
|
|
logger,
|
|
marshaler,
|
|
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
|
)
|
|
consumer.Register(router)
|
|
|
|
return nil
|
|
},
|
|
)
|
|
},
|
|
},
|
|
{
|
|
Name: "ennoblement",
|
|
Usage: "Run the worker responsible for consuming ennoblement-related messages",
|
|
Flags: slices.Concat(dbFlags, rmqFlags, twSvcFlags),
|
|
Action: func(c *cli.Context) error {
|
|
return runConsumer(
|
|
c,
|
|
"EnnoblementConsumer",
|
|
func(
|
|
c *cli.Context,
|
|
router *message.Router,
|
|
logger watermill.LoggerAdapter,
|
|
publisher *amqp.Publisher,
|
|
subscriber *amqp.Subscriber,
|
|
marshaler watermillmsg.Marshaler,
|
|
db *bun.DB,
|
|
) error {
|
|
ennoblementPublisher := adapter.NewEnnoblementWatermillPublisher(
|
|
publisher,
|
|
marshaler,
|
|
c.String(rmqFlagTopicSyncEnnoblementsCmd.Name),
|
|
c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name),
|
|
)
|
|
|
|
twSvc, err := newTWServiceFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
consumer := port.NewEnnoblementWatermillConsumer(
|
|
app.NewEnnoblementService(adapter.NewEnnoblementBunRepository(db), twSvc, ennoblementPublisher),
|
|
subscriber,
|
|
logger,
|
|
marshaler,
|
|
c.String(rmqFlagTopicSyncEnnoblementsCmd.Name),
|
|
c.String(rmqFlagTopicCleanUpDataCmd.Name),
|
|
)
|
|
consumer.Register(router)
|
|
|
|
return nil
|
|
},
|
|
)
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
type registerConsumerHandlersFunc func(
|
|
c *cli.Context,
|
|
router *message.Router,
|
|
logger watermill.LoggerAdapter,
|
|
publisher *amqp.Publisher,
|
|
subscriber *amqp.Subscriber,
|
|
marshaler watermillmsg.Marshaler,
|
|
db *bun.DB,
|
|
) error
|
|
|
|
//nolint:gocyclo
|
|
func runConsumer(c *cli.Context, name string, registerHandlers registerConsumerHandlersFunc) error {
|
|
logger := loggerFromCtx(c.Context)
|
|
watermillLogger := newWatermillLogger(logger)
|
|
|
|
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := amqpConn.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := publisher.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
subscriber, err := newAMQPSubscriber(amqpConn, watermillLogger, name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if closeErr := subscriber.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close amqp subscriber", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
bunDB, err := newBunDBFromFlags(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer closeBunDB(bunDB, logger)
|
|
|
|
router, err := newWatermillRouter(watermillLogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = registerHandlers(
|
|
c,
|
|
router,
|
|
watermillLogger,
|
|
publisher,
|
|
subscriber,
|
|
newWatermillMarshaler(),
|
|
bunDB,
|
|
); err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(c.Context)
|
|
defer cancel()
|
|
|
|
var wg sync.WaitGroup
|
|
defer func() {
|
|
// it's required for the graceful shutdown
|
|
wg.Wait()
|
|
}()
|
|
|
|
healthObserver := healthfile.LiveObserver(health.New(), "/tmp/live")
|
|
defer func() {
|
|
if closeErr := healthObserver.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close health observer", slog.Any("error", closeErr))
|
|
}
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
if runErr := healthObserver.Run(ctx); runErr != nil {
|
|
logger.Warn("couldn't run health observer", slog.Any("error", runErr))
|
|
}
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-router.Running():
|
|
logger.Info("consumer is up and running", slog.String("name", name))
|
|
}
|
|
|
|
waitForShutdownSignal(ctx, logger)
|
|
|
|
if closeErr := router.Close(); closeErr != nil {
|
|
logger.Warn("couldn't close router", slog.Any("error", err))
|
|
}
|
|
}()
|
|
|
|
if err = router.Run(ctx); err != nil {
|
|
cancel()
|
|
return fmt.Errorf("couldn't run router: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Settings applied to the Watermill router built by newWatermillRouter.
const (
	// Used as RouterConfig.CloseTimeout.
	watermillRouterCloseTimeout = 30 * time.Second
	// Argument of the Timeout middleware.
	watermillTimeout = 2 * time.Minute

	// Fields of the Retry middleware: MaxRetries, InitialInterval,
	// MaxInterval and Multiplier respectively.
	watermillRetryMWMaxRetries          = 3
	watermillRetryMWMInitialInterval    = 3 * time.Second
	watermillRetryMWMMaxInterval        = 10 * time.Second
	watermillRetryMWMIntervalMultiplier = 1.5
)
|
|
|
|
func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) {
|
|
router, err := message.NewRouter(message.RouterConfig{
|
|
CloseTimeout: watermillRouterCloseTimeout,
|
|
}, logger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
router.AddMiddleware(middleware.Recoverer)
|
|
router.AddMiddleware(middleware.Timeout(watermillTimeout))
|
|
router.AddMiddleware(middleware.Retry{
|
|
MaxRetries: watermillRetryMWMaxRetries,
|
|
InitialInterval: watermillRetryMWMInitialInterval,
|
|
MaxInterval: watermillRetryMWMMaxInterval,
|
|
Multiplier: watermillRetryMWMIntervalMultiplier,
|
|
Logger: logger,
|
|
}.Middleware)
|
|
|
|
return router, nil
|
|
}
|