feat: server - update NumVillages & VillageDataSyncedAt (#24)

Reviewed-on: twhelp/corev3#24
This commit is contained in:
Dawid Wysokiński 2024-01-02 10:47:02 +00:00
parent 422fb7dcb9
commit c726411f43
14 changed files with 500 additions and 24 deletions

View File

@ -62,7 +62,8 @@ var cmdConsumer = &cli.Command{
c.String(rmqFlagTopicSyncServersCmd.Name),
c.String(rmqFlagTopicServerSyncedEvent.Name),
c.String(rmqFlagTopicTribesSyncedEvent.Name),
c.String(rmqFlagTopicPlayerSyncedEvent.Name),
c.String(rmqFlagTopicPlayersSyncedEvent.Name),
c.String(rmqFlagTopicVillagesSyncedEvent.Name),
)
consumer.Register(router)
@ -138,7 +139,7 @@ var cmdConsumer = &cli.Command{
playerPublisher := adapter.NewPlayerWatermillPublisher(
publisher,
marshaler,
c.String(rmqFlagTopicPlayerSyncedEvent.Name),
c.String(rmqFlagTopicPlayersSyncedEvent.Name),
)
consumer := port.NewPlayerWatermillConsumer(
@ -177,8 +178,14 @@ var cmdConsumer = &cli.Command{
return err
}
villagePublisher := adapter.NewVillageWatermillPublisher(
publisher,
marshaler,
c.String(rmqFlagTopicVillagesSyncedEvent.Name),
)
consumer := port.NewVillageWatermillConsumer(
app.NewVillageService(adapter.NewVillageBunRepository(db), twSvc),
app.NewVillageService(adapter.NewVillageBunRepository(db), twSvc, villagePublisher),
subscriber,
logger,
marshaler,

View File

@ -31,17 +31,23 @@ var (
Value: "tribes.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"},
}
rmqFlagTopicPlayerSyncedEvent = &cli.StringFlag{
rmqFlagTopicPlayersSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.playersSyncedEvent",
Value: "players.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_PLAYERS_SYNCED_EVENT"},
}
rmqFlagTopicVillagesSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.villagesSyncedEvent",
Value: "villages.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_VILLAGES_SYNCED_EVENT"},
}
rmqFlags = []cli.Flag{
rmqFlagConnectionString,
rmqFlagTopicSyncServersCmd,
rmqFlagTopicServerSyncedEvent,
rmqFlagTopicTribesSyncedEvent,
rmqFlagTopicPlayerSyncedEvent,
rmqFlagTopicPlayersSyncedEvent,
rmqFlagTopicVillagesSyncedEvent,
}
)

View File

@ -0,0 +1,58 @@
package adapter
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill/message"
)
type VillageWatermillPublisher struct {
marshaler watermillmsg.Marshaler
publisher message.Publisher
eventSyncedTopic string
}
func NewVillageWatermillPublisher(
publisher message.Publisher,
marshaler watermillmsg.Marshaler,
eventSyncedTopic string,
) *VillageWatermillPublisher {
return &VillageWatermillPublisher{
publisher: publisher,
marshaler: marshaler,
eventSyncedTopic: eventSyncedTopic,
}
}
func (pub *VillageWatermillPublisher) EventSynced(
ctx context.Context,
payloads ...domain.VillagesSyncedEventPayload,
) error {
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.VillagesSyncedEventPayload{
ServerKey: p.ServerKey(),
ServerURL: p.ServerURL(),
VersionCode: p.VersionCode(),
NumVillages: p.NumVillages(),
NumPlayerVillages: p.NumPlayerVillages(),
NumBarbarianVillages: p.NumBarbarianVillages(),
NumBonusVillages: p.NumBonusVillages(),
})
if err != nil {
return fmt.Errorf("%s: couldn't marshal VillagesSyncedEventPayload: %w", p.ServerKey(), err)
}
msgs = append(msgs, msg)
}
if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil {
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err)
}
return nil
}

View File

@ -140,6 +140,18 @@ func (a updateServerParamsApplier) apply(q *bun.UpdateQuery) *bun.UpdateQuery {
q = q.Set("num_villages = ?", numVillages.Value)
}
if numPlayerVillages := a.params.NumPlayerVillages(); numPlayerVillages.Valid {
q = q.Set("num_player_villages = ?", numPlayerVillages.Value)
}
if numBarbarianVillages := a.params.NumBarbarianVillages(); numBarbarianVillages.Valid {
q = q.Set("num_barbarian_villages = ?", numBarbarianVillages.Value)
}
if numBonusVillages := a.params.NumBonusVillages(); numBonusVillages.Valid {
q = q.Set("num_bonus_villages = ?", numBonusVillages.Value)
}
if villageDataSyncedAt := a.params.VillageDataSyncedAt(); villageDataSyncedAt.Valid {
// TODO: rename this column to village_data_synced_at
q = q.Set("village_data_updated_at = ?", villageDataSyncedAt.Value)

View File

@ -423,6 +423,18 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
Value: gofakeit.IntRange(0, math.MaxInt),
Valid: true,
}))
require.NoError(t, updateParams.SetNumPlayerVillages(domain.NullInt{
Value: gofakeit.IntRange(0, math.MaxInt),
Valid: true,
}))
require.NoError(t, updateParams.SetNumBonusVillages(domain.NullInt{
Value: gofakeit.IntRange(0, math.MaxInt),
Valid: true,
}))
require.NoError(t, updateParams.SetNumBarbarianVillages(domain.NullInt{
Value: gofakeit.IntRange(0, math.MaxInt),
Valid: true,
}))
require.NoError(t, updateParams.SetVillageDataSyncedAt(domain.NullTime{
Value: time.Now(),
Valid: true,
@ -459,6 +471,9 @@ func testServerRepository(t *testing.T, newRepos func(t *testing.T) repositories
time.Minute,
)
assert.Equal(t, updateParams.NumVillages().Value, serversAfterUpdate[0].NumVillages())
assert.Equal(t, updateParams.NumPlayerVillages().Value, serversAfterUpdate[0].NumPlayerVillages())
assert.Equal(t, updateParams.NumBarbarianVillages().Value, serversAfterUpdate[0].NumBarbarianVillages())
assert.Equal(t, updateParams.NumBonusVillages().Value, serversAfterUpdate[0].NumBonusVillages())
assert.WithinDuration(
t,
updateParams.VillageDataSyncedAt().Value,

View File

@ -18,3 +18,7 @@ type TribePublisher interface {
type PlayerPublisher interface {
EventSynced(ctx context.Context, payloads ...domain.PlayersSyncedEventPayload) error
}
type VillagePublisher interface {
EventSynced(ctx context.Context, payloads ...domain.VillagesSyncedEventPayload) error
}

View File

@ -219,3 +219,41 @@ func (svc *ServerService) UpdateNumPlayers(ctx context.Context, payload domain.P
return svc.repo.Update(ctx, key, updateParams)
}
func (svc *ServerService) UpdateNumVillages(ctx context.Context, payload domain.VillagesSyncedEventPayload) error {
key := payload.ServerKey()
var updateParams domain.UpdateServerParams
if err := updateParams.SetNumVillages(domain.NullInt{
Value: payload.NumVillages(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
if err := updateParams.SetNumPlayerVillages(domain.NullInt{
Value: payload.NumPlayerVillages(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
if err := updateParams.SetNumBarbarianVillages(domain.NullInt{
Value: payload.NumBarbarianVillages(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
if err := updateParams.SetNumBonusVillages(domain.NullInt{
Value: payload.NumBonusVillages(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
if err := updateParams.SetVillageDataSyncedAt(domain.NullTime{
Value: time.Now(),
Valid: true,
}); err != nil {
return fmt.Errorf("%s: %w", key, err)
}
return svc.repo.Update(ctx, key, updateParams)
}

View File

@ -16,10 +16,11 @@ type VillageRepository interface {
type VillageService struct {
repo VillageRepository
twSvc TWService
pub VillagePublisher
}
func NewVillageService(repo VillageRepository, twSvc TWService) *VillageService {
return &VillageService{repo: repo, twSvc: twSvc}
func NewVillageService(repo VillageRepository, twSvc TWService, pub VillagePublisher) *VillageService {
return &VillageService{repo: repo, twSvc: twSvc, pub: pub}
}
func (svc *VillageService) Sync(ctx context.Context, serverSyncedPayload domain.ServerSyncedEventPayload) error {
@ -39,6 +40,20 @@ func (svc *VillageService) Sync(ctx context.Context, serverSyncedPayload domain.
return fmt.Errorf("%s: couldn't delete villages: %w", serverKey, err)
}
villagesSyncedPayload, err := domain.NewVillagesSyncedEventPayloadFromVillages(
serverKey,
serverURL,
serverSyncedPayload.VersionCode(),
villages,
)
if err != nil {
return fmt.Errorf("%s: couldn't construct domain.PlayersSyncedEventPayload: %w", serverKey, err)
}
if err = svc.pub.EventSynced(ctx, villagesSyncedPayload); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
return nil
}

View File

@ -10,6 +10,7 @@ import (
type BaseVillageConfig struct {
ID int
Bonus int
PlayerID int
}
@ -19,6 +20,7 @@ func NewBaseVillage(tb TestingTB, opts ...func(cfg *BaseVillageConfig)) domain.B
cfg := &BaseVillageConfig{
ID: RandID(),
PlayerID: gofakeit.IntRange(0, 10000),
Bonus: 0,
}
for _, opt := range opts {
@ -35,7 +37,7 @@ func NewBaseVillage(tb TestingTB, opts ...func(cfg *BaseVillageConfig)) domain.B
gofakeit.IntRange(1, 1000),
gofakeit.IntRange(1, 1000),
gofakeit.LetterN(3),
0,
cfg.Bonus,
cfg.PlayerID,
u,
)

View File

@ -282,6 +282,9 @@ type UpdateServerParams struct {
numPlayers NullInt
playerDataSyncedAt NullTime
numVillages NullInt
numPlayerVillages NullInt
numBarbarianVillages NullInt
numBonusVillages NullInt
villageDataSyncedAt NullTime
ennoblementDataSyncedAt NullTime
}
@ -371,6 +374,33 @@ func (params *UpdateServerParams) SetNumVillages(numVillages NullInt) error {
return nil
}
func (params *UpdateServerParams) NumPlayerVillages() NullInt {
return params.numPlayerVillages
}
func (params *UpdateServerParams) SetNumPlayerVillages(numPlayerVillages NullInt) error {
params.numPlayerVillages = numPlayerVillages
return nil
}
func (params *UpdateServerParams) NumBarbarianVillages() NullInt {
return params.numBarbarianVillages
}
func (params *UpdateServerParams) SetNumBarbarianVillages(numBarbarianVillages NullInt) error {
params.numBarbarianVillages = numBarbarianVillages
return nil
}
func (params *UpdateServerParams) NumBonusVillages() NullInt {
return params.numBonusVillages
}
func (params *UpdateServerParams) SetNumBonusVillages(numBonusVillages NullInt) error {
params.numBonusVillages = numBonusVillages
return nil
}
func (params *UpdateServerParams) VillageDataSyncedAt() NullTime {
return params.villageDataSyncedAt
}

View File

@ -0,0 +1,155 @@
package domain
import (
"math"
"net/url"
)
type VillagesSyncedEventPayload struct {
serverKey string
serverURL *url.URL
versionCode string
numVillages int
numPlayerVillages int
numBarbarianVillages int
numBonusVillages int
}
const villagesSyncedEventPayloadModelName = "VillagesSyncedEventPayload"
func NewVillagesSyncedEventPayload(
serverKey string,
serverURL *url.URL,
versionCode string,
numVillages int,
numPlayerVillages int,
numBarbarianVillages int,
numBonusVillages int,
) (VillagesSyncedEventPayload, error) {
if serverKey == "" {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "serverKey",
Err: ErrRequired,
}
}
if serverURL == nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "serverURL",
Err: ErrNil,
}
}
if versionCode == "" {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "versionCode",
Err: ErrRequired,
}
}
if err := validateIntInRange(numVillages, 0, math.MaxInt); err != nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "numVillages",
Err: err,
}
}
if err := validateIntInRange(numPlayerVillages, 0, math.MaxInt); err != nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "numPlayerVillages",
Err: err,
}
}
if err := validateIntInRange(numBarbarianVillages, 0, math.MaxInt); err != nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "numBarbarianVillages",
Err: err,
}
}
if err := validateIntInRange(numBonusVillages, 0, math.MaxInt); err != nil {
return VillagesSyncedEventPayload{}, ValidationError{
Model: villagesSyncedEventPayloadModelName,
Field: "numBonusVillages",
Err: err,
}
}
return VillagesSyncedEventPayload{
serverKey: serverKey,
serverURL: serverURL,
versionCode: versionCode,
numVillages: numVillages,
numPlayerVillages: numPlayerVillages,
numBarbarianVillages: numBarbarianVillages,
numBonusVillages: numBonusVillages,
}, nil
}
func NewVillagesSyncedEventPayloadFromVillages(
serverKey string,
serverURL *url.URL,
versionCode string,
villages BaseVillages,
) (VillagesSyncedEventPayload, error) {
numPlayerVillages := 0
numBarbarianVillages := 0
numBonusVillages := 0
for _, v := range villages {
if v.PlayerID() > 0 {
numPlayerVillages++
} else {
numBarbarianVillages++
}
if v.Bonus() > 0 {
numBonusVillages++
}
}
return NewVillagesSyncedEventPayload(
serverKey,
serverURL,
versionCode,
len(villages),
numPlayerVillages,
numBarbarianVillages,
numBonusVillages,
)
}
func (p VillagesSyncedEventPayload) ServerKey() string {
return p.serverKey
}
func (p VillagesSyncedEventPayload) ServerURL() *url.URL {
return p.serverURL
}
func (p VillagesSyncedEventPayload) VersionCode() string {
return p.versionCode
}
func (p VillagesSyncedEventPayload) NumVillages() int {
return p.numVillages
}
func (p VillagesSyncedEventPayload) NumPlayerVillages() int {
return p.numPlayerVillages
}
func (p VillagesSyncedEventPayload) NumBarbarianVillages() int {
return p.numBarbarianVillages
}
func (p VillagesSyncedEventPayload) NumBonusVillages() int {
return p.numBonusVillages
}

View File

@ -0,0 +1,83 @@
package domain_test
import (
"math"
"testing"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewVillagesSyncedEventPayload(t *testing.T) {
t.Parallel()
server := domaintest.NewServer(t)
numVillages := gofakeit.IntRange(0, math.MaxInt)
numPlayerVillages := gofakeit.IntRange(0, math.MaxInt)
numBarbarianVillages := gofakeit.IntRange(0, math.MaxInt)
numBonusVillages := gofakeit.IntRange(0, math.MaxInt)
payload, err := domain.NewVillagesSyncedEventPayload(
server.Key(),
server.URL(),
server.VersionCode(),
numVillages,
numPlayerVillages,
numBarbarianVillages,
numBonusVillages,
)
require.NoError(t, err)
assert.Equal(t, server.Key(), payload.ServerKey())
assert.Equal(t, server.URL(), payload.ServerURL())
assert.Equal(t, server.VersionCode(), payload.VersionCode())
assert.Equal(t, numVillages, payload.NumVillages())
assert.Equal(t, numPlayerVillages, payload.NumPlayerVillages())
assert.Equal(t, numBarbarianVillages, payload.NumBarbarianVillages())
assert.Equal(t, numBonusVillages, payload.NumBonusVillages())
}
func TestNewVillagesSyncedEventPayloadFromVillages(t *testing.T) {
t.Parallel()
server := domaintest.NewServer(t)
villages := domain.BaseVillages{
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = 0
cfg.Bonus = 1
}),
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = 0
cfg.Bonus = 0
}),
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = 0
cfg.Bonus = 0
}),
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = gofakeit.IntRange(1, 10000)
cfg.Bonus = 0
}),
domaintest.NewBaseVillage(t, func(cfg *domaintest.BaseVillageConfig) {
cfg.PlayerID = gofakeit.IntRange(1, 10000)
cfg.Bonus = 1
}),
}
payload, err := domain.NewVillagesSyncedEventPayloadFromVillages(
server.Key(),
server.URL(),
server.VersionCode(),
villages,
)
require.NoError(t, err)
assert.Equal(t, server.Key(), payload.ServerKey())
assert.Equal(t, server.URL(), payload.ServerURL())
assert.Equal(t, server.VersionCode(), payload.VersionCode())
assert.Equal(t, len(villages), payload.NumVillages()) //nolint:testifylint
assert.Equal(t, 2, payload.NumPlayerVillages())
assert.Equal(t, 2, payload.NumBonusVillages())
assert.Equal(t, 3, payload.NumBarbarianVillages())
}

View File

@ -9,14 +9,15 @@ import (
)
type ServerWatermillConsumer struct {
svc *app.ServerService
subscriber message.Subscriber
logger watermill.LoggerAdapter
marshaler watermillmsg.Marshaler
cmdSyncTopic string
eventServerSyncedTopic string
eventTribesSyncedTopic string
eventPlayersSyncedTopic string
svc *app.ServerService
subscriber message.Subscriber
logger watermill.LoggerAdapter
marshaler watermillmsg.Marshaler
cmdSyncTopic string
eventServerSyncedTopic string
eventTribesSyncedTopic string
eventPlayersSyncedTopic string
eventVillagesSyncedTopic string
}
func NewServerWatermillConsumer(
@ -28,16 +29,18 @@ func NewServerWatermillConsumer(
eventServerSyncedTopic string,
eventTribesSyncedTopic string,
eventPlayersSyncedTopic string,
eventVillagesSyncedTopic string,
) *ServerWatermillConsumer {
return &ServerWatermillConsumer{
svc: svc,
subscriber: subscriber,
logger: logger,
marshaler: marshaler,
cmdSyncTopic: cmdSyncTopic,
eventServerSyncedTopic: eventServerSyncedTopic,
eventTribesSyncedTopic: eventTribesSyncedTopic,
eventPlayersSyncedTopic: eventPlayersSyncedTopic,
svc: svc,
subscriber: subscriber,
logger: logger,
marshaler: marshaler,
cmdSyncTopic: cmdSyncTopic,
eventServerSyncedTopic: eventServerSyncedTopic,
eventTribesSyncedTopic: eventTribesSyncedTopic,
eventPlayersSyncedTopic: eventPlayersSyncedTopic,
eventVillagesSyncedTopic: eventVillagesSyncedTopic,
}
}
@ -61,6 +64,12 @@ func (c *ServerWatermillConsumer) Register(router *message.Router) {
c.subscriber,
c.updateNumPlayers,
)
router.AddNoPublisherHandler(
"ServerConsumer.updateNumVillages",
c.eventVillagesSyncedTopic,
c.subscriber,
c.updateNumVillages,
)
}
func (c *ServerWatermillConsumer) sync(msg *message.Message) error {
@ -156,3 +165,32 @@ func (c *ServerWatermillConsumer) updateNumPlayers(msg *message.Message) error {
return c.svc.UpdateNumPlayers(msg.Context(), payload)
}
func (c *ServerWatermillConsumer) updateNumVillages(msg *message.Message) error {
var rawPayload watermillmsg.VillagesSyncedEventPayload
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
payload, err := domain.NewVillagesSyncedEventPayload(
rawPayload.ServerKey,
rawPayload.ServerURL,
rawPayload.VersionCode,
rawPayload.NumVillages,
rawPayload.NumPlayerVillages,
rawPayload.NumBarbarianVillages,
rawPayload.NumBonusVillages,
)
if err != nil {
c.logger.Error("couldn't construct domain.VillagesSyncedEventPayload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
return c.svc.UpdateNumVillages(msg.Context(), payload)
}

View File

@ -0,0 +1,13 @@
package watermillmsg
import "net/url"
type VillagesSyncedEventPayload struct {
ServerKey string `json:"serverKey"`
ServerURL *url.URL `json:"serverUrl"`
VersionCode string `json:"versionCode"`
NumVillages int `json:"numVillages"`
NumPlayerVillages int `json:"numPlayerVillages"`
NumBarbarianVillages int `json:"numBarbarianVillages"`
NumBonusVillages int `json:"numBonusVillages"`
}