feat: publish event after tribes sync (#17)

Reviewed-on: twhelp/corev3#17
This commit is contained in:
Dawid Wysokiński 2023-12-30 08:38:32 +00:00
parent 40a0490ccc
commit 62832de065
12 changed files with 225 additions and 56 deletions

View File

@ -91,8 +91,14 @@ var cmdConsumer = &cli.Command{
return err
}
tribePublisher := adapter.NewTribeWatermillPublisher(
publisher,
marshaler,
c.String(rmqFlagTopicTribesSyncedEvent.Name),
)
consumer := port.NewTribeWatermillConsumer(
app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc),
app.NewTribeService(adapter.NewTribeBunRepository(db), twSvc, tribePublisher),
subscriber,
logger,
marshaler,

View File

@ -26,10 +26,16 @@ var (
Value: "servers.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_SERVER_SYNCED_EVENT"},
}
rmqFlagTopicTribesSyncedEvent = &cli.StringFlag{
Name: "rabbitmq.topic.tribesSyncedEvent",
Value: "tribes.event.synced",
EnvVars: []string{"RABBITMQ_TOPIC_TRIBES_SYNCED_EVENT"},
}
rmqFlags = []cli.Flag{
rmqFlagConnectionString,
rmqFlagTopicSyncServersCmd,
rmqFlagTopicServerSyncedEvent,
rmqFlagTopicTribesSyncedEvent,
}
)

View File

@ -0,0 +1,55 @@
package adapter
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill/message"
)
type TribeWatermillPublisher struct {
marshaler watermillmsg.Marshaler
publisher message.Publisher
eventSyncedTopic string
}
func NewTribeWatermillPublisher(
publisher message.Publisher,
marshaler watermillmsg.Marshaler,
eventSyncedTopic string,
) *TribeWatermillPublisher {
return &TribeWatermillPublisher{
publisher: publisher,
marshaler: marshaler,
eventSyncedTopic: eventSyncedTopic,
}
}
func (pub *TribeWatermillPublisher) EventSynced(
ctx context.Context,
payloads ...domain.TribesSyncedEventPayload,
) error {
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.TribesSyncedEventPayload{
ServerKey: p.ServerKey(),
VersionCode: p.VersionCode(),
ServerURL: p.ServerURL(),
NumTribes: p.NumTribes(),
})
if err != nil {
return fmt.Errorf("%s: couldn't marshal TribesSyncedEventPayload: %w", p.ServerKey(), err)
}
msgs = append(msgs, msg)
}
if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil {
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err)
}
return nil
}

View File

@ -10,3 +10,7 @@ type ServerPublisher interface {
CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error
EventSynced(ctx context.Context, payloads ...domain.ServerSyncedEventPayload) error
}
type TribePublisher interface {
EventSynced(ctx context.Context, payloads ...domain.TribesSyncedEventPayload) error
}

View File

@ -27,19 +27,15 @@ func (svc *DataSyncService) TriggerDataSync(ctx context.Context) error {
for _, v := range versions {
p, payloadErr := domain.NewSyncServersCmdPayload(v.Code(), v.URL())
if payloadErr != nil {
return fmt.Errorf("couldn't construct domain.SyncServersCmdPayload (version=%s): %w", v.Code(), payloadErr)
return fmt.Errorf("%s: couldn't construct domain.SyncServersCmdPayload: %w", v.Code(), payloadErr)
}
payloads = append(payloads, p)
}
if err = svc.serverPub.CmdSync(ctx, payloads...); err != nil {
return fmt.Errorf("couldn't publish commands: %w", err)
}
return err
return svc.serverPub.CmdSync(ctx, payloads...)
}
func (svc *DataSyncService) TriggerEnnoblementSync(_ context.Context) error {
panic("")
panic("unimplemented")
}

View File

@ -14,13 +14,13 @@ type ServerRepository interface {
}
type ServerService struct {
repo ServerRepository
twSvc TWService
publisher ServerPublisher
repo ServerRepository
twSvc TWService
pub ServerPublisher
}
func NewServerService(repo ServerRepository, twSvc TWService, publisher ServerPublisher) *ServerService {
return &ServerService{repo: repo, twSvc: twSvc, publisher: publisher}
func NewServerService(repo ServerRepository, twSvc TWService, pub ServerPublisher) *ServerService {
return &ServerService{repo: repo, twSvc: twSvc, pub: pub}
}
func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCmdPayload) error {
@ -62,7 +62,7 @@ func (svc *ServerService) Sync(ctx context.Context, payload domain.SyncServersCm
return fmt.Errorf("%s: couldn't construct server synced event payloads: %w", versionCode, err)
}
return svc.publisher.EventSynced(ctx, payloads...)
return svc.pub.EventSynced(ctx, payloads...)
}
func (svc *ServerService) listAllSpecial(ctx context.Context, versionCode string) (domain.Servers, error) {

View File

@ -16,16 +16,18 @@ type TribeRepository interface {
type TribeService struct {
repo TribeRepository
twSvc TWService
pub TribePublisher
}
func NewTribeService(repo TribeRepository, twSvc TWService) *TribeService {
return &TribeService{repo: repo, twSvc: twSvc}
func NewTribeService(repo TribeRepository, twSvc TWService, pub TribePublisher) *TribeService {
return &TribeService{repo: repo, twSvc: twSvc, pub: pub}
}
func (svc *TribeService) Sync(ctx context.Context, payload domain.ServerSyncedEventPayload) error {
serverKey := payload.Key()
func (svc *TribeService) Sync(ctx context.Context, serverSyncedPayload domain.ServerSyncedEventPayload) error {
serverKey := serverSyncedPayload.Key()
serverURL := serverSyncedPayload.URL()
tribes, err := svc.twSvc.GetTribes(ctx, payload.URL())
tribes, err := svc.twSvc.GetTribes(ctx, serverURL)
if err != nil {
return fmt.Errorf("%s: couldn't get tribes: %w", serverKey, err)
}
@ -38,6 +40,20 @@ func (svc *TribeService) Sync(ctx context.Context, payload domain.ServerSyncedEv
return fmt.Errorf("%s: couldn't delete tribes: %w", serverKey, err)
}
tribesSyncedPayload, err := domain.NewTribesSyncedEventPayload(
serverKey,
serverURL,
serverSyncedPayload.VersionCode(),
len(tribes),
)
if err != nil {
return fmt.Errorf("%s: couldn't construct domain.TribesSyncedEventPayload: %w", serverKey, err)
}
if err = svc.pub.EventSynced(ctx, tribesSyncedPayload); err != nil {
return fmt.Errorf("%s: %w", serverKey, err)
}
return nil
}

View File

@ -1,7 +1,6 @@
package domain_test
import (
"net/url"
"slices"
"testing"
@ -14,43 +13,12 @@ import (
func TestNewSyncServersCmdPayload(t *testing.T) {
t.Parallel()
type args struct {
versionCode string
url *url.URL
}
version := domaintest.NewVersion(t)
tests := []struct {
name string
args args
expectedErr error
}{
{
name: "OK",
args: args{
versionCode: "pl",
url: &url.URL{
Scheme: "https",
Host: "plemiona.pl",
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
payload, err := domain.NewSyncServersCmdPayload(tt.args.versionCode, tt.args.url)
require.ErrorIs(t, err, tt.expectedErr)
if tt.expectedErr != nil {
return
}
assert.Equal(t, tt.args.versionCode, payload.VersionCode())
assert.Equal(t, tt.args.url, payload.URL())
})
}
payload, err := domain.NewSyncServersCmdPayload(version.Code(), version.URL())
require.NoError(t, err)
assert.Equal(t, version.Code(), payload.VersionCode())
assert.Equal(t, version.URL(), payload.URL())
}
func TestNewServerSyncedEventPayload(t *testing.T) {

View File

@ -0,0 +1,77 @@
package domain
import (
"math"
"net/url"
)
type TribesSyncedEventPayload struct {
serverKey string
serverURL *url.URL
versionCode string
numTribes int
}
const tribesSyncedEventPayloadModelName = "TribesSyncedEventPayload"
func NewTribesSyncedEventPayload(
serverKey string,
serverURL *url.URL,
versionCode string,
numTribes int,
) (TribesSyncedEventPayload, error) {
if serverKey == "" {
return TribesSyncedEventPayload{}, ValidationError{
Model: tribesSyncedEventPayloadModelName,
Field: "serverKey",
Err: ErrRequired,
}
}
if serverURL == nil {
return TribesSyncedEventPayload{}, ValidationError{
Model: tribesSyncedEventPayloadModelName,
Field: "serverURL",
Err: ErrNil,
}
}
if versionCode == "" {
return TribesSyncedEventPayload{}, ValidationError{
Model: tribesSyncedEventPayloadModelName,
Field: "versionCode",
Err: ErrRequired,
}
}
if err := validateIntInRange(numTribes, 0, math.MaxInt); err != nil {
return TribesSyncedEventPayload{}, ValidationError{
Model: tribesSyncedEventPayloadModelName,
Field: "numTribes",
Err: err,
}
}
return TribesSyncedEventPayload{
serverKey: serverKey,
serverURL: serverURL,
versionCode: versionCode,
numTribes: numTribes,
}, nil
}
func (p TribesSyncedEventPayload) ServerKey() string {
return p.serverKey
}
func (p TribesSyncedEventPayload) ServerURL() *url.URL {
return p.serverURL
}
func (p TribesSyncedEventPayload) VersionCode() string {
return p.versionCode
}
func (p TribesSyncedEventPayload) NumTribes() int {
return p.numTribes
}

View File

@ -0,0 +1,31 @@
package domain_test
import (
"math"
"testing"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain/domaintest"
"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewTribesSyncedEventPayload(t *testing.T) {
t.Parallel()
server := domaintest.NewServer(t)
numTribes := gofakeit.IntRange(0, math.MaxInt)
payload, err := domain.NewTribesSyncedEventPayload(
server.Key(),
server.URL(),
server.VersionCode(),
numTribes,
)
require.NoError(t, err)
assert.Equal(t, server.Key(), payload.ServerKey())
assert.Equal(t, server.URL(), payload.ServerURL())
assert.Equal(t, server.VersionCode(), payload.VersionCode())
assert.Equal(t, numTribes, payload.NumTribes())
}

View File

@ -0,0 +1,10 @@
package watermillmsg
import "net/url"
type TribesSyncedEventPayload struct {
ServerKey string `json:"serverKey"`
ServerURL *url.URL `json:"serverUrl"`
VersionCode string `json:"versionCode"`
NumTribes int `json:"numTribes"`
}