This repository has been archived on 2024-04-06. You can view files and clone it, but cannot push or open issues or pull requests.
core-old/internal/msg/consumer_server.go
Dawid Wysokiński 0f0e0cd3ee
All checks were successful
continuous-integration/drone/push Build is passing
feat(msg): add logger to consumers (#40)
Reviewed-on: twhelp/core#40
2022-08-21 07:11:28 +00:00

187 lines
5.0 KiB
Go

package msg
import (
"context"
"fmt"
"github.com/ThreeDotsLabs/watermill"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"gitea.dwysokinski.me/twhelp/core/internal/domain"
"github.com/ThreeDotsLabs/watermill/message"
)
// ServerService is the set of server operations that ServerConsumer's
// message handlers delegate to.
//
//counterfeiter:generate -o internal/mock/server_service.gen.go . ServerService
type ServerService interface {
	// Refresh is invoked by the refresh handler with the host and version
	// code carried by the command; the handler publishes one event per
	// returned server.
	Refresh(ctx context.Context, host, versionCode string) ([]domain.Server, error)

	// UpdateInfoAndConfig is invoked with the key and URL of a refreshed
	// server (the handler only calls it for servers reported as open).
	UpdateInfoAndConfig(ctx context.Context, key, url string) error

	// UpdateNumPlayers is invoked with a server key and the player count
	// carried by a players-refreshed event.
	UpdateNumPlayers(ctx context.Context, key string, numPlayers int64) error

	// UpdateNumTribes is invoked with a server key and the tribe count
	// carried by a tribes-refreshed event.
	UpdateNumTribes(ctx context.Context, key string, numTribes int64) error

	// UpdateNumVillages is invoked with a server key and the village count
	// carried by a villages-refreshed event.
	UpdateNumVillages(ctx context.Context, key string, numVillages int64) error
}
// ServerConsumer groups the message handlers that act on servers: it handles
// the refresh command and reacts to the refreshed events of servers, tribes,
// players and villages.
type ServerConsumer struct {
	// marshaler decodes incoming message payloads and encodes outgoing events.
	marshaler CommandEventMarshaler
	// publisher is the publisher handed to the router for events produced by
	// the refresh handler.
	publisher message.Publisher
	// subscriber is the subscriber every handler is registered with.
	subscriber message.Subscriber
	// logger receives the handlers' diagnostics about dropped messages.
	logger watermill.LoggerAdapter
	// svc performs the actual server operations.
	svc ServerService
}
// NewServerConsumer returns a ServerConsumer wired with the given marshaler,
// publisher, subscriber, logger and service. The arguments are stored as-is;
// no validation is performed here.
func NewServerConsumer(
	marshaler CommandEventMarshaler,
	publisher message.Publisher,
	subscriber message.Subscriber,
	logger watermill.LoggerAdapter,
	svc ServerService,
) *ServerConsumer {
	consumer := new(ServerConsumer)
	consumer.marshaler = marshaler
	consumer.publisher = publisher
	consumer.subscriber = subscriber
	consumer.logger = logger
	consumer.svc = svc
	return consumer
}
// Register attaches all ServerConsumer handlers to router.
//
// The refresh handler is the only one registered with a publisher: it
// consumes serversCmdRefresh and publishes to serversEventRefreshed. The
// remaining handlers only consume events.
func (s *ServerConsumer) Register(router *message.Router) {
	router.AddHandler(
		"ServerConsumer.refresh",
		serversCmdRefresh,
		s.subscriber,
		serversEventRefreshed,
		s.publisher,
		s.refresh,
	)

	// Consume-only handlers, listed in registration order.
	listeners := []struct {
		name    string
		topic   string
		handler func(*message.Message) error
	}{
		{"ServerConsumer.updateInfoAndConfig", serversEventRefreshed, s.updateInfoAndConfig},
		{"ServerConsumer.updateNumTribes", tribesEventRefreshed, s.updateNumTribes},
		{"ServerConsumer.updateNumPlayers", playersEventRefreshed, s.updateNumPlayers},
		{"ServerConsumer.updateNumVillages", villagesEventRefreshed, s.updateNumVillages},
	}
	for _, l := range listeners {
		router.AddNoPublisherHandler(l.name, l.topic, s.subscriber, l.handler)
	}
}
// refresh handles a refresh-servers command: it asks the service to refresh
// the servers for the host and version code in the payload and returns one
// server-refreshed event per server the service reports.
//
// A message whose payload cannot be unmarshaled is logged and dropped (nil,
// nil is returned). Service and marshaling failures are returned as wrapped
// errors, in which case no events are returned.
func (s *ServerConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
	var cmd model.RefreshServersCmdPayload
	if err := s.marshaler.Unmarshal(msg, &cmd); err != nil {
		fields := watermill.LogFields{
			"handler_name": message.HandlerNameFromCtx(msg.Context()),
		}
		s.logger.Error("invalid payload, dropping message", err, fields)
		return nil, nil
	}

	refreshed, err := s.svc.Refresh(msg.Context(), cmd.Host, cmd.VersionCode)
	if err != nil {
		return nil, fmt.Errorf("ServerService.Refresh: %w", err)
	}

	out := make([]*message.Message, len(refreshed))
	for i := range refreshed {
		evPayload := model.ServerRefreshedEvPayload{
			Key:         refreshed[i].Key,
			URL:         refreshed[i].URL,
			Open:        refreshed[i].Open,
			VersionCode: refreshed[i].VersionCode,
		}
		ev, err := s.marshaler.Marshal(evPayload)
		if err != nil {
			return nil, fmt.Errorf("marshaler.Marshal: %w", err)
		}
		out[i] = ev
	}
	return out, nil
}
// updateInfoAndConfig handles a server-refreshed event by asking the service
// to update the info and config of the server identified by the payload's key
// and URL.
//
// Messages are dropped (nil is returned) in two cases: the payload cannot be
// unmarshaled, which is logged as an error like in the other handlers, or the
// server is not open, which is logged at debug level. A service failure is
// returned as a wrapped error.
func (s *ServerConsumer) updateInfoAndConfig(msg *message.Message) error {
	var payload model.ServerRefreshedEvPayload
	if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
		// Log before dropping so malformed events do not disappear silently.
		s.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
			"handler_name": message.HandlerNameFromCtx(msg.Context()),
		})
		return nil
	}

	if !payload.Open {
		s.logger.Debug("server is closed, dropping message", watermill.LogFields{
			"handler_name": message.HandlerNameFromCtx(msg.Context()),
			"server_key":   payload.Key,
			"version_code": payload.VersionCode,
		})
		return nil
	}

	if err := s.svc.UpdateInfoAndConfig(msg.Context(), payload.Key, payload.URL); err != nil {
		return fmt.Errorf("ServerService.UpdateInfoAndConfig: %w", err)
	}
	return nil
}
// updateNumPlayers handles a players-refreshed event by passing the server
// key and player count from the payload to the service.
//
// A message whose payload cannot be unmarshaled is logged and dropped (nil is
// returned). A service failure is returned as a wrapped error.
func (s *ServerConsumer) updateNumPlayers(msg *message.Message) error {
	var payload model.PlayersRefreshedEvPayload
	if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
		s.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
			"handler_name": message.HandlerNameFromCtx(msg.Context()),
		})
		return nil
	}

	if err := s.svc.UpdateNumPlayers(msg.Context(), payload.Key, payload.NumPlayers); err != nil {
		// The failing call is on ServerService, so label the error with it,
		// matching the prefix style used by refresh and updateInfoAndConfig.
		return fmt.Errorf("ServerService.UpdateNumPlayers: %w", err)
	}
	return nil
}
// updateNumTribes handles a tribes-refreshed event by passing the server key
// and tribe count from the payload to the service.
//
// A message whose payload cannot be unmarshaled is logged and dropped (nil is
// returned). A service failure is returned as a wrapped error.
func (s *ServerConsumer) updateNumTribes(msg *message.Message) error {
	var payload model.TribesRefreshedEvPayload
	if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
		s.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
			"handler_name": message.HandlerNameFromCtx(msg.Context()),
		})
		return nil
	}

	if err := s.svc.UpdateNumTribes(msg.Context(), payload.Key, payload.NumTribes); err != nil {
		// The failing call is on ServerService, so label the error with it,
		// matching the prefix style used by refresh and updateInfoAndConfig.
		return fmt.Errorf("ServerService.UpdateNumTribes: %w", err)
	}
	return nil
}
// updateNumVillages handles a villages-refreshed event by passing the server
// key and village count from the payload to the service.
//
// A message whose payload cannot be unmarshaled is logged and dropped (nil is
// returned). A service failure is returned as a wrapped error.
func (s *ServerConsumer) updateNumVillages(msg *message.Message) error {
	var payload model.VillagesRefreshedEvPayload
	if err := s.marshaler.Unmarshal(msg, &payload); err != nil {
		s.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
			"handler_name": message.HandlerNameFromCtx(msg.Context()),
		})
		return nil
	}

	if err := s.svc.UpdateNumVillages(msg.Context(), payload.Key, payload.NumVillages); err != nil {
		// The failing call is on ServerService, so label the error with it,
		// matching the prefix style used by refresh and updateInfoAndConfig.
		return fmt.Errorf("ServerService.UpdateNumVillages: %w", err)
	}
	return nil
}