Files
uptop/internal/cluster/cluster_test.go
T
lerko ab0a69d06b
CI / test (pull_request) Successful in 1m57s
CI / lint (pull_request) Successful in 1m27s
CI / vulncheck (pull_request) Successful in 56s
fix(cluster)!: rename X-Upkeep-Secret header to X-Uptop-Secret
Last upkeep-era name in the wire protocol. Breaking for mixed-version
clusters, but zero installed base exists pre-v0.1.0 — free now, breaking
forever after first tag.
2026-06-12 14:27:44 -04:00

346 lines
8.7 KiB
Go

package cluster
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/models"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/monitor"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/store/storetest"
)
// mockStore is the store value passed to monitor.NewEngine throughout these
// tests. It embeds storetest.BaseMock and declares nothing of its own.
type mockStore struct {
storetest.BaseMock
}
// --- Cluster Start Tests ---
// TestStart_LeaderMode checks that Start with Mode "leader" leaves the engine
// active, even though the engine was explicitly set inactive beforehand.
func TestStart_LeaderMode(t *testing.T) {
	engine := monitor.NewEngine(&mockStore{})
	// Begin inactive so the assertion below observes a state change.
	engine.SetActive(false)

	Start(context.Background(), Config{Mode: "leader"}, engine)

	if active := engine.IsActive(); !active {
		t.Error("leader mode should set engine active")
	}
}
// TestStart_FollowerMode checks that the engine reports inactive shortly after
// Start is called with Mode "follower".
func TestStart_FollowerMode(t *testing.T) {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	engine := monitor.NewEngine(&mockStore{})
	cfg := Config{Mode: "follower", PeerURL: "http://localhost:9999"}
	Start(ctx, cfg, engine)

	// Short pause before inspecting the engine state.
	time.Sleep(50 * time.Millisecond)

	if engine.IsActive() {
		t.Error("follower mode should set engine inactive")
	}
}
// --- Follower Loop Tests ---
// TestFollowerLoop_FailoverOnLeaderDown runs the follower loop against a peer
// that answers every request with 503 and expects the engine to become active
// within 20 seconds.
func TestFollowerLoop_FailoverOnLeaderDown(t *testing.T) {
	engine := monitor.NewEngine(&mockStore{})
	engine.SetActive(false)

	// Peer that is permanently unhealthy.
	alwaysDown := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusServiceUnavailable)
	})
	srv := httptest.NewServer(alwaysDown)
	defer srv.Close()

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	go runFollowerLoop(ctx, Config{PeerURL: srv.URL, SharedKey: "key"}, engine)

	// NOTE(review): the original author's note says the loop checks every 5s
	// and fails over after 3 failures (~15s); that cadence is not visible in
	// this file. The 20s budget below leaves headroom on top of it.
	timeout := time.After(20 * time.Second)
	for !engine.IsActive() {
		select {
		case <-timeout:
			t.Fatal("expected failover to ACTIVE after 3 failures")
		case <-time.After(500 * time.Millisecond):
			// Poll again.
		}
	}
}
// TestFollowerLoop_RecoveryOnLeaderReturn starts with an active engine (as if
// a failover had already happened), points the follower loop at a peer that
// answers 200 "OK", and expects the engine to become inactive within 10
// seconds.
func TestFollowerLoop_RecoveryOnLeaderReturn(t *testing.T) {
	engine := monitor.NewEngine(&mockStore{})
	engine.SetActive(true) // pretend we already failed over

	healthy := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("OK"))
	})
	srv := httptest.NewServer(healthy)
	defer srv.Close()

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	go runFollowerLoop(ctx, Config{PeerURL: srv.URL}, engine)

	// Poll until the engine has dropped back to passive or time runs out.
	timeout := time.After(10 * time.Second)
	for engine.IsActive() {
		select {
		case <-timeout:
			t.Fatal("expected switch back to PASSIVE when leader returns")
		case <-time.After(500 * time.Millisecond):
			// Poll again.
		}
	}
}
// TestFollowerLoop_SendsSecret checks that requests made by the follower loop
// carry the configured SharedKey in the X-Uptop-Secret header.
func TestFollowerLoop_SendsSecret(t *testing.T) {
	// lastSecret is written by the handler goroutine and read by the test
	// goroutine, so every access goes through mu.
	var (
		mu         sync.Mutex
		lastSecret string
	)
	readSecret := func() string {
		mu.Lock()
		defer mu.Unlock()
		return lastSecret
	}

	recorder := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		secret := r.Header.Get("X-Uptop-Secret")
		mu.Lock()
		lastSecret = secret
		mu.Unlock()
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("OK"))
	})
	srv := httptest.NewServer(recorder)
	defer srv.Close()

	engine := monitor.NewEngine(&mockStore{})
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	go runFollowerLoop(ctx, Config{PeerURL: srv.URL, SharedKey: "test-secret"}, engine)

	// Poll until the expected header value has been seen or 10s have passed.
	timeout := time.After(10 * time.Second)
	for {
		got := readSecret()
		if got == "test-secret" {
			return
		}
		select {
		case <-timeout:
			t.Fatalf("expected secret 'test-secret', got %q", got)
		case <-time.After(500 * time.Millisecond):
			// Poll again.
		}
	}
}
// TestFollowerLoop_CancelContext checks that runFollowerLoop returns within 3
// seconds once its context has been canceled.
func TestFollowerLoop_CancelContext(t *testing.T) {
	healthy := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	srv := httptest.NewServer(healthy)
	defer srv.Close()

	engine := monitor.NewEngine(&mockStore{})
	ctx, stop := context.WithCancel(context.Background())

	// exited is closed when runFollowerLoop returns.
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		runFollowerLoop(ctx, Config{PeerURL: srv.URL}, engine)
	}()

	// Cancel right away; the loop must notice and return.
	stop()

	timer := time.NewTimer(3 * time.Second)
	defer timer.Stop()
	select {
	case <-exited:
	case <-timer.C:
		t.Fatal("expected follower loop to exit on context cancel")
	}
}
// --- Probe Tests ---
func TestProbeRegister_Success(t *testing.T) {
var received map[string]string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewDecoder(r.Body).Decode(&received)
w.WriteHeader(200)
}))
defer srv.Close()
err := probeRegister(context.Background(), srv.Client(), ProbeConfig{
NodeID: "n1", NodeName: "US East", Region: "us-east", LeaderURL: srv.URL, SharedKey: "key",
})
if err != nil {
t.Fatalf("register: %v", err)
}
if received["id"] != "n1" {
t.Errorf("expected id n1, got %s", received["id"])
}
}
func TestProbeRegister_Failure(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(401)
}))
defer srv.Close()
err := probeRegister(context.Background(), srv.Client(), ProbeConfig{
LeaderURL: srv.URL,
})
if err == nil {
t.Error("expected error on 401")
}
}
// TestProbeFetchAssignments_Success serves a JSON object with a single site
// under the "sites" key and checks that probeFetchAssignments returns exactly
// one site and no error.
func TestProbeFetchAssignments_Success(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		payload := map[string][]models.Site{
			"sites": {{SiteConfig: models.SiteConfig{ID: 1, Name: "s1", Type: "http", URL: "http://example.com"}}},
		}
		// Report a broken fixture here rather than letting it show up later
		// as an unexplained client-side failure. t.Errorf may be called from
		// the handler goroutine while the test is still running.
		if err := json.NewEncoder(w).Encode(payload); err != nil {
			t.Errorf("encode assignments: %v", err)
		}
	}))
	defer srv.Close()

	sites, err := probeFetchAssignments(context.Background(), srv.Client(), ProbeConfig{
		NodeID: "n1", LeaderURL: srv.URL, SharedKey: "key",
	})
	if err != nil {
		t.Fatalf("fetch: %v", err)
	}
	if len(sites) != 1 {
		t.Errorf("expected 1 site, got %d", len(sites))
	}
}
func TestProbeFetchAssignments_Unauthorized(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(401)
}))
defer srv.Close()
_, err := probeFetchAssignments(context.Background(), srv.Client(), ProbeConfig{
LeaderURL: srv.URL,
})
if err == nil {
t.Error("expected error on 401")
}
}
// TestProbeExecuteChecks runs probeExecuteChecks over two HTTP sites that
// both point at a test server answering 200, and expects two results that are
// all marked up.
func TestProbeExecuteChecks(t *testing.T) {
	target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer target.Close()

	first := models.Site{SiteConfig: models.SiteConfig{ID: 1, Type: "http", URL: target.URL}}
	second := models.Site{SiteConfig: models.SiteConfig{ID: 2, Type: "http", URL: target.URL}}
	sites := []models.Site{first, second}

	// Two distinct zero-value clients, passed in the strict and insecure
	// positions respectively.
	strict, insecure := &http.Client{}, &http.Client{}

	results := probeExecuteChecks(context.Background(), sites, strict, insecure, true)
	if got := len(results); got != 2 {
		t.Fatalf("expected 2 results, got %d", got)
	}
	for _, res := range results {
		if res.IsUp {
			continue
		}
		t.Errorf("site %d expected UP", res.SiteID)
	}
}
// TestProbeExecuteChecks_Concurrency runs probeExecuteChecks over 20 sites
// that all hit one slow test server, and asserts that the server never saw
// more than 10 requests in flight at the same time.
func TestProbeExecuteChecks_Concurrency(t *testing.T) {
	// inFlight counts handlers currently running; peak records the highest
	// value inFlight has reached. Both are only touched atomically.
	var inFlight, peak int64

	slow := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		cur := atomic.AddInt64(&inFlight, 1)
		// Raise peak to cur unless another handler already recorded a value
		// at least as large; retry when the compare-and-swap loses a race.
		for {
			seen := atomic.LoadInt64(&peak)
			if cur <= seen {
				break
			}
			if atomic.CompareAndSwapInt64(&peak, seen, cur) {
				break
			}
		}
		// Hold the request open so overlapping requests can be observed.
		time.Sleep(50 * time.Millisecond)
		atomic.AddInt64(&inFlight, -1)
		w.WriteHeader(http.StatusOK)
	})
	srv := httptest.NewServer(slow)
	defer srv.Close()

	var sites []models.Site
	for id := 1; id <= 20; id++ {
		cfg := models.SiteConfig{ID: id, Type: "http", URL: srv.URL}
		sites = append(sites, models.Site{SiteConfig: cfg})
	}

	results := probeExecuteChecks(context.Background(), sites, &http.Client{}, &http.Client{}, true)
	if got := len(results); got != 20 {
		t.Errorf("expected 20 results, got %d", got)
	}

	if observed := atomic.LoadInt64(&peak); observed > 10 {
		t.Errorf("expected max 10 concurrent, got %d", observed)
	}
}
// TestProbeReportResults_Success posts one result through probeReportResults
// to a fake leader and checks that the decoded request body carries the
// configured node ID and exactly one result.
func TestProbeReportResults_Success(t *testing.T) {
	// received mirrors the two fields of the report payload this test reads.
	var received struct {
		NodeID  string            `json:"node_id"`
		Results []probeResultItem `json:"results"`
	}
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Report a malformed body at the point of failure instead of letting
		// it show up later as an "expected n1, got" mismatch with no cause.
		// t.Errorf may be called from the handler goroutine while the test is
		// still running.
		if err := json.NewDecoder(r.Body).Decode(&received); err != nil {
			t.Errorf("decode report body: %v", err)
			http.Error(w, "bad request body", http.StatusBadRequest)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	err := probeReportResults(context.Background(), srv.Client(), ProbeConfig{
		NodeID: "n1", LeaderURL: srv.URL, SharedKey: "key",
	}, []probeResultItem{{SiteID: 1, LatencyNs: 5000000, IsUp: true}})
	if err != nil {
		t.Fatalf("report: %v", err)
	}
	if received.NodeID != "n1" {
		t.Errorf("expected n1, got %s", received.NodeID)
	}
	if len(received.Results) != 1 {
		t.Errorf("expected 1 result, got %d", len(received.Results))
	}
}
func TestProbeReportResults_Failure(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(500)
}))
defer srv.Close()
err := probeReportResults(context.Background(), srv.Client(), ProbeConfig{
LeaderURL: srv.URL,
}, []probeResultItem{{SiteID: 1}})
if err == nil {
t.Error("expected error on 500")
}
}
// --- sleepCtx ---
// TestSleepCtx_Cancel checks that sleepCtx returns in under a second when it
// is handed an already-canceled context together with a 10s duration.
func TestSleepCtx_Cancel(t *testing.T) {
	ctx, stop := context.WithCancel(context.Background())
	stop() // cancel before sleeping

	began := time.Now()
	sleepCtx(ctx, 10*time.Second)

	if elapsed := time.Since(began); elapsed > time.Second {
		t.Error("expected immediate return on canceled context")
	}
}