core/internal/port/consumer_watermill_tribe.go

102 lines
2.9 KiB
Go

package port
import (
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
"gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)
// TribeWatermillConsumer receives Watermill messages and forwards the decoded
// events to app.TribeService.
type TribeWatermillConsumer struct {
	// svc is the application service invoked once a payload has been validated.
	svc *app.TribeService
	// subscriber is passed to the router for every handler added in Register.
	subscriber message.Subscriber
	// logger records payloads that could not be decoded or validated.
	logger watermill.LoggerAdapter
	// marshaler decodes incoming messages into watermillmsg payload structs.
	marshaler watermillmsg.Marshaler

	// Topics the sync and updateDominance handlers are attached to, respectively.
	eventServerSyncedTopic   string
	eventVillagesSyncedTopic string
}
// NewTribeWatermillConsumer returns a consumer populated with the given
// dependencies and topic names. Nothing is subscribed until Register is called.
func NewTribeWatermillConsumer(
	svc *app.TribeService,
	subscriber message.Subscriber,
	logger watermill.LoggerAdapter,
	marshaler watermillmsg.Marshaler,
	eventServerSyncedTopic string,
	eventVillagesSyncedTopic string,
) *TribeWatermillConsumer {
	consumer := new(TribeWatermillConsumer)

	// Collaborators.
	consumer.svc = svc
	consumer.subscriber = subscriber
	consumer.logger = logger
	consumer.marshaler = marshaler

	// Topic names.
	consumer.eventServerSyncedTopic = eventServerSyncedTopic
	consumer.eventVillagesSyncedTopic = eventVillagesSyncedTopic

	return consumer
}
// Register attaches the consumer's handlers to router. Each handler is added
// as a no-publisher handler that reads from c.subscriber.
func (c *TribeWatermillConsumer) Register(router *message.Router) {
	// Registration order matches the order of this table.
	registrations := []struct {
		name    string
		topic   string
		handler func(msg *message.Message) error
	}{
		{
			name:    "TribeConsumer.sync",
			topic:   c.eventServerSyncedTopic,
			handler: c.sync,
		},
		{
			name:    "TribeConsumer.updateDominance",
			topic:   c.eventVillagesSyncedTopic,
			handler: c.updateDominance,
		},
	}

	for _, r := range registrations {
		router.AddNoPublisherHandler(r.name, r.topic, c.subscriber, r.handler)
	}
}
// sync decodes a server-synced event from msg, converts it to a
// domain.ServerSyncedEventPayload and passes it to the service's Sync method.
//
// A message that cannot be unmarshaled or fails domain validation is logged
// and nil is returned; only the error from Sync itself is propagated.
// NOTE(review): returning nil here presumably lets the router ack the bad
// message instead of redelivering it — confirm against the router setup.
func (c *TribeWatermillConsumer) sync(msg *message.Message) error {
	// Built lazily so the handler name is only looked up when something is logged.
	logFields := func() watermill.LogFields {
		return watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
	}

	var raw watermillmsg.ServerSyncedEventPayload
	unmarshalErr := c.marshaler.Unmarshal(msg, &raw)
	if unmarshalErr != nil {
		c.logger.Error("couldn't unmarshal payload", unmarshalErr, logFields())
		return nil
	}

	payload, buildErr := domain.NewServerSyncedEventPayload(raw.Key, raw.URL, raw.VersionCode)
	if buildErr != nil {
		c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", buildErr, logFields())
		return nil
	}

	return c.svc.Sync(msg.Context(), payload)
}
// updateDominance decodes a villages-synced event from msg, converts it to a
// domain.VillagesSyncedEventPayload and passes it to the service's
// UpdateDominance method.
//
// A message that cannot be unmarshaled or fails domain validation is logged
// and nil is returned; only the error from UpdateDominance is propagated.
// NOTE(review): returning nil here presumably lets the router ack the bad
// message instead of redelivering it — confirm against the router setup.
func (c *TribeWatermillConsumer) updateDominance(msg *message.Message) error {
	// Built lazily so the handler name is only looked up when something is logged.
	logFields := func() watermill.LogFields {
		return watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
	}

	var raw watermillmsg.VillagesSyncedEventPayload
	unmarshalErr := c.marshaler.Unmarshal(msg, &raw)
	if unmarshalErr != nil {
		c.logger.Error("couldn't unmarshal payload", unmarshalErr, logFields())
		return nil
	}

	payload, buildErr := domain.NewVillagesSyncedEventPayload(
		raw.ServerKey,
		raw.ServerURL,
		raw.VersionCode,
		raw.NumVillages,
		raw.NumPlayerVillages,
		raw.NumBarbarianVillages,
		raw.NumBonusVillages,
	)
	if buildErr != nil {
		c.logger.Error("couldn't construct domain.VillagesSyncedEventPayload", buildErr, logFields())
		return nil
	}

	return c.svc.UpdateDominance(msg.Context(), payload)
}