From e45d63e50d6de547c3901f113561331eafa8be25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20Wysoki=C5=84ski?= Date: Sat, 16 Dec 2023 08:03:02 +0000 Subject: [PATCH] feat: set up rabbitmq connection (#2) Reviewed-on: https://gitea.dwysokinski.me/twhelp/corev3/pulls/2 --- Makefile | 4 ++ cmd/twhelp/amqp.go | 80 ++++++++++++++++++++++++++++++ cmd/twhelp/app.go | 2 +- cmd/twhelp/cmd_job.go | 47 ++++++++++++++++++ cmd/twhelp/utils.go | 19 +++++++ go.mod | 11 +++- go.sum | 32 +++++++++++- k8s/base/jobs.yml | 43 ++++++++++++++++ k8s/overlays/dev/jobs.yml | 6 +++ k8s/overlays/dev/kustomization.yml | 2 + k8s/overlays/dev/secret.yml | 2 +- 11 files changed, 244 insertions(+), 4 deletions(-) create mode 100644 cmd/twhelp/amqp.go create mode 100644 cmd/twhelp/cmd_job.go create mode 100644 cmd/twhelp/utils.go create mode 100644 k8s/overlays/dev/jobs.yml diff --git a/Makefile b/Makefile index 8cff7f8..80769cd 100644 --- a/Makefile +++ b/Makefile @@ -28,3 +28,7 @@ install: install-tools install-git-hooks generate: @echo "Running go generate..." go generate ./... + +.PHONY: create-job-sync-data +create-job-sync-data: + kubectl create job --from=cronjob/twhelp-job-sync-data-dev "twhelp-job-sync-data-$(shell openssl rand -hex 10)" diff --git a/cmd/twhelp/amqp.go b/cmd/twhelp/amqp.go new file mode 100644 index 0000000..6ca3a20 --- /dev/null +++ b/cmd/twhelp/amqp.go @@ -0,0 +1,80 @@ +package main + +import ( + "log/slog" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" + "github.com/urfave/cli/v2" +) + +var ( + rmqFlagConnectionString = &cli.StringFlag{ + Name: "rabbitmq.connectionString", + Required: true, + EnvVars: []string{"RABBITMQ_CONNECTION_STRING"}, + Usage: "https://www.rabbitmq.com/uri-spec.html", + } + rmqFlags = []cli.Flag{ + rmqFlagConnectionString, + } +) + +func newAMQPConnectionFromFlags(c *cli.Context, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) { + return newAMQPConnection(c.String(rmqFlagConnectionString.Name), logger) +} + +func newAMQPConnection(connString string, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) { + return amqp.NewConnection(amqp.ConnectionConfig{ + AmqpURI: connString, + }, logger) +} + +// we will need this function later +// +//nolint:unused +func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) { + return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn) +} + +// we will need this function later +// +//nolint:unused +func newAMQPSubscriber( + conn *amqp.ConnectionWrapper, + logger watermill.LoggerAdapter, + queueNameSuffix string, +) (*amqp.Subscriber, error) { + return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn) +} + +// we will need this function later +// +//nolint:unused +func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config { + cfg := amqp.NewDurablePubSubConfig("", generateQueueName) + cfg.Consume.NoRequeueOnNack = true + cfg.Consume.Qos.PrefetchCount = 1 + cfg.Exchange.GenerateName = generateExchangeName + return cfg +} + +// we will need this function later +// +//nolint:unused +func generateExchangeName(topic string) string { + return "twhelp_" + topic +} + +// we will need this function later +// +//nolint:unused +func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator { + return func(topic string) string { + return generateExchangeName(topic) + "_" + suffix + } +} + +func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter { + return watermill.NewSlogLogger(logger) +} diff --git a/cmd/twhelp/app.go b/cmd/twhelp/app.go index c386c38..c3c9330 100644 --- a/cmd/twhelp/app.go +++ b/cmd/twhelp/app.go @@ -37,7 +37,7 @@ func newApp(name, version string) *appWrapper { app.Name = name app.HelpName = name app.Version = version - app.Commands = []*cli.Command{cmdDB} + app.Commands = []*cli.Command{cmdDB, cmdJob} app.Flags = appFlags app.Before = app.handleBefore return app diff --git a/cmd/twhelp/cmd_job.go b/cmd/twhelp/cmd_job.go new file mode 100644 index 0000000..5c22789 --- /dev/null +++ b/cmd/twhelp/cmd_job.go @@ -0,0 +1,47 @@ +package main + +import ( + "log/slog" + + "github.com/urfave/cli/v2" +) + +var ( + cmdJob = &cli.Command{ + Name: "job", + Usage: "Run various jobs (e.g. data sync)", + Subcommands: []*cli.Command{ + { + Name: "sync", + Usage: "Trigger various data syncs", + Subcommands: []*cli.Command{ + { + Name: "data", + Usage: "Trigger data sync (servers, players, tribes, villages)", + Flags: concatSlices(dbFlags, rmqFlags), + Action: func(c *cli.Context) error { + logger := loggerFromCtx(c.Context) + watermillLogger := newWatermillLogger(logger) + + amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger) + if err != nil { + return err + } + defer func() { + if closeErr := amqpConn.Close(); closeErr != nil { + logger.Warn("couldn't close amqp connection", slog.Any("error", err)) + } + }() + + <-amqpConn.Connected() + + logger.Info("data sync triggered") + + return nil + }, + }, + }, + }, + }, + } +) diff --git a/cmd/twhelp/utils.go b/cmd/twhelp/utils.go new file mode 100644 index 0000000..2c92173 --- /dev/null +++ b/cmd/twhelp/utils.go @@ -0,0 +1,19 @@ +package main + +func concatSlices[T any](slices ...[]T) []T { + var totalLen int + + for _, s := range slices { + totalLen += len(s) + } + + result := make([]T, totalLen) + + var i int + + for _, s := range slices { + i += copy(result[i:], s) + } + + return result +} diff --git a/go.mod b/go.mod index 9a96413..be84127 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module gitea.dwysokinski.me/twhelp/corev3 go 1.21 require ( + github.com/ThreeDotsLabs/watermill v1.3.5 + github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1 github.com/elliotchance/phpserialize v1.3.3 github.com/go-chi/chi/v5 v5.0.10 github.com/google/go-cmp v0.6.0 @@ -14,11 +16,18 @@ require ( ) require ( + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.3.1 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect - github.com/kr/text v0.2.0 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/oklog/ulid v1.3.1 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rabbitmq/amqp091-go v1.9.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect diff --git a/go.sum b/go.sum index 60d3770..4c7c001 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,11 @@ +github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg= +github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY= +github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1 h1:OxvMB2/3YtcQuC7quC+CGmFpGz9oaxP2ef5wkp+R2oM= +github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1/go.mod h1:MCNoh0HUg4w0bY64on9BnhUodHeimz8+vMfXrzyuWN8= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -10,18 +15,40 @@ github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk= github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo= @@ -40,11 +67,14 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/k8s/base/jobs.yml b/k8s/base/jobs.yml index 03f1178..75ba4f6 100644 --- a/k8s/base/jobs.yml +++ b/k8s/base/jobs.yml @@ -1,3 +1,46 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: twhelp-job-sync-data +spec: + schedule: 0 * * * * + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 3 + concurrencyPolicy: Forbid + jobTemplate: + spec: + parallelism: 1 + template: + spec: + restartPolicy: Never + containers: + - name: twhelp-job-sync-data + image: twhelp + args: [job, sync, data] + env: + - name: APP_MODE + value: development + - name: DB_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: db-connection-string + - name: DB_MAX_OPEN_CONNS + value: "1" + - name: DB_MAX_IDLE_CONNS + value: "1" + - name: RABBITMQ_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: twhelp-secret + key: rabbitmq-connection-string + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + cpu: 100m + memory: 128Mi --- apiVersion: batch/v1 kind: Job diff --git a/k8s/overlays/dev/jobs.yml b/k8s/overlays/dev/jobs.yml new file mode 100644 index 0000000..695028a --- /dev/null +++ b/k8s/overlays/dev/jobs.yml @@ -0,0 +1,6 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: twhelp-job-sync-data +spec: + schedule: 0 0 * * * diff --git a/k8s/overlays/dev/kustomization.yml b/k8s/overlays/dev/kustomization.yml index d0601d7..f91b50e 100644 --- a/k8s/overlays/dev/kustomization.yml +++ b/k8s/overlays/dev/kustomization.yml @@ -4,3 +4,5 @@ nameSuffix: -dev resources: - secret.yml - ../../base +patchesStrategicMerge: + - jobs.yml diff --git a/k8s/overlays/dev/secret.yml b/k8s/overlays/dev/secret.yml index 650ea0d..cb3dcfe 100644 --- a/k8s/overlays/dev/secret.yml +++ b/k8s/overlays/dev/secret.yml @@ -7,4 +7,4 @@ data: # postgres://twhelp:twhelp@twhelpdb-postgresql:5432/twhelp?sslmode=disable db-connection-string: cG9zdGdyZXM6Ly90d2hlbHA6dHdoZWxwQHR3aGVscGRiLXBvc3RncmVzcWw6NTQzMi90d2hlbHA/c3NsbW9kZT1kaXNhYmxl # amqp://twhelp:twhelp@twhelprmq-rabbitmq:5672/ - rabbit-connection-string: YW1xcDovL3R3aGVscDp0d2hlbHBAdHdoZWxwcm1xLXJhYmJpdG1xOjU2NzIv + rabbitmq-connection-string: YW1xcDovL3R3aGVscDp0d2hlbHBAdHdoZWxwcm1xLXJhYmJpdG1xOjU2NzIv