diff --git a/cmd/goupkeep/main.go b/cmd/goupkeep/main.go index 6d72682..38dde72 100644 --- a/cmd/goupkeep/main.go +++ b/cmd/goupkeep/main.go @@ -166,6 +166,44 @@ func runServe(args []string) { clusterKey = v } + nodeID := os.Getenv("UPKEEP_NODE_ID") + nodeName := os.Getenv("UPKEEP_NODE_NAME") + nodeRegion := os.Getenv("UPKEEP_NODE_REGION") + aggStrategy := os.Getenv("UPKEEP_AGG_STRATEGY") + + if clusterMode == "probe" { + if nodeID == "" { + fmt.Fprintln(os.Stderr, "UPKEEP_NODE_ID is required for probe mode") + os.Exit(1) + } + if clusterPeer == "" { + fmt.Fprintln(os.Stderr, "UPKEEP_PEER_URL is required for probe mode") + os.Exit(1) + } + + fmt.Printf("Cluster: Running as PROBE (node=%s, region=%s)\n", nodeID, nodeRegion) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-done + cancel() + }() + + if err := cluster.RunProbe(ctx, cluster.ProbeConfig{ + NodeID: nodeID, + NodeName: nodeName, + Region: nodeRegion, + LeaderURL: clusterPeer, + SharedKey: clusterKey, + Interval: 30, + }); err != nil { + fmt.Fprintf(os.Stderr, "Probe error: %v\n", err) + } + return + } + fs := flag.NewFlagSet("serve", flag.ExitOnError) port := fs.Int("port", portVal, "SSH Port") flagDBType := fs.String("db-type", dbType, "Database type") @@ -214,6 +252,9 @@ func runServe(args []string) { if os.Getenv("UPKEEP_INSECURE_SKIP_VERIFY") == "true" { eng.SetInsecureSkipVerify(true) } + if aggStrategy != "" { + eng.SetAggStrategy(monitor.AggregationStrategy(aggStrategy)) + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/docker-compose.probe.yml b/docker-compose.probe.yml new file mode 100644 index 0000000..791811f --- /dev/null +++ b/docker-compose.probe.yml @@ -0,0 +1,35 @@ +services: + leader: + build: . 
+ environment: + - UPKEEP_CLUSTER_MODE=leader + - UPKEEP_CLUSTER_SECRET=changeme + - UPKEEP_AGG_STRATEGY=any-down + - UPKEEP_STATUS_ENABLED=true + ports: + - "8080:8080" + - "23234:23234" + + probe-us-east: + build: . + environment: + - UPKEEP_CLUSTER_MODE=probe + - UPKEEP_NODE_ID=us-east-1 + - UPKEEP_NODE_NAME=US East Probe + - UPKEEP_NODE_REGION=us-east + - UPKEEP_PEER_URL=http://leader:8080 + - UPKEEP_CLUSTER_SECRET=changeme + depends_on: + - leader + + probe-eu-west: + build: . + environment: + - UPKEEP_CLUSTER_MODE=probe + - UPKEEP_NODE_ID=eu-west-1 + - UPKEEP_NODE_NAME=EU West Probe + - UPKEEP_NODE_REGION=eu-west + - UPKEEP_PEER_URL=http://leader:8080 + - UPKEEP_CLUSTER_SECRET=changeme + depends_on: + - leader diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index f986c29..03ec751 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -33,6 +33,8 @@ func Start(ctx context.Context, cfg Config, eng *monitor.Engine) { eng.SetActive(false) go runFollowerLoop(ctx, cfg, eng) } + + // "probe" mode is handled directly in main.go before cluster.Start is called } func runFollowerLoop(ctx context.Context, cfg Config, eng *monitor.Engine) { diff --git a/internal/cluster/probe.go b/internal/cluster/probe.go new file mode 100644 index 0000000..6df0a36 --- /dev/null +++ b/internal/cluster/probe.go @@ -0,0 +1,187 @@ +package cluster + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "go-upkeep/internal/models" + "go-upkeep/internal/monitor" + "log" + "net/http" + "sync" + "time" +) + +type ProbeConfig struct { + NodeID string + NodeName string + Region string + LeaderURL string + SharedKey string + Interval int +} + +func RunProbe(ctx context.Context, cfg ProbeConfig) error { + if cfg.Interval < 10 { + cfg.Interval = 30 + } + + apiClient := &http.Client{Timeout: 10 * time.Second} + strictClient := &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}}, + } + 
insecureClient := &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + } + + if err := probeRegister(ctx, apiClient, cfg); err != nil { + log.Printf("Probe: initial registration failed: %v (will retry)", err) + } + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + sites, err := probeFetchAssignments(ctx, apiClient, cfg) + if err != nil { + log.Printf("Probe: failed to fetch assignments: %v", err) + sleepCtx(ctx, 10*time.Second) + continue + } + + if len(sites) == 0 { + sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second) + continue + } + + results := probeExecuteChecks(ctx, sites, strictClient, insecureClient) + + if len(results) > 0 { + if err := probeReportResults(ctx, apiClient, cfg, results); err != nil { + log.Printf("Probe: failed to report results: %v", err) + } + } + + sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second) + } +} + +func probeRegister(ctx context.Context, client *http.Client, cfg ProbeConfig) error { + body, _ := json.Marshal(map[string]string{ + "id": cfg.NodeID, "name": cfg.NodeName, "region": cfg.Region, "version": "probe", + }) + req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/register", bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Upkeep-Secret", cfg.SharedKey) + resp, err := client.Do(req) + if err != nil { + return err + } + resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("register returned %d", resp.StatusCode) + } + return nil +} + +func probeFetchAssignments(ctx context.Context, client *http.Client, cfg ProbeConfig) ([]models.Site, error) { + req, err := http.NewRequestWithContext(ctx, "GET", cfg.LeaderURL+"/api/probe/assignments?node_id="+cfg.NodeID, nil) + if err != nil { + return nil, err + } + req.Header.Set("X-Upkeep-Secret", cfg.SharedKey) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer 
resp.Body.Close() + if resp.StatusCode != 200 { + return nil, fmt.Errorf("assignments returned %d", resp.StatusCode) + } + var result struct { + Sites []models.Site `json:"sites"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, err + } + return result.Sites, nil +} + +type probeResultItem struct { + SiteID int `json:"site_id"` + LatencyNs int64 `json:"latency_ns"` + IsUp bool `json:"is_up"` +} + +func probeExecuteChecks(ctx context.Context, sites []models.Site, strict, insecure *http.Client) []probeResultItem { + var mu sync.Mutex + var results []probeResultItem + sem := make(chan struct{}, 10) + var wg sync.WaitGroup + + for _, site := range sites { + select { + case <-ctx.Done(): + break + default: + } + wg.Add(1) + sem <- struct{}{} + go func(s models.Site) { + defer wg.Done() + defer func() { <-sem }() + + cr := monitor.RunCheck(s, strict, insecure, false) + mu.Lock() + results = append(results, probeResultItem{ + SiteID: s.ID, + LatencyNs: cr.LatencyNs, + IsUp: cr.Status == "UP", + }) + mu.Unlock() + }(site) + } + wg.Wait() + return results +} + +func probeReportResults(ctx context.Context, client *http.Client, cfg ProbeConfig, results []probeResultItem) error { + body, err := json.Marshal(map[string]interface{}{ + "node_id": cfg.NodeID, + "results": results, + }) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/results", bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Upkeep-Secret", cfg.SharedKey) + resp, err := client.Do(req) + if err != nil { + return err + } + resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("results returned %d", resp.StatusCode) + } + fmt.Printf("Probe: reported %d check results\n", len(results)) + return nil +} + +func sleepCtx(ctx context.Context, d time.Duration) { + select { + case <-time.After(d): + case <-ctx.Done(): + } +} 
diff --git a/internal/config/apply.go b/internal/config/apply.go index 2c81abf..ca37e36 100644 --- a/internal/config/apply.go +++ b/internal/config/apply.go @@ -239,6 +239,7 @@ func monitorToSite(m Monitor, alertID, parentID int) models.Site { DNSServer: m.DNSServer, IgnoreTLS: m.IgnoreTLS, Paused: m.Paused, + Regions: m.Regions, } s.ExpiryThreshold = m.ExpiryThreshold @@ -346,6 +347,9 @@ func diffSite(existing, desired models.Site) string { if existing.Paused != desired.Paused { diffs = append(diffs, fmt.Sprintf("paused: %v -> %v", existing.Paused, desired.Paused)) } + if existing.Regions != desired.Regions { + diffs = append(diffs, fmt.Sprintf("regions: %s -> %s", existing.Regions, desired.Regions)) + } return strings.Join(diffs, ", ") } diff --git a/internal/config/export.go b/internal/config/export.go index a6d182d..a8cb981 100644 --- a/internal/config/export.go +++ b/internal/config/export.go @@ -126,6 +126,10 @@ func siteToMonitor(s models.Site, alertIDToName map[int]string) Monitor { m.IgnoreTLS = s.IgnoreTLS m.Paused = s.Paused + if s.Regions != "" { + m.Regions = s.Regions + } + return m } diff --git a/internal/config/types.go b/internal/config/types.go index ed0895f..8613d7f 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -30,5 +30,6 @@ type Monitor struct { DNSServer string `yaml:"dns_server,omitempty"` IgnoreTLS bool `yaml:"ignore_tls,omitempty"` Paused bool `yaml:"paused,omitempty"` + Regions string `yaml:"regions,omitempty"` Monitors []Monitor `yaml:"monitors,omitempty"` } diff --git a/internal/metrics/prometheus.go b/internal/metrics/prometheus.go index 24f4faa..a85493f 100644 --- a/internal/metrics/prometheus.go +++ b/internal/metrics/prometheus.go @@ -74,6 +74,19 @@ func Handler(eng *monitor.Engine) http.HandlerFunc { writeGauge(&b, "upkeep_monitor_checks_up_total", labels(s), float64(h.UpChecks)) } + writeHelp(&b, "upkeep_probe_up", "gauge", "Whether a probe node is online (1) or offline (0) based on last-seen time.") + for 
_, site := range sites { + probeResults := eng.GetProbeResults(site.ID) + for nodeID, result := range probeResults { + val := 0 + if result.IsUp { + val = 1 + } + nodeLabels := fmt.Sprintf(`id="%d",name="%s",node="%s"`, site.ID, escapeLabelValue(site.Name), escapeLabelValue(nodeID)) + writeGauge(&b, "upkeep_probe_up", nodeLabels, float64(val)) + } + } + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") w.Write([]byte(b.String())) } diff --git a/internal/metrics/prometheus_test.go b/internal/metrics/prometheus_test.go index 091a5df..cd86d26 100644 --- a/internal/metrics/prometheus_test.go +++ b/internal/metrics/prometheus_test.go @@ -44,6 +44,12 @@ func (m *mockStore) AddSiteReturningID(models.Site) (int, error) { return 0, nil func (m *mockStore) AddAlertReturningID(string, string, map[string]string) (int, error) { return 0, nil } +func (m *mockStore) SaveCheckFromNode(int, string, int64, bool) error { return nil } +func (m *mockStore) RegisterNode(models.ProbeNode) error { return nil } +func (m *mockStore) GetNode(string) (models.ProbeNode, error) { return models.ProbeNode{}, nil } +func (m *mockStore) GetAllNodes() ([]models.ProbeNode, error) { return nil, nil } +func (m *mockStore) UpdateNodeLastSeen(string) error { return nil } +func (m *mockStore) DeleteNode(string) error { return nil } func TestMetricsHandler(t *testing.T) { ms := &mockStore{ diff --git a/internal/models/models.go b/internal/models/models.go index cdf4c68..14a97c3 100644 --- a/internal/models/models.go +++ b/internal/models/models.go @@ -25,6 +25,7 @@ type Site struct { DNSServer string IgnoreTLS bool Paused bool + Regions string FailureCount int Status string @@ -52,11 +53,20 @@ type User struct { type CheckRecord struct { SiteID int + NodeID string LatencyNs int64 IsUp bool CheckedAt time.Time } +type ProbeNode struct { + ID string + Name string + Region string + LastSeen time.Time + Version string +} + type Backup struct { Sites []Site `json:"sites"` Alerts 
// AggregationStrategy names the rule used to fold the results of several
// nodes into a single up/down verdict.
type AggregationStrategy string

const (
	// AggAnyDown reports up only when every result is up.
	AggAnyDown AggregationStrategy = "any-down"
	// AggMajorityDown reports up only when more than half of the results are up.
	AggMajorityDown AggregationStrategy = "majority-down"
	// AggAllDown reports up when at least one result is up.
	AggAllDown AggregationStrategy = "all-down"
)

// NodeResult is the outcome of one check as seen by one node.
type NodeResult struct {
	NodeID    string
	IsUp      bool
	LatencyNs int64
	CheckedAt time.Time
}

// AggregateStatus folds results into one verdict according to strategy and
// returns it together with the integer mean of all reported latencies
// (results that are down are included in the mean).
//
// With no results it returns (true, 0). Any strategy other than
// AggMajorityDown or AggAllDown is treated as AggAnyDown.
//
// NOTE(review): under AggMajorityDown an even split (e.g. 1 up, 1 down)
// counts as down because "up" needs a strict majority — confirm this
// tie-break is intended.
func AggregateStatus(results []NodeResult, strategy AggregationStrategy) (isUp bool, avgLatencyNs int64) {
	total := len(results)
	if total == 0 {
		return true, 0
	}

	upCount := 0
	var latencySum int64
	for _, r := range results {
		if r.IsUp {
			upCount++
		}
		latencySum += r.LatencyNs
	}
	avgLatencyNs = latencySum / int64(total)

	switch strategy {
	case AggMajorityDown:
		return upCount > total/2, avgLatencyNs
	case AggAllDown:
		return upCount > 0, avgLatencyNs
	default:
		return upCount == total, avgLatencyNs
	}
}
Status: "DOWN"} + } +} + +func runHTTPCheck(site models.Site, strict, insecure *http.Client, globalInsecure bool) CheckResult { + method := site.Method + if method == "" { + method = "GET" + } + + timeout := siteTimeout(site) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, method, site.URL, nil) + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN"} + } + + client := strict + if globalInsecure || site.IgnoreTLS { + client = insecure + } + + start := time.Now() + resp, err := client.Do(req) + latency := time.Since(start) + + result := CheckResult{ + SiteID: site.ID, + Status: "UP", + LatencyNs: latency.Nanoseconds(), + } + + if err != nil { + result.Status = "DOWN" + return result + } + defer resp.Body.Close() + + result.StatusCode = resp.StatusCode + if !isCodeAccepted(resp.StatusCode, site.AcceptedCodes) { + result.Status = "DOWN" + } + + if site.CheckSSL && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 { + result.HasSSL = true + cert := resp.TLS.PeerCertificates[0] + result.CertExpiry = cert.NotAfter + if time.Now().After(cert.NotAfter) { + result.Status = "SSL EXP" + } + } + + return result +} + +func runPingCheck(site models.Site) CheckResult { + host := site.Hostname + if host == "" { + host = site.URL + } + + pinger, err := probing.NewPinger(host) + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN"} + } + pinger.Count = 1 + pinger.Timeout = siteTimeout(site) + pinger.SetPrivileged(false) + + start := time.Now() + err = pinger.Run() + latency := time.Since(start) + + if err != nil || pinger.Statistics().PacketsRecv == 0 { + return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()} + } + + stats := pinger.Statistics() + return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: stats.AvgRtt.Nanoseconds()} +} + +func runPortCheck(site models.Site) CheckResult { + host := site.Hostname + if host == "" { + 
// isCodeAccepted reports whether code matches the accepted specification.
//
// accepted is a comma-separated list whose entries are either a single status
// code ("404") or an inclusive range ("200-299"); whitespace around entries
// and around range bounds is ignored, and entries that do not parse are
// skipped. An empty specification accepts 200 through 299.
func isCodeAccepted(code int, accepted string) bool {
	if accepted == "" {
		return code >= 200 && code < 300
	}

	for _, entry := range strings.Split(accepted, ",") {
		entry = strings.TrimSpace(entry)

		dash := strings.Index(entry, "-")
		if dash < 0 {
			// Plain single code.
			if single, err := strconv.Atoi(entry); err == nil && single == code {
				return true
			}
			continue
		}

		// Range: split at the first dash only.
		low, lowErr := strconv.Atoi(strings.TrimSpace(entry[:dash]))
		high, highErr := strconv.Atoi(strings.TrimSpace(entry[dash+1:]))
		if lowErr != nil || highErr != nil {
			continue
		}
		if low <= code && code <= high {
			return true
		}
	}
	return false
}
RunCheck(site, e.strictClient, e.insecureClient, e.insecureSkipVerify) + updatedSite := site + updatedSite.HasSSL = result.HasSSL + updatedSite.CertExpiry = result.CertExpiry + updatedSite.Latency = time.Duration(result.LatencyNs) + updatedSite.LastCheck = time.Now() + e.handleStatusChange(updatedSite, result.Status, result.StatusCode, time.Duration(result.LatencyNs)) } } @@ -335,61 +336,6 @@ func (e *Engine) checkPush(site models.Site) { } } -func (e *Engine) checkHTTP(site models.Site) { - method := site.Method - if method == "" { - method = "GET" - } - - timeout := siteTimeout(site) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, method, site.URL, nil) - if err != nil { - e.handleStatusChange(site, "DOWN", 0, 0) - return - } - - client := e.strictClient - if e.insecureSkipVerify || site.IgnoreTLS { - client = e.insecureClient - } - - start := time.Now() - resp, err := client.Do(req) - latency := time.Since(start) - - rawStatus := "UP" - rawCode := 0 - var certExpiry time.Time - hasSSL := false - - if err != nil { - rawStatus = "DOWN" - } else { - defer resp.Body.Close() - rawCode = resp.StatusCode - if !isCodeAccepted(rawCode, site.AcceptedCodes) { - rawStatus = "DOWN" - } - if site.CheckSSL && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 { - hasSSL = true - cert := resp.TLS.PeerCertificates[0] - certExpiry = cert.NotAfter - if time.Now().After(cert.NotAfter) { - rawStatus = "SSL EXP" - } - } - } - updatedSite := site - updatedSite.HasSSL = hasSSL - updatedSite.CertExpiry = certExpiry - updatedSite.Latency = latency - updatedSite.LastCheck = time.Now() - e.handleStatusChange(updatedSite, rawStatus, rawCode, latency) -} - func (e *Engine) handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) { if !e.IsActive() { return @@ -462,94 +408,6 @@ func (e *Engine) triggerAlert(alertID int, title, message string) { } } -func siteTimeout(site 
models.Site) time.Duration { - if site.Timeout > 0 { - return time.Duration(site.Timeout) * time.Second - } - return 5 * time.Second -} - -func isCodeAccepted(code int, accepted string) bool { - if accepted == "" { - return code >= 200 && code < 300 - } - for _, part := range strings.Split(accepted, ",") { - part = strings.TrimSpace(part) - if strings.Contains(part, "-") { - bounds := strings.SplitN(part, "-", 2) - lo, err1 := strconv.Atoi(strings.TrimSpace(bounds[0])) - hi, err2 := strconv.Atoi(strings.TrimSpace(bounds[1])) - if err1 == nil && err2 == nil && code >= lo && code <= hi { - return true - } - } else { - if v, err := strconv.Atoi(part); err == nil && code == v { - return true - } - } - } - return false -} - -func (e *Engine) checkPing(site models.Site) { - host := site.Hostname - if host == "" { - host = site.URL - } - - pinger, err := probing.NewPinger(host) - if err != nil { - e.handleStatusChange(site, "DOWN", 0, 0) - e.AddLog(fmt.Sprintf("Ping '%s' resolve failed: %v", site.Name, err)) - return - } - pinger.Count = 1 - pinger.Timeout = siteTimeout(site) - pinger.SetPrivileged(false) - - start := time.Now() - err = pinger.Run() - latency := time.Since(start) - - if err != nil || pinger.Statistics().PacketsRecv == 0 { - updatedSite := site - updatedSite.Latency = latency - updatedSite.LastCheck = time.Now() - e.handleStatusChange(updatedSite, "DOWN", 0, latency) - return - } - - stats := pinger.Statistics() - updatedSite := site - updatedSite.Latency = stats.AvgRtt - updatedSite.LastCheck = time.Now() - e.handleStatusChange(updatedSite, "UP", 0, stats.AvgRtt) -} - -func (e *Engine) checkPort(site models.Site) { - host := site.Hostname - if host == "" { - host = site.URL - } - addr := net.JoinHostPort(host, strconv.Itoa(site.Port)) - timeout := siteTimeout(site) - - start := time.Now() - conn, err := net.DialTimeout("tcp", addr, timeout) - latency := time.Since(start) - - updatedSite := site - updatedSite.Latency = latency - updatedSite.LastCheck = 
time.Now() - - if err != nil { - e.handleStatusChange(updatedSite, "DOWN", 0, latency) - return - } - conn.Close() - e.handleStatusChange(updatedSite, "UP", 0, latency) -} - func (e *Engine) checkGroup(site models.Site) { e.mu.RLock() status := "UP" @@ -588,63 +446,54 @@ func (e *Engine) checkGroup(site models.Site) { e.mu.Unlock() } -func (e *Engine) checkDNS(site models.Site) { - host := site.Hostname - if host == "" { - host = site.URL +func (e *Engine) SetAggStrategy(strategy AggregationStrategy) { + e.aggStrategy = strategy +} + +func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool) { + e.probeResultsMu.Lock() + if e.probeResults[siteID] == nil { + e.probeResults[siteID] = make(map[string]NodeResult) + } + e.probeResults[siteID][nodeID] = NodeResult{ + NodeID: nodeID, + IsUp: isUp, + LatencyNs: latencyNs, + CheckedAt: time.Now(), + } + results := make([]NodeResult, 0, len(e.probeResults[siteID])) + for _, r := range e.probeResults[siteID] { + results = append(results, r) + } + e.probeResultsMu.Unlock() + + aggUp, avgLatency := AggregateStatus(results, e.aggStrategy) + + e.mu.RLock() + site, exists := e.liveState[siteID] + e.mu.RUnlock() + if !exists { + return } - server := site.DNSServer - if server == "" { - server = "1.1.1.1" + rawStatus := "UP" + if !aggUp { + rawStatus = "DOWN" } - if _, _, err := net.SplitHostPort(server); err != nil { - server = net.JoinHostPort(server, "53") - } - - qtype := dns.TypeA - switch site.DNSResolveType { - case "AAAA": - qtype = dns.TypeAAAA - case "MX": - qtype = dns.TypeMX - case "CNAME": - qtype = dns.TypeCNAME - case "TXT": - qtype = dns.TypeTXT - case "NS": - qtype = dns.TypeNS - case "SOA": - qtype = dns.TypeSOA - case "SRV": - qtype = dns.TypeSRV - case "PTR": - qtype = dns.TypePTR - } - - m := new(dns.Msg) - m.SetQuestion(dns.Fqdn(host), qtype) - - c := new(dns.Client) - c.Timeout = siteTimeout(site) - - start := time.Now() - r, _, err := c.Exchange(m, server) - latency := 
time.Since(start) updatedSite := site - updatedSite.Latency = latency + updatedSite.Latency = time.Duration(avgLatency) updatedSite.LastCheck = time.Now() - - if err != nil { - e.handleStatusChange(updatedSite, "DOWN", 0, latency) - return - } - - if r.Rcode != dns.RcodeSuccess { - e.handleStatusChange(updatedSite, "DOWN", r.Rcode, latency) - return - } - - e.handleStatusChange(updatedSite, "UP", 0, latency) + e.handleStatusChange(updatedSite, rawStatus, 0, time.Duration(avgLatency)) +} + +func (e *Engine) GetProbeResults(siteID int) map[string]NodeResult { + e.probeResultsMu.RLock() + defer e.probeResultsMu.RUnlock() + src := e.probeResults[siteID] + cp := make(map[string]NodeResult, len(src)) + for k, v := range src { + cp[k] = v + } + return cp } diff --git a/internal/server/server.go b/internal/server/server.go index fdf7f9b..6f70df1 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -12,6 +12,7 @@ import ( "log" "net/http" "sort" + "strings" ) var statusTpl = template.Must(template.New("status").Parse(` @@ -243,10 +244,117 @@ func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) { w.Write([]byte(fmt.Sprintf("Imported %d monitors, %d alerts from Kuma v%s", len(backup.Sites), len(backup.Alerts), kb.Version))) }) - // 6. Prometheus Metrics + // 6. 
Probe Registration + mux.HandleFunc("/api/probe/register", func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "POST required", 405) + return + } + if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey { + http.Error(w, "Unauthorized", 401) + return + } + var req struct { + ID string `json:"id"` + Name string `json:"name"` + Region string `json:"region"` + Version string `json:"version"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid JSON", 400) + return + } + if req.ID == "" { + http.Error(w, "id is required", 400) + return + } + if err := s.RegisterNode(models.ProbeNode{ + ID: req.ID, Name: req.Name, Region: req.Region, Version: req.Version, + }); err != nil { + log.Printf("Probe register failed: %v", err) + http.Error(w, "Registration failed", 500) + return + } + json.NewEncoder(w).Encode(map[string]bool{"ok": true}) + }) + + // 7. Probe Assignment Fetch + mux.HandleFunc("/api/probe/assignments", func(w http.ResponseWriter, r *http.Request) { + if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey { + http.Error(w, "Unauthorized", 401) + return + } + nodeID := r.URL.Query().Get("node_id") + var nodeRegion string + if nodeID != "" { + if node, err := s.GetNode(nodeID); err == nil { + nodeRegion = node.Region + } + } + sites := eng.GetAllSites() + var assigned []models.Site + for _, site := range sites { + if site.Paused || site.Type == "push" || site.Type == "group" { + continue + } + if site.Regions != "" && nodeRegion != "" { + matched := false + for _, r := range strings.Split(site.Regions, ",") { + if strings.TrimSpace(r) == nodeRegion { + matched = true + break + } + } + if !matched { + continue + } + } + assigned = append(assigned, site) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string][]models.Site{"sites": assigned}) + }) + + // 8. 
Probe Result Submission + mux.HandleFunc("/api/probe/results", func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "POST required", 405) + return + } + if cfg.ClusterKey == "" || r.Header.Get("X-Upkeep-Secret") != cfg.ClusterKey { + http.Error(w, "Unauthorized", 401) + return + } + var req struct { + NodeID string `json:"node_id"` + Results []struct { + SiteID int `json:"site_id"` + LatencyNs int64 `json:"latency_ns"` + IsUp bool `json:"is_up"` + } `json:"results"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid JSON", 400) + return + } + if req.NodeID == "" { + http.Error(w, "node_id is required", 400) + return + } + for _, result := range req.Results { + if err := s.SaveCheckFromNode(result.SiteID, req.NodeID, result.LatencyNs, result.IsUp); err != nil { + log.Printf("Failed to save probe result: %v", err) + } + eng.IngestProbeResult(req.NodeID, result.SiteID, result.LatencyNs, result.IsUp) + } + s.UpdateNodeLastSeen(req.NodeID) + json.NewEncoder(w).Encode(map[string]bool{"ok": true}) + }) + + // 9. Prometheus Metrics mux.HandleFunc("/metrics", metrics.Handler(eng)) - // 7. Status Page + // 10. Status Page if cfg.EnableStatus { mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { renderStatusPage(w, cfg.Title, eng) }) mux.HandleFunc("/status/json", func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/store/dialect.go b/internal/store/dialect.go index 4e1ba04..f6e35b2 100644 --- a/internal/store/dialect.go +++ b/internal/store/dialect.go @@ -10,6 +10,7 @@ type Dialect interface { ResetSequenceOnEmpty(db *sql.DB, table string) ImportWipe(tx *sql.Tx) ImportResetSequences(tx *sql.Tx) + UpsertNodeSQL() string } // rewritePlaceholders converts ? markers to $1, $2, etc. for Postgres. 
diff --git a/internal/store/postgres.go b/internal/store/postgres.go index 78fcc8d..d6e6dbd 100644 --- a/internal/store/postgres.go +++ b/internal/store/postgres.go @@ -44,6 +44,13 @@ func (d *PostgresDialect) CreateTablesSQL() []string { is_up BOOLEAN, checked_at TIMESTAMP DEFAULT NOW() )`, `CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`, + `CREATE TABLE IF NOT EXISTS nodes ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + region TEXT DEFAULT '', + last_seen TIMESTAMP DEFAULT NOW(), + version TEXT DEFAULT '' + )`, } } @@ -60,9 +67,15 @@ func (d *PostgresDialect) MigrationsSQL() []string { "ALTER TABLE sites ADD COLUMN IF NOT EXISTS dns_server TEXT DEFAULT ''", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS ignore_tls BOOLEAN DEFAULT FALSE", "ALTER TABLE sites ADD COLUMN IF NOT EXISTS paused BOOLEAN DEFAULT FALSE", + "ALTER TABLE check_history ADD COLUMN IF NOT EXISTS node_id TEXT DEFAULT ''", + "ALTER TABLE sites ADD COLUMN IF NOT EXISTS regions TEXT DEFAULT ''", } } +func (d *PostgresDialect) UpsertNodeSQL() string { + return "INSERT INTO nodes (id, name, region, last_seen, version) VALUES ($1, $2, $3, NOW(), $4) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, region = EXCLUDED.region, last_seen = NOW(), version = EXCLUDED.version" +} + func (d *PostgresDialect) ResetSequenceOnEmpty(db *sql.DB, table string) {} func (d *PostgresDialect) ImportWipe(tx *sql.Tx) { diff --git a/internal/store/sqlite.go b/internal/store/sqlite.go index dbeb74d..be7ba1d 100644 --- a/internal/store/sqlite.go +++ b/internal/store/sqlite.go @@ -44,6 +44,13 @@ func (d *SQLiteDialect) CreateTablesSQL() []string { is_up BOOLEAN, checked_at DATETIME DEFAULT CURRENT_TIMESTAMP )`, `CREATE INDEX IF NOT EXISTS idx_check_history_site ON check_history(site_id, checked_at DESC)`, + `CREATE TABLE IF NOT EXISTS nodes ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + region TEXT DEFAULT '', + last_seen DATETIME DEFAULT CURRENT_TIMESTAMP, + version TEXT 
DEFAULT '' + )`, } } @@ -60,9 +67,15 @@ func (d *SQLiteDialect) MigrationsSQL() []string { "ALTER TABLE sites ADD COLUMN dns_server TEXT DEFAULT ''", "ALTER TABLE sites ADD COLUMN ignore_tls BOOLEAN DEFAULT 0", "ALTER TABLE sites ADD COLUMN paused BOOLEAN DEFAULT 0", + "ALTER TABLE check_history ADD COLUMN node_id TEXT DEFAULT ''", + "ALTER TABLE sites ADD COLUMN regions TEXT DEFAULT ''", } } +func (d *SQLiteDialect) UpsertNodeSQL() string { + return "INSERT OR REPLACE INTO nodes (id, name, region, last_seen, version) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?)" +} + func (d *SQLiteDialect) ResetSequenceOnEmpty(db *sql.DB, table string) { var count int db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&count) diff --git a/internal/store/sqlstore.go b/internal/store/sqlstore.go index 8adc020..e8b554a 100644 --- a/internal/store/sqlstore.go +++ b/internal/store/sqlstore.go @@ -51,7 +51,7 @@ func (s *SQLStore) Init() error { func (s *SQLStore) GetSites() ([]models.Site, error) { bf := s.dialect.BoolFalse() query := fmt.Sprintf( - "SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s) FROM sites", + "SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s), COALESCE(regions, '') FROM sites", bf, bf, ) rows, err := s.db.Query(query) @@ -65,7 +65,7 @@ func (s 
*SQLStore) GetSites() ([]models.Site, error) { if err := rows.Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID, &st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout, &st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType, - &st.DNSServer, &st.IgnoreTLS, &st.Paused); err != nil { + &st.DNSServer, &st.IgnoreTLS, &st.Paused, &st.Regions); err != nil { return sites, err } sites = append(sites, st) @@ -78,9 +78,9 @@ func (s *SQLStore) AddSite(site models.Site) error { if site.Type == "push" { token = generateToken() } - _, err := s.db.Exec(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"), + _, err := s.db.Exec(s.q("INSERT INTO sites (name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"), site.Name, site.URL, site.Type, token, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries, - site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused) + site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions) return err } @@ -90,9 +90,9 @@ func (s *SQLStore) UpdateSite(site models.Site) error { if site.Type == "push" && existingToken == "" { existingToken = generateToken() } - _, err := s.db.Exec(s.q("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, 
alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=? WHERE id=?"), + _, err := s.db.Exec(s.q("UPDATE sites SET name=?, url=?, type=?, token=?, interval=?, alert_id=?, check_ssl=?, threshold=?, max_retries=?, hostname=?, port=?, timeout=?, method=?, description=?, parent_id=?, accepted_codes=?, dns_resolve_type=?, dns_server=?, ignore_tls=?, paused=?, regions=? WHERE id=?"), site.Name, site.URL, site.Type, existingToken, site.Interval, site.AlertID, site.CheckSSL, site.ExpiryThreshold, site.MaxRetries, - site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.ID) + site.Hostname, site.Port, site.Timeout, site.Method, site.Description, site.ParentID, site.AcceptedCodes, site.DNSResolveType, site.DNSServer, site.IgnoreTLS, site.Paused, site.Regions, site.ID) return err } @@ -113,14 +113,14 @@ func (s *SQLStore) DeleteSite(id int) error { func (s *SQLStore) GetSiteByName(name string) (models.Site, error) { bf := s.dialect.BoolFalse() query := fmt.Sprintf( - "SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, '200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s) FROM sites WHERE name = %s", + "SELECT id, COALESCE(name, url), url, COALESCE(type, 'http'), COALESCE(token, ''), interval, alert_id, check_ssl, threshold, max_retries, COALESCE(hostname, ''), COALESCE(port, 0), COALESCE(timeout, 0), COALESCE(method, 'GET'), COALESCE(description, ''), COALESCE(parent_id, 0), COALESCE(accepted_codes, 
'200-299'), COALESCE(dns_resolve_type, ''), COALESCE(dns_server, ''), COALESCE(ignore_tls, %s), COALESCE(paused, %s), COALESCE(regions, '') FROM sites WHERE name = %s", bf, bf, s.q("?"), ) var st models.Site err := s.db.QueryRow(query, name).Scan(&st.ID, &st.Name, &st.URL, &st.Type, &st.Token, &st.Interval, &st.AlertID, &st.CheckSSL, &st.ExpiryThreshold, &st.MaxRetries, &st.Hostname, &st.Port, &st.Timeout, &st.Method, &st.Description, &st.ParentID, &st.AcceptedCodes, &st.DNSResolveType, - &st.DNSServer, &st.IgnoreTLS, &st.Paused) + &st.DNSServer, &st.IgnoreTLS, &st.Paused, &st.Regions) return st, err } @@ -247,7 +247,11 @@ func (s *SQLStore) DeleteUser(id int) error { } func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error { - _, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, latency_ns, is_up) VALUES (?, ?, ?)"), siteID, latencyNs, isUp) + return s.SaveCheckFromNode(siteID, "", latencyNs, isUp) +} + +func (s *SQLStore) SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error { + _, err := s.db.Exec(s.q("INSERT INTO check_history (site_id, node_id, latency_ns, is_up) VALUES (?, ?, ?, ?)"), siteID, nodeID, latencyNs, isUp) if err != nil { return err } @@ -257,6 +261,45 @@ func (s *SQLStore) SaveCheck(siteID int, latencyNs int64, isUp bool) error { return err } +func (s *SQLStore) RegisterNode(node models.ProbeNode) error { + _, err := s.db.Exec(s.dialect.UpsertNodeSQL(), node.ID, node.Name, node.Region, node.Version) + return err +} + +func (s *SQLStore) GetNode(id string) (models.ProbeNode, error) { + var n models.ProbeNode + err := s.db.QueryRow(s.q("SELECT id, name, region, last_seen, version FROM nodes WHERE id = ?"), id). 
+ Scan(&n.ID, &n.Name, &n.Region, &n.LastSeen, &n.Version) + return n, err +} + +func (s *SQLStore) GetAllNodes() ([]models.ProbeNode, error) { + rows, err := s.db.Query("SELECT id, name, region, last_seen, version FROM nodes ORDER BY region, name") + if err != nil { + return nil, err + } + defer rows.Close() + var nodes []models.ProbeNode + for rows.Next() { + var n models.ProbeNode + if err := rows.Scan(&n.ID, &n.Name, &n.Region, &n.LastSeen, &n.Version); err != nil { + return nodes, err + } + nodes = append(nodes, n) + } + return nodes, rows.Err() +} + +func (s *SQLStore) UpdateNodeLastSeen(id string) error { + _, err := s.db.Exec(s.q("UPDATE nodes SET last_seen = CURRENT_TIMESTAMP WHERE id = ?"), id) + return err +} + +func (s *SQLStore) DeleteNode(id string) error { + _, err := s.db.Exec(s.q("DELETE FROM nodes WHERE id = ?"), id) + return err +} + func (s *SQLStore) LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) { result := make(map[int][]models.CheckRecord) rows, err := s.db.Query(s.q(` @@ -325,9 +368,9 @@ func (s *SQLStore) ImportData(data models.Backup) error { } } for _, st := range data.Sites { - if _, err := tx.Exec(s.q("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"), + if _, err := tx.Exec(s.q("INSERT INTO sites (id, name, url, type, token, interval, alert_id, check_ssl, threshold, max_retries, hostname, port, timeout, method, description, parent_id, accepted_codes, dns_resolve_type, dns_server, ignore_tls, paused, regions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"), st.ID, st.Name, st.URL, st.Type, st.Token, st.Interval, st.AlertID, st.CheckSSL, st.ExpiryThreshold, st.MaxRetries, - st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, 
st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused); err != nil { + st.Hostname, st.Port, st.Timeout, st.Method, st.Description, st.ParentID, st.AcceptedCodes, st.DNSResolveType, st.DNSServer, st.IgnoreTLS, st.Paused, st.Regions); err != nil { return err } } diff --git a/internal/store/store.go b/internal/store/store.go index 1ed3c99..1340326 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -35,8 +35,16 @@ type Store interface { // History SaveCheck(siteID int, latencyNs int64, isUp bool) error + SaveCheckFromNode(siteID int, nodeID string, latencyNs int64, isUp bool) error LoadAllHistory(limit int) (map[int][]models.CheckRecord, error) + // Nodes + RegisterNode(node models.ProbeNode) error + GetNode(id string) (models.ProbeNode, error) + GetAllNodes() ([]models.ProbeNode, error) + UpdateNodeLastSeen(id string) error + DeleteNode(id string) error + // Backup & Restore ExportData() (models.Backup, error) ImportData(data models.Backup) error diff --git a/internal/tui/tab_nodes.go b/internal/tui/tab_nodes.go new file mode 100644 index 0000000..60f9522 --- /dev/null +++ b/internal/tui/tab_nodes.go @@ -0,0 +1,96 @@ +package tui + +import ( + "fmt" + "go-upkeep/internal/models" + "strings" + "time" +) + +func (m Model) viewNodesTab() string { + if len(m.nodes) == 0 { + return "\n No probe nodes connected." 
+ } + + colWidths := []int{0, 12, 20, 10, 8} + + return m.renderTable( + []string{"NAME", "REGION", "LAST SEEN", "VERSION", "STATUS"}, + len(m.nodes), + func(start, end int) [][]string { + var rows [][]string + for i := start; i < end; i++ { + node := m.nodes[i] + name := limitStr(node.Name, 20) + if name == "" { + name = node.ID + } + region := node.Region + if region == "" { + region = subtleStyle.Render("—") + } + lastSeen := fmtNodeLastSeen(node.LastSeen) + version := node.Version + if version == "" { + version = subtleStyle.Render("—") + } + status := fmtNodeStatus(node.LastSeen) + rows = append(rows, []string{name, region, lastSeen, version, status}) + } + return rows + }, + colWidths, + nil, + ) +} + +func fmtNodeStatus(lastSeen time.Time) string { + if lastSeen.IsZero() { + return subtleStyle.Render("UNKNOWN") + } + ago := time.Since(lastSeen) + if ago < 60*time.Second { + return specialStyle.Render("ONLINE") + } + if ago < 5*time.Minute { + return warnStyle.Render("STALE") + } + return dangerStyle.Render("OFFLINE") +} + +func fmtNodeLastSeen(t time.Time) string { + if t.IsZero() { + return subtleStyle.Render("never") + } + ago := time.Since(t) + if ago < time.Minute { + return fmt.Sprintf("%ds ago", int(ago.Seconds())) + } + if ago < time.Hour { + return fmt.Sprintf("%dm ago", int(ago.Minutes())) + } + return fmt.Sprintf("%dh ago", int(ago.Hours())) +} + +func fmtProbeRegions(site models.Site, probeResults map[string]probeStatus) string { + if len(probeResults) == 0 { + return subtleStyle.Render("—") + } + var parts []string + for region, status := range probeResults { + short := region + if len(short) > 6 { + short = short[:6] + } + if status.isUp { + parts = append(parts, specialStyle.Render(short+":UP")) + } else { + parts = append(parts, dangerStyle.Render(short+":DN")) + } + } + return strings.Join(parts, " ") +} + +type probeStatus struct { + isUp bool +} diff --git a/internal/tui/tab_sites.go b/internal/tui/tab_sites.go index ad96da5..0672cbc 100644 
--- a/internal/tui/tab_sites.go +++ b/internal/tui/tab_sites.go @@ -37,6 +37,7 @@ type siteFormData struct { Description string IgnoreTLS bool GroupID string + Regions string } func latencySparkline(latencies []time.Duration, width int) string { @@ -309,6 +310,7 @@ func (m *Model) initSiteHuhForm() tea.Cmd { m.siteFormData.GroupID = strconv.Itoa(site.ParentID) m.siteFormData.Method = site.Method m.siteFormData.AcceptedCodes = site.AcceptedCodes + m.siteFormData.Regions = site.Regions break } } @@ -435,6 +437,10 @@ func (m *Model) initSiteHuhForm() tea.Cmd { huh.NewInput().Title("Description"). Placeholder("Optional description"). Value(&m.siteFormData.Description), + huh.NewInput().Title("Probe Regions"). + Placeholder("us-east, eu-west (empty = all)"). + Description("Comma-separated regions for distributed probing"). + Value(&m.siteFormData.Regions), ).Title("Connection").WithHideFunc(func() bool { return m.siteFormData.SiteType == "group" }), @@ -529,6 +535,7 @@ func (m *Model) submitSiteForm() { ParentID: groupID, Method: d.Method, AcceptedCodes: d.AcceptedCodes, + Regions: d.Regions, } if m.editID > 0 { diff --git a/internal/tui/tui.go b/internal/tui/tui.go index 89846a5..f188254 100644 --- a/internal/tui/tui.go +++ b/internal/tui/tui.go @@ -80,6 +80,7 @@ type Model struct { sites []models.Site alerts []models.AlertConfig users []models.User + nodes []models.ProbeNode } func InitialModel(isAdmin bool, s store.Store, eng *monitor.Engine) Model { @@ -131,12 +132,12 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } m.refreshData() m.state = stateDashboard - if m.deleteTab == 3 { + if m.deleteTab == 4 { m.state = stateUsers } case "n", "N", "esc": m.state = stateDashboard - if m.deleteTab == 3 { + if m.deleteTab == 4 { m.state = stateUsers } case "ctrl+c": @@ -155,7 +156,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { if keyMsg.String() == "esc" { m.huhForm = nil m.state = stateDashboard - if m.currentTab == 3 { + if m.currentTab == 4 { 
m.state = stateUsers } return m, nil @@ -214,6 +215,8 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { if m.currentTab == 1 { listLen = len(m.alerts) } else if m.currentTab == 3 { + listLen = len(m.nodes) + } else if m.currentTab == 4 { listLen = len(m.users) } if msg.Button == tea.MouseButtonWheelUp { @@ -273,6 +276,9 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { max = len(m.alerts) - 1 } if m.currentTab == 3 { + max = len(m.nodes) - 1 + } + if m.currentTab == 4 { max = len(m.users) - 1 } if m.cursor < max { @@ -291,7 +297,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } else if m.currentTab == 1 { m.state = stateFormAlert return m, m.initAlertHuhForm() - } else if m.currentTab == 3 && m.isAdmin { + } else if m.currentTab == 4 && m.isAdmin { m.state = stateFormUser return m, m.initUserHuhForm() } @@ -305,7 +311,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.editID = m.alerts[m.cursor].ID m.state = stateFormAlert return m, m.initAlertHuhForm() - } else if m.currentTab == 3 && m.isAdmin && len(m.users) > 0 { + } else if m.currentTab == 4 && m.isAdmin && len(m.users) > 0 { m.editID = m.users[m.cursor].ID m.state = stateFormUser return m, m.initUserHuhForm() @@ -335,10 +341,10 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.deleteName = m.alerts[m.cursor].Name m.deleteTab = 1 m.state = stateConfirmDelete - } else if m.currentTab == 3 && m.isAdmin && len(m.users) > 0 { + } else if m.currentTab == 4 && m.isAdmin && len(m.users) > 0 { m.deleteID = m.users[m.cursor].ID m.deleteName = m.users[m.cursor].Username - m.deleteTab = 3 + m.deleteTab = 4 m.state = stateConfirmDelete } } @@ -348,9 +354,9 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) { - tabCount := 3 + tabCount := 4 if m.isAdmin { - tabCount = 4 + tabCount = 5 } for i := 0; i < tabCount; i++ { if m.zones.Get(fmt.Sprintf("tab-%d", i)).InBounds(msg) { @@ 
-385,7 +391,7 @@ func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) { } } - if m.currentTab == 3 { + if m.currentTab == 4 { end := m.tableOffset + m.maxTableRows if end > len(m.users) { end = len(m.users) @@ -402,9 +408,9 @@ func (m *Model) handleClick(msg tea.MouseMsg) (tea.Model, tea.Cmd) { } func (m *Model) switchTab(idx int) { - maxTabs := 2 + maxTabs := 3 if m.isAdmin { - maxTabs = 3 + maxTabs = 4 } if idx > maxTabs { idx = 0 @@ -415,7 +421,7 @@ func (m *Model) switchTab(idx int) { switch idx { case 2: m.state = stateLogs - case 3: + case 4: m.state = stateUsers default: m.state = stateDashboard @@ -473,12 +479,17 @@ func (m *Model) refreshData() { m.users = users } } + if nodes, err := m.store.GetAllNodes(); err == nil { + m.nodes = nodes + } m.logViewport.SetContent(strings.Join(m.engine.GetLogs(), "\n")) listLen := len(m.sites) if m.currentTab == 1 { listLen = len(m.alerts) } else if m.currentTab == 3 { + listLen = len(m.nodes) + } else if m.currentTab == 4 { listLen = len(m.users) } if listLen > 0 && m.cursor >= listLen { @@ -522,7 +533,7 @@ func (m Model) View() string { kind := "monitor" if m.deleteTab == 1 { kind = "alert" - } else if m.deleteTab == 3 { + } else if m.deleteTab == 4 { kind = "user" } msg := dangerStyle.Render(fmt.Sprintf("Delete %s \"%s\"?", kind, m.deleteName)) @@ -559,7 +570,7 @@ func (m Model) View() string { } func (m Model) viewDashboard() string { - tabs := []string{"Sites", "Alerts", "Logs"} + tabs := []string{"Sites", "Alerts", "Logs", "Nodes"} if m.isAdmin { tabs = append(tabs, "Users") } @@ -587,13 +598,15 @@ func (m Model) viewDashboard() string { case 2: content = m.viewLogsTab() case 3: + content = m.viewNodesTab() + case 4: if m.isAdmin { content = m.viewUsersTab() } } footer := subtleStyle.Render("\n[n] New [e/Enter] Edit [d] Delete [p] Pause [Space] Collapse [Tab/Click] Switch [q] Quit") - if m.currentTab == 3 { + if m.currentTab == 4 { footer = subtleStyle.Render("\n[n] Add User [d] Revoke [Tab/Click] 
Switch [Ctrl+L] Clear [q] Quit") } s := lipgloss.NewStyle().Padding(1, 2)