core/internal/health/healthfile/observer.go

148 lines
2.7 KiB
Go

package healthfile
import (
"context"
"errors"
"os"
"sync"
"sync/atomic"
"time"
"gitea.dwysokinski.me/twhelp/core/internal/health"
)
type checkFunc func(ctx context.Context) health.Result
type Observer struct {
runningMu sync.Mutex
wg sync.WaitGroup
running atomic.Int32
closeCh chan struct{}
path string
check checkFunc
checkInterval time.Duration
checkTimeout time.Duration
}
type Option func(o *Observer)
func WithCheckInterval(interval time.Duration) Option {
return func(o *Observer) {
o.checkInterval = interval
}
}
func WithCheckTimeout(timeout time.Duration) Option {
return func(o *Observer) {
o.checkTimeout = timeout
}
}
func LiveObserver(h *health.Health, path string, opts ...Option) *Observer {
return newObserver(path, h.CheckLive, opts...)
}
func ReadyObserver(h *health.Health, path string, opts ...Option) *Observer {
return newObserver(path, h.CheckReady, opts...)
}
const (
checkTimeoutDefault = 5 * time.Second
checkIntervalDefault = 15 * time.Second
)
func newObserver(path string, check checkFunc, opts ...Option) *Observer {
o := &Observer{
path: path,
check: check,
checkTimeout: checkTimeoutDefault,
checkInterval: checkIntervalDefault,
}
for _, opt := range opts {
opt(o)
}
return o
}
var ErrAlreadyRunning = errors.New("observer is already running")
func (o *Observer) Run(ctx context.Context) error {
o.runningMu.Lock()
if !o.running.CompareAndSwap(0, 1) {
o.runningMu.Unlock()
return ErrAlreadyRunning
}
o.wg.Add(1)
o.closeCh = make(chan struct{})
o.runningMu.Unlock()
defer func() {
o.running.Add(-1)
o.wg.Done()
}()
ticker := time.NewTicker(o.checkInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-o.closeCh:
return nil
default:
}
o.tick(ctx)
select {
case <-ctx.Done():
return ctx.Err()
case <-o.closeCh:
return nil
case <-ticker.C:
}
}
}
const filePerm = 0644
func (o *Observer) tick(parent context.Context) {
ctx, cancel := context.WithTimeout(parent, o.checkTimeout)
defer cancel()
res := o.check(ctx)
fileExists := checkFileExists(o.path)
switch {
case res.Status == health.StatusPass && !fileExists:
_ = os.WriteFile(o.path, []byte("pass"), filePerm)
case res.Status == health.StatusFail && fileExists:
_ = os.Remove(o.path)
}
}
func (o *Observer) Path() string {
return o.path
}
func (o *Observer) Close() error {
o.runningMu.Lock()
defer o.runningMu.Unlock()
if o.running.Load() == 0 {
return nil
}
close(o.closeCh)
o.wg.Wait()
return nil
}
func checkFileExists(filePath string) bool {
_, err := os.Stat(filePath)
return !errors.Is(err, os.ErrNotExist)
}