refactor(store): propagate context.Context through all Store methods
Every Store interface method (except Close) now takes context.Context as first parameter. All 54 db.Query/Exec/QueryRow calls in SQLStore replaced with their *Context variants. DB operations now respect cancellation and deadlines. Context sources by caller: - Engine dbWriter/poll/pruner: engine ctx from Start() - HTTP handlers: r.Context() - config.Apply/Export: caller-provided ctx - TUI/main.go init: context.Background() RunCheck and all sub-checks (HTTP/ping/port/DNS) accept parent ctx. HTTP checks now inherit shutdown cancellation instead of rooting in context.Background(). dbWrite.exec takes ctx so the writer goroutine can cancel stuck DB operations. DeleteSite/ImportData use BeginTx(ctx) instead of Begin().
This commit is contained in:
+110
-109
@@ -1,6 +1,7 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
@@ -73,14 +74,14 @@ func (s *SQLStore) Close() error {
|
||||
return s.db.Close()
|
||||
}
|
||||
|
||||
func (s *SQLStore) Init() error {
|
||||
func (s *SQLStore) Init(ctx context.Context) error {
|
||||
for _, stmt := range s.dialect.CreateTablesSQL() {
|
||||
if _, err := s.db.Exec(stmt); err != nil {
|
||||
if _, err := s.db.ExecContext(ctx, stmt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, m := range s.dialect.MigrationsSQL() {
|
||||
if _, err := s.db.Exec(m); err != nil {
|
||||
if _, err := s.db.ExecContext(ctx, m); err != nil {
|
||||
errMsg := err.Error()
|
||||
if strings.Contains(errMsg, "already exists") || strings.Contains(errMsg, "duplicate column") {
|
||||
continue
|
||||
@@ -91,13 +92,13 @@ func (s *SQLStore) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetSites() ([]models.Site, error) {
|
||||
func (s *SQLStore) GetSites(ctx context.Context) ([]models.Site, error) {
|
||||
bf := s.dialect.BoolFalse()
|
||||
query := fmt.Sprintf( //nolint:gosec // bf is a dialect boolean literal, not user input
|
||||
"SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s), COALESCE(regions, '') FROM sites",
|
||||
bf, bf,
|
||||
)
|
||||
rows, err := s.db.Query(query)
|
||||
rows, err := s.db.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -116,7 +117,7 @@ func (s *SQLStore) GetSites() ([]models.Site, error) {
|
||||
return sites, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) AddSite(site models.Site) error {
|
||||
func (s *SQLStore) AddSite(ctx context.Context, site models.Site) error {
|
||||
token := ""
|
||||
if site.Type == "push" {
|
||||
var err error
|
||||
@@ -125,15 +126,15 @@ func (s *SQLStore) AddSite(site models.Site) error {
|
||||
return fmt.Errorf("generate push token: %w", err)
|
||||
}
|
||||
}
|
||||
_, err := s.db.Exec(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
|
||||
_, err := s.db.ExecContext(ctx, s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
|
||||
site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
|
||||
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) UpdateSite(site models.Site) error {
|
||||
func (s *SQLStore) UpdateSite(ctx context.Context, site models.Site) error {
|
||||
var existingToken string
|
||||
_ = s.db.QueryRow(s.q("SELECT token FROM sites WHERE id=?"), site.ID).Scan(&existingToken) //nolint:errcheck
|
||||
_ = s.db.QueryRowContext(ctx, s.q("SELECT token FROM sites WHERE id=?"), site.ID).Scan(&existingToken) //nolint:errcheck
|
||||
if site.Type == "push" && existingToken == "" {
|
||||
var err error
|
||||
existingToken, err = generateToken()
|
||||
@@ -141,19 +142,19 @@ func (s *SQLStore) UpdateSite(site models.Site) error {
|
||||
return fmt.Errorf("generate push token: %w", err)
|
||||
}
|
||||
}
|
||||
_, err := s.db.Exec(s.q("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=?, regions=? WHERE id=?"),
|
||||
_, err := s.db.ExecContext(ctx, s.q("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=?, regions=? WHERE id=?"),
|
||||
site.Name, site.URL, site.Type, existingToken, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
|
||||
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions, site.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) UpdateSitePaused(id int, paused bool) error {
|
||||
_, err := s.db.Exec(s.q("UPDATE sites SET paused=? WHERE id=?"), paused, id)
|
||||
func (s *SQLStore) UpdateSitePaused(ctx context.Context, id int, paused bool) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("UPDATE sites SET paused=? WHERE id=?"), paused, id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) DeleteSite(id int) error {
|
||||
tx, err := s.db.Begin()
|
||||
func (s *SQLStore) DeleteSite(ctx context.Context, id int) error {
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -165,7 +166,7 @@ func (s *SQLStore) DeleteSite(id int) error {
|
||||
"DELETE FROM state_changes WHERE site_id = ?",
|
||||
"DELETE FROM sites WHERE id = ?",
|
||||
} {
|
||||
if _, err := tx.Exec(s.q(q), id); err != nil {
|
||||
if _, err := tx.ExecContext(ctx, s.q(q), id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -177,14 +178,14 @@ func (s *SQLStore) DeleteSite(id int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetSiteByName(name string) (models.Site, error) {
|
||||
func (s *SQLStore) GetSiteByName(ctx context.Context, name string) (models.Site, error) {
|
||||
bf := s.dialect.BoolFalse()
|
||||
query := fmt.Sprintf( //nolint:gosec // bf is a dialect boolean literal, not user input
|
||||
"SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s), COALESCE(regions, '') FROM sites WHERE name = %s",
|
||||
bf, bf, s.q("?"),
|
||||
)
|
||||
var st models.Site
|
||||
err := s.db.QueryRow(query, name).Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID,
|
||||
err := s.db.QueryRowContext(ctx, query, name).Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID,
|
||||
&st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout,
|
||||
&st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType,
|
||||
&st.DNSServer, &st.IgnoreTLS, &st.Paused, &st.Regions)
|
||||
@@ -211,10 +212,10 @@ func (s *SQLStore) marshalSettings(settings map[string]string) (string, error) {
|
||||
return s.encryptSettings(string(jsonBytes))
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetAlertByName(name string) (models.AlertConfig, error) {
|
||||
func (s *SQLStore) GetAlertByName(ctx context.Context, name string) (models.AlertConfig, error) {
|
||||
var a models.AlertConfig
|
||||
var settingsRaw string
|
||||
err := s.db.QueryRow(s.q("SELECT id, name, type, settings FROM alerts WHERE name = ?"), name).Scan(&a.ID, &a.Name, &a.Type, &settingsRaw)
|
||||
err := s.db.QueryRowContext(ctx, s.q("SELECT id, name, type, settings FROM alerts WHERE name = ?"), name).Scan(&a.ID, &a.Name, &a.Type, &settingsRaw)
|
||||
if err != nil {
|
||||
return a, err
|
||||
}
|
||||
@@ -225,7 +226,7 @@ func (s *SQLStore) GetAlertByName(name string) (models.AlertConfig, error) {
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) AddSiteReturningID(site models.Site) (int, error) {
|
||||
func (s *SQLStore) AddSiteReturningID(ctx context.Context, site models.Site) (int, error) {
|
||||
token := ""
|
||||
if site.Type == "push" {
|
||||
var err error
|
||||
@@ -236,12 +237,12 @@ func (s *SQLStore) AddSiteReturningID(site models.Site) (int, error) {
|
||||
}
|
||||
if s.dollar {
|
||||
var id int
|
||||
err := s.db.QueryRow(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id"),
|
||||
err := s.db.QueryRowContext(ctx, s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id"),
|
||||
site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
|
||||
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions).Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
result, err := s.db.Exec(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
|
||||
result, err := s.db.ExecContext(ctx, s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
|
||||
site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
|
||||
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions)
|
||||
if err != nil {
|
||||
@@ -251,17 +252,17 @@ func (s *SQLStore) AddSiteReturningID(site models.Site) (int, error) {
|
||||
return int(id), err
|
||||
}
|
||||
|
||||
func (s *SQLStore) AddAlertReturningID(name, aType string, settings map[string]string) (int, error) {
|
||||
func (s *SQLStore) AddAlertReturningID(ctx context.Context, name, aType string, settings map[string]string) (int, error) {
|
||||
stored, err := s.marshalSettings(settings)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if s.dollar {
|
||||
var id int
|
||||
err := s.db.QueryRow(s.q("INSERT INTO alerts (name, type, settings) VALUES (?, ?, ?) RETURNING id"), name, aType, stored).Scan(&id)
|
||||
err := s.db.QueryRowContext(ctx, s.q("INSERT INTO alerts (name, type, settings) VALUES (?, ?, ?) RETURNING id"), name, aType, stored).Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
result, err := s.db.Exec(s.q("INSERT INTO alerts (name, type, settings) VALUES (?, ?, ?)"), name, aType, stored)
|
||||
result, err := s.db.ExecContext(ctx, s.q("INSERT INTO alerts (name, type, settings) VALUES (?, ?, ?)"), name, aType, stored)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -269,8 +270,8 @@ func (s *SQLStore) AddAlertReturningID(name, aType string, settings map[string]s
|
||||
return int(id), err
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetAllAlerts() ([]models.AlertConfig, error) {
|
||||
rows, err := s.db.Query("SELECT id, name, type, settings FROM alerts")
|
||||
func (s *SQLStore) GetAllAlerts(ctx context.Context) ([]models.AlertConfig, error) {
|
||||
rows, err := s.db.QueryContext(ctx, "SELECT id, name, type, settings FROM alerts")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -291,10 +292,10 @@ func (s *SQLStore) GetAllAlerts() ([]models.AlertConfig, error) {
|
||||
return alerts, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetAlert(id int) (models.AlertConfig, error) {
|
||||
func (s *SQLStore) GetAlert(ctx context.Context, id int) (models.AlertConfig, error) {
|
||||
var a models.AlertConfig
|
||||
var settingsRaw string
|
||||
err := s.db.QueryRow(s.q("SELECT id, name, type, settings FROM alerts WHERE id = ?"), id).Scan(&a.ID, &a.Name, &a.Type, &settingsRaw)
|
||||
err := s.db.QueryRowContext(ctx, s.q("SELECT id, name, type, settings FROM alerts WHERE id = ?"), id).Scan(&a.ID, &a.Name, &a.Type, &settingsRaw)
|
||||
if err != nil {
|
||||
return a, err
|
||||
}
|
||||
@@ -305,26 +306,26 @@ func (s *SQLStore) GetAlert(id int) (models.AlertConfig, error) {
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) AddAlert(name, aType string, settings map[string]string) error {
|
||||
func (s *SQLStore) AddAlert(ctx context.Context, name, aType string, settings map[string]string) error {
|
||||
stored, err := s.marshalSettings(settings)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.db.Exec(s.q("INSERT INTO alerts (name, type, settings) VALUES (?, ?, ?)"), name, aType, stored)
|
||||
_, err = s.db.ExecContext(ctx, s.q("INSERT INTO alerts (name, type, settings) VALUES (?, ?, ?)"), name, aType, stored)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) UpdateAlert(id int, name, aType string, settings map[string]string) error {
|
||||
func (s *SQLStore) UpdateAlert(ctx context.Context, id int, name, aType string, settings map[string]string) error {
|
||||
stored, err := s.marshalSettings(settings)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.db.Exec(s.q("UPDATE alerts SET name=?, type=?, settings=? WHERE id=?"), name, aType, stored, id)
|
||||
_, err = s.db.ExecContext(ctx, s.q("UPDATE alerts SET name=?, type=?, settings=? WHERE id=?"), name, aType, stored, id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) DeleteAlert(id int) error {
|
||||
_, err := s.db.Exec(s.q("DELETE FROM alerts WHERE id=?"), id)
|
||||
func (s *SQLStore) DeleteAlert(ctx context.Context, id int) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("DELETE FROM alerts WHERE id=?"), id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -332,8 +333,8 @@ func (s *SQLStore) DeleteAlert(id int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetAllUsers() ([]models.User, error) {
|
||||
rows, err := s.db.Query("SELECT id, username, public_key, role FROM users")
|
||||
func (s *SQLStore) GetAllUsers(ctx context.Context) ([]models.User, error) {
|
||||
rows, err := s.db.QueryContext(ctx, "SELECT id, username, public_key, role FROM users")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -349,29 +350,29 @@ func (s *SQLStore) GetAllUsers() ([]models.User, error) {
|
||||
return users, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) AddUser(username, publicKey, role string) error {
|
||||
_, err := s.db.Exec(s.q("INSERT INTO users (username, public_key, role) VALUES (?, ?, ?)"), username, publicKey, role)
|
||||
func (s *SQLStore) AddUser(ctx context.Context, username, publicKey, role string) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("INSERT INTO users (username, public_key, role) VALUES (?, ?, ?)"), username, publicKey, role)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) UpdateUser(id int, username, publicKey, role string) error {
|
||||
_, err := s.db.Exec(s.q("UPDATE users SET username=?, public_key=?, role=? WHERE id=?"), username, publicKey, role, id)
|
||||
func (s *SQLStore) UpdateUser(ctx context.Context, id int, username, publicKey, role string) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("UPDATE users SET username=?, public_key=?, role=? WHERE id=?"), username, publicKey, role, id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) DeleteUser(id int) error {
|
||||
_, err := s.db.Exec(s.q("DELETE FROM users WHERE id=?"), id)
|
||||
func (s *SQLStore) DeleteUser(ctx context.Context, id int) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("DELETE FROM users WHERE id=?"), id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) SaveStateChange(siteID int, fromStatus, toStatus, errorReason string) error {
|
||||
_, err := s.db.Exec(s.q("INSERT INTO state_changes (site_id, from_status, to_status, error_reason) VALUES (?, ?, ?, ?)"),
|
||||
func (s *SQLStore) SaveStateChange(ctx context.Context, siteID int, fromStatus, toStatus, errorReason string) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("INSERT INTO state_changes (site_id, from_status, to_status, error_reason) VALUES (?, ?, ?, ?)"),
|
||||
siteID, fromStatus, toStatus, errorReason)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetStateChanges(siteID int, limit int) ([]models.StateChange, error) {
|
||||
rows, err := s.db.Query(s.q("SELECT id, site_id, from_status, to_status, error_reason, changed_at FROM state_changes WHERE site_id = ? ORDER BY changed_at DESC LIMIT ?"), siteID, limit)
|
||||
func (s *SQLStore) GetStateChanges(ctx context.Context, siteID int, limit int) ([]models.StateChange, error) {
|
||||
rows, err := s.db.QueryContext(ctx, s.q("SELECT id, site_id, from_status, to_status, error_reason, changed_at FROM state_changes WHERE site_id = ? ORDER BY changed_at DESC LIMIT ?"), siteID, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -387,8 +388,8 @@ func (s *SQLStore) GetStateChanges(siteID int, limit int) ([]models.StateChange,
|
||||
return changes, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetStateChangesSince(siteID int, since time.Time) ([]models.StateChange, error) {
|
||||
rows, err := s.db.Query(s.q("SELECT id, site_id, from_status, to_status, error_reason, changed_at FROM state_changes WHERE site_id = ? AND changed_at >= ? ORDER BY changed_at DESC"), siteID, since)
|
||||
func (s *SQLStore) GetStateChangesSince(ctx context.Context, siteID int, since time.Time) ([]models.StateChange, error) {
|
||||
rows, err := s.db.QueryContext(ctx, s.q("SELECT id, site_id, from_status, to_status, error_reason, changed_at FROM state_changes WHERE site_id = ? AND changed_at >= ? ORDER BY changed_at DESC"), siteID, since)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -404,59 +405,59 @@ func (s *SQLStore) GetStateChangesSince(siteID int, since time.Time) ([]models.S
|
||||
return changes, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error {
|
||||
return s.SaveCheckFromNode(siteID, "", latencyNs, isUp)
|
||||
func (s *SQLStore) SaveCheck(ctx context.Context, siteID int, latencyNs int64, isUp bool) error {
|
||||
return s.SaveCheckFromNode(ctx, siteID, "", latencyNs, isUp)
|
||||
}
|
||||
|
||||
// SaveCheckFromNode inserts a single check row. Retention is handled out of
|
||||
// band by PruneCheckHistory on a timer, not per-insert, to keep the write hot
|
||||
// path a plain INSERT.
|
||||
func (s *SQLStore) SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error {
|
||||
_, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, node_id, latency_ns, is_up) VALUES (?, ?, ?, ?)"), siteID, nodeID, latencyNs, isUp)
|
||||
func (s *SQLStore) SaveCheckFromNode(ctx context.Context, siteID int, nodeID string, latencyNs int64, isUp bool) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("INSERT INTO check_history (site_id, node_id, latency_ns, is_up) VALUES (?, ?, ?, ?)"), siteID, nodeID, latencyNs, isUp)
|
||||
return err
|
||||
}
|
||||
|
||||
// PruneCheckHistory trims check_history to the newest maxCheckHistory rows per
|
||||
// site, across all sites, in one pass. Intended to run periodically.
|
||||
func (s *SQLStore) PruneCheckHistory() error {
|
||||
func (s *SQLStore) PruneCheckHistory(ctx context.Context) error {
|
||||
q := fmt.Sprintf(`DELETE FROM check_history WHERE id IN (
|
||||
SELECT id FROM (
|
||||
SELECT id, ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY checked_at DESC, id DESC) AS rn
|
||||
FROM check_history
|
||||
) ranked WHERE rn > %d
|
||||
)`, maxCheckHistory)
|
||||
_, err := s.db.Exec(s.q(q))
|
||||
_, err := s.db.ExecContext(ctx, s.q(q))
|
||||
return err
|
||||
}
|
||||
|
||||
// PruneStateChanges trims state_changes to the newest maxStateChangesPerSite
|
||||
// rows per site. Generous so realistic SLA windows are unaffected; bounds the
|
||||
// otherwise unbounded growth of a flapping monitor's history.
|
||||
func (s *SQLStore) PruneStateChanges() error {
|
||||
func (s *SQLStore) PruneStateChanges(ctx context.Context) error {
|
||||
q := fmt.Sprintf(`DELETE FROM state_changes WHERE id IN (
|
||||
SELECT id FROM (
|
||||
SELECT id, ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY changed_at DESC, id DESC) AS rn
|
||||
FROM state_changes
|
||||
) ranked WHERE rn > %d
|
||||
)`, maxStateChangesPerSite)
|
||||
_, err := s.db.Exec(s.q(q))
|
||||
_, err := s.db.ExecContext(ctx, s.q(q))
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) RegisterNode(node models.ProbeNode) error {
|
||||
_, err := s.db.Exec(s.dialect.UpsertNodeSQL(), node.ID, node.Name, node.Region, node.Version)
|
||||
func (s *SQLStore) RegisterNode(ctx context.Context, node models.ProbeNode) error {
|
||||
_, err := s.db.ExecContext(ctx, s.dialect.UpsertNodeSQL(), node.ID, node.Name, node.Region, node.Version)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetNode(id string) (models.ProbeNode, error) {
|
||||
func (s *SQLStore) GetNode(ctx context.Context, id string) (models.ProbeNode, error) {
|
||||
var n models.ProbeNode
|
||||
err := s.db.QueryRow(s.q("SELECT id, name, region, last_seen, version FROM nodes WHERE id = ?"), id).
|
||||
err := s.db.QueryRowContext(ctx, s.q("SELECT id, name, region, last_seen, version FROM nodes WHERE id = ?"), id).
|
||||
Scan(&n.ID, &n.Name, &n.Region, &n.LastSeen, &n.Version)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetAllNodes() ([]models.ProbeNode, error) {
|
||||
rows, err := s.db.Query("SELECT id, name, region, last_seen, version FROM nodes ORDER BY region, name")
|
||||
func (s *SQLStore) GetAllNodes(ctx context.Context) ([]models.ProbeNode, error) {
|
||||
rows, err := s.db.QueryContext(ctx, "SELECT id, name, region, last_seen, version FROM nodes ORDER BY region, name")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -472,18 +473,18 @@ func (s *SQLStore) GetAllNodes() ([]models.ProbeNode, error) {
|
||||
return nodes, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) UpdateNodeLastSeen(id string) error {
|
||||
_, err := s.db.Exec(s.q("UPDATE nodes SET last_seen = CURRENT_TIMESTAMP WHERE id = ?"), id)
|
||||
func (s *SQLStore) UpdateNodeLastSeen(ctx context.Context, id string) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("UPDATE nodes SET last_seen = CURRENT_TIMESTAMP WHERE id = ?"), id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) DeleteNode(id string) error {
|
||||
_, err := s.db.Exec(s.q("DELETE FROM nodes WHERE id = ?"), id)
|
||||
func (s *SQLStore) DeleteNode(ctx context.Context, id string) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("DELETE FROM nodes WHERE id = ?"), id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) LoadAlertHealth() (map[int]models.AlertHealthRecord, error) {
|
||||
rows, err := s.db.Query("SELECT alert_id, last_send_at, last_send_ok, last_error, send_count, fail_count FROM alert_health")
|
||||
func (s *SQLStore) LoadAlertHealth(ctx context.Context) (map[int]models.AlertHealthRecord, error) {
|
||||
rows, err := s.db.QueryContext(ctx, "SELECT alert_id, last_send_at, last_send_ok, last_error, send_count, fail_count FROM alert_health")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -503,35 +504,35 @@ func (s *SQLStore) LoadAlertHealth() (map[int]models.AlertHealthRecord, error) {
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) SaveAlertHealth(h models.AlertHealthRecord) error {
|
||||
func (s *SQLStore) SaveAlertHealth(ctx context.Context, h models.AlertHealthRecord) error {
|
||||
var lastSend interface{}
|
||||
if !h.LastSendAt.IsZero() {
|
||||
lastSend = h.LastSendAt
|
||||
}
|
||||
_, err := s.db.Exec(s.dialect.UpsertAlertHealthSQL(),
|
||||
_, err := s.db.ExecContext(ctx, s.dialect.UpsertAlertHealthSQL(),
|
||||
h.AlertID, lastSend, h.LastSendOK, h.LastError, h.SendCount, h.FailCount)
|
||||
return err
|
||||
}
|
||||
|
||||
// SaveLog inserts a single log row. Retention is handled by PruneLogs on a
|
||||
// timer, not per-insert.
|
||||
func (s *SQLStore) SaveLog(message string) error {
|
||||
_, err := s.db.Exec(s.q("INSERT INTO logs (message) VALUES (?)"), message)
|
||||
func (s *SQLStore) SaveLog(ctx context.Context, message string) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("INSERT INTO logs (message) VALUES (?)"), message)
|
||||
return err
|
||||
}
|
||||
|
||||
// PruneLogs trims the logs table to the newest maxLogRows rows. The id DESC
|
||||
// tiebreak keeps ordering deterministic when rows share a created_at second.
|
||||
func (s *SQLStore) PruneLogs() error {
|
||||
func (s *SQLStore) PruneLogs(ctx context.Context) error {
|
||||
q := fmt.Sprintf(`DELETE FROM logs WHERE id NOT IN (
|
||||
SELECT id FROM logs ORDER BY created_at DESC, id DESC LIMIT %d
|
||||
)`, maxLogRows)
|
||||
_, err := s.db.Exec(s.q(q))
|
||||
_, err := s.db.ExecContext(ctx, s.q(q))
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) LoadLogs(limit int) ([]string, error) {
|
||||
rows, err := s.db.Query(s.q("SELECT message FROM logs ORDER BY created_at DESC LIMIT ?"), limit)
|
||||
func (s *SQLStore) LoadLogs(ctx context.Context, limit int) ([]string, error) {
|
||||
rows, err := s.db.QueryContext(ctx, s.q("SELECT message FROM logs ORDER BY created_at DESC LIMIT ?"), limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -547,9 +548,9 @@ func (s *SQLStore) LoadLogs(limit int) ([]string, error) {
|
||||
return logs, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) {
|
||||
func (s *SQLStore) LoadAllHistory(ctx context.Context, limit int) (map[int][]models.CheckRecord, error) {
|
||||
result := make(map[int][]models.CheckRecord)
|
||||
rows, err := s.db.Query(s.q(`
|
||||
rows, err := s.db.QueryContext(ctx, s.q(`
|
||||
SELECT site_id, latency_ns, is_up FROM (
|
||||
SELECT site_id, latency_ns, is_up,
|
||||
ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY checked_at DESC) AS rn
|
||||
@@ -587,8 +588,8 @@ func (s *SQLStore) scanMaintenanceWindow(rows *sql.Rows) (models.MaintenanceWind
|
||||
return mw, nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, error) {
|
||||
rows, err := s.db.Query(s.q("SELECT id, monitor_id, title, description, type, start_time, end_time, created_by, created_at FROM maintenance_windows WHERE start_time <= CURRENT_TIMESTAMP AND (end_time IS NULL OR end_time > CURRENT_TIMESTAMP) ORDER BY start_time DESC"))
|
||||
func (s *SQLStore) GetActiveMaintenanceWindows(ctx context.Context) ([]models.MaintenanceWindow, error) {
|
||||
rows, err := s.db.QueryContext(ctx, s.q("SELECT id, monitor_id, title, description, type, start_time, end_time, created_by, created_at FROM maintenance_windows WHERE start_time <= CURRENT_TIMESTAMP AND (end_time IS NULL OR end_time > CURRENT_TIMESTAMP) ORDER BY start_time DESC"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -604,8 +605,8 @@ func (s *SQLStore) GetActiveMaintenanceWindows() ([]models.MaintenanceWindow, er
|
||||
return windows, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetAllMaintenanceWindows(limit int) ([]models.MaintenanceWindow, error) {
|
||||
rows, err := s.db.Query(s.q("SELECT id, monitor_id, title, description, type, start_time, end_time, created_by, created_at FROM maintenance_windows ORDER BY created_at DESC LIMIT ?"), limit)
|
||||
func (s *SQLStore) GetAllMaintenanceWindows(ctx context.Context, limit int) ([]models.MaintenanceWindow, error) {
|
||||
rows, err := s.db.QueryContext(ctx, s.q("SELECT id, monitor_id, title, description, type, start_time, end_time, created_by, created_at FROM maintenance_windows ORDER BY created_at DESC LIMIT ?"), limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -621,22 +622,22 @@ func (s *SQLStore) GetAllMaintenanceWindows(limit int) ([]models.MaintenanceWind
|
||||
return windows, rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLStore) AddMaintenanceWindow(mw models.MaintenanceWindow) error {
|
||||
func (s *SQLStore) AddMaintenanceWindow(ctx context.Context, mw models.MaintenanceWindow) error {
|
||||
if mw.StartTime.IsZero() {
|
||||
mw.StartTime = time.Now()
|
||||
}
|
||||
_, err := s.db.Exec(s.q("INSERT INTO maintenance_windows (monitor_id, title, description, type, start_time, end_time, created_by) VALUES (?, ?, ?, ?, ?, ?, ?)"),
|
||||
_, err := s.db.ExecContext(ctx, s.q("INSERT INTO maintenance_windows (monitor_id, title, description, type, start_time, end_time, created_by) VALUES (?, ?, ?, ?, ?, ?, ?)"),
|
||||
mw.MonitorID, mw.Title, mw.Description, mw.Type, mw.StartTime, sql.NullTime{Time: mw.EndTime, Valid: !mw.EndTime.IsZero()}, mw.CreatedBy)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) EndMaintenanceWindow(id int) error {
|
||||
_, err := s.db.Exec(s.q("UPDATE maintenance_windows SET end_time = CURRENT_TIMESTAMP WHERE id = ?"), id)
|
||||
func (s *SQLStore) EndMaintenanceWindow(ctx context.Context, id int) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("UPDATE maintenance_windows SET end_time = CURRENT_TIMESTAMP WHERE id = ?"), id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) DeleteMaintenanceWindow(id int) error {
|
||||
_, err := s.db.Exec(s.q("DELETE FROM maintenance_windows WHERE id = ?"), id)
|
||||
func (s *SQLStore) DeleteMaintenanceWindow(ctx context.Context, id int) error {
|
||||
_, err := s.db.ExecContext(ctx, s.q("DELETE FROM maintenance_windows WHERE id = ?"), id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -644,9 +645,9 @@ func (s *SQLStore) DeleteMaintenanceWindow(id int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) PruneExpiredMaintenanceWindows(retention time.Duration) (int64, error) {
|
||||
func (s *SQLStore) PruneExpiredMaintenanceWindows(ctx context.Context, retention time.Duration) (int64, error) {
|
||||
cutoff := time.Now().Add(-retention)
|
||||
result, err := s.db.Exec(
|
||||
result, err := s.db.ExecContext(ctx,
|
||||
s.q("DELETE FROM maintenance_windows WHERE end_time IS NOT NULL AND end_time < ?"),
|
||||
cutoff,
|
||||
)
|
||||
@@ -656,9 +657,9 @@ func (s *SQLStore) PruneExpiredMaintenanceWindows(retention time.Duration) (int6
|
||||
return result.RowsAffected()
|
||||
}
|
||||
|
||||
func (s *SQLStore) IsMonitorInMaintenance(monitorID int) (bool, error) {
|
||||
func (s *SQLStore) IsMonitorInMaintenance(ctx context.Context, monitorID int) (bool, error) {
|
||||
var count int
|
||||
err := s.db.QueryRow(s.q(`SELECT COUNT(*) FROM maintenance_windows
|
||||
err := s.db.QueryRowContext(ctx, s.q(`SELECT COUNT(*) FROM maintenance_windows
|
||||
WHERE type = 'maintenance'
|
||||
AND start_time <= CURRENT_TIMESTAMP
|
||||
AND (end_time IS NULL OR end_time > CURRENT_TIMESTAMP)
|
||||
@@ -671,46 +672,46 @@ func (s *SQLStore) IsMonitorInMaintenance(monitorID int) (bool, error) {
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) GetPreference(key string) (string, error) {
|
||||
func (s *SQLStore) GetPreference(ctx context.Context, key string) (string, error) {
|
||||
var value string
|
||||
err := s.db.QueryRow(s.q("SELECT value FROM preferences WHERE key = ?"), key).Scan(&value)
|
||||
err := s.db.QueryRowContext(ctx, s.q("SELECT value FROM preferences WHERE key = ?"), key).Scan(&value)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) SetPreference(key, value string) error {
|
||||
func (s *SQLStore) SetPreference(ctx context.Context, key, value string) error {
|
||||
if s.dollar {
|
||||
_, err := s.db.Exec(s.q("INSERT INTO preferences (key, value) VALUES (?, ?) ON CONFLICT (key) DO UPDATE SET value = ?"), key, value, value)
|
||||
_, err := s.db.ExecContext(ctx, s.q("INSERT INTO preferences (key, value) VALUES (?, ?) ON CONFLICT (key) DO UPDATE SET value = ?"), key, value, value)
|
||||
return err
|
||||
}
|
||||
_, err := s.db.Exec("INSERT OR REPLACE INTO preferences (key, value) VALUES (?, ?)", key, value)
|
||||
_, err := s.db.ExecContext(ctx, "INSERT OR REPLACE INTO preferences (key, value) VALUES (?, ?)", key, value)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLStore) ExportData() (models.Backup, error) {
|
||||
sites, err := s.GetSites()
|
||||
func (s *SQLStore) ExportData(ctx context.Context) (models.Backup, error) {
|
||||
sites, err := s.GetSites(ctx)
|
||||
if err != nil {
|
||||
return models.Backup{}, err
|
||||
}
|
||||
alerts, err := s.GetAllAlerts()
|
||||
alerts, err := s.GetAllAlerts(ctx)
|
||||
if err != nil {
|
||||
return models.Backup{}, err
|
||||
}
|
||||
users, err := s.GetAllUsers()
|
||||
users, err := s.GetAllUsers(ctx)
|
||||
if err != nil {
|
||||
return models.Backup{}, err
|
||||
}
|
||||
windows, err := s.GetAllMaintenanceWindows(maxMaintenanceExport)
|
||||
windows, err := s.GetAllMaintenanceWindows(ctx, maxMaintenanceExport)
|
||||
if err != nil {
|
||||
return models.Backup{}, err
|
||||
}
|
||||
return models.Backup{Sites: sites, Alerts: alerts, Users: users, MaintenanceWindows: windows}, nil
|
||||
}
|
||||
|
||||
func (s *SQLStore) ImportData(data models.Backup) error {
|
||||
tx, err := s.db.Begin()
|
||||
func (s *SQLStore) ImportData(ctx context.Context, data models.Backup) error {
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -719,7 +720,7 @@ func (s *SQLStore) ImportData(data models.Backup) error {
|
||||
s.dialect.ImportWipe(tx)
|
||||
|
||||
for _, u := range data.Users {
|
||||
if _, err := tx.Exec(s.q("INSERT INTO users (username, public_key, role) VALUES (?, ?, ?)"), u.Username, u.PublicKey, u.Role); err != nil {
|
||||
if _, err := tx.ExecContext(ctx, s.q("INSERT INTO users (username, public_key, role) VALUES (?, ?, ?)"), u.Username, u.PublicKey, u.Role); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -730,12 +731,12 @@ func (s *SQLStore) ImportData(data models.Backup) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.Exec(s.q("INSERT INTO alerts (id, name, type, settings) VALUES (?, ?, ?, ?)"), a.ID, a.Name, a.Type, settingsStr); err != nil {
|
||||
if _, err := tx.ExecContext(ctx, s.q("INSERT INTO alerts (id, name, type, settings) VALUES (?, ?, ?, ?)"), a.ID, a.Name, a.Type, settingsStr); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, st := range data.Sites {
|
||||
if _, err := tx.Exec(s.q("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
|
||||
if _, err := tx.ExecContext(ctx, s.q("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
|
||||
st.ID, st.Name, st.URL, st.Type, st.Token, st.Interval, st.AlertID, st.CheckSSL, st.ExpiryThreshold, st.MaxRetries,
|
||||
st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused, st.Regions); err != nil {
|
||||
return err
|
||||
@@ -743,7 +744,7 @@ func (s *SQLStore) ImportData(data models.Backup) error {
|
||||
}
|
||||
|
||||
for _, mw := range data.MaintenanceWindows {
|
||||
if _, err := tx.Exec(s.q("INSERT INTO maintenance_windows (id, monitor_id, title, description, type, start_time, end_time, created_by) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"),
|
||||
if _, err := tx.ExecContext(ctx, s.q("INSERT INTO maintenance_windows (id, monitor_id, title, description, type, start_time, end_time, created_by) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"),
|
||||
mw.ID, mw.MonitorID, mw.Title, mw.Description, mw.Type, mw.StartTime, sql.NullTime{Time: mw.EndTime, Valid: !mw.EndTime.IsZero()}, mw.CreatedBy); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user