// core-old/internal/msg/integration_test.go
package msg_test

import (
"context"
"encoding/json"
"io"
"io/fs"
"net/http"
"net/http/httptest"
urlpkg "net/url"
"os"
"os/signal"
"path"
"strconv"
"syscall"
"testing"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/adapter"
"gitea.dwysokinski.me/twhelp/core/internal/adapter/bundbtest"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/internal/msg"
"gitea.dwysokinski.me/twhelp/core/internal/service"
"gitea.dwysokinski.me/twhelp/core/internal/tw"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
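
// TestUpdateData is an end-to-end test of the data update flow: it stubs the
// Tribal Wars API with an httptest server backed by ./testdata/updatedata,
// wires the server/player/tribe/village consumers to RabbitMQ and a real
// database (via helpers shared by this test package), runs jobSvc.UpdateData
// for two stages, and polls the repositories until they match the expected
// JSON fixtures.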
func TestUpdateData(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping long-running test")
}
ctxTimeout, cancel := context.WithTimeout(context.Background(), 150*time.Second)
defer cancel()
ctx, stop := signal.NotifyContext(ctxTimeout, os.Interrupt, syscall.SIGTERM)
defer stop()
generateExchangeAndRoutingKeyName := func(topic string) string {
return topic + "_update_data"
}
publisherMarshaler := adapter.JSONWatermillMarshaler{NewUUID: uuid.NewString}
consumerMarshaler := msg.JSONMarshaler{NewUUID: uuid.NewString}
rmqConn := newRabbitMQConnection(t)
nopLogger := watermill.NopLogger{}
serverPub, serverSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_update_data_server"),
generateExchangeAndRoutingKeyName,
)
playerPub, playerSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_update_data_player"),
generateExchangeAndRoutingKeyName,
)
tribePub, tribeSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_update_data_tribe"),
generateExchangeAndRoutingKeyName,
)
villagePub, villageSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_update_data_village"),
generateExchangeAndRoutingKeyName,
)
// filesys is set later in this test to os.DirFS("./testdata/updatedata/stage{n}")
var filesys fs.FS
// client
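// The stub server mimics the upstream API: it resolves the original request URL
// (forwarded by urlChangerTransport in the "url" query parameter) to a file
// under the current stage directory, falling back to the "default" directory
// when the host has no dedicated fixture.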
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
url, err := urlpkg.Parse(r.URL.Query().Get("url"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
fname := url.Path + url.Query().Get("func")
f, err := filesys.Open(path.Join(url.Host, fname))
if err != nil {
f, err = filesys.Open(path.Join("default", fname))
}
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
defer func() {
_ = f.Close()
}()
w.WriteHeader(http.StatusOK)
_, _ = io.Copy(w, f)
}))
t.Cleanup(srv.Close)
client := adapter.NewTWHTTPClient(tw.NewClient(tw.WithHTTPClient(&http.Client{
Transport: &urlChangerTransport{
url: srv.URL,
transport: srv.Client().Transport,
},
})))
// repos
db := newBunDB(t)
versionRepo := adapter.NewVersionBunRepository(db)
serverRepo := adapter.NewServerBunRepository(db)
tribeRepo := adapter.NewTribeBunRepository(db)
playerRepo := adapter.NewPlayerBunRepository(db)
villageRepo := adapter.NewVillageBunRepository(db)
tribeChangeRepo := adapter.NewTribeChangeBunRepository(db)
playerSnapshotRepo := adapter.NewPlayerSnapshotBunRepository(db)
tribeSnapshotRepo := adapter.NewTribeSnapshotBunRepository(db)
// services
versionSvc := service.NewVersion(versionRepo)
serverSvc := service.NewServer(serverRepo, client)
tribeChangeSvc := service.NewTribeChange(tribeChangeRepo)
tribeSvc := service.NewTribe(tribeRepo, client)
playerSvc := service.NewPlayer(playerRepo, tribeChangeSvc, client)
villageSvc := service.NewVillage(villageRepo, client)
playerSnapshotSvc := service.NewPlayerSnapshot(playerSnapshotRepo, playerSvc)
tribeSnapshotSvc := service.NewTribeSnapshot(tribeSnapshotRepo, tribeSvc)
jobSvc := service.NewJob(
versionSvc,
serverSvc,
adapter.NewServerWatermillPublisher(serverPub, publisherMarshaler),
nil,
nil,
nil,
)
runRouterWithContext(
t,
ctx,
msg.NewPlayerConsumer(consumerMarshaler, playerPub, playerSub, nopLogger, playerSvc, playerSnapshotSvc),
msg.NewServerConsumer(consumerMarshaler, serverPub, serverSub, nopLogger, serverSvc),
msg.NewTribeConsumer(consumerMarshaler, tribePub, tribeSub, nopLogger, tribeSvc, tribeSnapshotSvc),
msg.NewVillageConsumer(consumerMarshaler, villagePub, villageSub, nopLogger, villageSvc),
)
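// runUpdateData points the stub API at the fixtures for the given stage, runs
// the update job, and then polls the repositories until servers, players,
// tribes, villages and tribe changes match the expected JSON for that stage.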
runUpdateData := func(stage int64) {
filesys = os.DirFS("./testdata/updatedata/stage" + strconv.FormatInt(stage, 10))
now := time.Now()
require.NoError(t, jobSvc.UpdateData(ctx))
var expectedServers []domain.Server
readJSONFile(t, filesys, path.Join("expected", "servers.json"), &expectedServers)
assert.Eventuallyf(t, func() bool {
servers, err := serverRepo.List(ctx, domain.ListServersParams{
Special: domain.NullBool{
Bool: false,
Valid: true,
},
})
if err != nil {
return false
}
for _, s := range servers {
if !isWithinDuration(now, s.PlayerDataUpdatedAt, time.Minute) ||
!isWithinDuration(now, s.TribeDataUpdatedAt, time.Minute) ||
!isWithinDuration(now, s.VillageDataUpdatedAt, time.Minute) {
return false
}
}
return cmp.Equal(servers, expectedServers, cmpopts.IgnoreTypes(time.Time{}))
}, 60*time.Second, time.Second, "servers - stage %d", stage)
require.NoError(t, ctx.Err())
var expectedPlayers []domain.Player
readJSONFile(t, filesys, path.Join("expected", "players.json"), &expectedPlayers)
assert.Eventuallyf(t, func() bool {
players, err := playerRepo.List(ctx, domain.ListPlayersParams{
Sort: []domain.PlayerSort{
{By: domain.PlayerSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil || len(players) != len(expectedPlayers) {
return false
}
for i, p := range players {
if p.CreatedAt.IsZero() ||
p.BestRankAt.IsZero() ||
p.MostVillagesAt.IsZero() ||
p.MostPointsAt.IsZero() ||
(p.DeletedAt.IsZero() && !expectedPlayers[i].DeletedAt.IsZero()) ||
(!p.DeletedAt.IsZero() && expectedPlayers[i].DeletedAt.IsZero()) {
return false
}
}
return cmp.Equal(players, expectedPlayers, cmpopts.IgnoreTypes(time.Time{}))
}, 60*time.Second, time.Second, "players - stage %d", stage)
require.NoError(t, ctx.Err())
var expectedTribes []domain.Tribe
readJSONFile(t, filesys, path.Join("expected", "tribes.json"), &expectedTribes)
assert.Eventuallyf(t, func() bool {
tribes, err := tribeRepo.List(ctx, domain.ListTribesParams{
Sort: []domain.TribeSort{
{By: domain.TribeSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil || len(tribes) != len(expectedTribes) {
return false
}
for i, trb := range tribes {
if trb.CreatedAt.IsZero() ||
trb.BestRankAt.IsZero() ||
trb.MostVillagesAt.IsZero() ||
trb.MostPointsAt.IsZero() ||
(trb.DeletedAt.IsZero() && !expectedTribes[i].DeletedAt.IsZero()) ||
(!trb.DeletedAt.IsZero() && expectedTribes[i].DeletedAt.IsZero()) {
return false
}
}
return cmp.Equal(tribes, expectedTribes, cmpopts.IgnoreTypes(time.Time{}))
}, 60*time.Second, time.Second, "tribes - stage %d", stage)
require.NoError(t, ctx.Err())
var expectedVillages []domain.Village
readJSONFile(t, filesys, path.Join("expected", "villages.json"), &expectedVillages)
assert.Eventuallyf(t, func() bool {
villages, err := villageRepo.List(ctx, domain.ListVillagesParams{})
if err != nil {
return false
}
for _, v := range villages {
if v.CreatedAt.IsZero() {
return false
}
}
return cmp.Equal(villages, expectedVillages, cmpopts.IgnoreTypes(time.Time{}))
}, 60*time.Second, time.Second, "villages - stage %d", stage)
require.NoError(t, ctx.Err())
var expectedTribeChanges []domain.TribeChange
readJSONFile(t, filesys, path.Join("expected", "tribe-changes.json"), &expectedTribeChanges)
assert.Eventuallyf(t, func() bool {
tcs, err := tribeChangeRepo.List(ctx, domain.ListTribeChangesParams{
Sort: []domain.TribeChangeSort{
{By: domain.TribeChangeSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil {
return false
}
for _, tc := range tcs {
if tc.CreatedAt.IsZero() {
return false
}
}
return cmp.Equal(tcs, expectedTribeChanges, cmpopts.IgnoreTypes(time.Time{}))
}, 60*time.Second, time.Second, "tribe changes - stage %d", stage)
}
runUpdateData(1)
runUpdateData(2)
}
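
// TestUpdateEnnoblements exercises the ennoblement update flow against fixtures
// in ./testdata/updateennoblements: it loads an initial DB fixture, runs
// jobSvc.UpdateEnnoblements for two stages, and waits until the ennoblement
// rows and the servers' EnnoblementDataUpdatedAt timestamps are updated.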
func TestUpdateEnnoblements(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping long-running test")
}
ctxTimeout, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
ctx, stop := signal.NotifyContext(ctxTimeout, os.Interrupt, syscall.SIGTERM)
defer stop()
generateExchangeAndRoutingKeyName := func(topic string) string {
return topic + "_update_ennoblements"
}
publisherMarshaler := adapter.JSONWatermillMarshaler{NewUUID: uuid.NewString}
consumerMarshaler := msg.JSONMarshaler{NewUUID: uuid.NewString}
rmqConn := newRabbitMQConnection(t)
nopLogger := watermill.NopLogger{}
serverPub, serverSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_update_ennoblements_server"),
generateExchangeAndRoutingKeyName,
)
ennoblementPub, ennoblementSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_update_ennoblements_ennoblement"),
generateExchangeAndRoutingKeyName,
)
// filesys is set later in this test to os.DirFS("./testdata/updateennoblements/stage{n}")
var filesys fs.FS
// client
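// The stub server resolves the forwarded URL to a fixture file; the "since"
// query parameter is appended to the file name, so requests for different time
// ranges map to different fixtures.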
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
url, err := urlpkg.Parse(r.URL.Query().Get("url"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
f, err := filesys.Open(path.Join(url.Host, url.Path+url.Query().Get("func")+url.Query().Get("since")))
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
defer func() {
_ = f.Close()
}()
w.WriteHeader(http.StatusOK)
_, _ = io.Copy(w, f)
}))
t.Cleanup(srv.Close)
client := adapter.NewTWHTTPClient(tw.NewClient(
tw.WithHTTPClient(&http.Client{
Transport: &urlChangerTransport{
url: srv.URL,
transport: srv.Client().Transport,
},
}),
tw.WithEnnoblementsUseInterfaceFunc(func(since time.Time) bool {
return !since.IsZero()
}),
))
// db
db := newBunDB(t)
require.NoError(t, bundbtest.NewFixture(db).Load(ctx, os.DirFS("./testdata/updateennoblements"), "fixture.yml"))
// repos
versionRepo := adapter.NewVersionBunRepository(db)
serverRepo := adapter.NewServerBunRepository(db)
ennoblementRepo := adapter.NewEnnoblementBunRepository(db)
// services
versionSvc := service.NewVersion(versionRepo)
serverSvc := service.NewServer(serverRepo, client)
ennoblementSvc := service.NewEnnoblement(ennoblementRepo, client)
jobSvc := service.NewJob(
versionSvc,
serverSvc,
nil,
adapter.NewEnnoblementWatermillPublisher(ennoblementPub, publisherMarshaler),
nil,
nil,
)
runRouterWithContext(
t,
ctx,
msg.NewServerConsumer(consumerMarshaler, serverPub, serverSub, nopLogger, serverSvc),
msg.NewEnnoblementConsumer(consumerMarshaler, ennoblementPub, ennoblementSub, nopLogger, ennoblementSvc),
)
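// runUpdateEnnoblements points the stub API at the fixtures for the given
// stage, runs the job, and polls until the stored ennoblements match the
// expected JSON and the servers' ennoblement timestamps are refreshed.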
runUpdateEnnoblements := func(stage int64) {
filesys = os.DirFS("./testdata/updateennoblements/stage" + strconv.FormatInt(stage, 10))
require.NoError(t, jobSvc.UpdateEnnoblements(ctx))
var expectedEnnoblements []domain.Ennoblement
readJSONFile(t, filesys, path.Join("expected", "ennoblements.json"), &expectedEnnoblements)
assert.Eventuallyf(t, func() bool {
ennoblements, err := ennoblementRepo.List(ctx, domain.ListEnnoblementsParams{
Sort: []domain.EnnoblementSort{
{By: domain.EnnoblementSortByCreatedAt, Direction: domain.SortDirectionASC},
},
})
return err == nil && cmp.Equal(ennoblements, expectedEnnoblements)
}, 60*time.Second, time.Second, "ennoblements - stage %d", stage)
require.NoError(t, ctx.Err())
now := time.Now()
assert.Eventuallyf(t, func() bool {
servers, err := serverRepo.List(ctx, domain.ListServersParams{
Special: domain.NullBool{
Bool: false,
Valid: true,
},
Open: domain.NullBool{
Bool: true,
Valid: true,
},
})
if err != nil {
return false
}
for _, s := range servers {
if !isWithinDuration(now, s.EnnoblementDataUpdatedAt, time.Minute) {
return false
}
}
return true
}, 60*time.Second, time.Second, "servers - stage %d", stage)
}
runUpdateEnnoblements(1)
runUpdateEnnoblements(2)
}
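
// TestCreateSnapshots loads players and tribes from ./testdata/createsnapshots,
// runs jobSvc.CreateSnapshots, and waits until every player and tribe has a
// matching snapshot row and the servers' snapshot timestamps are refreshed.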
func TestCreateSnapshots(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping long-running test")
}
ctxTimeout, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
ctx, stop := signal.NotifyContext(ctxTimeout, os.Interrupt, syscall.SIGTERM)
defer stop()
generateExchangeAndRoutingKeyName := func(topic string) string {
return topic + "_create_snapshots"
}
publisherMarshaler := adapter.JSONWatermillMarshaler{NewUUID: uuid.NewString}
consumerMarshaler := msg.JSONMarshaler{NewUUID: uuid.NewString}
rmqConn := newRabbitMQConnection(t)
nopLogger := watermill.NopLogger{}
serverPub, serverSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_create_snapshots_server"),
generateExchangeAndRoutingKeyName,
)
playerPub, playerSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_create_snapshots_player"),
generateExchangeAndRoutingKeyName,
)
tribePub, tribeSub := newRabbitMQPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_create_snapshots_tribe"),
generateExchangeAndRoutingKeyName,
)
// db
db := newBunDB(t)
require.NoError(t, bundbtest.NewFixture(db).Load(ctx, os.DirFS("./testdata/createsnapshots"), "fixture.yml"))
// repos
versionRepo := adapter.NewVersionBunRepository(db)
serverRepo := adapter.NewServerBunRepository(db)
playerRepo := adapter.NewPlayerBunRepository(db)
playerSnapshotRepo := adapter.NewPlayerSnapshotBunRepository(db)
tribeRepo := adapter.NewTribeBunRepository(db)
tribeSnapshotRepo := adapter.NewTribeSnapshotBunRepository(db)
// services
versionSvc := service.NewVersion(versionRepo)
serverSvc := service.NewServer(serverRepo, nil)
playerSvc := service.NewPlayer(playerRepo, nil, nil)
playerSnapshotSvc := service.NewPlayerSnapshot(playerSnapshotRepo, playerSvc)
tribeSvc := service.NewTribe(tribeRepo, nil)
tribeSnapshotSvc := service.NewTribeSnapshot(tribeSnapshotRepo, tribeSvc)
jobSvc := service.NewJob(
versionSvc,
serverSvc,
nil,
nil,
adapter.NewSnapshotWatermillPublisher(playerPub, publisherMarshaler),
nil,
)
runRouterWithContext(
t,
ctx,
msg.NewServerConsumer(consumerMarshaler, serverPub, serverSub, nopLogger, serverSvc),
msg.NewPlayerConsumer(consumerMarshaler, playerPub, playerSub, nopLogger, playerSvc, playerSnapshotSvc),
msg.NewTribeConsumer(consumerMarshaler, tribePub, tribeSub, nopLogger, tribeSvc, tribeSnapshotSvc),
)
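// CreateSnapshots publishes messages that the consumers above process
// asynchronously, so the assertions below poll until the snapshot rows and
// the servers' snapshot timestamps appear.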
require.NoError(t, jobSvc.CreateSnapshots(ctx))
now := time.Now()
assert.Eventually(t, func() bool {
servers, err := serverRepo.List(ctx, domain.ListServersParams{
Special: domain.NullBool{
Bool: false,
Valid: true,
},
})
if err != nil {
return false
}
for _, s := range servers {
if !isWithinDuration(now, s.PlayerSnapshotsCreatedAt, time.Minute) ||
!isWithinDuration(now, s.TribeSnapshotsCreatedAt, time.Minute) {
return false
}
}
return true
}, 60*time.Second, time.Second, "servers")
require.NoError(t, ctx.Err())
assert.Eventually(t, func() bool {
tribes, err := tribeRepo.List(ctx, domain.ListTribesParams{
Sort: []domain.TribeSort{
{By: domain.TribeSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil {
return false
}
snapshots, err := tribeSnapshotRepo.List(ctx, domain.ListTribeSnapshotsParams{
Sort: []domain.TribeSnapshotSort{
{By: domain.TribeSnapshotSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil {
return false
}
if len(snapshots) != len(tribes) {
return false
}
for _, trb := range tribes {
idx := -1
for i, s := range snapshots {
if !cmp.Equal(domain.TribeSnapshot{
OpponentsDefeated: trb.OpponentsDefeated,
ID: 0,
TribeID: trb.ID,
ServerKey: trb.ServerKey,
NumMembers: trb.NumMembers,
NumVillages: trb.NumVillages,
Points: trb.Points,
AllPoints: trb.AllPoints,
Rank: trb.Rank,
Dominance: trb.Dominance,
Date: now,
CreatedAt: now,
}, s, cmpopts.IgnoreFields(domain.TribeSnapshot{}, "ID"), cmpopts.EquateApproxTime(24*time.Hour)) {
continue
}
idx = i
break
}
if idx == -1 {
return false
}
}
return true
}, 60*time.Second, time.Second, "tribe snapshots")
require.NoError(t, ctx.Err())
assert.Eventually(t, func() bool {
players, err := playerRepo.List(ctx, domain.ListPlayersParams{
Sort: []domain.PlayerSort{
{By: domain.PlayerSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil {
return false
}
snapshots, err := playerSnapshotRepo.List(ctx, domain.ListPlayerSnapshotsParams{
Sort: []domain.PlayerSnapshotSort{
{By: domain.PlayerSnapshotSortByID, Direction: domain.SortDirectionASC},
},
})
if err != nil {
return false
}
if len(snapshots) != len(players) {
return false
}
for _, p := range players {
idx := -1
for i, s := range snapshots {
if !cmp.Equal(domain.PlayerSnapshot{
OpponentsDefeated: p.OpponentsDefeated,
ID: 0,
PlayerID: p.ID,
ServerKey: p.ServerKey,
NumVillages: p.NumVillages,
Points: p.Points,
Rank: p.Rank,
TribeID: p.TribeID,
Date: now,
CreatedAt: now,
}, s, cmpopts.IgnoreFields(domain.PlayerSnapshot{}, "ID"), cmpopts.EquateApproxTime(24*time.Hour)) {
continue
}
idx = i
break
}
if idx == -1 {
return false
}
}
return true
}, 60*time.Second, time.Second, "player snapshots")
}
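
// urlChangerTransport rewrites every outgoing request so that it hits the local
// httptest server instead of the real API; the original URL is passed along in
// the "url" query parameter so the stub can pick the matching fixture.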
type urlChangerTransport struct {
url string
transport http.RoundTripper
}
func (t *urlChangerTransport) RoundTrip(r *http.Request) (*http.Response, error) {
u, err := urlpkg.Parse(t.url)
if err != nil {
return nil, err
}
u.RawQuery = urlpkg.Values{
"url": []string{r.URL.String()},
}.Encode()
r2 := r.Clone(r.Context())
r2.URL = u
return t.transport.RoundTrip(r2)
}
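
// readJSONFile decodes the JSON file at path from filesys into v, failing the
// test on any error.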
func readJSONFile(tb testing.TB, filesys fs.FS, path string, v any) {
tb.Helper()
f, err := filesys.Open(path)
require.NoError(tb, err)
defer func() {
_ = f.Close()
}()
require.NoError(tb, json.NewDecoder(f).Decode(v))
}
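
// isWithinDuration reports whether actual is within delta of expected.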
func isWithinDuration(expected, actual time.Time, delta time.Duration) bool {
dt := expected.Sub(actual)
return dt >= -delta && dt <= delta
}