From c726411f43f3bf8a44c91897b5b6a179519b8a1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20Wysoki=C5=84ski?= Date: Tue, 2 Jan 2024 10:47:02 +0000 Subject: [PATCH] feat: server - update NumVillages & VillageDataSyncedAt (#24) Reviewed-on: https://gitea.dwysokinski.me/twhelp/corev3/pulls/24 --- cmd/twhelp/cmd_consumer.go | 13 +- cmd/twhelp/rabbitmq.go | 10 +- .../adapter/publisher_watermill_village.go | 58 +++++++ internal/adapter/repository_bun_server.go | 12 ++ internal/adapter/repository_server_test.go | 15 ++ internal/app/publishers.go | 4 + internal/app/service_server.go | 38 +++++ internal/app/service_village.go | 19 ++- internal/domain/domaintest/base_village.go | 4 +- internal/domain/server.go | 30 ++++ internal/domain/village_message_payloads.go | 155 ++++++++++++++++++ .../domain/village_message_payloads_test.go | 83 ++++++++++ internal/port/consumer_watermill_server.go | 70 ++++++-- internal/watermillmsg/village.go | 13 ++ 14 files changed, 500 insertions(+), 24 deletions(-) create mode 100644 internal/adapter/publisher_watermill_village.go create mode 100644 internal/domain/village_message_payloads.go create mode 100644 internal/domain/village_message_payloads_test.go create mode 100644 internal/watermillmsg/village.go diff --git a/cmd/twhelp/cmd_consumer.go b/cmd/twhelp/cmd_consumer.go index faf2df6..1afab24 100644 --- a/cmd/twhelp/cmd_consumer.go +++ b/cmd/twhelp/cmd_consumer.go @@ -62,7 +62,8 @@ var cmdConsumer = &cli.Command{ c.String(rmqFlagTopicSyncServersCmd.Name), c.String(rmqFlagTopicServerSyncedEvent.Name), c.String(rmqFlagTopicTribesSyncedEvent.Name), - c.String(rmqFlagTopicPlayerSyncedEvent.Name), + c.String(rmqFlagTopicPlayersSyncedEvent.Name), + c.String(rmqFlagTopicVillagesSyncedEvent.Name), ) consumer.Register(router) @@ -138,7 +139,7 @@ var cmdConsumer = &cli.Command{ playerPublisher := adapter.NewPlayerWatermillPublisher( publisher, marshaler, - c.String(rmqFlagTopicPlayerSyncedEvent.Name), + c.String(rmqFlagTopicPlayersSyncedEvent.Name), ) consumer := port.NewPlayerWatermillConsumer( @@ -177,8 +178,14 @@ var cmdConsumer = &cli.Command{ return err } + villagePublisher := adapter.NewVillageWatermillPublisher( + publisher, + marshaler, + c.String(rmqFlagTopicVillagesSyncedEvent.Name), + ) + consumer := port.NewVillageWatermillConsumer( - app.NewVillageService(adapter.NewVillageBunRepository(db), twSvc), + app.NewVillageService(adapter.NewVillageBunRepository(db), twSvc, villagePublisher), subscriber, logger, marshaler, diff --git a/cmd/twhelp/rabbitmq.go b/cmd/twhelp/rabbitmq.go index 8981f49..fe6d2da 100644 --- a/cmd/twhelp/rabbitmq.go +++ b/cmd/twhelp/rabbitmq.go @@ -31,17 +31,23 @@ var ( Value: "tribes.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"}, } - rmqFlagTopicPlayerSyncedEvent = &cli.StringFlag{ + rmqFlagTopicPlayersSyncedEvent = &cli.StringFlag{ Name: "rabbitmq.topic.playersSyncedEvent", Value: "players.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_PLAYERS_SYNCED_EVENT"}, } + rmqFlagTopicVillagesSyncedEvent = &cli.StringFlag{ + Name: "rabbitmq.topic.villagesSyncedEvent", + Value: "villages.event.synced", + EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"}, + } rmqFlags = []cli.Flag{ rmqFlagConnectionString, rmqFlagTopicSyncServersCmd, rmqFlagTopicServerSyncedEvent, rmqFlagTopicTribesSyncedEvent, - rmqFlagTopicPlayerSyncedEvent, + rmqFlagTopicPlayersSyncedEvent, + rmqFlagTopicVillagesSyncedEvent, } ) diff --git a/internal/adapter/publisher_watermill_village.go b/internal/adapter/publisher_watermill_village.go new file mode 100644 index 0000000..9fd6e58 --- /dev/null +++ b/internal/adapter/publisher_watermill_village.go @@ -0,0 +1,58 @@ +package adapter + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" + "github.com/ThreeDotsLabs/watermill/message" +) + +type VillageWatermillPublisher struct { + marshaler watermillmsg.Marshaler + publisher message.Publisher + eventSyncedTopic string +} + +func NewVillageWatermillPublisher( + publisher message.Publisher, + marshaler watermillmsg.Marshaler, + eventSyncedTopic string, +) *VillageWatermillPublisher { + return &VillageWatermillPublisher{ + publisher: publisher, + marshaler: marshaler, + eventSyncedTopic: eventSyncedTopic, + } +} + +func (pub *VillageWatermillPublisher) EventSynced( + ctx context.Context, + payloads ...domain.VillagesSyncedEventPayload, +) error { + msgs := make([]*message.Message, 0, len(payloads)) + + for _, p := range payloads { + msg, err := pub.marshaler.Marshal(ctx, watermillmsg.VillagesSyncedEventPayload{ + ServerKey: p.ServerKey(), + ServerURL: p.ServerURL(), + VersionCode: p.VersionCode(), + NumVillages: p.NumVillages(), + NumPlayerVillages: p.NumPlayerVillages(), + NumBarbarianVillages: p.NumBarbarianVillages(), + NumBonusVillages: p.NumBonusVillages(), + }) + if err != nil { + return fmt.Errorf("%s: couldn't marshal VillagesSyncedEventPayload: %w", p.ServerKey(), err) + } + + msgs = append(msgs, msg) + } + + if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil { + return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err) + } + + return nil +} diff --git a/internal/adapter/repository_bun_server.go b/internal/adapter/repository_bun_server.go index 7726ccd..c1e362a 100644 --- a/internal/adapter/repository_bun_server.go +++ b/internal/adapter/repository_bun_server.go @@ -140,6 +140,18 @@ func (a updateServerParamsApplier) apply(q *bun.UpdateQuery) *bun.UpdateQuery { q = q.Set("num_villages = ?", numVillages.Value) } + if numPlayerVillages := a.params.NumPlayerVillages(); numPlayerVillages.Valid { + q = q.Set("num_player_villages = ?", numPlayerVillages.Value) + } + + if numBarbarianVillages := a.params.NumBarbarianVillages(); numBarbarianVillages.Valid { + q = q.Set("num_barbarian_villages = ?", numBarbarianVillages.Value) + } + + if numBonusVillages := a.params.NumBonusVillages(); numBonusVillages.Valid { + q = q.Set("num_bonus_villages = ?", numBonusVillages.Value) + } + if villageDataSyncedAt := a.params.VillageDataSyncedAt(); villageDataSyncedAt.Valid { // TODO: rename this column to village_data_synced_at q = q.Set("village_data_updated_at = ?", villageDataSyncedAt.Value) diff --git a/internal/adapter/repository_server_test.go b/internal/adapter/repository_server_test.go index 7565ee3..9594670 100644 --- a/internal/adapter/repository_server_test.go +++ b/internal/adapter/repository_server_test.go @@ -423,6 +423,18 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories Value: gofakeit.IntRange(0, math.MaxInt), Valid: true, })) + require.NoError(t, updateParams.SetNumPlayerVillages(domain.NullInt{ + Value: gofakeit.IntRange(0, math.MaxInt), + Valid: true, + })) + require.NoError(t, updateParams.SetNumBonusVillages(domain.NullInt{ + Value: gofakeit.IntRange(0, math.MaxInt), + Valid: true, + })) + require.NoError(t, updateParams.SetNumBarbarianVillages(domain.NullInt{ + Value: gofakeit.IntRange(0, math.MaxInt), + Valid: true, + })) require.NoError(t, updateParams.SetVillageDataSyncedAt(domain.NullTime{ Value: time.Now(), Valid: true, @@ -459,6 +471,9 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories time.Minute, ) assert.Equal(t, updateParams.NumVillages().Value, serversAfterUpdate[0].NumVillages()) + assert.Equal(t, updateParams.NumPlayerVillages().Value, serversAfterUpdate[0].NumPlayerVillages()) + assert.Equal(t, updateParams.NumBarbarianVillages().Value, serversAfterUpdate[0].NumBarbarianVillages()) + assert.Equal(t, updateParams.NumBonusVillages().Value, serversAfterUpdate[0].NumBonusVillages()) assert.WithinDuration( t, updateParams.VillageDataSyncedAt().Value, diff --git a/internal/app/publishers.go b/internal/app/publishers.go index ee8d272..166e90b 100644 --- a/internal/app/publishers.go +++ b/internal/app/publishers.go @@ -18,3 +18,7 @@ type TribePublisher interface { type PlayerPublisher interface { EventSynced(ctx context.Context, payloads ...domain.PlayersSyncedEventPayload) error } + +type VillagePublisher interface { + EventSynced(ctx context.Context, payloads ...domain.VillagesSyncedEventPayload) error +} diff --git a/internal/app/service_server.go b/internal/app/service_server.go index 6ed00ea..2a2433e 100644 --- a/internal/app/service_server.go +++ b/internal/app/service_server.go @@ -219,3 +219,41 @@ func (svc *ServerService) UpdateNumPlayers(ctx context.Context, payload domain.P return svc.repo.Update(ctx, key, updateParams) } + +func (svc *ServerService) UpdateNumVillages(ctx context.Context, payload domain.VillagesSyncedEventPayload) error { + key := payload.ServerKey() + + var updateParams domain.UpdateServerParams + if err := updateParams.SetNumVillages(domain.NullInt{ + Value: payload.NumVillages(), + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", key, err) + } + if err := updateParams.SetNumPlayerVillages(domain.NullInt{ + Value: payload.NumPlayerVillages(), + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", key, err) + } + if err := updateParams.SetNumBarbarianVillages(domain.NullInt{ + Value: payload.NumBarbarianVillages(), + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", key, err) + } + if err := updateParams.SetNumBonusVillages(domain.NullInt{ + Value: payload.NumBonusVillages(), + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", key, err) + } + if err := updateParams.SetVillageDataSyncedAt(domain.NullTime{ + Value: time.Now(), + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", key, err) + } + + return svc.repo.Update(ctx, key, updateParams) +} diff --git a/internal/app/service_village.go b/internal/app/service_village.go index c63bb56..4b9ca9d 100644 --- a/internal/app/service_village.go +++ b/internal/app/service_village.go @@ -16,10 +16,11 @@ type VillageRepository interface { type VillageService struct { repo VillageRepository twSvc TWService + pub VillagePublisher } -func NewVillageService(repo VillageRepository, twSvc TWService) *VillageService { - return &VillageService{repo: repo, twSvc: twSvc} +func NewVillageService(repo VillageRepository, twSvc TWService, pub VillagePublisher) *VillageService { + return &VillageService{repo: repo, twSvc: twSvc, pub: pub} } func (svc *VillageService) Sync(ctx context.Context, serverSyncedPayload domain.ServerSyncedEventPayload) error { @@ -39,6 +40,20 @@ func (svc *VillageService) Sync(ctx context.Context, serverSyncedPayload domain. return fmt.Errorf("%s: couldn't delete villages: %w", serverKey, err) } + villagesSyncedPayload, err := domain.NewVillagesSyncedEventPayloadFromVillages( + serverKey, + serverURL, + serverSyncedPayload.VersionCode(), + villages, + ) + if err != nil { + return fmt.Errorf("%s: couldn't construct domain.PlayersSyncedEventPayload: %w", serverKey, err) + } + + if err = svc.pub.EventSynced(ctx, villagesSyncedPayload); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + return nil } diff --git a/internal/domain/domaintest/base_village.go b/internal/domain/domaintest/base_village.go index 9b4c7d8..d2d028f 100644 --- a/internal/domain/domaintest/base_village.go +++ b/internal/domain/domaintest/base_village.go @@ -10,6 +10,7 @@ import ( type BaseVillageConfig struct { ID int + Bonus int PlayerID int } @@ -19,6 +20,7 @@ func NewBaseVillage(tb TestingTB, opts ...func(cfg *BaseVillageConfig)) domain.B cfg := &BaseVillageConfig{ ID: RandID(), PlayerID: gofakeit.IntRange(0, 10000), + Bonus: 0, } for _, opt := range opts { @@ -35,7 +37,7 @@ func NewBaseVillage(tb TestingTB, opts ...func(cfg *BaseVillageConfig)) domain.B gofakeit.IntRange(1, 1000), gofakeit.IntRange(1, 1000), gofakeit.LetterN(3), - 0, + cfg.Bonus, cfg.PlayerID, u, ) diff --git a/internal/domain/server.go b/internal/domain/server.go index b593fd5..d1bca21 100644 --- a/internal/domain/server.go +++ b/internal/domain/server.go @@ -282,6 +282,9 @@ type UpdateServerParams struct { numPlayers NullInt playerDataSyncedAt NullTime numVillages NullInt + numPlayerVillages NullInt + numBarbarianVillages NullInt + numBonusVillages NullInt villageDataSyncedAt NullTime ennoblementDataSyncedAt NullTime } @@ -371,6 +374,33 @@ func (params *UpdateServerParams) SetNumVillages(numVillages NullInt) error { return nil } +func (params *UpdateServerParams) NumPlayerVillages() NullInt { + return params.numPlayerVillages +} + +func (params *UpdateServerParams) SetNumPlayerVillages(numPlayerVillages NullInt) error { + params.numPlayerVillages = numPlayerVillages + return nil +} + +func (params *UpdateServerParams) NumBarbarianVillages() NullInt { + return params.numBarbarianVillages +} + +func (params *UpdateServerParams) SetNumBarbarianVillages(numBarbarianVillages NullInt) error { + params.numBarbarianVillages = numBarbarianVillages + return nil +} + +func (params *UpdateServerParams) NumBonusVillages() NullInt { + return params.numBonusVillages +} + +func (params *UpdateServerParams) SetNumBonusVillages(numBonusVillages NullInt) error { + params.numBonusVillages = numBonusVillages + return nil +} + func (params *UpdateServerParams) VillageDataSyncedAt() NullTime { return params.villageDataSyncedAt } diff --git a/internal/domain/village_message_payloads.go b/internal/domain/village_message_payloads.go new file mode 100644 index 0000000..d1f5499 --- /dev/null +++ b/internal/domain/village_message_payloads.go @@ -0,0 +1,155 @@ +package domain + +import ( + "math" + "net/url" +) + +type VillagesSyncedEventPayload struct { + serverKey string + serverURL *url.URL + versionCode string + numVillages int + numPlayerVillages int + numBarbarianVillages int + numBonusVillages int +} + +const villagesSyncedEventPayloadModelName = "VillagesSyncedEventPayload" + +func NewVillagesSyncedEventPayload( + serverKey string, + serverURL *url.URL, + versionCode string, + numVillages int, + numPlayerVillages int, + numBarbarianVillages int, + numBonusVillages int, +) (VillagesSyncedEventPayload, error) { + if serverKey == "" { + return VillagesSyncedEventPayload{}, ValidationError{ + Model: villagesSyncedEventPayloadModelName, + Field: "serverKey", + Err: ErrRequired, + } + } + + if serverURL == nil { + return VillagesSyncedEventPayload{}, ValidationError{ + Model: villagesSyncedEventPayloadModelName, + Field: "serverURL", + Err: ErrNil, + } + } + + if versionCode == "" { + return VillagesSyncedEventPayload{}, ValidationError{ + Model: villagesSyncedEventPayloadModelName, + Field: "versionCode", + Err: ErrRequired, + } + } + + if err := validateIntInRange(numVillages, 0, math.MaxInt); err != nil { + return VillagesSyncedEventPayload{}, ValidationError{ + Model: villagesSyncedEventPayloadModelName, + Field: "numVillages", + Err: err, + } + } + + if err := validateIntInRange(numPlayerVillages, 0, math.MaxInt); err != nil { + return VillagesSyncedEventPayload{}, ValidationError{ + Model: villagesSyncedEventPayloadModelName, + Field: "numPlayerVillages", + Err: err, + } + } + + if err := validateIntInRange(numBarbarianVillages, 0, math.MaxInt); err != nil { + return VillagesSyncedEventPayload{}, ValidationError{ + Model: villagesSyncedEventPayloadModelName, + Field: "numBarbarianVillages", + Err: err, + } + } + + if err := validateIntInRange(numBonusVillages, 0, math.MaxInt); err != nil { + return VillagesSyncedEventPayload{}, ValidationError{ + Model: villagesSyncedEventPayloadModelName, + Field: "numBonusVillages", + Err: err, + } + } + + return VillagesSyncedEventPayload{ + serverKey: serverKey, + serverURL: serverURL, + versionCode: versionCode, + numVillages: numVillages, + numPlayerVillages: numPlayerVillages, + numBarbarianVillages: numBarbarianVillages, + numBonusVillages: numBonusVillages, + }, nil +} + +func NewVillagesSyncedEventPayloadFromVillages( + serverKey string, + serverURL *url.URL, + versionCode string, + villages BaseVillages, +) (VillagesSyncedEventPayload, error) { + numPlayerVillages := 0 + numBarbarianVillages := 0 + numBonusVillages := 0 + + for _, v := range villages { + if v.PlayerID() > 0 { + numPlayerVillages++ + } else { + numBarbarianVillages++ + } + + if v.Bonus() > 0 { + numBonusVillages++ + } + } + + return NewVillagesSyncedEventPayload( + serverKey, + serverURL, + versionCode, + len(villages), + numPlayerVillages, + numBarbarianVillages, + numBonusVillages, + ) +} + +func (p VillagesSyncedEventPayload) ServerKey() string { + return p.serverKey +} + +func (p VillagesSyncedEventPayload) ServerURL() *url.URL { + return p.serverURL +} + +func (p VillagesSyncedEventPayload) VersionCode() string { + return p.versionCode +} + +func (p VillagesSyncedEventPayload) NumVillages() int { + return p.numVillages +} + +func (p VillagesSyncedEventPayload) NumPlayerVillages() int { + return p.numPlayerVillages +} + +func (p VillagesSyncedEventPayload) NumBarbarianVillages() int { + return p.numBarbarianVillages +} + +func (p VillagesSyncedEventPayload) NumBonusVillages() int { + return p.numBonusVillages +} diff --git a/internal/domain/village_message_payloads_test.go b/internal/domain/village_message_payloads_test.go new file mode 100644 index 0000000..e189a79 --- /dev/null +++ b/internal/domain/village_message_payloads_test.go @@ -0,0 +1,83 @@ +package domain_test + +import ( + "math" + "testing" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest" + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewVillagesSyncedEventPayload(t *testing.T) { + t.Parallel() + + server := domaintest.NewServer(t) + numVillages := gofakeit.IntRange(0, math.MaxInt) + numPlayerVillages := gofakeit.IntRange(0, math.MaxInt) + numBarbarianVillages := gofakeit.IntRange(0, math.MaxInt) + numBonusVillages := gofakeit.IntRange(0, math.MaxInt) + + payload, err := domain.NewVillagesSyncedEventPayload( + server.Key(), + server.URL(), + server.VersionCode(), + numVillages, + numPlayerVillages, + numBarbarianVillages, + numBonusVillages, + ) + require.NoError(t, err) + assert.Equal(t, server.Key(), payload.ServerKey()) + assert.Equal(t, server.URL(), payload.ServerURL()) + assert.Equal(t, server.VersionCode(), payload.VersionCode()) + assert.Equal(t, numVillages, payload.NumVillages()) + assert.Equal(t, numPlayerVillages, payload.NumPlayerVillages()) + assert.Equal(t, numBarbarianVillages, payload.NumBarbarianVillages()) + assert.Equal(t, numBonusVillages, payload.NumBonusVillages()) +} + +func TestNewVillagesSyncedEventPayloadFromVillages(t *testing.T) { + t.Parallel() + + server := domaintest.NewServer(t) + villages := domain.BaseVillages{ + domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) { + cfg.PlayerID = 0 + cfg.Bonus = 1 + }), + domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) { + cfg.PlayerID = 0 + cfg.Bonus = 0 + }), + domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) { + cfg.PlayerID = 0 + cfg.Bonus = 0 + }), + domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) { + cfg.PlayerID = gofakeit.IntRange(1, 10000) + cfg.Bonus = 0 + }), + domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) { + cfg.PlayerID = gofakeit.IntRange(1, 10000) + cfg.Bonus = 1 + }), + } + + payload, err := domain.NewVillagesSyncedEventPayloadFromVillages( + server.Key(), + server.URL(), + server.VersionCode(), + villages, + ) + require.NoError(t, err) + assert.Equal(t, server.Key(), payload.ServerKey()) + assert.Equal(t, server.URL(), payload.ServerURL()) + assert.Equal(t, server.VersionCode(), payload.VersionCode()) + assert.Equal(t, len(villages), payload.NumVillages()) //nolint:testifylint + assert.Equal(t, 2, payload.NumPlayerVillages()) + assert.Equal(t, 2, payload.NumBonusVillages()) + assert.Equal(t, 3, payload.NumBarbarianVillages()) +} diff --git a/internal/port/consumer_watermill_server.go b/internal/port/consumer_watermill_server.go index 3f79c83..33067f6 100644 --- a/internal/port/consumer_watermill_server.go +++ b/internal/port/consumer_watermill_server.go @@ -9,14 +9,15 @@ import ( ) type ServerWatermillConsumer struct { - svc *app.ServerService - subscriber message.Subscriber - logger watermill.LoggerAdapter - marshaler watermillmsg.Marshaler - cmdSyncTopic string - eventServerSyncedTopic string - eventTribesSyncedTopic string - eventPlayersSyncedTopic string + svc *app.ServerService + subscriber message.Subscriber + logger watermill.LoggerAdapter + marshaler watermillmsg.Marshaler + cmdSyncTopic string + eventServerSyncedTopic string + eventTribesSyncedTopic string + eventPlayersSyncedTopic string + eventVillagesSyncedTopic string } func NewServerWatermillConsumer( @@ -28,16 +29,18 @@ func NewServerWatermillConsumer( eventServerSyncedTopic string, eventTribesSyncedTopic string, eventPlayersSyncedTopic string, + eventVillagesSyncedTopic string, ) *ServerWatermillConsumer { return &ServerWatermillConsumer{ - svc: svc, - subscriber: subscriber, - logger: logger, - marshaler: marshaler, - cmdSyncTopic: cmdSyncTopic, - eventServerSyncedTopic: eventServerSyncedTopic, - eventTribesSyncedTopic: eventTribesSyncedTopic, - eventPlayersSyncedTopic: eventPlayersSyncedTopic, + svc: svc, + subscriber: subscriber, + logger: logger, + marshaler: marshaler, + cmdSyncTopic: cmdSyncTopic, + eventServerSyncedTopic: eventServerSyncedTopic, + eventTribesSyncedTopic: eventTribesSyncedTopic, + eventPlayersSyncedTopic: eventPlayersSyncedTopic, + eventVillagesSyncedTopic: eventVillagesSyncedTopic, } } @@ -61,6 +64,12 @@ func (c *ServerWatermillConsumer) Register(router *message.Router) { c.subscriber, c.updateNumPlayers, ) + router.AddNoPublisherHandler( + "ServerConsumer.updateNumVillages", + c.eventVillagesSyncedTopic, + c.subscriber, + c.updateNumVillages, + ) } func (c *ServerWatermillConsumer) sync(msg *message.Message) error { @@ -156,3 +165,32 @@ func (c *ServerWatermillConsumer) updateNumPlayers(msg *message.Message) error { return c.svc.UpdateNumPlayers(msg.Context(), payload) } + +func (c *ServerWatermillConsumer) updateNumVillages(msg *message.Message) error { + var rawPayload watermillmsg.VillagesSyncedEventPayload + + if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { + c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + payload, err := domain.NewVillagesSyncedEventPayload( + rawPayload.ServerKey, + rawPayload.ServerURL, + rawPayload.VersionCode, + rawPayload.NumVillages, + rawPayload.NumPlayerVillages, + rawPayload.NumBarbarianVillages, + rawPayload.NumBonusVillages, + ) + if err != nil { + c.logger.Error("couldn't construct domain.VillagesSyncedEventPayload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + return c.svc.UpdateNumVillages(msg.Context(), payload) +} diff --git a/internal/watermillmsg/village.go b/internal/watermillmsg/village.go new file mode 100644 index 0000000..fb9b414 --- /dev/null +++ b/internal/watermillmsg/village.go @@ -0,0 +1,13 @@ +package watermillmsg + +import "net/url" + +type VillagesSyncedEventPayload struct { + ServerKey string `json:"serverKey"` + ServerURL *url.URL `json:"serverUrl"` + VersionCode string `json:"versionCode"` + NumVillages int `json:"numVillages"` + NumPlayerVillages int `json:"numPlayerVillages"` + NumBarbarianVillages int `json:"numBarbarianVillages"` + NumBonusVillages int `json:"numBonusVillages"` +}