diff --git a/cmd/uptop/main.go b/cmd/uptop/main.go index da7302a..a1ccd0b 100644 --- a/cmd/uptop/main.go +++ b/cmd/uptop/main.go @@ -431,6 +431,10 @@ func runServe(args []string) { } cancel() + // Drain pending DB writes before the deferred ss.Close() runs, so no + // write races a closed database. + eng.Stop() + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) defer shutdownCancel() if httpSrv != nil { diff --git a/internal/cluster/cluster_test.go b/internal/cluster/cluster_test.go index 58ced9b..0a702e5 100644 --- a/internal/cluster/cluster_test.go +++ b/internal/cluster/cluster_test.go @@ -58,6 +58,9 @@ func (m *mockStore) LoadAlertHealth() (map[int]models.AlertHealthRecord, error) } func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil } func (m *mockStore) SaveLog(string) error { return nil } +func (m *mockStore) PruneLogs() error { return nil } +func (m *mockStore) PruneCheckHistory() error { return nil } +func (m *mockStore) PruneStateChanges() error { return nil } func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil } func (m *mockStore) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error) { return nil, nil diff --git a/internal/metrics/prometheus_test.go b/internal/metrics/prometheus_test.go index 0a857c7..77cbbd8 100644 --- a/internal/metrics/prometheus_test.go +++ b/internal/metrics/prometheus_test.go @@ -56,6 +56,9 @@ func (m *mockStore) LoadAlertHealth() (map[int]models.AlertHealthRecord, error) } func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil } func (m *mockStore) SaveLog(string) error { return nil } +func (m *mockStore) PruneLogs() error { return nil } +func (m *mockStore) PruneCheckHistory() error { return nil } +func (m *mockStore) PruneStateChanges() error { return nil } func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil } func (m *mockStore) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error) { return nil, nil diff --git a/internal/monitor/dbwriter.go b/internal/monitor/dbwriter.go new file mode 100644 index 0000000..87e8c68 --- /dev/null +++ b/internal/monitor/dbwriter.go @@ -0,0 +1,46 @@ +package monitor + +import ( + "gitea.lerkolabs.com/lerkolabs/uptop/internal/models" + "gitea.lerkolabs.com/lerkolabs/uptop/internal/store" +) + +// dbWrite is a single unit of deferred persistence. The engine enqueues these +// onto a buffered channel; a single writer goroutine drains and executes them, +// serializing all writes through one connection and surfacing errors instead of +// discarding them. desc names the write for diagnostics on drop/failure. +type dbWrite interface { + exec(s store.Store) error + desc() string +} + +type writeLog struct{ message string } + +func (w writeLog) exec(s store.Store) error { return s.SaveLog(w.message) } +func (w writeLog) desc() string { return "log" } + +type writeCheck struct { + siteID int + latencyNs int64 + isUp bool +} + +func (w writeCheck) exec(s store.Store) error { return s.SaveCheck(w.siteID, w.latencyNs, w.isUp) } +func (w writeCheck) desc() string { return "check" } + +type writeStateChange struct { + siteID int + fromStatus string + toStatus string + reason string +} + +func (w writeStateChange) exec(s store.Store) error { + return s.SaveStateChange(w.siteID, w.fromStatus, w.toStatus, w.reason) +} +func (w writeStateChange) desc() string { return "state-change" } + +type writeAlertHealth struct{ rec models.AlertHealthRecord } + +func (w writeAlertHealth) exec(s store.Store) error { return s.SaveAlertHealth(w.rec) } +func (w writeAlertHealth) desc() string { return "alert-health" } diff --git a/internal/monitor/history.go b/internal/monitor/history.go index 69a7f4d..ec0e294 100644 --- a/internal/monitor/history.go +++ b/internal/monitor/history.go @@ -61,7 +61,7 @@ func (e *Engine) recordCheck(siteID int, latency time.Duration, isUp bool) { h.Statuses = h.Statuses[len(h.Statuses)-maxHistoryLen:] } - go func() { _ = e.db.SaveCheck(siteID, latency.Nanoseconds(), isUp) }() + e.enqueueWrite(writeCheck{siteID: siteID, latencyNs: latency.Nanoseconds(), isUp: isUp}) } func (e *Engine) GetHistory(siteID int) (SiteHistory, bool) { diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 6a04a2f..92a2ddb 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -23,6 +23,8 @@ const ( minPushGrace = 60 * time.Second maintPruneInterval = 15 * time.Minute defaultMaintRetention = 7 * 24 * time.Hour + dbWriteBuffer = 4096 + dbPruneInterval = 10 * time.Minute ) type AlertHealth struct { @@ -64,6 +66,11 @@ type Engine struct { maintRetention time.Duration strictClient *http.Client insecureClient *http.Client + + dbWrites chan dbWrite + writerWG sync.WaitGroup + cancel context.CancelFunc + stopOnce sync.Once } func NewEngine(s store.Store) *Engine { @@ -87,6 +94,7 @@ func newEngine(s store.Store, allowPrivateTargets bool) *Engine { isActive: true, allowPrivateTargets: allowPrivateTargets, maintRetention: defaultMaintRetention, + dbWrites: make(chan dbWrite, dbWriteBuffer), db: s, strictClient: &http.Client{ Transport: &http.Transport{ @@ -133,16 +141,98 @@ func fmtDurationShort(d time.Duration) string { return fmt.Sprintf("%dd %dh", int(d.Hours())/24, int(d.Hours())%24) } -func (e *Engine) AddLog(msg string) { - e.logMu.Lock() - defer e.logMu.Unlock() +// appendLog adds a timestamped entry to the in-memory ring buffer and returns +// it. It never touches the database, so it is safe to call from the db-write +// drop/error path without recursing back through the write queue. +func (e *Engine) appendLog(msg string) string { ts := time.Now().Format("15:04:05") entry := fmt.Sprintf("[%s] %s", ts, sanitizeLog(msg)) + e.logMu.Lock() e.logStore = append([]string{entry}, e.logStore...) if len(e.logStore) > maxLogEntries { e.logStore = e.logStore[:maxLogEntries] } - go func() { _ = e.db.SaveLog(entry) }() + e.logMu.Unlock() + return entry +} + +func (e *Engine) AddLog(msg string) { + entry := e.appendLog(msg) + e.enqueueWrite(writeLog{message: entry}) +} + +// enqueueWrite hands a persistence task to the writer goroutine without +// blocking the caller. If the queue is saturated the write is dropped and noted +// in the in-memory log only (never re-enqueued, to avoid recursion via AddLog). +func (e *Engine) enqueueWrite(w dbWrite) { + select { + case e.dbWrites <- w: + default: + e.appendLog(fmt.Sprintf("db write queue full, dropped %s", w.desc())) + } +} + +// dbWriter is the single goroutine that owns all writes. Serializing writes +// through one path removes the fire-and-forget goroutine pile-up, surfaces +// errors, and lets retention run on a timer instead of per-insert. It drains +// any buffered writes on shutdown before returning. +func (e *Engine) dbWriter(ctx context.Context) { + defer e.writerWG.Done() + + pruneTicker := time.NewTicker(dbPruneInterval) + defer pruneTicker.Stop() + e.prune() + + for { + select { + case w := <-e.dbWrites: + if err := w.exec(e.db); err != nil { + e.appendLog(fmt.Sprintf("db %s write failed: %v", w.desc(), err)) + } + case <-pruneTicker.C: + e.prune() + case <-ctx.Done(): + e.drainWrites() + return + } + } +} + +// drainWrites flushes everything still buffered, best-effort, at shutdown. +func (e *Engine) drainWrites() { + for { + select { + case w := <-e.dbWrites: + if err := w.exec(e.db); err != nil { + e.appendLog(fmt.Sprintf("db %s write failed (drain): %v", w.desc(), err)) + } + default: + return + } + } +} + +func (e *Engine) prune() { + if err := e.db.PruneLogs(); err != nil { + e.appendLog(fmt.Sprintf("log prune failed: %v", err)) + } + if err := e.db.PruneCheckHistory(); err != nil { + e.appendLog(fmt.Sprintf("check-history prune failed: %v", err)) + } + if err := e.db.PruneStateChanges(); err != nil { + e.appendLog(fmt.Sprintf("state-change prune failed: %v", err)) + } +} + +// Stop signals the writer goroutine to drain and exit, then blocks until it +// has. Call it before closing the store so no write races a closed DB. +func (e *Engine) Stop() { + e.stopOnce.Do(func() { + if e.cancel != nil { + e.cancel() + } + e.writerWG.Wait() + }) } func (e *Engine) InitLogs() { @@ -280,7 +370,7 @@ func (e *Engine) RecordHeartbeat(token string) bool { } if prevStatus != "UP" && prevStatus != "PENDING" { - go func() { _ = e.db.SaveStateChange(targetID, prevStatus, "UP", "") }() + e.enqueueWrite(writeStateChange{siteID: targetID, fromStatus: prevStatus, toStatus: "UP"}) } return true @@ -302,6 +392,11 @@ func (e *Engine) removeFromTokenIndex(id int) { } func (e *Engine) Start(ctx context.Context) { + ctx, e.cancel = context.WithCancel(ctx) + + e.writerWG.Add(1) + go e.dbWriter(ctx) + go func() { for { select { @@ -708,7 +803,7 @@ func (e *Engine) handleStatusChange(snap models.Site, rawStatus string, code int } if changed && prev != "PENDING" { - go func() { _ = e.db.SaveStateChange(snap.ID, prev, next, errorReason) }() + e.enqueueWrite(writeStateChange{siteID: snap.ID, fromStatus: prev, toStatus: next, reason: errorReason}) } if sslWarnFire { @@ -790,17 +885,15 @@ func (e *Engine) recordAlertResult(alertID int, ok bool, errMsg string) { } e.alertHealth[alertID] = h - // Persist best-effort so health survives restarts; DB IO off the alert path. - go func(rec models.AlertHealthRecord) { - _ = e.db.SaveAlertHealth(rec) - }(models.AlertHealthRecord{ + // Persist so health survives restarts; DB IO off the alert path. + e.enqueueWrite(writeAlertHealth{rec: models.AlertHealthRecord{ AlertID: alertID, LastSendAt: h.LastSendAt, LastSendOK: h.LastSendOK, LastError: h.LastError, SendCount: h.SendCount, FailCount: h.FailCount, - }) + }}) } func (e *Engine) GetAlertHealth(alertID int) AlertHealth { diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go index 39d2afd..630da58 100644 --- a/internal/monitor/monitor_test.go +++ b/internal/monitor/monitor_test.go @@ -1,6 +1,7 @@ package monitor import ( + "context" "fmt" "sync" "testing" @@ -145,6 +146,10 @@ func (m *mockStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, err return m.history, nil } +func (m *mockStore) PruneLogs() error { return nil } +func (m *mockStore) PruneCheckHistory() error { return nil } +func (m *mockStore) PruneStateChanges() error { return nil } + // --- Helpers --- func newTestEngine(ms *mockStore) *Engine { @@ -1167,6 +1172,51 @@ func TestHandleStatusChange_RemovedSiteDropped(t *testing.T) { } } +// --- Group 11: single DB writer --- + +// Writes enqueued through the engine are persisted by the writer goroutine and +// fully drained when the engine stops — no fire-and-forget, no lost writes. +func TestDBWriter_DrainsOnStop(t *testing.T) { + ms := newMockStore() + e := newTestEngine(ms) + e.Start(context.Background()) + + e.enqueueWrite(writeCheck{siteID: 7, latencyNs: 100, isUp: true}) + e.enqueueWrite(writeLog{message: "drain-me"}) + + e.Stop() // blocks until the writer has drained the queue + + ms.mu.Lock() + defer ms.mu.Unlock() + gotCheck := false + for _, c := range ms.savedChecks { + if c.SiteID == 7 { + gotCheck = true + } + } + if !gotCheck { + t.Error("check was not persisted before Stop returned") + } + gotLog := false + for _, l := range ms.savedLogs { + if l == "drain-me" { + gotLog = true + } + } + if !gotLog { + t.Error("log was not persisted before Stop returned") + } +} + +// Stop must be idempotent — safe to call more than once. +func TestEngineStop_Idempotent(t *testing.T) { + ms := newMockStore() + e := newTestEngine(ms) + e.Start(context.Background()) + e.Stop() + e.Stop() // must not panic or block +} + // --- Utilities --- func containsStr(s, substr string) bool { diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 77b60b4..a2805ed 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -70,6 +70,9 @@ func (m *mockStore) LoadAlertHealth() (map[int]models.AlertHealthRecord, error) } func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil } func (m *mockStore) SaveLog(string) error { return nil } +func (m *mockStore) PruneLogs() error { return nil } +func (m *mockStore) PruneCheckHistory() error { return nil } +func (m *mockStore) PruneStateChanges() error { return nil } func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil } func (m *mockStore) GetAllMaintenanceWindows(int) ([]models.MaintenanceWindow, error) { return nil, nil diff --git a/internal/store/sqlite.go b/internal/store/sqlite.go index ee2d65e..c8afe87 100644 --- a/internal/store/sqlite.go +++ b/internal/store/sqlite.go @@ -2,6 +2,7 @@ package store import ( "database/sql" + "fmt" "log" _ "github.com/mattn/go-sqlite3" @@ -10,13 +11,20 @@ import ( type SQLiteDialect struct{} func NewSQLiteStore(path string) (*SQLStore, error) { - s, err := NewSQLStore("sqlite3", path, &SQLiteDialect{}) + // Apply pragmas via the DSN so every pooled connection gets them — a + // post-open PRAGMA Exec only affects a single connection. WAL allows + // concurrent readers alongside the single writer goroutine; busy_timeout + // rides out brief lock contention; synchronous=NORMAL is durable under WAL + // and far faster than the FULL default. (:memory: is left untouched — + // these pragmas are no-ops or harmful for the in-memory test DB.) + dsn := path + if path != ":memory:" { + dsn = fmt.Sprintf("file:%s?_journal_mode=WAL&_busy_timeout=5000&_synchronous=NORMAL", path) + } + s, err := NewSQLStore("sqlite3", dsn, &SQLiteDialect{}) if err != nil { return nil, err } - if _, err := s.db.Exec("PRAGMA journal_mode=WAL"); err != nil { - log.Printf("WAL mode failed: %v", err) - } return s, nil } diff --git a/internal/store/sqlstore.go b/internal/store/sqlstore.go index fb9f27d..f5e7af8 100644 --- a/internal/store/sqlstore.go +++ b/internal/store/sqlstore.go @@ -13,10 +13,11 @@ import ( ) const ( - maxCheckHistory = 1000 - checkHistoryPruneAt = 1100 - maxMaintenanceExport = 1000 - maxRequestBody = 1 << 20 + maxCheckHistory = 1000 + maxLogRows = 200 + maxStateChangesPerSite = 5000 + maxMaintenanceExport = 1000 + maxRequestBody = 1 << 20 ) type SQLStore struct { @@ -407,21 +408,39 @@ func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error { return s.SaveCheckFromNode(siteID, "", latencyNs, isUp) } +// SaveCheckFromNode inserts a single check row. Retention is handled out of +// band by PruneCheckHistory on a timer, not per-insert, to keep the write hot +// path a plain INSERT. func (s *SQLStore) SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error { _, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, node_id, latency_ns, is_up) VALUES (?, ?, ?, ?)"), siteID, nodeID, latencyNs, isUp) - if err != nil { - return err - } - var count int - _ = s.db.QueryRow(s.q("SELECT COUNT(*) FROM check_history WHERE site_id = ?"), siteID).Scan(&count) - if count > checkHistoryPruneAt { - pruneQuery := fmt.Sprintf(`DELETE FROM check_history WHERE site_id = ? AND id NOT IN ( - SELECT id FROM check_history WHERE site_id = ? ORDER BY checked_at DESC LIMIT %d - )`, maxCheckHistory) - _, err = s.db.Exec(s.q(pruneQuery), siteID, siteID) - return err - } - return nil + return err +} + +// PruneCheckHistory trims check_history to the newest maxCheckHistory rows per +// site, across all sites, in one pass. Intended to run periodically. +func (s *SQLStore) PruneCheckHistory() error { + q := fmt.Sprintf(`DELETE FROM check_history WHERE id IN ( + SELECT id FROM ( + SELECT id, ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY checked_at DESC, id DESC) AS rn + FROM check_history + ) ranked WHERE rn > %d + )`, maxCheckHistory) + _, err := s.db.Exec(s.q(q)) + return err +} + +// PruneStateChanges trims state_changes to the newest maxStateChangesPerSite +// rows per site. Generous so realistic SLA windows are unaffected; bounds the +// otherwise unbounded growth of a flapping monitor's history. +func (s *SQLStore) PruneStateChanges() error { + q := fmt.Sprintf(`DELETE FROM state_changes WHERE id IN ( + SELECT id FROM ( + SELECT id, ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY changed_at DESC, id DESC) AS rn + FROM state_changes + ) ranked WHERE rn > %d + )`, maxStateChangesPerSite) + _, err := s.db.Exec(s.q(q)) + return err } func (s *SQLStore) RegisterNode(node models.ProbeNode) error { @@ -494,14 +513,20 @@ func (s *SQLStore) SaveAlertHealth(h models.AlertHealthRecord) error { return err } +// SaveLog inserts a single log row. Retention is handled by PruneLogs on a +// timer, not per-insert. func (s *SQLStore) SaveLog(message string) error { _, err := s.db.Exec(s.q("INSERT INTO logs (message) VALUES (?)"), message) - if err != nil { - return err - } - _, err = s.db.Exec(s.q(`DELETE FROM logs WHERE id NOT IN ( - SELECT id FROM logs ORDER BY created_at DESC LIMIT 200 - )`)) + return err +} + +// PruneLogs trims the logs table to the newest maxLogRows rows. The id DESC +// tiebreak keeps ordering deterministic when rows share a created_at second. +func (s *SQLStore) PruneLogs() error { + q := fmt.Sprintf(`DELETE FROM logs WHERE id NOT IN ( + SELECT id FROM logs ORDER BY created_at DESC, id DESC LIMIT %d + )`, maxLogRows) + _, err := s.db.Exec(s.q(q)) return err } diff --git a/internal/store/sqlstore_test.go b/internal/store/sqlstore_test.go index 9c124be..d041b2d 100644 --- a/internal/store/sqlstore_test.go +++ b/internal/store/sqlstore_test.go @@ -1,6 +1,7 @@ package store import ( + "fmt" "testing" "time" @@ -316,6 +317,69 @@ func TestDeleteSiteCascade(t *testing.T) { } } +func TestPruneLogs(t *testing.T) { + s := newTestStore(t) + + for i := 0; i < maxLogRows+50; i++ { + if err := s.SaveLog(fmt.Sprintf("log %d", i)); err != nil { + t.Fatalf("SaveLog: %v", err) + } + } + if err := s.PruneLogs(); err != nil { + t.Fatalf("PruneLogs: %v", err) + } + + logs, err := s.LoadLogs(maxLogRows * 2) + if err != nil { + t.Fatalf("LoadLogs: %v", err) + } + if len(logs) != maxLogRows { + t.Errorf("expected %d logs after prune, got %d", maxLogRows, len(logs)) + } + // Newest must survive; oldest must be gone (membership, not position — + // LoadLogs ordering ties when rows share a created_at second). + present := make(map[string]bool, len(logs)) + for _, l := range logs { + present[l] = true + } + if !present[fmt.Sprintf("log %d", maxLogRows+50-1)] { + t.Error("newest log was pruned") + } + if present["log 0"] { + t.Error("oldest log survived prune") + } +} + +func TestPruneCheckHistory(t *testing.T) { + s := newTestStore(t) + + for i := 0; i < maxCheckHistory+5; i++ { + if err := s.SaveCheck(1, int64(i), true); err != nil { + t.Fatalf("SaveCheck site 1: %v", err) + } + } + for i := 0; i < 3; i++ { + if err := s.SaveCheck(2, int64(i), true); err != nil { + t.Fatalf("SaveCheck site 2: %v", err) + } + } + + if err := s.PruneCheckHistory(); err != nil { + t.Fatalf("PruneCheckHistory: %v", err) + } + + history, err := s.LoadAllHistory(maxCheckHistory * 2) + if err != nil { + t.Fatalf("LoadAllHistory: %v", err) + } + if len(history[1]) != maxCheckHistory { + t.Errorf("site 1: expected %d rows after prune, got %d", maxCheckHistory, len(history[1])) + } + if len(history[2]) != 3 { + t.Errorf("site 2: expected 3 rows untouched, got %d", len(history[2])) + } +} + func TestPruneExpiredMaintenanceWindows(t *testing.T) { s := newTestStore(t) diff --git a/internal/store/store.go b/internal/store/store.go index d1242b5..6d1a29d 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -39,11 +39,13 @@ type Store interface { SaveCheck(siteID int, latencyNs int64, isUp bool) error SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) + PruneCheckHistory() error // State Changes SaveStateChange(siteID int, fromStatus, toStatus, errorReason string) error GetStateChanges(siteID int, limit int) ([]models.StateChange, error) GetStateChangesSince(siteID int, since time.Time) ([]models.StateChange, error) + PruneStateChanges() error // Nodes RegisterNode(node models.ProbeNode) error @@ -59,6 +61,7 @@ type Store interface { // Logs SaveLog(message string) error LoadLogs(limit int) ([]string, error) + PruneLogs() error // Maintenance Windows GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error)