package port_test

import (
	"context"
	"os"
	"os/signal"
	"path"
	"sync"
	"syscall"
	"testing"
	"time"

	"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
	"gitea.dwysokinski.me/twhelp/corev3/internal/app"
	"gitea.dwysokinski.me/twhelp/corev3/internal/bun/buntest"
	"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
	"gitea.dwysokinski.me/twhelp/corev3/internal/port"
	"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillamqptest"
	"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
	"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermilltest"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/brianvoe/gofakeit/v7"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestDataCleanup is a long-running integration test of the data clean-up flow.
//
// It loads fixture.yml into a fresh database, starts the tribe, player and
// ennoblement consumers on a randomly named clean-up topic, calls
// DataCleanupService.CleanUp, and then polls the ennoblement, tribe snapshot
// and player snapshot repositories until the IDs they return equal the lists
// stored under testdata/cleanup/expected. The test is skipped in -short mode.
func TestDataCleanup(t *testing.T) {
	t.Parallel()

	if testing.Short() {
		t.Skip("skipping long-running test")
	}

	// Polling budget shared by all three eventual assertions below.
	const (
		pollTimeout  = 30 * time.Second
		pollInterval = 500 * time.Millisecond
	)

	// The whole test is bounded by a deadline and also aborts on SIGINT/SIGTERM.
	deadlineCtx, cancelDeadline := context.WithTimeout(context.Background(), 150*time.Second)
	defer cancelDeadline()
	ctx, stopSignals := signal.NotifyContext(deadlineCtx, os.Interrupt, syscall.SIGTERM)
	defer stopSignals()

	testdata := os.DirFS("./testdata/cleanup")

	// bun
	db := postgres.NewDB(t)
	buntest.NewFixture(db).Load(t, ctx, testdata, "fixture.yml")

	// watermill
	marshaler := watermillmsg.JSONMarshaler{NewUUID: watermill.NewUUID}
	exchangeAndRoutingKey := func(topic string) string {
		return topic + "_data_cleanup"
	}
	rmqConn := rabbitMQ.NewConnection(t)
	logger := watermill.NopLogger{}
	// Only the tribe publisher is used; the other two pub/subs provide
	// subscribers with their own queue name suffixes.
	publisher, tribeSub := watermillamqptest.NewPubSub(
		t,
		rmqConn,
		amqp.GenerateQueueNameTopicNameWithSuffix("_data_cleanup_tribe"),
		exchangeAndRoutingKey,
	)
	_, playerSub := watermillamqptest.NewPubSub(
		t,
		rmqConn,
		amqp.GenerateQueueNameTopicNameWithSuffix("_data_cleanup_player"),
		exchangeAndRoutingKey,
	)
	_, ennoblementSub := watermillamqptest.NewPubSub(
		t,
		rmqConn,
		amqp.GenerateQueueNameTopicNameWithSuffix("_data_cleanup_ennoblement"),
		exchangeAndRoutingKey,
	)

	// events/commands
	cleanUpTopic := gofakeit.UUID()

	// adapters
	serverRepo := adapter.NewServerBunRepository(db)
	ennoblementRepo := adapter.NewEnnoblementBunRepository(db)
	tribeSnapshotRepo := adapter.NewTribeSnapshotBunRepository(db)
	playerSnapshotRepo := adapter.NewPlayerSnapshotBunRepository(db)
	cleanUpPublisher := adapter.NewDataCleanupPublisher(publisher, marshaler, cleanUpTopic)

	// services
	serverSvc := app.NewServerService(serverRepo, nil, nil)
	ennoblementSvc := app.NewEnnoblementService(ennoblementRepo, nil, nil)
	tribeSnapshotSvc := app.NewTribeSnapshotService(tribeSnapshotRepo, nil, nil)
	playerSnapshotSvc := app.NewPlayerSnapshotService(playerSnapshotRepo, nil, nil)
	cleanUpSvc := app.NewDataCleanupService(serverSvc, cleanUpPublisher)

	// consumers: every topic except the clean-up one is left empty.
	tribeConsumer := port.NewTribeWatermillConsumer(
		nil,
		tribeSnapshotSvc,
		tribeSub,
		logger,
		marshaler,
		"",
		"",
		"",
		cleanUpTopic,
	)
	playerConsumer := port.NewPlayerWatermillConsumer(
		nil,
		playerSnapshotSvc,
		playerSub,
		logger,
		marshaler,
		"",
		"",
		cleanUpTopic,
	)
	ennoblementConsumer := port.NewEnnoblementWatermillConsumer(
		ennoblementSvc,
		ennoblementSub,
		logger,
		marshaler,
		"",
		cleanUpTopic,
	)
	watermilltest.RunRouterWithContext(t, ctx, tribeConsumer, playerConsumer, ennoblementConsumer)

	require.NoError(t, cleanUpSvc.CleanUp(ctx))

	var wg sync.WaitGroup

	// expectIDs starts a goroutine that repeatedly calls listIDs until the
	// returned IDs equal want or pollTimeout elapses. label is used as the
	// failure message. The goroutine is tracked by wg.
	expectIDs := func(label string, want []int, listIDs func(collect *assert.CollectT) []int) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			assert.EventuallyWithTf(t, func(collect *assert.CollectT) {
				assert.Equal(collect, want, listIDs(collect))
			}, pollTimeout, pollInterval, label)
		}()
	}

	// Paths inside an fs.FS are slash-separated, hence path.Join.
	var wantEnnoblements []int
	readJSONFile(t, testdata, path.Join("expected", "ennoblements.json"), &wantEnnoblements)
	expectIDs("ennoblements", wantEnnoblements, func(collect *assert.CollectT) []int {
		params := domain.NewListEnnoblementsParams()
		require.NoError(collect, params.SetSort([]domain.EnnoblementSort{
			domain.EnnoblementSortServerKeyASC,
			domain.EnnoblementSortCreatedAtASC,
			domain.EnnoblementSortIDASC,
		}))

		// Walk every page, following the cursor until it is zero.
		var ids []int
		for {
			page, err := ennoblementRepo.List(ctx, params)
			require.NoError(collect, err)

			for _, e := range page.Ennoblements() {
				ids = append(ids, e.ID())
			}

			if page.Next().IsZero() {
				return ids
			}

			require.NoError(collect, params.SetCursor(page.Next()))
		}
	})

	var wantTribeSnapshots []int
	readJSONFile(t, testdata, path.Join("expected", "tribe-snapshots.json"), &wantTribeSnapshots)
	expectIDs("tribe snapshots", wantTribeSnapshots, func(collect *assert.CollectT) []int {
		params := domain.NewListTribeSnapshotsParams()
		require.NoError(collect, params.SetSort([]domain.TribeSnapshotSort{
			domain.TribeSnapshotSortServerKeyASC,
			domain.TribeSnapshotSortDateASC,
			domain.TribeSnapshotSortIDASC,
		}))

		// Walk every page, following the cursor until it is zero.
		var ids []int
		for {
			page, err := tribeSnapshotRepo.List(ctx, params)
			require.NoError(collect, err)

			for _, ts := range page.TribeSnapshots() {
				ids = append(ids, ts.ID())
			}

			if page.Next().IsZero() {
				return ids
			}

			require.NoError(collect, params.SetCursor(page.Next()))
		}
	})

	var wantPlayerSnapshots []int
	readJSONFile(t, testdata, path.Join("expected", "player-snapshots.json"), &wantPlayerSnapshots)
	expectIDs("player snapshots", wantPlayerSnapshots, func(collect *assert.CollectT) []int {
		params := domain.NewListPlayerSnapshotsParams()
		require.NoError(collect, params.SetSort([]domain.PlayerSnapshotSort{
			domain.PlayerSnapshotSortServerKeyASC,
			domain.PlayerSnapshotSortDateASC,
			domain.PlayerSnapshotSortIDASC,
		}))

		// Walk every page, following the cursor until it is zero.
		var ids []int
		for {
			page, err := playerSnapshotRepo.List(ctx, params)
			require.NoError(collect, err)

			for _, ps := range page.PlayerSnapshots() {
				ids = append(ids, ps.ID())
			}

			if page.Next().IsZero() {
				return ids
			}

			require.NoError(collect, params.SetCursor(page.Next()))
		}
	})

	wg.Wait()
}