feat: tracing #76

Merged
Kichiyaki merged 18 commits from feat/tracing into master 2022-09-12 05:34:47 +00:00
52 changed files with 1606 additions and 112 deletions

View File

@ -1,5 +1,6 @@
run:
tests: true
timeout: 3m
linters:
disable-all: true

View File

@ -6,6 +6,8 @@ import (
"fmt"
"time"
"github.com/uptrace/bun/extra/bunotel"
"github.com/kelseyhightower/envconfig"
"github.com/uptrace/bun/driver/pgdriver"
@ -41,6 +43,8 @@ func NewBunDB() (*bun.DB, error) {
return nil, fmt.Errorf("db.PingContext: %w", err)
}
db.AddQueryHook(bunotel.NewQueryHook())
return db, nil
}

View File

@ -8,6 +8,8 @@ import (
"syscall"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/msg/msgotel"
"gitea.dwysokinski.me/twhelp/core/internal/msg"
"gitea.dwysokinski.me/twhelp/core/cmd/twhelp/internal"
@ -24,6 +26,8 @@ import (
)
const (
otelShutdownTimeout = 5 * time.Second
otelServiceNamePrefix = "twhelp.consumer."
routerCloseTimeout = 30 * time.Second
maxRetries = 3
retryInitialInterval = 3 * time.Second
@ -54,6 +58,7 @@ func newServerCommand() *cli.Command {
return runConsumer(
c,
"ServerConsumer",
"server",
func(
c *cli.Context,
router *message.Router,
@ -79,6 +84,7 @@ func newTribeCommand() *cli.Command {
return runConsumer(
c,
"TribeConsumer",
"tribe",
func(
c *cli.Context,
router *message.Router,
@ -104,6 +110,7 @@ func newPlayerCommand() *cli.Command {
return runConsumer(
c,
"PlayerConsumer",
"player",
func(
c *cli.Context,
router *message.Router,
@ -129,6 +136,7 @@ func newVillageCommand() *cli.Command {
return runConsumer(
c,
"VillageConsumer",
"village",
func(
c *cli.Context,
router *message.Router,
@ -154,6 +162,7 @@ func newEnnoblementCommand() *cli.Command {
return runConsumer(
c,
"EnnoblementConsumer",
"ennoblement",
func(
c *cli.Context,
router *message.Router,
@ -182,7 +191,17 @@ type registerHandlersFn func(
bunDB *bun.DB,
) error
func runConsumer(c *cli.Context, name string, registerHandlers registerHandlersFn) error {
func runConsumer(c *cli.Context, name, serviceNameSuffix string, registerHandlers registerHandlersFn) error {
shutdown, err := internal.InitOTEL(otelServiceNamePrefix+serviceNameSuffix, c.App.Version)
if err != nil {
return fmt.Errorf("internal.InitOTEL: %w", err)
}
defer func() {
ctx, cancel := context.WithTimeout(c.Context, otelShutdownTimeout)
defer cancel()
_ = shutdown(ctx)
}()
logger := zap.L()
bunDB, err := internal.NewBunDB()
@ -261,8 +280,12 @@ func newRouter(logger watermill.LoggerAdapter) (*message.Router, error) {
if err != nil {
return nil, fmt.Errorf("message.NewRouter: %w", err)
}
router.AddMiddleware(msgotel.Middleware())
router.AddMiddleware(middleware.Recoverer)
router.AddMiddleware(newRetryMiddleware(logger).Middleware)
router.AddPublisherDecorators(func(pub message.Publisher) (message.Publisher, error) {
return msgotel.NewPublisherDecorator(pub), nil
})
return router, nil
}

View File

@ -1,12 +1,14 @@
package job
import (
"context"
"fmt"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/msg/msgotel"
"gitea.dwysokinski.me/twhelp/core/internal/msg"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/cmd/twhelp/internal"
"gitea.dwysokinski.me/twhelp/core/internal/bundb"
"gitea.dwysokinski.me/twhelp/core/internal/service"
@ -15,6 +17,11 @@ import (
"go.uber.org/zap"
)
const (
otelShutdownTimeout = 5 * time.Second
otelServiceNamePrefix = "twhelp.job."
)
func New() *cli.Command {
return &cli.Command{
Name: "job",
@ -40,7 +47,15 @@ func newUpdateDataCommand() *cli.Command {
Name: "data",
Usage: "Launches data update (servers, players, tribes, villages)",
Action: func(c *cli.Context) error {
logger := zap.L()
shutdown, err := internal.InitOTEL(otelServiceNamePrefix+"update.data", c.App.Version)
if err != nil {
return fmt.Errorf("internal.InitOTEL: %w", err)
}
defer func() {
ctx, cancel := context.WithTimeout(c.Context, otelShutdownTimeout)
defer cancel()
_ = shutdown(ctx)
}()
db, err := internal.NewBunDB()
if err != nil {
@ -50,31 +65,26 @@ func newUpdateDataCommand() *cli.Command {
_ = db.Close()
}()
publisher, err := internal.NewAMQPPublisher(watermillzap.NewLogger(logger))
publisher, err := internal.NewAMQPPublisher(watermillzap.NewLogger(zap.L()))
if err != nil {
return fmt.Errorf("internal.NewAMQPPublisher: %w", err)
}
decoratedPublisher := msgotel.NewPublisherDecorator(publisher)
defer func() {
_ = decoratedPublisher.Close()
}()
marshaler := internal.NewCommandEventMarshaler()
versions, _, err := service.
NewVersion(bundb.NewVersion(db)).
List(c.Context, domain.ListVersionsParams{Count: false})
err = service.
NewUpdater(
service.NewVersion(bundb.NewVersion(db)),
service.NewServer(bundb.NewServer(db), internal.NewTWClient(c.App.Version)),
msg.NewServerPublisher(decoratedPublisher, marshaler),
msg.NewEnnoblementPublisher(decoratedPublisher, marshaler),
).
UpdateData(c.Context)
if err != nil {
return fmt.Errorf("VersionService.List: %w", err)
}
payloads := make([]domain.RefreshServersCmdPayload, 0, len(versions))
for _, v := range versions {
payloads = append(payloads, domain.RefreshServersCmdPayload{
Host: v.Host,
VersionCode: v.Code,
})
}
err = msg.
NewServerPublisher(publisher, internal.NewCommandEventMarshaler()).
CmdRefresh(c.Context, payloads...)
if err != nil {
return fmt.Errorf("ServerPublisher.CmdRefresh: %w", err)
return fmt.Errorf("Updater.UpdateData: %w", err)
}
return nil
@ -87,7 +97,15 @@ func newUpdateEnnoblementsCommand() *cli.Command {
Name: "ennoblements",
Usage: "Launches ennoblement update",
Action: func(c *cli.Context) error {
logger := zap.L()
shutdown, err := internal.InitOTEL(otelServiceNamePrefix+"update.ennoblements", c.App.Version)
if err != nil {
return fmt.Errorf("internal.InitOTEL: %w", err)
}
defer func() {
ctx, cancel := context.WithTimeout(c.Context, otelShutdownTimeout)
defer cancel()
_ = shutdown(ctx)
}()
db, err := internal.NewBunDB()
if err != nil {
@ -97,46 +115,26 @@ func newUpdateEnnoblementsCommand() *cli.Command {
_ = db.Close()
}()
publisher, err := internal.NewAMQPPublisher(watermillzap.NewLogger(logger))
publisher, err := internal.NewAMQPPublisher(watermillzap.NewLogger(zap.L()))
if err != nil {
return fmt.Errorf("internal.NewAMQPPublisher: %w", err)
}
decoratedPublisher := msgotel.NewPublisherDecorator(publisher)
defer func() {
_ = decoratedPublisher.Close()
}()
marshaler := internal.NewCommandEventMarshaler()
const paginationLimit = 2000
servers, _, err := service.
NewServer(bundb.NewServer(db), internal.NewTWClient(c.App.Version)).
List(c.Context, domain.ListServersParams{
Special: domain.NullBool{
Bool: false,
Valid: true,
},
Open: domain.NullBool{
Bool: true,
Valid: true,
},
Pagination: domain.Pagination{
Limit: paginationLimit,
},
Count: false,
})
err = service.
NewUpdater(
service.NewVersion(bundb.NewVersion(db)),
service.NewServer(bundb.NewServer(db), internal.NewTWClient(c.App.Version)),
msg.NewServerPublisher(decoratedPublisher, marshaler),
msg.NewEnnoblementPublisher(decoratedPublisher, marshaler),
).
UpdateEnnoblements(c.Context)
if err != nil {
return fmt.Errorf("ServerService.List: %w", err)
}
payloads := make([]domain.RefreshEnnoblementsCmdPayload, 0, len(servers))
for _, srv := range servers {
payloads = append(payloads, domain.RefreshEnnoblementsCmdPayload{
Key: srv.Key,
URL: srv.URL,
VersionCode: srv.VersionCode,
})
}
err = msg.
NewEnnoblementPublisher(publisher, internal.NewCommandEventMarshaler()).
CmdRefresh(c.Context, payloads...)
if err != nil {
return fmt.Errorf("EnnoblementPublisher.CmdRefresh: %w", err)
return fmt.Errorf("Updater.UpdateEnnoblements: %w", err)
}
return nil

119
cmd/twhelp/internal/otel.go Normal file
View File

@ -0,0 +1,119 @@
package internal
import (
"context"
"fmt"
"time"
"github.com/Kichiyaki/appmode/v2"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"github.com/kelseyhightower/envconfig"
"go.opentelemetry.io/otel/exporters/jaeger"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
type otelConfig struct {
Enabled bool `envconfig:"ENABLED" default:"false"`
TracesExporter string `envconfig:"TRACES_EXPORTER" default:"stdout"`
TracesSamplerArg float64 `envconfig:"TRACES_SAMPLER_ARG" default:"0.1"` // Value passed to the sampler
}
func InitOTEL(serviceName, version string) (func(ctx context.Context) error, error) {
var cfg otelConfig
if err := envconfig.Process("OTEL", &cfg); err != nil {
return nil, fmt.Errorf("envconfig.Process: %w", err)
}
if !cfg.Enabled {
return noopShutdown, nil
}
tp, err := newTracerProvider(serviceName, version, cfg.TracesExporter, cfg.TracesSamplerArg)
if err != nil {
return nil, fmt.Errorf("newTracerProvider: %w", err)
}
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp.Shutdown, nil
}
func newTracerProvider(serviceName, version, exporter string, samplerArg float64) (*sdktrace.TracerProvider, error) {
exp, err := newTraceExporter(exporter)
if err != nil {
return nil, fmt.Errorf("newTraceExporter: %w", err)
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(samplerArg)),
sdktrace.WithResource(
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String(version),
semconv.DeploymentEnvironmentKey.String(appmode.Get()),
),
),
)
return tp, nil
}
func newTraceExporter(exporter string) (sdktrace.SpanExporter, error) {
switch exporter {
case "stdout":
return newStdoutTraceExporter()
case "jaeger":
return newJaegerTraceExporter()
default:
return nil, fmt.Errorf("unsupported exporter: %s", exporter)
}
}
type jaegerTraceExporterConfig struct {
AgentHost string `envconfig:"AGENT_HOST" required:"true"`
AgentPort string `envconfig:"AGENT_PORT" default:"6831"`
ReconnectingInterval time.Duration `envconfig:"RECONNECTING_INTERVAL" default:"2s"`
}
func newJaegerTraceExporter() (*jaeger.Exporter, error) {
var cfg jaegerTraceExporterConfig
if err := envconfig.Process("OTEL_JAEGER", &cfg); err != nil {
return nil, fmt.Errorf("envconfig.Process: %w", err)
}
exp, err := jaeger.New(
jaeger.WithAgentEndpoint(
jaeger.WithAgentHost(cfg.AgentHost),
jaeger.WithAgentPort(cfg.AgentPort),
jaeger.WithAttemptReconnectingInterval(cfg.ReconnectingInterval),
),
)
if err != nil {
return nil, fmt.Errorf("jaeger.New: %w", err)
}
return exp, nil
}
func newStdoutTraceExporter() (*stdouttrace.Exporter, error) {
exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
if err != nil {
return nil, fmt.Errorf("stdouttrace.New: %w", err)
}
return exp, nil
}
func noopShutdown(ctx context.Context) error {
return nil
}

View File

@ -10,6 +10,8 @@ import (
"syscall"
"time"
"github.com/riandyrn/otelchi"
"github.com/kelseyhightower/envconfig"
"gitea.dwysokinski.me/twhelp/core/internal/rest"
@ -29,6 +31,8 @@ import (
)
const (
serviceName = "twhelp.api"
otelShutdownTimeout = 5 * time.Second
defaultPort = "9234"
readTimeout = 2 * time.Second
readHeaderTimeout = 2 * time.Second
@ -42,6 +46,16 @@ func New() *cli.Command {
Name: "serve",
Usage: "Runs the http server",
Action: func(c *cli.Context) error {
shutdown, err := internal.InitOTEL(serviceName, c.App.Version)
if err != nil {
return fmt.Errorf("internal.InitOTEL: %w", err)
}
defer func() {
ctx, cancel := context.WithTimeout(c.Context, otelShutdownTimeout)
defer cancel()
_ = shutdown(ctx)
}()
db, err := internal.NewBunDB()
if err != nil {
return fmt.Errorf("internal.NewBunDB: %w", err)
@ -158,6 +172,7 @@ func newAPIConfig() (apiConfig, error) {
func getChiMiddlewares(logger *zap.Logger) chi.Middlewares {
return chi.Middlewares{
middleware.RealIP,
otelchi.Middleware(serviceName),
chizap.Logger(logger),
middleware.Recoverer,
middleware.Heartbeat("/health"),

View File

@ -5,6 +5,8 @@ import (
"time"
"gitea.dwysokinski.me/twhelp/core/internal/tw"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/propagation"
)
const (
@ -13,7 +15,13 @@ const (
func NewTWClient(version string) *tw.Client {
return tw.NewClient(
tw.WithHTTPClient(&http.Client{Timeout: twClientTimeout}),
tw.WithHTTPClient(&http.Client{
Timeout: twClientTimeout,
Transport: otelhttp.NewTransport(
http.DefaultTransport,
otelhttp.WithPropagators(propagation.NewCompositeTextMapPropagator()),
),
}),
tw.WithUserAgent("tribalwarshelp/"+version),
)
}

14
go.mod
View File

@ -17,6 +17,7 @@ require (
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/kelseyhightower/envconfig v1.4.0
github.com/ory/dockertest/v3 v3.9.1
github.com/riandyrn/otelchi v0.4.0
github.com/stretchr/testify v1.8.0
github.com/swaggo/http-swagger v1.3.3
github.com/swaggo/swag v1.8.5
@ -24,7 +25,14 @@ require (
github.com/uptrace/bun/dbfixture v1.1.8
github.com/uptrace/bun/dialect/pgdialect v1.1.8
github.com/uptrace/bun/driver/pgdriver v1.1.8
github.com/uptrace/bun/extra/bunotel v1.1.8
github.com/urfave/cli/v2 v2.14.1
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0
go.opentelemetry.io/otel v1.9.0
go.opentelemetry.io/otel/exporters/jaeger v1.9.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0
go.opentelemetry.io/otel/sdk v1.9.0
go.opentelemetry.io/otel/trace v1.9.0
go.uber.org/zap v1.23.0
)
@ -41,6 +49,9 @@ require (
github.com/docker/docker v20.10.7+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/spec v0.20.6 // indirect
@ -67,12 +78,15 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
github.com/uptrace/opentelemetry-go-extra/otelsql v0.1.15 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opentelemetry.io/contrib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20220826181053-bd7e27e6170d // indirect

40
go.sum
View File

@ -52,6 +52,9 @@ github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/elliotchance/phpserialize v1.3.3 h1:hV4QVmGdCiYgoBbw+ADt6fNgyZ2mYX0OgpnON1adTCM=
github.com/elliotchance/phpserialize v1.3.3/go.mod h1:gt7XX9+ETUcLXbtTKEuyrqW3lcLUAeS/AnGZ2e49TZs=
github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/garsue/watermillzap v1.1.1 h1:qReFh5goSOoub8x0cc+iSaaeIMJF0m66nmdesqG7VBg=
github.com/garsue/watermillzap v1.1.1/go.mod h1:6wdF9VgwK7JMANUtlpUUfNwkrFRWQ4G+U/dgBcWpvTA=
@ -62,6 +65,14 @@ github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
@ -87,6 +98,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
@ -171,6 +183,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/rabbitmq/amqp091-go v1.2.0 h1:1pHBxAsQh54R9eX/xo679fUEAfv3loMqi0pvRFOj2nk=
github.com/rabbitmq/amqp091-go v1.2.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/riandyrn/otelchi v0.4.0 h1:btWguNWE2NluzNqRNaqAdJ809GKP1CjZEi8UP4AdYGw=
github.com/riandyrn/otelchi v0.4.0/go.mod h1:gYlFYJYyLUmgxgJFKE/NFno7ljcu2lCd8+aOlD8oLTw=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
@ -184,6 +198,7 @@ github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@ -210,6 +225,10 @@ github.com/uptrace/bun/dialect/pgdialect v1.1.8 h1:wayJhjYDPGv8tgOBLolbBtSFQ0Tih
github.com/uptrace/bun/dialect/pgdialect v1.1.8/go.mod h1:nNbU8PHTjTUM+CRtGmqyBb9zcuRAB8I680/qoFSmBUk=
github.com/uptrace/bun/driver/pgdriver v1.1.8 h1:gyL22axRQfjJS2Umq0erzJnp0bLOdUE8/USKZHPQB8o=
github.com/uptrace/bun/driver/pgdriver v1.1.8/go.mod h1:4tHK0h7a/UoldBoe9J3GU4tEYjr3mkd62U3Kq3PVk3E=
github.com/uptrace/bun/extra/bunotel v1.1.8 h1:Sg7XcLLI+KTalKg36aRpOJovYF9UUVuN5coBuIUCag8=
github.com/uptrace/bun/extra/bunotel v1.1.8/go.mod h1:1chzcC5wkd7FP+/SRUbmmqF0DL1ed+NgdHlGE+GIkPs=
github.com/uptrace/opentelemetry-go-extra/otelsql v0.1.15 h1:s6BZwhj/2oZ9GSkfcTH8YRHjxj3MGo1j2Pg83Pc1xjw=
github.com/uptrace/opentelemetry-go-extra/otelsql v0.1.15/go.mod h1:aZXwJzbTHnhh8vpd1bPjK68iTuNEtfvpHcJq73FdmhQ=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.14.1 h1:0Sx+C9404t2+DPuIJ3UpZFOEFhNG3wPxMj7uZHyZKFA=
github.com/urfave/cli/v2 v2.14.1/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI=
@ -229,6 +248,26 @@ github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRT
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/contrib v1.0.0 h1:khwDCxdSspjOLmFnvMuSHd/5rPzbTx0+l6aURwtQdfE=
go.opentelemetry.io/contrib v1.0.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0 h1:9NkMW03wwEzPtP/KciZ4Ozu/Uz5ZA7kfqXJIObnrjGU=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0/go.mod h1:548ZsYzmT4PL4zWKRd8q/N4z0Wxzn/ZxUE+lkEpwWQA=
go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs=
go.opentelemetry.io/otel v1.9.0 h1:8WZNQFIB2a71LnANS9JeyidJKKGOOremcUtb/OtHISw=
go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4AC+0Eo=
go.opentelemetry.io/otel/exporters/jaeger v1.9.0 h1:gAEgEVGDWwFjcis9jJTOJqZNxDzoZfR12WNIxr7g9Ww=
go.opentelemetry.io/otel/exporters/jaeger v1.9.0/go.mod h1:hquezOLVAybNW6vanIxkdLXTXvzlj2Vn3wevSP15RYs=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.3.0/go.mod h1:PQLM+xJ3EMSZU9rMevmw+4nH1efyp23CW/nD9BlB3sg=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0 h1:0uV0qzHk48i1SF8qRI8odMYiwPOLh9gBhiJFpj8H6JY=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0/go.mod h1:Fl1iS5ZhWgXXXTdJMuBSVsS5nkL5XluHbg97kjOuYU4=
go.opentelemetry.io/otel/metric v0.31.0 h1:6SiklT+gfWAwWUR0meEMxQBtihpiEs4c+vL9spDTqUs=
go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A=
go.opentelemetry.io/otel/sdk v1.3.0/go.mod h1:rIo4suHNhQwBIPg9axF8V9CA72Wz2mKF1teNrup8yzs=
go.opentelemetry.io/otel/sdk v1.9.0 h1:LNXp1vrr83fNXTHgU8eO89mhzxb/bbWAsHG6fNf3qWo=
go.opentelemetry.io/otel/sdk v1.9.0/go.mod h1:AEZc8nt5bd2F7BC24J5R0mrjYnpEgYHyTcM/vrSple4=
go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk=
go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47pQYc=
go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
@ -280,6 +319,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@ -5,8 +5,11 @@ import (
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/uptrace/bun"
"go.opentelemetry.io/otel"
)
var tracer = otel.Tracer("gitea.dwysokinski.me/twhelp/core/internal/bundb")
func appendODSetClauses(q *bun.InsertQuery) *bun.InsertQuery {
return q.Set("rank_att = EXCLUDED.rank_att").
Set("score_att = EXCLUDED.score_att").

View File

@ -9,6 +9,7 @@ import (
"gitea.dwysokinski.me/twhelp/core/internal/bundb/internal/model"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/uptrace/bun"
"go.opentelemetry.io/otel/codes"
)
type Ennoblement struct {
@ -20,6 +21,9 @@ func NewEnnoblement(db *bun.DB) *Ennoblement {
}
func (e *Ennoblement) Create(ctx context.Context, params ...domain.CreateEnnoblementParams) error {
ctx, span := tracer.Start(ctx, "Ennoblement.Create")
defer span.End()
if len(params) == 0 {
return nil
}
@ -32,6 +36,8 @@ func (e *Ennoblement) Create(ctx context.Context, params ...domain.CreateEnnoble
Model(&ennoblements).
Returning("NULL").
Exec(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("something went wrong while inserting ennoblements into db: %w", err)
}
@ -39,6 +45,9 @@ func (e *Ennoblement) Create(ctx context.Context, params ...domain.CreateEnnoble
}
func (e *Ennoblement) List(ctx context.Context, params domain.ListEnnoblementsParams) ([]domain.Ennoblement, int64, error) {
ctx, span := tracer.Start(ctx, "Ennoblement.List")
defer span.End()
var ennoblements []model.Ennoblement
var count int
var err error
@ -49,6 +58,8 @@ func (e *Ennoblement) List(ctx context.Context, params domain.ListEnnoblementsPa
q, err = listEnnoblementsParamsApplier{params}.apply(q)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("listEnnoblementsParamsApplier.apply: %w", err)
}
@ -58,6 +69,8 @@ func (e *Ennoblement) List(ctx context.Context, params domain.ListEnnoblementsPa
err = q.Scan(ctx)
}
if err != nil && !errors.Is(err, sql.ErrNoRows) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("couldn't select ennoblements from the db: %w", err)
}
@ -65,6 +78,7 @@ func (e *Ennoblement) List(ctx context.Context, params domain.ListEnnoblementsPa
for _, ennoblement := range ennoblements {
result = append(result, ennoblement.ToDomain())
}
return result, int64(count), nil
}

View File

@ -8,6 +8,7 @@ import (
"time"
"gitea.dwysokinski.me/twhelp/core/internal/bundb/internal/model"
"go.opentelemetry.io/otel/codes"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/uptrace/bun"
@ -22,6 +23,9 @@ func NewPlayer(db *bun.DB) *Player {
}
func (p *Player) CreateOrUpdate(ctx context.Context, params ...domain.CreatePlayerParams) error {
ctx, span := tracer.Start(ctx, "Player.CreateOrUpdate")
defer span.End()
if len(params) == 0 {
return nil
}
@ -42,6 +46,8 @@ func (p *Player) CreateOrUpdate(ctx context.Context, params ...domain.CreatePlay
Apply(appendODSetClauses).
Returning("NULL").
Exec(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("something went wrong while inserting players into db: %w", err)
}
@ -49,6 +55,9 @@ func (p *Player) CreateOrUpdate(ctx context.Context, params ...domain.CreatePlay
}
func (p *Player) DeleteByID(ctx context.Context, ids ...int64) error {
ctx, span := tracer.Start(ctx, "Player.DeleteByID")
defer span.End()
if len(ids) == 0 {
return nil
}
@ -61,6 +70,8 @@ func (p *Player) DeleteByID(ctx context.Context, ids ...int64) error {
Set("tribe_id = NULL").
Returning("NULL").
Exec(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("couldn't delete players: %w", err)
}
@ -68,6 +79,9 @@ func (p *Player) DeleteByID(ctx context.Context, ids ...int64) error {
}
func (p *Player) List(ctx context.Context, params domain.ListPlayersParams) ([]domain.Player, int64, error) {
ctx, span := tracer.Start(ctx, "Player.List")
defer span.End()
var players []model.Player
var count int
var err error
@ -78,6 +92,8 @@ func (p *Player) List(ctx context.Context, params domain.ListPlayersParams) ([]d
q, err = listPlayersParamsApplier{params}.apply(q)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("listPlayersParamsApplier.apply: %w", err)
}
@ -87,6 +103,8 @@ func (p *Player) List(ctx context.Context, params domain.ListPlayersParams) ([]d
err = q.Scan(ctx)
}
if err != nil && !errors.Is(err, sql.ErrNoRows) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("couldn't select players from the db: %w", err)
}

View File

@ -8,6 +8,9 @@ import (
"time"
"gitea.dwysokinski.me/twhelp/core/internal/bundb/internal/model"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/uptrace/bun"
@ -22,6 +25,9 @@ func NewServer(db *bun.DB) *Server {
}
func (s *Server) CreateOrUpdate(ctx context.Context, params ...domain.CreateServerParams) ([]domain.Server, error) {
ctx, span := tracer.Start(ctx, "Server.CreateOrUpdate")
defer span.End()
if len(params) == 0 {
return nil, nil
}
@ -45,6 +51,8 @@ func (s *Server) CreateOrUpdate(ctx context.Context, params ...domain.CreateServ
Set("open = EXCLUDED.open").
Returning("*").
Exec(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("something went wrong while inserting servers into db: %w", err)
}
@ -52,10 +60,14 @@ func (s *Server) CreateOrUpdate(ctx context.Context, params ...domain.CreateServ
for _, srv := range servers {
result = append(result, srv.ToDomain())
}
return result, nil
}
func (s *Server) List(ctx context.Context, params domain.ListServersParams) ([]domain.Server, int64, error) {
ctx, span := tracer.Start(ctx, "Server.List")
defer span.End()
var servers []model.Server
var count int
var err error
@ -70,6 +82,8 @@ func (s *Server) List(ctx context.Context, params domain.ListServersParams) ([]d
err = q.Scan(ctx)
}
if err != nil && !errors.Is(err, sql.ErrNoRows) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("couldn't select servers from the db: %w", err)
}
@ -77,11 +91,19 @@ func (s *Server) List(ctx context.Context, params domain.ListServersParams) ([]d
for _, server := range servers {
result = append(result, server.ToDomain())
}
return result, int64(count), nil
}
func (s *Server) UpdateByKey(ctx context.Context, key string, params domain.UpdateServerParams) (domain.Server, error) {
ctx, span := tracer.Start(ctx, "Server.UpdateByKey", trace.WithAttributes(
attribute.String("server.key", key),
))
defer span.End()
if params.IsZero() {
span.RecordError(domain.ErrNothingToUpdate)
span.SetStatus(codes.Error, domain.ErrNothingToUpdate.Error())
return domain.Server{}, domain.ErrNothingToUpdate
}
@ -94,6 +116,8 @@ func (s *Server) UpdateByKey(ctx context.Context, key string, params domain.Upda
Returning("*").
Exec(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Server{}, fmt.Errorf("couldn't update server (key=%s): %w", key, err)
}
if affected, _ := res.RowsAffected(); affected == 0 {

View File

@ -8,6 +8,9 @@ import (
"time"
"gitea.dwysokinski.me/twhelp/core/internal/bundb/internal/model"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/uptrace/bun"
@ -22,6 +25,9 @@ func NewTribe(db *bun.DB) *Tribe {
}
func (t *Tribe) CreateOrUpdate(ctx context.Context, params ...domain.CreateTribeParams) error {
ctx, span := tracer.Start(ctx, "Tribe.CreateOrUpdate")
defer span.End()
if len(params) == 0 {
return nil
}
@ -44,6 +50,8 @@ func (t *Tribe) CreateOrUpdate(ctx context.Context, params ...domain.CreateTribe
Apply(appendODSetClauses).
Returning("NULL").
Exec(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("something went wrong while inserting tribes into db: %w", err)
}
@ -51,6 +59,9 @@ func (t *Tribe) CreateOrUpdate(ctx context.Context, params ...domain.CreateTribe
}
func (t *Tribe) DeleteByID(ctx context.Context, ids ...int64) error {
ctx, span := tracer.Start(ctx, "Tribe.DeleteByID")
defer span.End()
if len(ids) == 0 {
return nil
}
@ -62,6 +73,8 @@ func (t *Tribe) DeleteByID(ctx context.Context, ids ...int64) error {
Set("deleted_at = ?", time.Now()).
Returning("NULL").
Exec(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("couldn't delete tribes: %w", err)
}
@ -69,6 +82,12 @@ func (t *Tribe) DeleteByID(ctx context.Context, ids ...int64) error {
}
func (t *Tribe) UpdateDominance(ctx context.Context, serverKey string, numPlayerVillages int64) error {
ctx, span := tracer.Start(ctx, "Tribe.UpdateDominance", trace.WithAttributes(
attribute.String("server.key", serverKey),
attribute.Int64("server.num_player_villages", numPlayerVillages),
))
defer span.End()
q := t.db.NewUpdate().
Model(&model.Tribe{}).
Returning("NULL").
@ -80,6 +99,8 @@ func (t *Tribe) UpdateDominance(ctx context.Context, serverKey string, numPlayer
q = q.Set("dominance = 0")
}
if _, err := q.Exec(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("couldn't update dominance (server=%s): %w", serverKey, err)
}
@ -87,6 +108,9 @@ func (t *Tribe) UpdateDominance(ctx context.Context, serverKey string, numPlayer
}
func (t *Tribe) List(ctx context.Context, params domain.ListTribesParams) ([]domain.Tribe, int64, error) {
ctx, span := tracer.Start(ctx, "Tribe.List")
defer span.End()
var tribes []model.Tribe
var count int
var err error
@ -97,6 +121,8 @@ func (t *Tribe) List(ctx context.Context, params domain.ListTribesParams) ([]dom
q, err = listTribesParamsApplier{params}.apply(q)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("listTribesParamsApplier.apply: %w", err)
}
@ -106,6 +132,8 @@ func (t *Tribe) List(ctx context.Context, params domain.ListTribesParams) ([]dom
err = q.Scan(ctx)
}
if err != nil && !errors.Is(err, sql.ErrNoRows) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("couldn't select tribes from the db: %w", err)
}
@ -113,6 +141,7 @@ func (t *Tribe) List(ctx context.Context, params domain.ListTribesParams) ([]dom
for _, tribe := range tribes {
result = append(result, tribe.ToDomain())
}
return result, int64(count), nil
}

View File

@ -8,6 +8,9 @@ import (
"gitea.dwysokinski.me/twhelp/core/internal/bundb/internal/model"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/uptrace/bun"
)
@ -21,6 +24,9 @@ func NewVersion(db *bun.DB) *Version {
}
func (v *Version) List(ctx context.Context, params domain.ListVersionsParams) ([]domain.Version, int64, error) {
ctx, span := tracer.Start(ctx, "Version.List")
defer span.End()
var versions []model.Version
var count int
var err error
@ -34,6 +40,8 @@ func (v *Version) List(ctx context.Context, params domain.ListVersionsParams) ([
err = q.Scan(ctx)
}
if err != nil && !errors.Is(err, sql.ErrNoRows) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("couldn't select versions from the db: %w", err)
}
@ -41,16 +49,24 @@ func (v *Version) List(ctx context.Context, params domain.ListVersionsParams) ([
for _, version := range versions {
result = append(result, version.ToDomain())
}
return result, int64(count), nil
}
func (v *Version) GetByCode(ctx context.Context, code string) (domain.Version, error) {
ctx, span := tracer.Start(ctx, "Version.GetByCode", trace.WithAttributes(
attribute.String("version.code", code),
))
defer span.End()
var version model.Version
if err := v.db.NewSelect().
Model(&version).
Where("code = ?", code).
Scan(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
if errors.Is(err, sql.ErrNoRows) {
return domain.Version{}, domain.VersionNotFoundError{VerCode: code}
}

View File

@ -7,6 +7,7 @@ import (
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/bundb/internal/model"
"go.opentelemetry.io/otel/codes"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/uptrace/bun"
@ -21,6 +22,9 @@ func NewVillage(db *bun.DB) *Village {
}
func (v *Village) CreateOrUpdate(ctx context.Context, params ...domain.CreateVillageParams) error {
ctx, span := tracer.Start(ctx, "Village.CreateOrUpdate")
defer span.End()
if len(params) == 0 {
return nil
}
@ -41,6 +45,8 @@ func (v *Village) CreateOrUpdate(ctx context.Context, params ...domain.CreateVil
Set("player_id = EXCLUDED.player_id").
Returning("NULL").
Exec(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("something went wrong while inserting villages into db: %w", err)
}
@ -48,6 +54,9 @@ func (v *Village) CreateOrUpdate(ctx context.Context, params ...domain.CreateVil
}
func (v *Village) List(ctx context.Context, params domain.ListVillagesParams) ([]domain.Village, int64, error) {
ctx, span := tracer.Start(ctx, "Village.List")
defer span.End()
var villages []model.Village
var count int
var err error
@ -62,6 +71,8 @@ func (v *Village) List(ctx context.Context, params domain.ListVillagesParams) ([
err = q.Scan(ctx)
}
if err != nil && !errors.Is(err, sql.ErrNoRows) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("couldn't select villages from the db: %w", err)
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/ThreeDotsLabs/watermill"
"go.opentelemetry.io/otel/codes"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
@ -50,15 +51,24 @@ func (e *EnnoblementConsumer) Register(router *message.Router) {
}
func (e *EnnoblementConsumer) refresh(msg *message.Message) error {
ctx, span := tracer.Start(msg.Context(), "EnnoblementConsumer.refresh")
defer span.End()
var payload model.RefreshEnnoblementsCmdPayload
if err := e.marshaler.Unmarshal(msg, &payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
e.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil
}
if err := e.svc.Refresh(msg.Context(), payload.Key, payload.URL); err != nil {
span.SetAttributes(payload.OTELAttributes()...)
if err := e.svc.Refresh(ctx, payload.Key, payload.URL); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("EnnoblementService.Refresh: %w", err)
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/ThreeDotsLabs/watermill"
"go.opentelemetry.io/otel/codes"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
@ -52,25 +53,34 @@ func (p *PlayerConsumer) Register(router *message.Router) {
}
func (p *PlayerConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
ctx, span := tracer.Start(msg.Context(), "PlayerConsumer.refresh")
defer span.End()
var payload model.ServerRefreshedEvPayload
if err := p.marshaler.Unmarshal(msg, &payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
p.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil, nil
}
span.SetAttributes(payload.OTELAttributes()...)
if !payload.Open {
p.logger.Debug("server is closed, dropping message", watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
"server_key": payload.Key,
"version_code": payload.VersionCode,
})
return nil, nil
}
numPlayers, err := p.svc.Refresh(msg.Context(), payload.Key, payload.URL)
numPlayers, err := p.svc.Refresh(ctx, payload.Key, payload.URL)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("PlayerService.Refresh: %w", err)
}
@ -83,5 +93,6 @@ func (p *PlayerConsumer) refresh(msg *message.Message) ([]*message.Message, erro
if err != nil {
return nil, fmt.Errorf("marshaler.Marshal: %w", err)
}
return []*message.Message{ev}, nil
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/ThreeDotsLabs/watermill"
"go.opentelemetry.io/otel/codes"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
@ -85,16 +86,25 @@ func (s *ServerConsumer) Register(router *message.Router) {
}
func (s *ServerConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
ctx, span := tracer.Start(msg.Context(), "ServerConsumer.refresh")
defer span.End()
var payload model.RefreshServersCmdPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
s.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil, nil
}
servers, err := s.svc.Refresh(msg.Context(), payload.Host, payload.VersionCode)
span.SetAttributes(payload.OTELAttributes()...)
servers, err := s.svc.Refresh(ctx, payload.Host, payload.VersionCode)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("ServerService.Refresh: %w", err)
}
@ -107,30 +117,44 @@ func (s *ServerConsumer) refresh(msg *message.Message) ([]*message.Message, erro
VersionCode: srv.VersionCode,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("marshaler.Marshal: %w", err)
}
events = append(events, ev)
}
return events, nil
}
func (s *ServerConsumer) updateInfoAndConfig(msg *message.Message) error {
ctx, span := tracer.Start(msg.Context(), "ServerConsumer.updateInfoAndConfig")
defer span.End()
var payload model.ServerRefreshedEvPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
// drop message with invalid payload
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
s.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil
}
span.SetAttributes(payload.OTELAttributes()...)
if !payload.Open {
s.logger.Debug("server is closed, dropping message", watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
"server_key": payload.Key,
"version_code": payload.VersionCode,
})
return nil
}
if err := s.svc.UpdateInfoAndConfig(msg.Context(), payload.Key, payload.URL); err != nil {
if err := s.svc.UpdateInfoAndConfig(ctx, payload.Key, payload.URL); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("ServerService.UpdateInfoAndConfig: %w", err)
}
@ -138,15 +162,24 @@ func (s *ServerConsumer) updateInfoAndConfig(msg *message.Message) error {
}
func (s *ServerConsumer) updateNumPlayers(msg *message.Message) error {
ctx, span := tracer.Start(msg.Context(), "ServerConsumer.updateNumPlayers")
defer span.End()
var payload model.PlayersRefreshedEvPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
s.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil
}
if err := s.svc.UpdateNumPlayers(msg.Context(), payload.Key, payload.NumPlayers); err != nil {
span.SetAttributes(payload.OTELAttributes()...)
if err := s.svc.UpdateNumPlayers(ctx, payload.Key, payload.NumPlayers); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("ServerConsumer.UpdateNumPlayers: %w", err)
}
@ -154,15 +187,22 @@ func (s *ServerConsumer) updateNumPlayers(msg *message.Message) error {
}
func (s *ServerConsumer) updateNumTribes(msg *message.Message) error {
ctx, span := tracer.Start(msg.Context(), "ServerConsumer.updateNumTribes")
defer span.End()
var payload model.TribesRefreshedEvPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
s.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil
}
if err := s.svc.UpdateNumTribes(msg.Context(), payload.Key, payload.NumTribes); err != nil {
span.SetAttributes(payload.OTELAttributes()...)
if err := s.svc.UpdateNumTribes(ctx, payload.Key, payload.NumTribes); err != nil {
return fmt.Errorf("ServerConsumer.UpdateNumTribes: %w", err)
}
@ -170,15 +210,24 @@ func (s *ServerConsumer) updateNumTribes(msg *message.Message) error {
}
func (s *ServerConsumer) updateNumVillages(msg *message.Message) error {
ctx, span := tracer.Start(msg.Context(), "ServerConsumer.updateNumVillages")
defer span.End()
var payload model.VillagesRefreshedEvPayload
if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
s.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil
}
if err := s.svc.UpdateNumVillages(msg.Context(), payload.Key, payload.NumVillages); err != nil {
span.SetAttributes(payload.OTELAttributes()...)
if err := s.svc.UpdateNumVillages(ctx, payload.Key, payload.NumVillages); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("ServerConsumer.UpdateNumVillages: %w", err)
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/ThreeDotsLabs/watermill"
"go.opentelemetry.io/otel/codes"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
@ -60,25 +61,34 @@ func (t *TribeConsumer) Register(router *message.Router) {
}
func (t *TribeConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
ctx, span := tracer.Start(msg.Context(), "TribeConsumer.refresh")
defer span.End()
var payload model.ServerRefreshedEvPayload
if err := t.marshaler.Unmarshal(msg, &payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
t.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil, nil
}
span.SetAttributes(payload.OTELAttributes()...)
if !payload.Open {
t.logger.Debug("server is closed, dropping message", watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
"server_key": payload.Key,
"version_code": payload.VersionCode,
})
return nil, nil
}
numTribes, err := t.svc.Refresh(msg.Context(), payload.Key, payload.URL)
numTribes, err := t.svc.Refresh(ctx, payload.Key, payload.URL)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("TribeService.Refresh: %w", err)
}
@ -89,21 +99,33 @@ func (t *TribeConsumer) refresh(msg *message.Message) ([]*message.Message, error
NumTribes: numTribes,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("marshaler.Marshal: %w", err)
}
return []*message.Message{ev}, nil
}
func (t *TribeConsumer) updateDominance(msg *message.Message) error {
ctx, span := tracer.Start(msg.Context(), "TribeConsumer.updateDominance")
defer span.End()
var payload model.VillagesRefreshedEvPayload
if err := t.marshaler.Unmarshal(msg, &payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
t.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil
}
if err := t.svc.UpdateDominance(msg.Context(), payload.Key, payload.NumPlayerVillages); err != nil {
span.SetAttributes(payload.OTELAttributes()...)
if err := t.svc.UpdateDominance(ctx, payload.Key, payload.NumPlayerVillages); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("TribeService.UpdateDominance: %w", err)
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/ThreeDotsLabs/watermill"
"go.opentelemetry.io/otel/codes"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
@ -54,25 +55,34 @@ func (v *VillageConsumer) Register(router *message.Router) {
}
func (v *VillageConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
ctx, span := tracer.Start(msg.Context(), "VillageConsumer.refresh")
defer span.End()
var payload model.ServerRefreshedEvPayload
if err := v.marshaler.Unmarshal(msg, &payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
v.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
})
return nil, nil
}
span.SetAttributes(payload.OTELAttributes()...)
if !payload.Open {
v.logger.Debug("server is closed, dropping message", watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"handler_name": message.HandlerNameFromCtx(ctx),
"server_key": payload.Key,
"version_code": payload.VersionCode,
})
return nil, nil
}
res, err := v.svc.Refresh(msg.Context(), payload.Key, payload.URL)
res, err := v.svc.Refresh(ctx, payload.Key, payload.URL)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("VillageService.Refresh: %w", err)
}
@ -86,7 +96,10 @@ func (v *VillageConsumer) refresh(msg *message.Message) ([]*message.Message, err
NumBonusVillages: res.NumBonusVillages,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("marshaler.Marshal: %w", err)
}
return []*message.Message{ev}, nil
}

View File

@ -1,7 +1,17 @@
package model
import "go.opentelemetry.io/otel/attribute"
type RefreshEnnoblementsCmdPayload struct {
Key string
URL string
VersionCode string
}
func (p RefreshEnnoblementsCmdPayload) OTELAttributes() []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("payload.server.key", p.Key),
attribute.String("payload.server.url", p.URL),
attribute.String("payload.version.code", p.VersionCode),
}
}

View File

@ -1,8 +1,19 @@
package model
import "go.opentelemetry.io/otel/attribute"
type PlayersRefreshedEvPayload struct {
Key string
URL string
VersionCode string
NumPlayers int64
}
func (p PlayersRefreshedEvPayload) OTELAttributes() []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("payload.server.key", p.Key),
attribute.String("payload.server.url", p.URL),
attribute.String("payload.version.code", p.VersionCode),
attribute.Int64("payload.server.num_players", p.NumPlayers),
}
}

View File

@ -1,13 +1,31 @@
package model
import "go.opentelemetry.io/otel/attribute"
type RefreshServersCmdPayload struct {
Host string
VersionCode string
}
func (p RefreshServersCmdPayload) OTELAttributes() []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("version.host", p.Host),
attribute.String("version.code", p.VersionCode),
}
}
type ServerRefreshedEvPayload struct {
Key string
URL string
Open bool
VersionCode string
}
func (p ServerRefreshedEvPayload) OTELAttributes() []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("payload.server.key", p.Key),
attribute.String("payload.server.url", p.URL),
attribute.Bool("payload.server.open", p.Open),
attribute.String("payload.version.code", p.VersionCode),
}
}

View File

@ -1,8 +1,19 @@
package model
import "go.opentelemetry.io/otel/attribute"
type TribesRefreshedEvPayload struct {
Key string
URL string
VersionCode string
NumTribes int64
}
func (p TribesRefreshedEvPayload) OTELAttributes() []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("payload.server.key", p.Key),
attribute.String("payload.server.url", p.URL),
attribute.String("payload.version.code", p.VersionCode),
attribute.Int64("payload.server.num_tribes", p.NumTribes),
}
}

View File

@ -1,5 +1,7 @@
package model
import "go.opentelemetry.io/otel/attribute"
type VillagesRefreshedEvPayload struct {
Key string
URL string
@ -9,3 +11,15 @@ type VillagesRefreshedEvPayload struct {
NumBarbarianVillages int64
NumBonusVillages int64
}
func (p VillagesRefreshedEvPayload) OTELAttributes() []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("payload.server.key", p.Key),
attribute.String("payload.server.url", p.URL),
attribute.String("payload.version.code", p.VersionCode),
attribute.Int64("payload.server.num_villages", p.NumVillages),
attribute.Int64("payload.server.num_player_villages", p.NumPlayerVillages),
attribute.Int64("payload.server.num_barbarian_villages", p.NumBarbarianVillages),
attribute.Int64("payload.server.num_bonus_villages", p.NumBonusVillages),
}
}

View File

@ -1,5 +1,7 @@
package msg
import "go.opentelemetry.io/otel"
//go:generate counterfeiter -generate
const (
@ -10,3 +12,7 @@ const (
villagesEventRefreshed = "villages.event.refreshed"
ennoblementsCmdRefresh = "ennoblements.cmd.refresh"
)
var (
tracer = otel.Tracer("gitea.dwysokinski.me/twhelp/core/internal/msg")
)

View File

@ -0,0 +1,60 @@
package msgotel
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
// config represents the configuration options available for subscriber
// middlewares and publisher decorators.
type config struct {
tracer trace.Tracer
spanAttributes []attribute.KeyValue
propagators propagation.TextMapPropagator
}
// Option provides a convenience wrapper for simple options that can be
// represented as functions.
type Option func(*config)
func newConfig(opts ...Option) config {
cfg := config{
tracer: newTracer(otel.GetTracerProvider()),
propagators: otel.GetTextMapPropagator(),
}
for _, opt := range opts {
opt(&cfg)
}
return cfg
}
// WithSpanAttributes includes the given attributes to the generated Spans.
func WithSpanAttributes(attributes ...attribute.KeyValue) Option {
return func(c *config) {
c.spanAttributes = attributes
}
}
// WithTracerProvider specifies a tracer provider to use for creating a tracer.
// If this option isn't specified, then the global TracerProvider is used.
func WithTracerProvider(provider trace.TracerProvider) Option {
return func(c *config) {
if provider != nil {
c.tracer = newTracer(provider)
}
}
}
// WithPropagators configures specific propagators.
// If this option isn't specified, then the global TextMapPropagator is used.
func WithPropagators(p propagation.TextMapPropagator) Option {
return func(c *config) {
if p != nil {
c.propagators = p
}
}
}

View File

@ -0,0 +1,70 @@
package msgotel
import (
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
)
// Middleware defines a middleware that will add tracing.
func Middleware(options ...Option) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return HandlerDecorator(h, options...)
}
}
// HandlerDecorator decorates a watermill HandlerFunc to add tracing when a message is received.
func HandlerDecorator(h message.HandlerFunc, options ...Option) message.HandlerFunc {
cfg := newConfig(options...)
spanStartOpts := []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindConsumer),
trace.WithAttributes(cfg.spanAttributes...),
}
return func(msg *message.Message) ([]*message.Message, error) {
topic := message.SubscribeTopicFromCtx(msg.Context())
handlerName := message.HandlerNameFromCtx(msg.Context())
ctx, span := cfg.tracer.Start(
cfg.propagators.Extract(msg.Context(), propagation.MapCarrier(msg.Metadata)),
handlerName+" - "+topic+" process",
spanStartOpts...,
)
defer span.End()
span.SetAttributes(
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationKey.String(topic),
semconv.MessagingOperationReceive,
)
msg.SetContext(ctx)
msgs, err := h(msg)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
for _, m := range msgs {
m.SetContext(trace.ContextWithSpan(m.Context(), span))
}
return msgs, err
}
}
// NoPublishHandlerDecorator decorates a watermill NoPublishHandlerFunc to add tracing when a message is received.
func NoPublishHandlerDecorator(h message.NoPublishHandlerFunc, options ...Option) message.NoPublishHandlerFunc {
decoratedHandler := HandlerDecorator(func(msg *message.Message) ([]*message.Message, error) {
return nil, h(msg)
}, options...)
return func(msg *message.Message) error {
_, err := decoratedHandler(msg)
return err
}
}

View File

@ -0,0 +1,122 @@
package msgotel_test
import (
"context"
"sync/atomic"
"testing"
"time"
"go.opentelemetry.io/otel/trace"
"github.com/stretchr/testify/assert"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/stretchr/testify/require"
"gitea.dwysokinski.me/twhelp/core/internal/msg/msgotel"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
)
func TestMiddleware(t *testing.T) {
t.Parallel()
pubSub := gochannel.NewGoChannel(gochannel.Config{Persistent: true}, watermill.NopLogger{})
t.Cleanup(func() {
_ = pubSub.Close()
})
sr := tracetest.NewSpanRecorder()
provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
propagators := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
msgotelOpts := []msgotel.Option{msgotel.WithTracerProvider(provider), msgotel.WithPropagators(propagators)}
decoratedPublisher := msgotel.NewPublisherDecorator(pubSub, msgotelOpts...)
t.Cleanup(func() {
_ = decoratedPublisher.Close()
})
router, err := message.NewRouter(message.RouterConfig{}, watermill.NopLogger{})
require.NoError(t, err)
t.Cleanup(func() {
_ = router.Close()
})
router.AddPublisherDecorators(func(pub message.Publisher) (message.Publisher, error) {
return msgotel.NewPublisherDecorator(pub, msgotelOpts...), nil
})
waitCh := make(chan struct{})
router.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
var counter int32
return func(msg *message.Message) ([]*message.Message, error) {
if message.HandlerNameFromCtx(msg.Context()) == "TestHandler2" && atomic.AddInt32(&counter, 1) == 2 {
defer func() {
close(waitCh)
}()
}
return h(msg)
}
}, msgotel.Middleware(msgotelOpts...))
router.AddHandler(
"TestHandler1",
"test_subscribe",
pubSub,
"test_publish",
pubSub,
func(msg *message.Message) ([]*message.Message, error) {
return []*message.Message{
message.NewMessage(watermill.NewUUID(), message.Payload("test")),
message.NewMessage(watermill.NewUUID(), message.Payload("test")),
}, nil
},
)
router.AddNoPublisherHandler(
"TestHandler2",
"test_publish",
pubSub,
func(msg *message.Message) error {
return nil
},
)
go func() {
require.NoError(t, router.Run(context.Background()))
}()
<-router.Running()
assert.NoError(t, decoratedPublisher.Publish(
"test_subscribe",
message.NewMessage(watermill.NewUUID(), message.Payload("test")),
))
select {
case <-waitCh:
case <-time.After(time.Second):
t.Fatal("timeout")
}
spans := sr.Ended()
require.Len(t, spans, 6)
assert.Equal(t, "gochannel.GoChannel - test_subscribe send", spans[0].Name())
assert.Equal(t, trace.SpanKindProducer, spans[0].SpanKind())
assert.Equal(t, "TestHandler1 - test_subscribe process", spans[1].Name())
assert.Equal(t, trace.SpanKindConsumer, spans[1].SpanKind())
assert.Equal(t, spans[0].SpanContext().TraceID(), spans[1].SpanContext().TraceID())
assert.Equal(t, "gochannel.GoChannel - test_publish send", spans[2].Name())
assert.Equal(t, trace.SpanKindProducer, spans[2].SpanKind())
assert.Equal(t, spans[0].SpanContext().TraceID(), spans[2].SpanContext().TraceID())
assert.Equal(t, "gochannel.GoChannel - test_publish send", spans[3].Name())
assert.Equal(t, trace.SpanKindProducer, spans[3].SpanKind())
assert.Equal(t, spans[0].SpanContext().TraceID(), spans[3].SpanContext().TraceID())
assert.Equal(t, "TestHandler2 - test_publish process", spans[4].Name())
assert.Equal(t, trace.SpanKindConsumer, spans[4].SpanKind())
assert.Equal(t, spans[0].SpanContext().TraceID(), spans[4].SpanContext().TraceID())
assert.Equal(t, "TestHandler2 - test_publish process", spans[5].Name())
assert.Equal(t, trace.SpanKindConsumer, spans[5].SpanKind())
assert.Equal(t, spans[0].SpanContext().TraceID(), spans[5].SpanContext().TraceID())
}

View File

@ -0,0 +1,11 @@
package msgotel
import (
"go.opentelemetry.io/otel/trace"
)
const instrumentationName = "gitea.dwysokinski.me/twhelp/core/internal/msg/msgotel"
func newTracer(provider trace.TracerProvider) trace.Tracer {
return provider.Tracer(instrumentationName)
}

View File

@ -0,0 +1,95 @@
package msgotel
import (
"fmt"
"strings"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
)
// PublisherDecorator decorates a standard watermill publisher to add tracing capabilities.
type PublisherDecorator struct {
pub message.Publisher
publisherName string
spanAttributes []attribute.KeyValue
tracer trace.Tracer
propagators propagation.TextMapPropagator
}
// NewPublisherDecorator instantiates a PublisherDecorator with a default name.
func NewPublisherDecorator(pub message.Publisher, options ...Option) *PublisherDecorator {
return NewNamedPublisherDecorator(structName(pub), pub, options...)
}
// NewNamedPublisherDecorator instantiates a PublisherDecorator with a provided name.
func NewNamedPublisherDecorator(name string, pub message.Publisher, opts ...Option) *PublisherDecorator {
cfg := newConfig(opts...)
return &PublisherDecorator{
pub: pub,
publisherName: name,
spanAttributes: cfg.spanAttributes,
tracer: cfg.tracer,
propagators: cfg.propagators,
}
}
// Publish implements the watermill Publisher interface and creates traces.
// Publishing of messages is delegated to the decorated Publisher.
func (p *PublisherDecorator) Publish(topic string, messages ...*message.Message) error {
if len(messages) == 0 {
return nil
}
publisherName := message.PublisherNameFromCtx(messages[0].Context())
if publisherName == "" {
publisherName = p.publisherName
}
_, span := p.tracer.Start(
messages[0].Context(),
publisherName+" - "+topic+" send",
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(p.spanAttributes...),
)
defer span.End()
span.SetAttributes(
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationKey.String(topic),
semconv.MessagingOperationProcess,
)
for _, msg := range messages {
msg.SetContext(trace.ContextWithSpan(msg.Context(), span))
p.propagators.Inject(msg.Context(), propagation.MapCarrier(msg.Metadata))
}
err := p.pub.Publish(topic, messages...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
return nil
}
// Close implements the watermill Publisher interface.
func (p *PublisherDecorator) Close() error {
return p.pub.Close()
}
func structName(v interface{}) string {
if s, ok := v.(fmt.Stringer); ok {
return s.String()
}
s := fmt.Sprintf("%T", v)
// trim the pointer marker, if any
return strings.TrimLeft(s, "*")
}

View File

@ -0,0 +1,78 @@
package msgotel_test
import (
"context"
"testing"
"time"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/subscriber"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitea.dwysokinski.me/twhelp/core/internal/msg/msgotel"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func TestPublisherDecorator(t *testing.T) {
t.Parallel()
pubSub := gochannel.NewGoChannel(gochannel.Config{Persistent: true}, watermill.NopLogger{})
t.Cleanup(func() {
_ = pubSub.Close()
})
sr := tracetest.NewSpanRecorder()
provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
propagators := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
decoratedPublisher := msgotel.NewPublisherDecorator(
pubSub,
msgotel.WithTracerProvider(provider),
msgotel.WithPropagators(propagators),
msgotel.WithSpanAttributes(attribute.Bool("test", true)),
)
t.Cleanup(func() {
_ = decoratedPublisher.Close()
})
msgs := []*message.Message{
message.NewMessage(watermill.NewUUID(), message.Payload("msg 1")),
message.NewMessage(watermill.NewUUID(), message.Payload("msg 2")),
message.NewMessage(watermill.NewUUID(), message.Payload("msg 3")),
}
assert.NoError(t, decoratedPublisher.Publish("topic", msgs...))
spans := sr.Ended()
require.Len(t, spans, 1)
assert.Equal(t, "gochannel.GoChannel - topic send", spans[0].Name())
assert.Equal(t, trace.SpanKindProducer, spans[0].SpanKind())
assert.Equal(t, []attribute.KeyValue{
attribute.Bool("test", true),
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationKey.String("topic"),
semconv.MessagingOperationProcess,
}, spans[0].Attributes())
msgsCh, err := pubSub.Subscribe(context.Background(), "topic")
require.NoError(t, err)
receivedMsgs, _ := subscriber.BulkRead(msgsCh, len(msgs), time.Second)
require.Len(t, receivedMsgs, len(msgs))
for _, msg := range receivedMsgs {
ctx := propagators.Extract(context.Background(), propagation.MapCarrier(msg.Metadata))
spanCtx := trace.SpanContextFromContext(ctx)
assert.Equal(t, spans[0].SpanContext().TraceID(), spanCtx.TraceID())
assert.Equal(t, spans[0].SpanContext().SpanID(), spanCtx.SpanID())
assert.Equal(t, spans[0].SpanContext().IsSampled(), spanCtx.IsSampled())
}
}

View File

@ -7,6 +7,7 @@ import (
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/codes"
)
type EnnoblementPublisher struct {
@ -18,7 +19,10 @@ func NewEnnoblementPublisher(publisher message.Publisher, marshaler CommandEvent
return &EnnoblementPublisher{publisher: publisher, marshaler: marshaler}
}
func (e *EnnoblementPublisher) CmdRefresh(_ context.Context, payloads ...domain.RefreshEnnoblementsCmdPayload) error {
func (e *EnnoblementPublisher) CmdRefresh(ctx context.Context, payloads ...domain.RefreshEnnoblementsCmdPayload) error {
ctx, span := tracer.Start(ctx, "EnnoblementPublisher.CmdRefresh")
defer span.End()
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := e.marshaler.Marshal(model.RefreshEnnoblementsCmdPayload{
@ -27,14 +31,20 @@ func (e *EnnoblementPublisher) CmdRefresh(_ context.Context, payloads ...domain.
VersionCode: p.VersionCode,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("marshaler.Marshal: %w", err)
}
msg.SetContext(ctx)
msgs = append(msgs, msg)
}
err := e.publisher.Publish(ennoblementsCmdRefresh, msgs...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("publisher.Publish: %w", err)
}
return nil
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"go.opentelemetry.io/otel/codes"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
@ -20,7 +21,10 @@ func NewServerPublisher(publisher message.Publisher, marshaler CommandEventMarsh
return &ServerPublisher{publisher: publisher, marshaler: marshaler}
}
func (s *ServerPublisher) CmdRefresh(_ context.Context, payloads ...domain.RefreshServersCmdPayload) error {
func (s *ServerPublisher) CmdRefresh(ctx context.Context, payloads ...domain.RefreshServersCmdPayload) error {
ctx, span := tracer.Start(ctx, "EnnoblementPublisher.CmdRefresh")
defer span.End()
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := s.marshaler.Marshal(model.RefreshServersCmdPayload{
@ -28,14 +32,20 @@ func (s *ServerPublisher) CmdRefresh(_ context.Context, payloads ...domain.Refre
VersionCode: p.VersionCode,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("marshaler.Marshal: %w", err)
}
msg.SetContext(ctx)
msgs = append(msgs, msg)
}
err := s.publisher.Publish(serversCmdRefresh, msgs...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("publisher.Publish: %w", err)
}
return nil
}

View File

@ -6,6 +6,10 @@ import (
"fmt"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
)
@ -40,8 +44,16 @@ func NewEnnoblement(repo EnnoblementRepository, client EnnoblementsGetter) *Enno
}
func (e *Ennoblement) Refresh(ctx context.Context, key, url string) error {
ctx, span := tracer.Start(ctx, "Ennoblement.Refresh", trace.WithAttributes(
attribute.String("server.key", key),
attribute.String("server.url", url),
))
defer span.End()
latestEnnoblement, err := e.getLatestEnnoblement(ctx, key)
if err != nil && !errors.Is(err, errEnnoblementNotFound) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("EnnoblementService.getLatestEnnoblements: %w", err)
}
@ -50,8 +62,12 @@ func (e *Ennoblement) Refresh(ctx context.Context, key, url string) error {
since = latestEnnoblement.CreatedAt
}
span.SetAttributes(attribute.Int64("since", since.Unix()))
ennoblements, err := e.client.GetEnnoblements(ctx, url, since)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("TWClient.GetEnnoblements: %w", err)
}
@ -71,6 +87,7 @@ func (e *Ennoblement) Refresh(ctx context.Context, key, url string) error {
})
}
if err := e.repo.Create(ctx, params...); err != nil {
span.RecordError(err)
return fmt.Errorf("EnnoblementRepository.Create: %w", err)
}
}
@ -79,6 +96,9 @@ func (e *Ennoblement) Refresh(ctx context.Context, key, url string) error {
}
func (e *Ennoblement) List(ctx context.Context, params domain.ListEnnoblementsParams) ([]domain.Ennoblement, int64, error) {
ctx, span := tracer.Start(ctx, "Ennoblement.List")
defer span.End()
if len(params.Sort) == 0 {
params.Sort = []domain.EnnoblementSort{
{
@ -89,12 +109,15 @@ func (e *Ennoblement) List(ctx context.Context, params domain.ListEnnoblementsPa
}
if len(params.Sort) > ennoblementSortMaxLen {
return nil, 0, domain.ValidationError{
err := domain.ValidationError{
Field: "sort",
Err: domain.MaxLengthError{
Max: ennoblementSortMaxLen,
},
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, err
}
if params.Pagination.Limit == 0 {
@ -102,11 +125,15 @@ func (e *Ennoblement) List(ctx context.Context, params domain.ListEnnoblementsPa
}
if err := validatePagination(params.Pagination, ennoblementMaxLimit); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("validatePagination: %w", err)
}
ennoblements, count, err := e.repo.List(ctx, params)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("EnnoblementRepository.List: %w", err)
}
@ -114,6 +141,11 @@ func (e *Ennoblement) List(ctx context.Context, params domain.ListEnnoblementsPa
}
func (e *Ennoblement) getLatestEnnoblement(ctx context.Context, key string) (domain.Ennoblement, error) {
ctx, span := tracer.Start(ctx, "Ennoblement.getLatestEnnoblement", trace.WithAttributes(
attribute.String("server.key", key),
))
defer span.End()
ennoblements, _, err := e.repo.List(ctx, domain.ListEnnoblementsParams{
ServerKeys: []string{key},
Pagination: domain.Pagination{
@ -125,10 +157,14 @@ func (e *Ennoblement) getLatestEnnoblement(ctx context.Context, key string) (dom
Count: false,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Ennoblement{}, fmt.Errorf("EnnoblementRepository.List: %w", err)
}
if len(ennoblements) == 0 {
span.RecordError(errEnnoblementNotFound)
return domain.Ennoblement{}, errEnnoblementNotFound
}
return ennoblements[0], nil
}

View File

@ -6,6 +6,9 @@ import (
"sort"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
const (
@ -36,16 +39,28 @@ func NewPlayer(repo PlayerRepository, client PlayersGetter) *Player {
}
func (p *Player) Refresh(ctx context.Context, key, url string) (int64, error) {
ctx, span := tracer.Start(ctx, "Player.Refresh", trace.WithAttributes(
attribute.String("server.key", key),
attribute.String("server.url", url),
))
defer span.End()
players, err := p.client.GetPlayers(ctx, url)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, fmt.Errorf("TWClient.GetPlayers: %w", err)
}
if err := p.createOrUpdate(ctx, key, players); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, err
}
if err := p.delete(ctx, key, players); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, err
}
@ -53,6 +68,11 @@ func (p *Player) Refresh(ctx context.Context, key, url string) (int64, error) {
}
func (p *Player) createOrUpdate(ctx context.Context, key string, players []domain.BasePlayer) error {
ctx, span := tracer.Start(ctx, "Player.createOrUpdate", trace.WithAttributes(
attribute.String("server.key", key),
))
defer span.End()
for i := 0; i < len(players); i += playerChunkSize {
end := i + playerChunkSize
if end > len(players) {
@ -69,6 +89,7 @@ func (p *Player) createOrUpdate(ctx context.Context, key string, players []domai
})
}
if err := p.repo.CreateOrUpdate(ctx, params...); err != nil {
span.RecordError(err)
return fmt.Errorf("PlayerRepository.CreateOrUpdate: %w", err)
}
}
@ -77,6 +98,11 @@ func (p *Player) createOrUpdate(ctx context.Context, key string, players []domai
}
func (p *Player) delete(ctx context.Context, key string, existing []domain.BasePlayer) error {
ctx, span := tracer.Start(ctx, "Player.delete", trace.WithAttributes(
attribute.String("server.key", key),
))
defer span.End()
//nolint:prealloc
var playersToDelete []int64
var lastID int64
@ -99,6 +125,7 @@ func (p *Player) delete(ctx context.Context, key string, existing []domain.BaseP
},
})
if err != nil {
span.RecordError(err)
return fmt.Errorf("PlayerRepository.List: %w", err)
}
@ -120,6 +147,8 @@ func (p *Player) delete(ctx context.Context, key string, existing []domain.BaseP
}
if err := p.repo.DeleteByID(ctx, playersToDelete...); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("PlayerRepository.DeleteByID: %w", err)
}
@ -127,6 +156,9 @@ func (p *Player) delete(ctx context.Context, key string, existing []domain.BaseP
}
func (p *Player) List(ctx context.Context, params domain.ListPlayersParams) ([]domain.Player, int64, error) {
ctx, span := tracer.Start(ctx, "Player.List")
defer span.End()
if len(params.Sort) == 0 {
params.Sort = []domain.PlayerSort{
{
@ -137,12 +169,15 @@ func (p *Player) List(ctx context.Context, params domain.ListPlayersParams) ([]d
}
if len(params.Sort) > playerSortMaxLen {
return nil, 0, domain.ValidationError{
err := domain.ValidationError{
Field: "sort",
Err: domain.MaxLengthError{
Max: playerSortMaxLen,
},
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, err
}
if params.Pagination.Limit == 0 {
@ -150,11 +185,15 @@ func (p *Player) List(ctx context.Context, params domain.ListPlayersParams) ([]d
}
if err := validatePagination(params.Pagination, playerMaxLimit); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("validatePagination: %w", err)
}
players, count, err := p.repo.List(ctx, params)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("PlayerRepository.List: %w", err)
}
@ -162,6 +201,12 @@ func (p *Player) List(ctx context.Context, params domain.ListPlayersParams) ([]d
}
func (p *Player) GetByServerKeyAndID(ctx context.Context, serverKey string, id int64, includeTribe bool) (domain.Player, error) {
ctx, span := tracer.Start(ctx, "Player.GetByServerKeyAndID", trace.WithAttributes(
attribute.String("server.key", serverKey),
attribute.Int64("player.id", id),
))
defer span.End()
players, _, err := p.repo.List(ctx, domain.ListPlayersParams{
IDs: []int64{id},
ServerKeys: []string{serverKey},
@ -171,12 +216,18 @@ func (p *Player) GetByServerKeyAndID(ctx context.Context, serverKey string, id i
},
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Player{}, fmt.Errorf("PlayerRepository.List: %w", err)
}
if len(players) == 0 {
return domain.Player{}, domain.PlayerNotFoundError{
err = domain.PlayerNotFoundError{
ID: id,
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Player{}, err
}
return players[0], nil
}

View File

@ -6,6 +6,9 @@ import (
"time"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
const (
@ -37,8 +40,16 @@ func NewServer(repo ServerRepository, client OpenServersConfigsGetter) *Server {
}
func (s *Server) Refresh(ctx context.Context, host, versionCode string) ([]domain.Server, error) {
ctx, span := tracer.Start(ctx, "Server.Refresh", trace.WithAttributes(
attribute.String("version.code", versionCode),
attribute.String("version.host", host),
))
defer span.End()
openServers, err := s.client.GetOpenServers(ctx, host)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("TWClient.GetOpenServers: %w", err)
}
@ -48,6 +59,8 @@ func (s *Server) Refresh(ctx context.Context, host, versionCode string) ([]domai
Count: false,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("ServerRepository.List: %w", err)
}
@ -57,6 +70,8 @@ func (s *Server) Refresh(ctx context.Context, host, versionCode string) ([]domai
Count: false,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("ServerRepository.List: %w", err)
}
@ -88,24 +103,39 @@ func (s *Server) Refresh(ctx context.Context, host, versionCode string) ([]domai
upsertedServers, err := s.repo.CreateOrUpdate(ctx, params...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("ServerRepository.CreateOrUpdate: %w", err)
}
return upsertedServers, nil
}
func (s *Server) UpdateInfoAndConfig(ctx context.Context, key, url string) error {
ctx, span := tracer.Start(ctx, "Server.UpdateInfoAndConfig", trace.WithAttributes(
attribute.String("server.key", key),
attribute.String("server.url", url),
))
defer span.End()
cfg, err := s.client.GetServerConfig(ctx, url)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("TWClient.GetServerConfig: %w", err)
}
buildingInfo, err := s.client.GetBuildingInfo(ctx, url)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("TWClient.GetBuildingInfo: %w", err)
}
unitInfo, err := s.client.GetUnitInfo(ctx, url)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("TWClient.GetUnitInfo: %w", err)
}
@ -124,6 +154,8 @@ func (s *Server) UpdateInfoAndConfig(ctx context.Context, key, url string) error
},
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("ServerRepository.UpdateByKey: %w", err)
}
@ -131,6 +163,12 @@ func (s *Server) UpdateInfoAndConfig(ctx context.Context, key, url string) error
}
func (s *Server) UpdateNumPlayers(ctx context.Context, key string, numPlayers int64) error {
ctx, span := tracer.Start(ctx, "Server.UpdateNumPlayers", trace.WithAttributes(
attribute.String("server.key", key),
attribute.Int64("server.num_players", numPlayers),
))
defer span.End()
_, err := s.repo.UpdateByKey(ctx, key, domain.UpdateServerParams{
NumPlayers: domain.NullInt64{
Valid: true,
@ -139,12 +177,21 @@ func (s *Server) UpdateNumPlayers(ctx context.Context, key string, numPlayers in
PlayerDataUpdatedAt: time.Now(),
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("ServerRepository.UpdateByKey: %w", err)
}
return nil
}
func (s *Server) UpdateNumTribes(ctx context.Context, key string, numTribes int64) error {
ctx, span := tracer.Start(ctx, "Server.UpdateNumTribes", trace.WithAttributes(
attribute.String("server.key", key),
attribute.Int64("server.num_tribes", numTribes),
))
defer span.End()
_, err := s.repo.UpdateByKey(ctx, key, domain.UpdateServerParams{
NumTribes: domain.NullInt64{
Valid: true,
@ -153,12 +200,21 @@ func (s *Server) UpdateNumTribes(ctx context.Context, key string, numTribes int6
TribeDataUpdatedAt: time.Now(),
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("ServerRepository.UpdateByKey: %w", err)
}
return nil
}
func (s *Server) UpdateNumVillages(ctx context.Context, key string, numVillages int64) error {
ctx, span := tracer.Start(ctx, "Server.UpdateNumVillages", trace.WithAttributes(
attribute.String("server.key", key),
attribute.Int64("server.num_villages", numVillages),
))
defer span.End()
_, err := s.repo.UpdateByKey(ctx, key, domain.UpdateServerParams{
NumVillages: domain.NullInt64{
Valid: true,
@ -167,22 +223,32 @@ func (s *Server) UpdateNumVillages(ctx context.Context, key string, numVillages
VillageDataUpdatedAt: time.Now(),
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("ServerRepository.UpdateByKey: %w", err)
}
return nil
}
func (s *Server) List(ctx context.Context, params domain.ListServersParams) ([]domain.Server, int64, error) {
ctx, span := tracer.Start(ctx, "Server.List")
defer span.End()
if params.Pagination.Limit == 0 {
params.Pagination.Limit = serverMaxLimit
}
if err := validatePagination(params.Pagination, serverMaxLimit); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("validatePagination: %w", err)
}
servers, count, err := s.repo.List(ctx, params)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("ServerRepository.List: %w", err)
}
@ -190,6 +256,12 @@ func (s *Server) List(ctx context.Context, params domain.ListServersParams) ([]d
}
func (s *Server) GetNormalByVersionCodeAndKey(ctx context.Context, versionCode, key string) (domain.Server, error) {
ctx, span := tracer.Start(ctx, "Server.GetNormalByVersionCodeAndKey", trace.WithAttributes(
attribute.String("version.code", versionCode),
attribute.String("server.key", key),
))
defer span.End()
servers, _, err := s.repo.List(ctx, domain.ListServersParams{
Special: domain.NullBool{
Bool: false,
@ -202,13 +274,19 @@ func (s *Server) GetNormalByVersionCodeAndKey(ctx context.Context, versionCode,
},
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Server{}, fmt.Errorf("ServerRepository.List: %w", err)
}
if len(servers) == 0 {
return domain.Server{}, domain.ServerNotFoundError{
err = domain.ServerNotFoundError{
Key: key,
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Server{}, err
}
return servers[0], nil
}

View File

@ -1,3 +1,9 @@
package service
import "go.opentelemetry.io/otel"
//go:generate counterfeiter -generate
var (
tracer = otel.Tracer("gitea.dwysokinski.me/twhelp/core/internal/service")
)

View File

@ -6,6 +6,9 @@ import (
"sort"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
const (
@ -37,16 +40,28 @@ func NewTribe(repo TribeRepository, client TribesGetter) *Tribe {
}
func (t *Tribe) Refresh(ctx context.Context, key, url string) (int64, error) {
ctx, span := tracer.Start(ctx, "Tribe.Refresh", trace.WithAttributes(
attribute.String("server.key", key),
attribute.String("server.url", url),
))
defer span.End()
tribes, err := t.client.GetTribes(ctx, url)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, fmt.Errorf("TWClient.GetTribes: %w", err)
}
if err = t.createOrUpdate(ctx, key, tribes); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, err
}
if err = t.delete(ctx, key, tribes); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, err
}
@ -54,6 +69,11 @@ func (t *Tribe) Refresh(ctx context.Context, key, url string) (int64, error) {
}
func (t *Tribe) createOrUpdate(ctx context.Context, key string, tribes []domain.BaseTribe) error {
ctx, span := tracer.Start(ctx, "Tribe.createOrUpdate", trace.WithAttributes(
attribute.String("server.key", key),
))
defer span.End()
for i := 0; i < len(tribes); i += tribeChunkSize {
end := i + tribeChunkSize
if end > len(tribes) {
@ -70,6 +90,7 @@ func (t *Tribe) createOrUpdate(ctx context.Context, key string, tribes []domain.
})
}
if err := t.repo.CreateOrUpdate(ctx, params...); err != nil {
span.RecordError(err)
return fmt.Errorf("TribeRepository.CreateOrUpdate: %w", err)
}
}
@ -78,6 +99,11 @@ func (t *Tribe) createOrUpdate(ctx context.Context, key string, tribes []domain.
}
func (t *Tribe) delete(ctx context.Context, key string, existing []domain.BaseTribe) error {
ctx, span := tracer.Start(ctx, "Tribe.delete", trace.WithAttributes(
attribute.String("server.key", key),
))
defer span.End()
//nolint:prealloc
var tribesToDelete []int64
var lastID int64
@ -100,6 +126,7 @@ func (t *Tribe) delete(ctx context.Context, key string, existing []domain.BaseTr
},
})
if err != nil {
span.RecordError(err)
return fmt.Errorf("TribeRepository.List: %w", err)
}
@ -121,6 +148,8 @@ func (t *Tribe) delete(ctx context.Context, key string, existing []domain.BaseTr
}
if err := t.repo.DeleteByID(ctx, tribesToDelete...); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("TribeRepository.DeleteByID: %w", err)
}
@ -128,13 +157,24 @@ func (t *Tribe) delete(ctx context.Context, key string, existing []domain.BaseTr
}
func (t *Tribe) UpdateDominance(ctx context.Context, serverKey string, numPlayerVillages int64) error {
ctx, span := tracer.Start(ctx, "Tribe.UpdateDominance", trace.WithAttributes(
attribute.String("server.key", serverKey),
attribute.Int64("server.num_player_villages", numPlayerVillages),
))
defer span.End()
if err := t.repo.UpdateDominance(ctx, serverKey, numPlayerVillages); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("TribeRepository.UpdateDominance: %w", err)
}
return nil
}
func (t *Tribe) List(ctx context.Context, params domain.ListTribesParams) ([]domain.Tribe, int64, error) {
ctx, span := tracer.Start(ctx, "Tribe.List")
defer span.End()
if len(params.Sort) == 0 {
params.Sort = []domain.TribeSort{
{
@ -145,12 +185,15 @@ func (t *Tribe) List(ctx context.Context, params domain.ListTribesParams) ([]dom
}
if len(params.Sort) > tribeSortMaxLen {
return nil, 0, domain.ValidationError{
err := domain.ValidationError{
Field: "sort",
Err: domain.MaxLengthError{
Max: tribeSortMaxLen,
},
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, err
}
if params.Pagination.Limit == 0 {
@ -158,11 +201,15 @@ func (t *Tribe) List(ctx context.Context, params domain.ListTribesParams) ([]dom
}
if err := validatePagination(params.Pagination, tribeMaxLimit); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("validatePagination: %w", err)
}
tribes, count, err := t.repo.List(ctx, params)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("TribeRepository.List: %w", err)
}
@ -170,6 +217,12 @@ func (t *Tribe) List(ctx context.Context, params domain.ListTribesParams) ([]dom
}
func (t *Tribe) GetByServerKeyAndID(ctx context.Context, serverKey string, id int64) (domain.Tribe, error) {
ctx, span := tracer.Start(ctx, "Tribe.GetByServerKeyAndID", trace.WithAttributes(
attribute.String("server.key", serverKey),
attribute.Int64("tribe.id", id),
))
defer span.End()
tribes, _, err := t.repo.List(ctx, domain.ListTribesParams{
IDs: []int64{id},
ServerKeys: []string{serverKey},
@ -178,17 +231,29 @@ func (t *Tribe) GetByServerKeyAndID(ctx context.Context, serverKey string, id in
},
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Tribe{}, fmt.Errorf("TribeRepository.List: %w", err)
}
if len(tribes) == 0 {
return domain.Tribe{}, domain.TribeNotFoundError{
err = domain.TribeNotFoundError{
ID: id,
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Tribe{}, err
}
return tribes[0], nil
}
func (t *Tribe) GetByServerKeyAndTag(ctx context.Context, serverKey string, tag string) (domain.Tribe, error) {
ctx, span := tracer.Start(ctx, "Tribe.GetByServerKeyAndTag", trace.WithAttributes(
attribute.String("server.key", serverKey),
attribute.String("tribe.tag", tag),
))
defer span.End()
tribes, _, err := t.repo.List(ctx, domain.ListTribesParams{
Tags: []string{tag},
ServerKeys: []string{serverKey},
@ -197,12 +262,18 @@ func (t *Tribe) GetByServerKeyAndTag(ctx context.Context, serverKey string, tag
},
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Tribe{}, fmt.Errorf("TribeRepository.List: %w", err)
}
if len(tribes) == 0 {
return domain.Tribe{}, domain.TribeNotFoundError{
err = domain.TribeNotFoundError{
Tag: tag,
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Tribe{}, err
}
return tribes[0], nil
}

119
internal/service/updater.go Normal file
View File

@ -0,0 +1,119 @@
package service
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"go.opentelemetry.io/otel/codes"
)
type VersionLister interface {
List(ctx context.Context, params domain.ListVersionsParams) ([]domain.Version, int64, error)
}
type ServerLister interface {
List(ctx context.Context, params domain.ListServersParams) ([]domain.Server, int64, error)
}
type ServerPublisher interface {
CmdRefresh(_ context.Context, payloads ...domain.RefreshServersCmdPayload) error
}
type EnnoblementPublisher interface {
CmdRefresh(_ context.Context, payloads ...domain.RefreshEnnoblementsCmdPayload) error
}
type Updater struct {
versionSvc VersionLister
serverSvc ServerLister
serverPublisher ServerPublisher
ennoblementPublisher EnnoblementPublisher
}
func NewUpdater(
versionSvc VersionLister,
serverSvc ServerLister,
serverPublisher ServerPublisher,
ennoblementPublisher EnnoblementPublisher,
) *Updater {
return &Updater{
versionSvc: versionSvc,
serverSvc: serverSvc,
serverPublisher: serverPublisher,
ennoblementPublisher: ennoblementPublisher,
}
}
func (u *Updater) UpdateData(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "Updater.UpdateData")
defer span.End()
versions, _, err := u.versionSvc.List(ctx, domain.ListVersionsParams{Count: false})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("VersionService.List: %w", err)
}
payloads := make([]domain.RefreshServersCmdPayload, 0, len(versions))
for _, v := range versions {
payloads = append(payloads, domain.RefreshServersCmdPayload{
Host: v.Host,
VersionCode: v.Code,
})
}
err = u.serverPublisher.CmdRefresh(ctx, payloads...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("ServerPublisher.CmdRefresh: %w", err)
}
return nil
}
func (u *Updater) UpdateEnnoblements(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "Updater.UpdateEnnoblements")
defer span.End()
const paginationLimit = 2000
servers, _, err := u.serverSvc.List(ctx, domain.ListServersParams{
Special: domain.NullBool{
Bool: false,
Valid: true,
},
Open: domain.NullBool{
Bool: true,
Valid: true,
},
Pagination: domain.Pagination{
Limit: paginationLimit,
},
Count: false,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("ServerService.List: %w", err)
}
payloads := make([]domain.RefreshEnnoblementsCmdPayload, 0, len(servers))
for _, srv := range servers {
payloads = append(payloads, domain.RefreshEnnoblementsCmdPayload{
Key: srv.Key,
URL: srv.URL,
VersionCode: srv.VersionCode,
})
}
err = u.ennoblementPublisher.CmdRefresh(ctx, payloads...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("EnnoblementPublisher.CmdRefresh: %w", err)
}
return nil
}

View File

@ -5,6 +5,9 @@ import (
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
type VersionRepository interface {
@ -21,17 +24,31 @@ func NewVersion(repo VersionRepository) *Version {
}
func (v *Version) List(ctx context.Context, params domain.ListVersionsParams) ([]domain.Version, int64, error) {
ctx, span := tracer.Start(ctx, "Version.List")
defer span.End()
versions, count, err := v.repo.List(ctx, params)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("VersionRepository.List: %w", err)
}
return versions, count, nil
}
func (v *Version) GetByCode(ctx context.Context, code string) (domain.Version, error) {
ctx, span := tracer.Start(ctx, "Version.GetByCode", trace.WithAttributes(
attribute.String("version.code", code),
))
defer span.End()
version, err := v.repo.GetByCode(ctx, code)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Version{}, fmt.Errorf("VersionRepository.GetByCode: %w", err)
}
return version, nil
}

View File

@ -5,6 +5,9 @@ import (
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
const (
@ -33,8 +36,16 @@ func NewVillage(repo VillageRepository, client VillagesGetter) *Village {
}
func (v *Village) Refresh(ctx context.Context, key, url string) (domain.RefreshVillagesResult, error) {
ctx, span := tracer.Start(ctx, "Village.Refresh", trace.WithAttributes(
attribute.String("server.key", key),
attribute.String("server.url", url),
))
defer span.End()
villages, err := v.client.GetVillages(ctx, url)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.RefreshVillagesResult{}, fmt.Errorf("TWClient.GetVillages: %w", err)
}
@ -66,6 +77,7 @@ func (v *Village) Refresh(ctx context.Context, key, url string) (domain.RefreshV
}
}
if err := v.repo.CreateOrUpdate(ctx, params...); err != nil {
span.RecordError(err)
return domain.RefreshVillagesResult{}, fmt.Errorf("VillageRepository.CreateOrUpdate: %w", err)
}
}
@ -74,16 +86,23 @@ func (v *Village) Refresh(ctx context.Context, key, url string) (domain.RefreshV
}
func (v *Village) List(ctx context.Context, params domain.ListVillagesParams) ([]domain.Village, int64, error) {
ctx, span := tracer.Start(ctx, "Village.List")
defer span.End()
if params.Pagination.Limit == 0 {
params.Pagination.Limit = villageMaxLimit
}
if err := validatePagination(params.Pagination, villageMaxLimit); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("validatePagination: %w", err)
}
villages, count, err := v.repo.List(ctx, params)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, 0, fmt.Errorf("VillageRepository.List: %w", err)
}
@ -96,6 +115,12 @@ func (v *Village) GetByServerKeyAndID(
id int64,
includePlayer, includePlayerTribe bool,
) (domain.Village, error) {
ctx, span := tracer.Start(ctx, "Village.GetByServerKeyAndID", trace.WithAttributes(
attribute.String("server.key", serverKey),
attribute.Int64("village.id", id),
))
defer span.End()
villages, _, err := v.repo.List(ctx, domain.ListVillagesParams{
IDs: []int64{id},
ServerKeys: []string{serverKey},
@ -106,12 +131,18 @@ func (v *Village) GetByServerKeyAndID(
},
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Village{}, fmt.Errorf("VillageRepository.List: %w", err)
}
if len(villages) == 0 {
return domain.Village{}, domain.VillageNotFoundError{
err = domain.VillageNotFoundError{
ID: id,
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.Village{}, err
}
return villages[0], nil
}

View File

@ -14,9 +14,11 @@ import (
"strings"
"time"
"github.com/elliotchance/phpserialize"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/elliotchance/phpserialize"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
const (
@ -88,25 +90,32 @@ func NewClient(opts ...ClientOption) *Client {
}
func (c *Client) GetOpenServers(ctx context.Context, baseURL string) ([]domain.OpenServer, error) {
ctx, span := tracer.Start(ctx, "Client.GetOpenServers", trace.WithAttributes(
attribute.String("base_url", baseURL),
))
defer span.End()
resp, err := c.get(ctx, buildURL(baseURL, endpointGetServers))
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("c.get: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK {
_, _ = io.Copy(io.Discard, resp.Body)
return nil, fmt.Errorf("Non-OK HTTP status: %d", resp.StatusCode)
}
b, err := io.ReadAll(resp.Body)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("couldn't read response body: %w", err)
}
m, err := phpserialize.UnmarshalAssociativeArray(b)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("phpserialize.UnmarshalAssociativeArray: %w", err)
}
@ -114,12 +123,18 @@ func (c *Client) GetOpenServers(ctx context.Context, baseURL string) ([]domain.O
for key, val := range m {
keyStr, ok := key.(string)
if !ok || keyStr == "" {
return nil, fmt.Errorf("%w: %v", ErrInvalidServerKey, key)
err = fmt.Errorf("%w: %v", ErrInvalidServerKey, key)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
urlStr, ok := val.(string)
if !ok || urlStr == "" {
return nil, fmt.Errorf("%w: %v", ErrInvalidServerURL, val)
err = fmt.Errorf("%w: %v", ErrInvalidServerURL, val)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
servers = append(servers, domain.OpenServer{
@ -132,9 +147,16 @@ func (c *Client) GetOpenServers(ctx context.Context, baseURL string) ([]domain.O
}
func (c *Client) GetServerConfig(ctx context.Context, baseURL string) (domain.ServerConfig, error) {
ctx, span := tracer.Start(ctx, "Client.GetServerConfig", trace.WithAttributes(
attribute.String("base_url", baseURL),
))
defer span.End()
var cfg serverConfig
if err := c.getXML(ctx, buildURL(baseURL, endpointConfig), &cfg); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.ServerConfig{}, fmt.Errorf("c.getXML: %w", err)
}
@ -142,9 +164,16 @@ func (c *Client) GetServerConfig(ctx context.Context, baseURL string) (domain.Se
}
func (c *Client) GetBuildingInfo(ctx context.Context, baseURL string) (domain.BuildingInfo, error) {
ctx, span := tracer.Start(ctx, "Client.GetBuildingInfo", trace.WithAttributes(
attribute.String("base_url", baseURL),
))
defer span.End()
var info buildingInfo
if err := c.getXML(ctx, buildURL(baseURL, endpointBuildingInfo), &info); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.BuildingInfo{}, fmt.Errorf("c.getXML: %w", err)
}
@ -152,9 +181,16 @@ func (c *Client) GetBuildingInfo(ctx context.Context, baseURL string) (domain.Bu
}
func (c *Client) GetUnitInfo(ctx context.Context, baseURL string) (domain.UnitInfo, error) {
ctx, span := tracer.Start(ctx, "Client.GetUnitInfo", trace.WithAttributes(
attribute.String("base_url", baseURL),
))
defer span.End()
var info unitInfo
if err := c.getXML(ctx, buildURL(baseURL, endpointUnitInfo), &info); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return domain.UnitInfo{}, fmt.Errorf("c.getXML: %w", err)
}
@ -162,13 +198,22 @@ func (c *Client) GetUnitInfo(ctx context.Context, baseURL string) (domain.UnitIn
}
func (c *Client) GetTribes(ctx context.Context, baseURL string) ([]domain.BaseTribe, error) {
ctx, span := tracer.Start(ctx, "Client.GetTribes", trace.WithAttributes(
attribute.String("base_url", baseURL),
))
defer span.End()
od, err := c.getOD(ctx, baseURL, true)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("c.getOD: %w", err)
}
records, err := c.getCSV(ctx, buildURL(baseURL, endpointTribes), fieldsPerRecordTribe)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("c.getCSV: %w", err)
}
@ -176,6 +221,8 @@ func (c *Client) GetTribes(ctx context.Context, baseURL string) ([]domain.BaseTr
for _, rec := range records {
tribe, err := parseTribeRecord(rec, od)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("parseTribeRecord: %w", err)
}
@ -190,13 +237,22 @@ func (c *Client) GetTribes(ctx context.Context, baseURL string) ([]domain.BaseTr
}
func (c *Client) GetPlayers(ctx context.Context, baseURL string) ([]domain.BasePlayer, error) {
ctx, span := tracer.Start(ctx, "Client.GetPlayers", trace.WithAttributes(
attribute.String("base_url", baseURL),
))
defer span.End()
od, err := c.getOD(ctx, baseURL, false)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("c.getOD: %w", err)
}
records, err := c.getCSV(ctx, buildURL(baseURL, endpointPlayers), fieldsPerRecordPlayer)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("c.getCSV: %w", err)
}
@ -204,6 +260,8 @@ func (c *Client) GetPlayers(ctx context.Context, baseURL string) ([]domain.BaseP
for _, rec := range records {
player, err := parsePlayerRecord(rec, od)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("parsePlayerRecord: %w", err)
}
@ -218,6 +276,12 @@ func (c *Client) GetPlayers(ctx context.Context, baseURL string) ([]domain.BaseP
}
func (c *Client) getOD(ctx context.Context, baseURL string, tribe bool) (map[int64]domain.OpponentsDefeated, error) {
ctx, span := tracer.Start(ctx, "Client.getOD", trace.WithAttributes(
attribute.String("base_url", baseURL),
attribute.Bool("tribe", tribe),
))
defer span.End()
m := make(map[int64]domain.OpponentsDefeated)
urls := buildODURLs(baseURL, tribe)
@ -228,12 +292,16 @@ func (c *Client) getOD(ctx context.Context, baseURL string, tribe bool) (map[int
records, err := c.getCSV(ctx, u, fieldsPerRecordOD)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("c.getCSV: %w", err)
}
for _, rec := range records {
parsed, err := parseODRecord(rec)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("parseODRecord: %w", err)
}
@ -262,8 +330,15 @@ func (c *Client) getOD(ctx context.Context, baseURL string, tribe bool) (map[int
}
func (c *Client) GetVillages(ctx context.Context, baseURL string) ([]domain.BaseVillage, error) {
ctx, span := tracer.Start(ctx, "Client.GetVillages", trace.WithAttributes(
attribute.String("base_url", baseURL),
))
defer span.End()
records, err := c.getCSV(ctx, buildURL(baseURL, endpointVillages), fieldsPerRecordVillage)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("c.getCSV: %w", err)
}
@ -271,6 +346,8 @@ func (c *Client) GetVillages(ctx context.Context, baseURL string) ([]domain.Base
for _, rec := range records {
village, err := parseVillageRecord(rec)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("parseVillageRecord: %w", err)
}
@ -285,8 +362,16 @@ func (c *Client) GetVillages(ctx context.Context, baseURL string) ([]domain.Base
}
func (c *Client) GetEnnoblements(ctx context.Context, baseURL string, since time.Time) ([]domain.BaseEnnoblement, error) {
ctx, span := tracer.Start(ctx, "Client.GetEnnoblements", trace.WithAttributes(
attribute.String("base_url", baseURL),
attribute.Int64("since", since.Unix()),
))
defer span.End()
records, err := c.getCSV(ctx, buildEnnoblementURL(baseURL, since), fieldsPerRecordEnnoblement)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("c.getCSV: %w", err)
}
@ -295,7 +380,10 @@ func (c *Client) GetEnnoblements(ctx context.Context, baseURL string, since time
for _, rec := range records {
createdAt, err := parseTimestamp(rec[1])
if err != nil {
return nil, NewParseError(err, rec, "ennoblement.CreatedAt")
err = NewParseError(err, rec, "ennoblement.CreatedAt")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
if !createdAt.After(since) {
@ -309,6 +397,8 @@ func (c *Client) GetEnnoblements(ctx context.Context, baseURL string, since time
for _, rec := range records {
ennoblement, err := parseEnnoblementRecord(rec)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("parseEnnoblementRecord: %w", err)
}
@ -330,10 +420,6 @@ func (c *Client) getXML(ctx context.Context, url string, v any) error {
defer func() {
_ = resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK {
_, _ = io.Copy(io.Discard, resp.Body)
return fmt.Errorf("got non-ok HTTP status: %d", resp.StatusCode)
}
if err := xml.NewDecoder(resp.Body).Decode(v); err != nil {
return fmt.Errorf("xml.Decode: %w", err)
@ -350,10 +436,6 @@ func (c *Client) getCSV(ctx context.Context, url string, fieldsPerRecord int) ([
defer func() {
_ = resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK {
_, _ = io.Copy(io.Discard, resp.Body)
return nil, fmt.Errorf("got non-ok HTTP status: %d", resp.StatusCode)
}
r := csv.NewReader(resp.Body)
r.Comma = ','
@ -381,6 +463,12 @@ func (c *Client) get(ctx context.Context, url string) (*http.Response, error) {
return nil, fmt.Errorf("client.Do: %w", err)
}
if resp.StatusCode != http.StatusOK {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
return nil, fmt.Errorf("got non-ok HTTP status: %d", resp.StatusCode)
}
return resp, nil
}

7
internal/tw/tw.go Normal file
View File

@ -0,0 +1,7 @@
package tw
import "go.opentelemetry.io/otel"
var (
tracer = otel.Tracer("gitea.dwysokinski.me/twhelp/core/internal/tw")
)

View File

@ -30,6 +30,8 @@ spec:
value: "10"
- name: DB_MAX_IDLE_CONNECTIONS
value: "5"
- name: OTEL_ENABLED
value: "false"
livenessProbe:
httpGet:
path: /health

View File

@ -32,6 +32,8 @@ spec:
secretKeyRef:
name: twhelp-secret
key: amqp-uri
- name: OTEL_ENABLED
value: "false"
livenessProbe:
exec:
command: [ "cat", "/tmp/healthy" ]

View File

@ -33,6 +33,8 @@ spec:
secretKeyRef:
name: twhelp-secret
key: amqp-uri
- name: OTEL_ENABLED
value: "false"
---
apiVersion: batch/v1
kind: CronJob
@ -70,3 +72,11 @@ spec:
secretKeyRef:
name: twhelp-secret
key: amqp-uri
- name: OTEL_ENABLED
value: "true" # TODO: otel should be disabled by default
- name: OTEL_TRACES_EXPORTER
value: "jaeger"
- name: OTEL_TRACES_SAMPLER_ARG
value: "1.0"
- name: OTEL_JAEGER_AGENT_HOST
value: "jaeger-all-in-one"

View File

@ -32,6 +32,8 @@ spec:
secretKeyRef:
name: twhelp-secret
key: amqp-uri
- name: OTEL_ENABLED
value: "false"
livenessProbe:
exec:
command: [ "cat", "/tmp/healthy" ]

View File

@ -32,6 +32,8 @@ spec:
secretKeyRef:
name: twhelp-secret
key: amqp-uri
- name: OTEL_ENABLED
value: "true"
livenessProbe:
exec:
command: [ "cat", "/tmp/healthy" ]

View File

@ -32,6 +32,8 @@ spec:
secretKeyRef:
name: twhelp-secret
key: amqp-uri
- name: OTEL_ENABLED
value: "false"
livenessProbe:
exec:
command: [ "cat", "/tmp/healthy" ]

View File

@ -32,6 +32,8 @@ spec:
secretKeyRef:
name: twhelp-secret
key: amqp-uri
- name: OTEL_ENABLED
value: "false"
livenessProbe:
exec:
command: [ "cat", "/tmp/healthy" ]