package service import ( "context" "errors" "fmt" "sync" "time" "gitea.dwysokinski.me/twhelp/dcbot/internal/domain" "gitea.dwysokinski.me/twhelp/dcbot/internal/twhelp" "go.uber.org/zap" ) //counterfeiter:generate -o internal/mock/group_repository.gen.go . GroupRepository type GroupRepository interface { Create(ctx context.Context, params domain.CreateGroupParams) (domain.GroupWithMonitors, error) Update(ctx context.Context, id string, params domain.UpdateGroupParams) (domain.GroupWithMonitors, error) AddMonitor(ctx context.Context, id string, tribeID int64) (domain.GroupWithMonitors, error) DeleteMonitors(ctx context.Context, id string, monitorID ...string) (domain.GroupWithMonitors, error) List(ctx context.Context, params domain.ListGroupsParams) ([]domain.GroupWithMonitors, error) Get(ctx context.Context, id string) (domain.GroupWithMonitors, error) Delete(ctx context.Context, id string) error DeleteMany(ctx context.Context, id ...string) error } type Group struct { repo GroupRepository client TWHelpClient logger *zap.Logger maxGroupsPerServer int maxMonitorsPerGroup int ennoblementsMu sync.Mutex // ennoblementsMu is used by listEnnoblements ennoblementsSince map[string]time.Time // ennoblementsSince is used by listEnnoblements } func NewGroup(repo GroupRepository, client TWHelpClient, logger *zap.Logger, maxGroupsPerServer, maxMonitorsPerGroup int) *Group { return &Group{ repo: repo, client: client, logger: logger, maxGroupsPerServer: maxGroupsPerServer, maxMonitorsPerGroup: maxMonitorsPerGroup, } } func (g *Group) Create(ctx context.Context, params domain.CreateGroupParams) (domain.GroupWithMonitors, error) { groups, err := g.repo.List(ctx, domain.ListGroupsParams{ ServerIDs: []string{params.ServerID()}, }) if err != nil { return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.List: %w", err) } if len(groups) >= g.maxGroupsPerServer { return domain.GroupWithMonitors{}, domain.GroupLimitReachedError{ Current: len(groups), Limit: g.maxGroupsPerServer, } } if err = g.checkTWServer(ctx, params.VersionCode(), params.ServerKey()); err != nil { return domain.GroupWithMonitors{}, err } group, err := g.repo.Create(ctx, params) if err != nil { return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Create: %w", err) } return group, nil } func (g *Group) checkTWServer(ctx context.Context, versionCode, serverKey string) error { server, err := g.client.GetServer(ctx, versionCode, serverKey) if err != nil { var apiErr twhelp.APIError if !errors.As(err, &apiErr) || apiErr.Code != twhelp.ErrorCodeEntityNotFound { return fmt.Errorf("TWHelpClient.GetServer: %w", err) } return domain.TWServerDoesNotExistError{ VersionCode: versionCode, Key: serverKey, } } if !server.Open { return domain.TWServerIsClosedError{ VersionCode: versionCode, Key: serverKey, } } return nil } func (g *Group) AddTribe(ctx context.Context, id, serverID, tribeTag string) (domain.GroupWithMonitors, error) { // check if group exists groupBeforeUpdate, err := g.Get(ctx, id, serverID) if err != nil { if errors.Is(err, domain.GroupNotFoundError{ID: id}) { return domain.GroupWithMonitors{}, domain.GroupDoesNotExistError{ID: id} } return domain.GroupWithMonitors{}, err } // check limit if len(groupBeforeUpdate.Monitors) >= g.maxMonitorsPerGroup { return domain.GroupWithMonitors{}, domain.MonitorLimitReachedError{ Current: len(groupBeforeUpdate.Monitors), Limit: g.maxMonitorsPerGroup, } } tribes, err := g.client.ListTribes(ctx, groupBeforeUpdate.VersionCode, groupBeforeUpdate.ServerKey, twhelp.ListTribesQueryParams{ Limit: 1, Tags: []string{tribeTag}, Deleted: twhelp.NullBool{ Valid: true, Bool: false, }, }) if err != nil { return domain.GroupWithMonitors{}, fmt.Errorf("TWHelpClient.ListTribes: %w", err) } if len(tribes) == 0 { return domain.GroupWithMonitors{}, domain.TribeDoesNotExistError{ Tag: tribeTag, } } group, err := g.repo.AddMonitor(ctx, groupBeforeUpdate.ID, tribes[0].ID) if err != nil { return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.AddMonitor: %w", err) } return group, nil } func (g *Group) RemoveTribe(ctx context.Context, id, serverID, tribeTag string) (domain.GroupWithMonitors, error) { // check if group exists groupBeforeUpdate, err := g.Get(ctx, id, serverID) if err != nil { return domain.GroupWithMonitors{}, err } tribes, err := g.client.ListTribes(ctx, groupBeforeUpdate.VersionCode, groupBeforeUpdate.ServerKey, twhelp.ListTribesQueryParams{ Tags: []string{tribeTag}, }) if err != nil { return domain.GroupWithMonitors{}, fmt.Errorf("TWHelpClient.ListTribes: %w", err) } if len(tribes) == 0 { return domain.GroupWithMonitors{}, domain.TribeNotFoundError{ Tag: tribeTag, } } var ids []string for _, t := range tribes { for _, m := range groupBeforeUpdate.Monitors { if m.TribeID == t.ID { ids = append(ids, m.ID) } } } group, err := g.repo.DeleteMonitors(ctx, groupBeforeUpdate.ID, ids...) if err != nil { return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.DeleteMonitors: %w", err) } return group, nil } func (g *Group) SetChannelGains(ctx context.Context, id, serverID, channel string) (domain.GroupWithMonitors, error) { return g.update(ctx, id, serverID, domain.UpdateGroupParams{ ChannelGains: domain.NullString{ String: channel, Valid: true, }, }) } func (g *Group) SetChannelLosses(ctx context.Context, id, serverID, channel string) (domain.GroupWithMonitors, error) { return g.update(ctx, id, serverID, domain.UpdateGroupParams{ ChannelLosses: domain.NullString{ String: channel, Valid: true, }, }) } func (g *Group) SetInternals(ctx context.Context, id, serverID string, internals bool) (domain.GroupWithMonitors, error) { return g.update(ctx, id, serverID, domain.UpdateGroupParams{ Internals: domain.NullBool{ Bool: internals, Valid: true, }, }) } func (g *Group) SetBarbarians(ctx context.Context, id, serverID string, barbarians bool) (domain.GroupWithMonitors, error) { return g.update(ctx, id, serverID, domain.UpdateGroupParams{ Barbarians: domain.NullBool{ Bool: barbarians, Valid: true, }, }) } func (g *Group) update(ctx context.Context, id, serverID string, params domain.UpdateGroupParams) (domain.GroupWithMonitors, error) { // check if group exists if _, err := g.Get(ctx, id, serverID); err != nil { return domain.GroupWithMonitors{}, err } group, err := g.repo.Update(ctx, id, params) if err != nil { return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Update: %w", err) } return group, nil } func (g *Group) ListServer(ctx context.Context, serverID string) ([]domain.GroupWithMonitors, error) { groups, err := g.repo.List(ctx, domain.ListGroupsParams{ ServerIDs: []string{serverID}, }) if err != nil { return nil, fmt.Errorf("GroupRepository.List: %w", err) } return groups, nil } func (g *Group) Get(ctx context.Context, id, serverID string) (domain.GroupWithMonitors, error) { group, err := g.repo.Get(ctx, id) if err != nil { return domain.GroupWithMonitors{}, fmt.Errorf("GroupRepository.Get: %w", err) } if group.ServerID != serverID { return domain.GroupWithMonitors{}, domain.GroupNotFoundError{ ID: id, } } return group, nil } type getTribeResult struct { index int monitor domain.Monitor tribe twhelp.Tribe err error } func (g *Group) GetWithTribes(ctx context.Context, id, serverID string) (domain.GroupWithMonitorsAndTribes, error) { group, err := g.Get(ctx, id, serverID) if err != nil { return domain.GroupWithMonitorsAndTribes{}, err } var wg sync.WaitGroup ch := make(chan getTribeResult) ctx, cancel := context.WithCancel(ctx) defer cancel() for i, monitor := range group.Monitors { wg.Add(1) go func(i int, monitor domain.Monitor) { defer wg.Done() res := getTribeResult{ index: i, monitor: monitor, } res.tribe, res.err = g.client.GetTribeByID( ctx, group.VersionCode, group.ServerKey, monitor.TribeID, ) ch <- res }(i, monitor) } go func() { wg.Wait() close(ch) }() var firstErr error monitors := make([]domain.MonitorWithTribe, len(group.Monitors)) for res := range ch { if res.err != nil && firstErr == nil { firstErr = fmt.Errorf("couldn't load tribe (monitorID=%s): %w", res.monitor.ID, res.err) cancel() continue } if res.err != nil { continue } monitors[res.index] = domain.MonitorWithTribe{ Monitor: res.monitor, Tribe: domain.TribeMeta{ ID: res.tribe.ID, Name: res.tribe.Name, Tag: res.tribe.Tag, ProfileURL: res.tribe.ProfileURL, }, } } if firstErr != nil { return domain.GroupWithMonitorsAndTribes{}, firstErr } return domain.GroupWithMonitorsAndTribes{ Group: group.Group, Monitors: monitors, }, nil } func (g *Group) Delete(ctx context.Context, id, serverID string) error { // check if group exists if _, err := g.Get(ctx, id, serverID); err != nil { return err } if err := g.repo.Delete(ctx, id); err != nil { return fmt.Errorf("GroupRepository.Delete: %w", err) } return nil } func (g *Group) Execute(ctx context.Context) ([]domain.EnnoblementNotification, error) { groups, err := g.repo.List(ctx, domain.ListGroupsParams{ EnabledNotifications: domain.NullBool{ Bool: true, Valid: true, }, }) if err != nil { return nil, fmt.Errorf("GroupRepository.List: %w", err) } res, err := g.listEnnoblements(ctx, groups) if err != nil { return nil, err } return ennoblementNotificationBuilder{ groups: groups, ennoblements: res, }.build(), nil } type listEnnoblementsSingleResult struct { versionCode string serverKey string ennoblements []twhelp.Ennoblement } type listEnnoblementsResult []listEnnoblementsSingleResult func (r listEnnoblementsResult) find(versionCode, serverKey string) []twhelp.Ennoblement { for _, res := range r { if res.serverKey == serverKey && res.versionCode == versionCode { return res.ennoblements } } return nil } func (g *Group) listEnnoblements(ctx context.Context, groups []domain.GroupWithMonitors) (listEnnoblementsResult, error) { g.ennoblementsMu.Lock() defer g.ennoblementsMu.Unlock() var wg sync.WaitGroup ch := make(chan listEnnoblementsSingleResult) ennoblementsSince := g.ennoblementsSince skip := make(map[string]struct{}, len(ennoblementsSince)) for _, group := range groups { key := g.buildEnnoblementsSinceKey(group.VersionCode, group.ServerKey) if _, ok := skip[key]; ok { continue } skip[key] = struct{}{} since := ennoblementsSince[key] if since.IsZero() { since = time.Now().Add(-1 * time.Minute) } wg.Add(1) go func(group domain.GroupWithMonitors, since time.Time) { defer wg.Done() ennoblements, err := g.client.ListEnnoblements( ctx, group.VersionCode, group.ServerKey, twhelp.ListEnnoblementsQueryParams{ Since: since, Sort: []twhelp.EnnoblementSort{twhelp.EnnoblementSortCreatedAtASC}, }, ) if err != nil { g.logger.Warn( "couldn't list ennoblements", zap.String("versionCode", group.VersionCode), zap.String("serverKey", group.ServerKey), zap.Error(err), ) return } ch <- listEnnoblementsSingleResult{ versionCode: group.VersionCode, serverKey: group.ServerKey, ennoblements: ennoblements, } }(group, since) } go func() { wg.Wait() close(ch) }() // reinitialize ennoblementsSince g.ennoblementsSince = make(map[string]time.Time) results := make(listEnnoblementsResult, 0, len(skip)) for res := range ch { key := g.buildEnnoblementsSinceKey(res.versionCode, res.serverKey) if l := len(res.ennoblements); l > 0 { g.ennoblementsSince[key] = res.ennoblements[l-1].CreatedAt.Add(time.Second) } else { g.ennoblementsSince[key] = ennoblementsSince[key] } results = append(results, res) } return results, nil } func (g *Group) buildEnnoblementsSinceKey(versionCode, serverKey string) string { return versionCode + ":" + serverKey } func (g *Group) CleanUp(ctx context.Context) error { if err := g.deleteAllWithDisabledNotifications(ctx); err != nil { return err } if err := g.deleteAllWithClosedTWServers(ctx); err != nil { return err } return nil } func (g *Group) deleteAllWithDisabledNotifications(ctx context.Context) error { groups, err := g.repo.List(ctx, domain.ListGroupsParams{ EnabledNotifications: domain.NullBool{ Bool: false, Valid: true, }, CreatedAtLTE: time.Now().Add(-24 * time.Hour), }) if err != nil { return fmt.Errorf("couldn't find groups with disabled notifications: %w", err) } ids := make([]string, 0, len(groups)) for _, group := range groups { ids = append(ids, group.ID) } if err = g.repo.DeleteMany(ctx, ids...); err != nil { return fmt.Errorf("couldn't delete groups with disabled notifications: %w", err) } return nil } func (g *Group) deleteAllWithClosedTWServers(ctx context.Context) error { versions, err := g.client.ListVersions(ctx) if err != nil { return fmt.Errorf("TWHelpClient.ListVersions: %w", err) } for _, v := range versions { servers, err := g.client.ListServers(ctx, v.Code, twhelp.ListServersQueryParams{ Open: twhelp.NullBool{ Bool: false, Valid: true, }, }) if err != nil { g.logger.Warn("couldn't list closed servers", zap.Error(err), zap.String("versionCode", v.Code)) continue } if len(servers) == 0 { continue } params := domain.ListGroupsParams{ VersionCode: domain.NullString{ String: v.Code, Valid: true, }, ServerKeys: make([]string, 0, len(servers)), } for _, s := range servers { params.ServerKeys = append(params.ServerKeys, s.Key) } groups, err := g.repo.List(ctx, params) if err != nil { g.logger.Warn("couldn't list groups", zap.Error(err), zap.String("versionCode", v.Code)) continue } if len(groups) == 0 { continue } ids := make([]string, 0, len(groups)) for _, group := range groups { ids = append(ids, group.ID) } if err = g.repo.DeleteMany(ctx, ids...); err != nil { g.logger.Warn("couldn't delete groups", zap.Error(err), zap.String("versionCode", v.Code)) continue } } return nil }