feat: tribe snapshots (#44)

Reviewed-on: twhelp/corev3#44
This commit is contained in:
Dawid Wysokiński 2024-01-15 07:00:08 +00:00
parent 00cee5a181
commit 87b3308433
23 changed files with 1504 additions and 47 deletions

View File

@ -65,6 +65,7 @@ var cmdConsumer = &cli.Command{
c.String(rmqFlagTopicPlayersSyncedEvent.Name),
c.String(rmqFlagTopicVillagesSyncedEvent.Name),
c.String(rmqFlagTopicEnnoblementsSyncedEvent.Name),
c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
)
consumer.Register(router)
@ -95,19 +96,33 @@ var cmdConsumer = &cli.Command{
marshaler,
c.String(rmqFlagTopicTribesSyncedEvent.Name),
)
tribeSnapshotPublisher := adapter.NewSnapshotWatermillPublisher(
publisher,
newWatermillMarshaler(),
c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name),
c.String(rmqFlagTopicTribeSnapshotsCreatedEvent.Name),
)
twSvc, err := newTWServiceFromFlags(c)
if err != nil {
return err
}
tribeSvc := app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc, tribePublisher)
tribeSnapshotSvc := app.NewTribeSnapshotService(
adapter.NewTribeSnapshotBunRepository(db),
tribeSvc,
tribeSnapshotPublisher,
)
consumer := port.NewTribeWatermillConsumer(
app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc, tribePublisher),
tribeSvc,
tribeSnapshotSvc,
subscriber,
logger,
marshaler,
c.String(rmqFlagTopicServerSyncedEvent.Name),
c.String(rmqFlagTopicVillagesSyncedEvent.Name),
c.String(rmqFlagTopicCreateTribeSnapshotsCmd.Name),
)
consumer.Register(router)

View File

@ -163,6 +163,10 @@ func (a updateServerParamsApplier) apply(q *bun.UpdateQuery) *bun.UpdateQuery {
q = q.Set("ennoblement_data_updated_at = ?", ennoblementDataSyncedAt.Value)
}
if tribeSnapshotsCreatedAt := a.params.TribeSnapshotsCreatedAt(); tribeSnapshotsCreatedAt.Valid {
q = q.Set("tribe_snapshots_created_at = ?", tribeSnapshotsCreatedAt.Value)
}
return q
}

View File

@ -0,0 +1,104 @@
package adapter
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/bun/bunmodel"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"github.com/uptrace/bun"
)
type TribeSnapshotBunRepository struct {
db bun.IDB
}
func NewTribeSnapshotBunRepository(db bun.IDB) *TribeSnapshotBunRepository {
return &TribeSnapshotBunRepository{db: db}
}
func (repo *TribeSnapshotBunRepository) Create(ctx context.Context, params ...domain.CreateTribeSnapshotParams) error {
if len(params) == 0 {
return nil
}
now := time.Now()
tribeSnapshots := make(bunmodel.TribeSnapshots, 0, len(params))
for _, p := range params {
tribeSnapshots = append(tribeSnapshots, bunmodel.TribeSnapshot{
TribeID: p.TribeID(),
ServerKey: p.ServerKey(),
NumMembers: p.NumMembers(),
NumVillages: p.NumVillages(),
Points: p.Points(),
AllPoints: p.AllPoints(),
Rank: p.Rank(),
Dominance: p.Dominance(),
Date: p.Date(),
CreatedAt: now,
OpponentsDefeated: bunmodel.NewOpponentsDefeated(p.OD()),
})
}
if _, err := repo.db.NewInsert().
Model(&tribeSnapshots).
Ignore().
Returning("").
Exec(ctx); err != nil {
return fmt.Errorf("something went wrong while inserting tribe snapshots into the db: %w", err)
}
return nil
}
func (repo *TribeSnapshotBunRepository) List(
ctx context.Context,
params domain.ListTribeSnapshotsParams,
) (domain.TribeSnapshots, error) {
var tribeSnapshots bunmodel.TribeSnapshots
if err := repo.db.NewSelect().
Model(&tribeSnapshots).
Apply(listTribeSnapshotsParamsApplier{params: params}.apply).
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("couldn't select tribe snapshots from the db: %w", err)
}
return tribeSnapshots.ToDomain()
}
type listTribeSnapshotsParamsApplier struct {
params domain.ListTribeSnapshotsParams
}
//nolint:gocyclo
func (a listTribeSnapshotsParamsApplier) apply(q *bun.SelectQuery) *bun.SelectQuery {
if serverKeys := a.params.ServerKeys(); len(serverKeys) > 0 {
q = q.Where("ts.server_key IN (?)", bun.In(serverKeys))
}
for _, s := range a.params.Sort() {
switch s {
case domain.TribeSnapshotSortDateASC:
q = q.Order("ts.date ASC")
case domain.TribeSnapshotSortDateDESC:
q = q.Order("ts.date DESC")
case domain.TribeSnapshotSortIDASC:
q = q.Order("ts.id ASC")
case domain.TribeSnapshotSortIDDESC:
q = q.Order("ts.id DESC")
case domain.TribeSnapshotSortServerKeyASC:
q = q.Order("ts.server_key ASC")
case domain.TribeSnapshotSortServerKeyDESC:
q = q.Order("ts.server_key DESC")
default:
return q.Err(errors.New("unsupported sort value"))
}
}
return q.Limit(a.params.Limit()).Offset(a.params.Offset())
}

View File

@ -0,0 +1,29 @@
package adapter_test
import (
"testing"
"gitea.dwysokinski.me/twhelp/corev3/internal/bun/buntest"
)
func TestTribeSnapshotBunRepository_Postgres(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping long-running test")
}
testTribeSnapshotRepository(t, func(t *testing.T) repositories {
t.Helper()
return newBunDBRepositories(t, postgres.NewDB(t))
})
}
func TestTribeSnapshotBunRepository_SQLite(t *testing.T) {
t.Parallel()
testTribeSnapshotRepository(t, func(t *testing.T) repositories {
t.Helper()
return newBunDBRepositories(t, buntest.NewSQLiteDB(t))
})
}

View File

@ -238,7 +238,7 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
},
},
{
name: fmt.Sprintf("OK: version code=[%s]", randServer.VersionCode()),
name: fmt.Sprintf("OK: versionCodes=[%s]", randServer.VersionCode()),
params: func(t *testing.T) domain.ListServersParams {
t.Helper()
params := domain.NewListServersParams()
@ -348,7 +348,7 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
},
},
{
name: "OK: player snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339),
name: "OK: playerSnapshotsCreatedAtLt=" + snapshotsCreatedAtLT.Format(time.RFC3339),
params: func(t *testing.T) domain.ListServersParams {
t.Helper()
params := domain.NewListServersParams()
@ -375,7 +375,7 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
},
},
{
name: "OK: tribe snapshots created at lt=" + snapshotsCreatedAtLT.Format(time.RFC3339),
name: "OK: tribeSnapshotsCreatedAtLt=" + snapshotsCreatedAtLT.Format(time.RFC3339),
params: func(t *testing.T) domain.ListServersParams {
t.Helper()
params := domain.NewListServersParams()
@ -508,6 +508,10 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
Value: time.Now(),
Valid: true,
}))
require.NoError(t, updateParams.SetTribeSnapshotsCreatedAt(domain.NullTime{
Value: time.Now(),
Valid: true,
}))
require.NoError(t, repos.server.Update(ctx, serversBeforeUpdate[0].Key(), updateParams))
@ -551,6 +555,12 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
serversAfterUpdate[0].EnnoblementDataSyncedAt(),
time.Minute,
)
assert.WithinDuration(
t,
updateParams.TribeSnapshotsCreatedAt().Value,
serversAfterUpdate[0].TribeSnapshotsCreatedAt(),
time.Minute,
)
})
t.Run("ERR: not found", func(t *testing.T) {

View File

@ -54,14 +54,20 @@ type tribeChangeRepository interface {
List(ctx context.Context, params domain.ListTribeChangesParams) (domain.TribeChanges, error)
}
type tribeSnapshotRepository interface {
Create(ctx context.Context, params ...domain.CreateTribeSnapshotParams) error
List(ctx context.Context, params domain.ListTribeSnapshotsParams) (domain.TribeSnapshots, error)
}
type repositories struct {
version versionRepository
server serverRepository
tribe tribeRepository
player playerRepository
village villageRepository
ennoblement ennoblementRepository
tribeChange tribeChangeRepository
version versionRepository
server serverRepository
tribe tribeRepository
player playerRepository
village villageRepository
ennoblement ennoblementRepository
tribeChange tribeChangeRepository
tribeSnapshot tribeSnapshotRepository
}
func newBunDBRepositories(tb testing.TB, bunDB *bun.DB) repositories {
@ -70,12 +76,13 @@ func newBunDBRepositories(tb testing.TB, bunDB *bun.DB) repositories {
buntest.NewFixture(bunDB).Load(tb, context.Background(), os.DirFS("testdata"), "fixture.yml")
return repositories{
version: adapter.NewVersionBunRepository(bunDB),
server: adapter.NewServerBunRepository(bunDB),
tribe: adapter.NewTribeBunRepository(bunDB),
player: adapter.NewPlayerBunRepository(bunDB),
village: adapter.NewVillageBunRepository(bunDB),
ennoblement: adapter.NewEnnoblementBunRepository(bunDB),
tribeChange: adapter.NewTribeChangeBunRepository(bunDB),
version: adapter.NewVersionBunRepository(bunDB),
server: adapter.NewServerBunRepository(bunDB),
tribe: adapter.NewTribeBunRepository(bunDB),
player: adapter.NewPlayerBunRepository(bunDB),
village: adapter.NewVillageBunRepository(bunDB),
ennoblement: adapter.NewEnnoblementBunRepository(bunDB),
tribeChange: adapter.NewTribeChangeBunRepository(bunDB),
tribeSnapshot: adapter.NewTribeSnapshotBunRepository(bunDB),
}
}

View File

@ -0,0 +1,332 @@
package adapter_test
import (
"cmp"
"context"
"fmt"
"slices"
"testing"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func testTribeSnapshotRepository(t *testing.T, newRepos func(t *testing.T) repositories) {
t.Helper()
ctx := context.Background()
t.Run("Create", func(t *testing.T) {
t.Parallel()
const dateFormat = "2006-01-02"
repos := newRepos(t)
assertCreated := func(t *testing.T, params []domain.CreateTribeSnapshotParams) {
t.Helper()
require.NotEmpty(t, params)
tribeSnapshots, err := repos.tribeSnapshot.List(ctx, domain.NewListTribeSnapshotsParams())
require.NoError(t, err)
for i, p := range params {
date := p.Date().Format(dateFormat)
idx := slices.IndexFunc(tribeSnapshots, func(ts domain.TribeSnapshot) bool {
return ts.ServerKey() == p.ServerKey() &&
ts.TribeID() == p.TribeID() &&
ts.Date().Format(dateFormat) == date
})
require.GreaterOrEqualf(t, idx, 0, "params[%d] not found", i)
tribeSnapshot := tribeSnapshots[idx]
assert.Equalf(t, p.TribeID(), tribeSnapshot.TribeID(), "params[%d]", i)
assert.Equalf(t, p.ServerKey(), tribeSnapshot.ServerKey(), "params[%d]", i)
assert.Equalf(t, p.NumMembers(), tribeSnapshot.NumMembers(), "params[%d]", i)
assert.Equalf(t, p.NumVillages(), tribeSnapshot.NumVillages(), "params[%d]", i)
assert.Equalf(t, p.Points(), tribeSnapshot.Points(), "params[%d]", i)
assert.Equalf(t, p.AllPoints(), tribeSnapshot.AllPoints(), "params[%d]", i)
assert.Equalf(t, p.Rank(), tribeSnapshot.Rank(), "params[%d]", i)
assert.Equalf(t, p.OD(), tribeSnapshot.OD(), "params[%d]", i)
assert.InDeltaf(t, p.Dominance(), tribeSnapshot.Dominance(), 0.001, "params[%d]", i)
assert.Equalf(t, date, tribeSnapshot.Date().Format(dateFormat), "params[%d]", i)
assert.WithinDurationf(t, time.Now(), tribeSnapshot.CreatedAt(), time.Minute, "params[%d]", i)
}
}
assertNoDuplicates := func(t *testing.T, params []domain.CreateTribeSnapshotParams) {
t.Helper()
require.NotEmpty(t, params)
tribeSnapshots, err := repos.tribeSnapshot.List(ctx, domain.NewListTribeSnapshotsParams())
require.NoError(t, err)
res := make(map[string][]int)
for _, p := range params {
key := fmt.Sprintf("%s-%d-%s", p.ServerKey(), p.TribeID(), p.Date().Format(dateFormat))
for i, ts := range tribeSnapshots {
if ts.ServerKey() == p.ServerKey() && ts.TribeID() == p.TribeID() && ts.Date().Equal(p.Date()) {
res[key] = append(res[key], i)
}
}
}
for key, indexes := range res {
assert.Len(t, indexes, 1, key)
}
}
t.Run("OK", func(t *testing.T) {
t.Parallel()
listTribesParams := domain.NewListTribesParams()
require.NoError(t, listTribesParams.SetDeleted(domain.NullBool{
Value: false,
Valid: true,
}))
tribes, err := repos.tribe.List(ctx, listTribesParams)
require.NoError(t, err)
require.NotEmpty(t, tribes)
date := time.Now()
createParams, err := domain.NewCreateTribeSnapshotParams(tribes, date)
require.NoError(t, err)
require.NoError(t, repos.tribeSnapshot.Create(ctx, createParams...))
assertCreated(t, createParams)
require.NoError(t, repos.tribeSnapshot.Create(ctx, createParams...))
assertNoDuplicates(t, createParams)
})
t.Run("OK: len(params) == 0", func(t *testing.T) {
t.Parallel()
require.NoError(t, repos.tribeSnapshot.Create(ctx))
})
})
t.Run("List & ListCount", func(t *testing.T) {
t.Parallel()
repos := newRepos(t)
tribeSnapshots, listTribeSnapshotsErr := repos.tribeSnapshot.List(ctx, domain.NewListTribeSnapshotsParams())
require.NoError(t, listTribeSnapshotsErr)
require.NotEmpty(t, tribeSnapshots)
randTribeSnapshot := tribeSnapshots[0]
tests := []struct {
name string
params func(t *testing.T) domain.ListTribeSnapshotsParams
assertTribeSnapshots func(t *testing.T, params domain.ListTribeSnapshotsParams, tribeSnapshots domain.TribeSnapshots)
assertError func(t *testing.T, err error)
assertTotal func(t *testing.T, params domain.ListTribeSnapshotsParams, total int)
}{
{
name: "OK: default params",
params: func(t *testing.T) domain.ListTribeSnapshotsParams {
t.Helper()
return domain.NewListTribeSnapshotsParams()
},
assertTribeSnapshots: func(
t *testing.T,
params domain.ListTribeSnapshotsParams,
tribeSnapshots domain.TribeSnapshots,
) {
t.Helper()
assert.NotEmpty(t, len(tribeSnapshots))
assert.True(t, slices.IsSortedFunc(tribeSnapshots, func(a, b domain.TribeSnapshot) int {
if x := cmp.Compare(a.ServerKey(), b.ServerKey()); x != 0 {
return x
}
if x := a.Date().Compare(b.Date()); x != 0 {
return x
}
return cmp.Compare(a.ID(), b.ID())
}))
},
assertError: func(t *testing.T, err error) {
t.Helper()
require.NoError(t, err)
},
assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) {
t.Helper()
assert.NotEmpty(t, total)
},
},
{
name: "OK: sort=[serverKey DESC, date DESC]",
params: func(t *testing.T) domain.ListTribeSnapshotsParams {
t.Helper()
params := domain.NewListTribeSnapshotsParams()
require.NoError(t, params.SetSort([]domain.TribeSnapshotSort{
domain.TribeSnapshotSortServerKeyDESC,
domain.TribeSnapshotSortDateDESC,
}))
return params
},
assertTribeSnapshots: func(
t *testing.T,
params domain.ListTribeSnapshotsParams,
tribeSnapshots domain.TribeSnapshots,
) {
t.Helper()
assert.NotEmpty(t, len(tribeSnapshots))
assert.True(t, slices.IsSortedFunc(tribeSnapshots, func(a, b domain.TribeSnapshot) int {
if x := cmp.Compare(a.ServerKey(), b.ServerKey()) * -1; x != 0 {
return x
}
return a.Date().Compare(b.Date()) * -1
}))
},
assertError: func(t *testing.T, err error) {
t.Helper()
require.NoError(t, err)
},
assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) {
t.Helper()
assert.NotEmpty(t, total)
},
},
{
name: "OK: sort=[id ASC]",
params: func(t *testing.T) domain.ListTribeSnapshotsParams {
t.Helper()
params := domain.NewListTribeSnapshotsParams()
require.NoError(t, params.SetSort([]domain.TribeSnapshotSort{
domain.TribeSnapshotSortIDASC,
}))
return params
},
assertTribeSnapshots: func(
t *testing.T,
params domain.ListTribeSnapshotsParams,
tribeSnapshots domain.TribeSnapshots,
) {
t.Helper()
assert.NotEmpty(t, len(tribeSnapshots))
assert.True(t, slices.IsSortedFunc(tribeSnapshots, func(a, b domain.TribeSnapshot) int {
return cmp.Compare(a.ID(), b.ID())
}))
},
assertError: func(t *testing.T, err error) {
t.Helper()
require.NoError(t, err)
},
assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) {
t.Helper()
assert.NotEmpty(t, total)
},
},
{
name: "OK: sort=[id DESC]",
params: func(t *testing.T) domain.ListTribeSnapshotsParams {
t.Helper()
params := domain.NewListTribeSnapshotsParams()
require.NoError(t, params.SetSort([]domain.TribeSnapshotSort{
domain.TribeSnapshotSortIDDESC,
}))
return params
},
assertTribeSnapshots: func(
t *testing.T,
params domain.ListTribeSnapshotsParams,
tribeSnapshots domain.TribeSnapshots,
) {
t.Helper()
assert.NotEmpty(t, len(tribeSnapshots))
assert.True(t, slices.IsSortedFunc(tribeSnapshots, func(a, b domain.TribeSnapshot) int {
return cmp.Compare(a.ID(), b.ID()) * -1
}))
},
assertError: func(t *testing.T, err error) {
t.Helper()
require.NoError(t, err)
},
assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) {
t.Helper()
assert.NotEmpty(t, total)
},
},
{
name: fmt.Sprintf("OK: serverKeys=[%s]", randTribeSnapshot.ServerKey()),
params: func(t *testing.T) domain.ListTribeSnapshotsParams {
t.Helper()
params := domain.NewListTribeSnapshotsParams()
require.NoError(t, params.SetServerKeys([]string{randTribeSnapshot.ServerKey()}))
return params
},
assertTribeSnapshots: func(
t *testing.T,
params domain.ListTribeSnapshotsParams,
tribeSnapshots domain.TribeSnapshots,
) {
t.Helper()
serverKeys := params.ServerKeys()
for _, ts := range tribeSnapshots {
assert.True(t, slices.Contains(serverKeys, ts.ServerKey()))
}
},
assertError: func(t *testing.T, err error) {
t.Helper()
require.NoError(t, err)
},
assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) {
t.Helper()
assert.NotEmpty(t, total)
},
},
{
name: "OK: offset=1 limit=2",
params: func(t *testing.T) domain.ListTribeSnapshotsParams {
t.Helper()
params := domain.NewListTribeSnapshotsParams()
require.NoError(t, params.SetOffset(1))
require.NoError(t, params.SetLimit(2))
return params
},
assertTribeSnapshots: func(
t *testing.T,
params domain.ListTribeSnapshotsParams,
tribeSnapshots domain.TribeSnapshots,
) {
t.Helper()
assert.Len(t, tribeSnapshots, params.Limit())
},
assertError: func(t *testing.T, err error) {
t.Helper()
require.NoError(t, err)
},
assertTotal: func(t *testing.T, params domain.ListTribeSnapshotsParams, total int) {
t.Helper()
assert.NotEmpty(t, total)
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
params := tt.params(t)
res, err := repos.tribeSnapshot.List(ctx, params)
tt.assertError(t, err)
tt.assertTribeSnapshots(t, params, res)
})
}
})
}

View File

@ -7374,3 +7374,125 @@
new_tribe_id: 2
server_key: pl169
created_at: 2021-09-10T20:01:11.000Z
- model: TribeSnapshot
rows:
- rank_att: 1
score_att: 669292167
rank_def: 5
score_def: 199124128
rank_sup: 0
score_sup: 0
rank_total: 2
score_total: 868416295
_id: pl169-csa-2021-09-02
id: 10000
tribe_id: 28
server_key: pl169
num_members: 66
num_villages: 14480
points: 114277979
all_points: 143350058
rank: 1
dominance: 29.76912481240106
date: 2021-09-02T00:00:00.000Z
created_at: 2021-09-02T23:01:13.000Z
- rank_att: 1
score_att: 669292167
rank_def: 5
score_def: 199124128
rank_sup: 0
score_sup: 0
rank_total: 2
score_total: 868416295
_id: pl169-csa-2021-09-03
id: 10001
tribe_id: 28
server_key: pl169
num_members: 66
num_villages: 14480
points: 114277979
all_points: 143350058
rank: 1
dominance: 29.76912481240106
date: 2021-09-03T00:00:00.000Z
created_at: 2021-09-03T23:01:13.000Z
- rank_att: 2
score_att: 513902873
rank_def: 6
score_def: 177660231
rank_sup: 0
score_sup: 0
rank_total: 3
score_total: 691563104
_id: pl169-kuzyni-2021-09-02
id: 10050
tribe_id: 27
server_key: pl169
num_members: 52
num_villages: 10152
points: 91951132
all_points: 97812820
rank: 3
dominance: 20.87128142924693
date: 2021-09-02T00:00:00.000Z
created_at: 2021-09-02T22:00:37.000Z
- rank_att: 2
score_att: 513902873
rank_def: 6
score_def: 177660231
rank_sup: 0
score_sup: 0
rank_total: 3
score_total: 691563104
_id: pl169-kuzyni-2021-09-03
id: 10051
tribe_id: 27
server_key: pl169
num_members: 52
num_villages: 10152
points: 91951132
all_points: 97812820
rank: 3
dominance: 20.87128142924693
date: 2021-09-03T00:00:00.000Z
created_at: 2021-09-03T22:00:37.000Z
- rank_att: 1
score_att: 1449031872
rank_def: 1
score_def: 993299748
rank_sup: 0
score_sup: 0
rank_total: 1
score_total: 2442331620
_id: en113-kekw-2020-06-22
id: 10100
tribe_id: 122
server_key: en113
num_members: 20
num_villages: 22568
points: 266237628
all_points: 266237628
rank: 1
dominance: 54.27478896611433
date: 2020-06-22T00:00:00.000Z
created_at: 2020-06-22T13:45:46.000Z
- rank_att: 1
score_att: 1449031872
rank_def: 1
score_def: 993299748
rank_sup: 0
score_sup: 0
rank_total: 1
score_total: 2442331620
_id: en113-kekw-2020-06-23
id: 10101
tribe_id: 122
server_key: en113
num_members: 20
num_villages: 22568
points: 266237628
all_points: 266237628
rank: 1
dominance: 54.27478896611433
date: 2020-06-23T00:00:00.000Z
created_at: 2020-06-23T13:45:46.000Z

View File

@ -50,7 +50,7 @@ func (svc *PlayerService) Sync(ctx context.Context, serverSyncedPayload domain.S
return fmt.Errorf("%s: couldn't delete players: %w", serverKey, err)
}
playersSyncedPayload, err := domain.NewPlayersSyncedEventPayload(
payload, err := domain.NewPlayersSyncedEventPayload(
serverKey,
serverURL,
serverSyncedPayload.VersionCode(),
@ -60,7 +60,7 @@ func (svc *PlayerService) Sync(ctx context.Context, serverSyncedPayload domain.S
return fmt.Errorf("%s: couldn't construct domain.PlayersSyncedEventPayload: %w", serverKey, err)
}
if err = svc.pub.EventSynced(ctx, playersSyncedPayload); err != nil {
if err = svc.pub.EventSynced(ctx, payload); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}

View File

@ -272,3 +272,20 @@ func (svc *ServerService) UpdateEnnoblementDataSyncedAt(
return svc.repo.Update(ctx, key, updateParams)
}
func (svc *ServerService) UpdateTribeSnapshotsCreatedAt(
ctx context.Context,
payload domain.SnapshotsCreatedEventPayload,
) error {
key := payload.ServerKey()
var updateParams domain.UpdateServerParams
if err := updateParams.SetTribeSnapshotsCreatedAt(domain.NullTime{
Value: time.Now(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
return svc.repo.Update(ctx, key, updateParams)
}

View File

@ -44,7 +44,7 @@ func (svc *TribeService) Sync(ctx context.Context, serverSyncedPayload domain.Se
return fmt.Errorf("%s: couldn't delete tribes: %w", serverKey, err)
}
tribesSyncedPayload, err := domain.NewTribesSyncedEventPayload(
payload, err := domain.NewTribesSyncedEventPayload(
serverKey,
serverURL,
serverSyncedPayload.VersionCode(),
@ -54,7 +54,7 @@ func (svc *TribeService) Sync(ctx context.Context, serverSyncedPayload domain.Se
return fmt.Errorf("%s: couldn't construct domain.TribesSyncedEventPayload: %w", serverKey, err)
}
if err = svc.pub.EventSynced(ctx, tribesSyncedPayload); err != nil {
if err = svc.pub.EventSynced(ctx, payload); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
@ -157,3 +157,7 @@ func (svc *TribeService) delete(ctx context.Context, serverKey string, tribes do
func (svc *TribeService) UpdateDominance(ctx context.Context, payload domain.VillagesSyncedEventPayload) error {
return svc.repo.UpdateDominance(ctx, payload.ServerKey(), payload.NumPlayerVillages())
}
func (svc *TribeService) List(ctx context.Context, params domain.ListTribesParams) (domain.Tribes, error) {
return svc.repo.List(ctx, params)
}

View File

@ -8,9 +8,7 @@ import (
type TribeChangeRepository interface {
// Create persists tribe changes in a store (e.g. Postgres).
// If there is a similar tribe change, such changes are ignored.
// Similar means that there is a tribe change with the same player id,
// new tribe id and old tribe id and this tribe change was created within the same hour.
// Duplicates are ignored.
Create(ctx context.Context, params ...domain.CreateTribeChangeParams) error
}

View File

@ -0,0 +1,92 @@
package app
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
)
type TribeSnapshotRepository interface {
// Create persists tribe snapshots in a store (e.g. Postgres).
// Duplicates are ignored.
Create(ctx context.Context, params ...domain.CreateTribeSnapshotParams) error
}
type TribeSnapshotService struct {
repo TribeSnapshotRepository
tribeSvc *TribeService
pub SnapshotPublisher
}
func NewTribeSnapshotService(
repo TribeSnapshotRepository,
tribeSvc *TribeService,
pub SnapshotPublisher,
) *TribeSnapshotService {
return &TribeSnapshotService{repo: repo, tribeSvc: tribeSvc, pub: pub}
}
//nolint:gocyclo
func (svc *TribeSnapshotService) Create(
ctx context.Context,
createSnapshotsCmdPayload domain.CreateSnapshotsCmdPayload,
) error {
serverKey := createSnapshotsCmdPayload.ServerKey()
date := createSnapshotsCmdPayload.Date()
listTribesParams := domain.NewListTribesParams()
if err := listTribesParams.SetServerKeys([]string{serverKey}); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
if err := listTribesParams.SetDeleted(domain.NullBool{
Value: false,
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
if err := listTribesParams.SetSort([]domain.TribeSort{domain.TribeSortIDASC}); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
if err := listTribesParams.SetLimit(domain.TribeListMaxLimit); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
for {
tribes, err := svc.tribeSvc.List(ctx, listTribesParams)
if err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
if len(tribes) == 0 {
break
}
params, err := domain.NewCreateTribeSnapshotParams(tribes, date)
if err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
if err = svc.repo.Create(ctx, params...); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
if err = listTribesParams.SetIDGT(domain.NullInt{
Value: tribes[len(tribes)-1].ID(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
}
payload, err := domain.NewSnapshotsCreatedEventPayload(serverKey, createSnapshotsCmdPayload.VersionCode())
if err != nil {
return fmt.Errorf("%s: couldn't construct domain.SnapshotsCreatedEventPayload: %w", serverKey, err)
}
if err = svc.pub.EventCreated(ctx, payload); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
return nil
}

View File

@ -0,0 +1,79 @@
package bunmodel
import (
"fmt"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"github.com/uptrace/bun"
)
type TribeSnapshot struct {
bun.BaseModel `bun:"table:tribe_snapshots,alias:ts"`
ID int `bun:"id,pk,autoincrement,identity"`
TribeID int `bun:"tribe_id,nullzero"`
ServerKey string `bun:"server_key,nullzero"`
NumMembers int `bun:"num_members"`
NumVillages int `bun:"num_villages"`
Points int `bun:"points"`
AllPoints int `bun:"all_points"`
Rank int `bun:"rank"`
Dominance float64 `bun:"dominance"`
Date time.Time `bun:"date,nullzero"`
CreatedAt time.Time `bun:"created_at,nullzero"`
OpponentsDefeated
}
func (ts TribeSnapshot) ToDomain() (domain.TribeSnapshot, error) {
od, err := ts.OpponentsDefeated.ToDomain()
if err != nil {
return domain.TribeSnapshot{}, fmt.Errorf(
"couldn't construct domain.TribeSnapshot (id=%d): %w",
ts.ID,
err,
)
}
converted, err := domain.UnmarshalTribeSnapshotFromDatabase(
ts.ID,
ts.TribeID,
ts.ServerKey,
ts.NumMembers,
ts.NumVillages,
ts.Points,
ts.AllPoints,
ts.Rank,
od,
ts.Dominance,
ts.Date,
ts.CreatedAt,
)
if err != nil {
return domain.TribeSnapshot{}, fmt.Errorf(
"couldn't construct domain.TribeSnapshot (id=%d): %w",
ts.ID,
err,
)
}
return converted, nil
}
type TribeSnapshots []TribeSnapshot
func (tss TribeSnapshots) ToDomain() (domain.TribeSnapshots, error) {
res := make(domain.TribeSnapshots, 0, len(tss))
for _, ts := range tss {
converted, err := ts.ToDomain()
if err != nil {
return nil, err
}
res = append(res, converted)
}
return res, nil
}

View File

@ -23,6 +23,7 @@ func NewFixture(bunDB *bun.DB) *Fixture {
(*bunmodel.Village)(nil),
(*bunmodel.Ennoblement)(nil),
(*bunmodel.TribeChange)(nil),
(*bunmodel.TribeSnapshot)(nil),
)
return &Fixture{
f: dbfixture.New(bunDB),

View File

@ -287,6 +287,7 @@ type UpdateServerParams struct {
numBonusVillages NullInt
villageDataSyncedAt NullTime
ennoblementDataSyncedAt NullTime
tribeSnapshotsCreatedAt NullTime
}
const updateServerParamsModelName = "UpdateServerParams"
@ -419,6 +420,16 @@ func (params *UpdateServerParams) SetEnnoblementDataSyncedAt(ennoblementDataSync
return nil
}
func (params *UpdateServerParams) TribeSnapshotsCreatedAt() NullTime {
return params.tribeSnapshotsCreatedAt
}
func (params *UpdateServerParams) SetTribeSnapshotsCreatedAt(tribeSnapshotsCreatedAt NullTime) error {
params.tribeSnapshotsCreatedAt = tribeSnapshotsCreatedAt
return nil
}
//nolint:gocyclo
func (params *UpdateServerParams) IsZero() bool {
return !params.config.Valid &&
!params.buildingInfo.Valid &&
@ -429,7 +440,8 @@ func (params *UpdateServerParams) IsZero() bool {
!params.playerDataSyncedAt.Valid &&
!params.numVillages.Valid &&
!params.villageDataSyncedAt.Valid &&
!params.ennoblementDataSyncedAt.Valid
!params.ennoblementDataSyncedAt.Valid &&
!params.tribeSnapshotsCreatedAt.Valid
}
type ServerSort uint8

View File

@ -215,6 +215,10 @@ func (t Tribe) Base() BaseTribe {
}
}
func (t Tribe) IsZero() bool {
return t == Tribe{}
}
type Tribes []Tribe
// Delete finds all tribes with the given serverKey that are not in the given slice with active tribes

View File

@ -0,0 +1,307 @@
package domain
import (
"fmt"
"math"
"time"
)
type TribeSnapshot struct {
id int
tribeID int
serverKey string
numMembers int
numVillages int
points int
allPoints int
rank int
od OpponentsDefeated
dominance float64
date time.Time
createdAt time.Time
}
const tribeSnapshotModelName = "TribeSnapshot"
// UnmarshalTribeSnapshotFromDatabase unmarshals TribeSnapshot from the database.
//
// It should be used only for unmarshalling from the database!
// You can't use UnmarshalTribeSnapshotFromDatabase as constructor - It may put domain into the invalid state!
func UnmarshalTribeSnapshotFromDatabase(
id int,
tribeID int,
serverKey string,
numMembers int,
numVillages int,
points int,
allPoints int,
rank int,
od OpponentsDefeated,
dominance float64,
date time.Time,
createdAt time.Time,
) (TribeSnapshot, error) {
if err := validateIntInRange(id, 1, math.MaxInt); err != nil {
return TribeSnapshot{}, ValidationError{
Model: tribeSnapshotModelName,
Field: "id",
Err: err,
}
}
if err := validateServerKey(serverKey); err != nil {
return TribeSnapshot{}, ValidationError{
Model: tribeSnapshotModelName,
Field: "serverKey",
Err: err,
}
}
return TribeSnapshot{
id: id,
tribeID: tribeID,
serverKey: serverKey,
numMembers: numMembers,
numVillages: numVillages,
points: points,
allPoints: allPoints,
rank: rank,
od: od,
dominance: dominance,
date: date,
createdAt: createdAt,
}, nil
}
func (ts TribeSnapshot) ID() int {
return ts.id
}
func (ts TribeSnapshot) TribeID() int {
return ts.tribeID
}
func (ts TribeSnapshot) ServerKey() string {
return ts.serverKey
}
func (ts TribeSnapshot) NumMembers() int {
return ts.numMembers
}
func (ts TribeSnapshot) NumVillages() int {
return ts.numVillages
}
func (ts TribeSnapshot) Points() int {
return ts.points
}
func (ts TribeSnapshot) AllPoints() int {
return ts.allPoints
}
func (ts TribeSnapshot) Rank() int {
return ts.rank
}
func (ts TribeSnapshot) OD() OpponentsDefeated {
return ts.od
}
func (ts TribeSnapshot) Dominance() float64 {
return ts.dominance
}
func (ts TribeSnapshot) Date() time.Time {
return ts.date
}
func (ts TribeSnapshot) CreatedAt() time.Time {
return ts.createdAt
}
type TribeSnapshots []TribeSnapshot
type CreateTribeSnapshotParams struct {
tribeID int
serverKey string
numMembers int
numVillages int
points int
allPoints int
rank int
od OpponentsDefeated
dominance float64
date time.Time
}
func NewCreateTribeSnapshotParams(tribes Tribes, date time.Time) ([]CreateTribeSnapshotParams, error) {
params := make([]CreateTribeSnapshotParams, 0, len(tribes))
for i, t := range tribes {
if t.IsZero() {
return nil, fmt.Errorf("tribes[%d] is an empty struct", i)
}
if t.IsDeleted() {
continue
}
params = append(params, CreateTribeSnapshotParams{
tribeID: t.ID(),
serverKey: t.ServerKey(),
numMembers: t.NumMembers(),
numVillages: t.NumVillages(),
points: t.Points(),
allPoints: t.AllPoints(),
rank: t.Rank(),
od: t.OD(),
dominance: t.Dominance(),
date: date,
})
}
return params, nil
}
func (params CreateTribeSnapshotParams) TribeID() int {
return params.tribeID
}
func (params CreateTribeSnapshotParams) ServerKey() string {
return params.serverKey
}
func (params CreateTribeSnapshotParams) NumMembers() int {
return params.numMembers
}
func (params CreateTribeSnapshotParams) NumVillages() int {
return params.numVillages
}
func (params CreateTribeSnapshotParams) Points() int {
return params.points
}
func (params CreateTribeSnapshotParams) AllPoints() int {
return params.allPoints
}
func (params CreateTribeSnapshotParams) Rank() int {
return params.rank
}
func (params CreateTribeSnapshotParams) OD() OpponentsDefeated {
return params.od
}
func (params CreateTribeSnapshotParams) Dominance() float64 {
return params.dominance
}
func (params CreateTribeSnapshotParams) Date() time.Time {
return params.date
}
type TribeSnapshotSort uint8
const (
TribeSnapshotSortDateASC TribeSnapshotSort = iota + 1
TribeSnapshotSortDateDESC
TribeSnapshotSortIDASC
TribeSnapshotSortIDDESC
TribeSnapshotSortServerKeyASC
TribeSnapshotSortServerKeyDESC
)
const TribeSnapshotListMaxLimit = 200
type ListTribeSnapshotsParams struct {
serverKeys []string
sort []TribeSnapshotSort
limit int
offset int
}
const listTribeSnapshotsParamsModelName = "ListTribeSnapshotsParams"
func NewListTribeSnapshotsParams() ListTribeSnapshotsParams {
return ListTribeSnapshotsParams{
sort: []TribeSnapshotSort{
TribeSnapshotSortServerKeyASC,
TribeSnapshotSortDateASC,
TribeSnapshotSortIDASC,
},
limit: TribeSnapshotListMaxLimit,
}
}
func (params *ListTribeSnapshotsParams) ServerKeys() []string {
return params.serverKeys
}
func (params *ListTribeSnapshotsParams) SetServerKeys(serverKeys []string) error {
params.serverKeys = serverKeys
return nil
}
func (params *ListTribeSnapshotsParams) Sort() []TribeSnapshotSort {
return params.sort
}
const (
tribeSnapshotSortMinLength = 1
tribeSnapshotSortMaxLength = 3
)
func (params *ListTribeSnapshotsParams) SetSort(sort []TribeSnapshotSort) error {
if err := validateSliceLen(sort, tribeSnapshotSortMinLength, tribeSnapshotSortMaxLength); err != nil {
return ValidationError{
Model: listTribeSnapshotsParamsModelName,
Field: "sort",
Err: err,
}
}
params.sort = sort
return nil
}
func (params *ListTribeSnapshotsParams) Limit() int {
return params.limit
}
func (params *ListTribeSnapshotsParams) SetLimit(limit int) error {
if err := validateIntInRange(limit, 1, TribeSnapshotListMaxLimit); err != nil {
return ValidationError{
Model: listTribeSnapshotsParamsModelName,
Field: "limit",
Err: err,
}
}
params.limit = limit
return nil
}
func (params *ListTribeSnapshotsParams) Offset() int {
return params.offset
}
func (params *ListTribeSnapshotsParams) SetOffset(offset int) error {
if err := validateIntInRange(offset, 0, math.MaxInt); err != nil {
return ValidationError{
Model: listTribeSnapshotsParamsModelName,
Field: "offset",
Err: err,
}
}
params.offset = offset
return nil
}

View File

@ -0,0 +1,238 @@
package domain_test
import (
"fmt"
"slices"
"testing"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewCreateTribeSnapshotParams(t *testing.T) {
t.Parallel()
tribes := domain.Tribes{
domaintest.NewTribe(t),
domaintest.NewTribe(t),
domaintest.NewTribe(t),
}
date := time.Now()
res, err := domain.NewCreateTribeSnapshotParams(tribes, date)
require.NoError(t, err)
assert.Len(t, res, len(tribes))
for i, tr := range tribes {
idx := slices.IndexFunc(res, func(params domain.CreateTribeSnapshotParams) bool {
return params.TribeID() == tr.ID() && params.ServerKey() == tr.ServerKey()
})
require.GreaterOrEqualf(t, idx, 0, "tribes[%d] not found", i)
params := res[i]
assert.Equalf(t, tr.ID(), params.TribeID(), "tribes[%d]", i)
assert.Equalf(t, tr.ServerKey(), params.ServerKey(), "tribes[%d]", i)
assert.Equalf(t, tr.NumMembers(), params.NumMembers(), "tribes[%d]", i)
assert.Equalf(t, tr.NumVillages(), params.NumVillages(), "tribes[%d]", i)
assert.Equalf(t, tr.Points(), params.Points(), "tribes[%d]", i)
assert.Equalf(t, tr.AllPoints(), params.AllPoints(), "tribes[%d]", i)
assert.Equalf(t, tr.Rank(), params.Rank(), "tribes[%d]", i)
assert.Equalf(t, tr.OD(), params.OD(), "tribes[%d]", i)
assert.InDeltaf(t, tr.Dominance(), params.Dominance(), 0.001, "tribes[%d]", i)
assert.Equalf(t, date, params.Date(), "tribes[%d]", i)
}
}
func TestListTribeSnapshotsParams_SetSort(t *testing.T) {
t.Parallel()
type args struct {
sort []domain.TribeSnapshotSort
}
tests := []struct {
name string
args args
expectedErr error
}{
{
name: "OK",
args: args{
sort: []domain.TribeSnapshotSort{
domain.TribeSnapshotSortDateASC,
domain.TribeSnapshotSortServerKeyASC,
},
},
},
{
name: "ERR: len(sort) < 1",
args: args{
sort: nil,
},
expectedErr: domain.ValidationError{
Model: "ListTribeSnapshotsParams",
Field: "sort",
Err: domain.LenOutOfRangeError{
Min: 1,
Max: 3,
Current: 0,
},
},
},
{
name: "ERR: len(sort) > 3",
args: args{
sort: []domain.TribeSnapshotSort{
domain.TribeSnapshotSortDateASC,
domain.TribeSnapshotSortServerKeyASC,
domain.TribeSnapshotSortIDASC,
domain.TribeSnapshotSortIDDESC,
},
},
expectedErr: domain.ValidationError{
Model: "ListTribeSnapshotsParams",
Field: "sort",
Err: domain.LenOutOfRangeError{
Min: 1,
Max: 3,
Current: 4,
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
params := domain.NewListTribeSnapshotsParams()
require.ErrorIs(t, params.SetSort(tt.args.sort), tt.expectedErr)
if tt.expectedErr != nil {
return
}
assert.Equal(t, tt.args.sort, params.Sort())
})
}
}
func TestListTribeSnapshotsParams_SetLimit(t *testing.T) {
t.Parallel()
type args struct {
limit int
}
tests := []struct {
name string
args args
expectedErr error
}{
{
name: "OK",
args: args{
limit: domain.TribeSnapshotListMaxLimit,
},
},
{
name: "ERR: limit < 1",
args: args{
limit: 0,
},
expectedErr: domain.ValidationError{
Model: "ListTribeSnapshotsParams",
Field: "limit",
Err: domain.MinGreaterEqualError{
Min: 1,
Current: 0,
},
},
},
{
name: fmt.Sprintf("ERR: limit > %d", domain.TribeSnapshotListMaxLimit),
args: args{
limit: domain.TribeSnapshotListMaxLimit + 1,
},
expectedErr: domain.ValidationError{
Model: "ListTribeSnapshotsParams",
Field: "limit",
Err: domain.MaxLessEqualError{
Max: domain.TribeSnapshotListMaxLimit,
Current: domain.TribeSnapshotListMaxLimit + 1,
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
params := domain.NewListTribeSnapshotsParams()
require.ErrorIs(t, params.SetLimit(tt.args.limit), tt.expectedErr)
if tt.expectedErr != nil {
return
}
assert.Equal(t, tt.args.limit, params.Limit())
})
}
}
func TestListTribeSnapshotsParams_SetOffset(t *testing.T) {
t.Parallel()
type args struct {
offset int
}
tests := []struct {
name string
args args
expectedErr error
}{
{
name: "OK",
args: args{
offset: 100,
},
},
{
name: "ERR: offset < 0",
args: args{
offset: -1,
},
expectedErr: domain.ValidationError{
Model: "ListTribeSnapshotsParams",
Field: "offset",
Err: domain.MinGreaterEqualError{
Min: 0,
Current: -1,
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
params := domain.NewListTribeSnapshotsParams()
require.ErrorIs(t, params.SetOffset(tt.args.offset), tt.expectedErr)
if tt.expectedErr != nil {
return
}
assert.Equal(t, tt.args.offset, params.Offset())
})
}
}

View File

@ -9,16 +9,17 @@ import (
)
type ServerWatermillConsumer struct {
svc *app.ServerService
subscriber message.Subscriber
logger watermill.LoggerAdapter
marshaler watermillmsg.Marshaler
cmdSyncTopic string
eventServerSyncedTopic string
eventTribesSyncedTopic string
eventPlayersSyncedTopic string
eventVillagesSyncedTopic string
eventEnnoblementsSyncedTopic string
svc *app.ServerService
subscriber message.Subscriber
logger watermill.LoggerAdapter
marshaler watermillmsg.Marshaler
cmdSyncTopic string
eventServerSyncedTopic string
eventTribesSyncedTopic string
eventPlayersSyncedTopic string
eventVillagesSyncedTopic string
eventEnnoblementsSyncedTopic string
eventTribeSnapshotsCreatedTopic string
}
func NewServerWatermillConsumer(
@ -32,18 +33,20 @@ func NewServerWatermillConsumer(
eventPlayersSyncedTopic string,
eventVillagesSyncedTopic string,
eventEnnoblementsSyncedTopic string,
eventTribeSnapshotsCreatedTopic string,
) *ServerWatermillConsumer {
return &ServerWatermillConsumer{
svc: svc,
subscriber: subscriber,
logger: logger,
marshaler: marshaler,
cmdSyncTopic: cmdSyncTopic,
eventServerSyncedTopic: eventServerSyncedTopic,
eventTribesSyncedTopic: eventTribesSyncedTopic,
eventPlayersSyncedTopic: eventPlayersSyncedTopic,
eventVillagesSyncedTopic: eventVillagesSyncedTopic,
eventEnnoblementsSyncedTopic: eventEnnoblementsSyncedTopic,
svc: svc,
subscriber: subscriber,
logger: logger,
marshaler: marshaler,
cmdSyncTopic: cmdSyncTopic,
eventServerSyncedTopic: eventServerSyncedTopic,
eventTribesSyncedTopic: eventTribesSyncedTopic,
eventPlayersSyncedTopic: eventPlayersSyncedTopic,
eventVillagesSyncedTopic: eventVillagesSyncedTopic,
eventEnnoblementsSyncedTopic: eventEnnoblementsSyncedTopic,
eventTribeSnapshotsCreatedTopic: eventTribeSnapshotsCreatedTopic,
}
}
@ -79,6 +82,12 @@ func (c *ServerWatermillConsumer) Register(router *message.Router) {
c.subscriber,
c.updateEnnoblementDataSyncedAt,
)
router.AddNoPublisherHandler(
"ServerConsumer.updateTribeSnapshotsCreatedAt",
c.eventTribeSnapshotsCreatedTopic,
c.subscriber,
c.updateTribeSnapshotsCreatedAt,
)
}
func (c *ServerWatermillConsumer) sync(msg *message.Message) error {
@ -228,3 +237,27 @@ func (c *ServerWatermillConsumer) updateEnnoblementDataSyncedAt(msg *message.Mes
return c.svc.UpdateEnnoblementDataSyncedAt(msg.Context(), payload)
}
func (c *ServerWatermillConsumer) updateTribeSnapshotsCreatedAt(msg *message.Message) error {
var rawPayload watermillmsg.SnapshotsCreatedEventPayload
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
payload, err := domain.NewSnapshotsCreatedEventPayload(
rawPayload.ServerKey,
rawPayload.VersionCode,
)
if err != nil {
c.logger.Error("couldn't construct domain.SnapshotsCreatedEventPayload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
return c.svc.UpdateTribeSnapshotsCreatedAt(msg.Context(), payload)
}

View File

@ -10,28 +10,34 @@ import (
type TribeWatermillConsumer struct {
svc *app.TribeService
snapshotSvc *app.TribeSnapshotService
subscriber message.Subscriber
logger watermill.LoggerAdapter
marshaler watermillmsg.Marshaler
eventServerSyncedTopic string
eventVillagesSyncedTopic string
cmdCreateSnapshotsTopic string
}
func NewTribeWatermillConsumer(
svc *app.TribeService,
snapshotSvc *app.TribeSnapshotService,
subscriber message.Subscriber,
logger watermill.LoggerAdapter,
marshaler watermillmsg.Marshaler,
eventServerSyncedTopic string,
eventVillagesSyncedTopic string,
cmdCreateSnapshotsTopic string,
) *TribeWatermillConsumer {
return &TribeWatermillConsumer{
svc: svc,
snapshotSvc: snapshotSvc,
subscriber: subscriber,
logger: logger,
marshaler: marshaler,
eventServerSyncedTopic: eventServerSyncedTopic,
eventVillagesSyncedTopic: eventVillagesSyncedTopic,
cmdCreateSnapshotsTopic: cmdCreateSnapshotsTopic,
}
}
@ -48,6 +54,12 @@ func (c *TribeWatermillConsumer) Register(router *message.Router) {
c.subscriber,
c.updateDominance,
)
router.AddNoPublisherHandler(
"TribeConsumer.createSnapshots",
c.cmdCreateSnapshotsTopic,
c.subscriber,
c.createSnapshots,
)
}
func (c *TribeWatermillConsumer) sync(msg *message.Message) error {
@ -99,3 +111,29 @@ func (c *TribeWatermillConsumer) updateDominance(msg *message.Message) error {
return c.svc.UpdateDominance(msg.Context(), payload)
}
func (c *TribeWatermillConsumer) createSnapshots(msg *message.Message) error {
var rawPayload watermillmsg.CreateSnapshotsCmdPayload
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
payload, err := domain.NewCreateSnapshotsCmdPayload(
rawPayload.ServerKey,
rawPayload.VersionCode,
rawPayload.VersionTimezone,
rawPayload.Date,
)
if err != nil {
c.logger.Error("couldn't construct domain.CreateSnapshotsCmdPayload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
return c.snapshotSvc.Create(msg.Context(), payload)
}

View File

@ -166,8 +166,18 @@ func TestDataSync(t *testing.T) {
playerEventSynced,
villageEventSynced,
"",
"",
),
port.NewTribeWatermillConsumer(
tribeSvc,
nil,
tribeSub,
nopLogger,
marshaler,
serverEventSynced,
villageEventSynced,
"",
),
port.NewTribeWatermillConsumer(tribeSvc, tribeSub, nopLogger, marshaler, serverEventSynced, villageEventSynced),
port.NewPlayerWatermillConsumer(playerSvc, playerSub, nopLogger, marshaler, serverEventSynced),
port.NewVillageWatermillConsumer(villageSvc, villageSub, nopLogger, marshaler, serverEventSynced),
)

View File

@ -147,6 +147,7 @@ func TestEnnoblementSync(t *testing.T) {
"",
"",
ennoblementEventSynced,
"",
),
port.NewEnnoblementWatermillConsumer(ennoblementSvc, ennoblementSub, nopLogger, marshaler, ennoblementCmdSync),
)