diff --git a/cmd/twhelp/cmd_consumer.go b/cmd/twhelp/cmd_consumer.go index e59e091..0b8fa1a 100644 --- a/cmd/twhelp/cmd_consumer.go +++ b/cmd/twhelp/cmd_consumer.go @@ -62,6 +62,7 @@ var cmdConsumer = &cli.Command{ c.String(rmqFlagTopicSyncServersCmd.Name), c.String(rmqFlagTopicServerSyncedEvent.Name), c.String(rmqFlagTopicTribesSyncedEvent.Name), + c.String(rmqFlagTopicPlayerSyncedEvent.Name), ) consumer.Register(router) @@ -134,8 +135,14 @@ var cmdConsumer = &cli.Command{ return err } + playerPublisher := adapter.NewPlayerWatermillPublisher( + publisher, + marshaler, + c.String(rmqFlagTopicPlayerSyncedEvent.Name), + ) + consumer := port.NewPlayerWatermillConsumer( - app.NewPlayerService(adapter.NewPlayerBunRepository(db), twSvc), + app.NewPlayerService(adapter.NewPlayerBunRepository(db), twSvc, playerPublisher), subscriber, logger, marshaler, diff --git a/cmd/twhelp/rabbitmq.go b/cmd/twhelp/rabbitmq.go index 12d25cf..8981f49 100644 --- a/cmd/twhelp/rabbitmq.go +++ b/cmd/twhelp/rabbitmq.go @@ -31,11 +31,17 @@ var ( Value: "tribes.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"}, } + rmqFlagTopicPlayerSyncedEvent = &cli.StringFlag{ + Name: "rabbitmq.topic.playersSyncedEvent", + Value: "players.event.synced", + EnvVars: []string{"RABBITMQ_TOPIC_PLAYERS_SYNCED_EVENT"}, + } rmqFlags = []cli.Flag{ rmqFlagConnectionString, rmqFlagTopicSyncServersCmd, rmqFlagTopicServerSyncedEvent, rmqFlagTopicTribesSyncedEvent, + rmqFlagTopicPlayerSyncedEvent, } ) diff --git a/internal/adapter/publisher_watermill_player.go b/internal/adapter/publisher_watermill_player.go new file mode 100644 index 0000000..f36cb2b --- /dev/null +++ b/internal/adapter/publisher_watermill_player.go @@ -0,0 +1,55 @@ +package adapter + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" + "github.com/ThreeDotsLabs/watermill/message" +) + +type PlayerWatermillPublisher struct { + marshaler watermillmsg.Marshaler + publisher message.Publisher + eventSyncedTopic string +} + +func NewPlayerWatermillPublisher( + publisher message.Publisher, + marshaler watermillmsg.Marshaler, + eventSyncedTopic string, +) *PlayerWatermillPublisher { + return &PlayerWatermillPublisher{ + publisher: publisher, + marshaler: marshaler, + eventSyncedTopic: eventSyncedTopic, + } +} + +func (pub *PlayerWatermillPublisher) EventSynced( + ctx context.Context, + payloads ...domain.PlayersSyncedEventPayload, +) error { + msgs := make([]*message.Message, 0, len(payloads)) + + for _, p := range payloads { + msg, err := pub.marshaler.Marshal(ctx, watermillmsg.PlayersSyncedEventPayload{ + ServerKey: p.ServerKey(), + VersionCode: p.VersionCode(), + ServerURL: p.ServerURL(), + NumPlayers: p.NumPlayers(), + }) + if err != nil { + return fmt.Errorf("%s: couldn't marshal PlayersSyncedEventPayload: %w", p.ServerKey(), err) + } + + msgs = append(msgs, msg) + } + + if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil { + return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err) + } + + return nil +} diff --git a/internal/adapter/repository_bun_server.go b/internal/adapter/repository_bun_server.go index 3b9fc02..7726ccd 100644 --- a/internal/adapter/repository_bun_server.go +++ b/internal/adapter/repository_bun_server.go @@ -104,6 +104,7 @@ type updateServerParamsApplier struct { params domain.UpdateServerParams } +//nolint:gocyclo func (a updateServerParamsApplier) apply(q *bun.UpdateQuery) *bun.UpdateQuery { if config := a.params.Config(); config.Valid { q = q.Set("config = ?", bunmodel.NewServerConfig(config.Value)) @@ -126,6 +127,29 @@ func (a updateServerParamsApplier) apply(q *bun.UpdateQuery) *bun.UpdateQuery { q = q.Set("tribe_data_updated_at = ?", tribeDataSyncedAt.Value) } + if numPlayers := a.params.NumPlayers(); numPlayers.Valid { + q = q.Set("num_players = ?", numPlayers.Value) + } + + if playerDataSyncedAt := a.params.PlayerDataSyncedAt(); playerDataSyncedAt.Valid { + // TODO: rename this column to player_data_synced_at + q = q.Set("player_data_updated_at = ?", playerDataSyncedAt.Value) + } + + if numVillages := a.params.NumVillages(); numVillages.Valid { + q = q.Set("num_villages = ?", numVillages.Value) + } + + if villageDataSyncedAt := a.params.VillageDataSyncedAt(); villageDataSyncedAt.Valid { + // TODO: rename this column to village_data_synced_at + q = q.Set("village_data_updated_at = ?", villageDataSyncedAt.Value) + } + + if ennoblementDataSyncedAt := a.params.EnnoblementDataSyncedAt(); ennoblementDataSyncedAt.Valid { + // TODO: rename this column to ennoblement_data_synced_at + q = q.Set("ennoblement_data_updated_at = ?", ennoblementDataSyncedAt.Value) + } + return q } diff --git a/internal/adapter/repository_server_test.go b/internal/adapter/repository_server_test.go index 5499a76..7565ee3 100644 --- a/internal/adapter/repository_server_test.go +++ b/internal/adapter/repository_server_test.go @@ -411,6 +411,26 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories Value: time.Now(), Valid: true, })) + require.NoError(t, updateParams.SetNumPlayers(domain.NullInt{ + Value: gofakeit.IntRange(0, math.MaxInt), + Valid: true, + })) + require.NoError(t, updateParams.SetPlayerDataSyncedAt(domain.NullTime{ + Value: time.Now(), + Valid: true, + })) + require.NoError(t, updateParams.SetNumVillages(domain.NullInt{ + Value: gofakeit.IntRange(0, math.MaxInt), + Valid: true, + })) + require.NoError(t, updateParams.SetVillageDataSyncedAt(domain.NullTime{ + Value: time.Now(), + Valid: true, + })) + require.NoError(t, updateParams.SetEnnoblementDataSyncedAt(domain.NullTime{ + Value: time.Now(), + Valid: true, + })) require.NoError(t, repos.server.Update(ctx, serversBeforeUpdate[0].Key(), updateParams)) @@ -431,6 +451,26 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories serversAfterUpdate[0].TribeDataSyncedAt(), time.Minute, ) + assert.Equal(t, updateParams.NumPlayers().Value, serversAfterUpdate[0].NumPlayers()) + assert.WithinDuration( + t, + updateParams.PlayerDataSyncedAt().Value, + serversAfterUpdate[0].PlayerDataSyncedAt(), + time.Minute, + ) + assert.Equal(t, updateParams.NumVillages().Value, serversAfterUpdate[0].NumVillages()) + assert.WithinDuration( + t, + updateParams.VillageDataSyncedAt().Value, + serversAfterUpdate[0].VillageDataSyncedAt(), + time.Minute, + ) + assert.WithinDuration( + t, + updateParams.EnnoblementDataSyncedAt().Value, + serversAfterUpdate[0].EnnoblementDataSyncedAt(), + time.Minute, + ) }) t.Run("ERR: not found", func(t *testing.T) { diff --git a/internal/app/publishers.go b/internal/app/publishers.go index 0095f00..ee8d272 100644 --- a/internal/app/publishers.go +++ b/internal/app/publishers.go @@ -14,3 +14,7 @@ type ServerPublisher interface { type TribePublisher interface { EventSynced(ctx context.Context, payloads ...domain.TribesSyncedEventPayload) error } + +type PlayerPublisher interface { + EventSynced(ctx context.Context, payloads ...domain.PlayersSyncedEventPayload) error +} diff --git a/internal/app/service_player.go b/internal/app/service_player.go index c4a2e46..91293f5 100644 --- a/internal/app/service_player.go +++ b/internal/app/service_player.go @@ -20,10 +20,11 @@ type PlayerRepository interface { type PlayerService struct { repo PlayerRepository twSvc TWService + pub PlayerPublisher } -func NewPlayerService(repo PlayerRepository, twSvc TWService) *PlayerService { - return &PlayerService{repo: repo, twSvc: twSvc} +func NewPlayerService(repo PlayerRepository, twSvc TWService, pub PlayerPublisher) *PlayerService { + return &PlayerService{repo: repo, twSvc: twSvc, pub: pub} } func (svc *PlayerService) Sync(ctx context.Context, serverSyncedPayload domain.ServerSyncedEventPayload) error { @@ -43,6 +44,20 @@ func (svc *PlayerService) Sync(ctx context.Context, serverSyncedPayload domain.S return fmt.Errorf("%s: couldn't delete players: %w", serverKey, err) } + playersSyncedPayload, err := domain.NewPlayersSyncedEventPayload( + serverKey, + serverURL, + serverSyncedPayload.VersionCode(), + len(players), + ) + if err != nil { + return fmt.Errorf("%s: couldn't construct domain.PlayersSyncedEventPayload: %w", serverKey, err) + } + + if err = svc.pub.EventSynced(ctx, playersSyncedPayload); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + return nil } diff --git a/internal/app/service_server.go b/internal/app/service_server.go index 6c7db27..6ed00ea 100644 --- a/internal/app/service_server.go +++ b/internal/app/service_server.go @@ -199,3 +199,23 @@ func (svc *ServerService) UpdateNumTribes(ctx context.Context, payload domain.Tr return svc.repo.Update(ctx, key, updateParams) } + +func (svc *ServerService) UpdateNumPlayers(ctx context.Context, payload domain.PlayersSyncedEventPayload) error { + key := payload.ServerKey() + + var updateParams domain.UpdateServerParams + if err := updateParams.SetNumPlayers(domain.NullInt{ + Value: payload.NumPlayers(), + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", key, err) + } + if err := updateParams.SetPlayerDataSyncedAt(domain.NullTime{ + Value: time.Now(), + Valid: true, + }); err != nil { + return fmt.Errorf("%s: %w", key, err) + } + + return svc.repo.Update(ctx, key, updateParams) +} diff --git a/internal/domain/player_message_payloads.go b/internal/domain/player_message_payloads.go new file mode 100644 index 0000000..06cf939 --- /dev/null +++ b/internal/domain/player_message_payloads.go @@ -0,0 +1,77 @@ +package domain + +import ( + "math" + "net/url" +) + +type PlayersSyncedEventPayload struct { + serverKey string + serverURL *url.URL + versionCode string + numPlayers int +} + +const playersSyncedEventPayloadModelName = "PlayersSyncedEventPayload" + +func NewPlayersSyncedEventPayload( + serverKey string, + serverURL *url.URL, + versionCode string, + numPlayers int, +) (PlayersSyncedEventPayload, error) { + if serverKey == "" { + return PlayersSyncedEventPayload{}, ValidationError{ + Model: playersSyncedEventPayloadModelName, + Field: "serverKey", + Err: ErrRequired, + } + } + + if serverURL == nil { + return PlayersSyncedEventPayload{}, ValidationError{ + Model: playersSyncedEventPayloadModelName, + Field: "serverURL", + Err: ErrNil, + } + } + + if versionCode == "" { + return PlayersSyncedEventPayload{}, ValidationError{ + Model: playersSyncedEventPayloadModelName, + Field: "versionCode", + Err: ErrRequired, + } + } + + if err := validateIntInRange(numPlayers, 0, math.MaxInt); err != nil { + return PlayersSyncedEventPayload{}, ValidationError{ + Model: playersSyncedEventPayloadModelName, + Field: "numPlayers", + Err: err, + } + } + + return PlayersSyncedEventPayload{ + serverKey: serverKey, + serverURL: serverURL, + versionCode: versionCode, + numPlayers: numPlayers, + }, nil +} + +func (p PlayersSyncedEventPayload) ServerKey() string { + return p.serverKey +} + +func (p PlayersSyncedEventPayload) ServerURL() *url.URL { + return p.serverURL +} + +func (p PlayersSyncedEventPayload) VersionCode() string { + return p.versionCode +} + +func (p PlayersSyncedEventPayload) NumPlayers() int { + return p.numPlayers +} diff --git a/internal/domain/player_message_payloads_test.go b/internal/domain/player_message_payloads_test.go new file mode 100644 index 0000000..edd4f15 --- /dev/null +++ b/internal/domain/player_message_payloads_test.go @@ -0,0 +1,31 @@ +package domain_test + +import ( + "math" + "testing" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest" + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewPlayersSyncedEventPayload(t *testing.T) { + t.Parallel() + + server := domaintest.NewServer(t) + numPlayers := gofakeit.IntRange(0, math.MaxInt) + + payload, err := domain.NewPlayersSyncedEventPayload( + server.Key(), + server.URL(), + server.VersionCode(), + numPlayers, + ) + require.NoError(t, err) + assert.Equal(t, server.Key(), payload.ServerKey()) + assert.Equal(t, server.URL(), payload.ServerURL()) + assert.Equal(t, server.VersionCode(), payload.VersionCode()) + assert.Equal(t, numPlayers, payload.NumPlayers()) +} diff --git a/internal/domain/server.go b/internal/domain/server.go index f4e3fd9..b593fd5 100644 --- a/internal/domain/server.go +++ b/internal/domain/server.go @@ -274,11 +274,16 @@ func (params CreateServerParams) VersionCode() string { } type UpdateServerParams struct { - config NullServerConfig - buildingInfo NullBuildingInfo - unitInfo NullUnitInfo - numTribes NullInt - tribeDataSyncedAt NullTime + config NullServerConfig + buildingInfo NullBuildingInfo + unitInfo NullUnitInfo + numTribes NullInt + tribeDataSyncedAt NullTime + numPlayers NullInt + playerDataSyncedAt NullTime + numVillages NullInt + villageDataSyncedAt NullTime + ennoblementDataSyncedAt NullTime } const updateServerParamsModelName = "UpdateServerParams" @@ -339,12 +344,62 @@ func (params *UpdateServerParams) SetTribeDataSyncedAt(tribeDataSyncedAt NullTim return nil } +func (params *UpdateServerParams) NumPlayers() NullInt { + return params.numPlayers +} + +func (params *UpdateServerParams) SetNumPlayers(numPlayers NullInt) error { + params.numPlayers = numPlayers + return nil +} + +func (params *UpdateServerParams) PlayerDataSyncedAt() NullTime { + return params.playerDataSyncedAt +} + +func (params *UpdateServerParams) SetPlayerDataSyncedAt(playerDataSyncedAt NullTime) error { + params.playerDataSyncedAt = playerDataSyncedAt + return nil +} + +func (params *UpdateServerParams) NumVillages() NullInt { + return params.numVillages +} + +func (params *UpdateServerParams) SetNumVillages(numVillages NullInt) error { + params.numVillages = numVillages + return nil +} + +func (params *UpdateServerParams) VillageDataSyncedAt() NullTime { + return params.villageDataSyncedAt +} + +func (params *UpdateServerParams) SetVillageDataSyncedAt(villageDataSyncedAt NullTime) error { + params.villageDataSyncedAt = villageDataSyncedAt + return nil +} + +func (params *UpdateServerParams) EnnoblementDataSyncedAt() NullTime { + return params.ennoblementDataSyncedAt +} + +func (params *UpdateServerParams) SetEnnoblementDataSyncedAt(ennoblementDataSyncedAt NullTime) error { + params.ennoblementDataSyncedAt = ennoblementDataSyncedAt + return nil +} + func (params *UpdateServerParams) IsZero() bool { return !params.config.Valid && !params.buildingInfo.Valid && !params.unitInfo.Valid && !params.numTribes.Valid && - !params.tribeDataSyncedAt.Valid + !params.tribeDataSyncedAt.Valid && + !params.numPlayers.Valid && + !params.playerDataSyncedAt.Valid && + !params.numVillages.Valid && + !params.villageDataSyncedAt.Valid && + !params.ennoblementDataSyncedAt.Valid } type ServerSort uint8 diff --git a/internal/port/consumer_watermill_server.go b/internal/port/consumer_watermill_server.go index 827083a..3f79c83 100644 --- a/internal/port/consumer_watermill_server.go +++ b/internal/port/consumer_watermill_server.go @@ -9,13 +9,14 @@ import ( ) type ServerWatermillConsumer struct { - svc *app.ServerService - subscriber message.Subscriber - logger watermill.LoggerAdapter - marshaler watermillmsg.Marshaler - cmdSyncTopic string - eventServerSyncedTopic string - eventTribesSyncedTopic string + svc *app.ServerService + subscriber message.Subscriber + logger watermill.LoggerAdapter + marshaler watermillmsg.Marshaler + cmdSyncTopic string + eventServerSyncedTopic string + eventTribesSyncedTopic string + eventPlayersSyncedTopic string } func NewServerWatermillConsumer( @@ -26,15 +27,17 @@ func NewServerWatermillConsumer( cmdSyncTopic string, eventServerSyncedTopic string, eventTribesSyncedTopic string, + eventPlayersSyncedTopic string, ) *ServerWatermillConsumer { return &ServerWatermillConsumer{ - svc: svc, - subscriber: subscriber, - logger: logger, - marshaler: marshaler, - cmdSyncTopic: cmdSyncTopic, - eventServerSyncedTopic: eventServerSyncedTopic, - eventTribesSyncedTopic: eventTribesSyncedTopic, + svc: svc, + subscriber: subscriber, + logger: logger, + marshaler: marshaler, + cmdSyncTopic: cmdSyncTopic, + eventServerSyncedTopic: eventServerSyncedTopic, + eventTribesSyncedTopic: eventTribesSyncedTopic, + eventPlayersSyncedTopic: eventPlayersSyncedTopic, } } @@ -52,6 +55,12 @@ func (c *ServerWatermillConsumer) Register(router *message.Router) { c.subscriber, c.updateNumTribes, ) + router.AddNoPublisherHandler( + "ServerConsumer.updateNumPlayers", + c.eventPlayersSyncedTopic, + c.subscriber, + c.updateNumPlayers, + ) } func (c *ServerWatermillConsumer) sync(msg *message.Message) error { @@ -113,7 +122,7 @@ func (c *ServerWatermillConsumer) updateNumTribes(msg *message.Message) error { rawPayload.NumTribes, ) if err != nil { - c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, watermill.LogFields{ + c.logger.Error("couldn't construct domain.TribesSyncedEventPayload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil @@ -121,3 +130,29 @@ func (c *ServerWatermillConsumer) updateNumTribes(msg *message.Message) error { return c.svc.UpdateNumTribes(msg.Context(), payload) } + +func (c *ServerWatermillConsumer) updateNumPlayers(msg *message.Message) error { + var rawPayload watermillmsg.PlayersSyncedEventPayload + + if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { + c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + payload, err := domain.NewPlayersSyncedEventPayload( + rawPayload.ServerKey, + rawPayload.ServerURL, + rawPayload.VersionCode, + rawPayload.NumPlayers, + ) + if err != nil { + c.logger.Error("couldn't construct domain.PlayersSyncedEventPayload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + return c.svc.UpdateNumPlayers(msg.Context(), payload) +} diff --git a/internal/watermillmsg/player.go b/internal/watermillmsg/player.go new file mode 100644 index 0000000..faf538b --- /dev/null +++ b/internal/watermillmsg/player.go @@ -0,0 +1,10 @@ +package watermillmsg + +import "net/url" + +type PlayersSyncedEventPayload struct { + ServerKey string `json:"serverKey"` + ServerURL *url.URL `json:"serverUrl"` + VersionCode string `json:"versionCode"` + NumPlayers int `json:"numPlayers"` +}