fix(monitor): serialize DB writes through a single drained writer
CI / test (pull_request) Successful in 2m36s
CI / lint (pull_request) Successful in 56s
CI / vulncheck (pull_request) Successful in 51s

Every check spawned `go e.db.Save*(...)` with the error discarded: a
fire-and-forget goroutine per log line, check, state change, and alert
health update. SaveLog ran a full-table prune DELETE on every insert and
SaveCheck a COUNT + conditional prune on every check, so the hot path
amplified each write into several statements. Nothing tracked these
goroutines, so at shutdown they raced the store's Close() — writes to a
closing DB, silently swallowed.

Introduce a single writer goroutine that drains a buffered channel of
typed dbWrite values (log/check/state-change/alert-health). Writes are
enqueued non-blocking; a saturated queue drops and notes it in the
in-memory log rather than blocking the check loop. Write errors are now
logged instead of discarded. Retention moves off the hot path: SaveLog
and SaveCheck become plain INSERTs, and PruneLogs/PruneCheckHistory/
PruneStateChanges run on a 10-minute timer inside the writer (single
keep-newest-N-per-site pass via a window function). state_changes was
previously never pruned — now bounded.

Add Engine.Stop(): cancels the engine's context, then waits for the
writer to drain every buffered write before returning. main wires it in
before the deferred store Close() so no write races a closed DB.

SQLite gains busy_timeout=5000 and synchronous=NORMAL, applied via the
DSN so every pooled connection inherits them (a post-open PRAGMA only
touches one connection); WAL moves to the DSN too. :memory: test DBs are
left as-is.

Tests: writer drains on Stop, Stop is idempotent, and the prune queries
keep newest-N per site / N logs on real SQLite. Full suite green under
-race.
This commit was merged in pull request #99.
This commit is contained in:
2026-06-10 17:11:12 -04:00
parent 5e7faf9ea7
commit 8b39d4c1a1
12 changed files with 344 additions and 39 deletions
+46
View File
@@ -0,0 +1,46 @@
package monitor
import (
"gitea.lerkolabs.com/lerkolabs/uptop/internal/models"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/store"
)
// dbWrite is a single unit of deferred persistence. The engine enqueues these
// onto a buffered channel; a single writer goroutine drains and executes them,
// serializing all writes through one connection and surfacing errors instead of
// discarding them. desc names the write for diagnostics on drop/failure.
type dbWrite interface {
exec(s store.Store) error
desc() string
}
type writeLog struct{ message string }
func (w writeLog) exec(s store.Store) error { return s.SaveLog(w.message) }
func (w writeLog) desc() string { return "log" }
type writeCheck struct {
siteID int
latencyNs int64
isUp bool
}
func (w writeCheck) exec(s store.Store) error { return s.SaveCheck(w.siteID, w.latencyNs, w.isUp) }
func (w writeCheck) desc() string { return "check" }
type writeStateChange struct {
siteID int
fromStatus string
toStatus string
reason string
}
func (w writeStateChange) exec(s store.Store) error {
return s.SaveStateChange(w.siteID, w.fromStatus, w.toStatus, w.reason)
}
func (w writeStateChange) desc() string { return "state-change" }
type writeAlertHealth struct{ rec models.AlertHealthRecord }
func (w writeAlertHealth) exec(s store.Store) error { return s.SaveAlertHealth(w.rec) }
func (w writeAlertHealth) desc() string { return "alert-health" }
+1 -1
View File
@@ -61,7 +61,7 @@ func (e *Engine) recordCheck(siteID int, latency time.Duration, isUp bool) {
h.Statuses = h.Statuses[len(h.Statuses)-maxHistoryLen:]
}
go func() { _ = e.db.SaveCheck(siteID, latency.Nanoseconds(), isUp) }()
e.enqueueWrite(writeCheck{siteID: siteID, latencyNs: latency.Nanoseconds(), isUp: isUp})
}
func (e *Engine) GetHistory(siteID int) (SiteHistory, bool) {
+107 -11
View File
@@ -23,6 +23,8 @@ const (
minPushGrace = 60 * time.Second
maintPruneInterval = 15 * time.Minute
defaultMaintRetention = 7 * 24 * time.Hour
dbWriteBuffer = 4096
dbPruneInterval = 10 * time.Minute
)
type AlertHealth struct {
@@ -64,6 +66,11 @@ type Engine struct {
maintRetention time.Duration
strictClient *http.Client
insecureClient *http.Client
dbWrites chan dbWrite
writerWG sync.WaitGroup
cancel context.CancelFunc
stopOnce sync.Once
}
func NewEngine(s store.Store) *Engine {
@@ -87,6 +94,7 @@ func newEngine(s store.Store, allowPrivateTargets bool) *Engine {
isActive: true,
allowPrivateTargets: allowPrivateTargets,
maintRetention: defaultMaintRetention,
dbWrites: make(chan dbWrite, dbWriteBuffer),
db: s,
strictClient: &http.Client{
Transport: &http.Transport{
@@ -133,16 +141,98 @@ func fmtDurationShort(d time.Duration) string {
return fmt.Sprintf("%dd %dh", int(d.Hours())/24, int(d.Hours())%24)
}
func (e *Engine) AddLog(msg string) {
e.logMu.Lock()
defer e.logMu.Unlock()
// appendLog adds a timestamped entry to the in-memory ring buffer and returns
// it. It never touches the database, so it is safe to call from the db-write
// drop/error path without recursing back through the write queue.
func (e *Engine) appendLog(msg string) string {
ts := time.Now().Format("15:04:05")
entry := fmt.Sprintf("[%s] %s", ts, sanitizeLog(msg))
e.logMu.Lock()
e.logStore = append([]string{entry}, e.logStore...)
if len(e.logStore) > maxLogEntries {
e.logStore = e.logStore[:maxLogEntries]
}
go func() { _ = e.db.SaveLog(entry) }()
e.logMu.Unlock()
return entry
}
func (e *Engine) AddLog(msg string) {
entry := e.appendLog(msg)
e.enqueueWrite(writeLog{message: entry})
}
// enqueueWrite hands a persistence task to the writer goroutine without
// blocking the caller. If the queue is saturated the write is dropped and noted
// in the in-memory log only (never re-enqueued, to avoid recursion via AddLog).
func (e *Engine) enqueueWrite(w dbWrite) {
select {
case e.dbWrites <- w:
default:
e.appendLog(fmt.Sprintf("db write queue full, dropped %s", w.desc()))
}
}
// dbWriter is the single goroutine that owns all writes. Serializing writes
// through one path removes the fire-and-forget goroutine pile-up, surfaces
// errors, and lets retention run on a timer instead of per-insert. It drains
// any buffered writes on shutdown before returning.
func (e *Engine) dbWriter(ctx context.Context) {
defer e.writerWG.Done()
pruneTicker := time.NewTicker(dbPruneInterval)
defer pruneTicker.Stop()
e.prune()
for {
select {
case w := <-e.dbWrites:
if err := w.exec(e.db); err != nil {
e.appendLog(fmt.Sprintf("db %s write failed: %v", w.desc(), err))
}
case <-pruneTicker.C:
e.prune()
case <-ctx.Done():
e.drainWrites()
return
}
}
}
// drainWrites flushes everything still buffered, best-effort, at shutdown.
func (e *Engine) drainWrites() {
for {
select {
case w := <-e.dbWrites:
if err := w.exec(e.db); err != nil {
e.appendLog(fmt.Sprintf("db %s write failed (drain): %v", w.desc(), err))
}
default:
return
}
}
}
func (e *Engine) prune() {
if err := e.db.PruneLogs(); err != nil {
e.appendLog(fmt.Sprintf("log prune failed: %v", err))
}
if err := e.db.PruneCheckHistory(); err != nil {
e.appendLog(fmt.Sprintf("check-history prune failed: %v", err))
}
if err := e.db.PruneStateChanges(); err != nil {
e.appendLog(fmt.Sprintf("state-change prune failed: %v", err))
}
}
// Stop signals the writer goroutine to drain and exit, then blocks until it
// has. Call it before closing the store so no write races a closed DB.
func (e *Engine) Stop() {
e.stopOnce.Do(func() {
if e.cancel != nil {
e.cancel()
}
e.writerWG.Wait()
})
}
func (e *Engine) InitLogs() {
@@ -280,7 +370,7 @@ func (e *Engine) RecordHeartbeat(token string) bool {
}
if prevStatus != "UP" && prevStatus != "PENDING" {
go func() { _ = e.db.SaveStateChange(targetID, prevStatus, "UP", "") }()
e.enqueueWrite(writeStateChange{siteID: targetID, fromStatus: prevStatus, toStatus: "UP"})
}
return true
@@ -302,6 +392,14 @@ func (e *Engine) removeFromTokenIndex(id int) {
}
func (e *Engine) Start(ctx context.Context) {
// e.cancel is invoked by Stop() to drain and halt the writer; gosec can't
// trace the cross-method call, and cancelling the parent reaps this child
// regardless, so the leak it warns about can't occur.
ctx, e.cancel = context.WithCancel(ctx) //nolint:gosec // cancel is called in Stop()
e.writerWG.Add(1)
go e.dbWriter(ctx)
go func() {
for {
select {
@@ -708,7 +806,7 @@ func (e *Engine) handleStatusChange(snap models.Site, rawStatus string, code int
}
if changed && prev != "PENDING" {
go func() { _ = e.db.SaveStateChange(snap.ID, prev, next, errorReason) }()
e.enqueueWrite(writeStateChange{siteID: snap.ID, fromStatus: prev, toStatus: next, reason: errorReason})
}
if sslWarnFire {
@@ -790,17 +888,15 @@ func (e *Engine) recordAlertResult(alertID int, ok bool, errMsg string) {
}
e.alertHealth[alertID] = h
// Persist best-effort so health survives restarts; DB IO off the alert path.
go func(rec models.AlertHealthRecord) {
_ = e.db.SaveAlertHealth(rec)
}(models.AlertHealthRecord{
// Persist so health survives restarts; DB IO off the alert path.
e.enqueueWrite(writeAlertHealth{rec: models.AlertHealthRecord{
AlertID: alertID,
LastSendAt: h.LastSendAt,
LastSendOK: h.LastSendOK,
LastError: h.LastError,
SendCount: h.SendCount,
FailCount: h.FailCount,
})
}})
}
func (e *Engine) GetAlertHealth(alertID int) AlertHealth {
+50
View File
@@ -1,6 +1,7 @@
package monitor
import (
"context"
"fmt"
"sync"
"testing"
@@ -145,6 +146,10 @@ func (m *mockStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, err
return m.history, nil
}
func (m *mockStore) PruneLogs() error { return nil }
func (m *mockStore) PruneCheckHistory() error { return nil }
func (m *mockStore) PruneStateChanges() error { return nil }
// --- Helpers ---
func newTestEngine(ms *mockStore) *Engine {
@@ -1167,6 +1172,51 @@ func TestHandleStatusChange_RemovedSiteDropped(t *testing.T) {
}
}
// --- Group 11: single DB writer ---
// Writes enqueued through the engine are persisted by the writer goroutine and
// fully drained when the engine stops — no fire-and-forget, no lost writes.
func TestDBWriter_DrainsOnStop(t *testing.T) {
ms := newMockStore()
e := newTestEngine(ms)
e.Start(context.Background())
e.enqueueWrite(writeCheck{siteID: 7, latencyNs: 100, isUp: true})
e.enqueueWrite(writeLog{message: "drain-me"})
e.Stop() // blocks until the writer has drained the queue
ms.mu.Lock()
defer ms.mu.Unlock()
gotCheck := false
for _, c := range ms.savedChecks {
if c.SiteID == 7 {
gotCheck = true
}
}
if !gotCheck {
t.Error("check was not persisted before Stop returned")
}
gotLog := false
for _, l := range ms.savedLogs {
if l == "drain-me" {
gotLog = true
}
}
if !gotLog {
t.Error("log was not persisted before Stop returned")
}
}
// Stop must be idempotent — safe to call more than once.
func TestEngineStop_Idempotent(t *testing.T) {
ms := newMockStore()
e := newTestEngine(ms)
e.Start(context.Background())
e.Stop()
e.Stop() // must not panic or block
}
// --- Utilities ---
func containsStr(s, substr string) bool {