refactor: move event payloads closer to the msg package (#36)
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: twhelp/core#36
This commit is contained in:
Dawid Wysokiński 2022-08-17 04:23:05 +00:00
parent f948443812
commit e4a593c5a1
19 changed files with 103 additions and 77 deletions

View File

@ -52,9 +52,9 @@ func newUpdateDataCommand() *cli.Command {
return fmt.Errorf("VersionService.List: %w", err)
}
payloads := make([]domain.CmdRefreshServersPayload, 0, len(versions))
payloads := make([]domain.RefreshServersCmdPayload, 0, len(versions))
for _, v := range versions {
payloads = append(payloads, domain.CmdRefreshServersPayload{
payloads = append(payloads, domain.RefreshServersCmdPayload{
Host: v.Host,
VersionCode: v.Code,
})

View File

@ -33,10 +33,3 @@ type ListPlayersParams struct {
Deleted NullBool
Count bool
}
type EventPlayersRefreshedPayload struct {
Key string
URL string
VersionCode string
NumPlayers int64
}

View File

@ -67,14 +67,7 @@ type ListServersParams struct {
Count bool
}
type CmdRefreshServersPayload struct {
type RefreshServersCmdPayload struct {
Host string
VersionCode string
}
type EventServerRefreshedPayload struct {
Key string
URL string
Open bool
VersionCode string
}

View File

@ -36,10 +36,3 @@ type ListTribesParams struct {
Deleted NullBool
Count bool
}
type EventTribesRefreshedPayload struct {
Key string
URL string
VersionCode string
NumTribes int64
}

View File

@ -38,13 +38,3 @@ type RefreshVillagesResult struct {
NumBarbarianVillages int64
NumBonusVillages int64
}
type EventVillagesRefreshedPayload struct {
Key string
URL string
VersionCode string
NumVillages int64
NumPlayerVillages int64
NumBarbarianVillages int64
NumBonusVillages int64
}

View File

@ -4,7 +4,7 @@ import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"github.com/ThreeDotsLabs/watermill/message"
)
@ -47,7 +47,7 @@ func (p *PlayerConsumer) Register(router *message.Router) {
}
func (p *PlayerConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
var payload domain.EventServerRefreshedPayload
var payload model.ServerRefreshedEvPayload
if err := p.marshaler.Unmarshal(msg, &payload); err != nil {
// drop message with invalid payload
return nil, nil
@ -63,7 +63,7 @@ func (p *PlayerConsumer) refresh(msg *message.Message) ([]*message.Message, erro
return nil, fmt.Errorf("PlayerService.Refresh: %w", err)
}
ev, err := p.marshaler.Marshal(domain.EventPlayersRefreshedPayload{
ev, err := p.marshaler.Marshal(model.PlayersRefreshedEvPayload{
Key: payload.Key,
URL: payload.URL,
VersionCode: payload.VersionCode,

View File

@ -5,9 +5,10 @@ import (
"testing"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"github.com/ThreeDotsLabs/watermill/message/subscriber"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/internal/msg"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/mock"
"github.com/ThreeDotsLabs/watermill/message"
@ -33,7 +34,7 @@ func TestPlayerConsumer_refresh(t *testing.T) {
msgs, err := pubSub.Subscribe(context.Background(), "players.event.refreshed")
require.NoError(t, err)
serverRefreshedPayload := domain.EventServerRefreshedPayload{
serverRefreshedPayload := model.ServerRefreshedEvPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
Open: true,
@ -45,7 +46,7 @@ func TestPlayerConsumer_refresh(t *testing.T) {
receivedMsgs, _ := subscriber.BulkRead(msgs, 1, 10*time.Second)
require.Len(t, receivedMsgs, 1)
var playersRefreshedPayload domain.EventPlayersRefreshedPayload
var playersRefreshedPayload model.PlayersRefreshedEvPayload
assert.NoError(t, marshaler.Unmarshal(receivedMsgs[0], &playersRefreshedPayload))
assert.Equal(t, serverRefreshedPayload.Key, playersRefreshedPayload.Key)
assert.Equal(t, serverRefreshedPayload.URL, playersRefreshedPayload.URL)
@ -81,7 +82,7 @@ func TestPlayerConsumer_refresh(t *testing.T) {
msg.NewPlayerConsumer(marshaler, pubSub, pubSub, playerSvc),
)
ev, err := marshaler.Marshal(domain.EventServerRefreshedPayload{Open: false})
ev, err := marshaler.Marshal(model.ServerRefreshedEvPayload{Open: false})
require.NoError(t, err)
require.NoError(t, pubSub.Publish("servers.event.refreshed", ev))

View File

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/ThreeDotsLabs/watermill/message"
)
@ -78,7 +80,7 @@ func (s *ServerConsumer) Register(router *message.Router) {
}
func (s *ServerConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
var payload domain.CmdRefreshServersPayload
var payload model.RefreshServersCmdPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
// drop message with invalid payload
return nil, nil
@ -91,7 +93,7 @@ func (s *ServerConsumer) refresh(msg *message.Message) ([]*message.Message, erro
events := make([]*message.Message, 0, len(servers))
for _, srv := range servers {
ev, err := s.marshaler.Marshal(domain.EventServerRefreshedPayload{
ev, err := s.marshaler.Marshal(model.ServerRefreshedEvPayload{
Key: srv.Key,
URL: srv.URL,
Open: srv.Open,
@ -106,7 +108,7 @@ func (s *ServerConsumer) refresh(msg *message.Message) ([]*message.Message, erro
}
func (s *ServerConsumer) updateInfoAndConfig(msg *message.Message) error {
var payload domain.EventServerRefreshedPayload
var payload model.ServerRefreshedEvPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
// drop message with invalid payload
return nil
@ -125,7 +127,7 @@ func (s *ServerConsumer) updateInfoAndConfig(msg *message.Message) error {
}
func (s *ServerConsumer) updateNumPlayers(msg *message.Message) error {
var payload domain.EventPlayersRefreshedPayload
var payload model.PlayersRefreshedEvPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
return nil
}
@ -138,7 +140,7 @@ func (s *ServerConsumer) updateNumPlayers(msg *message.Message) error {
}
func (s *ServerConsumer) updateNumTribes(msg *message.Message) error {
var payload domain.EventTribesRefreshedPayload
var payload model.TribesRefreshedEvPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
return nil
}
@ -151,7 +153,7 @@ func (s *ServerConsumer) updateNumTribes(msg *message.Message) error {
}
func (s *ServerConsumer) updateNumVillages(msg *message.Message) error {
var payload domain.EventVillagesRefreshedPayload
var payload model.VillagesRefreshedEvPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
return nil
}

View File

@ -6,6 +6,8 @@ import (
"testing"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/subscriber"
@ -56,7 +58,7 @@ func TestServerConsumer_refresh(t *testing.T) {
require.NoError(
t,
msg.NewServerPublisher(pubSub, marshaler).
CmdRefresh(context.Background(), domain.CmdRefreshServersPayload{
CmdRefresh(context.Background(), domain.RefreshServersCmdPayload{
Host: "plemiona.pl",
VersionCode: "pl",
}),
@ -65,12 +67,12 @@ func TestServerConsumer_refresh(t *testing.T) {
receivedMsgs, _ := subscriber.BulkRead(msgs, len(servers), 10*time.Second)
assert.Len(t, receivedMsgs, len(servers))
for _, m := range receivedMsgs {
var received domain.EventServerRefreshedPayload
var received model.ServerRefreshedEvPayload
assert.NoError(t, marshaler.Unmarshal(m, &received))
found := false
for _, srv := range servers {
expected := domain.EventServerRefreshedPayload{
expected := model.ServerRefreshedEvPayload{
Key: srv.Key,
URL: srv.URL,
Open: srv.Open,
@ -118,7 +120,7 @@ func TestServerConsumer_updateInfoAndConfig(t *testing.T) {
msg.NewServerConsumer(marshaler, pubSub, pubSub, serverSvc),
)
payload := domain.EventServerRefreshedPayload{Open: true, Key: "xxx1"}
payload := model.ServerRefreshedEvPayload{Open: true, Key: "xxx1"}
ev, err := marshaler.Marshal(payload)
require.NoError(t, err)
require.NoError(t, pubSub.Publish("servers.event.refreshed", ev))
@ -163,7 +165,7 @@ func TestServerConsumer_updateInfoAndConfig(t *testing.T) {
msg.NewServerConsumer(marshaler, pubSub, pubSub, serverSvc),
)
ev, err := marshaler.Marshal(domain.EventServerRefreshedPayload{Open: false})
ev, err := marshaler.Marshal(model.ServerRefreshedEvPayload{Open: false})
require.NoError(t, err)
require.NoError(t, pubSub.Publish("servers.event.refreshed", ev))
@ -207,7 +209,7 @@ func TestServerConsumer_updateNumPlayers(t *testing.T) {
msg.NewServerConsumer(marshaler, pubSub, pubSub, serverSvc),
)
payload := domain.EventPlayersRefreshedPayload{
payload := model.PlayersRefreshedEvPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
VersionCode: "pl",
@ -259,7 +261,7 @@ func TestServerConsumer_updateNumTribes(t *testing.T) {
msg.NewServerConsumer(marshaler, pubSub, pubSub, serverSvc),
)
payload := domain.EventTribesRefreshedPayload{
payload := model.TribesRefreshedEvPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
VersionCode: "pl",
@ -311,7 +313,7 @@ func TestServerConsumer_updateNumVillages(t *testing.T) {
msg.NewServerConsumer(marshaler, pubSub, pubSub, serverSvc),
)
payload := domain.EventVillagesRefreshedPayload{
payload := model.VillagesRefreshedEvPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
VersionCode: "pl",

View File

@ -4,7 +4,8 @@ import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"github.com/ThreeDotsLabs/watermill/message"
)
@ -54,7 +55,7 @@ func (t *TribeConsumer) Register(router *message.Router) {
}
func (t *TribeConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
var payload domain.EventServerRefreshedPayload
var payload model.ServerRefreshedEvPayload
if err := t.marshaler.Unmarshal(msg, &payload); err != nil {
// drop message with invalid payload
return nil, nil
@ -70,7 +71,7 @@ func (t *TribeConsumer) refresh(msg *message.Message) ([]*message.Message, error
return nil, fmt.Errorf("TribeService.Refresh: %w", err)
}
ev, err := t.marshaler.Marshal(domain.EventTribesRefreshedPayload{
ev, err := t.marshaler.Marshal(model.TribesRefreshedEvPayload{
Key: payload.Key,
URL: payload.URL,
VersionCode: payload.VersionCode,
@ -83,7 +84,7 @@ func (t *TribeConsumer) refresh(msg *message.Message) ([]*message.Message, error
}
func (t *TribeConsumer) updateDominance(msg *message.Message) error {
var payload domain.EventVillagesRefreshedPayload
var payload model.VillagesRefreshedEvPayload
if err := t.marshaler.Unmarshal(msg, &payload); err != nil {
// drop message with invalid payload
return nil

View File

@ -5,9 +5,10 @@ import (
"testing"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"github.com/ThreeDotsLabs/watermill/message/subscriber"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/internal/msg"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/mock"
"github.com/ThreeDotsLabs/watermill/message"
@ -33,7 +34,7 @@ func TestTribeConsumer_refresh(t *testing.T) {
msgs, err := pubSub.Subscribe(context.Background(), "tribes.event.refreshed")
require.NoError(t, err)
serverRefreshedPayload := domain.EventServerRefreshedPayload{
serverRefreshedPayload := model.ServerRefreshedEvPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
Open: true,
@ -45,7 +46,7 @@ func TestTribeConsumer_refresh(t *testing.T) {
receivedMsgs, _ := subscriber.BulkRead(msgs, 1, 10*time.Second)
require.Len(t, receivedMsgs, 1)
var tribesRefreshedPayload domain.EventTribesRefreshedPayload
var tribesRefreshedPayload model.TribesRefreshedEvPayload
assert.NoError(t, marshaler.Unmarshal(receivedMsgs[0], &tribesRefreshedPayload))
assert.Equal(t, serverRefreshedPayload.Key, tribesRefreshedPayload.Key)
assert.Equal(t, serverRefreshedPayload.URL, tribesRefreshedPayload.URL)
@ -81,7 +82,7 @@ func TestTribeConsumer_refresh(t *testing.T) {
msg.NewTribeConsumer(marshaler, pubSub, pubSub, tribeSvc),
)
ev, err := marshaler.Marshal(domain.EventServerRefreshedPayload{Open: false})
ev, err := marshaler.Marshal(model.ServerRefreshedEvPayload{Open: false})
require.NoError(t, err)
require.NoError(t, pubSub.Publish("servers.event.refreshed", ev))
@ -125,7 +126,7 @@ func TestTribeConsumer_updateDominance(t *testing.T) {
msg.NewTribeConsumer(marshaler, pubSub, pubSub, tribeSvc),
)
payload := domain.EventVillagesRefreshedPayload{
payload := model.VillagesRefreshedEvPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
VersionCode: "pl",

View File

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/ThreeDotsLabs/watermill/message"
@ -47,7 +49,7 @@ func (v *VillageConsumer) Register(router *message.Router) {
}
func (v *VillageConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
var payload domain.EventServerRefreshedPayload
var payload model.ServerRefreshedEvPayload
if err := v.marshaler.Unmarshal(msg, &payload); err != nil {
// drop message with invalid payload
return nil, nil
@ -63,7 +65,7 @@ func (v *VillageConsumer) refresh(msg *message.Message) ([]*message.Message, err
return nil, fmt.Errorf("VillageService.Refresh: %w", err)
}
ev, err := v.marshaler.Marshal(domain.EventVillagesRefreshedPayload{
ev, err := v.marshaler.Marshal(model.VillagesRefreshedEvPayload{
Key: payload.Key,
URL: payload.URL,
VersionCode: payload.VersionCode,

View File

@ -5,6 +5,8 @@ import (
"testing"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"github.com/ThreeDotsLabs/watermill/message/subscriber"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
@ -38,7 +40,7 @@ func TestVillageConsumer_refresh(t *testing.T) {
msgs, err := pubSub.Subscribe(context.Background(), "villages.event.refreshed")
require.NoError(t, err)
serverRefreshedPayload := domain.EventServerRefreshedPayload{
serverRefreshedPayload := model.ServerRefreshedEvPayload{
Key: "pl151",
URL: "https://pl151.plemiona.pl",
Open: true,
@ -50,7 +52,7 @@ func TestVillageConsumer_refresh(t *testing.T) {
receivedMsgs, _ := subscriber.BulkRead(msgs, 1, 10*time.Second)
require.Len(t, receivedMsgs, 1)
var villagesRefreshedPayload domain.EventVillagesRefreshedPayload
var villagesRefreshedPayload model.VillagesRefreshedEvPayload
assert.NoError(t, marshaler.Unmarshal(receivedMsgs[0], &villagesRefreshedPayload))
assert.Equal(t, serverRefreshedPayload.Key, villagesRefreshedPayload.Key)
assert.Equal(t, serverRefreshedPayload.URL, villagesRefreshedPayload.URL)
@ -89,7 +91,7 @@ func TestVillageConsumer_refresh(t *testing.T) {
msg.NewVillageConsumer(marshaler, pubSub, pubSub, villageSvc),
)
ev, err := marshaler.Marshal(domain.EventServerRefreshedPayload{Open: false})
ev, err := marshaler.Marshal(model.ServerRefreshedEvPayload{Open: false})
require.NoError(t, err)
require.NoError(t, pubSub.Publish("servers.event.refreshed", ev))

View File

@ -0,0 +1,8 @@
package model
type PlayersRefreshedEvPayload struct {
Key string
URL string
VersionCode string
NumPlayers int64
}

View File

@ -0,0 +1,13 @@
package model
type RefreshServersCmdPayload struct {
Host string
VersionCode string
}
type ServerRefreshedEvPayload struct {
Key string
URL string
Open bool
VersionCode string
}

View File

@ -0,0 +1,8 @@
package model
type TribesRefreshedEvPayload struct {
Key string
URL string
VersionCode string
NumTribes int64
}

View File

@ -0,0 +1,11 @@
package model
type VillagesRefreshedEvPayload struct {
Key string
URL string
VersionCode string
NumVillages int64
NumPlayerVillages int64
NumBarbarianVillages int64
NumBonusVillages int64
}

View File

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/ThreeDotsLabs/watermill/message"
@ -18,10 +20,13 @@ func NewServerPublisher(publisher message.Publisher, marshaler CommandEventMarsh
return &ServerPublisher{publisher: publisher, marshaler: marshaler}
}
func (s *ServerPublisher) CmdRefresh(_ context.Context, payloads ...domain.CmdRefreshServersPayload) error {
func (s *ServerPublisher) CmdRefresh(_ context.Context, payloads ...domain.RefreshServersCmdPayload) error {
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := s.marshaler.Marshal(p)
msg, err := s.marshaler.Marshal(model.RefreshServersCmdPayload{
Host: p.Host,
VersionCode: p.VersionCode,
})
if err != nil {
return fmt.Errorf("marshaler.Marshal: %w", err)
}

View File

@ -2,10 +2,11 @@ package msg_test
import (
"context"
"reflect"
"testing"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"github.com/stretchr/testify/assert"
"github.com/ThreeDotsLabs/watermill/message/subscriber"
@ -23,7 +24,7 @@ func TestServerPublisher_CmdRefresh(t *testing.T) {
marshaler := msg.GobMarshaler{}
pubSub := newPubSub(t)
payloads := []domain.CmdRefreshServersPayload{
payloads := []domain.RefreshServersCmdPayload{
{
Host: "host1.com",
VersionCode: "vc1",
@ -40,14 +41,14 @@ func TestServerPublisher_CmdRefresh(t *testing.T) {
require.NoError(t, msg.NewServerPublisher(pubSub, marshaler).CmdRefresh(context.Background(), payloads...))
receivedMsgs, _ := subscriber.BulkRead(msgs, len(payloads), time.Second)
assert.Len(t, receivedMsgs, len(payloads))
require.Len(t, receivedMsgs, len(payloads))
for _, m := range receivedMsgs {
var received domain.CmdRefreshServersPayload
var received model.RefreshServersCmdPayload
assert.NoError(t, marshaler.Unmarshal(m, &received))
found := false
for _, ev := range payloads {
if reflect.DeepEqual(ev, received) {
for _, payload := range payloads {
if payload.VersionCode == received.VersionCode && payload.Host == received.Host {
found = true
break
}