core/internal/adapter/publisher_watermill_snapsho...

84 lines
2.2 KiB
Go

package adapter
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg"
"github.com/ThreeDotsLabs/watermill/message"
)
type SnapshotWatermillPublisher struct {
marshaler watermillmsg.Marshaler
publisher message.Publisher
cmdCreateTopic string
eventCreatedTopic string
}
func NewSnapshotWatermillPublisher(
publisher message.Publisher,
marshaler watermillmsg.Marshaler,
cmdCreateTopic string,
eventCreatedTopic string,
) *SnapshotWatermillPublisher {
return &SnapshotWatermillPublisher{
publisher: publisher,
marshaler: marshaler,
cmdCreateTopic: cmdCreateTopic,
eventCreatedTopic: eventCreatedTopic,
}
}
func (pub *SnapshotWatermillPublisher) CmdCreate(
ctx context.Context,
payloads ...domain.CreateSnapshotsCmdPayload,
) error {
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.CreateSnapshotsCmdPayload{
ServerKey: p.ServerKey(),
VersionCode: p.VersionCode(),
VersionTimezone: p.VersionTimezone(),
Date: p.Date(),
})
if err != nil {
return fmt.Errorf("%s: couldn't marshal CreateSnapshotsCmdPayload: %w", p.ServerKey(), err)
}
msgs = append(msgs, msg)
}
if err := pub.publisher.Publish(pub.cmdCreateTopic, msgs...); err != nil {
return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdCreateTopic, err)
}
return nil
}
func (pub *SnapshotWatermillPublisher) EventCreated(
ctx context.Context,
payloads ...domain.SnapshotsCreatedEventPayload,
) error {
msgs := make([]*message.Message, 0, len(payloads))
for _, p := range payloads {
msg, err := pub.marshaler.Marshal(ctx, watermillmsg.SnapshotsCreatedEventPayload{
ServerKey: p.ServerKey(),
VersionCode: p.VersionCode(),
})
if err != nil {
return fmt.Errorf("%s: couldn't marshal SnapshotsCreatedEventPayload: %w", p.ServerKey(), err)
}
msgs = append(msgs, msg)
}
if err := pub.publisher.Publish(pub.eventCreatedTopic, msgs...); err != nil {
return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventCreatedTopic, err)
}
return nil
}