Files
uptop/internal/cluster/cluster.go
T
lerko ca5a42314f feat(cluster): add probe execution mode, check extraction, and result aggregation
Phase 2 of distributed probing:
- Extract check logic into standalone RunCheck() for use by probes
- Add probe cluster mode: stateless nodes that fetch assignments, execute
  checks, and report results to the leader
- Add multi-node result aggregation with configurable strategy
  (any-down, majority-down, all-down)
- Leader ingests probe results into engine live state and triggers alerts
- New env vars: UPKEEP_NODE_ID, UPKEEP_NODE_NAME, UPKEEP_NODE_REGION,
  UPKEEP_AGG_STRATEGY
- Example docker-compose.probe.yml with leader + 2 regional probes
2026-05-16 11:19:57 -04:00

80 lines
1.9 KiB
Go

package cluster
import (
"context"
"fmt"
"go-upkeep/internal/monitor"
"net/http"
"strings"
"time"
)
type Config struct {
Mode string // "leader" or "follower"
PeerURL string // URL of the Leader (e.g., http://primary:8080)
SharedKey string // Security Key
}
func Start(ctx context.Context, cfg Config, eng *monitor.Engine) {
if cfg.Mode == "leader" {
fmt.Println("Cluster: Running as LEADER (Active)")
if cfg.SharedKey != "" {
fmt.Println("WARNING: Cluster mode enabled. Ensure the HTTP server is behind a TLS-terminating proxy.")
}
eng.SetActive(true)
return
}
if cfg.Mode == "follower" {
fmt.Println("Cluster: Running as FOLLOWER (Passive)")
if cfg.PeerURL != "" && !strings.HasPrefix(cfg.PeerURL, "https://") {
fmt.Println("WARNING: Cluster peer URL is not HTTPS. Cluster secret will be sent in cleartext.")
}
eng.SetActive(false)
go runFollowerLoop(ctx, cfg, eng)
}
// "probe" mode is handled directly in main.go before cluster.Start is called
}
func runFollowerLoop(ctx context.Context, cfg Config, eng *monitor.Engine) {
client := http.Client{Timeout: 2 * time.Second}
failures := 0
threshold := 3
for {
select {
case <-time.After(5 * time.Second):
case <-ctx.Done():
return
}
req, _ := http.NewRequest("GET", cfg.PeerURL+"/api/health", nil)
if cfg.SharedKey != "" {
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
}
resp, err := client.Do(req)
isLeaderHealthy := false
if err == nil && resp.StatusCode == 200 {
isLeaderHealthy = true
resp.Body.Close()
}
if isLeaderHealthy {
failures = 0
if eng.IsActive() {
eng.SetActive(false)
eng.AddLog("Cluster: Leader detected. Switching to PASSIVE.")
}
} else {
failures++
if failures >= threshold && !eng.IsActive() {
eng.SetActive(true)
eng.AddLog("Cluster: Leader Unreachable. Switching to ACTIVE.")
}
}
}
}