core/internal/adapter/publisher_watermill_data_cl...

64 lines
1.9 KiB
Go

package adapter
import (
"context"
"fmt"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg"
"github.com/ThreeDotsLabs/watermill/message"
)
// DataCleanupPublisher sends clean-up-data commands as watermill messages.
// It pairs a marshaler, which turns command payloads into messages, with a
// publisher that delivers those messages to a single preconfigured topic.
type DataCleanupPublisher struct {
	// marshaler converts a watermillmsg command payload into a *message.Message.
	marshaler watermillmsg.Marshaler
	// publisher is the watermill publisher the marshaled messages are handed to.
	publisher message.Publisher
	// cmdCleanUpDataTopic is the topic name passed to publisher.Publish.
	cmdCleanUpDataTopic string
}
// NewDataCleanupPublisher returns a DataCleanupPublisher that marshals commands
// with marshaler and publishes them through publisher to cmdCleanUpDataTopic.
// The arguments are stored as given; no validation is performed here.
func NewDataCleanupPublisher(
	publisher message.Publisher,
	marshaler watermillmsg.Marshaler,
	cmdCleanUpDataTopic string,
) *DataCleanupPublisher {
	pub := new(DataCleanupPublisher)
	pub.publisher = publisher
	pub.marshaler = marshaler
	pub.cmdCleanUpDataTopic = cmdCleanUpDataTopic
	return pub
}
// CmdCleanUp publishes one clean-up-data command per payload.
//
// Each payload's server is copied field by field into a
// watermillmsg.CleanUpDataCmdPayload and marshaled into a message. Once every
// payload has been marshaled, all messages are sent to the configured topic in
// a single Publish call.
//
// If marshaling any payload fails, the method returns immediately without
// publishing anything; the error is prefixed with that server's key. A Publish
// failure is wrapped together with the topic name.
func (pub *DataCleanupPublisher) CmdCleanUp(
	ctx context.Context,
	payloads ...domain.CleanUpDataCmdPayload,
) error {
	batch := make([]*message.Message, 0, len(payloads))

	for i := range payloads {
		// Fetch the server once and reuse it for every field and the error text.
		server := payloads[i].Server()

		cmd := watermillmsg.CleanUpDataCmdPayload{
			Server: watermillmsg.CleanUpDataCmdPayloadServer{
				Key:                      server.Key(),
				VersionCode:              server.VersionCode(),
				Open:                     server.Open(),
				Special:                  server.Special(),
				PlayerDataSyncedAt:       server.PlayerDataSyncedAt(),
				PlayerSnapshotsCreatedAt: server.PlayerSnapshotsCreatedAt(),
				TribeDataSyncedAt:        server.TribeDataSyncedAt(),
				TribeSnapshotsCreatedAt:  server.TribeSnapshotsCreatedAt(),
				VillageDataSyncedAt:      server.VillageDataSyncedAt(),
				EnnoblementDataSyncedAt:  server.EnnoblementDataSyncedAt(),
			},
		}

		msg, err := pub.marshaler.Marshal(ctx, cmd)
		if err != nil {
			return fmt.Errorf("%s: couldn't marshal CleanUpDataCmdPayload: %w", server.Key(), err)
		}

		batch = append(batch, msg)
	}

	// Everything marshaled successfully; hand the whole batch over at once.
	err := pub.publisher.Publish(pub.cmdCleanUpDataTopic, batch...)
	if err != nil {
		return fmt.Errorf("couldn't publish messages to topic '%s': %w", pub.cmdCleanUpDataTopic, err)
	}

	return nil
}