core/internal/adapter/publisher_watermill_server.go

47 lines
1.2 KiB
Go

package adapter
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill/message"
)
type ServerWatermillPublisher struct {
marshaler watermillmsg.Marshaler
publisher message.Publisher
cmdSyncTopic string
}
// NewServerWatermillPublisher builds a ServerWatermillPublisher that marshals
// payloads with marshaler and publishes them through publisher.
// cmdSyncTopic is the topic used by CmdSync.
func NewServerWatermillPublisher(
	publisher message.Publisher,
	marshaler watermillmsg.Marshaler,
	cmdSyncTopic string,
) *ServerWatermillPublisher {
	pub := new(ServerWatermillPublisher)
	pub.publisher = publisher
	pub.marshaler = marshaler
	pub.cmdSyncTopic = cmdSyncTopic
	return pub
}
// CmdSync marshals every payload into a watermill message and publishes the
// whole batch to the configured sync-command topic with a single Publish call.
//
// If marshaling any payload fails, CmdSync returns immediately with an error
// prefixed by that payload's version code; nothing is published in that case.
// A failure reported by the publisher is wrapped together with the topic name.
func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error {
	batch := make([]*message.Message, len(payloads))

	for i := range payloads {
		payload := payloads[i]

		// Translate the domain payload into its wire representation.
		wire := watermillmsg.SyncServersCmdPayload{
			VersionCode: payload.VersionCode(),
			URL:         payload.URL().String(),
		}

		encoded, marshalErr := pub.marshaler.Marshal(ctx, wire)
		if marshalErr != nil {
			return fmt.Errorf("%s: couldn't marshal SyncServersCmdPayload: %w", payload.VersionCode(), marshalErr)
		}

		batch[i] = encoded
	}

	publishErr := pub.publisher.Publish(pub.cmdSyncTopic, batch...)
	if publishErr != nil {
		return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, publishErr)
	}

	return nil
}