feat: publish event after server sync (#9)
Reviewed-on: twhelp/corev3#9
This commit is contained in:
parent
482870d3a8
commit
1b078c8212
|
@ -21,9 +21,15 @@ var (
|
||||||
Value: "servers.cmd.sync",
|
Value: "servers.cmd.sync",
|
||||||
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"},
|
EnvVars: []string{"RABBITMQ_TOPIC_SYNC_SERVERS_CMD"},
|
||||||
}
|
}
|
||||||
|
rmqFlagTopicServerSyncedEvent = &cli.StringFlag{
|
||||||
|
Name: "rabbitmq.topic.serverSyncedEvent",
|
||||||
|
Value: "servers.event.synced",
|
||||||
|
EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"},
|
||||||
|
}
|
||||||
rmqFlags = []cli.Flag{
|
rmqFlags = []cli.Flag{
|
||||||
rmqFlagConnectionString,
|
rmqFlagConnectionString,
|
||||||
rmqFlagTopicSyncServersCmd,
|
rmqFlagTopicSyncServersCmd,
|
||||||
|
rmqFlagTopicServerSyncedEvent,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -37,16 +43,10 @@ func newAMQPConnection(connString string, logger watermill.LoggerAdapter) (*amqp
|
||||||
}, logger)
|
}, logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
// we will need this function later
|
|
||||||
//
|
|
||||||
//nolint:unused
|
|
||||||
func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) {
|
func newAMQPPublisher(conn *amqp.ConnectionWrapper, logger watermill.LoggerAdapter) (*amqp.Publisher, error) {
|
||||||
return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn)
|
return amqp.NewPublisherWithConnection(newAMQPConfig(amqp.GenerateQueueNameTopicName), logger, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// we will need this function later
|
|
||||||
//
|
|
||||||
//nolint:unused
|
|
||||||
func newAMQPSubscriber(
|
func newAMQPSubscriber(
|
||||||
conn *amqp.ConnectionWrapper,
|
conn *amqp.ConnectionWrapper,
|
||||||
logger watermill.LoggerAdapter,
|
logger watermill.LoggerAdapter,
|
||||||
|
@ -55,9 +55,6 @@ func newAMQPSubscriber(
|
||||||
return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn)
|
return amqp.NewSubscriberWithConnection(newAMQPConfig(newQueueNameGenerator(queueNameSuffix)), logger, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// we will need this function later
|
|
||||||
//
|
|
||||||
//nolint:unused
|
|
||||||
func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config {
|
func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config {
|
||||||
cfg := amqp.NewDurablePubSubConfig("", generateQueueName)
|
cfg := amqp.NewDurablePubSubConfig("", generateQueueName)
|
||||||
cfg.Consume.NoRequeueOnNack = true
|
cfg.Consume.NoRequeueOnNack = true
|
||||||
|
@ -66,16 +63,10 @@ func newAMQPConfig(generateQueueName amqp.QueueNameGenerator) amqp.Config {
|
||||||
return cfg
|
return cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
// we will need this function later
|
|
||||||
//
|
|
||||||
//nolint:unused
|
|
||||||
func generateExchangeName(topic string) string {
|
func generateExchangeName(topic string) string {
|
||||||
return "twhelp_" + topic
|
return "twhelp_" + topic
|
||||||
}
|
}
|
||||||
|
|
||||||
// we will need this function later
|
|
||||||
//
|
|
||||||
//nolint:unused
|
|
||||||
func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator {
|
func newQueueNameGenerator(suffix string) amqp.QueueNameGenerator {
|
||||||
return func(topic string) string {
|
return func(topic string) string {
|
||||||
return generateExchangeName(topic) + "_" + suffix
|
return generateExchangeName(topic) + "_" + suffix
|
||||||
|
|
|
@ -47,8 +47,15 @@ var cmdConsumer = &cli.Command{
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
serverPublisher := adapter.NewServerWatermillPublisher(
|
||||||
|
publisher,
|
||||||
|
marshaler,
|
||||||
|
c.String(rmqFlagTopicSyncServersCmd.Name),
|
||||||
|
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
||||||
|
)
|
||||||
|
|
||||||
consumer := port.NewServerWatermillConsumer(
|
consumer := port.NewServerWatermillConsumer(
|
||||||
app.NewServerService(adapter.NewServerBunRepository(db), twSvc),
|
app.NewServerService(adapter.NewServerBunRepository(db), twSvc, serverPublisher),
|
||||||
subscriber,
|
subscriber,
|
||||||
logger,
|
logger,
|
||||||
marshaler,
|
marshaler,
|
||||||
|
|
|
@ -63,6 +63,7 @@ var (
|
||||||
publisher,
|
publisher,
|
||||||
newWatermillMarshaler(),
|
newWatermillMarshaler(),
|
||||||
c.String(rmqFlagTopicSyncServersCmd.Name),
|
c.String(rmqFlagTopicSyncServersCmd.Name),
|
||||||
|
c.String(rmqFlagTopicServerSyncedEvent.Name),
|
||||||
)
|
)
|
||||||
|
|
||||||
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
versionSvc := app.NewVersionService(adapter.NewVersionBunRepository(bunDB))
|
||||||
|
|
|
@ -10,17 +10,24 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServerWatermillPublisher struct {
|
type ServerWatermillPublisher struct {
|
||||||
marshaler watermillmsg.Marshaler
|
marshaler watermillmsg.Marshaler
|
||||||
publisher message.Publisher
|
publisher message.Publisher
|
||||||
cmdSyncTopic string
|
cmdSyncTopic string
|
||||||
|
eventSyncedTopic string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServerWatermillPublisher(
|
func NewServerWatermillPublisher(
|
||||||
publisher message.Publisher,
|
publisher message.Publisher,
|
||||||
marshaler watermillmsg.Marshaler,
|
marshaler watermillmsg.Marshaler,
|
||||||
cmdSyncTopic string,
|
cmdSyncTopic string,
|
||||||
|
eventSyncedTopic string,
|
||||||
) *ServerWatermillPublisher {
|
) *ServerWatermillPublisher {
|
||||||
return &ServerWatermillPublisher{publisher: publisher, marshaler: marshaler, cmdSyncTopic: cmdSyncTopic}
|
return &ServerWatermillPublisher{
|
||||||
|
publisher: publisher,
|
||||||
|
marshaler: marshaler,
|
||||||
|
cmdSyncTopic: cmdSyncTopic,
|
||||||
|
eventSyncedTopic: eventSyncedTopic,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error {
|
func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error {
|
||||||
|
@ -44,3 +51,29 @@ func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...do
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pub *ServerWatermillPublisher) EventSynced(
|
||||||
|
ctx context.Context,
|
||||||
|
payloads ...domain.ServerSyncedEventPayload,
|
||||||
|
) error {
|
||||||
|
msgs := make([]*message.Message, 0, len(payloads))
|
||||||
|
|
||||||
|
for _, p := range payloads {
|
||||||
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.ServerSyncedEventPayload{
|
||||||
|
Key: p.Key(),
|
||||||
|
VersionCode: p.VersionCode(),
|
||||||
|
URL: p.URL().String(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: couldn't marshal ServerSyncedEventPayload: %w", p.Key(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil {
|
||||||
|
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
12
internal/app/publisher.go
Normal file
12
internal/app/publisher.go
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ServerPublisher interface {
|
||||||
|
CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error
|
||||||
|
EventSynced(ctx context.Context, payloads ...domain.ServerSyncedEventPayload) error
|
||||||
|
}
|
|
@ -7,10 +7,6 @@ import (
|
||||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServerPublisher interface {
|
|
||||||
CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type DataSyncService struct {
|
type DataSyncService struct {
|
||||||
versionSvc *VersionService
|
versionSvc *VersionService
|
||||||
serverPub ServerPublisher
|
serverPub ServerPublisher
|
||||||
|
|
|
@ -13,12 +13,13 @@ type ServerRepository interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerService struct {
|
type ServerService struct {
|
||||||
repo ServerRepository
|
repo ServerRepository
|
||||||
twSvc TWService
|
twSvc TWService
|
||||||
|
publisher ServerPublisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServerService(repo ServerRepository, twSvc TWService) *ServerService {
|
func NewServerService(repo ServerRepository, twSvc TWService, publisher ServerPublisher) *ServerService {
|
||||||
return &ServerService{repo: repo, twSvc: twSvc}
|
return &ServerService{repo: repo, twSvc: twSvc, publisher: publisher}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCmdPayload) error {
|
func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCmdPayload) error {
|
||||||
|
@ -51,7 +52,16 @@ func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCm
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return svc.repo.CreateOrUpdate(ctx, params...)
|
if err = svc.repo.CreateOrUpdate(ctx, params...); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloads, err := domain.NewServerSyncedEventPayloads(openServersWithoutSpecial, versionCode)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("couldn't construct server synced event payloads: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return svc.publisher.EventSynced(ctx, payloads...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string) (domain.Servers, error) {
|
func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string) (domain.Servers, error) {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package domain
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -38,3 +39,64 @@ func (p SyncServersCmdPayload) VersionCode() string {
|
||||||
func (p SyncServersCmdPayload) URL() *url.URL {
|
func (p SyncServersCmdPayload) URL() *url.URL {
|
||||||
return p.url
|
return p.url
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ServerSyncedEventPayload struct {
|
||||||
|
key string
|
||||||
|
url *url.URL
|
||||||
|
versionCode string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServerSyncedEventPayload(key string, rawURL string, versionCode string) (ServerSyncedEventPayload, error) {
|
||||||
|
if key == "" {
|
||||||
|
return ServerSyncedEventPayload{}, errors.New("key can't be blank")
|
||||||
|
}
|
||||||
|
|
||||||
|
if versionCode == "" {
|
||||||
|
return ServerSyncedEventPayload{}, errors.New("version code can't be blank")
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := parseURL(rawURL)
|
||||||
|
if err != nil {
|
||||||
|
return ServerSyncedEventPayload{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ServerSyncedEventPayload{
|
||||||
|
key: key,
|
||||||
|
url: u,
|
||||||
|
versionCode: versionCode,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServerSyncedEventPayloads(servers BaseServers, versionCode string) ([]ServerSyncedEventPayload, error) {
|
||||||
|
if versionCode == "" {
|
||||||
|
return nil, errors.New("version code can't be blank")
|
||||||
|
}
|
||||||
|
|
||||||
|
res := make([]ServerSyncedEventPayload, 0, len(servers))
|
||||||
|
|
||||||
|
for i, s := range servers {
|
||||||
|
if s.IsZero() {
|
||||||
|
return nil, fmt.Errorf("servers[%d] is an empty struct", i)
|
||||||
|
}
|
||||||
|
|
||||||
|
res = append(res, ServerSyncedEventPayload{
|
||||||
|
key: s.Key(),
|
||||||
|
url: s.URL(),
|
||||||
|
versionCode: versionCode,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p ServerSyncedEventPayload) Key() string {
|
||||||
|
return p.key
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p ServerSyncedEventPayload) URL() *url.URL {
|
||||||
|
return p.url
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p ServerSyncedEventPayload) VersionCode() string {
|
||||||
|
return p.versionCode
|
||||||
|
}
|
||||||
|
|
|
@ -2,9 +2,11 @@ package domain_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -99,3 +101,33 @@ func TestNewSyncServersCmdPayloadWithStringURL(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewServerSyncedEventPayload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
server := domaintest.NewServer(t)
|
||||||
|
|
||||||
|
payload, err := domain.NewServerSyncedEventPayload(server.Key(), server.URL().String(), server.VersionCode())
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, server.Key(), payload.Key())
|
||||||
|
assert.Equal(t, server.URL(), payload.URL())
|
||||||
|
assert.Equal(t, server.VersionCode(), payload.VersionCode())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewServerSyncedEventPayloads(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
versionCode := domaintest.RandVersionCode()
|
||||||
|
servers := domain.BaseServers{
|
||||||
|
domaintest.NewBaseServer(t),
|
||||||
|
domaintest.NewBaseServer(t),
|
||||||
|
}
|
||||||
|
|
||||||
|
payloads, err := domain.NewServerSyncedEventPayloads(servers, versionCode)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for _, s := range servers {
|
||||||
|
assert.True(t, slices.ContainsFunc(payloads, func(payload domain.ServerSyncedEventPayload) bool {
|
||||||
|
return payload.Key() == s.Key() && payload.VersionCode() == versionCode && payload.URL() == s.URL()
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -4,3 +4,9 @@ type SyncServersCmdPayload struct {
|
||||||
VersionCode string `json:"versionCode"`
|
VersionCode string `json:"versionCode"`
|
||||||
URL string `json:"url"`
|
URL string `json:"url"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ServerSyncedEventPayload struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
URL string `json:"url"`
|
||||||
|
VersionCode string `json:"versionCode"`
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user