feat: publish event after tribes sync (#17)
Reviewed-on: twhelp/corev3#17
This commit is contained in:
parent
40a0490ccc
commit
62832de065
|
@ -91,8 +91,14 @@ var cmdConsumer = &cli.Command{
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tribePublisher := adapter.NewTribeWatermillPublisher(
|
||||||
|
publisher,
|
||||||
|
marshaler,
|
||||||
|
c.String(rmqFlagTopicTribesSyncedEvent.Name),
|
||||||
|
)
|
||||||
|
|
||||||
consumer := port.NewTribeWatermillConsumer(
|
consumer := port.NewTribeWatermillConsumer(
|
||||||
app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc),
|
app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc, tribePublisher),
|
||||||
subscriber,
|
subscriber,
|
||||||
logger,
|
logger,
|
||||||
marshaler,
|
marshaler,
|
||||||
|
|
|
@ -26,10 +26,16 @@ var (
|
||||||
Value: "servers.event.synced",
|
Value: "servers.event.synced",
|
||||||
EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"},
|
EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"},
|
||||||
}
|
}
|
||||||
|
rmqFlagTopicTribesSyncedEvent = &cli.StringFlag{
|
||||||
|
Name: "rabbitmq.topic.tribesSyncedEvent",
|
||||||
|
Value: "tribes.event.synced",
|
||||||
|
EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"},
|
||||||
|
}
|
||||||
rmqFlags = []cli.Flag{
|
rmqFlags = []cli.Flag{
|
||||||
rmqFlagConnectionString,
|
rmqFlagConnectionString,
|
||||||
rmqFlagTopicSyncServersCmd,
|
rmqFlagTopicSyncServersCmd,
|
||||||
rmqFlagTopicServerSyncedEvent,
|
rmqFlagTopicServerSyncedEvent,
|
||||||
|
rmqFlagTopicTribesSyncedEvent,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
package adapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
|
||||||
|
"github.com/ThreeDotsLabs/watermill/message"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TribeWatermillPublisher struct {
|
||||||
|
marshaler watermillmsg.Marshaler
|
||||||
|
publisher message.Publisher
|
||||||
|
eventSyncedTopic string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTribeWatermillPublisher(
|
||||||
|
publisher message.Publisher,
|
||||||
|
marshaler watermillmsg.Marshaler,
|
||||||
|
eventSyncedTopic string,
|
||||||
|
) *TribeWatermillPublisher {
|
||||||
|
return &TribeWatermillPublisher{
|
||||||
|
publisher: publisher,
|
||||||
|
marshaler: marshaler,
|
||||||
|
eventSyncedTopic: eventSyncedTopic,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *TribeWatermillPublisher) EventSynced(
|
||||||
|
ctx context.Context,
|
||||||
|
payloads ...domain.TribesSyncedEventPayload,
|
||||||
|
) error {
|
||||||
|
msgs := make([]*message.Message, 0, len(payloads))
|
||||||
|
|
||||||
|
for _, p := range payloads {
|
||||||
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.TribesSyncedEventPayload{
|
||||||
|
ServerKey: p.ServerKey(),
|
||||||
|
VersionCode: p.VersionCode(),
|
||||||
|
ServerURL: p.ServerURL(),
|
||||||
|
NumTribes: p.NumTribes(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: couldn't marshal TribesSyncedEventPayload: %w", p.ServerKey(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil {
|
||||||
|
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -10,3 +10,7 @@ type ServerPublisher interface {
|
||||||
CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error
|
CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error
|
||||||
EventSynced(ctx context.Context, payloads ...domain.ServerSyncedEventPayload) error
|
EventSynced(ctx context.Context, payloads ...domain.ServerSyncedEventPayload) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TribePublisher interface {
|
||||||
|
EventSynced(ctx context.Context, payloads ...domain.TribesSyncedEventPayload) error
|
||||||
|
}
|
|
@ -27,19 +27,15 @@ func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
|
||||||
for _, v := range versions {
|
for _, v := range versions {
|
||||||
p, payloadErr := domain.NewSyncServersCmdPayload(v.Code(), v.URL())
|
p, payloadErr := domain.NewSyncServersCmdPayload(v.Code(), v.URL())
|
||||||
if payloadErr != nil {
|
if payloadErr != nil {
|
||||||
return fmt.Errorf("couldn't construct domain.SyncServersCmdPayload (version=%s): %w", v.Code(), payloadErr)
|
return fmt.Errorf("%s: couldn't construct domain.SyncServersCmdPayload: %w", v.Code(), payloadErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
payloads = append(payloads, p)
|
payloads = append(payloads, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = svc.serverPub.CmdSync(ctx, payloads...); err != nil {
|
return svc.serverPub.CmdSync(ctx, payloads...)
|
||||||
return fmt.Errorf("couldn't publish commands: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error {
|
func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error {
|
||||||
panic("")
|
panic("unimplemented")
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,13 +14,13 @@ type ServerRepository interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerService struct {
|
type ServerService struct {
|
||||||
repo ServerRepository
|
repo ServerRepository
|
||||||
twSvc TWService
|
twSvc TWService
|
||||||
publisher ServerPublisher
|
pub ServerPublisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServerService(repo ServerRepository, twSvc TWService, publisher ServerPublisher) *ServerService {
|
func NewServerService(repo ServerRepository, twSvc TWService, pub ServerPublisher) *ServerService {
|
||||||
return &ServerService{repo: repo, twSvc: twSvc, publisher: publisher}
|
return &ServerService{repo: repo, twSvc: twSvc, pub: pub}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCmdPayload) error {
|
func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCmdPayload) error {
|
||||||
|
@ -62,7 +62,7 @@ func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCm
|
||||||
return fmt.Errorf("%s: couldn't construct server synced event payloads: %w", versionCode, err)
|
return fmt.Errorf("%s: couldn't construct server synced event payloads: %w", versionCode, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return svc.publisher.EventSynced(ctx, payloads...)
|
return svc.pub.EventSynced(ctx, payloads...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string) (domain.Servers, error) {
|
func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string) (domain.Servers, error) {
|
||||||
|
|
|
@ -16,16 +16,18 @@ type TribeRepository interface {
|
||||||
type TribeService struct {
|
type TribeService struct {
|
||||||
repo TribeRepository
|
repo TribeRepository
|
||||||
twSvc TWService
|
twSvc TWService
|
||||||
|
pub TribePublisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTribeService(repo TribeRepository, twSvc TWService) *TribeService {
|
func NewTribeService(repo TribeRepository, twSvc TWService, pub TribePublisher) *TribeService {
|
||||||
return &TribeService{repo: repo, twSvc: twSvc}
|
return &TribeService{repo: repo, twSvc: twSvc, pub: pub}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *TribeService) Sync(ctx context.Context, payload domain.ServerSyncedEventPayload) error {
|
func (svc *TribeService) Sync(ctx context.Context, serverSyncedPayload domain.ServerSyncedEventPayload) error {
|
||||||
serverKey := payload.Key()
|
serverKey := serverSyncedPayload.Key()
|
||||||
|
serverURL := serverSyncedPayload.URL()
|
||||||
|
|
||||||
tribes, err := svc.twSvc.GetTribes(ctx, payload.URL())
|
tribes, err := svc.twSvc.GetTribes(ctx, serverURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: couldn't get tribes: %w", serverKey, err)
|
return fmt.Errorf("%s: couldn't get tribes: %w", serverKey, err)
|
||||||
}
|
}
|
||||||
|
@ -38,6 +40,20 @@ func (svc *TribeService) Sync(ctx context.Context, payload domain.ServerSyncedEv
|
||||||
return fmt.Errorf("%s: couldn't delete tribes: %w", serverKey, err)
|
return fmt.Errorf("%s: couldn't delete tribes: %w", serverKey, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tribesSyncedPayload, err := domain.NewTribesSyncedEventPayload(
|
||||||
|
serverKey,
|
||||||
|
serverURL,
|
||||||
|
serverSyncedPayload.VersionCode(),
|
||||||
|
len(tribes),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: couldn't construct domain.TribesSyncedEventPayload: %w", serverKey, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = svc.pub.EventSynced(ctx, tribesSyncedPayload); err != nil {
|
||||||
|
return fmt.Errorf("%s: %w", serverKey, err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package domain_test
|
package domain_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/url"
|
|
||||||
"slices"
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -14,43 +13,12 @@ import (
|
||||||
func TestNewSyncServersCmdPayload(t *testing.T) {
|
func TestNewSyncServersCmdPayload(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
type args struct {
|
version := domaintest.NewVersion(t)
|
||||||
versionCode string
|
|
||||||
url *url.URL
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
payload, err := domain.NewSyncServersCmdPayload(version.Code(), version.URL())
|
||||||
name string
|
require.NoError(t, err)
|
||||||
args args
|
assert.Equal(t, version.Code(), payload.VersionCode())
|
||||||
expectedErr error
|
assert.Equal(t, version.URL(), payload.URL())
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "OK",
|
|
||||||
args: args{
|
|
||||||
versionCode: "pl",
|
|
||||||
url: &url.URL{
|
|
||||||
Scheme: "https",
|
|
||||||
Host: "plemiona.pl",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
tt := tt
|
|
||||||
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
payload, err := domain.NewSyncServersCmdPayload(tt.args.versionCode, tt.args.url)
|
|
||||||
require.ErrorIs(t, err, tt.expectedErr)
|
|
||||||
if tt.expectedErr != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
assert.Equal(t, tt.args.versionCode, payload.VersionCode())
|
|
||||||
assert.Equal(t, tt.args.url, payload.URL())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewServerSyncedEventPayload(t *testing.T) {
|
func TestNewServerSyncedEventPayload(t *testing.T) {
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
package domain
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TribesSyncedEventPayload struct {
|
||||||
|
serverKey string
|
||||||
|
serverURL *url.URL
|
||||||
|
versionCode string
|
||||||
|
numTribes int
|
||||||
|
}
|
||||||
|
|
||||||
|
const tribesSyncedEventPayloadModelName = "TribesSyncedEventPayload"
|
||||||
|
|
||||||
|
func NewTribesSyncedEventPayload(
|
||||||
|
serverKey string,
|
||||||
|
serverURL *url.URL,
|
||||||
|
versionCode string,
|
||||||
|
numTribes int,
|
||||||
|
) (TribesSyncedEventPayload, error) {
|
||||||
|
if serverKey == "" {
|
||||||
|
return TribesSyncedEventPayload{}, ValidationError{
|
||||||
|
Model: tribesSyncedEventPayloadModelName,
|
||||||
|
Field: "serverKey",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if serverURL == nil {
|
||||||
|
return TribesSyncedEventPayload{}, ValidationError{
|
||||||
|
Model: tribesSyncedEventPayloadModelName,
|
||||||
|
Field: "serverURL",
|
||||||
|
Err: ErrNil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if versionCode == "" {
|
||||||
|
return TribesSyncedEventPayload{}, ValidationError{
|
||||||
|
Model: tribesSyncedEventPayloadModelName,
|
||||||
|
Field: "versionCode",
|
||||||
|
Err: ErrRequired,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := validateIntInRange(numTribes, 0, math.MaxInt); err != nil {
|
||||||
|
return TribesSyncedEventPayload{}, ValidationError{
|
||||||
|
Model: tribesSyncedEventPayloadModelName,
|
||||||
|
Field: "numTribes",
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TribesSyncedEventPayload{
|
||||||
|
serverKey: serverKey,
|
||||||
|
serverURL: serverURL,
|
||||||
|
versionCode: versionCode,
|
||||||
|
numTribes: numTribes,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p TribesSyncedEventPayload) ServerKey() string {
|
||||||
|
return p.serverKey
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p TribesSyncedEventPayload) ServerURL() *url.URL {
|
||||||
|
return p.serverURL
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p TribesSyncedEventPayload) VersionCode() string {
|
||||||
|
return p.versionCode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p TribesSyncedEventPayload) NumTribes() int {
|
||||||
|
return p.numTribes
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
package domain_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||||||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
|
||||||
|
"github.com/brianvoe/gofakeit/v6"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewTribesSyncedEventPayload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
server := domaintest.NewServer(t)
|
||||||
|
numTribes := gofakeit.IntRange(0, math.MaxInt)
|
||||||
|
|
||||||
|
payload, err := domain.NewTribesSyncedEventPayload(
|
||||||
|
server.Key(),
|
||||||
|
server.URL(),
|
||||||
|
server.VersionCode(),
|
||||||
|
numTribes,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, server.Key(), payload.ServerKey())
|
||||||
|
assert.Equal(t, server.URL(), payload.ServerURL())
|
||||||
|
assert.Equal(t, server.VersionCode(), payload.VersionCode())
|
||||||
|
assert.Equal(t, numTribes, payload.NumTribes())
|
||||||
|
}
|
|
@ -0,0 +1,10 @@
|
||||||
|
package watermillmsg
|
||||||
|
|
||||||
|
import "net/url"
|
||||||
|
|
||||||
|
type TribesSyncedEventPayload struct {
|
||||||
|
ServerKey string `json:"serverKey"`
|
||||||
|
ServerURL *url.URL `json:"serverUrl"`
|
||||||
|
VersionCode string `json:"versionCode"`
|
||||||
|
NumTribes int `json:"numTribes"`
|
||||||
|
}
|
Loading…
Reference in New Issue