5d2b7a3e66
1. Alertless monitors no longer spam error logs — triggerAlert returns early when alertID <= 0. 2. HTTP response body drained before close — enables connection reuse via keep-alive instead of fresh TCP+TLS per check. 3. /api/backup/export enforces GET — was the only endpoint accepting any HTTP method. 4. limitStr guards against max < 3 — prevents negative slice index panic on very narrow terminals. 5. Filter input accepts multibyte characters — len(msg.Runes) instead of len(msg.String()) for proper Unicode support. 6. Startup warning corrected — with no UPTOP_CLUSTER_SECRET, endpoints reject (401), not accept. Warning now says so. 7. UPTOP_KEYS file open failure logged — was silently swallowed, leaving operators with no admin seeded and no message.
1104 lines
28 KiB
Go
1104 lines
28 KiB
Go
package monitor
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"math/rand/v2"
|
|
"net/http"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitea.lerkolabs.com/lerkolabs/uptop/internal/alert"
|
|
"gitea.lerkolabs.com/lerkolabs/uptop/internal/models"
|
|
"gitea.lerkolabs.com/lerkolabs/uptop/internal/store"
|
|
)
|
|
|
|
// Engine tuning constants, grouped by the subsystem that reads them.
const (
	// In-memory log ring and the database writer.
	maxLogEntries   = 100              // cap on entries kept in logStore
	dbWriteBuffer   = 4096             // capacity of the dbWrites queue
	dbPruneInterval = 10 * time.Minute // period of the retention pass in dbWriter

	// Check scheduling.
	pollInterval     = 5 * time.Second // site-list poll period; also the paused/passive re-check wait
	minCheckInterval = 5               // floor, in seconds, applied to a site's Interval
	minPushGrace     = time.Minute     // floor for the push-monitor grace period

	// Maintenance-window housekeeping.
	maintPruneInterval    = 15 * time.Minute   // period of maintenancePruner
	defaultMaintRetention = 7 * 24 * time.Hour // initial value of Engine.maintRetention
)
|
|
|
|
// AlertHealth is the delivery track record kept for a single alert channel.
type AlertHealth struct {
	LastSendAt time.Time // when the latest send attempt was recorded
	LastSendOK bool      // whether that attempt succeeded
	LastError  string    // error text of the latest failure; cleared on success

	// SendCount counts every recorded attempt; FailCount counts the failed ones.
	SendCount, FailCount int
}
|
|
|
|
type Engine struct {
|
|
mu sync.RWMutex
|
|
liveState map[int]models.Site
|
|
|
|
logMu sync.RWMutex
|
|
logStore []string
|
|
|
|
activeMu sync.RWMutex
|
|
isActive bool
|
|
|
|
histMu sync.RWMutex
|
|
histories map[int]*SiteHistory
|
|
|
|
tokenIndex map[string]int // protected by mu
|
|
|
|
probeResultsMu sync.RWMutex
|
|
probeResults map[int]map[string]NodeResult
|
|
aggStrategy AggregationStrategy
|
|
|
|
alertHealthMu sync.RWMutex
|
|
alertHealth map[int]AlertHealth
|
|
|
|
recheckMu sync.RWMutex
|
|
recheck map[int]chan struct{}
|
|
|
|
maintCacheMu sync.RWMutex
|
|
maintCache map[int]bool
|
|
|
|
db store.Store
|
|
insecureSkipVerify bool
|
|
allowPrivateTargets bool
|
|
maintRetention time.Duration
|
|
strictClient *http.Client
|
|
insecureClient *http.Client
|
|
|
|
dbWrites chan dbWrite
|
|
writerWG sync.WaitGroup
|
|
checkerWG sync.WaitGroup
|
|
cancel context.CancelFunc
|
|
stopOnce sync.Once
|
|
}
|
|
|
|
func NewEngine(s store.Store) *Engine {
|
|
return newEngine(s, false)
|
|
}
|
|
|
|
func NewEngineWithOpts(s store.Store, allowPrivateTargets bool) *Engine {
|
|
return newEngine(s, allowPrivateTargets)
|
|
}
|
|
|
|
func newEngine(s store.Store, allowPrivateTargets bool) *Engine {
|
|
dial := SafeDialContext(allowPrivateTargets)
|
|
return &Engine{
|
|
liveState: make(map[int]models.Site),
|
|
histories: make(map[int]*SiteHistory),
|
|
tokenIndex: make(map[string]int),
|
|
recheck: make(map[int]chan struct{}),
|
|
probeResults: make(map[int]map[string]NodeResult),
|
|
alertHealth: make(map[int]AlertHealth),
|
|
aggStrategy: AggAnyDown,
|
|
isActive: true,
|
|
allowPrivateTargets: allowPrivateTargets,
|
|
maintRetention: defaultMaintRetention,
|
|
dbWrites: make(chan dbWrite, dbWriteBuffer),
|
|
db: s,
|
|
strictClient: &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
|
|
DialContext: dial,
|
|
},
|
|
},
|
|
insecureClient: &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // intentional for IgnoreTLS sites
|
|
DialContext: dial,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (e *Engine) SetInsecureSkipVerify(skip bool) {
|
|
e.insecureSkipVerify = skip
|
|
}
|
|
|
|
func (e *Engine) SetMaintRetention(d time.Duration) {
|
|
e.maintRetention = d
|
|
}
|
|
|
|
// ansiRe matches ANSI CSI sequences of the form ESC '[' params letter, such as
// colour codes.
var ansiRe = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]`)

// sanitizeLog makes s safe to store and render as one log line. It removes
// carriage returns and ANSI escape sequences and turns each newline into the
// two characters `\n`.
func sanitizeLog(s string) string {
	// Strip carriage returns BEFORE matching escapes: otherwise "ESC \r [31m"
	// slips past the regex and is reassembled into a live sequence afterwards.
	s = strings.ReplaceAll(s, "\r", "")
	s = ansiRe.ReplaceAllString(s, "")
	// A single regex pass can splice neighbours into a fresh sequence (e.g.
	// "ESC[ESC[31m31m"), and non-CSI escapes never match at all. With every
	// remaining ESC byte gone, whatever is left is inert text.
	s = strings.ReplaceAll(s, "\x1b", "")
	return strings.ReplaceAll(s, "\n", "\\n")
}
|
|
|
|
func fmtDurationShort(d time.Duration) string {
|
|
if d < time.Minute {
|
|
return fmt.Sprintf("%ds", int(d.Seconds()))
|
|
}
|
|
if d < time.Hour {
|
|
return fmt.Sprintf("%dm", int(d.Minutes()))
|
|
}
|
|
if d < 24*time.Hour {
|
|
return fmt.Sprintf("%dh %dm", int(d.Hours()), int(d.Minutes())%60)
|
|
}
|
|
return fmt.Sprintf("%dd %dh", int(d.Hours())/24, int(d.Hours())%24)
|
|
}
|
|
|
|
// appendLog adds a timestamped entry to the in-memory ring buffer and returns
|
|
// it. It never touches the database, so it is safe to call from the db-write
|
|
// drop/error path without recursing back through the write queue.
|
|
func (e *Engine) appendLog(msg string) string {
|
|
ts := time.Now().Format("15:04:05")
|
|
entry := fmt.Sprintf("[%s] %s", ts, sanitizeLog(msg))
|
|
e.logMu.Lock()
|
|
e.logStore = append([]string{entry}, e.logStore...)
|
|
if len(e.logStore) > maxLogEntries {
|
|
e.logStore = e.logStore[:maxLogEntries]
|
|
}
|
|
e.logMu.Unlock()
|
|
return entry
|
|
}
|
|
|
|
func (e *Engine) AddLog(msg string) {
|
|
entry := e.appendLog(msg)
|
|
e.enqueueWrite(writeLog{message: entry})
|
|
}
|
|
|
|
// enqueueWrite hands a persistence task to the writer goroutine without
|
|
// blocking the caller. If the queue is saturated the write is dropped and noted
|
|
// in the in-memory log only (never re-enqueued, to avoid recursion via AddLog).
|
|
func (e *Engine) enqueueWrite(w dbWrite) {
|
|
select {
|
|
case e.dbWrites <- w:
|
|
default:
|
|
e.appendLog(fmt.Sprintf("db write queue full, dropped %s", w.desc()))
|
|
}
|
|
}
|
|
|
|
// dbWriter is the single goroutine that owns all writes. Serializing writes
|
|
// through one path removes the fire-and-forget goroutine pile-up, surfaces
|
|
// errors, and lets retention run on a timer instead of per-insert. It drains
|
|
// any buffered writes on shutdown before returning.
|
|
func (e *Engine) dbWriter(ctx context.Context) {
|
|
defer e.writerWG.Done()
|
|
|
|
pruneTicker := time.NewTicker(dbPruneInterval)
|
|
defer pruneTicker.Stop()
|
|
e.prune(ctx)
|
|
|
|
for {
|
|
select {
|
|
case w := <-e.dbWrites:
|
|
if err := w.exec(ctx, e.db); err != nil {
|
|
e.appendLog(fmt.Sprintf("db %s write failed: %v", w.desc(), err))
|
|
}
|
|
case <-pruneTicker.C:
|
|
e.prune(ctx)
|
|
case <-ctx.Done():
|
|
e.drainWrites()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// drainWrites flushes everything still buffered, best-effort, at shutdown.
|
|
func (e *Engine) drainWrites() {
|
|
for {
|
|
select {
|
|
case w := <-e.dbWrites:
|
|
if err := w.exec(context.Background(), e.db); err != nil {
|
|
e.appendLog(fmt.Sprintf("db %s write failed (drain): %v", w.desc(), err))
|
|
}
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) prune(ctx context.Context) {
|
|
if err := e.db.PruneLogs(ctx); err != nil {
|
|
e.appendLog(fmt.Sprintf("log prune failed: %v", err))
|
|
}
|
|
if err := e.db.PruneCheckHistory(ctx); err != nil {
|
|
e.appendLog(fmt.Sprintf("check-history prune failed: %v", err))
|
|
}
|
|
if err := e.db.PruneStateChanges(ctx); err != nil {
|
|
e.appendLog(fmt.Sprintf("state-change prune failed: %v", err))
|
|
}
|
|
}
|
|
|
|
// Stop signals the writer goroutine to drain and exit, then blocks until it
|
|
// has. Call it before closing the store so no write races a closed DB.
|
|
func (e *Engine) Stop() {
|
|
e.stopOnce.Do(func() {
|
|
if e.cancel != nil {
|
|
e.cancel()
|
|
}
|
|
e.checkerWG.Wait()
|
|
e.writerWG.Wait()
|
|
e.drainWrites()
|
|
})
|
|
}
|
|
|
|
func (e *Engine) InitLogs() {
|
|
logs, err := e.db.LoadLogs(context.Background(), maxLogEntries)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if len(logs) == 0 {
|
|
return
|
|
}
|
|
e.logMu.Lock()
|
|
defer e.logMu.Unlock()
|
|
e.logStore = logs
|
|
}
|
|
|
|
// InitAlertHealth restores persisted alert send health so the dashboard shows real
|
|
// "last sent" / health state on startup instead of resetting every channel to "never".
|
|
func (e *Engine) InitAlertHealth() {
|
|
records, err := e.db.LoadAlertHealth(context.Background())
|
|
if err != nil {
|
|
return
|
|
}
|
|
e.alertHealthMu.Lock()
|
|
defer e.alertHealthMu.Unlock()
|
|
for id, r := range records {
|
|
e.alertHealth[id] = AlertHealth{
|
|
LastSendAt: r.LastSendAt,
|
|
LastSendOK: r.LastSendOK,
|
|
LastError: r.LastError,
|
|
SendCount: r.SendCount,
|
|
FailCount: r.FailCount,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) GetLogs() []string {
|
|
e.logMu.RLock()
|
|
defer e.logMu.RUnlock()
|
|
logs := make([]string, len(e.logStore))
|
|
copy(logs, e.logStore)
|
|
return logs
|
|
}
|
|
|
|
func (e *Engine) SetActive(active bool) {
|
|
e.activeMu.Lock()
|
|
defer e.activeMu.Unlock()
|
|
if e.isActive != active {
|
|
e.isActive = active
|
|
status := "RESUMED (Active)"
|
|
if !active {
|
|
status = "PAUSED (Passive)"
|
|
}
|
|
e.AddLog(fmt.Sprintf("Engine %s", status))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) IsActive() bool {
|
|
e.activeMu.RLock()
|
|
defer e.activeMu.RUnlock()
|
|
return e.isActive
|
|
}
|
|
|
|
func (e *Engine) GetAllSites() []models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
sites := make([]models.Site, 0, len(e.liveState))
|
|
for _, s := range e.liveState {
|
|
sites = append(sites, s)
|
|
}
|
|
return sites
|
|
}
|
|
|
|
func (e *Engine) GetLiveState() map[int]models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
cp := make(map[int]models.Site, len(e.liveState))
|
|
for k, v := range e.liveState {
|
|
cp[k] = v
|
|
}
|
|
return cp
|
|
}
|
|
|
|
func (e *Engine) RecordHeartbeat(token string) bool {
|
|
if !e.IsActive() {
|
|
return false
|
|
}
|
|
|
|
e.mu.RLock()
|
|
targetID, ok := e.tokenIndex[token]
|
|
e.mu.RUnlock()
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
var (
|
|
prevStatus models.Status
|
|
name string
|
|
alertID int
|
|
downSince time.Time
|
|
)
|
|
_, exists := e.applyState(targetID, func(s *models.Site) {
|
|
prevStatus = s.Status
|
|
name = s.Name
|
|
alertID = s.AlertID
|
|
downSince = s.StatusChangedAt // captured before mutation = when it went down
|
|
|
|
s.LastCheck = time.Now()
|
|
s.Status = models.StatusUp
|
|
s.FailureCount = 0
|
|
s.Latency = 0
|
|
s.LastError = ""
|
|
s.LastSuccessAt = time.Now()
|
|
if prevStatus != models.StatusUp {
|
|
s.StatusChangedAt = time.Now()
|
|
}
|
|
})
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
switch prevStatus {
|
|
case models.StatusPending:
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' received first heartbeat", name))
|
|
case models.StatusLate:
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' heartbeat arrived (was late)", name))
|
|
case models.StatusStale:
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' heartbeat arrived (was stale)", name))
|
|
case models.StatusDown:
|
|
downDur := ""
|
|
if !downSince.IsZero() {
|
|
downDur = fmt.Sprintf(" (was down %s)", fmtDurationShort(time.Since(downSince)))
|
|
}
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' recovered%s", name, downDur))
|
|
go e.triggerAlert(alertID, "✅ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.%s", name, downDur))
|
|
}
|
|
|
|
if prevStatus != models.StatusUp && prevStatus != models.StatusPending {
|
|
e.enqueueWrite(writeStateChange{siteID: targetID, fromStatus: string(prevStatus), toStatus: string(models.StatusUp)})
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (e *Engine) addToTokenIndex(site models.Site) {
|
|
if site.Type == "push" && site.Token != "" {
|
|
e.tokenIndex[site.Token] = site.ID
|
|
}
|
|
}
|
|
|
|
func (e *Engine) removeFromTokenIndex(id int) {
|
|
for token, sid := range e.tokenIndex {
|
|
if sid == id {
|
|
delete(e.tokenIndex, token)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) Start(ctx context.Context) {
|
|
// e.cancel is invoked by Stop() to drain and halt the writer; gosec can't
|
|
// trace the cross-method call, and cancelling the parent reaps this child
|
|
// regardless, so the leak it warns about can't occur.
|
|
ctx, e.cancel = context.WithCancel(ctx) //nolint:gosec // cancel is called in Stop()
|
|
|
|
e.writerWG.Add(1)
|
|
go e.dbWriter(ctx)
|
|
|
|
e.checkerWG.Add(1)
|
|
go func() {
|
|
defer e.checkerWG.Done()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
e.refreshMaintenanceCache(ctx)
|
|
|
|
configs, err := e.db.GetSites(ctx)
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Failed to load sites: %v", err))
|
|
select {
|
|
case <-time.After(pollInterval):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
for _, cfg := range configs {
|
|
e.mu.RLock()
|
|
_, exists := e.liveState[cfg.ID]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
e.mu.Lock()
|
|
site := models.Site{SiteConfig: cfg, SiteState: models.SiteState{Status: models.StatusPending}}
|
|
if h, ok := e.GetHistory(cfg.ID); ok && len(h.Statuses) > 0 {
|
|
if h.Statuses[len(h.Statuses)-1] {
|
|
site.Status = models.StatusUp
|
|
} else {
|
|
site.Status = models.StatusDown
|
|
}
|
|
if len(h.Latencies) > 0 {
|
|
site.Latency = h.Latencies[len(h.Latencies)-1]
|
|
}
|
|
}
|
|
e.liveState[cfg.ID] = site
|
|
e.addToTokenIndex(site)
|
|
e.mu.Unlock()
|
|
e.checkerWG.Add(1)
|
|
go func(id int) {
|
|
defer e.checkerWG.Done()
|
|
e.monitorRoutine(ctx, id)
|
|
}(cfg.ID)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-time.After(pollInterval):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
e.checkerWG.Add(1)
|
|
go func() {
|
|
defer e.checkerWG.Done()
|
|
e.maintenancePruner(ctx)
|
|
}()
|
|
}
|
|
|
|
func (e *Engine) maintenancePruner(ctx context.Context) {
|
|
ticker := time.NewTicker(maintPruneInterval)
|
|
defer ticker.Stop()
|
|
|
|
e.pruneMaintenanceWindows(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
e.pruneMaintenanceWindows(ctx)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) pruneMaintenanceWindows(ctx context.Context) {
|
|
pruned, err := e.db.PruneExpiredMaintenanceWindows(ctx, e.maintRetention)
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Maintenance prune error: %v", err))
|
|
return
|
|
}
|
|
if pruned > 0 {
|
|
e.AddLog(fmt.Sprintf("Pruned %d expired maintenance window(s)", pruned))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) UpdateSiteConfig(cfg models.SiteConfig) {
|
|
e.mu.Lock()
|
|
if existing, ok := e.liveState[cfg.ID]; ok {
|
|
e.removeFromTokenIndex(cfg.ID)
|
|
existing.SiteConfig = cfg
|
|
e.liveState[cfg.ID] = existing
|
|
e.addToTokenIndex(existing)
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
e.signalRecheck(cfg.ID)
|
|
}
|
|
|
|
func (e *Engine) getRecheckChan(id int) chan struct{} {
|
|
e.recheckMu.Lock()
|
|
defer e.recheckMu.Unlock()
|
|
ch, ok := e.recheck[id]
|
|
if !ok {
|
|
ch = make(chan struct{}, 1)
|
|
e.recheck[id] = ch
|
|
}
|
|
return ch
|
|
}
|
|
|
|
func (e *Engine) signalRecheck(id int) {
|
|
ch := e.getRecheckChan(id)
|
|
select {
|
|
case ch <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (e *Engine) RemoveSite(id int) {
|
|
e.mu.Lock()
|
|
e.removeFromTokenIndex(id)
|
|
delete(e.liveState, id)
|
|
e.mu.Unlock()
|
|
e.removeHistory(id)
|
|
|
|
e.probeResultsMu.Lock()
|
|
delete(e.probeResults, id)
|
|
e.probeResultsMu.Unlock()
|
|
|
|
e.recheckMu.Lock()
|
|
delete(e.recheck, id)
|
|
e.recheckMu.Unlock()
|
|
}
|
|
|
|
func (e *Engine) ToggleSitePause(id int) bool {
|
|
var (
|
|
paused bool
|
|
name string
|
|
)
|
|
_, ok := e.applyState(id, func(s *models.Site) {
|
|
s.Paused = !s.Paused
|
|
paused = s.Paused
|
|
name = s.Name
|
|
})
|
|
if !ok {
|
|
return false
|
|
}
|
|
if paused {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' paused", name))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' resumed", name))
|
|
}
|
|
return paused
|
|
}
|
|
|
|
func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
|
recheckCh := e.getRecheckChan(id)
|
|
|
|
// Stagger initial check to avoid thundering herd on startup
|
|
stagger := time.Duration(rand.IntN(3000)) * time.Millisecond //nolint:gosec // non-security jitter
|
|
select {
|
|
case <-time.After(stagger):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
e.checkByID(ctx, id)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
if !e.IsActive() {
|
|
select {
|
|
case <-time.After(pollInterval):
|
|
case <-ctx.Done():
|
|
return
|
|
case <-recheckCh:
|
|
}
|
|
continue
|
|
}
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[id]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
if site.Paused {
|
|
select {
|
|
case <-time.After(pollInterval):
|
|
case <-ctx.Done():
|
|
return
|
|
case <-recheckCh:
|
|
}
|
|
continue
|
|
}
|
|
|
|
interval := site.Interval
|
|
if interval < minCheckInterval {
|
|
interval = minCheckInterval
|
|
}
|
|
jitter := time.Duration(rand.IntN(interval*100)) * time.Millisecond //nolint:gosec // non-security jitter
|
|
select {
|
|
case <-time.After(time.Duration(interval)*time.Second + jitter):
|
|
case <-ctx.Done():
|
|
return
|
|
case <-recheckCh:
|
|
}
|
|
e.checkByID(ctx, id)
|
|
}
|
|
}
|
|
|
|
// applyState atomically reads, mutates, and writes back the live entry for id.
|
|
// The mutator runs under the engine write lock and receives a pointer to the
|
|
// CURRENT live state, so concurrent config edits, pauses, and heartbeats are
|
|
// never clobbered by a stale snapshot. The mutator must only touch runtime /
|
|
// check-result fields — config fields (Name/URL/Type/Token/Interval/AlertID/…)
|
|
// are owned by UpdateSiteConfig and must not be written here. Returns the
|
|
// post-mutation copy and whether the site still exists.
|
|
func (e *Engine) applyState(id int, mutate func(s *models.Site)) (models.Site, bool) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
cur, ok := e.liveState[id]
|
|
if !ok {
|
|
return models.Site{}, false
|
|
}
|
|
mutate(&cur)
|
|
e.liveState[id] = cur
|
|
return cur, true
|
|
}
|
|
|
|
func (e *Engine) checkByID(ctx context.Context, id int) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[id]
|
|
e.mu.RUnlock()
|
|
if !exists || site.Paused {
|
|
return
|
|
}
|
|
|
|
switch site.Type {
|
|
case "push":
|
|
e.checkPush(ctx, site)
|
|
case "group":
|
|
e.checkGroup(ctx, site)
|
|
default:
|
|
result := RunCheck(ctx, site.SiteConfig, e.strictClient, e.insecureClient, e.insecureSkipVerify, e.allowPrivateTargets)
|
|
updatedSite := site
|
|
updatedSite.HasSSL = result.HasSSL
|
|
updatedSite.CertExpiry = result.CertExpiry
|
|
updatedSite.Latency = time.Duration(result.LatencyNs)
|
|
updatedSite.LastCheck = time.Now()
|
|
e.handleStatusChange(updatedSite, result.Status, result.StatusCode, time.Duration(result.LatencyNs), result.ErrorReason)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) checkPush(_ context.Context, site models.Site) {
|
|
if site.Status == models.StatusPending {
|
|
return
|
|
}
|
|
|
|
interval := time.Duration(site.Interval) * time.Second
|
|
grace := interval / 2
|
|
if grace < minPushGrace {
|
|
grace = minPushGrace
|
|
}
|
|
|
|
overdue := site.LastCheck.Add(interval)
|
|
staleMark := overdue.Add(grace / 2)
|
|
graceEnd := overdue.Add(grace)
|
|
now := time.Now()
|
|
|
|
if now.After(graceEnd) {
|
|
if site.Status != models.StatusDown {
|
|
e.handleStatusChange(site, string(models.StatusDown), 0, 0, "heartbeat missed")
|
|
}
|
|
} else if now.After(staleMark) {
|
|
if site.Status != models.StatusStale {
|
|
e.handleStatusChange(site, string(models.StatusStale), 0, 0, "heartbeat stale")
|
|
}
|
|
} else if now.After(overdue) {
|
|
if site.Status != models.StatusLate {
|
|
e.handleStatusChange(site, string(models.StatusLate), 0, 0, "heartbeat overdue")
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleStatusChange folds a check result into the live state. snap is the
|
|
// stale snapshot the check ran against; the actual mutation is applied onto the
|
|
// CURRENT live entry via applyState, so a concurrent pause / config edit /
|
|
// heartbeat is never reverted by this write. Logs and alerts are emitted after
|
|
// the lock is released, off the critical section.
|
|
func (e *Engine) handleStatusChange(snap models.Site, rawStatus string, code int, latency time.Duration, errorReason string) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
inMaint := e.isInMaintenance(snap.ID)
|
|
status := models.Status(rawStatus)
|
|
|
|
var (
|
|
prev, next models.Status
|
|
name, typ string
|
|
alertID int
|
|
failCount, maxRetries int
|
|
confirmedDown bool
|
|
failedCheck bool
|
|
downSince time.Time
|
|
sslWarnFire bool
|
|
sslDays int
|
|
skipped bool
|
|
changed bool
|
|
)
|
|
|
|
_, exists := e.applyState(snap.ID, func(s *models.Site) {
|
|
// A non-UP result computed from a stale snapshot must not override a
|
|
// heartbeat (or newer check) that landed while we were evaluating.
|
|
if status != models.StatusUp && s.LastCheck.After(snap.LastCheck) {
|
|
skipped = true
|
|
return
|
|
}
|
|
|
|
prev = s.Status
|
|
name = s.Name
|
|
typ = s.Type
|
|
alertID = s.AlertID
|
|
maxRetries = s.MaxRetries
|
|
downSince = s.StatusChangedAt
|
|
|
|
// Fresh check results (measured by the run against snap).
|
|
s.StatusCode = code
|
|
s.Latency = snap.Latency
|
|
s.LastCheck = snap.LastCheck
|
|
s.HasSSL = snap.HasSSL
|
|
s.CertExpiry = snap.CertExpiry
|
|
s.LastError = errorReason
|
|
if status == models.StatusUp {
|
|
s.LastSuccessAt = time.Now()
|
|
s.LastError = ""
|
|
}
|
|
|
|
// Status + failure-count transition, based on the CURRENT live status.
|
|
if status == models.StatusUp {
|
|
s.FailureCount = 0
|
|
s.Status = models.StatusUp
|
|
} else {
|
|
if s.FailureCount <= s.MaxRetries {
|
|
s.FailureCount++
|
|
}
|
|
if s.FailureCount > s.MaxRetries {
|
|
if s.Status != status {
|
|
confirmedDown = true
|
|
}
|
|
s.Status = status
|
|
s.FailureCount = s.MaxRetries + 1
|
|
} else {
|
|
failedCheck = true
|
|
}
|
|
}
|
|
failCount = s.FailureCount
|
|
|
|
if s.Status != prev && prev != models.StatusPending {
|
|
s.StatusChangedAt = time.Now()
|
|
} else if s.StatusChangedAt.IsZero() && s.Status != models.StatusPending {
|
|
s.StatusChangedAt = time.Now()
|
|
}
|
|
|
|
// SSL expiry warning (fresh HasSSL/CertExpiry + config threshold).
|
|
if typ == "http" && s.CheckSSL && s.HasSSL {
|
|
days := int(time.Until(s.CertExpiry).Hours() / 24)
|
|
if days <= s.ExpiryThreshold && !s.SentSSLWarning && status != models.StatusSSLExp {
|
|
sslWarnFire = true
|
|
sslDays = days
|
|
s.SentSSLWarning = true
|
|
} else if days > s.ExpiryThreshold {
|
|
s.SentSSLWarning = false
|
|
}
|
|
}
|
|
|
|
next = s.Status
|
|
changed = next != prev
|
|
})
|
|
|
|
if !exists || skipped {
|
|
return
|
|
}
|
|
|
|
e.recordCheck(snap.ID, latency, status == models.StatusUp)
|
|
|
|
if confirmedDown {
|
|
if errorReason != "" {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN: %s", name, errorReason))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", name))
|
|
}
|
|
} else if failedCheck {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", name, failCount, maxRetries))
|
|
}
|
|
|
|
if changed && prev != models.StatusPending {
|
|
e.enqueueWrite(writeStateChange{siteID: snap.ID, fromStatus: string(prev), toStatus: string(next), reason: errorReason})
|
|
}
|
|
|
|
if sslWarnFire {
|
|
if !inMaint {
|
|
e.triggerAlert(alertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", name, sslDays))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("SSL warning for '%s' suppressed (maintenance)", name))
|
|
}
|
|
}
|
|
|
|
if prev == models.StatusUp && next == models.StatusLate {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' heartbeat overdue", name))
|
|
}
|
|
|
|
if !prev.IsBroken() && next.IsBroken() && next != models.StatusPending {
|
|
if inMaint {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' is DOWN (alerts suppressed — maintenance)", name))
|
|
} else {
|
|
msg := fmt.Sprintf("Monitor '%s' is DOWN (%s)", name, rawStatus)
|
|
if errorReason != "" {
|
|
msg = fmt.Sprintf("Monitor '%s' is DOWN: %s", name, errorReason)
|
|
}
|
|
if typ == "push" {
|
|
msg = fmt.Sprintf("Push Monitor '%s' missed heartbeat.", name)
|
|
}
|
|
e.triggerAlert(alertID, "🚨 ALERT", msg)
|
|
}
|
|
}
|
|
if prev.IsBroken() && next == models.StatusUp {
|
|
downDur := ""
|
|
if !downSince.IsZero() {
|
|
downDur = fmt.Sprintf(" (was down %s)", fmtDurationShort(time.Since(downSince)))
|
|
}
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' recovered%s", name, downDur))
|
|
if !inMaint {
|
|
e.triggerAlert(alertID, "✅ RECOVERY", fmt.Sprintf("Monitor '%s' is UP%s", name, downDur))
|
|
}
|
|
}
|
|
if prev == models.StatusLate && next == models.StatusUp && !prev.IsBroken() {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' heartbeat arrived (was late)", name))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) triggerAlert(alertID int, title, message string) {
|
|
if alertID <= 0 {
|
|
return
|
|
}
|
|
cfg, err := e.db.GetAlert(context.Background(), alertID)
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Failed to load alert config %d: %v", alertID, err))
|
|
return
|
|
}
|
|
provider := alert.GetProvider(cfg)
|
|
if provider != nil {
|
|
go func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
if err := provider.Send(ctx, title, message); err != nil {
|
|
e.AddLog(fmt.Sprintf("Alert send failed (%s): %v", cfg.Name, err))
|
|
e.recordAlertResult(alertID, false, err.Error())
|
|
} else {
|
|
e.recordAlertResult(alertID, true, "")
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (e *Engine) recordAlertResult(alertID int, ok bool, errMsg string) {
|
|
e.alertHealthMu.Lock()
|
|
defer e.alertHealthMu.Unlock()
|
|
h := e.alertHealth[alertID]
|
|
h.LastSendAt = time.Now()
|
|
h.LastSendOK = ok
|
|
h.SendCount++
|
|
if ok {
|
|
h.LastError = ""
|
|
} else {
|
|
h.LastError = errMsg
|
|
h.FailCount++
|
|
}
|
|
e.alertHealth[alertID] = h
|
|
|
|
// Persist so health survives restarts; DB IO off the alert path.
|
|
e.enqueueWrite(writeAlertHealth{rec: models.AlertHealthRecord{
|
|
AlertID: alertID,
|
|
LastSendAt: h.LastSendAt,
|
|
LastSendOK: h.LastSendOK,
|
|
LastError: h.LastError,
|
|
SendCount: h.SendCount,
|
|
FailCount: h.FailCount,
|
|
}})
|
|
}
|
|
|
|
func (e *Engine) GetAlertHealth(alertID int) AlertHealth {
|
|
e.alertHealthMu.RLock()
|
|
defer e.alertHealthMu.RUnlock()
|
|
return e.alertHealth[alertID]
|
|
}
|
|
|
|
func (e *Engine) TestAlert(alertID int) error {
|
|
cfg, err := e.db.GetAlert(context.Background(), alertID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load alert: %w", err)
|
|
}
|
|
provider := alert.GetProvider(cfg)
|
|
if provider == nil {
|
|
return fmt.Errorf("no provider for type %q", cfg.Type)
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
err = provider.Send(ctx, "🧪 Test Alert", fmt.Sprintf("Test notification from uptop for channel '%s'.", cfg.Name))
|
|
if err != nil {
|
|
e.recordAlertResult(alertID, false, err.Error())
|
|
return err
|
|
}
|
|
e.recordAlertResult(alertID, true, "")
|
|
e.AddLog(fmt.Sprintf("Test alert sent to '%s'", cfg.Name))
|
|
return nil
|
|
}
|
|
|
|
func (e *Engine) isInMaintenance(monitorID int) bool {
|
|
e.maintCacheMu.RLock()
|
|
defer e.maintCacheMu.RUnlock()
|
|
return e.maintCache[monitorID]
|
|
}
|
|
|
|
func (e *Engine) refreshMaintenanceCache(ctx context.Context) {
|
|
windows, err := e.db.GetActiveMaintenanceWindows(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
directMaint := make(map[int]bool)
|
|
var globalMaint bool
|
|
for _, w := range windows {
|
|
if w.MonitorID == 0 {
|
|
globalMaint = true
|
|
} else {
|
|
directMaint[w.MonitorID] = true
|
|
}
|
|
}
|
|
|
|
resolved := make(map[int]bool)
|
|
e.mu.RLock()
|
|
for id, site := range e.liveState {
|
|
if globalMaint || directMaint[id] || (site.ParentID > 0 && directMaint[site.ParentID]) {
|
|
resolved[id] = true
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
e.maintCacheMu.Lock()
|
|
e.maintCache = resolved
|
|
e.maintCacheMu.Unlock()
|
|
}
|
|
|
|
func (e *Engine) GetDisplayStatus(site models.Site) string {
|
|
if site.Paused {
|
|
return "PAUSED"
|
|
}
|
|
if e.isInMaintenance(site.ID) {
|
|
return "MAINT"
|
|
}
|
|
return string(site.Status)
|
|
}
|
|
|
|
func (e *Engine) checkGroup(_ context.Context, site models.Site) {
|
|
e.mu.RLock()
|
|
status := models.StatusUp
|
|
hasChildren := false
|
|
for _, child := range e.liveState {
|
|
if child.ParentID != site.ID || child.Type == "group" {
|
|
continue
|
|
}
|
|
hasChildren = true
|
|
if child.Paused || e.isInMaintenance(child.ID) {
|
|
continue
|
|
}
|
|
if child.Status == models.StatusDown || child.Status == models.StatusSSLExp {
|
|
status = models.StatusDown
|
|
} else if child.Status == models.StatusStale && status != models.StatusDown {
|
|
status = models.StatusStale
|
|
} else if child.Status == models.StatusLate && status != models.StatusDown && status != models.StatusStale {
|
|
status = models.StatusLate
|
|
} else if child.Status == models.StatusPending && status != models.StatusDown && status != models.StatusStale && status != models.StatusLate {
|
|
status = models.StatusPending
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
if !hasChildren {
|
|
status = models.StatusPending
|
|
}
|
|
|
|
e.applyState(site.ID, func(s *models.Site) {
|
|
s.Status = status
|
|
})
|
|
}
|
|
|
|
func (e *Engine) EnqueueProbeCheck(siteID int, nodeID string, latencyNs int64, isUp bool) {
|
|
e.enqueueWrite(writeProbeCheck{siteID: siteID, nodeID: nodeID, latencyNs: latencyNs, isUp: isUp})
|
|
}
|
|
|
|
func (e *Engine) SetAggStrategy(strategy AggregationStrategy) {
|
|
e.aggStrategy = strategy
|
|
}
|
|
|
|
func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool, errorReason string) {
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[siteID]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
staleAfter := time.Duration(site.Interval) * time.Second * 3
|
|
if staleAfter < time.Minute {
|
|
staleAfter = time.Minute
|
|
}
|
|
|
|
now := time.Now()
|
|
e.probeResultsMu.Lock()
|
|
if e.probeResults[siteID] == nil {
|
|
e.probeResults[siteID] = make(map[string]NodeResult)
|
|
}
|
|
e.probeResults[siteID][nodeID] = NodeResult{
|
|
NodeID: nodeID,
|
|
IsUp: isUp,
|
|
LatencyNs: latencyNs,
|
|
CheckedAt: now,
|
|
ErrorReason: errorReason,
|
|
}
|
|
results := make([]NodeResult, 0, len(e.probeResults[siteID]))
|
|
for id, r := range e.probeResults[siteID] {
|
|
if now.Sub(r.CheckedAt) > staleAfter {
|
|
delete(e.probeResults[siteID], id)
|
|
continue
|
|
}
|
|
results = append(results, r)
|
|
}
|
|
e.probeResultsMu.Unlock()
|
|
|
|
aggUp, avgLatency := AggregateStatus(results, e.aggStrategy)
|
|
|
|
probeStatus := models.StatusUp
|
|
if !aggUp {
|
|
probeStatus = models.StatusDown
|
|
}
|
|
|
|
updatedSite := site
|
|
updatedSite.Latency = time.Duration(avgLatency)
|
|
updatedSite.LastCheck = time.Now()
|
|
e.handleStatusChange(updatedSite, string(probeStatus), 0, time.Duration(avgLatency), errorReason)
|
|
}
|
|
|
|
func (e *Engine) GetProbeResults(siteID int) map[string]NodeResult {
|
|
e.probeResultsMu.RLock()
|
|
defer e.probeResultsMu.RUnlock()
|
|
src := e.probeResults[siteID]
|
|
cp := make(map[string]NodeResult, len(src))
|
|
for k, v := range src {
|
|
cp[k] = v
|
|
}
|
|
return cp
|
|
}
|
|
|
|
func (e *Engine) GetStateChanges(siteID int, limit int) []models.StateChange {
|
|
changes, err := e.db.GetStateChanges(context.Background(), siteID, limit)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return changes
|
|
}
|
|
|
|
func (e *Engine) GetStateChangesSince(siteID int, since time.Time) []models.StateChange {
|
|
changes, err := e.db.GetStateChangesSince(context.Background(), siteID, since)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return changes
|
|
}
|