feat(ServerConsumer): add 3 new handlers (#34)
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: twhelp/core#34
This commit is contained in:
Dawid Wysokiński 2022-08-16 04:48:55 +00:00
parent 631c080376
commit 2a8bebd68e
8 changed files with 377 additions and 4 deletions

View File

@ -140,5 +140,29 @@ func (u updateServerParamsApplier) Apply(q *bun.UpdateQuery) *bun.UpdateQuery {
q = q.Set("building_info = ?", model.NewBuildingInfo(u.params.BuildingInfo.Info))
}
if u.params.NumPlayers.Valid {
q = q.Set("num_players = ?", u.params.NumPlayers.Int64)
}
if !u.params.PlayerDataUpdatedAt.IsZero() {
q = q.Set("player_data_updated_at = ?", u.params.PlayerDataUpdatedAt)
}
if u.params.NumTribes.Valid {
q = q.Set("num_tribes = ?", u.params.NumTribes.Int64)
}
if !u.params.TribeDataUpdatedAt.IsZero() {
q = q.Set("tribe_data_updated_at = ?", u.params.TribeDataUpdatedAt)
}
if u.params.NumVillages.Valid {
q = q.Set("num_villages = ?", u.params.NumVillages.Int64)
}
if !u.params.VillageDataUpdatedAt.IsZero() {
q = q.Set("village_data_updated_at = ?", u.params.VillageDataUpdatedAt)
}
return q
}

View File

@ -3,6 +3,7 @@ package bundb_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/uptrace/bun/dbfixture"
@ -297,6 +298,21 @@ func TestServer_UpdateByKey(t *testing.T) {
},
},
},
NumPlayers: domain.NullInt64{
Int64: 123,
Valid: true,
},
PlayerDataUpdatedAt: time.Now(),
NumTribes: domain.NullInt64{
Int64: 10,
Valid: true,
},
TribeDataUpdatedAt: time.Now().Add(-1 * time.Minute),
NumVillages: domain.NullInt64{
Int64: 12345,
Valid: true,
},
VillageDataUpdatedAt: time.Now().Add(-1 * time.Hour),
}
updatedServer, err := repo.UpdateByKey(context.Background(), server.Key, params)
@ -305,6 +321,12 @@ func TestServer_UpdateByKey(t *testing.T) {
assert.Equal(t, params.Config.Config, updatedServer.Config)
assert.Equal(t, params.UnitInfo.Info, updatedServer.UnitInfo)
assert.Equal(t, params.BuildingInfo.Info, updatedServer.BuildingInfo)
assert.Equal(t, params.NumPlayers.Int64, updatedServer.NumPlayers)
assert.WithinDuration(t, params.PlayerDataUpdatedAt, updatedServer.PlayerDataUpdatedAt, 1*time.Second)
assert.Equal(t, params.NumTribes.Int64, updatedServer.NumTribes)
assert.WithinDuration(t, params.TribeDataUpdatedAt, updatedServer.TribeDataUpdatedAt, 1*time.Second)
assert.Equal(t, params.NumVillages.Int64, updatedServer.NumVillages)
assert.WithinDuration(t, params.VillageDataUpdatedAt, updatedServer.VillageDataUpdatedAt, 1*time.Second)
})
t.Run("ERR: nothing to update", func(t *testing.T) {

View File

@ -8,3 +8,8 @@ type NullBool struct {
Bool bool
Valid bool // Valid is true if Bool is not NULL
}
type NullInt64 struct {
Int64 int64
Valid bool // Valid is true if Int64 is not NULL
}

View File

@ -37,13 +37,27 @@ type CreateServerParams struct {
}
type UpdateServerParams struct {
Config NullServerConfig
BuildingInfo NullBuildingInfo
UnitInfo NullUnitInfo
Config NullServerConfig
BuildingInfo NullBuildingInfo
UnitInfo NullUnitInfo
NumPlayers NullInt64
PlayerDataUpdatedAt time.Time
NumTribes NullInt64
TribeDataUpdatedAt time.Time
NumVillages NullInt64
VillageDataUpdatedAt time.Time
}
func (u UpdateServerParams) IsZero() bool {
return !u.Config.Valid && !u.BuildingInfo.Valid && !u.UnitInfo.Valid
return !u.Config.Valid &&
!u.BuildingInfo.Valid &&
!u.UnitInfo.Valid &&
!u.NumPlayers.Valid &&
u.PlayerDataUpdatedAt.IsZero() &&
!u.NumTribes.Valid &&
u.TribeDataUpdatedAt.IsZero() &&
!u.NumVillages.Valid &&
u.VillageDataUpdatedAt.IsZero()
}
type ListServersParams struct {

View File

@ -2,6 +2,7 @@ package domain_test
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
@ -37,6 +38,48 @@ func TestUpdateServerParams_IsZero(t *testing.T) {
},
output: false,
},
{
name: "OK: NumPlayers",
params: domain.UpdateServerParams{
NumPlayers: domain.NullInt64{Valid: true},
},
output: false,
},
{
name: "OK: PlayerDataUpdatedAt",
params: domain.UpdateServerParams{
PlayerDataUpdatedAt: time.Now(),
},
output: false,
},
{
name: "OK: NumTribes",
params: domain.UpdateServerParams{
NumTribes: domain.NullInt64{Valid: true},
},
output: false,
},
{
name: "OK: TribeDataUpdatedAt",
params: domain.UpdateServerParams{
TribeDataUpdatedAt: time.Now(),
},
output: false,
},
{
name: "OK: NumVillages",
params: domain.UpdateServerParams{
NumVillages: domain.NullInt64{Valid: true},
},
output: false,
},
{
name: "OK: VillageDataUpdatedAt",
params: domain.UpdateServerParams{
VillageDataUpdatedAt: time.Now(),
},
output: false,
},
{
name: "OK: empty struct",
params: domain.UpdateServerParams{},

View File

@ -12,6 +12,9 @@ import (
type ServerService interface {
Refresh(ctx context.Context, host, versionCode string) ([]domain.Server, error)
UpdateInfoAndConfig(ctx context.Context, key, url string) error
UpdateNumPlayers(ctx context.Context, key string, numPlayers int64) error
UpdateNumTribes(ctx context.Context, key string, numTribes int64) error
UpdateNumVillages(ctx context.Context, key string, numVillages int64) error
}
type ServerConsumer struct {
@ -51,6 +54,27 @@ func (s *ServerConsumer) Register(router *message.Router) {
s.subscriber,
s.updateInfoAndConfig,
)
router.AddNoPublisherHandler(
"ServerConsumer.updateNumTribes",
tribesEventRefreshed,
s.subscriber,
s.updateNumTribes,
)
router.AddNoPublisherHandler(
"ServerConsumer.updateNumPlayers",
playersEventRefreshed,
s.subscriber,
s.updateNumPlayers,
)
router.AddNoPublisherHandler(
"ServerConsumer.updateNumVillages",
villagesEventRefreshed,
s.subscriber,
s.updateNumVillages,
)
}
func (s *ServerConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
@ -99,3 +123,42 @@ func (s *ServerConsumer) updateInfoAndConfig(msg *message.Message) error {
return nil
}
func (s *ServerConsumer) updateNumPlayers(msg *message.Message) error {
var payload domain.EventPlayersRefreshedPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
return nil
}
if err := s.svc.UpdateNumPlayers(msg.Context(), payload.Key, payload.NumPlayers); err != nil {
return fmt.Errorf("ServerConsumer.UpdateNumPlayers: %w", err)
}
return nil
}
func (s *ServerConsumer) updateNumTribes(msg *message.Message) error {
var payload domain.EventTribesRefreshedPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
return nil
}
if err := s.svc.UpdateNumTribes(msg.Context(), payload.Key, payload.NumTribes); err != nil {
return fmt.Errorf("ServerConsumer.UpdateNumTribes: %w", err)
}
return nil
}
func (s *ServerConsumer) updateNumVillages(msg *message.Message) error {
var payload domain.EventVillagesRefreshedPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
return nil
}
if err := s.svc.UpdateNumVillages(msg.Context(), payload.Key, payload.NumVillages); err != nil {
return fmt.Errorf("ServerConsumer.UpdateNumVillages: %w", err)
}
return nil
}

View File

@ -176,3 +176,162 @@ func TestServerConsumer_updateInfoAndConfig(t *testing.T) {
assert.Equal(t, 0, serverSvc.UpdateInfoAndConfigCallCount())
})
}
func TestServerConsumer_updateNumPlayers(t *testing.T) {
t.Parallel()
marshaler := msg.GobMarshaler{}
pubSub := newPubSub(t)
serverSvc := &mock.FakeServerService{}
serverSvc.UpdateNumPlayersReturns(nil)
wait := make(chan struct{})
runRouter(
t,
middlewareRegisterer{
h: func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
if message.HandlerNameFromCtx(msg.Context()) != "ServerConsumer.updateNumPlayers" {
return h(msg)
}
result, err := h(msg)
if len(result) == 0 && err == nil {
close(wait)
}
return result, err
}
},
},
msg.NewServerConsumer(marshaler, pubSub, pubSub, serverSvc),
)
payload := domain.EventPlayersRefreshedPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
VersionCode: "pl",
NumPlayers: 12345,
}
ev, err := marshaler.Marshal(payload)
require.NoError(t, err)
require.NoError(t, pubSub.Publish("players.event.refreshed", ev))
select {
case <-wait:
case <-time.After(time.Second):
t.Fatal("timeout")
}
require.Equal(t, 1, serverSvc.UpdateNumPlayersCallCount())
_, key, numPlayers := serverSvc.UpdateNumPlayersArgsForCall(0)
assert.Equal(t, payload.Key, key)
assert.Equal(t, payload.NumPlayers, numPlayers)
}
func TestServerConsumer_updateNumTribes(t *testing.T) {
t.Parallel()
marshaler := msg.GobMarshaler{}
pubSub := newPubSub(t)
serverSvc := &mock.FakeServerService{}
serverSvc.UpdateNumTribesReturns(nil)
wait := make(chan struct{})
runRouter(
t,
middlewareRegisterer{
h: func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
if message.HandlerNameFromCtx(msg.Context()) != "ServerConsumer.updateNumTribes" {
return h(msg)
}
result, err := h(msg)
if len(result) == 0 && err == nil {
close(wait)
}
return result, err
}
},
},
msg.NewServerConsumer(marshaler, pubSub, pubSub, serverSvc),
)
payload := domain.EventTribesRefreshedPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
VersionCode: "pl",
NumTribes: 12345,
}
ev, err := marshaler.Marshal(payload)
require.NoError(t, err)
require.NoError(t, pubSub.Publish("tribes.event.refreshed", ev))
select {
case <-wait:
case <-time.After(time.Second):
t.Fatal("timeout")
}
require.Equal(t, 1, serverSvc.UpdateNumTribesCallCount())
_, key, numTribes := serverSvc.UpdateNumTribesArgsForCall(0)
assert.Equal(t, payload.Key, key)
assert.Equal(t, payload.NumTribes, numTribes)
}
func TestServerConsumer_updateNumVillages(t *testing.T) {
t.Parallel()
marshaler := msg.GobMarshaler{}
pubSub := newPubSub(t)
serverSvc := &mock.FakeServerService{}
serverSvc.UpdateNumVillagesReturns(nil)
wait := make(chan struct{})
runRouter(
t,
middlewareRegisterer{
h: func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
if message.HandlerNameFromCtx(msg.Context()) != "ServerConsumer.updateNumVillages" {
return h(msg)
}
result, err := h(msg)
if len(result) == 0 && err == nil {
close(wait)
}
return result, err
}
},
},
msg.NewServerConsumer(marshaler, pubSub, pubSub, serverSvc),
)
payload := domain.EventVillagesRefreshedPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
VersionCode: "pl",
NumVillages: 12345,
NumPlayerVillages: 12000,
NumBarbarianVillages: 345,
NumBonusVillages: 521,
}
ev, err := marshaler.Marshal(payload)
require.NoError(t, err)
require.NoError(t, pubSub.Publish("villages.event.refreshed", ev))
select {
case <-wait:
case <-time.After(time.Second):
t.Fatal("timeout")
}
require.Equal(t, 1, serverSvc.UpdateNumVillagesCallCount())
_, key, numVillages := serverSvc.UpdateNumVillagesArgsForCall(0)
assert.Equal(t, payload.Key, key)
assert.Equal(t, payload.NumVillages, numVillages)
}

View File

@ -3,6 +3,7 @@ package service
import (
"context"
"fmt"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
)
@ -125,6 +126,48 @@ func (s *Server) UpdateInfoAndConfig(ctx context.Context, key, url string) error
return nil
}
func (s *Server) UpdateNumPlayers(ctx context.Context, key string, numPlayers int64) error {
_, err := s.repo.UpdateByKey(ctx, key, domain.UpdateServerParams{
NumPlayers: domain.NullInt64{
Valid: true,
Int64: numPlayers,
},
PlayerDataUpdatedAt: time.Now(),
})
if err != nil {
return fmt.Errorf("ServerRepository.UpdateByKey: %w", err)
}
return nil
}
func (s *Server) UpdateNumTribes(ctx context.Context, key string, numTribes int64) error {
_, err := s.repo.UpdateByKey(ctx, key, domain.UpdateServerParams{
NumTribes: domain.NullInt64{
Valid: true,
Int64: numTribes,
},
TribeDataUpdatedAt: time.Now(),
})
if err != nil {
return fmt.Errorf("ServerRepository.UpdateByKey: %w", err)
}
return nil
}
func (s *Server) UpdateNumVillages(ctx context.Context, key string, numVillages int64) error {
_, err := s.repo.UpdateByKey(ctx, key, domain.UpdateServerParams{
NumVillages: domain.NullInt64{
Valid: true,
Int64: numVillages,
},
VillageDataUpdatedAt: time.Now(),
})
if err != nil {
return fmt.Errorf("ServerRepository.UpdateByKey: %w", err)
}
return nil
}
func isInOpen(servers []domain.OpenServer, key string) bool {
for _, srv := range servers {
if srv.Key == key {