Files
uptop/internal/cluster/probe.go
T
lerko ca5a42314f feat(cluster): add probe execution mode, check extraction, and result aggregation
Phase 2 of distributed probing:
- Extract check logic into standalone RunCheck() for use by probes
- Add probe cluster mode: stateless nodes that fetch assignments, execute
  checks, and report results to the leader
- Add multi-node result aggregation with configurable strategy
  (any-down, majority-down, all-down)
- Leader ingests probe results into engine live state and triggers alerts
- New env vars: UPKEEP_NODE_ID, UPKEEP_NODE_NAME, UPKEEP_NODE_REGION,
  UPKEEP_AGG_STRATEGY
- Example docker-compose.probe.yml with leader + 2 regional probes
2026-05-16 11:19:57 -04:00

188 lines
4.4 KiB
Go

package cluster
import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"sync"
	"time"

	"go-upkeep/internal/models"
	"go-upkeep/internal/monitor"
)
// ProbeConfig carries the settings a probe node needs to talk to the leader.
type ProbeConfig struct {
	// NodeID identifies this probe; it is sent on registration, when
	// fetching assignments, and alongside every batch of results.
	NodeID string
	// NodeName is sent to the leader as the "name" field on registration.
	NodeName string
	// Region is sent to the leader as the "region" field on registration.
	Region string
	// LeaderURL is the base URL that the "/api/probe/..." paths are appended to.
	LeaderURL string
	// SharedKey is sent in the X-Upkeep-Secret header of every leader request.
	SharedKey string
	// Interval is the pause between probe cycles, in seconds. RunProbe
	// replaces values below 10 with 30.
	Interval int
}
// RunProbe runs the probe loop until ctx is cancelled. Each cycle it makes
// sure the node is registered with the leader, fetches the sites assigned to
// this node, executes the checks, and reports the results.
//
// cfg.Interval is the pause between cycles in seconds; values below 10 are
// replaced by 30. Failures talking to the leader are logged and retried on a
// later cycle rather than returned. The function returns nil once ctx is done.
func RunProbe(ctx context.Context, cfg ProbeConfig) error {
	if cfg.Interval < 10 {
		cfg.Interval = 30
	}
	interval := time.Duration(cfg.Interval) * time.Second

	// apiClient talks to the leader; the other two are handed to the checks.
	apiClient := &http.Client{Timeout: 10 * time.Second}
	// NOTE(review): the check clients carry no Timeout of their own —
	// presumably monitor.RunCheck bounds each request; confirm.
	strictClient := &http.Client{
		Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}},
	}
	insecureClient := &http.Client{
		Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
	}

	// Registration is retried at the top of every cycle until it succeeds, so
	// a probe that starts before the leader is reachable still registers.
	registered := false
	for {
		if ctx.Err() != nil {
			return nil
		}

		if !registered {
			if err := probeRegister(ctx, apiClient, cfg); err != nil {
				log.Printf("Probe: initial registration failed: %v (will retry)", err)
			} else {
				registered = true
			}
		}

		sites, err := probeFetchAssignments(ctx, apiClient, cfg)
		if err != nil {
			log.Printf("Probe: failed to fetch assignments: %v", err)
			// Back off briefly instead of waiting a full interval.
			sleepCtx(ctx, 10*time.Second)
			continue
		}
		if len(sites) == 0 {
			sleepCtx(ctx, interval)
			continue
		}

		results := probeExecuteChecks(ctx, sites, strictClient, insecureClient)
		if len(results) > 0 {
			if err := probeReportResults(ctx, apiClient, cfg, results); err != nil {
				log.Printf("Probe: failed to report results: %v", err)
			}
		}
		sleepCtx(ctx, interval)
	}
}
// probeRegister announces this node to the leader by POSTing its id, name,
// region and a fixed "probe" version string to /api/probe/register.
// It returns an error when the request cannot be built or sent, or when the
// leader answers with any status other than 200.
func probeRegister(ctx context.Context, client *http.Client, cfg ProbeConfig) error {
	// Marshalling a map[string]string cannot fail, so the error is dropped.
	payload, _ := json.Marshal(map[string]string{
		"id":      cfg.NodeID,
		"name":    cfg.NodeName,
		"region":  cfg.Region,
		"version": "probe",
	})

	endpoint := cfg.LeaderURL + "/api/probe/register"
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	// Only the status code matters; the body is closed unread.
	status := resp.StatusCode
	resp.Body.Close()

	if status == http.StatusOK {
		return nil
	}
	return fmt.Errorf("register returned %d", status)
}
// probeFetchAssignments asks the leader which sites this node should check.
// It issues GET /api/probe/assignments with the node ID as the node_id query
// parameter and decodes the "sites" array of the JSON response.
// It returns an error when the request cannot be built or sent, when the
// leader answers with a status other than 200, or when the body is not valid
// JSON.
func probeFetchAssignments(ctx context.Context, client *http.Client, cfg ProbeConfig) ([]models.Site, error) {
	// Escape the node ID so characters such as '&', '#', '+', '%' or spaces
	// cannot corrupt the query string.
	endpoint := cfg.LeaderURL + "/api/probe/assignments?node_id=" + url.QueryEscape(cfg.NodeID)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("assignments returned %d", resp.StatusCode)
	}

	var result struct {
		Sites []models.Site `json:"sites"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, err
	}
	return result.Sites, nil
}
// probeResultItem is the outcome of one site check as reported to the leader.
type probeResultItem struct {
	// SiteID is the ID of the site that was checked.
	SiteID int `json:"site_id"`
	// LatencyNs is the latency reported by the check; the field name
	// indicates nanoseconds.
	LatencyNs int64 `json:"latency_ns"`
	// IsUp is true when the check reported the status "UP".
	IsUp bool `json:"is_up"`
}
// probeExecuteChecks runs monitor.RunCheck for every site, at most 10 at a
// time, and collects one probeResultItem per completed check. Results are
// appended in completion order, not in the order of sites.
//
// Once ctx is cancelled no further checks are started; checks that are
// already running are waited for, and whatever completed is returned.
func probeExecuteChecks(ctx context.Context, sites []models.Site, strict, insecure *http.Client) []probeResultItem {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make([]probeResultItem, 0, len(sites))
	)
	// sem bounds the number of checks in flight.
	sem := make(chan struct{}, 10)

dispatch:
	for _, site := range sites {
		// Check cancellation first: a select with several ready cases picks
		// one at random, so a free slot could otherwise win over a done ctx.
		if ctx.Err() != nil {
			break dispatch
		}
		// A bare break here would only leave the select, hence the label.
		select {
		case <-ctx.Done():
			break dispatch
		case sem <- struct{}{}:
		}

		wg.Add(1)
		go func(s models.Site) {
			defer wg.Done()
			defer func() { <-sem }()

			// NOTE(review): the meaning of the trailing false argument is
			// not visible from this file — confirm against monitor.RunCheck.
			cr := monitor.RunCheck(s, strict, insecure, false)

			mu.Lock()
			results = append(results, probeResultItem{
				SiteID:    s.ID,
				LatencyNs: cr.LatencyNs,
				IsUp:      cr.Status == "UP",
			})
			mu.Unlock()
		}(site)
	}

	wg.Wait()
	return results
}
// probeReportResults POSTs a batch of check results to the leader at
// /api/probe/results as a JSON object with "node_id" and "results" keys.
// It returns an error when the payload cannot be encoded, the request cannot
// be built or sent, or the leader answers with a status other than 200.
// On success the number of reported results is written to the standard
// logger, like every other message in this file.
func probeReportResults(ctx context.Context, client *http.Client, cfg ProbeConfig, results []probeResultItem) error {
	body, err := json.Marshal(map[string]interface{}{
		"node_id": cfg.NodeID,
		"results": results,
	})
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.LeaderURL+"/api/probe/results", bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("results returned %d", resp.StatusCode)
	}
	// Use the logger rather than stdout so this line carries the same
	// destination and prefix as the probe's other messages.
	log.Printf("Probe: reported %d check results", len(results))
	return nil
}
// sleepCtx blocks for d or until ctx is cancelled, whichever happens first.
// The timer is stopped on return so that a cancelled sleep does not leave a
// pending timer behind for the rest of d.
func sleepCtx(ctx context.Context, d time.Duration) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-ctx.Done():
	}
}