Files
uptop/internal/cluster/probe.go
T
lerko f00acbc280 refactor(models): typed Status constants with IsBroken() predicate
Replace ~150 bare status string comparisons with typed models.Status
constants (StatusUp, StatusDown, StatusPending, StatusLate, StatusStale,
StatusSSLExp). Single IsBroken() method replaces the duplicated
isBroken lambda in monitor.go and isDown function in sla.go.

Adding a new status value (e.g. DEGRADED) now requires one constant
definition instead of grep-and-pray across 16 files.

CheckResult.Status stays string — the checker is the boundary between
raw protocol results and typed status. Cast happens at the edge in
handleStatusChange.
2026-06-11 15:56:51 -04:00

202 lines
5.0 KiB
Go

package cluster
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"sync"
"time"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/models"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/monitor"
)
type ProbeConfig struct {
NodeID string
NodeName string
Region string
LeaderURL string
SharedKey string
Interval int
AllowPrivateTargets bool
}
func RunProbe(ctx context.Context, cfg ProbeConfig) error {
if cfg.Interval < 10 {
cfg.Interval = 30
}
apiClient := &http.Client{Timeout: 10 * time.Second}
dial := monitor.SafeDialContext(cfg.AllowPrivateTargets)
strictClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
DialContext: dial,
},
}
insecureClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // intentional for IgnoreTLS sites
DialContext: dial,
},
}
if err := probeRegister(ctx, apiClient, cfg); err != nil {
log.Printf("Probe: initial registration failed: %v (will retry)", err)
}
for {
select {
case <-ctx.Done():
return nil
default:
}
sites, err := probeFetchAssignments(ctx, apiClient, cfg)
if err != nil {
log.Printf("Probe: failed to fetch assignments: %v", err)
sleepCtx(ctx, 10*time.Second)
continue
}
if len(sites) == 0 {
sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second)
continue
}
results := probeExecuteChecks(ctx, sites, strictClient, insecureClient, cfg.AllowPrivateTargets)
if len(results) > 0 {
if err := probeReportResults(ctx, apiClient, cfg, results); err != nil {
log.Printf("Probe: failed to report results: %v", err)
}
}
sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second)
}
}
func probeRegister(ctx context.Context, client *http.Client, cfg ProbeConfig) error {
body, _ := json.Marshal(map[string]string{
"id": cfg.NodeID, "name": cfg.NodeName, "region": cfg.Region, "version": "probe",
})
req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/register", bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
resp, err := client.Do(req)
if err != nil {
return err
}
_ = resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("register returned %d", resp.StatusCode)
}
return nil
}
func probeFetchAssignments(ctx context.Context, client *http.Client, cfg ProbeConfig) ([]models.Site, error) {
assignURL := cfg.LeaderURL + "/api/probe/assignments?" + url.Values{"node_id": {cfg.NodeID}}.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", assignURL, nil)
if err != nil {
return nil, err
}
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil, fmt.Errorf("assignments returned %d", resp.StatusCode)
}
var result struct {
Sites []models.Site `json:"sites"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return result.Sites, nil
}
type probeResultItem struct {
SiteID int `json:"site_id"`
LatencyNs int64 `json:"latency_ns"`
IsUp bool `json:"is_up"`
ErrorReason string `json:"error_reason,omitempty"`
}
func probeExecuteChecks(ctx context.Context, sites []models.Site, strict, insecure *http.Client, allowPrivate bool) []probeResultItem {
var mu sync.Mutex
var results []probeResultItem
sem := make(chan struct{}, 10)
var wg sync.WaitGroup
loop:
for _, site := range sites {
select {
case <-ctx.Done():
break loop
default:
}
wg.Add(1)
sem <- struct{}{}
go func(s models.Site) {
defer wg.Done()
defer func() { <-sem }()
cr := monitor.RunCheck(ctx, s, strict, insecure, false, allowPrivate)
mu.Lock()
results = append(results, probeResultItem{
SiteID: s.ID,
LatencyNs: cr.LatencyNs,
IsUp: cr.Status == string(models.StatusUp),
ErrorReason: cr.ErrorReason,
})
mu.Unlock()
}(site)
}
wg.Wait()
return results
}
func probeReportResults(ctx context.Context, client *http.Client, cfg ProbeConfig, results []probeResultItem) error {
body, err := json.Marshal(map[string]interface{}{
"node_id": cfg.NodeID,
"results": results,
})
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/results", bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Upkeep-Secret", cfg.SharedKey)
resp, err := client.Do(req)
if err != nil {
return err
}
_ = resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("results returned %d", resp.StatusCode)
}
fmt.Printf("Probe: reported %d check results\n", len(results))
return nil
}
func sleepCtx(ctx context.Context, d time.Duration) {
select {
case <-time.After(d):
case <-ctx.Done():
}
}