package service

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"gitea.dwysokinski.me/twhelp/dcbot/internal/domain"
	"go.uber.org/zap"
)

// GroupRepository is the persistence interface the Group service depends on.
//
//counterfeiter:generate -o internal/mock/group_repository.gen.go . GroupRepository
type GroupRepository interface {
	Create(ctx context.Context, params domain.CreateGroupParams) (domain.GroupWithMonitors, error)
	Update(ctx context.Context, id string, params domain.UpdateGroupParams) (domain.GroupWithMonitors, error)
	AddMonitor(ctx context.Context, id string, tribeID int64) (domain.GroupWithMonitors, error)
	DeleteMonitors(ctx context.Context, id string, monitorID ...string) (domain.GroupWithMonitors, error)
	List(ctx context.Context, params domain.ListGroupsParams) ([]domain.GroupWithMonitors, error)
	Get(ctx context.Context, id string) (domain.GroupWithMonitors, error)
	Delete(ctx context.Context, id string) error
	DeleteMany(ctx context.Context, id ...string) error
}

// Group implements the group use cases on top of a GroupRepository and a
// TWHelpService. It contains a mutex, so it must be used through a pointer.
type Group struct {
	repo                GroupRepository
	twhelpSvc           TWHelpService
	logger              *zap.Logger
	maxGroupsPerServer  int
	maxMonitorsPerGroup int

	// ennoblementsMu is used by listEnnoblements; it is held for the whole
	// call and guards ennoblementsSince.
	ennoblementsMu sync.Mutex

	// ennoblementsSince is used by listEnnoblements. It maps a key built by
	// buildEnnoblementsSinceKey to the time from which the next request for
	// that server should start.
	ennoblementsSince map[string]time.Time
}

// NewGroup returns a Group that uses the given repository, TWHelp service and
// logger, and enforces the given per-server group limit and per-group monitor
// limit.
func NewGroup(repo GroupRepository, twhelpSvc TWHelpService, logger *zap.Logger, maxGroupsPerServer, maxMonitorsPerGroup int) *Group {
	return &Group{
		repo:                repo,
		twhelpSvc:           twhelpSvc,
		logger:              logger,
		maxGroupsPerServer:  maxGroupsPerServer,
		maxMonitorsPerGroup: maxMonitorsPerGroup,
	}
}

// Create stores a new group described by params.
//
// It returns domain.GroupLimitReachedError when the server referenced by
// params already has maxGroupsPerServer groups or more, and passes through
// any error returned by TWHelpService.GetOpenServer.
func (g *Group) Create(ctx context.Context, params domain.CreateGroupParams) (domain.GroupWithMonitors, error) {
	groups, err := g.repo.List(ctx, domain.ListGroupsParams{
		ServerIDs: []string{params.ServerID()},
	})
	if err != nil {
		return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.List: %w", err)
	}

	if len(groups) >= g.maxGroupsPerServer {
		return domain.GroupWithMonitors{}, domain.GroupLimitReachedError{
			Current: len(groups),
			Limit:   g.maxGroupsPerServer,
		}
	}

	if _, err = g.twhelpSvc.GetOpenServer(ctx, params.VersionCode(), params.ServerKey()); err != nil {
		return domain.GroupWithMonitors{}, err
	}

	group, err := g.repo.Create(ctx, params)
	if err != nil {
		return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Create: %w", err)
	}

	return group, nil
}

// AddTribe adds a monitor for the tribe identified by tribeTag to the group
// with the given id, provided the group belongs to serverID.
//
// A domain.GroupNotFoundError from Get is translated into
// domain.GroupDoesNotExistError. domain.MonitorLimitReachedError is returned
// when the group already has maxMonitorsPerGroup monitors or more.
func (g *Group) AddTribe(ctx context.Context, id, serverID, tribeTag string) (domain.GroupWithMonitors, error) {
	// check if group exists
	groupBeforeUpdate, err := g.Get(ctx, id, serverID)
	if err != nil {
		if errors.Is(err, domain.GroupNotFoundError{ID: id}) {
			return domain.GroupWithMonitors{}, domain.GroupDoesNotExistError{ID: id}
		}
		return domain.GroupWithMonitors{}, err
	}

	// check limit
	if len(groupBeforeUpdate.Monitors) >= g.maxMonitorsPerGroup {
		return domain.GroupWithMonitors{}, domain.MonitorLimitReachedError{
			Current: len(groupBeforeUpdate.Monitors),
			Limit:   g.maxMonitorsPerGroup,
		}
	}

	tribe, err := g.twhelpSvc.GetExistingTribeByTag(ctx, groupBeforeUpdate.VersionCode, groupBeforeUpdate.ServerKey, tribeTag)
	if err != nil {
		return domain.GroupWithMonitors{}, err
	}

	group, err := g.repo.AddMonitor(ctx, groupBeforeUpdate.ID, tribe.ID)
	if err != nil {
		return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.AddMonitor: %w", err)
	}

	return group, nil
}

// listTribesLimit is the limit passed to TWHelpService.ListTribesTag.
const listTribesLimit = 200

// RemoveTribe deletes every monitor of the group whose TribeID matches a
// tribe returned by TWHelpService.ListTribesTag for tribeTag.
//
// It returns domain.TribeTagNotFoundError when no tribe with that tag is
// returned. The group must belong to serverID (see Get).
func (g *Group) RemoveTribe(ctx context.Context, id, serverID, tribeTag string) (domain.GroupWithMonitors, error) {
	// check if group exists
	groupBeforeUpdate, err := g.Get(ctx, id, serverID)
	if err != nil {
		return domain.GroupWithMonitors{}, err
	}

	tribes, err := g.twhelpSvc.ListTribesTag(
		ctx,
		groupBeforeUpdate.VersionCode,
		groupBeforeUpdate.ServerKey,
		[]string{tribeTag},
		0,
		listTribesLimit,
	)
	if err != nil {
		// The label names the method that is actually called here.
		return domain.GroupWithMonitors{}, fmt.Errorf("TWHelpService.ListTribesTag: %w", err)
	}
	if len(tribes) == 0 {
		return domain.GroupWithMonitors{}, domain.TribeTagNotFoundError{
			Tag: tribeTag,
		}
	}

	// Several tribes may be returned for one tag, so collect the monitors of
	// all of them.
	var ids []string
	for _, t := range tribes {
		for _, m := range groupBeforeUpdate.Monitors {
			if m.TribeID == t.ID {
				ids = append(ids, m.ID)
			}
		}
	}

	group, err := g.repo.DeleteMonitors(ctx, groupBeforeUpdate.ID, ids...)
	if err != nil {
		return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.DeleteMonitors: %w", err)
	}

	return group, nil
}

// SetChannelGains sets the ChannelGains field of the group to channel.
func (g *Group) SetChannelGains(ctx context.Context, id, serverID, channel string) (domain.GroupWithMonitors, error) {
	return g.update(ctx, id, serverID, domain.UpdateGroupParams{
		ChannelGains: domain.NullString{
			String: channel,
			Valid:  true,
		},
	})
}

// SetChannelLosses sets the ChannelLosses field of the group to channel.
func (g *Group) SetChannelLosses(ctx context.Context, id, serverID, channel string) (domain.GroupWithMonitors, error) {
	return g.update(ctx, id, serverID, domain.UpdateGroupParams{
		ChannelLosses: domain.NullString{
			String: channel,
			Valid:  true,
		},
	})
}

// SetInternals sets the Internals field of the group to internals.
func (g *Group) SetInternals(ctx context.Context, id, serverID string, internals bool) (domain.GroupWithMonitors, error) {
	return g.update(ctx, id, serverID, domain.UpdateGroupParams{
		Internals: domain.NullBool{
			Bool:  internals,
			Valid: true,
		},
	})
}

// SetBarbarians sets the Barbarians field of the group to barbarians.
func (g *Group) SetBarbarians(ctx context.Context, id, serverID string, barbarians bool) (domain.GroupWithMonitors, error) {
	return g.update(ctx, id, serverID, domain.UpdateGroupParams{
		Barbarians: domain.NullBool{
			Bool:  barbarians,
			Valid: true,
		},
	})
}

// SetLanguageTag sets the LanguageTag field of the group to languageTag.
func (g *Group) SetLanguageTag(ctx context.Context, id, serverID, languageTag string) (domain.GroupWithMonitors, error) {
	return g.update(ctx, id, serverID, domain.UpdateGroupParams{
		LanguageTag: domain.NullString{
			String: languageTag,
			Valid:  true,
		},
	})
}

// update applies params to the group with the given id after verifying, via
// Get, that the group exists and belongs to serverID.
func (g *Group) update(ctx context.Context, id, serverID string, params domain.UpdateGroupParams) (domain.GroupWithMonitors, error) {
	// check if group exists
	if _, err := g.Get(ctx, id, serverID); err != nil {
		return domain.GroupWithMonitors{}, err
	}

	group, err := g.repo.Update(ctx, id, params)
	if err != nil {
		return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Update: %w", err)
	}

	return group, nil
}

// ListServer returns the groups the repository lists for serverID.
func (g *Group) ListServer(ctx context.Context, serverID string) ([]domain.GroupWithMonitors, error) {
	groups, err := g.repo.List(ctx, domain.ListGroupsParams{
		ServerIDs: []string{serverID},
	})
	if err != nil {
		return nil, fmt.Errorf("GroupRepository.List: %w", err)
	}
	return groups, nil
}

// Get returns the group with the given id. When the stored group's ServerID
// differs from serverID, domain.GroupNotFoundError is returned instead of the
// group.
func (g *Group) Get(ctx context.Context, id, serverID string) (domain.GroupWithMonitors, error) {
	group, err := g.repo.Get(ctx, id)
	if err != nil {
		return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Get: %w", err)
	}

	if group.ServerID != serverID {
		return domain.GroupWithMonitors{}, domain.GroupNotFoundError{
			ID: id,
		}
	}

	return group, nil
}

// getTribeResult carries the outcome of one tribe lookup started by
// GetWithTribes; index is the monitor's position in the group's Monitors.
type getTribeResult struct {
	index   int
	monitor domain.Monitor
	tribe   domain.Tribe
	err     error
}

// GetWithTribes returns the group together with the tribe of each of its
// monitors.
//
// One goroutine per monitor calls TWHelpService.GetTribeByID. The first
// failure cancels the context shared by the remaining lookups and is
// returned, wrapped with the monitor ID. On success the returned monitors
// are in the same order as group.Monitors.
func (g *Group) GetWithTribes(ctx context.Context, id, serverID string) (domain.GroupWithMonitorsAndTribes, error) {
	group, err := g.Get(ctx, id, serverID)
	if err != nil {
		return domain.GroupWithMonitorsAndTribes{}, err
	}

	var wg sync.WaitGroup
	ch := make(chan getTribeResult)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	for i, monitor := range group.Monitors {
		wg.Add(1)
		go func(i int, monitor domain.Monitor) {
			defer wg.Done()

			res := getTribeResult{
				index:   i,
				monitor: monitor,
			}
			res.tribe, res.err = g.twhelpSvc.GetTribeByID(
				ctx,
				group.VersionCode,
				group.ServerKey,
				monitor.TribeID,
			)
			ch <- res
		}(i, monitor)
	}

	// Close the channel once every lookup has reported, so the range below
	// terminates.
	go func() {
		wg.Wait()
		close(ch)
	}()

	var firstErr error
	monitors := make([]domain.MonitorWithTribe, len(group.Monitors))

	// The channel is always drained, even after a failure; otherwise the
	// senders would block forever on the unbuffered channel.
	for res := range ch {
		if res.err != nil {
			if firstErr == nil {
				firstErr = fmt.Errorf("couldn't load tribe (monitorID=%s): %w", res.monitor.ID, res.err)
				cancel()
			}
			continue
		}

		monitors[res.index] = domain.MonitorWithTribe{
			Monitor: res.monitor,
			Tribe:   res.tribe,
		}
	}

	if firstErr != nil {
		return domain.GroupWithMonitorsAndTribes{}, firstErr
	}

	return domain.GroupWithMonitorsAndTribes{
		Group:    group.Group,
		Monitors: monitors,
	}, nil
}

// Delete removes the group with the given id after verifying, via Get, that
// it exists and belongs to serverID.
func (g *Group) Delete(ctx context.Context, id, serverID string) error {
	// check if group exists
	if _, err := g.Get(ctx, id, serverID); err != nil {
		return err
	}

	if err := g.repo.Delete(ctx, id); err != nil {
		return fmt.Errorf("GroupRepository.Delete: %w", err)
	}

	return nil
}

// Execute lists the groups with enabled notifications, fetches the
// ennoblements for their servers via listEnnoblements and returns the
// notifications built from both by ennoblementNotificationBuilder.
func (g *Group) Execute(ctx context.Context) ([]domain.EnnoblementNotification, error) {
	groups, err := g.repo.List(ctx, domain.ListGroupsParams{
		EnabledNotifications: domain.NullBool{
			Bool:  true,
			Valid: true,
		},
	})
	if err != nil {
		return nil, fmt.Errorf("GroupRepository.List: %w", err)
	}

	res, err := g.listEnnoblements(ctx, groups)
	if err != nil {
		return nil, err
	}

	return ennoblementNotificationBuilder{
		groups:       groups,
		ennoblements: res,
	}.build(), nil
}

// listEnnoblementsSingleResult holds the ennoblements fetched for one
// versionCode/serverKey pair.
type listEnnoblementsSingleResult struct {
	versionCode  string
	serverKey    string
	ennoblements []domain.Ennoblement
}

// listEnnoblementsResult is the collection of per-server results produced by
// listEnnoblements.
type listEnnoblementsResult []listEnnoblementsSingleResult

// find returns the ennoblements of the first entry matching versionCode and
// serverKey, or nil when there is none.
func (r listEnnoblementsResult) find(versionCode, serverKey string) []domain.Ennoblement {
	for _, res := range r {
		if res.serverKey == serverKey && res.versionCode == versionCode {
			return res.ennoblements
		}
	}
	return nil
}

// listEnnoblementsLimit is the limit passed to
// TWHelpService.ListEnnoblementsSince.
const listEnnoblementsLimit = 200

// listEnnoblements fetches, concurrently and once per distinct
// versionCode/serverKey pair found in groups, the ennoblements created since
// the time remembered for that pair (or since one minute ago when nothing is
// remembered).
//
// Requests that fail are logged and left out of the result; the returned
// error is always nil. ennoblementsMu is held for the whole call.
func (g *Group) listEnnoblements(ctx context.Context, groups []domain.GroupWithMonitors) (listEnnoblementsResult, error) {
	g.ennoblementsMu.Lock()
	defer g.ennoblementsMu.Unlock()

	var wg sync.WaitGroup
	ch := make(chan listEnnoblementsSingleResult)
	ennoblementsSince := g.ennoblementsSince
	// skip records the keys a request was already started for.
	skip := make(map[string]struct{}, len(ennoblementsSince))

	for _, group := range groups {
		key := g.buildEnnoblementsSinceKey(group.VersionCode, group.ServerKey)

		if _, ok := skip[key]; ok {
			continue
		}
		skip[key] = struct{}{}

		since := ennoblementsSince[key]
		if since.IsZero() {
			since = time.Now().Add(-time.Minute)
		}

		wg.Add(1)
		go func(group domain.GroupWithMonitors, since time.Time) {
			defer wg.Done()

			ennoblements, err := g.twhelpSvc.ListEnnoblementsSince(
				ctx,
				group.VersionCode,
				group.ServerKey,
				since,
				0,
				listEnnoblementsLimit,
			)
			if err != nil {
				g.logger.Warn(
					"couldn't list ennoblements",
					zap.String("versionCode", group.VersionCode),
					zap.String("serverKey", group.ServerKey),
					zap.Error(err),
				)
				return
			}

			ch <- listEnnoblementsSingleResult{
				versionCode:  group.VersionCode,
				serverKey:    group.ServerKey,
				ennoblements: ennoblements,
			}
		}(group, since)
	}

	go func() {
		wg.Wait()
		close(ch)
	}()

	// reinitialize ennoblementsSince
	//
	// NOTE(review): only servers whose request succeeded get an entry in the
	// new map, so after a failed request the next call starts again from
	// "one minute ago" and may skip older ennoblements - confirm this is
	// intended.
	g.ennoblementsSince = make(map[string]time.Time)
	results := make(listEnnoblementsResult, 0, len(skip))

	for res := range ch {
		key := g.buildEnnoblementsSinceKey(res.versionCode, res.serverKey)

		if l := len(res.ennoblements); l > 0 {
			// Continue one second after the last returned ennoblement
			// (presumably the newest one - verify the ordering guaranteed by
			// TWHelpService.ListEnnoblementsSince).
			g.ennoblementsSince[key] = res.ennoblements[l-1].CreatedAt.Add(time.Second)
		} else {
			g.ennoblementsSince[key] = ennoblementsSince[key]
		}

		results = append(results, res)
	}

	return results, nil
}

// buildEnnoblementsSinceKey returns the ennoblementsSince map key for the
// given version code and server key.
func (g *Group) buildEnnoblementsSinceKey(versionCode, serverKey string) string {
	return versionCode + ":" + serverKey
}

// CleanUp deletes groups with disabled notifications and then groups that
// reference closed servers. It stops at the first error returned by either
// step.
func (g *Group) CleanUp(ctx context.Context) error {
	if err := g.deleteAllWithDisabledNotifications(ctx); err != nil {
		return err
	}
	return g.deleteAllWithClosedTWServers(ctx)
}

// deleteAllWithDisabledNotifications deletes the groups listed by the
// repository for disabled notifications and a CreatedAtLTE bound of 24 hours
// before now.
func (g *Group) deleteAllWithDisabledNotifications(ctx context.Context) error {
	groups, err := g.repo.List(ctx, domain.ListGroupsParams{
		EnabledNotifications: domain.NullBool{
			Bool:  false,
			Valid: true,
		},
		CreatedAtLTE: time.Now().Add(-24 * time.Hour),
	})
	if err != nil {
		return fmt.Errorf("couldn't find groups with disabled notifications: %w", err)
	}

	// Nothing to delete; do not call DeleteMany with an empty ID list (same
	// guard as in deleteAllWithClosedTWServers).
	if len(groups) == 0 {
		return nil
	}

	ids := make([]string, 0, len(groups))
	for _, group := range groups {
		ids = append(ids, group.ID)
	}

	if err = g.repo.DeleteMany(ctx, ids...); err != nil {
		return fmt.Errorf("couldn't delete groups with disabled notifications: %w", err)
	}

	return nil
}

// listServersLimit is the limit passed to TWHelpService.ListClosedServers.
const listServersLimit = 500

// deleteAllWithClosedTWServers deletes, version by version, the groups whose
// server key is among the closed servers returned by
// TWHelpService.ListClosedServers.
//
// Only a failure of ListVersions is returned. Failures while processing a
// single version are logged and that version is skipped.
func (g *Group) deleteAllWithClosedTWServers(ctx context.Context) error {
	versions, err := g.twhelpSvc.ListVersions(ctx)
	if err != nil {
		return err
	}

	for _, v := range versions {
		servers, err := g.twhelpSvc.ListClosedServers(ctx, v.Code, 0, listServersLimit)
		if err != nil {
			g.logger.Warn("couldn't list closed servers", zap.Error(err), zap.String("versionCode", v.Code))
			continue
		}

		if len(servers) == 0 {
			continue
		}

		params := domain.ListGroupsParams{
			VersionCode: domain.NullString{
				String: v.Code,
				Valid:  true,
			},
			ServerKeys: make([]string, 0, len(servers)),
		}
		for _, s := range servers {
			params.ServerKeys = append(params.ServerKeys, s.Key)
		}

		groups, err := g.repo.List(ctx, params)
		if err != nil {
			g.logger.Warn("couldn't list groups", zap.Error(err), zap.String("versionCode", v.Code))
			continue
		}

		if len(groups) == 0 {
			continue
		}

		ids := make([]string, 0, len(groups))
		for _, group := range groups {
			ids = append(ids, group.ID)
		}

		if err = g.repo.DeleteMany(ctx, ids...); err != nil {
			g.logger.Warn("couldn't delete groups", zap.Error(err), zap.String("versionCode", v.Code))
			continue
		}
	}

	return nil
}