// Package watermillamqptest contains test helpers that provide a RabbitMQ instance
// (an externally supplied one or a throwaway Docker container) and build
// watermill-amqp publishers/subscribers on top of it.
package watermillamqptest

import (
	"fmt"
	"net"
	"net/url"
	"os"
	"time"

	"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermilltest"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/cenkalti/backoff/v4"
	"github.com/ory/dockertest/v3"
	"github.com/ory/dockertest/v3/docker"
	"github.com/stretchr/testify/require"
)

// RabbitMQ describes a RabbitMQ instance usable from tests.
type RabbitMQ struct {
	// connectionString is the AMQP URI used to open connections.
	connectionString *url.URL
	// resource is the Docker container backing the instance; it is nil when
	// the connection string was taken from the environment.
	resource *dockertest.Resource
}

// rabbitMQConfig holds the settings that RabbitMQOption values can adjust.
type rabbitMQConfig struct {
	repo string // Docker image repository
	tag  string // Docker image tag
	ttl  uint   // value passed to dockertest's Resource.Expire
}

// RabbitMQOption customizes how NewRabbitMQ starts the container.
type RabbitMQOption func(cfg *rabbitMQConfig)

// rabbitMQDefaultTTL is the container TTL used when WithRabbitMQTTL is not given.
const rabbitMQDefaultTTL = 60

// newRabbitMQConfig returns the default configuration with opts applied in order.
func newRabbitMQConfig(opts ...RabbitMQOption) *rabbitMQConfig {
	cfg := &rabbitMQConfig{
		repo: "rabbitmq",
		tag:  "3.12.10-alpine",
		ttl:  rabbitMQDefaultTTL,
	}

	for _, opt := range opts {
		opt(cfg)
	}

	return cfg
}

// WithRabbitMQTTL overrides the TTL (in seconds) after which the container expires.
func WithRabbitMQTTL(ttlSeconds uint) RabbitMQOption {
	return func(cfg *rabbitMQConfig) {
		cfg.ttl = ttlSeconds
	}
}

// WithRabbitMQImage overrides the Docker image; image is split into repository
// and tag with docker.ParseRepositoryTag.
func WithRabbitMQImage(image string) RabbitMQOption {
	return func(cfg *rabbitMQConfig) {
		cfg.repo, cfg.tag = docker.ParseRepositoryTag(image)
	}
}

// NewRabbitMQ constructs a new RabbitMQ resource. If the env variable 'TESTS_RABBITMQ_CONNECTION_STRING' is set,
// this function doesn't run a new Docker container and uses the value of this variable as a connection string,
// otherwise this function runs a new RabbitMQ instance in a Docker container.
// This function is intended for use in TestMain.
//
// When a container was started but a later setup step fails, the container is
// closed (best effort) before the error is returned so that it is not leaked.
func NewRabbitMQ(pool *dockertest.Pool, opts ...RabbitMQOption) (*RabbitMQ, error) {
	cfg := newRabbitMQConfig(opts...)

	if connString := os.Getenv("TESTS_RABBITMQ_CONNECTION_STRING"); connString != "" {
		u, err := url.ParseRequestURI(connString)
		if err != nil {
			return nil, fmt.Errorf("couldn't parse TESTS_RABBITMQ_CONNECTION_STRING: %w", err)
		}
		return &RabbitMQ{
			connectionString: u,
		}, nil
	}

	u := &url.URL{
		Scheme: "amqp",
		User:   url.UserPassword("rmq", "rmq"),
		Path:   "/",
	}

	// The password was set two lines above, so the "is set" flag can be ignored.
	pw, _ := u.User.Password()

	resource, err := pool.RunWithOptions(&dockertest.RunOptions{
		Repository: cfg.repo,
		Tag:        cfg.tag,
		Env: []string{
			fmt.Sprintf("RABBITMQ_DEFAULT_USER=%s", u.User.Username()),
			fmt.Sprintf("RABBITMQ_DEFAULT_PASS=%s", pw),
		},
	}, func(config *docker.HostConfig) {
		config.AutoRemove = true
		config.RestartPolicy = docker.RestartPolicy{
			Name: "no",
		}
	})
	if err != nil {
		return nil, fmt.Errorf("couldn't run rabbitmq: %w", err)
	}

	if err = resource.Expire(cfg.ttl); err != nil {
		// Without a TTL nothing else would stop the container, so remove it now.
		// The close error is dropped on purpose: the Expire error is the root cause.
		_ = resource.Close()
		return nil, fmt.Errorf("couldn't set rabbitmq container ttl: %w", err)
	}

	u.Host, err = getHostPort(resource, "5672/tcp")
	if err != nil {
		// Best-effort cleanup; the address error is the one worth reporting.
		_ = resource.Close()
		return nil, err
	}

	return &RabbitMQ{
		connectionString: u,
		resource:         resource,
	}, nil
}

const (
	rabbitMQOpenConnBackOffMaxInterval    = 5 * time.Second
	rabbitMQOpenConnBackOffMaxElapsedTime = 30 * time.Second
)

// NewConnection opens an AMQP connection to the RabbitMQ instance and registers
// a cleanup on tb that closes it.
//
// Before opening the returned connection it retries with exponential backoff
// (bounded by the rabbitMQOpenConnBackOff* constants) until a probe connection
// can be established; probe connections are closed immediately. Any failure
// is reported through tb via require.
func (rmq *RabbitMQ) NewConnection(tb watermilltest.TestingTB) *amqp.ConnectionWrapper {
	tb.Helper()

	require.NotNil(tb, rmq, "rabbit mq resource not properly initialized")
	require.NotNil(tb, rmq.connectionString, "rabbit mq resource not properly initialized")

	connConfig := amqp.ConnectionConfig{AmqpURI: rmq.connectionString.String()}

	bo := backoff.NewExponentialBackOff()
	bo.MaxInterval = rabbitMQOpenConnBackOffMaxInterval
	bo.MaxElapsedTime = rabbitMQOpenConnBackOffMaxElapsedTime
	require.NoError(tb, backoff.Retry(func() error {
		conn, err := amqp.NewConnection(connConfig, watermill.NopLogger{})
		if err != nil {
			return err
		}
		// Probe only: the connection is discarded, so its close error is irrelevant.
		_ = conn.Close()
		return nil
	}, bo))

	conn, err := amqp.NewConnection(connConfig, watermill.NopLogger{})
	require.NoError(tb, err)
	tb.Cleanup(func() {
		// Cleanup is best effort; there is nothing useful to do with the error.
		_ = conn.Close()
	})

	return conn
}

// Close releases the Docker resource backing the instance, if any. It is safe
// to call on a nil receiver and on an instance created from the environment
// variable, in which case it does nothing and returns nil.
func (rmq *RabbitMQ) Close() error {
	if rmq == nil || rmq.resource == nil {
		return nil
	}
	return rmq.resource.Close()
}

// TopicNameGenerator maps a topic to a name; NewPubSub uses it for both the
// exchange name and the routing key.
type TopicNameGenerator amqp.QueueNameGenerator

// NewPubSub builds a publisher and a subscriber sharing conn, based on
// amqp.NewNonDurablePubSubConfig with an empty URI (the connection is supplied
// explicitly) and queueNameGenerator. Nacked messages are not requeued and the
// prefetch count is 1. exchangeAndRoutingKeyNameGenerator produces both the
// exchange name and the routing key. Both returned values are closed through
// tb.Cleanup; construction errors are reported through tb via require.
func NewPubSub(
	tb watermilltest.TestingTB,
	conn *amqp.ConnectionWrapper,
	queueNameGenerator amqp.QueueNameGenerator,
	exchangeAndRoutingKeyNameGenerator TopicNameGenerator,
) (*amqp.Publisher, *amqp.Subscriber) {
	tb.Helper()

	cfg := amqp.NewNonDurablePubSubConfig("", queueNameGenerator)
	cfg.Consume.NoRequeueOnNack = true
	cfg.Consume.Qos.PrefetchCount = 1
	cfg.Exchange.GenerateName = exchangeAndRoutingKeyNameGenerator
	cfg.QueueBind.GenerateRoutingKey = exchangeAndRoutingKeyNameGenerator

	publisher, err := amqp.NewPublisherWithConnection(cfg, watermill.NopLogger{}, conn)
	require.NoError(tb, err)
	tb.Cleanup(func() {
		// Best-effort cleanup.
		_ = publisher.Close()
	})

	subscriber, err := amqp.NewSubscriberWithConnection(cfg, watermill.NopLogger{}, conn)
	require.NoError(tb, err)
	tb.Cleanup(func() {
		// Best-effort cleanup.
		_ = subscriber.Close()
	})

	return publisher, subscriber
}

// getHostPort returns the "host:port" address under which the container port
// id (e.g. "5672/tcp") is reachable.
//
// If DOCKER_HOST is set and contains a hostname, that hostname is combined
// with the published port. If DOCKER_HOST is unset or has no hostname (for
// example a unix socket URL), the address reported by dockertest is used.
// An error is returned when DOCKER_HOST cannot be parsed or when the port is
// not published.
func getHostPort(resource *dockertest.Resource, id string) (string, error) {
	if dockerURL := os.Getenv("DOCKER_HOST"); dockerURL != "" {
		u, err := url.Parse(dockerURL)
		if err != nil {
			return "", fmt.Errorf("couldn't parse DOCKER_HOST: %w", err)
		}

		if host := u.Hostname(); host != "" {
			port := resource.GetPort(id)
			if port == "" {
				return "", fmt.Errorf("port %q is not published by the container", id)
			}
			return net.JoinHostPort(host, port), nil
		}
		// No hostname in DOCKER_HOST: fall through to the address known to dockertest.
	}

	hostPort := resource.GetHostPort(id)
	if hostPort == "" {
		return "", fmt.Errorf("port %q is not published by the container", id)
	}

	return hostPort, nil
}