dcbot/internal/service/group.go

556 lines
14 KiB
Go

package service
import (
"context"
"errors"
"fmt"
"sync"
"time"
"gitea.dwysokinski.me/twhelp/dcbot/internal/domain"
"gitea.dwysokinski.me/twhelp/dcbot/internal/twhelp"
"go.uber.org/zap"
)
// GroupRepository is the storage abstraction used by Group. The exact
// semantics of each method are defined by the implementation; the notes on
// the methods below only describe how this file uses them.
//
//counterfeiter:generate -o internal/mock/group_repository.gen.go . GroupRepository
type GroupRepository interface {
// Create is called by Group.Create once the per-server group limit and the
// TW server have been checked.
Create(ctx context.Context, params domain.CreateGroupParams) (domain.GroupWithMonitors, error)
// Update is called by Group.update with the group ID and the fields to change.
Update(ctx context.Context, id string, params domain.UpdateGroupParams) (domain.GroupWithMonitors, error)
// AddMonitor is called by Group.AddTribe with the group ID and the ID of
// the tribe that should be monitored.
AddMonitor(ctx context.Context, id string, tribeID int64) (domain.GroupWithMonitors, error)
// DeleteMonitors is called by Group.RemoveTribe with the group ID and the
// IDs of the monitors to remove.
DeleteMonitors(ctx context.Context, id string, monitorID ...string) (domain.GroupWithMonitors, error)
// List is called with filters; the ones set in this file are ServerIDs,
// EnabledNotifications, CreatedAtLTE, VersionCode and ServerKeys.
List(ctx context.Context, params domain.ListGroupsParams) ([]domain.GroupWithMonitors, error)
// Get is called by Group.Get with the group ID.
Get(ctx context.Context, id string) (domain.GroupWithMonitors, error)
// Delete is called by Group.Delete with a single group ID.
Delete(ctx context.Context, id string) error
// DeleteMany is called by the clean-up routines with the IDs of the groups
// to remove.
DeleteMany(ctx context.Context, id ...string) error
}
// Group implements the group-related use cases on top of a GroupRepository
// and a TWHelpClient.
type Group struct {
repo GroupRepository
client TWHelpClient
logger *zap.Logger
// maxGroupsPerServer is the limit checked by Create.
maxGroupsPerServer int
// maxMonitorsPerGroup is the limit checked by AddTribe.
maxMonitorsPerGroup int
ennoblementsMu sync.Mutex // ennoblementsMu is held for the whole duration of listEnnoblements
ennoblementsSince map[string]time.Time // ennoblementsSince maps a "versionCode:serverKey" key to the Since value listEnnoblements uses for its next request
}
// NewGroup returns a Group that uses the given repository, TWHelp client and
// logger. maxGroupsPerServer and maxMonitorsPerGroup are the limits checked
// by Create and AddTribe respectively.
func NewGroup(repo GroupRepository, client TWHelpClient, logger *zap.Logger, maxGroupsPerServer, maxMonitorsPerGroup int) *Group {
	svc := new(Group)
	svc.repo = repo
	svc.client = client
	svc.logger = logger
	svc.maxGroupsPerServer = maxGroupsPerServer
	svc.maxMonitorsPerGroup = maxMonitorsPerGroup
	return svc
}
// Create stores a new group for the server referenced by params.
//
// It fails with domain.GroupLimitReachedError when the server already has
// maxGroupsPerServer groups, and with the error returned by checkTWServer
// when the requested TW server is missing or closed. Repository errors are
// wrapped with the name of the failing call.
func (g *Group) Create(ctx context.Context, params domain.CreateGroupParams) (domain.GroupWithMonitors, error) {
	var none domain.GroupWithMonitors

	filter := domain.ListGroupsParams{
		ServerIDs: []string{params.ServerID()},
	}
	existing, err := g.repo.List(ctx, filter)
	if err != nil {
		return none, fmt.Errorf("GroupRepository.List: %w", err)
	}

	if count := len(existing); count >= g.maxGroupsPerServer {
		return none, domain.GroupLimitReachedError{
			Current: count,
			Limit:   g.maxGroupsPerServer,
		}
	}

	if err = g.checkTWServer(ctx, params.VersionCode(), params.ServerKey()); err != nil {
		return none, err
	}

	created, err := g.repo.Create(ctx, params)
	if err != nil {
		return none, fmt.Errorf("GroupRepository.Create: %w", err)
	}

	return created, nil
}
// checkTWServer asks the TWHelp API for the server identified by versionCode
// and serverKey and reports whether it can be used.
//
// It returns domain.TWServerDoesNotExistError when the API answers with
// twhelp.ErrorCodeEntityNotFound, domain.TWServerIsClosedError when the
// server's Open flag is false, a wrapped error for any other client failure,
// and nil otherwise.
func (g *Group) checkTWServer(ctx context.Context, versionCode, serverKey string) error {
	server, err := g.client.GetServer(ctx, versionCode, serverKey)
	if err != nil {
		var apiErr twhelp.APIError
		if errors.As(err, &apiErr) && apiErr.Code == twhelp.ErrorCodeEntityNotFound {
			return domain.TWServerDoesNotExistError{
				VersionCode: versionCode,
				Key:         serverKey,
			}
		}
		return fmt.Errorf("TWHelpClient.GetServer: %w", err)
	}

	if server.Open {
		return nil
	}

	return domain.TWServerIsClosedError{
		VersionCode: versionCode,
		Key:         serverKey,
	}
}
// AddTribe adds a monitor for the tribe with the given tag to group id, which
// must belong to serverID.
//
// A domain.GroupNotFoundError from Get is translated into
// domain.GroupDoesNotExistError. domain.MonitorLimitReachedError is returned
// when the group already has maxMonitorsPerGroup monitors, and
// domain.TribeDoesNotExistError when the TWHelp API returns no non-deleted
// tribe with that tag.
func (g *Group) AddTribe(ctx context.Context, id, serverID, tribeTag string) (domain.GroupWithMonitors, error) {
	var none domain.GroupWithMonitors

	// The group has to exist (and belong to serverID) before anything else.
	current, err := g.Get(ctx, id, serverID)
	if err != nil {
		if !errors.Is(err, domain.GroupNotFoundError{ID: id}) {
			return none, err
		}
		return none, domain.GroupDoesNotExistError{ID: id}
	}

	// Enforce the per-group monitor limit.
	if count := len(current.Monitors); count >= g.maxMonitorsPerGroup {
		return none, domain.MonitorLimitReachedError{
			Current: count,
			Limit:   g.maxMonitorsPerGroup,
		}
	}

	// Only a single, non-deleted tribe with the requested tag is needed.
	query := twhelp.ListTribesQueryParams{
		Limit: 1,
		Tags:  []string{tribeTag},
		Deleted: twhelp.NullBool{
			Valid: true,
			Bool:  false,
		},
	}
	found, err := g.client.ListTribes(ctx, current.VersionCode, current.ServerKey, query)
	if err != nil {
		return none, fmt.Errorf("TWHelpClient.ListTribes: %w", err)
	}
	if len(found) == 0 {
		return none, domain.TribeDoesNotExistError{
			Tag: tribeTag,
		}
	}

	updated, err := g.repo.AddMonitor(ctx, current.ID, found[0].ID)
	if err != nil {
		return none, fmt.Errorf("GroupRepository.AddMonitor: %w", err)
	}

	return updated, nil
}
// RemoveTribe deletes from group id (which must belong to serverID) every
// monitor whose tribe is returned by the TWHelp API for the given tag.
//
// It returns domain.TribeNotFoundError when the API returns no tribe for the
// tag; errors from Get are returned unchanged and client/repository errors
// are wrapped with the name of the failing call.
func (g *Group) RemoveTribe(ctx context.Context, id, serverID, tribeTag string) (domain.GroupWithMonitors, error) {
	var none domain.GroupWithMonitors

	// The group has to exist (and belong to serverID).
	current, err := g.Get(ctx, id, serverID)
	if err != nil {
		return none, err
	}

	query := twhelp.ListTribesQueryParams{
		Tags: []string{tribeTag},
	}
	matching, err := g.client.ListTribes(ctx, current.VersionCode, current.ServerKey, query)
	if err != nil {
		return none, fmt.Errorf("TWHelpClient.ListTribes: %w", err)
	}
	if len(matching) == 0 {
		return none, domain.TribeNotFoundError{
			Tag: tribeTag,
		}
	}

	// Collect the monitors that point at any of the matching tribes.
	var monitorIDs []string
	for _, tribe := range matching {
		for _, monitor := range current.Monitors {
			if monitor.TribeID != tribe.ID {
				continue
			}
			monitorIDs = append(monitorIDs, monitor.ID)
		}
	}

	updated, err := g.repo.DeleteMonitors(ctx, current.ID, monitorIDs...)
	if err != nil {
		return none, fmt.Errorf("GroupRepository.DeleteMonitors: %w", err)
	}

	return updated, nil
}
// SetChannelGains sets the ChannelGains field of group id (owned by serverID)
// to channel.
func (g *Group) SetChannelGains(ctx context.Context, id, serverID, channel string) (domain.GroupWithMonitors, error) {
	var params domain.UpdateGroupParams
	params.ChannelGains = domain.NullString{String: channel, Valid: true}
	return g.update(ctx, id, serverID, params)
}
// SetChannelLosses sets the ChannelLosses field of group id (owned by
// serverID) to channel.
func (g *Group) SetChannelLosses(ctx context.Context, id, serverID, channel string) (domain.GroupWithMonitors, error) {
	var params domain.UpdateGroupParams
	params.ChannelLosses = domain.NullString{String: channel, Valid: true}
	return g.update(ctx, id, serverID, params)
}
// SetInternals sets the Internals flag of group id (owned by serverID).
func (g *Group) SetInternals(ctx context.Context, id, serverID string, internals bool) (domain.GroupWithMonitors, error) {
	var params domain.UpdateGroupParams
	params.Internals = domain.NullBool{Bool: internals, Valid: true}
	return g.update(ctx, id, serverID, params)
}
// SetBarbarians sets the Barbarians flag of group id (owned by serverID).
func (g *Group) SetBarbarians(ctx context.Context, id, serverID string, barbarians bool) (domain.GroupWithMonitors, error) {
	var params domain.UpdateGroupParams
	params.Barbarians = domain.NullBool{Bool: barbarians, Valid: true}
	return g.update(ctx, id, serverID, params)
}
// SetLanguageTag sets the LanguageTag field of group id (owned by serverID)
// to languageTag.
func (g *Group) SetLanguageTag(ctx context.Context, id, serverID, languageTag string) (domain.GroupWithMonitors, error) {
	var params domain.UpdateGroupParams
	params.LanguageTag = domain.NullString{String: languageTag, Valid: true}
	return g.update(ctx, id, serverID, params)
}
// update applies params to group id after verifying, through Get, that the
// group exists and belongs to serverID. Errors from Get are returned as-is;
// repository errors are wrapped.
func (g *Group) update(ctx context.Context, id, serverID string, params domain.UpdateGroupParams) (domain.GroupWithMonitors, error) {
	// Ownership/existence check; the fetched group itself is not needed.
	_, err := g.Get(ctx, id, serverID)
	if err != nil {
		return domain.GroupWithMonitors{}, err
	}

	updated, err := g.repo.Update(ctx, id, params)
	if err != nil {
		return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Update: %w", err)
	}

	return updated, nil
}
// ListServer returns the groups the repository lists for serverID.
func (g *Group) ListServer(ctx context.Context, serverID string) ([]domain.GroupWithMonitors, error) {
	filter := domain.ListGroupsParams{
		ServerIDs: []string{serverID},
	}

	found, err := g.repo.List(ctx, filter)
	if err != nil {
		return nil, fmt.Errorf("GroupRepository.List: %w", err)
	}

	return found, nil
}
// Get loads group id from the repository and returns it only when it belongs
// to serverID; otherwise domain.GroupNotFoundError is returned. Repository
// errors are wrapped.
func (g *Group) Get(ctx context.Context, id, serverID string) (domain.GroupWithMonitors, error) {
	found, err := g.repo.Get(ctx, id)
	if err != nil {
		return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Get: %w", err)
	}

	if found.ServerID == serverID {
		return found, nil
	}

	// A group owned by another server is reported as not found.
	return domain.GroupWithMonitors{}, domain.GroupNotFoundError{
		ID: id,
	}
}
// getTribeResult is the message each goroutine started by GetWithTribes sends
// once its GetTribeByID call has returned.
type getTribeResult struct {
// index is the position of monitor in group.Monitors; GetWithTribes uses it
// to place the result in the output slice.
index int
// monitor is the monitor the tribe was requested for.
monitor domain.Monitor
// tribe is the value returned by GetTribeByID.
tribe twhelp.Tribe
// err is the error returned by GetTribeByID.
err error
}
// GetWithTribes returns group id (which must belong to serverID) with every
// monitor enriched with metadata of its tribe.
//
// One GetTribeByID request is started per monitor, all running concurrently.
// When a request fails, the shared context is cancelled, the remaining
// results are still drained, and the first error seen is returned. On
// success the monitors keep the order they have in the group.
func (g *Group) GetWithTribes(ctx context.Context, id, serverID string) (domain.GroupWithMonitorsAndTribes, error) {
	group, err := g.Get(ctx, id, serverID)
	if err != nil {
		return domain.GroupWithMonitorsAndTribes{}, err
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	results := make(chan getTribeResult)

	var pending sync.WaitGroup
	pending.Add(len(group.Monitors))
	for idx, mon := range group.Monitors {
		go func(idx int, mon domain.Monitor) {
			defer pending.Done()

			tribe, fetchErr := g.client.GetTribeByID(
				ctx,
				group.VersionCode,
				group.ServerKey,
				mon.TribeID,
			)
			results <- getTribeResult{
				index:   idx,
				monitor: mon,
				tribe:   tribe,
				err:     fetchErr,
			}
		}(idx, mon)
	}

	// Close the channel once every worker has delivered its result so the
	// receive loop below terminates.
	go func() {
		pending.Wait()
		close(results)
	}()

	var firstErr error
	enriched := make([]domain.MonitorWithTribe, len(group.Monitors))
	for r := range results {
		if r.err != nil {
			// Remember only the first failure and stop the other requests;
			// keep receiving so no worker blocks on its send.
			if firstErr == nil {
				firstErr = fmt.Errorf("couldn't load tribe (monitorID=%s): %w", r.monitor.ID, r.err)
				cancel()
			}
			continue
		}

		enriched[r.index] = domain.MonitorWithTribe{
			Monitor: r.monitor,
			Tribe: domain.TribeMeta{
				ID:         r.tribe.ID,
				Name:       r.tribe.Name,
				Tag:        r.tribe.Tag,
				ProfileURL: r.tribe.ProfileURL,
			},
		}
	}

	if firstErr != nil {
		return domain.GroupWithMonitorsAndTribes{}, firstErr
	}

	return domain.GroupWithMonitorsAndTribes{
		Group:    group.Group,
		Monitors: enriched,
	}, nil
}
// Delete removes group id after verifying, through Get, that it exists and
// belongs to serverID. Errors from Get are returned as-is; repository errors
// are wrapped.
func (g *Group) Delete(ctx context.Context, id, serverID string) error {
	// Ownership/existence check; the fetched group itself is not needed.
	_, err := g.Get(ctx, id, serverID)
	if err != nil {
		return err
	}

	err = g.repo.Delete(ctx, id)
	if err != nil {
		return fmt.Errorf("GroupRepository.Delete: %w", err)
	}

	return nil
}
// Execute lists all groups with notifications enabled, fetches the
// ennoblements for their servers via listEnnoblements and returns the
// notifications produced by ennoblementNotificationBuilder.
func (g *Group) Execute(ctx context.Context) ([]domain.EnnoblementNotification, error) {
	onlyEnabled := domain.ListGroupsParams{
		EnabledNotifications: domain.NullBool{
			Bool:  true,
			Valid: true,
		},
	}

	groups, err := g.repo.List(ctx, onlyEnabled)
	if err != nil {
		return nil, fmt.Errorf("GroupRepository.List: %w", err)
	}

	ennoblements, err := g.listEnnoblements(ctx, groups)
	if err != nil {
		return nil, err
	}

	builder := ennoblementNotificationBuilder{
		groups:       groups,
		ennoblements: ennoblements,
	}
	return builder.build(), nil
}
// listEnnoblementsSingleResult holds the ennoblements listEnnoblements
// fetched for a single (versionCode, serverKey) pair.
type listEnnoblementsSingleResult struct {
versionCode string
serverKey string
// ennoblements is the response of ListEnnoblements, which is requested
// with twhelp.EnnoblementSortCreatedAtASC.
ennoblements []twhelp.Ennoblement
}
// listEnnoblementsResult is the collection of per-server results produced by
// listEnnoblements.
type listEnnoblementsResult []listEnnoblementsSingleResult

// find returns the ennoblements of the first entry matching both versionCode
// and serverKey, or nil when there is no such entry.
func (r listEnnoblementsResult) find(versionCode, serverKey string) []twhelp.Ennoblement {
	for i := range r {
		if r[i].versionCode != versionCode || r[i].serverKey != serverKey {
			continue
		}
		return r[i].ennoblements
	}

	return nil
}
// listEnnoblements fetches, concurrently and once per distinct
// (versionCode, serverKey) pair found in groups, the ennoblements created
// since the cursor remembered from the previous call. A pair without a
// remembered cursor is queried from one minute ago.
//
// The cursor map is rebuilt on every call, so pairs that no longer appear in
// groups are forgotten:
//   - a non-empty response moves the cursor to one second after the last
//     returned ennoblement (responses are requested in ascending creation
//     order);
//   - an empty response keeps the previous cursor;
//   - a failed request is logged and also keeps the previous cursor, so that
//     a transient API error does not make the next call fall back to
//     "one minute ago" and skip older ennoblements.
//
// The whole call runs under ennoblementsMu. The returned error is currently
// always nil; failed requests simply have no entry in the result.
func (g *Group) listEnnoblements(ctx context.Context, groups []domain.GroupWithMonitors) (listEnnoblementsResult, error) {
	g.ennoblementsMu.Lock()
	defer g.ennoblementsMu.Unlock()

	var wg sync.WaitGroup
	ch := make(chan listEnnoblementsSingleResult)
	ennoblementsSince := g.ennoblementsSince
	// skip records the keys already requested so every server is queried once.
	skip := make(map[string]struct{}, len(ennoblementsSince))

	for _, group := range groups {
		key := g.buildEnnoblementsSinceKey(group.VersionCode, group.ServerKey)

		if _, ok := skip[key]; ok {
			continue
		}
		skip[key] = struct{}{}

		since := ennoblementsSince[key]
		if since.IsZero() {
			since = time.Now().Add(-1 * time.Minute)
		}

		wg.Add(1)
		go func(group domain.GroupWithMonitors, since time.Time) {
			defer wg.Done()

			ennoblements, err := g.client.ListEnnoblements(
				ctx,
				group.VersionCode,
				group.ServerKey,
				twhelp.ListEnnoblementsQueryParams{
					Since: since,
					Sort:  []twhelp.EnnoblementSort{twhelp.EnnoblementSortCreatedAtASC},
				},
			)
			if err != nil {
				// Best effort: one failing server must not block the others.
				g.logger.Warn(
					"couldn't list ennoblements",
					zap.String("versionCode", group.VersionCode),
					zap.String("serverKey", group.ServerKey),
					zap.Error(err),
				)
				return
			}

			ch <- listEnnoblementsSingleResult{
				versionCode:  group.VersionCode,
				serverKey:    group.ServerKey,
				ennoblements: ennoblements,
			}
		}(group, since)
	}

	// Close the channel once every worker is done so the loop below ends.
	go func() {
		wg.Wait()
		close(ch)
	}()

	// reinitialize ennoblementsSince
	g.ennoblementsSince = make(map[string]time.Time, len(skip))

	results := make(listEnnoblementsResult, 0, len(skip))
	for res := range ch {
		key := g.buildEnnoblementsSinceKey(res.versionCode, res.serverKey)

		if l := len(res.ennoblements); l > 0 {
			g.ennoblementsSince[key] = res.ennoblements[l-1].CreatedAt.Add(time.Second)
		} else {
			g.ennoblementsSince[key] = ennoblementsSince[key]
		}

		results = append(results, res)
	}

	// Every successful request has stored a cursor above, so a requested key
	// that is still missing belongs to a failed request: carry its previous
	// cursor over instead of dropping it.
	for key := range skip {
		if _, ok := g.ennoblementsSince[key]; ok {
			continue
		}
		if prev := ennoblementsSince[key]; !prev.IsZero() {
			g.ennoblementsSince[key] = prev
		}
	}

	return results, nil
}
// buildEnnoblementsSinceKey returns the ennoblementsSince map key for a
// server: versionCode and serverKey joined by a colon.
func (g *Group) buildEnnoblementsSinceKey(versionCode, serverKey string) string {
	const separator = ":"
	return versionCode + separator + serverKey
}
// CleanUp runs the clean-up routines in order: first groups with disabled
// notifications, then groups on closed TW servers. It stops at, and returns,
// the first error.
func (g *Group) CleanUp(ctx context.Context) error {
	steps := [...]func(context.Context) error{
		g.deleteAllWithDisabledNotifications,
		g.deleteAllWithClosedTWServers,
	}

	for _, step := range steps {
		if err := step(ctx); err != nil {
			return err
		}
	}

	return nil
}
// deleteAllWithDisabledNotifications deletes every group that has
// notifications disabled and was created at least 24 hours ago.
//
// When no group matches, the repository is not asked to delete anything,
// mirroring the guard in deleteAllWithClosedTWServers and avoiding a
// DeleteMany call with an empty ID list. Repository errors are wrapped.
func (g *Group) deleteAllWithDisabledNotifications(ctx context.Context) error {
	groups, err := g.repo.List(ctx, domain.ListGroupsParams{
		EnabledNotifications: domain.NullBool{
			Bool:  false,
			Valid: true,
		},
		CreatedAtLTE: time.Now().Add(-24 * time.Hour),
	})
	if err != nil {
		return fmt.Errorf("couldn't find groups with disabled notifications: %w", err)
	}

	// Nothing to delete.
	if len(groups) == 0 {
		return nil
	}

	ids := make([]string, 0, len(groups))
	for _, group := range groups {
		ids = append(ids, group.ID)
	}

	if err = g.repo.DeleteMany(ctx, ids...); err != nil {
		return fmt.Errorf("couldn't delete groups with disabled notifications: %w", err)
	}

	return nil
}
// deleteAllWithClosedTWServers deletes, version by version, the groups whose
// TW server is reported as closed by the TWHelp API.
//
// Only a failure to list the versions is returned (wrapped). Any failure
// while handling a single version is logged as a warning and that version is
// skipped, so the remaining versions are still processed.
func (g *Group) deleteAllWithClosedTWServers(ctx context.Context) error {
	versions, err := g.client.ListVersions(ctx)
	if err != nil {
		return fmt.Errorf("TWHelpClient.ListVersions: %w", err)
	}

	for _, version := range versions {
		closedOnly := twhelp.ListServersQueryParams{
			Open: twhelp.NullBool{
				Bool:  false,
				Valid: true,
			},
		}
		closed, err := g.client.ListServers(ctx, version.Code, closedOnly)
		if err != nil {
			g.logger.Warn("couldn't list closed servers", zap.Error(err), zap.String("versionCode", version.Code))
			continue
		}
		if len(closed) == 0 {
			continue
		}

		// Look up the groups attached to any of the closed servers.
		keys := make([]string, len(closed))
		for i, server := range closed {
			keys[i] = server.Key
		}
		filter := domain.ListGroupsParams{
			VersionCode: domain.NullString{
				String: version.Code,
				Valid:  true,
			},
			ServerKeys: keys,
		}

		stale, err := g.repo.List(ctx, filter)
		if err != nil {
			g.logger.Warn("couldn't list groups", zap.Error(err), zap.String("versionCode", version.Code))
			continue
		}
		if len(stale) == 0 {
			continue
		}

		staleIDs := make([]string, len(stale))
		for i, group := range stale {
			staleIDs[i] = group.ID
		}

		if err = g.repo.DeleteMany(ctx, staleIDs...); err != nil {
			g.logger.Warn("couldn't delete groups", zap.Error(err), zap.String("versionCode", version.Code))
		}
	}

	return nil
}