dcbot/internal/service/monitor.go
Dawid Wysokiński c487800200
All checks were successful
continuous-integration/drone/push Build is passing
feat: add a new command - monitor list (#28)
Reviewed-on: #28
2022-10-27 13:18:58 +00:00

425 lines
10 KiB
Go

package service
import (
"context"
"errors"
"fmt"
"sync"
"time"
"gitea.dwysokinski.me/twhelp/dcbot/internal/domain"
"gitea.dwysokinski.me/twhelp/dcbot/internal/twhelp"
)
//counterfeiter:generate -o internal/mock/monitor_repository.gen.go . MonitorRepository
type MonitorRepository interface {
Create(ctx context.Context, params domain.CreateMonitorParams) (domain.Monitor, error)
List(ctx context.Context, groupID string) ([]domain.Monitor, error)
Get(ctx context.Context, id string) (domain.Monitor, error)
Delete(ctx context.Context, id string) error
}
//counterfeiter:generate -o internal/mock/group_reader.gen.go . GroupReader
type GroupReader interface {
Get(ctx context.Context, id, serverID string) (domain.Group, error)
List(ctx context.Context, params domain.ListGroupsParams) ([]domain.Group, error)
}
type Monitor struct {
repo MonitorRepository
client TWHelpClient
groupSvc GroupReader
maxMonitorsPerGroup int
ennoblementsMu sync.Mutex // ennoblementsMu is used by Monitor.fetchEnnoblements
ennoblementsSince map[string]time.Time // ennoblementsSince is used by Monitor.fetchEnnoblements
}
func NewMonitor(
repo MonitorRepository,
groupSvc GroupReader,
client TWHelpClient,
maxMonitorsPerGroup int,
) *Monitor {
return &Monitor{
repo: repo,
client: client,
groupSvc: groupSvc,
maxMonitorsPerGroup: maxMonitorsPerGroup,
ennoblementsSince: make(map[string]time.Time),
}
}
func (m *Monitor) Create(ctx context.Context, groupID, serverID, tribeTag string) (domain.Monitor, error) {
// check if group exists
group, err := m.groupSvc.Get(ctx, groupID, serverID)
if err != nil {
if errors.Is(err, domain.GroupNotFoundError{ID: groupID}) {
return domain.Monitor{}, domain.GroupDoesNotExistError{ID: groupID}
}
return domain.Monitor{}, fmt.Errorf("GroupService.Get: %w", err)
}
// check limit
monitors, err := m.repo.List(ctx, group.ID)
if err != nil {
return domain.Monitor{}, fmt.Errorf("MonitorRepository.List: %w", err)
}
if len(monitors) >= m.maxMonitorsPerGroup {
return domain.Monitor{}, domain.MonitorLimitReachedError{
Current: len(monitors),
Limit: m.maxMonitorsPerGroup,
}
}
// check if tribe exists
tribe, err := m.client.GetTribeByTag(ctx, group.VersionCode, group.ServerKey, tribeTag)
if err != nil {
var apiErr twhelp.APIError
if !errors.As(err, &apiErr) {
return domain.Monitor{}, fmt.Errorf("TWHelpClient.GetServer: %w", err)
}
return domain.Monitor{}, domain.TribeDoesNotExistError{
Tag: tribeTag,
}
}
if !tribe.DeletedAt.IsZero() {
return domain.Monitor{}, domain.TribeDoesNotExistError{
Tag: tribeTag,
}
}
params, err := domain.NewCreateMonitorParams(group.ID, tribe.ID)
if err != nil {
return domain.Monitor{}, fmt.Errorf("domain.NewCreateMonitorParams: %w", err)
}
monitor, err := m.repo.Create(ctx, params)
if err != nil {
return domain.Monitor{}, fmt.Errorf("MonitorRepository.Create: %w", err)
}
return monitor, nil
}
type getTribeResult struct {
monitor domain.Monitor
tribe twhelp.Tribe
err error
}
func (m *Monitor) List(ctx context.Context, groupID, serverID string) ([]domain.MonitorWithTribe, error) {
group, err := m.groupSvc.Get(ctx, groupID, serverID)
if err != nil {
return nil, fmt.Errorf("GroupService.Get: %w", err)
}
monitors, err := m.repo.List(ctx, group.ID)
if err != nil {
return nil, fmt.Errorf("MonitorRepository.Delete: %w", err)
}
var wg sync.WaitGroup
ch := make(chan getTribeResult)
for _, monitor := range monitors {
wg.Add(1)
go func(monitor domain.Monitor) {
res := getTribeResult{
monitor: monitor,
}
res.tribe, res.err = m.client.GetTribeByID(
ctx,
group.VersionCode,
group.ServerKey,
monitor.TribeID,
)
ch <- res
}(monitor)
}
go func() {
wg.Wait()
close(ch)
}()
var firstErr error
results := make([]domain.MonitorWithTribe, 0, len(monitors))
for res := range ch {
wg.Done()
if res.err != nil && firstErr == nil {
firstErr = fmt.Errorf("couldn't load tribe (monitorID=%s): %w", res.monitor.ID, res.err)
continue
}
results = append(results, domain.MonitorWithTribe{
Monitor: res.monitor,
Tribe: domain.TribeMeta{
ID: res.tribe.ID,
Name: res.tribe.Name,
Tag: res.tribe.Tag,
ProfileURL: res.tribe.ProfileURL,
},
})
}
if firstErr != nil {
return nil, firstErr
}
return results, nil
}
func (m *Monitor) Delete(ctx context.Context, id, serverID string) error {
monitor, err := m.repo.Get(ctx, id)
if err != nil {
return fmt.Errorf("MonitorRepository.Get: %w", err)
}
_, err = m.groupSvc.Get(ctx, monitor.GroupID, serverID)
if err != nil {
return fmt.Errorf("GroupService.Get: %w", err)
}
if err = m.repo.Delete(ctx, id); err != nil {
return fmt.Errorf("MonitorRepository.Delete: %w", err)
}
return nil
}
func (m *Monitor) Execute(ctx context.Context) ([]domain.EnnoblementNotification, error) {
groups, err := m.groupSvc.List(ctx, domain.ListGroupsParams{})
if err != nil {
return nil, fmt.Errorf("GroupService.List: %w", err)
}
res, err := m.fetchEnnoblements(ctx, groups)
if err != nil {
return nil, err
}
//nolint:prealloc
var notifications []domain.EnnoblementNotification
for _, r := range res {
for _, g := range groups {
if g.ServerKey != r.serverKey || g.VersionCode != r.versionCode {
continue
}
if g.ChannelGains == "" && g.ChannelLosses == "" {
continue
}
ns, err := m.executeGroup(ctx, g, r.ennoblements)
if err != nil {
// TODO: log this error
continue
}
notifications = append(notifications, ns...)
}
}
return notifications, nil
}
func (m *Monitor) executeGroup(
ctx context.Context,
g domain.Group,
ennoblements []twhelp.Ennoblement,
) ([]domain.EnnoblementNotification, error) {
monitors, err := m.repo.List(ctx, g.ID)
if err != nil {
return nil, fmt.Errorf("MonitorRepository.List: %w", err)
}
//nolint:prealloc
var notifications []domain.EnnoblementNotification
for _, e := range ennoblements {
if isInternal(e, monitors) {
continue
}
if g.ChannelGains != "" && isGain(e, monitors) {
notifications = append(notifications, domain.EnnoblementNotification{
Type: domain.EnnoblementNotificationTypeGain,
ServerID: g.ServerID,
ChannelID: g.ChannelGains,
Ennoblement: ennoblementToDomainModel(e),
})
}
if g.ChannelLosses != "" && isLoss(e, monitors) {
notifications = append(notifications, domain.EnnoblementNotification{
Type: domain.EnnoblementNotificationTypeLoss,
ServerID: g.ServerID,
ChannelID: g.ChannelLosses,
Ennoblement: ennoblementToDomainModel(e),
})
}
}
return notifications, nil
}
type listEnnoblementsResult struct {
versionCode string
serverKey string
ennoblements []twhelp.Ennoblement
err error
}
type fetchEnnoblementsResult struct {
versionCode string
serverKey string
ennoblements []twhelp.Ennoblement
}
func (m *Monitor) fetchEnnoblements(ctx context.Context, groups []domain.Group) ([]fetchEnnoblementsResult, error) {
m.ennoblementsMu.Lock()
defer m.ennoblementsMu.Unlock()
var wg sync.WaitGroup
ch := make(chan listEnnoblementsResult)
ennoblementsSince := m.ennoblementsSince
skip := make(map[string]struct{}, len(ennoblementsSince))
for _, g := range groups {
if g.ChannelGains == "" && g.ChannelLosses == "" {
continue
}
key := g.VersionCode + ":" + g.ServerKey
if _, ok := skip[key]; ok {
continue
}
skip[key] = struct{}{}
since := ennoblementsSince[key]
if since.IsZero() {
since = time.Now().Add(-1 * time.Minute)
}
wg.Add(1)
go func(g domain.Group, since time.Time) {
res := listEnnoblementsResult{
versionCode: g.VersionCode,
serverKey: g.ServerKey,
}
res.ennoblements, res.err = m.client.ListEnnoblements(
ctx,
g.VersionCode,
g.ServerKey,
twhelp.ListEnnoblementsQueryParams{
Since: since,
Sort: []twhelp.ListEnnoblementsSort{twhelp.ListEnnoblementsSortCreatedAtASC},
},
)
ch <- res
}(g, since)
}
go func() {
wg.Wait()
close(ch)
}()
// reinitialize ennoblementsSince
m.ennoblementsSince = make(map[string]time.Time)
//nolint:prealloc
var results []fetchEnnoblementsResult
for res := range ch {
wg.Done()
key := res.versionCode + ":" + res.serverKey
if l := len(res.ennoblements); l > 0 {
m.ennoblementsSince[key] = res.ennoblements[l-1].CreatedAt.Add(time.Second)
} else {
m.ennoblementsSince[key] = ennoblementsSince[key]
}
if res.err != nil {
// TODO: log this error
continue
}
results = append(results, fetchEnnoblementsResult{
versionCode: res.versionCode,
serverKey: res.serverKey,
ennoblements: res.ennoblements,
})
}
return results, nil
}
func isInternal(e twhelp.Ennoblement, monitors []domain.Monitor) bool {
var n, o bool
for _, m := range monitors {
if m.TribeID == e.NewOwner.Player.Tribe.Tribe.ID {
n = true
}
if m.TribeID == e.OldOwner.Player.Tribe.Tribe.ID {
o = true
}
}
return n && o
}
func isGain(e twhelp.Ennoblement, monitors []domain.Monitor) bool {
var n bool
for _, m := range monitors {
if m.TribeID == e.NewOwner.Player.Tribe.Tribe.ID {
n = true
break
}
}
return n && e.NewOwner.Player.Tribe.Tribe.ID != e.OldOwner.Player.Tribe.Tribe.ID
}
func isLoss(e twhelp.Ennoblement, monitors []domain.Monitor) bool {
var o bool
for _, m := range monitors {
if m.TribeID == e.OldOwner.Player.Tribe.Tribe.ID {
o = true
break
}
}
return o && e.NewOwner.Player.Tribe.Tribe.ID != e.OldOwner.Player.Tribe.Tribe.ID
}
func ennoblementToDomainModel(e twhelp.Ennoblement) domain.Ennoblement {
return domain.Ennoblement{
ID: e.ID,
Village: domain.VillageMeta(e.Village),
NewOwner: domain.NullPlayerMeta{
Player: domain.PlayerMeta{
ID: e.NewOwner.Player.ID,
Name: e.NewOwner.Player.Name,
ProfileURL: e.NewOwner.Player.ProfileURL,
Tribe: domain.NullTribeMeta{
Tribe: domain.TribeMeta(e.NewOwner.Player.Tribe.Tribe),
Valid: e.NewOwner.Player.Tribe.Valid,
},
},
Valid: e.NewOwner.Valid,
},
OldOwner: domain.NullPlayerMeta{
Player: domain.PlayerMeta{
ID: e.OldOwner.Player.ID,
Name: e.OldOwner.Player.Name,
ProfileURL: e.OldOwner.Player.ProfileURL,
Tribe: domain.NullTribeMeta{
Tribe: domain.TribeMeta(e.OldOwner.Player.Tribe.Tribe),
Valid: e.OldOwner.Player.Tribe.Valid,
},
},
Valid: e.OldOwner.Valid,
},
CreatedAt: e.CreatedAt,
}
}