148 lines
2.7 KiB
Go
148 lines
2.7 KiB
Go
|
package healthfile
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"os"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"gitea.dwysokinski.me/twhelp/corev3/internal/health"
|
||
|
)
|
||
|
|
||
|
// checkFunc is the signature of the health probe an Observer invokes on every
// tick. It receives a context bounded by the observer's check timeout and
// reports the outcome as a health.Result.
type checkFunc func(ctx context.Context) health.Result
|
||
|
|
||
|
type Observer struct {
|
||
|
runningMu sync.Mutex
|
||
|
wg sync.WaitGroup
|
||
|
running atomic.Int32
|
||
|
closeCh chan struct{}
|
||
|
path string
|
||
|
check checkFunc
|
||
|
checkInterval time.Duration
|
||
|
checkTimeout time.Duration
|
||
|
}
|
||
|
|
||
|
// Option customizes an Observer while it is being constructed.
type Option func(o *Observer)
|
||
|
|
||
|
func WithCheckInterval(interval time.Duration) Option {
|
||
|
return func(o *Observer) {
|
||
|
o.checkInterval = interval
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func WithCheckTimeout(timeout time.Duration) Option {
|
||
|
return func(o *Observer) {
|
||
|
o.checkTimeout = timeout
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func LiveObserver(h *health.Health, path string, opts ...Option) *Observer {
|
||
|
return newObserver(path, h.CheckLive, opts...)
|
||
|
}
|
||
|
|
||
|
func ReadyObserver(h *health.Health, path string, opts ...Option) *Observer {
|
||
|
return newObserver(path, h.CheckReady, opts...)
|
||
|
}
|
||
|
|
||
|
// Defaults applied by newObserver before any Option runs.
const (
	// checkTimeoutDefault is the default timeout of a single check call.
	checkTimeoutDefault = 5 * time.Second
	// checkIntervalDefault is the default period between two checks.
	checkIntervalDefault = 15 * time.Second
)
|
||
|
|
||
|
func newObserver(path string, check checkFunc, opts ...Option) *Observer {
|
||
|
o := &Observer{
|
||
|
path: path,
|
||
|
check: check,
|
||
|
checkTimeout: checkTimeoutDefault,
|
||
|
checkInterval: checkIntervalDefault,
|
||
|
}
|
||
|
|
||
|
for _, opt := range opts {
|
||
|
opt(o)
|
||
|
}
|
||
|
|
||
|
return o
|
||
|
}
|
||
|
|
||
|
// ErrAlreadyRunning is returned by Observer.Run when the observer's loop is
// already executing.
var ErrAlreadyRunning = errors.New("observer is already running")
|
||
|
|
||
|
func (o *Observer) Run(ctx context.Context) error {
|
||
|
o.runningMu.Lock()
|
||
|
if !o.running.CompareAndSwap(0, 1) {
|
||
|
o.runningMu.Unlock()
|
||
|
return ErrAlreadyRunning
|
||
|
}
|
||
|
o.wg.Add(1)
|
||
|
o.closeCh = make(chan struct{})
|
||
|
o.runningMu.Unlock()
|
||
|
defer func() {
|
||
|
o.running.Add(-1)
|
||
|
o.wg.Done()
|
||
|
}()
|
||
|
|
||
|
ticker := time.NewTicker(o.checkInterval)
|
||
|
defer ticker.Stop()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
case <-o.closeCh:
|
||
|
return nil
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
o.tick(ctx)
|
||
|
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
case <-o.closeCh:
|
||
|
return nil
|
||
|
case <-ticker.C:
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// filePerm is the permission mode passed to os.WriteFile when the observer
// creates its file.
const filePerm = 0o644
|
||
|
|
||
|
func (o *Observer) tick(parent context.Context) {
|
||
|
ctx, cancel := context.WithTimeout(parent, o.checkTimeout)
|
||
|
defer cancel()
|
||
|
|
||
|
res := o.check(ctx)
|
||
|
fileExists := checkFileExists(o.path)
|
||
|
|
||
|
switch {
|
||
|
case res.Status == health.StatusPass && !fileExists:
|
||
|
_ = os.WriteFile(o.path, []byte("pass"), filePerm)
|
||
|
case res.Status == health.StatusFail && fileExists:
|
||
|
_ = os.Remove(o.path)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *Observer) Path() string {
|
||
|
return o.path
|
||
|
}
|
||
|
|
||
|
func (o *Observer) Close() error {
|
||
|
o.runningMu.Lock()
|
||
|
defer o.runningMu.Unlock()
|
||
|
|
||
|
if o.running.Load() == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
close(o.closeCh)
|
||
|
o.wg.Wait()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// checkFileExists reports whether filePath should be treated as existing.
// Only a "does not exist" error from os.Stat yields false; a successful stat
// or any other error yields true.
func checkFileExists(filePath string) bool {
	if _, statErr := os.Stat(filePath); errors.Is(statErr, os.ErrNotExist) {
		return false
	}
	return true
}
|