dcbot/internal/service/group.go

508 lines
13 KiB
Go

package service
import (
"context"
"errors"
"fmt"
"sync"
"time"
"gitea.dwysokinski.me/twhelp/dcbot/internal/domain"
"go.uber.org/zap"
)
//counterfeiter:generate -o internal/mock/group_repository.gen.go . GroupRepository
type GroupRepository interface {
Create(ctx context.Context, params domain.CreateGroupParams) (domain.GroupWithMonitors, error)
Update(ctx context.Context, id string, params domain.UpdateGroupParams) (domain.GroupWithMonitors, error)
AddMonitor(ctx context.Context, id string, tribeID int64) (domain.GroupWithMonitors, error)
DeleteMonitors(ctx context.Context, id string, monitorID ...string) (domain.GroupWithMonitors, error)
List(ctx context.Context, params domain.ListGroupsParams) ([]domain.GroupWithMonitors, error)
Get(ctx context.Context, id string) (domain.GroupWithMonitors, error)
Delete(ctx context.Context, id string) error
DeleteMany(ctx context.Context, id ...string) error
}
type Group struct {
repo GroupRepository
twhelpSvc TWHelpService
logger *zap.Logger
maxGroupsPerServer int
maxMonitorsPerGroup int
ennoblementsMu sync.Mutex // ennoblementsMu is used by listEnnoblements
ennoblementsSince map[string]time.Time // ennoblementsSince is used by listEnnoblements
}
func NewGroup(repo GroupRepository, twhelpSvc TWHelpService, logger *zap.Logger, maxGroupsPerServer, maxMonitorsPerGroup int) *Group {
return &Group{
repo: repo,
twhelpSvc: twhelpSvc,
logger: logger,
maxGroupsPerServer: maxGroupsPerServer,
maxMonitorsPerGroup: maxMonitorsPerGroup,
}
}
func (g *Group) Create(ctx context.Context, params domain.CreateGroupParams) (domain.GroupWithMonitors, error) {
groups, err := g.repo.List(ctx, domain.ListGroupsParams{
ServerIDs: []string{params.ServerID()},
})
if err != nil {
return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.List: %w", err)
}
if len(groups) >= g.maxGroupsPerServer {
return domain.GroupWithMonitors{}, domain.GroupLimitReachedError{
Current: len(groups),
Limit: g.maxGroupsPerServer,
}
}
if _, err = g.twhelpSvc.GetOpenServer(ctx, params.VersionCode(), params.ServerKey()); err != nil {
return domain.GroupWithMonitors{}, err
}
group, err := g.repo.Create(ctx, params)
if err != nil {
return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Create: %w", err)
}
return group, nil
}
func (g *Group) AddTribe(ctx context.Context, id, serverID, tribeTag string) (domain.GroupWithMonitors, error) {
// check if group exists
groupBeforeUpdate, err := g.Get(ctx, id, serverID)
if err != nil {
if errors.Is(err, domain.GroupNotFoundError{ID: id}) {
return domain.GroupWithMonitors{}, domain.GroupDoesNotExistError{ID: id}
}
return domain.GroupWithMonitors{}, err
}
// check limit
if len(groupBeforeUpdate.Monitors) >= g.maxMonitorsPerGroup {
return domain.GroupWithMonitors{}, domain.MonitorLimitReachedError{
Current: len(groupBeforeUpdate.Monitors),
Limit: g.maxMonitorsPerGroup,
}
}
tribe, err := g.twhelpSvc.GetExistingTribeByTag(ctx, groupBeforeUpdate.VersionCode, groupBeforeUpdate.ServerKey, tribeTag)
if err != nil {
return domain.GroupWithMonitors{}, err
}
group, err := g.repo.AddMonitor(ctx, groupBeforeUpdate.ID, tribe.ID)
if err != nil {
return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.AddMonitor: %w", err)
}
return group, nil
}
const listTribesLimit = 200
func (g *Group) RemoveTribe(ctx context.Context, id, serverID, tribeTag string) (domain.GroupWithMonitors, error) {
// check if group exists
groupBeforeUpdate, err := g.Get(ctx, id, serverID)
if err != nil {
return domain.GroupWithMonitors{}, err
}
tribes, err := g.twhelpSvc.ListTribesByTag(ctx, groupBeforeUpdate.VersionCode, groupBeforeUpdate.ServerKey, []string{tribeTag}, 0, listTribesLimit)
if err != nil {
return domain.GroupWithMonitors{}, fmt.Errorf("TWHelpClient.ListTribes: %w", err)
}
if len(tribes) == 0 {
return domain.GroupWithMonitors{}, domain.TribeTagNotFoundError{
Tag: tribeTag,
}
}
var ids []string
for _, t := range tribes {
for _, m := range groupBeforeUpdate.Monitors {
if m.TribeID == t.ID {
ids = append(ids, m.ID)
}
}
}
group, err := g.repo.DeleteMonitors(ctx, groupBeforeUpdate.ID, ids...)
if err != nil {
return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.DeleteMonitors: %w", err)
}
return group, nil
}
func (g *Group) SetChannelGains(ctx context.Context, id, serverID, channel string) (domain.GroupWithMonitors, error) {
return g.update(ctx, id, serverID, domain.UpdateGroupParams{
ChannelGains: domain.NullString{
String: channel,
Valid: true,
},
})
}
func (g *Group) SetChannelLosses(ctx context.Context, id, serverID, channel string) (domain.GroupWithMonitors, error) {
return g.update(ctx, id, serverID, domain.UpdateGroupParams{
ChannelLosses: domain.NullString{
String: channel,
Valid: true,
},
})
}
func (g *Group) SetInternals(ctx context.Context, id, serverID string, internals bool) (domain.GroupWithMonitors, error) {
return g.update(ctx, id, serverID, domain.UpdateGroupParams{
Internals: domain.NullBool{
Bool: internals,
Valid: true,
},
})
}
func (g *Group) SetBarbarians(ctx context.Context, id, serverID string, barbarians bool) (domain.GroupWithMonitors, error) {
return g.update(ctx, id, serverID, domain.UpdateGroupParams{
Barbarians: domain.NullBool{
Bool: barbarians,
Valid: true,
},
})
}
func (g *Group) SetLanguageTag(ctx context.Context, id, serverID, languageTag string) (domain.GroupWithMonitors, error) {
return g.update(ctx, id, serverID, domain.UpdateGroupParams{
LanguageTag: domain.NullString{
String: languageTag,
Valid: true,
},
})
}
func (g *Group) update(ctx context.Context, id, serverID string, params domain.UpdateGroupParams) (domain.GroupWithMonitors, error) {
// check if group exists
if _, err := g.Get(ctx, id, serverID); err != nil {
return domain.GroupWithMonitors{}, err
}
group, err := g.repo.Update(ctx, id, params)
if err != nil {
return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Update: %w", err)
}
return group, nil
}
func (g *Group) ListServer(ctx context.Context, serverID string) ([]domain.GroupWithMonitors, error) {
groups, err := g.repo.List(ctx, domain.ListGroupsParams{
ServerIDs: []string{serverID},
})
if err != nil {
return nil, fmt.Errorf("GroupRepository.List: %w", err)
}
return groups, nil
}
func (g *Group) Get(ctx context.Context, id, serverID string) (domain.GroupWithMonitors, error) {
group, err := g.repo.Get(ctx, id)
if err != nil {
return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Get: %w", err)
}
if group.ServerID != serverID {
return domain.GroupWithMonitors{}, domain.GroupNotFoundError{
ID: id,
}
}
return group, nil
}
type getTribeResult struct {
index int
monitor domain.Monitor
tribe domain.Tribe
err error
}
func (g *Group) GetWithTribes(ctx context.Context, id, serverID string) (domain.GroupWithMonitorsAndTribes, error) {
group, err := g.Get(ctx, id, serverID)
if err != nil {
return domain.GroupWithMonitorsAndTribes{}, err
}
var wg sync.WaitGroup
ch := make(chan getTribeResult)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for i, monitor := range group.Monitors {
wg.Add(1)
go func(i int, monitor domain.Monitor) {
defer wg.Done()
res := getTribeResult{
index: i,
monitor: monitor,
}
res.tribe, res.err = g.twhelpSvc.GetTribeByID(
ctx,
group.VersionCode,
group.ServerKey,
monitor.TribeID,
)
ch <- res
}(i, monitor)
}
go func() {
wg.Wait()
close(ch)
}()
var firstErr error
monitors := make([]domain.MonitorWithTribe, len(group.Monitors))
for res := range ch {
if res.err != nil && firstErr == nil {
firstErr = fmt.Errorf("couldn't load tribe (monitorID=%s): %w", res.monitor.ID, res.err)
cancel()
continue
}
if res.err != nil {
continue
}
monitors[res.index] = domain.MonitorWithTribe{
Monitor: res.monitor,
Tribe: res.tribe,
}
}
if firstErr != nil {
return domain.GroupWithMonitorsAndTribes{}, firstErr
}
return domain.GroupWithMonitorsAndTribes{
Group: group.Group,
Monitors: monitors,
}, nil
}
func (g *Group) Delete(ctx context.Context, id, serverID string) error {
// check if group exists
if _, err := g.Get(ctx, id, serverID); err != nil {
return err
}
if err := g.repo.Delete(ctx, id); err != nil {
return fmt.Errorf("GroupRepository.Delete: %w", err)
}
return nil
}
func (g *Group) Execute(ctx context.Context) ([]domain.EnnoblementNotification, error) {
groups, err := g.repo.List(ctx, domain.ListGroupsParams{
EnabledNotifications: domain.NullBool{
Bool: true,
Valid: true,
},
})
if err != nil {
return nil, fmt.Errorf("GroupRepository.List: %w", err)
}
res, err := g.listEnnoblements(ctx, groups)
if err != nil {
return nil, err
}
var notifications []domain.EnnoblementNotification
for _, group := range groups {
notifications = append(notifications, domain.NewEnnoblementNotifications(group, res.find(group.VersionCode, group.ServerKey))...)
}
return notifications, nil
}
type listEnnoblementsSingleResult struct {
versionCode string
serverKey string
ennoblements []domain.Ennoblement
}
type listEnnoblementsResult []listEnnoblementsSingleResult
func (r listEnnoblementsResult) find(versionCode, serverKey string) []domain.Ennoblement {
for _, res := range r {
if res.serverKey == serverKey && res.versionCode == versionCode {
return res.ennoblements
}
}
return nil
}
const listEnnoblementsLimit = 200
func (g *Group) listEnnoblements(ctx context.Context, groups []domain.GroupWithMonitors) (listEnnoblementsResult, error) {
g.ennoblementsMu.Lock()
defer g.ennoblementsMu.Unlock()
var wg sync.WaitGroup
ch := make(chan listEnnoblementsSingleResult)
ennoblementsSince := g.ennoblementsSince
skip := make(map[string]struct{}, len(ennoblementsSince))
for _, group := range groups {
key := g.buildEnnoblementsSinceKey(group.VersionCode, group.ServerKey)
if _, ok := skip[key]; ok {
continue
}
skip[key] = struct{}{}
since := ennoblementsSince[key]
if since.IsZero() {
since = time.Now().Add(-1 * time.Minute)
}
wg.Add(1)
go func(group domain.GroupWithMonitors, since time.Time) {
defer wg.Done()
ennoblements, err := g.twhelpSvc.ListEnnoblementsSince(ctx, group.VersionCode, group.ServerKey, since, 0, listEnnoblementsLimit)
if err != nil {
g.logger.Warn(
"couldn't list ennoblements",
zap.String("versionCode", group.VersionCode),
zap.String("serverKey", group.ServerKey),
zap.Error(err),
)
return
}
ch <- listEnnoblementsSingleResult{
versionCode: group.VersionCode,
serverKey: group.ServerKey,
ennoblements: ennoblements,
}
}(group, since)
}
go func() {
wg.Wait()
close(ch)
}()
// reinitialize ennoblementsSince
g.ennoblementsSince = make(map[string]time.Time)
results := make(listEnnoblementsResult, 0, len(skip))
for res := range ch {
key := g.buildEnnoblementsSinceKey(res.versionCode, res.serverKey)
if l := len(res.ennoblements); l > 0 {
g.ennoblementsSince[key] = res.ennoblements[l-1].CreatedAt.Add(time.Second)
} else {
g.ennoblementsSince[key] = ennoblementsSince[key]
}
results = append(results, res)
}
return results, nil
}
func (g *Group) buildEnnoblementsSinceKey(versionCode, serverKey string) string {
return versionCode + ":" + serverKey
}
func (g *Group) CleanUp(ctx context.Context) error {
if err := g.deleteAllWithDisabledNotifications(ctx); err != nil {
return err
}
if err := g.deleteAllWithClosedTWServers(ctx); err != nil {
return err
}
return nil
}
func (g *Group) deleteAllWithDisabledNotifications(ctx context.Context) error {
groups, err := g.repo.List(ctx, domain.ListGroupsParams{
EnabledNotifications: domain.NullBool{
Bool: false,
Valid: true,
},
CreatedAtLTE: time.Now().Add(-24 * time.Hour),
})
if err != nil {
return fmt.Errorf("couldn't find groups with disabled notifications: %w", err)
}
ids := make([]string, 0, len(groups))
for _, group := range groups {
ids = append(ids, group.ID)
}
if err = g.repo.DeleteMany(ctx, ids...); err != nil {
return fmt.Errorf("couldn't delete groups with disabled notifications: %w", err)
}
return nil
}
const listServersLimit = 500
func (g *Group) deleteAllWithClosedTWServers(ctx context.Context) error {
versions, err := g.twhelpSvc.ListVersions(ctx)
if err != nil {
return err
}
for _, v := range versions {
servers, err := g.twhelpSvc.ListClosedServers(ctx, v.Code, 0, listServersLimit)
if err != nil {
g.logger.Warn("couldn't list closed servers", zap.Error(err), zap.String("versionCode", v.Code))
continue
}
if len(servers) == 0 {
continue
}
params := domain.ListGroupsParams{
VersionCode: domain.NullString{
String: v.Code,
Valid: true,
},
ServerKeys: make([]string, 0, len(servers)),
}
for _, s := range servers {
params.ServerKeys = append(params.ServerKeys, s.Key)
}
groups, err := g.repo.List(ctx, params)
if err != nil {
g.logger.Warn("couldn't list groups", zap.Error(err), zap.String("versionCode", v.Code))
continue
}
if len(groups) == 0 {
continue
}
ids := make([]string, 0, len(groups))
for _, group := range groups {
ids = append(ids, group.ID)
}
if err = g.repo.DeleteMany(ctx, ids...); err != nil {
g.logger.Warn("couldn't delete groups", zap.Error(err), zap.String("versionCode", v.Code))
continue
}
}
return nil
}