feat: server - update NumVillages & VillageDataSyncedAt (#24)

Reviewed-on: twhelp/corev3#24
This commit is contained in:
Dawid Wysokiński 2024-01-02 10:47:02 +00:00
parent 422fb7dcb9
commit c726411f43
14 changed files with 500 additions and 24 deletions

View File

@ -62,7 +62,8 @@ var cmdConsumer = &cli.Command{
c.String(rmqFlagTopicSyncServersCmd.Name), c.String(rmqFlagTopicSyncServersCmd.Name),
c.String(rmqFlagTopicServerSyncedEvent.Name), c.String(rmqFlagTopicServerSyncedEvent.Name),
c.String(rmqFlagTopicTribesSyncedEvent.Name), c.String(rmqFlagTopicTribesSyncedEvent.Name),
c.String(rmqFlagTopicPlayerSyncedEvent.Name), c.String(rmqFlagTopicPlayersSyncedEvent.Name),
c.String(rmqFlagTopicVillagesSyncedEvent.Name),
) )
consumer.Register(router) consumer.Register(router)
@ -138,7 +139,7 @@ var cmdConsumer = &cli.Command{
playerPublisher := adapter.NewPlayerWatermillPublisher( playerPublisher := adapter.NewPlayerWatermillPublisher(
publisher, publisher,
marshaler, marshaler,
c.String(rmqFlagTopicPlayerSyncedEvent.Name), c.String(rmqFlagTopicPlayersSyncedEvent.Name),
) )
consumer := port.NewPlayerWatermillConsumer( consumer := port.NewPlayerWatermillConsumer(
@ -177,8 +178,14 @@ var cmdConsumer = &cli.Command{
return err return err
} }
villagePublisher := adapter.NewVillageWatermillPublisher(
publisher,
marshaler,
c.String(rmqFlagTopicVillagesSyncedEvent.Name),
)
consumer := port.NewVillageWatermillConsumer( consumer := port.NewVillageWatermillConsumer(
app.NewVillageService(adapter.NewVillageBunRepository(db), twSvc), app.NewVillageService(adapter.NewVillageBunRepository(db), twSvc, villagePublisher),
subscriber, subscriber,
logger, logger,
marshaler, marshaler,

View File

@ -31,17 +31,23 @@ var (
Value: "tribes.event.synced", Value: "tribes.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"}, EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"},
} }
rmqFlagTopicPlayerSyncedEvent = &cli.StringFlag{ rmqFlagTopicPlayersSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.playersSyncedEvent", Name: "rabbitmq.topic.playersSyncedEvent",
Value: "players.event.synced", Value: "players.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_PLAYERS_SYNCED_EVENT"}, EnvVars: []string{"RABBITMQ_TOPIC_PLAYERS_SYNCED_EVENT"},
} }
rmqFlagTopicVillagesSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.villagesSyncedEvent",
Value: "villages.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"},
}
rmqFlags = []cli.Flag{ rmqFlags = []cli.Flag{
rmqFlagConnectionString, rmqFlagConnectionString,
rmqFlagTopicSyncServersCmd, rmqFlagTopicSyncServersCmd,
rmqFlagTopicServerSyncedEvent, rmqFlagTopicServerSyncedEvent,
rmqFlagTopicTribesSyncedEvent, rmqFlagTopicTribesSyncedEvent,
rmqFlagTopicPlayerSyncedEvent, rmqFlagTopicPlayersSyncedEvent,
rmqFlagTopicVillagesSyncedEvent,
} }
) )

View File

@ -0,0 +1,58 @@
package adapter
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill/message"
)
type VillageWatermillPublisher struct {
marshaler watermillmsg.Marshaler
publisher message.Publisher
eventSyncedTopic string
}
func NewVillageWatermillPublisher(
publisher message.Publisher,
marshaler watermillmsg.Marshaler,
eventSyncedTopic string,
) *VillageWatermillPublisher {
return &VillageWatermillPublisher{
publisher: publisher,
marshaler: marshaler,
eventSyncedTopic: eventSyncedTopic,
}
}
func (pub *VillageWatermillPublisher) EventSynced(
ctx context.Context,
payloads ...domain.VillagesSyncedEventPayload,
) error {
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.VillagesSyncedEventPayload{
ServerKey: p.ServerKey(),
ServerURL: p.ServerURL(),
VersionCode: p.VersionCode(),
NumVillages: p.NumVillages(),
NumPlayerVillages: p.NumPlayerVillages(),
NumBarbarianVillages: p.NumBarbarianVillages(),
NumBonusVillages: p.NumBonusVillages(),
})
if err != nil {
return fmt.Errorf("%s: couldn't marshal VillagesSyncedEventPayload: %w", p.ServerKey(), err)
}
msgs = append(msgs, msg)
}
if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil {
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err)
}
return nil
}

View File

@ -140,6 +140,18 @@ func (a updateServerParamsApplier) apply(q *bun.UpdateQuery) *bun.UpdateQuery {
q = q.Set("num_villages = ?", numVillages.Value) q = q.Set("num_villages = ?", numVillages.Value)
} }
if numPlayerVillages := a.params.NumPlayerVillages(); numPlayerVillages.Valid {
q = q.Set("num_player_villages = ?", numPlayerVillages.Value)
}
if numBarbarianVillages := a.params.NumBarbarianVillages(); numBarbarianVillages.Valid {
q = q.Set("num_barbarian_villages = ?", numBarbarianVillages.Value)
}
if numBonusVillages := a.params.NumBonusVillages(); numBonusVillages.Valid {
q = q.Set("num_bonus_villages = ?", numBonusVillages.Value)
}
if villageDataSyncedAt := a.params.VillageDataSyncedAt(); villageDataSyncedAt.Valid { if villageDataSyncedAt := a.params.VillageDataSyncedAt(); villageDataSyncedAt.Valid {
// TODO: rename this column to village_data_synced_at // TODO: rename this column to village_data_synced_at
q = q.Set("village_data_updated_at = ?", villageDataSyncedAt.Value) q = q.Set("village_data_updated_at = ?", villageDataSyncedAt.Value)

View File

@ -423,6 +423,18 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
Value: gofakeit.IntRange(0, math.MaxInt), Value: gofakeit.IntRange(0, math.MaxInt),
Valid: true, Valid: true,
})) }))
require.NoError(t, updateParams.SetNumPlayerVillages(domain.NullInt{
Value: gofakeit.IntRange(0, math.MaxInt),
Valid: true,
}))
require.NoError(t, updateParams.SetNumBonusVillages(domain.NullInt{
Value: gofakeit.IntRange(0, math.MaxInt),
Valid: true,
}))
require.NoError(t, updateParams.SetNumBarbarianVillages(domain.NullInt{
Value: gofakeit.IntRange(0, math.MaxInt),
Valid: true,
}))
require.NoError(t, updateParams.SetVillageDataSyncedAt(domain.NullTime{ require.NoError(t, updateParams.SetVillageDataSyncedAt(domain.NullTime{
Value: time.Now(), Value: time.Now(),
Valid: true, Valid: true,
@ -459,6 +471,9 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
time.Minute, time.Minute,
) )
assert.Equal(t, updateParams.NumVillages().Value, serversAfterUpdate[0].NumVillages()) assert.Equal(t, updateParams.NumVillages().Value, serversAfterUpdate[0].NumVillages())
assert.Equal(t, updateParams.NumPlayerVillages().Value, serversAfterUpdate[0].NumPlayerVillages())
assert.Equal(t, updateParams.NumBarbarianVillages().Value, serversAfterUpdate[0].NumBarbarianVillages())
assert.Equal(t, updateParams.NumBonusVillages().Value, serversAfterUpdate[0].NumBonusVillages())
assert.WithinDuration( assert.WithinDuration(
t, t,
updateParams.VillageDataSyncedAt().Value, updateParams.VillageDataSyncedAt().Value,

View File

@ -18,3 +18,7 @@ type TribePublisher interface {
type PlayerPublisher interface { type PlayerPublisher interface {
EventSynced(ctx context.Context, payloads ...domain.PlayersSyncedEventPayload) error EventSynced(ctx context.Context, payloads ...domain.PlayersSyncedEventPayload) error
} }
type VillagePublisher interface {
EventSynced(ctx context.Context, payloads ...domain.VillagesSyncedEventPayload) error
}

View File

@ -219,3 +219,41 @@ func (svc *ServerService) UpdateNumPlayers(ctx context.Context, payload domain.P
return svc.repo.Update(ctx, key, updateParams) return svc.repo.Update(ctx, key, updateParams)
} }
func (svc *ServerService) UpdateNumVillages(ctx context.Context, payload domain.VillagesSyncedEventPayload) error {
key := payload.ServerKey()
var updateParams domain.UpdateServerParams
if err := updateParams.SetNumVillages(domain.NullInt{
Value: payload.NumVillages(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
if err := updateParams.SetNumPlayerVillages(domain.NullInt{
Value: payload.NumPlayerVillages(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
if err := updateParams.SetNumBarbarianVillages(domain.NullInt{
Value: payload.NumBarbarianVillages(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
if err := updateParams.SetNumBonusVillages(domain.NullInt{
Value: payload.NumBonusVillages(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
if err := updateParams.SetVillageDataSyncedAt(domain.NullTime{
Value: time.Now(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
return svc.repo.Update(ctx, key, updateParams)
}

View File

@ -16,10 +16,11 @@ type VillageRepository interface {
type VillageService struct { type VillageService struct {
repo VillageRepository repo VillageRepository
twSvc TWService twSvc TWService
pub VillagePublisher
} }
func NewVillageService(repo VillageRepository, twSvc TWService) *VillageService { func NewVillageService(repo VillageRepository, twSvc TWService, pub VillagePublisher) *VillageService {
return &VillageService{repo: repo, twSvc: twSvc} return &VillageService{repo: repo, twSvc: twSvc, pub: pub}
} }
func (svc *VillageService) Sync(ctx context.Context, serverSyncedPayload domain.ServerSyncedEventPayload) error { func (svc *VillageService) Sync(ctx context.Context, serverSyncedPayload domain.ServerSyncedEventPayload) error {
@ -39,6 +40,20 @@ func (svc *VillageService) Sync(ctx context.Context, serverSyncedPayload domain.
return fmt.Errorf("%s: couldn't delete villages: %w", serverKey, err) return fmt.Errorf("%s: couldn't delete villages: %w", serverKey, err)
} }
villagesSyncedPayload, err := domain.NewVillagesSyncedEventPayloadFromVillages(
serverKey,
serverURL,
serverSyncedPayload.VersionCode(),
villages,
)
if err != nil {
return fmt.Errorf("%s: couldn't construct domain.PlayersSyncedEventPayload: %w", serverKey, err)
}
if err = svc.pub.EventSynced(ctx, villagesSyncedPayload); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
return nil return nil
} }

View File

@ -10,6 +10,7 @@ import (
type BaseVillageConfig struct { type BaseVillageConfig struct {
ID int ID int
Bonus int
PlayerID int PlayerID int
} }
@ -19,6 +20,7 @@ func NewBaseVillage(tb TestingTB, opts ...func(cfg *BaseVillageConfig)) domain.B
cfg := &BaseVillageConfig{ cfg := &BaseVillageConfig{
ID: RandID(), ID: RandID(),
PlayerID: gofakeit.IntRange(0, 10000), PlayerID: gofakeit.IntRange(0, 10000),
Bonus: 0,
} }
for _, opt := range opts { for _, opt := range opts {
@ -35,7 +37,7 @@ func NewBaseVillage(tb TestingTB, opts ...func(cfg *BaseVillageConfig)) domain.B
gofakeit.IntRange(1, 1000), gofakeit.IntRange(1, 1000),
gofakeit.IntRange(1, 1000), gofakeit.IntRange(1, 1000),
gofakeit.LetterN(3), gofakeit.LetterN(3),
0, cfg.Bonus,
cfg.PlayerID, cfg.PlayerID,
u, u,
) )

View File

@ -282,6 +282,9 @@ type UpdateServerParams struct {
numPlayers NullInt numPlayers NullInt
playerDataSyncedAt NullTime playerDataSyncedAt NullTime
numVillages NullInt numVillages NullInt
numPlayerVillages NullInt
numBarbarianVillages NullInt
numBonusVillages NullInt
villageDataSyncedAt NullTime villageDataSyncedAt NullTime
ennoblementDataSyncedAt NullTime ennoblementDataSyncedAt NullTime
} }
@ -371,6 +374,33 @@ func (params *UpdateServerParams) SetNumVillages(numVillages NullInt) error {
return nil return nil
} }
func (params *UpdateServerParams) NumPlayerVillages() NullInt {
return params.numPlayerVillages
}
func (params *UpdateServerParams) SetNumPlayerVillages(numPlayerVillages NullInt) error {
params.numPlayerVillages = numPlayerVillages
return nil
}
func (params *UpdateServerParams) NumBarbarianVillages() NullInt {
return params.numBarbarianVillages
}
func (params *UpdateServerParams) SetNumBarbarianVillages(numBarbarianVillages NullInt) error {
params.numBarbarianVillages = numBarbarianVillages
return nil
}
func (params *UpdateServerParams) NumBonusVillages() NullInt {
return params.numBonusVillages
}
func (params *UpdateServerParams) SetNumBonusVillages(numBonusVillages NullInt) error {
params.numBonusVillages = numBonusVillages
return nil
}
func (params *UpdateServerParams) VillageDataSyncedAt() NullTime { func (params *UpdateServerParams) VillageDataSyncedAt() NullTime {
return params.villageDataSyncedAt return params.villageDataSyncedAt
} }

View File

@ -0,0 +1,155 @@
package domain
import (
"math"
"net/url"
)
type VillagesSyncedEventPayload struct {
serverKey string
serverURL *url.URL
versionCode string
numVillages int
numPlayerVillages int
numBarbarianVillages int
numBonusVillages int
}
const villagesSyncedEventPayloadModelName = "VillagesSyncedEventPayload"
func NewVillagesSyncedEventPayload(
serverKey string,
serverURL *url.URL,
versionCode string,
numVillages int,
numPlayerVillages int,
numBarbarianVillages int,
numBonusVillages int,
) (VillagesSyncedEventPayload, error) {
if serverKey == "" {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "serverKey",
Err: ErrRequired,
}
}
if serverURL == nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "serverURL",
Err: ErrNil,
}
}
if versionCode == "" {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "versionCode",
Err: ErrRequired,
}
}
if err := validateIntInRange(numVillages, 0, math.MaxInt); err != nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "numVillages",
Err: err,
}
}
if err := validateIntInRange(numPlayerVillages, 0, math.MaxInt); err != nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "numPlayerVillages",
Err: err,
}
}
if err := validateIntInRange(numBarbarianVillages, 0, math.MaxInt); err != nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "numBarbarianVillages",
Err: err,
}
}
if err := validateIntInRange(numBonusVillages, 0, math.MaxInt); err != nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "numBonusVillages",
Err: err,
}
}
return VillagesSyncedEventPayload{
serverKey: serverKey,
serverURL: serverURL,
versionCode: versionCode,
numVillages: numVillages,
numPlayerVillages: numPlayerVillages,
numBarbarianVillages: numBarbarianVillages,
numBonusVillages: numBonusVillages,
}, nil
}
func NewVillagesSyncedEventPayloadFromVillages(
serverKey string,
serverURL *url.URL,
versionCode string,
villages BaseVillages,
) (VillagesSyncedEventPayload, error) {
numPlayerVillages := 0
numBarbarianVillages := 0
numBonusVillages := 0
for _, v := range villages {
if v.PlayerID() > 0 {
numPlayerVillages++
} else {
numBarbarianVillages++
}
if v.Bonus() > 0 {
numBonusVillages++
}
}
return NewVillagesSyncedEventPayload(
serverKey,
serverURL,
versionCode,
len(villages),
numPlayerVillages,
numBarbarianVillages,
numBonusVillages,
)
}
func (p VillagesSyncedEventPayload) ServerKey() string {
return p.serverKey
}
func (p VillagesSyncedEventPayload) ServerURL() *url.URL {
return p.serverURL
}
func (p VillagesSyncedEventPayload) VersionCode() string {
return p.versionCode
}
func (p VillagesSyncedEventPayload) NumVillages() int {
return p.numVillages
}
func (p VillagesSyncedEventPayload) NumPlayerVillages() int {
return p.numPlayerVillages
}
func (p VillagesSyncedEventPayload) NumBarbarianVillages() int {
return p.numBarbarianVillages
}
func (p VillagesSyncedEventPayload) NumBonusVillages() int {
return p.numBonusVillages
}

View File

@ -0,0 +1,83 @@
package domain_test
import (
"math"
"testing"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewVillagesSyncedEventPayload(t *testing.T) {
t.Parallel()
server := domaintest.NewServer(t)
numVillages := gofakeit.IntRange(0, math.MaxInt)
numPlayerVillages := gofakeit.IntRange(0, math.MaxInt)
numBarbarianVillages := gofakeit.IntRange(0, math.MaxInt)
numBonusVillages := gofakeit.IntRange(0, math.MaxInt)
payload, err := domain.NewVillagesSyncedEventPayload(
server.Key(),
server.URL(),
server.VersionCode(),
numVillages,
numPlayerVillages,
numBarbarianVillages,
numBonusVillages,
)
require.NoError(t, err)
assert.Equal(t, server.Key(), payload.ServerKey())
assert.Equal(t, server.URL(), payload.ServerURL())
assert.Equal(t, server.VersionCode(), payload.VersionCode())
assert.Equal(t, numVillages, payload.NumVillages())
assert.Equal(t, numPlayerVillages, payload.NumPlayerVillages())
assert.Equal(t, numBarbarianVillages, payload.NumBarbarianVillages())
assert.Equal(t, numBonusVillages, payload.NumBonusVillages())
}
func TestNewVillagesSyncedEventPayloadFromVillages(t *testing.T) {
t.Parallel()
server := domaintest.NewServer(t)
villages := domain.BaseVillages{
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = 0
cfg.Bonus = 1
}),
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = 0
cfg.Bonus = 0
}),
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = 0
cfg.Bonus = 0
}),
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = gofakeit.IntRange(1, 10000)
cfg.Bonus = 0
}),
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = gofakeit.IntRange(1, 10000)
cfg.Bonus = 1
}),
}
payload, err := domain.NewVillagesSyncedEventPayloadFromVillages(
server.Key(),
server.URL(),
server.VersionCode(),
villages,
)
require.NoError(t, err)
assert.Equal(t, server.Key(), payload.ServerKey())
assert.Equal(t, server.URL(), payload.ServerURL())
assert.Equal(t, server.VersionCode(), payload.VersionCode())
assert.Equal(t, len(villages), payload.NumVillages()) //nolint:testifylint
assert.Equal(t, 2, payload.NumPlayerVillages())
assert.Equal(t, 2, payload.NumBonusVillages())
assert.Equal(t, 3, payload.NumBarbarianVillages())
}

View File

@ -9,14 +9,15 @@ import (
) )
type ServerWatermillConsumer struct { type ServerWatermillConsumer struct {
svc *app.ServerService svc *app.ServerService
subscriber message.Subscriber subscriber message.Subscriber
logger watermill.LoggerAdapter logger watermill.LoggerAdapter
marshaler watermillmsg.Marshaler marshaler watermillmsg.Marshaler
cmdSyncTopic string cmdSyncTopic string
eventServerSyncedTopic string eventServerSyncedTopic string
eventTribesSyncedTopic string eventTribesSyncedTopic string
eventPlayersSyncedTopic string eventPlayersSyncedTopic string
eventVillagesSyncedTopic string
} }
func NewServerWatermillConsumer( func NewServerWatermillConsumer(
@ -28,16 +29,18 @@ func NewServerWatermillConsumer(
eventServerSyncedTopic string, eventServerSyncedTopic string,
eventTribesSyncedTopic string, eventTribesSyncedTopic string,
eventPlayersSyncedTopic string, eventPlayersSyncedTopic string,
eventVillagesSyncedTopic string,
) *ServerWatermillConsumer { ) *ServerWatermillConsumer {
return &ServerWatermillConsumer{ return &ServerWatermillConsumer{
svc: svc, svc: svc,
subscriber: subscriber, subscriber: subscriber,
logger: logger, logger: logger,
marshaler: marshaler, marshaler: marshaler,
cmdSyncTopic: cmdSyncTopic, cmdSyncTopic: cmdSyncTopic,
eventServerSyncedTopic: eventServerSyncedTopic, eventServerSyncedTopic: eventServerSyncedTopic,
eventTribesSyncedTopic: eventTribesSyncedTopic, eventTribesSyncedTopic: eventTribesSyncedTopic,
eventPlayersSyncedTopic: eventPlayersSyncedTopic, eventPlayersSyncedTopic: eventPlayersSyncedTopic,
eventVillagesSyncedTopic: eventVillagesSyncedTopic,
} }
} }
@ -61,6 +64,12 @@ func (c *ServerWatermillConsumer) Register(router *message.Router) {
c.subscriber, c.subscriber,
c.updateNumPlayers, c.updateNumPlayers,
) )
router.AddNoPublisherHandler(
"ServerConsumer.updateNumVillages",
c.eventVillagesSyncedTopic,
c.subscriber,
c.updateNumVillages,
)
} }
func (c *ServerWatermillConsumer) sync(msg *message.Message) error { func (c *ServerWatermillConsumer) sync(msg *message.Message) error {
@ -156,3 +165,32 @@ func (c *ServerWatermillConsumer) updateNumPlayers(msg *message.Message) error {
return c.svc.UpdateNumPlayers(msg.Context(), payload) return c.svc.UpdateNumPlayers(msg.Context(), payload)
} }
func (c *ServerWatermillConsumer) updateNumVillages(msg *message.Message) error {
var rawPayload watermillmsg.VillagesSyncedEventPayload
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
payload, err := domain.NewVillagesSyncedEventPayload(
rawPayload.ServerKey,
rawPayload.ServerURL,
rawPayload.VersionCode,
rawPayload.NumVillages,
rawPayload.NumPlayerVillages,
rawPayload.NumBarbarianVillages,
rawPayload.NumBonusVillages,
)
if err != nil {
c.logger.Error("couldn't construct domain.VillagesSyncedEventPayload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
return c.svc.UpdateNumVillages(msg.Context(), payload)
}

View File

@ -0,0 +1,13 @@
package watermillmsg
import "net/url"
type VillagesSyncedEventPayload struct {
ServerKey string `json:"serverKey"`
ServerURL *url.URL `json:"serverUrl"`
VersionCode string `json:"versionCode"`
NumVillages int `json:"numVillages"`
NumPlayerVillages int `json:"numPlayerVillages"`
NumBarbarianVillages int `json:"numBarbarianVillages"`
NumBonusVillages int `json:"numBonusVillages"`
}