package port import ( "gitea.dwysokinski.me/twhelp/corev3/internal/app" "gitea.dwysokinski.me/twhelp/corev3/internal/domain" "gitea.dwysokinski.me/twhelp/corev3/internal/watermillmsg" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" ) type ServerWatermillConsumer struct { svc *app.ServerService subscriber message.Subscriber logger watermill.LoggerAdapter marshaler watermillmsg.Marshaler cmdSyncTopic string eventServerSyncedTopic string eventTribesSyncedTopic string } func NewServerWatermillConsumer( svc *app.ServerService, subscriber message.Subscriber, logger watermill.LoggerAdapter, marshaler watermillmsg.Marshaler, cmdSyncTopic string, eventServerSyncedTopic string, eventTribesSyncedTopic string, ) *ServerWatermillConsumer { return &ServerWatermillConsumer{ svc: svc, subscriber: subscriber, logger: logger, marshaler: marshaler, cmdSyncTopic: cmdSyncTopic, eventServerSyncedTopic: eventServerSyncedTopic, eventTribesSyncedTopic: eventTribesSyncedTopic, } } func (c *ServerWatermillConsumer) Register(router *message.Router) { router.AddNoPublisherHandler("ServerConsumer.sync", c.cmdSyncTopic, c.subscriber, c.sync) router.AddNoPublisherHandler( "ServerConsumer.syncConfigAndInfo", c.eventServerSyncedTopic, c.subscriber, c.syncConfigAndInfo, ) router.AddNoPublisherHandler( "ServerConsumer.updateNumTribes", c.eventTribesSyncedTopic, c.subscriber, c.updateNumTribes, ) } func (c *ServerWatermillConsumer) sync(msg *message.Message) error { var rawPayload watermillmsg.SyncServersCmdPayload if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } payload, err := domain.NewSyncServersCmdPayload(rawPayload.VersionCode, rawPayload.URL) if err != nil { c.logger.Error("couldn't construct domain.SyncServersCmdPayload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } return c.svc.Sync(msg.Context(), payload) } func (c *ServerWatermillConsumer) syncConfigAndInfo(msg *message.Message) error { var rawPayload watermillmsg.ServerSyncedEventPayload if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } payload, err := domain.NewServerSyncedEventPayload(rawPayload.Key, rawPayload.URL, rawPayload.VersionCode) if err != nil { c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } return c.svc.SyncConfigAndInfo(msg.Context(), payload) } func (c *ServerWatermillConsumer) updateNumTribes(msg *message.Message) error { var rawPayload watermillmsg.TribesSyncedEventPayload if err := c.marshaler.Unmarshal(msg, &rawPayload); err != nil { c.logger.Error("couldn't unmarshal payload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } payload, err := domain.NewTribesSyncedEventPayload( rawPayload.ServerKey, rawPayload.ServerURL, rawPayload.VersionCode, rawPayload.NumTribes, ) if err != nil { c.logger.Error("couldn't construct domain.ServerSyncedEventPayload", err, watermill.LogFields{ "handler": message.HandlerNameFromCtx(msg.Context()), }) return nil } return c.svc.UpdateNumTribes(msg.Context(), payload) }