From ca9faa0acd607f9cff4ed43065fb062c1965fce0 Mon Sep 17 00:00:00 2001 From: Tyler Koenig Date: Sat, 16 May 2026 11:05:06 -0400 Subject: [PATCH] =?UTF-8?q?feat(cluster):=20add=20distributed=20probing=20?= =?UTF-8?q?foundation=20=E2=80=94=20schema,=20models,=20and=20probe=20APIs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add node-aware check history and probe registration infrastructure: - ProbeNode model and nodes table (SQLite + Postgres) - node_id column on check_history for multi-source tracking - Store interface: RegisterNode, GetNode, GetAllNodes, UpdateNodeLastSeen, DeleteNode, SaveCheckFromNode - Dialect: UpsertNodeSQL (INSERT OR REPLACE / ON CONFLICT) - API endpoints: POST /api/probe/register, GET /api/probe/assignments, POST /api/probe/results - Backward compatible: existing SaveCheck wraps SaveCheckFromNode with empty node_id --- internal/metrics/prometheus_test.go | 6 ++ internal/models/models.go | 9 +++ internal/server/server.go | 91 ++++++++++++++++++++++++++++- internal/store/dialect.go | 1 + internal/store/postgres.go | 12 ++++ internal/store/sqlite.go | 12 ++++ internal/store/sqlstore.go | 45 +++++++++++++- internal/store/store.go | 8 +++ 8 files changed, 181 insertions(+), 3 deletions(-) diff --git a/internal/metrics/prometheus_test.go b/internal/metrics/prometheus_test.go index 091a5df..cd86d26 100644 --- a/internal/metrics/prometheus_test.go +++ b/internal/metrics/prometheus_test.go @@ -44,6 +44,12 @@ func (m *mockStore) AddSiteReturningID(models.Site) (int, error) { return 0, nil func (m *mockStore) AddAlertReturningID(string, string, map[string]string) (int, error) { return 0, nil } +func (m *mockStore) SaveCheckFromNode(int, string, int64, bool) error { return nil } +func (m *mockStore) RegisterNode(models.ProbeNode) error { return nil } +func (m *mockStore) GetNode(string) (models.ProbeNode, error) { return models.ProbeNode{}, nil } +func (m *mockStore) GetAllNodes() ([]models.ProbeNode, error) { return nil, nil } 
+func (m *mockStore) UpdateNodeLastSeen(string) error { return nil } +func (m *mockStore) DeleteNode(string) error { return nil } func TestMetricsHandler(t *testing.T) { ms := &mockStore{ diff --git a/internal/models/models.go b/internal/models/models.go index cdf4c68..d3ce4ac 100644 --- a/internal/models/models.go +++ b/internal/models/models.go @@ -52,11 +52,20 @@ type User struct { type CheckRecord struct { SiteID int + NodeID string LatencyNs int64 IsUp bool CheckedAt time.Time } +type ProbeNode struct { + ID string + Name string + Region string + LastSeen time.Time + Version string +} + type Backup struct { Sites []Site `json:"sites"` Alerts []AlertConfig `json:"alerts"` diff --git a/internal/server/server.go b/internal/server/server.go index fdf7f9b..18670c2 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -243,10 +243,97 @@ func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) { w.Write([]byte(fmt.Sprintf("Imported %d monitors, %d alerts from Kuma v%s", len(backup.Sites), len(backup.Alerts), kb.Version))) }) - // 6. Prometheus Metrics + // 6. 
Probe Registration + mux.HandleFunc("/api/probe/register", func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "POST required", 405) + return + } + if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey { + http.Error(w, "Unauthorized", 401) + return + } + var req struct { + ID string `json:"id"` + Name string `json:"name"` + Region string `json:"region"` + Version string `json:"version"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid JSON", 400) + return + } + if req.ID == "" { + http.Error(w, "id is required", 400) + return + } + if err := s.RegisterNode(models.ProbeNode{ + ID: req.ID, Name: req.Name, Region: req.Region, Version: req.Version, + }); err != nil { + log.Printf("Probe register failed: %v", err) + http.Error(w, "Registration failed", 500) + return + } + json.NewEncoder(w).Encode(map[string]bool{"ok": true}) + }) + + // 7. Probe Assignment Fetch + mux.HandleFunc("/api/probe/assignments", func(w http.ResponseWriter, r *http.Request) { + if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey { + http.Error(w, "Unauthorized", 401) + return + } + sites := eng.GetAllSites() + var assigned []models.Site + for _, site := range sites { + if site.Paused || site.Type == "push" || site.Type == "group" { + continue + } + assigned = append(assigned, site) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string][]models.Site{"sites": assigned}) + }) + + // 8. 
Probe Result Submission + mux.HandleFunc("/api/probe/results", func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "POST required", 405) + return + } + if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey { + http.Error(w, "Unauthorized", 401) + return + } + var req struct { + NodeID string `json:"node_id"` + Results []struct { + SiteID int `json:"site_id"` + LatencyNs int64 `json:"latency_ns"` + IsUp bool `json:"is_up"` + } `json:"results"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid JSON", 400) + return + } + if req.NodeID == "" { + http.Error(w, "node_id is required", 400) + return + } + for _, result := range req.Results { + if err := s.SaveCheckFromNode(result.SiteID, req.NodeID, result.LatencyNs, result.IsUp); err != nil { + log.Printf("Failed to save probe result: %v", err) + } + } + s.UpdateNodeLastSeen(req.NodeID) + json.NewEncoder(w).Encode(map[string]bool{"ok": true}) + }) + + // 9. Prometheus Metrics mux.HandleFunc("/metrics", metrics.Handler(eng)) - // 7. Status Page + // 10. Status Page if cfg.EnableStatus { mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { renderStatusPage(w, cfg.Title, eng) }) mux.HandleFunc("/status/json", func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/store/dialect.go b/internal/store/dialect.go index 4e1ba04..f6e35b2 100644 --- a/internal/store/dialect.go +++ b/internal/store/dialect.go @@ -10,6 +10,7 @@ type Dialect interface { ResetSequenceOnEmpty(db *sql.DB, table string) ImportWipe(tx *sql.Tx) ImportResetSequences(tx *sql.Tx) + UpsertNodeSQL() string } // rewritePlaceholders converts ? markers to $1, $2, etc. for Postgres. 
diff --git a/internal/store/postgres.go b/internal/store/postgres.go index 78fcc8d..df3c038 100644 --- a/internal/store/postgres.go +++ b/internal/store/postgres.go @@ -44,6 +44,13 @@ func (d *PostgresDialect) CreateTablesSQL() []string { is_up BOOLEAN, checked_at TIMESTAMP DEFAULT NOW() )`, `CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`, + `CREATE TABLE IF NOT EXISTS nodes ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + region TEXT DEFAULT '', + last_seen TIMESTAMP DEFAULT NOW(), + version TEXT DEFAULT '' + )`, } } @@ -60,9 +67,14 @@ func (d *PostgresDialect) MigrationsSQL() []string { "ALTER TABLE sites ADD COLUMN IF NOT EXISTS dns_server TEXT DEFAULT ''", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS ignore_tls BOOLEAN DEFAULT FALSE", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS paused BOOLEAN DEFAULT FALSE", + "ALTER TABLE check_history ADD COLUMN IF NOT EXISTS node_id TEXT DEFAULT ''", } } +func (d *PostgresDialect) UpsertNodeSQL() string { + return "INSERT INTO nodes (id, name, region, last_seen, version) VALUES ($1, $2, $3, NOW(), $4) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, region = EXCLUDED.region, last_seen = NOW(), version = EXCLUDED.version" +} + func (d *PostgresDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {} func (d *PostgresDialect) ImportWipe(tx *sql.Tx) { diff --git a/internal/store/sqlite.go b/internal/store/sqlite.go index dbeb74d..ab9686a 100644 --- a/internal/store/sqlite.go +++ b/internal/store/sqlite.go @@ -44,6 +44,13 @@ func (d *SQLiteDialect) CreateTablesSQL() []string { is_up BOOLEAN, checked_at DATETIME DEFAULT CURRENT_TIMESTAMP )`, `CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`, + `CREATE TABLE IF NOT EXISTS nodes ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + region TEXT DEFAULT '', + last_seen DATETIME DEFAULT CURRENT_TIMESTAMP, + version TEXT DEFAULT '' + )`, } } @@ -60,9 +67,14 @@ func (d *SQLiteDialect) 
MigrationsSQL() []string { "ALTER TABLE sites ADD COLUMN dns_server TEXT DEFAULT ''", "ALTER TABLE sites ADD COLUMN ignore_tls BOOLEAN DEFAULT 0", "ALTER TABLE sites ADD COLUMN paused BOOLEAN DEFAULT 0", + "ALTER TABLE check_history ADD COLUMN node_id TEXT DEFAULT ''", } } +func (d *SQLiteDialect) UpsertNodeSQL() string { + return "INSERT OR REPLACE INTO nodes (id, name, region, last_seen, version) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?)" +} + func (d *SQLiteDialect) ResetSequenceOnEmpty(db *sql.DB, table string) { var count int db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&count) diff --git a/internal/store/sqlstore.go b/internal/store/sqlstore.go index 8adc020..12a3c7f 100644 --- a/internal/store/sqlstore.go +++ b/internal/store/sqlstore.go @@ -247,7 +247,11 @@ func (s *SQLStore) DeleteUser(id int) error { } func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error { - _, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, latency_ns, is_up) VALUES (?, ?, ?)"), siteID, latencyNs, isUp) + return s.SaveCheckFromNode(siteID, "", latencyNs, isUp) +} + +func (s *SQLStore) SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error { + _, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, node_id, latency_ns, is_up) VALUES (?, ?, ?, ?)"), siteID, nodeID, latencyNs, isUp) if err != nil { return err } @@ -257,6 +261,45 @@ func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error { return err } +func (s *SQLStore) RegisterNode(node models.ProbeNode) error { + _, err := s.db.Exec(s.dialect.UpsertNodeSQL(), node.ID, node.Name, node.Region, node.Version) + return err +} + +func (s *SQLStore) GetNode(id string) (models.ProbeNode, error) { + var n models.ProbeNode + err := s.db.QueryRow(s.q("SELECT id, name, region, last_seen, version FROM nodes WHERE id = ?"), id). 
+ Scan(&n.ID, &n.Name, &n.Region, &n.LastSeen, &n.Version) + return n, err +} + +func (s *SQLStore) GetAllNodes() ([]models.ProbeNode, error) { + rows, err := s.db.Query("SELECT id, name, region, last_seen, version FROM nodes ORDER BY region, name") + if err != nil { + return nil, err + } + defer rows.Close() + var nodes []models.ProbeNode + for rows.Next() { + var n models.ProbeNode + if err := rows.Scan(&n.ID, &n.Name, &n.Region, &n.LastSeen, &n.Version); err != nil { + return nodes, err + } + nodes = append(nodes, n) + } + return nodes, rows.Err() +} + +func (s *SQLStore) UpdateNodeLastSeen(id string) error { + _, err := s.db.Exec(s.q("UPDATE nodes SET last_seen = CURRENT_TIMESTAMP WHERE id = ?"), id) + return err +} + +func (s *SQLStore) DeleteNode(id string) error { + _, err := s.db.Exec(s.q("DELETE FROM nodes WHERE id = ?"), id) + return err +} + func (s *SQLStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) { result := make(map[int][]models.CheckRecord) rows, err := s.db.Query(s.q(` diff --git a/internal/store/store.go b/internal/store/store.go index 1ed3c99..1340326 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -35,8 +35,16 @@ type Store interface { // History SaveCheck(siteID int, latencyNs int64, isUp bool) error + SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) + // Nodes + RegisterNode(node models.ProbeNode) error + GetNode(id string) (models.ProbeNode, error) + GetAllNodes() ([]models.ProbeNode, error) + UpdateNodeLastSeen(id string) error + DeleteNode(id string) error + // Backup & Restore ExportData() (models.Backup, error) ImportData(data models.Backup) error