parent
4d67bdb30c
commit
1b699d225f
|
@ -107,6 +107,42 @@ var cmdConsumer = &cli.Command{
|
|||
)
|
||||
consumer.Register(router)
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "player",
|
||||
Usage: "Run the worker responsible for consuming player-related messages",
|
||||
Flags: concatSlices(dbFlags, rmqFlags, twSvcFlags),
|
||||
Action: func(c *cli.Context) error {
|
||||
return runConsumer(
|
||||
c,
|
||||
"PlayerConsumer",
|
||||
func(
|
||||
c *cli.Context,
|
||||
router *message.Router,
|
||||
logger watermill.LoggerAdapter,
|
||||
publisher *amqp.Publisher,
|
||||
subscriber *amqp.Subscriber,
|
||||
marshaler watermillmsg.Marshaler,
|
||||
db *bun.DB,
|
||||
) error {
|
||||
twSvc, err := newTWServiceFromFlags(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
consumer := port.NewPlayerWatermillConsumer(
|
||||
app.NewPlayerService(twSvc),
|
||||
subscriber,
|
||||
logger,
|
||||
marshaler,
|
||||
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
||||
)
|
||||
consumer.Register(router)
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
@ -115,7 +151,7 @@ var cmdConsumer = &cli.Command{
|
|||
},
|
||||
}
|
||||
|
||||
type registerHandlersFunc func(
|
||||
type registerConsumerHandlersFunc func(
|
||||
c *cli.Context,
|
||||
router *message.Router,
|
||||
logger watermill.LoggerAdapter,
|
||||
|
@ -126,7 +162,7 @@ type registerHandlersFunc func(
|
|||
) error
|
||||
|
||||
//nolint:gocyclo
|
||||
func runConsumer(c *cli.Context, name string, registerHandlers registerHandlersFunc) error {
|
||||
func runConsumer(c *cli.Context, name string, registerHandlers registerConsumerHandlersFunc) error {
|
||||
ctx, cancel := context.WithCancel(c.Context)
|
||||
defer cancel()
|
||||
|
||||
|
@ -250,6 +286,7 @@ const (
|
|||
watermillRetryMWMInitialInterval = 3 * time.Second
|
||||
watermillRetryMWMMaxInterval = 10 * time.Second
|
||||
watermillRetryMWMIntervalMultiplier = 1.5
|
||||
watermillTimeout = 2 * time.Minute
|
||||
)
|
||||
|
||||
func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) {
|
||||
|
@ -261,6 +298,7 @@ func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error)
|
|||
}
|
||||
|
||||
router.AddMiddleware(middleware.Recoverer)
|
||||
router.AddMiddleware(middleware.Timeout(watermillTimeout))
|
||||
router.AddMiddleware(middleware.Retry{
|
||||
MaxRetries: watermillRetryMWMaxRetries,
|
||||
InitialInterval: watermillRetryMWMInitialInterval,
|
||||
|
|
|
@ -201,3 +201,50 @@ func (t *TWHTTP) convertTribesToDomain(tribes []tw.Tribe) (domain.BaseTribes, er
|
|||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (t *TWHTTP) GetPlayers(ctx context.Context, baseURL *url.URL) (domain.BasePlayers, error) {
|
||||
tribes, err := t.client.GetPlayers(ctx, baseURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return t.convertPlayersToDomain(tribes)
|
||||
}
|
||||
|
||||
func (t *TWHTTP) convertPlayersToDomain(players []tw.Player) (domain.BasePlayers, error) {
|
||||
res := make(domain.BasePlayers, 0, len(players))
|
||||
|
||||
for _, p := range players {
|
||||
od, err := domain.NewOpponentsDefeated(
|
||||
p.OpponentsDefeated.RankAtt,
|
||||
p.OpponentsDefeated.ScoreAtt,
|
||||
p.OpponentsDefeated.RankDef,
|
||||
p.OpponentsDefeated.ScoreDef,
|
||||
p.OpponentsDefeated.RankSup,
|
||||
p.OpponentsDefeated.ScoreSup,
|
||||
p.OpponentsDefeated.RankTotal,
|
||||
p.OpponentsDefeated.ScoreTotal,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't construct domain.OpponentsDefeated: %w", err)
|
||||
}
|
||||
|
||||
converted, err := domain.NewBasePlayer(
|
||||
p.ID,
|
||||
p.Name,
|
||||
p.NumVillages,
|
||||
p.Points,
|
||||
p.Rank,
|
||||
p.TribeID,
|
||||
od,
|
||||
p.ProfileURL,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't construct domain.BasePlayer: %w", err)
|
||||
}
|
||||
|
||||
res = append(res, converted)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
|
27
internal/app/service_player.go
Normal file
27
internal/app/service_player.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
)
|
||||
|
||||
type PlayerService struct {
|
||||
twSvc TWService
|
||||
}
|
||||
|
||||
func NewPlayerService(twSvc TWService) *PlayerService {
|
||||
return &PlayerService{twSvc: twSvc}
|
||||
}
|
||||
|
||||
func (svc *PlayerService) Sync(ctx context.Context, payload domain.ServerSyncedEventPayload) error {
|
||||
tribes, err := svc.twSvc.GetPlayers(ctx, payload.URL())
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: couldn't get players: %w", payload.Key(), err)
|
||||
}
|
||||
|
||||
fmt.Println(payload.URL(), len(tribes))
|
||||
|
||||
return nil
|
||||
}
|
|
@ -13,4 +13,5 @@ type TWService interface {
|
|||
GetUnitInfo(ctx context.Context, baseURL *url.URL) (domain.UnitInfo, error)
|
||||
GetBuildingInfo(ctx context.Context, baseURL *url.URL) (domain.BuildingInfo, error)
|
||||
GetTribes(ctx context.Context, baseURL *url.URL) (domain.BaseTribes, error)
|
||||
GetPlayers(ctx context.Context, baseURL *url.URL) (domain.BasePlayers, error)
|
||||
}
|
||||
|
|
131
internal/domain/base_player.go
Normal file
131
internal/domain/base_player.go
Normal file
|
@ -0,0 +1,131 @@
|
|||
package domain
|
||||
|
||||
import (
|
||||
"math"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type BasePlayer struct {
|
||||
id int
|
||||
name string
|
||||
numVillages int
|
||||
points int
|
||||
rank int
|
||||
tribeID int
|
||||
od OpponentsDefeated
|
||||
profileURL *url.URL
|
||||
}
|
||||
|
||||
const basePlayerModelName = "BasePlayer"
|
||||
|
||||
func NewBasePlayer(
|
||||
id int,
|
||||
name string,
|
||||
numVillages int,
|
||||
points int,
|
||||
rank int,
|
||||
tribeID int,
|
||||
od OpponentsDefeated,
|
||||
profileURL *url.URL,
|
||||
) (BasePlayer, error) {
|
||||
if err := validateIntInRange(id, 1, math.MaxInt); err != nil {
|
||||
return BasePlayer{}, ValidationError{
|
||||
Model: basePlayerModelName,
|
||||
Field: "id",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
if err := validateStringLen(name, playerNameMinLength, playerNameMaxLength); err != nil {
|
||||
return BasePlayer{}, ValidationError{
|
||||
Model: basePlayerModelName,
|
||||
Field: "name",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
if err := validateIntInRange(numVillages, 0, math.MaxInt); err != nil {
|
||||
return BasePlayer{}, ValidationError{
|
||||
Model: basePlayerModelName,
|
||||
Field: "numVillages",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
if err := validateIntInRange(points, 0, math.MaxInt); err != nil {
|
||||
return BasePlayer{}, ValidationError{
|
||||
Model: basePlayerModelName,
|
||||
Field: "points",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
if err := validateIntInRange(rank, 0, math.MaxInt); err != nil {
|
||||
return BasePlayer{}, ValidationError{
|
||||
Model: basePlayerModelName,
|
||||
Field: "rank",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
if err := validateIntInRange(tribeID, 0, math.MaxInt); err != nil {
|
||||
return BasePlayer{}, ValidationError{
|
||||
Model: basePlayerModelName,
|
||||
Field: "tribeID",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
if profileURL == nil {
|
||||
return BasePlayer{}, ValidationError{
|
||||
Model: basePlayerModelName,
|
||||
Field: "profileURL",
|
||||
Err: ErrNil,
|
||||
}
|
||||
}
|
||||
|
||||
return BasePlayer{
|
||||
id: id,
|
||||
name: name,
|
||||
numVillages: numVillages,
|
||||
points: points,
|
||||
rank: rank,
|
||||
tribeID: tribeID,
|
||||
od: od,
|
||||
profileURL: profileURL,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p BasePlayer) ID() int {
|
||||
return p.id
|
||||
}
|
||||
|
||||
func (p BasePlayer) Name() string {
|
||||
return p.name
|
||||
}
|
||||
|
||||
func (p BasePlayer) NumVillages() int {
|
||||
return p.numVillages
|
||||
}
|
||||
|
||||
func (p BasePlayer) Points() int {
|
||||
return p.points
|
||||
}
|
||||
|
||||
func (p BasePlayer) Rank() int {
|
||||
return p.rank
|
||||
}
|
||||
|
||||
func (p BasePlayer) TribeID() int {
|
||||
return p.tribeID
|
||||
}
|
||||
|
||||
func (p BasePlayer) OD() OpponentsDefeated {
|
||||
return p.od
|
||||
}
|
||||
|
||||
func (p BasePlayer) ProfileURL() *url.URL {
|
||||
return p.profileURL
|
||||
}
|
||||
|
||||
type BasePlayers []BasePlayer
|
247
internal/domain/base_player_test.go
Normal file
247
internal/domain/base_player_test.go
Normal file
|
@ -0,0 +1,247 @@
|
|||
package domain_test
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
||||
"github.com/brianvoe/gofakeit/v6"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewBasePlayer(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
validBasePlayer := domaintest.NewBasePlayer(t)
|
||||
|
||||
type args struct {
|
||||
id int
|
||||
name string
|
||||
numVillages int
|
||||
points int
|
||||
rank int
|
||||
tribeID int
|
||||
od domain.OpponentsDefeated
|
||||
profileURL *url.URL
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "OK",
|
||||
args: args{
|
||||
id: validBasePlayer.ID(),
|
||||
name: validBasePlayer.Name(),
|
||||
numVillages: validBasePlayer.NumVillages(),
|
||||
points: validBasePlayer.Points(),
|
||||
rank: validBasePlayer.Rank(),
|
||||
tribeID: validBasePlayer.TribeID(),
|
||||
od: validBasePlayer.OD(),
|
||||
profileURL: validBasePlayer.ProfileURL(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ERR: id < 1",
|
||||
args: args{
|
||||
id: 0,
|
||||
name: validBasePlayer.Name(),
|
||||
numVillages: validBasePlayer.NumVillages(),
|
||||
points: validBasePlayer.Points(),
|
||||
rank: validBasePlayer.Rank(),
|
||||
tribeID: validBasePlayer.TribeID(),
|
||||
od: validBasePlayer.OD(),
|
||||
profileURL: validBasePlayer.ProfileURL(),
|
||||
},
|
||||
expectedErr: domain.ValidationError{
|
||||
Model: "BasePlayer",
|
||||
Field: "id",
|
||||
Err: domain.MinGreaterEqualError{
|
||||
Min: 1,
|
||||
Current: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ERR: len(name) < 1",
|
||||
args: args{
|
||||
id: validBasePlayer.ID(),
|
||||
name: "",
|
||||
numVillages: validBasePlayer.NumVillages(),
|
||||
points: validBasePlayer.Points(),
|
||||
rank: validBasePlayer.Rank(),
|
||||
tribeID: validBasePlayer.TribeID(),
|
||||
od: validBasePlayer.OD(),
|
||||
profileURL: validBasePlayer.ProfileURL(),
|
||||
},
|
||||
expectedErr: domain.ValidationError{
|
||||
Model: "BasePlayer",
|
||||
Field: "name",
|
||||
Err: domain.LenOutOfRangeError{
|
||||
Min: 1,
|
||||
Max: 150,
|
||||
Current: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ERR: len(name) > 150",
|
||||
args: args{
|
||||
id: validBasePlayer.ID(),
|
||||
name: gofakeit.LetterN(151),
|
||||
numVillages: validBasePlayer.NumVillages(),
|
||||
points: validBasePlayer.Points(),
|
||||
rank: validBasePlayer.Rank(),
|
||||
tribeID: validBasePlayer.TribeID(),
|
||||
od: validBasePlayer.OD(),
|
||||
profileURL: validBasePlayer.ProfileURL(),
|
||||
},
|
||||
expectedErr: domain.ValidationError{
|
||||
Model: "BasePlayer",
|
||||
Field: "name",
|
||||
Err: domain.LenOutOfRangeError{
|
||||
Min: 1,
|
||||
Max: 150,
|
||||
Current: 151,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ERR: numVillages < 0",
|
||||
args: args{
|
||||
id: validBasePlayer.ID(),
|
||||
name: validBasePlayer.Name(),
|
||||
numVillages: -1,
|
||||
points: validBasePlayer.Points(),
|
||||
rank: validBasePlayer.Rank(),
|
||||
tribeID: validBasePlayer.TribeID(),
|
||||
od: validBasePlayer.OD(),
|
||||
profileURL: validBasePlayer.ProfileURL(),
|
||||
},
|
||||
expectedErr: domain.ValidationError{
|
||||
Model: "BasePlayer",
|
||||
Field: "numVillages",
|
||||
Err: domain.MinGreaterEqualError{
|
||||
Min: 0,
|
||||
Current: -1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ERR: points < 0",
|
||||
args: args{
|
||||
id: validBasePlayer.ID(),
|
||||
name: validBasePlayer.Name(),
|
||||
numVillages: validBasePlayer.NumVillages(),
|
||||
points: -1,
|
||||
rank: validBasePlayer.Rank(),
|
||||
tribeID: validBasePlayer.TribeID(),
|
||||
od: validBasePlayer.OD(),
|
||||
profileURL: validBasePlayer.ProfileURL(),
|
||||
},
|
||||
expectedErr: domain.ValidationError{
|
||||
Model: "BasePlayer",
|
||||
Field: "points",
|
||||
Err: domain.MinGreaterEqualError{
|
||||
Min: 0,
|
||||
Current: -1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ERR: rank < 0",
|
||||
args: args{
|
||||
id: validBasePlayer.ID(),
|
||||
name: validBasePlayer.Name(),
|
||||
numVillages: validBasePlayer.NumVillages(),
|
||||
points: validBasePlayer.Points(),
|
||||
rank: -1,
|
||||
tribeID: validBasePlayer.TribeID(),
|
||||
od: validBasePlayer.OD(),
|
||||
profileURL: validBasePlayer.ProfileURL(),
|
||||
},
|
||||
expectedErr: domain.ValidationError{
|
||||
Model: "BasePlayer",
|
||||
Field: "rank",
|
||||
Err: domain.MinGreaterEqualError{
|
||||
Min: 0,
|
||||
Current: -1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ERR: tribeID < 0",
|
||||
args: args{
|
||||
id: validBasePlayer.ID(),
|
||||
name: validBasePlayer.Name(),
|
||||
numVillages: validBasePlayer.NumVillages(),
|
||||
points: validBasePlayer.Points(),
|
||||
rank: validBasePlayer.Rank(),
|
||||
tribeID: -1,
|
||||
od: validBasePlayer.OD(),
|
||||
profileURL: validBasePlayer.ProfileURL(),
|
||||
},
|
||||
expectedErr: domain.ValidationError{
|
||||
Model: "BasePlayer",
|
||||
Field: "tribeID",
|
||||
Err: domain.MinGreaterEqualError{
|
||||
Min: 0,
|
||||
Current: -1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ERR: profileURL can't be nil",
|
||||
args: args{
|
||||
id: validBasePlayer.ID(),
|
||||
name: validBasePlayer.Name(),
|
||||
numVillages: validBasePlayer.NumVillages(),
|
||||
points: validBasePlayer.Points(),
|
||||
rank: validBasePlayer.Rank(),
|
||||
tribeID: validBasePlayer.TribeID(),
|
||||
od: validBasePlayer.OD(),
|
||||
profileURL: nil,
|
||||
},
|
||||
expectedErr: domain.ValidationError{
|
||||
Model: "BasePlayer",
|
||||
Field: "profileURL",
|
||||
Err: domain.ErrNil,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
res, err := domain.NewBasePlayer(
|
||||
tt.args.id,
|
||||
tt.args.name,
|
||||
tt.args.numVillages,
|
||||
tt.args.points,
|
||||
tt.args.rank,
|
||||
tt.args.tribeID,
|
||||
tt.args.od,
|
||||
tt.args.profileURL,
|
||||
)
|
||||
require.ErrorIs(t, err, tt.expectedErr)
|
||||
if tt.expectedErr != nil {
|
||||
return
|
||||
}
|
||||
assert.Equal(t, tt.args.id, res.ID())
|
||||
assert.Equal(t, tt.args.name, res.Name())
|
||||
assert.Equal(t, tt.args.numVillages, res.NumVillages())
|
||||
assert.Equal(t, tt.args.points, res.Points())
|
||||
assert.Equal(t, tt.args.rank, res.Rank())
|
||||
assert.Equal(t, tt.args.tribeID, res.TribeID())
|
||||
assert.Equal(t, tt.args.od, res.OD())
|
||||
assert.Equal(t, tt.args.profileURL, res.ProfileURL())
|
||||
})
|
||||
}
|
||||
}
|
52
internal/domain/domaintest/base_player.go
Normal file
52
internal/domain/domaintest/base_player.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package domaintest
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
"github.com/brianvoe/gofakeit/v6"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type BasePlayerConfig struct {
|
||||
ID int
|
||||
OD domain.OpponentsDefeated
|
||||
NumVillages int
|
||||
AllPoints int
|
||||
Rank int
|
||||
TribeID int
|
||||
}
|
||||
|
||||
func NewBasePlayer(tb TestingTB, opts ...func(cfg *BasePlayerConfig)) domain.BasePlayer {
|
||||
tb.Helper()
|
||||
|
||||
cfg := &BasePlayerConfig{
|
||||
ID: RandID(),
|
||||
OD: NewOpponentsDefeated(tb),
|
||||
NumVillages: gofakeit.IntRange(1, 10000),
|
||||
AllPoints: gofakeit.IntRange(1, 10000),
|
||||
Rank: gofakeit.IntRange(1, 10000),
|
||||
TribeID: gofakeit.IntRange(0, 10000),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(cfg)
|
||||
}
|
||||
|
||||
u, err := url.ParseRequestURI(gofakeit.URL())
|
||||
require.NoError(tb, err)
|
||||
|
||||
p, err := domain.NewBasePlayer(
|
||||
cfg.ID,
|
||||
gofakeit.LetterN(50),
|
||||
cfg.NumVillages,
|
||||
cfg.AllPoints,
|
||||
cfg.Rank,
|
||||
cfg.TribeID,
|
||||
cfg.OD,
|
||||
u,
|
||||
)
|
||||
require.NoError(tb, err)
|
||||
|
||||
return p
|
||||
}
|
6
internal/domain/player.go
Normal file
6
internal/domain/player.go
Normal file
|
@ -0,0 +1,6 @@
|
|||
package domain
|
||||
|
||||
const (
|
||||
playerNameMinLength = 1
|
||||
playerNameMaxLength = 150
|
||||
)
|
63
internal/port/consumer_watermill_player.go
Normal file
63
internal/port/consumer_watermill_player.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package port
|
||||
|
||||
import (
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
)
|
||||
|
||||
type PlayerWatermillConsumer struct {
|
||||
svc *app.PlayerService
|
||||
subscriber message.Subscriber
|
||||
logger watermill.LoggerAdapter
|
||||
marshaler watermillmsg.Marshaler
|
||||
eventServerSyncedTopic string
|
||||
}
|
||||
|
||||
func NewPlayerWatermillConsumer(
|
||||
svc *app.PlayerService,
|
||||
subscriber message.Subscriber,
|
||||
logger watermill.LoggerAdapter,
|
||||
marshaler watermillmsg.Marshaler,
|
||||
eventServerSyncedTopic string,
|
||||
) *PlayerWatermillConsumer {
|
||||
return &PlayerWatermillConsumer{
|
||||
svc: svc,
|
||||
subscriber: subscriber,
|
||||
logger: logger,
|
||||
marshaler: marshaler,
|
||||
eventServerSyncedTopic: eventServerSyncedTopic,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *PlayerWatermillConsumer) Register(router *message.Router) {
|
||||
router.AddNoPublisherHandler(
|
||||
"PlayerConsumer.sync",
|
||||
c.eventServerSyncedTopic,
|
||||
c.subscriber,
|
||||
c.sync,
|
||||
)
|
||||
}
|
||||
|
||||
func (c *PlayerWatermillConsumer) sync(msg *message.Message) error {
|
||||
var rawPayload watermillmsg.ServerSyncedEventPayload
|
||||
|
||||
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
||||
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
||||
"handler": message.HandlerNameFromCtx(msg.Context()),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
payload, err := domain.NewServerSyncedEventPayload(rawPayload.Key, rawPayload.URL, rawPayload.VersionCode)
|
||||
if err != nil {
|
||||
c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, watermill.LogFields{
|
||||
"handler": message.HandlerNameFromCtx(msg.Context()),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.svc.Sync(msg.Context(), payload)
|
||||
}
|
|
@ -4,6 +4,7 @@ resources:
|
|||
- jobs.yml
|
||||
- server-consumer.yml
|
||||
- tribe-consumer.yml
|
||||
- player-consumer.yml
|
||||
images:
|
||||
- name: twhelp
|
||||
newName: twhelp
|
||||
|
|
44
k8s/base/player-consumer.yml
Normal file
44
k8s/base/player-consumer.yml
Normal file
|
@ -0,0 +1,44 @@
|
|||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: twhelp-player-consumer-deployment
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: twhelp-player-consumer
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: twhelp-player-consumer
|
||||
spec:
|
||||
containers:
|
||||
- name: twhelp-player-consumer
|
||||
image: twhelp
|
||||
args: [consumer, player]
|
||||
env:
|
||||
- name: APP_MODE
|
||||
value: development
|
||||
- name: LOG_LEVEL
|
||||
value: debug
|
||||
- name: DB_CONNECTION_STRING
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: twhelp-secret
|
||||
key: db-connection-string
|
||||
- name: RABBITMQ_CONNECTION_STRING
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: twhelp-secret
|
||||
key: rabbitmq-connection-string
|
||||
livenessProbe:
|
||||
exec:
|
||||
command: [cat, /tmp/live]
|
||||
initialDelaySeconds: 5
|
||||
periodSeconds: 10
|
||||
resources:
|
||||
requests:
|
||||
cpu: 100m
|
||||
memory: 100Mi
|
||||
limits:
|
||||
cpu: 300m
|
||||
memory: 300Mi
|
Loading…
Reference in New Issue
Block a user