feat(cluster): add distributed probing foundation #9

Merged
lerko merged 3 commits from feat/distributed-probing-foundation into develop 2026-05-16 16:01:16 +00:00
12 changed files with 187 additions and 26 deletions
Showing only changes of commit 0396acdc59 - Show all commits
+4
View File
@@ -239,6 +239,7 @@ func monitorToSite(m Monitor, alertID, parentID int) models.Site {
DNSServer: m.DNSServer,
IgnoreTLS: m.IgnoreTLS,
Paused: m.Paused,
Regions: m.Regions,
}
s.ExpiryThreshold = m.ExpiryThreshold
@@ -346,6 +347,9 @@ func diffSite(existing, desired models.Site) string {
if existing.Paused != desired.Paused {
diffs = append(diffs, fmt.Sprintf("paused: %v -> %v", existing.Paused, desired.Paused))
}
if existing.Regions != desired.Regions {
diffs = append(diffs, fmt.Sprintf("regions: %s -> %s", existing.Regions, desired.Regions))
}
return strings.Join(diffs, ", ")
}
+4
View File
@@ -126,6 +126,10 @@ func siteToMonitor(s models.Site, alertIDToName map[int]string) Monitor {
m.IgnoreTLS = s.IgnoreTLS
m.Paused = s.Paused
if s.Regions != "" {
m.Regions = s.Regions
}
return m
}
+1
View File
@@ -30,5 +30,6 @@ type Monitor struct {
DNSServer string `yaml:"dns_server,omitempty"`
IgnoreTLS bool `yaml:"ignore_tls,omitempty"`
Paused bool `yaml:"paused,omitempty"`
Regions string `yaml:"regions,omitempty"`
Monitors []Monitor `yaml:"monitors,omitempty"`
}
+13
View File
@@ -74,6 +74,19 @@ func Handler(eng *monitor.Engine) http.HandlerFunc {
writeGauge(&b, "upkeep_monitor_checks_up_total", labels(s), float64(h.UpChecks))
}
writeHelp(&b, "upkeep_probe_up", "gauge", "Whether a probe node is online (1) or offline (0) based on last-seen time.")
for _, site := range sites {
probeResults := eng.GetProbeResults(site.ID)
for nodeID, result := range probeResults {
val := 0
if result.IsUp {
val = 1
}
nodeLabels := fmt.Sprintf(`id="%d",name="%s",node="%s"`, site.ID, escapeLabelValue(site.Name), escapeLabelValue(nodeID))
writeGauge(&b, "upkeep_probe_up", nodeLabels, float64(val))
}
}
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
w.Write([]byte(b.String()))
}
+1
View File
@@ -25,6 +25,7 @@ type Site struct {
DNSServer string
IgnoreTLS bool
Paused bool
Regions string
FailureCount int
Status string
+20
View File
@@ -12,6 +12,7 @@ import (
"log"
"net/http"
"sort"
"strings"
)
var statusTpl = template.Must(template.New("status").Parse(`
@@ -283,12 +284,31 @@ func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) {
http.Error(w, "Unauthorized", 401)
return
}
nodeID := r.URL.Query().Get("node_id")
var nodeRegion string
if nodeID != "" {
if node, err := s.GetNode(nodeID); err == nil {
nodeRegion = node.Region
}
}
sites := eng.GetAllSites()
var assigned []models.Site
for _, site := range sites {
if site.Paused || site.Type == "push" || site.Type == "group" {
continue
}
if site.Regions != "" && nodeRegion != "" {
matched := false
for _, r := range strings.Split(site.Regions, ",") {
if strings.TrimSpace(r) == nodeRegion {
matched = true
break
}
}
if !matched {
continue
}
}
assigned = append(assigned, site)
}
w.Header().Set("Content-Type", "application/json")
+1
View File
@@ -68,6 +68,7 @@ func (d *PostgresDialect) MigrationsSQL() []string {
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS ignore_tls BOOLEAN DEFAULT FALSE",
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS paused BOOLEAN DEFAULT FALSE",
"ALTER TABLE check_history ADD COLUMN IF NOT EXISTS node_id TEXT DEFAULT ''",
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS regions TEXT DEFAULT ''",
}
}
+1
View File
@@ -68,6 +68,7 @@ func (d *SQLiteDialect) MigrationsSQL() []string {
"ALTER TABLE sites ADD COLUMN ignore_tls BOOLEAN DEFAULT 0",
"ALTER TABLE sites ADD COLUMN paused BOOLEAN DEFAULT 0",
"ALTER TABLE check_history ADD COLUMN node_id TEXT DEFAULT ''",
"ALTER TABLE sites ADD COLUMN regions TEXT DEFAULT ''",
}
}
+10 -10
View File
@@ -51,7 +51,7 @@ func (s *SQLStore) Init() error {
func (s *SQLStore) GetSites() ([]models.Site, error) {
bf := s.dialect.BoolFalse()
query := fmt.Sprintf(
"SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s) FROM sites",
"SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s), COALESCE(regions, '') FROM sites",
bf, bf,
)
rows, err := s.db.Query(query)
@@ -65,7 +65,7 @@ func (s *SQLStore) GetSites() ([]models.Site, error) {
if err := rows.Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID,
&st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout,
&st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType,
&st.DNSServer, &st.IgnoreTLS, &st.Paused); err != nil {
&st.DNSServer, &st.IgnoreTLS, &st.Paused, &st.Regions); err != nil {
return sites, err
}
sites = append(sites, st)
@@ -78,9 +78,9 @@ func (s *SQLStore) AddSite(site models.Site) error {
if site.Type == "push" {
token = generateToken()
}
_, err := s.db.Exec(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
_, err := s.db.Exec(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused)
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions)
return err
}
@@ -90,9 +90,9 @@ func (s *SQLStore) UpdateSite(site models.Site) error {
if site.Type == "push" && existingToken == "" {
existingToken = generateToken()
}
_, err := s.db.Exec(s.q("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=? WHERE id=?"),
_, err := s.db.Exec(s.q("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=?, regions=? WHERE id=?"),
site.Name, site.URL, site.Type, existingToken, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.ID)
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions, site.ID)
return err
}
@@ -113,14 +113,14 @@ func (s *SQLStore) DeleteSite(id int) error {
func (s *SQLStore) GetSiteByName(name string) (models.Site, error) {
bf := s.dialect.BoolFalse()
query := fmt.Sprintf(
"SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s) FROM sites WHERE name = %s",
"SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s), COALESCE(regions, '') FROM sites WHERE name = %s",
bf, bf, s.q("?"),
)
var st models.Site
err := s.db.QueryRow(query, name).Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID,
&st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout,
&st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType,
&st.DNSServer, &st.IgnoreTLS, &st.Paused)
&st.DNSServer, &st.IgnoreTLS, &st.Paused, &st.Regions)
return st, err
}
@@ -368,9 +368,9 @@ func (s *SQLStore) ImportData(data models.Backup) error {
}
}
for _, st := range data.Sites {
if _, err := tx.Exec(s.q("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
if _, err := tx.Exec(s.q("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
st.ID, st.Name, st.URL, st.Type, st.Token, st.Interval, st.AlertID, st.CheckSSL, st.ExpiryThreshold, st.MaxRetries,
st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused); err != nil {
st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused, st.Regions); err != nil {
return err
}
}
+96
View File
@@ -0,0 +1,96 @@
package tui
import (
	"fmt"
	"go-upkeep/internal/models"
	"sort"
	"strings"
	"time"
)
// viewNodesTab renders the probe-node list shown on the "Nodes" tab.
//
// With no nodes loaded it returns a short placeholder message. Otherwise it
// hands m.renderTable a five-column layout (name, region, last seen, version,
// status) together with a callback that builds the rows for the index range
// [start, end) requested by the table renderer.
func (m Model) viewNodesTab() string {
	total := len(m.nodes)
	if total == 0 {
		return "\n No probe nodes connected."
	}

	headers := []string{"NAME", "REGION", "LAST SEEN", "VERSION", "STATUS"}
	widths := []int{0, 12, 20, 10, 8}

	// orDash substitutes a muted dash for values the node did not report.
	orDash := func(value string) string {
		if value != "" {
			return value
		}
		return subtleStyle.Render("—")
	}

	buildRows := func(start, end int) [][]string {
		var out [][]string
		for idx := start; idx < end; idx++ {
			n := m.nodes[idx]

			// Fall back to the node ID when the (length-limited) name is empty.
			label := limitStr(n.Name, 20)
			if label == "" {
				label = n.ID
			}

			regionCell := orDash(n.Region)
			seenCell := fmtNodeLastSeen(n.LastSeen)
			versionCell := orDash(n.Version)
			statusCell := fmtNodeStatus(n.LastSeen)

			out = append(out, []string{label, regionCell, seenCell, versionCell, statusCell})
		}
		return out
	}

	return m.renderTable(headers, total, buildRows, widths, nil)
}
// fmtNodeStatus turns a node's last-seen timestamp into a styled status label.
//
// A zero timestamp yields "UNKNOWN". Otherwise the label depends on how long
// ago the node was seen: under 60 seconds is "ONLINE", under 5 minutes is
// "STALE", and anything older is "OFFLINE".
func fmtNodeStatus(lastSeen time.Time) string {
	if lastSeen.IsZero() {
		return subtleStyle.Render("UNKNOWN")
	}

	switch elapsed := time.Since(lastSeen); {
	case elapsed < 60*time.Second:
		return specialStyle.Render("ONLINE")
	case elapsed < 5*time.Minute:
		return warnStyle.Render("STALE")
	default:
		return dangerStyle.Render("OFFLINE")
	}
}
// fmtNodeLastSeen formats a last-seen timestamp as a coarse relative age.
//
// A zero timestamp renders as a muted "never". Otherwise the age is reported
// in whole seconds (under a minute), whole minutes (under an hour), or whole
// hours, each followed by " ago".
func fmtNodeLastSeen(t time.Time) string {
	if t.IsZero() {
		return subtleStyle.Render("never")
	}

	ago := time.Since(t)
	// A timestamp slightly in the future (e.g. clock skew between machines)
	// would otherwise render as a negative age such as "-3s ago".
	if ago < 0 {
		ago = 0
	}

	switch {
	case ago < time.Minute:
		return fmt.Sprintf("%ds ago", int(ago.Seconds()))
	case ago < time.Hour:
		return fmt.Sprintf("%dm ago", int(ago.Minutes()))
	default:
		return fmt.Sprintf("%dh ago", int(ago.Hours()))
	}
}
// fmtProbeRegions renders a compact, styled summary of probe results, one
// "<key>:UP" or "<key>:DN" token per map entry, separated by spaces.
//
// An empty or nil map renders as a muted dash. Keys are emitted in sorted
// order: Go randomizes map iteration, so ranging over the map directly would
// reorder the tokens on every re-render. Each key is shortened to at most six
// characters; truncation is done on runes so a multi-byte UTF-8 character is
// never cut in half.
//
// The site parameter is currently unused and is kept for signature
// compatibility.
// NOTE(review): keys are treated as region names here — confirm against the
// caller that populates probeResults.
func fmtProbeRegions(site models.Site, probeResults map[string]probeStatus) string {
	if len(probeResults) == 0 {
		return subtleStyle.Render("—")
	}

	regions := make([]string, 0, len(probeResults))
	for region := range probeResults {
		regions = append(regions, region)
	}
	sort.Strings(regions)

	parts := make([]string, 0, len(regions))
	for _, region := range regions {
		short := region
		if runes := []rune(region); len(runes) > 6 {
			short = string(runes[:6])
		}
		if probeResults[region].isUp {
			parts = append(parts, specialStyle.Render(short+":UP"))
		} else {
			parts = append(parts, dangerStyle.Render(short+":DN"))
		}
	}
	return strings.Join(parts, " ")
}
// probeStatus is the per-entry value of the probe-result map consumed by
// fmtProbeRegions.
type probeStatus struct {
	// isUp selects the ":UP" (true) or ":DN" (false) token in fmtProbeRegions.
	isUp bool
}
+7
View File
@@ -37,6 +37,7 @@ type siteFormData struct {
Description string
IgnoreTLS bool
GroupID string
Regions string
}
func latencySparkline(latencies []time.Duration, width int) string {
@@ -309,6 +310,7 @@ func (m *Model) initSiteHuhForm() tea.Cmd {
m.siteFormData.GroupID = strconv.Itoa(site.ParentID)
m.siteFormData.Method = site.Method
m.siteFormData.AcceptedCodes = site.AcceptedCodes
m.siteFormData.Regions = site.Regions
break
}
}
@@ -435,6 +437,10 @@ func (m *Model) initSiteHuhForm() tea.Cmd {
huh.NewInput().Title("Description").
Placeholder("Optional description").
Value(&m.siteFormData.Description),
huh.NewInput().Title("Probe Regions").
Placeholder("us-east, eu-west (empty = all)").
Description("Comma-separated regions for distributed probing").
Value(&m.siteFormData.Regions),
).Title("Connection").WithHideFunc(func() bool {
return m.siteFormData.SiteType == "group"
}),
@@ -529,6 +535,7 @@ func (m *Model) submitSiteForm() {
ParentID: groupID,
Method: d.Method,
AcceptedCodes: d.AcceptedCodes,
Regions: d.Regions,
}
if m.editID > 0 {
+29 -16
View File
@@ -80,6 +80,7 @@ type Model struct {
sites []models.Site
alerts []models.AlertConfig
users []models.User
nodes []models.ProbeNode
}
func InitialModel(isAdmin bool, s store.Store, eng *monitor.Engine) Model {
@@ -131,12 +132,12 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
}
m.refreshData()
m.state = stateDashboard
if m.deleteTab == 3 {
if m.deleteTab == 4 {
m.state = stateUsers
}
case "n", "N", "esc":
m.state = stateDashboard
if m.deleteTab == 3 {
if m.deleteTab == 4 {
m.state = stateUsers
}
case "ctrl+c":
@@ -155,7 +156,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
if keyMsg.String() == "esc" {
m.huhForm = nil
m.state = stateDashboard
if m.currentTab == 3 {
if m.currentTab == 4 {
m.state = stateUsers
}
return m, nil
@@ -214,6 +215,8 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
if m.currentTab == 1 {
listLen = len(m.alerts)
} else if m.currentTab == 3 {
listLen = len(m.nodes)
} else if m.currentTab == 4 {
listLen = len(m.users)
}
if msg.Button == tea.MouseButtonWheelUp {
@@ -273,6 +276,9 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
max = len(m.alerts) - 1
}
if m.currentTab == 3 {
max = len(m.nodes) - 1
}
if m.currentTab == 4 {
max = len(m.users) - 1
}
if m.cursor < max {
@@ -291,7 +297,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
} else if m.currentTab == 1 {
m.state = stateFormAlert
return m, m.initAlertHuhForm()
} else if m.currentTab == 3 && m.isAdmin {
} else if m.currentTab == 4 && m.isAdmin {
m.state = stateFormUser
return m, m.initUserHuhForm()
}
@@ -305,7 +311,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.editID = m.alerts[m.cursor].ID
m.state = stateFormAlert
return m, m.initAlertHuhForm()
} else if m.currentTab == 3 && m.isAdmin && len(m.users) > 0 {
} else if m.currentTab == 4 && m.isAdmin && len(m.users) > 0 {
m.editID = m.users[m.cursor].ID
m.state = stateFormUser
return m, m.initUserHuhForm()
@@ -335,10 +341,10 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.deleteName = m.alerts[m.cursor].Name
m.deleteTab = 1
m.state = stateConfirmDelete
} else if m.currentTab == 3 && m.isAdmin && len(m.users) > 0 {
} else if m.currentTab == 4 && m.isAdmin && len(m.users) > 0 {
m.deleteID = m.users[m.cursor].ID
m.deleteName = m.users[m.cursor].Username
m.deleteTab = 3
m.deleteTab = 4
m.state = stateConfirmDelete
}
}
@@ -348,9 +354,9 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
}
func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) {
tabCount := 3
tabCount := 4
if m.isAdmin {
tabCount = 4
tabCount = 5
}
for i := 0; i < tabCount; i++ {
if m.zones.Get(fmt.Sprintf("tab-%d", i)).InBounds(msg) {
@@ -385,7 +391,7 @@ func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) {
}
}
if m.currentTab == 3 {
if m.currentTab == 4 {
end := m.tableOffset + m.maxTableRows
if end > len(m.users) {
end = len(m.users)
@@ -402,9 +408,9 @@ func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) {
}
func (m *Model) switchTab(idx int) {
maxTabs := 2
maxTabs := 3
if m.isAdmin {
maxTabs = 3
maxTabs = 4
}
if idx > maxTabs {
idx = 0
@@ -415,7 +421,7 @@ func (m *Model) switchTab(idx int) {
switch idx {
case 2:
m.state = stateLogs
case 3:
case 4:
m.state = stateUsers
default:
m.state = stateDashboard
@@ -473,12 +479,17 @@ func (m *Model) refreshData() {
m.users = users
}
}
if nodes, err := m.store.GetAllNodes(); err == nil {
m.nodes = nodes
}
m.logViewport.SetContent(strings.Join(m.engine.GetLogs(), "\n"))
listLen := len(m.sites)
if m.currentTab == 1 {
listLen = len(m.alerts)
} else if m.currentTab == 3 {
listLen = len(m.nodes)
} else if m.currentTab == 4 {
listLen = len(m.users)
}
if listLen > 0 && m.cursor >= listLen {
@@ -522,7 +533,7 @@ func (m Model) View() string {
kind := "monitor"
if m.deleteTab == 1 {
kind = "alert"
} else if m.deleteTab == 3 {
} else if m.deleteTab == 4 {
kind = "user"
}
msg := dangerStyle.Render(fmt.Sprintf("Delete %s \"%s\"?", kind, m.deleteName))
@@ -559,7 +570,7 @@ func (m Model) View() string {
}
func (m Model) viewDashboard() string {
tabs := []string{"Sites", "Alerts", "Logs"}
tabs := []string{"Sites", "Alerts", "Logs", "Nodes"}
if m.isAdmin {
tabs = append(tabs, "Users")
}
@@ -587,13 +598,15 @@ func (m Model) viewDashboard() string {
case 2:
content = m.viewLogsTab()
case 3:
content = m.viewNodesTab()
case 4:
if m.isAdmin {
content = m.viewUsersTab()
}
}
footer := subtleStyle.Render("\n[n] New [e/Enter] Edit [d] Delete [p] Pause [Space] Collapse [Tab/Click] Switch [q] Quit")
if m.currentTab == 3 {
if m.currentTab == 4 {
footer = subtleStyle.Render("\n[n] Add User [d] Revoke [Tab/Click] Switch [Ctrl+L] Clear [q] Quit")
}
s := lipgloss.NewStyle().Padding(1, 2)