diff --git a/internal/monitor/dbwriter.go b/internal/monitor/dbwriter.go index 87e8c68..ebdbb89 100644 --- a/internal/monitor/dbwriter.go +++ b/internal/monitor/dbwriter.go @@ -44,3 +44,15 @@ type writeAlertHealth struct{ rec models.AlertHealthRecord } func (w writeAlertHealth) exec(s store.Store) error { return s.SaveAlertHealth(w.rec) } func (w writeAlertHealth) desc() string { return "alert-health" } + +type writeProbeCheck struct { + siteID int + nodeID string + latencyNs int64 + isUp bool +} + +func (w writeProbeCheck) exec(s store.Store) error { + return s.SaveCheckFromNode(w.siteID, w.nodeID, w.latencyNs, w.isUp) +} +func (w writeProbeCheck) desc() string { return "probe-check" } diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index d4de80c..0dea01b 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -60,6 +60,9 @@ type Engine struct { recheckMu sync.RWMutex recheck map[int]chan struct{} + maintCacheMu sync.RWMutex + maintCache map[int]bool + db store.Store insecureSkipVerify bool allowPrivateTargets bool @@ -67,10 +70,11 @@ type Engine struct { strictClient *http.Client insecureClient *http.Client - dbWrites chan dbWrite - writerWG sync.WaitGroup - cancel context.CancelFunc - stopOnce sync.Once + dbWrites chan dbWrite + writerWG sync.WaitGroup + checkerWG sync.WaitGroup + cancel context.CancelFunc + stopOnce sync.Once } func NewEngine(s store.Store) *Engine { @@ -231,7 +235,9 @@ func (e *Engine) Stop() { if e.cancel != nil { e.cancel() } + e.checkerWG.Wait() e.writerWG.Wait() + e.drainWrites() }) } @@ -408,6 +414,8 @@ func (e *Engine) Start(ctx context.Context) { default: } + e.refreshMaintenanceCache() + sites, err := e.db.GetSites() if err != nil { e.AddLog(fmt.Sprintf("Failed to load sites: %v", err)) @@ -438,7 +446,11 @@ func (e *Engine) Start(ctx context.Context) { e.liveState[s.ID] = s e.addToTokenIndex(s) e.mu.Unlock() - go e.monitorRoutine(ctx, s.ID) + e.checkerWG.Add(1) + go func(id int) { + defer e.checkerWG.Done() + e.monitorRoutine(ctx, id) + }(s.ID) } } @@ -529,6 +541,10 @@ func (e *Engine) RemoveSite(id int) { e.mu.Unlock() e.removeHistory(id) + e.probeResultsMu.Lock() + delete(e.probeResults, id) + e.probeResultsMu.Unlock() + e.recheckMu.Lock() delete(e.recheck, id) e.recheckMu.Unlock() @@ -748,22 +764,22 @@ func (e *Engine) handleStatusChange(snap models.Site, rawStatus string, code int } // Status + failure-count transition, based on the CURRENT live status. - switch { - case prev == "UP" && rawStatus != "UP": - s.FailureCount++ + if rawStatus == "UP" { + s.FailureCount = 0 + s.Status = "UP" + } else { + if s.FailureCount <= s.MaxRetries { + s.FailureCount++ + } if s.FailureCount > s.MaxRetries { + if s.Status != rawStatus { + confirmedDown = true + } s.Status = rawStatus s.FailureCount = s.MaxRetries + 1 - confirmedDown = true } else { failedCheck = true } - case rawStatus == "UP": - s.FailureCount = 0 - s.Status = "UP" - default: - s.Status = rawStatus - s.FailureCount = s.MaxRetries + 1 } failCount = s.FailureCount @@ -927,11 +943,39 @@ func (e *Engine) TestAlert(alertID int) error { } func (e *Engine) isInMaintenance(monitorID int) bool { - inMaint, err := e.db.IsMonitorInMaintenance(monitorID) + e.maintCacheMu.RLock() + defer e.maintCacheMu.RUnlock() + return e.maintCache[monitorID] +} + +func (e *Engine) refreshMaintenanceCache() { + windows, err := e.db.GetActiveMaintenanceWindows() if err != nil { - return false + return } - return inMaint + + directMaint := make(map[int]bool) + var globalMaint bool + for _, w := range windows { + if w.MonitorID == 0 { + globalMaint = true + } else { + directMaint[w.MonitorID] = true + } + } + + resolved := make(map[int]bool) + e.mu.RLock() + for id, site := range e.liveState { + if globalMaint || directMaint[id] || (site.ParentID > 0 && directMaint[site.ParentID]) { + resolved[id] = true + } + } + e.mu.RUnlock() + + e.maintCacheMu.Lock() + e.maintCache = resolved + e.maintCacheMu.Unlock() } func (e *Engine) GetDisplayStatus(site models.Site) string { @@ -948,15 +992,11 @@ func (e *Engine) checkGroup(site models.Site) { e.mu.RLock() status := "UP" hasChildren := false - allPaused := true for _, child := range e.liveState { if child.ParentID != site.ID || child.Type == "group" { continue } hasChildren = true - if !child.Paused { - allPaused = false - } if child.Paused || e.isInMaintenance(child.ID) { continue } @@ -978,17 +1018,31 @@ func (e *Engine) checkGroup(site models.Site) { e.applyState(site.ID, func(s *models.Site) { s.Status = status - if hasChildren && allPaused { - s.Paused = true - } }) } +func (e *Engine) EnqueueProbeCheck(siteID int, nodeID string, latencyNs int64, isUp bool) { + e.enqueueWrite(writeProbeCheck{siteID: siteID, nodeID: nodeID, latencyNs: latencyNs, isUp: isUp}) +} + func (e *Engine) SetAggStrategy(strategy AggregationStrategy) { e.aggStrategy = strategy } func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool, errorReason string) { + e.mu.RLock() + site, exists := e.liveState[siteID] + e.mu.RUnlock() + if !exists { + return + } + + staleAfter := time.Duration(site.Interval) * time.Second * 3 + if staleAfter < time.Minute { + staleAfter = time.Minute + } + + now := time.Now() e.probeResultsMu.Lock() if e.probeResults[siteID] == nil { e.probeResults[siteID] = make(map[string]NodeResult) @@ -997,24 +1051,21 @@ func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, i NodeID: nodeID, IsUp: isUp, LatencyNs: latencyNs, - CheckedAt: time.Now(), + CheckedAt: now, ErrorReason: errorReason, } results := make([]NodeResult, 0, len(e.probeResults[siteID])) - for _, r := range e.probeResults[siteID] { + for id, r := range e.probeResults[siteID] { + if now.Sub(r.CheckedAt) > staleAfter { + delete(e.probeResults[siteID], id) + continue + } results = append(results, r) } e.probeResultsMu.Unlock() aggUp, avgLatency := AggregateStatus(results, e.aggStrategy) - e.mu.RLock() - site, exists := e.liveState[siteID] - e.mu.RUnlock() - if !exists { - return - } - rawStatus := "UP" if !aggUp { rawStatus = "DOWN" diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go index 630da58..1a8d35d 100644 --- a/internal/monitor/monitor_test.go +++ b/internal/monitor/monitor_test.go @@ -69,7 +69,13 @@ func (m *mockStore) LoadAlertHealth() (map[int]models.AlertHealthRecord, error) } func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil } func (m *mockStore) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error) { - return nil, nil + m.mu.Lock() + defer m.mu.Unlock() + var windows []models.MaintenanceWindow + for id := range m.maintenance { + windows = append(windows, models.MaintenanceWindow{MonitorID: id}) + } + return windows, nil } func (m *mockStore) GetAllMaintenanceWindows(int) ([]models.MaintenanceWindow, error) { return nil, nil @@ -330,6 +336,7 @@ func TestHandleStatusChange_AlertSuppressedMaintenance(t *testing.T) { e := newTestEngine(ms) site := models.Site{ID: 1, Name: "test", Status: "UP", MaxRetries: 0, AlertID: 1} injectSite(e, site) + e.refreshMaintenanceCache() e.handleStatusChange(site, "DOWN", 0, 0, "test error") @@ -361,6 +368,7 @@ func TestHandleStatusChange_RecoverySuppressedMaintenance(t *testing.T) { e := newTestEngine(ms) site := models.Site{ID: 1, Name: "test", Status: "DOWN", AlertID: 1} injectSite(e, site) + e.refreshMaintenanceCache() e.handleStatusChange(site, "UP", 200, 0, "") @@ -448,6 +456,7 @@ func TestHandleStatusChange_SSLWarningSuppressedMaint(t *testing.T) { CertExpiry: time.Now().Add(15 * 24 * time.Hour), } injectSite(e, site) + e.refreshMaintenanceCache() e.handleStatusChange(site, "UP", 200, 0, "") @@ -700,6 +709,7 @@ func TestCheckGroup_MaintenanceChildIgnored(t *testing.T) { injectSite(e, group) injectSite(e, child1) injectSite(e, child2) + e.refreshMaintenanceCache() e.checkGroup(group) @@ -1217,6 +1227,167 @@ func TestEngineStop_Idempotent(t *testing.T) { e.Stop() // must not panic or block } +// --- Group 12: Phase 3 engine correctness --- + +// Groups must not auto-pause when all children are paused — that creates a +// one-way trap because monitorRoutine skips paused sites. +func TestCheckGroup_AllPausedNoAutoFreeze(t *testing.T) { + ms := newMockStore() + e := newTestEngine(ms) + group := models.Site{ID: 1, Name: "group", Type: "group", Status: "UP"} + child1 := models.Site{ID: 2, Name: "child1", Type: "http", ParentID: 1, Status: "UP", Paused: true} + child2 := models.Site{ID: 3, Name: "child2", Type: "http", ParentID: 1, Status: "UP", Paused: true} + injectSite(e, group) + injectSite(e, child1) + injectSite(e, child2) + + e.checkGroup(group) + + s, _ := getSite(e, 1) + if s.Paused { + t.Error("group must not auto-pause when all children are paused") + } +} + +// PENDING→DOWN must honor MaxRetries instead of alerting on first failure. +func TestHandleStatusChange_PendingRetriesBeforeDown(t *testing.T) { + ms := newMockStore() + e := newTestEngine(ms) + site := models.Site{ID: 1, Name: "new-monitor", Status: "PENDING", MaxRetries: 2} + injectSite(e, site) + + e.handleStatusChange(site, "DOWN", 0, 0, "timeout") + s, _ := getSite(e, 1) + if s.Status != "PENDING" { + t.Errorf("expected PENDING during retry, got %s", s.Status) + } + if s.FailureCount != 1 { + t.Errorf("expected FailureCount 1, got %d", s.FailureCount) + } + + e.handleStatusChange(s, "DOWN", 0, 0, "timeout") + s, _ = getSite(e, 1) + if s.Status != "PENDING" { + t.Errorf("expected PENDING during retry 2, got %s", s.Status) + } + + e.handleStatusChange(s, "DOWN", 0, 0, "timeout") + s, _ = getSite(e, 1) + if s.Status != "DOWN" { + t.Errorf("expected DOWN after retries exhausted, got %s", s.Status) + } +} + +// LATE→DOWN must also honor MaxRetries. +func TestHandleStatusChange_LateRetriesBeforeDown(t *testing.T) { + ms := newMockStore() + e := newTestEngine(ms) + site := models.Site{ID: 1, Name: "push-mon", Status: "LATE", MaxRetries: 1} + injectSite(e, site) + + e.handleStatusChange(site, "DOWN", 0, 0, "missed heartbeat") + s, _ := getSite(e, 1) + if s.Status != "LATE" { + t.Errorf("expected LATE during retry, got %s", s.Status) + } + + e.handleStatusChange(s, "DOWN", 0, 0, "missed heartbeat") + s, _ = getSite(e, 1) + if s.Status != "DOWN" { + t.Errorf("expected DOWN after retries exhausted, got %s", s.Status) + } +} + +// Dead probe results must be expired so they don't poison aggregation. +func TestIngestProbeResult_ExpiresStaleProbes(t *testing.T) { + ms := newMockStore() + e := newTestEngine(ms) + site := models.Site{ID: 1, Name: "test", Type: "http", Status: "UP", Interval: 30} + injectSite(e, site) + + e.probeResultsMu.Lock() + e.probeResults[1] = map[string]NodeResult{ + "dead-probe": { + NodeID: "dead-probe", + IsUp: false, + CheckedAt: time.Now().Add(-10 * time.Minute), + }, + } + e.probeResultsMu.Unlock() + + e.IngestProbeResult("live-probe", 1, 5000, true, "") + + e.probeResultsMu.RLock() + _, deadExists := e.probeResults[1]["dead-probe"] + _, liveExists := e.probeResults[1]["live-probe"] + e.probeResultsMu.RUnlock() + + if deadExists { + t.Error("stale probe result should have been expired") + } + if !liveExists { + t.Error("live probe result should still exist") + } +} + +// RemoveSite must clean up probeResults. +func TestRemoveSite_CleansProbeResults(t *testing.T) { + ms := newMockStore() + e := newTestEngine(ms) + site := models.Site{ID: 1, Name: "test", Type: "http", Status: "UP"} + injectSite(e, site) + + e.probeResultsMu.Lock() + e.probeResults[1] = map[string]NodeResult{ + "node-a": {NodeID: "node-a", IsUp: true, CheckedAt: time.Now()}, + } + e.probeResultsMu.Unlock() + + e.RemoveSite(1) + + e.probeResultsMu.RLock() + defer e.probeResultsMu.RUnlock() + if _, exists := e.probeResults[1]; exists { + t.Error("probe results should be cleaned up after RemoveSite") + } +} + +// Maintenance cache resolves parent relationships correctly. +func TestIsInMaintenance_UsesCache(t *testing.T) { + ms := newMockStore() + ms.maintenance[10] = true // direct maintenance on group + e := newTestEngine(ms) + group := models.Site{ID: 10, Name: "group", Type: "group", Status: "UP"} + child := models.Site{ID: 20, Name: "child", Type: "http", ParentID: 10, Status: "UP"} + injectSite(e, group) + injectSite(e, child) + e.refreshMaintenanceCache() + + if !e.isInMaintenance(10) { + t.Error("group should be in maintenance (direct)") + } + if !e.isInMaintenance(20) { + t.Error("child should be in maintenance (parent)") + } + if e.isInMaintenance(99) { + t.Error("unknown monitor should not be in maintenance") + } +} + +// Global maintenance (monitor_id=0) applies to all monitors. +func TestIsInMaintenance_GlobalMaintenance(t *testing.T) { + ms := newMockStore() + ms.maintenance[0] = true + e := newTestEngine(ms) + site := models.Site{ID: 1, Name: "test", Type: "http", Status: "UP"} + injectSite(e, site) + e.refreshMaintenanceCache() + + if !e.isInMaintenance(1) { + t.Error("all monitors should be in maintenance during global window") + } +} + // --- Utilities --- func containsStr(s, substr string) bool { diff --git a/internal/server/server.go b/internal/server/server.go index 0430a26..f791892 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -424,9 +424,7 @@ func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) *http.Server { return } for _, result := range req.Results { - if err := s.SaveCheckFromNode(result.SiteID, req.NodeID, result.LatencyNs, result.IsUp); err != nil { - log.Printf("Failed to save probe result: %v", err) - } + eng.EnqueueProbeCheck(result.SiteID, req.NodeID, result.LatencyNs, result.IsUp) eng.IngestProbeResult(req.NodeID, result.SiteID, result.LatencyNs, result.IsUp, result.ErrorReason) } if err := s.UpdateNodeLastSeen(req.NodeID); err != nil { diff --git a/internal/store/postgres.go b/internal/store/postgres.go index c6e896d..4b4a5b0 100644 --- a/internal/store/postgres.go +++ b/internal/store/postgres.go @@ -133,6 +133,15 @@ func (d *PostgresDialect) ImportWipe(tx *sql.Tx) { if _, err := tx.Exec("TRUNCATE TABLE maintenance_windows RESTART IDENTITY CASCADE"); err != nil { log.Printf("import wipe error: %v", err) } + if _, err := tx.Exec("TRUNCATE TABLE check_history RESTART IDENTITY CASCADE"); err != nil { + log.Printf("import wipe error: %v", err) + } + if _, err := tx.Exec("TRUNCATE TABLE state_changes RESTART IDENTITY CASCADE"); err != nil { + log.Printf("import wipe error: %v", err) + } + if _, err := tx.Exec("TRUNCATE TABLE alert_health RESTART IDENTITY CASCADE"); err != nil { + log.Printf("import wipe error: %v", err) + } } func (d *PostgresDialect) ImportResetSequences(tx *sql.Tx) { diff --git a/internal/store/sqlite.go b/internal/store/sqlite.go index ba181d4..ca294e5 100644 --- a/internal/store/sqlite.go +++ b/internal/store/sqlite.go @@ -168,6 +168,15 @@ func (d *SQLiteDialect) ImportWipe(tx *sql.Tx) { if _, err := tx.Exec("DELETE FROM sqlite_sequence WHERE name='maintenance_windows'"); err != nil { log.Printf("import wipe error: %v", err) } + if _, err := tx.Exec("DELETE FROM check_history"); err != nil { + log.Printf("import wipe error: %v", err) + } + if _, err := tx.Exec("DELETE FROM state_changes"); err != nil { + log.Printf("import wipe error: %v", err) + } + if _, err := tx.Exec("DELETE FROM alert_health"); err != nil { + log.Printf("import wipe error: %v", err) + } } func (d *SQLiteDialect) ImportResetSequences(tx *sql.Tx) {} diff --git a/internal/store/sqlstore_test.go b/internal/store/sqlstore_test.go index 172bc80..5876de3 100644 --- a/internal/store/sqlstore_test.go +++ b/internal/store/sqlstore_test.go @@ -235,6 +235,46 @@ func TestImportExport(t *testing.T) { } } +func TestImportData_WipesHistory(t *testing.T) { + s := newTestStore(t) + + if err := s.AddSite(models.Site{Name: "OldSite", URL: "https://old.com", Type: "http", Interval: 30}); err != nil { + t.Fatalf("AddSite: %v", err) + } + if err := s.SaveCheck(1, 5000, true); err != nil { + t.Fatalf("SaveCheck: %v", err) + } + if err := s.SaveStateChange(1, "UP", "DOWN", "timeout"); err != nil { + t.Fatalf("SaveStateChange: %v", err) + } + if err := s.SaveAlertHealth(models.AlertHealthRecord{AlertID: 1, LastSendOK: true, SendCount: 1}); err != nil { + t.Fatalf("SaveAlertHealth: %v", err) + } + + backup := models.Backup{ + Sites: []models.Site{{ID: 1, Name: "NewSite", URL: "https://new.com", Type: "http", Interval: 60}}, + } + if err := s.ImportData(backup); err != nil { + t.Fatalf("ImportData: %v", err) + } + + history, err := s.LoadAllHistory(100) + if err != nil { + t.Fatalf("LoadAllHistory: %v", err) + } + if len(history) != 0 { + t.Errorf("expected empty check_history after import, got %d sites with history", len(history)) + } + + changes, err := s.GetStateChanges(1, 100) + if err != nil { + t.Fatalf("GetStateChanges: %v", err) + } + if len(changes) != 0 { + t.Errorf("expected empty state_changes after import, got %d", len(changes)) + } +} + func TestCheckHistory(t *testing.T) { s := newTestStore(t)