core/internal/port/consumer_data_sync_test.go

640 lines
21 KiB
Go

package port_test
import (
"context"
"encoding/json"
"fmt"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"net/url"
"os"
"os/signal"
"path"
"sync"
"syscall"
"testing"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/port"
"gitea.dwysokinski.me/twhelp/corev3/internal/tw"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillamqptest"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermilltest"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/brianvoe/gofakeit/v7"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// we can't run the subtests in parallel
//nolint:tparallel
func TestDataSync(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping long-running test")
}
ctxTimeout, cancel := context.WithTimeout(context.Background(), 150*time.Second)
defer cancel()
ctx, stop := signal.NotifyContext(ctxTimeout, os.Interrupt, syscall.SIGTERM)
defer stop()
// bun
db := postgres.NewDB(t)
// watermill
marshaler := watermillmsg.JSONMarshaler{NewUUID: watermill.NewUUID}
generateExchangeAndRoutingKeyName := func(topic string) string {
return topic + "_data_sync"
}
rmqConn := rabbitMQ.NewConnection(t)
nopLogger := watermill.NopLogger{}
serverPub, serverSub := watermillamqptest.NewPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_data_sync_server"),
generateExchangeAndRoutingKeyName,
)
playerPub, playerSub := watermillamqptest.NewPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_data_sync_player"),
generateExchangeAndRoutingKeyName,
)
tribePub, tribeSub := watermillamqptest.NewPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_data_sync_tribe"),
generateExchangeAndRoutingKeyName,
)
villagePub, villageSub := watermillamqptest.NewPubSub(
t,
rmqConn,
amqp.GenerateQueueNameTopicNameWithSuffix("_data_sync_village"),
generateExchangeAndRoutingKeyName,
)
// events/commands
serverCmdSync := gofakeit.UUID()
serverEventSynced := gofakeit.UUID()
playerEventSynced := gofakeit.UUID()
tribeEventSynced := gofakeit.UUID()
villageEventSynced := gofakeit.UUID()
// TW service
// filesys is set later in this test to os.DirFS("./testdata/datasync/stage{n}")
var filesys fs.FS
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
u, err := url.Parse(r.URL.Query().Get("url"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
fname := u.Path + u.Query().Get("func")
f, err := filesys.Open(path.Join(u.Host, fname))
if err != nil {
f, err = filesys.Open(path.Join("default", fname))
}
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
defer func() {
_ = f.Close()
}()
w.WriteHeader(http.StatusOK)
_, _ = io.Copy(w, f)
}))
t.Cleanup(srv.Close)
twSvc := adapter.NewTWHTTP(tw.NewClient(tw.WithHTTPClient(&http.Client{
Transport: &urlChangerTransport{
url: srv.URL,
transport: srv.Client().Transport,
},
})))
// adapters
versionRepo := adapter.NewVersionBunRepository(db)
serverRepo := adapter.NewServerBunRepository(db)
tribeRepo := adapter.NewTribeBunRepository(db)
playerRepo := adapter.NewPlayerBunRepository(db)
villageRepo := adapter.NewVillageBunRepository(db)
tribeChangeRepo := adapter.NewTribeChangeBunRepository(db)
serverPublisher := adapter.NewServerWatermillPublisher(serverPub, marshaler, serverCmdSync, serverEventSynced)
playerPublisher := adapter.NewPlayerWatermillPublisher(playerPub, marshaler, playerEventSynced)
tribePublisher := adapter.NewTribeWatermillPublisher(tribePub, marshaler, tribeEventSynced)
villagePublisher := adapter.NewVillageWatermillPublisher(villagePub, marshaler, villageEventSynced)
// services
versionSvc := app.NewVersionService(versionRepo)
serverSvc := app.NewServerService(serverRepo, twSvc, serverPublisher)
tribeChangeSvc := app.NewTribeChangeService(tribeChangeRepo)
tribeSvc := app.NewTribeService(tribeRepo, twSvc, tribePublisher)
playerSvc := app.NewPlayerService(playerRepo, tribeChangeSvc, twSvc, playerPublisher)
villageSvc := app.NewVillageService(villageRepo, twSvc, villagePublisher)
dataSyncSvc := app.NewDataSyncService(versionSvc, serverSvc, serverPublisher, nil)
watermilltest.RunRouterWithContext(
t,
ctx,
port.NewServerWatermillConsumer(
serverSvc,
serverSub,
nopLogger,
marshaler,
serverCmdSync,
serverEventSynced,
tribeEventSynced,
playerEventSynced,
villageEventSynced,
"",
"",
"",
),
port.NewTribeWatermillConsumer(
tribeSvc,
nil,
tribeSub,
nopLogger,
marshaler,
serverEventSynced,
villageEventSynced,
"",
"",
),
port.NewPlayerWatermillConsumer(
playerSvc,
nil,
playerSub,
nopLogger,
marshaler,
serverEventSynced,
"",
"",
),
port.NewVillageWatermillConsumer(villageSvc, villageSub, nopLogger, marshaler, serverEventSynced),
)
for _, stage := range []uint{1, 2} {
t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) {
filesys = os.DirFS(fmt.Sprintf("./testdata/datasync/stage%d", stage))
require.NoError(t, dataSyncSvc.Sync(ctx))
var wg sync.WaitGroup
var expectedServers []map[string]any
readJSONFile(t, filesys, path.Join("expected", "servers.json"), &expectedServers)
wg.Add(1)
go func() {
defer wg.Done()
assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
require.NoError(collect, ctx.Err())
listParams := domain.NewListServersParams()
require.NoError(collect, listParams.SetSort([]domain.ServerSort{
domain.ServerSortKeyASC,
}))
require.NoError(collect, listParams.SetSpecial(domain.NullBool{
V: false,
Valid: true,
}))
require.NoError(collect, listParams.SetLimit(domain.ServerListMaxLimit))
allServers := make(domain.Servers, 0, len(expectedServers))
for {
res, err := serverRepo.List(ctx, listParams)
require.NoError(collect, err)
allServers = append(allServers, res.Servers()...)
if res.Next().IsZero() {
break
}
require.NoError(collect, listParams.SetCursor(res.Next()))
}
if !assert.Len(collect, allServers, len(expectedServers)) {
return
}
for i, expected := range expectedServers {
actual := allServers[i]
msg := fmt.Sprintf("Key=%s", expected["Key"])
assert.Equal(collect, expected["Key"], actual.Key(), msg)
assert.Equal(collect, expected["URL"], actual.URL().String(), msg)
assert.Equal(collect, expected["Open"], actual.Open(), msg)
assert.Equal(collect, expected["VersionCode"], actual.VersionCode(), msg)
assert.EqualValues(collect, expected["NumPlayers"], actual.NumPlayers(), msg)
assert.EqualValues(collect, expected["NumTribes"], actual.NumTribes(), msg)
assert.EqualValues(collect, expected["NumVillages"], actual.NumVillages(), msg)
assert.EqualValues(collect, expected["NumPlayerVillages"], actual.NumPlayerVillages(), msg)
assert.EqualValues(collect, expected["NumBonusVillages"], actual.NumBonusVillages(), msg)
assert.EqualValues(
collect,
expected["NumBarbarianVillages"],
actual.NumBarbarianVillages(),
"Key=%s",
expected["Key"],
)
assert.WithinDuration(collect, time.Now(), actual.PlayerDataSyncedAt(), time.Minute, msg)
assert.WithinDuration(collect, time.Now(), actual.TribeDataSyncedAt(), time.Minute, msg)
assert.WithinDuration(collect, time.Now(), actual.VillageDataSyncedAt(), time.Minute, msg)
assert.JSONEqf(
collect,
string(marshalJSON(collect, expected["Config"])),
string(marshalJSON(collect, serverConfigToMap(actual.Config()))),
"Key=%s",
expected["Key"],
)
assert.JSONEq(
collect,
string(marshalJSON(collect, expected["BuildingInfo"])),
string(marshalJSON(collect, buildingInfoToMap(actual.BuildingInfo()))),
msg,
)
assert.JSONEq(
collect,
string(marshalJSON(collect, expected["UnitInfo"])),
string(marshalJSON(collect, unitInfoToMap(actual.UnitInfo()))),
msg,
)
}
}, 60*time.Second, time.Second, "servers")
}()
var expectedTribes []map[string]any
readJSONFile(t, filesys, path.Join("expected", "tribes.json"), &expectedTribes)
wg.Add(1)
go func() {
defer wg.Done()
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
require.NoError(collect, ctx.Err())
listParams := domain.NewListTribesParams()
require.NoError(collect, listParams.SetSort([]domain.TribeSort{
domain.TribeSortServerKeyASC,
domain.TribeSortIDASC,
}))
require.NoError(collect, listParams.SetLimit(domain.TribeListMaxLimit))
allTribes := make(domain.Tribes, 0, len(expectedTribes))
for {
res, err := tribeRepo.List(ctx, listParams)
require.NoError(collect, err)
allTribes = append(allTribes, res.Tribes()...)
if res.Next().IsZero() {
break
}
require.NoError(collect, listParams.SetCursor(res.Next()))
}
if !assert.Len(collect, allTribes, len(expectedTribes)) {
return
}
for i, expected := range expectedTribes {
actual := allTribes[i]
msg := fmt.Sprintf("ID=%.0f,ServerKey=%s", expected["ID"], expected["ServerKey"])
assert.EqualValues(collect, expected["ID"], actual.ID(), msg)
assert.Equal(collect, expected["ServerKey"], actual.ServerKey(), msg)
assert.Equal(collect, expected["Name"], actual.Name(), msg)
assert.Equal(collect, expected["Tag"], actual.Tag(), msg)
assert.EqualValues(collect, expected["NumMembers"], actual.NumMembers(), msg)
assert.EqualValues(collect, expected["NumVillages"], actual.NumVillages(), msg)
assert.EqualValues(collect, expected["Points"], actual.Points(), msg)
assert.EqualValues(collect, expected["AllPoints"], actual.AllPoints(), msg)
assert.EqualValues(collect, expected["Rank"], actual.Rank(), msg)
assert.EqualValues(collect, expected["RankAtt"], actual.OD().RankAtt(), msg)
assert.EqualValues(collect, expected["ScoreAtt"], actual.OD().ScoreAtt(), msg)
assert.EqualValues(collect, expected["RankDef"], actual.OD().RankDef(), msg)
assert.EqualValues(collect, expected["ScoreDef"], actual.OD().ScoreDef(), msg)
assert.EqualValues(collect, expected["RankSup"], actual.OD().RankSup(), msg)
assert.EqualValues(collect, expected["ScoreSup"], actual.OD().ScoreSup(), msg)
assert.EqualValues(collect, expected["RankTotal"], actual.OD().RankTotal(), msg)
assert.EqualValues(collect, expected["ScoreTotal"], actual.OD().ScoreTotal(), msg)
assert.Equal(collect, expected["ProfileURL"], actual.ProfileURL().String(), msg)
assert.InDelta(collect, expected["Dominance"], actual.Dominance(), 0.01, msg)
assert.EqualValues(collect, expected["BestRank"], actual.BestRank(), msg)
assert.NotEmpty(collect, actual.BestRankAt(), msg)
assert.EqualValues(collect, expected["MostPoints"], actual.MostPoints(), msg)
assert.NotEmpty(collect, actual.MostPointsAt(), msg)
assert.EqualValues(collect, expected["MostVillages"], actual.MostVillages(), msg)
assert.NotEmpty(collect, actual.MostVillagesAt(), msg)
if deletedAt, ok := expected["DeletedAt"].(string); ok && (time.Time{}).Format(time.RFC3339) != deletedAt {
assert.NotEmpty(collect, actual.DeletedAt(), msg)
} else {
assert.Empty(collect, actual.DeletedAt(), msg)
}
}
}, 60*time.Second, time.Second, "tribes")
}()
var expectedPlayers []map[string]any
readJSONFile(t, filesys, path.Join("expected", "players.json"), &expectedPlayers)
wg.Add(1)
go func() {
defer wg.Done()
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
require.NoError(collect, ctx.Err())
listParams := domain.NewListPlayersParams()
require.NoError(collect, listParams.SetSort([]domain.PlayerSort{
domain.PlayerSortServerKeyASC,
domain.PlayerSortIDASC,
}))
require.NoError(collect, listParams.SetLimit(domain.PlayerListMaxLimit))
allPlayers := make(domain.Players, 0, len(expectedPlayers))
for {
res, err := playerRepo.List(ctx, listParams)
require.NoError(collect, err)
allPlayers = append(allPlayers, res.Players()...)
if res.Next().IsZero() {
break
}
require.NoError(collect, listParams.SetCursor(res.Next()))
}
if !assert.Len(collect, allPlayers, len(expectedPlayers)) {
return
}
for i, expected := range expectedPlayers {
actual := allPlayers[i]
msg := fmt.Sprintf("ID=%.0f,ServerKey=%s", expected["ID"], expected["ServerKey"])
assert.EqualValues(collect, expected["ID"], actual.ID(), msg)
assert.Equal(collect, expected["ServerKey"], actual.ServerKey(), msg)
assert.Equal(collect, expected["Name"], actual.Name(), msg)
assert.EqualValues(collect, expected["NumVillages"], actual.NumVillages(), msg)
assert.EqualValues(collect, expected["Points"], actual.Points(), msg)
assert.EqualValues(collect, expected["Rank"], actual.Rank(), msg)
assert.EqualValues(collect, expected["TribeID"], actual.TribeID(), msg)
assert.EqualValues(collect, expected["RankAtt"], actual.OD().RankAtt(), msg)
assert.EqualValues(collect, expected["ScoreAtt"], actual.OD().ScoreAtt(), msg)
assert.EqualValues(collect, expected["RankDef"], actual.OD().RankDef(), msg)
assert.EqualValues(collect, expected["ScoreDef"], actual.OD().ScoreDef(), msg)
assert.EqualValues(collect, expected["RankSup"], actual.OD().RankSup(), msg)
assert.EqualValues(collect, expected["ScoreSup"], actual.OD().ScoreSup(), msg)
assert.EqualValues(collect, expected["RankTotal"], actual.OD().RankTotal(), msg)
assert.EqualValues(collect, expected["ScoreTotal"], actual.OD().ScoreTotal(), msg)
assert.Equal(collect, expected["ProfileURL"], actual.ProfileURL().String(), msg)
assert.EqualValues(collect, expected["BestRank"], actual.BestRank(), msg)
assert.NotEmpty(collect, actual.BestRankAt(), msg)
assert.EqualValues(collect, expected["MostPoints"], actual.MostPoints(), msg)
assert.NotEmpty(collect, actual.MostPointsAt(), msg)
assert.EqualValues(collect, expected["MostVillages"], actual.MostVillages(), msg)
assert.NotEmpty(collect, actual.MostVillagesAt(), msg)
if deletedAt, ok := expected["DeletedAt"].(string); ok && (time.Time{}).Format(time.RFC3339) != deletedAt {
assert.NotEmpty(collect, actual.DeletedAt(), msg)
} else {
assert.Empty(collect, actual.DeletedAt(), msg)
}
}
}, 60*time.Second, time.Second, "players")
}()
var expectedVillages []map[string]any
readJSONFile(t, filesys, path.Join("expected", "villages.json"), &expectedVillages)
wg.Add(1)
go func() {
defer wg.Done()
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
require.NoError(collect, ctx.Err())
listParams := domain.NewListVillagesParams()
require.NoError(collect, listParams.SetSort([]domain.VillageSort{
domain.VillageSortServerKeyASC,
domain.VillageSortIDASC,
}))
require.NoError(collect, listParams.SetLimit(domain.VillageListMaxLimit))
allVillages := make(domain.Villages, 0, len(expectedVillages))
for {
res, err := villageRepo.List(ctx, listParams)
require.NoError(collect, err)
allVillages = append(allVillages, res.Villages()...)
if res.Next().IsZero() {
break
}
require.NoError(collect, listParams.SetCursor(res.Next()))
}
if !assert.Len(collect, allVillages, len(expectedVillages)) {
return
}
for i, expected := range expectedVillages {
actual := allVillages[i]
msg := fmt.Sprintf("ID=%.0f,ServerKey=%s", expected["ID"], expected["ServerKey"])
assert.EqualValues(collect, expected["ID"], actual.ID(), msg)
assert.Equal(collect, expected["ServerKey"], actual.ServerKey(), msg)
assert.Equal(collect, expected["Name"], actual.Name(), msg)
assert.EqualValues(collect, expected["Points"], actual.Points(), msg)
assert.EqualValues(collect, expected["X"], actual.X(), msg)
assert.EqualValues(collect, expected["Y"], actual.Y(), msg)
assert.Equal(collect, expected["Continent"], actual.Continent(), msg)
assert.EqualValues(collect, expected["Bonus"], actual.Bonus(), msg)
assert.EqualValues(collect, expected["PlayerID"], actual.PlayerID(), msg)
assert.Equal(collect, expected["ProfileURL"], actual.ProfileURL().String(), msg)
}
}, 60*time.Second, time.Second, "villages")
}()
var expectedTribeChanges []map[string]any
readJSONFile(t, filesys, path.Join("expected", "tribe-changes.json"), &expectedTribeChanges)
wg.Add(1)
go func() {
defer wg.Done()
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
require.NoError(collect, ctx.Err())
listParams := domain.NewListTribeChangesParams()
require.NoError(collect, listParams.SetSort([]domain.TribeChangeSort{
domain.TribeChangeSortIDASC,
}))
require.NoError(collect, listParams.SetLimit(domain.TribeChangeListMaxLimit))
allTribeChanges := make(domain.TribeChanges, 0, len(expectedTribeChanges))
for {
res, err := tribeChangeRepo.List(ctx, listParams)
require.NoError(collect, err)
allTribeChanges = append(allTribeChanges, res.TribeChanges()...)
if res.Next().IsZero() {
break
}
require.NoError(collect, listParams.SetCursor(res.Next()))
}
if !assert.Len(collect, allTribeChanges, len(expectedTribeChanges)) {
return
}
for i, expected := range expectedTribeChanges {
actual := allTribeChanges[i]
msg := fmt.Sprintf("ID=%.0f,ServerKey=%s", expected["ID"], expected["ServerKey"])
assert.EqualValues(collect, expected["ID"], actual.ID(), msg)
assert.Equal(collect, expected["ServerKey"], actual.ServerKey(), msg)
assert.EqualValues(collect, expected["PlayerID"], actual.PlayerID(), msg)
assert.EqualValues(collect, expected["OldTribeID"], actual.OldTribeID(), msg)
assert.EqualValues(collect, expected["NewTribeID"], actual.NewTribeID(), msg)
}
}, 60*time.Second, time.Second, "tribe changes")
}()
wg.Wait()
})
}
}
type urlChangerTransport struct {
url string
transport http.RoundTripper
}
func (t *urlChangerTransport) RoundTrip(r *http.Request) (*http.Response, error) {
u, err := url.Parse(t.url)
if err != nil {
return nil, err
}
u.RawQuery = url.Values{
"url": []string{r.URL.String()},
}.Encode()
r2 := r.Clone(r.Context())
r2.URL = u
return t.transport.RoundTrip(r2)
}
func readJSONFile(tb testing.TB, filesys fs.FS, name string, v any) {
tb.Helper()
f, err := filesys.Open(name)
require.NoError(tb, err)
defer func() {
_ = f.Close()
}()
require.NoError(tb, json.NewDecoder(f).Decode(v))
}
func serverConfigToMap(cfg domain.ServerConfig) map[string]any {
return map[string]any{
"Speed": cfg.Speed(),
"UnitSpeed": cfg.UnitSpeed(),
"Moral": cfg.Moral(),
"Build": cfg.Build(),
"Misc": cfg.Misc(),
"Commands": cfg.Commands(),
"Newbie": cfg.Newbie(),
"Game": cfg.Game(),
"Buildings": cfg.Buildings(),
"Snob": cfg.Snob(),
"Ally": cfg.Ally(),
"Coord": cfg.Coord(),
"Sitter": cfg.Sitter(),
"Sleep": cfg.Sleep(),
"Night": cfg.Night(),
"Win": cfg.Win(),
}
}
func unitInfoToMap(info domain.UnitInfo) map[string]any {
return map[string]any{
"Spear": info.Spear(),
"Sword": info.Sword(),
"Axe": info.Axe(),
"Archer": info.Archer(),
"Spy": info.Spy(),
"Light": info.Light(),
"Marcher": info.Marcher(),
"Heavy": info.Heavy(),
"Ram": info.Ram(),
"Catapult": info.Catapult(),
"Knight": info.Knight(),
"Snob": info.Snob(),
"Militia": info.Militia(),
}
}
func buildingInfoToMap(info domain.BuildingInfo) map[string]any {
return map[string]any{
"Main": info.Main(),
"Barracks": info.Barracks(),
"Stable": info.Stable(),
"Garage": info.Garage(),
"Watchtower": info.Watchtower(),
"Snob": info.Snob(),
"Smith": info.Smith(),
"Place": info.Place(),
"Statue": info.Statue(),
"Market": info.Market(),
"Wood": info.Wood(),
"Stone": info.Stone(),
"Iron": info.Iron(),
"Farm": info.Farm(),
"Storage": info.Storage(),
"Hide": info.Hide(),
"Wall": info.Wall(),
}
}
func marshalJSON(tb require.TestingT, v any) []byte {
if h, ok := tb.(interface {
Helper()
}); ok {
h.Helper()
}
b, err := json.Marshal(v)
require.NoError(tb, err)
return b
}