117 lines
3.2 KiB
Go
117 lines
3.2 KiB
Go
package port
|
|
|
|
import (
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
|
|
"github.com/ThreeDotsLabs/watermill"
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|
)
|
|
|
|
type EnnoblementWatermillConsumer struct {
|
|
svc *app.EnnoblementService
|
|
subscriber message.Subscriber
|
|
logger watermill.LoggerAdapter
|
|
marshaler watermillmsg.Marshaler
|
|
cmdSyncTopic string
|
|
cmdCleanUpTopic string
|
|
}
|
|
|
|
func NewEnnoblementWatermillConsumer(
|
|
svc *app.EnnoblementService,
|
|
subscriber message.Subscriber,
|
|
logger watermill.LoggerAdapter,
|
|
marshaler watermillmsg.Marshaler,
|
|
cmdSyncTopic string,
|
|
cmdCleanUpTopic string,
|
|
) *EnnoblementWatermillConsumer {
|
|
return &EnnoblementWatermillConsumer{
|
|
svc: svc,
|
|
subscriber: subscriber,
|
|
logger: logger,
|
|
marshaler: marshaler,
|
|
cmdSyncTopic: cmdSyncTopic,
|
|
cmdCleanUpTopic: cmdCleanUpTopic,
|
|
}
|
|
}
|
|
|
|
func (c *EnnoblementWatermillConsumer) Register(router *message.Router) {
|
|
router.AddNoPublisherHandler(
|
|
"EnnoblementConsumer.sync",
|
|
c.cmdSyncTopic,
|
|
c.subscriber,
|
|
c.sync,
|
|
)
|
|
router.AddNoPublisherHandler(
|
|
"EnnoblementConsumer.cleanUp",
|
|
c.cmdCleanUpTopic,
|
|
c.subscriber,
|
|
c.cleanUp,
|
|
)
|
|
}
|
|
|
|
func (c *EnnoblementWatermillConsumer) sync(msg *message.Message) error {
|
|
var rawPayload watermillmsg.SyncEnnoblementsCmdPayload
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
})
|
|
return nil
|
|
}
|
|
|
|
payload, err := domain.NewSyncEnnoblementsCmdPayload(
|
|
rawPayload.ServerKey,
|
|
rawPayload.ServerURL,
|
|
rawPayload.VersionCode,
|
|
)
|
|
if err != nil {
|
|
c.logger.Error("couldn't construct domain.SyncEnnoblementsCmdPayload", err, watermill.LogFields{
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
})
|
|
return nil
|
|
}
|
|
|
|
return c.svc.Sync(msg.Context(), payload)
|
|
}
|
|
|
|
func (c *EnnoblementWatermillConsumer) cleanUp(msg *message.Message) error {
|
|
var rawPayload watermillmsg.CleanUpDataCmdPayload
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
})
|
|
return nil
|
|
}
|
|
|
|
payloadServer, err := domain.NewCleanUpDataCmdPayloadServer(
|
|
rawPayload.Server.Key,
|
|
rawPayload.Server.VersionCode,
|
|
rawPayload.Server.Open,
|
|
rawPayload.Server.Special,
|
|
rawPayload.Server.PlayerDataSyncedAt,
|
|
rawPayload.Server.PlayerSnapshotsCreatedAt,
|
|
rawPayload.Server.TribeDataSyncedAt,
|
|
rawPayload.Server.TribeSnapshotsCreatedAt,
|
|
rawPayload.Server.VillageDataSyncedAt,
|
|
rawPayload.Server.EnnoblementDataSyncedAt,
|
|
)
|
|
if err != nil {
|
|
c.logger.Error("couldn't construct domain.CleanUpDataCmdPayloadServer", err, watermill.LogFields{
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
})
|
|
return nil
|
|
}
|
|
|
|
payload, err := domain.NewCleanUpDataCmdPayload(payloadServer)
|
|
if err != nil {
|
|
c.logger.Error("couldn't construct domain.CleanUpDataCmdPayload", err, watermill.LogFields{
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
})
|
|
return nil
|
|
}
|
|
|
|
return c.svc.CleanUp(msg.Context(), payload)
|
|
}
|