refactor: config consolidation + Server type extraction #108

Merged
lerko merged 2 commits from refactor/config-server into main 2026-06-11 20:42:32 +00:00
3 changed files with 615 additions and 520 deletions
+133
View File
@@ -0,0 +1,133 @@
package main
import (
"net"
"os"
"strconv"
"time"
"gitea.lerkolabs.com/lerkolabs/uptop/internal/server"
)
type appConfig struct {
Port int
SSHHostKey string
DBType string
DBDSN string
HTTPPort int
TLSCert string
TLSKey string
StatusEnabled bool
StatusTitle string
ClusterMode string
ClusterSecret string
PeerURL string
NodeID string
NodeName string
NodeRegion string
AggStrategy string
AllowPrivateTargets bool
InsecureSkipVerify bool
MaintRetention time.Duration
EncryptionKey string
MetricsPublic bool
CORSOrigin string
TrustedProxies []*net.IPNet
AdminKey string
KeysFile string
}
func parseConfig() appConfig {
cfg := appConfig{
Port: 23234,
SSHHostKey: ".ssh/id_ed25519",
DBType: "sqlite",
DBDSN: "uptop.db",
HTTPPort: 8080,
StatusTitle: "System Status",
ClusterMode: "leader",
MaintRetention: 7 * 24 * time.Hour,
}
if v := os.Getenv("UPTOP_PORT"); v != "" {
if n, err := strconv.Atoi(v); err == nil {
cfg.Port = n
}
}
if v := os.Getenv("UPTOP_DB_TYPE"); v != "" {
cfg.DBType = v
}
if v := os.Getenv("UPTOP_DB_DSN"); v != "" {
cfg.DBDSN = v
}
if v := os.Getenv("UPTOP_HTTP_PORT"); v != "" {
if n, err := strconv.Atoi(v); err == nil {
cfg.HTTPPort = n
}
}
if os.Getenv("UPTOP_STATUS_ENABLED") == "true" {
cfg.StatusEnabled = true
}
if v := os.Getenv("UPTOP_STATUS_TITLE"); v != "" {
cfg.StatusTitle = v
}
if v := os.Getenv("UPTOP_CLUSTER_MODE"); v != "" {
cfg.ClusterMode = v
}
if v := os.Getenv("UPTOP_PEER_URL"); v != "" {
cfg.PeerURL = v
}
if v := os.Getenv("UPTOP_CLUSTER_SECRET"); v != "" {
cfg.ClusterSecret = v
}
cfg.NodeID = os.Getenv("UPTOP_NODE_ID")
cfg.NodeName = os.Getenv("UPTOP_NODE_NAME")
cfg.NodeRegion = os.Getenv("UPTOP_NODE_REGION")
cfg.AggStrategy = os.Getenv("UPTOP_AGG_STRATEGY")
cfg.AllowPrivateTargets = os.Getenv("UPTOP_ALLOW_PRIVATE_TARGETS") == "true"
cfg.InsecureSkipVerify = os.Getenv("UPTOP_INSECURE_SKIP_VERIFY") == "true"
cfg.MetricsPublic = os.Getenv("UPTOP_METRICS_PUBLIC") == "true"
cfg.EncryptionKey = os.Getenv("UPTOP_ENCRYPTION_KEY")
cfg.TLSCert = os.Getenv("UPTOP_TLS_CERT")
cfg.TLSKey = os.Getenv("UPTOP_TLS_KEY")
cfg.CORSOrigin = os.Getenv("UPTOP_CORS_ORIGIN")
cfg.TrustedProxies = parseTrustedProxies(os.Getenv("UPTOP_TRUSTED_PROXIES"))
cfg.SSHHostKey = envOrDefault("UPTOP_SSH_HOST_KEY", cfg.SSHHostKey)
cfg.AdminKey = os.Getenv("UPTOP_ADMIN_KEY")
cfg.KeysFile = os.Getenv("UPTOP_KEYS")
if v := os.Getenv("UPTOP_MAINT_RETENTION"); v != "" {
if d, err := time.ParseDuration(v); err == nil && d > 0 {
cfg.MaintRetention = d
}
}
return cfg
}
func (c appConfig) serverConfig(quietHTTPLog bool) server.ServerConfig {
return server.ServerConfig{
Port: c.HTTPPort,
EnableStatus: c.StatusEnabled,
Title: c.StatusTitle,
ClusterKey: c.ClusterSecret,
TLSCert: c.TLSCert,
TLSKey: c.TLSKey,
ClusterMode: c.ClusterMode,
MetricsPublic: c.MetricsPublic,
CORSOrigin: c.CORSOrigin,
TrustedProxies: c.TrustedProxies,
QuietHTTPLog: quietHTTPLog,
}
}
+27 -98
View File
@@ -12,7 +12,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"path/filepath" "path/filepath"
"strconv"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
@@ -255,64 +254,19 @@ func runMigrateSecrets(args []string) {
} }
func runServe(args []string) { func runServe(args []string) {
portVal := 23234 cfg := parseConfig()
dbType := "sqlite"
dbDSN := "uptop.db"
httpPort := 8080
enableStatus := false
statusTitle := "System Status"
clusterMode := "leader"
clusterPeer := ""
clusterKey := ""
if v := os.Getenv("UPTOP_PORT"); v != "" { if cfg.ClusterMode == "probe" {
if p, err := strconv.Atoi(v); err == nil { if cfg.NodeID == "" {
portVal = p
}
}
if v := os.Getenv("UPTOP_DB_TYPE"); v != "" {
dbType = v
}
if v := os.Getenv("UPTOP_DB_DSN"); v != "" {
dbDSN = v
}
if v := os.Getenv("UPTOP_HTTP_PORT"); v != "" {
if p, err := strconv.Atoi(v); err == nil {
httpPort = p
}
}
if v := os.Getenv("UPTOP_STATUS_ENABLED"); v == "true" {
enableStatus = true
}
if v := os.Getenv("UPTOP_STATUS_TITLE"); v != "" {
statusTitle = v
}
if v := os.Getenv("UPTOP_CLUSTER_MODE"); v != "" {
clusterMode = v
}
if v := os.Getenv("UPTOP_PEER_URL"); v != "" {
clusterPeer = v
}
if v := os.Getenv("UPTOP_CLUSTER_SECRET"); v != "" {
clusterKey = v
}
nodeID := os.Getenv("UPTOP_NODE_ID")
nodeName := os.Getenv("UPTOP_NODE_NAME")
nodeRegion := os.Getenv("UPTOP_NODE_REGION")
aggStrategy := os.Getenv("UPTOP_AGG_STRATEGY")
if clusterMode == "probe" {
if nodeID == "" {
fmt.Fprintln(os.Stderr, "UPTOP_NODE_ID is required for probe mode") fmt.Fprintln(os.Stderr, "UPTOP_NODE_ID is required for probe mode")
os.Exit(1) os.Exit(1)
} }
if clusterPeer == "" { if cfg.PeerURL == "" {
fmt.Fprintln(os.Stderr, "UPTOP_PEER_URL is required for probe mode") fmt.Fprintln(os.Stderr, "UPTOP_PEER_URL is required for probe mode")
os.Exit(1) os.Exit(1)
} }
fmt.Printf("Cluster: Running as PROBE (node=%s, region=%s)\n", nodeID, nodeRegion) fmt.Printf("Cluster: Running as PROBE (node=%s, region=%s)\n", cfg.NodeID, cfg.NodeRegion)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@@ -323,19 +277,18 @@ func runServe(args []string) {
cancel() cancel()
}() }()
probeAllowPrivate := os.Getenv("UPTOP_ALLOW_PRIVATE_TARGETS") == "true" if cfg.AllowPrivateTargets {
if probeAllowPrivate {
fmt.Println("WARNING: Private target blocking disabled. Monitor URLs can reach internal networks.") fmt.Println("WARNING: Private target blocking disabled. Monitor URLs can reach internal networks.")
} }
if err := cluster.RunProbe(ctx, cluster.ProbeConfig{ if err := cluster.RunProbe(ctx, cluster.ProbeConfig{
NodeID: nodeID, NodeID: cfg.NodeID,
NodeName: nodeName, NodeName: cfg.NodeName,
Region: nodeRegion, Region: cfg.NodeRegion,
LeaderURL: clusterPeer, LeaderURL: cfg.PeerURL,
SharedKey: clusterKey, SharedKey: cfg.ClusterSecret,
Interval: 30, Interval: 30,
AllowPrivateTargets: probeAllowPrivate, AllowPrivateTargets: cfg.AllowPrivateTargets,
}); err != nil { }); err != nil {
fmt.Fprintf(os.Stderr, "Probe error: %v\n", err) fmt.Fprintf(os.Stderr, "Probe error: %v\n", err)
} }
@@ -343,9 +296,9 @@ func runServe(args []string) {
} }
fs := flag.NewFlagSet("serve", flag.ExitOnError) fs := flag.NewFlagSet("serve", flag.ExitOnError)
port := fs.Int("port", portVal, "SSH Port") port := fs.Int("port", cfg.Port, "SSH Port")
flagDBType := fs.String("db-type", dbType, "Database type") flagDBType := fs.String("db-type", cfg.DBType, "Database type")
flagDSN := fs.String("dsn", dbDSN, "Database DSN") flagDSN := fs.String("dsn", cfg.DBDSN, "Database DSN")
demo := fs.Bool("demo", false, "Seed demo data") demo := fs.Bool("demo", false, "Seed demo data")
importKuma := fs.String("import-kuma", "", "Import Uptime Kuma backup JSON file") importKuma := fs.String("import-kuma", "", "Import Uptime Kuma backup JSON file")
_ = fs.Parse(args) // ExitOnError: parse errors exit before returning _ = fs.Parse(args) // ExitOnError: parse errors exit before returning
@@ -365,8 +318,8 @@ func runServe(args []string) {
} }
defer ss.Close() defer ss.Close()
if encKey := os.Getenv("UPTOP_ENCRYPTION_KEY"); encKey != "" { if cfg.EncryptionKey != "" {
enc, err := store.NewEncryptor(encKey) enc, err := store.NewEncryptor(cfg.EncryptionKey)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "encryption key error: %v\n", err) fmt.Fprintf(os.Stderr, "encryption key error: %v\n", err)
os.Exit(1) os.Exit(1)
@@ -402,23 +355,18 @@ func runServe(args []string) {
fmt.Printf("Imported %d monitors and %d alerts from Uptime Kuma v%s\n", len(backup.Sites), len(backup.Alerts), kb.Version) fmt.Printf("Imported %d monitors and %d alerts from Uptime Kuma v%s\n", len(backup.Sites), len(backup.Alerts), kb.Version)
} }
allowPrivate := os.Getenv("UPTOP_ALLOW_PRIVATE_TARGETS") == "true" if cfg.AllowPrivateTargets {
if allowPrivate {
fmt.Println("WARNING: Private target blocking disabled. Monitor URLs can reach internal networks.") fmt.Println("WARNING: Private target blocking disabled. Monitor URLs can reach internal networks.")
} }
eng := monitor.NewEngineWithOpts(s, allowPrivate) eng := monitor.NewEngineWithOpts(s, cfg.AllowPrivateTargets)
if os.Getenv("UPTOP_INSECURE_SKIP_VERIFY") == "true" { if cfg.InsecureSkipVerify {
eng.SetInsecureSkipVerify(true) eng.SetInsecureSkipVerify(true)
} }
if aggStrategy != "" { if cfg.AggStrategy != "" {
eng.SetAggStrategy(monitor.AggregationStrategy(aggStrategy)) eng.SetAggStrategy(monitor.AggregationStrategy(cfg.AggStrategy))
}
if v := os.Getenv("UPTOP_MAINT_RETENTION"); v != "" {
if d, err := time.ParseDuration(v); err == nil && d > 0 {
eng.SetMaintRetention(d)
}
} }
eng.SetMaintRetention(cfg.MaintRetention)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@@ -428,31 +376,14 @@ func runServe(args []string) {
eng.InitAlertHealth() eng.InitAlertHealth()
eng.Start(ctx) eng.Start(ctx)
tlsCert := os.Getenv("UPTOP_TLS_CERT")
tlsKey := os.Getenv("UPTOP_TLS_KEY")
// When the local TUI owns the terminal, per-request HTTP logs to stderr
// would scribble over the alt screen.
localTUI := isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsCygwinTerminal(os.Stdout.Fd()) localTUI := isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsCygwinTerminal(os.Stdout.Fd())
httpSrv := server.Start(server.ServerConfig{ httpSrv := server.Start(cfg.serverConfig(localTUI), s, eng)
Port: httpPort,
EnableStatus: enableStatus,
Title: statusTitle,
ClusterKey: clusterKey,
TLSCert: tlsCert,
TLSKey: tlsKey,
ClusterMode: clusterMode,
MetricsPublic: os.Getenv("UPTOP_METRICS_PUBLIC") == "true",
CORSOrigin: os.Getenv("UPTOP_CORS_ORIGIN"),
TrustedProxies: parseTrustedProxies(os.Getenv("UPTOP_TRUSTED_PROXIES")),
QuietHTTPLog: localTUI,
}, s, eng)
cluster.Start(ctx, cluster.Config{ cluster.Start(ctx, cluster.Config{
Mode: clusterMode, Mode: cfg.ClusterMode,
PeerURL: clusterPeer, PeerURL: cfg.PeerURL,
SharedKey: clusterKey, SharedKey: cfg.ClusterSecret,
}, eng) }, eng)
sshSrv := startSSHServer(*port, s, eng, kc) sshSrv := startSSHServer(*port, s, eng, kc)
@@ -471,8 +402,6 @@ func runServe(args []string) {
} }
cancel() cancel()
// Drain pending DB writes before the deferred ss.Close() runs, so no
// write races a closed database.
eng.Stop() eng.Stop()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
+455 -422
View File
@@ -21,6 +21,388 @@ import (
const maxRequestBody = 1 << 20 const maxRequestBody = 1 << 20
type ServerConfig struct {
Port int
EnableStatus bool
Title string
ClusterKey string
TLSCert string
TLSKey string
ClusterMode string
MetricsPublic bool
CORSOrigin string
TrustedProxies []*net.IPNet
QuietHTTPLog bool
}
type Server struct {
cfg ServerConfig
store store.Store
eng *monitor.Engine
pushRL *RateLimiter
probeRL *RateLimiter
backupRL *RateLimiter
statusRL *RateLimiter
}
func NewServer(cfg ServerConfig, s store.Store, eng *monitor.Engine) *Server {
return &Server{
cfg: cfg,
store: s,
eng: eng,
pushRL: NewRateLimiter(60, cfg.TrustedProxies),
probeRL: NewRateLimiter(30, cfg.TrustedProxies),
backupRL: NewRateLimiter(10, cfg.TrustedProxies),
statusRL: NewRateLimiter(120, cfg.TrustedProxies),
}
}
func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) *http.Server {
srv := NewServer(cfg, s, eng)
return srv.Start()
}
func (s *Server) Start() *http.Server {
if s.cfg.ClusterKey == "" {
fmt.Println("WARNING: No UPTOP_CLUSTER_SECRET set. Cluster API endpoints are unauthenticated.")
}
if s.cfg.ClusterMode != "" && s.cfg.ClusterMode != "leader" && s.cfg.TLSCert == "" {
fmt.Println("WARNING: Cluster mode active without TLS. Secrets transmitted in cleartext.")
}
handler := s.routes()
addr := fmt.Sprintf(":%d", s.cfg.Port)
httpSrv := &http.Server{
Addr: addr,
Handler: handler,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 60 * time.Second,
IdleTimeout: 120 * time.Second,
}
go func() {
if s.cfg.TLSCert != "" && s.cfg.TLSKey != "" {
fmt.Printf("HTTPS Server listening on %s\n", addr)
if err := httpSrv.ListenAndServeTLS(s.cfg.TLSCert, s.cfg.TLSKey); err != nil && err != http.ErrServerClosed {
log.Printf("HTTPS server error: %v", err)
}
} else {
fmt.Printf("HTTP Server listening on %s\n", addr)
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Printf("HTTP server error: %v", err)
}
}
}()
return httpSrv
}
func (s *Server) routes() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/api/push", RateLimit(s.pushRL, s.handlePush))
mux.HandleFunc("/api/health", s.handleHealth)
mux.HandleFunc("/api/backup/export", RateLimit(s.backupRL, s.handleExport))
mux.HandleFunc("/api/backup/import", RateLimit(s.backupRL, s.handleImport))
mux.HandleFunc("/api/import/kuma", RateLimit(s.backupRL, s.handleKumaImport))
mux.HandleFunc("/api/probe/register", RateLimit(s.probeRL, s.handleProbeRegister))
mux.HandleFunc("/api/probe/assignments", RateLimit(s.probeRL, s.handleProbeAssignments))
mux.HandleFunc("/api/probe/results", RateLimit(s.probeRL, s.handleProbeResults))
mux.HandleFunc("/metrics", s.handleMetrics)
if s.cfg.EnableStatus {
mux.HandleFunc("/status", RateLimit(s.statusRL, s.handleStatus))
mux.HandleFunc("/status/json", RateLimit(s.statusRL, s.handleStatusJSON))
}
handler := securityHeadersMiddleware(mux)
if !s.cfg.QuietHTTPLog {
handler = loggingMiddleware(s.cfg.TrustedProxies, handler)
}
if s.cfg.TLSCert != "" {
handler = hstsMiddleware(handler)
}
return handler
}
func (s *Server) requireAuth(r *http.Request) bool {
return s.cfg.ClusterKey != "" && checkSecret(r.Header.Get("X-Upkeep-Secret"), s.cfg.ClusterKey)
}
func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet && r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
token := extractBearerToken(r)
if token == "" {
if qt := r.URL.Query().Get("token"); qt != "" {
token = qt
log.Printf("DEPRECATED: push token in query string — use Authorization: Bearer header instead")
}
}
if token == "" {
http.Error(w, "Missing token", http.StatusBadRequest)
return
}
if s.eng.RecordHeartbeat(token) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
} else {
http.Error(w, "Invalid Token", http.StatusNotFound)
}
}
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if s.cfg.ClusterKey != "" && !checkSecret(r.Header.Get("X-Upkeep-Secret"), s.cfg.ClusterKey) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
}
func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) {
if !s.requireAuth(r) {
http.Error(w, "Unauthorized: UPTOP_CLUSTER_SECRET required", http.StatusUnauthorized)
return
}
data, err := s.store.ExportData(r.Context())
if err != nil {
log.Printf("Export failed: %v", err)
http.Error(w, "Export failed", http.StatusInternalServerError)
return
}
if r.URL.Query().Get("redact_secrets") != "false" {
for i := range data.Alerts {
data.Alerts[i].Settings = models.RedactAlertSettings(data.Alerts[i].Type, data.Alerts[i].Settings)
}
}
_ = json.NewEncoder(w).Encode(data) //nolint:errcheck
}
func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
if !s.requireAuth(r) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBody)
var data models.Backup
if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
if err := s.store.ImportData(r.Context(), data); err != nil {
log.Printf("Import failed: %v", err)
http.Error(w, "Import failed", http.StatusInternalServerError)
return
}
_, _ = w.Write([]byte("Import Successful"))
}
func (s *Server) handleKumaImport(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
if !s.requireAuth(r) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBody)
var kb importer.KumaBackup
if err := json.NewDecoder(r.Body).Decode(&kb); err != nil {
log.Printf("Invalid Kuma JSON: %v", err)
http.Error(w, "Invalid Kuma JSON", http.StatusBadRequest)
return
}
backup := importer.ConvertKuma(&kb)
if err := s.store.ImportData(r.Context(), backup); err != nil {
log.Printf("Kuma import failed: %v", err)
http.Error(w, "Import failed", http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "Imported %d monitors, %d alerts from Kuma v%s", len(backup.Sites), len(backup.Alerts), kb.Version)
}
func (s *Server) handleProbeRegister(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
if !s.requireAuth(r) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBody)
var req struct {
ID string `json:"id"`
Name string `json:"name"`
Region string `json:"region"`
Version string `json:"version"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
if err := s.store.RegisterNode(r.Context(), models.ProbeNode{
ID: req.ID, Name: req.Name, Region: req.Region, Version: req.Version,
}); err != nil {
log.Printf("Probe register failed: %v", err)
http.Error(w, "Registration failed", http.StatusInternalServerError)
return
}
_ = json.NewEncoder(w).Encode(map[string]bool{"ok": true}) //nolint:errcheck
}
func (s *Server) handleProbeAssignments(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if !s.requireAuth(r) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
nodeID := r.URL.Query().Get("node_id")
var nodeRegion string
if nodeID != "" {
if node, err := s.store.GetNode(r.Context(), nodeID); err == nil {
nodeRegion = node.Region
}
}
sites := s.eng.GetAllSites()
var assigned []models.Site
for _, site := range sites {
if site.Paused || site.Type == "push" || site.Type == "group" {
continue
}
if site.Regions != "" && nodeRegion != "" {
matched := false
for _, reg := range strings.Split(site.Regions, ",") {
if strings.TrimSpace(reg) == nodeRegion {
matched = true
break
}
}
if !matched {
continue
}
}
assigned = append(assigned, site)
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string][]models.Site{"sites": assigned}) //nolint:errcheck
}
func (s *Server) handleProbeResults(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
if !s.requireAuth(r) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBody)
var req struct {
NodeID string `json:"node_id"`
Results []struct {
SiteID int `json:"site_id"`
LatencyNs int64 `json:"latency_ns"`
IsUp bool `json:"is_up"`
ErrorReason string `json:"error_reason"`
} `json:"results"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
if req.NodeID == "" {
http.Error(w, "node_id is required", http.StatusBadRequest)
return
}
for _, result := range req.Results {
s.eng.EnqueueProbeCheck(result.SiteID, req.NodeID, result.LatencyNs, result.IsUp)
s.eng.IngestProbeResult(req.NodeID, result.SiteID, result.LatencyNs, result.IsUp, result.ErrorReason)
}
if err := s.store.UpdateNodeLastSeen(r.Context(), req.NodeID); err != nil {
log.Printf("Failed to update node last seen: %v", err)
}
_ = json.NewEncoder(w).Encode(map[string]bool{"ok": true}) //nolint:errcheck
}
func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if !s.cfg.MetricsPublic && s.cfg.ClusterKey != "" {
if !checkSecret(r.Header.Get("X-Upkeep-Secret"), s.cfg.ClusterKey) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
}
metrics.Handler(s.eng)(w, r)
}
func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
renderStatusPage(w, s.cfg.Title, s.eng)
}
func (s *Server) handleStatusJSON(w http.ResponseWriter, r *http.Request) {
state := s.eng.GetLiveState()
activeWindows, _ := s.store.GetActiveMaintenanceWindows(r.Context())
maintSet := make(map[int]bool)
allInMaint := false
for _, mw := range activeWindows {
if mw.Type != "maintenance" {
continue
}
if mw.MonitorID == 0 {
allInMaint = true
} else {
maintSet[mw.MonitorID] = true
}
}
public := make(map[int]statusSite, len(state))
for id, site := range state {
displayStatus := string(site.Status)
if allInMaint || maintSet[site.ID] || (site.ParentID > 0 && maintSet[site.ParentID]) {
displayStatus = "MAINT"
}
public[id] = statusSite{
Name: site.Name,
Type: site.Type,
URL: site.URL,
Status: displayStatus,
Paused: site.Paused,
LastCheck: site.LastCheck,
Latency: site.Latency,
}
}
if s.cfg.CORSOrigin != "" {
w.Header().Set("Access-Control-Allow-Origin", s.cfg.CORSOrigin)
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(public) //nolint:errcheck
}
// --- Helpers ---
func checkSecret(got, want string) bool { func checkSecret(got, want string) bool {
return subtle.ConstantTimeCompare([]byte(got), []byte(want)) == 1 return subtle.ConstantTimeCompare([]byte(got), []byte(want)) == 1
} }
@@ -33,8 +415,79 @@ func extractBearerToken(r *http.Request) string {
return "" return ""
} }
// Alert-settings redaction policy lives in models.RedactAlertSettings so the // statusSite is the public DTO for /status/json.
// TUI detail panel and this export path share one allowlist. type statusSite struct {
Name string
Type string
URL string
Status string
Paused bool
LastCheck time.Time
Latency time.Duration
}
// --- Middleware ---
type statusWriter struct {
http.ResponseWriter
code int
}
func (w *statusWriter) WriteHeader(code int) {
w.code = code
w.ResponseWriter.WriteHeader(code)
}
func loggingMiddleware(trusted []*net.IPNet, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
sw := &statusWriter{ResponseWriter: w, code: 200}
next.ServeHTTP(sw, r)
path := strings.ReplaceAll(strings.ReplaceAll(r.URL.Path, "\n", ""), "\r", "")
log.Printf("%s %s %d %s %s", r.Method, path, sw.code, time.Since(start).Round(time.Millisecond), clientIP(r, trusted)) //nolint:gosec // path sanitized above
})
}
func securityHeadersMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("X-Frame-Options", "DENY")
w.Header().Set("Referrer-Policy", "no-referrer")
w.Header().Set("Content-Security-Policy", "default-src 'self'; script-src 'unsafe-inline'; style-src 'unsafe-inline'")
next.ServeHTTP(w, r)
})
}
func hstsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Strict-Transport-Security", "max-age=63072000; includeSubDomains")
next.ServeHTTP(w, r)
})
}
func renderStatusPage(w http.ResponseWriter, title string, eng *monitor.Engine) {
sites := eng.GetAllSites()
sort.Slice(sites, func(i, j int) bool {
if sites[i].Status != sites[j].Status {
if sites[i].Status == models.StatusDown {
return true
}
if sites[j].Status == models.StatusDown {
return false
}
}
return sites[i].Name < sites[j].Name
})
data := struct {
Title string
Sites []models.Site
}{Title: title, Sites: sites}
if err := statusTpl.Execute(w, data); err != nil {
log.Printf("Failed to render status page: %v", err)
}
}
var statusTpl = template.Must(template.New("status").Parse(` var statusTpl = template.Must(template.New("status").Parse(`
<!DOCTYPE html> <!DOCTYPE html>
@@ -167,423 +620,3 @@ var statusTpl = template.Must(template.New("status").Parse(`
</script> </script>
</body> </body>
</html>`)) </html>`))
type ServerConfig struct {
Port int
EnableStatus bool
Title string
ClusterKey string
TLSCert string
TLSKey string
ClusterMode string
MetricsPublic bool
CORSOrigin string
TrustedProxies []*net.IPNet
// QuietHTTPLog disables per-request stderr logging. Set when the local
// TUI owns the terminal — request logs would scribble over the alt screen.
QuietHTTPLog bool
}
// statusSite is the public DTO for /status/json. models.Site must never be
// serialized raw here: it carries internal fields (LastError, Hostname, Port,
// DNSServer, AlertID, Token, ...) and every field added to it would become
// public by default. Field names match what the status page JS reads.
type statusSite struct {
Name string
Type string
URL string
Status string
Paused bool
LastCheck time.Time
Latency time.Duration
}
func Start(cfg ServerConfig, s store.Store, eng *monitor.Engine) *http.Server {
if cfg.ClusterKey == "" {
fmt.Println("WARNING: No UPTOP_CLUSTER_SECRET set. Cluster API endpoints are unauthenticated.")
}
pushRL := NewRateLimiter(60, cfg.TrustedProxies)
probeRL := NewRateLimiter(30, cfg.TrustedProxies)
backupRL := NewRateLimiter(10, cfg.TrustedProxies)
statusRL := NewRateLimiter(120, cfg.TrustedProxies)
mux := http.NewServeMux()
// 1. Push Heartbeat
mux.HandleFunc("/api/push", RateLimit(pushRL, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet && r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
token := extractBearerToken(r)
if token == "" {
if qt := r.URL.Query().Get("token"); qt != "" {
token = qt
log.Printf("DEPRECATED: push token in query string — use Authorization: Bearer header instead")
}
}
if token == "" {
http.Error(w, "Missing token", http.StatusBadRequest)
return
}
if eng.RecordHeartbeat(token) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
} else {
http.Error(w, "Invalid Token", http.StatusNotFound)
}
}))
// 2. Health Check (For Cluster Follower)
mux.HandleFunc("/api/health", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if cfg.ClusterKey != "" && !checkSecret(r.Header.Get("X-Upkeep-Secret"), cfg.ClusterKey) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
})
// 3. Config Export
mux.HandleFunc("/api/backup/export", RateLimit(backupRL, func(w http.ResponseWriter, r *http.Request) {
if cfg.ClusterKey == "" || !checkSecret(r.Header.Get("X-Upkeep-Secret"), cfg.ClusterKey) {
http.Error(w, "Unauthorized: UPTOP_CLUSTER_SECRET required", http.StatusUnauthorized)
return
}
data, err := s.ExportData(r.Context())
if err != nil {
log.Printf("Export failed: %v", err)
http.Error(w, "Export failed", http.StatusInternalServerError)
return
}
if r.URL.Query().Get("redact_secrets") != "false" {
for i := range data.Alerts {
data.Alerts[i].Settings = models.RedactAlertSettings(data.Alerts[i].Type, data.Alerts[i].Settings)
}
}
_ = json.NewEncoder(w).Encode(data) //nolint:errcheck
}))
// 4. Config Import
mux.HandleFunc("/api/backup/import", RateLimit(backupRL, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
if cfg.ClusterKey == "" || !checkSecret(r.Header.Get("X-Upkeep-Secret"), cfg.ClusterKey) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBody)
var data models.Backup
if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
if err := s.ImportData(r.Context(), data); err != nil {
log.Printf("Import failed: %v", err)
http.Error(w, "Import failed", http.StatusInternalServerError)
return
}
_, _ = w.Write([]byte("Import Successful"))
}))
// 5. Kuma Import
mux.HandleFunc("/api/import/kuma", RateLimit(backupRL, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
if cfg.ClusterKey == "" || !checkSecret(r.Header.Get("X-Upkeep-Secret"), cfg.ClusterKey) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBody)
var kb importer.KumaBackup
if err := json.NewDecoder(r.Body).Decode(&kb); err != nil {
log.Printf("Invalid Kuma JSON: %v", err)
http.Error(w, "Invalid Kuma JSON", http.StatusBadRequest)
return
}
backup := importer.ConvertKuma(&kb)
if err := s.ImportData(r.Context(), backup); err != nil {
log.Printf("Kuma import failed: %v", err)
http.Error(w, "Import failed", http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "Imported %d monitors, %d alerts from Kuma v%s", len(backup.Sites), len(backup.Alerts), kb.Version)
}))
// 6. Probe Registration
mux.HandleFunc("/api/probe/register", RateLimit(probeRL, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
if cfg.ClusterKey == "" || !checkSecret(r.Header.Get("X-Upkeep-Secret"), cfg.ClusterKey) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBody)
var req struct {
ID string `json:"id"`
Name string `json:"name"`
Region string `json:"region"`
Version string `json:"version"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
if err := s.RegisterNode(r.Context(), models.ProbeNode{
ID: req.ID, Name: req.Name, Region: req.Region, Version: req.Version,
}); err != nil {
log.Printf("Probe register failed: %v", err)
http.Error(w, "Registration failed", http.StatusInternalServerError)
return
}
_ = json.NewEncoder(w).Encode(map[string]bool{"ok": true}) //nolint:errcheck
}))
// 7. Probe Assignment Fetch
mux.HandleFunc("/api/probe/assignments", RateLimit(probeRL, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if cfg.ClusterKey == "" || !checkSecret(r.Header.Get("X-Upkeep-Secret"), cfg.ClusterKey) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
nodeID := r.URL.Query().Get("node_id")
var nodeRegion string
if nodeID != "" {
if node, err := s.GetNode(r.Context(), nodeID); err == nil {
nodeRegion = node.Region
}
}
sites := eng.GetAllSites()
var assigned []models.Site
for _, site := range sites {
if site.Paused || site.Type == "push" || site.Type == "group" {
continue
}
if site.Regions != "" && nodeRegion != "" {
matched := false
for _, r := range strings.Split(site.Regions, ",") {
if strings.TrimSpace(r) == nodeRegion {
matched = true
break
}
}
if !matched {
continue
}
}
assigned = append(assigned, site)
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string][]models.Site{"sites": assigned}) //nolint:errcheck
}))
// 8. Probe Result Submission
mux.HandleFunc("/api/probe/results", RateLimit(probeRL, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
if cfg.ClusterKey == "" || !checkSecret(r.Header.Get("X-Upkeep-Secret"), cfg.ClusterKey) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBody)
var req struct {
NodeID string `json:"node_id"`
Results []struct {
SiteID int `json:"site_id"`
LatencyNs int64 `json:"latency_ns"`
IsUp bool `json:"is_up"`
ErrorReason string `json:"error_reason"`
} `json:"results"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
if req.NodeID == "" {
http.Error(w, "node_id is required", http.StatusBadRequest)
return
}
for _, result := range req.Results {
eng.EnqueueProbeCheck(result.SiteID, req.NodeID, result.LatencyNs, result.IsUp)
eng.IngestProbeResult(req.NodeID, result.SiteID, result.LatencyNs, result.IsUp, result.ErrorReason)
}
if err := s.UpdateNodeLastSeen(r.Context(), req.NodeID); err != nil {
log.Printf("Failed to update node last seen: %v", err)
}
_ = json.NewEncoder(w).Encode(map[string]bool{"ok": true}) //nolint:errcheck
}))
// 9. Prometheus Metrics
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if !cfg.MetricsPublic && cfg.ClusterKey != "" {
if !checkSecret(r.Header.Get("X-Upkeep-Secret"), cfg.ClusterKey) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
}
metrics.Handler(eng)(w, r)
})
// 10. Status Page
if cfg.EnableStatus {
mux.HandleFunc("/status", RateLimit(statusRL, func(w http.ResponseWriter, r *http.Request) { renderStatusPage(w, cfg.Title, eng) }))
mux.HandleFunc("/status/json", RateLimit(statusRL, func(w http.ResponseWriter, r *http.Request) {
state := eng.GetLiveState()
activeWindows, _ := s.GetActiveMaintenanceWindows(r.Context())
maintSet := make(map[int]bool)
allInMaint := false
for _, mw := range activeWindows {
if mw.Type != "maintenance" {
continue
}
if mw.MonitorID == 0 {
allInMaint = true
} else {
maintSet[mw.MonitorID] = true
}
}
public := make(map[int]statusSite, len(state))
for id, site := range state {
displayStatus := string(site.Status)
if allInMaint || maintSet[site.ID] || (site.ParentID > 0 && maintSet[site.ParentID]) {
displayStatus = "MAINT"
}
public[id] = statusSite{
Name: site.Name,
Type: site.Type,
URL: site.URL,
Status: displayStatus,
Paused: site.Paused,
LastCheck: site.LastCheck,
Latency: site.Latency,
}
}
if cfg.CORSOrigin != "" {
w.Header().Set("Access-Control-Allow-Origin", cfg.CORSOrigin)
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(public) //nolint:errcheck
}))
}
if cfg.ClusterMode != "" && cfg.ClusterMode != "leader" && cfg.TLSCert == "" {
fmt.Println("WARNING: Cluster mode active without TLS. Secrets transmitted in cleartext.")
}
handler := securityHeadersMiddleware(mux)
if !cfg.QuietHTTPLog {
handler = loggingMiddleware(cfg.TrustedProxies, handler)
}
if cfg.TLSCert != "" {
handler = hstsMiddleware(handler)
}
addr := fmt.Sprintf(":%d", cfg.Port)
srv := &http.Server{
Addr: addr,
Handler: handler,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 60 * time.Second,
IdleTimeout: 120 * time.Second,
}
go func() {
if cfg.TLSCert != "" && cfg.TLSKey != "" {
fmt.Printf("HTTPS Server listening on %s\n", addr)
if err := srv.ListenAndServeTLS(cfg.TLSCert, cfg.TLSKey); err != nil && err != http.ErrServerClosed {
log.Printf("HTTPS server error: %v", err)
}
} else {
fmt.Printf("HTTP Server listening on %s\n", addr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Printf("HTTP server error: %v", err)
}
}
}()
return srv
}
type statusWriter struct {
http.ResponseWriter
code int
}
func (w *statusWriter) WriteHeader(code int) {
w.code = code
w.ResponseWriter.WriteHeader(code)
}
func loggingMiddleware(trusted []*net.IPNet, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
sw := &statusWriter{ResponseWriter: w, code: 200}
next.ServeHTTP(sw, r)
path := strings.ReplaceAll(strings.ReplaceAll(r.URL.Path, "\n", ""), "\r", "")
log.Printf("%s %s %d %s %s", r.Method, path, sw.code, time.Since(start).Round(time.Millisecond), clientIP(r, trusted)) //nolint:gosec // path sanitized above
})
}
func securityHeadersMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("X-Frame-Options", "DENY")
w.Header().Set("Referrer-Policy", "no-referrer")
w.Header().Set("Content-Security-Policy", "default-src 'self'; script-src 'unsafe-inline'; style-src 'unsafe-inline'")
next.ServeHTTP(w, r)
})
}
func hstsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Strict-Transport-Security", "max-age=63072000; includeSubDomains")
next.ServeHTTP(w, r)
})
}
func renderStatusPage(w http.ResponseWriter, title string, eng *monitor.Engine) {
sites := eng.GetAllSites()
sort.Slice(sites, func(i, j int) bool {
if sites[i].Status != sites[j].Status {
if sites[i].Status == models.StatusDown {
return true
}
if sites[j].Status == models.StatusDown {
return false
}
}
return sites[i].Name < sites[j].Name
})
data := struct {
Title string
Sites []models.Site
}{Title: title, Sites: sites}
if err := statusTpl.Execute(w, data); err != nil {
log.Printf("Failed to render status page: %v", err)
}
}