143 lines
4.9 KiB
Go
143 lines
4.9 KiB
Go
package main
|
|
|
|
import (
|
|
"log/slog"
|
|
|
|
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg"
|
|
"github.com/ThreeDotsLabs/watermill"
|
|
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
|
|
"github.com/urfave/cli/v2"
|
|
)
|
|
|
|
var (
|
|
rmqFlagConnectionString = &cli.StringFlag{
|
|
Name: "rabbitmq.connectionString",
|
|
Required: true,
|
|
EnvVars: []string{"RABBITMQ_CONNECTION_STRING"},
|
|
Usage: "https://www.rabbitmq.com/uri-spec.html",
|
|
}
|
|
rmqFlagTopicSyncServersCmd = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.syncServersCmd",
|
|
Value: "servers.cmd.sync",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"},
|
|
}
|
|
rmqFlagTopicServerSyncedEvent = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.serverSyncedEvent",
|
|
Value: "servers.event.synced",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"},
|
|
}
|
|
rmqFlagTopicTribesSyncedEvent = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.tribesSyncedEvent",
|
|
Value: "tribes.event.synced",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"},
|
|
}
|
|
rmqFlagTopicCreateTribeSnapshotsCmd = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.createTribeSnapshotsCmd",
|
|
Value: "tribes.cmd.create_snapshots",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_CREATE_TRIBE_SNAPSHOTS_CMD"},
|
|
}
|
|
rmqFlagTopicTribeSnapshotsCreatedEvent = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.tribeSnapshotsCreatedEvent",
|
|
Value: "tribes.event.snapshots_created",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_TRIBE_SNAPSHOTS_CREATED_EVENT"},
|
|
}
|
|
rmqFlagTopicPlayersSyncedEvent = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.playersSyncedEvent",
|
|
Value: "players.event.synced",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_PLAYERS_SYNCED_EVENT"},
|
|
}
|
|
rmqFlagTopicCreatePlayerSnapshotsCmd = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.createPlayerSnapshotsCmd",
|
|
Value: "players.cmd.create_snapshots",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_CREATE_PLAYER_SNAPSHOTS_CMD"},
|
|
}
|
|
rmqFlagTopicPlayerSnapshotsCreatedEvent = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.playerSnapshotsCreatedEvent",
|
|
Value: "players.event.snapshots_created",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_PLAYER_SNAPSHOTS_CREATED_EVENT"},
|
|
}
|
|
rmqFlagTopicVillagesSyncedEvent = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.villagesSyncedEvent",
|
|
Value: "villages.event.synced",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"},
|
|
}
|
|
rmqFlagTopicSyncEnnoblementsCmd = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.syncEnnoblementsCmd",
|
|
Value: "ennoblements.cmd.sync",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_ENNOBLEMENTS_CMD"},
|
|
}
|
|
rmqFlagTopicEnnoblementsSyncedEvent = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.ennoblementsSyncedEvent",
|
|
Value: "ennoblements.event.synced",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_ENNOBLEMENTS_SYNCED_EVENT"},
|
|
}
|
|
rmqFlagTopicCleanUpDataCmd = &cli.StringFlag{
|
|
Name: "rabbitmq.topic.cleanUpDataCmd",
|
|
Value: "all.cmd.clean_up",
|
|
EnvVars: []string{"RABBITMQ_TOPIC_CLEAN_UP_DATA_CMD"},
|
|
}
|
|
rmqFlags = []cli.Flag{
|
|
rmqFlagConnectionString,
|
|
rmqFlagTopicSyncServersCmd,
|
|
rmqFlagTopicServerSyncedEvent,
|
|
rmqFlagTopicTribesSyncedEvent,
|
|
rmqFlagTopicCreateTribeSnapshotsCmd,
|
|
rmqFlagTopicTribeSnapshotsCreatedEvent,
|
|
rmqFlagTopicPlayersSyncedEvent,
|
|
rmqFlagTopicCreatePlayerSnapshotsCmd,
|
|
rmqFlagTopicPlayerSnapshotsCreatedEvent,
|
|
rmqFlagTopicVillagesSyncedEvent,
|
|
rmqFlagTopicSyncEnnoblementsCmd,
|
|
rmqFlagTopicEnnoblementsSyncedEvent,
|
|
rmqFlagTopicCleanUpDataCmd,
|
|
}
|
|
)
|
|
|
|
func newAMQPConnectionFromFlags(c *cli.Context, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) {
|
|
return newAMQPConnection(c.String(rmqFlagConnectionString.Name), logger)
|
|
}
|
|
|
|
func newAMQPConnection(connString string, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) {
|
|
return amqp.NewConnection(amqp.ConnectionConfig{
|
|
AmqpURI: connString,
|
|
}, logger)
|
|
}
|
|
|
|
func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) {
|
|
return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn)
|
|
}
|
|
|
|
func newAMQPSubscriber(
|
|
conn *amqp.ConnectionWrapper,
|
|
logger watermill.LoggerAdapter,
|
|
queueNameSuffix string,
|
|
) (*amqp.Subscriber, error) {
|
|
return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn)
|
|
}
|
|
|
|
func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config {
|
|
cfg := amqp.NewDurablePubSubConfig("", generateQueueName)
|
|
cfg.Consume.NoRequeueOnNack = true
|
|
cfg.Consume.Qos.PrefetchCount = 1
|
|
cfg.Exchange.GenerateName = generateExchangeName
|
|
return cfg
|
|
}
|
|
|
|
func generateExchangeName(topic string) string {
|
|
return "twhelp_" + topic
|
|
}
|
|
|
|
func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator {
|
|
return func(topic string) string {
|
|
return generateExchangeName(topic) + "_" + suffix
|
|
}
|
|
}
|
|
|
|
func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter {
|
|
return watermill.NewSlogLogger(logger)
|
|
}
|
|
|
|
func newWatermillMarshaler() watermillmsg.Marshaler {
|
|
return watermillmsg.JSONMarshaler{}
|
|
}
|