package port_test

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"slices"
	"syscall"
	"testing"
	"time"

	"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
	"gitea.dwysokinski.me/twhelp/corev3/internal/app"
	"gitea.dwysokinski.me/twhelp/corev3/internal/bun/buntest"
	"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
	"gitea.dwysokinski.me/twhelp/corev3/internal/port"
	"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillamqptest"
	"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
	"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermilltest"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/brianvoe/gofakeit/v7"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestSnapshotCreation exercises the snapshot flow end to end: it loads the
// "snapshotcreation" fixture, wires the snapshot services to the server,
// player and tribe watermill consumers, calls SnapshotService.Create and then
// polls the repositories until
//   - every listed server reports recent player/tribe snapshot timestamps,
//   - every non-deleted tribe has a snapshot mirroring its current stats,
//   - every non-deleted player has a snapshot mirroring its current stats.
//
// The test is skipped in -short mode.
func TestSnapshotCreation(t *testing.T) {
	t.Parallel()

	if testing.Short() {
		t.Skip("skipping long-running test")
	}

	const (
		testTimeout  = 30 * time.Second
		pollTimeout  = 30 * time.Second
		pollInterval = 500 * time.Millisecond
		recent       = time.Minute
		day          = 24 * time.Hour
	)

	// One context for the whole test: cancelled on timeout or on SIGINT/SIGTERM.
	deadlineCtx, cancelDeadline := context.WithTimeout(context.Background(), testTimeout)
	defer cancelDeadline()

	ctx, stopSignals := signal.NotifyContext(deadlineCtx, os.Interrupt, syscall.SIGTERM)
	defer stopSignals()

	// Database + fixture data.
	db := postgres.NewDB(t)
	fixture := buntest.NewFixture(db)
	fixture.Load(t, ctx, os.DirFS("./testdata/snapshotcreation"), "fixture.yml")

	// Messaging: one pub/sub pair per consumer, each with its own queue suffix
	// and a shared "_snapshots" exchange/routing-key suffix.
	marshaler := watermillmsg.JSONMarshaler{NewUUID: watermill.NewUUID}
	withSnapshotsSuffix := func(topic string) string {
		return topic + "_snapshots"
	}
	rmqConn := rabbitMQ.NewConnection(t)
	logger := watermill.NopLogger{}

	_, serverSub := watermillamqptest.NewPubSub(
		t,
		rmqConn,
		amqp.GenerateQueueNameTopicNameWithSuffix("_snapshots_server"),
		withSnapshotsSuffix,
	)
	playerPub, playerSub := watermillamqptest.NewPubSub(
		t,
		rmqConn,
		amqp.GenerateQueueNameTopicNameWithSuffix("_snapshots_player"),
		withSnapshotsSuffix,
	)
	tribePub, tribeSub := watermillamqptest.NewPubSub(
		t,
		rmqConn,
		amqp.GenerateQueueNameTopicNameWithSuffix("_snapshots_tribe"),
		withSnapshotsSuffix,
	)

	// Random identifiers shared between the publishers and the consumers.
	cmdCreateTribeSnapshot := gofakeit.UUID()
	evtTribeSnapshotCreated := gofakeit.UUID()
	cmdCreatePlayerSnapshot := gofakeit.UUID()
	evtPlayerSnapshotCreated := gofakeit.UUID()

	// Repositories.
	versionRepo := adapter.NewVersionBunRepository(db)
	serverRepo := adapter.NewServerBunRepository(db)
	tribeRepo := adapter.NewTribeBunRepository(db)
	playerRepo := adapter.NewPlayerBunRepository(db)
	tribeSnapshotRepo := adapter.NewTribeSnapshotBunRepository(db)
	playerSnapshotRepo := adapter.NewPlayerSnapshotBunRepository(db)

	// Publishers.
	tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
		tribePub,
		marshaler,
		cmdCreateTribeSnapshot,
		evtTribeSnapshotCreated,
	)
	playerSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
		playerPub,
		marshaler,
		cmdCreatePlayerSnapshot,
		evtPlayerSnapshotCreated,
	)

	// Application services.
	versionSvc := app.NewVersionService(versionRepo)
	serverSvc := app.NewServerService(serverRepo, nil, nil)
	tribeSvc := app.NewTribeService(tribeRepo, nil, nil)
	playerSvc := app.NewPlayerService(playerRepo, nil, nil, nil)
	tribeSnapshotSvc := app.NewTribeSnapshotService(tribeSnapshotRepo, tribeSvc, tribeSnapshotPublisher)
	playerSnapshotSvc := app.NewPlayerSnapshotService(playerSnapshotRepo, playerSvc, playerSnapshotPublisher)
	snapshotSvc := app.NewSnapshotService(versionSvc, serverSvc, tribeSnapshotPublisher, playerSnapshotPublisher)

	// Consumers. Only the snapshot-related identifiers are set; the remaining
	// string arguments are left empty.
	serverConsumer := port.NewServerWatermillConsumer(
		serverSvc,
		serverSub,
		logger,
		marshaler,
		"",
		"",
		"",
		"",
		"",
		"",
		evtTribeSnapshotCreated,
		evtPlayerSnapshotCreated,
	)
	playerConsumer := port.NewPlayerWatermillConsumer(
		nil,
		playerSnapshotSvc,
		playerSub,
		logger,
		marshaler,
		"",
		cmdCreatePlayerSnapshot,
	)
	tribeConsumer := port.NewTribeWatermillConsumer(
		nil,
		tribeSnapshotSvc,
		tribeSub,
		logger,
		marshaler,
		"",
		"",
		cmdCreateTribeSnapshot,
	)

	watermilltest.RunRouterWithContext(t, ctx, serverConsumer, playerConsumer, tribeConsumer)

	// Kick off snapshot creation; everything below waits for its effects.
	require.NoError(t, snapshotSvc.Create(ctx))

	// Filter value meaning "explicitly false" (as opposed to "not set").
	explicitFalse := domain.NullBool{
		V:     false,
		Valid: true,
	}

	// Servers: both snapshot timestamps must be within a minute of now.
	assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
		require.NoError(collect, ctx.Err())

		params := domain.NewListServersParams()
		require.NoError(collect, params.SetSort([]domain.ServerSort{
			domain.ServerSortKeyASC,
		}))
		require.NoError(collect, params.SetSpecial(explicitFalse))
		require.NoError(collect, params.SetLimit(domain.ServerListMaxLimit))

		// Walk all pages via the cursor returned with each result.
		for {
			page, err := serverRepo.List(ctx, params)
			require.NoError(collect, err)

			for _, srv := range page.Servers() {
				assert.WithinDuration(collect, time.Now(), srv.PlayerSnapshotsCreatedAt(), recent, srv.Key())
				assert.WithinDuration(collect, time.Now(), srv.TribeSnapshotsCreatedAt(), recent, srv.Key())
			}

			next := page.Next()
			if next.IsZero() {
				break
			}

			require.NoError(collect, params.SetCursor(next))
		}
	}, pollTimeout, pollInterval, "servers")

	// Tribes: exactly one matching snapshot per non-deleted tribe.
	assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
		require.NoError(collect, ctx.Err())

		tribeParams := domain.NewListTribesParams()
		require.NoError(collect, tribeParams.SetSort([]domain.TribeSort{
			domain.TribeSortServerKeyASC,
			domain.TribeSortIDASC,
		}))
		require.NoError(collect, tribeParams.SetDeleted(explicitFalse))
		require.NoError(collect, tribeParams.SetLimit(domain.TribeListMaxLimit))

		// Collect every tribe first (cursor pagination).
		var tribes domain.Tribes
		for {
			page, err := tribeRepo.List(ctx, tribeParams)
			require.NoError(collect, err)

			tribes = append(tribes, page.Tribes()...)

			next := page.Next()
			if next.IsZero() {
				break
			}

			require.NoError(collect, tribeParams.SetCursor(next))
		}

		snapshotParams := domain.NewListTribeSnapshotsParams()
		require.NoError(collect, snapshotParams.SetSort([]domain.TribeSnapshotSort{
			domain.TribeSnapshotSortServerKeyASC,
			domain.TribeSnapshotSortDateASC,
			domain.TribeSnapshotSortIDASC,
		}))
		require.NoError(collect, snapshotParams.SetLimit(domain.TribeSnapshotListMaxLimit))

		// Snapshots are paged by offset; an empty page ends the walk.
		seen := 0
		for {
			snapshots, err := tribeSnapshotRepo.List(ctx, snapshotParams)
			require.NoError(collect, err)

			if len(snapshots) == 0 {
				break
			}

			seen += len(snapshots)

			for _, snap := range snapshots {
				label := fmt.Sprintf("TribeID=%d,ServerKey=%s", snap.TribeID(), snap.ServerKey())

				idx := slices.IndexFunc(tribes, func(candidate domain.Tribe) bool {
					return candidate.ID() == snap.TribeID() && candidate.ServerKey() == snap.ServerKey()
				})
				// A snapshot without a source tribe is reported and skipped.
				if found := assert.GreaterOrEqual(collect, idx, 0, label); !found {
					continue
				}

				tribe := tribes[idx]
				assert.NotZero(collect, snap.ID(), label)
				assert.Equal(collect, tribe.ID(), snap.TribeID(), label)
				assert.Equal(collect, tribe.ServerKey(), snap.ServerKey(), label)
				assert.Equal(collect, tribe.NumMembers(), snap.NumMembers(), label)
				assert.Equal(collect, tribe.NumVillages(), snap.NumVillages(), label)
				assert.Equal(collect, tribe.Points(), snap.Points(), label)
				assert.Equal(collect, tribe.AllPoints(), snap.AllPoints(), label)
				assert.Equal(collect, tribe.Rank(), snap.Rank(), label)
				assert.Equal(collect, tribe.OD(), snap.OD(), label)
				assert.InDelta(collect, tribe.Dominance(), snap.Dominance(), 0.001, label)
				assert.WithinDuration(collect, time.Now(), snap.CreatedAt(), recent, label)
				assert.WithinDuration(collect, time.Now(), snap.Date(), day, label)
			}

			nextOffset := snapshotParams.Offset() + snapshotParams.Limit()
			require.NoError(collect, snapshotParams.SetOffset(nextOffset))
		}

		//nolint:testifylint
		assert.Equal(collect, len(tribes), seen)
	}, pollTimeout, pollInterval, "tribes")

	// Players: exactly one matching snapshot per non-deleted player.
	assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
		require.NoError(collect, ctx.Err())

		playerParams := domain.NewListPlayersParams()
		require.NoError(collect, playerParams.SetSort([]domain.PlayerSort{
			domain.PlayerSortServerKeyASC,
			domain.PlayerSortIDASC,
		}))
		require.NoError(collect, playerParams.SetDeleted(explicitFalse))
		require.NoError(collect, playerParams.SetLimit(domain.PlayerListMaxLimit))

		// Collect every player first (cursor pagination).
		var players domain.Players
		for {
			page, err := playerRepo.List(ctx, playerParams)
			require.NoError(collect, err)

			players = append(players, page.Players()...)

			next := page.Next()
			if next.IsZero() {
				break
			}

			require.NoError(collect, playerParams.SetCursor(next))
		}

		snapshotParams := domain.NewListPlayerSnapshotsParams()
		require.NoError(collect, snapshotParams.SetSort([]domain.PlayerSnapshotSort{
			domain.PlayerSnapshotSortServerKeyASC,
			domain.PlayerSnapshotSortDateASC,
			domain.PlayerSnapshotSortIDASC,
		}))
		require.NoError(collect, snapshotParams.SetLimit(domain.PlayerSnapshotListMaxLimit))

		// Snapshots are paged by offset; an empty page ends the walk.
		seen := 0
		for {
			snapshots, err := playerSnapshotRepo.List(ctx, snapshotParams)
			require.NoError(collect, err)

			if len(snapshots) == 0 {
				break
			}

			seen += len(snapshots)

			for _, snap := range snapshots {
				label := fmt.Sprintf("PlayerID=%d,ServerKey=%s", snap.PlayerID(), snap.ServerKey())

				idx := slices.IndexFunc(players, func(candidate domain.Player) bool {
					return candidate.ID() == snap.PlayerID() && candidate.ServerKey() == snap.ServerKey()
				})
				// A snapshot without a source player is reported and skipped.
				if found := assert.GreaterOrEqual(collect, idx, 0, label); !found {
					continue
				}

				player := players[idx]
				assert.NotZero(collect, snap.ID(), label)
				assert.Equal(collect, player.ID(), snap.PlayerID(), label)
				assert.Equal(collect, player.ServerKey(), snap.ServerKey(), label)
				assert.Equal(collect, player.NumVillages(), snap.NumVillages(), label)
				assert.Equal(collect, player.Points(), snap.Points(), label)
				assert.Equal(collect, player.Rank(), snap.Rank(), label)
				assert.Equal(collect, player.TribeID(), snap.TribeID(), label)
				assert.Equal(collect, player.OD(), snap.OD(), label)
				assert.WithinDuration(collect, time.Now(), snap.CreatedAt(), recent, label)
				assert.WithinDuration(collect, time.Now(), snap.Date(), day, label)
			}

			nextOffset := snapshotParams.Offset() + snapshotParams.Limit()
			require.NoError(collect, snapshotParams.SetOffset(nextOffset))
		}

		//nolint:testifylint
		assert.Equal(collect, len(players), seen)
	}, pollTimeout, pollInterval, "players")
}