2023-12-23 09:54:47 +00:00
|
|
|
package port
|
|
|
|
|
|
|
|
import (
|
|
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/app"
|
|
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/domain"
|
2024-01-10 06:39:09 +00:00
|
|
|
"gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg"
|
2023-12-23 09:54:47 +00:00
|
|
|
"github.com/ThreeDotsLabs/watermill"
|
|
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|
|
|
)
|
|
|
|
|
|
|
|
type ServerWatermillConsumer struct {
|
2024-01-16 06:28:03 +00:00
|
|
|
svc *app.ServerService
|
|
|
|
subscriber message.Subscriber
|
|
|
|
logger watermill.LoggerAdapter
|
|
|
|
marshaler watermillmsg.Marshaler
|
|
|
|
cmdSyncTopic string
|
|
|
|
eventServerSyncedTopic string
|
|
|
|
eventTribesSyncedTopic string
|
|
|
|
eventPlayersSyncedTopic string
|
|
|
|
eventVillagesSyncedTopic string
|
|
|
|
eventEnnoblementsSyncedTopic string
|
|
|
|
eventTribeSnapshotsCreatedTopic string
|
|
|
|
eventPlayerSnapshotsCreatedTopic string
|
2023-12-23 09:54:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewServerWatermillConsumer(
|
|
|
|
svc *app.ServerService,
|
|
|
|
subscriber message.Subscriber,
|
|
|
|
logger watermill.LoggerAdapter,
|
|
|
|
marshaler watermillmsg.Marshaler,
|
|
|
|
cmdSyncTopic string,
|
2023-12-25 11:04:12 +00:00
|
|
|
eventServerSyncedTopic string,
|
2023-12-30 09:10:03 +00:00
|
|
|
eventTribesSyncedTopic string,
|
2024-01-01 10:53:59 +00:00
|
|
|
eventPlayersSyncedTopic string,
|
2024-01-02 10:47:02 +00:00
|
|
|
eventVillagesSyncedTopic string,
|
2024-01-04 10:44:36 +00:00
|
|
|
eventEnnoblementsSyncedTopic string,
|
2024-01-15 07:00:08 +00:00
|
|
|
eventTribeSnapshotsCreatedTopic string,
|
2024-01-16 06:28:03 +00:00
|
|
|
eventPlayerSnapshotsCreatedTopic string,
|
2023-12-23 09:54:47 +00:00
|
|
|
) *ServerWatermillConsumer {
|
|
|
|
return &ServerWatermillConsumer{
|
2024-01-16 06:28:03 +00:00
|
|
|
svc: svc,
|
|
|
|
subscriber: subscriber,
|
|
|
|
logger: logger,
|
|
|
|
marshaler: marshaler,
|
|
|
|
cmdSyncTopic: cmdSyncTopic,
|
|
|
|
eventServerSyncedTopic: eventServerSyncedTopic,
|
|
|
|
eventTribesSyncedTopic: eventTribesSyncedTopic,
|
|
|
|
eventPlayersSyncedTopic: eventPlayersSyncedTopic,
|
|
|
|
eventVillagesSyncedTopic: eventVillagesSyncedTopic,
|
|
|
|
eventEnnoblementsSyncedTopic: eventEnnoblementsSyncedTopic,
|
|
|
|
eventTribeSnapshotsCreatedTopic: eventTribeSnapshotsCreatedTopic,
|
|
|
|
eventPlayerSnapshotsCreatedTopic: eventPlayerSnapshotsCreatedTopic,
|
2023-12-23 09:54:47 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *ServerWatermillConsumer) Register(router *message.Router) {
|
|
|
|
router.AddNoPublisherHandler("ServerConsumer.sync", c.cmdSyncTopic, c.subscriber, c.sync)
|
2023-12-25 11:04:12 +00:00
|
|
|
router.AddNoPublisherHandler(
|
|
|
|
"ServerConsumer.syncConfigAndInfo",
|
|
|
|
c.eventServerSyncedTopic,
|
|
|
|
c.subscriber,
|
|
|
|
c.syncConfigAndInfo,
|
|
|
|
)
|
2023-12-30 09:10:03 +00:00
|
|
|
router.AddNoPublisherHandler(
|
|
|
|
"ServerConsumer.updateNumTribes",
|
|
|
|
c.eventTribesSyncedTopic,
|
|
|
|
c.subscriber,
|
|
|
|
c.updateNumTribes,
|
|
|
|
)
|
2024-01-01 10:53:59 +00:00
|
|
|
router.AddNoPublisherHandler(
|
|
|
|
"ServerConsumer.updateNumPlayers",
|
|
|
|
c.eventPlayersSyncedTopic,
|
|
|
|
c.subscriber,
|
|
|
|
c.updateNumPlayers,
|
|
|
|
)
|
2024-01-02 10:47:02 +00:00
|
|
|
router.AddNoPublisherHandler(
|
|
|
|
"ServerConsumer.updateNumVillages",
|
|
|
|
c.eventVillagesSyncedTopic,
|
|
|
|
c.subscriber,
|
|
|
|
c.updateNumVillages,
|
|
|
|
)
|
2024-01-04 10:44:36 +00:00
|
|
|
router.AddNoPublisherHandler(
|
|
|
|
"ServerConsumer.updateEnnoblementDataSyncedAt",
|
|
|
|
c.eventEnnoblementsSyncedTopic,
|
|
|
|
c.subscriber,
|
|
|
|
c.updateEnnoblementDataSyncedAt,
|
|
|
|
)
|
2024-01-15 07:00:08 +00:00
|
|
|
router.AddNoPublisherHandler(
|
|
|
|
"ServerConsumer.updateTribeSnapshotsCreatedAt",
|
|
|
|
c.eventTribeSnapshotsCreatedTopic,
|
|
|
|
c.subscriber,
|
|
|
|
c.updateTribeSnapshotsCreatedAt,
|
|
|
|
)
|
2024-01-16 06:28:03 +00:00
|
|
|
router.AddNoPublisherHandler(
|
|
|
|
"ServerConsumer.updatePlayerSnapshotsCreatedAt",
|
|
|
|
c.eventPlayerSnapshotsCreatedTopic,
|
|
|
|
c.subscriber,
|
|
|
|
c.updatePlayerSnapshotsCreatedAt,
|
|
|
|
)
|
2023-12-23 09:54:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *ServerWatermillConsumer) sync(msg *message.Message) error {
|
|
|
|
var rawPayload watermillmsg.SyncServersCmdPayload
|
|
|
|
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-12-27 08:07:40 +00:00
|
|
|
payload, err := domain.NewSyncServersCmdPayload(rawPayload.VersionCode, rawPayload.URL)
|
2023-12-23 09:54:47 +00:00
|
|
|
if err != nil {
|
|
|
|
c.logger.Error("couldn't construct domain.SyncServersCmdPayload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.svc.Sync(msg.Context(), payload)
|
|
|
|
}
|
2023-12-25 11:04:12 +00:00
|
|
|
|
|
|
|
func (c *ServerWatermillConsumer) syncConfigAndInfo(msg *message.Message) error {
|
|
|
|
var rawPayload watermillmsg.ServerSyncedEventPayload
|
|
|
|
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
payload, err := domain.NewServerSyncedEventPayload(rawPayload.Key, rawPayload.URL, rawPayload.VersionCode)
|
|
|
|
if err != nil {
|
|
|
|
c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.svc.SyncConfigAndInfo(msg.Context(), payload)
|
|
|
|
}
|
2023-12-30 09:10:03 +00:00
|
|
|
|
|
|
|
func (c *ServerWatermillConsumer) updateNumTribes(msg *message.Message) error {
|
|
|
|
var rawPayload watermillmsg.TribesSyncedEventPayload
|
|
|
|
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
payload, err := domain.NewTribesSyncedEventPayload(
|
|
|
|
rawPayload.ServerKey,
|
|
|
|
rawPayload.ServerURL,
|
|
|
|
rawPayload.VersionCode,
|
|
|
|
rawPayload.NumTribes,
|
|
|
|
)
|
|
|
|
if err != nil {
|
2024-01-01 10:53:59 +00:00
|
|
|
c.logger.Error("couldn't construct domain.TribesSyncedEventPayload", err, watermill.LogFields{
|
2023-12-30 09:10:03 +00:00
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.svc.UpdateNumTribes(msg.Context(), payload)
|
|
|
|
}
|
2024-01-01 10:53:59 +00:00
|
|
|
|
|
|
|
func (c *ServerWatermillConsumer) updateNumPlayers(msg *message.Message) error {
|
|
|
|
var rawPayload watermillmsg.PlayersSyncedEventPayload
|
|
|
|
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
payload, err := domain.NewPlayersSyncedEventPayload(
|
|
|
|
rawPayload.ServerKey,
|
|
|
|
rawPayload.ServerURL,
|
|
|
|
rawPayload.VersionCode,
|
|
|
|
rawPayload.NumPlayers,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
c.logger.Error("couldn't construct domain.PlayersSyncedEventPayload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.svc.UpdateNumPlayers(msg.Context(), payload)
|
|
|
|
}
|
2024-01-02 10:47:02 +00:00
|
|
|
|
|
|
|
func (c *ServerWatermillConsumer) updateNumVillages(msg *message.Message) error {
|
|
|
|
var rawPayload watermillmsg.VillagesSyncedEventPayload
|
|
|
|
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
payload, err := domain.NewVillagesSyncedEventPayload(
|
|
|
|
rawPayload.ServerKey,
|
|
|
|
rawPayload.ServerURL,
|
|
|
|
rawPayload.VersionCode,
|
|
|
|
rawPayload.NumVillages,
|
|
|
|
rawPayload.NumPlayerVillages,
|
|
|
|
rawPayload.NumBarbarianVillages,
|
|
|
|
rawPayload.NumBonusVillages,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
c.logger.Error("couldn't construct domain.VillagesSyncedEventPayload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.svc.UpdateNumVillages(msg.Context(), payload)
|
|
|
|
}
|
2024-01-04 10:44:36 +00:00
|
|
|
|
|
|
|
func (c *ServerWatermillConsumer) updateEnnoblementDataSyncedAt(msg *message.Message) error {
|
|
|
|
var rawPayload watermillmsg.EnnoblementsSyncedEventPayload
|
|
|
|
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
payload, err := domain.NewEnnoblementsSyncedEventPayload(
|
|
|
|
rawPayload.ServerKey,
|
|
|
|
rawPayload.ServerURL,
|
|
|
|
rawPayload.VersionCode,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
c.logger.Error("couldn't construct domain.EnnoblementsSyncedEventPayload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.svc.UpdateEnnoblementDataSyncedAt(msg.Context(), payload)
|
|
|
|
}
|
2024-01-15 07:00:08 +00:00
|
|
|
|
|
|
|
func (c *ServerWatermillConsumer) updateTribeSnapshotsCreatedAt(msg *message.Message) error {
|
|
|
|
var rawPayload watermillmsg.SnapshotsCreatedEventPayload
|
|
|
|
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
payload, err := domain.NewSnapshotsCreatedEventPayload(
|
|
|
|
rawPayload.ServerKey,
|
|
|
|
rawPayload.VersionCode,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
c.logger.Error("couldn't construct domain.SnapshotsCreatedEventPayload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.svc.UpdateTribeSnapshotsCreatedAt(msg.Context(), payload)
|
|
|
|
}
|
2024-01-16 06:28:03 +00:00
|
|
|
|
|
|
|
func (c *ServerWatermillConsumer) updatePlayerSnapshotsCreatedAt(msg *message.Message) error {
|
|
|
|
var rawPayload watermillmsg.SnapshotsCreatedEventPayload
|
|
|
|
|
|
|
|
if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil {
|
|
|
|
c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
payload, err := domain.NewSnapshotsCreatedEventPayload(
|
|
|
|
rawPayload.ServerKey,
|
|
|
|
rawPayload.VersionCode,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
c.logger.Error("couldn't construct domain.SnapshotsCreatedEventPayload", err, watermill.LogFields{
|
|
|
|
"handler": message.HandlerNameFromCtx(msg.Context()),
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.svc.UpdatePlayerSnapshotsCreatedAt(msg.Context(), payload)
|
|
|
|
}
|