package service import ( "context" "errors" "fmt" "sync" "time" "gitea.dwysokinski.me/twhelp/dcbot/internal/domain" "gitea.dwysokinski.me/twhelp/dcbot/internal/twhelp" ) const ( maxMonitorsPerGroup = 10 ) //counterfeiter:generate -o internal/mock/monitor_repository.gen.go . MonitorRepository type MonitorRepository interface { Create(ctx context.Context, params domain.CreateMonitorParams) (domain.Monitor, error) List(ctx context.Context, groupID string) ([]domain.Monitor, error) Get(ctx context.Context, id string) (domain.Monitor, error) Delete(ctx context.Context, id string) error } //counterfeiter:generate -o internal/mock/group_reader.gen.go . GroupReader type GroupReader interface { Get(ctx context.Context, id, serverID string) (domain.Group, error) List(ctx context.Context, params domain.ListGroupsParams) ([]domain.Group, error) } type Monitor struct { repo MonitorRepository client TWHelpClient groupSvc GroupReader ennoblementsMu sync.Mutex // ennoblementsMu is used by Monitor.fetchEnnoblements ennoblementsSince map[string]time.Time // ennoblementsSince is used by Monitor.fetchEnnoblements ennoblementsMaxConcurrentRequests int // ennoblementsMaxConcurrentRequests is used by Monitor.fetchEnnoblements } func NewMonitor( repo MonitorRepository, groupSvc GroupReader, client TWHelpClient, ennoblementsMaxConcurrentRequests int, ) *Monitor { return &Monitor{ repo: repo, client: client, groupSvc: groupSvc, ennoblementsMaxConcurrentRequests: ennoblementsMaxConcurrentRequests, ennoblementsSince: make(map[string]time.Time), } } func (m *Monitor) Create(ctx context.Context, groupID, serverID, tribeTag string) (domain.Monitor, error) { // check if group exists group, err := m.groupSvc.Get(ctx, groupID, serverID) if err != nil { if errors.Is(err, domain.GroupNotFoundError{ID: groupID}) { return domain.Monitor{}, domain.GroupDoesNotExistError{ID: groupID} } return domain.Monitor{}, fmt.Errorf("GroupService.Get: %w", err) } // check limit monitors, err := m.repo.List(ctx, group.ID) if err != nil { return domain.Monitor{}, fmt.Errorf("MonitorRepository.List: %w", err) } if len(monitors) >= maxMonitorsPerGroup { return domain.Monitor{}, domain.MonitorLimitReachedError{ Current: len(monitors), Limit: maxMonitorsPerGroup, } } // check if tribe exists tribe, err := m.client.GetTribeByTag(ctx, group.VersionCode, group.ServerKey, tribeTag) if err != nil { var apiErr twhelp.APIError if !errors.As(err, &apiErr) { return domain.Monitor{}, fmt.Errorf("TWHelpClient.GetServer: %w", err) } return domain.Monitor{}, domain.TribeDoesNotExistError{ Tag: tribeTag, } } if !tribe.DeletedAt.IsZero() { return domain.Monitor{}, domain.TribeDoesNotExistError{ Tag: tribeTag, } } params, err := domain.NewCreateMonitorParams(group.ID, tribe.ID) if err != nil { return domain.Monitor{}, fmt.Errorf("domain.NewCreateMonitorParams: %w", err) } monitor, err := m.repo.Create(ctx, params) if err != nil { return domain.Monitor{}, fmt.Errorf("MonitorRepository.Create: %w", err) } return monitor, nil } func (m *Monitor) Delete(ctx context.Context, id, serverID string) error { monitor, err := m.repo.Get(ctx, id) if err != nil { return fmt.Errorf("MonitorRepository.Get: %w", err) } _, err = m.groupSvc.Get(ctx, monitor.GroupID, serverID) if err != nil { return fmt.Errorf("GroupService.Get: %w", err) } if err = m.repo.Delete(ctx, id); err != nil { return fmt.Errorf("MonitorRepository.Delete: %w", err) } return nil } func (m *Monitor) Execute(ctx context.Context) ([]domain.EnnoblementNotification, error) { groups, err := m.groupSvc.List(ctx, domain.ListGroupsParams{}) if err != nil { return nil, fmt.Errorf("GroupService.List: %w", err) } res, err := m.fetchEnnoblements(ctx, groups) if err != nil { return nil, err } //nolint:prealloc var notifications []domain.EnnoblementNotification for _, r := range res { for _, g := range groups { if g.ServerKey != r.serverKey || g.VersionCode != r.versionCode { continue } if g.ChannelGains == "" && g.ChannelLosses == "" { continue } ns, err := m.executeGroup(ctx, g, r.ennoblements) if err != nil { // TODO: log this error continue } notifications = append(notifications, ns...) } } return notifications, nil } func (m *Monitor) executeGroup( ctx context.Context, g domain.Group, ennoblements []twhelp.Ennoblement, ) ([]domain.EnnoblementNotification, error) { monitors, err := m.repo.List(ctx, g.ID) if err != nil { return nil, fmt.Errorf("MonitorRepository.List: %w", err) } //nolint:prealloc var notifications []domain.EnnoblementNotification for _, e := range ennoblements { if isInternal(e, monitors) { continue } if g.ChannelGains != "" && isGain(e, monitors) { notifications = append(notifications, domain.EnnoblementNotification{ Type: domain.EnnoblementNotificationTypeGain, ServerID: g.ServerID, ChannelID: g.ChannelGains, Ennoblement: ennoblementToDomainModel(e), }) } if g.ChannelLosses != "" && isLoss(e, monitors) { notifications = append(notifications, domain.EnnoblementNotification{ Type: domain.EnnoblementNotificationTypeLoss, ServerID: g.ServerID, ChannelID: g.ChannelLosses, Ennoblement: ennoblementToDomainModel(e), }) } } return notifications, nil } type listEnnoblementsResult struct { versionCode string serverKey string ennoblements []twhelp.Ennoblement err error } type fetchEnnoblementsResult struct { versionCode string serverKey string ennoblements []twhelp.Ennoblement } func (m *Monitor) fetchEnnoblements(ctx context.Context, groups []domain.Group) ([]fetchEnnoblementsResult, error) { m.ennoblementsMu.Lock() defer m.ennoblementsMu.Unlock() var wg sync.WaitGroup ch := make(chan listEnnoblementsResult) reqLimiter := make(chan struct{}, m.ennoblementsMaxConcurrentRequests) defer close(reqLimiter) ennoblementsSince := m.ennoblementsSince skip := make(map[string]struct{}, len(ennoblementsSince)) for _, g := range groups { if g.ChannelGains == "" && g.ChannelLosses == "" { continue } key := g.VersionCode + ":" + g.ServerKey if _, ok := skip[key]; ok { continue } skip[key] = struct{}{} since := ennoblementsSince[key] if since.IsZero() { since = time.Now().Add(-1 * time.Minute) } wg.Add(1) go func(g domain.Group, since time.Time) { reqLimiter <- struct{}{} defer func() { <-reqLimiter }() res := listEnnoblementsResult{ versionCode: g.VersionCode, serverKey: g.ServerKey, } res.ennoblements, res.err = m.client.ListEnnoblements( ctx, g.VersionCode, g.ServerKey, twhelp.ListEnnoblementsQueryParams{ Since: since, Sort: []twhelp.ListEnnoblementsSort{twhelp.ListEnnoblementsSortCreatedAtASC}, }, ) ch <- res }(g, since) } go func() { wg.Wait() close(ch) }() // reinitialize ennoblementsSince m.ennoblementsSince = make(map[string]time.Time) //nolint:prealloc var results []fetchEnnoblementsResult for res := range ch { wg.Done() key := res.versionCode + ":" + res.serverKey if l := len(res.ennoblements); l > 0 { m.ennoblementsSince[key] = res.ennoblements[l-1].CreatedAt.Add(time.Second) } else { m.ennoblementsSince[key] = ennoblementsSince[key] } if res.err != nil { // TODO: log this error continue } results = append(results, fetchEnnoblementsResult{ versionCode: res.versionCode, serverKey: res.serverKey, ennoblements: res.ennoblements, }) } return results, nil } func isInternal(e twhelp.Ennoblement, monitors []domain.Monitor) bool { var n, o bool for _, m := range monitors { if m.TribeID == e.NewOwner.Player.Tribe.Tribe.ID { n = true } if m.TribeID == e.OldOwner.Player.Tribe.Tribe.ID { o = true } } return n && o } func isGain(e twhelp.Ennoblement, monitors []domain.Monitor) bool { var n bool for _, m := range monitors { if m.TribeID == e.NewOwner.Player.Tribe.Tribe.ID { n = true break } } return n && e.NewOwner.Player.Tribe.Tribe.ID != e.OldOwner.Player.Tribe.Tribe.ID } func isLoss(e twhelp.Ennoblement, monitors []domain.Monitor) bool { var o bool for _, m := range monitors { if m.TribeID == e.OldOwner.Player.Tribe.Tribe.ID { o = true break } } return o && e.NewOwner.Player.Tribe.Tribe.ID != e.OldOwner.Player.Tribe.Tribe.ID } func ennoblementToDomainModel(e twhelp.Ennoblement) domain.Ennoblement { return domain.Ennoblement{ ID: e.ID, Village: domain.VillageMeta(e.Village), NewOwner: domain.NullPlayerMeta{ Player: domain.PlayerMeta{ ID: e.NewOwner.Player.ID, Name: e.NewOwner.Player.Name, ProfileURL: e.NewOwner.Player.ProfileURL, Tribe: domain.NullTribeMeta{ Tribe: domain.TribeMeta(e.NewOwner.Player.Tribe.Tribe), Valid: e.NewOwner.Player.Tribe.Valid, }, }, Valid: e.NewOwner.Valid, }, OldOwner: domain.NullPlayerMeta{ Player: domain.PlayerMeta{ ID: e.OldOwner.Player.ID, Name: e.OldOwner.Player.Name, ProfileURL: e.OldOwner.Player.ProfileURL, Tribe: domain.NullTribeMeta{ Tribe: domain.TribeMeta(e.OldOwner.Player.Tribe.Tribe), Valid: e.OldOwner.Player.Tribe.Valid, }, }, Valid: e.OldOwner.Valid, }, CreatedAt: e.CreatedAt, } }