feat: set up rabbitmq connection (#2)

Reviewed-on: twhelp/corev3#2
This commit is contained in:
Dawid Wysokiński 2023-12-16 08:03:02 +00:00
parent e85191d770
commit e45d63e50d
11 changed files with 244 additions and 4 deletions

View File

@ -28,3 +28,7 @@ install: install-tools install-git-hooks
generate:
@echo "Running go generate..."
go generate ./...
.PHONY: create-job-sync-data
create-job-sync-data:
kubectl create job --from=cronjob/twhelp-job-sync-data-dev "twhelp-job-sync-data-$(shell openssl rand -hex 10)"

80
cmd/twhelp/amqp.go Normal file
View File

@ -0,0 +1,80 @@
package main
import (
"log/slog"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/urfave/cli/v2"
)
var (
rmqFlagConnectionString = &cli.StringFlag{
Name: "rabbitmq.connectionString",
Required: true,
EnvVars: []string{"RABBITMQ_CONNECTION_STRING"},
Usage: "https://www.rabbitmq.com/uri-spec.html",
}
rmqFlags = []cli.Flag{
rmqFlagConnectionString,
}
)
func newAMQPConnectionFromFlags(c *cli.Context, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) {
return newAMQPConnection(c.String(rmqFlagConnectionString.Name), logger)
}
func newAMQPConnection(connString string, logger watermill.LoggerAdapter) (*amqp.ConnectionWrapper, error) {
return amqp.NewConnection(amqp.ConnectionConfig{
AmqpURI: connString,
}, logger)
}
// we will need this function later
//
//nolint:unused
func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) {
return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn)
}
// we will need this function later
//
//nolint:unused
func newAMQPSubscriber(
conn *amqp.ConnectionWrapper,
logger watermill.LoggerAdapter,
queueNameSuffix string,
) (*amqp.Subscriber, error) {
return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn)
}
// we will need this function later
//
//nolint:unused
func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config {
cfg := amqp.NewDurablePubSubConfig("", generateQueueName)
cfg.Consume.NoRequeueOnNack = true
cfg.Consume.Qos.PrefetchCount = 1
cfg.Exchange.GenerateName = generateExchangeName
return cfg
}
// we will need this function later
//
//nolint:unused
func generateExchangeName(topic string) string {
return "twhelp_" + topic
}
// we will need this function later
//
//nolint:unused
func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator {
return func(topic string) string {
return generateExchangeName(topic) + "_" + suffix
}
}
func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter {
return watermill.NewSlogLogger(logger)
}

View File

@ -37,7 +37,7 @@ func newApp(name, version string) *appWrapper {
app.Name = name
app.HelpName = name
app.Version = version
app.Commands = []*cli.Command{cmdDB}
app.Commands = []*cli.Command{cmdDB, cmdJob}
app.Flags = appFlags
app.Before = app.handleBefore
return app

47
cmd/twhelp/cmd_job.go Normal file
View File

@ -0,0 +1,47 @@
package main
import (
"log/slog"
"github.com/urfave/cli/v2"
)
var (
cmdJob = &cli.Command{
Name: "job",
Usage: "Run various jobs (e.g. data sync)",
Subcommands: []*cli.Command{
{
Name: "sync",
Usage: "Trigger various data syncs",
Subcommands: []*cli.Command{
{
Name: "data",
Usage: "Trigger data sync (servers, players, tribes, villages)",
Flags: concatSlices(dbFlags, rmqFlags),
Action: func(c *cli.Context) error {
logger := loggerFromCtx(c.Context)
watermillLogger := newWatermillLogger(logger)
amqpConn, err := newAMQPConnectionFromFlags(c, watermillLogger)
if err != nil {
return err
}
defer func() {
if closeErr := amqpConn.Close(); closeErr != nil {
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
}
}()
<-amqpConn.Connected()
logger.Info("data sync triggered")
return nil
},
},
},
},
},
}
)

19
cmd/twhelp/utils.go Normal file
View File

@ -0,0 +1,19 @@
package main
func concatSlices[T any](slices ...[]T) []T {
var totalLen int
for _, s := range slices {
totalLen += len(s)
}
result := make([]T, totalLen)
var i int
for _, s := range slices {
i += copy(result[i:], s)
}
return result
}

11
go.mod
View File

@ -3,6 +3,8 @@ module gitea.dwysokinski.me/twhelp/corev3
go 1.21
require (
github.com/ThreeDotsLabs/watermill v1.3.5
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1
github.com/elliotchance/phpserialize v1.3.3
github.com/go-chi/chi/v5 v5.0.10
github.com/google/go-cmp v0.6.0
@ -14,11 +16,18 @@ require (
)
require (
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rabbitmq/amqp091-go v1.9.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect

32
go.sum
View File

@ -1,6 +1,11 @@
github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg=
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1 h1:OxvMB2/3YtcQuC7quC+CGmFpGz9oaxP2ef5wkp+R2oM=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.1/go.mod h1:MCNoh0HUg4w0bY64on9BnhUodHeimz8+vMfXrzyuWN8=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -10,18 +15,40 @@ github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk=
github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo=
@ -40,11 +67,14 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,3 +1,46 @@
apiVersion: batch/v1
kind: CronJob
metadata:
name: twhelp-job-sync-data
spec:
schedule: 0 * * * *
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
concurrencyPolicy: Forbid
jobTemplate:
spec:
parallelism: 1
template:
spec:
restartPolicy: Never
containers:
- name: twhelp-job-sync-data
image: twhelp
args: [job, sync, data]
env:
- name: APP_MODE
value: development
- name: DB_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: twhelp-secret
key: db-connection-string
- name: DB_MAX_OPEN_CONNS
value: "1"
- name: DB_MAX_IDLE_CONNS
value: "1"
- name: RABBITMQ_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: twhelp-secret
key: rabbitmq-connection-string
resources:
requests:
cpu: 50m
memory: 64Mi
limits:
cpu: 100m
memory: 128Mi
---
apiVersion: batch/v1
kind: Job

View File

@ -0,0 +1,6 @@
apiVersion: batch/v1
kind: CronJob
metadata:
name: twhelp-job-sync-data
spec:
schedule: 0 0 * * *

View File

@ -4,3 +4,5 @@ nameSuffix: -dev
resources:
- secret.yml
- ../../base
patchesStrategicMerge:
- jobs.yml

View File

@ -7,4 +7,4 @@ data:
# postgres://twhelp:twhelp@twhelpdb-postgresql:5432/twhelp?sslmode=disable
db-connection-string: cG9zdGdyZXM6Ly90d2hlbHA6dHdoZWxwQHR3aGVscGRiLXBvc3RncmVzcWw6NTQzMi90d2hlbHA/c3NsbW9kZT1kaXNhYmxl
# amqp://twhelp:twhelp@twhelprmq-rabbitmq:5672/
rabbit-connection-string: YW1xcDovL3R3aGVscDp0d2hlbHBAdHdoZWxwcm1xLXJhYmJpdG1xOjU2NzIv
rabbitmq-connection-string: YW1xcDovL3R3aGVscDp0d2hlbHBAdHdoZWxwcm1xLXJhYmJpdG1xOjU2NzIv