package service_test import ( "context" "fmt" "io" "io/fs" "net/http" "net/http/httptest" "net/url" "os" "os/signal" "path" "syscall" "testing" "time" "gitea.dwysokinski.me/twhelp/corev3/internal/adapter" "gitea.dwysokinski.me/twhelp/corev3/internal/app" "gitea.dwysokinski.me/twhelp/corev3/internal/bun/buntest" "gitea.dwysokinski.me/twhelp/corev3/internal/domain" "gitea.dwysokinski.me/twhelp/corev3/internal/port" "gitea.dwysokinski.me/twhelp/corev3/internal/tw" "gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillamqptest" "gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg" "gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermilltest" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" "github.com/brianvoe/gofakeit/v6" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // we can't run the subtests in parallel //nolint:tparallel func TestEnnoblementSync(t *testing.T) { t.Parallel() if testing.Short() { t.Skip("skipping long-running test") } ctxTimeout, cancel := context.WithTimeout(context.Background(), 150*time.Second) defer cancel() ctx, stop := signal.NotifyContext(ctxTimeout, os.Interrupt, syscall.SIGTERM) defer stop() // bun db := postgres.NewDB(t) buntest.NewFixture(db).Load(t, ctx, os.DirFS("./testdata/ennoblementsync"), "fixture.yml") // watermill marshaler := watermillmsg.JSONMarshaler{NewUUID: watermill.NewUUID} generateExchangeAndRoutingKeyName := func(topic string) string { return topic + "_ennoblement_sync" } rmqConn := rabbitMQ.NewConnection(t) nopLogger := watermill.NopLogger{} _, serverSub := watermillamqptest.NewPubSub( t, rmqConn, amqp.GenerateQueueNameTopicNameWithSuffix("_ennoblement_sync_server"), generateExchangeAndRoutingKeyName, ) ennoblementPub, ennoblementSub := watermillamqptest.NewPubSub( t, rmqConn, amqp.GenerateQueueNameTopicNameWithSuffix("_ennoblement_sync_ennoblement"), generateExchangeAndRoutingKeyName, ) // events/commands ennoblementCmdSync := gofakeit.UUID() ennoblementEventSynced := gofakeit.UUID() // filesys is set later in this test to os.DirFS("./testdata/ennoblementsync/stage{n}") var filesys fs.FS srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) return } u, err := url.Parse(r.URL.Query().Get("url")) if err != nil { w.WriteHeader(http.StatusBadRequest) return } f, err := filesys.Open(path.Join(u.Host, u.Path+u.Query().Get("func")+u.Query().Get("since"))) if err != nil { w.WriteHeader(http.StatusNotFound) return } defer func() { _ = f.Close() }() w.WriteHeader(http.StatusOK) _, _ = io.Copy(w, f) })) t.Cleanup(srv.Close) twSvc := adapter.NewTWHTTP(tw.NewClient( tw.WithHTTPClient(&http.Client{ Transport: &urlChangerTransport{ url: srv.URL, transport: srv.Client().Transport, }, }), tw.WithEnnoblementsUseInterfaceFunc(func(since time.Time) bool { return !since.IsZero() }), )) // adapters versionRepo := adapter.NewVersionBunRepository(db) serverRepo := adapter.NewServerBunRepository(db) ennoblementRepo := adapter.NewEnnoblementBunRepository(db) ennoblementPublisher := adapter.NewEnnoblementWatermillPublisher( ennoblementPub, marshaler, ennoblementCmdSync, ennoblementEventSynced, ) // services versionSvc := app.NewVersionService(versionRepo) serverSvc := app.NewServerService(serverRepo, twSvc, nil) ennoblementSvc := app.NewEnnoblementService(ennoblementRepo, twSvc, ennoblementPublisher) dataSyncSvc := app.NewDataSyncService(versionSvc, serverSvc, nil, ennoblementPublisher) watermilltest.RunRouterWithContext( t, ctx, port.NewServerWatermillConsumer( serverSvc, serverSub, nopLogger, marshaler, "", "", "", "", "", ennoblementEventSynced, ), port.NewEnnoblementWatermillConsumer(ennoblementSvc, ennoblementSub, nopLogger, marshaler, ennoblementCmdSync), ) for _, stage := range []uint{1, 2} { t.Run(fmt.Sprintf("stage %d", stage), func(t *testing.T) { filesys = os.DirFS(fmt.Sprintf("./testdata/ennoblementsync/stage%d", stage)) require.NoError(t, dataSyncSvc.SyncEnnoblements(ctx)) assert.EventuallyWithTf(t, func(collect *assert.CollectT) { require.NoError(collect, ctx.Err()) listParams := domain.NewListServersParams() require.NoError(collect, listParams.SetSort([]domain.ServerSort{ domain.ServerSortKeyASC, })) require.NoError(collect, listParams.SetSpecial(domain.NullBool{ Value: false, Valid: true, })) require.NoError(collect, listParams.SetLimit(domain.ServerListMaxLimit)) for { servers, err := serverRepo.List(ctx, listParams) require.NoError(collect, err) if len(servers) == 0 { break } for _, s := range servers { assert.WithinDuration(collect, time.Now(), s.EnnoblementDataSyncedAt(), time.Minute) } require.NoError(collect, listParams.SetKeyGT(domain.NullString{ Value: servers[len(servers)-1].Key(), Valid: true, })) } }, 30*time.Second, time.Second, "servers") var expectedEnnoblements []map[string]any readJSONFile(t, filesys, path.Join("expected", "ennoblements.json"), &expectedEnnoblements) assert.EventuallyWithT(t, func(collect *assert.CollectT) { require.NoError(collect, ctx.Err()) listParams := domain.NewListEnnoblementsParams() require.NoError(collect, listParams.SetSort([]domain.EnnoblementSort{ domain.EnnoblementSortServerKeyASC, domain.EnnoblementSortCreatedAtASC, domain.EnnoblementSortIDASC, })) require.NoError(collect, listParams.SetLimit(domain.EnnoblementListMaxLimit)) allEnnoblements := make(domain.Ennoblements, 0, len(expectedEnnoblements)) for { ennoblements, err := ennoblementRepo.List(ctx, listParams) require.NoError(collect, err) if len(ennoblements) == 0 { break } allEnnoblements = append(allEnnoblements, ennoblements...) require.NoError(collect, listParams.SetOffset(listParams.Offset()+domain.EnnoblementListMaxLimit)) } if !assert.Len(collect, allEnnoblements, len(expectedEnnoblements)) { return } for i, expected := range expectedEnnoblements { actual := allEnnoblements[i] msg := fmt.Sprintf("ID=%.0f,ServerKey=%s", expected["ID"], expected["ServerKey"]) assert.EqualValues(collect, expected["ID"], actual.ID(), msg) assert.Equal(collect, expected["ServerKey"], actual.ServerKey(), msg) assert.EqualValues(collect, expected["OldOwnerID"], actual.OldOwnerID(), msg) assert.EqualValues(collect, expected["OldTribeID"], actual.OldTribeID(), msg) assert.EqualValues(collect, expected["NewOwnerID"], actual.NewOwnerID(), msg) assert.EqualValues(collect, expected["NewTribeID"], actual.NewTribeID(), msg) assert.EqualValues(collect, expected["Points"], actual.Points(), msg) assert.Equal(collect, expected["CreatedAt"], actual.CreatedAt().Format(time.RFC3339), msg) } }, 30*time.Second, time.Second, "ennoblements") }) } }