package port import ( "gitea.dwysokinski.me/twhelp/corev3/internal/app" "gitea.dwysokinski.me/twhelp/corev3/internal/domain" "gitea.dwysokinski.me/twhelp/corev3/internal/watermill/watermillmsg" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" ) type TribeWatermillConsumer struct { svc *app.TribeService snapshotSvc *app.TribeSnapshotService subscriber message.Subscriber logger watermill.LoggerAdapter marshaler watermillmsg.Marshaler eventServerSyncedTopic string eventVillagesSyncedTopic string cmdCreateSnapshotsTopic string cmdCleanUpTopic string } func NewTribeWatermillConsumer( svc *app.TribeService, snapshotSvc *app.TribeSnapshotService, subscriber message.Subscriber, logger watermill.LoggerAdapter, marshaler watermillmsg.Marshaler, eventServerSyncedTopic string, eventVillagesSyncedTopic string, cmdCreateSnapshotsTopic string, cmdCleanUpTopic string, ) *TribeWatermillConsumer { return &TribeWatermillConsumer{ svc: svc, snapshotSvc: snapshotSvc, subscriber: subscriber, logger: logger, marshaler: marshaler, eventServerSyncedTopic: eventServerSyncedTopic, eventVillagesSyncedTopic: eventVillagesSyncedTopic, cmdCreateSnapshotsTopic: cmdCreateSnapshotsTopic, cmdCleanUpTopic: cmdCleanUpTopic, } } func (c *TribeWatermillConsumer) Register(router *message.Router) { router.AddNoPublisherHandler( "TribeConsumer.sync", c.eventServerSyncedTopic, c.subscriber, c.sync, ) router.AddNoPublisherHandler( "TribeConsumer.updateDominance", c.eventVillagesSyncedTopic, c.subscriber, c.updateDominance, ) router.AddNoPublisherHandler( "TribeConsumer.createSnapshots", c.cmdCreateSnapshotsTopic, c.subscriber, c.createSnapshots, ) router.AddNoPublisherHandler( "TribeConsumer.cleanUp", c.cmdCleanUpTopic, c.subscriber, c.cleanUp, ) } func (c *TribeWatermillConsumer) sync(msg *message.Message) error { var rawPayload watermillmsg.ServerSyncedEventPayload if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } payload, err := domain.NewServerSyncedEventPayload(rawPayload.Key, rawPayload.URL, rawPayload.VersionCode) if err != nil { c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } return c.svc.Sync(msg.Context(), payload) } func (c *TribeWatermillConsumer) updateDominance(msg *message.Message) error { var rawPayload watermillmsg.VillagesSyncedEventPayload if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } payload, err := domain.NewVillagesSyncedEventPayload( rawPayload.ServerKey, rawPayload.ServerURL, rawPayload.VersionCode, rawPayload.NumVillages, rawPayload.NumPlayerVillages, rawPayload.NumBarbarianVillages, rawPayload.NumBonusVillages, ) if err != nil { c.logger.Error("couldn't construct domain.VillagesSyncedEventPayload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } return c.svc.UpdateDominance(msg.Context(), payload) } func (c *TribeWatermillConsumer) createSnapshots(msg *message.Message) error { var rawPayload watermillmsg.CreateSnapshotsCmdPayload if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } payload, err := domain.NewCreateSnapshotsCmdPayload( rawPayload.ServerKey, rawPayload.VersionCode, rawPayload.VersionTimezone, rawPayload.Date, ) if err != nil { c.logger.Error("couldn't construct domain.CreateSnapshotsCmdPayload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } return c.snapshotSvc.Create(msg.Context(), payload) } func (c *TribeWatermillConsumer) cleanUp(msg *message.Message) error { var rawPayload watermillmsg.CleanUpDataCmdPayload if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } payloadServer, err := domain.NewCleanUpDataCmdPayloadServer( rawPayload.Server.Key, rawPayload.Server.VersionCode, rawPayload.Server.Open, rawPayload.Server.Special, rawPayload.Server.PlayerDataSyncedAt, rawPayload.Server.PlayerSnapshotsCreatedAt, rawPayload.Server.TribeDataSyncedAt, rawPayload.Server.TribeSnapshotsCreatedAt, rawPayload.Server.VillageDataSyncedAt, rawPayload.Server.EnnoblementDataSyncedAt, ) if err != nil { c.logger.Error("couldn't construct domain.CleanUpDataCmdPayloadServer", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } payload, err := domain.NewCleanUpDataCmdPayload(payloadServer) if err != nil { c.logger.Error("couldn't construct domain.CleanUpDataCmdPayload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } return c.snapshotSvc.CleanUp(msg.Context(), payload) }