84 lines
2.2 KiB
Go
84 lines
2.2 KiB
Go
|
package adapter
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
|
||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
|
||
|
"github.com/ThreeDotsLabs/watermill/message"
|
||
|
)
|
||
|
|
||
|
type SnapshotWatermillPublisher struct {
|
||
|
marshaler watermillmsg.Marshaler
|
||
|
publisher message.Publisher
|
||
|
cmdCreateTopic string
|
||
|
eventCreatedTopic string
|
||
|
}
|
||
|
|
||
|
func NewSnapshotWatermillPublisher(
|
||
|
publisher message.Publisher,
|
||
|
marshaler watermillmsg.Marshaler,
|
||
|
cmdCreateTopic string,
|
||
|
eventCreatedTopic string,
|
||
|
) *SnapshotWatermillPublisher {
|
||
|
return &SnapshotWatermillPublisher{
|
||
|
publisher: publisher,
|
||
|
marshaler: marshaler,
|
||
|
cmdCreateTopic: cmdCreateTopic,
|
||
|
eventCreatedTopic: eventCreatedTopic,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (pub *SnapshotWatermillPublisher) CmdCreate(
|
||
|
ctx context.Context,
|
||
|
payloads ...domain.CreateSnapshotsCmdPayload,
|
||
|
) error {
|
||
|
msgs := make([]*message.Message, 0, len(payloads))
|
||
|
|
||
|
for _, p := range payloads {
|
||
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.CreateSnapshotsCmdPayload{
|
||
|
ServerKey: p.ServerKey(),
|
||
|
VersionCode: p.VersionCode(),
|
||
|
VersionTimezone: p.VersionTimezone(),
|
||
|
Date: p.Date(),
|
||
|
})
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("%s: couldn't marshal CreateSnapshotsCmdPayload: %w", p.ServerKey(), err)
|
||
|
}
|
||
|
|
||
|
msgs = append(msgs, msg)
|
||
|
}
|
||
|
|
||
|
if err := pub.publisher.Publish(pub.cmdCreateTopic, msgs...); err != nil {
|
||
|
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdCreateTopic, err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (pub *SnapshotWatermillPublisher) EventCreated(
|
||
|
ctx context.Context,
|
||
|
payloads ...domain.SnapshotsCreatedEventPayload,
|
||
|
) error {
|
||
|
msgs := make([]*message.Message, 0, len(payloads))
|
||
|
|
||
|
for _, p := range payloads {
|
||
|
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SnapshotsCreatedEventPayload{
|
||
|
ServerKey: p.ServerKey(),
|
||
|
VersionCode: p.VersionCode(),
|
||
|
})
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("%s: couldn't marshal SnapshotsCreatedEventPayload: %w", p.ServerKey(), err)
|
||
|
}
|
||
|
|
||
|
msgs = append(msgs, msg)
|
||
|
}
|
||
|
|
||
|
if err := pub.publisher.Publish(pub.eventCreatedTopic, msgs...); err != nil {
|
||
|
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventCreatedTopic, err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|