feat: server - update NumTribes & TribeDataSyncedAt (#18)
Reviewed-on: twhelp/corev3#18
This commit is contained in:
parent
62832de065
commit
d5973877a6
|
@ -61,6 +61,7 @@ var cmdConsumer = &cli.Command{
|
||||||
marshaler,
|
marshaler,
|
||||||
c.String(rmqFlagTopicSyncServersCmd.Name),
|
c.String(rmqFlagTopicSyncServersCmd.Name),
|
||||||
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
||||||
|
c.String(rmqFlagTopicTribesSyncedEvent.Name),
|
||||||
)
|
)
|
||||||
consumer.Register(router)
|
consumer.Register(router)
|
||||||
|
|
||||||
|
|
|
@ -9,28 +9,32 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
bun.BaseModel `bun:"table:servers,alias:server"`
|
bun.BaseModel `bun:"table:servers,alias:server"`
|
||||||
Key string `bun:"key,nullzero,pk"`
|
Key string `bun:"key,nullzero,pk"`
|
||||||
URL string `bun:"url,nullzero"`
|
URL string `bun:"url,nullzero"`
|
||||||
Open bool `bun:"open"`
|
Open bool `bun:"open"`
|
||||||
Special bool `bun:"special"`
|
Special bool `bun:"special"`
|
||||||
NumPlayers int `bun:"num_players"`
|
NumPlayers int `bun:"num_players"`
|
||||||
NumTribes int `bun:"num_tribes"`
|
NumTribes int `bun:"num_tribes"`
|
||||||
NumVillages int `bun:"num_villages"`
|
NumVillages int `bun:"num_villages"`
|
||||||
NumPlayerVillages int `bun:"num_player_villages"`
|
NumPlayerVillages int `bun:"num_player_villages"`
|
||||||
NumBarbarianVillages int `bun:"num_barbarian_villages"`
|
NumBarbarianVillages int `bun:"num_barbarian_villages"`
|
||||||
NumBonusVillages int `bun:"num_bonus_villages"`
|
NumBonusVillages int `bun:"num_bonus_villages"`
|
||||||
Config ServerConfig `bun:"config"`
|
Config ServerConfig `bun:"config"`
|
||||||
BuildingInfo BuildingInfo `bun:"building_info"`
|
BuildingInfo BuildingInfo `bun:"building_info"`
|
||||||
UnitInfo UnitInfo `bun:"unit_info"`
|
UnitInfo UnitInfo `bun:"unit_info"`
|
||||||
CreatedAt time.Time `bun:"created_at,nullzero"`
|
CreatedAt time.Time `bun:"created_at,nullzero"`
|
||||||
PlayerDataUpdatedAt time.Time `bun:"player_data_updated_at,nullzero"`
|
// TODO: rename this column to player_data_synced_at
|
||||||
PlayerSnapshotsCreatedAt time.Time `bun:"player_snapshots_created_at,nullzero"`
|
PlayerDataUpdatedAt time.Time `bun:"player_data_updated_at,nullzero"`
|
||||||
TribeDataUpdatedAt time.Time `bun:"tribe_data_updated_at,nullzero"`
|
PlayerSnapshotsCreatedAt time.Time `bun:"player_snapshots_created_at,nullzero"`
|
||||||
TribeSnapshotsCreatedAt time.Time `bun:"tribe_snapshots_created_at,nullzero"`
|
// TODO: rename this column to tribe_data_synced_at
|
||||||
VillageDataUpdatedAt time.Time `bun:"village_data_updated_at,nullzero"`
|
TribeDataUpdatedAt time.Time `bun:"tribe_data_updated_at,nullzero"`
|
||||||
EnnoblementDataUpdatedAt time.Time `bun:"ennoblement_data_updated_at,nullzero"`
|
TribeSnapshotsCreatedAt time.Time `bun:"tribe_snapshots_created_at,nullzero"`
|
||||||
VersionCode string `bun:"version_code,nullzero"`
|
// TODO: rename this column to village_data_synced_at
|
||||||
|
VillageDataUpdatedAt time.Time `bun:"village_data_updated_at,nullzero"`
|
||||||
|
// TODO: rename this column to ennoblement_data_synced_at
|
||||||
|
EnnoblementDataUpdatedAt time.Time `bun:"ennoblement_data_updated_at,nullzero"`
|
||||||
|
VersionCode string `bun:"version_code,nullzero"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s Server) ToDomain() (domain.Server, error) {
|
func (s Server) ToDomain() (domain.Server, error) {
|
||||||
|
|
|
@ -89,7 +89,7 @@ func (repo *ServerBunRepository) Update(ctx context.Context, key string, params
|
||||||
Returning("").
|
Returning("").
|
||||||
Exec(ctx)
|
Exec(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't update server with key %s: %w", key, err)
|
return fmt.Errorf("%s: couldn't update server: %w", key, err)
|
||||||
}
|
}
|
||||||
if affected, _ := res.RowsAffected(); affected == 0 {
|
if affected, _ := res.RowsAffected(); affected == 0 {
|
||||||
return domain.ServerNotFoundError{
|
return domain.ServerNotFoundError{
|
||||||
|
@ -117,6 +117,15 @@ func (a updateServerParamsApplier) apply(q *bun.UpdateQuery) *bun.UpdateQuery {
|
||||||
q = q.Set("building_info = ?", bunmodel.NewBuildingInfo(buildingInfo.Value))
|
q = q.Set("building_info = ?", bunmodel.NewBuildingInfo(buildingInfo.Value))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if numTribes := a.params.NumTribes(); numTribes.Valid {
|
||||||
|
q = q.Set("num_tribes = ?", numTribes.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tribeDataSyncedAt := a.params.TribeDataSyncedAt(); tribeDataSyncedAt.Valid {
|
||||||
|
// TODO: rename this column to tribe_data_synced_at
|
||||||
|
q = q.Set("tribe_data_updated_at = ?", tribeDataSyncedAt.Value)
|
||||||
|
}
|
||||||
|
|
||||||
return q
|
return q
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,11 +4,14 @@ import (
|
||||||
"cmp"
|
"cmp"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"slices"
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
||||||
|
"github.com/brianvoe/gofakeit/v6"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -400,6 +403,14 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
|
||||||
Value: domaintest.NewBuildingInfo(t),
|
Value: domaintest.NewBuildingInfo(t),
|
||||||
Valid: true,
|
Valid: true,
|
||||||
}))
|
}))
|
||||||
|
require.NoError(t, updateParams.SetNumTribes(domain.NullInt{
|
||||||
|
Value: gofakeit.IntRange(0, math.MaxInt),
|
||||||
|
Valid: true,
|
||||||
|
}))
|
||||||
|
require.NoError(t, updateParams.SetTribeDataSyncedAt(domain.NullTime{
|
||||||
|
Value: time.Now(),
|
||||||
|
Valid: true,
|
||||||
|
}))
|
||||||
|
|
||||||
require.NoError(t, repos.server.Update(ctx, serversBeforeUpdate[0].Key(), updateParams))
|
require.NoError(t, repos.server.Update(ctx, serversBeforeUpdate[0].Key(), updateParams))
|
||||||
|
|
||||||
|
@ -413,6 +424,13 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
|
||||||
assert.Equal(t, updateParams.Config().Value, serversAfterUpdate[0].Config())
|
assert.Equal(t, updateParams.Config().Value, serversAfterUpdate[0].Config())
|
||||||
assert.Equal(t, updateParams.UnitInfo().Value, serversAfterUpdate[0].UnitInfo())
|
assert.Equal(t, updateParams.UnitInfo().Value, serversAfterUpdate[0].UnitInfo())
|
||||||
assert.Equal(t, updateParams.BuildingInfo().Value, serversAfterUpdate[0].BuildingInfo())
|
assert.Equal(t, updateParams.BuildingInfo().Value, serversAfterUpdate[0].BuildingInfo())
|
||||||
|
assert.Equal(t, updateParams.NumTribes().Value, serversAfterUpdate[0].NumTribes())
|
||||||
|
assert.WithinDuration(
|
||||||
|
t,
|
||||||
|
updateParams.TribeDataSyncedAt().Value,
|
||||||
|
serversAfterUpdate[0].TribeDataSyncedAt(),
|
||||||
|
time.Minute,
|
||||||
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ERR: not found", func(t *testing.T) {
|
t.Run("ERR: not found", func(t *testing.T) {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package app
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
)
|
)
|
||||||
|
@ -138,21 +139,22 @@ func (svc *ServerService) ListAll(ctx context.Context, params domain.ListServers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *ServerService) SyncConfigAndInfo(ctx context.Context, payload domain.ServerSyncedEventPayload) error {
|
func (svc *ServerService) SyncConfigAndInfo(ctx context.Context, payload domain.ServerSyncedEventPayload) error {
|
||||||
|
key := payload.Key()
|
||||||
u := payload.URL()
|
u := payload.URL()
|
||||||
|
|
||||||
cfg, err := svc.twSvc.GetServerConfig(ctx, u)
|
cfg, err := svc.twSvc.GetServerConfig(ctx, u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't get server config for server %s: %w", payload.Key(), err)
|
return fmt.Errorf("%s: couldn't get server config: %w", key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
buildingInfo, err := svc.twSvc.GetBuildingInfo(ctx, u)
|
buildingInfo, err := svc.twSvc.GetBuildingInfo(ctx, u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't get building info for server %s: %w", payload.Key(), err)
|
return fmt.Errorf("%s: couldn't get building info: %w", key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
unitInfo, err := svc.twSvc.GetUnitInfo(ctx, u)
|
unitInfo, err := svc.twSvc.GetUnitInfo(ctx, u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't get unit info for server %s: %w", payload.Key(), err)
|
return fmt.Errorf("%s: couldn't get unit info: %w", key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var updateParams domain.UpdateServerParams
|
var updateParams domain.UpdateServerParams
|
||||||
|
@ -160,20 +162,40 @@ func (svc *ServerService) SyncConfigAndInfo(ctx context.Context, payload domain.
|
||||||
Value: cfg,
|
Value: cfg,
|
||||||
Valid: true,
|
Valid: true,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return fmt.Errorf("%s: %w", key, err)
|
||||||
}
|
}
|
||||||
if err = updateParams.SetBuildingInfo(domain.NullBuildingInfo{
|
if err = updateParams.SetBuildingInfo(domain.NullBuildingInfo{
|
||||||
Value: buildingInfo,
|
Value: buildingInfo,
|
||||||
Valid: true,
|
Valid: true,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return fmt.Errorf("%s: %w", key, err)
|
||||||
}
|
}
|
||||||
if err = updateParams.SetUnitInfo(domain.NullUnitInfo{
|
if err = updateParams.SetUnitInfo(domain.NullUnitInfo{
|
||||||
Value: unitInfo,
|
Value: unitInfo,
|
||||||
Valid: true,
|
Valid: true,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return fmt.Errorf("%s: %w", key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return svc.repo.Update(ctx, payload.Key(), updateParams)
|
return svc.repo.Update(ctx, key, updateParams)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (svc *ServerService) UpdateNumTribes(ctx context.Context, payload domain.TribesSyncedEventPayload) error {
|
||||||
|
key := payload.ServerKey()
|
||||||
|
|
||||||
|
var updateParams domain.UpdateServerParams
|
||||||
|
if err := updateParams.SetNumTribes(domain.NullInt{
|
||||||
|
Value: payload.NumTribes(),
|
||||||
|
Valid: true,
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("%s: %w", key, err)
|
||||||
|
}
|
||||||
|
if err := updateParams.SetTribeDataSyncedAt(domain.NullTime{
|
||||||
|
Value: time.Now(),
|
||||||
|
Valid: true,
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("%s: %w", key, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return svc.repo.Update(ctx, key, updateParams)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package domain
|
package domain
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
type NullValue[T any] struct {
|
type NullValue[T any] struct {
|
||||||
Value T
|
Value T
|
||||||
Valid bool // Valid is true if Value is not NULL
|
Valid bool // Valid is true if Value is not NULL
|
||||||
|
@ -10,3 +12,5 @@ type NullInt = NullValue[int]
|
||||||
type NullString = NullValue[string]
|
type NullString = NullValue[string]
|
||||||
|
|
||||||
type NullBool = NullValue[bool]
|
type NullBool = NullValue[bool]
|
||||||
|
|
||||||
|
type NullTime = NullValue[time.Time]
|
||||||
|
|
|
@ -274,11 +274,15 @@ func (params CreateServerParams) VersionCode() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
type UpdateServerParams struct {
|
type UpdateServerParams struct {
|
||||||
config NullServerConfig
|
config NullServerConfig
|
||||||
buildingInfo NullBuildingInfo
|
buildingInfo NullBuildingInfo
|
||||||
unitInfo NullUnitInfo
|
unitInfo NullUnitInfo
|
||||||
|
numTribes NullInt
|
||||||
|
tribeDataSyncedAt NullTime
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const updateServerParamsModelName = "UpdateServerParams"
|
||||||
|
|
||||||
func (params *UpdateServerParams) Config() NullServerConfig {
|
func (params *UpdateServerParams) Config() NullServerConfig {
|
||||||
return params.config
|
return params.config
|
||||||
}
|
}
|
||||||
|
@ -306,10 +310,41 @@ func (params *UpdateServerParams) SetUnitInfo(unitInfo NullUnitInfo) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (params *UpdateServerParams) NumTribes() NullInt {
|
||||||
|
return params.numTribes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (params *UpdateServerParams) SetNumTribes(numTribes NullInt) error {
|
||||||
|
if numTribes.Valid {
|
||||||
|
if err := validateIntInRange(numTribes.Value, 0, math.MaxInt); err != nil {
|
||||||
|
return ValidationError{
|
||||||
|
Model: updateServerParamsModelName,
|
||||||
|
Field: "numTribes",
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
params.numTribes = numTribes
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (params *UpdateServerParams) TribeDataSyncedAt() NullTime {
|
||||||
|
return params.tribeDataSyncedAt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (params *UpdateServerParams) SetTribeDataSyncedAt(tribeDataSyncedAt NullTime) error {
|
||||||
|
params.tribeDataSyncedAt = tribeDataSyncedAt
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (params *UpdateServerParams) IsZero() bool {
|
func (params *UpdateServerParams) IsZero() bool {
|
||||||
return !params.config.Valid &&
|
return !params.config.Valid &&
|
||||||
!params.buildingInfo.Valid &&
|
!params.buildingInfo.Valid &&
|
||||||
!params.unitInfo.Valid
|
!params.unitInfo.Valid &&
|
||||||
|
!params.numTribes.Valid &&
|
||||||
|
!params.tribeDataSyncedAt.Valid
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerSort uint8
|
type ServerSort uint8
|
||||||
|
|
|
@ -2,11 +2,13 @@ package domain_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"slices"
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
||||||
|
"github.com/brianvoe/gofakeit/v6"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -120,6 +122,71 @@ func TestNewCreateServerParams(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpdateServerParams_SetNumTribes(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
type args struct {
|
||||||
|
numTribes domain.NullInt
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
expectedErr error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "OK",
|
||||||
|
args: args{
|
||||||
|
numTribes: domain.NullInt{
|
||||||
|
Value: gofakeit.IntRange(0, math.MaxInt),
|
||||||
|
Valid: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "OK: null value",
|
||||||
|
args: args{
|
||||||
|
numTribes: domain.NullInt{
|
||||||
|
Valid: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ERR: numTribes < 0",
|
||||||
|
args: args{
|
||||||
|
numTribes: domain.NullInt{
|
||||||
|
Value: -1,
|
||||||
|
Valid: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedErr: domain.ValidationError{
|
||||||
|
Model: "UpdateServerParams",
|
||||||
|
Field: "numTribes",
|
||||||
|
Err: domain.MinGreaterEqualError{
|
||||||
|
Min: 0,
|
||||||
|
Current: -1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
tt := tt
|
||||||
|
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var params domain.UpdateServerParams
|
||||||
|
|
||||||
|
require.ErrorIs(t, params.SetNumTribes(tt.args.numTribes), tt.expectedErr)
|
||||||
|
if tt.expectedErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
assert.Equal(t, tt.args.numTribes, params.NumTribes())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestListServersParams_SetSort(t *testing.T) {
|
func TestListServersParams_SetSort(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ type ServerWatermillConsumer struct {
|
||||||
marshaler watermillmsg.Marshaler
|
marshaler watermillmsg.Marshaler
|
||||||
cmdSyncTopic string
|
cmdSyncTopic string
|
||||||
eventServerSyncedTopic string
|
eventServerSyncedTopic string
|
||||||
|
eventTribesSyncedTopic string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServerWatermillConsumer(
|
func NewServerWatermillConsumer(
|
||||||
|
@ -24,6 +25,7 @@ func NewServerWatermillConsumer(
|
||||||
marshaler watermillmsg.Marshaler,
|
marshaler watermillmsg.Marshaler,
|
||||||
cmdSyncTopic string,
|
cmdSyncTopic string,
|
||||||
eventServerSyncedTopic string,
|
eventServerSyncedTopic string,
|
||||||
|
eventTribesSyncedTopic string,
|
||||||
) *ServerWatermillConsumer {
|
) *ServerWatermillConsumer {
|
||||||
return &ServerWatermillConsumer{
|
return &ServerWatermillConsumer{
|
||||||
svc: svc,
|
svc: svc,
|
||||||
|
@ -32,6 +34,7 @@ func NewServerWatermillConsumer(
|
||||||
marshaler: marshaler,
|
marshaler: marshaler,
|
||||||
cmdSyncTopic: cmdSyncTopic,
|
cmdSyncTopic: cmdSyncTopic,
|
||||||
eventServerSyncedTopic: eventServerSyncedTopic,
|
eventServerSyncedTopic: eventServerSyncedTopic,
|
||||||
|
eventTribesSyncedTopic: eventTribesSyncedTopic,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,6 +46,12 @@ func (c *ServerWatermillConsumer) Register(router *message.Router) {
|
||||||
c.subscriber,
|
c.subscriber,
|
||||||
c.syncConfigAndInfo,
|
c.syncConfigAndInfo,
|
||||||
)
|
)
|
||||||
|
router.AddNoPublisherHandler(
|
||||||
|
"ServerConsumer.updateNumTribes",
|
||||||
|
c.eventTribesSyncedTopic,
|
||||||
|
c.subscriber,
|
||||||
|
c.updateNumTribes,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServerWatermillConsumer) sync(msg *message.Message) error {
|
func (c *ServerWatermillConsumer) sync(msg *message.Message) error {
|
||||||
|
@ -86,3 +95,29 @@ func (c *ServerWatermillConsumer) syncConfigAndInfo(msg *message.Message) error
|
||||||
|
|
||||||
return c.svc.SyncConfigAndInfo(msg.Context(), payload)
|
return c.svc.SyncConfigAndInfo(msg.Context(), payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ServerWatermillConsumer) updateNumTribes(msg *message.Message) error {
|
||||||
|
var rawPayload watermillmsg.TribesSyncedEventPayload
|
||||||
|
|
||||||
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
||||||
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
||||||
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, err := domain.NewTribesSyncedEventPayload(
|
||||||
|
rawPayload.ServerKey,
|
||||||
|
rawPayload.ServerURL,
|
||||||
|
rawPayload.VersionCode,
|
||||||
|
rawPayload.NumTribes,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, watermill.LogFields{
|
||||||
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.svc.UpdateNumTribes(msg.Context(), payload)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue