package main import ( "log/slog" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" "github.com/urfave/cli/v2" ) var ( rmqFlagConnectionString = &cli.StringFlag{ Name: "rabbitmq.connectionString", Required: true, EnvVars: []string{"RABBITMQ_CONNECTION_STRING"}, Usage: "https://www.rabbitmq.com/uri-spec.html", } rmqFlags = []cli.Flag{ rmqFlagConnectionString, } ) func newAMQPConnectionFromFlags(c *cli.Context, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) { return newAMQPConnection(c.String(rmqFlagConnectionString.Name), logger) } func newAMQPConnection(connString string, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) { return amqp.NewConnection(amqp.ConnectionConfig{ AmqpURI: connString, }, logger) } // we will need this function later // //nolint:unused func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) { return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn) } // we will need this function later // //nolint:unused func newAMQPSubscriber( conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter, queueNameSuffix string, ) (*amqp.Subscriber, error) { return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn) } // we will need this function later // //nolint:unused func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config { cfg := amqp.NewDurablePubSubConfig("", generateQueueName) cfg.Consume.NoRequeueOnNack = true cfg.Consume.Qos.PrefetchCount = 1 cfg.Exchange.GenerateName = generateExchangeName return cfg } // we will need this function later // //nolint:unused func generateExchangeName(topic string) string { return "twhelp_" + topic } // we will need this function later // //nolint:unused func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator { return func(topic string) string { return generateExchangeName(topic) + "_" + suffix } } func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter { return watermill.NewSlogLogger(logger) }