feat: add VillageConsumer (#30)
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: twhelp/core#30
This commit is contained in:
parent
cb13ad4f92
commit
b0bf482129
|
@ -41,6 +41,7 @@ func New() *cli.Command {
|
|||
newServerCommand(),
|
||||
newTribeCommand(),
|
||||
newPlayerCommand(),
|
||||
newVillageCommand(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -120,6 +121,31 @@ func newPlayerCommand() *cli.Command {
|
|||
}
|
||||
}
|
||||
|
||||
func newVillageCommand() *cli.Command {
|
||||
return &cli.Command{
|
||||
Name: "village",
|
||||
Action: func(c *cli.Context) error {
|
||||
return runConsumer(
|
||||
c,
|
||||
"VillageConsumer",
|
||||
func(
|
||||
c *cli.Context,
|
||||
router *message.Router,
|
||||
watermillLogger watermill.LoggerAdapter,
|
||||
publisher *amqp.Publisher,
|
||||
subscriber *amqp.Subscriber,
|
||||
marshaler msg.CommandEventMarshaler,
|
||||
bunDB *bun.DB,
|
||||
) error {
|
||||
svc := service.NewVillage(bundb.NewVillage(bunDB), internal.NewTWClient(c.App.Version))
|
||||
msg.NewVillageConsumer(marshaler, publisher, subscriber, svc).Register(router)
|
||||
return nil
|
||||
},
|
||||
)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type registerHandlersFn func(
|
||||
c *cli.Context,
|
||||
router *message.Router,
|
||||
|
|
55
internal/bundb/internal/model/village.go
Normal file
55
internal/bundb/internal/model/village.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/domain"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
type Village struct {
|
||||
bun.BaseModel `bun:"base_model,table:villages,alias:village"`
|
||||
|
||||
ID int64 `bun:"id,nullzero,pk"`
|
||||
ServerKey string `bun:"server_key,nullzero,pk,type:varchar(100)"`
|
||||
Name string `bun:"name,nullzero,notnull,type:varchar(150)"`
|
||||
Points int64 `bun:"points,default:0"`
|
||||
X int64 `bun:"x,default:0"`
|
||||
Y int64 `bun:"y,default:0"`
|
||||
Continent string `bun:"continent,type:varchar(5)"`
|
||||
Bonus int64 `bun:"bonus,default:0"`
|
||||
PlayerID int64 `bun:"player_id,nullzero"`
|
||||
CreatedAt time.Time `bun:"created_at,nullzero,notnull,default:current_timestamp"`
|
||||
}
|
||||
|
||||
func NewVillage(v domain.CreateVillageParams) Village {
|
||||
return Village{
|
||||
ID: v.ID,
|
||||
Name: v.Name,
|
||||
Points: v.Points,
|
||||
X: v.X,
|
||||
Y: v.Y,
|
||||
Continent: v.Continent,
|
||||
Bonus: v.Bonus,
|
||||
PlayerID: v.PlayerID,
|
||||
ServerKey: v.ServerKey,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func (v Village) ToDomain() domain.Village {
|
||||
return domain.Village{
|
||||
BaseVillage: domain.BaseVillage{
|
||||
ID: v.ID,
|
||||
Name: v.Name,
|
||||
Points: v.Points,
|
||||
X: v.X,
|
||||
Y: v.Y,
|
||||
Continent: v.Continent,
|
||||
Bonus: v.Bonus,
|
||||
PlayerID: v.PlayerID,
|
||||
},
|
||||
ServerKey: v.ServerKey,
|
||||
CreatedAt: v.CreatedAt,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package migrations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/bundb/internal/model"
|
||||
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
func init() {
|
||||
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
|
||||
if _, err := db.NewCreateTable().
|
||||
Model(&model.Village{}).
|
||||
Varchar(defaultVarcharLength).
|
||||
ForeignKey(`(server_key) REFERENCES servers (key)`).
|
||||
Exec(ctx); err != nil {
|
||||
return fmt.Errorf("couldn't create the 'villages' table: %w", err)
|
||||
}
|
||||
return nil
|
||||
}, func(ctx context.Context, db *bun.DB) error {
|
||||
if _, err := db.NewDropTable().
|
||||
Model(&model.Village{}).
|
||||
Cascade().
|
||||
Exec(ctx); err != nil {
|
||||
return fmt.Errorf("couldn't drop the 'villages' table: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
79
internal/bundb/testdata/fixture.yml
vendored
79
internal/bundb/testdata/fixture.yml
vendored
|
@ -7120,3 +7120,82 @@
|
|||
rank: 231
|
||||
tribe_id: 2
|
||||
created_at: 2021-09-17T09:01:00.000Z
|
||||
- model: Village
|
||||
rows:
|
||||
- _id: pl169-village-1
|
||||
id: 1111
|
||||
server_key: pl169
|
||||
name: Village 1
|
||||
points: 12154
|
||||
x: 327
|
||||
y: 428
|
||||
continent: K43
|
||||
bonus: 0
|
||||
player_id: 699783765
|
||||
created_at: 2021-09-17T09:01:00.000Z
|
||||
- _id: pl169-village-2
|
||||
id: 1112
|
||||
server_key: pl169
|
||||
name: Village 2
|
||||
points: 12154
|
||||
x: 531
|
||||
y: 448
|
||||
continent: K45
|
||||
bonus: 1
|
||||
player_id: 699783765
|
||||
created_at: 2021-09-19T09:01:00.000Z
|
||||
- _id: pl169-village-3
|
||||
id: 1113
|
||||
server_key: pl169
|
||||
name: Village 3
|
||||
points: 2500
|
||||
x: 532
|
||||
y: 448
|
||||
continent: K45
|
||||
bonus: 0
|
||||
player_id: 0
|
||||
created_at: 2021-09-30T15:11:00.000Z
|
||||
- _id: pl169-village-4
|
||||
id: 1114
|
||||
server_key: pl169
|
||||
name: Village 4
|
||||
points: 2500
|
||||
x: 533
|
||||
y: 448
|
||||
continent: K45
|
||||
bonus: 4
|
||||
player_id: 0
|
||||
created_at: 2021-09-29T05:11:00.000Z
|
||||
- _id: it70-village-1
|
||||
id: 10022
|
||||
server_key: it70
|
||||
name: Village 1
|
||||
points: 12154
|
||||
x: 533
|
||||
y: 548
|
||||
continent: K55
|
||||
bonus: 4
|
||||
player_id: 578014
|
||||
created_at: 2022-02-21T18:00:10.000Z
|
||||
- _id: it70-village-2
|
||||
id: 10023
|
||||
server_key: it70
|
||||
name: Village 2
|
||||
points: 12154
|
||||
x: 633
|
||||
y: 548
|
||||
continent: K56
|
||||
bonus: 0
|
||||
player_id: 578014
|
||||
created_at: 2022-02-22T15:00:10.000Z
|
||||
- _id: it70-village-3
|
||||
id: 10024
|
||||
server_key: it70
|
||||
name: Village 3
|
||||
points: 12154
|
||||
x: 100
|
||||
y: 100
|
||||
continent: K11
|
||||
bonus: 0
|
||||
player_id: 0
|
||||
created_at: 2022-02-25T15:00:10.000Z
|
||||
|
|
88
internal/bundb/village.go
Normal file
88
internal/bundb/village.go
Normal file
|
@ -0,0 +1,88 @@
|
|||
package bundb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/bundb/internal/model"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/domain"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
type Village struct {
|
||||
db *bun.DB
|
||||
}
|
||||
|
||||
func NewVillage(db *bun.DB) *Village {
|
||||
return &Village{db: db}
|
||||
}
|
||||
|
||||
func (v *Village) CreateOrUpdate(ctx context.Context, params ...domain.CreateVillageParams) error {
|
||||
if len(params) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
villages := make([]model.Village, 0, len(params))
|
||||
for _, p := range params {
|
||||
villages = append(villages, model.NewVillage(p))
|
||||
}
|
||||
if _, err := v.db.NewInsert().
|
||||
Model(&villages).
|
||||
On("CONFLICT ON CONSTRAINT villages_pkey DO UPDATE").
|
||||
Set("name = EXCLUDED.name").
|
||||
Set("points = EXCLUDED.points").
|
||||
Set("x = EXCLUDED.x").
|
||||
Set("y = EXCLUDED.y").
|
||||
Set("continent = EXCLUDED.continent").
|
||||
Set("bonus = EXCLUDED.bonus").
|
||||
Set("player_id = EXCLUDED.player_id").
|
||||
Returning("NULL").
|
||||
Exec(ctx); err != nil {
|
||||
return fmt.Errorf("something went wrong while inserting villages into db: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *Village) List(ctx context.Context, params domain.ListVillagesParams) ([]domain.Village, int64, error) {
|
||||
var villages []model.Village
|
||||
var count int
|
||||
var err error
|
||||
|
||||
q := v.db.NewSelect().
|
||||
Model(&villages).
|
||||
Order("server_key ASC", "id ASC").
|
||||
Apply((listVillagesParamsApplier{params}).Apply)
|
||||
if params.Count {
|
||||
count, err = q.ScanAndCount(ctx)
|
||||
} else {
|
||||
err = q.Scan(ctx)
|
||||
}
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return nil, 0, fmt.Errorf("couldn't select villages from the db: %w", err)
|
||||
}
|
||||
|
||||
result := make([]domain.Village, 0, len(villages))
|
||||
for _, village := range villages {
|
||||
result = append(result, village.ToDomain())
|
||||
}
|
||||
return result, int64(count), nil
|
||||
}
|
||||
|
||||
type listVillagesParamsApplier struct {
|
||||
params domain.ListVillagesParams
|
||||
}
|
||||
|
||||
func (l listVillagesParamsApplier) Apply(q *bun.SelectQuery) *bun.SelectQuery {
|
||||
if l.params.IDs != nil {
|
||||
q = q.Where("id IN (?)", bun.In(l.params.IDs))
|
||||
}
|
||||
|
||||
if l.params.ServerKeys != nil {
|
||||
q = q.Where("server_key IN (?)", bun.In(l.params.ServerKeys))
|
||||
}
|
||||
|
||||
return q
|
||||
}
|
289
internal/bundb/village_test.go
Normal file
289
internal/bundb/village_test.go
Normal file
|
@ -0,0 +1,289 @@
|
|||
package bundb_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/bundb/internal/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/uptrace/bun/dbfixture"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/bundb"
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/domain"
|
||||
"github.com/jackc/pgerrcode"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/uptrace/bun/driver/pgdriver"
|
||||
)
|
||||
|
||||
func TestVillage_CreateOrUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := newDB(t)
|
||||
fixture := loadFixtures(t, db)
|
||||
repo := bundb.NewVillage(db)
|
||||
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
serverKey := "de188"
|
||||
playerGSus := getPlayerFromFixture(t, fixture, "de188-g-sus")
|
||||
playerRIP := getPlayerFromFixture(t, fixture, "de188-rest-in-peace")
|
||||
createParams := []domain.CreateVillageParams{
|
||||
{
|
||||
BaseVillage: domain.BaseVillage{
|
||||
ID: 999,
|
||||
Name: "name 999",
|
||||
Points: 4,
|
||||
X: 500,
|
||||
Y: 500,
|
||||
Continent: "K55",
|
||||
Bonus: 0,
|
||||
PlayerID: playerGSus.ID,
|
||||
},
|
||||
ServerKey: serverKey,
|
||||
},
|
||||
{
|
||||
BaseVillage: domain.BaseVillage{
|
||||
ID: 1000,
|
||||
Name: "name 1000",
|
||||
Points: 5,
|
||||
X: 501,
|
||||
Y: 500,
|
||||
Continent: "K55",
|
||||
Bonus: 2,
|
||||
PlayerID: 0,
|
||||
},
|
||||
ServerKey: serverKey,
|
||||
},
|
||||
}
|
||||
updateParams := make([]domain.CreateVillageParams, 0, len(createParams))
|
||||
ids := make([]int64, 0, len(createParams))
|
||||
for i, p := range createParams {
|
||||
var playerID int64
|
||||
if i%2 == 0 {
|
||||
playerID = playerRIP.ID
|
||||
}
|
||||
|
||||
updateParams = append(updateParams, domain.CreateVillageParams{
|
||||
BaseVillage: domain.BaseVillage{
|
||||
ID: p.ID,
|
||||
Name: p.Name + "Update",
|
||||
Points: p.Points + 1,
|
||||
X: p.X + 1,
|
||||
Y: p.Y + 1,
|
||||
Continent: p.Continent + "1",
|
||||
Bonus: p.Bonus + 1,
|
||||
PlayerID: playerID,
|
||||
},
|
||||
ServerKey: p.ServerKey,
|
||||
})
|
||||
ids = append(ids, p.ID)
|
||||
}
|
||||
listParams := domain.ListVillagesParams{
|
||||
IDs: ids,
|
||||
ServerKeys: []string{serverKey},
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
assert.NoError(t, repo.CreateOrUpdate(ctx, createParams...))
|
||||
createdVillages, _, err := repo.List(ctx, listParams)
|
||||
assert.NoError(t, err)
|
||||
assertCreatedVillages(t, createParams, createdVillages)
|
||||
|
||||
assert.NoError(t, repo.CreateOrUpdate(ctx, updateParams...))
|
||||
updatedVillages, _, err := repo.List(ctx, listParams)
|
||||
assert.NoError(t, err)
|
||||
assertCreatedVillages(t, updateParams, updatedVillages)
|
||||
})
|
||||
|
||||
t.Run("OK: len(params) == 0", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
err := repo.CreateOrUpdate(context.Background())
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("ERR: server must exist in the db", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
err := repo.CreateOrUpdate(context.Background(), domain.CreateVillageParams{
|
||||
BaseVillage: domain.BaseVillage{
|
||||
ID: 999,
|
||||
Name: "name 999",
|
||||
Points: 4,
|
||||
X: 500,
|
||||
Y: 500,
|
||||
Continent: "K55",
|
||||
Bonus: 0,
|
||||
PlayerID: 0,
|
||||
},
|
||||
ServerKey: "pl999",
|
||||
})
|
||||
var pgErr pgdriver.Error
|
||||
assert.ErrorAs(t, err, &pgErr)
|
||||
assert.Equal(t, pgerrcode.ForeignKeyViolation, pgErr.Field('C'))
|
||||
assert.Contains(t, pgErr.Field('M'), "villages_server_key_fkey")
|
||||
})
|
||||
}
|
||||
|
||||
func TestVillage_List(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := newDB(t)
|
||||
fixture := loadFixtures(t, db)
|
||||
repo := bundb.NewVillage(db)
|
||||
villages := getAllVillagesFromFixture(t, fixture)
|
||||
|
||||
type expectedVillage struct {
|
||||
id int64
|
||||
serverKey string
|
||||
}
|
||||
|
||||
allVillages := make([]expectedVillage, 0, len(villages))
|
||||
for _, p := range villages {
|
||||
allVillages = append(allVillages, expectedVillage{
|
||||
id: p.ID,
|
||||
serverKey: p.ServerKey,
|
||||
})
|
||||
}
|
||||
|
||||
it70VillageCounter := 0
|
||||
for _, p := range villages {
|
||||
if p.ServerKey != "it70" {
|
||||
continue
|
||||
}
|
||||
it70VillageCounter++
|
||||
}
|
||||
it70Villages := make([]expectedVillage, 0, it70VillageCounter)
|
||||
for _, p := range villages {
|
||||
if p.ServerKey != "it70" {
|
||||
continue
|
||||
}
|
||||
|
||||
it70Villages = append(it70Villages, expectedVillage{
|
||||
id: p.ID,
|
||||
serverKey: p.ServerKey,
|
||||
})
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
params domain.ListVillagesParams
|
||||
expectedVillages []expectedVillage
|
||||
expectedCount int64
|
||||
}{
|
||||
{
|
||||
name: "Count=true",
|
||||
params: domain.ListVillagesParams{Count: true},
|
||||
expectedVillages: allVillages,
|
||||
expectedCount: int64(len(allVillages)),
|
||||
},
|
||||
{
|
||||
name: "Count=false",
|
||||
params: domain.ListVillagesParams{Count: false},
|
||||
expectedVillages: allVillages,
|
||||
expectedCount: 0,
|
||||
},
|
||||
{
|
||||
name: "ServerKeys=[it70],Count=true",
|
||||
params: domain.ListVillagesParams{ServerKeys: []string{"it70"}, Count: true},
|
||||
expectedVillages: it70Villages,
|
||||
expectedCount: int64(len(it70Villages)),
|
||||
},
|
||||
{
|
||||
name: "IDs=[10022, 1114],Count=true",
|
||||
params: domain.ListVillagesParams{IDs: []int64{10022, 1114}, Count: true},
|
||||
expectedVillages: []expectedVillage{
|
||||
{
|
||||
id: 10022,
|
||||
serverKey: "it70",
|
||||
},
|
||||
{
|
||||
id: 1114,
|
||||
serverKey: "pl169",
|
||||
},
|
||||
},
|
||||
expectedCount: 2,
|
||||
},
|
||||
{
|
||||
name: "IDs=[1113, 1114],ServerKeys=[pl169],Count=true",
|
||||
params: domain.ListVillagesParams{IDs: []int64{1113, 1114}, ServerKeys: []string{"pl169"}, Count: true},
|
||||
expectedVillages: []expectedVillage{
|
||||
{
|
||||
id: 1113,
|
||||
serverKey: "pl169",
|
||||
},
|
||||
{
|
||||
id: 1114,
|
||||
serverKey: "pl169",
|
||||
},
|
||||
},
|
||||
expectedCount: 2,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
res, count, err := repo.List(context.Background(), tt.params)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tt.expectedCount, count)
|
||||
assert.Len(t, res, len(tt.expectedVillages))
|
||||
for _, expVillage := range tt.expectedVillages {
|
||||
found := false
|
||||
for _, village := range res {
|
||||
if village.ID == expVillage.id && village.ServerKey == expVillage.serverKey {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
assert.True(t, found, "village (id=%s,serverkey=%s) not found", expVillage.id, expVillage.serverKey)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func assertCreatedVillages(tb testing.TB, params []domain.CreateVillageParams, villages []domain.Village) {
|
||||
tb.Helper()
|
||||
|
||||
assert.Len(tb, villages, len(params))
|
||||
for _, p := range params {
|
||||
var village domain.Village
|
||||
for _, v := range villages {
|
||||
if v.ID == p.ID && v.ServerKey == p.ServerKey {
|
||||
village = v
|
||||
break
|
||||
}
|
||||
}
|
||||
assert.Equal(tb, p.BaseVillage, village.BaseVillage)
|
||||
assert.Equal(tb, p.ServerKey, village.ServerKey)
|
||||
assert.WithinDuration(tb, village.CreatedAt, time.Now(), 1*time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func getAllVillagesFromFixture(tb testing.TB, fixture *dbfixture.Fixture) []model.Village {
|
||||
tb.Helper()
|
||||
|
||||
//nolint:lll
|
||||
ids := []string{"pl169-village-1", "pl169-village-2", "pl169-village-3", "pl169-village-4", "it70-village-1", "it70-village-2", "it70-village-3"}
|
||||
|
||||
villages := make([]model.Village, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
villages = append(villages, getVillageFromFixture(tb, fixture, id))
|
||||
}
|
||||
|
||||
return villages
|
||||
}
|
||||
|
||||
func getVillageFromFixture(tb testing.TB, fixture *dbfixture.Fixture, id string) model.Village {
|
||||
tb.Helper()
|
||||
v, err := fixture.Row("Village." + id)
|
||||
require.NoError(tb, err)
|
||||
p, ok := v.(*model.Village)
|
||||
require.True(tb, ok)
|
||||
return *p
|
||||
}
|
|
@ -1,5 +1,7 @@
|
|||
package domain
|
||||
|
||||
import "time"
|
||||
|
||||
type BaseVillage struct {
|
||||
ID int64
|
||||
Name string
|
||||
|
@ -10,3 +12,39 @@ type BaseVillage struct {
|
|||
Bonus int64
|
||||
PlayerID int64
|
||||
}
|
||||
|
||||
type Village struct {
|
||||
BaseVillage
|
||||
|
||||
ServerKey string
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
type CreateVillageParams struct {
|
||||
BaseVillage
|
||||
|
||||
ServerKey string
|
||||
}
|
||||
|
||||
type ListVillagesParams struct {
|
||||
IDs []int64
|
||||
ServerKeys []string
|
||||
Count bool
|
||||
}
|
||||
|
||||
type RefreshVillagesResult struct {
|
||||
NumVillages int64
|
||||
NumPlayerVillages int64
|
||||
NumBarbarianVillages int64
|
||||
NumBonusVillages int64
|
||||
}
|
||||
|
||||
type EventVillagesRefreshedPayload struct {
|
||||
Key string
|
||||
URL string
|
||||
VersionCode string
|
||||
NumVillages int64
|
||||
NumPlayerVillages int64
|
||||
NumBarbarianVillages int64
|
||||
NumBonusVillages int64
|
||||
}
|
||||
|
|
79
internal/msg/consumer_village.go
Normal file
79
internal/msg/consumer_village.go
Normal file
|
@ -0,0 +1,79 @@
|
|||
package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/domain"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
)
|
||||
|
||||
//counterfeiter:generate -o internal/mock/village_service.gen.go . VillageService
|
||||
type VillageService interface {
|
||||
Refresh(ctx context.Context, key, url string) (domain.RefreshVillagesResult, error)
|
||||
}
|
||||
|
||||
type VillageConsumer struct {
|
||||
marshaler CommandEventMarshaler
|
||||
publisher message.Publisher
|
||||
subscriber message.Subscriber
|
||||
svc VillageService
|
||||
}
|
||||
|
||||
func NewVillageConsumer(
|
||||
marshaler CommandEventMarshaler,
|
||||
publisher message.Publisher,
|
||||
subscriber message.Subscriber,
|
||||
svc VillageService,
|
||||
) *VillageConsumer {
|
||||
return &VillageConsumer{
|
||||
marshaler: marshaler,
|
||||
publisher: publisher,
|
||||
subscriber: subscriber,
|
||||
svc: svc,
|
||||
}
|
||||
}
|
||||
|
||||
func (v *VillageConsumer) Register(router *message.Router) {
|
||||
router.AddHandler(
|
||||
"VillageConsumer.refresh",
|
||||
serversEventRefreshed,
|
||||
v.subscriber,
|
||||
villagesEventRefreshed,
|
||||
v.publisher,
|
||||
v.refresh,
|
||||
)
|
||||
}
|
||||
|
||||
func (v *VillageConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
|
||||
var payload domain.EventServerRefreshedPayload
|
||||
if err := v.marshaler.Unmarshal(msg, &payload); err != nil {
|
||||
// drop message with invalid payload
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if !payload.Open {
|
||||
// only open servers should be updated
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
res, err := v.svc.Refresh(msg.Context(), payload.Key, payload.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("VillageService.Refresh: %w", err)
|
||||
}
|
||||
|
||||
ev, err := v.marshaler.Marshal(domain.EventVillagesRefreshedPayload{
|
||||
Key: payload.Key,
|
||||
URL: payload.URL,
|
||||
VersionCode: payload.VersionCode,
|
||||
NumVillages: res.NumVillages,
|
||||
NumPlayerVillages: res.NumPlayerVillages,
|
||||
NumBarbarianVillages: res.NumBarbarianVillages,
|
||||
NumBonusVillages: res.NumBonusVillages,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshaler.Marshal: %w", err)
|
||||
}
|
||||
return []*message.Message{ev}, nil
|
||||
}
|
104
internal/msg/consumer_village_test.go
Normal file
104
internal/msg/consumer_village_test.go
Normal file
|
@ -0,0 +1,104 @@
|
|||
package msg_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message/subscriber"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/domain"
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/msg"
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/mock"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestVillageConsumer_refresh(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
marshaler := msg.GobMarshaler{}
|
||||
pubSub := newPubSub(t)
|
||||
|
||||
refreshResult := domain.RefreshVillagesResult{
|
||||
NumVillages: 1500,
|
||||
NumPlayerVillages: 1400,
|
||||
NumBarbarianVillages: 100,
|
||||
NumBonusVillages: 50,
|
||||
}
|
||||
villageSvc := &mock.FakeVillageService{}
|
||||
villageSvc.RefreshReturns(refreshResult, nil)
|
||||
|
||||
runRouter(t, msg.NewVillageConsumer(marshaler, pubSub, pubSub, villageSvc))
|
||||
|
||||
msgs, err := pubSub.Subscribe(context.Background(), "villages.event.refreshed")
|
||||
require.NoError(t, err)
|
||||
|
||||
serverRefreshedPayload := domain.EventServerRefreshedPayload{
|
||||
Key: "pl151",
|
||||
URL: "https://pl151.plemiona.pl",
|
||||
Open: true,
|
||||
VersionCode: "pl",
|
||||
}
|
||||
ev, err := marshaler.Marshal(serverRefreshedPayload)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, pubSub.Publish("servers.event.refreshed", ev))
|
||||
|
||||
receivedMsgs, _ := subscriber.BulkRead(msgs, 1, 10*time.Second)
|
||||
require.Len(t, receivedMsgs, 1)
|
||||
var villagesRefreshedPayload domain.EventVillagesRefreshedPayload
|
||||
assert.NoError(t, marshaler.Unmarshal(receivedMsgs[0], &villagesRefreshedPayload))
|
||||
assert.Equal(t, serverRefreshedPayload.Key, villagesRefreshedPayload.Key)
|
||||
assert.Equal(t, serverRefreshedPayload.URL, villagesRefreshedPayload.URL)
|
||||
assert.Equal(t, serverRefreshedPayload.VersionCode, villagesRefreshedPayload.VersionCode)
|
||||
assert.Equal(t, refreshResult.NumVillages, villagesRefreshedPayload.NumVillages)
|
||||
assert.Equal(t, refreshResult.NumPlayerVillages, villagesRefreshedPayload.NumPlayerVillages)
|
||||
assert.Equal(t, refreshResult.NumBarbarianVillages, villagesRefreshedPayload.NumBarbarianVillages)
|
||||
assert.Equal(t, refreshResult.NumBonusVillages, villagesRefreshedPayload.NumBonusVillages)
|
||||
})
|
||||
|
||||
t.Run("OK: only open servers should be updated", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
marshaler := msg.GobMarshaler{}
|
||||
pubSub := newPubSub(t)
|
||||
villageSvc := &mock.FakeVillageService{}
|
||||
|
||||
wait := make(chan struct{})
|
||||
runRouter(
|
||||
t,
|
||||
middlewareRegisterer{
|
||||
h: func(h message.HandlerFunc) message.HandlerFunc {
|
||||
return func(msg *message.Message) ([]*message.Message, error) {
|
||||
if message.HandlerNameFromCtx(msg.Context()) != "VillageConsumer.refresh" {
|
||||
return h(msg)
|
||||
}
|
||||
|
||||
result, err := h(msg)
|
||||
if len(result) == 0 && err == nil {
|
||||
close(wait)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
},
|
||||
},
|
||||
msg.NewVillageConsumer(marshaler, pubSub, pubSub, villageSvc),
|
||||
)
|
||||
|
||||
ev, err := marshaler.Marshal(domain.EventServerRefreshedPayload{Open: false})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, pubSub.Publish("servers.event.refreshed", ev))
|
||||
|
||||
select {
|
||||
case <-wait:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
|
||||
assert.Equal(t, 0, villageSvc.RefreshCallCount())
|
||||
})
|
||||
}
|
60
internal/service/village.go
Normal file
60
internal/service/village.go
Normal file
|
@ -0,0 +1,60 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/domain"
|
||||
)
|
||||
|
||||
//counterfeiter:generate -o internal/mock/village_repository.gen.go . VillageRepository
|
||||
type VillageRepository interface {
|
||||
CreateOrUpdate(ctx context.Context, params ...domain.CreateVillageParams) error
|
||||
}
|
||||
|
||||
//counterfeiter:generate -o internal/mock/villages_getter.gen.go . VillagesGetter
|
||||
type VillagesGetter interface {
|
||||
GetVillages(ctx context.Context, baseURL string) ([]domain.BaseVillage, error)
|
||||
}
|
||||
|
||||
type Village struct {
|
||||
repo VillageRepository
|
||||
client VillagesGetter
|
||||
}
|
||||
|
||||
func NewVillage(repo VillageRepository, client VillagesGetter) *Village {
|
||||
return &Village{repo: repo, client: client}
|
||||
}
|
||||
|
||||
func (v *Village) Refresh(ctx context.Context, key, url string) (domain.RefreshVillagesResult, error) {
|
||||
villages, err := v.client.GetVillages(ctx, url)
|
||||
if err != nil {
|
||||
return domain.RefreshVillagesResult{}, fmt.Errorf("TWClient.GetVillages: %w", err)
|
||||
}
|
||||
|
||||
res := domain.RefreshVillagesResult{
|
||||
NumVillages: int64(len(villages)),
|
||||
}
|
||||
params := make([]domain.CreateVillageParams, 0, len(villages))
|
||||
for _, village := range villages {
|
||||
params = append(params, domain.CreateVillageParams{
|
||||
BaseVillage: village,
|
||||
ServerKey: key,
|
||||
})
|
||||
|
||||
if village.PlayerID > 0 {
|
||||
res.NumPlayerVillages++
|
||||
} else {
|
||||
res.NumBarbarianVillages++
|
||||
}
|
||||
if village.Bonus > 0 {
|
||||
res.NumBonusVillages++
|
||||
}
|
||||
}
|
||||
err = v.repo.CreateOrUpdate(ctx, params...)
|
||||
if err != nil {
|
||||
return domain.RefreshVillagesResult{}, fmt.Errorf("VillageRepository.CreateOrUpdate: %w", err)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
90
internal/service/village_test.go
Normal file
90
internal/service/village_test.go
Normal file
|
@ -0,0 +1,90 @@
|
|||
package service_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/service"
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/service/internal/mock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/core/internal/domain"
|
||||
)
|
||||
|
||||
func TestVillage_Refresh(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
villages := []domain.BaseVillage{
|
||||
{
|
||||
ID: 1,
|
||||
Name: "111",
|
||||
Points: 111,
|
||||
X: 111,
|
||||
Y: 111,
|
||||
Continent: "K11",
|
||||
Bonus: 1,
|
||||
PlayerID: 123,
|
||||
},
|
||||
{
|
||||
ID: 2,
|
||||
Name: "222",
|
||||
Points: 222,
|
||||
X: 222,
|
||||
Y: 222,
|
||||
Continent: "K22",
|
||||
Bonus: 0,
|
||||
PlayerID: 123,
|
||||
},
|
||||
{
|
||||
ID: 3,
|
||||
Name: "333",
|
||||
Points: 333,
|
||||
X: 333,
|
||||
Y: 333,
|
||||
Continent: "K33",
|
||||
Bonus: 3,
|
||||
PlayerID: 0,
|
||||
},
|
||||
{
|
||||
ID: 4,
|
||||
Name: "444",
|
||||
Points: 444,
|
||||
X: 444,
|
||||
Y: 444,
|
||||
Continent: "K44",
|
||||
Bonus: 0,
|
||||
PlayerID: 0,
|
||||
},
|
||||
}
|
||||
client := &mock.FakeVillagesGetter{}
|
||||
client.GetVillagesReturns(villages, nil)
|
||||
|
||||
repo := &mock.FakeVillageRepository{}
|
||||
repo.CreateOrUpdateReturns(nil)
|
||||
|
||||
serverKey, serverURL := "pl151", "https://pl151.plemiona.pl"
|
||||
|
||||
res, err := service.NewVillage(repo, client).Refresh(context.Background(), serverKey, serverURL)
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, len(villages), res.NumVillages)
|
||||
assert.EqualValues(t, 2, res.NumBarbarianVillages)
|
||||
assert.EqualValues(t, 2, res.NumBonusVillages)
|
||||
assert.EqualValues(t, 2, res.NumPlayerVillages)
|
||||
require.Equal(t, 1, client.GetVillagesCallCount())
|
||||
require.Equal(t, 1, repo.CreateOrUpdateCallCount())
|
||||
_, params := repo.CreateOrUpdateArgsForCall(0)
|
||||
assert.Len(t, params, len(villages))
|
||||
for _, village := range villages {
|
||||
var villageToCreate domain.CreateVillageParams
|
||||
for _, p := range params {
|
||||
if p.ID == village.ID {
|
||||
villageToCreate = p
|
||||
break
|
||||
}
|
||||
}
|
||||
assert.Equal(t, village, villageToCreate.BaseVillage)
|
||||
assert.Equal(t, serverKey, villageToCreate.ServerKey)
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ resources:
|
|||
- server-consumer.yml
|
||||
- tribe-consumer.yml
|
||||
- player-consumer.yml
|
||||
- village-consumer.yml
|
||||
images:
|
||||
- name: twhelp
|
||||
newName: twhelp
|
||||
|
|
39
k8s/base/village-consumer.yml
Normal file
39
k8s/base/village-consumer.yml
Normal file
|
@ -0,0 +1,39 @@
|
|||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: twhelp-village-consumer-deployment
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: twhelp-village-consumer
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: twhelp-village-consumer
|
||||
spec:
|
||||
containers:
|
||||
- name: twhelp-village-consumer
|
||||
image: twhelp
|
||||
args: ["consumer", "village"]
|
||||
env:
|
||||
- name: APP_MODE
|
||||
value: development
|
||||
- name: DB_MAX_OPEN_CONNECTIONS
|
||||
value: "5"
|
||||
- name: DB_MAX_IDLE_CONNECTIONS
|
||||
value: "3"
|
||||
- name: DB_DSN
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: twhelp-secret
|
||||
key: db-dsn
|
||||
- name: AMQP_URI
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: twhelp-secret
|
||||
key: amqp-uri
|
||||
livenessProbe:
|
||||
exec:
|
||||
command: [ "cat", "/tmp/healthy" ]
|
||||
initialDelaySeconds: 5
|
||||
periodSeconds: 5
|
Reference in New Issue
Block a user