feat: server consumer (#6)

Reviewed-on: twhelp/corev3#6
This commit is contained in:
Dawid Wysokiński 2023-12-23 09:54:47 +00:00
parent 072023dafb
commit d787edde00
30 changed files with 1559 additions and 54 deletions

View File

@ -110,6 +110,7 @@ linters-settings:
ignored-functions:
- strconv.FormatInt
- strconv.ParseInt
- ^gofakeit\.
revive:
rules:
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#add-constant
@ -479,3 +480,11 @@ issues:
- linters:
- lll
source: "^//go:generate "
# Ram in used in a different context, so we have to disable this linter warning
# var-naming: struct field Ram should be RAM
- path: unit_info\.go
text: "var-naming: struct field Ram should be RAM"
- path: unit_info\.go
# Ram in used in a different context, so we have to disable this linter warning
# var-naming: struct field Ram should be RAM
text: "var-naming: method Ram should be RAM"

View File

@ -43,7 +43,7 @@ func newApp(name, version string) *appWrapper {
app.Name = name
app.HelpName = name
app.Version = version
app.Commands = []*cli.Command{cmdDB, cmdJob}
app.Commands = []*cli.Command{cmdDB, cmdJob, cmdConsumer}
app.Flags = concatSlices(appFlags, logFlags)
app.Before = app.handleBefore
return app

221
cmd/twhelp/cmd_consumer.go Normal file
View File

@ -0,0 +1,221 @@
package main
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
"gitea.dwysokinski.me/twhelp/corev3/internal/health"
"gitea.dwysokinski.me/twhelp/corev3/internal/health/healthfile"
"gitea.dwysokinski.me/twhelp/corev3/internal/port"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/uptrace/bun"
"github.com/urfave/cli/v2"
)
var cmdConsumer = &cli.Command{
Name: "consumer",
Usage: "Run one of the workers",
Subcommands: []*cli.Command{
{
Name: "server",
Usage: "Run the worker responsible for consuming server-related messages",
Flags: concatSlices(dbFlags, rmqFlags, twSvcFlags),
Action: func(c *cli.Context) error {
return runConsumer(
c,
"ServerConsumer",
func(
c *cli.Context,
router *message.Router,
logger watermill.LoggerAdapter,
publisher *amqp.Publisher,
subscriber *amqp.Subscriber,
marshaler watermillmsg.Marshaler,
db *bun.DB,
) error {
twSvc, err := newTWServiceFromFlags(c)
if err != nil {
return err
}
consumer := port.NewServerWatermillConsumer(
app.NewServerService(twSvc),
subscriber,
logger,
marshaler,
c.String(rmqFlagTopicSyncServersCmd.Name),
)
consumer.Register(router)
return nil
},
)
},
},
},
}
type registerHandlersFunc func(
c *cli.Context,
router *message.Router,
logger watermill.LoggerAdapter,
publisher *amqp.Publisher,
subscriber *amqp.Subscriber,
marshaler watermillmsg.Marshaler,
db *bun.DB,
) error
//nolint:gocyclo
func runConsumer(c *cli.Context, name string, registerHandlers registerHandlersFunc) error {
ctx, cancel := context.WithCancel(c.Context)
defer cancel()
logger := loggerFromCtx(ctx)
watermillLogger := newWatermillLogger(logger)
var wg sync.WaitGroup
defer func() {
// it's required for the graceful shutdown
wg.Wait()
}()
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
if err != nil {
return err
}
defer func() {
if closeErr := amqpConn.Close(); closeErr != nil {
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
}
}()
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
if err != nil {
return err
}
defer func() {
if closeErr := publisher.Close(); closeErr != nil {
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
}
}()
subscriber, err := newAMQPSubscriber(amqpConn, watermillLogger, name)
if err != nil {
return err
}
defer func() {
if closeErr := subscriber.Close(); closeErr != nil {
logger.Warn("couldn't close amqp subscriber", slog.Any("error", err))
}
}()
bunDB, err := newBunDBFromFlags(c)
if err != nil {
return err
}
defer func() {
logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
} else {
logger.Debug("db connections closed")
}
}()
healthObserver := healthfile.LiveObserver(health.New(), "/tmp/live")
defer func() {
logger.Debug("closing health observer...")
if closeErr := healthObserver.Close(); closeErr != nil {
logger.Warn("couldn't close health observer", slog.Any("error", closeErr))
} else {
logger.Debug("health observer closed")
}
}()
router, err := newWatermillRouter(watermillLogger)
if err != nil {
return err
}
if err = registerHandlers(
c,
router,
watermillLogger,
publisher,
subscriber,
newWatermillMarshaler(),
bunDB,
); err != nil {
return err
}
wg.Add(1)
go func() {
defer wg.Done()
if runErr := healthObserver.Run(ctx); runErr != nil {
logger.Warn("couldn't run health observer", slog.Any("error", runErr))
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
return
case <-router.Running():
logger.Info("consumer is up and running", slog.String("name", name))
}
waitForShutdownSignal(ctx)
if closeErr := router.Close(); closeErr != nil {
logger.Warn("couldn't close router", slog.Any("error", err))
}
}()
if err = router.Run(ctx); err != nil {
cancel()
return fmt.Errorf("couldn't run router: %w", err)
}
return nil
}
const (
watermillRouterCloseTimeout = 30 * time.Second
watermillRetryMWMaxRetries = 3
watermillRetryMWMInitialInterval = 3 * time.Second
watermillRetryMWMMaxInterval = 10 * time.Second
watermillRetryMWMIntervalMultiplier = 1.5
)
func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) {
router, err := message.NewRouter(message.RouterConfig{
CloseTimeout: watermillRouterCloseTimeout,
}, logger)
if err != nil {
return nil, err
}
router.AddMiddleware(middleware.Recoverer)
router.AddMiddleware(middleware.Retry{
MaxRetries: watermillRetryMWMaxRetries,
InitialInterval: watermillRetryMWMInitialInterval,
MaxInterval: watermillRetryMWMMaxInterval,
Multiplier: watermillRetryMWMIntervalMultiplier,
Logger: logger,
}.Middleware)
return router, nil
}

View File

@ -32,7 +32,7 @@ func init() {
var cmdDB = &cli.Command{
Name: "db",
Usage: "Manages database migrations",
Usage: "Manage the database",
Subcommands: []*cli.Command{
{
Name: "migrate",

57
cmd/twhelp/twhelp.go Normal file
View File

@ -0,0 +1,57 @@
package main
import (
"bytes"
"fmt"
"net/http"
"text/template"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
"gitea.dwysokinski.me/twhelp/corev3/internal/tw"
"github.com/urfave/cli/v2"
)
var (
twSvcFlagUserAgent = &cli.StringFlag{
Name: "tw.userAgent",
Value: "tribalwarshelp/{{ .AppVersion }}",
EnvVars: []string{"TW_USER_AGENT"},
}
twSvcFlagTimeout = &cli.DurationFlag{
Name: "tw.timeout",
Value: 10 * time.Second, //nolint:gomnd
EnvVars: []string{"TW_TIMEOUT"},
Usage: "https://pkg.go.dev/net/http#Client.Timeout",
}
twSvcFlags = []cli.Flag{
twSvcFlagUserAgent,
twSvcFlagTimeout,
}
)
func newTWServiceFromFlags(c *cli.Context) (*adapter.TWHTTP, error) {
return newTWService(c.App.Version, c.String(twSvcFlagUserAgent.Name), c.Duration(twSvcFlagTimeout.Name))
}
func newTWService(appVersion, userAgent string, timeout time.Duration) (*adapter.TWHTTP, error) {
t, err := template.New("").Parse(userAgent)
if err != nil {
return nil, fmt.Errorf("something went wron while parsing user agent template: %w", err)
}
buf := bytes.NewBuffer(nil)
if err = t.Execute(buf, map[string]any{
"AppVersion": appVersion,
}); err != nil {
return nil, fmt.Errorf("couldn't exec user agent template: %w", err)
}
return adapter.NewTWHTTP(tw.NewClient(
tw.WithHTTPClient(&http.Client{
Timeout: timeout,
}),
tw.WithUserAgent(buf.String()),
)), nil
}

View File

@ -32,3 +32,9 @@ var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
func newShutdownSignalContext(parent context.Context) (context.Context, context.CancelFunc) {
return signal.NotifyContext(parent, shutdownSignals...)
}
func waitForShutdownSignal(ctx context.Context) {
ctx, cancel := newShutdownSignalContext(ctx)
defer cancel()
<-ctx.Done()
}

2
go.mod
View File

@ -5,6 +5,7 @@ go 1.21
require (
github.com/ThreeDotsLabs/watermill v1.3.5
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1
github.com/brianvoe/gofakeit/v6 v6.26.3
github.com/cenkalti/backoff/v4 v4.2.1
github.com/elliotchance/phpserialize v1.3.3
github.com/go-chi/chi/v5 v5.0.10
@ -57,6 +58,7 @@ require (
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect

4
go.sum
View File

@ -9,6 +9,8 @@ github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1 h1:OxvMB2/3YtcQuC7quC+CGmFpGz9oaxP2ef5wkp+R2oM=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1/go.mod h1:MCNoh0HUg4w0bY64on9BnhUodHeimz8+vMfXrzyuWN8=
github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8=
github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
@ -130,6 +132,8 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=

View File

@ -0,0 +1,41 @@
package adapter
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/tw"
)
type TWHTTP struct {
client *tw.Client
}
func NewTWHTTP(client *tw.Client) *TWHTTP {
return &TWHTTP{client: client}
}
func (t *TWHTTP) GetOpenServers(ctx context.Context, baseURL string) (domain.BaseServers, error) {
servers, err := t.client.GetOpenServers(ctx, baseURL)
if err != nil {
return nil, err
}
return t.convertServersToDomain(servers)
}
func (t *TWHTTP) convertServersToDomain(servers []tw.Server) (domain.BaseServers, error) {
res := make(domain.BaseServers, 0, len(servers))
for _, s := range servers {
converted, err := domain.NewBaseServer(s.Key, s.URL, true)
if err != nil {
return nil, fmt.Errorf("couldn't construct domain.BaseServer: %w", err)
}
res = append(res, converted)
}
return res, nil
}

View File

@ -0,0 +1,28 @@
package app
import (
"context"
"fmt"
"log"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
)
type ServerService struct {
twSvc TWService
}
func NewServerService(twSvc TWService) *ServerService {
return &ServerService{twSvc: twSvc}
}
func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCmdPayload) error {
openServers, err := svc.twSvc.GetOpenServers(ctx, payload.URL().String())
if err != nil {
return fmt.Errorf("couldn't get open servers for version code '%s': %w", payload.VersionCode(), err)
}
log.Println(openServers)
return nil
}

View File

@ -0,0 +1,11 @@
package app
import (
"context"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
)
type TWService interface {
GetOpenServers(ctx context.Context, baseURL string) (domain.BaseServers, error)
}

View File

@ -0,0 +1,50 @@
package domain
import "net/url"
type BaseServer struct {
key string
url *url.URL
open bool
}
func NewBaseServer(key, rawURL string, open bool) (BaseServer, error) {
if err := validateServerKey(key); err != nil {
return BaseServer{}, ValidationError{
Err: err,
Field: "key",
}
}
u, err := parseURL(rawURL)
if err != nil {
return BaseServer{}, ValidationError{
Err: err,
Field: "url",
}
}
return BaseServer{
key: key,
url: u,
open: open,
}, nil
}
func (b BaseServer) Key() string {
return b.key
}
func (b BaseServer) URL() *url.URL {
return b.url
}
func (b BaseServer) Open() bool {
return b.open
}
func (b BaseServer) IsZero() bool {
return b == BaseServer{}
}
type BaseServers []BaseServer

View File

@ -0,0 +1,83 @@
package domain_test
import (
"testing"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewBaseServer(t *testing.T) {
t.Parallel()
validBaseServer := domaintest.NewBaseServer(t, domaintest.BaseServerConfig{})
type args struct {
key string
url string
open bool
}
type test struct {
name string
args args
expectedErr error
}
tests := []test{
{
name: "OK",
args: args{
key: validBaseServer.Key(),
url: validBaseServer.URL().String(),
open: validBaseServer.Open(),
},
},
{
name: "ERR: invalid URL",
args: args{
key: validBaseServer.Key(),
url: " ",
open: validBaseServer.Open(),
},
expectedErr: domain.ValidationError{
Err: domain.InvalidURLError{URL: " "},
Field: "url",
},
},
}
for _, serverKeyTest := range newServerKeyValidationTests() {
tests = append(tests, test{
name: serverKeyTest.name,
args: args{
key: serverKeyTest.key,
url: validBaseServer.URL().String(),
open: validBaseServer.Open(),
},
expectedErr: domain.ValidationError{
Err: serverKeyTest.expectedErr,
Field: "key",
},
})
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
res, err := domain.NewBaseServer(tt.args.key, tt.args.url, tt.args.open)
require.ErrorIs(t, err, tt.expectedErr)
if tt.expectedErr != nil {
return
}
assert.Equal(t, tt.args.key, res.Key())
assert.Equal(t, tt.args.url, res.URL().String())
assert.Equal(t, tt.args.open, res.Open())
})
}
}

View File

@ -0,0 +1,144 @@
package domain
type Building struct {
MaxLevel int
MinLevel int
Wood int
Stone int
Iron int
Pop int
WoodFactor float64
StoneFactor float64
IronFactor float64
PopFactor float64
BuildTime float64
BuildTimeFactor float64
}
type BuildingInfo struct {
main Building
barracks Building
stable Building
garage Building
watchtower Building
snob Building
smith Building
place Building
statue Building
market Building
wood Building
stone Building
iron Building
farm Building
storage Building
hide Building
wall Building
}
func NewBuildingInfo(
main Building,
barracks Building,
stable Building,
garage Building,
watchtower Building,
snob Building,
smith Building,
place Building,
statue Building,
market Building,
wood Building,
stone Building,
iron Building,
farm Building,
storage Building,
hide Building,
wall Building,
) (BuildingInfo, error) {
return BuildingInfo{
main: main,
barracks: barracks,
stable: stable,
garage: garage,
watchtower: watchtower,
snob: snob,
smith: smith,
place: place,
statue: statue,
market: market,
wood: wood,
stone: stone,
iron: iron,
farm: farm,
storage: storage,
hide: hide,
wall: wall,
}, nil
}
func (b BuildingInfo) Main() Building {
return b.main
}
func (b BuildingInfo) Barracks() Building {
return b.barracks
}
func (b BuildingInfo) Stable() Building {
return b.stable
}
func (b BuildingInfo) Garage() Building {
return b.garage
}
func (b BuildingInfo) Watchtower() Building {
return b.watchtower
}
func (b BuildingInfo) Snob() Building {
return b.snob
}
func (b BuildingInfo) Smith() Building {
return b.smith
}
func (b BuildingInfo) Place() Building {
return b.place
}
func (b BuildingInfo) Statue() Building {
return b.statue
}
func (b BuildingInfo) Market() Building {
return b.market
}
func (b BuildingInfo) Wood() Building {
return b.wood
}
func (b BuildingInfo) Stone() Building {
return b.stone
}
func (b BuildingInfo) Iron() Building {
return b.iron
}
func (b BuildingInfo) Farm() Building {
return b.farm
}
func (b BuildingInfo) Storage() Building {
return b.storage
}
func (b BuildingInfo) Hide() Building {
return b.hide
}
func (b BuildingInfo) Wall() Building {
return b.wall
}

View File

@ -0,0 +1,34 @@
package domaintest
import (
"net/url"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/require"
)
type BaseServerConfig struct {
Key string
URL *url.URL
}
func (cfg *BaseServerConfig) init() {
if cfg.Key == "" {
cfg.Key = gofakeit.LetterN(5)
}
if cfg.URL == nil {
cfg.URL = &url.URL{
Scheme: "https",
Host: cfg.Key + ".plemiona.pl",
}
}
}
func NewBaseServer(tb TestingTB, cfg BaseServerConfig) domain.BaseServer {
cfg.init()
s, err := domain.NewBaseServer(cfg.Key, cfg.URL.String(), true)
require.NoError(tb, err)
return s
}

View File

@ -0,0 +1,9 @@
package domaintest
// TestingTB is a subset of the API provided by both *testing.T and *testing.B.
type TestingTB interface {
Helper()
Errorf(format string, args ...interface{})
FailNow()
Cleanup(f func())
}

View File

@ -0,0 +1,256 @@
package domain
type ServerConfigBuild struct {
Destroy int
}
type ServerConfigMisc struct {
KillRanking int
Tutorial int
TradeCancelTime int
}
type ServerConfigCommands struct {
MillisArrival int
CommandCancelTime int
}
type ServerConfigNewbie struct {
Days int
RatioDays int
Ratio int
RemoveNewbieVillages int
}
type ServerConfigGame struct {
BuildtimeFormula int
Knight int
KnightNewItems int
Archer int
Tech int
FarmLimit int
Church int
Watchtower int
Stronghold int
FakeLimit float64
BarbarianRise float64
BarbarianShrink int
BarbarianMaxPoints int
Scavenging int
Hauls int
HaulsBase int
HaulsMax int
BaseProduction int
Event int
SuppressEvents int
}
type ServerConfigBuildings struct {
CustomMain int
CustomFarm int
CustomStorage int
CustomPlace int
CustomBarracks int
CustomChurch int
CustomSmith int
CustomWood int
CustomStone int
CustomIron int
CustomMarket int
CustomStable int
CustomWall int
CustomGarage int
CustomHide int
CustomSnob int
CustomStatue int
CustomWatchtower int
}
type ServerConfigSnob struct {
Gold int
CheapRebuild int
Rise int
MaxDist int
Factor float64
CoinWood int
CoinStone int
CoinIron int
NoBarbConquer int
}
type ServerConfigAlly struct {
NoHarm int
NoOtherSupport int
NoOtherSupportType int
AllytimeSupport int
NoLeave int
NoJoin int
Limit int
FixedAllies int
PointsMemberCount int
WarsMemberRequirement int
WarsPointsRequirement int
WarsAutoacceptDays int
Levels int
XpRequirements string
}
type ServerConfigCoord struct {
MapSize int
Func int
EmptyVillages int
BonusVillages int
BonusNew int
Inner int
SelectStart int
VillageMoveWait int
NobleRestart int
StartVillages int
}
type ServerConfigSitter struct {
Allow int
}
type ServerConfigSleep struct {
Active int
Delay int
Min int
Max int
MinAwake int
MaxAwake int
WarnTime int
}
type ServerConfigNight struct {
Active int
StartHour int
EndHour int
DefFactor float64
Duration int
}
type ServerConfigWin struct {
Check int
}
type ServerConfig struct {
speed float64
unitSpeed float64
moral int
build ServerConfigBuild
misc ServerConfigMisc
commands ServerConfigCommands
newbie ServerConfigNewbie
game ServerConfigGame
buildings ServerConfigBuildings
snob ServerConfigSnob
ally ServerConfigAlly
coord ServerConfigCoord
sitter ServerConfigSitter
sleep ServerConfigSleep
night ServerConfigNight
win ServerConfigWin
}
func NewServerConfig(
speed float64,
unitSpeed float64,
moral int,
build ServerConfigBuild,
misc ServerConfigMisc,
commands ServerConfigCommands,
newbie ServerConfigNewbie,
game ServerConfigGame,
buildings ServerConfigBuildings,
snob ServerConfigSnob,
ally ServerConfigAlly,
coord ServerConfigCoord,
sitter ServerConfigSitter,
sleep ServerConfigSleep,
night ServerConfigNight,
win ServerConfigWin,
) (ServerConfig, error) {
return ServerConfig{
speed: speed,
unitSpeed: unitSpeed,
moral: moral,
build: build,
misc: misc,
commands: commands,
newbie: newbie,
game: game,
buildings: buildings,
snob: snob,
ally: ally,
coord: coord,
sitter: sitter,
sleep: sleep,
night: night,
win: win,
}, nil
}
func (s ServerConfig) Speed() float64 {
return s.speed
}
func (s ServerConfig) UnitSpeed() float64 {
return s.unitSpeed
}
func (s ServerConfig) Moral() int {
return s.moral
}
func (s ServerConfig) Build() ServerConfigBuild {
return s.build
}
func (s ServerConfig) Misc() ServerConfigMisc {
return s.misc
}
func (s ServerConfig) Commands() ServerConfigCommands {
return s.commands
}
func (s ServerConfig) Newbie() ServerConfigNewbie {
return s.newbie
}
func (s ServerConfig) Game() ServerConfigGame {
return s.game
}
func (s ServerConfig) Buildings() ServerConfigBuildings {
return s.buildings
}
func (s ServerConfig) Snob() ServerConfigSnob {
return s.snob
}
func (s ServerConfig) Ally() ServerConfigAlly {
return s.ally
}
func (s ServerConfig) Coord() ServerConfigCoord {
return s.coord
}
func (s ServerConfig) Sitter() ServerConfigSitter {
return s.sitter
}
func (s ServerConfig) Sleep() ServerConfigSleep {
return s.sleep
}
func (s ServerConfig) Night() ServerConfigNight {
return s.night
}
func (s ServerConfig) Win() ServerConfigWin {
return s.win
}

View File

@ -99,3 +99,30 @@ func TestNewSyncServersCmdPayloadWithStringURL(t *testing.T) {
})
}
}
type serverKeyValidationTest struct {
name string
key string
expectedErr error
}
func newServerKeyValidationTests() []serverKeyValidationTest {
return []serverKeyValidationTest{
{
name: "ERR: server key length < 1",
expectedErr: domain.LenError{
Min: 1,
Max: 10,
},
},
{
name: "ERR: server key length > 10",
key: "keykeykeyke",
expectedErr: domain.LenError{
Min: 1,
Max: 10,
Current: len("keykeykeyke"),
},
},
}
}

View File

@ -0,0 +1,112 @@
package domain
type Unit struct {
BuildTime float64
Pop int
Speed float64
Attack int
Defense int
DefenseCavalry int
DefenseArcher int
Carry int
}
type UnitInfo struct {
spear Unit
sword Unit
axe Unit
archer Unit
spy Unit
light Unit
marcher Unit
heavy Unit
ram Unit
catapult Unit
knight Unit
snob Unit
militia Unit
}
func NewUnitInfo(
spear Unit,
sword Unit,
axe Unit,
archer Unit,
spy Unit,
light Unit,
marcher Unit,
heavy Unit,
ram Unit,
catapult Unit,
knight Unit,
snob Unit,
militia Unit,
) (UnitInfo, error) {
return UnitInfo{
spear: spear,
sword: sword,
axe: axe,
archer: archer,
spy: spy,
light: light,
marcher: marcher,
heavy: heavy,
ram: ram,
catapult: catapult,
knight: knight,
snob: snob,
militia: militia,
}, nil
}
func (u UnitInfo) Spear() Unit {
return u.spear
}
func (u UnitInfo) Sword() Unit {
return u.sword
}
func (u UnitInfo) Axe() Unit {
return u.axe
}
func (u UnitInfo) Archer() Unit {
return u.archer
}
func (u UnitInfo) Spy() Unit {
return u.spy
}
func (u UnitInfo) Light() Unit {
return u.light
}
func (u UnitInfo) Marcher() Unit {
return u.marcher
}
func (u UnitInfo) Heavy() Unit {
return u.heavy
}
func (u UnitInfo) Ram() Unit {
return u.ram
}
func (u UnitInfo) Catapult() Unit {
return u.catapult
}
func (u UnitInfo) Knight() Unit {
return u.knight
}
func (u UnitInfo) Snob() Unit {
return u.snob
}
func (u UnitInfo) Militia() Unit {
return u.militia
}

View File

@ -43,3 +43,20 @@ func validateSliceLen[S ~[]E, E any](s S, min, max int) error {
return nil
}
const (
serverKeyMinLength = 1
serverKeyMaxLength = 10
)
func validateServerKey(key string) error {
if l := len(key); l < serverKeyMinLength || l > serverKeyMaxLength {
return LenError{
Min: serverKeyMinLength,
Max: serverKeyMaxLength,
Current: l,
}
}
return nil
}

View File

@ -7,6 +7,7 @@ import (
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/health"
"gitea.dwysokinski.me/twhelp/corev3/internal/health/healthtest"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
@ -32,9 +33,9 @@ func TestHealth_CheckReady(t *testing.T) {
{
name: "3 checks passed",
options: []health.Option{
health.WithReadyCheck(checker{name: "test", err: nil}),
health.WithReadyCheck(checker{name: "test", err: nil}),
health.WithReadyCheck(checker{name: "test2", err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test2", Err: nil}),
health.WithMaxConcurrent(2),
},
expectedResult: health.Result{
@ -59,9 +60,9 @@ func TestHealth_CheckReady(t *testing.T) {
{
name: "2 checks passed, 1 check failed",
options: []health.Option{
health.WithReadyCheck(checker{name: "test", err: nil}),
health.WithReadyCheck(checker{name: "test", err: nil}),
health.WithReadyCheck(checker{name: "test2", err: errCheckFailed}),
health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test2", Err: errCheckFailed}),
},
expectedResult: health.Result{
Status: health.StatusFail,
@ -121,9 +122,9 @@ func TestHealth_CheckLive(t *testing.T) {
{
name: "3 checks passed",
options: []health.Option{
health.WithLiveCheck(checker{name: "test", err: nil}),
health.WithLiveCheck(checker{name: "test", err: nil}),
health.WithLiveCheck(checker{name: "test2", err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test2", Err: nil}),
health.WithMaxConcurrent(2),
},
expectedResult: health.Result{
@ -148,9 +149,9 @@ func TestHealth_CheckLive(t *testing.T) {
{
name: "2 checks passed, 1 check failed",
options: []health.Option{
health.WithLiveCheck(checker{name: "test", err: nil}),
health.WithLiveCheck(checker{name: "test", err: nil}),
health.WithLiveCheck(checker{name: "test2", err: errCheckFailed}),
health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test2", Err: errCheckFailed}),
},
expectedResult: health.Result{
Status: health.StatusFail,
@ -191,16 +192,3 @@ func TestHealth_CheckLive(t *testing.T) {
})
}
}
type checker struct {
name string
err error
}
func (c checker) Name() string {
return c.name
}
func (c checker) Check(_ context.Context) error {
return c.err
}

View File

@ -0,0 +1,147 @@
package healthfile
import (
"context"
"errors"
"os"
"sync"
"sync/atomic"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/health"
)
type checkFunc func(ctx context.Context) health.Result
type Observer struct {
runningMu sync.Mutex
wg sync.WaitGroup
running atomic.Int32
closeCh chan struct{}
path string
check checkFunc
checkInterval time.Duration
checkTimeout time.Duration
}
type Option func(o *Observer)
func WithCheckInterval(interval time.Duration) Option {
return func(o *Observer) {
o.checkInterval = interval
}
}
func WithCheckTimeout(timeout time.Duration) Option {
return func(o *Observer) {
o.checkTimeout = timeout
}
}
func LiveObserver(h *health.Health, path string, opts ...Option) *Observer {
return newObserver(path, h.CheckLive, opts...)
}
func ReadyObserver(h *health.Health, path string, opts ...Option) *Observer {
return newObserver(path, h.CheckReady, opts...)
}
const (
checkTimeoutDefault = 5 * time.Second
checkIntervalDefault = 15 * time.Second
)
func newObserver(path string, check checkFunc, opts ...Option) *Observer {
o := &Observer{
path: path,
check: check,
checkTimeout: checkTimeoutDefault,
checkInterval: checkIntervalDefault,
}
for _, opt := range opts {
opt(o)
}
return o
}
var ErrAlreadyRunning = errors.New("observer is already running")
func (o *Observer) Run(ctx context.Context) error {
o.runningMu.Lock()
if !o.running.CompareAndSwap(0, 1) {
o.runningMu.Unlock()
return ErrAlreadyRunning
}
o.wg.Add(1)
o.closeCh = make(chan struct{})
o.runningMu.Unlock()
defer func() {
o.running.Add(-1)
o.wg.Done()
}()
ticker := time.NewTicker(o.checkInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-o.closeCh:
return nil
default:
}
o.tick(ctx)
select {
case <-ctx.Done():
return ctx.Err()
case <-o.closeCh:
return nil
case <-ticker.C:
}
}
}
const filePerm = 0644
func (o *Observer) tick(parent context.Context) {
ctx, cancel := context.WithTimeout(parent, o.checkTimeout)
defer cancel()
res := o.check(ctx)
fileExists := checkFileExists(o.path)
switch {
case res.Status == health.StatusPass && !fileExists:
_ = os.WriteFile(o.path, []byte("pass"), filePerm)
case res.Status == health.StatusFail && fileExists:
_ = os.Remove(o.path)
}
}
func (o *Observer) Path() string {
return o.path
}
func (o *Observer) Close() error {
o.runningMu.Lock()
defer o.runningMu.Unlock()
if o.running.Load() == 0 {
return nil
}
close(o.closeCh)
o.wg.Wait()
return nil
}
func checkFileExists(filePath string) bool {
_, err := os.Stat(filePath)
return !errors.Is(err, os.ErrNotExist)
}

View File

@ -0,0 +1,144 @@
package healthfile_test
import (
"context"
"errors"
"path"
"sync"
"testing"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/health"
"gitea.dwysokinski.me/twhelp/corev3/internal/health/healthfile"
"gitea.dwysokinski.me/twhelp/corev3/internal/health/healthtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestLiveObserver(t *testing.T) {
t.Parallel()
testObserver(t, func(t *testing.T) *healthfile.Observer {
t.Helper()
h := health.New(
health.WithLiveCheck(healthtest.Checker{Nam: "test"}),
health.WithLiveCheck(healthtest.Checker{Nam: "test2"}),
)
dir := t.TempDir()
livePath := path.Join(dir, "live")
o := healthfile.LiveObserver(h, livePath)
t.Cleanup(func() {
_ = o.Close()
})
return o
})
}
func TestReadyObserver(t *testing.T) {
t.Parallel()
testObserver(t, func(t *testing.T) *healthfile.Observer {
t.Helper()
h := health.New(
health.WithReadyCheck(healthtest.Checker{Nam: "test"}),
health.WithReadyCheck(healthtest.Checker{Nam: "test2"}),
)
dir := t.TempDir()
readyPath := path.Join(dir, "ready")
o := healthfile.ReadyObserver(h, readyPath)
t.Cleanup(func() {
_ = o.Close()
})
return o
})
}
func testObserver(t *testing.T, newObserver func(t *testing.T) *healthfile.Observer) {
t.Helper()
t.Run("standard run/close path", func(t *testing.T) {
t.Parallel()
o := newObserver(t)
observerStopped := make(chan struct{})
go func() {
defer close(observerStopped)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_ = o.Run(ctx)
}()
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
assert.FileExists(collect, o.Path())
}, time.Second, 10*time.Millisecond)
require.NoError(t, o.Close())
select {
case <-observerStopped:
// OK
default:
t.Error("Run goroutine not stopped")
}
})
t.Run("observer respects ctx.Done", func(t *testing.T) {
t.Parallel()
o := newObserver(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
require.ErrorIs(t, o.Run(ctx), context.DeadlineExceeded)
})
t.Run("run can only be called once", func(t *testing.T) {
t.Parallel()
expectedErrs := 2
o := newObserver(t)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
var wg sync.WaitGroup
errCh := make(chan error)
for i := 1; i <= 1+expectedErrs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
errCh <- o.Run(ctx)
}()
}
go func() {
wg.Wait()
close(errCh)
}()
var cnt int
for err := range errCh {
if errors.Is(err, healthfile.ErrAlreadyRunning) {
cnt++
}
if cnt == expectedErrs {
require.NoError(t, o.Close())
}
}
assert.Equal(t, expectedErrs, cnt)
})
}

View File

@ -1,7 +1,6 @@
package healthhttp_test
import (
"context"
"encoding/json"
"errors"
"net/http"
@ -11,6 +10,7 @@ import (
"gitea.dwysokinski.me/twhelp/corev3/internal/health"
"gitea.dwysokinski.me/twhelp/corev3/internal/health/healthhttp"
"gitea.dwysokinski.me/twhelp/corev3/internal/health/healthtest"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
@ -29,9 +29,9 @@ func TestReadyHandler(t *testing.T) {
{
name: "pass",
options: []health.Option{
health.WithReadyCheck(checker{name: "test", err: nil}),
health.WithReadyCheck(checker{name: "test", err: nil}),
health.WithReadyCheck(checker{name: "test2", err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test2", Err: nil}),
},
expectedStatus: http.StatusOK,
expectedResponse: healthhttp.Response{
@ -56,9 +56,9 @@ func TestReadyHandler(t *testing.T) {
{
name: "2 checks passed, 1 check failed",
options: []health.Option{
health.WithReadyCheck(checker{name: "test", err: nil}),
health.WithReadyCheck(checker{name: "test", err: nil}),
health.WithReadyCheck(checker{name: "test2", err: errors.New("failed")}),
health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithReadyCheck(healthtest.Checker{Nam: "test2", Err: errors.New("failed")}),
},
expectedStatus: http.StatusServiceUnavailable,
expectedResponse: healthhttp.Response{
@ -118,9 +118,9 @@ func TestLiveHandler(t *testing.T) {
{
name: "pass",
options: []health.Option{
health.WithLiveCheck(checker{name: "test", err: nil}),
health.WithLiveCheck(checker{name: "test", err: nil}),
health.WithLiveCheck(checker{name: "test2", err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test2", Err: nil}),
},
expectedStatus: http.StatusOK,
expectedResponse: healthhttp.Response{
@ -145,9 +145,9 @@ func TestLiveHandler(t *testing.T) {
{
name: "2 checks passed, 1 check failed",
options: []health.Option{
health.WithLiveCheck(checker{name: "test", err: nil}),
health.WithLiveCheck(checker{name: "test", err: nil}),
health.WithLiveCheck(checker{name: "test2", err: errors.New("failed")}),
health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}),
health.WithLiveCheck(healthtest.Checker{Nam: "test2", Err: errors.New("failed")}),
},
expectedStatus: http.StatusServiceUnavailable,
expectedResponse: healthhttp.Response{
@ -194,16 +194,3 @@ func TestLiveHandler(t *testing.T) {
})
}
}
type checker struct {
name string
err error
}
func (c checker) Name() string {
return c.name
}
func (c checker) Check(_ context.Context) error {
return c.err
}

View File

@ -0,0 +1,25 @@
package healthtest
import (
"context"
"gitea.dwysokinski.me/twhelp/corev3/internal/health"
)
// Checker is a mock implementation of health.Checker.
type Checker struct {
// Nam is later returned in the Name method
Nam string
// Err is later returned in the Check method
Err error
}
var _ health.Checker = Checker{}
func (c Checker) Name() string {
return c.Nam
}
func (c Checker) Check(_ context.Context) error {
return c.Err
}

View File

@ -0,0 +1,58 @@
package port
import (
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)
type ServerWatermillConsumer struct {
svc *app.ServerService
subscriber message.Subscriber
logger watermill.LoggerAdapter
marshaler watermillmsg.Marshaler
cmdSyncTopic string
}
func NewServerWatermillConsumer(
svc *app.ServerService,
subscriber message.Subscriber,
logger watermill.LoggerAdapter,
marshaler watermillmsg.Marshaler,
cmdSyncTopic string,
) *ServerWatermillConsumer {
return &ServerWatermillConsumer{
svc: svc,
subscriber: subscriber,
logger: logger,
marshaler: marshaler,
cmdSyncTopic: cmdSyncTopic,
}
}
func (c *ServerWatermillConsumer) Register(router *message.Router) {
router.AddNoPublisherHandler("ServerConsumer.sync", c.cmdSyncTopic, c.subscriber, c.sync)
}
func (c *ServerWatermillConsumer) sync(msg *message.Message) error {
var rawPayload watermillmsg.SyncServersCmdPayload
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
payload, err := domain.NewSyncServersCmdPayloadWithStringURL(rawPayload.VersionCode, rawPayload.URL)
if err != nil {
c.logger.Error("couldn't construct domain.SyncServersCmdPayload", err, watermill.LogFields{
"handler": message.HandlerNameFromCtx(msg.Context()),
})
return nil
}
return c.svc.Sync(msg.Context(), payload)
}

View File

@ -26,7 +26,7 @@ type UnitInfo struct {
Light Unit `xml:"light"`
Marcher Unit `xml:"marcher"`
Heavy Unit `xml:"heavy"`
Ram Unit `xml:"ram"` //nolint:revive
Ram Unit `xml:"ram"`
Catapult Unit `xml:"catapult"`
Knight Unit `xml:"knight"`
Snob Unit `xml:"snob"`

View File

@ -2,6 +2,7 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- jobs.yml
- server-consumer.yml
images:
- name: twhelp
newName: twhelp

View File

@ -0,0 +1,44 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: twhelp-server-consumer-deployment
spec:
selector:
matchLabels:
app: twhelp-server-consumer
template:
metadata:
labels:
app: twhelp-server-consumer
spec:
containers:
- name: twhelp-server-consumer
image: twhelp
args: [consumer, server]
env:
- name: APP_MODE
value: development
- name: LOG_LEVEL
value: debug
- name: DB_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: twhelp-secret
key: db-connection-string
- name: RABBITMQ_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: twhelp-secret
key: rabbitmq-connection-string
livenessProbe:
exec:
command: [cat, /tmp/live]
initialDelaySeconds: 5
periodSeconds: 10
resources:
requests:
cpu: 100m
memory: 100Mi
limits:
cpu: 300m
memory: 300Mi