feat(cluster): add distributed probing foundation #9
@@ -44,6 +44,12 @@ func (m *mockStore) AddSiteReturningID(models.Site) (int, error) { return 0, nil
|
|||||||
func (m *mockStore) AddAlertReturningID(string, string, map[string]string) (int, error) {
|
func (m *mockStore) AddAlertReturningID(string, string, map[string]string) (int, error) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
// SaveCheckFromNode is a test stub: it ignores every argument and reports success.
func (m *mockStore) SaveCheckFromNode(_ int, _ string, _ int64, _ bool) error {
	return nil
}
|
||||||
|
// RegisterNode is a test stub: it discards the node and reports success.
func (m *mockStore) RegisterNode(_ models.ProbeNode) error {
	return nil
}
|
||||||
|
// GetNode is a test stub: it returns a zero-value ProbeNode and a nil error
// regardless of the requested id.
func (m *mockStore) GetNode(_ string) (models.ProbeNode, error) {
	var node models.ProbeNode
	return node, nil
}
|
||||||
|
// GetAllNodes is a test stub: it returns a nil slice and a nil error.
func (m *mockStore) GetAllNodes() ([]models.ProbeNode, error) {
	var nodes []models.ProbeNode
	return nodes, nil
}
|
||||||
|
// UpdateNodeLastSeen is a test stub: it ignores the node id and reports success.
func (m *mockStore) UpdateNodeLastSeen(_ string) error {
	return nil
}
|
||||||
|
// DeleteNode is a test stub: it ignores the node id and reports success.
func (m *mockStore) DeleteNode(_ string) error {
	return nil
}
|
||||||
|
|
||||||
func TestMetricsHandler(t *testing.T) {
|
func TestMetricsHandler(t *testing.T) {
|
||||||
ms := &mockStore{
|
ms := &mockStore{
|
||||||
|
|||||||
@@ -52,11 +52,20 @@ type User struct {
|
|||||||
|
|
||||||
// CheckRecord is one stored result of checking a site.
type CheckRecord struct {
	// SiteID identifies the site the check belongs to.
	SiteID int
	// NodeID identifies the probe node that produced the check.
	// NOTE(review): presumably empty for checks run locally — confirm against the store.
	NodeID string
	// LatencyNs is the measured latency in nanoseconds.
	LatencyNs int64
	// IsUp reports whether the check succeeded.
	IsUp bool
	// CheckedAt is the time the check was recorded.
	CheckedAt time.Time
}
|
||||||
|
|
||||||
|
// ProbeNode describes a probe node known to the cluster.
type ProbeNode struct {
	// ID is the node's unique identifier.
	ID string
	// Name is the node's display name.
	Name string
	// Region is the region label the node reports.
	Region string
	// LastSeen is the time the node was last heard from.
	LastSeen time.Time
	// Version is the version string the node reports.
	Version string
}
|
||||||
|
|
||||||
type Backup struct {
|
type Backup struct {
|
||||||
Sites []Site `json:"sites"`
|
Sites []Site `json:"sites"`
|
||||||
Alerts []AlertConfig `json:"alerts"`
|
Alerts []AlertConfig `json:"alerts"`
|
||||||
|
|||||||
@@ -243,10 +243,97 @@ func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) {
|
|||||||
w.Write([]byte(fmt.Sprintf("Imported %d monitors, %d alerts from Kuma v%s", len(backup.Sites), len(backup.Alerts), kb.Version)))
|
w.Write([]byte(fmt.Sprintf("Imported %d monitors, %d alerts from Kuma v%s", len(backup.Sites), len(backup.Alerts), kb.Version)))
|
||||||
})
|
})
|
||||||
|
|
||||||
// 6. Prometheus Metrics
|
// 6. Probe Registration
|
||||||
|
// POST /api/probe/register upserts the calling probe node into the store.
// The request must carry the cluster key in the X-Upkeep-Secret header and a
// JSON body with at least a non-empty "id".
mux.HandleFunc("/api/probe/register", func(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "POST required", http.StatusMethodNotAllowed)
		return
	}
	// An empty ClusterKey rejects every request, so the endpoint is closed
	// unless a key has been configured.
	// NOTE(review): this is a plain string comparison; consider
	// crypto/subtle.ConstantTimeCompare to avoid leaking the key via timing.
	if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	var req struct {
		ID      string `json:"id"`
		Name    string `json:"name"`
		Region  string `json:"region"`
		Version string `json:"version"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}
	if req.ID == "" {
		http.Error(w, "id is required", http.StatusBadRequest)
		return
	}

	node := models.ProbeNode{
		ID: req.ID, Name: req.Name, Region: req.Region, Version: req.Version,
	}
	if err := s.RegisterNode(node); err != nil {
		log.Printf("Probe register failed: %v", err)
		http.Error(w, "Registration failed", http.StatusInternalServerError)
		return
	}

	// Declare the body as JSON, matching the assignments endpoint.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(map[string]bool{"ok": true}); err != nil {
		log.Printf("Probe register response failed: %v", err)
	}
})
|
||||||
|
|
||||||
|
// 7. Probe Assignment Fetch
|
||||||
|
mux.HandleFunc("/api/probe/assignments", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey {
|
||||||
|
http.Error(w, "Unauthorized", 401)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sites := eng.GetAllSites()
|
||||||
|
var assigned []models.Site
|
||||||
|
for _, site := range sites {
|
||||||
|
if site.Paused || site.Type == "push" || site.Type == "group" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
assigned = append(assigned, site)
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string][]models.Site{"sites": assigned})
|
||||||
|
})
|
||||||
|
|
||||||
|
// 8. Probe Result Submission
|
||||||
|
// POST /api/probe/results stores a batch of check results reported by a probe
// node and then refreshes that node's last-seen time. The request must carry
// the cluster key in the X-Upkeep-Secret header and a JSON body with a
// non-empty "node_id".
mux.HandleFunc("/api/probe/results", func(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "POST required", http.StatusMethodNotAllowed)
		return
	}
	// An empty ClusterKey rejects every request.
	// NOTE(review): this is a plain string comparison; consider
	// crypto/subtle.ConstantTimeCompare to avoid leaking the key via timing.
	if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	var req struct {
		NodeID  string `json:"node_id"`
		Results []struct {
			SiteID    int   `json:"site_id"`
			LatencyNs int64 `json:"latency_ns"`
			IsUp      bool  `json:"is_up"`
		} `json:"results"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}
	if req.NodeID == "" {
		http.Error(w, "node_id is required", http.StatusBadRequest)
		return
	}

	// Saving is best-effort: a failed result is logged and the rest of the
	// batch is still processed.
	for _, result := range req.Results {
		if err := s.SaveCheckFromNode(result.SiteID, req.NodeID, result.LatencyNs, result.IsUp); err != nil {
			log.Printf("Failed to save probe result: %v", err)
		}
	}

	// NOTE(review): node_id is not checked against registered nodes here —
	// confirm whether results from unknown nodes should be accepted.
	if err := s.UpdateNodeLastSeen(req.NodeID); err != nil {
		log.Printf("Failed to update last seen for node %q: %v", req.NodeID, err)
	}

	// Declare the body as JSON, matching the assignments endpoint.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(map[string]bool{"ok": true}); err != nil {
		log.Printf("Probe results response failed: %v", err)
	}
})
|
||||||
|
|
||||||
|
// 9. Prometheus Metrics
|
||||||
mux.HandleFunc("/metrics", metrics.Handler(eng))
|
mux.HandleFunc("/metrics", metrics.Handler(eng))
|
||||||
|
|
||||||
// 7. Status Page
|
// 10. Status Page
|
||||||
if cfg.EnableStatus {
|
if cfg.EnableStatus {
|
||||||
mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { renderStatusPage(w, cfg.Title, eng) })
|
mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { renderStatusPage(w, cfg.Title, eng) })
|
||||||
mux.HandleFunc("/status/json", func(w http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/status/json", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ type Dialect interface {
|
|||||||
ResetSequenceOnEmpty(db *sql.DB, table string)
|
ResetSequenceOnEmpty(db *sql.DB, table string)
|
||||||
ImportWipe(tx *sql.Tx)
|
ImportWipe(tx *sql.Tx)
|
||||||
ImportResetSequences(tx *sql.Tx)
|
ImportResetSequences(tx *sql.Tx)
|
||||||
|
UpsertNodeSQL() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// rewritePlaceholders converts ? markers to $1, $2, etc. for Postgres.
|
// rewritePlaceholders converts ? markers to $1, $2, etc. for Postgres.
|
||||||
|
|||||||
@@ -44,6 +44,13 @@ func (d *PostgresDialect) CreateTablesSQL() []string {
|
|||||||
is_up BOOLEAN, checked_at TIMESTAMP DEFAULT NOW()
|
is_up BOOLEAN, checked_at TIMESTAMP DEFAULT NOW()
|
||||||
)`,
|
)`,
|
||||||
`CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`,
|
`CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`,
|
||||||
|
`CREATE TABLE IF NOT EXISTS nodes (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
name TEXT NOT NULL,
|
||||||
|
region TEXT DEFAULT '',
|
||||||
|
last_seen TIMESTAMP DEFAULT NOW(),
|
||||||
|
version TEXT DEFAULT ''
|
||||||
|
)`,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,9 +67,14 @@ func (d *PostgresDialect) MigrationsSQL() []string {
|
|||||||
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS dns_server TEXT DEFAULT ''",
|
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS dns_server TEXT DEFAULT ''",
|
||||||
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS ignore_tls BOOLEAN DEFAULT FALSE",
|
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS ignore_tls BOOLEAN DEFAULT FALSE",
|
||||||
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS paused BOOLEAN DEFAULT FALSE",
|
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS paused BOOLEAN DEFAULT FALSE",
|
||||||
|
"ALTER TABLE check_history ADD COLUMN IF NOT EXISTS node_id TEXT DEFAULT ''",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *PostgresDialect) UpsertNodeSQL() string {
|
||||||
|
return "INSERT INTO nodes (id, name, region, last_seen, version) VALUES ($1, $2, $3, NOW(), $4) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, region = EXCLUDED.region, last_seen = NOW(), version = EXCLUDED.version"
|
||||||
|
}
|
||||||
|
|
||||||
func (d *PostgresDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {}
|
func (d *PostgresDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {}
|
||||||
|
|
||||||
func (d *PostgresDialect) ImportWipe(tx *sql.Tx) {
|
func (d *PostgresDialect) ImportWipe(tx *sql.Tx) {
|
||||||
|
|||||||
@@ -44,6 +44,13 @@ func (d *SQLiteDialect) CreateTablesSQL() []string {
|
|||||||
is_up BOOLEAN, checked_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
is_up BOOLEAN, checked_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||||
)`,
|
)`,
|
||||||
`CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`,
|
`CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`,
|
||||||
|
`CREATE TABLE IF NOT EXISTS nodes (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
name TEXT NOT NULL,
|
||||||
|
region TEXT DEFAULT '',
|
||||||
|
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
version TEXT DEFAULT ''
|
||||||
|
)`,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,9 +67,14 @@ func (d *SQLiteDialect) MigrationsSQL() []string {
|
|||||||
"ALTER TABLE sites ADD COLUMN dns_server TEXT DEFAULT ''",
|
"ALTER TABLE sites ADD COLUMN dns_server TEXT DEFAULT ''",
|
||||||
"ALTER TABLE sites ADD COLUMN ignore_tls BOOLEAN DEFAULT 0",
|
"ALTER TABLE sites ADD COLUMN ignore_tls BOOLEAN DEFAULT 0",
|
||||||
"ALTER TABLE sites ADD COLUMN paused BOOLEAN DEFAULT 0",
|
"ALTER TABLE sites ADD COLUMN paused BOOLEAN DEFAULT 0",
|
||||||
|
"ALTER TABLE check_history ADD COLUMN node_id TEXT DEFAULT ''",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *SQLiteDialect) UpsertNodeSQL() string {
|
||||||
|
return "INSERT OR REPLACE INTO nodes (id, name, region, last_seen, version) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?)"
|
||||||
|
}
|
||||||
|
|
||||||
func (d *SQLiteDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {
|
func (d *SQLiteDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {
|
||||||
var count int
|
var count int
|
||||||
db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&count)
|
db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&count)
|
||||||
|
|||||||
@@ -247,7 +247,11 @@ func (s *SQLStore) DeleteUser(id int) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error {
|
func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error {
|
||||||
_, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, latency_ns, is_up) VALUES (?, ?, ?)"), siteID, latencyNs, isUp)
|
return s.SaveCheckFromNode(siteID, "", latencyNs, isUp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SQLStore) SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error {
|
||||||
|
_, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, node_id, latency_ns, is_up) VALUES (?, ?, ?, ?)"), siteID, nodeID, latencyNs, isUp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -257,6 +261,45 @@ func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SQLStore) RegisterNode(node models.ProbeNode) error {
|
||||||
|
_, err := s.db.Exec(s.dialect.UpsertNodeSQL(), node.ID, node.Name, node.Region, node.Version)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SQLStore) GetNode(id string) (models.ProbeNode, error) {
|
||||||
|
var n models.ProbeNode
|
||||||
|
err := s.db.QueryRow(s.q("SELECT id, name, region, last_seen, version FROM nodes WHERE id = ?"), id).
|
||||||
|
Scan(&n.ID, &n.Name, &n.Region, &n.LastSeen, &n.Version)
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SQLStore) GetAllNodes() ([]models.ProbeNode, error) {
|
||||||
|
rows, err := s.db.Query("SELECT id, name, region, last_seen, version FROM nodes ORDER BY region, name")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var nodes []models.ProbeNode
|
||||||
|
for rows.Next() {
|
||||||
|
var n models.ProbeNode
|
||||||
|
if err := rows.Scan(&n.ID, &n.Name, &n.Region, &n.LastSeen, &n.Version); err != nil {
|
||||||
|
return nodes, err
|
||||||
|
}
|
||||||
|
nodes = append(nodes, n)
|
||||||
|
}
|
||||||
|
return nodes, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SQLStore) UpdateNodeLastSeen(id string) error {
|
||||||
|
_, err := s.db.Exec(s.q("UPDATE nodes SET last_seen = CURRENT_TIMESTAMP WHERE id = ?"), id)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SQLStore) DeleteNode(id string) error {
|
||||||
|
_, err := s.db.Exec(s.q("DELETE FROM nodes WHERE id = ?"), id)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SQLStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) {
|
func (s *SQLStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) {
|
||||||
result := make(map[int][]models.CheckRecord)
|
result := make(map[int][]models.CheckRecord)
|
||||||
rows, err := s.db.Query(s.q(`
|
rows, err := s.db.Query(s.q(`
|
||||||
|
|||||||
@@ -35,8 +35,16 @@ type Store interface {
|
|||||||
|
|
||||||
// History
|
// History
|
||||||
SaveCheck(siteID int, latencyNs int64, isUp bool) error
|
SaveCheck(siteID int, latencyNs int64, isUp bool) error
|
||||||
|
SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error
|
||||||
LoadAllHistory(limit int) (map[int][]models.CheckRecord, error)
|
LoadAllHistory(limit int) (map[int][]models.CheckRecord, error)
|
||||||
|
|
||||||
|
// Nodes
|
||||||
|
RegisterNode(node models.ProbeNode) error
|
||||||
|
GetNode(id string) (models.ProbeNode, error)
|
||||||
|
GetAllNodes() ([]models.ProbeNode, error)
|
||||||
|
UpdateNodeLastSeen(id string) error
|
||||||
|
DeleteNode(id string) error
|
||||||
|
|
||||||
// Backup & Restore
|
// Backup & Restore
|
||||||
ExportData() (models.Backup, error)
|
ExportData() (models.Backup, error)
|
||||||
ImportData(data models.Backup) error
|
ImportData(data models.Backup) error
|
||||||
|
|||||||
Reference in New Issue
Block a user