core/internal/port/consumer_snapshot_creation_...

346 lines
10 KiB
Go

package port_test
import (
"context"
"fmt"
"os"
"os/signal"
"slices"
"syscall"
"testing"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
"gitea.dwysokinski.me/twhelp/corev3/internal/bun/buntest"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/port"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillamqptest"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermilltest"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestSnapshotCreation is an integration test for the snapshot creation flow.
//
// It loads a fixture into the database, wires repositories, watermill
// publishers/subscribers, services and consumers together, calls
// snapshotSvc.Create and then polls until:
//   - every listed server reports player and tribe snapshot timestamps
//     that are within a minute of now,
//   - the number of tribe snapshots equals the number of tribes listed with
//     the deleted filter set to false, and every snapshot carries the same
//     values as the tribe it references,
//   - the same holds for player snapshots and players.
//
// The test is skipped when running with -short.
func TestSnapshotCreation(t *testing.T) {
	t.Parallel()

	if testing.Short() {
		t.Skip("skipping long-running test")
	}

	// Upper bound and tick used by every polling assertion below.
	const (
		pollTimeout  = 30 * time.Second
		pollInterval = 500 * time.Millisecond
	)

	// The whole test is bounded by a timeout and also aborts on SIGINT/SIGTERM.
	timeoutCtx, cancelTimeout := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelTimeout()
	ctx, stopSignals := signal.NotifyContext(timeoutCtx, os.Interrupt, syscall.SIGTERM)
	defer stopSignals()

	// bun
	// NOTE(review): postgres is not declared in this block; presumably a
	// package-level test helper defined elsewhere in this package.
	db := postgres.NewDB(t)
	fixtureFS := os.DirFS("./testdata/snapshotcreation")
	buntest.NewFixture(db).Load(t, ctx, fixtureFS, "fixture.yml")

	// watermill
	marshaler := watermillmsg.JSONMarshaler{NewUUID: watermill.NewUUID}
	logger := watermill.NopLogger{}
	// All three pub/subs derive their exchange/routing key from the topic
	// with the same suffix; only the queue name suffix differs per consumer.
	exchangeAndRoutingKey := func(topic string) string {
		return topic + "_snapshots"
	}
	// NOTE(review): rabbitMQ is not declared in this block either; presumably
	// another package-level test helper.
	rmqConn := rabbitMQ.NewConnection(t)
	_, serverSub := watermillamqptest.NewPubSub(
		t,
		rmqConn,
		amqp.GenerateQueueNameTopicNameWithSuffix("_snapshots_server"),
		exchangeAndRoutingKey,
	)
	playerPub, playerSub := watermillamqptest.NewPubSub(
		t,
		rmqConn,
		amqp.GenerateQueueNameTopicNameWithSuffix("_snapshots_player"),
		exchangeAndRoutingKey,
	)
	tribePub, tribeSub := watermillamqptest.NewPubSub(
		t,
		rmqConn,
		amqp.GenerateQueueNameTopicNameWithSuffix("_snapshots_tribe"),
		exchangeAndRoutingKey,
	)

	// events/commands
	// Random names are generated for the command/event identifiers on every run.
	tribeSnapshotCmdCreate := gofakeit.UUID()
	tribeSnapshotEventCreated := gofakeit.UUID()
	playerSnapshotCmdCreate := gofakeit.UUID()
	playerSnapshotEventCreated := gofakeit.UUID()

	// adapters
	serverRepo := adapter.NewServerBunRepository(db)
	tribeRepo := adapter.NewTribeBunRepository(db)
	playerRepo := adapter.NewPlayerBunRepository(db)
	tribeSnapshotRepo := adapter.NewTribeSnapshotBunRepository(db)
	playerSnapshotRepo := adapter.NewPlayerSnapshotBunRepository(db)
	tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
		tribePub,
		marshaler,
		tribeSnapshotCmdCreate,
		tribeSnapshotEventCreated,
	)
	playerSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
		playerPub,
		marshaler,
		playerSnapshotCmdCreate,
		playerSnapshotEventCreated,
	)

	// services
	// NOTE(review): the nil arguments are presumably collaborators that this
	// flow never touches - confirm against the constructors.
	versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(db))
	serverSvc := app.NewServerService(serverRepo, nil, nil)
	tribeSvc := app.NewTribeService(tribeRepo, nil, nil)
	playerSvc := app.NewPlayerService(playerRepo, nil, nil, nil)
	tribeSnapshotSvc := app.NewTribeSnapshotService(tribeSnapshotRepo, tribeSvc, tribeSnapshotPublisher)
	playerSnapshotSvc := app.NewPlayerSnapshotService(playerSnapshotRepo, playerSvc, playerSnapshotPublisher)
	snapshotSvc := app.NewSnapshotService(versionSvc, serverSvc, tribeSnapshotPublisher, playerSnapshotPublisher)

	// consumers
	// NOTE(review): the empty strings are presumably names of topics this
	// test does not exercise - confirm against the constructors.
	watermilltest.RunRouterWithContext(
		t,
		ctx,
		port.NewServerWatermillConsumer(
			serverSvc,
			serverSub,
			logger,
			marshaler,
			"",
			"",
			"",
			"",
			"",
			"",
			tribeSnapshotEventCreated,
			playerSnapshotEventCreated,
		),
		port.NewPlayerWatermillConsumer(
			nil,
			playerSnapshotSvc,
			playerSub,
			logger,
			marshaler,
			"",
			playerSnapshotCmdCreate,
		),
		port.NewTribeWatermillConsumer(
			nil,
			tribeSnapshotSvc,
			tribeSub,
			logger,
			marshaler,
			"",
			"",
			tribeSnapshotCmdCreate,
		),
	)

	// Kick off the flow under test.
	require.NoError(t, snapshotSvc.Create(ctx))

	// serversUpdated passes once every listed server has both snapshot
	// timestamps within a minute of now.
	serversUpdated := func(collect *assert.CollectT) {
		// Stop retrying as soon as the test context is done.
		require.NoError(collect, ctx.Err())

		params := domain.NewListServersParams()
		require.NoError(collect, params.SetSort([]domain.ServerSort{domain.ServerSortKeyASC}))
		require.NoError(collect, params.SetSpecial(domain.NullBool{V: false, Valid: true}))
		require.NoError(collect, params.SetLimit(domain.ServerListMaxLimit))

		// Walk all pages using the cursor returned with each result.
		for {
			page, err := serverRepo.List(ctx, params)
			require.NoError(collect, err)

			for _, server := range page.Servers() {
				assert.WithinDuration(collect, time.Now(), server.PlayerSnapshotsCreatedAt(), time.Minute, server.Key())
				assert.WithinDuration(collect, time.Now(), server.TribeSnapshotsCreatedAt(), time.Minute, server.Key())
			}

			next := page.Next()
			if next.IsZero() {
				return
			}
			require.NoError(collect, params.SetCursor(next))
		}
	}
	assert.EventuallyWithTf(t, serversUpdated, pollTimeout, pollInterval, "servers")

	// tribeSnapshotsMatch passes once there is one snapshot per listed tribe
	// and every snapshot mirrors the tribe it points at.
	tribeSnapshotsMatch := func(collect *assert.CollectT) {
		require.NoError(collect, ctx.Err())

		tribeParams := domain.NewListTribesParams()
		require.NoError(collect, tribeParams.SetSort([]domain.TribeSort{
			domain.TribeSortServerKeyASC,
			domain.TribeSortIDASC,
		}))
		require.NoError(collect, tribeParams.SetDeleted(domain.NullBool{V: false, Valid: true}))
		require.NoError(collect, tribeParams.SetLimit(domain.TribeListMaxLimit))

		// Collect every tribe first (cursor pagination).
		var tribes domain.Tribes
		for {
			page, err := tribeRepo.List(ctx, tribeParams)
			require.NoError(collect, err)
			tribes = append(tribes, page.Tribes()...)

			next := page.Next()
			if next.IsZero() {
				break
			}
			require.NoError(collect, tribeParams.SetCursor(next))
		}

		snapshotParams := domain.NewListTribeSnapshotsParams()
		require.NoError(collect, snapshotParams.SetSort([]domain.TribeSnapshotSort{
			domain.TribeSnapshotSortServerKeyASC,
			domain.TribeSnapshotSortDateASC,
			domain.TribeSnapshotSortIDASC,
		}))
		require.NoError(collect, snapshotParams.SetLimit(domain.TribeSnapshotListMaxLimit))

		// Snapshots are paged by offset; an empty batch marks the end.
		snapshotCount := 0
		for {
			batch, err := tribeSnapshotRepo.List(ctx, snapshotParams)
			require.NoError(collect, err)
			if len(batch) == 0 {
				break
			}

			for _, snapshot := range batch {
				snapshotCount++
				label := fmt.Sprintf("TribeID=%d,ServerKey=%s", snapshot.TribeID(), snapshot.ServerKey())

				pos := slices.IndexFunc(tribes, func(candidate domain.Tribe) bool {
					return candidate.ID() == snapshot.TribeID() && candidate.ServerKey() == snapshot.ServerKey()
				})
				// A snapshot without a matching tribe is recorded as a
				// failure; the remaining comparisons are skipped for it.
				if found := assert.GreaterOrEqual(collect, pos, 0, label); !found {
					continue
				}

				tribe := tribes[pos]
				assert.NotZero(collect, snapshot.ID(), label)
				assert.Equal(collect, tribe.ID(), snapshot.TribeID(), label)
				assert.Equal(collect, tribe.ServerKey(), snapshot.ServerKey(), label)
				assert.Equal(collect, tribe.NumMembers(), snapshot.NumMembers(), label)
				assert.Equal(collect, tribe.NumVillages(), snapshot.NumVillages(), label)
				assert.Equal(collect, tribe.Points(), snapshot.Points(), label)
				assert.Equal(collect, tribe.AllPoints(), snapshot.AllPoints(), label)
				assert.Equal(collect, tribe.Rank(), snapshot.Rank(), label)
				assert.Equal(collect, tribe.OD(), snapshot.OD(), label)
				assert.InDelta(collect, tribe.Dominance(), snapshot.Dominance(), 0.001, label)
				assert.WithinDuration(collect, time.Now(), snapshot.CreatedAt(), time.Minute, label)
				assert.WithinDuration(collect, time.Now(), snapshot.Date(), 24*time.Hour, label)
			}

			nextOffset := snapshotParams.Offset() + snapshotParams.Limit()
			require.NoError(collect, snapshotParams.SetOffset(nextOffset))
		}

		//nolint:testifylint
		assert.Equal(collect, len(tribes), snapshotCount)
	}
	assert.EventuallyWithTf(t, tribeSnapshotsMatch, pollTimeout, pollInterval, "tribes")

	// playerSnapshotsMatch is the player counterpart of tribeSnapshotsMatch.
	playerSnapshotsMatch := func(collect *assert.CollectT) {
		require.NoError(collect, ctx.Err())

		playerParams := domain.NewListPlayersParams()
		require.NoError(collect, playerParams.SetSort([]domain.PlayerSort{
			domain.PlayerSortServerKeyASC,
			domain.PlayerSortIDASC,
		}))
		require.NoError(collect, playerParams.SetDeleted(domain.NullBool{V: false, Valid: true}))
		require.NoError(collect, playerParams.SetLimit(domain.PlayerListMaxLimit))

		// Collect every player first (cursor pagination).
		var players domain.Players
		for {
			page, err := playerRepo.List(ctx, playerParams)
			require.NoError(collect, err)
			players = append(players, page.Players()...)

			next := page.Next()
			if next.IsZero() {
				break
			}
			require.NoError(collect, playerParams.SetCursor(next))
		}

		snapshotParams := domain.NewListPlayerSnapshotsParams()
		require.NoError(collect, snapshotParams.SetSort([]domain.PlayerSnapshotSort{
			domain.PlayerSnapshotSortServerKeyASC,
			domain.PlayerSnapshotSortDateASC,
			domain.PlayerSnapshotSortIDASC,
		}))
		require.NoError(collect, snapshotParams.SetLimit(domain.PlayerSnapshotListMaxLimit))

		// Snapshots are paged by offset; an empty batch marks the end.
		snapshotCount := 0
		for {
			batch, err := playerSnapshotRepo.List(ctx, snapshotParams)
			require.NoError(collect, err)
			if len(batch) == 0 {
				break
			}

			for _, snapshot := range batch {
				snapshotCount++
				label := fmt.Sprintf("PlayerID=%d,ServerKey=%s", snapshot.PlayerID(), snapshot.ServerKey())

				pos := slices.IndexFunc(players, func(candidate domain.Player) bool {
					return candidate.ID() == snapshot.PlayerID() && candidate.ServerKey() == snapshot.ServerKey()
				})
				// A snapshot without a matching player is recorded as a
				// failure; the remaining comparisons are skipped for it.
				if found := assert.GreaterOrEqual(collect, pos, 0, label); !found {
					continue
				}

				player := players[pos]
				assert.NotZero(collect, snapshot.ID(), label)
				assert.Equal(collect, player.ID(), snapshot.PlayerID(), label)
				assert.Equal(collect, player.ServerKey(), snapshot.ServerKey(), label)
				assert.Equal(collect, player.NumVillages(), snapshot.NumVillages(), label)
				assert.Equal(collect, player.Points(), snapshot.Points(), label)
				assert.Equal(collect, player.Rank(), snapshot.Rank(), label)
				assert.Equal(collect, player.TribeID(), snapshot.TribeID(), label)
				assert.Equal(collect, player.OD(), snapshot.OD(), label)
				assert.WithinDuration(collect, time.Now(), snapshot.CreatedAt(), time.Minute, label)
				assert.WithinDuration(collect, time.Now(), snapshot.Date(), 24*time.Hour, label)
			}

			nextOffset := snapshotParams.Offset() + snapshotParams.Limit()
			require.NoError(collect, snapshotParams.SetOffset(nextOffset))
		}

		//nolint:testifylint
		assert.Equal(collect, len(players), snapshotCount)
	}
	assert.EventuallyWithTf(t, playerSnapshotsMatch, pollTimeout, pollInterval, "players")
}