core/internal/watermill/watermillamqptest/watermillamqptest.go

206 lines
4.9 KiB
Go

package watermillamqptest
import (
"fmt"
"net"
"net/url"
"os"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermilltest"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/cenkalti/backoff/v4"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"
)
type RabbitMQ struct {
connectionString *url.URL
resource *dockertest.Resource
}
type rabbitMQConfig struct {
repo string
tag string
ttl uint
}
type RabbitMQOption func(cfg *rabbitMQConfig)
const rabbitMQDefaultTTL = 60
func newRabbitMQConfig(opts ...RabbitMQOption) *rabbitMQConfig {
cfg := &rabbitMQConfig{
repo: "rabbitmq",
tag: "3.12.10-alpine",
ttl: rabbitMQDefaultTTL,
}
for _, opt := range opts {
opt(cfg)
}
return cfg
}
func WithRabbitMQTTL(ttlSeconds uint) RabbitMQOption {
return func(cfg *rabbitMQConfig) {
cfg.ttl = ttlSeconds
}
}
func WithRabbitMQImage(image string) RabbitMQOption {
return func(cfg *rabbitMQConfig) {
cfg.repo, cfg.tag = docker.ParseRepositoryTag(image)
}
}
// NewRabbitMQ constructs a new RabbitMQ resource. If the env variable 'TESTS_RABBITMQ_CONNECTION_STRING' is set,
// this function doesn't run a new Docker container and uses the value of this variable as a connection string,
// otherwise this function runs a new RabbitMQ instance in a Docker container.
// This function is intended for use in TestMain.
func NewRabbitMQ(pool *dockertest.Pool, opts ...RabbitMQOption) (*RabbitMQ, error) {
cfg := newRabbitMQConfig(opts...)
if connString := os.Getenv("TESTS_RABBITMQ_CONNECTION_STRING"); connString != "" {
u, err := url.ParseRequestURI(connString)
if err != nil {
return nil, err
}
return &RabbitMQ{
connectionString: u,
}, nil
}
u := &url.URL{
Scheme: "amqp",
User: url.UserPassword("rmq", "rmq"),
Path: "/",
}
pw, _ := u.User.Password()
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: cfg.repo,
Tag: cfg.tag,
Env: []string{
fmt.Sprintf("RABBITMQ_DEFAULT_USER=%s", u.User.Username()),
fmt.Sprintf("RABBITMQ_DEFAULT_PASS=%s", pw),
},
}, func(config *docker.HostConfig) {
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
if err != nil {
return nil, fmt.Errorf("couldn't run rabbitmq: %w", err)
}
if err = resource.Expire(cfg.ttl); err != nil {
return nil, err
}
u.Host, err = getHostPort(resource, "5672/tcp")
if err != nil {
return nil, err
}
return &RabbitMQ{
connectionString: u,
resource: resource,
}, nil
}
const (
rabbitMQOpenConnBackOffMaxInterval = 5 * time.Second
rabbitMQOpenConnBackOffMaxElapsedTime = 30 * time.Second
)
func (rmq *RabbitMQ) NewConnection(tb watermilltest.TestingTB) *amqp.ConnectionWrapper {
tb.Helper()
require.NotNil(tb, rmq, "rabbit mq resource not property initialized")
require.NotNil(tb, rmq.connectionString, "rabbit mq resource not properly initialized")
connConfig := amqp.ConnectionConfig{AmqpURI: rmq.connectionString.String()}
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = rabbitMQOpenConnBackOffMaxInterval
bo.MaxElapsedTime = rabbitMQOpenConnBackOffMaxElapsedTime
require.NoError(tb, backoff.Retry(func() error {
conn, err := amqp.NewConnection(connConfig, watermill.NopLogger{})
if err != nil {
return err
}
_ = conn.Close()
return nil
}, bo))
conn, err := amqp.NewConnection(connConfig, watermill.NopLogger{})
require.NoError(tb, err)
tb.Cleanup(func() {
_ = conn.Close()
})
return conn
}
func (rmq *RabbitMQ) Close() error {
if rmq != nil && rmq.resource != nil {
if err := rmq.resource.Close(); err != nil {
return err
}
}
return nil
}
type TopicNameGenerator amqp.QueueNameGenerator
func NewPubSub(
tb watermilltest.TestingTB,
conn *amqp.ConnectionWrapper,
queueNameGenerator amqp.QueueNameGenerator,
exchangeAndRoutingKeyNameGenerator TopicNameGenerator,
) (*amqp.Publisher, *amqp.Subscriber) {
tb.Helper()
cfg := amqp.NewNonDurablePubSubConfig("", queueNameGenerator)
cfg.Consume.NoRequeueOnNack = true
cfg.Consume.Qos.PrefetchCount = 1
cfg.Exchange.GenerateName = exchangeAndRoutingKeyNameGenerator
cfg.QueueBind.GenerateRoutingKey = exchangeAndRoutingKeyNameGenerator
publisher, err := amqp.NewPublisherWithConnection(cfg, watermill.NopLogger{}, conn)
require.NoError(tb, err)
tb.Cleanup(func() {
_ = publisher.Close()
})
subscriber, err := amqp.NewSubscriberWithConnection(cfg, watermill.NopLogger{}, conn)
require.NoError(tb, err)
tb.Cleanup(func() {
_ = subscriber.Close()
})
return publisher, subscriber
}
func getHostPort(resource *dockertest.Resource, id string) (string, error) {
dockerURL := os.Getenv("DOCKER_HOST")
if dockerURL == "" {
return resource.GetHostPort(id), nil
}
u, err := url.Parse(dockerURL)
if err != nil {
return "", err
}
return net.JoinHostPort(u.Hostname(), resource.GetPort(id)), nil
}