916c963663
1. Poll loop now fully converges with the DB: updated site configs are refreshed via UpdateSiteConfig, and sites removed from the DB are evicted from liveState. Previously the loop only added new sites — config edits via apply were ignored until restart, and pruned sites kept being checked and alerting. 2. Push monitors now record check history on each heartbeat via recordCheck. Previously RecordHeartbeat updated state but never wrote to check_history — push uptime % and sparklines were empty. 3. Groups record a synthetic check per evaluation tick so they get uptime history and sparklines instead of blank displays.
1124 lines
28 KiB
Go
1124 lines
28 KiB
Go
package monitor
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"math/rand/v2"
|
|
"net/http"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitea.lerkolabs.com/lerkolabs/uptop/internal/alert"
|
|
"gitea.lerkolabs.com/lerkolabs/uptop/internal/models"
|
|
"gitea.lerkolabs.com/lerkolabs/uptop/internal/store"
|
|
)
|
|
|
|
// Check scheduling and in-memory log sizing.
const (
	// maxLogEntries caps the in-memory log buffer and the number of entries
	// reloaded from the store by InitLogs.
	maxLogEntries = 100
	// pollInterval is the delay between site-list refreshes, and the sleep
	// used by monitor routines while the engine or a site is paused.
	pollInterval = 5 * time.Second
	// minCheckInterval is the floor, in seconds, applied to a site's Interval.
	minCheckInterval = 5
	// minPushGrace is the smallest grace period given to a push monitor.
	minPushGrace = 60 * time.Second
)

// Background maintenance and persistence sizing.
const (
	// maintPruneInterval is how often expired maintenance windows are pruned.
	maintPruneInterval = 15 * time.Minute
	// defaultMaintRetention is the retention passed to the maintenance pruner
	// unless SetMaintRetention overrides it.
	defaultMaintRetention = 7 * 24 * time.Hour
	// dbWriteBuffer is the capacity of the queue feeding the writer goroutine.
	dbWriteBuffer = 4096
	// dbPruneInterval is how often the writer goroutine runs retention pruning.
	dbPruneInterval = 10 * time.Minute
)
|
|
|
|
// AlertHealth is the delivery record kept for one alert channel.
type AlertHealth struct {
	// LastSendAt is when the most recent send attempt was recorded.
	LastSendAt time.Time
	// LastSendOK reports whether that attempt succeeded.
	LastSendOK bool
	// LastError holds the error text of the last failed attempt; it is
	// cleared when a later attempt succeeds.
	LastError string
	// SendCount counts every recorded attempt; FailCount counts the failed ones.
	SendCount, FailCount int
}
|
|
|
|
type Engine struct {
|
|
mu sync.RWMutex
|
|
liveState map[int]models.Site
|
|
|
|
logMu sync.RWMutex
|
|
logStore []string
|
|
|
|
activeMu sync.RWMutex
|
|
isActive bool
|
|
|
|
histMu sync.RWMutex
|
|
histories map[int]*SiteHistory
|
|
|
|
tokenIndex map[string]int // protected by mu
|
|
|
|
probeResultsMu sync.RWMutex
|
|
probeResults map[int]map[string]NodeResult
|
|
aggStrategy AggregationStrategy
|
|
|
|
alertHealthMu sync.RWMutex
|
|
alertHealth map[int]AlertHealth
|
|
|
|
recheckMu sync.RWMutex
|
|
recheck map[int]chan struct{}
|
|
|
|
maintCacheMu sync.RWMutex
|
|
maintCache map[int]bool
|
|
|
|
db store.Store
|
|
insecureSkipVerify bool
|
|
allowPrivateTargets bool
|
|
maintRetention time.Duration
|
|
strictClient *http.Client
|
|
insecureClient *http.Client
|
|
|
|
dbWrites chan dbWrite
|
|
writerWG sync.WaitGroup
|
|
checkerWG sync.WaitGroup
|
|
cancel context.CancelFunc
|
|
stopOnce sync.Once
|
|
}
|
|
|
|
func NewEngine(s store.Store) *Engine {
|
|
return newEngine(s, false)
|
|
}
|
|
|
|
func NewEngineWithOpts(s store.Store, allowPrivateTargets bool) *Engine {
|
|
return newEngine(s, allowPrivateTargets)
|
|
}
|
|
|
|
func newEngine(s store.Store, allowPrivateTargets bool) *Engine {
|
|
dial := SafeDialContext(allowPrivateTargets)
|
|
return &Engine{
|
|
liveState: make(map[int]models.Site),
|
|
histories: make(map[int]*SiteHistory),
|
|
tokenIndex: make(map[string]int),
|
|
recheck: make(map[int]chan struct{}),
|
|
probeResults: make(map[int]map[string]NodeResult),
|
|
alertHealth: make(map[int]AlertHealth),
|
|
aggStrategy: AggAnyDown,
|
|
isActive: true,
|
|
allowPrivateTargets: allowPrivateTargets,
|
|
maintRetention: defaultMaintRetention,
|
|
dbWrites: make(chan dbWrite, dbWriteBuffer),
|
|
db: s,
|
|
strictClient: &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
|
|
DialContext: dial,
|
|
},
|
|
},
|
|
insecureClient: &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // intentional for IgnoreTLS sites
|
|
DialContext: dial,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (e *Engine) SetInsecureSkipVerify(skip bool) {
|
|
e.insecureSkipVerify = skip
|
|
}
|
|
|
|
func (e *Engine) SetMaintRetention(d time.Duration) {
|
|
e.maintRetention = d
|
|
}
|
|
|
|
// ansiRe matches ANSI CSI escape sequences: ESC, '[', optional digits and
// semicolons, then a single letter.
var ansiRe = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]`)

// sanitizeLog makes s safe to store as a single log line: ANSI escape
// sequences are stripped first, then each newline becomes the two characters
// backslash-n and carriage returns are dropped.
func sanitizeLog(s string) string {
	stripped := ansiRe.ReplaceAllString(s, "")
	oneLine := strings.ReplaceAll(stripped, "\n", "\\n")
	return strings.ReplaceAll(oneLine, "\r", "")
}
|
|
|
|
// fmtDurationShort renders d compactly, picking the unit by magnitude:
// "45s" below a minute, "12m" below an hour, "3h 7m" below a day and
// "2d 5h" otherwise. Fractions are truncated.
func fmtDurationShort(d time.Duration) string {
	switch {
	case d < time.Minute:
		return fmt.Sprintf("%ds", int(d.Seconds()))
	case d < time.Hour:
		return fmt.Sprintf("%dm", int(d.Minutes()))
	case d < 24*time.Hour:
		return fmt.Sprintf("%dh %dm", int(d.Hours()), int(d.Minutes())%60)
	default:
		totalHours := int(d.Hours())
		return fmt.Sprintf("%dd %dh", totalHours/24, totalHours%24)
	}
}
|
|
|
|
// appendLog adds a timestamped entry to the in-memory ring buffer and returns
|
|
// it. It never touches the database, so it is safe to call from the db-write
|
|
// drop/error path without recursing back through the write queue.
|
|
func (e *Engine) appendLog(msg string) string {
|
|
ts := time.Now().Format("15:04:05")
|
|
entry := fmt.Sprintf("[%s] %s", ts, sanitizeLog(msg))
|
|
e.logMu.Lock()
|
|
e.logStore = append([]string{entry}, e.logStore...)
|
|
if len(e.logStore) > maxLogEntries {
|
|
e.logStore = e.logStore[:maxLogEntries]
|
|
}
|
|
e.logMu.Unlock()
|
|
return entry
|
|
}
|
|
|
|
func (e *Engine) AddLog(msg string) {
|
|
entry := e.appendLog(msg)
|
|
e.enqueueWrite(writeLog{message: entry})
|
|
}
|
|
|
|
// enqueueWrite hands a persistence task to the writer goroutine without
|
|
// blocking the caller. If the queue is saturated the write is dropped and noted
|
|
// in the in-memory log only (never re-enqueued, to avoid recursion via AddLog).
|
|
func (e *Engine) enqueueWrite(w dbWrite) {
|
|
select {
|
|
case e.dbWrites <- w:
|
|
default:
|
|
e.appendLog(fmt.Sprintf("db write queue full, dropped %s", w.desc()))
|
|
}
|
|
}
|
|
|
|
// dbWriter is the single goroutine that owns all writes. Serializing writes
|
|
// through one path removes the fire-and-forget goroutine pile-up, surfaces
|
|
// errors, and lets retention run on a timer instead of per-insert. It drains
|
|
// any buffered writes on shutdown before returning.
|
|
func (e *Engine) dbWriter(ctx context.Context) {
|
|
defer e.writerWG.Done()
|
|
|
|
pruneTicker := time.NewTicker(dbPruneInterval)
|
|
defer pruneTicker.Stop()
|
|
e.prune(ctx)
|
|
|
|
for {
|
|
select {
|
|
case w := <-e.dbWrites:
|
|
if err := w.exec(ctx, e.db); err != nil {
|
|
e.appendLog(fmt.Sprintf("db %s write failed: %v", w.desc(), err))
|
|
}
|
|
case <-pruneTicker.C:
|
|
e.prune(ctx)
|
|
case <-ctx.Done():
|
|
e.drainWrites()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// drainWrites flushes everything still buffered, best-effort, at shutdown.
|
|
func (e *Engine) drainWrites() {
|
|
for {
|
|
select {
|
|
case w := <-e.dbWrites:
|
|
if err := w.exec(context.Background(), e.db); err != nil {
|
|
e.appendLog(fmt.Sprintf("db %s write failed (drain): %v", w.desc(), err))
|
|
}
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) prune(ctx context.Context) {
|
|
if err := e.db.PruneLogs(ctx); err != nil {
|
|
e.appendLog(fmt.Sprintf("log prune failed: %v", err))
|
|
}
|
|
if err := e.db.PruneCheckHistory(ctx); err != nil {
|
|
e.appendLog(fmt.Sprintf("check-history prune failed: %v", err))
|
|
}
|
|
if err := e.db.PruneStateChanges(ctx); err != nil {
|
|
e.appendLog(fmt.Sprintf("state-change prune failed: %v", err))
|
|
}
|
|
}
|
|
|
|
// Stop signals the writer goroutine to drain and exit, then blocks until it
|
|
// has. Call it before closing the store so no write races a closed DB.
|
|
func (e *Engine) Stop() {
|
|
e.stopOnce.Do(func() {
|
|
if e.cancel != nil {
|
|
e.cancel()
|
|
}
|
|
e.checkerWG.Wait()
|
|
e.writerWG.Wait()
|
|
e.drainWrites()
|
|
})
|
|
}
|
|
|
|
func (e *Engine) InitLogs() {
|
|
logs, err := e.db.LoadLogs(context.Background(), maxLogEntries)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if len(logs) == 0 {
|
|
return
|
|
}
|
|
e.logMu.Lock()
|
|
defer e.logMu.Unlock()
|
|
e.logStore = logs
|
|
}
|
|
|
|
// InitAlertHealth restores persisted alert send health so the dashboard shows real
|
|
// "last sent" / health state on startup instead of resetting every channel to "never".
|
|
func (e *Engine) InitAlertHealth() {
|
|
records, err := e.db.LoadAlertHealth(context.Background())
|
|
if err != nil {
|
|
return
|
|
}
|
|
e.alertHealthMu.Lock()
|
|
defer e.alertHealthMu.Unlock()
|
|
for id, r := range records {
|
|
e.alertHealth[id] = AlertHealth{
|
|
LastSendAt: r.LastSendAt,
|
|
LastSendOK: r.LastSendOK,
|
|
LastError: r.LastError,
|
|
SendCount: r.SendCount,
|
|
FailCount: r.FailCount,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) GetLogs() []string {
|
|
e.logMu.RLock()
|
|
defer e.logMu.RUnlock()
|
|
logs := make([]string, len(e.logStore))
|
|
copy(logs, e.logStore)
|
|
return logs
|
|
}
|
|
|
|
func (e *Engine) SetActive(active bool) {
|
|
e.activeMu.Lock()
|
|
defer e.activeMu.Unlock()
|
|
if e.isActive != active {
|
|
e.isActive = active
|
|
status := "RESUMED (Active)"
|
|
if !active {
|
|
status = "PAUSED (Passive)"
|
|
}
|
|
e.AddLog(fmt.Sprintf("Engine %s", status))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) IsActive() bool {
|
|
e.activeMu.RLock()
|
|
defer e.activeMu.RUnlock()
|
|
return e.isActive
|
|
}
|
|
|
|
func (e *Engine) GetAllSites() []models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
sites := make([]models.Site, 0, len(e.liveState))
|
|
for _, s := range e.liveState {
|
|
sites = append(sites, s)
|
|
}
|
|
return sites
|
|
}
|
|
|
|
func (e *Engine) GetLiveState() map[int]models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
cp := make(map[int]models.Site, len(e.liveState))
|
|
for k, v := range e.liveState {
|
|
cp[k] = v
|
|
}
|
|
return cp
|
|
}
|
|
|
|
func (e *Engine) RecordHeartbeat(token string) bool {
|
|
if !e.IsActive() {
|
|
return false
|
|
}
|
|
|
|
e.mu.RLock()
|
|
targetID, ok := e.tokenIndex[token]
|
|
e.mu.RUnlock()
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
var (
|
|
prevStatus models.Status
|
|
name string
|
|
alertID int
|
|
downSince time.Time
|
|
)
|
|
_, exists := e.applyState(targetID, func(s *models.Site) {
|
|
prevStatus = s.Status
|
|
name = s.Name
|
|
alertID = s.AlertID
|
|
downSince = s.StatusChangedAt // captured before mutation = when it went down
|
|
|
|
s.LastCheck = time.Now()
|
|
s.Status = models.StatusUp
|
|
s.FailureCount = 0
|
|
s.Latency = 0
|
|
s.LastError = ""
|
|
s.LastSuccessAt = time.Now()
|
|
if prevStatus != models.StatusUp {
|
|
s.StatusChangedAt = time.Now()
|
|
}
|
|
})
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
switch prevStatus {
|
|
case models.StatusPending:
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' received first heartbeat", name))
|
|
case models.StatusLate:
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' heartbeat arrived (was late)", name))
|
|
case models.StatusStale:
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' heartbeat arrived (was stale)", name))
|
|
case models.StatusDown:
|
|
downDur := ""
|
|
if !downSince.IsZero() {
|
|
downDur = fmt.Sprintf(" (was down %s)", fmtDurationShort(time.Since(downSince)))
|
|
}
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' recovered%s", name, downDur))
|
|
go e.triggerAlert(alertID, "✅ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.%s", name, downDur))
|
|
}
|
|
|
|
e.recordCheck(targetID, 0, true)
|
|
|
|
if prevStatus != models.StatusUp && prevStatus != models.StatusPending {
|
|
e.enqueueWrite(writeStateChange{siteID: targetID, fromStatus: string(prevStatus), toStatus: string(models.StatusUp)})
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (e *Engine) addToTokenIndex(site models.Site) {
|
|
if site.Type == "push" && site.Token != "" {
|
|
e.tokenIndex[site.Token] = site.ID
|
|
}
|
|
}
|
|
|
|
func (e *Engine) removeFromTokenIndex(id int) {
|
|
for token, sid := range e.tokenIndex {
|
|
if sid == id {
|
|
delete(e.tokenIndex, token)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) Start(ctx context.Context) {
|
|
// e.cancel is invoked by Stop() to drain and halt the writer; gosec can't
|
|
// trace the cross-method call, and cancelling the parent reaps this child
|
|
// regardless, so the leak it warns about can't occur.
|
|
ctx, e.cancel = context.WithCancel(ctx) //nolint:gosec // cancel is called in Stop()
|
|
|
|
e.writerWG.Add(1)
|
|
go e.dbWriter(ctx)
|
|
|
|
e.checkerWG.Add(1)
|
|
go func() {
|
|
defer e.checkerWG.Done()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
e.refreshMaintenanceCache(ctx)
|
|
|
|
configs, err := e.db.GetSites(ctx)
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Failed to load sites: %v", err))
|
|
select {
|
|
case <-time.After(pollInterval):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
dbIDs := make(map[int]bool, len(configs))
|
|
for _, cfg := range configs {
|
|
dbIDs[cfg.ID] = true
|
|
e.mu.RLock()
|
|
existing, exists := e.liveState[cfg.ID]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
e.mu.Lock()
|
|
site := models.Site{SiteConfig: cfg, SiteState: models.SiteState{Status: models.StatusPending}}
|
|
if h, ok := e.GetHistory(cfg.ID); ok && len(h.Statuses) > 0 {
|
|
if h.Statuses[len(h.Statuses)-1] {
|
|
site.Status = models.StatusUp
|
|
} else {
|
|
site.Status = models.StatusDown
|
|
}
|
|
if len(h.Latencies) > 0 {
|
|
site.Latency = h.Latencies[len(h.Latencies)-1]
|
|
}
|
|
}
|
|
e.liveState[cfg.ID] = site
|
|
e.addToTokenIndex(site)
|
|
e.mu.Unlock()
|
|
e.checkerWG.Add(1)
|
|
go func(id int) {
|
|
defer e.checkerWG.Done()
|
|
e.monitorRoutine(ctx, id)
|
|
}(cfg.ID)
|
|
} else if existing.SiteConfig != cfg {
|
|
e.UpdateSiteConfig(cfg)
|
|
}
|
|
}
|
|
|
|
e.mu.RLock()
|
|
var vanished []int
|
|
for id := range e.liveState {
|
|
if !dbIDs[id] {
|
|
vanished = append(vanished, id)
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
for _, id := range vanished {
|
|
e.RemoveSite(id)
|
|
e.AddLog(fmt.Sprintf("Monitor removed (no longer in DB): ID %d", id))
|
|
}
|
|
|
|
select {
|
|
case <-time.After(pollInterval):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
e.checkerWG.Add(1)
|
|
go func() {
|
|
defer e.checkerWG.Done()
|
|
e.maintenancePruner(ctx)
|
|
}()
|
|
}
|
|
|
|
func (e *Engine) maintenancePruner(ctx context.Context) {
|
|
ticker := time.NewTicker(maintPruneInterval)
|
|
defer ticker.Stop()
|
|
|
|
e.pruneMaintenanceWindows(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
e.pruneMaintenanceWindows(ctx)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) pruneMaintenanceWindows(ctx context.Context) {
|
|
pruned, err := e.db.PruneExpiredMaintenanceWindows(ctx, e.maintRetention)
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Maintenance prune error: %v", err))
|
|
return
|
|
}
|
|
if pruned > 0 {
|
|
e.AddLog(fmt.Sprintf("Pruned %d expired maintenance window(s)", pruned))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) UpdateSiteConfig(cfg models.SiteConfig) {
|
|
e.mu.Lock()
|
|
if existing, ok := e.liveState[cfg.ID]; ok {
|
|
e.removeFromTokenIndex(cfg.ID)
|
|
existing.SiteConfig = cfg
|
|
e.liveState[cfg.ID] = existing
|
|
e.addToTokenIndex(existing)
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
e.signalRecheck(cfg.ID)
|
|
}
|
|
|
|
func (e *Engine) getRecheckChan(id int) chan struct{} {
|
|
e.recheckMu.Lock()
|
|
defer e.recheckMu.Unlock()
|
|
ch, ok := e.recheck[id]
|
|
if !ok {
|
|
ch = make(chan struct{}, 1)
|
|
e.recheck[id] = ch
|
|
}
|
|
return ch
|
|
}
|
|
|
|
func (e *Engine) signalRecheck(id int) {
|
|
ch := e.getRecheckChan(id)
|
|
select {
|
|
case ch <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (e *Engine) RemoveSite(id int) {
|
|
e.mu.Lock()
|
|
e.removeFromTokenIndex(id)
|
|
delete(e.liveState, id)
|
|
e.mu.Unlock()
|
|
e.removeHistory(id)
|
|
|
|
e.probeResultsMu.Lock()
|
|
delete(e.probeResults, id)
|
|
e.probeResultsMu.Unlock()
|
|
|
|
e.recheckMu.Lock()
|
|
delete(e.recheck, id)
|
|
e.recheckMu.Unlock()
|
|
}
|
|
|
|
func (e *Engine) ToggleSitePause(id int) bool {
|
|
var (
|
|
paused bool
|
|
name string
|
|
)
|
|
_, ok := e.applyState(id, func(s *models.Site) {
|
|
s.Paused = !s.Paused
|
|
paused = s.Paused
|
|
name = s.Name
|
|
})
|
|
if !ok {
|
|
return false
|
|
}
|
|
if paused {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' paused", name))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' resumed", name))
|
|
}
|
|
return paused
|
|
}
|
|
|
|
func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
|
recheckCh := e.getRecheckChan(id)
|
|
|
|
// Stagger initial check to avoid thundering herd on startup
|
|
stagger := time.Duration(rand.IntN(3000)) * time.Millisecond //nolint:gosec // non-security jitter
|
|
select {
|
|
case <-time.After(stagger):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
e.checkByID(ctx, id)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
if !e.IsActive() {
|
|
select {
|
|
case <-time.After(pollInterval):
|
|
case <-ctx.Done():
|
|
return
|
|
case <-recheckCh:
|
|
}
|
|
continue
|
|
}
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[id]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
if site.Paused {
|
|
select {
|
|
case <-time.After(pollInterval):
|
|
case <-ctx.Done():
|
|
return
|
|
case <-recheckCh:
|
|
}
|
|
continue
|
|
}
|
|
|
|
interval := site.Interval
|
|
if interval < minCheckInterval {
|
|
interval = minCheckInterval
|
|
}
|
|
jitter := time.Duration(rand.IntN(interval*100)) * time.Millisecond //nolint:gosec // non-security jitter
|
|
select {
|
|
case <-time.After(time.Duration(interval)*time.Second + jitter):
|
|
case <-ctx.Done():
|
|
return
|
|
case <-recheckCh:
|
|
}
|
|
e.checkByID(ctx, id)
|
|
}
|
|
}
|
|
|
|
// applyState atomically reads, mutates, and writes back the live entry for id.
|
|
// The mutator runs under the engine write lock and receives a pointer to the
|
|
// CURRENT live state, so concurrent config edits, pauses, and heartbeats are
|
|
// never clobbered by a stale snapshot. The mutator must only touch runtime /
|
|
// check-result fields — config fields (Name/URL/Type/Token/Interval/AlertID/…)
|
|
// are owned by UpdateSiteConfig and must not be written here. Returns the
|
|
// post-mutation copy and whether the site still exists.
|
|
func (e *Engine) applyState(id int, mutate func(s *models.Site)) (models.Site, bool) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
cur, ok := e.liveState[id]
|
|
if !ok {
|
|
return models.Site{}, false
|
|
}
|
|
mutate(&cur)
|
|
e.liveState[id] = cur
|
|
return cur, true
|
|
}
|
|
|
|
func (e *Engine) checkByID(ctx context.Context, id int) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[id]
|
|
e.mu.RUnlock()
|
|
if !exists || site.Paused {
|
|
return
|
|
}
|
|
|
|
switch site.Type {
|
|
case "push":
|
|
e.checkPush(ctx, site)
|
|
case "group":
|
|
e.checkGroup(ctx, site)
|
|
default:
|
|
result := RunCheck(ctx, site.SiteConfig, e.strictClient, e.insecureClient, e.insecureSkipVerify, e.allowPrivateTargets)
|
|
updatedSite := site
|
|
updatedSite.HasSSL = result.HasSSL
|
|
updatedSite.CertExpiry = result.CertExpiry
|
|
updatedSite.Latency = time.Duration(result.LatencyNs)
|
|
updatedSite.LastCheck = time.Now()
|
|
e.handleStatusChange(updatedSite, result.Status, result.StatusCode, time.Duration(result.LatencyNs), result.ErrorReason)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) checkPush(_ context.Context, site models.Site) {
|
|
if site.Status == models.StatusPending {
|
|
return
|
|
}
|
|
|
|
interval := time.Duration(site.Interval) * time.Second
|
|
grace := interval / 2
|
|
if grace < minPushGrace {
|
|
grace = minPushGrace
|
|
}
|
|
|
|
overdue := site.LastCheck.Add(interval)
|
|
staleMark := overdue.Add(grace / 2)
|
|
graceEnd := overdue.Add(grace)
|
|
now := time.Now()
|
|
|
|
if now.After(graceEnd) {
|
|
if site.Status != models.StatusDown {
|
|
e.handleStatusChange(site, string(models.StatusDown), 0, 0, "heartbeat missed")
|
|
}
|
|
} else if now.After(staleMark) {
|
|
if site.Status != models.StatusStale {
|
|
e.handleStatusChange(site, string(models.StatusStale), 0, 0, "heartbeat stale")
|
|
}
|
|
} else if now.After(overdue) {
|
|
if site.Status != models.StatusLate {
|
|
e.handleStatusChange(site, string(models.StatusLate), 0, 0, "heartbeat overdue")
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleStatusChange folds a check result into the live state. snap is the
|
|
// stale snapshot the check ran against; the actual mutation is applied onto the
|
|
// CURRENT live entry via applyState, so a concurrent pause / config edit /
|
|
// heartbeat is never reverted by this write. Logs and alerts are emitted after
|
|
// the lock is released, off the critical section.
|
|
func (e *Engine) handleStatusChange(snap models.Site, rawStatus string, code int, latency time.Duration, errorReason string) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
inMaint := e.isInMaintenance(snap.ID)
|
|
status := models.Status(rawStatus)
|
|
|
|
var (
|
|
prev, next models.Status
|
|
name, typ string
|
|
alertID int
|
|
failCount, maxRetries int
|
|
confirmedDown bool
|
|
failedCheck bool
|
|
downSince time.Time
|
|
sslWarnFire bool
|
|
sslDays int
|
|
skipped bool
|
|
changed bool
|
|
)
|
|
|
|
_, exists := e.applyState(snap.ID, func(s *models.Site) {
|
|
// A non-UP result computed from a stale snapshot must not override a
|
|
// heartbeat (or newer check) that landed while we were evaluating.
|
|
if status != models.StatusUp && s.LastCheck.After(snap.LastCheck) {
|
|
skipped = true
|
|
return
|
|
}
|
|
|
|
prev = s.Status
|
|
name = s.Name
|
|
typ = s.Type
|
|
alertID = s.AlertID
|
|
maxRetries = s.MaxRetries
|
|
downSince = s.StatusChangedAt
|
|
|
|
// Fresh check results (measured by the run against snap).
|
|
s.StatusCode = code
|
|
s.Latency = snap.Latency
|
|
s.LastCheck = snap.LastCheck
|
|
s.HasSSL = snap.HasSSL
|
|
s.CertExpiry = snap.CertExpiry
|
|
s.LastError = errorReason
|
|
if status == models.StatusUp {
|
|
s.LastSuccessAt = time.Now()
|
|
s.LastError = ""
|
|
}
|
|
|
|
// Status + failure-count transition, based on the CURRENT live status.
|
|
if status == models.StatusUp {
|
|
s.FailureCount = 0
|
|
s.Status = models.StatusUp
|
|
} else {
|
|
if s.FailureCount <= s.MaxRetries {
|
|
s.FailureCount++
|
|
}
|
|
if s.FailureCount > s.MaxRetries {
|
|
if s.Status != status {
|
|
confirmedDown = true
|
|
}
|
|
s.Status = status
|
|
s.FailureCount = s.MaxRetries + 1
|
|
} else {
|
|
failedCheck = true
|
|
}
|
|
}
|
|
failCount = s.FailureCount
|
|
|
|
if s.Status != prev && prev != models.StatusPending {
|
|
s.StatusChangedAt = time.Now()
|
|
} else if s.StatusChangedAt.IsZero() && s.Status != models.StatusPending {
|
|
s.StatusChangedAt = time.Now()
|
|
}
|
|
|
|
// SSL expiry warning (fresh HasSSL/CertExpiry + config threshold).
|
|
if typ == "http" && s.CheckSSL && s.HasSSL {
|
|
days := int(time.Until(s.CertExpiry).Hours() / 24)
|
|
if days <= s.ExpiryThreshold && !s.SentSSLWarning && status != models.StatusSSLExp {
|
|
sslWarnFire = true
|
|
sslDays = days
|
|
s.SentSSLWarning = true
|
|
} else if days > s.ExpiryThreshold {
|
|
s.SentSSLWarning = false
|
|
}
|
|
}
|
|
|
|
next = s.Status
|
|
changed = next != prev
|
|
})
|
|
|
|
if !exists || skipped {
|
|
return
|
|
}
|
|
|
|
e.recordCheck(snap.ID, latency, status == models.StatusUp)
|
|
|
|
if confirmedDown {
|
|
if errorReason != "" {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN: %s", name, errorReason))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", name))
|
|
}
|
|
} else if failedCheck {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", name, failCount, maxRetries))
|
|
}
|
|
|
|
if changed && prev != models.StatusPending {
|
|
e.enqueueWrite(writeStateChange{siteID: snap.ID, fromStatus: string(prev), toStatus: string(next), reason: errorReason})
|
|
}
|
|
|
|
if sslWarnFire {
|
|
if !inMaint {
|
|
e.triggerAlert(alertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", name, sslDays))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("SSL warning for '%s' suppressed (maintenance)", name))
|
|
}
|
|
}
|
|
|
|
if prev == models.StatusUp && next == models.StatusLate {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' heartbeat overdue", name))
|
|
}
|
|
|
|
if !prev.IsBroken() && next.IsBroken() && next != models.StatusPending {
|
|
if inMaint {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' is DOWN (alerts suppressed — maintenance)", name))
|
|
} else {
|
|
msg := fmt.Sprintf("Monitor '%s' is DOWN (%s)", name, rawStatus)
|
|
if errorReason != "" {
|
|
msg = fmt.Sprintf("Monitor '%s' is DOWN: %s", name, errorReason)
|
|
}
|
|
if typ == "push" {
|
|
msg = fmt.Sprintf("Push Monitor '%s' missed heartbeat.", name)
|
|
}
|
|
e.triggerAlert(alertID, "🚨 ALERT", msg)
|
|
}
|
|
}
|
|
if prev.IsBroken() && next == models.StatusUp {
|
|
downDur := ""
|
|
if !downSince.IsZero() {
|
|
downDur = fmt.Sprintf(" (was down %s)", fmtDurationShort(time.Since(downSince)))
|
|
}
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' recovered%s", name, downDur))
|
|
if !inMaint {
|
|
e.triggerAlert(alertID, "✅ RECOVERY", fmt.Sprintf("Monitor '%s' is UP%s", name, downDur))
|
|
}
|
|
}
|
|
if prev == models.StatusLate && next == models.StatusUp && !prev.IsBroken() {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' heartbeat arrived (was late)", name))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) triggerAlert(alertID int, title, message string) {
|
|
if alertID <= 0 {
|
|
return
|
|
}
|
|
cfg, err := e.db.GetAlert(context.Background(), alertID)
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Failed to load alert config %d: %v", alertID, err))
|
|
return
|
|
}
|
|
provider := alert.GetProvider(cfg)
|
|
if provider != nil {
|
|
go func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
if err := provider.Send(ctx, title, message); err != nil {
|
|
e.AddLog(fmt.Sprintf("Alert send failed (%s): %v", cfg.Name, err))
|
|
e.recordAlertResult(alertID, false, err.Error())
|
|
} else {
|
|
e.recordAlertResult(alertID, true, "")
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (e *Engine) recordAlertResult(alertID int, ok bool, errMsg string) {
|
|
e.alertHealthMu.Lock()
|
|
defer e.alertHealthMu.Unlock()
|
|
h := e.alertHealth[alertID]
|
|
h.LastSendAt = time.Now()
|
|
h.LastSendOK = ok
|
|
h.SendCount++
|
|
if ok {
|
|
h.LastError = ""
|
|
} else {
|
|
h.LastError = errMsg
|
|
h.FailCount++
|
|
}
|
|
e.alertHealth[alertID] = h
|
|
|
|
// Persist so health survives restarts; DB IO off the alert path.
|
|
e.enqueueWrite(writeAlertHealth{rec: models.AlertHealthRecord{
|
|
AlertID: alertID,
|
|
LastSendAt: h.LastSendAt,
|
|
LastSendOK: h.LastSendOK,
|
|
LastError: h.LastError,
|
|
SendCount: h.SendCount,
|
|
FailCount: h.FailCount,
|
|
}})
|
|
}
|
|
|
|
func (e *Engine) GetAlertHealth(alertID int) AlertHealth {
|
|
e.alertHealthMu.RLock()
|
|
defer e.alertHealthMu.RUnlock()
|
|
return e.alertHealth[alertID]
|
|
}
|
|
|
|
func (e *Engine) TestAlert(alertID int) error {
|
|
cfg, err := e.db.GetAlert(context.Background(), alertID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load alert: %w", err)
|
|
}
|
|
provider := alert.GetProvider(cfg)
|
|
if provider == nil {
|
|
return fmt.Errorf("no provider for type %q", cfg.Type)
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
err = provider.Send(ctx, "🧪 Test Alert", fmt.Sprintf("Test notification from uptop for channel '%s'.", cfg.Name))
|
|
if err != nil {
|
|
e.recordAlertResult(alertID, false, err.Error())
|
|
return err
|
|
}
|
|
e.recordAlertResult(alertID, true, "")
|
|
e.AddLog(fmt.Sprintf("Test alert sent to '%s'", cfg.Name))
|
|
return nil
|
|
}
|
|
|
|
func (e *Engine) isInMaintenance(monitorID int) bool {
|
|
e.maintCacheMu.RLock()
|
|
defer e.maintCacheMu.RUnlock()
|
|
return e.maintCache[monitorID]
|
|
}
|
|
|
|
func (e *Engine) refreshMaintenanceCache(ctx context.Context) {
|
|
windows, err := e.db.GetActiveMaintenanceWindows(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
directMaint := make(map[int]bool)
|
|
var globalMaint bool
|
|
for _, w := range windows {
|
|
if w.MonitorID == 0 {
|
|
globalMaint = true
|
|
} else {
|
|
directMaint[w.MonitorID] = true
|
|
}
|
|
}
|
|
|
|
resolved := make(map[int]bool)
|
|
e.mu.RLock()
|
|
for id, site := range e.liveState {
|
|
if globalMaint || directMaint[id] || (site.ParentID > 0 && directMaint[site.ParentID]) {
|
|
resolved[id] = true
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
e.maintCacheMu.Lock()
|
|
e.maintCache = resolved
|
|
e.maintCacheMu.Unlock()
|
|
}
|
|
|
|
func (e *Engine) GetDisplayStatus(site models.Site) string {
|
|
if site.Paused {
|
|
return "PAUSED"
|
|
}
|
|
if e.isInMaintenance(site.ID) {
|
|
return "MAINT"
|
|
}
|
|
return string(site.Status)
|
|
}
|
|
|
|
func (e *Engine) checkGroup(_ context.Context, site models.Site) {
|
|
e.mu.RLock()
|
|
status := models.StatusUp
|
|
hasChildren := false
|
|
for _, child := range e.liveState {
|
|
if child.ParentID != site.ID || child.Type == "group" {
|
|
continue
|
|
}
|
|
hasChildren = true
|
|
if child.Paused || e.isInMaintenance(child.ID) {
|
|
continue
|
|
}
|
|
if child.Status == models.StatusDown || child.Status == models.StatusSSLExp {
|
|
status = models.StatusDown
|
|
} else if child.Status == models.StatusStale && status != models.StatusDown {
|
|
status = models.StatusStale
|
|
} else if child.Status == models.StatusLate && status != models.StatusDown && status != models.StatusStale {
|
|
status = models.StatusLate
|
|
} else if child.Status == models.StatusPending && status != models.StatusDown && status != models.StatusStale && status != models.StatusLate {
|
|
status = models.StatusPending
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
if !hasChildren {
|
|
status = models.StatusPending
|
|
}
|
|
|
|
e.applyState(site.ID, func(s *models.Site) {
|
|
s.Status = status
|
|
})
|
|
e.recordCheck(site.ID, 0, !status.IsBroken())
|
|
}
|
|
|
|
func (e *Engine) EnqueueProbeCheck(siteID int, nodeID string, latencyNs int64, isUp bool) {
|
|
e.enqueueWrite(writeProbeCheck{siteID: siteID, nodeID: nodeID, latencyNs: latencyNs, isUp: isUp})
|
|
}
|
|
|
|
func (e *Engine) SetAggStrategy(strategy AggregationStrategy) {
|
|
e.aggStrategy = strategy
|
|
}
|
|
|
|
func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool, errorReason string) {
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[siteID]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
staleAfter := time.Duration(site.Interval) * time.Second * 3
|
|
if staleAfter < time.Minute {
|
|
staleAfter = time.Minute
|
|
}
|
|
|
|
now := time.Now()
|
|
e.probeResultsMu.Lock()
|
|
if e.probeResults[siteID] == nil {
|
|
e.probeResults[siteID] = make(map[string]NodeResult)
|
|
}
|
|
e.probeResults[siteID][nodeID] = NodeResult{
|
|
NodeID: nodeID,
|
|
IsUp: isUp,
|
|
LatencyNs: latencyNs,
|
|
CheckedAt: now,
|
|
ErrorReason: errorReason,
|
|
}
|
|
results := make([]NodeResult, 0, len(e.probeResults[siteID]))
|
|
for id, r := range e.probeResults[siteID] {
|
|
if now.Sub(r.CheckedAt) > staleAfter {
|
|
delete(e.probeResults[siteID], id)
|
|
continue
|
|
}
|
|
results = append(results, r)
|
|
}
|
|
e.probeResultsMu.Unlock()
|
|
|
|
aggUp, avgLatency := AggregateStatus(results, e.aggStrategy)
|
|
|
|
probeStatus := models.StatusUp
|
|
if !aggUp {
|
|
probeStatus = models.StatusDown
|
|
}
|
|
|
|
updatedSite := site
|
|
updatedSite.Latency = time.Duration(avgLatency)
|
|
updatedSite.LastCheck = time.Now()
|
|
e.handleStatusChange(updatedSite, string(probeStatus), 0, time.Duration(avgLatency), errorReason)
|
|
}
|
|
|
|
func (e *Engine) GetProbeResults(siteID int) map[string]NodeResult {
|
|
e.probeResultsMu.RLock()
|
|
defer e.probeResultsMu.RUnlock()
|
|
src := e.probeResults[siteID]
|
|
cp := make(map[string]NodeResult, len(src))
|
|
for k, v := range src {
|
|
cp[k] = v
|
|
}
|
|
return cp
|
|
}
|
|
|
|
func (e *Engine) GetStateChanges(siteID int, limit int) []models.StateChange {
|
|
changes, err := e.db.GetStateChanges(context.Background(), siteID, limit)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return changes
|
|
}
|
|
|
|
func (e *Engine) GetStateChangesSince(siteID int, since time.Time) []models.StateChange {
|
|
changes, err := e.db.GetStateChangesSince(context.Background(), siteID, since)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return changes
|
|
}
|