diff --git a/cmd/goupkeep/main.go b/cmd/goupkeep/main.go index 6d72682..38dde72 100644 --- a/cmd/goupkeep/main.go +++ b/cmd/goupkeep/main.go @@ -166,6 +166,44 @@ func runServe(args []string) { clusterKey = v } + nodeID := os.Getenv("UPKEEP_NODE_ID") + nodeName := os.Getenv("UPKEEP_NODE_NAME") + nodeRegion := os.Getenv("UPKEEP_NODE_REGION") + aggStrategy := os.Getenv("UPKEEP_AGG_STRATEGY") + + if clusterMode == "probe" { + if nodeID == "" { + fmt.Fprintln(os.Stderr, "UPKEEP_NODE_ID is required for probe mode") + os.Exit(1) + } + if clusterPeer == "" { + fmt.Fprintln(os.Stderr, "UPKEEP_PEER_URL is required for probe mode") + os.Exit(1) + } + + fmt.Printf("Cluster: Running as PROBE (node=%s, region=%s)\n", nodeID, nodeRegion) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-done + cancel() + }() + + if err := cluster.RunProbe(ctx, cluster.ProbeConfig{ + NodeID: nodeID, + NodeName: nodeName, + Region: nodeRegion, + LeaderURL: clusterPeer, + SharedKey: clusterKey, + Interval: 30, + }); err != nil { + fmt.Fprintf(os.Stderr, "Probe error: %v\n", err) + } + return + } + fs := flag.NewFlagSet("serve", flag.ExitOnError) port := fs.Int("port", portVal, "SSH Port") flagDBType := fs.String("db-type", dbType, "Database type") @@ -214,6 +252,9 @@ func runServe(args []string) { if os.Getenv("UPKEEP_INSECURE_SKIP_VERIFY") == "true" { eng.SetInsecureSkipVerify(true) } + if aggStrategy != "" { + eng.SetAggStrategy(monitor.AggregationStrategy(aggStrategy)) + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/docker-compose.probe.yml b/docker-compose.probe.yml new file mode 100644 index 0000000..791811f --- /dev/null +++ b/docker-compose.probe.yml @@ -0,0 +1,35 @@ +services: + leader: + build: . + environment: + - UPKEEP_CLUSTER_MODE=leader + - UPKEEP_CLUSTER_SECRET=changeme + - UPKEEP_AGG_STRATEGY=any-down + - UPKEEP_STATUS_ENABLED=true + ports: + - "8080:8080" + - "23234:23234" + + probe-us-east: + build: . + environment: + - UPKEEP_CLUSTER_MODE=probe + - UPKEEP_NODE_ID=us-east-1 + - UPKEEP_NODE_NAME=US East Probe + - UPKEEP_NODE_REGION=us-east + - UPKEEP_PEER_URL=http://leader:8080 + - UPKEEP_CLUSTER_SECRET=changeme + depends_on: + - leader + + probe-eu-west: + build: . + environment: + - UPKEEP_CLUSTER_MODE=probe + - UPKEEP_NODE_ID=eu-west-1 + - UPKEEP_NODE_NAME=EU West Probe + - UPKEEP_NODE_REGION=eu-west + - UPKEEP_PEER_URL=http://leader:8080 + - UPKEEP_CLUSTER_SECRET=changeme + depends_on: + - leader diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index f986c29..03ec751 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -33,6 +33,8 @@ func Start(ctx context.Context, cfg Config, eng *monitor.Engine) { eng.SetActive(false) go runFollowerLoop(ctx, cfg, eng) } + + // "probe" mode is handled directly in main.go before cluster.Start is called } func runFollowerLoop(ctx context.Context, cfg Config, eng *monitor.Engine) { diff --git a/internal/cluster/probe.go b/internal/cluster/probe.go new file mode 100644 index 0000000..6df0a36 --- /dev/null +++ b/internal/cluster/probe.go @@ -0,0 +1,187 @@ +package cluster + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "go-upkeep/internal/models" + "go-upkeep/internal/monitor" + "log" + "net/http" + "sync" + "time" +) + +type ProbeConfig struct { + NodeID string + NodeName string + Region string + LeaderURL string + SharedKey string + Interval int +} + +func RunProbe(ctx context.Context, cfg ProbeConfig) error { + if cfg.Interval < 10 { + cfg.Interval = 30 + } + + apiClient := &http.Client{Timeout: 10 * time.Second} + strictClient := &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}}, + } + insecureClient := &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + } + + if err := probeRegister(ctx, apiClient, cfg); err != nil { + log.Printf("Probe: initial registration failed: %v (will retry)", err) + } + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + sites, err := probeFetchAssignments(ctx, apiClient, cfg) + if err != nil { + log.Printf("Probe: failed to fetch assignments: %v", err) + sleepCtx(ctx, 10*time.Second) + continue + } + + if len(sites) == 0 { + sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second) + continue + } + + results := probeExecuteChecks(ctx, sites, strictClient, insecureClient) + + if len(results) > 0 { + if err := probeReportResults(ctx, apiClient, cfg, results); err != nil { + log.Printf("Probe: failed to report results: %v", err) + } + } + + sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second) + } +} + +func probeRegister(ctx context.Context, client *http.Client, cfg ProbeConfig) error { + body, _ := json.Marshal(map[string]string{ + "id": cfg.NodeID, "name": cfg.NodeName, "region": cfg.Region, "version": "probe", + }) + req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/register", bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Upkeep-Secret", cfg.SharedKey) + resp, err := client.Do(req) + if err != nil { + return err + } + resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("register returned %d", resp.StatusCode) + } + return nil +} + +func probeFetchAssignments(ctx context.Context, client *http.Client, cfg ProbeConfig) ([]models.Site, error) { + req, err := http.NewRequestWithContext(ctx, "GET", cfg.LeaderURL+"/api/probe/assignments?node_id="+cfg.NodeID, nil) + if err != nil { + return nil, err + } + req.Header.Set("X-Upkeep-Secret", cfg.SharedKey) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + return nil, fmt.Errorf("assignments returned %d", resp.StatusCode) + } + var result struct { + Sites []models.Site `json:"sites"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, err + } + return result.Sites, nil +} + +type probeResultItem struct { + SiteID int `json:"site_id"` + LatencyNs int64 `json:"latency_ns"` + IsUp bool `json:"is_up"` +} + +func probeExecuteChecks(ctx context.Context, sites []models.Site, strict, insecure *http.Client) []probeResultItem { + var mu sync.Mutex + var results []probeResultItem + sem := make(chan struct{}, 10) + var wg sync.WaitGroup + + for _, site := range sites { + select { + case <-ctx.Done(): + break + default: + } + wg.Add(1) + sem <- struct{}{} + go func(s models.Site) { + defer wg.Done() + defer func() { <-sem }() + + cr := monitor.RunCheck(s, strict, insecure, false) + mu.Lock() + results = append(results, probeResultItem{ + SiteID: s.ID, + LatencyNs: cr.LatencyNs, + IsUp: cr.Status == "UP", + }) + mu.Unlock() + }(site) + } + wg.Wait() + return results +} + +func probeReportResults(ctx context.Context, client *http.Client, cfg ProbeConfig, results []probeResultItem) error { + body, err := json.Marshal(map[string]interface{}{ + "node_id": cfg.NodeID, + "results": results, + }) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/results", bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Upkeep-Secret", cfg.SharedKey) + resp, err := client.Do(req) + if err != nil { + return err + } + resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("results returned %d", resp.StatusCode) + } + fmt.Printf("Probe: reported %d check results\n", len(results)) + return nil +} + +func sleepCtx(ctx context.Context, d time.Duration) { + select { + case <-time.After(d): + case <-ctx.Done(): + } +} diff --git a/internal/monitor/aggregator.go b/internal/monitor/aggregator.go new file mode 100644 index 0000000..88054c8 --- /dev/null +++ b/internal/monitor/aggregator.go @@ -0,0 +1,44 @@ +package monitor + +import "time" + +type AggregationStrategy string + +const ( + AggAnyDown AggregationStrategy = "any-down" + AggMajorityDown AggregationStrategy = "majority-down" + AggAllDown AggregationStrategy = "all-down" +) + +type NodeResult struct { + NodeID string + IsUp bool + LatencyNs int64 + CheckedAt time.Time +} + +func AggregateStatus(results []NodeResult, strategy AggregationStrategy) (isUp bool, avgLatencyNs int64) { + if len(results) == 0 { + return true, 0 + } + + upCount := 0 + var totalLatency int64 + for _, r := range results { + if r.IsUp { + upCount++ + } + totalLatency += r.LatencyNs + } + avgLatencyNs = totalLatency / int64(len(results)) + + switch strategy { + case AggMajorityDown: + isUp = upCount > len(results)/2 + case AggAllDown: + isUp = upCount > 0 + default: + isUp = upCount == len(results) + } + return +} diff --git a/internal/monitor/checker.go b/internal/monitor/checker.go new file mode 100644 index 0000000..be62155 --- /dev/null +++ b/internal/monitor/checker.go @@ -0,0 +1,218 @@ +package monitor + +import ( + "context" + "go-upkeep/internal/models" + "net" + "net/http" + "strconv" + "strings" + "time" + + "github.com/miekg/dns" + probing "github.com/prometheus-community/pro-bing" +) + +type CheckResult struct { + SiteID int + Status string // "UP", "DOWN", "SSL EXP" + StatusCode int + LatencyNs int64 + HasSSL bool + CertExpiry time.Time +} + +func RunCheck(site models.Site, strict, insecure *http.Client, globalInsecure bool) CheckResult { + switch site.Type { + case "http": + return runHTTPCheck(site, strict, insecure, globalInsecure) + case "ping": + return runPingCheck(site) + case "port": + return runPortCheck(site) + case "dns": + return runDNSCheck(site) + default: + return CheckResult{SiteID: site.ID, Status: "DOWN"} + } +} + +func runHTTPCheck(site models.Site, strict, insecure *http.Client, globalInsecure bool) CheckResult { + method := site.Method + if method == "" { + method = "GET" + } + + timeout := siteTimeout(site) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, method, site.URL, nil) + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN"} + } + + client := strict + if globalInsecure || site.IgnoreTLS { + client = insecure + } + + start := time.Now() + resp, err := client.Do(req) + latency := time.Since(start) + + result := CheckResult{ + SiteID: site.ID, + Status: "UP", + LatencyNs: latency.Nanoseconds(), + } + + if err != nil { + result.Status = "DOWN" + return result + } + defer resp.Body.Close() + + result.StatusCode = resp.StatusCode + if !isCodeAccepted(resp.StatusCode, site.AcceptedCodes) { + result.Status = "DOWN" + } + + if site.CheckSSL && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 { + result.HasSSL = true + cert := resp.TLS.PeerCertificates[0] + result.CertExpiry = cert.NotAfter + if time.Now().After(cert.NotAfter) { + result.Status = "SSL EXP" + } + } + + return result +} + +func runPingCheck(site models.Site) CheckResult { + host := site.Hostname + if host == "" { + host = site.URL + } + + pinger, err := probing.NewPinger(host) + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN"} + } + pinger.Count = 1 + pinger.Timeout = siteTimeout(site) + pinger.SetPrivileged(false) + + start := time.Now() + err = pinger.Run() + latency := time.Since(start) + + if err != nil || pinger.Statistics().PacketsRecv == 0 { + return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()} + } + + stats := pinger.Statistics() + return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: stats.AvgRtt.Nanoseconds()} +} + +func runPortCheck(site models.Site) CheckResult { + host := site.Hostname + if host == "" { + host = site.URL + } + addr := net.JoinHostPort(host, strconv.Itoa(site.Port)) + timeout := siteTimeout(site) + + start := time.Now() + conn, err := net.DialTimeout("tcp", addr, timeout) + latency := time.Since(start) + + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()} + } + conn.Close() + return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: latency.Nanoseconds()} +} + +func runDNSCheck(site models.Site) CheckResult { + host := site.Hostname + if host == "" { + host = site.URL + } + + server := site.DNSServer + if server == "" { + server = "1.1.1.1" + } + if _, _, err := net.SplitHostPort(server); err != nil { + server = net.JoinHostPort(server, "53") + } + + qtype := dns.TypeA + switch site.DNSResolveType { + case "AAAA": + qtype = dns.TypeAAAA + case "MX": + qtype = dns.TypeMX + case "CNAME": + qtype = dns.TypeCNAME + case "TXT": + qtype = dns.TypeTXT + case "NS": + qtype = dns.TypeNS + case "SOA": + qtype = dns.TypeSOA + case "SRV": + qtype = dns.TypeSRV + case "PTR": + qtype = dns.TypePTR + } + + m := new(dns.Msg) + m.SetQuestion(dns.Fqdn(host), qtype) + + c := new(dns.Client) + c.Timeout = siteTimeout(site) + + start := time.Now() + r, _, err := c.Exchange(m, server) + latency := time.Since(start) + + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()} + } + if r.Rcode != dns.RcodeSuccess { + return CheckResult{SiteID: site.ID, Status: "DOWN", StatusCode: r.Rcode, LatencyNs: latency.Nanoseconds()} + } + return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: latency.Nanoseconds()} +} + +func siteTimeout(site models.Site) time.Duration { + if site.Timeout > 0 { + return time.Duration(site.Timeout) * time.Second + } + return 5 * time.Second +} + +func isCodeAccepted(code int, accepted string) bool { + if accepted == "" { + return code >= 200 && code < 300 + } + for _, part := range strings.Split(accepted, ",") { + part = strings.TrimSpace(part) + if strings.Contains(part, "-") { + bounds := strings.SplitN(part, "-", 2) + lo, err1 := strconv.Atoi(strings.TrimSpace(bounds[0])) + hi, err2 := strconv.Atoi(strings.TrimSpace(bounds[1])) + if err1 == nil && err2 == nil && code >= lo && code <= hi { + return true + } + } else { + if v, err := strconv.Atoi(part); err == nil && code == v { + return true + } + } + } + return false +} diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 2d2af10..ddabb74 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -7,15 +7,9 @@ import ( "go-upkeep/internal/alert" "go-upkeep/internal/models" "go-upkeep/internal/store" - "net" "net/http" - "strconv" - "strings" "sync" "time" - - "github.com/miekg/dns" - probing "github.com/prometheus-community/pro-bing" ) type Engine struct { @@ -33,6 +27,10 @@ type Engine struct { tokenIndex map[string]int + probeResultsMu sync.RWMutex + probeResults map[int]map[string]NodeResult + aggStrategy AggregationStrategy + db store.Store insecureSkipVerify bool strictClient *http.Client @@ -41,11 +39,13 @@ type Engine struct { func NewEngine(s store.Store) *Engine { return &Engine{ - liveState: make(map[int]models.Site), - histories: make(map[int]*SiteHistory), - tokenIndex: make(map[string]int), - isActive: true, - db: s, + liveState: make(map[int]models.Site), + histories: make(map[int]*SiteHistory), + tokenIndex: make(map[string]int), + probeResults: make(map[int]map[string]NodeResult), + aggStrategy: AggAnyDown, + isActive: true, + db: s, strictClient: &http.Client{ Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}}, }, @@ -310,19 +310,20 @@ func (e *Engine) checkByID(id int) { if !exists || site.Paused { return } + switch site.Type { - case "http": - e.checkHTTP(site) case "push": e.checkPush(site) - case "ping": - e.checkPing(site) - case "port": - e.checkPort(site) - case "dns": - e.checkDNS(site) case "group": e.checkGroup(site) + default: + result := RunCheck(site, e.strictClient, e.insecureClient, e.insecureSkipVerify) + updatedSite := site + updatedSite.HasSSL = result.HasSSL + updatedSite.CertExpiry = result.CertExpiry + updatedSite.Latency = time.Duration(result.LatencyNs) + updatedSite.LastCheck = time.Now() + e.handleStatusChange(updatedSite, result.Status, result.StatusCode, time.Duration(result.LatencyNs)) } } @@ -335,61 +336,6 @@ func (e *Engine) checkPush(site models.Site) { } } -func (e *Engine) checkHTTP(site models.Site) { - method := site.Method - if method == "" { - method = "GET" - } - - timeout := siteTimeout(site) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, method, site.URL, nil) - if err != nil { - e.handleStatusChange(site, "DOWN", 0, 0) - return - } - - client := e.strictClient - if e.insecureSkipVerify || site.IgnoreTLS { - client = e.insecureClient - } - - start := time.Now() - resp, err := client.Do(req) - latency := time.Since(start) - - rawStatus := "UP" - rawCode := 0 - var certExpiry time.Time - hasSSL := false - - if err != nil { - rawStatus = "DOWN" - } else { - defer resp.Body.Close() - rawCode = resp.StatusCode - if !isCodeAccepted(rawCode, site.AcceptedCodes) { - rawStatus = "DOWN" - } - if site.CheckSSL && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 { - hasSSL = true - cert := resp.TLS.PeerCertificates[0] - certExpiry = cert.NotAfter - if time.Now().After(cert.NotAfter) { - rawStatus = "SSL EXP" - } - } - } - updatedSite := site - updatedSite.HasSSL = hasSSL - updatedSite.CertExpiry = certExpiry - updatedSite.Latency = latency - updatedSite.LastCheck = time.Now() - e.handleStatusChange(updatedSite, rawStatus, rawCode, latency) -} - func (e *Engine) handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) { if !e.IsActive() { return @@ -462,94 +408,6 @@ func (e *Engine) triggerAlert(alertID int, title, message string) { } } -func siteTimeout(site models.Site) time.Duration { - if site.Timeout > 0 { - return time.Duration(site.Timeout) * time.Second - } - return 5 * time.Second -} - -func isCodeAccepted(code int, accepted string) bool { - if accepted == "" { - return code >= 200 && code < 300 - } - for _, part := range strings.Split(accepted, ",") { - part = strings.TrimSpace(part) - if strings.Contains(part, "-") { - bounds := strings.SplitN(part, "-", 2) - lo, err1 := strconv.Atoi(strings.TrimSpace(bounds[0])) - hi, err2 := strconv.Atoi(strings.TrimSpace(bounds[1])) - if err1 == nil && err2 == nil && code >= lo && code <= hi { - return true - } - } else { - if v, err := strconv.Atoi(part); err == nil && code == v { - return true - } - } - } - return false -} - -func (e *Engine) checkPing(site models.Site) { - host := site.Hostname - if host == "" { - host = site.URL - } - - pinger, err := probing.NewPinger(host) - if err != nil { - e.handleStatusChange(site, "DOWN", 0, 0) - e.AddLog(fmt.Sprintf("Ping '%s' resolve failed: %v", site.Name, err)) - return - } - pinger.Count = 1 - pinger.Timeout = siteTimeout(site) - pinger.SetPrivileged(false) - - start := time.Now() - err = pinger.Run() - latency := time.Since(start) - - if err != nil || pinger.Statistics().PacketsRecv == 0 { - updatedSite := site - updatedSite.Latency = latency - updatedSite.LastCheck = time.Now() - e.handleStatusChange(updatedSite, "DOWN", 0, latency) - return - } - - stats := pinger.Statistics() - updatedSite := site - updatedSite.Latency = stats.AvgRtt - updatedSite.LastCheck = time.Now() - e.handleStatusChange(updatedSite, "UP", 0, stats.AvgRtt) -} - -func (e *Engine) checkPort(site models.Site) { - host := site.Hostname - if host == "" { - host = site.URL - } - addr := net.JoinHostPort(host, strconv.Itoa(site.Port)) - timeout := siteTimeout(site) - - start := time.Now() - conn, err := net.DialTimeout("tcp", addr, timeout) - latency := time.Since(start) - - updatedSite := site - updatedSite.Latency = latency - updatedSite.LastCheck = time.Now() - - if err != nil { - e.handleStatusChange(updatedSite, "DOWN", 0, latency) - return - } - conn.Close() - e.handleStatusChange(updatedSite, "UP", 0, latency) -} - func (e *Engine) checkGroup(site models.Site) { e.mu.RLock() status := "UP" @@ -588,63 +446,54 @@ func (e *Engine) checkGroup(site models.Site) { e.mu.Unlock() } -func (e *Engine) checkDNS(site models.Site) { - host := site.Hostname - if host == "" { - host = site.URL +func (e *Engine) SetAggStrategy(strategy AggregationStrategy) { + e.aggStrategy = strategy +} + +func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool) { + e.probeResultsMu.Lock() + if e.probeResults[siteID] == nil { + e.probeResults[siteID] = make(map[string]NodeResult) + } + e.probeResults[siteID][nodeID] = NodeResult{ + NodeID: nodeID, + IsUp: isUp, + LatencyNs: latencyNs, + CheckedAt: time.Now(), + } + results := make([]NodeResult, 0, len(e.probeResults[siteID])) + for _, r := range e.probeResults[siteID] { + results = append(results, r) + } + e.probeResultsMu.Unlock() + + aggUp, avgLatency := AggregateStatus(results, e.aggStrategy) + + e.mu.RLock() + site, exists := e.liveState[siteID] + e.mu.RUnlock() + if !exists { + return } - server := site.DNSServer - if server == "" { - server = "1.1.1.1" + rawStatus := "UP" + if !aggUp { + rawStatus = "DOWN" } - if _, _, err := net.SplitHostPort(server); err != nil { - server = net.JoinHostPort(server, "53") - } - - qtype := dns.TypeA - switch site.DNSResolveType { - case "AAAA": - qtype = dns.TypeAAAA - case "MX": - qtype = dns.TypeMX - case "CNAME": - qtype = dns.TypeCNAME - case "TXT": - qtype = dns.TypeTXT - case "NS": - qtype = dns.TypeNS - case "SOA": - qtype = dns.TypeSOA - case "SRV": - qtype = dns.TypeSRV - case "PTR": - qtype = dns.TypePTR - } - - m := new(dns.Msg) - m.SetQuestion(dns.Fqdn(host), qtype) - - c := new(dns.Client) - c.Timeout = siteTimeout(site) - - start := time.Now() - r, _, err := c.Exchange(m, server) - latency := time.Since(start) updatedSite := site - updatedSite.Latency = latency + updatedSite.Latency = time.Duration(avgLatency) updatedSite.LastCheck = time.Now() - - if err != nil { - e.handleStatusChange(updatedSite, "DOWN", 0, latency) - return - } - - if r.Rcode != dns.RcodeSuccess { - e.handleStatusChange(updatedSite, "DOWN", r.Rcode, latency) - return - } - - e.handleStatusChange(updatedSite, "UP", 0, latency) + e.handleStatusChange(updatedSite, rawStatus, 0, time.Duration(avgLatency)) +} + +func (e *Engine) GetProbeResults(siteID int) map[string]NodeResult { + e.probeResultsMu.RLock() + defer e.probeResultsMu.RUnlock() + src := e.probeResults[siteID] + cp := make(map[string]NodeResult, len(src)) + for k, v := range src { + cp[k] = v + } + return cp } diff --git a/internal/server/server.go b/internal/server/server.go index 18670c2..bd7926b 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -325,6 +325,7 @@ func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) { if err := s.SaveCheckFromNode(result.SiteID, req.NodeID, result.LatencyNs, result.IsUp); err != nil { log.Printf("Failed to save probe result: %v", err) } + eng.IngestProbeResult(req.NodeID, result.SiteID, result.LatencyNs, result.IsUp) } s.UpdateNodeLastSeen(req.NodeID) json.NewEncoder(w).Encode(map[string]bool{"ok": true})