diff --git a/cmd/twhelp/amqp.go b/cmd/twhelp/amqp.go index e10a8a3..b50c889 100644 --- a/cmd/twhelp/amqp.go +++ b/cmd/twhelp/amqp.go @@ -21,9 +21,15 @@ var ( Value: "servers.cmd.sync", EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"}, } + rmqFlagTopicServerSyncedEvent = &cli.StringFlag{ + Name: "rabbitmq.topic.serverSyncedEvent", + Value: "servers.event.synced", + EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"}, + } rmqFlags = []cli.Flag{ rmqFlagConnectionString, rmqFlagTopicSyncServersCmd, + rmqFlagTopicServerSyncedEvent, } ) @@ -37,16 +43,10 @@ func newAMQPConnection(connString string, logger watermill.LoggerAdapter) (*amqp }, logger) } -// we will need this function later -// -//nolint:unused func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) { return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn) } -// we will need this function later -// -//nolint:unused func newAMQPSubscriber( conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter, @@ -55,9 +55,6 @@ func newAMQPSubscriber( return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn) } -// we will need this function later -// -//nolint:unused func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config { cfg := amqp.NewDurablePubSubConfig("", generateQueueName) cfg.Consume.NoRequeueOnNack = true @@ -66,16 +63,10 @@ func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config { return cfg } -// we will need this function later -// -//nolint:unused func generateExchangeName(topic string) string { return "twhelp_" + topic } -// we will need this function later -// -//nolint:unused func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator { return func(topic string) string { return generateExchangeName(topic) + "_" + suffix diff --git a/cmd/twhelp/cmd_consumer.go b/cmd/twhelp/cmd_consumer.go index 66745e7..d28785d 100644 --- a/cmd/twhelp/cmd_consumer.go +++ b/cmd/twhelp/cmd_consumer.go @@ -47,8 +47,15 @@ var cmdConsumer = &cli.Command{ return err } + serverPublisher := adapter.NewServerWatermillPublisher( + publisher, + marshaler, + c.String(rmqFlagTopicSyncServersCmd.Name), + c.String(rmqFlagTopicServerSyncedEvent.Name), + ) + consumer := port.NewServerWatermillConsumer( - app.NewServerService(adapter.NewServerBunRepository(db), twSvc), + app.NewServerService(adapter.NewServerBunRepository(db), twSvc, serverPublisher), subscriber, logger, marshaler, diff --git a/cmd/twhelp/cmd_job.go b/cmd/twhelp/cmd_job.go index 1a44b2d..81267bc 100644 --- a/cmd/twhelp/cmd_job.go +++ b/cmd/twhelp/cmd_job.go @@ -63,6 +63,7 @@ var ( publisher, newWatermillMarshaler(), c.String(rmqFlagTopicSyncServersCmd.Name), + c.String(rmqFlagTopicServerSyncedEvent.Name), ) versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB)) diff --git a/internal/adapter/publisher_watermill_server.go b/internal/adapter/publisher_watermill_server.go index e208613..7ac6d5c 100644 --- a/internal/adapter/publisher_watermill_server.go +++ b/internal/adapter/publisher_watermill_server.go @@ -10,17 +10,24 @@ import ( ) type ServerWatermillPublisher struct { - marshaler watermillmsg.Marshaler - publisher message.Publisher - cmdSyncTopic string + marshaler watermillmsg.Marshaler + publisher message.Publisher + cmdSyncTopic string + eventSyncedTopic string } func NewServerWatermillPublisher( publisher message.Publisher, marshaler watermillmsg.Marshaler, cmdSyncTopic string, + eventSyncedTopic string, ) *ServerWatermillPublisher { - return &ServerWatermillPublisher{publisher: publisher, marshaler: marshaler, cmdSyncTopic: cmdSyncTopic} + return &ServerWatermillPublisher{ + publisher: publisher, + marshaler: marshaler, + cmdSyncTopic: cmdSyncTopic, + eventSyncedTopic: eventSyncedTopic, + } } func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error { @@ -44,3 +51,29 @@ func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...do return nil } + +func (pub *ServerWatermillPublisher) EventSynced( + ctx context.Context, + payloads ...domain.ServerSyncedEventPayload, +) error { + msgs := make([]*message.Message, 0, len(payloads)) + + for _, p := range payloads { + msg, err := pub.marshaler.Marshal(ctx, watermillmsg.ServerSyncedEventPayload{ + Key: p.Key(), + VersionCode: p.VersionCode(), + URL: p.URL().String(), + }) + if err != nil { + return fmt.Errorf("%s: couldn't marshal ServerSyncedEventPayload: %w", p.Key(), err) + } + + msgs = append(msgs, msg) + } + + if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil { + return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err) + } + + return nil +} diff --git a/internal/app/publisher.go b/internal/app/publisher.go new file mode 100644 index 0000000..c1eb09b --- /dev/null +++ b/internal/app/publisher.go @@ -0,0 +1,12 @@ +package app + +import ( + "context" + + "gitea.dwysokinski.me/twhelp/corev3/internal/domain" +) + +type ServerPublisher interface { + CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error + EventSynced(ctx context.Context, payloads ...domain.ServerSyncedEventPayload) error +} diff --git a/internal/app/service_data_sync.go b/internal/app/service_data_sync.go index 196b0b0..15d0459 100644 --- a/internal/app/service_data_sync.go +++ b/internal/app/service_data_sync.go @@ -7,10 +7,6 @@ import ( "gitea.dwysokinski.me/twhelp/corev3/internal/domain" ) -type ServerPublisher interface { - CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error -} - type DataSyncService struct { versionSvc *VersionService serverPub ServerPublisher diff --git a/internal/app/service_server.go b/internal/app/service_server.go index 2432e01..f3c826b 100644 --- a/internal/app/service_server.go +++ b/internal/app/service_server.go @@ -13,12 +13,13 @@ type ServerRepository interface { } type ServerService struct { - repo ServerRepository - twSvc TWService + repo ServerRepository + twSvc TWService + publisher ServerPublisher } -func NewServerService(repo ServerRepository, twSvc TWService) *ServerService { - return &ServerService{repo: repo, twSvc: twSvc} +func NewServerService(repo ServerRepository, twSvc TWService, publisher ServerPublisher) *ServerService { + return &ServerService{repo: repo, twSvc: twSvc, publisher: publisher} } func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCmdPayload) error { @@ -51,7 +52,16 @@ func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCm return err } - return svc.repo.CreateOrUpdate(ctx, params...) + if err = svc.repo.CreateOrUpdate(ctx, params...); err != nil { + return err + } + + payloads, err := domain.NewServerSyncedEventPayloads(openServersWithoutSpecial, versionCode) + if err != nil { + return fmt.Errorf("couldn't construct server synced event payloads: %w", err) + } + + return svc.publisher.EventSynced(ctx, payloads...) } func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string) (domain.Servers, error) { diff --git a/internal/domain/server_message_payloads.go b/internal/domain/server_message_payloads.go index 4a01076..1d3c720 100644 --- a/internal/domain/server_message_payloads.go +++ b/internal/domain/server_message_payloads.go @@ -2,6 +2,7 @@ package domain import ( "errors" + "fmt" "net/url" ) @@ -38,3 +39,64 @@ func (p SyncServersCmdPayload) VersionCode() string { func (p SyncServersCmdPayload) URL() *url.URL { return p.url } + +type ServerSyncedEventPayload struct { + key string + url *url.URL + versionCode string +} + +func NewServerSyncedEventPayload(key string, rawURL string, versionCode string) (ServerSyncedEventPayload, error) { + if key == "" { + return ServerSyncedEventPayload{}, errors.New("key can't be blank") + } + + if versionCode == "" { + return ServerSyncedEventPayload{}, errors.New("version code can't be blank") + } + + u, err := parseURL(rawURL) + if err != nil { + return ServerSyncedEventPayload{}, err + } + + return ServerSyncedEventPayload{ + key: key, + url: u, + versionCode: versionCode, + }, nil +} + +func NewServerSyncedEventPayloads(servers BaseServers, versionCode string) ([]ServerSyncedEventPayload, error) { + if versionCode == "" { + return nil, errors.New("version code can't be blank") + } + + res := make([]ServerSyncedEventPayload, 0, len(servers)) + + for i, s := range servers { + if s.IsZero() { + return nil, fmt.Errorf("servers[%d] is an empty struct", i) + } + + res = append(res, ServerSyncedEventPayload{ + key: s.Key(), + url: s.URL(), + versionCode: versionCode, + }) + } + + return res, nil +} + +func (p ServerSyncedEventPayload) Key() string { + return p.key +} + +func (p ServerSyncedEventPayload) URL() *url.URL { + return p.url +} + +func (p ServerSyncedEventPayload) VersionCode() string { + return p.versionCode +} diff --git a/internal/domain/server_message_payloads_test.go b/internal/domain/server_message_payloads_test.go index a45ed75..2c127a1 100644 --- a/internal/domain/server_message_payloads_test.go +++ b/internal/domain/server_message_payloads_test.go @@ -2,9 +2,11 @@ package domain_test import ( "net/url" + "slices" "testing" "gitea.dwysokinski.me/twhelp/corev3/internal/domain" + "gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -99,3 +101,33 @@ func TestNewSyncServersCmdPayloadWithStringURL(t *testing.T) { }) } } + +func TestNewServerSyncedEventPayload(t *testing.T) { + t.Parallel() + + server := domaintest.NewServer(t) + + payload, err := domain.NewServerSyncedEventPayload(server.Key(), server.URL().String(), server.VersionCode()) + require.NoError(t, err) + assert.Equal(t, server.Key(), payload.Key()) + assert.Equal(t, server.URL(), payload.URL()) + assert.Equal(t, server.VersionCode(), payload.VersionCode()) +} + +func TestNewServerSyncedEventPayloads(t *testing.T) { + t.Parallel() + + versionCode := domaintest.RandVersionCode() + servers := domain.BaseServers{ + domaintest.NewBaseServer(t), + domaintest.NewBaseServer(t), + } + + payloads, err := domain.NewServerSyncedEventPayloads(servers, versionCode) + require.NoError(t, err) + for _, s := range servers { + assert.True(t, slices.ContainsFunc(payloads, func(payload domain.ServerSyncedEventPayload) bool { + return payload.Key() == s.Key() && payload.VersionCode() == versionCode && payload.URL() == s.URL() + })) + } +} diff --git a/internal/watermillmsg/server.go b/internal/watermillmsg/server.go index 7dafec0..cb99082 100644 --- a/internal/watermillmsg/server.go +++ b/internal/watermillmsg/server.go @@ -4,3 +4,9 @@ type SyncServersCmdPayload struct { VersionCode string `json:"versionCode"` URL string `json:"url"` } + +type ServerSyncedEventPayload struct { + Key string `json:"key"` + URL string `json:"url"` + VersionCode string `json:"versionCode"` +}