feat: player consumer (#20)

Reviewed-on: twhelp/corev3#20
This commit is contained in:
Dawid Wysokiński 2024-01-01 07:49:47 +00:00
parent 4d67bdb30c
commit 1b699d225f
11 changed files with 659 additions and 2 deletions

View File

@ -107,6 +107,42 @@ var cmdConsumer = &cli.Command{
)
consumer.Register(router)
return nil
},
)
},
},
{
Name: "player",
Usage: "Run the worker responsible for consuming player-related messages",
Flags: concatSlices(dbFlags, rmqFlags, twSvcFlags),
Action: func(c *cli.Context) error {
return runConsumer(
c,
"PlayerConsumer",
func(
c *cli.Context,
router *message.Router,
logger watermill.LoggerAdapter,
publisher *amqp.Publisher,
subscriber *amqp.Subscriber,
marshaler watermillmsg.Marshaler,
db *bun.DB,
) error {
twSvc, err := newTWServiceFromFlags(c)
if err != nil {
return err
}
consumer := port.NewPlayerWatermillConsumer(
app.NewPlayerService(twSvc),
subscriber,
logger,
marshaler,
c.String(rmqFlagTopicServerSyncedEvent.Name),
)
consumer.Register(router)
return nil
},
)
@ -115,7 +151,7 @@ var cmdConsumer = &cli.Command{
},
}
type registerHandlersFunc func(
type registerConsumerHandlersFunc func(
c *cli.Context,
router *message.Router,
logger watermill.LoggerAdapter,
@ -126,7 +162,7 @@ type registerHandlersFunc func(
) error
//nolint:gocyclo
func runConsumer(c *cli.Context, name string, registerHandlers registerHandlersFunc) error {
func runConsumer(c *cli.Context, name string, registerHandlers registerConsumerHandlersFunc) error {
ctx, cancel := context.WithCancel(c.Context)
defer cancel()
@ -250,6 +286,7 @@ const (
watermillRetryMWMInitialInterval = 3 * time.Second
watermillRetryMWMMaxInterval = 10 * time.Second
watermillRetryMWMIntervalMultiplier = 1.5
watermillTimeout = 2 * time.Minute
)
func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) {
@ -261,6 +298,7 @@ func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error)
}
router.AddMiddleware(middleware.Recoverer)
router.AddMiddleware(middleware.Timeout(watermillTimeout))
router.AddMiddleware(middleware.Retry{
MaxRetries: watermillRetryMWMaxRetries,
InitialInterval: watermillRetryMWMInitialInterval,

View File

@ -201,3 +201,50 @@ func (t *TWHTTP) convertTribesToDomain(tribes []tw.Tribe) (domain.BaseTribes, er
return res, nil
}
func (t *TWHTTP) GetPlayers(ctx context.Context, baseURL *url.URL) (domain.BasePlayers, error) {
tribes, err := t.client.GetPlayers(ctx, baseURL)
if err != nil {
return nil, err
}
return t.convertPlayersToDomain(tribes)
}
func (t *TWHTTP) convertPlayersToDomain(players []tw.Player) (domain.BasePlayers, error) {
res := make(domain.BasePlayers, 0, len(players))
for _, p := range players {
od, err := domain.NewOpponentsDefeated(
p.OpponentsDefeated.RankAtt,
p.OpponentsDefeated.ScoreAtt,
p.OpponentsDefeated.RankDef,
p.OpponentsDefeated.ScoreDef,
p.OpponentsDefeated.RankSup,
p.OpponentsDefeated.ScoreSup,
p.OpponentsDefeated.RankTotal,
p.OpponentsDefeated.ScoreTotal,
)
if err != nil {
return nil, fmt.Errorf("couldn't construct domain.OpponentsDefeated: %w", err)
}
converted, err := domain.NewBasePlayer(
p.ID,
p.Name,
p.NumVillages,
p.Points,
p.Rank,
p.TribeID,
od,
p.ProfileURL,
)
if err != nil {
return nil, fmt.Errorf("couldn't construct domain.BasePlayer: %w", err)
}
res = append(res, converted)
}
return res, nil
}

View File

@ -0,0 +1,27 @@
package app
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
)
type PlayerService struct {
twSvc TWService
}
func NewPlayerService(twSvc TWService) *PlayerService {
return &PlayerService{twSvc: twSvc}
}
func (svc *PlayerService) Sync(ctx context.Context, payload domain.ServerSyncedEventPayload) error {
tribes, err := svc.twSvc.GetPlayers(ctx, payload.URL())
if err != nil {
return fmt.Errorf("%s: couldn't get players: %w", payload.Key(), err)
}
fmt.Println(payload.URL(), len(tribes))
return nil
}

View File

@ -13,4 +13,5 @@ type TWService interface {
GetUnitInfo(ctx context.Context, baseURL *url.URL) (domain.UnitInfo, error)
GetBuildingInfo(ctx context.Context, baseURL *url.URL) (domain.BuildingInfo, error)
GetTribes(ctx context.Context, baseURL *url.URL) (domain.BaseTribes, error)
GetPlayers(ctx context.Context, baseURL *url.URL) (domain.BasePlayers, error)
}

View File

@ -0,0 +1,131 @@
package domain
import (
"math"
"net/url"
)
type BasePlayer struct {
id int
name string
numVillages int
points int
rank int
tribeID int
od OpponentsDefeated
profileURL *url.URL
}
const basePlayerModelName = "BasePlayer"
func NewBasePlayer(
id int,
name string,
numVillages int,
points int,
rank int,
tribeID int,
od OpponentsDefeated,
profileURL *url.URL,
) (BasePlayer, error) {
if err := validateIntInRange(id, 1, math.MaxInt); err != nil {
return BasePlayer{}, ValidationError{
Model: basePlayerModelName,
Field: "id",
Err: err,
}
}
if err := validateStringLen(name, playerNameMinLength, playerNameMaxLength); err != nil {
return BasePlayer{}, ValidationError{
Model: basePlayerModelName,
Field: "name",
Err: err,
}
}
if err := validateIntInRange(numVillages, 0, math.MaxInt); err != nil {
return BasePlayer{}, ValidationError{
Model: basePlayerModelName,
Field: "numVillages",
Err: err,
}
}
if err := validateIntInRange(points, 0, math.MaxInt); err != nil {
return BasePlayer{}, ValidationError{
Model: basePlayerModelName,
Field: "points",
Err: err,
}
}
if err := validateIntInRange(rank, 0, math.MaxInt); err != nil {
return BasePlayer{}, ValidationError{
Model: basePlayerModelName,
Field: "rank",
Err: err,
}
}
if err := validateIntInRange(tribeID, 0, math.MaxInt); err != nil {
return BasePlayer{}, ValidationError{
Model: basePlayerModelName,
Field: "tribeID",
Err: err,
}
}
if profileURL == nil {
return BasePlayer{}, ValidationError{
Model: basePlayerModelName,
Field: "profileURL",
Err: ErrNil,
}
}
return BasePlayer{
id: id,
name: name,
numVillages: numVillages,
points: points,
rank: rank,
tribeID: tribeID,
od: od,
profileURL: profileURL,
}, nil
}
func (p BasePlayer) ID() int {
return p.id
}
func (p BasePlayer) Name() string {
return p.name
}
func (p BasePlayer) NumVillages() int {
return p.numVillages
}
func (p BasePlayer) Points() int {
return p.points
}
func (p BasePlayer) Rank() int {
return p.rank
}
func (p BasePlayer) TribeID() int {
return p.tribeID
}
func (p BasePlayer) OD() OpponentsDefeated {
return p.od
}
func (p BasePlayer) ProfileURL() *url.URL {
return p.profileURL
}
type BasePlayers []BasePlayer

View File

@ -0,0 +1,247 @@
package domain_test
import (
"net/url"
"testing"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewBasePlayer(t *testing.T) {
t.Parallel()
validBasePlayer := domaintest.NewBasePlayer(t)
type args struct {
id int
name string
numVillages int
points int
rank int
tribeID int
od domain.OpponentsDefeated
profileURL *url.URL
}
tests := []struct {
name string
args args
expectedErr error
}{
{
name: "OK",
args: args{
id: validBasePlayer.ID(),
name: validBasePlayer.Name(),
numVillages: validBasePlayer.NumVillages(),
points: validBasePlayer.Points(),
rank: validBasePlayer.Rank(),
tribeID: validBasePlayer.TribeID(),
od: validBasePlayer.OD(),
profileURL: validBasePlayer.ProfileURL(),
},
},
{
name: "ERR: id < 1",
args: args{
id: 0,
name: validBasePlayer.Name(),
numVillages: validBasePlayer.NumVillages(),
points: validBasePlayer.Points(),
rank: validBasePlayer.Rank(),
tribeID: validBasePlayer.TribeID(),
od: validBasePlayer.OD(),
profileURL: validBasePlayer.ProfileURL(),
},
expectedErr: domain.ValidationError{
Model: "BasePlayer",
Field: "id",
Err: domain.MinGreaterEqualError{
Min: 1,
Current: 0,
},
},
},
{
name: "ERR: len(name) < 1",
args: args{
id: validBasePlayer.ID(),
name: "",
numVillages: validBasePlayer.NumVillages(),
points: validBasePlayer.Points(),
rank: validBasePlayer.Rank(),
tribeID: validBasePlayer.TribeID(),
od: validBasePlayer.OD(),
profileURL: validBasePlayer.ProfileURL(),
},
expectedErr: domain.ValidationError{
Model: "BasePlayer",
Field: "name",
Err: domain.LenOutOfRangeError{
Min: 1,
Max: 150,
Current: 0,
},
},
},
{
name: "ERR: len(name) > 150",
args: args{
id: validBasePlayer.ID(),
name: gofakeit.LetterN(151),
numVillages: validBasePlayer.NumVillages(),
points: validBasePlayer.Points(),
rank: validBasePlayer.Rank(),
tribeID: validBasePlayer.TribeID(),
od: validBasePlayer.OD(),
profileURL: validBasePlayer.ProfileURL(),
},
expectedErr: domain.ValidationError{
Model: "BasePlayer",
Field: "name",
Err: domain.LenOutOfRangeError{
Min: 1,
Max: 150,
Current: 151,
},
},
},
{
name: "ERR: numVillages < 0",
args: args{
id: validBasePlayer.ID(),
name: validBasePlayer.Name(),
numVillages: -1,
points: validBasePlayer.Points(),
rank: validBasePlayer.Rank(),
tribeID: validBasePlayer.TribeID(),
od: validBasePlayer.OD(),
profileURL: validBasePlayer.ProfileURL(),
},
expectedErr: domain.ValidationError{
Model: "BasePlayer",
Field: "numVillages",
Err: domain.MinGreaterEqualError{
Min: 0,
Current: -1,
},
},
},
{
name: "ERR: points < 0",
args: args{
id: validBasePlayer.ID(),
name: validBasePlayer.Name(),
numVillages: validBasePlayer.NumVillages(),
points: -1,
rank: validBasePlayer.Rank(),
tribeID: validBasePlayer.TribeID(),
od: validBasePlayer.OD(),
profileURL: validBasePlayer.ProfileURL(),
},
expectedErr: domain.ValidationError{
Model: "BasePlayer",
Field: "points",
Err: domain.MinGreaterEqualError{
Min: 0,
Current: -1,
},
},
},
{
name: "ERR: rank < 0",
args: args{
id: validBasePlayer.ID(),
name: validBasePlayer.Name(),
numVillages: validBasePlayer.NumVillages(),
points: validBasePlayer.Points(),
rank: -1,
tribeID: validBasePlayer.TribeID(),
od: validBasePlayer.OD(),
profileURL: validBasePlayer.ProfileURL(),
},
expectedErr: domain.ValidationError{
Model: "BasePlayer",
Field: "rank",
Err: domain.MinGreaterEqualError{
Min: 0,
Current: -1,
},
},
},
{
name: "ERR: tribeID < 0",
args: args{
id: validBasePlayer.ID(),
name: validBasePlayer.Name(),
numVillages: validBasePlayer.NumVillages(),
points: validBasePlayer.Points(),
rank: validBasePlayer.Rank(),
tribeID: -1,
od: validBasePlayer.OD(),
profileURL: validBasePlayer.ProfileURL(),
},
expectedErr: domain.ValidationError{
Model: "BasePlayer",
Field: "tribeID",
Err: domain.MinGreaterEqualError{
Min: 0,
Current: -1,
},
},
},
{
name: "ERR: profileURL can't be nil",
args: args{
id: validBasePlayer.ID(),
name: validBasePlayer.Name(),
numVillages: validBasePlayer.NumVillages(),
points: validBasePlayer.Points(),
rank: validBasePlayer.Rank(),
tribeID: validBasePlayer.TribeID(),
od: validBasePlayer.OD(),
profileURL: nil,
},
expectedErr: domain.ValidationError{
Model: "BasePlayer",
Field: "profileURL",
Err: domain.ErrNil,
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
res, err := domain.NewBasePlayer(
tt.args.id,
tt.args.name,
tt.args.numVillages,
tt.args.points,
tt.args.rank,
tt.args.tribeID,
tt.args.od,
tt.args.profileURL,
)
require.ErrorIs(t, err, tt.expectedErr)
if tt.expectedErr != nil {
return
}
assert.Equal(t, tt.args.id, res.ID())
assert.Equal(t, tt.args.name, res.Name())
assert.Equal(t, tt.args.numVillages, res.NumVillages())
assert.Equal(t, tt.args.points, res.Points())
assert.Equal(t, tt.args.rank, res.Rank())
assert.Equal(t, tt.args.tribeID, res.TribeID())
assert.Equal(t, tt.args.od, res.OD())
assert.Equal(t, tt.args.profileURL, res.ProfileURL())
})
}
}

View File

@ -0,0 +1,52 @@
package domaintest
import (
"net/url"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/require"
)
type BasePlayerConfig struct {
ID int
OD domain.OpponentsDefeated
NumVillages int
AllPoints int
Rank int
TribeID int
}
func NewBasePlayer(tb TestingTB, opts ...func(cfg *BasePlayerConfig)) domain.BasePlayer {
tb.Helper()
cfg := &BasePlayerConfig{
ID: RandID(),
OD: NewOpponentsDefeated(tb),
NumVillages: gofakeit.IntRange(1, 10000),
AllPoints: gofakeit.IntRange(1, 10000),
Rank: gofakeit.IntRange(1, 10000),
TribeID: gofakeit.IntRange(0, 10000),
}
for _, opt := range opts {
opt(cfg)
}
u, err := url.ParseRequestURI(gofakeit.URL())
require.NoError(tb, err)
p, err := domain.NewBasePlayer(
cfg.ID,
gofakeit.LetterN(50),
cfg.NumVillages,
cfg.AllPoints,
cfg.Rank,
cfg.TribeID,
cfg.OD,
u,
)
require.NoError(tb, err)
return p
}

View File

@ -0,0 +1,6 @@
package domain
const (
playerNameMinLength = 1
playerNameMaxLength = 150
)

View File

@ -0,0 +1,63 @@
package port
import (
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)
type PlayerWatermillConsumer struct {
svc *app.PlayerService
subscriber message.Subscriber
logger watermill.LoggerAdapter
marshaler watermillmsg.Marshaler
eventServerSyncedTopic string
}
func NewPlayerWatermillConsumer(
svc *app.PlayerService,
subscriber message.Subscriber,
logger watermill.LoggerAdapter,
marshaler watermillmsg.Marshaler,
eventServerSyncedTopic string,
) *PlayerWatermillConsumer {
return &PlayerWatermillConsumer{
svc: svc,
subscriber: subscriber,
logger: logger,
marshaler: marshaler,
eventServerSyncedTopic: eventServerSyncedTopic,
}
}
func (c *PlayerWatermillConsumer) Register(router *message.Router) {
router.AddNoPublisherHandler(
"PlayerConsumer.sync",
c.eventServerSyncedTopic,
c.subscriber,
c.sync,
)
}
func (c *PlayerWatermillConsumer) sync(msg *message.Message) error {
var rawPayload watermillmsg.ServerSyncedEventPayload
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
payload, err := domain.NewServerSyncedEventPayload(rawPayload.Key, rawPayload.URL, rawPayload.VersionCode)
if err != nil {
c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
return c.svc.Sync(msg.Context(), payload)
}

View File

@ -4,6 +4,7 @@ resources:
- jobs.yml
- server-consumer.yml
- tribe-consumer.yml
- player-consumer.yml
images:
- name: twhelp
newName: twhelp

View File

@ -0,0 +1,44 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: twhelp-player-consumer-deployment
spec:
selector:
matchLabels:
app: twhelp-player-consumer
template:
metadata:
labels:
app: twhelp-player-consumer
spec:
containers:
- name: twhelp-player-consumer
image: twhelp
args: [consumer, player]
env:
- name: APP_MODE
value: development
- name: LOG_LEVEL
value: debug
- name: DB_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: twhelp-secret
key: db-connection-string
- name: RABBITMQ_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: twhelp-secret
key: rabbitmq-connection-string
livenessProbe:
exec:
command: [cat, /tmp/live]
initialDelaySeconds: 5
periodSeconds: 10
resources:
requests:
cpu: 100m
memory: 100Mi
limits:
cpu: 300m
memory: 300Mi