This repository has been archived on 2024-04-06. You can view files and clone it, but cannot push or open issues or pull requests.
core-old/internal/msg/consumer_player.go
Dawid Wysokiński c86ce6ad8b
All checks were successful
continuous-integration/drone/push Build is passing
feat(msg): new integration test - CreateSnapshots (#194)
Reviewed-on: twhelp/core#194
2023-02-24 06:16:47 +00:00

130 lines
3.4 KiB
Go

package msg
import (
"context"
"fmt"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/msg/internal/model"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)
//counterfeiter:generate -o internal/mock/player_service.gen.go . PlayerService
type PlayerService interface {
Refresh(ctx context.Context, key, url string) (int64, error)
}
//counterfeiter:generate -o internal/mock/player_snapshot_service.gen.go . PlayerSnapshotService
type PlayerSnapshotService interface {
Create(ctx context.Context, key string, date time.Time) error
}
type PlayerConsumer struct {
marshaler CommandEventMarshaler
publisher message.Publisher
subscriber message.Subscriber
logger watermill.LoggerAdapter
svc PlayerService
snapshotSvc PlayerSnapshotService
}
func NewPlayerConsumer(
marshaler CommandEventMarshaler,
publisher message.Publisher,
subscriber message.Subscriber,
logger watermill.LoggerAdapter,
svc PlayerService,
snapshotSvc PlayerSnapshotService,
) *PlayerConsumer {
return &PlayerConsumer{
marshaler: marshaler,
publisher: publisher,
subscriber: subscriber,
logger: logger,
svc: svc,
snapshotSvc: snapshotSvc,
}
}
func (p *PlayerConsumer) Register(router *message.Router) {
router.AddHandler(
"PlayerConsumer.refresh",
serversEventRefreshed,
p.subscriber,
playersEventRefreshed,
p.publisher,
p.refresh,
)
router.AddHandler(
"PlayerConsumer.createSnapshots",
playersCmdCreateSnapshots,
p.subscriber,
playersEventSnapshotsCreated,
p.publisher,
p.createSnapshots,
)
}
func (p *PlayerConsumer) refresh(msg *message.Message) ([]*message.Message, error) {
var payload model.ServerRefreshedEvPayload
if err := p.marshaler.Unmarshal(msg, &payload); err != nil {
p.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
})
return nil, nil
}
if !payload.Open {
p.logger.Debug("server is closed, dropping message", watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
"server_key": payload.Key,
"version_code": payload.VersionCode,
})
return nil, nil
}
numPlayers, err := p.svc.Refresh(msg.Context(), payload.Key, payload.URL)
if err != nil {
return nil, fmt.Errorf("PlayerService.Refresh: %w", err)
}
ev, err := p.marshaler.Marshal(model.PlayersRefreshedEvPayload{
Key: payload.Key,
URL: payload.URL,
VersionCode: payload.VersionCode,
NumPlayers: numPlayers,
})
if err != nil {
return nil, fmt.Errorf("marshaler.Marshal: %w", err)
}
return []*message.Message{ev}, nil
}
func (p *PlayerConsumer) createSnapshots(msg *message.Message) ([]*message.Message, error) {
var payload model.CreateSnapshotsCmdPayload
if err := p.marshaler.Unmarshal(msg, &payload); err != nil {
p.logger.Error("invalid payload, dropping message", err, watermill.LogFields{
"handler_name": message.HandlerNameFromCtx(msg.Context()),
})
return nil, nil
}
if err := p.snapshotSvc.Create(msg.Context(), payload.Key, payload.Date); err != nil {
return nil, fmt.Errorf("PlayerSnapshotService.Create: %w", err)
}
ev, err := p.marshaler.Marshal(model.SnapshotsCreatedEvPayload{
Key: payload.Key,
VersionCode: payload.VersionCode,
})
if err != nil {
return nil, fmt.Errorf("marshaler.Marshal: %w", err)
}
return []*message.Message{ev}, nil
}