refactor(store): unify SQLite and Postgres into dialect-based SQLStore

Extract shared SQLStore with Dialect interface for the ~5% that
differs between backends (DDL, placeholders, sequence resets).

- New dialect.go: Dialect interface + placeholder rewriter (? → $N)
- New sqlstore.go: single implementation of all 19 Store methods
- sqlite.go: reduced from 286 to 83 lines (SQLiteDialect only)
- postgres.go: reduced from 266 to 78 lines (PostgresDialect only)
- main.go: use NewSQLiteStore/NewPostgresStore constructors

Zero CRUD logic duplication. Every future schema change written once.
This commit is contained in:
2026-05-15 00:31:44 -04:00
parent 4d5116644f
commit ab75f61c6b
5 changed files with 368 additions and 474 deletions
+8 -3
View File
@@ -80,16 +80,21 @@ func main() {
flag.Parse() flag.Parse()
var s store.Store var s store.Store
var dbErr error
if *flagDBType == "postgres" { if *flagDBType == "postgres" {
s = &store.PostgresStore{ConnStr: *flagDSN} s, dbErr = store.NewPostgresStore(*flagDSN)
fmt.Printf("Using PostgreSQL: %s\n", *flagDSN) fmt.Printf("Using PostgreSQL: %s\n", *flagDSN)
} else { } else {
s = &store.SQLiteStore{DBPath: *flagDSN} s, dbErr = store.NewSQLiteStore(*flagDSN)
fmt.Printf("Using SQLite: %s\n", *flagDSN) fmt.Printf("Using SQLite: %s\n", *flagDSN)
} }
if dbErr != nil {
fmt.Printf("Database connection error: %v\n", dbErr)
os.Exit(1)
}
if err := s.Init(); err != nil { if err := s.Init(); err != nil {
fmt.Printf("Database Init Error: %v\n", err) fmt.Printf("Database init error: %v\n", err)
os.Exit(1) os.Exit(1)
} }
store.SetGlobal(s) store.SetGlobal(s)
+36
View File
@@ -0,0 +1,36 @@
package store
import "database/sql"
type Dialect interface {
DriverName() string
CreateTablesSQL() []string
MigrationsSQL() []string
BoolFalse() string
ResetSequenceOnEmpty(db *sql.DB, table string)
ImportWipe(tx *sql.Tx)
ImportResetSequences(tx *sql.Tx)
}
// rewritePlaceholders converts ? markers to $1, $2, etc. for Postgres.
// For SQLite (or any dialect not needing rewrite), returns the input unchanged.
func rewritePlaceholders(query string, dollarStyle bool) string {
if !dollarStyle {
return query
}
buf := make([]byte, 0, len(query)+32)
n := 0
for i := 0; i < len(query); i++ {
if query[i] == '?' {
n++
buf = append(buf, '$')
if n >= 10 {
buf = append(buf, byte('0'+n/10))
}
buf = append(buf, byte('0'+n%10))
} else {
buf = append(buf, query[i])
}
}
return string(buf)
}
+37 -225
View File
@@ -2,77 +2,53 @@ package store
import ( import (
"database/sql" "database/sql"
"encoding/json"
"go-upkeep/internal/models"
_ "github.com/lib/pq" _ "github.com/lib/pq"
) )
type PostgresStore struct { type PostgresDialect struct{}
ConnStr string
db *sql.DB func NewPostgresStore(connStr string) (*SQLStore, error) {
return NewSQLStore("postgres", connStr, &PostgresDialect{})
} }
func (p *PostgresStore) Init() error { func (d *PostgresDialect) DriverName() string { return "postgres" }
var err error func (d *PostgresDialect) BoolFalse() string { return "FALSE" }
p.db, err = sql.Open("postgres", p.ConnStr)
if err != nil {
return err
}
queries := []string{ func (d *PostgresDialect) CreateTablesSQL() []string {
return []string{
`CREATE TABLE IF NOT EXISTS alerts ( `CREATE TABLE IF NOT EXISTS alerts (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
name TEXT, name TEXT, type TEXT, settings TEXT
type TEXT, )`,
settings TEXT
);`,
`CREATE TABLE IF NOT EXISTS sites ( `CREATE TABLE IF NOT EXISTS sites (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
name TEXT DEFAULT 'New Monitor', name TEXT DEFAULT 'New Monitor', url TEXT, type TEXT DEFAULT 'http',
url TEXT, token TEXT, interval INTEGER, alert_id INTEGER,
type TEXT DEFAULT 'http', check_ssl BOOLEAN DEFAULT FALSE, threshold INTEGER DEFAULT 7,
token TEXT, max_retries INTEGER DEFAULT 0, hostname TEXT DEFAULT '',
interval INTEGER, port INTEGER DEFAULT 0, timeout INTEGER DEFAULT 0,
alert_id INTEGER, method TEXT DEFAULT 'GET', description TEXT DEFAULT '',
check_ssl BOOLEAN DEFAULT FALSE, parent_id INTEGER DEFAULT 0, accepted_codes TEXT DEFAULT '200-299',
threshold INTEGER DEFAULT 7, dns_resolve_type TEXT DEFAULT '', dns_server TEXT DEFAULT '',
max_retries INTEGER DEFAULT 0, ignore_tls BOOLEAN DEFAULT FALSE, paused BOOLEAN DEFAULT FALSE
hostname TEXT DEFAULT '', )`,
port INTEGER DEFAULT 0,
timeout INTEGER DEFAULT 0,
method TEXT DEFAULT 'GET',
description TEXT DEFAULT '',
parent_id INTEGER DEFAULT 0,
accepted_codes TEXT DEFAULT '200-299',
dns_resolve_type TEXT DEFAULT '',
dns_server TEXT DEFAULT '',
ignore_tls BOOLEAN DEFAULT FALSE,
paused BOOLEAN DEFAULT FALSE
);`,
`CREATE TABLE IF NOT EXISTS users ( `CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
username TEXT NOT NULL, username TEXT NOT NULL, public_key TEXT NOT NULL,
public_key TEXT NOT NULL,
role TEXT DEFAULT 'user' role TEXT DEFAULT 'user'
);`, )`,
`CREATE TABLE IF NOT EXISTS check_history ( `CREATE TABLE IF NOT EXISTS check_history (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
site_id INTEGER NOT NULL, site_id INTEGER NOT NULL, latency_ns BIGINT,
latency_ns BIGINT, is_up BOOLEAN, checked_at TIMESTAMP DEFAULT NOW()
is_up BOOLEAN, )`,
checked_at TIMESTAMP DEFAULT NOW() `CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`,
);`,
}
for _, q := range queries {
if _, err := p.db.Exec(q); err != nil {
return err
}
} }
}
p.db.Exec("CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)") func (d *PostgresDialect) MigrationsSQL() []string {
return []string{
migrations := []string{
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS hostname TEXT DEFAULT ''", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS hostname TEXT DEFAULT ''",
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS port INTEGER DEFAULT 0", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS port INTEGER DEFAULT 0",
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS timeout INTEGER DEFAULT 0", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS timeout INTEGER DEFAULT 0",
@@ -85,182 +61,18 @@ func (p *PostgresStore) Init() error {
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS ignore_tls BOOLEAN DEFAULT FALSE", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS ignore_tls BOOLEAN DEFAULT FALSE",
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS paused BOOLEAN DEFAULT FALSE", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS paused BOOLEAN DEFAULT FALSE",
} }
for _, m := range migrations {
p.db.Exec(m)
}
return nil
} }
// ... [CRUD Methods are identical to Phase 4, keeping them concise here] ... func (d *PostgresDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {}
func (p *PostgresStore) GetSites() []models.Site {
rows, err := p.db.Query("SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, FALSE), COALESCE(paused, FALSE) FROM sites")
if err != nil {
return []models.Site{}
}
defer rows.Close()
var sites []models.Site
for rows.Next() {
var s models.Site
rows.Scan(&s.ID, &s.Name, &s.URL, &s.Type, &s.Token, &s.Interval, &s.AlertID, &s.CheckSSL, &s.ExpiryThreshold, &s.MaxRetries,
&s.Hostname, &s.Port, &s.Timeout, &s.Method, &s.Description, &s.ParentID, &s.AcceptedCodes, &s.DNSResolveType, &s.DNSServer, &s.IgnoreTLS, &s.Paused)
sites = append(sites, s)
}
return sites
}
func (p *PostgresStore) AddSite(site models.Site) {
token := ""
if site.Type == "push" {
token = generateToken()
}
p.db.Exec("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)",
site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused)
}
func (p *PostgresStore) UpdateSite(site models.Site) {
var existingToken string
p.db.QueryRow("SELECT token FROM sites WHERE id=$1", site.ID).Scan(&existingToken)
if site.Type == "push" && existingToken == "" {
existingToken = generateToken()
}
p.db.Exec("UPDATE sites SET name=$1, url=$2, type=$3, token=$4, interval=$5, alert_id=$6, check_ssl=$7, threshold=$8, max_retries=$9, hostname=$10, port=$11, timeout=$12, method=$13, description=$14, parent_id=$15, accepted_codes=$16, dns_resolve_type=$17, dns_server=$18, ignore_tls=$19, paused=$20 WHERE id=$21",
site.Name, site.URL, site.Type, existingToken, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.ID)
}
func (p *PostgresStore) UpdateSitePaused(id int, paused bool) {
p.db.Exec("UPDATE sites SET paused=$1 WHERE id=$2", paused, id)
}
func (p *PostgresStore) DeleteSite(id int) { p.db.Exec("DELETE FROM sites WHERE id=$1", id) }
func (p *PostgresStore) GetAllAlerts() []models.AlertConfig {
rows, err := p.db.Query("SELECT id, name, type, settings FROM alerts")
if err != nil {
return []models.AlertConfig{}
}
defer rows.Close()
var alerts []models.AlertConfig
for rows.Next() {
var a models.AlertConfig
var settingsJSON string
rows.Scan(&a.ID, &a.Name, &a.Type, &settingsJSON)
json.Unmarshal([]byte(settingsJSON), &a.Settings)
alerts = append(alerts, a)
}
return alerts
}
func (p *PostgresStore) GetAlert(id int) (models.AlertConfig, bool) {
var a models.AlertConfig
var settingsJSON string
err := p.db.QueryRow("SELECT id, name, type, settings FROM alerts WHERE id = $1", id).Scan(&a.ID, &a.Name, &a.Type, &settingsJSON)
if err != nil {
return a, false
}
json.Unmarshal([]byte(settingsJSON), &a.Settings)
return a, true
}
func (p *PostgresStore) AddAlert(name, aType string, settings map[string]string) {
jsonBytes, _ := json.Marshal(settings)
p.db.Exec("INSERT INTO alerts (name, type, settings) VALUES ($1, $2, $3)", name, aType, string(jsonBytes))
}
func (p *PostgresStore) UpdateAlert(id int, name, aType string, settings map[string]string) {
jsonBytes, _ := json.Marshal(settings)
p.db.Exec("UPDATE alerts SET name=$1, type=$2, settings=$3 WHERE id=$4", name, aType, string(jsonBytes), id)
}
func (p *PostgresStore) DeleteAlert(id int) { p.db.Exec("DELETE FROM alerts WHERE id=$1", id) }
func (p *PostgresStore) GetAllUsers() []models.User {
rows, err := p.db.Query("SELECT id, username, public_key, role FROM users")
if err != nil {
return []models.User{}
}
defer rows.Close()
var users []models.User
for rows.Next() {
var u models.User
rows.Scan(&u.ID, &u.Username, &u.PublicKey, &u.Role)
users = append(users, u)
}
return users
}
func (p *PostgresStore) AddUser(username, publicKey, role string) error {
_, err := p.db.Exec("INSERT INTO users (username, public_key, role) VALUES ($1, $2, $3)", username, publicKey, role)
return err
}
func (p *PostgresStore) UpdateUser(id int, username, publicKey, role string) error {
_, err := p.db.Exec("UPDATE users SET username=$1, public_key=$2, role=$3 WHERE id=$4", username, publicKey, role, id)
return err
}
func (p *PostgresStore) DeleteUser(id int) error {
_, err := p.db.Exec("DELETE FROM users WHERE id=$1", id)
return err
}
func (p *PostgresStore) SaveCheck(siteID int, latencyNs int64, isUp bool) {
p.db.Exec("INSERT INTO check_history (site_id, latency_ns, is_up) VALUES ($1, $2, $3)", siteID, latencyNs, isUp)
p.db.Exec(`DELETE FROM check_history WHERE site_id = $1 AND id NOT IN (
SELECT id FROM check_history WHERE site_id = $1 ORDER BY checked_at DESC LIMIT 1000
)`, siteID)
}
func (p *PostgresStore) LoadAllHistory(limit int) map[int][]models.CheckRecord {
result := make(map[int][]models.CheckRecord)
rows, err := p.db.Query(`
SELECT site_id, latency_ns, is_up FROM (
SELECT site_id, latency_ns, is_up,
ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY checked_at DESC) AS rn
FROM check_history
) sub WHERE rn <= $1`, limit)
if err != nil {
return result
}
defer rows.Close()
for rows.Next() {
var r models.CheckRecord
rows.Scan(&r.SiteID, &r.LatencyNs, &r.IsUp)
result[r.SiteID] = append(result[r.SiteID], r)
}
for id, records := range result {
for i, j := 0, len(records)-1; i < j; i, j = i+1, j-1 {
records[i], records[j] = records[j], records[i]
}
result[id] = records
}
return result
}
func (p *PostgresStore) ExportData() models.Backup {
return models.Backup{
Sites: p.GetSites(),
Alerts: p.GetAllAlerts(),
Users: p.GetAllUsers(),
}
}
func (p *PostgresStore) ImportData(data models.Backup) error {
tx, err := p.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
func (d *PostgresDialect) ImportWipe(tx *sql.Tx) {
tx.Exec("TRUNCATE TABLE sites RESTART IDENTITY CASCADE") tx.Exec("TRUNCATE TABLE sites RESTART IDENTITY CASCADE")
tx.Exec("TRUNCATE TABLE alerts RESTART IDENTITY CASCADE") tx.Exec("TRUNCATE TABLE alerts RESTART IDENTITY CASCADE")
tx.Exec("TRUNCATE TABLE users RESTART IDENTITY CASCADE") tx.Exec("TRUNCATE TABLE users RESTART IDENTITY CASCADE")
}
for _, u := range data.Users {
tx.Exec("INSERT INTO users (username, public_key, role) VALUES ($1, $2, $3)", u.Username, u.PublicKey, u.Role) func (d *PostgresDialect) ImportResetSequences(tx *sql.Tx) {
} tx.Exec("SELECT setval('sites_id_seq', (SELECT COALESCE(MAX(id), 1) FROM sites))")
for _, a := range data.Alerts { tx.Exec("SELECT setval('alerts_id_seq', (SELECT COALESCE(MAX(id), 1) FROM alerts))")
jsonBytes, _ := json.Marshal(a.Settings) tx.Exec("SELECT setval('users_id_seq', (SELECT COALESCE(MAX(id), 1) FROM users))")
tx.Exec("INSERT INTO alerts (id, name, type, settings) VALUES ($1, $2, $3, $4)", a.ID, a.Name, a.Type, string(jsonBytes))
}
for _, st := range data.Sites {
tx.Exec("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)",
st.ID, st.Name, st.URL, st.Type, st.Token, st.Interval, st.AlertID, st.CheckSSL, st.ExpiryThreshold, st.MaxRetries,
st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused)
}
tx.Exec("SELECT setval('sites_id_seq', (SELECT MAX(id) FROM sites))")
tx.Exec("SELECT setval('alerts_id_seq', (SELECT MAX(id) FROM alerts))")
tx.Exec("SELECT setval('users_id_seq', (SELECT MAX(id) FROM users))")
return tx.Commit()
} }
+44 -246
View File
@@ -1,77 +1,54 @@
package store package store
import ( import (
"crypto/rand"
"database/sql" "database/sql"
"encoding/hex"
"encoding/json"
"go-upkeep/internal/models"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
type SQLiteStore struct { type SQLiteDialect struct{}
DBPath string
db *sql.DB func NewSQLiteStore(path string) (*SQLStore, error) {
return NewSQLStore("sqlite3", path, &SQLiteDialect{})
} }
func (s *SQLiteStore) Init() error { func (d *SQLiteDialect) DriverName() string { return "sqlite3" }
var err error func (d *SQLiteDialect) BoolFalse() string { return "0" }
s.db, err = sql.Open("sqlite3", s.DBPath)
if err != nil {
return err
}
createTables := ` func (d *SQLiteDialect) CreateTablesSQL() []string {
CREATE TABLE IF NOT EXISTS alerts ( return []string{
id INTEGER PRIMARY KEY AUTOINCREMENT, `CREATE TABLE IF NOT EXISTS alerts (
name TEXT, id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT, name TEXT, type TEXT, settings TEXT
settings TEXT )`,
); `CREATE TABLE IF NOT EXISTS sites (
CREATE TABLE IF NOT EXISTS sites ( id INTEGER PRIMARY KEY AUTOINCREMENT,
id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT DEFAULT 'New Monitor', url TEXT, type TEXT DEFAULT 'http',
name TEXT DEFAULT 'New Monitor', token TEXT, interval INTEGER, alert_id INTEGER,
url TEXT, check_ssl BOOLEAN DEFAULT 0, threshold INTEGER DEFAULT 7,
type TEXT DEFAULT 'http', max_retries INTEGER DEFAULT 0, hostname TEXT DEFAULT '',
token TEXT, port INTEGER DEFAULT 0, timeout INTEGER DEFAULT 0,
interval INTEGER, method TEXT DEFAULT 'GET', description TEXT DEFAULT '',
alert_id INTEGER, parent_id INTEGER DEFAULT 0, accepted_codes TEXT DEFAULT '200-299',
check_ssl BOOLEAN DEFAULT 0, dns_resolve_type TEXT DEFAULT '', dns_server TEXT DEFAULT '',
threshold INTEGER DEFAULT 7, ignore_tls BOOLEAN DEFAULT 0, paused BOOLEAN DEFAULT 0
max_retries INTEGER DEFAULT 0, )`,
hostname TEXT DEFAULT '', `CREATE TABLE IF NOT EXISTS users (
port INTEGER DEFAULT 0, id INTEGER PRIMARY KEY AUTOINCREMENT,
timeout INTEGER DEFAULT 0, username TEXT NOT NULL, public_key TEXT NOT NULL,
method TEXT DEFAULT 'GET', role TEXT DEFAULT 'user'
description TEXT DEFAULT '', )`,
parent_id INTEGER DEFAULT 0, `CREATE TABLE IF NOT EXISTS check_history (
accepted_codes TEXT DEFAULT '200-299', id INTEGER PRIMARY KEY AUTOINCREMENT,
dns_resolve_type TEXT DEFAULT '', site_id INTEGER NOT NULL, latency_ns INTEGER,
dns_server TEXT DEFAULT '', is_up BOOLEAN, checked_at DATETIME DEFAULT CURRENT_TIMESTAMP
ignore_tls BOOLEAN DEFAULT 0, )`,
paused BOOLEAN DEFAULT 0 `CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`,
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
public_key TEXT NOT NULL,
role TEXT DEFAULT 'user'
);
CREATE TABLE IF NOT EXISTS check_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_id INTEGER NOT NULL,
latency_ns INTEGER,
is_up BOOLEAN,
checked_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC);`
_, err = s.db.Exec(createTables)
if err != nil {
return err
} }
}
migrations := []string{ func (d *SQLiteDialect) MigrationsSQL() []string {
return []string{
"ALTER TABLE sites ADD COLUMN hostname TEXT DEFAULT ''", "ALTER TABLE sites ADD COLUMN hostname TEXT DEFAULT ''",
"ALTER TABLE sites ADD COLUMN port INTEGER DEFAULT 0", "ALTER TABLE sites ADD COLUMN port INTEGER DEFAULT 0",
"ALTER TABLE sites ADD COLUMN timeout INTEGER DEFAULT 0", "ALTER TABLE sites ADD COLUMN timeout INTEGER DEFAULT 0",
@@ -84,202 +61,23 @@ func (s *SQLiteStore) Init() error {
"ALTER TABLE sites ADD COLUMN ignore_tls BOOLEAN DEFAULT 0", "ALTER TABLE sites ADD COLUMN ignore_tls BOOLEAN DEFAULT 0",
"ALTER TABLE sites ADD COLUMN paused BOOLEAN DEFAULT 0", "ALTER TABLE sites ADD COLUMN paused BOOLEAN DEFAULT 0",
} }
for _, m := range migrations {
s.db.Exec(m)
}
return nil
} }
func generateToken() string { func (d *SQLiteDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
panic("crypto/rand failed: " + err.Error())
}
return hex.EncodeToString(b)
}
func (s *SQLiteStore) GetSites() []models.Site {
rows, err := s.db.Query("SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, 0), COALESCE(paused, 0) FROM sites")
if err != nil {
return []models.Site{}
}
defer rows.Close()
var sites []models.Site
for rows.Next() {
var st models.Site
rows.Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID, &st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout, &st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType, &st.DNSServer, &st.IgnoreTLS, &st.Paused)
sites = append(sites, st)
}
return sites
}
func (s *SQLiteStore) AddSite(site models.Site) {
token := ""
if site.Type == "push" {
token = generateToken()
}
s.db.Exec("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused)
}
func (s *SQLiteStore) UpdateSite(site models.Site) {
var existingToken string
s.db.QueryRow("SELECT token FROM sites WHERE id=?", site.ID).Scan(&existingToken)
if site.Type == "push" && existingToken == "" {
existingToken = generateToken()
}
s.db.Exec("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=? WHERE id=?",
site.Name, site.URL, site.Type, existingToken, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.ID)
}
func (s *SQLiteStore) UpdateSitePaused(id int, paused bool) {
s.db.Exec("UPDATE sites SET paused=? WHERE id=?", paused, id)
}
func (s *SQLiteStore) DeleteSite(id int) {
s.db.Exec("DELETE FROM sites WHERE id=?", id)
var count int var count int
s.db.QueryRow("SELECT COUNT(*) FROM sites").Scan(&count) db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&count)
if count == 0 { if count == 0 {
s.db.Exec("DELETE FROM sqlite_sequence WHERE name='sites'") db.Exec("DELETE FROM sqlite_sequence WHERE name=?", table)
}
}
func (s *SQLiteStore) GetAllAlerts() []models.AlertConfig {
rows, err := s.db.Query("SELECT id, name, type, settings FROM alerts")
if err != nil {
return []models.AlertConfig{}
}
defer rows.Close()
var alerts []models.AlertConfig
for rows.Next() {
var a models.AlertConfig
var settingsJSON string
rows.Scan(&a.ID, &a.Name, &a.Type, &settingsJSON)
json.Unmarshal([]byte(settingsJSON), &a.Settings)
alerts = append(alerts, a)
}
return alerts
}
func (s *SQLiteStore) GetAlert(id int) (models.AlertConfig, bool) {
var a models.AlertConfig
var settingsJSON string
err := s.db.QueryRow("SELECT id, name, type, settings FROM alerts WHERE id = ?", id).Scan(&a.ID, &a.Name, &a.Type, &settingsJSON)
if err != nil {
return a, false
}
json.Unmarshal([]byte(settingsJSON), &a.Settings)
return a, true
}
func (s *SQLiteStore) AddAlert(name, aType string, settings map[string]string) {
jsonBytes, _ := json.Marshal(settings)
s.db.Exec("INSERT INTO alerts (name, type, settings) VALUES (?, ?, ?)", name, aType, string(jsonBytes))
}
func (s *SQLiteStore) UpdateAlert(id int, name, aType string, settings map[string]string) {
jsonBytes, _ := json.Marshal(settings)
s.db.Exec("UPDATE alerts SET name=?, type=?, settings=? WHERE id=?", name, aType, string(jsonBytes), id)
}
func (s *SQLiteStore) DeleteAlert(id int) {
s.db.Exec("DELETE FROM alerts WHERE id=?", id)
var count int
s.db.QueryRow("SELECT COUNT(*) FROM alerts").Scan(&count)
if count == 0 {
s.db.Exec("DELETE FROM sqlite_sequence WHERE name='alerts'")
}
}
func (s *SQLiteStore) GetAllUsers() []models.User {
rows, err := s.db.Query("SELECT id, username, public_key, role FROM users")
if err != nil {
return []models.User{}
}
defer rows.Close()
var users []models.User
for rows.Next() {
var u models.User
rows.Scan(&u.ID, &u.Username, &u.PublicKey, &u.Role)
users = append(users, u)
}
return users
}
func (s *SQLiteStore) AddUser(username, publicKey, role string) error {
_, err := s.db.Exec("INSERT INTO users (username, public_key, role) VALUES (?, ?, ?)", username, publicKey, role)
return err
}
func (s *SQLiteStore) UpdateUser(id int, username, publicKey, role string) error {
_, err := s.db.Exec("UPDATE users SET username=?, public_key=?, role=? WHERE id=?", username, publicKey, role, id)
return err
}
func (s *SQLiteStore) DeleteUser(id int) error {
_, err := s.db.Exec("DELETE FROM users WHERE id=?", id)
return err
}
func (s *SQLiteStore) SaveCheck(siteID int, latencyNs int64, isUp bool) {
s.db.Exec("INSERT INTO check_history (site_id, latency_ns, is_up) VALUES (?, ?, ?)", siteID, latencyNs, isUp)
s.db.Exec(`DELETE FROM check_history WHERE site_id = ? AND id NOT IN (
SELECT id FROM check_history WHERE site_id = ? ORDER BY checked_at DESC LIMIT 1000
)`, siteID, siteID)
}
func (s *SQLiteStore) LoadAllHistory(limit int) map[int][]models.CheckRecord {
result := make(map[int][]models.CheckRecord)
rows, err := s.db.Query(`
SELECT site_id, latency_ns, is_up FROM (
SELECT site_id, latency_ns, is_up,
ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY checked_at DESC) AS rn
FROM check_history
) WHERE rn <= ?`, limit)
if err != nil {
return result
}
defer rows.Close()
for rows.Next() {
var r models.CheckRecord
rows.Scan(&r.SiteID, &r.LatencyNs, &r.IsUp)
result[r.SiteID] = append(result[r.SiteID], r)
}
for id, records := range result {
for i, j := 0, len(records)-1; i < j; i, j = i+1, j-1 {
records[i], records[j] = records[j], records[i]
}
result[id] = records
}
return result
}
func (s *SQLiteStore) ExportData() models.Backup {
return models.Backup{
Sites: s.GetSites(),
Alerts: s.GetAllAlerts(),
Users: s.GetAllUsers(),
} }
} }
func (s *SQLiteStore) ImportData(data models.Backup) error { func (d *SQLiteDialect) ImportWipe(tx *sql.Tx) {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
tx.Exec("DELETE FROM sites") tx.Exec("DELETE FROM sites")
tx.Exec("DELETE FROM sqlite_sequence WHERE name='sites'") tx.Exec("DELETE FROM sqlite_sequence WHERE name='sites'")
tx.Exec("DELETE FROM alerts") tx.Exec("DELETE FROM alerts")
tx.Exec("DELETE FROM sqlite_sequence WHERE name='alerts'") tx.Exec("DELETE FROM sqlite_sequence WHERE name='alerts'")
tx.Exec("DELETE FROM users") tx.Exec("DELETE FROM users")
tx.Exec("DELETE FROM sqlite_sequence WHERE name='users'") tx.Exec("DELETE FROM sqlite_sequence WHERE name='users'")
// Insert New
for _, u := range data.Users {
tx.Exec("INSERT INTO users (username, public_key, role) VALUES (?, ?, ?)", u.Username, u.PublicKey, u.Role)
}
for _, a := range data.Alerts {
jsonBytes, _ := json.Marshal(a.Settings)
tx.Exec("INSERT INTO alerts (id, name, type, settings) VALUES (?, ?, ?, ?)", a.ID, a.Name, a.Type, string(jsonBytes))
}
for _, st := range data.Sites {
tx.Exec("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
st.ID, st.Name, st.URL, st.Type, st.Token, st.Interval, st.AlertID, st.CheckSSL, st.ExpiryThreshold, st.MaxRetries,
st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused)
}
return tx.Commit()
} }
func (d *SQLiteDialect) ImportResetSequences(tx *sql.Tx) {}
+243
View File
@@ -0,0 +1,243 @@
package store
import (
"crypto/rand"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"go-upkeep/internal/models"
)
type SQLStore struct {
db *sql.DB
dialect Dialect
dollar bool
}
func NewSQLStore(driverName, dsn string, dialect Dialect) (*SQLStore, error) {
db, err := sql.Open(driverName, dsn)
if err != nil {
return nil, err
}
_, isDollar := dialect.(*PostgresDialect)
return &SQLStore{db: db, dialect: dialect, dollar: isDollar}, nil
}
func (s *SQLStore) q(query string) string {
return rewritePlaceholders(query, s.dollar)
}
func generateToken() string {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
panic("crypto/rand failed: " + err.Error())
}
return hex.EncodeToString(b)
}
func (s *SQLStore) Init() error {
for _, stmt := range s.dialect.CreateTablesSQL() {
if _, err := s.db.Exec(stmt); err != nil {
return err
}
}
for _, m := range s.dialect.MigrationsSQL() {
s.db.Exec(m)
}
return nil
}
func (s *SQLStore) GetSites() []models.Site {
bf := s.dialect.BoolFalse()
query := fmt.Sprintf(
"SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s) FROM sites",
bf, bf,
)
rows, err := s.db.Query(query)
if err != nil {
return []models.Site{}
}
defer rows.Close()
var sites []models.Site
for rows.Next() {
var st models.Site
rows.Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID,
&st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout,
&st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType,
&st.DNSServer, &st.IgnoreTLS, &st.Paused)
sites = append(sites, st)
}
return sites
}
func (s *SQLStore) AddSite(site models.Site) {
token := ""
if site.Type == "push" {
token = generateToken()
}
s.db.Exec(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused)
}
func (s *SQLStore) UpdateSite(site models.Site) {
var existingToken string
s.db.QueryRow(s.q("SELECT token FROM sites WHERE id=?"), site.ID).Scan(&existingToken)
if site.Type == "push" && existingToken == "" {
existingToken = generateToken()
}
s.db.Exec(s.q("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=? WHERE id=?"),
site.Name, site.URL, site.Type, existingToken, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.ID)
}
func (s *SQLStore) UpdateSitePaused(id int, paused bool) {
s.db.Exec(s.q("UPDATE sites SET paused=? WHERE id=?"), paused, id)
}
func (s *SQLStore) DeleteSite(id int) {
s.db.Exec(s.q("DELETE FROM sites WHERE id=?"), id)
s.dialect.ResetSequenceOnEmpty(s.db, "sites")
}
func (s *SQLStore) GetAllAlerts() []models.AlertConfig {
rows, err := s.db.Query("SELECT id, name, type, settings FROM alerts")
if err != nil {
return []models.AlertConfig{}
}
defer rows.Close()
var alerts []models.AlertConfig
for rows.Next() {
var a models.AlertConfig
var settingsJSON string
rows.Scan(&a.ID, &a.Name, &a.Type, &settingsJSON)
json.Unmarshal([]byte(settingsJSON), &a.Settings)
alerts = append(alerts, a)
}
return alerts
}
func (s *SQLStore) GetAlert(id int) (models.AlertConfig, bool) {
var a models.AlertConfig
var settingsJSON string
err := s.db.QueryRow(s.q("SELECT id, name, type, settings FROM alerts WHERE id = ?"), id).Scan(&a.ID, &a.Name, &a.Type, &settingsJSON)
if err != nil {
return a, false
}
json.Unmarshal([]byte(settingsJSON), &a.Settings)
return a, true
}
func (s *SQLStore) AddAlert(name, aType string, settings map[string]string) {
jsonBytes, _ := json.Marshal(settings)
s.db.Exec(s.q("INSERT INTO alerts (name, type, settings) VALUES (?, ?, ?)"), name, aType, string(jsonBytes))
}
func (s *SQLStore) UpdateAlert(id int, name, aType string, settings map[string]string) {
jsonBytes, _ := json.Marshal(settings)
s.db.Exec(s.q("UPDATE alerts SET name=?, type=?, settings=? WHERE id=?"), name, aType, string(jsonBytes), id)
}
func (s *SQLStore) DeleteAlert(id int) {
s.db.Exec(s.q("DELETE FROM alerts WHERE id=?"), id)
s.dialect.ResetSequenceOnEmpty(s.db, "alerts")
}
func (s *SQLStore) GetAllUsers() []models.User {
rows, err := s.db.Query("SELECT id, username, public_key, role FROM users")
if err != nil {
return []models.User{}
}
defer rows.Close()
var users []models.User
for rows.Next() {
var u models.User
rows.Scan(&u.ID, &u.Username, &u.PublicKey, &u.Role)
users = append(users, u)
}
return users
}
func (s *SQLStore) AddUser(username, publicKey, role string) error {
_, err := s.db.Exec(s.q("INSERT INTO users (username, public_key, role) VALUES (?, ?, ?)"), username, publicKey, role)
return err
}
func (s *SQLStore) UpdateUser(id int, username, publicKey, role string) error {
_, err := s.db.Exec(s.q("UPDATE users SET username=?, public_key=?, role=? WHERE id=?"), username, publicKey, role, id)
return err
}
func (s *SQLStore) DeleteUser(id int) error {
_, err := s.db.Exec(s.q("DELETE FROM users WHERE id=?"), id)
return err
}
func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) {
s.db.Exec(s.q("INSERT INTO check_history (site_id, latency_ns, is_up) VALUES (?, ?, ?)"), siteID, latencyNs, isUp)
s.db.Exec(s.q(`DELETE FROM check_history WHERE site_id = ? AND id NOT IN (
SELECT id FROM check_history WHERE site_id = ? ORDER BY checked_at DESC LIMIT 1000
)`), siteID, siteID)
}
func (s *SQLStore) LoadAllHistory(limit int) map[int][]models.CheckRecord {
result := make(map[int][]models.CheckRecord)
rows, err := s.db.Query(s.q(`
SELECT site_id, latency_ns, is_up FROM (
SELECT site_id, latency_ns, is_up,
ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY checked_at DESC) AS rn
FROM check_history
) sub WHERE rn <= ?`), limit)
if err != nil {
return result
}
defer rows.Close()
for rows.Next() {
var r models.CheckRecord
rows.Scan(&r.SiteID, &r.LatencyNs, &r.IsUp)
result[r.SiteID] = append(result[r.SiteID], r)
}
for id, records := range result {
for i, j := 0, len(records)-1; i < j; i, j = i+1, j-1 {
records[i], records[j] = records[j], records[i]
}
result[id] = records
}
return result
}
func (s *SQLStore) ExportData() models.Backup {
return models.Backup{
Sites: s.GetSites(),
Alerts: s.GetAllAlerts(),
Users: s.GetAllUsers(),
}
}
func (s *SQLStore) ImportData(data models.Backup) error {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
s.dialect.ImportWipe(tx)
for _, u := range data.Users {
tx.Exec(s.q("INSERT INTO users (username, public_key, role) VALUES (?, ?, ?)"), u.Username, u.PublicKey, u.Role)
}
for _, a := range data.Alerts {
jsonBytes, _ := json.Marshal(a.Settings)
tx.Exec(s.q("INSERT INTO alerts (id, name, type, settings) VALUES (?, ?, ?, ?)"), a.ID, a.Name, a.Type, string(jsonBytes))
}
for _, st := range data.Sites {
tx.Exec(s.q("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
st.ID, st.Name, st.URL, st.Type, st.Token, st.Interval, st.AlertID, st.CheckSSL, st.ExpiryThreshold, st.MaxRetries,
st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused)
}
s.dialect.ImportResetSequences(tx)
return tx.Commit()
}