206 lines
4.9 KiB
Go
206 lines
4.9 KiB
Go
package watermillamqptest
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"net/url"
|
|
"os"
|
|
"time"
|
|
|
|
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermilltest"
|
|
"github.com/ThreeDotsLabs/watermill"
|
|
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
|
|
"github.com/cenkalti/backoff/v4"
|
|
"github.com/ory/dockertest/v3"
|
|
"github.com/ory/dockertest/v3/docker"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
type RabbitMQ struct {
|
|
connectionString *url.URL
|
|
resource *dockertest.Resource
|
|
}
|
|
|
|
// rabbitMQConfig collects the settings NewRabbitMQ uses when it starts a container.
type rabbitMQConfig struct {
	// repo and tag select the Docker image that is run.
	repo, tag string

	// ttl is the value passed to the container's Expire call, in seconds.
	ttl uint
}
|
|
|
|
// RabbitMQOption customizes the rabbitMQConfig that NewRabbitMQ builds before it starts a container.
type RabbitMQOption func(cfg *rabbitMQConfig)
|
|
|
|
// rabbitMQDefaultTTL is the container TTL used when WithRabbitMQTTL is not supplied.
// It is expressed in the same unit as WithRabbitMQTTL's ttlSeconds parameter.
const rabbitMQDefaultTTL = 60
|
|
|
|
func newRabbitMQConfig(opts ...RabbitMQOption) *rabbitMQConfig {
|
|
cfg := &rabbitMQConfig{
|
|
repo: "rabbitmq",
|
|
tag: "3.12.10-alpine",
|
|
ttl: rabbitMQDefaultTTL,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(cfg)
|
|
}
|
|
|
|
return cfg
|
|
}
|
|
|
|
func WithRabbitMQTTL(ttlSeconds uint) RabbitMQOption {
|
|
return func(cfg *rabbitMQConfig) {
|
|
cfg.ttl = ttlSeconds
|
|
}
|
|
}
|
|
|
|
func WithRabbitMQImage(image string) RabbitMQOption {
|
|
return func(cfg *rabbitMQConfig) {
|
|
cfg.repo, cfg.tag = docker.ParseRepositoryTag(image)
|
|
}
|
|
}
|
|
|
|
// NewRabbitMQ constructs a new RabbitMQ resource. If the env variable 'TESTS_RABBITMQ_CONNECTION_STRING' is set,
|
|
// this function doesn't run a new Docker container and uses the value of this variable as a connection string,
|
|
// otherwise this function runs a new RabbitMQ instance in a Docker container.
|
|
// This function is intended for use in TestMain.
|
|
func NewRabbitMQ(pool *dockertest.Pool, opts ...RabbitMQOption) (*RabbitMQ, error) {
|
|
cfg := newRabbitMQConfig(opts...)
|
|
|
|
if connString := os.Getenv("TESTS_RABBITMQ_CONNECTION_STRING"); connString != "" {
|
|
u, err := url.ParseRequestURI(connString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &RabbitMQ{
|
|
connectionString: u,
|
|
}, nil
|
|
}
|
|
|
|
u := &url.URL{
|
|
Scheme: "amqp",
|
|
User: url.UserPassword("rmq", "rmq"),
|
|
Path: "/",
|
|
}
|
|
|
|
pw, _ := u.User.Password()
|
|
|
|
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
|
|
Repository: cfg.repo,
|
|
Tag: cfg.tag,
|
|
Env: []string{
|
|
fmt.Sprintf("RABBITMQ_DEFAULT_USER=%s", u.User.Username()),
|
|
fmt.Sprintf("RABBITMQ_DEFAULT_PASS=%s", pw),
|
|
},
|
|
}, func(config *docker.HostConfig) {
|
|
config.AutoRemove = true
|
|
config.RestartPolicy = docker.RestartPolicy{
|
|
Name: "no",
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't run rabbitmq: %w", err)
|
|
}
|
|
|
|
if err = resource.Expire(cfg.ttl); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
u.Host, err = getHostPort(resource, "5672/tcp")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &RabbitMQ{
|
|
connectionString: u,
|
|
resource: resource,
|
|
}, nil
|
|
}
|
|
|
|
// Limits for the exponential backoff that (*RabbitMQ).NewConnection uses while
// it tries to open a connection.
const (
	// rabbitMQOpenConnBackOffMaxInterval is assigned to the backoff's MaxInterval.
	rabbitMQOpenConnBackOffMaxInterval = 5 * time.Second

	// rabbitMQOpenConnBackOffMaxElapsedTime is assigned to the backoff's MaxElapsedTime.
	rabbitMQOpenConnBackOffMaxElapsedTime = 30 * time.Second
)
|
|
|
|
func (rmq *RabbitMQ) NewConnection(tb watermilltest.TestingTB) *amqp.ConnectionWrapper {
|
|
tb.Helper()
|
|
|
|
require.NotNil(tb, rmq, "rabbit mq resource not property initialized")
|
|
require.NotNil(tb, rmq.connectionString, "rabbit mq resource not properly initialized")
|
|
|
|
connConfig := amqp.ConnectionConfig{AmqpURI: rmq.connectionString.String()}
|
|
|
|
bo := backoff.NewExponentialBackOff()
|
|
bo.MaxInterval = rabbitMQOpenConnBackOffMaxInterval
|
|
bo.MaxElapsedTime = rabbitMQOpenConnBackOffMaxElapsedTime
|
|
require.NoError(tb, backoff.Retry(func() error {
|
|
conn, err := amqp.NewConnection(connConfig, watermill.NopLogger{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_ = conn.Close()
|
|
return nil
|
|
}, bo))
|
|
|
|
conn, err := amqp.NewConnection(connConfig, watermill.NopLogger{})
|
|
require.NoError(tb, err)
|
|
tb.Cleanup(func() {
|
|
_ = conn.Close()
|
|
})
|
|
|
|
return conn
|
|
}
|
|
|
|
func (rmq *RabbitMQ) Close() error {
|
|
if rmq != nil && rmq.resource != nil {
|
|
if err := rmq.resource.Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// TopicNameGenerator is a distinct type with the same underlying type as amqp.QueueNameGenerator.
// NewPubSub uses a value of this type to generate both the exchange name and the routing key.
type TopicNameGenerator amqp.QueueNameGenerator
|
|
|
|
func NewPubSub(
|
|
tb watermilltest.TestingTB,
|
|
conn *amqp.ConnectionWrapper,
|
|
queueNameGenerator amqp.QueueNameGenerator,
|
|
exchangeAndRoutingKeyNameGenerator TopicNameGenerator,
|
|
) (*amqp.Publisher, *amqp.Subscriber) {
|
|
tb.Helper()
|
|
|
|
cfg := amqp.NewNonDurablePubSubConfig("", queueNameGenerator)
|
|
cfg.Consume.NoRequeueOnNack = true
|
|
cfg.Consume.Qos.PrefetchCount = 1
|
|
cfg.Exchange.GenerateName = exchangeAndRoutingKeyNameGenerator
|
|
cfg.QueueBind.GenerateRoutingKey = exchangeAndRoutingKeyNameGenerator
|
|
|
|
publisher, err := amqp.NewPublisherWithConnection(cfg, watermill.NopLogger{}, conn)
|
|
require.NoError(tb, err)
|
|
tb.Cleanup(func() {
|
|
_ = publisher.Close()
|
|
})
|
|
|
|
subscriber, err := amqp.NewSubscriberWithConnection(cfg, watermill.NopLogger{}, conn)
|
|
require.NoError(tb, err)
|
|
tb.Cleanup(func() {
|
|
_ = subscriber.Close()
|
|
})
|
|
|
|
return publisher, subscriber
|
|
}
|
|
|
|
func getHostPort(resource *dockertest.Resource, id string) (string, error) {
|
|
dockerURL := os.Getenv("DOCKER_HOST")
|
|
if dockerURL == "" {
|
|
return resource.GetHostPort(id), nil
|
|
}
|
|
|
|
u, err := url.Parse(dockerURL)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return net.JoinHostPort(u.Hostname(), resource.GetPort(id)), nil
|
|
}
|