Files
uptop/internal/monitor/monitor.go
T
lerko 5d5153351e
CI / test (pull_request) Successful in 1m59s
CI / lint (pull_request) Successful in 1m17s
CI / vulncheck (pull_request) Successful in 1m1s
fix(engine): six correctness fixes for the state machine
1. Group auto-pause trap: remove the one-way Paused=true mutation
   from checkGroup — monitorRoutine skipped paused groups, so they
   could never re-evaluate or auto-unpause.

2. Retry logic: apply MaxRetries to all →DOWN transitions, not just
   UP→DOWN. New monitors (PENDING) no longer alert on first transient
   failure when retries are configured.

3. Shutdown drain hole: track checker goroutines with checkerWG so
   Stop() waits for in-flight checks before draining the write queue.
   Final drainWrites() catches any writes enqueued after the writer's
   own drain.

4. Probe-ingest writer bypass: route SaveCheckFromNode through the
   engine's serialized dbWriter instead of writing directly to the
   store from the HTTP handler.

5. Dead-probe expiry: expire stale probe results (>3× site interval)
   before aggregation so a dead probe can't poison status forever.
   Also clean probeResults in RemoveSite.

6. Maintenance-cache N+1: replace per-check DB query with a
   fully-resolved in-memory cache refreshed every poll cycle. One
   GetActiveMaintenanceWindows() call instead of N IsMonitorInMaintenance.

ImportData now wipes check_history, state_changes, and alert_health
so re-inserted IDs don't inherit stale history from prior occupants.
2026-06-11 13:57:03 -04:00

1112 lines
27 KiB
Go

package monitor
import (
"context"
"crypto/tls"
"fmt"
"math/rand/v2"
"net/http"
"regexp"
"strings"
"sync"
"time"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/alert"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/models"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/store"
)
const (
maxLogEntries = 100
pollInterval = 5 * time.Second
minCheckInterval = 5
minPushGrace = 60 * time.Second
maintPruneInterval = 15 * time.Minute
defaultMaintRetention = 7 * 24 * time.Hour
dbWriteBuffer = 4096
dbPruneInterval = 10 * time.Minute
)
type AlertHealth struct {
LastSendAt time.Time
LastSendOK bool
LastError string
SendCount int
FailCount int
}
type Engine struct {
mu sync.RWMutex
liveState map[int]models.Site
logMu sync.RWMutex
logStore []string
activeMu sync.RWMutex
isActive bool
histMu sync.RWMutex
histories map[int]*SiteHistory
tokenIndex map[string]int // protected by mu
probeResultsMu sync.RWMutex
probeResults map[int]map[string]NodeResult
aggStrategy AggregationStrategy
alertHealthMu sync.RWMutex
alertHealth map[int]AlertHealth
recheckMu sync.RWMutex
recheck map[int]chan struct{}
maintCacheMu sync.RWMutex
maintCache map[int]bool
db store.Store
insecureSkipVerify bool
allowPrivateTargets bool
maintRetention time.Duration
strictClient *http.Client
insecureClient *http.Client
dbWrites chan dbWrite
writerWG sync.WaitGroup
checkerWG sync.WaitGroup
cancel context.CancelFunc
stopOnce sync.Once
}
func NewEngine(s store.Store) *Engine {
return newEngine(s, false)
}
func NewEngineWithOpts(s store.Store, allowPrivateTargets bool) *Engine {
return newEngine(s, allowPrivateTargets)
}
func newEngine(s store.Store, allowPrivateTargets bool) *Engine {
dial := SafeDialContext(allowPrivateTargets)
return &Engine{
liveState: make(map[int]models.Site),
histories: make(map[int]*SiteHistory),
tokenIndex: make(map[string]int),
recheck: make(map[int]chan struct{}),
probeResults: make(map[int]map[string]NodeResult),
alertHealth: make(map[int]AlertHealth),
aggStrategy: AggAnyDown,
isActive: true,
allowPrivateTargets: allowPrivateTargets,
maintRetention: defaultMaintRetention,
dbWrites: make(chan dbWrite, dbWriteBuffer),
db: s,
strictClient: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
DialContext: dial,
},
},
insecureClient: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // intentional for IgnoreTLS sites
DialContext: dial,
},
},
}
}
func (e *Engine) SetInsecureSkipVerify(skip bool) {
e.insecureSkipVerify = skip
}
func (e *Engine) SetMaintRetention(d time.Duration) {
e.maintRetention = d
}
var ansiRe = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]`)
func sanitizeLog(s string) string {
s = ansiRe.ReplaceAllString(s, "")
s = strings.ReplaceAll(s, "\n", "\\n")
s = strings.ReplaceAll(s, "\r", "")
return s
}
func fmtDurationShort(d time.Duration) string {
if d < time.Minute {
return fmt.Sprintf("%ds", int(d.Seconds()))
}
if d < time.Hour {
return fmt.Sprintf("%dm", int(d.Minutes()))
}
if d < 24*time.Hour {
return fmt.Sprintf("%dh %dm", int(d.Hours()), int(d.Minutes())%60)
}
return fmt.Sprintf("%dd %dh", int(d.Hours())/24, int(d.Hours())%24)
}
// appendLog adds a timestamped entry to the in-memory ring buffer and returns
// it. It never touches the database, so it is safe to call from the db-write
// drop/error path without recursing back through the write queue.
func (e *Engine) appendLog(msg string) string {
ts := time.Now().Format("15:04:05")
entry := fmt.Sprintf("[%s] %s", ts, sanitizeLog(msg))
e.logMu.Lock()
e.logStore = append([]string{entry}, e.logStore...)
if len(e.logStore) > maxLogEntries {
e.logStore = e.logStore[:maxLogEntries]
}
e.logMu.Unlock()
return entry
}
func (e *Engine) AddLog(msg string) {
entry := e.appendLog(msg)
e.enqueueWrite(writeLog{message: entry})
}
// enqueueWrite hands a persistence task to the writer goroutine without
// blocking the caller. If the queue is saturated the write is dropped and noted
// in the in-memory log only (never re-enqueued, to avoid recursion via AddLog).
func (e *Engine) enqueueWrite(w dbWrite) {
select {
case e.dbWrites <- w:
default:
e.appendLog(fmt.Sprintf("db write queue full, dropped %s", w.desc()))
}
}
// dbWriter is the single goroutine that owns all writes. Serializing writes
// through one path removes the fire-and-forget goroutine pile-up, surfaces
// errors, and lets retention run on a timer instead of per-insert. It drains
// any buffered writes on shutdown before returning.
func (e *Engine) dbWriter(ctx context.Context) {
defer e.writerWG.Done()
pruneTicker := time.NewTicker(dbPruneInterval)
defer pruneTicker.Stop()
e.prune()
for {
select {
case w := <-e.dbWrites:
if err := w.exec(e.db); err != nil {
e.appendLog(fmt.Sprintf("db %s write failed: %v", w.desc(), err))
}
case <-pruneTicker.C:
e.prune()
case <-ctx.Done():
e.drainWrites()
return
}
}
}
// drainWrites flushes everything still buffered, best-effort, at shutdown.
func (e *Engine) drainWrites() {
for {
select {
case w := <-e.dbWrites:
if err := w.exec(e.db); err != nil {
e.appendLog(fmt.Sprintf("db %s write failed (drain): %v", w.desc(), err))
}
default:
return
}
}
}
func (e *Engine) prune() {
if err := e.db.PruneLogs(); err != nil {
e.appendLog(fmt.Sprintf("log prune failed: %v", err))
}
if err := e.db.PruneCheckHistory(); err != nil {
e.appendLog(fmt.Sprintf("check-history prune failed: %v", err))
}
if err := e.db.PruneStateChanges(); err != nil {
e.appendLog(fmt.Sprintf("state-change prune failed: %v", err))
}
}
// Stop signals the writer goroutine to drain and exit, then blocks until it
// has. Call it before closing the store so no write races a closed DB.
func (e *Engine) Stop() {
e.stopOnce.Do(func() {
if e.cancel != nil {
e.cancel()
}
e.checkerWG.Wait()
e.writerWG.Wait()
e.drainWrites()
})
}
func (e *Engine) InitLogs() {
logs, err := e.db.LoadLogs(maxLogEntries)
if err != nil {
return
}
if len(logs) == 0 {
return
}
e.logMu.Lock()
defer e.logMu.Unlock()
e.logStore = logs
}
// InitAlertHealth restores persisted alert send health so the dashboard shows real
// "last sent" / health state on startup instead of resetting every channel to "never".
func (e *Engine) InitAlertHealth() {
records, err := e.db.LoadAlertHealth()
if err != nil {
return
}
e.alertHealthMu.Lock()
defer e.alertHealthMu.Unlock()
for id, r := range records {
e.alertHealth[id] = AlertHealth{
LastSendAt: r.LastSendAt,
LastSendOK: r.LastSendOK,
LastError: r.LastError,
SendCount: r.SendCount,
FailCount: r.FailCount,
}
}
}
func (e *Engine) GetLogs() []string {
e.logMu.RLock()
defer e.logMu.RUnlock()
logs := make([]string, len(e.logStore))
copy(logs, e.logStore)
return logs
}
func (e *Engine) SetActive(active bool) {
e.activeMu.Lock()
defer e.activeMu.Unlock()
if e.isActive != active {
e.isActive = active
status := "RESUMED (Active)"
if !active {
status = "PAUSED (Passive)"
}
e.AddLog(fmt.Sprintf("Engine %s", status))
}
}
func (e *Engine) IsActive() bool {
e.activeMu.RLock()
defer e.activeMu.RUnlock()
return e.isActive
}
func (e *Engine) GetAllSites() []models.Site {
e.mu.RLock()
defer e.mu.RUnlock()
sites := make([]models.Site, 0, len(e.liveState))
for _, s := range e.liveState {
sites = append(sites, s)
}
return sites
}
func (e *Engine) GetLiveState() map[int]models.Site {
e.mu.RLock()
defer e.mu.RUnlock()
cp := make(map[int]models.Site, len(e.liveState))
for k, v := range e.liveState {
cp[k] = v
}
return cp
}
func (e *Engine) RecordHeartbeat(token string) bool {
if !e.IsActive() {
return false
}
e.mu.RLock()
targetID, ok := e.tokenIndex[token]
e.mu.RUnlock()
if !ok {
return false
}
var (
prevStatus string
name string
alertID int
downSince time.Time
)
_, exists := e.applyState(targetID, func(s *models.Site) {
prevStatus = s.Status
name = s.Name
alertID = s.AlertID
downSince = s.StatusChangedAt // captured before mutation = when it went down
s.LastCheck = time.Now()
s.Status = "UP"
s.FailureCount = 0
s.Latency = 0
s.LastError = ""
s.LastSuccessAt = time.Now()
if prevStatus != "UP" {
s.StatusChangedAt = time.Now()
}
})
if !exists {
return false
}
switch prevStatus {
case "PENDING":
e.AddLog(fmt.Sprintf("Push Monitor '%s' received first heartbeat", name))
case "LATE":
e.AddLog(fmt.Sprintf("Push Monitor '%s' heartbeat arrived (was late)", name))
case "STALE":
e.AddLog(fmt.Sprintf("Push Monitor '%s' heartbeat arrived (was stale)", name))
case "DOWN":
downDur := ""
if !downSince.IsZero() {
downDur = fmt.Sprintf(" (was down %s)", fmtDurationShort(time.Since(downSince)))
}
e.AddLog(fmt.Sprintf("Push Monitor '%s' recovered%s", name, downDur))
go e.triggerAlert(alertID, "✅ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.%s", name, downDur))
}
if prevStatus != "UP" && prevStatus != "PENDING" {
e.enqueueWrite(writeStateChange{siteID: targetID, fromStatus: prevStatus, toStatus: "UP"})
}
return true
}
func (e *Engine) addToTokenIndex(site models.Site) {
if site.Type == "push" && site.Token != "" {
e.tokenIndex[site.Token] = site.ID
}
}
func (e *Engine) removeFromTokenIndex(id int) {
for token, sid := range e.tokenIndex {
if sid == id {
delete(e.tokenIndex, token)
return
}
}
}
func (e *Engine) Start(ctx context.Context) {
// e.cancel is invoked by Stop() to drain and halt the writer; gosec can't
// trace the cross-method call, and cancelling the parent reaps this child
// regardless, so the leak it warns about can't occur.
ctx, e.cancel = context.WithCancel(ctx) //nolint:gosec // cancel is called in Stop()
e.writerWG.Add(1)
go e.dbWriter(ctx)
e.checkerWG.Add(1)
go func() {
defer e.checkerWG.Done()
for {
select {
case <-ctx.Done():
return
default:
}
e.refreshMaintenanceCache()
sites, err := e.db.GetSites()
if err != nil {
e.AddLog(fmt.Sprintf("Failed to load sites: %v", err))
select {
case <-time.After(pollInterval):
case <-ctx.Done():
return
}
continue
}
for _, s := range sites {
e.mu.RLock()
_, exists := e.liveState[s.ID]
e.mu.RUnlock()
if !exists {
e.mu.Lock()
s.Status = "PENDING"
if h, ok := e.GetHistory(s.ID); ok && len(h.Statuses) > 0 {
if h.Statuses[len(h.Statuses)-1] {
s.Status = "UP"
} else {
s.Status = "DOWN"
}
if len(h.Latencies) > 0 {
s.Latency = h.Latencies[len(h.Latencies)-1]
}
}
e.liveState[s.ID] = s
e.addToTokenIndex(s)
e.mu.Unlock()
e.checkerWG.Add(1)
go func(id int) {
defer e.checkerWG.Done()
e.monitorRoutine(ctx, id)
}(s.ID)
}
}
select {
case <-time.After(pollInterval):
case <-ctx.Done():
return
}
}
}()
e.checkerWG.Add(1)
go func() {
defer e.checkerWG.Done()
e.maintenancePruner(ctx)
}()
}
func (e *Engine) maintenancePruner(ctx context.Context) {
ticker := time.NewTicker(maintPruneInterval)
defer ticker.Stop()
e.pruneMaintenanceWindows()
for {
select {
case <-ticker.C:
e.pruneMaintenanceWindows()
case <-ctx.Done():
return
}
}
}
func (e *Engine) pruneMaintenanceWindows() {
pruned, err := e.db.PruneExpiredMaintenanceWindows(e.maintRetention)
if err != nil {
e.AddLog(fmt.Sprintf("Maintenance prune error: %v", err))
return
}
if pruned > 0 {
e.AddLog(fmt.Sprintf("Pruned %d expired maintenance window(s)", pruned))
}
}
func (e *Engine) UpdateSiteConfig(site models.Site) {
e.mu.Lock()
if existing, ok := e.liveState[site.ID]; ok {
e.removeFromTokenIndex(site.ID)
site.Status = existing.Status
site.StatusCode = existing.StatusCode
site.Latency = existing.Latency
site.CertExpiry = existing.CertExpiry
site.HasSSL = existing.HasSSL
site.LastCheck = existing.LastCheck
site.SentSSLWarning = existing.SentSSLWarning
site.FailureCount = existing.FailureCount
site.LastError = existing.LastError
site.StatusChangedAt = existing.StatusChangedAt
site.LastSuccessAt = existing.LastSuccessAt
e.liveState[site.ID] = site
e.addToTokenIndex(site)
}
e.mu.Unlock()
e.signalRecheck(site.ID)
}
func (e *Engine) getRecheckChan(id int) chan struct{} {
e.recheckMu.Lock()
defer e.recheckMu.Unlock()
ch, ok := e.recheck[id]
if !ok {
ch = make(chan struct{}, 1)
e.recheck[id] = ch
}
return ch
}
func (e *Engine) signalRecheck(id int) {
ch := e.getRecheckChan(id)
select {
case ch <- struct{}{}:
default:
}
}
func (e *Engine) RemoveSite(id int) {
e.mu.Lock()
e.removeFromTokenIndex(id)
delete(e.liveState, id)
e.mu.Unlock()
e.removeHistory(id)
e.probeResultsMu.Lock()
delete(e.probeResults, id)
e.probeResultsMu.Unlock()
e.recheckMu.Lock()
delete(e.recheck, id)
e.recheckMu.Unlock()
}
func (e *Engine) ToggleSitePause(id int) bool {
var (
paused bool
name string
)
_, ok := e.applyState(id, func(s *models.Site) {
s.Paused = !s.Paused
paused = s.Paused
name = s.Name
})
if !ok {
return false
}
if paused {
e.AddLog(fmt.Sprintf("Monitor '%s' paused", name))
} else {
e.AddLog(fmt.Sprintf("Monitor '%s' resumed", name))
}
return paused
}
func (e *Engine) monitorRoutine(ctx context.Context, id int) {
recheckCh := e.getRecheckChan(id)
// Stagger initial check to avoid thundering herd on startup
stagger := time.Duration(rand.IntN(3000)) * time.Millisecond //nolint:gosec // non-security jitter
select {
case <-time.After(stagger):
case <-ctx.Done():
return
}
e.checkByID(id)
for {
select {
case <-ctx.Done():
return
default:
}
if !e.IsActive() {
select {
case <-time.After(pollInterval):
case <-ctx.Done():
return
case <-recheckCh:
}
continue
}
e.mu.RLock()
site, exists := e.liveState[id]
e.mu.RUnlock()
if !exists {
return
}
if site.Paused {
select {
case <-time.After(pollInterval):
case <-ctx.Done():
return
case <-recheckCh:
}
continue
}
interval := site.Interval
if interval < minCheckInterval {
interval = minCheckInterval
}
jitter := time.Duration(rand.IntN(interval*100)) * time.Millisecond //nolint:gosec // non-security jitter
select {
case <-time.After(time.Duration(interval)*time.Second + jitter):
case <-ctx.Done():
return
case <-recheckCh:
}
e.checkByID(id)
}
}
// applyState atomically reads, mutates, and writes back the live entry for id.
// The mutator runs under the engine write lock and receives a pointer to the
// CURRENT live state, so concurrent config edits, pauses, and heartbeats are
// never clobbered by a stale snapshot. The mutator must only touch runtime /
// check-result fields — config fields (Name/URL/Type/Token/Interval/AlertID/…)
// are owned by UpdateSiteConfig and must not be written here. Returns the
// post-mutation copy and whether the site still exists.
func (e *Engine) applyState(id int, mutate func(s *models.Site)) (models.Site, bool) {
e.mu.Lock()
defer e.mu.Unlock()
cur, ok := e.liveState[id]
if !ok {
return models.Site{}, false
}
mutate(&cur)
e.liveState[id] = cur
return cur, true
}
func (e *Engine) checkByID(id int) {
if !e.IsActive() {
return
}
e.mu.RLock()
site, exists := e.liveState[id]
e.mu.RUnlock()
if !exists || site.Paused {
return
}
switch site.Type {
case "push":
e.checkPush(site)
case "group":
e.checkGroup(site)
default:
result := RunCheck(site, e.strictClient, e.insecureClient, e.insecureSkipVerify, e.allowPrivateTargets)
updatedSite := site
updatedSite.HasSSL = result.HasSSL
updatedSite.CertExpiry = result.CertExpiry
updatedSite.Latency = time.Duration(result.LatencyNs)
updatedSite.LastCheck = time.Now()
e.handleStatusChange(updatedSite, result.Status, result.StatusCode, time.Duration(result.LatencyNs), result.ErrorReason)
}
}
func (e *Engine) checkPush(site models.Site) {
if site.Status == "PENDING" {
return
}
interval := time.Duration(site.Interval) * time.Second
grace := interval / 2
if grace < minPushGrace {
grace = minPushGrace
}
overdue := site.LastCheck.Add(interval)
staleMark := overdue.Add(grace / 2)
graceEnd := overdue.Add(grace)
now := time.Now()
if now.After(graceEnd) {
if site.Status != "DOWN" {
e.handleStatusChange(site, "DOWN", 0, 0, "heartbeat missed")
}
} else if now.After(staleMark) {
if site.Status != "STALE" {
e.handleStatusChange(site, "STALE", 0, 0, "heartbeat stale")
}
} else if now.After(overdue) {
if site.Status != "LATE" {
e.handleStatusChange(site, "LATE", 0, 0, "heartbeat overdue")
}
}
}
// handleStatusChange folds a check result into the live state. snap is the
// stale snapshot the check ran against; the actual mutation is applied onto the
// CURRENT live entry via applyState, so a concurrent pause / config edit /
// heartbeat is never reverted by this write. Logs and alerts are emitted after
// the lock is released, off the critical section.
func (e *Engine) handleStatusChange(snap models.Site, rawStatus string, code int, latency time.Duration, errorReason string) {
if !e.IsActive() {
return
}
inMaint := e.isInMaintenance(snap.ID)
var (
prev, next string
name, typ string
alertID int
failCount, maxRetries int
confirmedDown bool
failedCheck bool
downSince time.Time
sslWarnFire bool
sslDays int
skipped bool
changed bool
)
_, exists := e.applyState(snap.ID, func(s *models.Site) {
// A non-UP result computed from a stale snapshot must not override a
// heartbeat (or newer check) that landed while we were evaluating.
if rawStatus != "UP" && s.LastCheck.After(snap.LastCheck) {
skipped = true
return
}
prev = s.Status
name = s.Name
typ = s.Type
alertID = s.AlertID
maxRetries = s.MaxRetries
downSince = s.StatusChangedAt
// Fresh check results (measured by the run against snap).
s.StatusCode = code
s.Latency = snap.Latency
s.LastCheck = snap.LastCheck
s.HasSSL = snap.HasSSL
s.CertExpiry = snap.CertExpiry
s.LastError = errorReason
if rawStatus == "UP" {
s.LastSuccessAt = time.Now()
s.LastError = ""
}
// Status + failure-count transition, based on the CURRENT live status.
if rawStatus == "UP" {
s.FailureCount = 0
s.Status = "UP"
} else {
if s.FailureCount <= s.MaxRetries {
s.FailureCount++
}
if s.FailureCount > s.MaxRetries {
if s.Status != rawStatus {
confirmedDown = true
}
s.Status = rawStatus
s.FailureCount = s.MaxRetries + 1
} else {
failedCheck = true
}
}
failCount = s.FailureCount
if s.Status != prev && prev != "PENDING" {
s.StatusChangedAt = time.Now()
} else if s.StatusChangedAt.IsZero() && s.Status != "PENDING" {
s.StatusChangedAt = time.Now()
}
// SSL expiry warning (fresh HasSSL/CertExpiry + config threshold).
if typ == "http" && s.CheckSSL && s.HasSSL {
days := int(time.Until(s.CertExpiry).Hours() / 24)
if days <= s.ExpiryThreshold && !s.SentSSLWarning && rawStatus != "SSL EXP" {
sslWarnFire = true
sslDays = days
s.SentSSLWarning = true
} else if days > s.ExpiryThreshold {
s.SentSSLWarning = false
}
}
next = s.Status
changed = next != prev
})
if !exists || skipped {
return
}
e.recordCheck(snap.ID, latency, rawStatus == "UP")
if confirmedDown {
if errorReason != "" {
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN: %s", name, errorReason))
} else {
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", name))
}
} else if failedCheck {
e.AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", name, failCount, maxRetries))
}
if changed && prev != "PENDING" {
e.enqueueWrite(writeStateChange{siteID: snap.ID, fromStatus: prev, toStatus: next, reason: errorReason})
}
if sslWarnFire {
if !inMaint {
e.triggerAlert(alertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", name, sslDays))
} else {
e.AddLog(fmt.Sprintf("SSL warning for '%s' suppressed (maintenance)", name))
}
}
isBroken := func(s string) bool { return s == "DOWN" || s == "SSL EXP" }
if prev == "UP" && next == "LATE" {
e.AddLog(fmt.Sprintf("Monitor '%s' heartbeat overdue", name))
}
if !isBroken(prev) && isBroken(next) && next != "PENDING" {
if inMaint {
e.AddLog(fmt.Sprintf("Monitor '%s' is DOWN (alerts suppressed — maintenance)", name))
} else {
msg := fmt.Sprintf("Monitor '%s' is DOWN (%s)", name, rawStatus)
if errorReason != "" {
msg = fmt.Sprintf("Monitor '%s' is DOWN: %s", name, errorReason)
}
if typ == "push" {
msg = fmt.Sprintf("Push Monitor '%s' missed heartbeat.", name)
}
e.triggerAlert(alertID, "🚨 ALERT", msg)
}
}
if isBroken(prev) && next == "UP" {
downDur := ""
if !downSince.IsZero() {
downDur = fmt.Sprintf(" (was down %s)", fmtDurationShort(time.Since(downSince)))
}
e.AddLog(fmt.Sprintf("Monitor '%s' recovered%s", name, downDur))
if !inMaint {
e.triggerAlert(alertID, "✅ RECOVERY", fmt.Sprintf("Monitor '%s' is UP%s", name, downDur))
}
}
if prev == "LATE" && next == "UP" && !isBroken(prev) {
e.AddLog(fmt.Sprintf("Monitor '%s' heartbeat arrived (was late)", name))
}
}
func (e *Engine) triggerAlert(alertID int, title, message string) {
cfg, err := e.db.GetAlert(alertID)
if err != nil {
e.AddLog(fmt.Sprintf("Failed to load alert config %d: %v", alertID, err))
return
}
provider := alert.GetProvider(cfg)
if provider != nil {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := provider.Send(ctx, title, message); err != nil {
e.AddLog(fmt.Sprintf("Alert send failed (%s): %v", cfg.Name, err))
e.recordAlertResult(alertID, false, err.Error())
} else {
e.recordAlertResult(alertID, true, "")
}
}()
}
}
func (e *Engine) recordAlertResult(alertID int, ok bool, errMsg string) {
e.alertHealthMu.Lock()
defer e.alertHealthMu.Unlock()
h := e.alertHealth[alertID]
h.LastSendAt = time.Now()
h.LastSendOK = ok
h.SendCount++
if ok {
h.LastError = ""
} else {
h.LastError = errMsg
h.FailCount++
}
e.alertHealth[alertID] = h
// Persist so health survives restarts; DB IO off the alert path.
e.enqueueWrite(writeAlertHealth{rec: models.AlertHealthRecord{
AlertID: alertID,
LastSendAt: h.LastSendAt,
LastSendOK: h.LastSendOK,
LastError: h.LastError,
SendCount: h.SendCount,
FailCount: h.FailCount,
}})
}
func (e *Engine) GetAlertHealth(alertID int) AlertHealth {
e.alertHealthMu.RLock()
defer e.alertHealthMu.RUnlock()
return e.alertHealth[alertID]
}
func (e *Engine) TestAlert(alertID int) error {
cfg, err := e.db.GetAlert(alertID)
if err != nil {
return fmt.Errorf("failed to load alert: %w", err)
}
provider := alert.GetProvider(cfg)
if provider == nil {
return fmt.Errorf("no provider for type %q", cfg.Type)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err = provider.Send(ctx, "🧪 Test Alert", fmt.Sprintf("Test notification from uptop for channel '%s'.", cfg.Name))
if err != nil {
e.recordAlertResult(alertID, false, err.Error())
return err
}
e.recordAlertResult(alertID, true, "")
e.AddLog(fmt.Sprintf("Test alert sent to '%s'", cfg.Name))
return nil
}
func (e *Engine) isInMaintenance(monitorID int) bool {
e.maintCacheMu.RLock()
defer e.maintCacheMu.RUnlock()
return e.maintCache[monitorID]
}
func (e *Engine) refreshMaintenanceCache() {
windows, err := e.db.GetActiveMaintenanceWindows()
if err != nil {
return
}
directMaint := make(map[int]bool)
var globalMaint bool
for _, w := range windows {
if w.MonitorID == 0 {
globalMaint = true
} else {
directMaint[w.MonitorID] = true
}
}
resolved := make(map[int]bool)
e.mu.RLock()
for id, site := range e.liveState {
if globalMaint || directMaint[id] || (site.ParentID > 0 && directMaint[site.ParentID]) {
resolved[id] = true
}
}
e.mu.RUnlock()
e.maintCacheMu.Lock()
e.maintCache = resolved
e.maintCacheMu.Unlock()
}
func (e *Engine) GetDisplayStatus(site models.Site) string {
if site.Paused {
return "PAUSED"
}
if e.isInMaintenance(site.ID) {
return "MAINT"
}
return site.Status
}
func (e *Engine) checkGroup(site models.Site) {
e.mu.RLock()
status := "UP"
hasChildren := false
for _, child := range e.liveState {
if child.ParentID != site.ID || child.Type == "group" {
continue
}
hasChildren = true
if child.Paused || e.isInMaintenance(child.ID) {
continue
}
if child.Status == "DOWN" || child.Status == "SSL EXP" {
status = "DOWN"
} else if child.Status == "STALE" && status != "DOWN" {
status = "STALE"
} else if child.Status == "LATE" && status != "DOWN" && status != "STALE" {
status = "LATE"
} else if child.Status == "PENDING" && status != "DOWN" && status != "STALE" && status != "LATE" {
status = "PENDING"
}
}
e.mu.RUnlock()
if !hasChildren {
status = "PENDING"
}
e.applyState(site.ID, func(s *models.Site) {
s.Status = status
})
}
func (e *Engine) EnqueueProbeCheck(siteID int, nodeID string, latencyNs int64, isUp bool) {
e.enqueueWrite(writeProbeCheck{siteID: siteID, nodeID: nodeID, latencyNs: latencyNs, isUp: isUp})
}
func (e *Engine) SetAggStrategy(strategy AggregationStrategy) {
e.aggStrategy = strategy
}
func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool, errorReason string) {
e.mu.RLock()
site, exists := e.liveState[siteID]
e.mu.RUnlock()
if !exists {
return
}
staleAfter := time.Duration(site.Interval) * time.Second * 3
if staleAfter < time.Minute {
staleAfter = time.Minute
}
now := time.Now()
e.probeResultsMu.Lock()
if e.probeResults[siteID] == nil {
e.probeResults[siteID] = make(map[string]NodeResult)
}
e.probeResults[siteID][nodeID] = NodeResult{
NodeID: nodeID,
IsUp: isUp,
LatencyNs: latencyNs,
CheckedAt: now,
ErrorReason: errorReason,
}
results := make([]NodeResult, 0, len(e.probeResults[siteID]))
for id, r := range e.probeResults[siteID] {
if now.Sub(r.CheckedAt) > staleAfter {
delete(e.probeResults[siteID], id)
continue
}
results = append(results, r)
}
e.probeResultsMu.Unlock()
aggUp, avgLatency := AggregateStatus(results, e.aggStrategy)
rawStatus := "UP"
if !aggUp {
rawStatus = "DOWN"
}
updatedSite := site
updatedSite.Latency = time.Duration(avgLatency)
updatedSite.LastCheck = time.Now()
e.handleStatusChange(updatedSite, rawStatus, 0, time.Duration(avgLatency), errorReason)
}
func (e *Engine) GetProbeResults(siteID int) map[string]NodeResult {
e.probeResultsMu.RLock()
defer e.probeResultsMu.RUnlock()
src := e.probeResults[siteID]
cp := make(map[string]NodeResult, len(src))
for k, v := range src {
cp[k] = v
}
return cp
}
func (e *Engine) GetStateChanges(siteID int, limit int) []models.StateChange {
changes, err := e.db.GetStateChanges(siteID, limit)
if err != nil {
return nil
}
return changes
}
func (e *Engine) GetStateChangesSince(siteID int, since time.Time) []models.StateChange {
changes, err := e.db.GetStateChangesSince(siteID, since)
if err != nil {
return nil
}
return changes
}