From 918ce743e49e2dec076bb4d67f2e5bb80151bbb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20Wysoki=C5=84ski?= Date: Thu, 21 Dec 2023 06:28:53 +0000 Subject: [PATCH] feat: implement sync data job (#5) Reviewed-on: https://gitea.dwysokinski.me/twhelp/corev3/pulls/5 --- cmd/twhelp/amqp.go | 11 ++ cmd/twhelp/cmd_job.go | 42 +++++++- .../adapter/publisher_watermill_server.go | 46 ++++++++ internal/app/service_data_sync.go | 49 +++++++++ internal/domain/server.go | 40 +++++++ internal/domain/server_test.go | 101 ++++++++++++++++++ internal/domain/utils.go | 25 +++++ internal/watermillmsg/marshaler.go | 12 +++ internal/watermillmsg/marshaler_json.go | 37 +++++++ internal/watermillmsg/marshaler_json_test.go | 39 +++++++ internal/watermillmsg/server.go | 6 ++ 11 files changed, 406 insertions(+), 2 deletions(-) create mode 100644 internal/adapter/publisher_watermill_server.go create mode 100644 internal/app/service_data_sync.go create mode 100644 internal/domain/server.go create mode 100644 internal/domain/server_test.go create mode 100644 internal/domain/utils.go create mode 100644 internal/watermillmsg/marshaler.go create mode 100644 internal/watermillmsg/marshaler_json.go create mode 100644 internal/watermillmsg/marshaler_json_test.go create mode 100644 internal/watermillmsg/server.go diff --git a/cmd/twhelp/amqp.go b/cmd/twhelp/amqp.go index 6ca3a20..e10a8a3 100644 --- a/cmd/twhelp/amqp.go +++ b/cmd/twhelp/amqp.go @@ -3,6 +3,7 @@ package main import ( "log/slog" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" "github.com/urfave/cli/v2" @@ -15,8 +16,14 @@ var ( EnvVars: []string{"RABBITMQ_CONNECTION_STRING"}, Usage: "https://www.rabbitmq.com/uri-spec.html", } + rmqFlagTopicSyncServersCmd = &cli.StringFlag{ + Name: "rabbitmq.topic.syncServersCmd", + Value: "servers.cmd.sync", + EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"}, + } rmqFlags = []cli.Flag{ rmqFlagConnectionString, + rmqFlagTopicSyncServersCmd, } ) @@ -78,3 +85,7 @@ func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator { func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter { return watermill.NewSlogLogger(logger) } + +func newWatermillMarshaler() watermillmsg.Marshaler { + return watermillmsg.JSONMarshaler{} +} diff --git a/cmd/twhelp/cmd_job.go b/cmd/twhelp/cmd_job.go index 8668240..1a44b2d 100644 --- a/cmd/twhelp/cmd_job.go +++ b/cmd/twhelp/cmd_job.go @@ -1,8 +1,11 @@ package main import ( + "fmt" "log/slog" + "gitea.dwysokinski.me/twhelp/corev3/internal/adapter" + "gitea.dwysokinski.me/twhelp/corev3/internal/app" "github.com/urfave/cli/v2" ) @@ -28,13 +31,48 @@ var ( return err } defer func() { - // watermill has its own logs, so we don't need to add any debug logs here if closeErr := amqpConn.Close(); closeErr != nil { logger.Warn("couldn't close amqp connection", slog.Any("error", err)) } }() - <-amqpConn.Connected() + publisher, err := newAMQPPublisher(amqpConn, watermillLogger) + if err != nil { + return err + } + defer func() { + if closeErr := publisher.Close(); closeErr != nil { + logger.Warn("couldn't close amqp publisher", slog.Any("error", err)) + } + }() + + bunDB, err := newBunDBFromFlags(c) + if err != nil { + return err + } + defer func() { + logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections)) + if dbCloseErr := bunDB.Close(); dbCloseErr != nil { + logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr)) + } else { + logger.Debug("db connections closed") + } + }() + + serverPublisher := adapter.NewServerWatermillPublisher( + publisher, + newWatermillMarshaler(), + c.String(rmqFlagTopicSyncServersCmd.Name), + ) + + versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB)) + + shutdownSignalCtx, stop := newShutdownSignalContext(c.Context) + defer stop() + + if err = app.NewDataSyncService(versionSvc, serverPublisher).TriggerDataSync(shutdownSignalCtx); err != nil { + return fmt.Errorf("couldn't trigger data sync: %w", err) + } logger.Info("data sync triggered") diff --git a/internal/adapter/publisher_watermill_server.go b/internal/adapter/publisher_watermill_server.go new file mode 100644 index 0000000..e208613 --- /dev/null +++ b/internal/adapter/publisher_watermill_server.go @@ -0,0 +1,46 @@ +package adapter + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" + "github.com/ThreeDotsLabs/watermill/message" +) + +type ServerWatermillPublisher struct { + marshaler watermillmsg.Marshaler + publisher message.Publisher + cmdSyncTopic string +} + +func NewServerWatermillPublisher( + publisher message.Publisher, + marshaler watermillmsg.Marshaler, + cmdSyncTopic string, +) *ServerWatermillPublisher { + return &ServerWatermillPublisher{publisher: publisher, marshaler: marshaler, cmdSyncTopic: cmdSyncTopic} +} + +func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error { + msgs := make([]*message.Message, 0, len(payloads)) + + for _, p := range payloads { + msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SyncServersCmdPayload{ + VersionCode: p.VersionCode(), + URL: p.URL().String(), + }) + if err != nil { + return fmt.Errorf("%s: couldn't marshal SyncServersCmdPayload: %w", p.VersionCode(), err) + } + + msgs = append(msgs, msg) + } + + if err := pub.publisher.Publish(pub.cmdSyncTopic, msgs...); err != nil { + return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, err) + } + + return nil +} diff --git a/internal/app/service_data_sync.go b/internal/app/service_data_sync.go new file mode 100644 index 0000000..196b0b0 --- /dev/null +++ b/internal/app/service_data_sync.go @@ -0,0 +1,49 @@ +package app + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" +) + +type ServerPublisher interface { + CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error +} + +type DataSyncService struct { + versionSvc *VersionService + serverPub ServerPublisher +} + +func NewDataSyncService(versionSvc *VersionService, serverPub ServerPublisher) *DataSyncService { + return &DataSyncService{versionSvc: versionSvc, serverPub: serverPub} +} + +func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error { + versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams()) + if err != nil { + return err + } + + payloads := make([]domain.SyncServersCmdPayload, 0, len(versions)) + + for _, v := range versions { + p, payloadErr := domain.NewSyncServersCmdPayload(v.Code(), v.URL()) + if payloadErr != nil { + return fmt.Errorf("couldn't construct domain.SyncServersCmdPayload (version=%s): %w", v.Code(), payloadErr) + } + + payloads = append(payloads, p) + } + + if err = svc.serverPub.CmdSync(ctx, payloads...); err != nil { + return fmt.Errorf("couldn't publish commands: %w", err) + } + + return err +} + +func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error { + panic("") +} diff --git a/internal/domain/server.go b/internal/domain/server.go new file mode 100644 index 0000000..4a01076 --- /dev/null +++ b/internal/domain/server.go @@ -0,0 +1,40 @@ +package domain + +import ( + "errors" + "net/url" +) + +type SyncServersCmdPayload struct { + versionCode string + url *url.URL +} + +func NewSyncServersCmdPayload(versionCode string, u *url.URL) (SyncServersCmdPayload, error) { + if versionCode == "" { + return SyncServersCmdPayload{}, errors.New("version code can't be blank") + } + + if u == nil { + return SyncServersCmdPayload{}, errors.New("url can't be nil") + } + + return SyncServersCmdPayload{versionCode: versionCode, url: u}, nil +} + +func NewSyncServersCmdPayloadWithStringURL(versionCode string, rawURL string) (SyncServersCmdPayload, error) { + u, err := parseURL(rawURL) + if err != nil { + return SyncServersCmdPayload{}, err + } + + return NewSyncServersCmdPayload(versionCode, u) +} + +func (p SyncServersCmdPayload) VersionCode() string { + return p.versionCode +} + +func (p SyncServersCmdPayload) URL() *url.URL { + return p.url +} diff --git a/internal/domain/server_test.go b/internal/domain/server_test.go new file mode 100644 index 0000000..a45ed75 --- /dev/null +++ b/internal/domain/server_test.go @@ -0,0 +1,101 @@ +package domain_test + +import ( + "net/url" + "testing" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewSyncServersCmdPayload(t *testing.T) { + t.Parallel() + + type args struct { + versionCode string + url *url.URL + } + + tests := []struct { + name string + args args + expectedErr error + }{ + { + name: "OK", + args: args{ + versionCode: "pl", + url: &url.URL{ + Scheme: "https", + Host: "plemiona.pl", + }, + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + payload, err := domain.NewSyncServersCmdPayload(tt.args.versionCode, tt.args.url) + require.ErrorIs(t, err, tt.expectedErr) + if tt.expectedErr != nil { + return + } + assert.Equal(t, tt.args.versionCode, payload.VersionCode()) + assert.Equal(t, tt.args.url, payload.URL()) + }) + } +} + +func TestNewSyncServersCmdPayloadWithStringURL(t *testing.T) { + t.Parallel() + + type args struct { + versionCode string + url string + } + + tests := []struct { + name string + args args + expectedErr error + }{ + { + name: "OK", + args: args{ + versionCode: "pl", + url: "https://plemiona.pl", + }, + }, + { + name: "ERR: invalid url", + args: args{ + versionCode: "pl", + url: "plemiona.pl", + }, + expectedErr: domain.InvalidURLError{ + URL: "plemiona.pl", + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + payload, err := domain.NewSyncServersCmdPayloadWithStringURL(tt.args.versionCode, tt.args.url) + require.ErrorIs(t, err, tt.expectedErr) + if tt.expectedErr != nil { + return + } + assert.Equal(t, tt.args.versionCode, payload.VersionCode()) + assert.Equal(t, tt.args.url, payload.URL().String()) + }) + } +} diff --git a/internal/domain/utils.go b/internal/domain/utils.go new file mode 100644 index 0000000..ec3f9a3 --- /dev/null +++ b/internal/domain/utils.go @@ -0,0 +1,25 @@ +package domain + +import ( + "fmt" + "net/url" +) + +type InvalidURLError struct { + URL string +} + +func (e InvalidURLError) Error() string { + return fmt.Sprintf("'%s': invalid URL", e.URL) +} + +func parseURL(rawURL string) (*url.URL, error) { + u, err := url.ParseRequestURI(rawURL) + if err != nil { + return nil, InvalidURLError{ + URL: rawURL, + } + } + + return u, nil +} diff --git a/internal/watermillmsg/marshaler.go b/internal/watermillmsg/marshaler.go new file mode 100644 index 0000000..6233ea4 --- /dev/null +++ b/internal/watermillmsg/marshaler.go @@ -0,0 +1,12 @@ +package watermillmsg + +import ( + "context" + + "github.com/ThreeDotsLabs/watermill/message" +) + +type Marshaler interface { + Marshal(ctx context.Context, v any) (*message.Message, error) + Unmarshal(msg *message.Message, v any) error +} diff --git a/internal/watermillmsg/marshaler_json.go b/internal/watermillmsg/marshaler_json.go new file mode 100644 index 0000000..444c4d0 --- /dev/null +++ b/internal/watermillmsg/marshaler_json.go @@ -0,0 +1,37 @@ +package watermillmsg + +import ( + "context" + "encoding/json" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" +) + +type JSONMarshaler struct { + NewUUID func() string +} + +func (m JSONMarshaler) Marshal(ctx context.Context, v any) (*message.Message, error) { + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + + msg := message.NewMessage(m.newUUID(), b) + msg.SetContext(ctx) + msg.Metadata.Set("Content-Type", "application/json") + + return msg, nil +} + +func (m JSONMarshaler) newUUID() string { + if m.NewUUID != nil { + return m.NewUUID() + } + return watermill.NewUUID() +} + +func (m JSONMarshaler) Unmarshal(msg *message.Message, v any) error { + return json.Unmarshal(msg.Payload, v) +} diff --git a/internal/watermillmsg/marshaler_json_test.go b/internal/watermillmsg/marshaler_json_test.go new file mode 100644 index 0000000..efeb432 --- /dev/null +++ b/internal/watermillmsg/marshaler_json_test.go @@ -0,0 +1,39 @@ +package watermillmsg_test + +import ( + "context" + "testing" + "time" + + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" + "github.com/ThreeDotsLabs/watermill" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testPayload struct { + ID string `json:"id"` + Num int `json:"num"` + When time.Time `json:"when"` +} + +func TestJSONMarshaler(t *testing.T) { + t.Parallel() + + payloadToMarshal := testPayload{ + ID: watermill.NewUUID(), + Num: 123, + When: time.Date(2022, time.July, 30, 14, 13, 12, 0, time.UTC), + } + + marshaler := watermillmsg.JSONMarshaler{NewUUID: uuid.NewString} + + m, err := marshaler.Marshal(context.Background(), payloadToMarshal) + require.NoError(t, err) + + var payloadToUnmarshal testPayload + require.NoError(t, marshaler.Unmarshal(m, &payloadToUnmarshal)) + + assert.Equal(t, payloadToMarshal, payloadToUnmarshal) +} diff --git a/internal/watermillmsg/server.go b/internal/watermillmsg/server.go new file mode 100644 index 0000000..7dafec0 --- /dev/null +++ b/internal/watermillmsg/server.go @@ -0,0 +1,6 @@ +package watermillmsg + +type SyncServersCmdPayload struct { + VersionCode string `json:"versionCode"` + URL string `json:"url"` +}