346 lines
10 KiB
Go
346 lines
10 KiB
Go
package port_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
"slices"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/bun/buntest"
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/port"
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillamqptest"
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermilltest"
|
|
"github.com/ThreeDotsLabs/watermill"
|
|
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
|
|
"github.com/brianvoe/gofakeit/v7"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestSnapshotCreation(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
if testing.Short() {
|
|
t.Skip("skipping long-running test")
|
|
}
|
|
|
|
ctxTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
ctx, stop := signal.NotifyContext(ctxTimeout, os.Interrupt, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
// bun
|
|
db := postgres.NewDB(t)
|
|
buntest.NewFixture(db).Load(t, ctx, os.DirFS("./testdata/snapshotcreation"), "fixture.yml")
|
|
|
|
// watermill
|
|
marshaler := watermillmsg.JSONMarshaler{NewUUID: watermill.NewUUID}
|
|
generateExchangeAndRoutingKeyName := func(topic string) string {
|
|
return topic + "_snapshots"
|
|
}
|
|
rmqConn := rabbitMQ.NewConnection(t)
|
|
nopLogger := watermill.NopLogger{}
|
|
_, serverSub := watermillamqptest.NewPubSub(
|
|
t,
|
|
rmqConn,
|
|
amqp.GenerateQueueNameTopicNameWithSuffix("_snapshots_server"),
|
|
generateExchangeAndRoutingKeyName,
|
|
)
|
|
playerPub, playerSub := watermillamqptest.NewPubSub(
|
|
t,
|
|
rmqConn,
|
|
amqp.GenerateQueueNameTopicNameWithSuffix("_snapshots_player"),
|
|
generateExchangeAndRoutingKeyName,
|
|
)
|
|
tribePub, tribeSub := watermillamqptest.NewPubSub(
|
|
t,
|
|
rmqConn,
|
|
amqp.GenerateQueueNameTopicNameWithSuffix("_snapshots_tribe"),
|
|
generateExchangeAndRoutingKeyName,
|
|
)
|
|
|
|
// events/commands
|
|
tribeSnapshotCmdCreate := gofakeit.UUID()
|
|
tribeSnapshotEventCreated := gofakeit.UUID()
|
|
playerSnapshotCmdCreate := gofakeit.UUID()
|
|
playerSnapshotEventCreated := gofakeit.UUID()
|
|
|
|
// adapters
|
|
versionRepo := adapter.NewVersionBunRepository(db)
|
|
serverRepo := adapter.NewServerBunRepository(db)
|
|
tribeRepo := adapter.NewTribeBunRepository(db)
|
|
playerRepo := adapter.NewPlayerBunRepository(db)
|
|
tribeSnapshotRepo := adapter.NewTribeSnapshotBunRepository(db)
|
|
playerSnapshotRepo := adapter.NewPlayerSnapshotBunRepository(db)
|
|
tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
|
|
tribePub,
|
|
marshaler,
|
|
tribeSnapshotCmdCreate,
|
|
tribeSnapshotEventCreated,
|
|
)
|
|
playerSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
|
|
playerPub,
|
|
marshaler,
|
|
playerSnapshotCmdCreate,
|
|
playerSnapshotEventCreated,
|
|
)
|
|
|
|
// services
|
|
versionSvc := app.NewVersionService(versionRepo)
|
|
serverSvc := app.NewServerService(serverRepo, nil, nil)
|
|
tribeSvc := app.NewTribeService(tribeRepo, nil, nil)
|
|
playerSvc := app.NewPlayerService(playerRepo, nil, nil, nil)
|
|
tribeSnapshotSvc := app.NewTribeSnapshotService(tribeSnapshotRepo, tribeSvc, tribeSnapshotPublisher)
|
|
playerSnapshotSvc := app.NewPlayerSnapshotService(playerSnapshotRepo, playerSvc, playerSnapshotPublisher)
|
|
snapshotSvc := app.NewSnapshotService(versionSvc, serverSvc, tribeSnapshotPublisher, playerSnapshotPublisher)
|
|
|
|
watermilltest.RunRouterWithContext(
|
|
t,
|
|
ctx,
|
|
port.NewServerWatermillConsumer(
|
|
serverSvc,
|
|
serverSub,
|
|
nopLogger,
|
|
marshaler,
|
|
"",
|
|
"",
|
|
"",
|
|
"",
|
|
"",
|
|
"",
|
|
tribeSnapshotEventCreated,
|
|
playerSnapshotEventCreated,
|
|
),
|
|
port.NewPlayerWatermillConsumer(
|
|
nil,
|
|
playerSnapshotSvc,
|
|
playerSub,
|
|
nopLogger,
|
|
marshaler,
|
|
"",
|
|
playerSnapshotCmdCreate,
|
|
),
|
|
port.NewTribeWatermillConsumer(
|
|
nil,
|
|
tribeSnapshotSvc,
|
|
tribeSub,
|
|
nopLogger,
|
|
marshaler,
|
|
"",
|
|
"",
|
|
tribeSnapshotCmdCreate,
|
|
),
|
|
)
|
|
|
|
require.NoError(t, snapshotSvc.Create(ctx))
|
|
|
|
assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
|
|
require.NoError(collect, ctx.Err())
|
|
|
|
listParams := domain.NewListServersParams()
|
|
require.NoError(collect, listParams.SetSort([]domain.ServerSort{
|
|
domain.ServerSortKeyASC,
|
|
}))
|
|
require.NoError(collect, listParams.SetSpecial(domain.NullBool{
|
|
V: false,
|
|
Valid: true,
|
|
}))
|
|
require.NoError(collect, listParams.SetLimit(domain.ServerListMaxLimit))
|
|
|
|
for {
|
|
res, err := serverRepo.List(ctx, listParams)
|
|
require.NoError(collect, err)
|
|
|
|
for _, s := range res.Servers() {
|
|
assert.WithinDuration(collect, time.Now(), s.PlayerSnapshotsCreatedAt(), time.Minute, s.Key())
|
|
assert.WithinDuration(collect, time.Now(), s.TribeSnapshotsCreatedAt(), time.Minute, s.Key())
|
|
}
|
|
|
|
if res.Next().IsZero() {
|
|
return
|
|
}
|
|
|
|
require.NoError(collect, listParams.SetCursor(res.Next()))
|
|
}
|
|
}, 30*time.Second, 500*time.Millisecond, "servers")
|
|
|
|
assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
|
|
require.NoError(collect, ctx.Err())
|
|
|
|
listTribesParams := domain.NewListTribesParams()
|
|
require.NoError(collect, listTribesParams.SetSort([]domain.TribeSort{
|
|
domain.TribeSortServerKeyASC,
|
|
domain.TribeSortIDASC,
|
|
}))
|
|
require.NoError(collect, listTribesParams.SetDeleted(domain.NullBool{
|
|
V: false,
|
|
Valid: true,
|
|
}))
|
|
require.NoError(collect, listTribesParams.SetLimit(domain.TribeListMaxLimit))
|
|
|
|
var allTribes domain.Tribes
|
|
|
|
for {
|
|
res, err := tribeRepo.List(ctx, listTribesParams)
|
|
require.NoError(collect, err)
|
|
|
|
allTribes = append(allTribes, res.Tribes()...)
|
|
|
|
if res.Next().IsZero() {
|
|
break
|
|
}
|
|
|
|
require.NoError(collect, listTribesParams.SetCursor(res.Next()))
|
|
}
|
|
|
|
listSnapshotsParams := domain.NewListTribeSnapshotsParams()
|
|
require.NoError(collect, listSnapshotsParams.SetSort([]domain.TribeSnapshotSort{
|
|
domain.TribeSnapshotSortServerKeyASC,
|
|
domain.TribeSnapshotSortDateASC,
|
|
domain.TribeSnapshotSortIDASC,
|
|
}))
|
|
require.NoError(collect, listSnapshotsParams.SetLimit(domain.TribeSnapshotListMaxLimit))
|
|
|
|
cnt := 0
|
|
|
|
for {
|
|
snapshots, err := tribeSnapshotRepo.List(ctx, listSnapshotsParams)
|
|
require.NoError(collect, err)
|
|
|
|
if len(snapshots) == 0 {
|
|
break
|
|
}
|
|
|
|
for _, ts := range snapshots {
|
|
cnt++
|
|
msg := fmt.Sprintf("TribeID=%d,ServerKey=%s", ts.TribeID(), ts.ServerKey())
|
|
|
|
idx := slices.IndexFunc(allTribes, func(t domain.Tribe) bool {
|
|
return t.ID() == ts.TribeID() && t.ServerKey() == ts.ServerKey()
|
|
})
|
|
if !assert.GreaterOrEqual(
|
|
collect,
|
|
idx,
|
|
0,
|
|
msg,
|
|
) {
|
|
continue
|
|
}
|
|
|
|
tribe := allTribes[idx]
|
|
|
|
assert.NotZero(collect, ts.ID(), msg)
|
|
assert.Equal(collect, tribe.ID(), ts.TribeID(), msg)
|
|
assert.Equal(collect, tribe.ServerKey(), ts.ServerKey(), msg)
|
|
assert.Equal(collect, tribe.NumMembers(), ts.NumMembers(), msg)
|
|
assert.Equal(collect, tribe.NumVillages(), ts.NumVillages(), msg)
|
|
assert.Equal(collect, tribe.Points(), ts.Points(), msg)
|
|
assert.Equal(collect, tribe.AllPoints(), ts.AllPoints(), msg)
|
|
assert.Equal(collect, tribe.Rank(), ts.Rank(), msg)
|
|
assert.Equal(collect, tribe.OD(), ts.OD(), msg)
|
|
assert.InDelta(collect, tribe.Dominance(), ts.Dominance(), 0.001, msg)
|
|
assert.WithinDuration(collect, time.Now(), ts.CreatedAt(), time.Minute, msg)
|
|
assert.WithinDuration(collect, time.Now(), ts.Date(), 24*time.Hour, msg)
|
|
}
|
|
|
|
require.NoError(collect, listSnapshotsParams.SetOffset(listSnapshotsParams.Offset()+listSnapshotsParams.Limit()))
|
|
}
|
|
|
|
//nolint:testifylint
|
|
assert.Equal(collect, len(allTribes), cnt)
|
|
}, 30*time.Second, 500*time.Millisecond, "tribes")
|
|
|
|
assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
|
|
require.NoError(collect, ctx.Err())
|
|
|
|
listPlayersParams := domain.NewListPlayersParams()
|
|
require.NoError(collect, listPlayersParams.SetSort([]domain.PlayerSort{
|
|
domain.PlayerSortServerKeyASC,
|
|
domain.PlayerSortIDASC,
|
|
}))
|
|
require.NoError(collect, listPlayersParams.SetDeleted(domain.NullBool{
|
|
V: false,
|
|
Valid: true,
|
|
}))
|
|
require.NoError(collect, listPlayersParams.SetLimit(domain.PlayerListMaxLimit))
|
|
|
|
var allPlayers domain.Players
|
|
|
|
for {
|
|
res, err := playerRepo.List(ctx, listPlayersParams)
|
|
require.NoError(collect, err)
|
|
|
|
allPlayers = append(allPlayers, res.Players()...)
|
|
|
|
if res.Next().IsZero() {
|
|
break
|
|
}
|
|
|
|
require.NoError(collect, listPlayersParams.SetCursor(res.Next()))
|
|
}
|
|
|
|
listSnapshotsParams := domain.NewListPlayerSnapshotsParams()
|
|
require.NoError(collect, listSnapshotsParams.SetSort([]domain.PlayerSnapshotSort{
|
|
domain.PlayerSnapshotSortServerKeyASC,
|
|
domain.PlayerSnapshotSortDateASC,
|
|
domain.PlayerSnapshotSortIDASC,
|
|
}))
|
|
require.NoError(collect, listSnapshotsParams.SetLimit(domain.PlayerSnapshotListMaxLimit))
|
|
|
|
cnt := 0
|
|
|
|
for {
|
|
snapshots, err := playerSnapshotRepo.List(ctx, listSnapshotsParams)
|
|
require.NoError(collect, err)
|
|
|
|
if len(snapshots) == 0 {
|
|
break
|
|
}
|
|
|
|
for _, ps := range snapshots {
|
|
cnt++
|
|
msg := fmt.Sprintf("PlayerID=%d,ServerKey=%s", ps.PlayerID(), ps.ServerKey())
|
|
|
|
idx := slices.IndexFunc(allPlayers, func(p domain.Player) bool {
|
|
return p.ID() == ps.PlayerID() && p.ServerKey() == ps.ServerKey()
|
|
})
|
|
if !assert.GreaterOrEqual(
|
|
collect,
|
|
idx,
|
|
0,
|
|
msg,
|
|
) {
|
|
continue
|
|
}
|
|
|
|
player := allPlayers[idx]
|
|
|
|
assert.NotZero(collect, ps.ID(), msg)
|
|
assert.Equal(collect, player.ID(), ps.PlayerID(), msg)
|
|
assert.Equal(collect, player.ServerKey(), ps.ServerKey(), msg)
|
|
assert.Equal(collect, player.NumVillages(), ps.NumVillages(), msg)
|
|
assert.Equal(collect, player.Points(), ps.Points(), msg)
|
|
assert.Equal(collect, player.Rank(), ps.Rank(), msg)
|
|
assert.Equal(collect, player.TribeID(), ps.TribeID(), msg)
|
|
assert.Equal(collect, player.OD(), ps.OD(), msg)
|
|
assert.WithinDuration(collect, time.Now(), ps.CreatedAt(), time.Minute, msg)
|
|
assert.WithinDuration(collect, time.Now(), ps.Date(), 24*time.Hour, msg)
|
|
}
|
|
|
|
require.NoError(collect, listSnapshotsParams.SetOffset(listSnapshotsParams.Offset()+listSnapshotsParams.Limit()))
|
|
}
|
|
|
|
//nolint:testifylint
|
|
assert.Equal(collect, len(allPlayers), cnt)
|
|
}, 30*time.Second, 500*time.Millisecond, "players")
|
|
}
|