8e6d97710b
Provider.Send now accepts context.Context for timeout/cancellation. HTTPProvider and NtfyProvider use NewRequestWithContext so HTTP alerts respect the 30s deadline. triggerAlert logs send failures and config load errors instead of silently swallowing them.
558 lines
12 KiB
Go
558 lines
12 KiB
Go
package monitor
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"go-upkeep/internal/alert"
|
|
"go-upkeep/internal/models"
|
|
"go-upkeep/internal/store"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type Engine struct {
|
|
mu sync.RWMutex
|
|
liveState map[int]models.Site
|
|
|
|
logMu sync.RWMutex
|
|
logStore []string
|
|
|
|
activeMu sync.RWMutex
|
|
isActive bool
|
|
|
|
histMu sync.RWMutex
|
|
histories map[int]*SiteHistory
|
|
|
|
tokenIndex map[string]int // protected by mu
|
|
|
|
probeResultsMu sync.RWMutex
|
|
probeResults map[int]map[string]NodeResult
|
|
aggStrategy AggregationStrategy
|
|
|
|
db store.Store
|
|
insecureSkipVerify bool
|
|
strictClient *http.Client
|
|
insecureClient *http.Client
|
|
}
|
|
|
|
func NewEngine(s store.Store) *Engine {
|
|
return &Engine{
|
|
liveState: make(map[int]models.Site),
|
|
histories: make(map[int]*SiteHistory),
|
|
tokenIndex: make(map[string]int),
|
|
probeResults: make(map[int]map[string]NodeResult),
|
|
aggStrategy: AggAnyDown,
|
|
isActive: true,
|
|
db: s,
|
|
strictClient: &http.Client{
|
|
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}},
|
|
},
|
|
insecureClient: &http.Client{
|
|
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (e *Engine) SetInsecureSkipVerify(skip bool) {
|
|
e.insecureSkipVerify = skip
|
|
}
|
|
|
|
func (e *Engine) AddLog(msg string) {
|
|
e.logMu.Lock()
|
|
defer e.logMu.Unlock()
|
|
ts := time.Now().Format("15:04:05")
|
|
entry := fmt.Sprintf("[%s] %s", ts, msg)
|
|
e.logStore = append([]string{entry}, e.logStore...)
|
|
if len(e.logStore) > 100 {
|
|
e.logStore = e.logStore[:100]
|
|
}
|
|
go func() { _ = e.db.SaveLog(entry) }()
|
|
}
|
|
|
|
func (e *Engine) InitLogs() {
|
|
logs, err := e.db.LoadLogs(100)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if len(logs) == 0 {
|
|
return
|
|
}
|
|
e.logMu.Lock()
|
|
defer e.logMu.Unlock()
|
|
e.logStore = logs
|
|
}
|
|
|
|
func (e *Engine) GetLogs() []string {
|
|
e.logMu.RLock()
|
|
defer e.logMu.RUnlock()
|
|
logs := make([]string, len(e.logStore))
|
|
copy(logs, e.logStore)
|
|
return logs
|
|
}
|
|
|
|
func (e *Engine) SetActive(active bool) {
|
|
e.activeMu.Lock()
|
|
defer e.activeMu.Unlock()
|
|
if e.isActive != active {
|
|
e.isActive = active
|
|
status := "RESUMED (Active)"
|
|
if !active {
|
|
status = "PAUSED (Passive)"
|
|
}
|
|
e.AddLog(fmt.Sprintf("Engine %s", status))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) IsActive() bool {
|
|
e.activeMu.RLock()
|
|
defer e.activeMu.RUnlock()
|
|
return e.isActive
|
|
}
|
|
|
|
func (e *Engine) GetAllSites() []models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
sites := make([]models.Site, 0, len(e.liveState))
|
|
for _, s := range e.liveState {
|
|
sites = append(sites, s)
|
|
}
|
|
return sites
|
|
}
|
|
|
|
func (e *Engine) GetLiveState() map[int]models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
cp := make(map[int]models.Site, len(e.liveState))
|
|
for k, v := range e.liveState {
|
|
cp[k] = v
|
|
}
|
|
return cp
|
|
}
|
|
|
|
func (e *Engine) RecordHeartbeat(token string) bool {
|
|
if !e.IsActive() {
|
|
return false
|
|
}
|
|
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
targetID, ok := e.tokenIndex[token]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
site, exists := e.liveState[targetID]
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
site.LastCheck = time.Now()
|
|
wasDown := site.Status == "DOWN"
|
|
site.Status = "UP"
|
|
site.FailureCount = 0
|
|
site.Latency = 0
|
|
e.liveState[targetID] = site
|
|
|
|
if wasDown {
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' recovered", site.Name))
|
|
e.triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.", site.Name))
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (e *Engine) addToTokenIndex(site models.Site) {
|
|
if site.Type == "push" && site.Token != "" {
|
|
e.tokenIndex[site.Token] = site.ID
|
|
}
|
|
}
|
|
|
|
func (e *Engine) removeFromTokenIndex(id int) {
|
|
for token, sid := range e.tokenIndex {
|
|
if sid == id {
|
|
delete(e.tokenIndex, token)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) Start(ctx context.Context) {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
sites, err := e.db.GetSites()
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Failed to load sites: %v", err))
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
for _, s := range sites {
|
|
e.mu.RLock()
|
|
_, exists := e.liveState[s.ID]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
e.mu.Lock()
|
|
s.Status = "PENDING"
|
|
if s.Type == "push" {
|
|
s.LastCheck = time.Now()
|
|
}
|
|
if h, ok := e.GetHistory(s.ID); ok && len(h.Statuses) > 0 {
|
|
if h.Statuses[len(h.Statuses)-1] {
|
|
s.Status = "UP"
|
|
} else {
|
|
s.Status = "DOWN"
|
|
}
|
|
if len(h.Latencies) > 0 {
|
|
s.Latency = h.Latencies[len(h.Latencies)-1]
|
|
}
|
|
}
|
|
e.liveState[s.ID] = s
|
|
e.addToTokenIndex(s)
|
|
e.mu.Unlock()
|
|
go e.monitorRoutine(ctx, s.ID)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (e *Engine) UpdateSiteConfig(site models.Site) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
if existing, ok := e.liveState[site.ID]; ok {
|
|
e.removeFromTokenIndex(site.ID)
|
|
site.Status = existing.Status
|
|
site.StatusCode = existing.StatusCode
|
|
site.Latency = existing.Latency
|
|
site.CertExpiry = existing.CertExpiry
|
|
site.HasSSL = existing.HasSSL
|
|
site.LastCheck = existing.LastCheck
|
|
site.SentSSLWarning = existing.SentSSLWarning
|
|
site.FailureCount = existing.FailureCount
|
|
e.liveState[site.ID] = site
|
|
e.addToTokenIndex(site)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) RemoveSite(id int) {
|
|
e.mu.Lock()
|
|
e.removeFromTokenIndex(id)
|
|
delete(e.liveState, id)
|
|
e.mu.Unlock()
|
|
e.removeHistory(id)
|
|
}
|
|
|
|
func (e *Engine) ToggleSitePause(id int) bool {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
site, ok := e.liveState[id]
|
|
if !ok {
|
|
return false
|
|
}
|
|
site.Paused = !site.Paused
|
|
e.liveState[id] = site
|
|
if site.Paused {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' paused", site.Name))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' resumed", site.Name))
|
|
}
|
|
return site.Paused
|
|
}
|
|
|
|
func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
|
e.checkByID(id)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
if !e.IsActive() {
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[id]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
if site.Paused {
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
|
|
interval := site.Interval
|
|
if interval < 5 {
|
|
interval = 5
|
|
}
|
|
select {
|
|
case <-time.After(time.Duration(interval) * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
e.checkByID(id)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) checkByID(id int) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[id]
|
|
e.mu.RUnlock()
|
|
if !exists || site.Paused {
|
|
return
|
|
}
|
|
|
|
switch site.Type {
|
|
case "push":
|
|
e.checkPush(site)
|
|
case "group":
|
|
e.checkGroup(site)
|
|
default:
|
|
result := RunCheck(site, e.strictClient, e.insecureClient, e.insecureSkipVerify)
|
|
updatedSite := site
|
|
updatedSite.HasSSL = result.HasSSL
|
|
updatedSite.CertExpiry = result.CertExpiry
|
|
updatedSite.Latency = time.Duration(result.LatencyNs)
|
|
updatedSite.LastCheck = time.Now()
|
|
e.handleStatusChange(updatedSite, result.Status, result.StatusCode, time.Duration(result.LatencyNs))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) checkPush(site models.Site) {
|
|
deadline := site.LastCheck.Add(time.Duration(site.Interval) * time.Second).Add(5 * time.Second)
|
|
if time.Now().After(deadline) {
|
|
e.handleStatusChange(site, "DOWN", 0, 0)
|
|
} else if site.Status != "UP" {
|
|
e.handleStatusChange(site, "UP", 200, 0)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
newState := site
|
|
newState.StatusCode = code
|
|
|
|
if site.Status == "UP" && rawStatus != "UP" {
|
|
newState.FailureCount++
|
|
if newState.FailureCount > site.MaxRetries {
|
|
newState.Status = rawStatus
|
|
newState.FailureCount = site.MaxRetries + 1
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", site.Name))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", site.Name, newState.FailureCount, site.MaxRetries))
|
|
}
|
|
} else if rawStatus == "UP" {
|
|
newState.FailureCount = 0
|
|
newState.Status = "UP"
|
|
} else {
|
|
newState.Status = rawStatus
|
|
newState.FailureCount = site.MaxRetries + 1
|
|
}
|
|
|
|
inMaint := e.isInMaintenance(site.ID)
|
|
|
|
if site.Type == "http" && site.CheckSSL && site.HasSSL {
|
|
daysLeft := int(time.Until(site.CertExpiry).Hours() / 24)
|
|
if daysLeft <= site.ExpiryThreshold && !site.SentSSLWarning && rawStatus != "SSL EXP" {
|
|
if !inMaint {
|
|
e.triggerAlert(site.AlertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", site.Name, daysLeft))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("SSL warning for '%s' suppressed (maintenance)", site.Name))
|
|
}
|
|
newState.SentSSLWarning = true
|
|
} else if daysLeft > site.ExpiryThreshold {
|
|
newState.SentSSLWarning = false
|
|
}
|
|
}
|
|
|
|
e.mu.Lock()
|
|
if _, ok := e.liveState[site.ID]; ok {
|
|
e.liveState[site.ID] = newState
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
e.recordCheck(site.ID, latency, rawStatus == "UP")
|
|
|
|
isBroken := func(s string) bool { return s == "DOWN" || s == "SSL EXP" }
|
|
if !isBroken(site.Status) && isBroken(newState.Status) && newState.Status != "PENDING" {
|
|
if inMaint {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' is DOWN (alerts suppressed — maintenance)", site.Name))
|
|
} else {
|
|
msg := fmt.Sprintf("Monitor '%s' is DOWN (%s)", site.Name, rawStatus)
|
|
if site.Type == "push" {
|
|
msg = fmt.Sprintf("Push Monitor '%s' missed heartbeat.", site.Name)
|
|
}
|
|
e.triggerAlert(site.AlertID, "🚨 ALERT", msg)
|
|
}
|
|
}
|
|
if isBroken(site.Status) && newState.Status == "UP" {
|
|
if !inMaint {
|
|
e.triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Monitor '%s' is UP", site.Name))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' recovered (maintenance active, alert suppressed)", site.Name))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) triggerAlert(alertID int, title, message string) {
|
|
cfg, err := e.db.GetAlert(alertID)
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Failed to load alert config %d: %v", alertID, err))
|
|
return
|
|
}
|
|
provider := alert.GetProvider(cfg)
|
|
if provider != nil {
|
|
go func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
if err := provider.Send(ctx, title, message); err != nil {
|
|
e.AddLog(fmt.Sprintf("Alert send failed (%s): %v", cfg.Name, err))
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (e *Engine) isInMaintenance(monitorID int) bool {
|
|
inMaint, err := e.db.IsMonitorInMaintenance(monitorID)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return inMaint
|
|
}
|
|
|
|
func (e *Engine) GetDisplayStatus(site models.Site) string {
|
|
if site.Paused {
|
|
return "PAUSED"
|
|
}
|
|
if e.isInMaintenance(site.ID) {
|
|
return "MAINT"
|
|
}
|
|
return site.Status
|
|
}
|
|
|
|
func (e *Engine) checkGroup(site models.Site) {
|
|
e.mu.RLock()
|
|
status := "UP"
|
|
hasChildren := false
|
|
allPaused := true
|
|
for _, child := range e.liveState {
|
|
if child.ParentID != site.ID || child.Type == "group" {
|
|
continue
|
|
}
|
|
hasChildren = true
|
|
if !child.Paused {
|
|
allPaused = false
|
|
}
|
|
if child.Paused || e.isInMaintenance(child.ID) {
|
|
continue
|
|
}
|
|
if child.Status == "DOWN" || child.Status == "SSL EXP" {
|
|
status = "DOWN"
|
|
} else if child.Status == "PENDING" && status != "DOWN" {
|
|
status = "PENDING"
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
if !hasChildren {
|
|
status = "PENDING"
|
|
}
|
|
|
|
e.mu.Lock()
|
|
s := e.liveState[site.ID]
|
|
s.Status = status
|
|
if hasChildren && allPaused {
|
|
s.Paused = true
|
|
}
|
|
e.liveState[site.ID] = s
|
|
e.mu.Unlock()
|
|
}
|
|
|
|
func (e *Engine) SetAggStrategy(strategy AggregationStrategy) {
|
|
e.aggStrategy = strategy
|
|
}
|
|
|
|
func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool) {
|
|
e.probeResultsMu.Lock()
|
|
if e.probeResults[siteID] == nil {
|
|
e.probeResults[siteID] = make(map[string]NodeResult)
|
|
}
|
|
e.probeResults[siteID][nodeID] = NodeResult{
|
|
NodeID: nodeID,
|
|
IsUp: isUp,
|
|
LatencyNs: latencyNs,
|
|
CheckedAt: time.Now(),
|
|
}
|
|
results := make([]NodeResult, 0, len(e.probeResults[siteID]))
|
|
for _, r := range e.probeResults[siteID] {
|
|
results = append(results, r)
|
|
}
|
|
e.probeResultsMu.Unlock()
|
|
|
|
aggUp, avgLatency := AggregateStatus(results, e.aggStrategy)
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[siteID]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
rawStatus := "UP"
|
|
if !aggUp {
|
|
rawStatus = "DOWN"
|
|
}
|
|
|
|
updatedSite := site
|
|
updatedSite.Latency = time.Duration(avgLatency)
|
|
updatedSite.LastCheck = time.Now()
|
|
e.handleStatusChange(updatedSite, rawStatus, 0, time.Duration(avgLatency))
|
|
}
|
|
|
|
func (e *Engine) GetProbeResults(siteID int) map[string]NodeResult {
|
|
e.probeResultsMu.RLock()
|
|
defer e.probeResultsMu.RUnlock()
|
|
src := e.probeResults[siteID]
|
|
cp := make(map[string]NodeResult, len(src))
|
|
for k, v := range src {
|
|
cp[k] = v
|
|
}
|
|
return cp
|
|
}
|