2023-12-21 06:28:53 +00:00
|
|
|
package adapter
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
|
2024-04-06 04:32:29 +00:00
|
|
|
"gitea.dwysokinski.me/twhelp/core/internal/domain"
|
|
|
|
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg"
|
2023-12-21 06:28:53 +00:00
|
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|
|
|
)
|
|
|
|
|
|
|
|
type ServerWatermillPublisher struct {
|
2023-12-25 09:53:01 +00:00
|
|
|
marshaler watermillmsg.Marshaler
|
|
|
|
publisher message.Publisher
|
|
|
|
cmdSyncTopic string
|
|
|
|
eventSyncedTopic string
|
2023-12-21 06:28:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewServerWatermillPublisher(
|
|
|
|
publisher message.Publisher,
|
|
|
|
marshaler watermillmsg.Marshaler,
|
|
|
|
cmdSyncTopic string,
|
2023-12-25 09:53:01 +00:00
|
|
|
eventSyncedTopic string,
|
2023-12-21 06:28:53 +00:00
|
|
|
) *ServerWatermillPublisher {
|
2023-12-25 09:53:01 +00:00
|
|
|
return &ServerWatermillPublisher{
|
|
|
|
publisher: publisher,
|
|
|
|
marshaler: marshaler,
|
|
|
|
cmdSyncTopic: cmdSyncTopic,
|
|
|
|
eventSyncedTopic: eventSyncedTopic,
|
|
|
|
}
|
2023-12-21 06:28:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error {
|
|
|
|
msgs := make([]*message.Message, 0, len(payloads))
|
|
|
|
|
|
|
|
for _, p := range payloads {
|
|
|
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SyncServersCmdPayload{
|
|
|
|
VersionCode: p.VersionCode(),
|
2023-12-27 08:07:40 +00:00
|
|
|
URL: p.URL(),
|
2023-12-21 06:28:53 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("%s: couldn't marshal SyncServersCmdPayload: %w", p.VersionCode(), err)
|
|
|
|
}
|
|
|
|
|
|
|
|
msgs = append(msgs, msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := pub.publisher.Publish(pub.cmdSyncTopic, msgs...); err != nil {
|
|
|
|
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2023-12-25 09:53:01 +00:00
|
|
|
|
|
|
|
func (pub *ServerWatermillPublisher) EventSynced(
|
|
|
|
ctx context.Context,
|
|
|
|
payloads ...domain.ServerSyncedEventPayload,
|
|
|
|
) error {
|
|
|
|
msgs := make([]*message.Message, 0, len(payloads))
|
|
|
|
|
|
|
|
for _, p := range payloads {
|
|
|
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.ServerSyncedEventPayload{
|
|
|
|
Key: p.Key(),
|
|
|
|
VersionCode: p.VersionCode(),
|
2023-12-27 08:07:40 +00:00
|
|
|
URL: p.URL(),
|
2023-12-25 09:53:01 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("%s: couldn't marshal ServerSyncedEventPayload: %w", p.Key(), err)
|
|
|
|
}
|
|
|
|
|
|
|
|
msgs = append(msgs, msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := pub.publisher.Publish(pub.eventSyncedTopic, msgs...); err != nil {
|
|
|
|
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|