This repository has been archived on 2024-04-06. You can view files and clone it, but cannot push or open issues or pull requests.
core-old/internal/msg/msg_test.go
Dawid Wysokiński 17d5068238
All checks were successful
continuous-integration/drone/push Build is passing
refactor(msg): refactor integration tests
2023-02-23 07:45:52 +01:00

217 lines
5.0 KiB
Go

package msg_test
import (
	"context"
	"errors"
	"fmt"
	"net"
	urlpkg "net/url"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
	"github.com/cenkalti/backoff/v4"
	"github.com/ory/dockertest/v3"
	"github.com/ory/dockertest/v3/docker"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
// registerer is implemented by test fixtures that attach themselves to a
// message.Router. The router helpers in this file call Register on each
// registerer before the router is started.
type registerer interface {
// Register attaches the implementation to router.
Register(router *message.Router)
}
// runRouter is a convenience wrapper around runRouterWithContext that runs
// the router with a background (never cancelled) context.
func runRouter(tb testing.TB, rs ...registerer) *message.Router {
	tb.Helper()

	ctx := context.Background()
	return runRouterWithContext(tb, ctx, rs...)
}
// runRouterWithContext builds a message.Router (10s close timeout, no-op
// logger), lets every registerer in rs attach itself, starts the router with
// ctx in a background goroutine and returns once the router signals that it
// is running. The router is closed through tb.Cleanup.
//
// Any error returned by Run is reported on tb. If Run returns before the
// router ever reports running, the test is failed immediately instead of
// blocking forever on the running signal.
func runRouterWithContext(tb testing.TB, ctx context.Context, rs ...registerer) *message.Router {
	tb.Helper()

	router, err := message.NewRouter(message.RouterConfig{CloseTimeout: 10 * time.Second}, watermill.NopLogger{})
	require.NoError(tb, err)
	tb.Cleanup(func() {
		_ = router.Close()
	})

	for _, r := range rs {
		r.Register(router)
	}

	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		// assert rather than require: FailNow must only be called from the
		// goroutine running the test, never from a goroutine it spawned.
		assert.NoError(tb, router.Run(ctx))
	}()

	select {
	case <-router.Running():
	case <-stopped:
		// Both channels may be ready at once (the router started and then
		// stopped right away); only fail when it never reported running.
		select {
		case <-router.Running():
		default:
			tb.Fatal("router stopped before it started running")
		}
	}

	return router
}
// Compile-time check that middlewareRegisterer satisfies registerer.
var _ registerer = middlewareRegisterer{}

// middlewareRegisterer is a registerer that installs a single handler
// middleware on a router.
type middlewareRegisterer struct {
	// h is the middleware passed to the router in Register.
	h message.HandlerMiddleware
}

// Register adds the wrapped middleware to router.
func (mr middlewareRegisterer) Register(router *message.Router) {
	router.AddMiddleware(mr.h)
}
// newWaitForHandlerMiddleware returns a middleware registerer together with a
// channel that is closed the first time the handler named handlerName returns
// no messages and a nil error.
//
// Messages handled by any other handler are passed through untouched. The
// channel is closed at most once, so repeated (or concurrent) successful
// invocations of the watched handler do not panic on a double close.
func newWaitForHandlerMiddleware(handlerName string) (middlewareRegisterer, <-chan struct{}) {
	done := make(chan struct{})
	var closeOnce sync.Once

	middleware := func(next message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			if message.HandlerNameFromCtx(msg.Context()) != handlerName {
				return next(msg)
			}

			result, err := next(msg)
			if len(result) == 0 && err == nil {
				closeOnce.Do(func() { close(done) })
			}

			return result, err
		}
	}

	return middlewareRegisterer{h: middleware}, done
}
// newPubSub creates a persistent in-memory GoChannel pub/sub with a no-op
// logger and registers a cleanup on tb that closes it.
func newPubSub(tb testing.TB) *gochannel.GoChannel {
	tb.Helper()

	cfg := gochannel.Config{Persistent: true}
	ps := gochannel.NewGoChannel(cfg, watermill.NopLogger{})
	tb.Cleanup(func() { _ = ps.Close() })

	return ps
}
// newRabbitMQConnection returns an AMQP connection for tests.
//
// When the TESTS_AMQP_URI environment variable is set, its value is used as
// the broker URI. Otherwise a rabbitmq:3.11.9-alpine container is started via
// dockertest with rmq/rmq credentials, purged on test cleanup, and the
// connection is made to its 5672/tcp port.
func newRabbitMQConnection(tb testing.TB) *amqp.ConnectionWrapper {
	tb.Helper()

	if envURI, found := os.LookupEnv("TESTS_AMQP_URI"); found {
		return newRabbitMQConnectionWithURI(tb, envURI)
	}

	const (
		user     = "rmq"
		password = "rmq"
	)

	pool, err := dockertest.NewPool("")
	require.NoError(tb, err, "couldn't connect to docker")
	pool.MaxWait = 20 * time.Second

	runOpts := &dockertest.RunOptions{
		Repository: "rabbitmq",
		Tag:        "3.11.9-alpine",
		Env: []string{
			fmt.Sprintf("RABBITMQ_DEFAULT_USER=%s", user),
			fmt.Sprintf("RABBITMQ_DEFAULT_PASS=%s", password),
		},
	}
	hostOpts := func(hc *docker.HostConfig) {
		hc.AutoRemove = true
		hc.RestartPolicy = docker.RestartPolicy{Name: "no"}
	}

	container, err := pool.RunWithOptions(runOpts, hostOpts)
	require.NoError(tb, err, "couldn't start resource")
	tb.Cleanup(func() { _ = pool.Purge(container) })

	// Non-fatal: a failure to set the expiry is reported but does not stop
	// the test. NOTE(review): 180 is presumably seconds — confirm against
	// dockertest.
	assert.NoError(tb, container.Expire(180))

	brokerURL := urlpkg.URL{
		Scheme: "amqp",
		User:   urlpkg.UserPassword(user, password),
		Host:   getHostPort(tb, container, "5672/tcp"),
		Path:   "/",
	}

	return newRabbitMQConnectionWithURI(tb, brokerURL.String())
}
// newRabbitMQConnectionWithURI connects to the AMQP broker at uri.
//
// It first probes the broker through retry, opening and immediately closing a
// connection until one attempt succeeds, then opens the connection that is
// returned. That connection is closed on test cleanup.
func newRabbitMQConnectionWithURI(tb testing.TB, uri string) *amqp.ConnectionWrapper {
	tb.Helper()

	cfg := amqp.ConnectionConfig{AmqpURI: uri}

	probe := func() error {
		c, err := amqp.NewConnection(cfg, watermill.NopLogger{})
		if err == nil {
			_ = c.Close()
		}
		return err
	}
	require.NoError(tb, retry(probe))

	connection, err := amqp.NewConnection(cfg, watermill.NopLogger{})
	require.NoError(tb, err)
	tb.Cleanup(func() { _ = connection.Close() })

	return connection
}
// newRabbitMQPubSub creates an AMQP publisher and subscriber that share conn.
//
// Both use a non-durable pub/sub config with queue names from
// queueNameGenerator; exchangeAndRoutingKeyNameGenerator supplies both the
// exchange name and the routing key. Nacked messages are not requeued and the
// prefetch count is 1. Publisher and subscriber are closed on test cleanup.
func newRabbitMQPubSub(
	tb testing.TB,
	conn *amqp.ConnectionWrapper,
	queueNameGenerator amqp.QueueNameGenerator,
	exchangeAndRoutingKeyNameGenerator func(topic string) string,
) (*amqp.Publisher, *amqp.Subscriber) {
	tb.Helper()

	config := amqp.NewNonDurablePubSubConfig("", queueNameGenerator)
	config.Exchange.GenerateName = exchangeAndRoutingKeyNameGenerator
	config.QueueBind.GenerateRoutingKey = exchangeAndRoutingKeyNameGenerator
	config.Consume.NoRequeueOnNack = true
	config.Consume.Qos.PrefetchCount = 1

	logger := watermill.NopLogger{}

	pub, err := amqp.NewPublisherWithConnection(config, logger, conn)
	require.NoError(tb, err)
	tb.Cleanup(func() { _ = pub.Close() })

	sub, err := amqp.NewSubscriberWithConnection(config, logger, conn)
	require.NoError(tb, err)
	tb.Cleanup(func() { _ = sub.Close() })

	return pub, sub
}
// getHostPort returns the host:port under which the container port id of
// resource can be reached.
//
// When DOCKER_HOST is set to a non-empty value, the hostname is taken from
// that URL and joined with the resource's mapped port; otherwise the address
// reported by the resource itself is returned.
func getHostPort(tb testing.TB, resource *dockertest.Resource, id string) string {
	tb.Helper()

	if dockerHost := os.Getenv("DOCKER_HOST"); dockerHost != "" {
		parsed, err := urlpkg.Parse(dockerHost)
		require.NoError(tb, err)

		return net.JoinHostPort(parsed.Hostname(), resource.GetPort(id))
	}

	return resource.GetHostPort(id)
}
// retry runs op with exponential backoff (max interval 5s, max elapsed time
// 30s) until it succeeds or the backoff gives up.
//
// It returns nil on success. When the backoff reports that it is exhausted,
// the returned error says that the retry deadline was reached and includes
// the last error produced by op, so the root cause is not lost. Otherwise the
// error from backoff.Retry is returned unchanged.
func retry(op func() error) error {
	bo := backoff.NewExponentialBackOff()
	bo.MaxInterval = 5 * time.Second
	bo.MaxElapsedTime = 30 * time.Second

	err := backoff.Retry(op, bo)
	if err == nil {
		return nil
	}

	if bo.NextBackOff() == backoff.Stop {
		return errors.New("reached retry deadline: " + err.Error())
	}

	return err
}