// Source: core/internal/port/consumer_watermill_server.go (297 lines, 9.1 KiB, Go)

package port
import (
"gitea.dwysokinski.me/twhelp/core/internal/app"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"gitea.dwysokinski.me/twhelp/core/internal/watermill/watermillmsg"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)
// ServerWatermillConsumer connects Watermill message handlers to
// app.ServerService. Each handler decodes an incoming message with the
// marshaler, builds the matching domain payload and forwards it to the
// service.
type ServerWatermillConsumer struct {
	// svc is the application service every handler delegates to.
	svc *app.ServerService
	// subscriber is the subscriber passed to the router for every handler.
	subscriber message.Subscriber
	// logger receives decode/validation failures.
	logger watermill.LoggerAdapter
	// marshaler decodes message payloads into watermillmsg structs.
	marshaler watermillmsg.Marshaler

	// cmdSyncTopic is the topic consumed by the sync handler.
	cmdSyncTopic string
	// eventServerSyncedTopic is the topic consumed by syncConfigAndInfo.
	eventServerSyncedTopic string
	// eventTribesSyncedTopic is the topic consumed by updateNumTribes.
	eventTribesSyncedTopic string
	// eventPlayersSyncedTopic is the topic consumed by updateNumPlayers.
	eventPlayersSyncedTopic string
	// eventVillagesSyncedTopic is the topic consumed by updateNumVillages.
	eventVillagesSyncedTopic string
	// eventEnnoblementsSyncedTopic is the topic consumed by
	// updateEnnoblementDataSyncedAt.
	eventEnnoblementsSyncedTopic string
	// eventTribeSnapshotsCreatedTopic is the topic consumed by
	// updateTribeSnapshotsCreatedAt.
	eventTribeSnapshotsCreatedTopic string
	// eventPlayerSnapshotsCreatedTopic is the topic consumed by
	// updatePlayerSnapshotsCreatedAt.
	eventPlayerSnapshotsCreatedTopic string
}
// NewServerWatermillConsumer returns a ServerWatermillConsumer populated with
// the given service, subscriber, logger, marshaler and topic names. The
// arguments are stored as-is; no validation is performed here.
func NewServerWatermillConsumer(
	svc *app.ServerService,
	subscriber message.Subscriber,
	logger watermill.LoggerAdapter,
	marshaler watermillmsg.Marshaler,
	cmdSyncTopic string,
	eventServerSyncedTopic string,
	eventTribesSyncedTopic string,
	eventPlayersSyncedTopic string,
	eventVillagesSyncedTopic string,
	eventEnnoblementsSyncedTopic string,
	eventTribeSnapshotsCreatedTopic string,
	eventPlayerSnapshotsCreatedTopic string,
) *ServerWatermillConsumer {
	consumer := new(ServerWatermillConsumer)

	// Collaborators.
	consumer.svc = svc
	consumer.subscriber = subscriber
	consumer.logger = logger
	consumer.marshaler = marshaler

	// Topic names.
	consumer.cmdSyncTopic = cmdSyncTopic
	consumer.eventServerSyncedTopic = eventServerSyncedTopic
	consumer.eventTribesSyncedTopic = eventTribesSyncedTopic
	consumer.eventPlayersSyncedTopic = eventPlayersSyncedTopic
	consumer.eventVillagesSyncedTopic = eventVillagesSyncedTopic
	consumer.eventEnnoblementsSyncedTopic = eventEnnoblementsSyncedTopic
	consumer.eventTribeSnapshotsCreatedTopic = eventTribeSnapshotsCreatedTopic
	consumer.eventPlayerSnapshotsCreatedTopic = eventPlayerSnapshotsCreatedTopic

	return consumer
}
// Register adds all of the consumer's handlers to router as no-publisher
// handlers. Every handler uses the consumer's subscriber and is bound to the
// topic configured for it at construction time.
func (c *ServerWatermillConsumer) Register(router *message.Router) {
	// Kept as an ordered table so handlers are registered in a fixed sequence.
	registrations := []struct {
		name   string
		topic  string
		handle func(*message.Message) error
	}{
		{"ServerConsumer.sync", c.cmdSyncTopic, c.sync},
		{"ServerConsumer.syncConfigAndInfo", c.eventServerSyncedTopic, c.syncConfigAndInfo},
		{"ServerConsumer.updateNumTribes", c.eventTribesSyncedTopic, c.updateNumTribes},
		{"ServerConsumer.updateNumPlayers", c.eventPlayersSyncedTopic, c.updateNumPlayers},
		{"ServerConsumer.updateNumVillages", c.eventVillagesSyncedTopic, c.updateNumVillages},
		{
			"ServerConsumer.updateEnnoblementDataSyncedAt",
			c.eventEnnoblementsSyncedTopic,
			c.updateEnnoblementDataSyncedAt,
		},
		{
			"ServerConsumer.updateTribeSnapshotsCreatedAt",
			c.eventTribeSnapshotsCreatedTopic,
			c.updateTribeSnapshotsCreatedAt,
		},
		{
			"ServerConsumer.updatePlayerSnapshotsCreatedAt",
			c.eventPlayerSnapshotsCreatedTopic,
			c.updatePlayerSnapshotsCreatedAt,
		},
	}

	for _, reg := range registrations {
		router.AddNoPublisherHandler(reg.name, reg.topic, c.subscriber, reg.handle)
	}
}
// sync handles the sync-servers command: it decodes the message into a
// watermillmsg.SyncServersCmdPayload, converts it into the domain payload and
// calls ServerService.Sync, returning that call's error.
//
// Decode and validation failures are logged and nil is returned, so those
// errors are not propagated to the router.
// NOTE(review): presumably this avoids redelivery of malformed messages —
// confirm against the router's retry/ack configuration.
func (c *ServerWatermillConsumer) sync(msg *message.Message) error {
	var raw watermillmsg.SyncServersCmdPayload

	err := c.marshaler.Unmarshal(msg, &raw)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't unmarshal payload", err, fields)
		return nil
	}

	cmd, err := domain.NewSyncServersCmdPayload(raw.VersionCode, raw.URL)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't construct domain.SyncServersCmdPayload", err, fields)
		return nil
	}

	return c.svc.Sync(msg.Context(), cmd)
}
// syncConfigAndInfo handles the server-synced event: it decodes the message
// into a watermillmsg.ServerSyncedEventPayload, converts it into the domain
// payload and calls ServerService.SyncConfigAndInfo, returning that call's
// error.
//
// Decode and validation failures are logged and nil is returned, so those
// errors are not propagated to the router.
func (c *ServerWatermillConsumer) syncConfigAndInfo(msg *message.Message) error {
	var raw watermillmsg.ServerSyncedEventPayload

	err := c.marshaler.Unmarshal(msg, &raw)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't unmarshal payload", err, fields)
		return nil
	}

	event, err := domain.NewServerSyncedEventPayload(raw.Key, raw.URL, raw.VersionCode)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, fields)
		return nil
	}

	return c.svc.SyncConfigAndInfo(msg.Context(), event)
}
// updateNumTribes handles the tribes-synced event: it decodes the message
// into a watermillmsg.TribesSyncedEventPayload, converts it into the domain
// payload and calls ServerService.UpdateNumTribes, returning that call's
// error.
//
// Decode and validation failures are logged and nil is returned, so those
// errors are not propagated to the router.
func (c *ServerWatermillConsumer) updateNumTribes(msg *message.Message) error {
	var raw watermillmsg.TribesSyncedEventPayload

	err := c.marshaler.Unmarshal(msg, &raw)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't unmarshal payload", err, fields)
		return nil
	}

	event, err := domain.NewTribesSyncedEventPayload(
		raw.ServerKey,
		raw.ServerURL,
		raw.VersionCode,
		raw.NumTribes,
	)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't construct domain.TribesSyncedEventPayload", err, fields)
		return nil
	}

	return c.svc.UpdateNumTribes(msg.Context(), event)
}
// updateNumPlayers handles the players-synced event: it decodes the message
// into a watermillmsg.PlayersSyncedEventPayload, converts it into the domain
// payload and calls ServerService.UpdateNumPlayers, returning that call's
// error.
//
// Decode and validation failures are logged and nil is returned, so those
// errors are not propagated to the router.
func (c *ServerWatermillConsumer) updateNumPlayers(msg *message.Message) error {
	var raw watermillmsg.PlayersSyncedEventPayload

	err := c.marshaler.Unmarshal(msg, &raw)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't unmarshal payload", err, fields)
		return nil
	}

	event, err := domain.NewPlayersSyncedEventPayload(
		raw.ServerKey,
		raw.ServerURL,
		raw.VersionCode,
		raw.NumPlayers,
	)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't construct domain.PlayersSyncedEventPayload", err, fields)
		return nil
	}

	return c.svc.UpdateNumPlayers(msg.Context(), event)
}
// updateNumVillages handles the villages-synced event: it decodes the message
// into a watermillmsg.VillagesSyncedEventPayload, converts it (including the
// total, player, barbarian and bonus village counts) into the domain payload
// and calls ServerService.UpdateNumVillages, returning that call's error.
//
// Decode and validation failures are logged and nil is returned, so those
// errors are not propagated to the router.
func (c *ServerWatermillConsumer) updateNumVillages(msg *message.Message) error {
	var raw watermillmsg.VillagesSyncedEventPayload

	err := c.marshaler.Unmarshal(msg, &raw)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't unmarshal payload", err, fields)
		return nil
	}

	event, err := domain.NewVillagesSyncedEventPayload(
		raw.ServerKey,
		raw.ServerURL,
		raw.VersionCode,
		raw.NumVillages,
		raw.NumPlayerVillages,
		raw.NumBarbarianVillages,
		raw.NumBonusVillages,
	)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't construct domain.VillagesSyncedEventPayload", err, fields)
		return nil
	}

	return c.svc.UpdateNumVillages(msg.Context(), event)
}
// updateEnnoblementDataSyncedAt handles the ennoblements-synced event: it
// decodes the message into a watermillmsg.EnnoblementsSyncedEventPayload,
// converts it into the domain payload and calls
// ServerService.UpdateEnnoblementDataSyncedAt, returning that call's error.
//
// Decode and validation failures are logged and nil is returned, so those
// errors are not propagated to the router.
func (c *ServerWatermillConsumer) updateEnnoblementDataSyncedAt(msg *message.Message) error {
	var raw watermillmsg.EnnoblementsSyncedEventPayload

	err := c.marshaler.Unmarshal(msg, &raw)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't unmarshal payload", err, fields)
		return nil
	}

	event, err := domain.NewEnnoblementsSyncedEventPayload(
		raw.ServerKey,
		raw.ServerURL,
		raw.VersionCode,
	)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't construct domain.EnnoblementsSyncedEventPayload", err, fields)
		return nil
	}

	return c.svc.UpdateEnnoblementDataSyncedAt(msg.Context(), event)
}
// updateTribeSnapshotsCreatedAt handles the tribe-snapshots-created event: it
// decodes the message into a watermillmsg.SnapshotsCreatedEventPayload,
// converts it into the domain payload and calls
// ServerService.UpdateTribeSnapshotsCreatedAt, returning that call's error.
//
// Decode and validation failures are logged and nil is returned, so those
// errors are not propagated to the router.
func (c *ServerWatermillConsumer) updateTribeSnapshotsCreatedAt(msg *message.Message) error {
	var raw watermillmsg.SnapshotsCreatedEventPayload

	err := c.marshaler.Unmarshal(msg, &raw)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't unmarshal payload", err, fields)
		return nil
	}

	event, err := domain.NewSnapshotsCreatedEventPayload(raw.ServerKey, raw.VersionCode)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't construct domain.SnapshotsCreatedEventPayload", err, fields)
		return nil
	}

	return c.svc.UpdateTribeSnapshotsCreatedAt(msg.Context(), event)
}
// updatePlayerSnapshotsCreatedAt handles the player-snapshots-created event:
// it decodes the message into a watermillmsg.SnapshotsCreatedEventPayload,
// converts it into the domain payload and calls
// ServerService.UpdatePlayerSnapshotsCreatedAt, returning that call's error.
//
// Decode and validation failures are logged and nil is returned, so those
// errors are not propagated to the router.
func (c *ServerWatermillConsumer) updatePlayerSnapshotsCreatedAt(msg *message.Message) error {
	var raw watermillmsg.SnapshotsCreatedEventPayload

	err := c.marshaler.Unmarshal(msg, &raw)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't unmarshal payload", err, fields)
		return nil
	}

	event, err := domain.NewSnapshotsCreatedEventPayload(raw.ServerKey, raw.VersionCode)
	if err != nil {
		fields := watermill.LogFields{
			"handler": message.HandlerNameFromCtx(msg.Context()),
		}
		c.logger.Error("couldn't construct domain.SnapshotsCreatedEventPayload", err, fields)
		return nil
	}

	return c.svc.UpdatePlayerSnapshotsCreatedAt(msg.Context(), event)
}