diff --git a/.golangci.yml b/.golangci.yml index 9e789fa..27fd47e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -110,6 +110,7 @@ linters-settings: ignored-functions: - strconv.FormatInt - strconv.ParseInt + - ^gofakeit\. revive: rules: # https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#add-constant @@ -479,3 +480,11 @@ issues: - linters: - lll source: "^//go:generate " + # Ram in used in a different context, so we have to disable this linter warning + # var-naming: struct field Ram should be RAM + - path: unit_info\.go + text: "var-naming: struct field Ram should be RAM" + - path: unit_info\.go + # Ram in used in a different context, so we have to disable this linter warning + # var-naming: struct field Ram should be RAM + text: "var-naming: method Ram should be RAM" diff --git a/cmd/twhelp/app.go b/cmd/twhelp/app.go index edf6aa1..2e98d7f 100644 --- a/cmd/twhelp/app.go +++ b/cmd/twhelp/app.go @@ -43,7 +43,7 @@ func newApp(name, version string) *appWrapper { app.Name = name app.HelpName = name app.Version = version - app.Commands = []*cli.Command{cmdDB, cmdJob} + app.Commands = []*cli.Command{cmdDB, cmdJob, cmdConsumer} app.Flags = concatSlices(appFlags, logFlags) app.Before = app.handleBefore return app diff --git a/cmd/twhelp/cmd_consumer.go b/cmd/twhelp/cmd_consumer.go new file mode 100644 index 0000000..8a54425 --- /dev/null +++ b/cmd/twhelp/cmd_consumer.go @@ -0,0 +1,221 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/app" + "gitea.dwysokinski.me/twhelp/corev3/internal/health" + "gitea.dwysokinski.me/twhelp/corev3/internal/health/healthfile" + "gitea.dwysokinski.me/twhelp/corev3/internal/port" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "github.com/uptrace/bun" + "github.com/urfave/cli/v2" +) + +var cmdConsumer = &cli.Command{ + Name: "consumer", + Usage: "Run one of the workers", + Subcommands: []*cli.Command{ + { + Name: "server", + Usage: "Run the worker responsible for consuming server-related messages", + Flags: concatSlices(dbFlags, rmqFlags, twSvcFlags), + Action: func(c *cli.Context) error { + return runConsumer( + c, + "ServerConsumer", + func( + c *cli.Context, + router *message.Router, + logger watermill.LoggerAdapter, + publisher *amqp.Publisher, + subscriber *amqp.Subscriber, + marshaler watermillmsg.Marshaler, + db *bun.DB, + ) error { + twSvc, err := newTWServiceFromFlags(c) + if err != nil { + return err + } + + consumer := port.NewServerWatermillConsumer( + app.NewServerService(twSvc), + subscriber, + logger, + marshaler, + c.String(rmqFlagTopicSyncServersCmd.Name), + ) + consumer.Register(router) + + return nil + }, + ) + }, + }, + }, +} + +type registerHandlersFunc func( + c *cli.Context, + router *message.Router, + logger watermill.LoggerAdapter, + publisher *amqp.Publisher, + subscriber *amqp.Subscriber, + marshaler watermillmsg.Marshaler, + db *bun.DB, +) error + +//nolint:gocyclo +func runConsumer(c *cli.Context, name string, registerHandlers registerHandlersFunc) error { + ctx, cancel := context.WithCancel(c.Context) + defer cancel() + + logger := loggerFromCtx(ctx) + watermillLogger := newWatermillLogger(logger) + + var wg sync.WaitGroup + defer func() { + // it's required for the graceful shutdown + wg.Wait() + }() + + amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger) + if err != nil { + return err + } + defer func() { + if closeErr := amqpConn.Close(); closeErr != nil { + logger.Warn("couldn't close amqp connection", slog.Any("error", err)) + } + }() + + publisher, err := newAMQPPublisher(amqpConn, watermillLogger) + if err != nil { + return err + } + defer func() { + if closeErr := publisher.Close(); closeErr != nil { + logger.Warn("couldn't close amqp publisher", slog.Any("error", err)) + } + }() + + subscriber, err := newAMQPSubscriber(amqpConn, watermillLogger, name) + if err != nil { + return err + } + defer func() { + if closeErr := subscriber.Close(); closeErr != nil { + logger.Warn("couldn't close amqp subscriber", slog.Any("error", err)) + } + }() + + bunDB, err := newBunDBFromFlags(c) + if err != nil { + return err + } + defer func() { + logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections)) + if dbCloseErr := bunDB.Close(); dbCloseErr != nil { + logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr)) + } else { + logger.Debug("db connections closed") + } + }() + + healthObserver := healthfile.LiveObserver(health.New(), "/tmp/live") + defer func() { + logger.Debug("closing health observer...") + if closeErr := healthObserver.Close(); closeErr != nil { + logger.Warn("couldn't close health observer", slog.Any("error", closeErr)) + } else { + logger.Debug("health observer closed") + } + }() + + router, err := newWatermillRouter(watermillLogger) + if err != nil { + return err + } + + if err = registerHandlers( + c, + router, + watermillLogger, + publisher, + subscriber, + newWatermillMarshaler(), + bunDB, + ); err != nil { + return err + } + + wg.Add(1) + go func() { + defer wg.Done() + + if runErr := healthObserver.Run(ctx); runErr != nil { + logger.Warn("couldn't run health observer", slog.Any("error", runErr)) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + select { + case <-ctx.Done(): + return + case <-router.Running(): + logger.Info("consumer is up and running", slog.String("name", name)) + } + + waitForShutdownSignal(ctx) + + if closeErr := router.Close(); closeErr != nil { + logger.Warn("couldn't close router", slog.Any("error", err)) + } + }() + + if err = router.Run(ctx); err != nil { + cancel() + return fmt.Errorf("couldn't run router: %w", err) + } + + return nil +} + +const ( + watermillRouterCloseTimeout = 30 * time.Second + watermillRetryMWMaxRetries = 3 + watermillRetryMWMInitialInterval = 3 * time.Second + watermillRetryMWMMaxInterval = 10 * time.Second + watermillRetryMWMIntervalMultiplier = 1.5 +) + +func newWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) { + router, err := message.NewRouter(message.RouterConfig{ + CloseTimeout: watermillRouterCloseTimeout, + }, logger) + if err != nil { + return nil, err + } + + router.AddMiddleware(middleware.Recoverer) + router.AddMiddleware(middleware.Retry{ + MaxRetries: watermillRetryMWMaxRetries, + InitialInterval: watermillRetryMWMInitialInterval, + MaxInterval: watermillRetryMWMMaxInterval, + Multiplier: watermillRetryMWMIntervalMultiplier, + Logger: logger, + }.Middleware) + + return router, nil +} diff --git a/cmd/twhelp/cmd_db.go b/cmd/twhelp/cmd_db.go index d0f9f1c..1508f20 100644 --- a/cmd/twhelp/cmd_db.go +++ b/cmd/twhelp/cmd_db.go @@ -32,7 +32,7 @@ func init() { var cmdDB = &cli.Command{ Name: "db", - Usage: "Manages database migrations", + Usage: "Manage the database", Subcommands: []*cli.Command{ { Name: "migrate", diff --git a/cmd/twhelp/twhelp.go b/cmd/twhelp/twhelp.go new file mode 100644 index 0000000..1a5622f --- /dev/null +++ b/cmd/twhelp/twhelp.go @@ -0,0 +1,57 @@ +package main + +import ( + "bytes" + "fmt" + "net/http" + "text/template" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/adapter" + "gitea.dwysokinski.me/twhelp/corev3/internal/tw" + "github.com/urfave/cli/v2" +) + +var ( + twSvcFlagUserAgent = &cli.StringFlag{ + Name: "tw.userAgent", + Value: "tribalwarshelp/{{ .AppVersion }}", + EnvVars: []string{"TW_USER_AGENT"}, + } + twSvcFlagTimeout = &cli.DurationFlag{ + Name: "tw.timeout", + Value: 10 * time.Second, //nolint:gomnd + EnvVars: []string{"TW_TIMEOUT"}, + Usage: "https://pkg.go.dev/net/http#Client.Timeout", + } + twSvcFlags = []cli.Flag{ + twSvcFlagUserAgent, + twSvcFlagTimeout, + } +) + +func newTWServiceFromFlags(c *cli.Context) (*adapter.TWHTTP, error) { + return newTWService(c.App.Version, c.String(twSvcFlagUserAgent.Name), c.Duration(twSvcFlagTimeout.Name)) +} + +func newTWService(appVersion, userAgent string, timeout time.Duration) (*adapter.TWHTTP, error) { + t, err := template.New("").Parse(userAgent) + if err != nil { + return nil, fmt.Errorf("something went wron while parsing user agent template: %w", err) + } + + buf := bytes.NewBuffer(nil) + + if err = t.Execute(buf, map[string]any{ + "AppVersion": appVersion, + }); err != nil { + return nil, fmt.Errorf("couldn't exec user agent template: %w", err) + } + + return adapter.NewTWHTTP(tw.NewClient( + tw.WithHTTPClient(&http.Client{ + Timeout: timeout, + }), + tw.WithUserAgent(buf.String()), + )), nil +} diff --git a/cmd/twhelp/utils.go b/cmd/twhelp/utils.go index c0094ed..4e4c0d6 100644 --- a/cmd/twhelp/utils.go +++ b/cmd/twhelp/utils.go @@ -32,3 +32,9 @@ var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} func newShutdownSignalContext(parent context.Context) (context.Context, context.CancelFunc) { return signal.NotifyContext(parent, shutdownSignals...) } + +func waitForShutdownSignal(ctx context.Context) { + ctx, cancel := newShutdownSignalContext(ctx) + defer cancel() + <-ctx.Done() +} diff --git a/go.mod b/go.mod index 1666120..5d8a300 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/ThreeDotsLabs/watermill v1.3.5 github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1 + github.com/brianvoe/gofakeit/v6 v6.26.3 github.com/cenkalti/backoff/v4 v4.2.1 github.com/elliotchance/phpserialize v1.3.3 github.com/go-chi/chi/v5 v5.0.10 @@ -57,6 +58,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sirupsen/logrus v1.8.1 // indirect + github.com/sony/gobreaker v0.5.0 // indirect github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 54504da..31c1af3 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY= github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1 h1:OxvMB2/3YtcQuC7quC+CGmFpGz9oaxP2ef5wkp+R2oM= github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1/go.mod h1:MCNoh0HUg4w0bY64on9BnhUodHeimz8+vMfXrzyuWN8= +github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8= +github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -130,6 +132,8 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg= +github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= diff --git a/internal/adapter/adaptertest/testing_t.go b/internal/adapter/adaptertest/testing_tb.go similarity index 100% rename from internal/adapter/adaptertest/testing_t.go rename to internal/adapter/adaptertest/testing_tb.go diff --git a/internal/adapter/http_tw.go b/internal/adapter/http_tw.go new file mode 100644 index 0000000..d874545 --- /dev/null +++ b/internal/adapter/http_tw.go @@ -0,0 +1,41 @@ +package adapter + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/tw" +) + +type TWHTTP struct { + client *tw.Client +} + +func NewTWHTTP(client *tw.Client) *TWHTTP { + return &TWHTTP{client: client} +} + +func (t *TWHTTP) GetOpenServers(ctx context.Context, baseURL string) (domain.BaseServers, error) { + servers, err := t.client.GetOpenServers(ctx, baseURL) + if err != nil { + return nil, err + } + + return t.convertServersToDomain(servers) +} + +func (t *TWHTTP) convertServersToDomain(servers []tw.Server) (domain.BaseServers, error) { + res := make(domain.BaseServers, 0, len(servers)) + + for _, s := range servers { + converted, err := domain.NewBaseServer(s.Key, s.URL, true) + if err != nil { + return nil, fmt.Errorf("couldn't construct domain.BaseServer: %w", err) + } + + res = append(res, converted) + } + + return res, nil +} diff --git a/internal/app/service_server.go b/internal/app/service_server.go new file mode 100644 index 0000000..0f08b9c --- /dev/null +++ b/internal/app/service_server.go @@ -0,0 +1,28 @@ +package app + +import ( + "context" + "fmt" + "log" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" +) + +type ServerService struct { + twSvc TWService +} + +func NewServerService(twSvc TWService) *ServerService { + return &ServerService{twSvc: twSvc} +} + +func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCmdPayload) error { + openServers, err := svc.twSvc.GetOpenServers(ctx, payload.URL().String()) + if err != nil { + return fmt.Errorf("couldn't get open servers for version code '%s': %w", payload.VersionCode(), err) + } + + log.Println(openServers) + + return nil +} diff --git a/internal/app/service_tw.go b/internal/app/service_tw.go new file mode 100644 index 0000000..cdf90d7 --- /dev/null +++ b/internal/app/service_tw.go @@ -0,0 +1,11 @@ +package app + +import ( + "context" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" +) + +type TWService interface { + GetOpenServers(ctx context.Context, baseURL string) (domain.BaseServers, error) +} diff --git a/internal/domain/base_server.go b/internal/domain/base_server.go new file mode 100644 index 0000000..be85f7c --- /dev/null +++ b/internal/domain/base_server.go @@ -0,0 +1,50 @@ +package domain + +import "net/url" + +type BaseServer struct { + key string + url *url.URL + open bool +} + +func NewBaseServer(key, rawURL string, open bool) (BaseServer, error) { + if err := validateServerKey(key); err != nil { + return BaseServer{}, ValidationError{ + Err: err, + Field: "key", + } + } + + u, err := parseURL(rawURL) + if err != nil { + return BaseServer{}, ValidationError{ + Err: err, + Field: "url", + } + } + + return BaseServer{ + key: key, + url: u, + open: open, + }, nil +} + +func (b BaseServer) Key() string { + return b.key +} + +func (b BaseServer) URL() *url.URL { + return b.url +} + +func (b BaseServer) Open() bool { + return b.open +} + +func (b BaseServer) IsZero() bool { + return b == BaseServer{} +} + +type BaseServers []BaseServer diff --git a/internal/domain/base_server_test.go b/internal/domain/base_server_test.go new file mode 100644 index 0000000..6cbe3a3 --- /dev/null +++ b/internal/domain/base_server_test.go @@ -0,0 +1,83 @@ +package domain_test + +import ( + "testing" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewBaseServer(t *testing.T) { + t.Parallel() + + validBaseServer := domaintest.NewBaseServer(t, domaintest.BaseServerConfig{}) + + type args struct { + key string + url string + open bool + } + + type test struct { + name string + args args + expectedErr error + } + + tests := []test{ + { + name: "OK", + args: args{ + key: validBaseServer.Key(), + url: validBaseServer.URL().String(), + open: validBaseServer.Open(), + }, + }, + { + name: "ERR: invalid URL", + args: args{ + key: validBaseServer.Key(), + url: " ", + open: validBaseServer.Open(), + }, + expectedErr: domain.ValidationError{ + Err: domain.InvalidURLError{URL: " "}, + Field: "url", + }, + }, + } + + for _, serverKeyTest := range newServerKeyValidationTests() { + tests = append(tests, test{ + name: serverKeyTest.name, + args: args{ + key: serverKeyTest.key, + url: validBaseServer.URL().String(), + open: validBaseServer.Open(), + }, + expectedErr: domain.ValidationError{ + Err: serverKeyTest.expectedErr, + Field: "key", + }, + }) + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + res, err := domain.NewBaseServer(tt.args.key, tt.args.url, tt.args.open) + require.ErrorIs(t, err, tt.expectedErr) + if tt.expectedErr != nil { + return + } + assert.Equal(t, tt.args.key, res.Key()) + assert.Equal(t, tt.args.url, res.URL().String()) + assert.Equal(t, tt.args.open, res.Open()) + }) + } +} diff --git a/internal/domain/building_info.go b/internal/domain/building_info.go new file mode 100644 index 0000000..6cb58e9 --- /dev/null +++ b/internal/domain/building_info.go @@ -0,0 +1,144 @@ +package domain + +type Building struct { + MaxLevel int + MinLevel int + Wood int + Stone int + Iron int + Pop int + WoodFactor float64 + StoneFactor float64 + IronFactor float64 + PopFactor float64 + BuildTime float64 + BuildTimeFactor float64 +} + +type BuildingInfo struct { + main Building + barracks Building + stable Building + garage Building + watchtower Building + snob Building + smith Building + place Building + statue Building + market Building + wood Building + stone Building + iron Building + farm Building + storage Building + hide Building + wall Building +} + +func NewBuildingInfo( + main Building, + barracks Building, + stable Building, + garage Building, + watchtower Building, + snob Building, + smith Building, + place Building, + statue Building, + market Building, + wood Building, + stone Building, + iron Building, + farm Building, + storage Building, + hide Building, + wall Building, +) (BuildingInfo, error) { + return BuildingInfo{ + main: main, + barracks: barracks, + stable: stable, + garage: garage, + watchtower: watchtower, + snob: snob, + smith: smith, + place: place, + statue: statue, + market: market, + wood: wood, + stone: stone, + iron: iron, + farm: farm, + storage: storage, + hide: hide, + wall: wall, + }, nil +} + +func (b BuildingInfo) Main() Building { + return b.main +} + +func (b BuildingInfo) Barracks() Building { + return b.barracks +} + +func (b BuildingInfo) Stable() Building { + return b.stable +} + +func (b BuildingInfo) Garage() Building { + return b.garage +} + +func (b BuildingInfo) Watchtower() Building { + return b.watchtower +} + +func (b BuildingInfo) Snob() Building { + return b.snob +} + +func (b BuildingInfo) Smith() Building { + return b.smith +} + +func (b BuildingInfo) Place() Building { + return b.place +} + +func (b BuildingInfo) Statue() Building { + return b.statue +} + +func (b BuildingInfo) Market() Building { + return b.market +} + +func (b BuildingInfo) Wood() Building { + return b.wood +} + +func (b BuildingInfo) Stone() Building { + return b.stone +} + +func (b BuildingInfo) Iron() Building { + return b.iron +} + +func (b BuildingInfo) Farm() Building { + return b.farm +} + +func (b BuildingInfo) Storage() Building { + return b.storage +} + +func (b BuildingInfo) Hide() Building { + return b.hide +} + +func (b BuildingInfo) Wall() Building { + return b.wall +} diff --git a/internal/domain/domaintest/base_server.go b/internal/domain/domaintest/base_server.go new file mode 100644 index 0000000..13d5552 --- /dev/null +++ b/internal/domain/domaintest/base_server.go @@ -0,0 +1,34 @@ +package domaintest + +import ( + "net/url" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/require" +) + +type BaseServerConfig struct { + Key string + URL *url.URL +} + +func (cfg *BaseServerConfig) init() { + if cfg.Key == "" { + cfg.Key = gofakeit.LetterN(5) + } + + if cfg.URL == nil { + cfg.URL = &url.URL{ + Scheme: "https", + Host: cfg.Key + ".plemiona.pl", + } + } +} + +func NewBaseServer(tb TestingTB, cfg BaseServerConfig) domain.BaseServer { + cfg.init() + s, err := domain.NewBaseServer(cfg.Key, cfg.URL.String(), true) + require.NoError(tb, err) + return s +} diff --git a/internal/domain/domaintest/testing_tb.go b/internal/domain/domaintest/testing_tb.go new file mode 100644 index 0000000..a3d8f7c --- /dev/null +++ b/internal/domain/domaintest/testing_tb.go @@ -0,0 +1,9 @@ +package domaintest + +// TestingTB is a subset of the API provided by both *testing.T and *testing.B. +type TestingTB interface { + Helper() + Errorf(format string, args ...interface{}) + FailNow() + Cleanup(f func()) +} diff --git a/internal/domain/server_config.go b/internal/domain/server_config.go new file mode 100644 index 0000000..3df9592 --- /dev/null +++ b/internal/domain/server_config.go @@ -0,0 +1,256 @@ +package domain + +type ServerConfigBuild struct { + Destroy int +} + +type ServerConfigMisc struct { + KillRanking int + Tutorial int + TradeCancelTime int +} + +type ServerConfigCommands struct { + MillisArrival int + CommandCancelTime int +} + +type ServerConfigNewbie struct { + Days int + RatioDays int + Ratio int + RemoveNewbieVillages int +} + +type ServerConfigGame struct { + BuildtimeFormula int + Knight int + KnightNewItems int + Archer int + Tech int + FarmLimit int + Church int + Watchtower int + Stronghold int + FakeLimit float64 + BarbarianRise float64 + BarbarianShrink int + BarbarianMaxPoints int + Scavenging int + Hauls int + HaulsBase int + HaulsMax int + BaseProduction int + Event int + SuppressEvents int +} + +type ServerConfigBuildings struct { + CustomMain int + CustomFarm int + CustomStorage int + CustomPlace int + CustomBarracks int + CustomChurch int + CustomSmith int + CustomWood int + CustomStone int + CustomIron int + CustomMarket int + CustomStable int + CustomWall int + CustomGarage int + CustomHide int + CustomSnob int + CustomStatue int + CustomWatchtower int +} + +type ServerConfigSnob struct { + Gold int + CheapRebuild int + Rise int + MaxDist int + Factor float64 + CoinWood int + CoinStone int + CoinIron int + NoBarbConquer int +} + +type ServerConfigAlly struct { + NoHarm int + NoOtherSupport int + NoOtherSupportType int + AllytimeSupport int + NoLeave int + NoJoin int + Limit int + FixedAllies int + PointsMemberCount int + WarsMemberRequirement int + WarsPointsRequirement int + WarsAutoacceptDays int + Levels int + XpRequirements string +} + +type ServerConfigCoord struct { + MapSize int + Func int + EmptyVillages int + BonusVillages int + BonusNew int + Inner int + SelectStart int + VillageMoveWait int + NobleRestart int + StartVillages int +} + +type ServerConfigSitter struct { + Allow int +} + +type ServerConfigSleep struct { + Active int + Delay int + Min int + Max int + MinAwake int + MaxAwake int + WarnTime int +} + +type ServerConfigNight struct { + Active int + StartHour int + EndHour int + DefFactor float64 + Duration int +} + +type ServerConfigWin struct { + Check int +} + +type ServerConfig struct { + speed float64 + unitSpeed float64 + moral int + build ServerConfigBuild + misc ServerConfigMisc + commands ServerConfigCommands + newbie ServerConfigNewbie + game ServerConfigGame + buildings ServerConfigBuildings + snob ServerConfigSnob + ally ServerConfigAlly + coord ServerConfigCoord + sitter ServerConfigSitter + sleep ServerConfigSleep + night ServerConfigNight + win ServerConfigWin +} + +func NewServerConfig( + speed float64, + unitSpeed float64, + moral int, + build ServerConfigBuild, + misc ServerConfigMisc, + commands ServerConfigCommands, + newbie ServerConfigNewbie, + game ServerConfigGame, + buildings ServerConfigBuildings, + snob ServerConfigSnob, + ally ServerConfigAlly, + coord ServerConfigCoord, + sitter ServerConfigSitter, + sleep ServerConfigSleep, + night ServerConfigNight, + win ServerConfigWin, +) (ServerConfig, error) { + return ServerConfig{ + speed: speed, + unitSpeed: unitSpeed, + moral: moral, + build: build, + misc: misc, + commands: commands, + newbie: newbie, + game: game, + buildings: buildings, + snob: snob, + ally: ally, + coord: coord, + sitter: sitter, + sleep: sleep, + night: night, + win: win, + }, nil +} + +func (s ServerConfig) Speed() float64 { + return s.speed +} + +func (s ServerConfig) UnitSpeed() float64 { + return s.unitSpeed +} + +func (s ServerConfig) Moral() int { + return s.moral +} + +func (s ServerConfig) Build() ServerConfigBuild { + return s.build +} + +func (s ServerConfig) Misc() ServerConfigMisc { + return s.misc +} + +func (s ServerConfig) Commands() ServerConfigCommands { + return s.commands +} + +func (s ServerConfig) Newbie() ServerConfigNewbie { + return s.newbie +} + +func (s ServerConfig) Game() ServerConfigGame { + return s.game +} + +func (s ServerConfig) Buildings() ServerConfigBuildings { + return s.buildings +} + +func (s ServerConfig) Snob() ServerConfigSnob { + return s.snob +} + +func (s ServerConfig) Ally() ServerConfigAlly { + return s.ally +} + +func (s ServerConfig) Coord() ServerConfigCoord { + return s.coord +} + +func (s ServerConfig) Sitter() ServerConfigSitter { + return s.sitter +} + +func (s ServerConfig) Sleep() ServerConfigSleep { + return s.sleep +} + +func (s ServerConfig) Night() ServerConfigNight { + return s.night +} + +func (s ServerConfig) Win() ServerConfigWin { + return s.win +} diff --git a/internal/domain/server_test.go b/internal/domain/server_test.go index a45ed75..20f85ad 100644 --- a/internal/domain/server_test.go +++ b/internal/domain/server_test.go @@ -99,3 +99,30 @@ func TestNewSyncServersCmdPayloadWithStringURL(t *testing.T) { }) } } + +type serverKeyValidationTest struct { + name string + key string + expectedErr error +} + +func newServerKeyValidationTests() []serverKeyValidationTest { + return []serverKeyValidationTest{ + { + name: "ERR: server key length < 1", + expectedErr: domain.LenError{ + Min: 1, + Max: 10, + }, + }, + { + name: "ERR: server key length > 10", + key: "keykeykeyke", + expectedErr: domain.LenError{ + Min: 1, + Max: 10, + Current: len("keykeykeyke"), + }, + }, + } +} diff --git a/internal/domain/unit_info.go b/internal/domain/unit_info.go new file mode 100644 index 0000000..4cba101 --- /dev/null +++ b/internal/domain/unit_info.go @@ -0,0 +1,112 @@ +package domain + +type Unit struct { + BuildTime float64 + Pop int + Speed float64 + Attack int + Defense int + DefenseCavalry int + DefenseArcher int + Carry int +} + +type UnitInfo struct { + spear Unit + sword Unit + axe Unit + archer Unit + spy Unit + light Unit + marcher Unit + heavy Unit + ram Unit + catapult Unit + knight Unit + snob Unit + militia Unit +} + +func NewUnitInfo( + spear Unit, + sword Unit, + axe Unit, + archer Unit, + spy Unit, + light Unit, + marcher Unit, + heavy Unit, + ram Unit, + catapult Unit, + knight Unit, + snob Unit, + militia Unit, +) (UnitInfo, error) { + return UnitInfo{ + spear: spear, + sword: sword, + axe: axe, + archer: archer, + spy: spy, + light: light, + marcher: marcher, + heavy: heavy, + ram: ram, + catapult: catapult, + knight: knight, + snob: snob, + militia: militia, + }, nil +} + +func (u UnitInfo) Spear() Unit { + return u.spear +} + +func (u UnitInfo) Sword() Unit { + return u.sword +} + +func (u UnitInfo) Axe() Unit { + return u.axe +} + +func (u UnitInfo) Archer() Unit { + return u.archer +} + +func (u UnitInfo) Spy() Unit { + return u.spy +} + +func (u UnitInfo) Light() Unit { + return u.light +} + +func (u UnitInfo) Marcher() Unit { + return u.marcher +} + +func (u UnitInfo) Heavy() Unit { + return u.heavy +} + +func (u UnitInfo) Ram() Unit { + return u.ram +} + +func (u UnitInfo) Catapult() Unit { + return u.catapult +} + +func (u UnitInfo) Knight() Unit { + return u.knight +} + +func (u UnitInfo) Snob() Unit { + return u.snob +} + +func (u UnitInfo) Militia() Unit { + return u.militia +} diff --git a/internal/domain/validators.go b/internal/domain/validators.go index 6119480..0f349db 100644 --- a/internal/domain/validators.go +++ b/internal/domain/validators.go @@ -43,3 +43,20 @@ func validateSliceLen[S ~[]E, E any](s S, min, max int) error { return nil } + +const ( + serverKeyMinLength = 1 + serverKeyMaxLength = 10 +) + +func validateServerKey(key string) error { + if l := len(key); l < serverKeyMinLength || l > serverKeyMaxLength { + return LenError{ + Min: serverKeyMinLength, + Max: serverKeyMaxLength, + Current: l, + } + } + + return nil +} diff --git a/internal/health/health_test.go b/internal/health/health_test.go index f7626be..95dfce5 100644 --- a/internal/health/health_test.go +++ b/internal/health/health_test.go @@ -7,6 +7,7 @@ import ( "time" "gitea.dwysokinski.me/twhelp/corev3/internal/health" + "gitea.dwysokinski.me/twhelp/corev3/internal/health/healthtest" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" @@ -32,9 +33,9 @@ func TestHealth_CheckReady(t *testing.T) { { name: "3 checks passed", options: []health.Option{ - health.WithReadyCheck(checker{name: "test", err: nil}), - health.WithReadyCheck(checker{name: "test", err: nil}), - health.WithReadyCheck(checker{name: "test2", err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test2", Err: nil}), health.WithMaxConcurrent(2), }, expectedResult: health.Result{ @@ -59,9 +60,9 @@ func TestHealth_CheckReady(t *testing.T) { { name: "2 checks passed, 1 check failed", options: []health.Option{ - health.WithReadyCheck(checker{name: "test", err: nil}), - health.WithReadyCheck(checker{name: "test", err: nil}), - health.WithReadyCheck(checker{name: "test2", err: errCheckFailed}), + health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test2", Err: errCheckFailed}), }, expectedResult: health.Result{ Status: health.StatusFail, @@ -121,9 +122,9 @@ func TestHealth_CheckLive(t *testing.T) { { name: "3 checks passed", options: []health.Option{ - health.WithLiveCheck(checker{name: "test", err: nil}), - health.WithLiveCheck(checker{name: "test", err: nil}), - health.WithLiveCheck(checker{name: "test2", err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test2", Err: nil}), health.WithMaxConcurrent(2), }, expectedResult: health.Result{ @@ -148,9 +149,9 @@ func TestHealth_CheckLive(t *testing.T) { { name: "2 checks passed, 1 check failed", options: []health.Option{ - health.WithLiveCheck(checker{name: "test", err: nil}), - health.WithLiveCheck(checker{name: "test", err: nil}), - health.WithLiveCheck(checker{name: "test2", err: errCheckFailed}), + health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test2", Err: errCheckFailed}), }, expectedResult: health.Result{ Status: health.StatusFail, @@ -191,16 +192,3 @@ func TestHealth_CheckLive(t *testing.T) { }) } } - -type checker struct { - name string - err error -} - -func (c checker) Name() string { - return c.name -} - -func (c checker) Check(_ context.Context) error { - return c.err -} diff --git a/internal/health/healthfile/observer.go b/internal/health/healthfile/observer.go new file mode 100644 index 0000000..e5ec47e --- /dev/null +++ b/internal/health/healthfile/observer.go @@ -0,0 +1,147 @@ +package healthfile + +import ( + "context" + "errors" + "os" + "sync" + "sync/atomic" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/health" +) + +type checkFunc func(ctx context.Context) health.Result + +type Observer struct { + runningMu sync.Mutex + wg sync.WaitGroup + running atomic.Int32 + closeCh chan struct{} + path string + check checkFunc + checkInterval time.Duration + checkTimeout time.Duration +} + +type Option func(o *Observer) + +func WithCheckInterval(interval time.Duration) Option { + return func(o *Observer) { + o.checkInterval = interval + } +} + +func WithCheckTimeout(timeout time.Duration) Option { + return func(o *Observer) { + o.checkTimeout = timeout + } +} + +func LiveObserver(h *health.Health, path string, opts ...Option) *Observer { + return newObserver(path, h.CheckLive, opts...) +} + +func ReadyObserver(h *health.Health, path string, opts ...Option) *Observer { + return newObserver(path, h.CheckReady, opts...) +} + +const ( + checkTimeoutDefault = 5 * time.Second + checkIntervalDefault = 15 * time.Second +) + +func newObserver(path string, check checkFunc, opts ...Option) *Observer { + o := &Observer{ + path: path, + check: check, + checkTimeout: checkTimeoutDefault, + checkInterval: checkIntervalDefault, + } + + for _, opt := range opts { + opt(o) + } + + return o +} + +var ErrAlreadyRunning = errors.New("observer is already running") + +func (o *Observer) Run(ctx context.Context) error { + o.runningMu.Lock() + if !o.running.CompareAndSwap(0, 1) { + o.runningMu.Unlock() + return ErrAlreadyRunning + } + o.wg.Add(1) + o.closeCh = make(chan struct{}) + o.runningMu.Unlock() + defer func() { + o.running.Add(-1) + o.wg.Done() + }() + + ticker := time.NewTicker(o.checkInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-o.closeCh: + return nil + default: + } + + o.tick(ctx) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-o.closeCh: + return nil + case <-ticker.C: + } + } +} + +const filePerm = 0644 + +func (o *Observer) tick(parent context.Context) { + ctx, cancel := context.WithTimeout(parent, o.checkTimeout) + defer cancel() + + res := o.check(ctx) + fileExists := checkFileExists(o.path) + + switch { + case res.Status == health.StatusPass && !fileExists: + _ = os.WriteFile(o.path, []byte("pass"), filePerm) + case res.Status == health.StatusFail && fileExists: + _ = os.Remove(o.path) + } +} + +func (o *Observer) Path() string { + return o.path +} + +func (o *Observer) Close() error { + o.runningMu.Lock() + defer o.runningMu.Unlock() + + if o.running.Load() == 0 { + return nil + } + + close(o.closeCh) + o.wg.Wait() + + return nil +} + +func checkFileExists(filePath string) bool { + _, err := os.Stat(filePath) + return !errors.Is(err, os.ErrNotExist) +} diff --git a/internal/health/healthfile/observer_test.go b/internal/health/healthfile/observer_test.go new file mode 100644 index 0000000..c47c200 --- /dev/null +++ b/internal/health/healthfile/observer_test.go @@ -0,0 +1,144 @@ +package healthfile_test + +import ( + "context" + "errors" + "path" + "sync" + "testing" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/health" + "gitea.dwysokinski.me/twhelp/corev3/internal/health/healthfile" + "gitea.dwysokinski.me/twhelp/corev3/internal/health/healthtest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLiveObserver(t *testing.T) { + t.Parallel() + + testObserver(t, func(t *testing.T) *healthfile.Observer { + t.Helper() + + h := health.New( + health.WithLiveCheck(healthtest.Checker{Nam: "test"}), + health.WithLiveCheck(healthtest.Checker{Nam: "test2"}), + ) + + dir := t.TempDir() + livePath := path.Join(dir, "live") + + o := healthfile.LiveObserver(h, livePath) + t.Cleanup(func() { + _ = o.Close() + }) + + return o + }) +} + +func TestReadyObserver(t *testing.T) { + t.Parallel() + + testObserver(t, func(t *testing.T) *healthfile.Observer { + t.Helper() + + h := health.New( + health.WithReadyCheck(healthtest.Checker{Nam: "test"}), + health.WithReadyCheck(healthtest.Checker{Nam: "test2"}), + ) + + dir := t.TempDir() + readyPath := path.Join(dir, "ready") + + o := healthfile.ReadyObserver(h, readyPath) + t.Cleanup(func() { + _ = o.Close() + }) + + return o + }) +} + +func testObserver(t *testing.T, newObserver func(t *testing.T) *healthfile.Observer) { + t.Helper() + + t.Run("standard run/close path", func(t *testing.T) { + t.Parallel() + + o := newObserver(t) + + observerStopped := make(chan struct{}) + go func() { + defer close(observerStopped) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _ = o.Run(ctx) + }() + + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.FileExists(collect, o.Path()) + }, time.Second, 10*time.Millisecond) + + require.NoError(t, o.Close()) + + select { + case <-observerStopped: + // OK + default: + t.Error("Run goroutine not stopped") + } + }) + + t.Run("observer respects ctx.Done", func(t *testing.T) { + t.Parallel() + + o := newObserver(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + require.ErrorIs(t, o.Run(ctx), context.DeadlineExceeded) + }) + + t.Run("run can only be called once", func(t *testing.T) { + t.Parallel() + + expectedErrs := 2 + + o := newObserver(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var wg sync.WaitGroup + errCh := make(chan error) + + for i := 1; i <= 1+expectedErrs; i++ { + wg.Add(1) + go func() { + defer wg.Done() + errCh <- o.Run(ctx) + }() + } + + go func() { + wg.Wait() + close(errCh) + }() + + var cnt int + + for err := range errCh { + if errors.Is(err, healthfile.ErrAlreadyRunning) { + cnt++ + } + + if cnt == expectedErrs { + require.NoError(t, o.Close()) + } + } + + assert.Equal(t, expectedErrs, cnt) + }) +} diff --git a/internal/health/healthhttp/handler_test.go b/internal/health/healthhttp/handler_test.go index 61626fa..c170f24 100644 --- a/internal/health/healthhttp/handler_test.go +++ b/internal/health/healthhttp/handler_test.go @@ -1,7 +1,6 @@ package healthhttp_test import ( - "context" "encoding/json" "errors" "net/http" @@ -11,6 +10,7 @@ import ( "gitea.dwysokinski.me/twhelp/corev3/internal/health" "gitea.dwysokinski.me/twhelp/corev3/internal/health/healthhttp" + "gitea.dwysokinski.me/twhelp/corev3/internal/health/healthtest" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" @@ -29,9 +29,9 @@ func TestReadyHandler(t *testing.T) { { name: "pass", options: []health.Option{ - health.WithReadyCheck(checker{name: "test", err: nil}), - health.WithReadyCheck(checker{name: "test", err: nil}), - health.WithReadyCheck(checker{name: "test2", err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test2", Err: nil}), }, expectedStatus: http.StatusOK, expectedResponse: healthhttp.Response{ @@ -56,9 +56,9 @@ func TestReadyHandler(t *testing.T) { { name: "2 checks passed, 1 check failed", options: []health.Option{ - health.WithReadyCheck(checker{name: "test", err: nil}), - health.WithReadyCheck(checker{name: "test", err: nil}), - health.WithReadyCheck(checker{name: "test2", err: errors.New("failed")}), + health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithReadyCheck(healthtest.Checker{Nam: "test2", Err: errors.New("failed")}), }, expectedStatus: http.StatusServiceUnavailable, expectedResponse: healthhttp.Response{ @@ -118,9 +118,9 @@ func TestLiveHandler(t *testing.T) { { name: "pass", options: []health.Option{ - health.WithLiveCheck(checker{name: "test", err: nil}), - health.WithLiveCheck(checker{name: "test", err: nil}), - health.WithLiveCheck(checker{name: "test2", err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test2", Err: nil}), }, expectedStatus: http.StatusOK, expectedResponse: healthhttp.Response{ @@ -145,9 +145,9 @@ func TestLiveHandler(t *testing.T) { { name: "2 checks passed, 1 check failed", options: []health.Option{ - health.WithLiveCheck(checker{name: "test", err: nil}), - health.WithLiveCheck(checker{name: "test", err: nil}), - health.WithLiveCheck(checker{name: "test2", err: errors.New("failed")}), + health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test", Err: nil}), + health.WithLiveCheck(healthtest.Checker{Nam: "test2", Err: errors.New("failed")}), }, expectedStatus: http.StatusServiceUnavailable, expectedResponse: healthhttp.Response{ @@ -194,16 +194,3 @@ func TestLiveHandler(t *testing.T) { }) } } - -type checker struct { - name string - err error -} - -func (c checker) Name() string { - return c.name -} - -func (c checker) Check(_ context.Context) error { - return c.err -} diff --git a/internal/health/healthtest/checker.go b/internal/health/healthtest/checker.go new file mode 100644 index 0000000..9e07b3d --- /dev/null +++ b/internal/health/healthtest/checker.go @@ -0,0 +1,25 @@ +package healthtest + +import ( + "context" + + "gitea.dwysokinski.me/twhelp/corev3/internal/health" +) + +// Checker is a mock implementation of health.Checker. +type Checker struct { + // Nam is later returned in the Name method + Nam string + // Err is later returned in the Check method + Err error +} + +var _ health.Checker = Checker{} + +func (c Checker) Name() string { + return c.Nam +} + +func (c Checker) Check(_ context.Context) error { + return c.Err +} diff --git a/internal/port/consumer_watermill_server.go b/internal/port/consumer_watermill_server.go new file mode 100644 index 0000000..d301fec --- /dev/null +++ b/internal/port/consumer_watermill_server.go @@ -0,0 +1,58 @@ +package port + +import ( + "gitea.dwysokinski.me/twhelp/corev3/internal/app" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" +) + +type ServerWatermillConsumer struct { + svc *app.ServerService + subscriber message.Subscriber + logger watermill.LoggerAdapter + marshaler watermillmsg.Marshaler + cmdSyncTopic string +} + +func NewServerWatermillConsumer( + svc *app.ServerService, + subscriber message.Subscriber, + logger watermill.LoggerAdapter, + marshaler watermillmsg.Marshaler, + cmdSyncTopic string, +) *ServerWatermillConsumer { + return &ServerWatermillConsumer{ + svc: svc, + subscriber: subscriber, + logger: logger, + marshaler: marshaler, + cmdSyncTopic: cmdSyncTopic, + } +} + +func (c *ServerWatermillConsumer) Register(router *message.Router) { + router.AddNoPublisherHandler("ServerConsumer.sync", c.cmdSyncTopic, c.subscriber, c.sync) +} + +func (c *ServerWatermillConsumer) sync(msg *message.Message) error { + var rawPayload watermillmsg.SyncServersCmdPayload + + if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { + c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + payload, err := domain.NewSyncServersCmdPayloadWithStringURL(rawPayload.VersionCode, rawPayload.URL) + if err != nil { + c.logger.Error("couldn't construct domain.SyncServersCmdPayload", err, watermill.LogFields{ + "handler": message.HandlerNameFromCtx(msg.Context()), + }) + return nil + } + + return c.svc.Sync(msg.Context(), payload) +} diff --git a/internal/tw/unit_info.go b/internal/tw/unit_info.go index 852f4b6..a1fcc47 100644 --- a/internal/tw/unit_info.go +++ b/internal/tw/unit_info.go @@ -26,7 +26,7 @@ type UnitInfo struct { Light Unit `xml:"light"` Marcher Unit `xml:"marcher"` Heavy Unit `xml:"heavy"` - Ram Unit `xml:"ram"` //nolint:revive + Ram Unit `xml:"ram"` Catapult Unit `xml:"catapult"` Knight Unit `xml:"knight"` Snob Unit `xml:"snob"` diff --git a/k8s/base/kustomization.yml b/k8s/base/kustomization.yml index f43cae7..07244cb 100644 --- a/k8s/base/kustomization.yml +++ b/k8s/base/kustomization.yml @@ -2,6 +2,7 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: - jobs.yml + - server-consumer.yml images: - name: twhelp newName: twhelp diff --git a/k8s/base/server-consumer.yml b/k8s/base/server-consumer.yml new file mode 100644 index 0000000..4587103 --- /dev/null +++ b/k8s/base/server-consumer.yml @@ -0,0 +1,44 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: twhelp-server-consumer-deployment +spec: + selector: + matchLabels: + app: twhelp-server-consumer + template: + metadata: + labels: + app: twhelp-server-consumer + spec: + containers: + - name: twhelp-server-consumer + image: twhelp + args: [consumer, server] + env: + - name: APP_MODE + value: development + - name: LOG_LEVEL + value: debug + - name: DB_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: db-connection-string + - name: RABBITMQ_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: rabbitmq-connection-string + livenessProbe: + exec: + command: [cat, /tmp/live] + initialDelaySeconds: 5 + periodSeconds: 10 + resources: + requests: + cpu: 100m + memory: 100Mi + limits: + cpu: 300m + memory: 300Mi