parent
4f0e5ff8d5
commit
918ce743e4
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"log/slog"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
@ -15,8 +16,14 @@ var (
|
|||
EnvVars: []string{"RABBITMQ_CONNECTION_STRING"},
|
||||
Usage: "https://www.rabbitmq.com/uri-spec.html",
|
||||
}
|
||||
rmqFlagTopicSyncServersCmd = &cli.StringFlag{
|
||||
Name: "rabbitmq.topic.syncServersCmd",
|
||||
Value: "servers.cmd.sync",
|
||||
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"},
|
||||
}
|
||||
rmqFlags = []cli.Flag{
|
||||
rmqFlagConnectionString,
|
||||
rmqFlagTopicSyncServersCmd,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -78,3 +85,7 @@ func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator {
|
|||
func newWatermillLogger(logger *slog.Logger) watermill.LoggerAdapter {
|
||||
return watermill.NewSlogLogger(logger)
|
||||
}
|
||||
|
||||
func newWatermillMarshaler() watermillmsg.Marshaler {
|
||||
return watermillmsg.JSONMarshaler{}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/adapter"
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
|
@ -28,13 +31,48 @@ var (
|
|||
return err
|
||||
}
|
||||
defer func() {
|
||||
// watermill has its own logs, so we don't need to add any debug logs here
|
||||
if closeErr := amqpConn.Close(); closeErr != nil {
|
||||
logger.Warn("couldn't close amqp connection", slog.Any("error", err))
|
||||
}
|
||||
}()
|
||||
|
||||
<-amqpConn.Connected()
|
||||
publisher, err := newAMQPPublisher(amqpConn, watermillLogger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := publisher.Close(); closeErr != nil {
|
||||
logger.Warn("couldn't close amqp publisher", slog.Any("error", err))
|
||||
}
|
||||
}()
|
||||
|
||||
bunDB, err := newBunDBFromFlags(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
logger.Debug("closing db connections...", slog.Int("db.openConnections", bunDB.Stats().OpenConnections))
|
||||
if dbCloseErr := bunDB.Close(); dbCloseErr != nil {
|
||||
logger.Warn("couldn't close db connections", slog.Any("error", dbCloseErr))
|
||||
} else {
|
||||
logger.Debug("db connections closed")
|
||||
}
|
||||
}()
|
||||
|
||||
serverPublisher := adapter.NewServerWatermillPublisher(
|
||||
publisher,
|
||||
newWatermillMarshaler(),
|
||||
c.String(rmqFlagTopicSyncServersCmd.Name),
|
||||
)
|
||||
|
||||
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
||||
|
||||
shutdownSignalCtx, stop := newShutdownSignalContext(c.Context)
|
||||
defer stop()
|
||||
|
||||
if err = app.NewDataSyncService(versionSvc, serverPublisher).TriggerDataSync(shutdownSignalCtx); err != nil {
|
||||
return fmt.Errorf("couldn't trigger data sync: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("data sync triggered")
|
||||
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
package adapter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
)
|
||||
|
||||
type ServerWatermillPublisher struct {
|
||||
marshaler watermillmsg.Marshaler
|
||||
publisher message.Publisher
|
||||
cmdSyncTopic string
|
||||
}
|
||||
|
||||
func NewServerWatermillPublisher(
|
||||
publisher message.Publisher,
|
||||
marshaler watermillmsg.Marshaler,
|
||||
cmdSyncTopic string,
|
||||
) *ServerWatermillPublisher {
|
||||
return &ServerWatermillPublisher{publisher: publisher, marshaler: marshaler, cmdSyncTopic: cmdSyncTopic}
|
||||
}
|
||||
|
||||
func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error {
|
||||
msgs := make([]*message.Message, 0, len(payloads))
|
||||
|
||||
for _, p := range payloads {
|
||||
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SyncServersCmdPayload{
|
||||
VersionCode: p.VersionCode(),
|
||||
URL: p.URL().String(),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: couldn't marshal SyncServersCmdPayload: %w", p.VersionCode(), err)
|
||||
}
|
||||
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
|
||||
if err := pub.publisher.Publish(pub.cmdSyncTopic, msgs...); err != nil {
|
||||
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
)
|
||||
|
||||
type ServerPublisher interface {
|
||||
CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error
|
||||
}
|
||||
|
||||
type DataSyncService struct {
|
||||
versionSvc *VersionService
|
||||
serverPub ServerPublisher
|
||||
}
|
||||
|
||||
func NewDataSyncService(versionSvc *VersionService, serverPub ServerPublisher) *DataSyncService {
|
||||
return &DataSyncService{versionSvc: versionSvc, serverPub: serverPub}
|
||||
}
|
||||
|
||||
func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
||||
versions, err := svc.versionSvc.List(ctx, domain.NewListVersionsParams())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
payloads := make([]domain.SyncServersCmdPayload, 0, len(versions))
|
||||
|
||||
for _, v := range versions {
|
||||
p, payloadErr := domain.NewSyncServersCmdPayload(v.Code(), v.URL())
|
||||
if payloadErr != nil {
|
||||
return fmt.Errorf("couldn't construct domain.SyncServersCmdPayload (version=%s): %w", v.Code(), payloadErr)
|
||||
}
|
||||
|
||||
payloads = append(payloads, p)
|
||||
}
|
||||
|
||||
if err = svc.serverPub.CmdSync(ctx, payloads...); err != nil {
|
||||
return fmt.Errorf("couldn't publish commands: %w", err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error {
|
||||
panic("")
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package domain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type SyncServersCmdPayload struct {
|
||||
versionCode string
|
||||
url *url.URL
|
||||
}
|
||||
|
||||
func NewSyncServersCmdPayload(versionCode string, u *url.URL) (SyncServersCmdPayload, error) {
|
||||
if versionCode == "" {
|
||||
return SyncServersCmdPayload{}, errors.New("version code can't be blank")
|
||||
}
|
||||
|
||||
if u == nil {
|
||||
return SyncServersCmdPayload{}, errors.New("url can't be nil")
|
||||
}
|
||||
|
||||
return SyncServersCmdPayload{versionCode: versionCode, url: u}, nil
|
||||
}
|
||||
|
||||
func NewSyncServersCmdPayloadWithStringURL(versionCode string, rawURL string) (SyncServersCmdPayload, error) {
|
||||
u, err := parseURL(rawURL)
|
||||
if err != nil {
|
||||
return SyncServersCmdPayload{}, err
|
||||
}
|
||||
|
||||
return NewSyncServersCmdPayload(versionCode, u)
|
||||
}
|
||||
|
||||
func (p SyncServersCmdPayload) VersionCode() string {
|
||||
return p.versionCode
|
||||
}
|
||||
|
||||
func (p SyncServersCmdPayload) URL() *url.URL {
|
||||
return p.url
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
package domain_test
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewSyncServersCmdPayload(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
type args struct {
|
||||
versionCode string
|
||||
url *url.URL
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "OK",
|
||||
args: args{
|
||||
versionCode: "pl",
|
||||
url: &url.URL{
|
||||
Scheme: "https",
|
||||
Host: "plemiona.pl",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
payload, err := domain.NewSyncServersCmdPayload(tt.args.versionCode, tt.args.url)
|
||||
require.ErrorIs(t, err, tt.expectedErr)
|
||||
if tt.expectedErr != nil {
|
||||
return
|
||||
}
|
||||
assert.Equal(t, tt.args.versionCode, payload.VersionCode())
|
||||
assert.Equal(t, tt.args.url, payload.URL())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewSyncServersCmdPayloadWithStringURL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
type args struct {
|
||||
versionCode string
|
||||
url string
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "OK",
|
||||
args: args{
|
||||
versionCode: "pl",
|
||||
url: "https://plemiona.pl",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ERR: invalid url",
|
||||
args: args{
|
||||
versionCode: "pl",
|
||||
url: "plemiona.pl",
|
||||
},
|
||||
expectedErr: domain.InvalidURLError{
|
||||
URL: "plemiona.pl",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
payload, err := domain.NewSyncServersCmdPayloadWithStringURL(tt.args.versionCode, tt.args.url)
|
||||
require.ErrorIs(t, err, tt.expectedErr)
|
||||
if tt.expectedErr != nil {
|
||||
return
|
||||
}
|
||||
assert.Equal(t, tt.args.versionCode, payload.VersionCode())
|
||||
assert.Equal(t, tt.args.url, payload.URL().String())
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package domain
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type InvalidURLError struct {
|
||||
URL string
|
||||
}
|
||||
|
||||
func (e InvalidURLError) Error() string {
|
||||
return fmt.Sprintf("'%s': invalid URL", e.URL)
|
||||
}
|
||||
|
||||
func parseURL(rawURL string) (*url.URL, error) {
|
||||
u, err := url.ParseRequestURI(rawURL)
|
||||
if err != nil {
|
||||
return nil, InvalidURLError{
|
||||
URL: rawURL,
|
||||
}
|
||||
}
|
||||
|
||||
return u, nil
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package watermillmsg
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
)
|
||||
|
||||
type Marshaler interface {
|
||||
Marshal(ctx context.Context, v any) (*message.Message, error)
|
||||
Unmarshal(msg *message.Message, v any) error
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package watermillmsg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
)
|
||||
|
||||
type JSONMarshaler struct {
|
||||
NewUUID func() string
|
||||
}
|
||||
|
||||
func (m JSONMarshaler) Marshal(ctx context.Context, v any) (*message.Message, error) {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
msg := message.NewMessage(m.newUUID(), b)
|
||||
msg.SetContext(ctx)
|
||||
msg.Metadata.Set("Content-Type", "application/json")
|
||||
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
func (m JSONMarshaler) newUUID() string {
|
||||
if m.NewUUID != nil {
|
||||
return m.NewUUID()
|
||||
}
|
||||
return watermill.NewUUID()
|
||||
}
|
||||
|
||||
func (m JSONMarshaler) Unmarshal(msg *message.Message, v any) error {
|
||||
return json.Unmarshal(msg.Payload, v)
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package watermillmsg_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testPayload struct {
|
||||
ID string `json:"id"`
|
||||
Num int `json:"num"`
|
||||
When time.Time `json:"when"`
|
||||
}
|
||||
|
||||
func TestJSONMarshaler(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
payloadToMarshal := testPayload{
|
||||
ID: watermill.NewUUID(),
|
||||
Num: 123,
|
||||
When: time.Date(2022, time.July, 30, 14, 13, 12, 0, time.UTC),
|
||||
}
|
||||
|
||||
marshaler := watermillmsg.JSONMarshaler{NewUUID: uuid.NewString}
|
||||
|
||||
m, err := marshaler.Marshal(context.Background(), payloadToMarshal)
|
||||
require.NoError(t, err)
|
||||
|
||||
var payloadToUnmarshal testPayload
|
||||
require.NoError(t, marshaler.Unmarshal(m, &payloadToUnmarshal))
|
||||
|
||||
assert.Equal(t, payloadToMarshal, payloadToUnmarshal)
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
package watermillmsg
|
||||
|
||||
type SyncServersCmdPayload struct {
|
||||
VersionCode string `json:"versionCode"`
|
||||
URL string `json:"url"`
|
||||
}
|
Loading…
Reference in New Issue