diff --git a/cmd/twhelp/cmd_consumer.go b/cmd/twhelp/cmd_consumer.go index 27fa451..0e789dc 100644 --- a/cmd/twhelp/cmd_consumer.go +++ b/cmd/twhelp/cmd_consumer.go @@ -65,6 +65,7 @@ var cmdConsumer = &cli.Command{ c.String(rmqFlagTopicPlayersSyncedEvent.Name), c.String(rmqFlagTopicVillagesSyncedEvent.Name), c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name), + c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name), ) consumer.Register(router) @@ -95,19 +96,33 @@ var cmdConsumer = &cli.Command{ marshaler, c.String(rmqFlagTopicTribesSyncedEvent.Name), ) + tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher( + publisher, + newWatermillMarshaler(), + c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name), + c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name), + ) twSvc, err := newTWServiceFromFlags(c) if err != nil { return err } + tribeSvc := app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc, tribePublisher) + tribeSnapshotSvc := app.NewTribeSnapshotService( + adapter.NewTribeSnapshotBunRepository(db), + tribeSvc, + tribeSnapshotPublisher, + ) consumer := port.NewTribeWatermillConsumer( - app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc, tribePublisher), + tribeSvc, + tribeSnapshotSvc, subscriber, logger, marshaler, c.String(rmqFlagTopicServerSyncedEvent.Name), c.String(rmqFlagTopicVillagesSyncedEvent.Name), + c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name), ) consumer.Register(router) diff --git a/internal/adapter/repository_bun_server.go b/internal/adapter/repository_bun_server.go index cbe8e1f..6626ab9 100644 --- a/internal/adapter/repository_bun_server.go +++ b/internal/adapter/repository_bun_server.go @@ -163,6 +163,10 @@ func (a updateServerParamsApplier) apply(q *bun.UpdateQuery) *bun.UpdateQuery { q = q.Set("ennoblement_data_updated_at = ?", ennoblementDataSyncedAt.Value) } + if tribeSnapshotsCreatedAt := a.params.TribeSnapshotsCreatedAt(); tribeSnapshotsCreatedAt.Valid { + q = q.Set("tribe_snapshots_created_at = ?", tribeSnapshotsCreatedAt.Value) + } + return q } diff --git a/internal/adapter/repository_bun_tribe_snapshot.go b/internal/adapter/repository_bun_tribe_snapshot.go new file mode 100644 index 0000000..45e9489 --- /dev/null +++ b/internal/adapter/repository_bun_tribe_snapshot.go @@ -0,0 +1,104 @@ +package adapter + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/bun/bunmodel" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "github.com/uptrace/bun" +) + +type TribeSnapshotBunRepository struct { + db bun.IDB +} + +func NewTribeSnapshotBunRepository(db bun.IDB) *TribeSnapshotBunRepository { + return &TribeSnapshotBunRepository{db: db} +} + +func (repo *TribeSnapshotBunRepository) Create(ctx context.Context, params ...domain.CreateTribeSnapshotParams) error { + if len(params) == 0 { + return nil + } + + now := time.Now() + tribeSnapshots := make(bunmodel.TribeSnapshots, 0, len(params)) + + for _, p := range params { + tribeSnapshots = append(tribeSnapshots, bunmodel.TribeSnapshot{ + TribeID: p.TribeID(), + ServerKey: p.ServerKey(), + NumMembers: p.NumMembers(), + NumVillages: p.NumVillages(), + Points: p.Points(), + AllPoints: p.AllPoints(), + Rank: p.Rank(), + Dominance: p.Dominance(), + Date: p.Date(), + CreatedAt: now, + OpponentsDefeated: bunmodel.NewOpponentsDefeated(p.OD()), + }) + } + + if _, err := repo.db.NewInsert(). + Model(&tribeSnapshots). + Ignore(). + Returning(""). + Exec(ctx); err != nil { + return fmt.Errorf("something went wrong while inserting tribe snapshots into the db: %w", err) + } + + return nil +} + +func (repo *TribeSnapshotBunRepository) List( + ctx context.Context, + params domain.ListTribeSnapshotsParams, +) (domain.TribeSnapshots, error) { + var tribeSnapshots bunmodel.TribeSnapshots + + if err := repo.db.NewSelect(). + Model(&tribeSnapshots). + Apply(listTribeSnapshotsParamsApplier{params: params}.apply). + Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("couldn't select tribe snapshots from the db: %w", err) + } + + return tribeSnapshots.ToDomain() +} + +type listTribeSnapshotsParamsApplier struct { + params domain.ListTribeSnapshotsParams +} + +//nolint:gocyclo +func (a listTribeSnapshotsParamsApplier) apply(q *bun.SelectQuery) *bun.SelectQuery { + if serverKeys := a.params.ServerKeys(); len(serverKeys) > 0 { + q = q.Where("ts.server_key IN (?)", bun.In(serverKeys)) + } + + for _, s := range a.params.Sort() { + switch s { + case domain.TribeSnapshotSortDateASC: + q = q.Order("ts.date ASC") + case domain.TribeSnapshotSortDateDESC: + q = q.Order("ts.date DESC") + case domain.TribeSnapshotSortIDASC: + q = q.Order("ts.id ASC") + case domain.TribeSnapshotSortIDDESC: + q = q.Order("ts.id DESC") + case domain.TribeSnapshotSortServerKeyASC: + q = q.Order("ts.server_key ASC") + case domain.TribeSnapshotSortServerKeyDESC: + q = q.Order("ts.server_key DESC") + default: + return q.Err(errors.New("unsupported sort value")) + } + } + + return q.Limit(a.params.Limit()).Offset(a.params.Offset()) +} diff --git a/internal/adapter/repository_bun_tribe_snapshot_test.go b/internal/adapter/repository_bun_tribe_snapshot_test.go new file mode 100644 index 0000000..a46923e --- /dev/null +++ b/internal/adapter/repository_bun_tribe_snapshot_test.go @@ -0,0 +1,29 @@ +package adapter_test + +import ( + "testing" + + "gitea.dwysokinski.me/twhelp/corev3/internal/bun/buntest" +) + +func TestTribeSnapshotBunRepository_Postgres(t *testing.T) { + t.Parallel() + + if testing.Short() { + t.Skip("skipping long-running test") + } + + testTribeSnapshotRepository(t, func(t *testing.T) repositories { + t.Helper() + return newBunDBRepositories(t, postgres.NewDB(t)) + }) +} + +func TestTribeSnapshotBunRepository_SQLite(t *testing.T) { + t.Parallel() + + testTribeSnapshotRepository(t, func(t *testing.T) repositories { + t.Helper() + return newBunDBRepositories(t, buntest.NewSQLiteDB(t)) + }) +} diff --git a/internal/adapter/repository_server_test.go b/internal/adapter/repository_server_test.go index d8ca1c2..2b11271 100644 --- a/internal/adapter/repository_server_test.go +++ b/internal/adapter/repository_server_test.go @@ -238,7 +238,7 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories }, }, { - name: fmt.Sprintf("OK: version code=[%s]", randServer.VersionCode()), + name: fmt.Sprintf("OK: versionCodes=[%s]", randServer.VersionCode()), params: func(t *testing.T) domain.ListServersParams { t.Helper() params := domain.NewListServersParams() @@ -348,7 +348,7 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories }, }, { - name: "OK: player snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339), + name: "OK: playerSnapshotsCreatedAtLt=" + snapshotsCreatedAtLT.Format(time.RFC3339), params: func(t *testing.T) domain.ListServersParams { t.Helper() params := domain.NewListServersParams() @@ -375,7 +375,7 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories }, }, { - name: "OK: tribe snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339), + name: "OK: tribeSnapshotsCreatedAtLt=" + snapshotsCreatedAtLT.Format(time.RFC3339), params: func(t *testing.T) domain.ListServersParams { t.Helper() params := domain.NewListServersParams() @@ -508,6 +508,10 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories Value: time.Now(), Valid: true, })) + require.NoError(t, updateParams.SetTribeSnapshotsCreatedAt(domain.NullTime{ + Value: time.Now(), + Valid: true, + })) require.NoError(t, repos.server.Update(ctx, serversBeforeUpdate[0].Key(), updateParams)) @@ -551,6 +555,12 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories serversAfterUpdate[0].EnnoblementDataSyncedAt(), time.Minute, ) + assert.WithinDuration( + t, + updateParams.TribeSnapshotsCreatedAt().Value, + serversAfterUpdate[0].TribeSnapshotsCreatedAt(), + time.Minute, + ) }) t.Run("ERR: not found", func(t *testing.T) { diff --git a/internal/adapter/repository_test.go b/internal/adapter/repository_test.go index 4b0ad79..f498b3d 100644 --- a/internal/adapter/repository_test.go +++ b/internal/adapter/repository_test.go @@ -54,14 +54,20 @@ type tribeChangeRepository interface { List(ctx context.Context, params domain.ListTribeChangesParams) (domain.TribeChanges, error) } +type tribeSnapshotRepository interface { + Create(ctx context.Context, params ...domain.CreateTribeSnapshotParams) error + List(ctx context.Context, params domain.ListTribeSnapshotsParams) (domain.TribeSnapshots, error) +} + type repositories struct { - version versionRepository - server serverRepository - tribe tribeRepository - player playerRepository - village villageRepository - ennoblement ennoblementRepository - tribeChange tribeChangeRepository + version versionRepository + server serverRepository + tribe tribeRepository + player playerRepository + village villageRepository + ennoblement ennoblementRepository + tribeChange tribeChangeRepository + tribeSnapshot tribeSnapshotRepository } func newBunDBRepositories(tb testing.TB, bunDB *bun.DB) repositories { @@ -70,12 +76,13 @@ func newBunDBRepositories(tb testing.TB, bunDB *bun.DB) repositories { buntest.NewFixture(bunDB).Load(tb, context.Background(), os.DirFS("testdata"), "fixture.yml") return repositories{ - version: adapter.NewVersionBunRepository(bunDB), - server: adapter.NewServerBunRepository(bunDB), - tribe: adapter.NewTribeBunRepository(bunDB), - player: adapter.NewPlayerBunRepository(bunDB), - village: adapter.NewVillageBunRepository(bunDB), - ennoblement: adapter.NewEnnoblementBunRepository(bunDB), - tribeChange: adapter.NewTribeChangeBunRepository(bunDB), + version: adapter.NewVersionBunRepository(bunDB), + server: adapter.NewServerBunRepository(bunDB), + tribe: adapter.NewTribeBunRepository(bunDB), + player: adapter.NewPlayerBunRepository(bunDB), + village: adapter.NewVillageBunRepository(bunDB), + ennoblement: adapter.NewEnnoblementBunRepository(bunDB), + tribeChange: adapter.NewTribeChangeBunRepository(bunDB), + tribeSnapshot: adapter.NewTribeSnapshotBunRepository(bunDB), } } diff --git a/internal/adapter/repository_tribe_snapshot_test.go b/internal/adapter/repository_tribe_snapshot_test.go new file mode 100644 index 0000000..4bc475c --- /dev/null +++ b/internal/adapter/repository_tribe_snapshot_test.go @@ -0,0 +1,332 @@ +package adapter_test + +import ( + "cmp" + "context" + "fmt" + "slices" + "testing" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func testTribeSnapshotRepository(t *testing.T, newRepos func(t *testing.T) repositories) { + t.Helper() + + ctx := context.Background() + + t.Run("Create", func(t *testing.T) { + t.Parallel() + + const dateFormat = "2006-01-02" + + repos := newRepos(t) + + assertCreated := func(t *testing.T, params []domain.CreateTribeSnapshotParams) { + t.Helper() + + require.NotEmpty(t, params) + + tribeSnapshots, err := repos.tribeSnapshot.List(ctx, domain.NewListTribeSnapshotsParams()) + require.NoError(t, err) + for i, p := range params { + date := p.Date().Format(dateFormat) + + idx := slices.IndexFunc(tribeSnapshots, func(ts domain.TribeSnapshot) bool { + return ts.ServerKey() == p.ServerKey() && + ts.TribeID() == p.TribeID() && + ts.Date().Format(dateFormat) == date + }) + require.GreaterOrEqualf(t, idx, 0, "params[%d] not found", i) + tribeSnapshot := tribeSnapshots[idx] + + assert.Equalf(t, p.TribeID(), tribeSnapshot.TribeID(), "params[%d]", i) + assert.Equalf(t, p.ServerKey(), tribeSnapshot.ServerKey(), "params[%d]", i) + assert.Equalf(t, p.NumMembers(), tribeSnapshot.NumMembers(), "params[%d]", i) + assert.Equalf(t, p.NumVillages(), tribeSnapshot.NumVillages(), "params[%d]", i) + assert.Equalf(t, p.Points(), tribeSnapshot.Points(), "params[%d]", i) + assert.Equalf(t, p.AllPoints(), tribeSnapshot.AllPoints(), "params[%d]", i) + assert.Equalf(t, p.Rank(), tribeSnapshot.Rank(), "params[%d]", i) + assert.Equalf(t, p.OD(), tribeSnapshot.OD(), "params[%d]", i) + assert.InDeltaf(t, p.Dominance(), tribeSnapshot.Dominance(), 0.001, "params[%d]", i) + assert.Equalf(t, date, tribeSnapshot.Date().Format(dateFormat), "params[%d]", i) + assert.WithinDurationf(t, time.Now(), tribeSnapshot.CreatedAt(), time.Minute, "params[%d]", i) + } + } + + assertNoDuplicates := func(t *testing.T, params []domain.CreateTribeSnapshotParams) { + t.Helper() + + require.NotEmpty(t, params) + + tribeSnapshots, err := repos.tribeSnapshot.List(ctx, domain.NewListTribeSnapshotsParams()) + require.NoError(t, err) + + res := make(map[string][]int) + + for _, p := range params { + key := fmt.Sprintf("%s-%d-%s", p.ServerKey(), p.TribeID(), p.Date().Format(dateFormat)) + + for i, ts := range tribeSnapshots { + if ts.ServerKey() == p.ServerKey() && ts.TribeID() == p.TribeID() && ts.Date().Equal(p.Date()) { + res[key] = append(res[key], i) + } + } + } + + for key, indexes := range res { + assert.Len(t, indexes, 1, key) + } + } + + t.Run("OK", func(t *testing.T) { + t.Parallel() + + listTribesParams := domain.NewListTribesParams() + require.NoError(t, listTribesParams.SetDeleted(domain.NullBool{ + Value: false, + Valid: true, + })) + + tribes, err := repos.tribe.List(ctx, listTribesParams) + require.NoError(t, err) + require.NotEmpty(t, tribes) + + date := time.Now() + + createParams, err := domain.NewCreateTribeSnapshotParams(tribes, date) + require.NoError(t, err) + + require.NoError(t, repos.tribeSnapshot.Create(ctx, createParams...)) + assertCreated(t, createParams) + + require.NoError(t, repos.tribeSnapshot.Create(ctx, createParams...)) + assertNoDuplicates(t, createParams) + }) + + t.Run("OK: len(params) == 0", func(t *testing.T) { + t.Parallel() + + require.NoError(t, repos.tribeSnapshot.Create(ctx)) + }) + }) + + t.Run("List & ListCount", func(t *testing.T) { + t.Parallel() + + repos := newRepos(t) + + tribeSnapshots, listTribeSnapshotsErr := repos.tribeSnapshot.List(ctx, domain.NewListTribeSnapshotsParams()) + require.NoError(t, listTribeSnapshotsErr) + require.NotEmpty(t, tribeSnapshots) + randTribeSnapshot := tribeSnapshots[0] + + tests := []struct { + name string + params func(t *testing.T) domain.ListTribeSnapshotsParams + assertTribeSnapshots func(t *testing.T, params domain.ListTribeSnapshotsParams, tribeSnapshots domain.TribeSnapshots) + assertError func(t *testing.T, err error) + assertTotal func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) + }{ + { + name: "OK: default params", + params: func(t *testing.T) domain.ListTribeSnapshotsParams { + t.Helper() + return domain.NewListTribeSnapshotsParams() + }, + assertTribeSnapshots: func( + t *testing.T, + params domain.ListTribeSnapshotsParams, + tribeSnapshots domain.TribeSnapshots, + ) { + t.Helper() + assert.NotEmpty(t, len(tribeSnapshots)) + assert.True(t, slices.IsSortedFunc(tribeSnapshots, func(a, b domain.TribeSnapshot) int { + if x := cmp.Compare(a.ServerKey(), b.ServerKey()); x != 0 { + return x + } + if x := a.Date().Compare(b.Date()); x != 0 { + return x + } + return cmp.Compare(a.ID(), b.ID()) + })) + }, + assertError: func(t *testing.T, err error) { + t.Helper() + require.NoError(t, err) + }, + assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) { + t.Helper() + assert.NotEmpty(t, total) + }, + }, + { + name: "OK: sort=[serverKey DESC, date DESC]", + params: func(t *testing.T) domain.ListTribeSnapshotsParams { + t.Helper() + params := domain.NewListTribeSnapshotsParams() + require.NoError(t, params.SetSort([]domain.TribeSnapshotSort{ + domain.TribeSnapshotSortServerKeyDESC, + domain.TribeSnapshotSortDateDESC, + })) + return params + }, + assertTribeSnapshots: func( + t *testing.T, + params domain.ListTribeSnapshotsParams, + tribeSnapshots domain.TribeSnapshots, + ) { + t.Helper() + assert.NotEmpty(t, len(tribeSnapshots)) + assert.True(t, slices.IsSortedFunc(tribeSnapshots, func(a, b domain.TribeSnapshot) int { + if x := cmp.Compare(a.ServerKey(), b.ServerKey()) * -1; x != 0 { + return x + } + return a.Date().Compare(b.Date()) * -1 + })) + }, + assertError: func(t *testing.T, err error) { + t.Helper() + require.NoError(t, err) + }, + assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) { + t.Helper() + assert.NotEmpty(t, total) + }, + }, + { + name: "OK: sort=[id ASC]", + params: func(t *testing.T) domain.ListTribeSnapshotsParams { + t.Helper() + params := domain.NewListTribeSnapshotsParams() + require.NoError(t, params.SetSort([]domain.TribeSnapshotSort{ + domain.TribeSnapshotSortIDASC, + })) + return params + }, + assertTribeSnapshots: func( + t *testing.T, + params domain.ListTribeSnapshotsParams, + tribeSnapshots domain.TribeSnapshots, + ) { + t.Helper() + assert.NotEmpty(t, len(tribeSnapshots)) + assert.True(t, slices.IsSortedFunc(tribeSnapshots, func(a, b domain.TribeSnapshot) int { + return cmp.Compare(a.ID(), b.ID()) + })) + }, + assertError: func(t *testing.T, err error) { + t.Helper() + require.NoError(t, err) + }, + assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) { + t.Helper() + assert.NotEmpty(t, total) + }, + }, + { + name: "OK: sort=[id DESC]", + params: func(t *testing.T) domain.ListTribeSnapshotsParams { + t.Helper() + params := domain.NewListTribeSnapshotsParams() + require.NoError(t, params.SetSort([]domain.TribeSnapshotSort{ + domain.TribeSnapshotSortIDDESC, + })) + return params + }, + assertTribeSnapshots: func( + t *testing.T, + params domain.ListTribeSnapshotsParams, + tribeSnapshots domain.TribeSnapshots, + ) { + t.Helper() + assert.NotEmpty(t, len(tribeSnapshots)) + assert.True(t, slices.IsSortedFunc(tribeSnapshots, func(a, b domain.TribeSnapshot) int { + return cmp.Compare(a.ID(), b.ID()) * -1 + })) + }, + assertError: func(t *testing.T, err error) { + t.Helper() + require.NoError(t, err) + }, + assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) { + t.Helper() + assert.NotEmpty(t, total) + }, + }, + { + name: fmt.Sprintf("OK: serverKeys=[%s]", randTribeSnapshot.ServerKey()), + params: func(t *testing.T) domain.ListTribeSnapshotsParams { + t.Helper() + params := domain.NewListTribeSnapshotsParams() + require.NoError(t, params.SetServerKeys([]string{randTribeSnapshot.ServerKey()})) + return params + }, + assertTribeSnapshots: func( + t *testing.T, + params domain.ListTribeSnapshotsParams, + tribeSnapshots domain.TribeSnapshots, + ) { + t.Helper() + + serverKeys := params.ServerKeys() + + for _, ts := range tribeSnapshots { + assert.True(t, slices.Contains(serverKeys, ts.ServerKey())) + } + }, + assertError: func(t *testing.T, err error) { + t.Helper() + require.NoError(t, err) + }, + assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) { + t.Helper() + assert.NotEmpty(t, total) + }, + }, + { + name: "OK: offset=1 limit=2", + params: func(t *testing.T) domain.ListTribeSnapshotsParams { + t.Helper() + params := domain.NewListTribeSnapshotsParams() + require.NoError(t, params.SetOffset(1)) + require.NoError(t, params.SetLimit(2)) + return params + }, + assertTribeSnapshots: func( + t *testing.T, + params domain.ListTribeSnapshotsParams, + tribeSnapshots domain.TribeSnapshots, + ) { + t.Helper() + assert.Len(t, tribeSnapshots, params.Limit()) + }, + assertError: func(t *testing.T, err error) { + t.Helper() + require.NoError(t, err) + }, + assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) { + t.Helper() + assert.NotEmpty(t, total) + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + params := tt.params(t) + + res, err := repos.tribeSnapshot.List(ctx, params) + tt.assertError(t, err) + tt.assertTribeSnapshots(t, params, res) + }) + } + }) +} diff --git a/internal/adapter/testdata/fixture.yml b/internal/adapter/testdata/fixture.yml index c45504a..7a34b6f 100644 --- a/internal/adapter/testdata/fixture.yml +++ b/internal/adapter/testdata/fixture.yml @@ -7374,3 +7374,125 @@ new_tribe_id: 2 server_key: pl169 created_at: 2021-09-10T20:01:11.000Z +- model: TribeSnapshot + rows: + - rank_att: 1 + score_att: 669292167 + rank_def: 5 + score_def: 199124128 + rank_sup: 0 + score_sup: 0 + rank_total: 2 + score_total: 868416295 + _id: pl169-csa-2021-09-02 + id: 10000 + tribe_id: 28 + server_key: pl169 + num_members: 66 + num_villages: 14480 + points: 114277979 + all_points: 143350058 + rank: 1 + dominance: 29.76912481240106 + date: 2021-09-02T00:00:00.000Z + created_at: 2021-09-02T23:01:13.000Z + - rank_att: 1 + score_att: 669292167 + rank_def: 5 + score_def: 199124128 + rank_sup: 0 + score_sup: 0 + rank_total: 2 + score_total: 868416295 + _id: pl169-csa-2021-09-03 + id: 10001 + tribe_id: 28 + server_key: pl169 + num_members: 66 + num_villages: 14480 + points: 114277979 + all_points: 143350058 + rank: 1 + dominance: 29.76912481240106 + date: 2021-09-03T00:00:00.000Z + created_at: 2021-09-03T23:01:13.000Z + - rank_att: 2 + score_att: 513902873 + rank_def: 6 + score_def: 177660231 + rank_sup: 0 + score_sup: 0 + rank_total: 3 + score_total: 691563104 + _id: pl169-kuzyni-2021-09-02 + id: 10050 + tribe_id: 27 + server_key: pl169 + num_members: 52 + num_villages: 10152 + points: 91951132 + all_points: 97812820 + rank: 3 + dominance: 20.87128142924693 + date: 2021-09-02T00:00:00.000Z + created_at: 2021-09-02T22:00:37.000Z + - rank_att: 2 + score_att: 513902873 + rank_def: 6 + score_def: 177660231 + rank_sup: 0 + score_sup: 0 + rank_total: 3 + score_total: 691563104 + _id: pl169-kuzyni-2021-09-03 + id: 10051 + tribe_id: 27 + server_key: pl169 + num_members: 52 + num_villages: 10152 + points: 91951132 + all_points: 97812820 + rank: 3 + dominance: 20.87128142924693 + date: 2021-09-03T00:00:00.000Z + created_at: 2021-09-03T22:00:37.000Z + - rank_att: 1 + score_att: 1449031872 + rank_def: 1 + score_def: 993299748 + rank_sup: 0 + score_sup: 0 + rank_total: 1 + score_total: 2442331620 + _id: en113-kekw-2020-06-22 + id: 10100 + tribe_id: 122 + server_key: en113 + num_members: 20 + num_villages: 22568 + points: 266237628 + all_points: 266237628 + rank: 1 + dominance: 54.27478896611433 + date: 2020-06-22T00:00:00.000Z + created_at: 2020-06-22T13:45:46.000Z + - rank_att: 1 + score_att: 1449031872 + rank_def: 1 + score_def: 993299748 + rank_sup: 0 + score_sup: 0 + rank_total: 1 + score_total: 2442331620 + _id: en113-kekw-2020-06-23 + id: 10101 + tribe_id: 122 + server_key: en113 + num_members: 20 + num_villages: 22568 + points: 266237628 + all_points: 266237628 + rank: 1 + dominance: 54.27478896611433 + date: 2020-06-23T00:00:00.000Z + created_at: 2020-06-23T13:45:46.000Z diff --git a/internal/app/service_player.go b/internal/app/service_player.go index 82cf0a1..ff1246c 100644 --- a/internal/app/service_player.go +++ b/internal/app/service_player.go @@ -50,7 +50,7 @@ func (svc *PlayerService) Sync(ctx context.Context, serverSyncedPayload domain.S return fmt.Errorf("%s: couldn't delete players: %w", serverKey, err) } - playersSyncedPayload, err := domain.NewPlayersSyncedEventPayload( + payload, err := domain.NewPlayersSyncedEventPayload( serverKey, serverURL, serverSyncedPayload.VersionCode(), @@ -60,7 +60,7 @@ func (svc *PlayerService) Sync(ctx context.Context, serverSyncedPayload domain.S return fmt.Errorf("%s: couldn't construct domain.PlayersSyncedEventPayload: %w", serverKey, err) } - if err = svc.pub.EventSynced(ctx, playersSyncedPayload); err != nil { + if err = svc.pub.EventSynced(ctx, payload); err != nil { return fmt.Errorf("%s: %w", serverKey, err) } diff --git a/internal/app/service_server.go b/internal/app/service_server.go index fa520dd..e2dbcd8 100644 --- a/internal/app/service_server.go +++ b/internal/app/service_server.go @@ -272,3 +272,20 @@ func (svc *ServerService) UpdateEnnoblementDataSyncedAt( return svc.repo.Update(ctx, key, updateParams) } + +func (svc *ServerService) UpdateTribeSnapshotsCreatedAt( + ctx context.Context, + payload domain.SnapshotsCreatedEventPayload, +) error { + key := payload.ServerKey() + + var updateParams domain.UpdateServerParams + if err := updateParams.SetTribeSnapshotsCreatedAt(domain.NullTime{ + Value: time.Now(), + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", key, err) + } + + return svc.repo.Update(ctx, key, updateParams) +} diff --git a/internal/app/service_tribe.go b/internal/app/service_tribe.go index ba8400f..cd3d7d2 100644 --- a/internal/app/service_tribe.go +++ b/internal/app/service_tribe.go @@ -44,7 +44,7 @@ func (svc *TribeService) Sync(ctx context.Context, serverSyncedPayload domain.Se return fmt.Errorf("%s: couldn't delete tribes: %w", serverKey, err) } - tribesSyncedPayload, err := domain.NewTribesSyncedEventPayload( + payload, err := domain.NewTribesSyncedEventPayload( serverKey, serverURL, serverSyncedPayload.VersionCode(), @@ -54,7 +54,7 @@ func (svc *TribeService) Sync(ctx context.Context, serverSyncedPayload domain.Se return fmt.Errorf("%s: couldn't construct domain.TribesSyncedEventPayload: %w", serverKey, err) } - if err = svc.pub.EventSynced(ctx, tribesSyncedPayload); err != nil { + if err = svc.pub.EventSynced(ctx, payload); err != nil { return fmt.Errorf("%s: %w", serverKey, err) } @@ -157,3 +157,7 @@ func (svc *TribeService) delete(ctx context.Context, serverKey string, tribes do func (svc *TribeService) UpdateDominance(ctx context.Context, payload domain.VillagesSyncedEventPayload) error { return svc.repo.UpdateDominance(ctx, payload.ServerKey(), payload.NumPlayerVillages()) } + +func (svc *TribeService) List(ctx context.Context, params domain.ListTribesParams) (domain.Tribes, error) { + return svc.repo.List(ctx, params) +} diff --git a/internal/app/service_tribe_change.go b/internal/app/service_tribe_change.go index c856eb0..8e8445e 100644 --- a/internal/app/service_tribe_change.go +++ b/internal/app/service_tribe_change.go @@ -8,9 +8,7 @@ import ( type TribeChangeRepository interface { // Create persists tribe changes in a store (e.g. Postgres). - // If there is a similar tribe change, such changes are ignored. - // Similar means that there is a tribe change with the same player id, - // new tribe id and old tribe id and this tribe change was created within the same hour. + // Duplicates are ignored. Create(ctx context.Context, params ...domain.CreateTribeChangeParams) error } diff --git a/internal/app/service_tribe_snapshot.go b/internal/app/service_tribe_snapshot.go new file mode 100644 index 0000000..59097d4 --- /dev/null +++ b/internal/app/service_tribe_snapshot.go @@ -0,0 +1,92 @@ +package app + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" +) + +type TribeSnapshotRepository interface { + // Create persists tribe snapshots in a store (e.g. Postgres). + // Duplicates are ignored. + Create(ctx context.Context, params ...domain.CreateTribeSnapshotParams) error +} + +type TribeSnapshotService struct { + repo TribeSnapshotRepository + tribeSvc *TribeService + pub SnapshotPublisher +} + +func NewTribeSnapshotService( + repo TribeSnapshotRepository, + tribeSvc *TribeService, + pub SnapshotPublisher, +) *TribeSnapshotService { + return &TribeSnapshotService{repo: repo, tribeSvc: tribeSvc, pub: pub} +} + +//nolint:gocyclo +func (svc *TribeSnapshotService) Create( + ctx context.Context, + createSnapshotsCmdPayload domain.CreateSnapshotsCmdPayload, +) error { + serverKey := createSnapshotsCmdPayload.ServerKey() + date := createSnapshotsCmdPayload.Date() + + listTribesParams := domain.NewListTribesParams() + if err := listTribesParams.SetServerKeys([]string{serverKey}); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + if err := listTribesParams.SetDeleted(domain.NullBool{ + Value: false, + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + if err := listTribesParams.SetSort([]domain.TribeSort{domain.TribeSortIDASC}); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + if err := listTribesParams.SetLimit(domain.TribeListMaxLimit); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + + for { + tribes, err := svc.tribeSvc.List(ctx, listTribesParams) + if err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + + if len(tribes) == 0 { + break + } + + params, err := domain.NewCreateTribeSnapshotParams(tribes, date) + if err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + + if err = svc.repo.Create(ctx, params...); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + + if err = listTribesParams.SetIDGT(domain.NullInt{ + Value: tribes[len(tribes)-1].ID(), + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + } + + payload, err := domain.NewSnapshotsCreatedEventPayload(serverKey, createSnapshotsCmdPayload.VersionCode()) + if err != nil { + return fmt.Errorf("%s: couldn't construct domain.SnapshotsCreatedEventPayload: %w", serverKey, err) + } + + if err = svc.pub.EventCreated(ctx, payload); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + + return nil +} diff --git a/internal/bun/bunmodel/tribe_snapshot.go b/internal/bun/bunmodel/tribe_snapshot.go new file mode 100644 index 0000000..757db1c --- /dev/null +++ b/internal/bun/bunmodel/tribe_snapshot.go @@ -0,0 +1,79 @@ +package bunmodel + +import ( + "fmt" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "github.com/uptrace/bun" +) + +type TribeSnapshot struct { + bun.BaseModel `bun:"table:tribe_snapshots,alias:ts"` + + ID int `bun:"id,pk,autoincrement,identity"` + TribeID int `bun:"tribe_id,nullzero"` + ServerKey string `bun:"server_key,nullzero"` + NumMembers int `bun:"num_members"` + NumVillages int `bun:"num_villages"` + Points int `bun:"points"` + AllPoints int `bun:"all_points"` + Rank int `bun:"rank"` + Dominance float64 `bun:"dominance"` + Date time.Time `bun:"date,nullzero"` + CreatedAt time.Time `bun:"created_at,nullzero"` + + OpponentsDefeated +} + +func (ts TribeSnapshot) ToDomain() (domain.TribeSnapshot, error) { + od, err := ts.OpponentsDefeated.ToDomain() + if err != nil { + return domain.TribeSnapshot{}, fmt.Errorf( + "couldn't construct domain.TribeSnapshot (id=%d): %w", + ts.ID, + err, + ) + } + + converted, err := domain.UnmarshalTribeSnapshotFromDatabase( + ts.ID, + ts.TribeID, + ts.ServerKey, + ts.NumMembers, + ts.NumVillages, + ts.Points, + ts.AllPoints, + ts.Rank, + od, + ts.Dominance, + ts.Date, + ts.CreatedAt, + ) + if err != nil { + return domain.TribeSnapshot{}, fmt.Errorf( + "couldn't construct domain.TribeSnapshot (id=%d): %w", + ts.ID, + err, + ) + } + + return converted, nil +} + +type TribeSnapshots []TribeSnapshot + +func (tss TribeSnapshots) ToDomain() (domain.TribeSnapshots, error) { + res := make(domain.TribeSnapshots, 0, len(tss)) + + for _, ts := range tss { + converted, err := ts.ToDomain() + if err != nil { + return nil, err + } + + res = append(res, converted) + } + + return res, nil +} diff --git a/internal/bun/buntest/fixture.go b/internal/bun/buntest/fixture.go index 3128914..b1a2dce 100644 --- a/internal/bun/buntest/fixture.go +++ b/internal/bun/buntest/fixture.go @@ -23,6 +23,7 @@ func NewFixture(bunDB *bun.DB) *Fixture { (*bunmodel.Village)(nil), (*bunmodel.Ennoblement)(nil), (*bunmodel.TribeChange)(nil), + (*bunmodel.TribeSnapshot)(nil), ) return &Fixture{ f: dbfixture.New(bunDB), diff --git a/internal/domain/server.go b/internal/domain/server.go index 2431ec4..bf4788c 100644 --- a/internal/domain/server.go +++ b/internal/domain/server.go @@ -287,6 +287,7 @@ type UpdateServerParams struct { numBonusVillages NullInt villageDataSyncedAt NullTime ennoblementDataSyncedAt NullTime + tribeSnapshotsCreatedAt NullTime } const updateServerParamsModelName = "UpdateServerParams" @@ -419,6 +420,16 @@ func (params *UpdateServerParams) SetEnnoblementDataSyncedAt(ennoblementDataSync return nil } +func (params *UpdateServerParams) TribeSnapshotsCreatedAt() NullTime { + return params.tribeSnapshotsCreatedAt +} + +func (params *UpdateServerParams) SetTribeSnapshotsCreatedAt(tribeSnapshotsCreatedAt NullTime) error { + params.tribeSnapshotsCreatedAt = tribeSnapshotsCreatedAt + return nil +} + +//nolint:gocyclo func (params *UpdateServerParams) IsZero() bool { return !params.config.Valid && !params.buildingInfo.Valid && @@ -429,7 +440,8 @@ func (params *UpdateServerParams) IsZero() bool { !params.playerDataSyncedAt.Valid && !params.numVillages.Valid && !params.villageDataSyncedAt.Valid && - !params.ennoblementDataSyncedAt.Valid + !params.ennoblementDataSyncedAt.Valid && + !params.tribeSnapshotsCreatedAt.Valid } type ServerSort uint8 diff --git a/internal/domain/tribe.go b/internal/domain/tribe.go index 4d048f2..9fe8a52 100644 --- a/internal/domain/tribe.go +++ b/internal/domain/tribe.go @@ -215,6 +215,10 @@ func (t Tribe) Base() BaseTribe { } } +func (t Tribe) IsZero() bool { + return t == Tribe{} +} + type Tribes []Tribe // Delete finds all tribes with the given serverKey that are not in the given slice with active tribes diff --git a/internal/domain/tribe_snapshot.go b/internal/domain/tribe_snapshot.go new file mode 100644 index 0000000..fbab562 --- /dev/null +++ b/internal/domain/tribe_snapshot.go @@ -0,0 +1,307 @@ +package domain + +import ( + "fmt" + "math" + "time" +) + +type TribeSnapshot struct { + id int + tribeID int + serverKey string + numMembers int + numVillages int + points int + allPoints int + rank int + od OpponentsDefeated + dominance float64 + date time.Time + createdAt time.Time +} + +const tribeSnapshotModelName = "TribeSnapshot" + +// UnmarshalTribeSnapshotFromDatabase unmarshals TribeSnapshot from the database. +// +// It should be used only for unmarshalling from the database! +// You can't use UnmarshalTribeSnapshotFromDatabase as constructor - It may put domain into the invalid state! +func UnmarshalTribeSnapshotFromDatabase( + id int, + tribeID int, + serverKey string, + numMembers int, + numVillages int, + points int, + allPoints int, + rank int, + od OpponentsDefeated, + dominance float64, + date time.Time, + createdAt time.Time, +) (TribeSnapshot, error) { + if err := validateIntInRange(id, 1, math.MaxInt); err != nil { + return TribeSnapshot{}, ValidationError{ + Model: tribeSnapshotModelName, + Field: "id", + Err: err, + } + } + + if err := validateServerKey(serverKey); err != nil { + return TribeSnapshot{}, ValidationError{ + Model: tribeSnapshotModelName, + Field: "serverKey", + Err: err, + } + } + + return TribeSnapshot{ + id: id, + tribeID: tribeID, + serverKey: serverKey, + numMembers: numMembers, + numVillages: numVillages, + points: points, + allPoints: allPoints, + rank: rank, + od: od, + dominance: dominance, + date: date, + createdAt: createdAt, + }, nil +} + +func (ts TribeSnapshot) ID() int { + return ts.id +} + +func (ts TribeSnapshot) TribeID() int { + return ts.tribeID +} + +func (ts TribeSnapshot) ServerKey() string { + return ts.serverKey +} + +func (ts TribeSnapshot) NumMembers() int { + return ts.numMembers +} + +func (ts TribeSnapshot) NumVillages() int { + return ts.numVillages +} + +func (ts TribeSnapshot) Points() int { + return ts.points +} + +func (ts TribeSnapshot) AllPoints() int { + return ts.allPoints +} + +func (ts TribeSnapshot) Rank() int { + return ts.rank +} + +func (ts TribeSnapshot) OD() OpponentsDefeated { + return ts.od +} + +func (ts TribeSnapshot) Dominance() float64 { + return ts.dominance +} + +func (ts TribeSnapshot) Date() time.Time { + return ts.date +} + +func (ts TribeSnapshot) CreatedAt() time.Time { + return ts.createdAt +} + +type TribeSnapshots []TribeSnapshot + +type CreateTribeSnapshotParams struct { + tribeID int + serverKey string + numMembers int + numVillages int + points int + allPoints int + rank int + od OpponentsDefeated + dominance float64 + date time.Time +} + +func NewCreateTribeSnapshotParams(tribes Tribes, date time.Time) ([]CreateTribeSnapshotParams, error) { + params := make([]CreateTribeSnapshotParams, 0, len(tribes)) + + for i, t := range tribes { + if t.IsZero() { + return nil, fmt.Errorf("tribes[%d] is an empty struct", i) + } + + if t.IsDeleted() { + continue + } + + params = append(params, CreateTribeSnapshotParams{ + tribeID: t.ID(), + serverKey: t.ServerKey(), + numMembers: t.NumMembers(), + numVillages: t.NumVillages(), + points: t.Points(), + allPoints: t.AllPoints(), + rank: t.Rank(), + od: t.OD(), + dominance: t.Dominance(), + date: date, + }) + } + + return params, nil +} + +func (params CreateTribeSnapshotParams) TribeID() int { + return params.tribeID +} + +func (params CreateTribeSnapshotParams) ServerKey() string { + return params.serverKey +} + +func (params CreateTribeSnapshotParams) NumMembers() int { + return params.numMembers +} + +func (params CreateTribeSnapshotParams) NumVillages() int { + return params.numVillages +} + +func (params CreateTribeSnapshotParams) Points() int { + return params.points +} + +func (params CreateTribeSnapshotParams) AllPoints() int { + return params.allPoints +} + +func (params CreateTribeSnapshotParams) Rank() int { + return params.rank +} + +func (params CreateTribeSnapshotParams) OD() OpponentsDefeated { + return params.od +} + +func (params CreateTribeSnapshotParams) Dominance() float64 { + return params.dominance +} + +func (params CreateTribeSnapshotParams) Date() time.Time { + return params.date +} + +type TribeSnapshotSort uint8 + +const ( + TribeSnapshotSortDateASC TribeSnapshotSort = iota + 1 + TribeSnapshotSortDateDESC + TribeSnapshotSortIDASC + TribeSnapshotSortIDDESC + TribeSnapshotSortServerKeyASC + TribeSnapshotSortServerKeyDESC +) + +const TribeSnapshotListMaxLimit = 200 + +type ListTribeSnapshotsParams struct { + serverKeys []string + sort []TribeSnapshotSort + limit int + offset int +} + +const listTribeSnapshotsParamsModelName = "ListTribeSnapshotsParams" + +func NewListTribeSnapshotsParams() ListTribeSnapshotsParams { + return ListTribeSnapshotsParams{ + sort: []TribeSnapshotSort{ + TribeSnapshotSortServerKeyASC, + TribeSnapshotSortDateASC, + TribeSnapshotSortIDASC, + }, + limit: TribeSnapshotListMaxLimit, + } +} + +func (params *ListTribeSnapshotsParams) ServerKeys() []string { + return params.serverKeys +} + +func (params *ListTribeSnapshotsParams) SetServerKeys(serverKeys []string) error { + params.serverKeys = serverKeys + return nil +} + +func (params *ListTribeSnapshotsParams) Sort() []TribeSnapshotSort { + return params.sort +} + +const ( + tribeSnapshotSortMinLength = 1 + tribeSnapshotSortMaxLength = 3 +) + +func (params *ListTribeSnapshotsParams) SetSort(sort []TribeSnapshotSort) error { + if err := validateSliceLen(sort, tribeSnapshotSortMinLength, tribeSnapshotSortMaxLength); err != nil { + return ValidationError{ + Model: listTribeSnapshotsParamsModelName, + Field: "sort", + Err: err, + } + } + + params.sort = sort + + return nil +} + +func (params *ListTribeSnapshotsParams) Limit() int { + return params.limit +} + +func (params *ListTribeSnapshotsParams) SetLimit(limit int) error { + if err := validateIntInRange(limit, 1, TribeSnapshotListMaxLimit); err != nil { + return ValidationError{ + Model: listTribeSnapshotsParamsModelName, + Field: "limit", + Err: err, + } + } + + params.limit = limit + + return nil +} + +func (params *ListTribeSnapshotsParams) Offset() int { + return params.offset +} + +func (params *ListTribeSnapshotsParams) SetOffset(offset int) error { + if err := validateIntInRange(offset, 0, math.MaxInt); err != nil { + return ValidationError{ + Model: listTribeSnapshotsParamsModelName, + Field: "offset", + Err: err, + } + } + + params.offset = offset + + return nil +} diff --git a/internal/domain/tribe_snapshot_test.go b/internal/domain/tribe_snapshot_test.go new file mode 100644 index 0000000..4c38d4a --- /dev/null +++ b/internal/domain/tribe_snapshot_test.go @@ -0,0 +1,238 @@ +package domain_test + +import ( + "fmt" + "slices" + "testing" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewCreateTribeSnapshotParams(t *testing.T) { + t.Parallel() + + tribes := domain.Tribes{ + domaintest.NewTribe(t), + domaintest.NewTribe(t), + domaintest.NewTribe(t), + } + date := time.Now() + + res, err := domain.NewCreateTribeSnapshotParams(tribes, date) + require.NoError(t, err) + assert.Len(t, res, len(tribes)) + for i, tr := range tribes { + idx := slices.IndexFunc(res, func(params domain.CreateTribeSnapshotParams) bool { + return params.TribeID() == tr.ID() && params.ServerKey() == tr.ServerKey() + }) + require.GreaterOrEqualf(t, idx, 0, "tribes[%d] not found", i) + + params := res[i] + + assert.Equalf(t, tr.ID(), params.TribeID(), "tribes[%d]", i) + assert.Equalf(t, tr.ServerKey(), params.ServerKey(), "tribes[%d]", i) + assert.Equalf(t, tr.NumMembers(), params.NumMembers(), "tribes[%d]", i) + assert.Equalf(t, tr.NumVillages(), params.NumVillages(), "tribes[%d]", i) + assert.Equalf(t, tr.Points(), params.Points(), "tribes[%d]", i) + assert.Equalf(t, tr.AllPoints(), params.AllPoints(), "tribes[%d]", i) + assert.Equalf(t, tr.Rank(), params.Rank(), "tribes[%d]", i) + assert.Equalf(t, tr.OD(), params.OD(), "tribes[%d]", i) + assert.InDeltaf(t, tr.Dominance(), params.Dominance(), 0.001, "tribes[%d]", i) + assert.Equalf(t, date, params.Date(), "tribes[%d]", i) + } +} + +func TestListTribeSnapshotsParams_SetSort(t *testing.T) { + t.Parallel() + + type args struct { + sort []domain.TribeSnapshotSort + } + + tests := []struct { + name string + args args + expectedErr error + }{ + { + name: "OK", + args: args{ + sort: []domain.TribeSnapshotSort{ + domain.TribeSnapshotSortDateASC, + domain.TribeSnapshotSortServerKeyASC, + }, + }, + }, + { + name: "ERR: len(sort) < 1", + args: args{ + sort: nil, + }, + expectedErr: domain.ValidationError{ + Model: "ListTribeSnapshotsParams", + Field: "sort", + Err: domain.LenOutOfRangeError{ + Min: 1, + Max: 3, + Current: 0, + }, + }, + }, + { + name: "ERR: len(sort) > 3", + args: args{ + sort: []domain.TribeSnapshotSort{ + domain.TribeSnapshotSortDateASC, + domain.TribeSnapshotSortServerKeyASC, + domain.TribeSnapshotSortIDASC, + domain.TribeSnapshotSortIDDESC, + }, + }, + expectedErr: domain.ValidationError{ + Model: "ListTribeSnapshotsParams", + Field: "sort", + Err: domain.LenOutOfRangeError{ + Min: 1, + Max: 3, + Current: 4, + }, + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + params := domain.NewListTribeSnapshotsParams() + + require.ErrorIs(t, params.SetSort(tt.args.sort), tt.expectedErr) + if tt.expectedErr != nil { + return + } + assert.Equal(t, tt.args.sort, params.Sort()) + }) + } +} + +func TestListTribeSnapshotsParams_SetLimit(t *testing.T) { + t.Parallel() + + type args struct { + limit int + } + + tests := []struct { + name string + args args + expectedErr error + }{ + { + name: "OK", + args: args{ + limit: domain.TribeSnapshotListMaxLimit, + }, + }, + { + name: "ERR: limit < 1", + args: args{ + limit: 0, + }, + expectedErr: domain.ValidationError{ + Model: "ListTribeSnapshotsParams", + Field: "limit", + Err: domain.MinGreaterEqualError{ + Min: 1, + Current: 0, + }, + }, + }, + { + name: fmt.Sprintf("ERR: limit > %d", domain.TribeSnapshotListMaxLimit), + args: args{ + limit: domain.TribeSnapshotListMaxLimit + 1, + }, + expectedErr: domain.ValidationError{ + Model: "ListTribeSnapshotsParams", + Field: "limit", + Err: domain.MaxLessEqualError{ + Max: domain.TribeSnapshotListMaxLimit, + Current: domain.TribeSnapshotListMaxLimit + 1, + }, + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + params := domain.NewListTribeSnapshotsParams() + + require.ErrorIs(t, params.SetLimit(tt.args.limit), tt.expectedErr) + if tt.expectedErr != nil { + return + } + assert.Equal(t, tt.args.limit, params.Limit()) + }) + } +} + +func TestListTribeSnapshotsParams_SetOffset(t *testing.T) { + t.Parallel() + + type args struct { + offset int + } + + tests := []struct { + name string + args args + expectedErr error + }{ + { + name: "OK", + args: args{ + offset: 100, + }, + }, + { + name: "ERR: offset < 0", + args: args{ + offset: -1, + }, + expectedErr: domain.ValidationError{ + Model: "ListTribeSnapshotsParams", + Field: "offset", + Err: domain.MinGreaterEqualError{ + Min: 0, + Current: -1, + }, + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + params := domain.NewListTribeSnapshotsParams() + + require.ErrorIs(t, params.SetOffset(tt.args.offset), tt.expectedErr) + if tt.expectedErr != nil { + return + } + assert.Equal(t, tt.args.offset, params.Offset()) + }) + } +} diff --git a/internal/port/consumer_watermill_server.go b/internal/port/consumer_watermill_server.go index df0e41f..cb146a7 100644 --- a/internal/port/consumer_watermill_server.go +++ b/internal/port/consumer_watermill_server.go @@ -9,16 +9,17 @@ import ( ) type ServerWatermillConsumer struct { - svc *app.ServerService - subscriber message.Subscriber - logger watermill.LoggerAdapter - marshaler watermillmsg.Marshaler - cmdSyncTopic string - eventServerSyncedTopic string - eventTribesSyncedTopic string - eventPlayersSyncedTopic string - eventVillagesSyncedTopic string - eventEnnoblementsSyncedTopic string + svc *app.ServerService + subscriber message.Subscriber + logger watermill.LoggerAdapter + marshaler watermillmsg.Marshaler + cmdSyncTopic string + eventServerSyncedTopic string + eventTribesSyncedTopic string + eventPlayersSyncedTopic string + eventVillagesSyncedTopic string + eventEnnoblementsSyncedTopic string + eventTribeSnapshotsCreatedTopic string } func NewServerWatermillConsumer( @@ -32,18 +33,20 @@ func NewServerWatermillConsumer( eventPlayersSyncedTopic string, eventVillagesSyncedTopic string, eventEnnoblementsSyncedTopic string, + eventTribeSnapshotsCreatedTopic string, ) *ServerWatermillConsumer { return &ServerWatermillConsumer{ - svc: svc, - subscriber: subscriber, - logger: logger, - marshaler: marshaler, - cmdSyncTopic: cmdSyncTopic, - eventServerSyncedTopic: eventServerSyncedTopic, - eventTribesSyncedTopic: eventTribesSyncedTopic, - eventPlayersSyncedTopic: eventPlayersSyncedTopic, - eventVillagesSyncedTopic: eventVillagesSyncedTopic, - eventEnnoblementsSyncedTopic: eventEnnoblementsSyncedTopic, + svc: svc, + subscriber: subscriber, + logger: logger, + marshaler: marshaler, + cmdSyncTopic: cmdSyncTopic, + eventServerSyncedTopic: eventServerSyncedTopic, + eventTribesSyncedTopic: eventTribesSyncedTopic, + eventPlayersSyncedTopic: eventPlayersSyncedTopic, + eventVillagesSyncedTopic: eventVillagesSyncedTopic, + eventEnnoblementsSyncedTopic: eventEnnoblementsSyncedTopic, + eventTribeSnapshotsCreatedTopic: eventTribeSnapshotsCreatedTopic, } } @@ -79,6 +82,12 @@ func (c *ServerWatermillConsumer) Register(router *message.Router) { c.subscriber, c.updateEnnoblementDataSyncedAt, ) + router.AddNoPublisherHandler( + "ServerConsumer.updateTribeSnapshotsCreatedAt", + c.eventTribeSnapshotsCreatedTopic, + c.subscriber, + c.updateTribeSnapshotsCreatedAt, + ) } func (c *ServerWatermillConsumer) sync(msg *message.Message) error { @@ -228,3 +237,27 @@ func (c *ServerWatermillConsumer) updateEnnoblementDataSyncedAt(msg *message.Mes return c.svc.UpdateEnnoblementDataSyncedAt(msg.Context(), payload) } + +func (c *ServerWatermillConsumer) updateTribeSnapshotsCreatedAt(msg *message.Message) error { + var rawPayload watermillmsg.SnapshotsCreatedEventPayload + + if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { + c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + payload, err := domain.NewSnapshotsCreatedEventPayload( + rawPayload.ServerKey, + rawPayload.VersionCode, + ) + if err != nil { + c.logger.Error("couldn't construct domain.SnapshotsCreatedEventPayload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + return c.svc.UpdateTribeSnapshotsCreatedAt(msg.Context(), payload) +} diff --git a/internal/port/consumer_watermill_tribe.go b/internal/port/consumer_watermill_tribe.go index 9f62ca6..02f7a21 100644 --- a/internal/port/consumer_watermill_tribe.go +++ b/internal/port/consumer_watermill_tribe.go @@ -10,28 +10,34 @@ import ( type TribeWatermillConsumer struct { svc *app.TribeService + snapshotSvc *app.TribeSnapshotService subscriber message.Subscriber logger watermill.LoggerAdapter marshaler watermillmsg.Marshaler eventServerSyncedTopic string eventVillagesSyncedTopic string + cmdCreateSnapshotsTopic string } func NewTribeWatermillConsumer( svc *app.TribeService, + snapshotSvc *app.TribeSnapshotService, subscriber message.Subscriber, logger watermill.LoggerAdapter, marshaler watermillmsg.Marshaler, eventServerSyncedTopic string, eventVillagesSyncedTopic string, + cmdCreateSnapshotsTopic string, ) *TribeWatermillConsumer { return &TribeWatermillConsumer{ svc: svc, + snapshotSvc: snapshotSvc, subscriber: subscriber, logger: logger, marshaler: marshaler, eventServerSyncedTopic: eventServerSyncedTopic, eventVillagesSyncedTopic: eventVillagesSyncedTopic, + cmdCreateSnapshotsTopic: cmdCreateSnapshotsTopic, } } @@ -48,6 +54,12 @@ func (c *TribeWatermillConsumer) Register(router *message.Router) { c.subscriber, c.updateDominance, ) + router.AddNoPublisherHandler( + "TribeConsumer.createSnapshots", + c.cmdCreateSnapshotsTopic, + c.subscriber, + c.createSnapshots, + ) } func (c *TribeWatermillConsumer) sync(msg *message.Message) error { @@ -99,3 +111,29 @@ func (c *TribeWatermillConsumer) updateDominance(msg *message.Message) error { return c.svc.UpdateDominance(msg.Context(), payload) } + +func (c *TribeWatermillConsumer) createSnapshots(msg *message.Message) error { + var rawPayload watermillmsg.CreateSnapshotsCmdPayload + + if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { + c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + payload, err := domain.NewCreateSnapshotsCmdPayload( + rawPayload.ServerKey, + rawPayload.VersionCode, + rawPayload.VersionTimezone, + rawPayload.Date, + ) + if err != nil { + c.logger.Error("couldn't construct domain.CreateSnapshotsCmdPayload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + return c.snapshotSvc.Create(msg.Context(), payload) +} diff --git a/internal/service/data_sync_test.go b/internal/service/data_sync_test.go index 43463b9..bfa394f 100644 --- a/internal/service/data_sync_test.go +++ b/internal/service/data_sync_test.go @@ -166,8 +166,18 @@ func TestDataSync(t *testing.T) { playerEventSynced, villageEventSynced, "", + "", + ), + port.NewTribeWatermillConsumer( + tribeSvc, + nil, + tribeSub, + nopLogger, + marshaler, + serverEventSynced, + villageEventSynced, + "", ), - port.NewTribeWatermillConsumer(tribeSvc, tribeSub, nopLogger, marshaler, serverEventSynced, villageEventSynced), port.NewPlayerWatermillConsumer(playerSvc, playerSub, nopLogger, marshaler, serverEventSynced), port.NewVillageWatermillConsumer(villageSvc, villageSub, nopLogger, marshaler, serverEventSynced), ) diff --git a/internal/service/ennoblement_sync_test.go b/internal/service/ennoblement_sync_test.go index 41a70a2..ce20348 100644 --- a/internal/service/ennoblement_sync_test.go +++ b/internal/service/ennoblement_sync_test.go @@ -147,6 +147,7 @@ func TestEnnoblementSync(t *testing.T) { "", "", ennoblementEventSynced, + "", ), port.NewEnnoblementWatermillConsumer(ennoblementSvc, ennoblementSub, nopLogger, marshaler, ennoblementCmdSync), )