package adapter

import (
	"context"
	"fmt"

	"gitea.dwysokinski.me/twhelp/core/internal/domain"
	"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg"
	"github.com/ThreeDotsLabs/watermill/message"
)

// ServerWatermillPublisher publishes server-related commands and events
// through a watermill message.Publisher.
type ServerWatermillPublisher struct {
	// marshaler converts the watermillmsg payload structs into messages.
	marshaler watermillmsg.Marshaler
	// publisher is the transport the marshaled messages are handed to.
	publisher message.Publisher
	// cmdSyncTopic is the topic CmdSync publishes to.
	cmdSyncTopic string
	// eventSyncedTopic is the topic EventSynced publishes to.
	eventSyncedTopic string
}

// NewServerWatermillPublisher builds a ServerWatermillPublisher that uses the
// given publisher and marshaler and targets the two supplied topics.
func NewServerWatermillPublisher(
	publisher message.Publisher,
	marshaler watermillmsg.Marshaler,
	cmdSyncTopic string,
	eventSyncedTopic string,
) *ServerWatermillPublisher {
	p := new(ServerWatermillPublisher)
	p.marshaler = marshaler
	p.publisher = publisher
	p.cmdSyncTopic = cmdSyncTopic
	p.eventSyncedTopic = eventSyncedTopic
	return p
}

// CmdSync marshals every payload into a message and publishes the whole batch
// to the sync command topic with a single Publish call.
//
// Marshaling stops at the first failure; in that case nothing is published and
// the returned error is prefixed with the version code of the failing payload.
// A Publish failure is wrapped with the topic name.
func (pub *ServerWatermillPublisher) CmdSync(ctx context.Context, payloads ...domain.SyncServersCmdPayload) error {
	batch := make([]*message.Message, len(payloads))

	for i := range payloads {
		payload := payloads[i]

		dto := watermillmsg.SyncServersCmdPayload{
			VersionCode: payload.VersionCode(),
			URL:         payload.URL(),
		}

		m, marshalErr := pub.marshaler.Marshal(ctx, dto)
		if marshalErr != nil {
			return fmt.Errorf("%s: couldn't marshal SyncServersCmdPayload: %w", payload.VersionCode(), marshalErr)
		}

		batch[i] = m
	}

	publishErr := pub.publisher.Publish(pub.cmdSyncTopic, batch...)
	if publishErr != nil {
		return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdSyncTopic, publishErr)
	}

	return nil
}

// EventSynced marshals every payload into a message and publishes the whole
// batch to the synced event topic with a single Publish call.
//
// Marshaling stops at the first failure; in that case nothing is published and
// the returned error is prefixed with the key of the failing payload.
// A Publish failure is wrapped with the topic name.
func (pub *ServerWatermillPublisher) EventSynced(
	ctx context.Context,
	payloads ...domain.ServerSyncedEventPayload,
) error {
	batch := make([]*message.Message, len(payloads))

	for i := range payloads {
		payload := payloads[i]

		dto := watermillmsg.ServerSyncedEventPayload{
			Key:         payload.Key(),
			VersionCode: payload.VersionCode(),
			URL:         payload.URL(),
		}

		m, marshalErr := pub.marshaler.Marshal(ctx, dto)
		if marshalErr != nil {
			return fmt.Errorf("%s: couldn't marshal ServerSyncedEventPayload: %w", payload.Key(), marshalErr)
		}

		batch[i] = m
	}

	publishErr := pub.publisher.Publish(pub.eventSyncedTopic, batch...)
	if publishErr != nil {
		return fmt.Errorf("couldn't publish message to topic '%s': %w", pub.eventSyncedTopic, publishErr)
	}

	return nil
}