feat(cluster): add probe execution mode, check extraction, and result aggregation
Phase 2 of distributed probing: - Extract check logic into standalone RunCheck() for use by probes - Add probe cluster mode: stateless nodes that fetch assignments, execute checks, and report results to the leader - Add multi-node result aggregation with configurable strategy (any-down, majority-down, all-down) - Leader ingests probe results into engine live state and triggers alerts - New env vars: UPKEEP_NODE_ID, UPKEEP_NODE_NAME, UPKEEP_NODE_REGION, UPKEEP_AGG_STRATEGY - Example docker-compose.probe.yml with leader + 2 regional probes
This commit is contained in:
@@ -0,0 +1,187 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"go-upkeep/internal/models"
|
||||
"go-upkeep/internal/monitor"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ProbeConfig struct {
|
||||
NodeID string
|
||||
NodeName string
|
||||
Region string
|
||||
LeaderURL string
|
||||
SharedKey string
|
||||
Interval int
|
||||
}
|
||||
|
||||
func RunProbe(ctx context.Context, cfg ProbeConfig) error {
|
||||
if cfg.Interval < 10 {
|
||||
cfg.Interval = 30
|
||||
}
|
||||
|
||||
apiClient := &http.Client{Timeout: 10 * time.Second}
|
||||
strictClient := &http.Client{
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}},
|
||||
}
|
||||
insecureClient := &http.Client{
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
|
||||
}
|
||||
|
||||
if err := probeRegister(ctx, apiClient, cfg); err != nil {
|
||||
log.Printf("Probe: initial registration failed: %v (will retry)", err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
sites, err := probeFetchAssignments(ctx, apiClient, cfg)
|
||||
if err != nil {
|
||||
log.Printf("Probe: failed to fetch assignments: %v", err)
|
||||
sleepCtx(ctx, 10*time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(sites) == 0 {
|
||||
sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
results := probeExecuteChecks(ctx, sites, strictClient, insecureClient)
|
||||
|
||||
if len(results) > 0 {
|
||||
if err := probeReportResults(ctx, apiClient, cfg, results); err != nil {
|
||||
log.Printf("Probe: failed to report results: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func probeRegister(ctx context.Context, client *http.Client, cfg ProbeConfig) error {
|
||||
body, _ := json.Marshal(map[string]string{
|
||||
"id": cfg.NodeID, "name": cfg.NodeName, "region": cfg.Region, "version": "probe",
|
||||
})
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/register", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
return fmt.Errorf("register returned %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func probeFetchAssignments(ctx context.Context, client *http.Client, cfg ProbeConfig) ([]models.Site, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", cfg.LeaderURL+"/api/probe/assignments?node_id="+cfg.NodeID, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("assignments returned %d", resp.StatusCode)
|
||||
}
|
||||
var result struct {
|
||||
Sites []models.Site `json:"sites"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result.Sites, nil
|
||||
}
|
||||
|
||||
type probeResultItem struct {
|
||||
SiteID int `json:"site_id"`
|
||||
LatencyNs int64 `json:"latency_ns"`
|
||||
IsUp bool `json:"is_up"`
|
||||
}
|
||||
|
||||
func probeExecuteChecks(ctx context.Context, sites []models.Site, strict, insecure *http.Client) []probeResultItem {
|
||||
var mu sync.Mutex
|
||||
var results []probeResultItem
|
||||
sem := make(chan struct{}, 10)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, site := range sites {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break
|
||||
default:
|
||||
}
|
||||
wg.Add(1)
|
||||
sem <- struct{}{}
|
||||
go func(s models.Site) {
|
||||
defer wg.Done()
|
||||
defer func() { <-sem }()
|
||||
|
||||
cr := monitor.RunCheck(s, strict, insecure, false)
|
||||
mu.Lock()
|
||||
results = append(results, probeResultItem{
|
||||
SiteID: s.ID,
|
||||
LatencyNs: cr.LatencyNs,
|
||||
IsUp: cr.Status == "UP",
|
||||
})
|
||||
mu.Unlock()
|
||||
}(site)
|
||||
}
|
||||
wg.Wait()
|
||||
return results
|
||||
}
|
||||
|
||||
func probeReportResults(ctx context.Context, client *http.Client, cfg ProbeConfig, results []probeResultItem) error {
|
||||
body, err := json.Marshal(map[string]interface{}{
|
||||
"node_id": cfg.NodeID,
|
||||
"results": results,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/results", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
return fmt.Errorf("results returned %d", resp.StatusCode)
|
||||
}
|
||||
fmt.Printf("Probe: reported %d check results\n", len(results))
|
||||
return nil
|
||||
}
|
||||
|
||||
func sleepCtx(ctx context.Context, d time.Duration) {
|
||||
select {
|
||||
case <-time.After(d):
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user