release: 2026.05.1 — distributed probing, config-as-code, TUI polish #15

Merged
lerko merged 47 commits from develop into main 2026-05-16 20:03:54 +00:00
22 changed files with 960 additions and 244 deletions
Showing only changes of commit 4ac4973eaf - Show all commits
+41
View File
@@ -166,6 +166,44 @@ func runServe(args []string) {
clusterKey = v clusterKey = v
} }
nodeID := os.Getenv("UPKEEP_NODE_ID")
nodeName := os.Getenv("UPKEEP_NODE_NAME")
nodeRegion := os.Getenv("UPKEEP_NODE_REGION")
aggStrategy := os.Getenv("UPKEEP_AGG_STRATEGY")
if clusterMode == "probe" {
if nodeID == "" {
fmt.Fprintln(os.Stderr, "UPKEEP_NODE_ID is required for probe mode")
os.Exit(1)
}
if clusterPeer == "" {
fmt.Fprintln(os.Stderr, "UPKEEP_PEER_URL is required for probe mode")
os.Exit(1)
}
fmt.Printf("Cluster: Running as PROBE (node=%s, region=%s)\n", nodeID, nodeRegion)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-done
cancel()
}()
if err := cluster.RunProbe(ctx, cluster.ProbeConfig{
NodeID: nodeID,
NodeName: nodeName,
Region: nodeRegion,
LeaderURL: clusterPeer,
SharedKey: clusterKey,
Interval: 30,
}); err != nil {
fmt.Fprintf(os.Stderr, "Probe error: %v\n", err)
}
return
}
fs := flag.NewFlagSet("serve", flag.ExitOnError) fs := flag.NewFlagSet("serve", flag.ExitOnError)
port := fs.Int("port", portVal, "SSH Port") port := fs.Int("port", portVal, "SSH Port")
flagDBType := fs.String("db-type", dbType, "Database type") flagDBType := fs.String("db-type", dbType, "Database type")
@@ -214,6 +252,9 @@ func runServe(args []string) {
if os.Getenv("UPKEEP_INSECURE_SKIP_VERIFY") == "true" { if os.Getenv("UPKEEP_INSECURE_SKIP_VERIFY") == "true" {
eng.SetInsecureSkipVerify(true) eng.SetInsecureSkipVerify(true)
} }
if aggStrategy != "" {
eng.SetAggStrategy(monitor.AggregationStrategy(aggStrategy))
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
+35
View File
@@ -0,0 +1,35 @@
services:
leader:
build: .
environment:
- UPKEEP_CLUSTER_MODE=leader
- UPKEEP_CLUSTER_SECRET=changeme
- UPKEEP_AGG_STRATEGY=any-down
- UPKEEP_STATUS_ENABLED=true
ports:
- "8080:8080"
- "23234:23234"
probe-us-east:
build: .
environment:
- UPKEEP_CLUSTER_MODE=probe
- UPKEEP_NODE_ID=us-east-1
- UPKEEP_NODE_NAME=US East Probe
- UPKEEP_NODE_REGION=us-east
- UPKEEP_PEER_URL=http://leader:8080
- UPKEEP_CLUSTER_SECRET=changeme
depends_on:
- leader
probe-eu-west:
build: .
environment:
- UPKEEP_CLUSTER_MODE=probe
- UPKEEP_NODE_ID=eu-west-1
- UPKEEP_NODE_NAME=EU West Probe
- UPKEEP_NODE_REGION=eu-west
- UPKEEP_PEER_URL=http://leader:8080
- UPKEEP_CLUSTER_SECRET=changeme
depends_on:
- leader
+2
View File
@@ -33,6 +33,8 @@ func Start(ctx context.Context, cfg Config, eng *monitor.Engine) {
eng.SetActive(false) eng.SetActive(false)
go runFollowerLoop(ctx, cfg, eng) go runFollowerLoop(ctx, cfg, eng)
} }
// "probe" mode is handled directly in main.go before cluster.Start is called
} }
func runFollowerLoop(ctx context.Context, cfg Config, eng *monitor.Engine) { func runFollowerLoop(ctx context.Context, cfg Config, eng *monitor.Engine) {
+187
View File
@@ -0,0 +1,187 @@
package cluster
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"go-upkeep/internal/models"
"go-upkeep/internal/monitor"
"log"
"net/http"
"sync"
"time"
)
type ProbeConfig struct {
NodeID string
NodeName string
Region string
LeaderURL string
SharedKey string
Interval int
}
func RunProbe(ctx context.Context, cfg ProbeConfig) error {
if cfg.Interval < 10 {
cfg.Interval = 30
}
apiClient := &http.Client{Timeout: 10 * time.Second}
strictClient := &http.Client{
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}},
}
insecureClient := &http.Client{
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
}
if err := probeRegister(ctx, apiClient, cfg); err != nil {
log.Printf("Probe: initial registration failed: %v (will retry)", err)
}
for {
select {
case <-ctx.Done():
return nil
default:
}
sites, err := probeFetchAssignments(ctx, apiClient, cfg)
if err != nil {
log.Printf("Probe: failed to fetch assignments: %v", err)
sleepCtx(ctx, 10*time.Second)
continue
}
if len(sites) == 0 {
sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second)
continue
}
results := probeExecuteChecks(ctx, sites, strictClient, insecureClient)
if len(results) > 0 {
if err := probeReportResults(ctx, apiClient, cfg, results); err != nil {
log.Printf("Probe: failed to report results: %v", err)
}
}
sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second)
}
}
func probeRegister(ctx context.Context, client *http.Client, cfg ProbeConfig) error {
body, _ := json.Marshal(map[string]string{
"id": cfg.NodeID, "name": cfg.NodeName, "region": cfg.Region, "version": "probe",
})
req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/register", bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
resp, err := client.Do(req)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("register returned %d", resp.StatusCode)
}
return nil
}
func probeFetchAssignments(ctx context.Context, client *http.Client, cfg ProbeConfig) ([]models.Site, error) {
req, err := http.NewRequestWithContext(ctx, "GET", cfg.LeaderURL+"/api/probe/assignments?node_id="+cfg.NodeID, nil)
if err != nil {
return nil, err
}
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil, fmt.Errorf("assignments returned %d", resp.StatusCode)
}
var result struct {
Sites []models.Site `json:"sites"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return result.Sites, nil
}
type probeResultItem struct {
SiteID int `json:"site_id"`
LatencyNs int64 `json:"latency_ns"`
IsUp bool `json:"is_up"`
}
func probeExecuteChecks(ctx context.Context, sites []models.Site, strict, insecure *http.Client) []probeResultItem {
var mu sync.Mutex
var results []probeResultItem
sem := make(chan struct{}, 10)
var wg sync.WaitGroup
for _, site := range sites {
select {
case <-ctx.Done():
break
default:
}
wg.Add(1)
sem <- struct{}{}
go func(s models.Site) {
defer wg.Done()
defer func() { <-sem }()
cr := monitor.RunCheck(s, strict, insecure, false)
mu.Lock()
results = append(results, probeResultItem{
SiteID: s.ID,
LatencyNs: cr.LatencyNs,
IsUp: cr.Status == "UP",
})
mu.Unlock()
}(site)
}
wg.Wait()
return results
}
func probeReportResults(ctx context.Context, client *http.Client, cfg ProbeConfig, results []probeResultItem) error {
body, err := json.Marshal(map[string]interface{}{
"node_id": cfg.NodeID,
"results": results,
})
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/results", bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
resp, err := client.Do(req)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("results returned %d", resp.StatusCode)
}
fmt.Printf("Probe: reported %d check results\n", len(results))
return nil
}
func sleepCtx(ctx context.Context, d time.Duration) {
select {
case <-time.After(d):
case <-ctx.Done():
}
}
+4
View File
@@ -239,6 +239,7 @@ func monitorToSite(m Monitor, alertID, parentID int) models.Site {
DNSServer: m.DNSServer, DNSServer: m.DNSServer,
IgnoreTLS: m.IgnoreTLS, IgnoreTLS: m.IgnoreTLS,
Paused: m.Paused, Paused: m.Paused,
Regions: m.Regions,
} }
s.ExpiryThreshold = m.ExpiryThreshold s.ExpiryThreshold = m.ExpiryThreshold
@@ -346,6 +347,9 @@ func diffSite(existing, desired models.Site) string {
if existing.Paused != desired.Paused { if existing.Paused != desired.Paused {
diffs = append(diffs, fmt.Sprintf("paused: %v -> %v", existing.Paused, desired.Paused)) diffs = append(diffs, fmt.Sprintf("paused: %v -> %v", existing.Paused, desired.Paused))
} }
if existing.Regions != desired.Regions {
diffs = append(diffs, fmt.Sprintf("regions: %s -> %s", existing.Regions, desired.Regions))
}
return strings.Join(diffs, ", ") return strings.Join(diffs, ", ")
} }
+4
View File
@@ -126,6 +126,10 @@ func siteToMonitor(s models.Site, alertIDToName map[int]string) Monitor {
m.IgnoreTLS = s.IgnoreTLS m.IgnoreTLS = s.IgnoreTLS
m.Paused = s.Paused m.Paused = s.Paused
if s.Regions != "" {
m.Regions = s.Regions
}
return m return m
} }
+1
View File
@@ -30,5 +30,6 @@ type Monitor struct {
DNSServer string `yaml:"dns_server,omitempty"` DNSServer string `yaml:"dns_server,omitempty"`
IgnoreTLS bool `yaml:"ignore_tls,omitempty"` IgnoreTLS bool `yaml:"ignore_tls,omitempty"`
Paused bool `yaml:"paused,omitempty"` Paused bool `yaml:"paused,omitempty"`
Regions string `yaml:"regions,omitempty"`
Monitors []Monitor `yaml:"monitors,omitempty"` Monitors []Monitor `yaml:"monitors,omitempty"`
} }
+13
View File
@@ -74,6 +74,19 @@ func Handler(eng *monitor.Engine) http.HandlerFunc {
writeGauge(&b, "upkeep_monitor_checks_up_total", labels(s), float64(h.UpChecks)) writeGauge(&b, "upkeep_monitor_checks_up_total", labels(s), float64(h.UpChecks))
} }
writeHelp(&b, "upkeep_probe_up", "gauge", "Whether a probe node is online (1) or offline (0) based on last-seen time.")
for _, site := range sites {
probeResults := eng.GetProbeResults(site.ID)
for nodeID, result := range probeResults {
val := 0
if result.IsUp {
val = 1
}
nodeLabels := fmt.Sprintf(`id="%d",name="%s",node="%s"`, site.ID, escapeLabelValue(site.Name), escapeLabelValue(nodeID))
writeGauge(&b, "upkeep_probe_up", nodeLabels, float64(val))
}
}
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
w.Write([]byte(b.String())) w.Write([]byte(b.String()))
} }
+6
View File
@@ -44,6 +44,12 @@ func (m *mockStore) AddSiteReturningID(models.Site) (int, error) { return 0, nil
func (m *mockStore) AddAlertReturningID(string, string, map[string]string) (int, error) { func (m *mockStore) AddAlertReturningID(string, string, map[string]string) (int, error) {
return 0, nil return 0, nil
} }
func (m *mockStore) SaveCheckFromNode(int, string, int64, bool) error { return nil }
func (m *mockStore) RegisterNode(models.ProbeNode) error { return nil }
func (m *mockStore) GetNode(string) (models.ProbeNode, error) { return models.ProbeNode{}, nil }
func (m *mockStore) GetAllNodes() ([]models.ProbeNode, error) { return nil, nil }
func (m *mockStore) UpdateNodeLastSeen(string) error { return nil }
func (m *mockStore) DeleteNode(string) error { return nil }
func TestMetricsHandler(t *testing.T) { func TestMetricsHandler(t *testing.T) {
ms := &mockStore{ ms := &mockStore{
+10
View File
@@ -25,6 +25,7 @@ type Site struct {
DNSServer string DNSServer string
IgnoreTLS bool IgnoreTLS bool
Paused bool Paused bool
Regions string
FailureCount int FailureCount int
Status string Status string
@@ -52,11 +53,20 @@ type User struct {
type CheckRecord struct { type CheckRecord struct {
SiteID int SiteID int
NodeID string
LatencyNs int64 LatencyNs int64
IsUp bool IsUp bool
CheckedAt time.Time CheckedAt time.Time
} }
type ProbeNode struct {
ID string
Name string
Region string
LastSeen time.Time
Version string
}
type Backup struct { type Backup struct {
Sites []Site `json:"sites"` Sites []Site `json:"sites"`
Alerts []AlertConfig `json:"alerts"` Alerts []AlertConfig `json:"alerts"`
+44
View File
@@ -0,0 +1,44 @@
package monitor
import "time"
type AggregationStrategy string
const (
AggAnyDown AggregationStrategy = "any-down"
AggMajorityDown AggregationStrategy = "majority-down"
AggAllDown AggregationStrategy = "all-down"
)
type NodeResult struct {
NodeID string
IsUp bool
LatencyNs int64
CheckedAt time.Time
}
func AggregateStatus(results []NodeResult, strategy AggregationStrategy) (isUp bool, avgLatencyNs int64) {
if len(results) == 0 {
return true, 0
}
upCount := 0
var totalLatency int64
for _, r := range results {
if r.IsUp {
upCount++
}
totalLatency += r.LatencyNs
}
avgLatencyNs = totalLatency / int64(len(results))
switch strategy {
case AggMajorityDown:
isUp = upCount > len(results)/2
case AggAllDown:
isUp = upCount > 0
default:
isUp = upCount == len(results)
}
return
}
+218
View File
@@ -0,0 +1,218 @@
package monitor
import (
"context"
"go-upkeep/internal/models"
"net"
"net/http"
"strconv"
"strings"
"time"
"github.com/miekg/dns"
probing "github.com/prometheus-community/pro-bing"
)
type CheckResult struct {
SiteID int
Status string // "UP", "DOWN", "SSL EXP"
StatusCode int
LatencyNs int64
HasSSL bool
CertExpiry time.Time
}
func RunCheck(site models.Site, strict, insecure *http.Client, globalInsecure bool) CheckResult {
switch site.Type {
case "http":
return runHTTPCheck(site, strict, insecure, globalInsecure)
case "ping":
return runPingCheck(site)
case "port":
return runPortCheck(site)
case "dns":
return runDNSCheck(site)
default:
return CheckResult{SiteID: site.ID, Status: "DOWN"}
}
}
func runHTTPCheck(site models.Site, strict, insecure *http.Client, globalInsecure bool) CheckResult {
method := site.Method
if method == "" {
method = "GET"
}
timeout := siteTimeout(site)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, method, site.URL, nil)
if err != nil {
return CheckResult{SiteID: site.ID, Status: "DOWN"}
}
client := strict
if globalInsecure || site.IgnoreTLS {
client = insecure
}
start := time.Now()
resp, err := client.Do(req)
latency := time.Since(start)
result := CheckResult{
SiteID: site.ID,
Status: "UP",
LatencyNs: latency.Nanoseconds(),
}
if err != nil {
result.Status = "DOWN"
return result
}
defer resp.Body.Close()
result.StatusCode = resp.StatusCode
if !isCodeAccepted(resp.StatusCode, site.AcceptedCodes) {
result.Status = "DOWN"
}
if site.CheckSSL && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 {
result.HasSSL = true
cert := resp.TLS.PeerCertificates[0]
result.CertExpiry = cert.NotAfter
if time.Now().After(cert.NotAfter) {
result.Status = "SSL EXP"
}
}
return result
}
func runPingCheck(site models.Site) CheckResult {
host := site.Hostname
if host == "" {
host = site.URL
}
pinger, err := probing.NewPinger(host)
if err != nil {
return CheckResult{SiteID: site.ID, Status: "DOWN"}
}
pinger.Count = 1
pinger.Timeout = siteTimeout(site)
pinger.SetPrivileged(false)
start := time.Now()
err = pinger.Run()
latency := time.Since(start)
if err != nil || pinger.Statistics().PacketsRecv == 0 {
return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()}
}
stats := pinger.Statistics()
return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: stats.AvgRtt.Nanoseconds()}
}
func runPortCheck(site models.Site) CheckResult {
host := site.Hostname
if host == "" {
host = site.URL
}
addr := net.JoinHostPort(host, strconv.Itoa(site.Port))
timeout := siteTimeout(site)
start := time.Now()
conn, err := net.DialTimeout("tcp", addr, timeout)
latency := time.Since(start)
if err != nil {
return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()}
}
conn.Close()
return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: latency.Nanoseconds()}
}
func runDNSCheck(site models.Site) CheckResult {
host := site.Hostname
if host == "" {
host = site.URL
}
server := site.DNSServer
if server == "" {
server = "1.1.1.1"
}
if _, _, err := net.SplitHostPort(server); err != nil {
server = net.JoinHostPort(server, "53")
}
qtype := dns.TypeA
switch site.DNSResolveType {
case "AAAA":
qtype = dns.TypeAAAA
case "MX":
qtype = dns.TypeMX
case "CNAME":
qtype = dns.TypeCNAME
case "TXT":
qtype = dns.TypeTXT
case "NS":
qtype = dns.TypeNS
case "SOA":
qtype = dns.TypeSOA
case "SRV":
qtype = dns.TypeSRV
case "PTR":
qtype = dns.TypePTR
}
m := new(dns.Msg)
m.SetQuestion(dns.Fqdn(host), qtype)
c := new(dns.Client)
c.Timeout = siteTimeout(site)
start := time.Now()
r, _, err := c.Exchange(m, server)
latency := time.Since(start)
if err != nil {
return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()}
}
if r.Rcode != dns.RcodeSuccess {
return CheckResult{SiteID: site.ID, Status: "DOWN", StatusCode: r.Rcode, LatencyNs: latency.Nanoseconds()}
}
return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: latency.Nanoseconds()}
}
func siteTimeout(site models.Site) time.Duration {
if site.Timeout > 0 {
return time.Duration(site.Timeout) * time.Second
}
return 5 * time.Second
}
func isCodeAccepted(code int, accepted string) bool {
if accepted == "" {
return code >= 200 && code < 300
}
for _, part := range strings.Split(accepted, ",") {
part = strings.TrimSpace(part)
if strings.Contains(part, "-") {
bounds := strings.SplitN(part, "-", 2)
lo, err1 := strconv.Atoi(strings.TrimSpace(bounds[0]))
hi, err2 := strconv.Atoi(strings.TrimSpace(bounds[1]))
if err1 == nil && err2 == nil && code >= lo && code <= hi {
return true
}
} else {
if v, err := strconv.Atoi(part); err == nil && code == v {
return true
}
}
}
return false
}
+64 -215
View File
@@ -7,15 +7,9 @@ import (
"go-upkeep/internal/alert" "go-upkeep/internal/alert"
"go-upkeep/internal/models" "go-upkeep/internal/models"
"go-upkeep/internal/store" "go-upkeep/internal/store"
"net"
"net/http" "net/http"
"strconv"
"strings"
"sync" "sync"
"time" "time"
"github.com/miekg/dns"
probing "github.com/prometheus-community/pro-bing"
) )
type Engine struct { type Engine struct {
@@ -33,6 +27,10 @@ type Engine struct {
tokenIndex map[string]int tokenIndex map[string]int
probeResultsMu sync.RWMutex
probeResults map[int]map[string]NodeResult
aggStrategy AggregationStrategy
db store.Store db store.Store
insecureSkipVerify bool insecureSkipVerify bool
strictClient *http.Client strictClient *http.Client
@@ -41,11 +39,13 @@ type Engine struct {
func NewEngine(s store.Store) *Engine { func NewEngine(s store.Store) *Engine {
return &Engine{ return &Engine{
liveState: make(map[int]models.Site), liveState: make(map[int]models.Site),
histories: make(map[int]*SiteHistory), histories: make(map[int]*SiteHistory),
tokenIndex: make(map[string]int), tokenIndex: make(map[string]int),
isActive: true, probeResults: make(map[int]map[string]NodeResult),
db: s, aggStrategy: AggAnyDown,
isActive: true,
db: s,
strictClient: &http.Client{ strictClient: &http.Client{
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}}, Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}},
}, },
@@ -310,19 +310,20 @@ func (e *Engine) checkByID(id int) {
if !exists || site.Paused { if !exists || site.Paused {
return return
} }
switch site.Type { switch site.Type {
case "http":
e.checkHTTP(site)
case "push": case "push":
e.checkPush(site) e.checkPush(site)
case "ping":
e.checkPing(site)
case "port":
e.checkPort(site)
case "dns":
e.checkDNS(site)
case "group": case "group":
e.checkGroup(site) e.checkGroup(site)
default:
result := RunCheck(site, e.strictClient, e.insecureClient, e.insecureSkipVerify)
updatedSite := site
updatedSite.HasSSL = result.HasSSL
updatedSite.CertExpiry = result.CertExpiry
updatedSite.Latency = time.Duration(result.LatencyNs)
updatedSite.LastCheck = time.Now()
e.handleStatusChange(updatedSite, result.Status, result.StatusCode, time.Duration(result.LatencyNs))
} }
} }
@@ -335,61 +336,6 @@ func (e *Engine) checkPush(site models.Site) {
} }
} }
func (e *Engine) checkHTTP(site models.Site) {
method := site.Method
if method == "" {
method = "GET"
}
timeout := siteTimeout(site)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, method, site.URL, nil)
if err != nil {
e.handleStatusChange(site, "DOWN", 0, 0)
return
}
client := e.strictClient
if e.insecureSkipVerify || site.IgnoreTLS {
client = e.insecureClient
}
start := time.Now()
resp, err := client.Do(req)
latency := time.Since(start)
rawStatus := "UP"
rawCode := 0
var certExpiry time.Time
hasSSL := false
if err != nil {
rawStatus = "DOWN"
} else {
defer resp.Body.Close()
rawCode = resp.StatusCode
if !isCodeAccepted(rawCode, site.AcceptedCodes) {
rawStatus = "DOWN"
}
if site.CheckSSL && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 {
hasSSL = true
cert := resp.TLS.PeerCertificates[0]
certExpiry = cert.NotAfter
if time.Now().After(cert.NotAfter) {
rawStatus = "SSL EXP"
}
}
}
updatedSite := site
updatedSite.HasSSL = hasSSL
updatedSite.CertExpiry = certExpiry
updatedSite.Latency = latency
updatedSite.LastCheck = time.Now()
e.handleStatusChange(updatedSite, rawStatus, rawCode, latency)
}
func (e *Engine) handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) { func (e *Engine) handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) {
if !e.IsActive() { if !e.IsActive() {
return return
@@ -462,94 +408,6 @@ func (e *Engine) triggerAlert(alertID int, title, message string) {
} }
} }
func siteTimeout(site models.Site) time.Duration {
if site.Timeout > 0 {
return time.Duration(site.Timeout) * time.Second
}
return 5 * time.Second
}
func isCodeAccepted(code int, accepted string) bool {
if accepted == "" {
return code >= 200 && code < 300
}
for _, part := range strings.Split(accepted, ",") {
part = strings.TrimSpace(part)
if strings.Contains(part, "-") {
bounds := strings.SplitN(part, "-", 2)
lo, err1 := strconv.Atoi(strings.TrimSpace(bounds[0]))
hi, err2 := strconv.Atoi(strings.TrimSpace(bounds[1]))
if err1 == nil && err2 == nil && code >= lo && code <= hi {
return true
}
} else {
if v, err := strconv.Atoi(part); err == nil && code == v {
return true
}
}
}
return false
}
func (e *Engine) checkPing(site models.Site) {
host := site.Hostname
if host == "" {
host = site.URL
}
pinger, err := probing.NewPinger(host)
if err != nil {
e.handleStatusChange(site, "DOWN", 0, 0)
e.AddLog(fmt.Sprintf("Ping '%s' resolve failed: %v", site.Name, err))
return
}
pinger.Count = 1
pinger.Timeout = siteTimeout(site)
pinger.SetPrivileged(false)
start := time.Now()
err = pinger.Run()
latency := time.Since(start)
if err != nil || pinger.Statistics().PacketsRecv == 0 {
updatedSite := site
updatedSite.Latency = latency
updatedSite.LastCheck = time.Now()
e.handleStatusChange(updatedSite, "DOWN", 0, latency)
return
}
stats := pinger.Statistics()
updatedSite := site
updatedSite.Latency = stats.AvgRtt
updatedSite.LastCheck = time.Now()
e.handleStatusChange(updatedSite, "UP", 0, stats.AvgRtt)
}
func (e *Engine) checkPort(site models.Site) {
host := site.Hostname
if host == "" {
host = site.URL
}
addr := net.JoinHostPort(host, strconv.Itoa(site.Port))
timeout := siteTimeout(site)
start := time.Now()
conn, err := net.DialTimeout("tcp", addr, timeout)
latency := time.Since(start)
updatedSite := site
updatedSite.Latency = latency
updatedSite.LastCheck = time.Now()
if err != nil {
e.handleStatusChange(updatedSite, "DOWN", 0, latency)
return
}
conn.Close()
e.handleStatusChange(updatedSite, "UP", 0, latency)
}
func (e *Engine) checkGroup(site models.Site) { func (e *Engine) checkGroup(site models.Site) {
e.mu.RLock() e.mu.RLock()
status := "UP" status := "UP"
@@ -588,63 +446,54 @@ func (e *Engine) checkGroup(site models.Site) {
e.mu.Unlock() e.mu.Unlock()
} }
func (e *Engine) checkDNS(site models.Site) { func (e *Engine) SetAggStrategy(strategy AggregationStrategy) {
host := site.Hostname e.aggStrategy = strategy
if host == "" { }
host = site.URL
func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool) {
e.probeResultsMu.Lock()
if e.probeResults[siteID] == nil {
e.probeResults[siteID] = make(map[string]NodeResult)
}
e.probeResults[siteID][nodeID] = NodeResult{
NodeID: nodeID,
IsUp: isUp,
LatencyNs: latencyNs,
CheckedAt: time.Now(),
}
results := make([]NodeResult, 0, len(e.probeResults[siteID]))
for _, r := range e.probeResults[siteID] {
results = append(results, r)
}
e.probeResultsMu.Unlock()
aggUp, avgLatency := AggregateStatus(results, e.aggStrategy)
e.mu.RLock()
site, exists := e.liveState[siteID]
e.mu.RUnlock()
if !exists {
return
} }
server := site.DNSServer rawStatus := "UP"
if server == "" { if !aggUp {
server = "1.1.1.1" rawStatus = "DOWN"
} }
if _, _, err := net.SplitHostPort(server); err != nil {
server = net.JoinHostPort(server, "53")
}
qtype := dns.TypeA
switch site.DNSResolveType {
case "AAAA":
qtype = dns.TypeAAAA
case "MX":
qtype = dns.TypeMX
case "CNAME":
qtype = dns.TypeCNAME
case "TXT":
qtype = dns.TypeTXT
case "NS":
qtype = dns.TypeNS
case "SOA":
qtype = dns.TypeSOA
case "SRV":
qtype = dns.TypeSRV
case "PTR":
qtype = dns.TypePTR
}
m := new(dns.Msg)
m.SetQuestion(dns.Fqdn(host), qtype)
c := new(dns.Client)
c.Timeout = siteTimeout(site)
start := time.Now()
r, _, err := c.Exchange(m, server)
latency := time.Since(start)
updatedSite := site updatedSite := site
updatedSite.Latency = latency updatedSite.Latency = time.Duration(avgLatency)
updatedSite.LastCheck = time.Now() updatedSite.LastCheck = time.Now()
e.handleStatusChange(updatedSite, rawStatus, 0, time.Duration(avgLatency))
if err != nil { }
e.handleStatusChange(updatedSite, "DOWN", 0, latency)
return func (e *Engine) GetProbeResults(siteID int) map[string]NodeResult {
} e.probeResultsMu.RLock()
defer e.probeResultsMu.RUnlock()
if r.Rcode != dns.RcodeSuccess { src := e.probeResults[siteID]
e.handleStatusChange(updatedSite, "DOWN", r.Rcode, latency) cp := make(map[string]NodeResult, len(src))
return for k, v := range src {
} cp[k] = v
}
e.handleStatusChange(updatedSite, "UP", 0, latency) return cp
} }
+110 -2
View File
@@ -12,6 +12,7 @@ import (
"log" "log"
"net/http" "net/http"
"sort" "sort"
"strings"
) )
var statusTpl = template.Must(template.New("status").Parse(` var statusTpl = template.Must(template.New("status").Parse(`
@@ -243,10 +244,117 @@ func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) {
w.Write([]byte(fmt.Sprintf("Imported %d monitors, %d alerts from Kuma v%s", len(backup.Sites), len(backup.Alerts), kb.Version))) w.Write([]byte(fmt.Sprintf("Imported %d monitors, %d alerts from Kuma v%s", len(backup.Sites), len(backup.Alerts), kb.Version)))
}) })
// 6. Prometheus Metrics // 6. Probe Registration
mux.HandleFunc("/api/probe/register", func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", 405)
return
}
if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey {
http.Error(w, "Unauthorized", 401)
return
}
var req struct {
ID string `json:"id"`
Name string `json:"name"`
Region string `json:"region"`
Version string `json:"version"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON", 400)
return
}
if req.ID == "" {
http.Error(w, "id is required", 400)
return
}
if err := s.RegisterNode(models.ProbeNode{
ID: req.ID, Name: req.Name, Region: req.Region, Version: req.Version,
}); err != nil {
log.Printf("Probe register failed: %v", err)
http.Error(w, "Registration failed", 500)
return
}
json.NewEncoder(w).Encode(map[string]bool{"ok": true})
})
// 7. Probe Assignment Fetch
mux.HandleFunc("/api/probe/assignments", func(w http.ResponseWriter, r *http.Request) {
if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey {
http.Error(w, "Unauthorized", 401)
return
}
nodeID := r.URL.Query().Get("node_id")
var nodeRegion string
if nodeID != "" {
if node, err := s.GetNode(nodeID); err == nil {
nodeRegion = node.Region
}
}
sites := eng.GetAllSites()
var assigned []models.Site
for _, site := range sites {
if site.Paused || site.Type == "push" || site.Type == "group" {
continue
}
if site.Regions != "" && nodeRegion != "" {
matched := false
for _, r := range strings.Split(site.Regions, ",") {
if strings.TrimSpace(r) == nodeRegion {
matched = true
break
}
}
if !matched {
continue
}
}
assigned = append(assigned, site)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string][]models.Site{"sites": assigned})
})
// 8. Probe Result Submission
mux.HandleFunc("/api/probe/results", func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", 405)
return
}
if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey {
http.Error(w, "Unauthorized", 401)
return
}
var req struct {
NodeID string `json:"node_id"`
Results []struct {
SiteID int `json:"site_id"`
LatencyNs int64 `json:"latency_ns"`
IsUp bool `json:"is_up"`
} `json:"results"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON", 400)
return
}
if req.NodeID == "" {
http.Error(w, "node_id is required", 400)
return
}
for _, result := range req.Results {
if err := s.SaveCheckFromNode(result.SiteID, req.NodeID, result.LatencyNs, result.IsUp); err != nil {
log.Printf("Failed to save probe result: %v", err)
}
eng.IngestProbeResult(req.NodeID, result.SiteID, result.LatencyNs, result.IsUp)
}
s.UpdateNodeLastSeen(req.NodeID)
json.NewEncoder(w).Encode(map[string]bool{"ok": true})
})
// 9. Prometheus Metrics
mux.HandleFunc("/metrics", metrics.Handler(eng)) mux.HandleFunc("/metrics", metrics.Handler(eng))
// 7. Status Page // 10. Status Page
if cfg.EnableStatus { if cfg.EnableStatus {
mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { renderStatusPage(w, cfg.Title, eng) }) mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { renderStatusPage(w, cfg.Title, eng) })
mux.HandleFunc("/status/json", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/status/json", func(w http.ResponseWriter, r *http.Request) {
+1
View File
@@ -10,6 +10,7 @@ type Dialect interface {
ResetSequenceOnEmpty(db *sql.DB, table string) ResetSequenceOnEmpty(db *sql.DB, table string)
ImportWipe(tx *sql.Tx) ImportWipe(tx *sql.Tx)
ImportResetSequences(tx *sql.Tx) ImportResetSequences(tx *sql.Tx)
UpsertNodeSQL() string
} }
// rewritePlaceholders converts ? markers to $1, $2, etc. for Postgres. // rewritePlaceholders converts ? markers to $1, $2, etc. for Postgres.
+13
View File
@@ -44,6 +44,13 @@ func (d *PostgresDialect) CreateTablesSQL() []string {
is_up BOOLEAN, checked_at TIMESTAMP DEFAULT NOW() is_up BOOLEAN, checked_at TIMESTAMP DEFAULT NOW()
)`, )`,
`CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`, `CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`,
`CREATE TABLE IF NOT EXISTS nodes (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
region TEXT DEFAULT '',
last_seen TIMESTAMP DEFAULT NOW(),
version TEXT DEFAULT ''
)`,
} }
} }
@@ -60,9 +67,15 @@ func (d *PostgresDialect) MigrationsSQL() []string {
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS dns_server TEXT DEFAULT ''", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS dns_server TEXT DEFAULT ''",
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS ignore_tls BOOLEAN DEFAULT FALSE", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS ignore_tls BOOLEAN DEFAULT FALSE",
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS paused BOOLEAN DEFAULT FALSE", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS paused BOOLEAN DEFAULT FALSE",
"ALTER TABLE check_history ADD COLUMN IF NOT EXISTS node_id TEXT DEFAULT ''",
"ALTER TABLE sites ADD COLUMN IF NOT EXISTS regions TEXT DEFAULT ''",
} }
} }
func (d *PostgresDialect) UpsertNodeSQL() string {
return "INSERT INTO nodes (id, name, region, last_seen, version) VALUES ($1, $2, $3, NOW(), $4) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, region = EXCLUDED.region, last_seen = NOW(), version = EXCLUDED.version"
}
func (d *PostgresDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {} func (d *PostgresDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {}
func (d *PostgresDialect) ImportWipe(tx *sql.Tx) { func (d *PostgresDialect) ImportWipe(tx *sql.Tx) {
+13
View File
@@ -44,6 +44,13 @@ func (d *SQLiteDialect) CreateTablesSQL() []string {
is_up BOOLEAN, checked_at DATETIME DEFAULT CURRENT_TIMESTAMP is_up BOOLEAN, checked_at DATETIME DEFAULT CURRENT_TIMESTAMP
)`, )`,
`CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`, `CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`,
`CREATE TABLE IF NOT EXISTS nodes (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
region TEXT DEFAULT '',
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
version TEXT DEFAULT ''
)`,
} }
} }
@@ -60,9 +67,15 @@ func (d *SQLiteDialect) MigrationsSQL() []string {
"ALTER TABLE sites ADD COLUMN dns_server TEXT DEFAULT ''", "ALTER TABLE sites ADD COLUMN dns_server TEXT DEFAULT ''",
"ALTER TABLE sites ADD COLUMN ignore_tls BOOLEAN DEFAULT 0", "ALTER TABLE sites ADD COLUMN ignore_tls BOOLEAN DEFAULT 0",
"ALTER TABLE sites ADD COLUMN paused BOOLEAN DEFAULT 0", "ALTER TABLE sites ADD COLUMN paused BOOLEAN DEFAULT 0",
"ALTER TABLE check_history ADD COLUMN node_id TEXT DEFAULT ''",
"ALTER TABLE sites ADD COLUMN regions TEXT DEFAULT ''",
} }
} }
func (d *SQLiteDialect) UpsertNodeSQL() string {
return "INSERT OR REPLACE INTO nodes (id, name, region, last_seen, version) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?)"
}
func (d *SQLiteDialect) ResetSequenceOnEmpty(db *sql.DB, table string) { func (d *SQLiteDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {
var count int var count int
db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&count) db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&count)
+54 -11
View File
@@ -51,7 +51,7 @@ func (s *SQLStore) Init() error {
func (s *SQLStore) GetSites() ([]models.Site, error) { func (s *SQLStore) GetSites() ([]models.Site, error) {
bf := s.dialect.BoolFalse() bf := s.dialect.BoolFalse()
query := fmt.Sprintf( query := fmt.Sprintf(
"SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s) FROM sites", "SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s), COALESCE(regions, '') FROM sites",
bf, bf, bf, bf,
) )
rows, err := s.db.Query(query) rows, err := s.db.Query(query)
@@ -65,7 +65,7 @@ func (s *SQLStore) GetSites() ([]models.Site, error) {
if err := rows.Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID, if err := rows.Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID,
&st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout, &st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout,
&st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType, &st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType,
&st.DNSServer, &st.IgnoreTLS, &st.Paused); err != nil { &st.DNSServer, &st.IgnoreTLS, &st.Paused, &st.Regions); err != nil {
return sites, err return sites, err
} }
sites = append(sites, st) sites = append(sites, st)
@@ -78,9 +78,9 @@ func (s *SQLStore) AddSite(site models.Site) error {
if site.Type == "push" { if site.Type == "push" {
token = generateToken() token = generateToken()
} }
_, err := s.db.Exec(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"), _, err := s.db.Exec(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries, site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused) site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions)
return err return err
} }
@@ -90,9 +90,9 @@ func (s *SQLStore) UpdateSite(site models.Site) error {
if site.Type == "push" && existingToken == "" { if site.Type == "push" && existingToken == "" {
existingToken = generateToken() existingToken = generateToken()
} }
_, err := s.db.Exec(s.q("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=? WHERE id=?"), _, err := s.db.Exec(s.q("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=?, regions=? WHERE id=?"),
site.Name, site.URL, site.Type, existingToken, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries, site.Name, site.URL, site.Type, existingToken, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries,
site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.ID) site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions, site.ID)
return err return err
} }
@@ -113,14 +113,14 @@ func (s *SQLStore) DeleteSite(id int) error {
func (s *SQLStore) GetSiteByName(name string) (models.Site, error) { func (s *SQLStore) GetSiteByName(name string) (models.Site, error) {
bf := s.dialect.BoolFalse() bf := s.dialect.BoolFalse()
query := fmt.Sprintf( query := fmt.Sprintf(
"SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s) FROM sites WHERE name = %s", "SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s), COALESCE(regions, '') FROM sites WHERE name = %s",
bf, bf, s.q("?"), bf, bf, s.q("?"),
) )
var st models.Site var st models.Site
err := s.db.QueryRow(query, name).Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID, err := s.db.QueryRow(query, name).Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID,
&st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout, &st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout,
&st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType, &st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType,
&st.DNSServer, &st.IgnoreTLS, &st.Paused) &st.DNSServer, &st.IgnoreTLS, &st.Paused, &st.Regions)
return st, err return st, err
} }
@@ -247,7 +247,11 @@ func (s *SQLStore) DeleteUser(id int) error {
} }
func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error { func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error {
_, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, latency_ns, is_up) VALUES (?, ?, ?)"), siteID, latencyNs, isUp) return s.SaveCheckFromNode(siteID, "", latencyNs, isUp)
}
func (s *SQLStore) SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error {
_, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, node_id, latency_ns, is_up) VALUES (?, ?, ?, ?)"), siteID, nodeID, latencyNs, isUp)
if err != nil { if err != nil {
return err return err
} }
@@ -257,6 +261,45 @@ func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error {
return err return err
} }
func (s *SQLStore) RegisterNode(node models.ProbeNode) error {
_, err := s.db.Exec(s.dialect.UpsertNodeSQL(), node.ID, node.Name, node.Region, node.Version)
return err
}
func (s *SQLStore) GetNode(id string) (models.ProbeNode, error) {
var n models.ProbeNode
err := s.db.QueryRow(s.q("SELECT id, name, region, last_seen, version FROM nodes WHERE id = ?"), id).
Scan(&n.ID, &n.Name, &n.Region, &n.LastSeen, &n.Version)
return n, err
}
func (s *SQLStore) GetAllNodes() ([]models.ProbeNode, error) {
rows, err := s.db.Query("SELECT id, name, region, last_seen, version FROM nodes ORDER BY region, name")
if err != nil {
return nil, err
}
defer rows.Close()
var nodes []models.ProbeNode
for rows.Next() {
var n models.ProbeNode
if err := rows.Scan(&n.ID, &n.Name, &n.Region, &n.LastSeen, &n.Version); err != nil {
return nodes, err
}
nodes = append(nodes, n)
}
return nodes, rows.Err()
}
func (s *SQLStore) UpdateNodeLastSeen(id string) error {
_, err := s.db.Exec(s.q("UPDATE nodes SET last_seen = CURRENT_TIMESTAMP WHERE id = ?"), id)
return err
}
func (s *SQLStore) DeleteNode(id string) error {
_, err := s.db.Exec(s.q("DELETE FROM nodes WHERE id = ?"), id)
return err
}
func (s *SQLStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) { func (s *SQLStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) {
result := make(map[int][]models.CheckRecord) result := make(map[int][]models.CheckRecord)
rows, err := s.db.Query(s.q(` rows, err := s.db.Query(s.q(`
@@ -325,9 +368,9 @@ func (s *SQLStore) ImportData(data models.Backup) error {
} }
} }
for _, st := range data.Sites { for _, st := range data.Sites {
if _, err := tx.Exec(s.q("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"), if _, err := tx.Exec(s.q("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"),
st.ID, st.Name, st.URL, st.Type, st.Token, st.Interval, st.AlertID, st.CheckSSL, st.ExpiryThreshold, st.MaxRetries, st.ID, st.Name, st.URL, st.Type, st.Token, st.Interval, st.AlertID, st.CheckSSL, st.ExpiryThreshold, st.MaxRetries,
st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused); err != nil { st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused, st.Regions); err != nil {
return err return err
} }
} }
+8
View File
@@ -35,8 +35,16 @@ type Store interface {
// History // History
SaveCheck(siteID int, latencyNs int64, isUp bool) error SaveCheck(siteID int, latencyNs int64, isUp bool) error
SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error
LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) LoadAllHistory(limit int) (map[int][]models.CheckRecord, error)
// Nodes
RegisterNode(node models.ProbeNode) error
GetNode(id string) (models.ProbeNode, error)
GetAllNodes() ([]models.ProbeNode, error)
UpdateNodeLastSeen(id string) error
DeleteNode(id string) error
// Backup & Restore // Backup & Restore
ExportData() (models.Backup, error) ExportData() (models.Backup, error)
ImportData(data models.Backup) error ImportData(data models.Backup) error
+96
View File
@@ -0,0 +1,96 @@
package tui
import (
"fmt"
"go-upkeep/internal/models"
"strings"
"time"
)
func (m Model) viewNodesTab() string {
if len(m.nodes) == 0 {
return "\n No probe nodes connected."
}
colWidths := []int{0, 12, 20, 10, 8}
return m.renderTable(
[]string{"NAME", "REGION", "LAST SEEN", "VERSION", "STATUS"},
len(m.nodes),
func(start, end int) [][]string {
var rows [][]string
for i := start; i < end; i++ {
node := m.nodes[i]
name := limitStr(node.Name, 20)
if name == "" {
name = node.ID
}
region := node.Region
if region == "" {
region = subtleStyle.Render("—")
}
lastSeen := fmtNodeLastSeen(node.LastSeen)
version := node.Version
if version == "" {
version = subtleStyle.Render("—")
}
status := fmtNodeStatus(node.LastSeen)
rows = append(rows, []string{name, region, lastSeen, version, status})
}
return rows
},
colWidths,
nil,
)
}
func fmtNodeStatus(lastSeen time.Time) string {
if lastSeen.IsZero() {
return subtleStyle.Render("UNKNOWN")
}
ago := time.Since(lastSeen)
if ago < 60*time.Second {
return specialStyle.Render("ONLINE")
}
if ago < 5*time.Minute {
return warnStyle.Render("STALE")
}
return dangerStyle.Render("OFFLINE")
}
func fmtNodeLastSeen(t time.Time) string {
if t.IsZero() {
return subtleStyle.Render("never")
}
ago := time.Since(t)
if ago < time.Minute {
return fmt.Sprintf("%ds ago", int(ago.Seconds()))
}
if ago < time.Hour {
return fmt.Sprintf("%dm ago", int(ago.Minutes()))
}
return fmt.Sprintf("%dh ago", int(ago.Hours()))
}
func fmtProbeRegions(site models.Site, probeResults map[string]probeStatus) string {
if len(probeResults) == 0 {
return subtleStyle.Render("—")
}
var parts []string
for region, status := range probeResults {
short := region
if len(short) > 6 {
short = short[:6]
}
if status.isUp {
parts = append(parts, specialStyle.Render(short+":UP"))
} else {
parts = append(parts, dangerStyle.Render(short+":DN"))
}
}
return strings.Join(parts, " ")
}
type probeStatus struct {
isUp bool
}
+7
View File
@@ -37,6 +37,7 @@ type siteFormData struct {
Description string Description string
IgnoreTLS bool IgnoreTLS bool
GroupID string GroupID string
Regions string
} }
func latencySparkline(latencies []time.Duration, width int) string { func latencySparkline(latencies []time.Duration, width int) string {
@@ -309,6 +310,7 @@ func (m *Model) initSiteHuhForm() tea.Cmd {
m.siteFormData.GroupID = strconv.Itoa(site.ParentID) m.siteFormData.GroupID = strconv.Itoa(site.ParentID)
m.siteFormData.Method = site.Method m.siteFormData.Method = site.Method
m.siteFormData.AcceptedCodes = site.AcceptedCodes m.siteFormData.AcceptedCodes = site.AcceptedCodes
m.siteFormData.Regions = site.Regions
break break
} }
} }
@@ -435,6 +437,10 @@ func (m *Model) initSiteHuhForm() tea.Cmd {
huh.NewInput().Title("Description"). huh.NewInput().Title("Description").
Placeholder("Optional description"). Placeholder("Optional description").
Value(&m.siteFormData.Description), Value(&m.siteFormData.Description),
huh.NewInput().Title("Probe Regions").
Placeholder("us-east, eu-west (empty = all)").
Description("Comma-separated regions for distributed probing").
Value(&m.siteFormData.Regions),
).Title("Connection").WithHideFunc(func() bool { ).Title("Connection").WithHideFunc(func() bool {
return m.siteFormData.SiteType == "group" return m.siteFormData.SiteType == "group"
}), }),
@@ -529,6 +535,7 @@ func (m *Model) submitSiteForm() {
ParentID: groupID, ParentID: groupID,
Method: d.Method, Method: d.Method,
AcceptedCodes: d.AcceptedCodes, AcceptedCodes: d.AcceptedCodes,
Regions: d.Regions,
} }
if m.editID > 0 { if m.editID > 0 {
+29 -16
View File
@@ -80,6 +80,7 @@ type Model struct {
sites []models.Site sites []models.Site
alerts []models.AlertConfig alerts []models.AlertConfig
users []models.User users []models.User
nodes []models.ProbeNode
} }
func InitialModel(isAdmin bool, s store.Store, eng *monitor.Engine) Model { func InitialModel(isAdmin bool, s store.Store, eng *monitor.Engine) Model {
@@ -131,12 +132,12 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
} }
m.refreshData() m.refreshData()
m.state = stateDashboard m.state = stateDashboard
if m.deleteTab == 3 { if m.deleteTab == 4 {
m.state = stateUsers m.state = stateUsers
} }
case "n", "N", "esc": case "n", "N", "esc":
m.state = stateDashboard m.state = stateDashboard
if m.deleteTab == 3 { if m.deleteTab == 4 {
m.state = stateUsers m.state = stateUsers
} }
case "ctrl+c": case "ctrl+c":
@@ -155,7 +156,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
if keyMsg.String() == "esc" { if keyMsg.String() == "esc" {
m.huhForm = nil m.huhForm = nil
m.state = stateDashboard m.state = stateDashboard
if m.currentTab == 3 { if m.currentTab == 4 {
m.state = stateUsers m.state = stateUsers
} }
return m, nil return m, nil
@@ -214,6 +215,8 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
if m.currentTab == 1 { if m.currentTab == 1 {
listLen = len(m.alerts) listLen = len(m.alerts)
} else if m.currentTab == 3 { } else if m.currentTab == 3 {
listLen = len(m.nodes)
} else if m.currentTab == 4 {
listLen = len(m.users) listLen = len(m.users)
} }
if msg.Button == tea.MouseButtonWheelUp { if msg.Button == tea.MouseButtonWheelUp {
@@ -273,6 +276,9 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
max = len(m.alerts) - 1 max = len(m.alerts) - 1
} }
if m.currentTab == 3 { if m.currentTab == 3 {
max = len(m.nodes) - 1
}
if m.currentTab == 4 {
max = len(m.users) - 1 max = len(m.users) - 1
} }
if m.cursor < max { if m.cursor < max {
@@ -291,7 +297,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
} else if m.currentTab == 1 { } else if m.currentTab == 1 {
m.state = stateFormAlert m.state = stateFormAlert
return m, m.initAlertHuhForm() return m, m.initAlertHuhForm()
} else if m.currentTab == 3 && m.isAdmin { } else if m.currentTab == 4 && m.isAdmin {
m.state = stateFormUser m.state = stateFormUser
return m, m.initUserHuhForm() return m, m.initUserHuhForm()
} }
@@ -305,7 +311,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.editID = m.alerts[m.cursor].ID m.editID = m.alerts[m.cursor].ID
m.state = stateFormAlert m.state = stateFormAlert
return m, m.initAlertHuhForm() return m, m.initAlertHuhForm()
} else if m.currentTab == 3 && m.isAdmin && len(m.users) > 0 { } else if m.currentTab == 4 && m.isAdmin && len(m.users) > 0 {
m.editID = m.users[m.cursor].ID m.editID = m.users[m.cursor].ID
m.state = stateFormUser m.state = stateFormUser
return m, m.initUserHuhForm() return m, m.initUserHuhForm()
@@ -335,10 +341,10 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.deleteName = m.alerts[m.cursor].Name m.deleteName = m.alerts[m.cursor].Name
m.deleteTab = 1 m.deleteTab = 1
m.state = stateConfirmDelete m.state = stateConfirmDelete
} else if m.currentTab == 3 && m.isAdmin && len(m.users) > 0 { } else if m.currentTab == 4 && m.isAdmin && len(m.users) > 0 {
m.deleteID = m.users[m.cursor].ID m.deleteID = m.users[m.cursor].ID
m.deleteName = m.users[m.cursor].Username m.deleteName = m.users[m.cursor].Username
m.deleteTab = 3 m.deleteTab = 4
m.state = stateConfirmDelete m.state = stateConfirmDelete
} }
} }
@@ -348,9 +354,9 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
} }
func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) { func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) {
tabCount := 3 tabCount := 4
if m.isAdmin { if m.isAdmin {
tabCount = 4 tabCount = 5
} }
for i := 0; i < tabCount; i++ { for i := 0; i < tabCount; i++ {
if m.zones.Get(fmt.Sprintf("tab-%d", i)).InBounds(msg) { if m.zones.Get(fmt.Sprintf("tab-%d", i)).InBounds(msg) {
@@ -385,7 +391,7 @@ func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) {
} }
} }
if m.currentTab == 3 { if m.currentTab == 4 {
end := m.tableOffset + m.maxTableRows end := m.tableOffset + m.maxTableRows
if end > len(m.users) { if end > len(m.users) {
end = len(m.users) end = len(m.users)
@@ -402,9 +408,9 @@ func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) {
} }
func (m *Model) switchTab(idx int) { func (m *Model) switchTab(idx int) {
maxTabs := 2 maxTabs := 3
if m.isAdmin { if m.isAdmin {
maxTabs = 3 maxTabs = 4
} }
if idx > maxTabs { if idx > maxTabs {
idx = 0 idx = 0
@@ -415,7 +421,7 @@ func (m *Model) switchTab(idx int) {
switch idx { switch idx {
case 2: case 2:
m.state = stateLogs m.state = stateLogs
case 3: case 4:
m.state = stateUsers m.state = stateUsers
default: default:
m.state = stateDashboard m.state = stateDashboard
@@ -473,12 +479,17 @@ func (m *Model) refreshData() {
m.users = users m.users = users
} }
} }
if nodes, err := m.store.GetAllNodes(); err == nil {
m.nodes = nodes
}
m.logViewport.SetContent(strings.Join(m.engine.GetLogs(), "\n")) m.logViewport.SetContent(strings.Join(m.engine.GetLogs(), "\n"))
listLen := len(m.sites) listLen := len(m.sites)
if m.currentTab == 1 { if m.currentTab == 1 {
listLen = len(m.alerts) listLen = len(m.alerts)
} else if m.currentTab == 3 { } else if m.currentTab == 3 {
listLen = len(m.nodes)
} else if m.currentTab == 4 {
listLen = len(m.users) listLen = len(m.users)
} }
if listLen > 0 && m.cursor >= listLen { if listLen > 0 && m.cursor >= listLen {
@@ -522,7 +533,7 @@ func (m Model) View() string {
kind := "monitor" kind := "monitor"
if m.deleteTab == 1 { if m.deleteTab == 1 {
kind = "alert" kind = "alert"
} else if m.deleteTab == 3 { } else if m.deleteTab == 4 {
kind = "user" kind = "user"
} }
msg := dangerStyle.Render(fmt.Sprintf("Delete %s \"%s\"?", kind, m.deleteName)) msg := dangerStyle.Render(fmt.Sprintf("Delete %s \"%s\"?", kind, m.deleteName))
@@ -559,7 +570,7 @@ func (m Model) View() string {
} }
func (m Model) viewDashboard() string { func (m Model) viewDashboard() string {
tabs := []string{"Sites", "Alerts", "Logs"} tabs := []string{"Sites", "Alerts", "Logs", "Nodes"}
if m.isAdmin { if m.isAdmin {
tabs = append(tabs, "Users") tabs = append(tabs, "Users")
} }
@@ -587,13 +598,15 @@ func (m Model) viewDashboard() string {
case 2: case 2:
content = m.viewLogsTab() content = m.viewLogsTab()
case 3: case 3:
content = m.viewNodesTab()
case 4:
if m.isAdmin { if m.isAdmin {
content = m.viewUsersTab() content = m.viewUsersTab()
} }
} }
footer := subtleStyle.Render("\n[n] New [e/Enter] Edit [d] Delete [p] Pause [Space] Collapse [Tab/Click] Switch [q] Quit") footer := subtleStyle.Render("\n[n] New [e/Enter] Edit [d] Delete [p] Pause [Space] Collapse [Tab/Click] Switch [q] Quit")
if m.currentTab == 3 { if m.currentTab == 4 {
footer = subtleStyle.Render("\n[n] Add User [d] Revoke [Tab/Click] Switch [Ctrl+L] Clear [q] Quit") footer = subtleStyle.Render("\n[n] Add User [d] Revoke [Tab/Click] Switch [Ctrl+L] Clear [q] Quit")
} }
s := lipgloss.NewStyle().Padding(1, 2) s := lipgloss.NewStyle().Padding(1, 2)