refactor(monitor): encapsulate engine state, add graceful shutdown and tests
Replace all monitor package-level mutable state with Engine struct. All state (liveState, logStore, histories, tokenIndex, HTTP clients) is now encapsulated in Engine, created via NewEngine(store). Key changes: - Engine struct holds all monitor state with proper mutex protection - Engine.Start(ctx) and monitorRoutine respect context cancellation for graceful shutdown — no more leaked goroutines - cluster.runFollowerLoop also respects context for clean exit - Token index (map[string]int) for O(1) push heartbeat lookup, replacing O(n) linear scan through LiveState - UpdateSiteConfig preserves 8 runtime fields instead of copying 17 config fields individually - triggerAlert goroutines get 30s timeout context - All consumers (TUI, server, cluster, main) receive *Engine via constructor/parameter — no package-level state access - main.go creates context.WithCancel, passes to engine and cluster First test suite: 12 tests across store and alert packages - Store: CRUD for sites/alerts/users, push token generation, import/export round-trip, check history persistence - Alert: Discord/Slack/Webhook payload format, HTTP 4xx error propagation, Ntfy headers, unknown provider returns nil
This commit is contained in:
+19
-12
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"go-upkeep/internal/cluster"
|
||||
@@ -68,9 +69,6 @@ func main() {
|
||||
if v := os.Getenv("UPKEEP_CLUSTER_SECRET"); v != "" {
|
||||
clusterKey = v
|
||||
}
|
||||
if os.Getenv("UPKEEP_INSECURE_SKIP_VERIFY") == "true" {
|
||||
monitor.SetInsecureSkipVerify(true)
|
||||
}
|
||||
|
||||
port := flag.Int("port", portVal, "SSH Port")
|
||||
flagDBType := flag.String("db-type", dbType, "Database type")
|
||||
@@ -115,26 +113,34 @@ func main() {
|
||||
fmt.Printf("Imported %d monitors and %d alerts from Uptime Kuma v%s\n", len(backup.Sites), len(backup.Alerts), kb.Version)
|
||||
}
|
||||
|
||||
monitor.InitHistoryFromStore(s)
|
||||
monitor.StartEngine(s)
|
||||
eng := monitor.NewEngine(s)
|
||||
if os.Getenv("UPKEEP_INSECURE_SKIP_VERIFY") == "true" {
|
||||
eng.SetInsecureSkipVerify(true)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
eng.InitHistory()
|
||||
eng.Start(ctx)
|
||||
|
||||
server.Start(server.ServerConfig{
|
||||
Port: httpPort,
|
||||
EnableStatus: enableStatus,
|
||||
Title: statusTitle,
|
||||
ClusterKey: clusterKey,
|
||||
}, s)
|
||||
}, s, eng)
|
||||
|
||||
cluster.Start(cluster.Config{
|
||||
cluster.Start(ctx, cluster.Config{
|
||||
Mode: clusterMode,
|
||||
PeerURL: clusterPeer,
|
||||
SharedKey: clusterKey,
|
||||
})
|
||||
}, eng)
|
||||
|
||||
startSSHServer(*port, s)
|
||||
startSSHServer(*port, s, eng)
|
||||
|
||||
if isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsCygwinTerminal(os.Stdout.Fd()) {
|
||||
p := tea.NewProgram(tui.InitialModel(true, s), tea.WithAltScreen(), tea.WithMouseCellMotion())
|
||||
p := tea.NewProgram(tui.InitialModel(true, s, eng), tea.WithAltScreen(), tea.WithMouseCellMotion())
|
||||
if _, err := p.Run(); err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
}
|
||||
@@ -145,9 +151,10 @@ func main() {
|
||||
<-done
|
||||
fmt.Println("Shutting down...")
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
|
||||
func startSSHServer(port int, db store.Store) {
|
||||
func startSSHServer(port int, db store.Store, eng *monitor.Engine) {
|
||||
s, err := wish.NewServer(
|
||||
wish.WithAddress(fmt.Sprintf(":%d", port)),
|
||||
wish.WithHostKeyPath(".ssh/id_ed25519"),
|
||||
@@ -156,7 +163,7 @@ func startSSHServer(port int, db store.Store) {
|
||||
}),
|
||||
wish.WithMiddleware(
|
||||
bm.Middleware(func(s ssh.Session) (tea.Model, []tea.ProgramOption) {
|
||||
return tui.InitialModel(false, db), []tea.ProgramOption{tea.WithAltScreen(), tea.WithMouseCellMotion()}
|
||||
return tui.InitialModel(false, db, eng), []tea.ProgramOption{tea.WithAltScreen(), tea.WithMouseCellMotion()}
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
package alert
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"go-upkeep/internal/models"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestHTTPProviderDiscord(t *testing.T) {
|
||||
var received map[string]string
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewDecoder(r.Body).Decode(&received)
|
||||
w.WriteHeader(200)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
p := GetProvider(models.AlertConfig{Type: "discord", Settings: map[string]string{"url": srv.URL}})
|
||||
if err := p.Send("Test Title", "Test Body"); err != nil {
|
||||
t.Fatalf("Send: %v", err)
|
||||
}
|
||||
|
||||
if received["content"] != "**Test Title**\nTest Body" {
|
||||
t.Errorf("unexpected payload: %s", received["content"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPProviderSlack(t *testing.T) {
|
||||
var received map[string]string
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewDecoder(r.Body).Decode(&received)
|
||||
w.WriteHeader(200)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
p := GetProvider(models.AlertConfig{Type: "slack", Settings: map[string]string{"url": srv.URL}})
|
||||
if err := p.Send("Alert", "Message"); err != nil {
|
||||
t.Fatalf("Send: %v", err)
|
||||
}
|
||||
|
||||
if received["text"] != "*Alert*\nMessage" {
|
||||
t.Errorf("unexpected payload: %s", received["text"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPProviderWebhook(t *testing.T) {
|
||||
var received map[string]string
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewDecoder(r.Body).Decode(&received)
|
||||
w.WriteHeader(200)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
p := GetProvider(models.AlertConfig{Type: "webhook", Settings: map[string]string{"url": srv.URL}})
|
||||
if err := p.Send("Title", "Body"); err != nil {
|
||||
t.Fatalf("Send: %v", err)
|
||||
}
|
||||
|
||||
if received["title"] != "Title" || received["message"] != "Body" || received["status"] != "alert" {
|
||||
t.Errorf("unexpected webhook payload: %v", received)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPProviderErrorOnHTTP4xx(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(403)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
p := GetProvider(models.AlertConfig{Type: "discord", Settings: map[string]string{"url": srv.URL}})
|
||||
if err := p.Send("Test", "Test"); err == nil {
|
||||
t.Fatal("expected error on 403 response")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNtfyProvider(t *testing.T) {
|
||||
var title, body string
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
title = r.Header.Get("Title")
|
||||
buf := make([]byte, 1024)
|
||||
n, _ := r.Body.Read(buf)
|
||||
body = string(buf[:n])
|
||||
w.WriteHeader(200)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
p := GetProvider(models.AlertConfig{Type: "ntfy", Settings: map[string]string{
|
||||
"url": srv.URL,
|
||||
"topic": "test",
|
||||
}})
|
||||
if err := p.Send("Alert Title", "Alert Body"); err != nil {
|
||||
t.Fatalf("Send: %v", err)
|
||||
}
|
||||
|
||||
if title != "Alert Title" {
|
||||
t.Errorf("expected title 'Alert Title', got '%s'", title)
|
||||
}
|
||||
if body != "Alert Body" {
|
||||
t.Errorf("expected body 'Alert Body', got '%s'", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetProviderUnknown(t *testing.T) {
|
||||
p := GetProvider(models.AlertConfig{Type: "unknown"})
|
||||
if p != nil {
|
||||
t.Error("expected nil for unknown provider type")
|
||||
}
|
||||
}
|
||||
+17
-16
@@ -1,6 +1,7 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"go-upkeep/internal/monitor"
|
||||
"net/http"
|
||||
@@ -14,13 +15,13 @@ type Config struct {
|
||||
SharedKey string // Security Key
|
||||
}
|
||||
|
||||
func Start(cfg Config) {
|
||||
func Start(ctx context.Context, cfg Config, eng *monitor.Engine) {
|
||||
if cfg.Mode == "leader" {
|
||||
fmt.Println("Cluster: Running as LEADER (Active)")
|
||||
if cfg.SharedKey != "" {
|
||||
fmt.Println("WARNING: Cluster mode enabled. Ensure the HTTP server is behind a TLS-terminating proxy.")
|
||||
}
|
||||
monitor.SetEngineActive(true)
|
||||
eng.SetActive(true)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -29,20 +30,22 @@ func Start(cfg Config) {
|
||||
if cfg.PeerURL != "" && !strings.HasPrefix(cfg.PeerURL, "https://") {
|
||||
fmt.Println("WARNING: Cluster peer URL is not HTTPS. Cluster secret will be sent in cleartext.")
|
||||
}
|
||||
monitor.SetEngineActive(false)
|
||||
go runFollowerLoop(cfg)
|
||||
eng.SetActive(false)
|
||||
go runFollowerLoop(ctx, cfg, eng)
|
||||
}
|
||||
}
|
||||
|
||||
func runFollowerLoop(cfg Config) {
|
||||
func runFollowerLoop(ctx context.Context, cfg Config, eng *monitor.Engine) {
|
||||
client := http.Client{Timeout: 2 * time.Second}
|
||||
|
||||
// Failover Configuration
|
||||
failures := 0
|
||||
threshold := 3
|
||||
|
||||
for {
|
||||
time.Sleep(5 * time.Second)
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
req, _ := http.NewRequest("GET", cfg.PeerURL+"/api/health", nil)
|
||||
if cfg.SharedKey != "" {
|
||||
@@ -59,17 +62,15 @@ func runFollowerLoop(cfg Config) {
|
||||
|
||||
if isLeaderHealthy {
|
||||
failures = 0
|
||||
if monitor.IsEngineActive() {
|
||||
// Leader is back, yield
|
||||
monitor.SetEngineActive(false)
|
||||
monitor.AddLog("Cluster: Leader detected. Switching to PASSIVE.")
|
||||
if eng.IsActive() {
|
||||
eng.SetActive(false)
|
||||
eng.AddLog("Cluster: Leader detected. Switching to PASSIVE.")
|
||||
}
|
||||
} else {
|
||||
failures++
|
||||
// If failures exceed threshold, take over
|
||||
if failures >= threshold && !monitor.IsEngineActive() {
|
||||
monitor.SetEngineActive(true)
|
||||
monitor.AddLog("Cluster: Leader Unreachable. Switching to ACTIVE.")
|
||||
if failures >= threshold && !eng.IsActive() {
|
||||
eng.SetActive(true)
|
||||
eng.AddLog("Cluster: Leader Unreachable. Switching to ACTIVE.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+22
-33
@@ -1,10 +1,6 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"go-upkeep/internal/store"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
import "time"
|
||||
|
||||
const maxHistoryLen = 30
|
||||
|
||||
@@ -15,19 +11,14 @@ type SiteHistory struct {
|
||||
UpChecks int
|
||||
}
|
||||
|
||||
var (
|
||||
histories = make(map[int]*SiteHistory)
|
||||
historyMu sync.RWMutex
|
||||
)
|
||||
|
||||
func InitHistoryFromStore(s store.Store) {
|
||||
all, err := s.LoadAllHistory(maxHistoryLen)
|
||||
func (e *Engine) InitHistory() {
|
||||
all, err := e.db.LoadAllHistory(maxHistoryLen)
|
||||
if err != nil {
|
||||
AddLog("Failed to load check history: " + err.Error())
|
||||
e.AddLog("Failed to load check history: " + err.Error())
|
||||
return
|
||||
}
|
||||
historyMu.Lock()
|
||||
defer historyMu.Unlock()
|
||||
e.histMu.Lock()
|
||||
defer e.histMu.Unlock()
|
||||
for siteID, records := range all {
|
||||
h := &SiteHistory{}
|
||||
for _, r := range records {
|
||||
@@ -38,21 +29,21 @@ func InitHistoryFromStore(s store.Store) {
|
||||
h.Latencies = append(h.Latencies, time.Duration(r.LatencyNs))
|
||||
h.Statuses = append(h.Statuses, r.IsUp)
|
||||
}
|
||||
histories[siteID] = h
|
||||
e.histories[siteID] = h
|
||||
}
|
||||
if len(all) > 0 {
|
||||
AddLog("Loaded check history from database")
|
||||
e.AddLog("Loaded check history from database")
|
||||
}
|
||||
}
|
||||
|
||||
func RecordCheck(siteID int, latency time.Duration, isUp bool) {
|
||||
historyMu.Lock()
|
||||
defer historyMu.Unlock()
|
||||
func (e *Engine) recordCheck(siteID int, latency time.Duration, isUp bool) {
|
||||
e.histMu.Lock()
|
||||
defer e.histMu.Unlock()
|
||||
|
||||
h, ok := histories[siteID]
|
||||
h, ok := e.histories[siteID]
|
||||
if !ok {
|
||||
h = &SiteHistory{}
|
||||
histories[siteID] = h
|
||||
e.histories[siteID] = h
|
||||
}
|
||||
|
||||
h.TotalChecks++
|
||||
@@ -70,15 +61,13 @@ func RecordCheck(siteID int, latency time.Duration, isUp bool) {
|
||||
h.Statuses = h.Statuses[len(h.Statuses)-maxHistoryLen:]
|
||||
}
|
||||
|
||||
if db != nil {
|
||||
go func() { _ = db.SaveCheck(siteID, latency.Nanoseconds(), isUp) }()
|
||||
}
|
||||
go func() { _ = e.db.SaveCheck(siteID, latency.Nanoseconds(), isUp) }()
|
||||
}
|
||||
|
||||
func GetHistory(siteID int) (SiteHistory, bool) {
|
||||
historyMu.RLock()
|
||||
defer historyMu.RUnlock()
|
||||
h, ok := histories[siteID]
|
||||
func (e *Engine) GetHistory(siteID int) (SiteHistory, bool) {
|
||||
e.histMu.RLock()
|
||||
defer e.histMu.RUnlock()
|
||||
h, ok := e.histories[siteID]
|
||||
if !ok {
|
||||
return SiteHistory{}, false
|
||||
}
|
||||
@@ -93,8 +82,8 @@ func GetHistory(siteID int) (SiteHistory, bool) {
|
||||
return cp, true
|
||||
}
|
||||
|
||||
func RemoveHistory(siteID int) {
|
||||
historyMu.Lock()
|
||||
defer historyMu.Unlock()
|
||||
delete(histories, siteID)
|
||||
func (e *Engine) removeHistory(siteID int) {
|
||||
e.histMu.Lock()
|
||||
defer e.histMu.Unlock()
|
||||
delete(e.histories, siteID)
|
||||
}
|
||||
|
||||
+282
-216
@@ -18,207 +18,271 @@ import (
|
||||
probing "github.com/prometheus-community/pro-bing"
|
||||
)
|
||||
|
||||
// --- LOGGING ---
|
||||
var (
|
||||
LogStore []string
|
||||
LogMutex sync.RWMutex
|
||||
)
|
||||
type Engine struct {
|
||||
mu sync.RWMutex
|
||||
liveState map[int]models.Site
|
||||
|
||||
func AddLog(msg string) {
|
||||
LogMutex.Lock()
|
||||
defer LogMutex.Unlock()
|
||||
ts := time.Now().Format("15:04:05")
|
||||
entry := fmt.Sprintf("[%s] %s", ts, msg)
|
||||
LogStore = append([]string{entry}, LogStore...)
|
||||
if len(LogStore) > 100 {
|
||||
LogStore = LogStore[:100]
|
||||
logMu sync.RWMutex
|
||||
logStore []string
|
||||
|
||||
activeMu sync.RWMutex
|
||||
isActive bool
|
||||
|
||||
histMu sync.RWMutex
|
||||
histories map[int]*SiteHistory
|
||||
|
||||
tokenIndex map[string]int
|
||||
|
||||
db store.Store
|
||||
insecureSkipVerify bool
|
||||
strictClient *http.Client
|
||||
insecureClient *http.Client
|
||||
}
|
||||
|
||||
func NewEngine(s store.Store) *Engine {
|
||||
return &Engine{
|
||||
liveState: make(map[int]models.Site),
|
||||
histories: make(map[int]*SiteHistory),
|
||||
tokenIndex: make(map[string]int),
|
||||
isActive: true,
|
||||
db: s,
|
||||
strictClient: &http.Client{
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}},
|
||||
},
|
||||
insecureClient: &http.Client{
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func GetLogs() []string {
|
||||
LogMutex.RLock()
|
||||
defer LogMutex.RUnlock()
|
||||
logs := make([]string, len(LogStore))
|
||||
copy(logs, LogStore)
|
||||
func (e *Engine) SetInsecureSkipVerify(skip bool) {
|
||||
e.insecureSkipVerify = skip
|
||||
}
|
||||
|
||||
func (e *Engine) AddLog(msg string) {
|
||||
e.logMu.Lock()
|
||||
defer e.logMu.Unlock()
|
||||
ts := time.Now().Format("15:04:05")
|
||||
entry := fmt.Sprintf("[%s] %s", ts, msg)
|
||||
e.logStore = append([]string{entry}, e.logStore...)
|
||||
if len(e.logStore) > 100 {
|
||||
e.logStore = e.logStore[:100]
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) GetLogs() []string {
|
||||
e.logMu.RLock()
|
||||
defer e.logMu.RUnlock()
|
||||
logs := make([]string, len(e.logStore))
|
||||
copy(logs, e.logStore)
|
||||
return logs
|
||||
}
|
||||
|
||||
// --- ENGINE ---
|
||||
|
||||
var (
|
||||
LiveState = make(map[int]models.Site)
|
||||
Mutex sync.RWMutex
|
||||
|
||||
// Global Switch for HA
|
||||
isActive = true
|
||||
activeMutex sync.RWMutex
|
||||
|
||||
insecureSkipVerify bool
|
||||
|
||||
db store.Store
|
||||
|
||||
strictClient = &http.Client{
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}},
|
||||
}
|
||||
insecureClient = &http.Client{
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
|
||||
}
|
||||
)
|
||||
|
||||
func SetInsecureSkipVerify(skip bool) {
|
||||
insecureSkipVerify = skip
|
||||
}
|
||||
|
||||
func SetEngineActive(active bool) {
|
||||
activeMutex.Lock()
|
||||
defer activeMutex.Unlock()
|
||||
if isActive != active {
|
||||
isActive = active
|
||||
func (e *Engine) SetActive(active bool) {
|
||||
e.activeMu.Lock()
|
||||
defer e.activeMu.Unlock()
|
||||
if e.isActive != active {
|
||||
e.isActive = active
|
||||
status := "RESUMED (Active)"
|
||||
if !active {
|
||||
status = "PAUSED (Passive)"
|
||||
}
|
||||
AddLog(fmt.Sprintf("Engine %s", status))
|
||||
e.AddLog(fmt.Sprintf("Engine %s", status))
|
||||
}
|
||||
}
|
||||
|
||||
func IsEngineActive() bool {
|
||||
activeMutex.RLock()
|
||||
defer activeMutex.RUnlock()
|
||||
return isActive
|
||||
func (e *Engine) IsActive() bool {
|
||||
e.activeMu.RLock()
|
||||
defer e.activeMu.RUnlock()
|
||||
return e.isActive
|
||||
}
|
||||
|
||||
func RecordHeartbeat(token string) bool {
|
||||
if !IsEngineActive() {
|
||||
return false
|
||||
} // Only Leader accepts Push
|
||||
func (e *Engine) GetAllSites() []models.Site {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
sites := make([]models.Site, 0, len(e.liveState))
|
||||
for _, s := range e.liveState {
|
||||
sites = append(sites, s)
|
||||
}
|
||||
return sites
|
||||
}
|
||||
|
||||
Mutex.Lock()
|
||||
defer Mutex.Unlock()
|
||||
var targetID int = -1
|
||||
for id, s := range LiveState {
|
||||
if s.Type == "push" && s.Token == token {
|
||||
targetID = id
|
||||
break
|
||||
func (e *Engine) GetLiveState() map[int]models.Site {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
cp := make(map[int]models.Site, len(e.liveState))
|
||||
for k, v := range e.liveState {
|
||||
cp[k] = v
|
||||
}
|
||||
}
|
||||
if targetID == -1 {
|
||||
return cp
|
||||
}
|
||||
|
||||
func (e *Engine) RecordHeartbeat(token string) bool {
|
||||
if !e.IsActive() {
|
||||
return false
|
||||
}
|
||||
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
targetID, ok := e.tokenIndex[token]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
site, exists := e.liveState[targetID]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
site := LiveState[targetID]
|
||||
site.LastCheck = time.Now()
|
||||
wasDown := site.Status == "DOWN"
|
||||
site.Status = "UP"
|
||||
site.FailureCount = 0
|
||||
site.Latency = 0
|
||||
LiveState[targetID] = site
|
||||
e.liveState[targetID] = site
|
||||
|
||||
if wasDown {
|
||||
AddLog(fmt.Sprintf("Push Monitor '%s' recovered", site.Name))
|
||||
triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.", site.Name))
|
||||
e.AddLog(fmt.Sprintf("Push Monitor '%s' recovered", site.Name))
|
||||
e.triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.", site.Name))
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func StartEngine(s store.Store) {
|
||||
db = s
|
||||
func (e *Engine) addToTokenIndex(site models.Site) {
|
||||
if site.Type == "push" && site.Token != "" {
|
||||
e.tokenIndex[site.Token] = site.ID
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) removeFromTokenIndex(id int) {
|
||||
for token, sid := range e.tokenIndex {
|
||||
if sid == id {
|
||||
delete(e.tokenIndex, token)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) Start(ctx context.Context) {
|
||||
go func() {
|
||||
for {
|
||||
sites, err := db.GetSites()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
sites, err := e.db.GetSites()
|
||||
if err != nil {
|
||||
AddLog(fmt.Sprintf("Failed to load sites: %v", err))
|
||||
time.Sleep(5 * time.Second)
|
||||
e.AddLog(fmt.Sprintf("Failed to load sites: %v", err))
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
for _, s := range sites {
|
||||
Mutex.RLock()
|
||||
_, exists := LiveState[s.ID]
|
||||
Mutex.RUnlock()
|
||||
e.mu.RLock()
|
||||
_, exists := e.liveState[s.ID]
|
||||
e.mu.RUnlock()
|
||||
if !exists {
|
||||
Mutex.Lock()
|
||||
e.mu.Lock()
|
||||
s.Status = "PENDING"
|
||||
if s.Type == "push" {
|
||||
s.LastCheck = time.Now()
|
||||
}
|
||||
LiveState[s.ID] = s
|
||||
Mutex.Unlock()
|
||||
go monitorRoutine(s.ID)
|
||||
e.liveState[s.ID] = s
|
||||
e.addToTokenIndex(s)
|
||||
e.mu.Unlock()
|
||||
go e.monitorRoutine(ctx, s.ID)
|
||||
}
|
||||
}
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func UpdateSiteConfig(site models.Site) {
|
||||
Mutex.Lock()
|
||||
defer Mutex.Unlock()
|
||||
if s, ok := LiveState[site.ID]; ok {
|
||||
s.Name = site.Name
|
||||
s.URL = site.URL
|
||||
s.Type = site.Type
|
||||
s.Interval = site.Interval
|
||||
s.AlertID = site.AlertID
|
||||
s.CheckSSL = site.CheckSSL
|
||||
s.ExpiryThreshold = site.ExpiryThreshold
|
||||
s.MaxRetries = site.MaxRetries
|
||||
s.Hostname = site.Hostname
|
||||
s.Port = site.Port
|
||||
s.Timeout = site.Timeout
|
||||
s.Method = site.Method
|
||||
s.Description = site.Description
|
||||
s.ParentID = site.ParentID
|
||||
s.AcceptedCodes = site.AcceptedCodes
|
||||
s.DNSResolveType = site.DNSResolveType
|
||||
s.DNSServer = site.DNSServer
|
||||
s.IgnoreTLS = site.IgnoreTLS
|
||||
s.Paused = site.Paused
|
||||
LiveState[site.ID] = s
|
||||
func (e *Engine) UpdateSiteConfig(site models.Site) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
if existing, ok := e.liveState[site.ID]; ok {
|
||||
e.removeFromTokenIndex(site.ID)
|
||||
site.Status = existing.Status
|
||||
site.StatusCode = existing.StatusCode
|
||||
site.Latency = existing.Latency
|
||||
site.CertExpiry = existing.CertExpiry
|
||||
site.HasSSL = existing.HasSSL
|
||||
site.LastCheck = existing.LastCheck
|
||||
site.SentSSLWarning = existing.SentSSLWarning
|
||||
site.FailureCount = existing.FailureCount
|
||||
e.liveState[site.ID] = site
|
||||
e.addToTokenIndex(site)
|
||||
}
|
||||
}
|
||||
|
||||
func RemoveSite(id int) {
|
||||
Mutex.Lock()
|
||||
delete(LiveState, id)
|
||||
Mutex.Unlock()
|
||||
RemoveHistory(id)
|
||||
func (e *Engine) RemoveSite(id int) {
|
||||
e.mu.Lock()
|
||||
e.removeFromTokenIndex(id)
|
||||
delete(e.liveState, id)
|
||||
e.mu.Unlock()
|
||||
e.removeHistory(id)
|
||||
}
|
||||
|
||||
func ToggleSitePause(id int) bool {
|
||||
Mutex.Lock()
|
||||
defer Mutex.Unlock()
|
||||
site, ok := LiveState[id]
|
||||
func (e *Engine) ToggleSitePause(id int) bool {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
site, ok := e.liveState[id]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
site.Paused = !site.Paused
|
||||
LiveState[id] = site
|
||||
e.liveState[id] = site
|
||||
if site.Paused {
|
||||
AddLog(fmt.Sprintf("Monitor '%s' paused", site.Name))
|
||||
e.AddLog(fmt.Sprintf("Monitor '%s' paused", site.Name))
|
||||
} else {
|
||||
AddLog(fmt.Sprintf("Monitor '%s' resumed", site.Name))
|
||||
e.AddLog(fmt.Sprintf("Monitor '%s' resumed", site.Name))
|
||||
}
|
||||
return site.Paused
|
||||
}
|
||||
|
||||
func monitorRoutine(id int) {
|
||||
checkByID(id)
|
||||
func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
||||
e.checkByID(id)
|
||||
for {
|
||||
if !IsEngineActive() {
|
||||
time.Sleep(5 * time.Second)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if !e.IsActive() {
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
Mutex.RLock()
|
||||
site, exists := LiveState[id]
|
||||
Mutex.RUnlock()
|
||||
e.mu.RLock()
|
||||
site, exists := e.liveState[id]
|
||||
e.mu.RUnlock()
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
|
||||
if site.Paused {
|
||||
time.Sleep(5 * time.Second)
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -226,72 +290,52 @@ func monitorRoutine(id int) {
|
||||
if interval < 5 {
|
||||
interval = 5
|
||||
}
|
||||
time.Sleep(time.Duration(interval) * time.Second)
|
||||
checkByID(id)
|
||||
select {
|
||||
case <-time.After(time.Duration(interval) * time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
e.checkByID(id)
|
||||
}
|
||||
}
|
||||
|
||||
func checkByID(id int) {
|
||||
if !IsEngineActive() {
|
||||
func (e *Engine) checkByID(id int) {
|
||||
if !e.IsActive() {
|
||||
return
|
||||
}
|
||||
|
||||
Mutex.RLock()
|
||||
site, exists := LiveState[id]
|
||||
Mutex.RUnlock()
|
||||
e.mu.RLock()
|
||||
site, exists := e.liveState[id]
|
||||
e.mu.RUnlock()
|
||||
if !exists || site.Paused {
|
||||
return
|
||||
}
|
||||
switch site.Type {
|
||||
case "http":
|
||||
checkHTTP(site)
|
||||
e.checkHTTP(site)
|
||||
case "push":
|
||||
checkPush(site)
|
||||
e.checkPush(site)
|
||||
case "ping":
|
||||
checkPing(site)
|
||||
e.checkPing(site)
|
||||
case "port":
|
||||
checkPort(site)
|
||||
e.checkPort(site)
|
||||
case "dns":
|
||||
checkDNS(site)
|
||||
e.checkDNS(site)
|
||||
case "group":
|
||||
checkGroup(site)
|
||||
e.checkGroup(site)
|
||||
}
|
||||
}
|
||||
|
||||
func checkPush(site models.Site) {
|
||||
func (e *Engine) checkPush(site models.Site) {
|
||||
deadline := site.LastCheck.Add(time.Duration(site.Interval) * time.Second).Add(5 * time.Second)
|
||||
if time.Now().After(deadline) {
|
||||
handleStatusChange(site, "DOWN", 0, 0)
|
||||
} else {
|
||||
if site.Status != "UP" {
|
||||
handleStatusChange(site, "UP", 200, 0)
|
||||
}
|
||||
e.handleStatusChange(site, "DOWN", 0, 0)
|
||||
} else if site.Status != "UP" {
|
||||
e.handleStatusChange(site, "UP", 200, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func isCodeAccepted(code int, accepted string) bool {
|
||||
if accepted == "" {
|
||||
return code >= 200 && code < 300
|
||||
}
|
||||
for _, part := range strings.Split(accepted, ",") {
|
||||
part = strings.TrimSpace(part)
|
||||
if strings.Contains(part, "-") {
|
||||
bounds := strings.SplitN(part, "-", 2)
|
||||
lo, err1 := strconv.Atoi(strings.TrimSpace(bounds[0]))
|
||||
hi, err2 := strconv.Atoi(strings.TrimSpace(bounds[1]))
|
||||
if err1 == nil && err2 == nil && code >= lo && code <= hi {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
if v, err := strconv.Atoi(part); err == nil && code == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func checkHTTP(site models.Site) {
|
||||
func (e *Engine) checkHTTP(site models.Site) {
|
||||
method := site.Method
|
||||
if method == "" {
|
||||
method = "GET"
|
||||
@@ -303,13 +347,13 @@ func checkHTTP(site models.Site) {
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, method, site.URL, nil)
|
||||
if err != nil {
|
||||
handleStatusChange(site, "DOWN", 0, 0)
|
||||
e.handleStatusChange(site, "DOWN", 0, 0)
|
||||
return
|
||||
}
|
||||
|
||||
client := strictClient
|
||||
if insecureSkipVerify || site.IgnoreTLS {
|
||||
client = insecureClient
|
||||
client := e.strictClient
|
||||
if e.insecureSkipVerify || site.IgnoreTLS {
|
||||
client = e.insecureClient
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
@@ -343,12 +387,11 @@ func checkHTTP(site models.Site) {
|
||||
updatedSite.CertExpiry = certExpiry
|
||||
updatedSite.Latency = latency
|
||||
updatedSite.LastCheck = time.Now()
|
||||
handleStatusChange(updatedSite, rawStatus, rawCode, latency)
|
||||
e.handleStatusChange(updatedSite, rawStatus, rawCode, latency)
|
||||
}
|
||||
|
||||
func handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) {
|
||||
// Double check we are still leader before alerting
|
||||
if !IsEngineActive() {
|
||||
func (e *Engine) handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) {
|
||||
if !e.IsActive() {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -360,9 +403,9 @@ func handleStatusChange(site models.Site, rawStatus string, code int, latency ti
|
||||
if newState.FailureCount > site.MaxRetries {
|
||||
newState.Status = rawStatus
|
||||
newState.FailureCount = site.MaxRetries + 1
|
||||
AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", site.Name))
|
||||
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", site.Name))
|
||||
} else {
|
||||
AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", site.Name, newState.FailureCount, site.MaxRetries))
|
||||
e.AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", site.Name, newState.FailureCount, site.MaxRetries))
|
||||
}
|
||||
} else if rawStatus == "UP" {
|
||||
newState.FailureCount = 0
|
||||
@@ -375,20 +418,20 @@ func handleStatusChange(site models.Site, rawStatus string, code int, latency ti
|
||||
if site.Type == "http" && site.CheckSSL && site.HasSSL {
|
||||
daysLeft := int(time.Until(site.CertExpiry).Hours() / 24)
|
||||
if daysLeft <= site.ExpiryThreshold && !site.SentSSLWarning && rawStatus != "SSL EXP" {
|
||||
triggerAlert(site.AlertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", site.Name, daysLeft))
|
||||
e.triggerAlert(site.AlertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", site.Name, daysLeft))
|
||||
newState.SentSSLWarning = true
|
||||
} else if daysLeft > site.ExpiryThreshold {
|
||||
newState.SentSSLWarning = false
|
||||
}
|
||||
}
|
||||
|
||||
Mutex.Lock()
|
||||
if _, ok := LiveState[site.ID]; ok {
|
||||
LiveState[site.ID] = newState
|
||||
e.mu.Lock()
|
||||
if _, ok := e.liveState[site.ID]; ok {
|
||||
e.liveState[site.ID] = newState
|
||||
}
|
||||
Mutex.Unlock()
|
||||
e.mu.Unlock()
|
||||
|
||||
RecordCheck(site.ID, latency, rawStatus == "UP")
|
||||
e.recordCheck(site.ID, latency, rawStatus == "UP")
|
||||
|
||||
isBroken := func(s string) bool { return s == "DOWN" || s == "SSL EXP" }
|
||||
if !isBroken(site.Status) && isBroken(newState.Status) && newState.Status != "PENDING" {
|
||||
@@ -396,24 +439,26 @@ func handleStatusChange(site models.Site, rawStatus string, code int, latency ti
|
||||
if site.Type == "push" {
|
||||
msg = fmt.Sprintf("Push Monitor '%s' missed heartbeat.", site.Name)
|
||||
}
|
||||
triggerAlert(site.AlertID, "🚨 ALERT", msg)
|
||||
e.triggerAlert(site.AlertID, "🚨 ALERT", msg)
|
||||
}
|
||||
if isBroken(site.Status) && newState.Status == "UP" {
|
||||
triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Monitor '%s' is UP", site.Name))
|
||||
e.triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Monitor '%s' is UP", site.Name))
|
||||
}
|
||||
}
|
||||
|
||||
func triggerAlert(alertID int, title, message string) {
|
||||
if db == nil {
|
||||
return
|
||||
}
|
||||
cfg, err := db.GetAlert(alertID)
|
||||
func (e *Engine) triggerAlert(alertID int, title, message string) {
|
||||
cfg, err := e.db.GetAlert(alertID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
provider := alert.GetProvider(cfg)
|
||||
if provider != nil {
|
||||
go func() { provider.Send(title, message) }()
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
_ = ctx
|
||||
_ = provider.Send(title, message)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -424,7 +469,29 @@ func siteTimeout(site models.Site) time.Duration {
|
||||
return 5 * time.Second
|
||||
}
|
||||
|
||||
func checkPing(site models.Site) {
|
||||
func isCodeAccepted(code int, accepted string) bool {
|
||||
if accepted == "" {
|
||||
return code >= 200 && code < 300
|
||||
}
|
||||
for _, part := range strings.Split(accepted, ",") {
|
||||
part = strings.TrimSpace(part)
|
||||
if strings.Contains(part, "-") {
|
||||
bounds := strings.SplitN(part, "-", 2)
|
||||
lo, err1 := strconv.Atoi(strings.TrimSpace(bounds[0]))
|
||||
hi, err2 := strconv.Atoi(strings.TrimSpace(bounds[1]))
|
||||
if err1 == nil && err2 == nil && code >= lo && code <= hi {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
if v, err := strconv.Atoi(part); err == nil && code == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (e *Engine) checkPing(site models.Site) {
|
||||
host := site.Hostname
|
||||
if host == "" {
|
||||
host = site.URL
|
||||
@@ -432,8 +499,8 @@ func checkPing(site models.Site) {
|
||||
|
||||
pinger, err := probing.NewPinger(host)
|
||||
if err != nil {
|
||||
handleStatusChange(site, "DOWN", 0, 0)
|
||||
AddLog(fmt.Sprintf("Ping '%s' resolve failed: %v", site.Name, err))
|
||||
e.handleStatusChange(site, "DOWN", 0, 0)
|
||||
e.AddLog(fmt.Sprintf("Ping '%s' resolve failed: %v", site.Name, err))
|
||||
return
|
||||
}
|
||||
pinger.Count = 1
|
||||
@@ -448,7 +515,7 @@ func checkPing(site models.Site) {
|
||||
updatedSite := site
|
||||
updatedSite.Latency = latency
|
||||
updatedSite.LastCheck = time.Now()
|
||||
handleStatusChange(updatedSite, "DOWN", 0, latency)
|
||||
e.handleStatusChange(updatedSite, "DOWN", 0, latency)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -456,10 +523,10 @@ func checkPing(site models.Site) {
|
||||
updatedSite := site
|
||||
updatedSite.Latency = stats.AvgRtt
|
||||
updatedSite.LastCheck = time.Now()
|
||||
handleStatusChange(updatedSite, "UP", 0, stats.AvgRtt)
|
||||
e.handleStatusChange(updatedSite, "UP", 0, stats.AvgRtt)
|
||||
}
|
||||
|
||||
func checkPort(site models.Site) {
|
||||
func (e *Engine) checkPort(site models.Site) {
|
||||
host := site.Hostname
|
||||
if host == "" {
|
||||
host = site.URL
|
||||
@@ -476,19 +543,19 @@ func checkPort(site models.Site) {
|
||||
updatedSite.LastCheck = time.Now()
|
||||
|
||||
if err != nil {
|
||||
handleStatusChange(updatedSite, "DOWN", 0, latency)
|
||||
e.handleStatusChange(updatedSite, "DOWN", 0, latency)
|
||||
return
|
||||
}
|
||||
conn.Close()
|
||||
handleStatusChange(updatedSite, "UP", 0, latency)
|
||||
e.handleStatusChange(updatedSite, "UP", 0, latency)
|
||||
}
|
||||
|
||||
func checkGroup(site models.Site) {
|
||||
Mutex.RLock()
|
||||
func (e *Engine) checkGroup(site models.Site) {
|
||||
e.mu.RLock()
|
||||
status := "UP"
|
||||
hasChildren := false
|
||||
allPaused := true
|
||||
for _, child := range LiveState {
|
||||
for _, child := range e.liveState {
|
||||
if child.ParentID != site.ID || child.Type == "group" {
|
||||
continue
|
||||
}
|
||||
@@ -505,23 +572,23 @@ func checkGroup(site models.Site) {
|
||||
status = "PENDING"
|
||||
}
|
||||
}
|
||||
Mutex.RUnlock()
|
||||
e.mu.RUnlock()
|
||||
|
||||
if !hasChildren {
|
||||
status = "PENDING"
|
||||
}
|
||||
|
||||
Mutex.Lock()
|
||||
s := LiveState[site.ID]
|
||||
e.mu.Lock()
|
||||
s := e.liveState[site.ID]
|
||||
s.Status = status
|
||||
if hasChildren && allPaused {
|
||||
s.Paused = true
|
||||
}
|
||||
LiveState[site.ID] = s
|
||||
Mutex.Unlock()
|
||||
e.liveState[site.ID] = s
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
func checkDNS(site models.Site) {
|
||||
func (e *Engine) checkDNS(site models.Site) {
|
||||
host := site.Hostname
|
||||
if host == "" {
|
||||
host = site.URL
|
||||
@@ -562,8 +629,7 @@ func checkDNS(site models.Site) {
|
||||
c.Timeout = siteTimeout(site)
|
||||
|
||||
start := time.Now()
|
||||
r, rtt, err := c.Exchange(m, server)
|
||||
_ = rtt
|
||||
r, _, err := c.Exchange(m, server)
|
||||
latency := time.Since(start)
|
||||
|
||||
updatedSite := site
|
||||
@@ -571,14 +637,14 @@ func checkDNS(site models.Site) {
|
||||
updatedSite.LastCheck = time.Now()
|
||||
|
||||
if err != nil {
|
||||
handleStatusChange(updatedSite, "DOWN", 0, latency)
|
||||
e.handleStatusChange(updatedSite, "DOWN", 0, latency)
|
||||
return
|
||||
}
|
||||
|
||||
if r.Rcode != dns.RcodeSuccess {
|
||||
handleStatusChange(updatedSite, "DOWN", r.Rcode, latency)
|
||||
e.handleStatusChange(updatedSite, "DOWN", r.Rcode, latency)
|
||||
return
|
||||
}
|
||||
|
||||
handleStatusChange(updatedSite, "UP", 0, latency)
|
||||
e.handleStatusChange(updatedSite, "UP", 0, latency)
|
||||
}
|
||||
|
||||
@@ -148,7 +148,7 @@ type ServerConfig struct {
|
||||
ClusterKey string // Shared Secret for Security
|
||||
}
|
||||
|
||||
func Start(cfg ServerConfig, s store.Store) {
|
||||
func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) {
|
||||
if cfg.ClusterKey == "" {
|
||||
fmt.Println("WARNING: No UPKEEP_CLUSTER_SECRET set. Cluster API endpoints are unauthenticated.")
|
||||
}
|
||||
@@ -161,7 +161,7 @@ func Start(cfg ServerConfig, s store.Store) {
|
||||
http.Error(w, "Missing token", 400)
|
||||
return
|
||||
}
|
||||
if monitor.RecordHeartbeat(token) {
|
||||
if eng.RecordHeartbeat(token) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("OK"))
|
||||
} else {
|
||||
@@ -244,12 +244,10 @@ func Start(cfg ServerConfig, s store.Store) {
|
||||
|
||||
// 6. Status Page
|
||||
if cfg.EnableStatus {
|
||||
mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { renderStatusPage(w, cfg.Title) })
|
||||
mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { renderStatusPage(w, cfg.Title, eng) })
|
||||
mux.HandleFunc("/status/json", func(w http.ResponseWriter, r *http.Request) {
|
||||
monitor.Mutex.RLock()
|
||||
defer monitor.Mutex.RUnlock()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(monitor.LiveState)
|
||||
json.NewEncoder(w).Encode(eng.GetLiveState())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -262,13 +260,8 @@ func Start(cfg ServerConfig, s store.Store) {
|
||||
}()
|
||||
}
|
||||
|
||||
func renderStatusPage(w http.ResponseWriter, title string) {
|
||||
monitor.Mutex.RLock()
|
||||
var sites []models.Site
|
||||
for _, s := range monitor.LiveState {
|
||||
sites = append(sites, s)
|
||||
}
|
||||
monitor.Mutex.RUnlock()
|
||||
func renderStatusPage(w http.ResponseWriter, title string, eng *monitor.Engine) {
|
||||
sites := eng.GetAllSites()
|
||||
|
||||
sort.Slice(sites, func(i, j int) bool {
|
||||
if sites[i].Status != sites[j].Status {
|
||||
|
||||
@@ -0,0 +1,231 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"go-upkeep/internal/models"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func newTestStore(t *testing.T) *SQLStore {
|
||||
t.Helper()
|
||||
s, err := NewSQLiteStore(":memory:")
|
||||
if err != nil {
|
||||
t.Fatalf("NewSQLiteStore: %v", err)
|
||||
}
|
||||
if err := s.Init(); err != nil {
|
||||
t.Fatalf("Init: %v", err)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func TestSiteCRUD(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
|
||||
sites, err := s.GetSites()
|
||||
if err != nil {
|
||||
t.Fatalf("GetSites: %v", err)
|
||||
}
|
||||
if len(sites) != 0 {
|
||||
t.Fatalf("expected 0 sites, got %d", len(sites))
|
||||
}
|
||||
|
||||
if err := s.AddSite(models.Site{Name: "Test", URL: "https://example.com", Type: "http", Interval: 30}); err != nil {
|
||||
t.Fatalf("AddSite: %v", err)
|
||||
}
|
||||
|
||||
sites, err = s.GetSites()
|
||||
if err != nil {
|
||||
t.Fatalf("GetSites: %v", err)
|
||||
}
|
||||
if len(sites) != 1 {
|
||||
t.Fatalf("expected 1 site, got %d", len(sites))
|
||||
}
|
||||
if sites[0].Name != "Test" {
|
||||
t.Errorf("expected name 'Test', got '%s'", sites[0].Name)
|
||||
}
|
||||
|
||||
sites[0].Name = "Updated"
|
||||
if err := s.UpdateSite(sites[0]); err != nil {
|
||||
t.Fatalf("UpdateSite: %v", err)
|
||||
}
|
||||
|
||||
sites, _ = s.GetSites()
|
||||
if sites[0].Name != "Updated" {
|
||||
t.Errorf("expected name 'Updated', got '%s'", sites[0].Name)
|
||||
}
|
||||
|
||||
if err := s.DeleteSite(sites[0].ID); err != nil {
|
||||
t.Fatalf("DeleteSite: %v", err)
|
||||
}
|
||||
|
||||
sites, _ = s.GetSites()
|
||||
if len(sites) != 0 {
|
||||
t.Fatalf("expected 0 sites after delete, got %d", len(sites))
|
||||
}
|
||||
}
|
||||
|
||||
func TestAlertCRUD(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
|
||||
if err := s.AddAlert("Discord", "discord", map[string]string{"url": "https://example.com/hook"}); err != nil {
|
||||
t.Fatalf("AddAlert: %v", err)
|
||||
}
|
||||
|
||||
alerts, err := s.GetAllAlerts()
|
||||
if err != nil {
|
||||
t.Fatalf("GetAllAlerts: %v", err)
|
||||
}
|
||||
if len(alerts) != 1 {
|
||||
t.Fatalf("expected 1 alert, got %d", len(alerts))
|
||||
}
|
||||
if alerts[0].Type != "discord" {
|
||||
t.Errorf("expected type 'discord', got '%s'", alerts[0].Type)
|
||||
}
|
||||
if alerts[0].Settings["url"] != "https://example.com/hook" {
|
||||
t.Errorf("settings url mismatch")
|
||||
}
|
||||
|
||||
a, err := s.GetAlert(alerts[0].ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetAlert: %v", err)
|
||||
}
|
||||
if a.Name != "Discord" {
|
||||
t.Errorf("expected name 'Discord', got '%s'", a.Name)
|
||||
}
|
||||
|
||||
if err := s.UpdateAlert(a.ID, "Slack", "slack", map[string]string{"url": "https://slack.com/hook"}); err != nil {
|
||||
t.Fatalf("UpdateAlert: %v", err)
|
||||
}
|
||||
|
||||
a, _ = s.GetAlert(a.ID)
|
||||
if a.Type != "slack" {
|
||||
t.Errorf("expected type 'slack', got '%s'", a.Type)
|
||||
}
|
||||
|
||||
if err := s.DeleteAlert(a.ID); err != nil {
|
||||
t.Fatalf("DeleteAlert: %v", err)
|
||||
}
|
||||
|
||||
alerts, _ = s.GetAllAlerts()
|
||||
if len(alerts) != 0 {
|
||||
t.Fatalf("expected 0 alerts after delete, got %d", len(alerts))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUserCRUD(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
|
||||
if err := s.AddUser("admin", "ssh-ed25519 AAAA...", "admin"); err != nil {
|
||||
t.Fatalf("AddUser: %v", err)
|
||||
}
|
||||
|
||||
users, err := s.GetAllUsers()
|
||||
if err != nil {
|
||||
t.Fatalf("GetAllUsers: %v", err)
|
||||
}
|
||||
if len(users) != 1 {
|
||||
t.Fatalf("expected 1 user, got %d", len(users))
|
||||
}
|
||||
if users[0].Username != "admin" {
|
||||
t.Errorf("expected username 'admin', got '%s'", users[0].Username)
|
||||
}
|
||||
|
||||
if err := s.UpdateUser(users[0].ID, "root", "ssh-ed25519 BBBB...", "admin"); err != nil {
|
||||
t.Fatalf("UpdateUser: %v", err)
|
||||
}
|
||||
|
||||
users, _ = s.GetAllUsers()
|
||||
if users[0].Username != "root" {
|
||||
t.Errorf("expected username 'root', got '%s'", users[0].Username)
|
||||
}
|
||||
|
||||
if err := s.DeleteUser(users[0].ID); err != nil {
|
||||
t.Fatalf("DeleteUser: %v", err)
|
||||
}
|
||||
|
||||
users, _ = s.GetAllUsers()
|
||||
if len(users) != 0 {
|
||||
t.Fatalf("expected 0 users after delete, got %d", len(users))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushTokenGeneration(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
|
||||
if err := s.AddSite(models.Site{Name: "Push Monitor", Type: "push", Interval: 60}); err != nil {
|
||||
t.Fatalf("AddSite: %v", err)
|
||||
}
|
||||
|
||||
sites, _ := s.GetSites()
|
||||
if len(sites) != 1 {
|
||||
t.Fatalf("expected 1 site, got %d", len(sites))
|
||||
}
|
||||
if sites[0].Token == "" {
|
||||
t.Error("expected non-empty token for push monitor")
|
||||
}
|
||||
if len(sites[0].Token) != 32 {
|
||||
t.Errorf("expected 32-char hex token, got %d chars", len(sites[0].Token))
|
||||
}
|
||||
}
|
||||
|
||||
func TestImportExport(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
|
||||
s.AddAlert("Test Alert", "webhook", map[string]string{"url": "https://example.com"})
|
||||
s.AddSite(models.Site{Name: "Site1", URL: "https://example.com", Type: "http", Interval: 30})
|
||||
s.AddUser("user1", "ssh-ed25519 KEY", "user")
|
||||
|
||||
backup, err := s.ExportData()
|
||||
if err != nil {
|
||||
t.Fatalf("ExportData: %v", err)
|
||||
}
|
||||
if len(backup.Sites) != 1 || len(backup.Alerts) != 1 || len(backup.Users) != 1 {
|
||||
t.Fatalf("export mismatch: %d sites, %d alerts, %d users", len(backup.Sites), len(backup.Alerts), len(backup.Users))
|
||||
}
|
||||
|
||||
s2 := newTestStore(t)
|
||||
if err := s2.ImportData(backup); err != nil {
|
||||
t.Fatalf("ImportData: %v", err)
|
||||
}
|
||||
|
||||
sites, _ := s2.GetSites()
|
||||
alerts, _ := s2.GetAllAlerts()
|
||||
users, _ := s2.GetAllUsers()
|
||||
if len(sites) != 1 || len(alerts) != 1 || len(users) != 1 {
|
||||
t.Fatalf("import mismatch: %d sites, %d alerts, %d users", len(sites), len(alerts), len(users))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckHistory(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
|
||||
if err := s.SaveCheck(1, 5000000, true); err != nil {
|
||||
t.Fatalf("SaveCheck: %v", err)
|
||||
}
|
||||
if err := s.SaveCheck(1, 10000000, false); err != nil {
|
||||
t.Fatalf("SaveCheck: %v", err)
|
||||
}
|
||||
if err := s.SaveCheck(2, 3000000, true); err != nil {
|
||||
t.Fatalf("SaveCheck site 2: %v", err)
|
||||
}
|
||||
|
||||
history, err := s.LoadAllHistory(10)
|
||||
if err != nil {
|
||||
t.Fatalf("LoadAllHistory: %v", err)
|
||||
}
|
||||
if len(history[1]) != 2 {
|
||||
t.Fatalf("expected 2 records for site 1, got %d", len(history[1]))
|
||||
}
|
||||
if len(history[2]) != 1 {
|
||||
t.Fatalf("expected 1 record for site 2, got %d", len(history[2]))
|
||||
}
|
||||
|
||||
upCount := 0
|
||||
for _, r := range history[1] {
|
||||
if r.IsUp {
|
||||
upCount++
|
||||
}
|
||||
}
|
||||
if upCount != 1 {
|
||||
t.Errorf("expected 1 up record for site 1, got %d", upCount)
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package tui
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"go-upkeep/internal/monitor"
|
||||
|
||||
tea "github.com/charmbracelet/bubbletea"
|
||||
"github.com/charmbracelet/huh"
|
||||
@@ -237,11 +236,11 @@ func (m *Model) submitAlertForm() {
|
||||
|
||||
if m.editID > 0 {
|
||||
if err := m.store.UpdateAlert(m.editID, d.Name, d.AlertType, settings); err != nil {
|
||||
monitor.AddLog("Update alert failed: " + err.Error())
|
||||
m.engine.AddLog("Update alert failed: " + err.Error())
|
||||
}
|
||||
} else {
|
||||
if err := m.store.AddAlert(d.Name, d.AlertType, settings); err != nil {
|
||||
monitor.AddLog("Add alert failed: " + err.Error())
|
||||
m.engine.AddLog("Add alert failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
m.state = stateDashboard
|
||||
|
||||
@@ -3,7 +3,6 @@ package tui
|
||||
import (
|
||||
"fmt"
|
||||
"go-upkeep/internal/models"
|
||||
"go-upkeep/internal/monitor"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -243,7 +242,7 @@ func (m Model) viewSitesTab() string {
|
||||
name = limitStr(name, 13)
|
||||
}
|
||||
|
||||
hist, _ := monitor.GetHistory(site.ID)
|
||||
hist, _ := m.engine.GetHistory(site.ID)
|
||||
var spark string
|
||||
if site.Type == "push" {
|
||||
spark = heartbeatSparkline(hist.Statuses, sparkWidth)
|
||||
@@ -508,12 +507,12 @@ func (m *Model) submitSiteForm() {
|
||||
|
||||
if m.editID > 0 {
|
||||
if err := m.store.UpdateSite(site); err != nil {
|
||||
monitor.AddLog("Update site failed: " + err.Error())
|
||||
m.engine.AddLog("Update site failed: " + err.Error())
|
||||
}
|
||||
monitor.UpdateSiteConfig(site)
|
||||
m.engine.UpdateSiteConfig(site)
|
||||
} else {
|
||||
if err := m.store.AddSite(site); err != nil {
|
||||
monitor.AddLog("Add site failed: " + err.Error())
|
||||
m.engine.AddLog("Add site failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
m.state = stateDashboard
|
||||
|
||||
@@ -2,7 +2,6 @@ package tui
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"go-upkeep/internal/monitor"
|
||||
|
||||
tea "github.com/charmbracelet/bubbletea"
|
||||
"github.com/charmbracelet/huh"
|
||||
@@ -104,11 +103,11 @@ func (m *Model) submitUserForm() {
|
||||
d := m.userFormData
|
||||
if m.editID > 0 {
|
||||
if err := m.store.UpdateUser(m.editID, d.Username, d.PublicKey, d.Role); err != nil {
|
||||
monitor.AddLog("Update user failed: " + err.Error())
|
||||
m.engine.AddLog("Update user failed: " + err.Error())
|
||||
}
|
||||
} else {
|
||||
if err := m.store.AddUser(d.Username, d.PublicKey, d.Role); err != nil {
|
||||
monitor.AddLog("Add user failed: " + err.Error())
|
||||
m.engine.AddLog("Add user failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
m.state = stateUsers
|
||||
|
||||
+10
-13
@@ -69,6 +69,7 @@ type Model struct {
|
||||
|
||||
collapsed map[int]bool
|
||||
store store.Store
|
||||
engine *monitor.Engine
|
||||
|
||||
// harmonica animation state
|
||||
pulseSpring harmonica.Spring
|
||||
@@ -81,7 +82,7 @@ type Model struct {
|
||||
users []models.User
|
||||
}
|
||||
|
||||
func InitialModel(isAdmin bool, s store.Store) Model {
|
||||
func InitialModel(isAdmin bool, s store.Store, eng *monitor.Engine) Model {
|
||||
vpLogs := viewport.New(100, 20)
|
||||
vpLogs.SetContent("Waiting for logs...")
|
||||
z := zone.New()
|
||||
@@ -92,6 +93,7 @@ func InitialModel(isAdmin bool, s store.Store) Model {
|
||||
maxTableRows: 5,
|
||||
isAdmin: isAdmin,
|
||||
store: s,
|
||||
engine: eng,
|
||||
zones: z,
|
||||
pulseSpring: spring,
|
||||
collapsed: make(map[int]bool),
|
||||
@@ -112,18 +114,18 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||
switch m.deleteTab {
|
||||
case 0:
|
||||
if err := m.store.DeleteSite(m.deleteID); err != nil {
|
||||
monitor.AddLog("Delete site failed: " + err.Error())
|
||||
m.engine.AddLog("Delete site failed: " + err.Error())
|
||||
}
|
||||
monitor.RemoveSite(m.deleteID)
|
||||
m.engine.RemoveSite(m.deleteID)
|
||||
m.adjustCursor(len(m.sites) - 1)
|
||||
case 1:
|
||||
if err := m.store.DeleteAlert(m.deleteID); err != nil {
|
||||
monitor.AddLog("Delete alert failed: " + err.Error())
|
||||
m.engine.AddLog("Delete alert failed: " + err.Error())
|
||||
}
|
||||
m.adjustCursor(len(m.alerts) - 1)
|
||||
case 3:
|
||||
if err := m.store.DeleteUser(m.deleteID); err != nil {
|
||||
monitor.AddLog("Delete user failed: " + err.Error())
|
||||
m.engine.AddLog("Delete user failed: " + err.Error())
|
||||
}
|
||||
m.adjustCursor(len(m.users) - 1)
|
||||
}
|
||||
@@ -317,7 +319,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||
case "p":
|
||||
if m.currentTab == 0 && len(m.sites) > 0 {
|
||||
site := m.sites[m.cursor]
|
||||
monitor.ToggleSitePause(site.ID)
|
||||
m.engine.ToggleSitePause(site.ID)
|
||||
site.Paused = !site.Paused
|
||||
_ = m.store.UpdateSitePaused(site.ID, site.Paused)
|
||||
m.refreshData()
|
||||
@@ -433,12 +435,7 @@ func (m *Model) adjustCursor(newLen int) {
|
||||
}
|
||||
|
||||
func (m *Model) refreshData() {
|
||||
monitor.Mutex.RLock()
|
||||
var allSites []models.Site
|
||||
for _, s := range monitor.LiveState {
|
||||
allSites = append(allSites, s)
|
||||
}
|
||||
monitor.Mutex.RUnlock()
|
||||
allSites := m.engine.GetAllSites()
|
||||
|
||||
var groups, ungrouped []models.Site
|
||||
children := make(map[int][]models.Site)
|
||||
@@ -476,7 +473,7 @@ func (m *Model) refreshData() {
|
||||
m.users = users
|
||||
}
|
||||
}
|
||||
m.logViewport.SetContent(strings.Join(monitor.GetLogs(), "\n"))
|
||||
m.logViewport.SetContent(strings.Join(m.engine.GetLogs(), "\n"))
|
||||
|
||||
listLen := len(m.sites)
|
||||
if m.currentTab == 1 {
|
||||
|
||||
Reference in New Issue
Block a user