f023e38fdc
Replace all monitor package-level mutable state with Engine struct. All state (liveState, logStore, histories, tokenIndex, HTTP clients) is now encapsulated in Engine, created via NewEngine(store). Key changes: - Engine struct holds all monitor state with proper mutex protection - Engine.Start(ctx) and monitorRoutine respect context cancellation for graceful shutdown — no more leaked goroutines - cluster.runFollowerLoop also respects context for clean exit - Token index (map[string]int) for O(1) push heartbeat lookup, replacing O(n) linear scan through LiveState - UpdateSiteConfig preserves 8 runtime fields instead of copying 17 config fields individually - triggerAlert goroutines get 30s timeout context - All consumers (TUI, server, cluster, main) receive *Engine via constructor/parameter — no package-level state access - main.go creates context.WithCancel, passes to engine and cluster First test suite: 12 tests across store and alert packages - Store: CRUD for sites/alerts/users, push token generation, import/export round-trip, check history persistence - Alert: Discord/Slack/Webhook payload format, HTTP 4xx error propagation, Ntfy headers, unknown provider returns nil
651 lines
14 KiB
Go
651 lines
14 KiB
Go
package monitor
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"go-upkeep/internal/alert"
|
|
"go-upkeep/internal/models"
|
|
"go-upkeep/internal/store"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/miekg/dns"
|
|
probing "github.com/prometheus-community/pro-bing"
|
|
)
|
|
|
|
type Engine struct {
|
|
mu sync.RWMutex
|
|
liveState map[int]models.Site
|
|
|
|
logMu sync.RWMutex
|
|
logStore []string
|
|
|
|
activeMu sync.RWMutex
|
|
isActive bool
|
|
|
|
histMu sync.RWMutex
|
|
histories map[int]*SiteHistory
|
|
|
|
tokenIndex map[string]int
|
|
|
|
db store.Store
|
|
insecureSkipVerify bool
|
|
strictClient *http.Client
|
|
insecureClient *http.Client
|
|
}
|
|
|
|
// NewEngine returns an active Engine backed by store s, with empty live
// state, histories and token index, and two HTTP clients: one that verifies
// TLS certificates and one that skips verification.
func NewEngine(s store.Store) *Engine {
	clientFor := func(skipVerify bool) *http.Client {
		return &http.Client{
			Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: skipVerify}},
		}
	}

	return &Engine{
		db:             s,
		isActive:       true,
		liveState:      make(map[int]models.Site),
		histories:      make(map[int]*SiteHistory),
		tokenIndex:     make(map[string]int),
		strictClient:   clientFor(false),
		insecureClient: clientFor(true),
	}
}
|
|
|
|
func (e *Engine) SetInsecureSkipVerify(skip bool) {
|
|
e.insecureSkipVerify = skip
|
|
}
|
|
|
|
func (e *Engine) AddLog(msg string) {
|
|
e.logMu.Lock()
|
|
defer e.logMu.Unlock()
|
|
ts := time.Now().Format("15:04:05")
|
|
entry := fmt.Sprintf("[%s] %s", ts, msg)
|
|
e.logStore = append([]string{entry}, e.logStore...)
|
|
if len(e.logStore) > 100 {
|
|
e.logStore = e.logStore[:100]
|
|
}
|
|
}
|
|
|
|
func (e *Engine) GetLogs() []string {
|
|
e.logMu.RLock()
|
|
defer e.logMu.RUnlock()
|
|
logs := make([]string, len(e.logStore))
|
|
copy(logs, e.logStore)
|
|
return logs
|
|
}
|
|
|
|
func (e *Engine) SetActive(active bool) {
|
|
e.activeMu.Lock()
|
|
defer e.activeMu.Unlock()
|
|
if e.isActive != active {
|
|
e.isActive = active
|
|
status := "RESUMED (Active)"
|
|
if !active {
|
|
status = "PAUSED (Passive)"
|
|
}
|
|
e.AddLog(fmt.Sprintf("Engine %s", status))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) IsActive() bool {
|
|
e.activeMu.RLock()
|
|
defer e.activeMu.RUnlock()
|
|
return e.isActive
|
|
}
|
|
|
|
func (e *Engine) GetAllSites() []models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
sites := make([]models.Site, 0, len(e.liveState))
|
|
for _, s := range e.liveState {
|
|
sites = append(sites, s)
|
|
}
|
|
return sites
|
|
}
|
|
|
|
func (e *Engine) GetLiveState() map[int]models.Site {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
cp := make(map[int]models.Site, len(e.liveState))
|
|
for k, v := range e.liveState {
|
|
cp[k] = v
|
|
}
|
|
return cp
|
|
}
|
|
|
|
// RecordHeartbeat registers a push heartbeat for the monitor owning token.
//
// The matching site is marked UP, with LastCheck set to now and its failure
// count and latency reset. If the site was DOWN, a recovery entry is logged
// and a recovery alert is triggered.
//
// It returns false, without changing anything, when the engine is passive,
// the token is unknown, or the site is no longer in the live state.
func (e *Engine) RecordHeartbeat(token string) bool {
	if !e.IsActive() {
		return false
	}

	// apply updates the live state under the write lock and returns the
	// updated site, whether it was DOWN before, and whether it was found.
	apply := func() (models.Site, bool, bool) {
		e.mu.Lock()
		defer e.mu.Unlock()

		targetID, ok := e.tokenIndex[token]
		if !ok {
			return models.Site{}, false, false
		}
		site, exists := e.liveState[targetID]
		if !exists {
			return models.Site{}, false, false
		}

		wasDown := site.Status == "DOWN"
		site.LastCheck = time.Now()
		site.Status = "UP"
		site.FailureCount = 0
		site.Latency = 0
		e.liveState[targetID] = site
		return site, wasDown, true
	}

	site, wasDown, ok := apply()
	if !ok {
		return false
	}

	// Logging and alerting happen after releasing e.mu. triggerAlert performs
	// a store lookup, which must not block every reader of the live state.
	if wasDown {
		e.AddLog(fmt.Sprintf("Push Monitor '%s' recovered", site.Name))
		e.triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.", site.Name))
	}
	return true
}
|
|
|
|
func (e *Engine) addToTokenIndex(site models.Site) {
|
|
if site.Type == "push" && site.Token != "" {
|
|
e.tokenIndex[site.Token] = site.ID
|
|
}
|
|
}
|
|
|
|
func (e *Engine) removeFromTokenIndex(id int) {
|
|
for token, sid := range e.tokenIndex {
|
|
if sid == id {
|
|
delete(e.tokenIndex, token)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start launches the site-discovery loop in a background goroutine and
// returns immediately.
//
// Every 5 seconds the loop loads all sites from the store. Each site ID that
// is not yet in the live state is registered with Status "PENDING" and gets
// its own monitorRoutine. Push monitors also get LastCheck = now, so their
// first heartbeat deadline counts from discovery. Load failures are logged
// and retried on the next round. The loop, and the monitor routines it
// spawns, stop once ctx is cancelled.
func (e *Engine) Start(ctx context.Context) {
	// register adds s to the live state if its ID is unknown and reports
	// whether it did. The existence check and the insert share one write
	// lock, so the same site can never be registered (and monitored) twice.
	register := func(s models.Site) bool {
		e.mu.Lock()
		defer e.mu.Unlock()

		if _, exists := e.liveState[s.ID]; exists {
			return false
		}
		s.Status = "PENDING"
		if s.Type == "push" {
			s.LastCheck = time.Now()
		}
		e.liveState[s.ID] = s
		e.addToTokenIndex(s)
		return true
	}

	go func() {
		for ctx.Err() == nil {
			if sites, err := e.db.GetSites(); err != nil {
				e.AddLog(fmt.Sprintf("Failed to load sites: %v", err))
			} else {
				for _, s := range sites {
					if register(s) {
						go e.monitorRoutine(ctx, s.ID)
					}
				}
			}

			select {
			case <-time.After(5 * time.Second):
			case <-ctx.Done():
				return
			}
		}
	}()
}
|
|
|
|
func (e *Engine) UpdateSiteConfig(site models.Site) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
if existing, ok := e.liveState[site.ID]; ok {
|
|
e.removeFromTokenIndex(site.ID)
|
|
site.Status = existing.Status
|
|
site.StatusCode = existing.StatusCode
|
|
site.Latency = existing.Latency
|
|
site.CertExpiry = existing.CertExpiry
|
|
site.HasSSL = existing.HasSSL
|
|
site.LastCheck = existing.LastCheck
|
|
site.SentSSLWarning = existing.SentSSLWarning
|
|
site.FailureCount = existing.FailureCount
|
|
e.liveState[site.ID] = site
|
|
e.addToTokenIndex(site)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) RemoveSite(id int) {
|
|
e.mu.Lock()
|
|
e.removeFromTokenIndex(id)
|
|
delete(e.liveState, id)
|
|
e.mu.Unlock()
|
|
e.removeHistory(id)
|
|
}
|
|
|
|
// ToggleSitePause flips the Paused flag of site id in the live state, logs the
// change, and returns the new value. It returns false if the site is unknown.
// Only the in-memory state is changed here.
func (e *Engine) ToggleSitePause(id int) bool {
	e.mu.Lock()
	defer e.mu.Unlock()

	site, ok := e.liveState[id]
	if !ok {
		return false
	}

	site.Paused = !site.Paused
	e.liveState[id] = site

	verb := "resumed"
	if site.Paused {
		verb = "paused"
	}
	e.AddLog(fmt.Sprintf("Monitor '%s' %s", site.Name, verb))

	return site.Paused
}
|
|
|
|
// monitorRoutine drives periodic checks for site id. It returns when ctx is
// cancelled or the site is no longer in the live state.
//
// It runs one check immediately, then loops:
//   - while the engine is passive or the site is paused, it re-polls every
//     5s without checking;
//   - otherwise it sleeps for the site's Interval (seconds, at least 5) and
//     runs the next check.
func (e *Engine) monitorRoutine(ctx context.Context, id int) {
	// sleep waits for d or until ctx is cancelled, and reports whether the
	// routine should keep running.
	sleep := func(d time.Duration) bool {
		select {
		case <-time.After(d):
			return true
		case <-ctx.Done():
			return false
		}
	}

	e.checkByID(id)

	for ctx.Err() == nil {
		if !e.IsActive() {
			if !sleep(5 * time.Second) {
				return
			}
			continue
		}

		e.mu.RLock()
		site, exists := e.liveState[id]
		e.mu.RUnlock()

		switch {
		case !exists:
			return
		case site.Paused:
			if !sleep(5 * time.Second) {
				return
			}
			continue
		}

		wait := site.Interval
		if wait < 5 {
			wait = 5
		}
		if !sleep(time.Duration(wait) * time.Second) {
			return
		}
		e.checkByID(id)
	}
}
|
|
|
|
func (e *Engine) checkByID(id int) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
e.mu.RLock()
|
|
site, exists := e.liveState[id]
|
|
e.mu.RUnlock()
|
|
if !exists || site.Paused {
|
|
return
|
|
}
|
|
switch site.Type {
|
|
case "http":
|
|
e.checkHTTP(site)
|
|
case "push":
|
|
e.checkPush(site)
|
|
case "ping":
|
|
e.checkPing(site)
|
|
case "port":
|
|
e.checkPort(site)
|
|
case "dns":
|
|
e.checkDNS(site)
|
|
case "group":
|
|
e.checkGroup(site)
|
|
}
|
|
}
|
|
|
|
// checkPush evaluates a push monitor. It reports DOWN once no heartbeat has
// been seen for Interval seconds plus a 5s grace period (measured from
// LastCheck). Otherwise, if the site is not already UP, it reports UP with
// code 200.
func (e *Engine) checkPush(site models.Site) {
	window := time.Duration(site.Interval) * time.Second
	deadline := site.LastCheck.Add(window).Add(5 * time.Second)

	switch {
	case time.Now().After(deadline):
		e.handleStatusChange(site, "DOWN", 0, 0)
	case site.Status != "UP":
		e.handleStatusChange(site, "UP", 200, 0)
	}
}
|
|
|
|
// checkHTTP sends site.Method (default GET) to site.URL within siteTimeout.
//
// A response is UP when its status code passes isCodeAccepted for
// site.AcceptedCodes; transport errors are DOWN. With CheckSSL, the leaf
// certificate's expiry is recorded, and an already-expired certificate makes
// the verdict "SSL EXP". Certificate verification is skipped when the
// engine-wide flag or site.IgnoreTLS is set. A request that cannot be built
// is reported DOWN without updating latency or LastCheck.
func (e *Engine) checkHTTP(site models.Site) {
	method := "GET"
	if site.Method != "" {
		method = site.Method
	}

	reqCtx, cancel := context.WithTimeout(context.Background(), siteTimeout(site))
	defer cancel()

	req, err := http.NewRequestWithContext(reqCtx, method, site.URL, nil)
	if err != nil {
		e.handleStatusChange(site, "DOWN", 0, 0)
		return
	}

	var client *http.Client
	if e.insecureSkipVerify || site.IgnoreTLS {
		client = e.insecureClient
	} else {
		client = e.strictClient
	}

	began := time.Now()
	resp, err := client.Do(req)
	elapsed := time.Since(began)

	result := site
	result.HasSSL = false
	result.CertExpiry = time.Time{}
	verdict, code := "DOWN", 0

	if err == nil {
		defer resp.Body.Close()

		code = resp.StatusCode
		verdict = "UP"
		if !isCodeAccepted(code, site.AcceptedCodes) {
			verdict = "DOWN"
		}

		if site.CheckSSL && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 {
			leaf := resp.TLS.PeerCertificates[0]
			result.HasSSL = true
			result.CertExpiry = leaf.NotAfter
			if time.Now().After(leaf.NotAfter) {
				verdict = "SSL EXP"
			}
		}
	}

	result.Latency = elapsed
	result.LastCheck = time.Now()
	e.handleStatusChange(result, verdict, code, elapsed)
}
|
|
|
|
func (e *Engine) handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) {
|
|
if !e.IsActive() {
|
|
return
|
|
}
|
|
|
|
newState := site
|
|
newState.StatusCode = code
|
|
|
|
if site.Status == "UP" && rawStatus != "UP" {
|
|
newState.FailureCount++
|
|
if newState.FailureCount > site.MaxRetries {
|
|
newState.Status = rawStatus
|
|
newState.FailureCount = site.MaxRetries + 1
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", site.Name))
|
|
} else {
|
|
e.AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", site.Name, newState.FailureCount, site.MaxRetries))
|
|
}
|
|
} else if rawStatus == "UP" {
|
|
newState.FailureCount = 0
|
|
newState.Status = "UP"
|
|
} else {
|
|
newState.Status = rawStatus
|
|
newState.FailureCount = site.MaxRetries + 1
|
|
}
|
|
|
|
if site.Type == "http" && site.CheckSSL && site.HasSSL {
|
|
daysLeft := int(time.Until(site.CertExpiry).Hours() / 24)
|
|
if daysLeft <= site.ExpiryThreshold && !site.SentSSLWarning && rawStatus != "SSL EXP" {
|
|
e.triggerAlert(site.AlertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", site.Name, daysLeft))
|
|
newState.SentSSLWarning = true
|
|
} else if daysLeft > site.ExpiryThreshold {
|
|
newState.SentSSLWarning = false
|
|
}
|
|
}
|
|
|
|
e.mu.Lock()
|
|
if _, ok := e.liveState[site.ID]; ok {
|
|
e.liveState[site.ID] = newState
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
e.recordCheck(site.ID, latency, rawStatus == "UP")
|
|
|
|
isBroken := func(s string) bool { return s == "DOWN" || s == "SSL EXP" }
|
|
if !isBroken(site.Status) && isBroken(newState.Status) && newState.Status != "PENDING" {
|
|
msg := fmt.Sprintf("Monitor '%s' is DOWN (%s)", site.Name, rawStatus)
|
|
if site.Type == "push" {
|
|
msg = fmt.Sprintf("Push Monitor '%s' missed heartbeat.", site.Name)
|
|
}
|
|
e.triggerAlert(site.AlertID, "🚨 ALERT", msg)
|
|
}
|
|
if isBroken(site.Status) && newState.Status == "UP" {
|
|
e.triggerAlert(site.AlertID, "✅ RECOVERY", fmt.Sprintf("Monitor '%s' is UP", site.Name))
|
|
}
|
|
}
|
|
|
|
// triggerAlert delivers title/message through the alert provider configured
// under alertID, asynchronously so callers are never blocked.
//
// Both the store lookup and the send run in a background goroutine. A failed
// lookup, or a config with no provider, is ignored silently (presumably
// alertID may legitimately refer to no configured alert — TODO confirm).
// Delivery errors are written to the engine log instead of being discarded.
//
// provider.Send takes no context, so any delivery deadline must be enforced
// by the provider itself.
func (e *Engine) triggerAlert(alertID int, title, message string) {
	go func() {
		cfg, err := e.db.GetAlert(alertID)
		if err != nil {
			return
		}

		provider := alert.GetProvider(cfg)
		if provider == nil {
			return
		}

		if err := provider.Send(title, message); err != nil {
			e.AddLog(fmt.Sprintf("Alert '%s' delivery failed: %v", title, err))
		}
	}()
}
|
|
|
|
func siteTimeout(site models.Site) time.Duration {
|
|
if site.Timeout > 0 {
|
|
return time.Duration(site.Timeout) * time.Second
|
|
}
|
|
return 5 * time.Second
|
|
}
|
|
|
|
// isCodeAccepted reports whether the HTTP status code matches the
// accepted-codes spec.
//
// The spec is a comma-separated list of single codes ("204") and inclusive
// ranges ("200-299"). Whitespace around entries and range bounds is ignored,
// and malformed entries are skipped. A blank spec (empty or whitespace-only)
// accepts any 2xx code; previously a whitespace-only spec accepted nothing and
// marked every response DOWN.
func isCodeAccepted(code int, accepted string) bool {
	if strings.TrimSpace(accepted) == "" {
		return code >= 200 && code < 300
	}

	for _, part := range strings.Split(accepted, ",") {
		part = strings.TrimSpace(part)

		if strings.Contains(part, "-") {
			bounds := strings.SplitN(part, "-", 2)
			lo, errLo := strconv.Atoi(strings.TrimSpace(bounds[0]))
			hi, errHi := strconv.Atoi(strings.TrimSpace(bounds[1]))
			if errLo == nil && errHi == nil && code >= lo && code <= hi {
				return true
			}
			continue
		}

		if v, err := strconv.Atoi(part); err == nil && v == code {
			return true
		}
	}
	return false
}
|
|
|
|
// checkPing sends one unprivileged ICMP echo to site.Hostname (falling back
// to site.URL) within siteTimeout.
//
// On a reply it reports UP with the average RTT as latency. Otherwise it
// reports DOWN with the elapsed time as latency. If the pinger cannot be
// created (e.g. the host does not resolve), it reports DOWN without updating
// latency or LastCheck, then logs the error.
func (e *Engine) checkPing(site models.Site) {
	target := site.URL
	if site.Hostname != "" {
		target = site.Hostname
	}

	pinger, err := probing.NewPinger(target)
	if err != nil {
		e.handleStatusChange(site, "DOWN", 0, 0)
		e.AddLog(fmt.Sprintf("Ping '%s' resolve failed: %v", site.Name, err))
		return
	}
	pinger.Count = 1
	pinger.Timeout = siteTimeout(site)
	pinger.SetPrivileged(false)

	began := time.Now()
	runErr := pinger.Run()
	elapsed := time.Since(began)

	verdict, rtt := "DOWN", elapsed
	if runErr == nil {
		if stats := pinger.Statistics(); stats.PacketsRecv > 0 {
			verdict, rtt = "UP", stats.AvgRtt
		}
	}

	result := site
	result.Latency = rtt
	result.LastCheck = time.Now()
	e.handleStatusChange(result, verdict, 0, rtt)
}
|
|
|
|
// checkPort attempts a TCP connection to site.Hostname (falling back to
// site.URL) on site.Port within siteTimeout. It reports UP if the dial
// succeeds, closing the connection immediately, and DOWN otherwise. The dial
// duration is recorded as latency.
func (e *Engine) checkPort(site models.Site) {
	target := site.URL
	if site.Hostname != "" {
		target = site.Hostname
	}
	address := net.JoinHostPort(target, strconv.Itoa(site.Port))
	limit := siteTimeout(site)

	began := time.Now()
	conn, err := net.DialTimeout("tcp", address, limit)
	elapsed := time.Since(began)

	result := site
	result.Latency = elapsed
	result.LastCheck = time.Now()

	verdict := "DOWN"
	if err == nil {
		conn.Close()
		verdict = "UP"
	}
	e.handleStatusChange(result, verdict, 0, elapsed)
}
|
|
|
|
func (e *Engine) checkGroup(site models.Site) {
|
|
e.mu.RLock()
|
|
status := "UP"
|
|
hasChildren := false
|
|
allPaused := true
|
|
for _, child := range e.liveState {
|
|
if child.ParentID != site.ID || child.Type == "group" {
|
|
continue
|
|
}
|
|
hasChildren = true
|
|
if !child.Paused {
|
|
allPaused = false
|
|
}
|
|
if child.Paused {
|
|
continue
|
|
}
|
|
if child.Status == "DOWN" || child.Status == "SSL EXP" {
|
|
status = "DOWN"
|
|
} else if child.Status == "PENDING" && status != "DOWN" {
|
|
status = "PENDING"
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
if !hasChildren {
|
|
status = "PENDING"
|
|
}
|
|
|
|
e.mu.Lock()
|
|
s := e.liveState[site.ID]
|
|
s.Status = status
|
|
if hasChildren && allPaused {
|
|
s.Paused = true
|
|
}
|
|
e.liveState[site.ID] = s
|
|
e.mu.Unlock()
|
|
}
|
|
|
|
// checkDNS queries site.DNSServer (default 1.1.1.1, port 53 when none is
// given) for site.Hostname (falling back to site.URL), within siteTimeout.
//
// The record type comes from site.DNSResolveType; unknown or empty values
// query A records. A transport error is DOWN with code 0, and a non-success
// Rcode is DOWN with that Rcode as the code. Any successful response,
// including one with no answers, is UP.
func (e *Engine) checkDNS(site models.Site) {
	target := site.URL
	if site.Hostname != "" {
		target = site.Hostname
	}

	resolver := "1.1.1.1"
	if site.DNSServer != "" {
		resolver = site.DNSServer
	}
	if _, _, err := net.SplitHostPort(resolver); err != nil {
		resolver = net.JoinHostPort(resolver, "53")
	}

	qtype, known := map[string]uint16{
		"AAAA":  dns.TypeAAAA,
		"MX":    dns.TypeMX,
		"CNAME": dns.TypeCNAME,
		"TXT":   dns.TypeTXT,
		"NS":    dns.TypeNS,
		"SOA":   dns.TypeSOA,
		"SRV":   dns.TypeSRV,
		"PTR":   dns.TypePTR,
	}[site.DNSResolveType]
	if !known {
		qtype = dns.TypeA
	}

	query := new(dns.Msg)
	query.SetQuestion(dns.Fqdn(target), qtype)

	client := &dns.Client{Timeout: siteTimeout(site)}

	began := time.Now()
	reply, _, err := client.Exchange(query, resolver)
	elapsed := time.Since(began)

	result := site
	result.Latency = elapsed
	result.LastCheck = time.Now()

	switch {
	case err != nil:
		e.handleStatusChange(result, "DOWN", 0, elapsed)
	case reply.Rcode != dns.RcodeSuccess:
		e.handleStatusChange(result, "DOWN", reply.Rcode, elapsed)
	default:
		e.handleStatusChange(result, "UP", 0, elapsed)
	}
}
|