feat: implement sync data job (#5)

Reviewed-on: twhelp/corev3#5
This commit is contained in:
Dawid Wysokiński 2023-12-21 06:28:53 +00:00
parent 4f0e5ff8d5
commit 918ce743e4
11 changed files with 406 additions and 2 deletions

View File

@ -3,6 +3,7 @@ package main
import (
"log/slog"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/urfave/cli/v2"
@ -15,8 +16,14 @@ var (
EnvVars: []string{"RABBITMQ_CONNECTION_STRING"},
Usage: "https://www.rabbitmq.com/uri-spec.html",
}
rmqFlagTopicSyncServersCmd = &cli.StringFlag{
Name: "rabbitmq.topic.syncServersCmd",
Value: "servers.cmd.sync",
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"},
}
rmqFlags = []cli.Flag{
rmqFlagConnectionString,
rmqFlagTopicSyncServersCmd,
}
)
@ -78,3 +85,7 @@ func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator {
func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter {
return watermill.NewSlogLogger(logger)
}
func newWatermillMarshaler() watermillmsg.Marshaler {
return watermillmsg.JSONMarshaler{}
}

View File

@ -1,8 +1,11 @@
package main
import (
"fmt"
"log/slog"
"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
"github.com/urfave/cli/v2"
)
@ -28,13 +31,48 @@ var (
return err
}
defer func() {
// watermill has its own logs, so we don't need to add any debug logs here
if closeErr := amqpConn.Close(); closeErr != nil {
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
}
}()
<-amqpConn.Connected()
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
if err != nil {
return err
}
defer func() {
if closeErr := publisher.Close(); closeErr != nil {
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
}
}()
bunDB, err := newBunDBFromFlags(c)
if err != nil {
return err
}
defer func() {
logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
} else {
logger.Debug("db connections closed")
}
}()
serverPublisher := adapter.NewServerWatermillPublisher(
publisher,
newWatermillMarshaler(),
c.String(rmqFlagTopicSyncServersCmd.Name),
)
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
defer stop()
if err = app.NewDataSyncService(versionSvc, serverPublisher).TriggerDataSync(shutdownSignalCtx); err != nil {
return fmt.Errorf("couldn't trigger data sync: %w", err)
}
logger.Info("data sync triggered")

View File

@ -0,0 +1,46 @@
package adapter
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill/message"
)
type ServerWatermillPublisher struct {
marshaler watermillmsg.Marshaler
publisher message.Publisher
cmdSyncTopic string
}
func NewServerWatermillPublisher(
publisher message.Publisher,
marshaler watermillmsg.Marshaler,
cmdSyncTopic string,
) *ServerWatermillPublisher {
return &ServerWatermillPublisher{publisher: publisher, marshaler: marshaler, cmdSyncTopic: cmdSyncTopic}
}
func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error {
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SyncServersCmdPayload{
VersionCode: p.VersionCode(),
URL: p.URL().String(),
})
if err != nil {
return fmt.Errorf("%s: couldn't marshal SyncServersCmdPayload: %w", p.VersionCode(), err)
}
msgs = append(msgs, msg)
}
if err := pub.publisher.Publish(pub.cmdSyncTopic, msgs...); err != nil {
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, err)
}
return nil
}

View File

@ -0,0 +1,49 @@
package app
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
)
type ServerPublisher interface {
CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error
}
type DataSyncService struct {
versionSvc *VersionService
serverPub ServerPublisher
}
func NewDataSyncService(versionSvc *VersionService, serverPub ServerPublisher) *DataSyncService {
return &DataSyncService{versionSvc: versionSvc, serverPub: serverPub}
}
func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
if err != nil {
return err
}
payloads := make([]domain.SyncServersCmdPayload, 0, len(versions))
for _, v := range versions {
p, payloadErr := domain.NewSyncServersCmdPayload(v.Code(), v.URL())
if payloadErr != nil {
return fmt.Errorf("couldn't construct domain.SyncServersCmdPayload (version=%s): %w", v.Code(), payloadErr)
}
payloads = append(payloads, p)
}
if err = svc.serverPub.CmdSync(ctx, payloads...); err != nil {
return fmt.Errorf("couldn't publish commands: %w", err)
}
return err
}
func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error {
panic("")
}

40
internal/domain/server.go Normal file
View File

@ -0,0 +1,40 @@
package domain
import (
"errors"
"net/url"
)
type SyncServersCmdPayload struct {
versionCode string
url *url.URL
}
func NewSyncServersCmdPayload(versionCode string, u *url.URL) (SyncServersCmdPayload, error) {
if versionCode == "" {
return SyncServersCmdPayload{}, errors.New("version code can't be blank")
}
if u == nil {
return SyncServersCmdPayload{}, errors.New("url can't be nil")
}
return SyncServersCmdPayload{versionCode: versionCode, url: u}, nil
}
func NewSyncServersCmdPayloadWithStringURL(versionCode string, rawURL string) (SyncServersCmdPayload, error) {
u, err := parseURL(rawURL)
if err != nil {
return SyncServersCmdPayload{}, err
}
return NewSyncServersCmdPayload(versionCode, u)
}
func (p SyncServersCmdPayload) VersionCode() string {
return p.versionCode
}
func (p SyncServersCmdPayload) URL() *url.URL {
return p.url
}

View File

@ -0,0 +1,101 @@
package domain_test
import (
"net/url"
"testing"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewSyncServersCmdPayload(t *testing.T) {
t.Parallel()
type args struct {
versionCode string
url *url.URL
}
tests := []struct {
name string
args args
expectedErr error
}{
{
name: "OK",
args: args{
versionCode: "pl",
url: &url.URL{
Scheme: "https",
Host: "plemiona.pl",
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
payload, err := domain.NewSyncServersCmdPayload(tt.args.versionCode, tt.args.url)
require.ErrorIs(t, err, tt.expectedErr)
if tt.expectedErr != nil {
return
}
assert.Equal(t, tt.args.versionCode, payload.VersionCode())
assert.Equal(t, tt.args.url, payload.URL())
})
}
}
func TestNewSyncServersCmdPayloadWithStringURL(t *testing.T) {
t.Parallel()
type args struct {
versionCode string
url string
}
tests := []struct {
name string
args args
expectedErr error
}{
{
name: "OK",
args: args{
versionCode: "pl",
url: "https://plemiona.pl",
},
},
{
name: "ERR: invalid url",
args: args{
versionCode: "pl",
url: "plemiona.pl",
},
expectedErr: domain.InvalidURLError{
URL: "plemiona.pl",
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
payload, err := domain.NewSyncServersCmdPayloadWithStringURL(tt.args.versionCode, tt.args.url)
require.ErrorIs(t, err, tt.expectedErr)
if tt.expectedErr != nil {
return
}
assert.Equal(t, tt.args.versionCode, payload.VersionCode())
assert.Equal(t, tt.args.url, payload.URL().String())
})
}
}

25
internal/domain/utils.go Normal file
View File

@ -0,0 +1,25 @@
package domain
import (
"fmt"
"net/url"
)
type InvalidURLError struct {
URL string
}
func (e InvalidURLError) Error() string {
return fmt.Sprintf("'%s': invalid URL", e.URL)
}
func parseURL(rawURL string) (*url.URL, error) {
u, err := url.ParseRequestURI(rawURL)
if err != nil {
return nil, InvalidURLError{
URL: rawURL,
}
}
return u, nil
}

View File

@ -0,0 +1,12 @@
package watermillmsg
import (
"context"
"github.com/ThreeDotsLabs/watermill/message"
)
type Marshaler interface {
Marshal(ctx context.Context, v any) (*message.Message, error)
Unmarshal(msg *message.Message, v any) error
}

View File

@ -0,0 +1,37 @@
package watermillmsg
import (
"context"
"encoding/json"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)
type JSONMarshaler struct {
NewUUID func() string
}
func (m JSONMarshaler) Marshal(ctx context.Context, v any) (*message.Message, error) {
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
msg := message.NewMessage(m.newUUID(), b)
msg.SetContext(ctx)
msg.Metadata.Set("Content-Type", "application/json")
return msg, nil
}
func (m JSONMarshaler) newUUID() string {
if m.NewUUID != nil {
return m.NewUUID()
}
return watermill.NewUUID()
}
func (m JSONMarshaler) Unmarshal(msg *message.Message, v any) error {
return json.Unmarshal(msg.Payload, v)
}

View File

@ -0,0 +1,39 @@
package watermillmsg_test
import (
"context"
"testing"
"time"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type testPayload struct {
ID string `json:"id"`
Num int `json:"num"`
When time.Time `json:"when"`
}
func TestJSONMarshaler(t *testing.T) {
t.Parallel()
payloadToMarshal := testPayload{
ID: watermill.NewUUID(),
Num: 123,
When: time.Date(2022, time.July, 30, 14, 13, 12, 0, time.UTC),
}
marshaler := watermillmsg.JSONMarshaler{NewUUID: uuid.NewString}
m, err := marshaler.Marshal(context.Background(), payloadToMarshal)
require.NoError(t, err)
var payloadToUnmarshal testPayload
require.NoError(t, marshaler.Unmarshal(m, &payloadToUnmarshal))
assert.Equal(t, payloadToMarshal, payloadToUnmarshal)
}

View File

@ -0,0 +1,6 @@
package watermillmsg
type SyncServersCmdPayload struct {
VersionCode string `json:"versionCode"`
URL string `json:"url"`
}