package main import ( "log/slog" "gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" "github.com/urfave/cli/v2" ) var ( rmqFlagConnectionString = &cli.StringFlag{ Name: "rabbitmq.connectionString", Required: true, EnvVars: []string{"RABBITMQ_CONNECTION_STRING"}, Usage: "https://www.rabbitmq.com/uri-spec.html", } rmqFlagTopicSyncServersCmd = &cli.StringFlag{ Name: "rabbitmq.topic.syncServersCmd", Value: "servers.cmd.sync", EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"}, } rmqFlagTopicServerSyncedEvent = &cli.StringFlag{ Name: "rabbitmq.topic.serverSyncedEvent", Value: "servers.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"}, } rmqFlagTopicTribesSyncedEvent = &cli.StringFlag{ Name: "rabbitmq.topic.tribesSyncedEvent", Value: "tribes.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"}, } rmqFlagTopicCreateServerSnapshotCmd = &cli.StringFlag{ Name: "rabbitmq.topic.createServerSnapshotCmd", Value: "servers.cmd.create_snapshot", EnvVars: []string{"RABBITMQ_TOPIC_CREATE_SERVER_SNAPSHOT_CMD"}, } rmqFlagTopicCreateTribeSnapshotsCmd = &cli.StringFlag{ Name: "rabbitmq.topic.createTribeSnapshotsCmd", Value: "tribes.cmd.create_snapshots", EnvVars: []string{"RABBITMQ_TOPIC_CREATE_TRIBE_SNAPSHOTS_CMD"}, } rmqFlagTopicTribeSnapshotsCreatedEvent = &cli.StringFlag{ Name: "rabbitmq.topic.tribeSnapshotsCreatedEvent", Value: "tribes.event.snapshots_created", EnvVars: []string{"RABBITMQ_TOPIC_TRIBE_SNAPSHOTS_CREATED_EVENT"}, } rmqFlagTopicPlayersSyncedEvent = &cli.StringFlag{ Name: "rabbitmq.topic.playersSyncedEvent", Value: "players.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_PLAYERS_SYNCED_EVENT"}, } rmqFlagTopicCreatePlayerSnapshotsCmd = &cli.StringFlag{ Name: "rabbitmq.topic.createPlayerSnapshotsCmd", Value: "players.cmd.create_snapshots", EnvVars: []string{"RABBITMQ_TOPIC_CREATE_PLAYER_SNAPSHOTS_CMD"}, } rmqFlagTopicPlayerSnapshotsCreatedEvent = &cli.StringFlag{ Name: "rabbitmq.topic.playerSnapshotsCreatedEvent", Value: "players.event.snapshots_created", EnvVars: []string{"RABBITMQ_TOPIC_PLAYER_SNAPSHOTS_CREATED_EVENT"}, } rmqFlagTopicVillagesSyncedEvent = &cli.StringFlag{ Name: "rabbitmq.topic.villagesSyncedEvent", Value: "villages.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"}, } rmqFlagTopicSyncEnnoblementsCmd = &cli.StringFlag{ Name: "rabbitmq.topic.syncEnnoblementsCmd", Value: "ennoblements.cmd.sync", EnvVars: []string{"RABBITMQ_TOPIC_SYNC_ENNOBLEMENTS_CMD"}, } rmqFlagTopicEnnoblementsSyncedEvent = &cli.StringFlag{ Name: "rabbitmq.topic.ennoblementsSyncedEvent", Value: "ennoblements.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"}, } rmqFlagTopicCleanUpDataCmd = &cli.StringFlag{ Name: "rabbitmq.topic.cleanUpDataCmd", Value: "all.cmd.clean_up", EnvVars: []string{"RABBITMQ_TOPIC_CLEAN_UP_DATA_CMD"}, } rmqFlags = []cli.Flag{ rmqFlagConnectionString, rmqFlagTopicSyncServersCmd, rmqFlagTopicCreateServerSnapshotCmd, rmqFlagTopicServerSyncedEvent, rmqFlagTopicTribesSyncedEvent, rmqFlagTopicCreateTribeSnapshotsCmd, rmqFlagTopicTribeSnapshotsCreatedEvent, rmqFlagTopicPlayersSyncedEvent, rmqFlagTopicCreatePlayerSnapshotsCmd, rmqFlagTopicPlayerSnapshotsCreatedEvent, rmqFlagTopicVillagesSyncedEvent, rmqFlagTopicSyncEnnoblementsCmd, rmqFlagTopicEnnoblementsSyncedEvent, rmqFlagTopicCleanUpDataCmd, } ) func newAMQPConnectionFromFlags(c *cli.Context, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) { return newAMQPConnection(c.String(rmqFlagConnectionString.Name), logger) } func newAMQPConnection(connString string, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) { return amqp.NewConnection(amqp.ConnectionConfig{ AmqpURI: connString, }, logger) } func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) { return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn) } func newAMQPSubscriber( conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter, queueNameSuffix string, ) (*amqp.Subscriber, error) { return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn) } func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config { cfg := amqp.NewDurablePubSubConfig("", generateQueueName) cfg.Consume.NoRequeueOnNack = true cfg.Consume.Qos.PrefetchCount = 1 cfg.Exchange.GenerateName = generateExchangeName return cfg } func generateExchangeName(topic string) string { return "twhelp_" + topic } func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator { return func(topic string) string { return generateExchangeName(topic) + "_" + suffix } } func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter { return watermill.NewSlogLogger(logger) } func newWatermillMarshaler() watermillmsg.Marshaler { return watermillmsg.JSONMarshaler{} }