diff --git a/cmd/twhelp/cmd_consumer.go b/cmd/twhelp/cmd_consumer.go index b9dba2e..2422796 100644 --- a/cmd/twhelp/cmd_consumer.go +++ b/cmd/twhelp/cmd_consumer.go @@ -91,8 +91,14 @@ var cmdConsumer = &cli.Command{ return err } + tribePublisher := adapter.NewTribeWatermillPublisher( + publisher, + marshaler, + c.String(rmqFlagTopicTribesSyncedEvent.Name), + ) + consumer := port.NewTribeWatermillConsumer( - app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc), + app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc, tribePublisher), subscriber, logger, marshaler, diff --git a/cmd/twhelp/amqp.go b/cmd/twhelp/rabbitmq.go similarity index 91% rename from cmd/twhelp/amqp.go rename to cmd/twhelp/rabbitmq.go index b50c889..12d25cf 100644 --- a/cmd/twhelp/amqp.go +++ b/cmd/twhelp/rabbitmq.go @@ -26,10 +26,16 @@ var ( Value: "servers.event.synced", EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"}, } + rmqFlagTopicTribesSyncedEvent = &cli.StringFlag{ + Name: "rabbitmq.topic.tribesSyncedEvent", + Value: "tribes.event.synced", + EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"}, + } rmqFlags = []cli.Flag{ rmqFlagConnectionString, rmqFlagTopicSyncServersCmd, rmqFlagTopicServerSyncedEvent, + rmqFlagTopicTribesSyncedEvent, } ) diff --git a/cmd/twhelp/twhelp.go b/cmd/twhelp/tw.go similarity index 100% rename from cmd/twhelp/twhelp.go rename to cmd/twhelp/tw.go diff --git a/internal/adapter/publisher_watermill_tribe.go b/internal/adapter/publisher_watermill_tribe.go new file mode 100644 index 0000000..0a1ede9 --- /dev/null +++ b/internal/adapter/publisher_watermill_tribe.go @@ -0,0 +1,55 @@ +package adapter + +import ( + "context" + "fmt" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" + "github.com/ThreeDotsLabs/watermill/message" +) + +type TribeWatermillPublisher struct { + marshaler watermillmsg.Marshaler + publisher message.Publisher + eventSyncedTopic string +} + +func NewTribeWatermillPublisher( + publisher message.Publisher, + marshaler watermillmsg.Marshaler, + eventSyncedTopic string, +) *TribeWatermillPublisher { + return &TribeWatermillPublisher{ + publisher: publisher, + marshaler: marshaler, + eventSyncedTopic: eventSyncedTopic, + } +} + +func (pub *TribeWatermillPublisher) EventSynced( + ctx context.Context, + payloads ...domain.TribesSyncedEventPayload, +) error { + msgs := make([]*message.Message, 0, len(payloads)) + + for _, p := range payloads { + msg, err := pub.marshaler.Marshal(ctx, watermillmsg.TribesSyncedEventPayload{ + ServerKey: p.ServerKey(), + VersionCode: p.VersionCode(), + ServerURL: p.ServerURL(), + NumTribes: p.NumTribes(), + }) + if err != nil { + return fmt.Errorf("%s: couldn't marshal TribesSyncedEventPayload: %w", p.ServerKey(), err) + } + + msgs = append(msgs, msg) + } + + if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil { + return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err) + } + + return nil +} diff --git a/internal/app/publisher.go b/internal/app/publishers.go similarity index 70% rename from internal/app/publisher.go rename to internal/app/publishers.go index c1eb09b..0095f00 100644 --- a/internal/app/publisher.go +++ b/internal/app/publishers.go @@ -10,3 +10,7 @@ type ServerPublisher interface { CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error EventSynced(ctx context.Context, payloads ...domain.ServerSyncedEventPayload) error } + +type TribePublisher interface { + EventSynced(ctx context.Context, payloads ...domain.TribesSyncedEventPayload) error +} diff --git a/internal/app/service_data_sync.go b/internal/app/service_data_sync.go index 15d0459..992c0ec 100644 --- a/internal/app/service_data_sync.go +++ b/internal/app/service_data_sync.go @@ -27,19 +27,15 @@ func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error { for _, v := range versions { p, payloadErr := domain.NewSyncServersCmdPayload(v.Code(), v.URL()) if payloadErr != nil { - return fmt.Errorf("couldn't construct domain.SyncServersCmdPayload (version=%s): %w", v.Code(), payloadErr) + return fmt.Errorf("%s: couldn't construct domain.SyncServersCmdPayload: %w", v.Code(), payloadErr) } payloads = append(payloads, p) } - if err = svc.serverPub.CmdSync(ctx, payloads...); err != nil { - return fmt.Errorf("couldn't publish commands: %w", err) - } - - return err + return svc.serverPub.CmdSync(ctx, payloads...) } func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error { - panic("") + panic("unimplemented") } diff --git a/internal/app/service_server.go b/internal/app/service_server.go index 5180c03..26ddf21 100644 --- a/internal/app/service_server.go +++ b/internal/app/service_server.go @@ -14,13 +14,13 @@ type ServerRepository interface { } type ServerService struct { - repo ServerRepository - twSvc TWService - publisher ServerPublisher + repo ServerRepository + twSvc TWService + pub ServerPublisher } -func NewServerService(repo ServerRepository, twSvc TWService, publisher ServerPublisher) *ServerService { - return &ServerService{repo: repo, twSvc: twSvc, publisher: publisher} +func NewServerService(repo ServerRepository, twSvc TWService, pub ServerPublisher) *ServerService { + return &ServerService{repo: repo, twSvc: twSvc, pub: pub} } func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCmdPayload) error { @@ -62,7 +62,7 @@ func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCm return fmt.Errorf("%s: couldn't construct server synced event payloads: %w", versionCode, err) } - return svc.publisher.EventSynced(ctx, payloads...) + return svc.pub.EventSynced(ctx, payloads...) } func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string) (domain.Servers, error) { diff --git a/internal/app/service_tribe.go b/internal/app/service_tribe.go index 253fcf9..749dc7f 100644 --- a/internal/app/service_tribe.go +++ b/internal/app/service_tribe.go @@ -16,16 +16,18 @@ type TribeRepository interface { type TribeService struct { repo TribeRepository twSvc TWService + pub TribePublisher } -func NewTribeService(repo TribeRepository, twSvc TWService) *TribeService { - return &TribeService{repo: repo, twSvc: twSvc} +func NewTribeService(repo TribeRepository, twSvc TWService, pub TribePublisher) *TribeService { + return &TribeService{repo: repo, twSvc: twSvc, pub: pub} } -func (svc *TribeService) Sync(ctx context.Context, payload domain.ServerSyncedEventPayload) error { - serverKey := payload.Key() +func (svc *TribeService) Sync(ctx context.Context, serverSyncedPayload domain.ServerSyncedEventPayload) error { + serverKey := serverSyncedPayload.Key() + serverURL := serverSyncedPayload.URL() - tribes, err := svc.twSvc.GetTribes(ctx, payload.URL()) + tribes, err := svc.twSvc.GetTribes(ctx, serverURL) if err != nil { return fmt.Errorf("%s: couldn't get tribes: %w", serverKey, err) } @@ -38,6 +40,20 @@ func (svc *TribeService) Sync(ctx context.Context, payload domain.ServerSyncedEv return fmt.Errorf("%s: couldn't delete tribes: %w", serverKey, err) } + tribesSyncedPayload, err := domain.NewTribesSyncedEventPayload( + serverKey, + serverURL, + serverSyncedPayload.VersionCode(), + len(tribes), + ) + if err != nil { + return fmt.Errorf("%s: couldn't construct domain.TribesSyncedEventPayload: %w", serverKey, err) + } + + if err = svc.pub.EventSynced(ctx, tribesSyncedPayload); err != nil { + return fmt.Errorf("%s: %w", serverKey, err) + } + return nil } diff --git a/internal/domain/server_message_payloads_test.go b/internal/domain/server_message_payloads_test.go index 620e472..4a90433 100644 --- a/internal/domain/server_message_payloads_test.go +++ b/internal/domain/server_message_payloads_test.go @@ -1,7 +1,6 @@ package domain_test import ( - "net/url" "slices" "testing" @@ -14,43 +13,12 @@ import ( func TestNewSyncServersCmdPayload(t *testing.T) { t.Parallel() - type args struct { - versionCode string - url *url.URL - } + version := domaintest.NewVersion(t) - tests := []struct { - name string - args args - expectedErr error - }{ - { - name: "OK", - args: args{ - versionCode: "pl", - url: &url.URL{ - Scheme: "https", - Host: "plemiona.pl", - }, - }, - }, - } - - for _, tt := range tests { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - payload, err := domain.NewSyncServersCmdPayload(tt.args.versionCode, tt.args.url) - require.ErrorIs(t, err, tt.expectedErr) - if tt.expectedErr != nil { - return - } - assert.Equal(t, tt.args.versionCode, payload.VersionCode()) - assert.Equal(t, tt.args.url, payload.URL()) - }) - } + payload, err := domain.NewSyncServersCmdPayload(version.Code(), version.URL()) + require.NoError(t, err) + assert.Equal(t, version.Code(), payload.VersionCode()) + assert.Equal(t, version.URL(), payload.URL()) } func TestNewServerSyncedEventPayload(t *testing.T) { diff --git a/internal/domain/tribe_message_payloads.go b/internal/domain/tribe_message_payloads.go new file mode 100644 index 0000000..6fd3fd2 --- /dev/null +++ b/internal/domain/tribe_message_payloads.go @@ -0,0 +1,77 @@ +package domain + +import ( + "math" + "net/url" +) + +type TribesSyncedEventPayload struct { + serverKey string + serverURL *url.URL + versionCode string + numTribes int +} + +const tribesSyncedEventPayloadModelName = "TribesSyncedEventPayload" + +func NewTribesSyncedEventPayload( + serverKey string, + serverURL *url.URL, + versionCode string, + numTribes int, +) (TribesSyncedEventPayload, error) { + if serverKey == "" { + return TribesSyncedEventPayload{}, ValidationError{ + Model: tribesSyncedEventPayloadModelName, + Field: "serverKey", + Err: ErrRequired, + } + } + + if serverURL == nil { + return TribesSyncedEventPayload{}, ValidationError{ + Model: tribesSyncedEventPayloadModelName, + Field: "serverURL", + Err: ErrNil, + } + } + + if versionCode == "" { + return TribesSyncedEventPayload{}, ValidationError{ + Model: tribesSyncedEventPayloadModelName, + Field: "versionCode", + Err: ErrRequired, + } + } + + if err := validateIntInRange(numTribes, 0, math.MaxInt); err != nil { + return TribesSyncedEventPayload{}, ValidationError{ + Model: tribesSyncedEventPayloadModelName, + Field: "numTribes", + Err: err, + } + } + + return TribesSyncedEventPayload{ + serverKey: serverKey, + serverURL: serverURL, + versionCode: versionCode, + numTribes: numTribes, + }, nil +} + +func (p TribesSyncedEventPayload) ServerKey() string { + return p.serverKey +} + +func (p TribesSyncedEventPayload) ServerURL() *url.URL { + return p.serverURL +} + +func (p TribesSyncedEventPayload) VersionCode() string { + return p.versionCode +} + +func (p TribesSyncedEventPayload) NumTribes() int { + return p.numTribes +} diff --git a/internal/domain/tribe_message_payloads_test.go b/internal/domain/tribe_message_payloads_test.go new file mode 100644 index 0000000..aa7fb42 --- /dev/null +++ b/internal/domain/tribe_message_payloads_test.go @@ -0,0 +1,31 @@ +package domain_test + +import ( + "math" + "testing" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest" + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewTribesSyncedEventPayload(t *testing.T) { + t.Parallel() + + server := domaintest.NewServer(t) + numTribes := gofakeit.IntRange(0, math.MaxInt) + + payload, err := domain.NewTribesSyncedEventPayload( + server.Key(), + server.URL(), + server.VersionCode(), + numTribes, + ) + require.NoError(t, err) + assert.Equal(t, server.Key(), payload.ServerKey()) + assert.Equal(t, server.URL(), payload.ServerURL()) + assert.Equal(t, server.VersionCode(), payload.VersionCode()) + assert.Equal(t, numTribes, payload.NumTribes()) +} diff --git a/internal/watermillmsg/tribe.go b/internal/watermillmsg/tribe.go new file mode 100644 index 0000000..a97aa9a --- /dev/null +++ b/internal/watermillmsg/tribe.go @@ -0,0 +1,10 @@ +package watermillmsg + +import "net/url" + +type TribesSyncedEventPayload struct { + ServerKey string `json:"serverKey"` + ServerURL *url.URL `json:"serverUrl"` + VersionCode string `json:"versionCode"` + NumTribes int `json:"numTribes"` +}