5862a46b00
Module path now gitea.lerkolabs.com/lerko/uptop. Binary moves to cmd/uptop. All imports, display strings, CI config, and docs updated.
568 lines
13 KiB
Go
568 lines
13 KiB
Go
package monitor
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"gitea.lerkolabs.com/lerko/uptop/internal/alert"
|
|
"gitea.lerkolabs.com/lerko/uptop/internal/models"
|
|
"gitea.lerkolabs.com/lerko/uptop/internal/store"
|
|
"math/rand/v2"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type Engine struct {
|
|
mu sync.RWMutex
|
|
liveState map[int]models.Site
|
|
|
|
logMu sync.RWMutex
|
|
logStore []string
|
|
|
|
activeMu sync.RWMutex
|
|
isActive bool
|
|
|
|
histMu sync.RWMutex
|
|
histories map[int]*SiteHistory
|
|
|
|
tokenIndex map[string]int // protected by mu
|
|
|
|
probeResultsMu sync.RWMutex
|
|
probeResults map[int]map[string]NodeResult
|
|
aggStrategy AggregationStrategy
|
|
|
|
db store.Store
|
|
insecureSkipVerify bool
|
|
strictClient *http.Client
|
|
insecureClient *http.Client
|
|
}
|
|
|
|
func NewEngine(s store.Store) *Engine {
|
|
return &Engine{
|
|
liveState: make(map[int]models.Site),
|
|
histories: make(map[int]*SiteHistory),
|
|
tokenIndex: make(map[string]int),
|
|
probeResults: make(map[int]map[string]NodeResult),
|
|
aggStrategy: AggAnyDown,
|
|
isActive: true,
|
|
db: s,
|
|
strictClient: &http.Client{
|
|
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}},
|
|
},
|
|
insecureClient: &http.Client{
|
|
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, //nolint:gosec // intentional for IgnoreTLS sites
|
|
},
|
|
}
|
|
}
|
|
|
|
func (e *Engine) SetInsecureSkipVerify(skip bool) {
|
|
e.insecureSkipVerify = skip
|
|
}
|
|
|
|
func (e *Engine) AddLog(msg string) {
|
|
e.logMu.Lock()
|
|
defer e.logMu.Unlock()
|
|
ts := time.Now().Format("15:04:05")
|
|
entry := fmt.Sprintf("[%s] %s", ts, msg)
|
|
e.logStore = append([]string{entry}, e.logStore...)
|
|
if len(e.logStore) > 100 {
|
|
e.logStore = e.logStore[:100]
|
|
}
|
|
go func() { _ = e.db.SaveLog(entry) }()
|
|
}
|
|
|
|
func (e *Engine) InitLogs() {
|
|
logs, err := e.db.LoadLogs(100)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if len(logs) == 0 {
|
|
return
|
|
}
|
|
e.logMu.Lock()
|
|
defer e.logMu.Unlock()
|
|
e.logStore = logs
|
|
}
|
|
|
|
func (e *Engine) GetLogs() []string {
|
|
e.logMu.RLock()
|
|
defer e.logMu.RUnlock()
|
|
logs := make([]string, len(e.logStore))
|
|
copy(logs, e.logStore)
|
|
return logs
|
|
}
|
|
|
|
func (e *Engine) SetActive(active bool) {
|
|
e.activeMu.Lock()
|
|
defer e.activeMu.Unlock()
|
|
if e.isActive != active {
|
|
e.isActive = active
|
|
status := "RESUMED (Active)"
|
|
if !active {
|
|
status = "PAUSED (Passive)"
|
|
}
|
|
e.AddLog(fmt.Sprintf("Engine %s", status))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) IsActive() bool {
|
|
e.activeMu.RLock()
|
|
defer e.activeMu.RUnlock()
|
|
return e.isActive
|
|
}
|
|
|
|
func (e *Engine) GetAllSites() []models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
sites := make([]models.Site, 0, len(e.liveState))
|
|
for _, s := range e.liveState {
|
|
sites = append(sites, s)
|
|
}
|
|
return sites
|
|
}
|
|
|
|
func (e *Engine) GetLiveState() map[int]models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
cp := make(map[int]models.Site, len(e.liveState))
|
|
for k, v := range e.liveState {
|
|
cp[k] = v
|
|
}
|
|
return cp
|
|
}
|
|
|
|
func (e *Engine) RecordHeartbeat(token string) bool {
|
|
if !e.IsActive() {
|
|
return false
|
|
}
|
|
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
targetID, ok := e.tokenIndex[token]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
site, exists := e.liveState[targetID]
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
site.LastCheck = time.Now()
|
|
wasDown := site.Status == "DOWN"
|
|
site.Status = "UP"
|
|
site.FailureCount = 0
|
|
site.Latency = 0
|
|
e.liveState[targetID] = site
|
|
|
|
if wasDown {
|
|
e.AddLog(fmt.Sprintf("Push Monitor '%s' recovered", site.Name))
|
|
e.triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.", site.Name))
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (e *Engine) addToTokenIndex(site models.Site) {
|
|
if site.Type == "push" && site.Token != "" {
|
|
e.tokenIndex[site.Token] = site.ID
|
|
}
|
|
}
|
|
|
|
func (e *Engine) removeFromTokenIndex(id int) {
|
|
for token, sid := range e.tokenIndex {
|
|
if sid == id {
|
|
delete(e.tokenIndex, token)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) Start(ctx context.Context) {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
sites, err := e.db.GetSites()
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Failed to load sites: %v", err))
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
for _, s := range sites {
|
|
e.mu.RLock()
|
|
_, exists := e.liveState[s.ID]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
e.mu.Lock()
|
|
s.Status = "PENDING"
|
|
if s.Type == "push" {
|
|
s.LastCheck = time.Now()
|
|
}
|
|
if h, ok := e.GetHistory(s.ID); ok && len(h.Statuses) > 0 {
|
|
if h.Statuses[len(h.Statuses)-1] {
|
|
s.Status = "UP"
|
|
} else {
|
|
s.Status = "DOWN"
|
|
}
|
|
if len(h.Latencies) > 0 {
|
|
s.Latency = h.Latencies[len(h.Latencies)-1]
|
|
}
|
|
}
|
|
e.liveState[s.ID] = s
|
|
e.addToTokenIndex(s)
|
|
e.mu.Unlock()
|
|
go e.monitorRoutine(ctx, s.ID)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (e *Engine) UpdateSiteConfig(site models.Site) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
if existing, ok := e.liveState[site.ID]; ok {
|
|
e.removeFromTokenIndex(site.ID)
|
|
site.Status = existing.Status
|
|
site.StatusCode = existing.StatusCode
|
|
site.Latency = existing.Latency
|
|
site.CertExpiry = existing.CertExpiry
|
|
site.HasSSL = existing.HasSSL
|
|
site.LastCheck = existing.LastCheck
|
|
site.SentSSLWarning = existing.SentSSLWarning
|
|
site.FailureCount = existing.FailureCount
|
|
e.liveState[site.ID] = site
|
|
e.addToTokenIndex(site)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) RemoveSite(id int) {
|
|
e.mu.Lock()
|
|
e.removeFromTokenIndex(id)
|
|
delete(e.liveState, id)
|
|
e.mu.Unlock()
|
|
e.removeHistory(id)
|
|
}
|
|
|
|
func (e *Engine) ToggleSitePause(id int) bool {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
site, ok := e.liveState[id]
|
|
if !ok {
|
|
return false
|
|
}
|
|
site.Paused = !site.Paused
|
|
e.liveState[id] = site
|
|
if site.Paused {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' paused", site.Name))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' resumed", site.Name))
|
|
}
|
|
return site.Paused
|
|
}
|
|
|
|
func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
|
// Stagger initial check to avoid thundering herd on startup
|
|
stagger := time.Duration(rand.IntN(3000)) * time.Millisecond //nolint:gosec // non-security jitter
|
|
select {
|
|
case <-time.After(stagger):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
e.checkByID(id)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
if !e.IsActive() {
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[id]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
if site.Paused {
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
|
|
interval := site.Interval
|
|
if interval < 5 {
|
|
interval = 5
|
|
}
|
|
jitter := time.Duration(rand.IntN(interval*100)) * time.Millisecond //nolint:gosec // non-security jitter
|
|
select {
|
|
case <-time.After(time.Duration(interval)*time.Second + jitter):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
e.checkByID(id)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) checkByID(id int) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[id]
|
|
e.mu.RUnlock()
|
|
if !exists || site.Paused {
|
|
return
|
|
}
|
|
|
|
switch site.Type {
|
|
case "push":
|
|
e.checkPush(site)
|
|
case "group":
|
|
e.checkGroup(site)
|
|
default:
|
|
result := RunCheck(site, e.strictClient, e.insecureClient, e.insecureSkipVerify)
|
|
updatedSite := site
|
|
updatedSite.HasSSL = result.HasSSL
|
|
updatedSite.CertExpiry = result.CertExpiry
|
|
updatedSite.Latency = time.Duration(result.LatencyNs)
|
|
updatedSite.LastCheck = time.Now()
|
|
e.handleStatusChange(updatedSite, result.Status, result.StatusCode, time.Duration(result.LatencyNs))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) checkPush(site models.Site) {
|
|
deadline := site.LastCheck.Add(time.Duration(site.Interval) * time.Second).Add(5 * time.Second)
|
|
if time.Now().After(deadline) {
|
|
e.handleStatusChange(site, "DOWN", 0, 0)
|
|
} else if site.Status != "UP" {
|
|
e.handleStatusChange(site, "UP", 200, 0)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
newState := site
|
|
newState.StatusCode = code
|
|
|
|
if site.Status == "UP" && rawStatus != "UP" {
|
|
newState.FailureCount++
|
|
if newState.FailureCount > site.MaxRetries {
|
|
newState.Status = rawStatus
|
|
newState.FailureCount = site.MaxRetries + 1
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", site.Name))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", site.Name, newState.FailureCount, site.MaxRetries))
|
|
}
|
|
} else if rawStatus == "UP" {
|
|
newState.FailureCount = 0
|
|
newState.Status = "UP"
|
|
} else {
|
|
newState.Status = rawStatus
|
|
newState.FailureCount = site.MaxRetries + 1
|
|
}
|
|
|
|
inMaint := e.isInMaintenance(site.ID)
|
|
|
|
if site.Type == "http" && site.CheckSSL && site.HasSSL {
|
|
daysLeft := int(time.Until(site.CertExpiry).Hours() / 24)
|
|
if daysLeft <= site.ExpiryThreshold && !site.SentSSLWarning && rawStatus != "SSL EXP" {
|
|
if !inMaint {
|
|
e.triggerAlert(site.AlertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", site.Name, daysLeft))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("SSL warning for '%s' suppressed (maintenance)", site.Name))
|
|
}
|
|
newState.SentSSLWarning = true
|
|
} else if daysLeft > site.ExpiryThreshold {
|
|
newState.SentSSLWarning = false
|
|
}
|
|
}
|
|
|
|
e.mu.Lock()
|
|
if _, ok := e.liveState[site.ID]; ok {
|
|
e.liveState[site.ID] = newState
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
e.recordCheck(site.ID, latency, rawStatus == "UP")
|
|
|
|
isBroken := func(s string) bool { return s == "DOWN" || s == "SSL EXP" }
|
|
if !isBroken(site.Status) && isBroken(newState.Status) && newState.Status != "PENDING" {
|
|
if inMaint {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' is DOWN (alerts suppressed — maintenance)", site.Name))
|
|
} else {
|
|
msg := fmt.Sprintf("Monitor '%s' is DOWN (%s)", site.Name, rawStatus)
|
|
if site.Type == "push" {
|
|
msg = fmt.Sprintf("Push Monitor '%s' missed heartbeat.", site.Name)
|
|
}
|
|
e.triggerAlert(site.AlertID, "🚨 ALERT", msg)
|
|
}
|
|
}
|
|
if isBroken(site.Status) && newState.Status == "UP" {
|
|
if !inMaint {
|
|
e.triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Monitor '%s' is UP", site.Name))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' recovered (maintenance active, alert suppressed)", site.Name))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) triggerAlert(alertID int, title, message string) {
|
|
cfg, err := e.db.GetAlert(alertID)
|
|
if err != nil {
|
|
e.AddLog(fmt.Sprintf("Failed to load alert config %d: %v", alertID, err))
|
|
return
|
|
}
|
|
provider := alert.GetProvider(cfg)
|
|
if provider != nil {
|
|
go func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
if err := provider.Send(ctx, title, message); err != nil {
|
|
e.AddLog(fmt.Sprintf("Alert send failed (%s): %v", cfg.Name, err))
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (e *Engine) isInMaintenance(monitorID int) bool {
|
|
inMaint, err := e.db.IsMonitorInMaintenance(monitorID)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return inMaint
|
|
}
|
|
|
|
func (e *Engine) GetDisplayStatus(site models.Site) string {
|
|
if site.Paused {
|
|
return "PAUSED"
|
|
}
|
|
if e.isInMaintenance(site.ID) {
|
|
return "MAINT"
|
|
}
|
|
return site.Status
|
|
}
|
|
|
|
func (e *Engine) checkGroup(site models.Site) {
|
|
e.mu.RLock()
|
|
status := "UP"
|
|
hasChildren := false
|
|
allPaused := true
|
|
for _, child := range e.liveState {
|
|
if child.ParentID != site.ID || child.Type == "group" {
|
|
continue
|
|
}
|
|
hasChildren = true
|
|
if !child.Paused {
|
|
allPaused = false
|
|
}
|
|
if child.Paused || e.isInMaintenance(child.ID) {
|
|
continue
|
|
}
|
|
if child.Status == "DOWN" || child.Status == "SSL EXP" {
|
|
status = "DOWN"
|
|
} else if child.Status == "PENDING" && status != "DOWN" {
|
|
status = "PENDING"
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
if !hasChildren {
|
|
status = "PENDING"
|
|
}
|
|
|
|
e.mu.Lock()
|
|
s := e.liveState[site.ID]
|
|
s.Status = status
|
|
if hasChildren && allPaused {
|
|
s.Paused = true
|
|
}
|
|
e.liveState[site.ID] = s
|
|
e.mu.Unlock()
|
|
}
|
|
|
|
func (e *Engine) SetAggStrategy(strategy AggregationStrategy) {
|
|
e.aggStrategy = strategy
|
|
}
|
|
|
|
func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool) {
|
|
e.probeResultsMu.Lock()
|
|
if e.probeResults[siteID] == nil {
|
|
e.probeResults[siteID] = make(map[string]NodeResult)
|
|
}
|
|
e.probeResults[siteID][nodeID] = NodeResult{
|
|
NodeID: nodeID,
|
|
IsUp: isUp,
|
|
LatencyNs: latencyNs,
|
|
CheckedAt: time.Now(),
|
|
}
|
|
results := make([]NodeResult, 0, len(e.probeResults[siteID]))
|
|
for _, r := range e.probeResults[siteID] {
|
|
results = append(results, r)
|
|
}
|
|
e.probeResultsMu.Unlock()
|
|
|
|
aggUp, avgLatency := AggregateStatus(results, e.aggStrategy)
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[siteID]
|
|
e.mu.RUnlock()
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
rawStatus := "UP"
|
|
if !aggUp {
|
|
rawStatus = "DOWN"
|
|
}
|
|
|
|
updatedSite := site
|
|
updatedSite.Latency = time.Duration(avgLatency)
|
|
updatedSite.LastCheck = time.Now()
|
|
e.handleStatusChange(updatedSite, rawStatus, 0, time.Duration(avgLatency))
|
|
}
|
|
|
|
func (e *Engine) GetProbeResults(siteID int) map[string]NodeResult {
|
|
e.probeResultsMu.RLock()
|
|
defer e.probeResultsMu.RUnlock()
|
|
src := e.probeResults[siteID]
|
|
cp := make(map[string]NodeResult, len(src))
|
|
for k, v := range src {
|
|
cp[k] = v
|
|
}
|
|
return cp
|
|
}
|