2023-12-16 08:03:02 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"log/slog"
|
|
|
|
|
2023-12-21 06:28:53 +00:00
|
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
2023-12-16 08:03:02 +00:00
|
|
|
"github.com/ThreeDotsLabs/watermill"
|
|
|
|
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
|
|
|
|
"github.com/urfave/cli/v2"
|
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
|
|
rmqFlagConnectionString = &cli.StringFlag{
|
|
|
|
Name: "rabbitmq.connectionString",
|
|
|
|
Required: true,
|
|
|
|
EnvVars: []string{"RABBITMQ_CONNECTION_STRING"},
|
|
|
|
Usage: "https://www.rabbitmq.com/uri-spec.html",
|
|
|
|
}
|
2023-12-21 06:28:53 +00:00
|
|
|
rmqFlagTopicSyncServersCmd = &cli.StringFlag{
|
|
|
|
Name: "rabbitmq.topic.syncServersCmd",
|
|
|
|
Value: "servers.cmd.sync",
|
|
|
|
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"},
|
|
|
|
}
|
2023-12-25 09:53:01 +00:00
|
|
|
rmqFlagTopicServerSyncedEvent = &cli.StringFlag{
|
|
|
|
Name: "rabbitmq.topic.serverSyncedEvent",
|
|
|
|
Value: "servers.event.synced",
|
|
|
|
EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"},
|
|
|
|
}
|
2023-12-16 08:03:02 +00:00
|
|
|
rmqFlags = []cli.Flag{
|
|
|
|
rmqFlagConnectionString,
|
2023-12-21 06:28:53 +00:00
|
|
|
rmqFlagTopicSyncServersCmd,
|
2023-12-25 09:53:01 +00:00
|
|
|
rmqFlagTopicServerSyncedEvent,
|
2023-12-16 08:03:02 +00:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
func newAMQPConnectionFromFlags(c *cli.Context, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) {
|
|
|
|
return newAMQPConnection(c.String(rmqFlagConnectionString.Name), logger)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newAMQPConnection(connString string, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) {
|
|
|
|
return amqp.NewConnection(amqp.ConnectionConfig{
|
|
|
|
AmqpURI: connString,
|
|
|
|
}, logger)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) {
|
|
|
|
return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newAMQPSubscriber(
|
|
|
|
conn *amqp.ConnectionWrapper,
|
|
|
|
logger watermill.LoggerAdapter,
|
|
|
|
queueNameSuffix string,
|
|
|
|
) (*amqp.Subscriber, error) {
|
|
|
|
return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config {
|
|
|
|
cfg := amqp.NewDurablePubSubConfig("", generateQueueName)
|
|
|
|
cfg.Consume.NoRequeueOnNack = true
|
|
|
|
cfg.Consume.Qos.PrefetchCount = 1
|
|
|
|
cfg.Exchange.GenerateName = generateExchangeName
|
|
|
|
return cfg
|
|
|
|
}
|
|
|
|
|
|
|
|
// generateExchangeName returns the exchange name for topic: the topic prefixed
// with "twhelp_". An empty topic yields just the prefix.
func generateExchangeName(topic string) string {
	return "twhelp_" + topic
}
|
|
|
|
|
|
|
|
func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator {
|
|
|
|
return func(topic string) string {
|
|
|
|
return generateExchangeName(topic) + "_" + suffix
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter {
|
|
|
|
return watermill.NewSlogLogger(logger)
|
|
|
|
}
|
2023-12-21 06:28:53 +00:00
|
|
|
|
|
|
|
func newWatermillMarshaler() watermillmsg.Marshaler {
|
|
|
|
return watermillmsg.JSONMarshaler{}
|
|
|
|
}
|