diff --git a/cmd/twhelp/cmd_consumer.go b/cmd/twhelp/cmd_consumer.go index e4a5e2b..6eb118c 100644 --- a/cmd/twhelp/cmd_consumer.go +++ b/cmd/twhelp/cmd_consumer.go @@ -107,6 +107,42 @@ var cmdConsumer = &cli.Command{ ) consumer.Register(router) + return nil + }, + ) + }, + }, + { + Name: "player", + Usage: "Run the worker responsible for consuming player-related messages", + Flags: concatSlices(dbFlags, rmqFlags, twSvcFlags), + Action: func(c *cli.Context) error { + return runConsumer( + c, + "PlayerConsumer", + func( + c *cli.Context, + router *message.Router, + logger watermill.LoggerAdapter, + publisher *amqp.Publisher, + subscriber *amqp.Subscriber, + marshaler watermillmsg.Marshaler, + db *bun.DB, + ) error { + twSvc, err := newTWServiceFromFlags(c) + if err != nil { + return err + } + + consumer := port.NewPlayerWatermillConsumer( + app.NewPlayerService(twSvc), + subscriber, + logger, + marshaler, + c.String(rmqFlagTopicServerSyncedEvent.Name), + ) + consumer.Register(router) + return nil }, ) @@ -115,7 +151,7 @@ var cmdConsumer = &cli.Command{ }, } -type registerHandlersFunc func( +type registerConsumerHandlersFunc func( c *cli.Context, router *message.Router, logger watermill.LoggerAdapter, @@ -126,7 +162,7 @@ type registerHandlersFunc func( ) error //nolint:gocyclo -func runConsumer(c *cli.Context, name string, registerHandlers registerHandlersFunc) error { +func runConsumer(c *cli.Context, name string, registerHandlers registerConsumerHandlersFunc) error { ctx, cancel := context.WithCancel(c.Context) defer cancel() @@ -250,6 +286,7 @@ const ( watermillRetryMWMInitialInterval = 3 * time.Second watermillRetryMWMMaxInterval = 10 * time.Second watermillRetryMWMIntervalMultiplier = 1.5 + watermillTimeout = 2 * time.Minute ) func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) { @@ -261,6 +298,7 @@ func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) } router.AddMiddleware(middleware.Recoverer) + router.AddMiddleware(middleware.Timeout(watermillTimeout)) router.AddMiddleware(middleware.Retry{ MaxRetries: watermillRetryMWMaxRetries, InitialInterval: watermillRetryMWMInitialInterval, diff --git a/internal/adapter/http_tw.go b/internal/adapter/http_tw.go index eb4c351..a5ebc4e 100644 --- a/internal/adapter/http_tw.go +++ b/internal/adapter/http_tw.go @@ -201,3 +201,50 @@ func (t *TWHTTP) convertTribesToDomain(tribes []tw.Tribe) (domain.BaseTribes, er return res, nil } + +func (t *TWHTTP) GetPlayers(ctx context.Context, baseURL *url.URL) (domain.BasePlayers, error) { + tribes, err := t.client.GetPlayers(ctx, baseURL) + if err != nil { + return nil, err + } + + return t.convertPlayersToDomain(tribes) +} + +func (t *TWHTTP) convertPlayersToDomain(players []tw.Player) (domain.BasePlayers, error) { + res := make(domain.BasePlayers, 0, len(players)) + + for _, p := range players { + od, err := domain.NewOpponentsDefeated( + p.OpponentsDefeated.RankAtt, + p.OpponentsDefeated.ScoreAtt, + p.OpponentsDefeated.RankDef, + p.OpponentsDefeated.ScoreDef, + p.OpponentsDefeated.RankSup, + p.OpponentsDefeated.ScoreSup, + p.OpponentsDefeated.RankTotal, + p.OpponentsDefeated.ScoreTotal, + ) + if err != nil { + return nil, fmt.Errorf("couldn't construct domain.OpponentsDefeated: %w", err) + } + + converted, err := domain.NewBasePlayer( + p.ID, + p.Name, + p.NumVillages, + p.Points, + p.Rank, + p.TribeID, + od, + p.ProfileURL, + ) + if err != nil { + return nil, fmt.Errorf("couldn't construct domain.BasePlayer: %w", err) + } + + res = append(res, converted) + } + + return res, nil +} diff --git a/internal/app/service_player.go b/internal/app/service_player.go new file mode 100644 index 0000000..7f234d0 --- /dev/null +++ b/internal/app/service_player.go @@ -0,0 +1,27 @@ +package app + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" +) + +type PlayerService struct { + twSvc TWService +} + +func NewPlayerService(twSvc TWService) *PlayerService { + return &PlayerService{twSvc: twSvc} +} + +func (svc *PlayerService) Sync(ctx context.Context, payload domain.ServerSyncedEventPayload) error { + tribes, err := svc.twSvc.GetPlayers(ctx, payload.URL()) + if err != nil { + return fmt.Errorf("%s: couldn't get players: %w", payload.Key(), err) + } + + fmt.Println(payload.URL(), len(tribes)) + + return nil +} diff --git a/internal/app/service_tw.go b/internal/app/service_tw.go index b0e8d54..a9e243d 100644 --- a/internal/app/service_tw.go +++ b/internal/app/service_tw.go @@ -13,4 +13,5 @@ type TWService interface { GetUnitInfo(ctx context.Context, baseURL *url.URL) (domain.UnitInfo, error) GetBuildingInfo(ctx context.Context, baseURL *url.URL) (domain.BuildingInfo, error) GetTribes(ctx context.Context, baseURL *url.URL) (domain.BaseTribes, error) + GetPlayers(ctx context.Context, baseURL *url.URL) (domain.BasePlayers, error) } diff --git a/internal/domain/base_player.go b/internal/domain/base_player.go new file mode 100644 index 0000000..3329b2c --- /dev/null +++ b/internal/domain/base_player.go @@ -0,0 +1,131 @@ +package domain + +import ( + "math" + "net/url" +) + +type BasePlayer struct { + id int + name string + numVillages int + points int + rank int + tribeID int + od OpponentsDefeated + profileURL *url.URL +} + +const basePlayerModelName = "BasePlayer" + +func NewBasePlayer( + id int, + name string, + numVillages int, + points int, + rank int, + tribeID int, + od OpponentsDefeated, + profileURL *url.URL, +) (BasePlayer, error) { + if err := validateIntInRange(id, 1, math.MaxInt); err != nil { + return BasePlayer{}, ValidationError{ + Model: basePlayerModelName, + Field: "id", + Err: err, + } + } + + if err := validateStringLen(name, playerNameMinLength, playerNameMaxLength); err != nil { + return BasePlayer{}, ValidationError{ + Model: basePlayerModelName, + Field: "name", + Err: err, + } + } + + if err := validateIntInRange(numVillages, 0, math.MaxInt); err != nil { + return BasePlayer{}, ValidationError{ + Model: basePlayerModelName, + Field: "numVillages", + Err: err, + } + } + + if err := validateIntInRange(points, 0, math.MaxInt); err != nil { + return BasePlayer{}, ValidationError{ + Model: basePlayerModelName, + Field: "points", + Err: err, + } + } + + if err := validateIntInRange(rank, 0, math.MaxInt); err != nil { + return BasePlayer{}, ValidationError{ + Model: basePlayerModelName, + Field: "rank", + Err: err, + } + } + + if err := validateIntInRange(tribeID, 0, math.MaxInt); err != nil { + return BasePlayer{}, ValidationError{ + Model: basePlayerModelName, + Field: "tribeID", + Err: err, + } + } + + if profileURL == nil { + return BasePlayer{}, ValidationError{ + Model: basePlayerModelName, + Field: "profileURL", + Err: ErrNil, + } + } + + return BasePlayer{ + id: id, + name: name, + numVillages: numVillages, + points: points, + rank: rank, + tribeID: tribeID, + od: od, + profileURL: profileURL, + }, nil +} + +func (p BasePlayer) ID() int { + return p.id +} + +func (p BasePlayer) Name() string { + return p.name +} + +func (p BasePlayer) NumVillages() int { + return p.numVillages +} + +func (p BasePlayer) Points() int { + return p.points +} + +func (p BasePlayer) Rank() int { + return p.rank +} + +func (p BasePlayer) TribeID() int { + return p.tribeID +} + +func (p BasePlayer) OD() OpponentsDefeated { + return p.od +} + +func (p BasePlayer) ProfileURL() *url.URL { + return p.profileURL +} + +type BasePlayers []BasePlayer diff --git a/internal/domain/base_player_test.go b/internal/domain/base_player_test.go new file mode 100644 index 0000000..1a68459 --- /dev/null +++ b/internal/domain/base_player_test.go @@ -0,0 +1,247 @@ +package domain_test + +import ( + "net/url" + "testing" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest" + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewBasePlayer(t *testing.T) { + t.Parallel() + + validBasePlayer := domaintest.NewBasePlayer(t) + + type args struct { + id int + name string + numVillages int + points int + rank int + tribeID int + od domain.OpponentsDefeated + profileURL *url.URL + } + + tests := []struct { + name string + args args + expectedErr error + }{ + { + name: "OK", + args: args{ + id: validBasePlayer.ID(), + name: validBasePlayer.Name(), + numVillages: validBasePlayer.NumVillages(), + points: validBasePlayer.Points(), + rank: validBasePlayer.Rank(), + tribeID: validBasePlayer.TribeID(), + od: validBasePlayer.OD(), + profileURL: validBasePlayer.ProfileURL(), + }, + }, + { + name: "ERR: id < 1", + args: args{ + id: 0, + name: validBasePlayer.Name(), + numVillages: validBasePlayer.NumVillages(), + points: validBasePlayer.Points(), + rank: validBasePlayer.Rank(), + tribeID: validBasePlayer.TribeID(), + od: validBasePlayer.OD(), + profileURL: validBasePlayer.ProfileURL(), + }, + expectedErr: domain.ValidationError{ + Model: "BasePlayer", + Field: "id", + Err: domain.MinGreaterEqualError{ + Min: 1, + Current: 0, + }, + }, + }, + { + name: "ERR: len(name) < 1", + args: args{ + id: validBasePlayer.ID(), + name: "", + numVillages: validBasePlayer.NumVillages(), + points: validBasePlayer.Points(), + rank: validBasePlayer.Rank(), + tribeID: validBasePlayer.TribeID(), + od: validBasePlayer.OD(), + profileURL: validBasePlayer.ProfileURL(), + }, + expectedErr: domain.ValidationError{ + Model: "BasePlayer", + Field: "name", + Err: domain.LenOutOfRangeError{ + Min: 1, + Max: 150, + Current: 0, + }, + }, + }, + { + name: "ERR: len(name) > 150", + args: args{ + id: validBasePlayer.ID(), + name: gofakeit.LetterN(151), + numVillages: validBasePlayer.NumVillages(), + points: validBasePlayer.Points(), + rank: validBasePlayer.Rank(), + tribeID: validBasePlayer.TribeID(), + od: validBasePlayer.OD(), + profileURL: validBasePlayer.ProfileURL(), + }, + expectedErr: domain.ValidationError{ + Model: "BasePlayer", + Field: "name", + Err: domain.LenOutOfRangeError{ + Min: 1, + Max: 150, + Current: 151, + }, + }, + }, + { + name: "ERR: numVillages < 0", + args: args{ + id: validBasePlayer.ID(), + name: validBasePlayer.Name(), + numVillages: -1, + points: validBasePlayer.Points(), + rank: validBasePlayer.Rank(), + tribeID: validBasePlayer.TribeID(), + od: validBasePlayer.OD(), + profileURL: validBasePlayer.ProfileURL(), + }, + expectedErr: domain.ValidationError{ + Model: "BasePlayer", + Field: "numVillages", + Err: domain.MinGreaterEqualError{ + Min: 0, + Current: -1, + }, + }, + }, + { + name: "ERR: points < 0", + args: args{ + id: validBasePlayer.ID(), + name: validBasePlayer.Name(), + numVillages: validBasePlayer.NumVillages(), + points: -1, + rank: validBasePlayer.Rank(), + tribeID: validBasePlayer.TribeID(), + od: validBasePlayer.OD(), + profileURL: validBasePlayer.ProfileURL(), + }, + expectedErr: domain.ValidationError{ + Model: "BasePlayer", + Field: "points", + Err: domain.MinGreaterEqualError{ + Min: 0, + Current: -1, + }, + }, + }, + { + name: "ERR: rank < 0", + args: args{ + id: validBasePlayer.ID(), + name: validBasePlayer.Name(), + numVillages: validBasePlayer.NumVillages(), + points: validBasePlayer.Points(), + rank: -1, + tribeID: validBasePlayer.TribeID(), + od: validBasePlayer.OD(), + profileURL: validBasePlayer.ProfileURL(), + }, + expectedErr: domain.ValidationError{ + Model: "BasePlayer", + Field: "rank", + Err: domain.MinGreaterEqualError{ + Min: 0, + Current: -1, + }, + }, + }, + { + name: "ERR: tribeID < 0", + args: args{ + id: validBasePlayer.ID(), + name: validBasePlayer.Name(), + numVillages: validBasePlayer.NumVillages(), + points: validBasePlayer.Points(), + rank: validBasePlayer.Rank(), + tribeID: -1, + od: validBasePlayer.OD(), + profileURL: validBasePlayer.ProfileURL(), + }, + expectedErr: domain.ValidationError{ + Model: "BasePlayer", + Field: "tribeID", + Err: domain.MinGreaterEqualError{ + Min: 0, + Current: -1, + }, + }, + }, + { + name: "ERR: profileURL can't be nil", + args: args{ + id: validBasePlayer.ID(), + name: validBasePlayer.Name(), + numVillages: validBasePlayer.NumVillages(), + points: validBasePlayer.Points(), + rank: validBasePlayer.Rank(), + tribeID: validBasePlayer.TribeID(), + od: validBasePlayer.OD(), + profileURL: nil, + }, + expectedErr: domain.ValidationError{ + Model: "BasePlayer", + Field: "profileURL", + Err: domain.ErrNil, + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + res, err := domain.NewBasePlayer( + tt.args.id, + tt.args.name, + tt.args.numVillages, + tt.args.points, + tt.args.rank, + tt.args.tribeID, + tt.args.od, + tt.args.profileURL, + ) + require.ErrorIs(t, err, tt.expectedErr) + if tt.expectedErr != nil { + return + } + assert.Equal(t, tt.args.id, res.ID()) + assert.Equal(t, tt.args.name, res.Name()) + assert.Equal(t, tt.args.numVillages, res.NumVillages()) + assert.Equal(t, tt.args.points, res.Points()) + assert.Equal(t, tt.args.rank, res.Rank()) + assert.Equal(t, tt.args.tribeID, res.TribeID()) + assert.Equal(t, tt.args.od, res.OD()) + assert.Equal(t, tt.args.profileURL, res.ProfileURL()) + }) + } +} diff --git a/internal/domain/domaintest/base_player.go b/internal/domain/domaintest/base_player.go new file mode 100644 index 0000000..7bfacd5 --- /dev/null +++ b/internal/domain/domaintest/base_player.go @@ -0,0 +1,52 @@ +package domaintest + +import ( + "net/url" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/require" +) + +type BasePlayerConfig struct { + ID int + OD domain.OpponentsDefeated + NumVillages int + AllPoints int + Rank int + TribeID int +} + +func NewBasePlayer(tb TestingTB, opts ...func(cfg *BasePlayerConfig)) domain.BasePlayer { + tb.Helper() + + cfg := &BasePlayerConfig{ + ID: RandID(), + OD: NewOpponentsDefeated(tb), + NumVillages: gofakeit.IntRange(1, 10000), + AllPoints: gofakeit.IntRange(1, 10000), + Rank: gofakeit.IntRange(1, 10000), + TribeID: gofakeit.IntRange(0, 10000), + } + + for _, opt := range opts { + opt(cfg) + } + + u, err := url.ParseRequestURI(gofakeit.URL()) + require.NoError(tb, err) + + p, err := domain.NewBasePlayer( + cfg.ID, + gofakeit.LetterN(50), + cfg.NumVillages, + cfg.AllPoints, + cfg.Rank, + cfg.TribeID, + cfg.OD, + u, + ) + require.NoError(tb, err) + + return p +} diff --git a/internal/domain/player.go b/internal/domain/player.go new file mode 100644 index 0000000..dca21ac --- /dev/null +++ b/internal/domain/player.go @@ -0,0 +1,6 @@ +package domain + +const ( + playerNameMinLength = 1 + playerNameMaxLength = 150 +) diff --git a/internal/port/consumer_watermill_player.go b/internal/port/consumer_watermill_player.go new file mode 100644 index 0000000..25fbb8a --- /dev/null +++ b/internal/port/consumer_watermill_player.go @@ -0,0 +1,63 @@ +package port + +import ( + "gitea.dwysokinski.me/twhelp/corev3/internal/app" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" +) + +type PlayerWatermillConsumer struct { + svc *app.PlayerService + subscriber message.Subscriber + logger watermill.LoggerAdapter + marshaler watermillmsg.Marshaler + eventServerSyncedTopic string +} + +func NewPlayerWatermillConsumer( + svc *app.PlayerService, + subscriber message.Subscriber, + logger watermill.LoggerAdapter, + marshaler watermillmsg.Marshaler, + eventServerSyncedTopic string, +) *PlayerWatermillConsumer { + return &PlayerWatermillConsumer{ + svc: svc, + subscriber: subscriber, + logger: logger, + marshaler: marshaler, + eventServerSyncedTopic: eventServerSyncedTopic, + } +} + +func (c *PlayerWatermillConsumer) Register(router *message.Router) { + router.AddNoPublisherHandler( + "PlayerConsumer.sync", + c.eventServerSyncedTopic, + c.subscriber, + c.sync, + ) +} + +func (c *PlayerWatermillConsumer) sync(msg *message.Message) error { + var rawPayload watermillmsg.ServerSyncedEventPayload + + if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { + c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + payload, err := domain.NewServerSyncedEventPayload(rawPayload.Key, rawPayload.URL, rawPayload.VersionCode) + if err != nil { + c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + return c.svc.Sync(msg.Context(), payload) +} diff --git a/k8s/base/kustomization.yml b/k8s/base/kustomization.yml index e8d20b9..df64eba 100644 --- a/k8s/base/kustomization.yml +++ b/k8s/base/kustomization.yml @@ -4,6 +4,7 @@ resources: - jobs.yml - server-consumer.yml - tribe-consumer.yml + - player-consumer.yml images: - name: twhelp newName: twhelp diff --git a/k8s/base/player-consumer.yml b/k8s/base/player-consumer.yml new file mode 100644 index 0000000..0009f97 --- /dev/null +++ b/k8s/base/player-consumer.yml @@ -0,0 +1,44 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: twhelp-player-consumer-deployment +spec: + selector: + matchLabels: + app: twhelp-player-consumer + template: + metadata: + labels: + app: twhelp-player-consumer + spec: + containers: + - name: twhelp-player-consumer + image: twhelp + args: [consumer, player] + env: + - name: APP_MODE + value: development + - name: LOG_LEVEL + value: debug + - name: DB_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: db-connection-string + - name: RABBITMQ_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: rabbitmq-connection-string + livenessProbe: + exec: + command: [cat, /tmp/live] + initialDelaySeconds: 5 + periodSeconds: 10 + resources: + requests: + cpu: 100m + memory: 100Mi + limits: + cpu: 300m + memory: 300Mi