fix(monitor): serialize DB writes through a single drained writer #99

Merged
lerko merged 1 commits from fix/db-writer into main 2026-06-10 22:28:31 +00:00
12 changed files with 344 additions and 39 deletions
+4
View File
@@ -431,6 +431,10 @@ func runServe(args []string) {
} }
cancel() cancel()
// Drain pending DB writes before the deferred ss.Close() runs, so no
// write races a closed database.
eng.Stop()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel() defer shutdownCancel()
if httpSrv != nil { if httpSrv != nil {
+3
View File
@@ -58,6 +58,9 @@ func (m *mockStore) LoadAlertHealth() (map[int]models.AlertHealthRecord, error)
} }
func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil } func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil }
func (m *mockStore) SaveLog(string) error { return nil } func (m *mockStore) SaveLog(string) error { return nil }
func (m *mockStore) PruneLogs() error { return nil }
func (m *mockStore) PruneCheckHistory() error { return nil }
func (m *mockStore) PruneStateChanges() error { return nil }
func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil } func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil }
func (m *mockStore) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error) { func (m *mockStore) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error) {
return nil, nil return nil, nil
+3
View File
@@ -56,6 +56,9 @@ func (m *mockStore) LoadAlertHealth() (map[int]models.AlertHealthRecord, error)
} }
func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil } func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil }
func (m *mockStore) SaveLog(string) error { return nil } func (m *mockStore) SaveLog(string) error { return nil }
func (m *mockStore) PruneLogs() error { return nil }
func (m *mockStore) PruneCheckHistory() error { return nil }
func (m *mockStore) PruneStateChanges() error { return nil }
func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil } func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil }
func (m *mockStore) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error) { func (m *mockStore) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error) {
return nil, nil return nil, nil
+46
View File
@@ -0,0 +1,46 @@
package monitor
import (
"gitea.lerkolabs.com/lerkolabs/uptop/internal/models"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/store"
)
// dbWrite is a single unit of deferred persistence. The engine enqueues these
// onto a buffered channel; a single writer goroutine drains and executes them,
// serializing all writes through one connection and surfacing errors instead of
// discarding them. desc names the write for diagnostics on drop/failure.
type dbWrite interface {
exec(s store.Store) error
desc() string
}
type writeLog struct{ message string }
func (w writeLog) exec(s store.Store) error { return s.SaveLog(w.message) }
func (w writeLog) desc() string { return "log" }
type writeCheck struct {
siteID int
latencyNs int64
isUp bool
}
func (w writeCheck) exec(s store.Store) error { return s.SaveCheck(w.siteID, w.latencyNs, w.isUp) }
func (w writeCheck) desc() string { return "check" }
type writeStateChange struct {
siteID int
fromStatus string
toStatus string
reason string
}
func (w writeStateChange) exec(s store.Store) error {
return s.SaveStateChange(w.siteID, w.fromStatus, w.toStatus, w.reason)
}
func (w writeStateChange) desc() string { return "state-change" }
type writeAlertHealth struct{ rec models.AlertHealthRecord }
func (w writeAlertHealth) exec(s store.Store) error { return s.SaveAlertHealth(w.rec) }
func (w writeAlertHealth) desc() string { return "alert-health" }
+1 -1
View File
@@ -61,7 +61,7 @@ func (e *Engine) recordCheck(siteID int, latency time.Duration, isUp bool) {
h.Statuses = h.Statuses[len(h.Statuses)-maxHistoryLen:] h.Statuses = h.Statuses[len(h.Statuses)-maxHistoryLen:]
} }
go func() { _ = e.db.SaveCheck(siteID, latency.Nanoseconds(), isUp) }() e.enqueueWrite(writeCheck{siteID: siteID, latencyNs: latency.Nanoseconds(), isUp: isUp})
} }
func (e *Engine) GetHistory(siteID int) (SiteHistory, bool) { func (e *Engine) GetHistory(siteID int) (SiteHistory, bool) {
+107 -11
View File
@@ -23,6 +23,8 @@ const (
minPushGrace = 60 * time.Second minPushGrace = 60 * time.Second
maintPruneInterval = 15 * time.Minute maintPruneInterval = 15 * time.Minute
defaultMaintRetention = 7 * 24 * time.Hour defaultMaintRetention = 7 * 24 * time.Hour
dbWriteBuffer = 4096
dbPruneInterval = 10 * time.Minute
) )
type AlertHealth struct { type AlertHealth struct {
@@ -64,6 +66,11 @@ type Engine struct {
maintRetention time.Duration maintRetention time.Duration
strictClient *http.Client strictClient *http.Client
insecureClient *http.Client insecureClient *http.Client
dbWrites chan dbWrite
writerWG sync.WaitGroup
cancel context.CancelFunc
stopOnce sync.Once
} }
func NewEngine(s store.Store) *Engine { func NewEngine(s store.Store) *Engine {
@@ -87,6 +94,7 @@ func newEngine(s store.Store, allowPrivateTargets bool) *Engine {
isActive: true, isActive: true,
allowPrivateTargets: allowPrivateTargets, allowPrivateTargets: allowPrivateTargets,
maintRetention: defaultMaintRetention, maintRetention: defaultMaintRetention,
dbWrites: make(chan dbWrite, dbWriteBuffer),
db: s, db: s,
strictClient: &http.Client{ strictClient: &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
@@ -133,16 +141,98 @@ func fmtDurationShort(d time.Duration) string {
return fmt.Sprintf("%dd %dh", int(d.Hours())/24, int(d.Hours())%24) return fmt.Sprintf("%dd %dh", int(d.Hours())/24, int(d.Hours())%24)
} }
func (e *Engine) AddLog(msg string) { // appendLog adds a timestamped entry to the in-memory ring buffer and returns
e.logMu.Lock() // it. It never touches the database, so it is safe to call from the db-write
defer e.logMu.Unlock() // drop/error path without recursing back through the write queue.
func (e *Engine) appendLog(msg string) string {
ts := time.Now().Format("15:04:05") ts := time.Now().Format("15:04:05")
entry := fmt.Sprintf("[%s] %s", ts, sanitizeLog(msg)) entry := fmt.Sprintf("[%s] %s", ts, sanitizeLog(msg))
e.logMu.Lock()
e.logStore = append([]string{entry}, e.logStore...) e.logStore = append([]string{entry}, e.logStore...)
if len(e.logStore) > maxLogEntries { if len(e.logStore) > maxLogEntries {
e.logStore = e.logStore[:maxLogEntries] e.logStore = e.logStore[:maxLogEntries]
} }
go func() { _ = e.db.SaveLog(entry) }() e.logMu.Unlock()
return entry
}
func (e *Engine) AddLog(msg string) {
entry := e.appendLog(msg)
e.enqueueWrite(writeLog{message: entry})
}
// enqueueWrite hands a persistence task to the writer goroutine without
// blocking the caller. If the queue is saturated the write is dropped and noted
// in the in-memory log only (never re-enqueued, to avoid recursion via AddLog).
func (e *Engine) enqueueWrite(w dbWrite) {
select {
case e.dbWrites <- w:
default:
e.appendLog(fmt.Sprintf("db write queue full, dropped %s", w.desc()))
}
}
// dbWriter is the single goroutine that owns all writes. Serializing writes
// through one path removes the fire-and-forget goroutine pile-up, surfaces
// errors, and lets retention run on a timer instead of per-insert. It drains
// any buffered writes on shutdown before returning.
func (e *Engine) dbWriter(ctx context.Context) {
defer e.writerWG.Done()
pruneTicker := time.NewTicker(dbPruneInterval)
defer pruneTicker.Stop()
e.prune()
for {
select {
case w := <-e.dbWrites:
if err := w.exec(e.db); err != nil {
e.appendLog(fmt.Sprintf("db %s write failed: %v", w.desc(), err))
}
case <-pruneTicker.C:
e.prune()
case <-ctx.Done():
e.drainWrites()
return
}
}
}
// drainWrites flushes everything still buffered, best-effort, at shutdown.
func (e *Engine) drainWrites() {
for {
select {
case w := <-e.dbWrites:
if err := w.exec(e.db); err != nil {
e.appendLog(fmt.Sprintf("db %s write failed (drain): %v", w.desc(), err))
}
default:
return
}
}
}
func (e *Engine) prune() {
if err := e.db.PruneLogs(); err != nil {
e.appendLog(fmt.Sprintf("log prune failed: %v", err))
}
if err := e.db.PruneCheckHistory(); err != nil {
e.appendLog(fmt.Sprintf("check-history prune failed: %v", err))
}
if err := e.db.PruneStateChanges(); err != nil {
e.appendLog(fmt.Sprintf("state-change prune failed: %v", err))
}
}
// Stop signals the writer goroutine to drain and exit, then blocks until it
// has. Call it before closing the store so no write races a closed DB.
func (e *Engine) Stop() {
e.stopOnce.Do(func() {
if e.cancel != nil {
e.cancel()
}
e.writerWG.Wait()
})
} }
func (e *Engine) InitLogs() { func (e *Engine) InitLogs() {
@@ -280,7 +370,7 @@ func (e *Engine) RecordHeartbeat(token string) bool {
} }
if prevStatus != "UP" && prevStatus != "PENDING" { if prevStatus != "UP" && prevStatus != "PENDING" {
go func() { _ = e.db.SaveStateChange(targetID, prevStatus, "UP", "") }() e.enqueueWrite(writeStateChange{siteID: targetID, fromStatus: prevStatus, toStatus: "UP"})
} }
return true return true
@@ -302,6 +392,14 @@ func (e *Engine) removeFromTokenIndex(id int) {
} }
func (e *Engine) Start(ctx context.Context) { func (e *Engine) Start(ctx context.Context) {
// e.cancel is invoked by Stop() to drain and halt the writer; gosec can't
// trace the cross-method call, and cancelling the parent reaps this child
// regardless, so the leak it warns about can't occur.
ctx, e.cancel = context.WithCancel(ctx) //nolint:gosec // cancel is called in Stop()
e.writerWG.Add(1)
go e.dbWriter(ctx)
go func() { go func() {
for { for {
select { select {
@@ -708,7 +806,7 @@ func (e *Engine) handleStatusChange(snap models.Site, rawStatus string, code int
} }
if changed && prev != "PENDING" { if changed && prev != "PENDING" {
go func() { _ = e.db.SaveStateChange(snap.ID, prev, next, errorReason) }() e.enqueueWrite(writeStateChange{siteID: snap.ID, fromStatus: prev, toStatus: next, reason: errorReason})
} }
if sslWarnFire { if sslWarnFire {
@@ -790,17 +888,15 @@ func (e *Engine) recordAlertResult(alertID int, ok bool, errMsg string) {
} }
e.alertHealth[alertID] = h e.alertHealth[alertID] = h
// Persist best-effort so health survives restarts; DB IO off the alert path. // Persist so health survives restarts; DB IO off the alert path.
go func(rec models.AlertHealthRecord) { e.enqueueWrite(writeAlertHealth{rec: models.AlertHealthRecord{
_ = e.db.SaveAlertHealth(rec)
}(models.AlertHealthRecord{
AlertID: alertID, AlertID: alertID,
LastSendAt: h.LastSendAt, LastSendAt: h.LastSendAt,
LastSendOK: h.LastSendOK, LastSendOK: h.LastSendOK,
LastError: h.LastError, LastError: h.LastError,
SendCount: h.SendCount, SendCount: h.SendCount,
FailCount: h.FailCount, FailCount: h.FailCount,
}) }})
} }
func (e *Engine) GetAlertHealth(alertID int) AlertHealth { func (e *Engine) GetAlertHealth(alertID int) AlertHealth {
+50
View File
@@ -1,6 +1,7 @@
package monitor package monitor
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
"testing" "testing"
@@ -145,6 +146,10 @@ func (m *mockStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, err
return m.history, nil return m.history, nil
} }
func (m *mockStore) PruneLogs() error { return nil }
func (m *mockStore) PruneCheckHistory() error { return nil }
func (m *mockStore) PruneStateChanges() error { return nil }
// --- Helpers --- // --- Helpers ---
func newTestEngine(ms *mockStore) *Engine { func newTestEngine(ms *mockStore) *Engine {
@@ -1167,6 +1172,51 @@ func TestHandleStatusChange_RemovedSiteDropped(t *testing.T) {
} }
} }
// --- Group 11: single DB writer ---
// Writes enqueued through the engine are persisted by the writer goroutine and
// fully drained when the engine stops — no fire-and-forget, no lost writes.
func TestDBWriter_DrainsOnStop(t *testing.T) {
ms := newMockStore()
e := newTestEngine(ms)
e.Start(context.Background())
e.enqueueWrite(writeCheck{siteID: 7, latencyNs: 100, isUp: true})
e.enqueueWrite(writeLog{message: "drain-me"})
e.Stop() // blocks until the writer has drained the queue
ms.mu.Lock()
defer ms.mu.Unlock()
gotCheck := false
for _, c := range ms.savedChecks {
if c.SiteID == 7 {
gotCheck = true
}
}
if !gotCheck {
t.Error("check was not persisted before Stop returned")
}
gotLog := false
for _, l := range ms.savedLogs {
if l == "drain-me" {
gotLog = true
}
}
if !gotLog {
t.Error("log was not persisted before Stop returned")
}
}
// Stop must be idempotent — safe to call more than once.
func TestEngineStop_Idempotent(t *testing.T) {
ms := newMockStore()
e := newTestEngine(ms)
e.Start(context.Background())
e.Stop()
e.Stop() // must not panic or block
}
// --- Utilities --- // --- Utilities ---
func containsStr(s, substr string) bool { func containsStr(s, substr string) bool {
+3
View File
@@ -70,6 +70,9 @@ func (m *mockStore) LoadAlertHealth() (map[int]models.AlertHealthRecord, error)
} }
func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil } func (m *mockStore) SaveAlertHealth(models.AlertHealthRecord) error { return nil }
func (m *mockStore) SaveLog(string) error { return nil } func (m *mockStore) SaveLog(string) error { return nil }
func (m *mockStore) PruneLogs() error { return nil }
func (m *mockStore) PruneCheckHistory() error { return nil }
func (m *mockStore) PruneStateChanges() error { return nil }
func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil } func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil }
func (m *mockStore) GetAllMaintenanceWindows(int) ([]models.MaintenanceWindow, error) { func (m *mockStore) GetAllMaintenanceWindows(int) ([]models.MaintenanceWindow, error) {
return nil, nil return nil, nil
+12 -4
View File
@@ -2,6 +2,7 @@ package store
import ( import (
"database/sql" "database/sql"
"fmt"
"log" "log"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
@@ -10,13 +11,20 @@ import (
type SQLiteDialect struct{} type SQLiteDialect struct{}
func NewSQLiteStore(path string) (*SQLStore, error) { func NewSQLiteStore(path string) (*SQLStore, error) {
s, err := NewSQLStore("sqlite3", path, &SQLiteDialect{}) // Apply pragmas via the DSN so every pooled connection gets them — a
// post-open PRAGMA Exec only affects a single connection. WAL allows
// concurrent readers alongside the single writer goroutine; busy_timeout
// rides out brief lock contention; synchronous=NORMAL is durable under WAL
// and far faster than the FULL default. (:memory: is left untouched —
// these pragmas are no-ops or harmful for the in-memory test DB.)
dsn := path
if path != ":memory:" {
dsn = fmt.Sprintf("file:%s?_journal_mode=WAL&_busy_timeout=5000&_synchronous=NORMAL", path)
}
s, err := NewSQLStore("sqlite3", dsn, &SQLiteDialect{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
if _, err := s.db.Exec("PRAGMA journal_mode=WAL"); err != nil {
log.Printf("WAL mode failed: %v", err)
}
return s, nil return s, nil
} }
+41 -16
View File
@@ -14,7 +14,8 @@ import (
const ( const (
maxCheckHistory = 1000 maxCheckHistory = 1000
checkHistoryPruneAt = 1100 maxLogRows = 200
maxStateChangesPerSite = 5000
maxMaintenanceExport = 1000 maxMaintenanceExport = 1000
maxRequestBody = 1 << 20 maxRequestBody = 1 << 20
) )
@@ -407,21 +408,39 @@ func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error {
return s.SaveCheckFromNode(siteID, "", latencyNs, isUp) return s.SaveCheckFromNode(siteID, "", latencyNs, isUp)
} }
// SaveCheckFromNode inserts a single check row. Retention is handled out of
// band by PruneCheckHistory on a timer, not per-insert, to keep the write hot
// path a plain INSERT.
func (s *SQLStore) SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error { func (s *SQLStore) SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error {
_, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, node_id, latency_ns, is_up) VALUES (?, ?, ?, ?)"), siteID, nodeID, latencyNs, isUp) _, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, node_id, latency_ns, is_up) VALUES (?, ?, ?, ?)"), siteID, nodeID, latencyNs, isUp)
if err != nil {
return err return err
} }
var count int
_ = s.db.QueryRow(s.q("SELECT COUNT(*) FROM check_history WHERE site_id = ?"), siteID).Scan(&count) // PruneCheckHistory trims check_history to the newest maxCheckHistory rows per
if count > checkHistoryPruneAt { // site, across all sites, in one pass. Intended to run periodically.
pruneQuery := fmt.Sprintf(`DELETE FROM check_history WHERE site_id = ? AND id NOT IN ( func (s *SQLStore) PruneCheckHistory() error {
SELECT id FROM check_history WHERE site_id = ? ORDER BY checked_at DESC LIMIT %d q := fmt.Sprintf(`DELETE FROM check_history WHERE id IN (
SELECT id FROM (
SELECT id, ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY checked_at DESC, id DESC) AS rn
FROM check_history
) ranked WHERE rn > %d
)`, maxCheckHistory) )`, maxCheckHistory)
_, err = s.db.Exec(s.q(pruneQuery), siteID, siteID) _, err := s.db.Exec(s.q(q))
return err
}
// PruneStateChanges trims state_changes to the newest maxStateChangesPerSite
// rows per site. Generous so realistic SLA windows are unaffected; bounds the
// otherwise unbounded growth of a flapping monitor's history.
func (s *SQLStore) PruneStateChanges() error {
q := fmt.Sprintf(`DELETE FROM state_changes WHERE id IN (
SELECT id FROM (
SELECT id, ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY changed_at DESC, id DESC) AS rn
FROM state_changes
) ranked WHERE rn > %d
)`, maxStateChangesPerSite)
_, err := s.db.Exec(s.q(q))
return err return err
}
return nil
} }
func (s *SQLStore) RegisterNode(node models.ProbeNode) error { func (s *SQLStore) RegisterNode(node models.ProbeNode) error {
@@ -494,14 +513,20 @@ func (s *SQLStore) SaveAlertHealth(h models.AlertHealthRecord) error {
return err return err
} }
// SaveLog inserts a single log row. Retention is handled by PruneLogs on a
// timer, not per-insert.
func (s *SQLStore) SaveLog(message string) error { func (s *SQLStore) SaveLog(message string) error {
_, err := s.db.Exec(s.q("INSERT INTO logs (message) VALUES (?)"), message) _, err := s.db.Exec(s.q("INSERT INTO logs (message) VALUES (?)"), message)
if err != nil {
return err return err
} }
_, err = s.db.Exec(s.q(`DELETE FROM logs WHERE id NOT IN (
SELECT id FROM logs ORDER BY created_at DESC LIMIT 200 // PruneLogs trims the logs table to the newest maxLogRows rows. The id DESC
)`)) // tiebreak keeps ordering deterministic when rows share a created_at second.
func (s *SQLStore) PruneLogs() error {
q := fmt.Sprintf(`DELETE FROM logs WHERE id NOT IN (
SELECT id FROM logs ORDER BY created_at DESC, id DESC LIMIT %d
)`, maxLogRows)
_, err := s.db.Exec(s.q(q))
return err return err
} }
+64
View File
@@ -1,6 +1,7 @@
package store package store
import ( import (
"fmt"
"testing" "testing"
"time" "time"
@@ -316,6 +317,69 @@ func TestDeleteSiteCascade(t *testing.T) {
} }
} }
func TestPruneLogs(t *testing.T) {
s := newTestStore(t)
for i := 0; i < maxLogRows+50; i++ {
if err := s.SaveLog(fmt.Sprintf("log %d", i)); err != nil {
t.Fatalf("SaveLog: %v", err)
}
}
if err := s.PruneLogs(); err != nil {
t.Fatalf("PruneLogs: %v", err)
}
logs, err := s.LoadLogs(maxLogRows * 2)
if err != nil {
t.Fatalf("LoadLogs: %v", err)
}
if len(logs) != maxLogRows {
t.Errorf("expected %d logs after prune, got %d", maxLogRows, len(logs))
}
// Newest must survive; oldest must be gone (membership, not position —
// LoadLogs ordering ties when rows share a created_at second).
present := make(map[string]bool, len(logs))
for _, l := range logs {
present[l] = true
}
if !present[fmt.Sprintf("log %d", maxLogRows+50-1)] {
t.Error("newest log was pruned")
}
if present["log 0"] {
t.Error("oldest log survived prune")
}
}
func TestPruneCheckHistory(t *testing.T) {
s := newTestStore(t)
for i := 0; i < maxCheckHistory+5; i++ {
if err := s.SaveCheck(1, int64(i), true); err != nil {
t.Fatalf("SaveCheck site 1: %v", err)
}
}
for i := 0; i < 3; i++ {
if err := s.SaveCheck(2, int64(i), true); err != nil {
t.Fatalf("SaveCheck site 2: %v", err)
}
}
if err := s.PruneCheckHistory(); err != nil {
t.Fatalf("PruneCheckHistory: %v", err)
}
history, err := s.LoadAllHistory(maxCheckHistory * 2)
if err != nil {
t.Fatalf("LoadAllHistory: %v", err)
}
if len(history[1]) != maxCheckHistory {
t.Errorf("site 1: expected %d rows after prune, got %d", maxCheckHistory, len(history[1]))
}
if len(history[2]) != 3 {
t.Errorf("site 2: expected 3 rows untouched, got %d", len(history[2]))
}
}
func TestPruneExpiredMaintenanceWindows(t *testing.T) { func TestPruneExpiredMaintenanceWindows(t *testing.T) {
s := newTestStore(t) s := newTestStore(t)
+3
View File
@@ -39,11 +39,13 @@ type Store interface {
SaveCheck(siteID int, latencyNs int64, isUp bool) error SaveCheck(siteID int, latencyNs int64, isUp bool) error
SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error
LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) LoadAllHistory(limit int) (map[int][]models.CheckRecord, error)
PruneCheckHistory() error
// State Changes // State Changes
SaveStateChange(siteID int, fromStatus, toStatus, errorReason string) error SaveStateChange(siteID int, fromStatus, toStatus, errorReason string) error
GetStateChanges(siteID int, limit int) ([]models.StateChange, error) GetStateChanges(siteID int, limit int) ([]models.StateChange, error)
GetStateChangesSince(siteID int, since time.Time) ([]models.StateChange, error) GetStateChangesSince(siteID int, since time.Time) ([]models.StateChange, error)
PruneStateChanges() error
// Nodes // Nodes
RegisterNode(node models.ProbeNode) error RegisterNode(node models.ProbeNode) error
@@ -59,6 +61,7 @@ type Store interface {
// Logs // Logs
SaveLog(message string) error SaveLog(message string) error
LoadLogs(limit int) ([]string, error) LoadLogs(limit int) ([]string, error)
PruneLogs() error
// Maintenance Windows // Maintenance Windows
GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error)