core/cmd/twhelp/rabbitmq.go

143 lines
4.9 KiB
Go

package main
import (
"log/slog"
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/urfave/cli/v2"
)
var (
rmqFlagConnectionString = &cli.StringFlag{
Name: "rabbitmq.connectionString",
Required: true,
EnvVars: []string{"RABBITMQ_CONNECTION_STRING"},
Usage: "https://www.rabbitmq.com/uri-spec.html",
}
rmqFlagTopicSyncServersCmd = &cli.StringFlag{
Name: "rabbitmq.topic.syncServersCmd",
Value: "servers.cmd.sync",
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"},
}
rmqFlagTopicServerSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.serverSyncedEvent",
Value: "servers.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"},
}
rmqFlagTopicTribesSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.tribesSyncedEvent",
Value: "tribes.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"},
}
rmqFlagTopicCreateTribeSnapshotsCmd = &cli.StringFlag{
Name: "rabbitmq.topic.createTribeSnapshotsCmd",
Value: "tribes.cmd.create_snapshots",
EnvVars: []string{"RABBITMQ_TOPIC_CREATE_TRIBE_SNAPSHOTS_CMD"},
}
rmqFlagTopicTribeSnapshotsCreatedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.tribeSnapshotsCreatedEvent",
Value: "tribes.event.snapshots_created",
EnvVars: []string{"RABBITMQ_TOPIC_TRIBE_SNAPSHOTS_CREATED_EVENT"},
}
rmqFlagTopicPlayersSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.playersSyncedEvent",
Value: "players.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_PLAYERS_SYNCED_EVENT"},
}
rmqFlagTopicCreatePlayerSnapshotsCmd = &cli.StringFlag{
Name: "rabbitmq.topic.createPlayerSnapshotsCmd",
Value: "players.cmd.create_snapshots",
EnvVars: []string{"RABBITMQ_TOPIC_CREATE_PLAYER_SNAPSHOTS_CMD"},
}
rmqFlagTopicPlayerSnapshotsCreatedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.playerSnapshotsCreatedEvent",
Value: "players.event.snapshots_created",
EnvVars: []string{"RABBITMQ_TOPIC_PLAYER_SNAPSHOTS_CREATED_EVENT"},
}
rmqFlagTopicVillagesSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.villagesSyncedEvent",
Value: "villages.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"},
}
rmqFlagTopicSyncEnnoblementsCmd = &cli.StringFlag{
Name: "rabbitmq.topic.syncEnnoblementsCmd",
Value: "ennoblements.cmd.sync",
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_ENNOBLEMENTS_CMD"},
}
rmqFlagTopicEnnoblementsSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.ennoblementsSyncedEvent",
Value: "ennoblements.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"},
}
rmqFlagTopicCleanUpDataCmd = &cli.StringFlag{
Name: "rabbitmq.topic.cleanUpDataCmd",
Value: "all.cmd.clean_up",
EnvVars: []string{"RABBITMQ_TOPIC_CLEAN_UP_DATA_CMD"},
}
rmqFlags = []cli.Flag{
rmqFlagConnectionString,
rmqFlagTopicSyncServersCmd,
rmqFlagTopicServerSyncedEvent,
rmqFlagTopicTribesSyncedEvent,
rmqFlagTopicCreateTribeSnapshotsCmd,
rmqFlagTopicTribeSnapshotsCreatedEvent,
rmqFlagTopicPlayersSyncedEvent,
rmqFlagTopicCreatePlayerSnapshotsCmd,
rmqFlagTopicPlayerSnapshotsCreatedEvent,
rmqFlagTopicVillagesSyncedEvent,
rmqFlagTopicSyncEnnoblementsCmd,
rmqFlagTopicEnnoblementsSyncedEvent,
rmqFlagTopicCleanUpDataCmd,
}
)
func newAMQPConnectionFromFlags(c *cli.Context, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) {
return newAMQPConnection(c.String(rmqFlagConnectionString.Name), logger)
}
func newAMQPConnection(connString string, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) {
return amqp.NewConnection(amqp.ConnectionConfig{
AmqpURI: connString,
}, logger)
}
func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) {
return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn)
}
func newAMQPSubscriber(
conn *amqp.ConnectionWrapper,
logger watermill.LoggerAdapter,
queueNameSuffix string,
) (*amqp.Subscriber, error) {
return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn)
}
func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config {
cfg := amqp.NewDurablePubSubConfig("", generateQueueName)
cfg.Consume.NoRequeueOnNack = true
cfg.Consume.Qos.PrefetchCount = 1
cfg.Exchange.GenerateName = generateExchangeName
return cfg
}
func generateExchangeName(topic string) string {
return "twhelp_" + topic
}
func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator {
return func(topic string) string {
return generateExchangeName(topic) + "_" + suffix
}
}
func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter {
return watermill.NewSlogLogger(logger)
}
func newWatermillMarshaler() watermillmsg.Marshaler {
return watermillmsg.JSONMarshaler{}
}