parent
4f0e5ff8d5
commit
918ce743e4
|
@ -3,6 +3,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
||||||
"github.com/ThreeDotsLabs/watermill"
|
"github.com/ThreeDotsLabs/watermill"
|
||||||
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
|
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
|
@ -15,8 +16,14 @@ var (
|
||||||
EnvVars: []string{"RABBITMQ_CONNECTION_STRING"},
|
EnvVars: []string{"RABBITMQ_CONNECTION_STRING"},
|
||||||
Usage: "https://www.rabbitmq.com/uri-spec.html",
|
Usage: "https://www.rabbitmq.com/uri-spec.html",
|
||||||
}
|
}
|
||||||
|
rmqFlagTopicSyncServersCmd = &cli.StringFlag{
|
||||||
|
Name: "rabbitmq.topic.syncServersCmd",
|
||||||
|
Value: "servers.cmd.sync",
|
||||||
|
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"},
|
||||||
|
}
|
||||||
rmqFlags = []cli.Flag{
|
rmqFlags = []cli.Flag{
|
||||||
rmqFlagConnectionString,
|
rmqFlagConnectionString,
|
||||||
|
rmqFlagTopicSyncServersCmd,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -78,3 +85,7 @@ func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator {
|
||||||
func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter {
|
func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter {
|
||||||
return watermill.NewSlogLogger(logger)
|
return watermill.NewSlogLogger(logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newWatermillMarshaler() watermillmsg.Marshaler {
|
||||||
|
return watermillmsg.JSONMarshaler{}
|
||||||
|
}
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -28,13 +31,48 @@ var (
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
// watermill has its own logs, so we don't need to add any debug logs here
|
|
||||||
if closeErr := amqpConn.Close(); closeErr != nil {
|
if closeErr := amqpConn.Close(); closeErr != nil {
|
||||||
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
<-amqpConn.Connected()
|
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if closeErr := publisher.Close(); closeErr != nil {
|
||||||
|
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
bunDB, err := newBunDBFromFlags(c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
|
||||||
|
if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
|
||||||
|
logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
|
||||||
|
} else {
|
||||||
|
logger.Debug("db connections closed")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
serverPublisher := adapter.NewServerWatermillPublisher(
|
||||||
|
publisher,
|
||||||
|
newWatermillMarshaler(),
|
||||||
|
c.String(rmqFlagTopicSyncServersCmd.Name),
|
||||||
|
)
|
||||||
|
|
||||||
|
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
||||||
|
|
||||||
|
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
if err = app.NewDataSyncService(versionSvc, serverPublisher).TriggerDataSync(shutdownSignalCtx); err != nil {
|
||||||
|
return fmt.Errorf("couldn't trigger data sync: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
logger.Info("data sync triggered")
|
logger.Info("data sync triggered")
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
package adapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
||||||
|
"github.com/ThreeDotsLabs/watermill/message"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ServerWatermillPublisher struct {
|
||||||
|
marshaler watermillmsg.Marshaler
|
||||||
|
publisher message.Publisher
|
||||||
|
cmdSyncTopic string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServerWatermillPublisher(
|
||||||
|
publisher message.Publisher,
|
||||||
|
marshaler watermillmsg.Marshaler,
|
||||||
|
cmdSyncTopic string,
|
||||||
|
) *ServerWatermillPublisher {
|
||||||
|
return &ServerWatermillPublisher{publisher: publisher, marshaler: marshaler, cmdSyncTopic: cmdSyncTopic}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error {
|
||||||
|
msgs := make([]*message.Message, 0, len(payloads))
|
||||||
|
|
||||||
|
for _, p := range payloads {
|
||||||
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SyncServersCmdPayload{
|
||||||
|
VersionCode: p.VersionCode(),
|
||||||
|
URL: p.URL().String(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: couldn't marshal SyncServersCmdPayload: %w", p.VersionCode(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pub.publisher.Publish(pub.cmdSyncTopic, msgs...); err != nil {
|
||||||
|
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ServerPublisher interface {
|
||||||
|
CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type DataSyncService struct {
|
||||||
|
versionSvc *VersionService
|
||||||
|
serverPub ServerPublisher
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDataSyncService(versionSvc *VersionService, serverPub ServerPublisher) *DataSyncService {
|
||||||
|
return &DataSyncService{versionSvc: versionSvc, serverPub: serverPub}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
||||||
|
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloads := make([]domain.SyncServersCmdPayload, 0, len(versions))
|
||||||
|
|
||||||
|
for _, v := range versions {
|
||||||
|
p, payloadErr := domain.NewSyncServersCmdPayload(v.Code(), v.URL())
|
||||||
|
if payloadErr != nil {
|
||||||
|
return fmt.Errorf("couldn't construct domain.SyncServersCmdPayload (version=%s): %w", v.Code(), payloadErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
payloads = append(payloads, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = svc.serverPub.CmdSync(ctx, payloads...); err != nil {
|
||||||
|
return fmt.Errorf("couldn't publish commands: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error {
|
||||||
|
panic("")
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
package domain
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyncServersCmdPayload struct {
|
||||||
|
versionCode string
|
||||||
|
url *url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSyncServersCmdPayload(versionCode string, u *url.URL) (SyncServersCmdPayload, error) {
|
||||||
|
if versionCode == "" {
|
||||||
|
return SyncServersCmdPayload{}, errors.New("version code can't be blank")
|
||||||
|
}
|
||||||
|
|
||||||
|
if u == nil {
|
||||||
|
return SyncServersCmdPayload{}, errors.New("url can't be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
return SyncServersCmdPayload{versionCode: versionCode, url: u}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSyncServersCmdPayloadWithStringURL(versionCode string, rawURL string) (SyncServersCmdPayload, error) {
|
||||||
|
u, err := parseURL(rawURL)
|
||||||
|
if err != nil {
|
||||||
|
return SyncServersCmdPayload{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewSyncServersCmdPayload(versionCode, u)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p SyncServersCmdPayload) VersionCode() string {
|
||||||
|
return p.versionCode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p SyncServersCmdPayload) URL() *url.URL {
|
||||||
|
return p.url
|
||||||
|
}
|
|
@ -0,0 +1,101 @@
|
||||||
|
package domain_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewSyncServersCmdPayload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
type args struct {
|
||||||
|
versionCode string
|
||||||
|
url *url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
expectedErr error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "OK",
|
||||||
|
args: args{
|
||||||
|
versionCode: "pl",
|
||||||
|
url: &url.URL{
|
||||||
|
Scheme: "https",
|
||||||
|
Host: "plemiona.pl",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
tt := tt
|
||||||
|
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
payload, err := domain.NewSyncServersCmdPayload(tt.args.versionCode, tt.args.url)
|
||||||
|
require.ErrorIs(t, err, tt.expectedErr)
|
||||||
|
if tt.expectedErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
assert.Equal(t, tt.args.versionCode, payload.VersionCode())
|
||||||
|
assert.Equal(t, tt.args.url, payload.URL())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewSyncServersCmdPayloadWithStringURL(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
type args struct {
|
||||||
|
versionCode string
|
||||||
|
url string
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
expectedErr error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "OK",
|
||||||
|
args: args{
|
||||||
|
versionCode: "pl",
|
||||||
|
url: "https://plemiona.pl",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ERR: invalid url",
|
||||||
|
args: args{
|
||||||
|
versionCode: "pl",
|
||||||
|
url: "plemiona.pl",
|
||||||
|
},
|
||||||
|
expectedErr: domain.InvalidURLError{
|
||||||
|
URL: "plemiona.pl",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
tt := tt
|
||||||
|
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
payload, err := domain.NewSyncServersCmdPayloadWithStringURL(tt.args.versionCode, tt.args.url)
|
||||||
|
require.ErrorIs(t, err, tt.expectedErr)
|
||||||
|
if tt.expectedErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
assert.Equal(t, tt.args.versionCode, payload.VersionCode())
|
||||||
|
assert.Equal(t, tt.args.url, payload.URL().String())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
package domain
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
type InvalidURLError struct {
|
||||||
|
URL string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e InvalidURLError) Error() string {
|
||||||
|
return fmt.Sprintf("'%s': invalid URL", e.URL)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseURL(rawURL string) (*url.URL, error) {
|
||||||
|
u, err := url.ParseRequestURI(rawURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, InvalidURLError{
|
||||||
|
URL: rawURL,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return u, nil
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
package watermillmsg
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/ThreeDotsLabs/watermill/message"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Marshaler interface {
|
||||||
|
Marshal(ctx context.Context, v any) (*message.Message, error)
|
||||||
|
Unmarshal(msg *message.Message, v any) error
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package watermillmsg
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/ThreeDotsLabs/watermill"
|
||||||
|
"github.com/ThreeDotsLabs/watermill/message"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JSONMarshaler struct {
|
||||||
|
NewUUID func() string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m JSONMarshaler) Marshal(ctx context.Context, v any) (*message.Message, error) {
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := message.NewMessage(m.newUUID(), b)
|
||||||
|
msg.SetContext(ctx)
|
||||||
|
msg.Metadata.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m JSONMarshaler) newUUID() string {
|
||||||
|
if m.NewUUID != nil {
|
||||||
|
return m.NewUUID()
|
||||||
|
}
|
||||||
|
return watermill.NewUUID()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m JSONMarshaler) Unmarshal(msg *message.Message, v any) error {
|
||||||
|
return json.Unmarshal(msg.Payload, v)
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
package watermillmsg_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
||||||
|
"github.com/ThreeDotsLabs/watermill"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testPayload struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Num int `json:"num"`
|
||||||
|
When time.Time `json:"when"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJSONMarshaler(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
payloadToMarshal := testPayload{
|
||||||
|
ID: watermill.NewUUID(),
|
||||||
|
Num: 123,
|
||||||
|
When: time.Date(2022, time.July, 30, 14, 13, 12, 0, time.UTC),
|
||||||
|
}
|
||||||
|
|
||||||
|
marshaler := watermillmsg.JSONMarshaler{NewUUID: uuid.NewString}
|
||||||
|
|
||||||
|
m, err := marshaler.Marshal(context.Background(), payloadToMarshal)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var payloadToUnmarshal testPayload
|
||||||
|
require.NoError(t, marshaler.Unmarshal(m, &payloadToUnmarshal))
|
||||||
|
|
||||||
|
assert.Equal(t, payloadToMarshal, payloadToUnmarshal)
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
package watermillmsg
|
||||||
|
|
||||||
|
type SyncServersCmdPayload struct {
|
||||||
|
VersionCode string `json:"versionCode"`
|
||||||
|
URL string `json:"url"`
|
||||||
|
}
|
Loading…
Reference in New Issue