diff --git a/.gitignore b/.gitignore index e1f6a8e..1fd01d6 100644 --- a/.gitignore +++ b/.gitignore @@ -38,4 +38,5 @@ tmp # Old repo /go-upkeep/ -*.local.json \ No newline at end of file +*.local.json +*.local.md \ No newline at end of file diff --git a/README.md b/README.md index e7424e1..291c2de 100644 --- a/README.md +++ b/README.md @@ -1,60 +1,63 @@ # Go-Upkeep -   +Self-hosted uptime monitor with a TUI you can access over SSH. No browser, no install on the client โ just `ssh -p 23234 your-server`. -**Go-Upkeep** is a self-hosted infrastructure monitor with a retro-futuristic TUI accessible via SSH. It supports High Availability, Push Monitoring, and Alerting. +Originally forked from [RDGames/go-upkeep](https://github.com/RDGames/go-upkeep). This is an independent fork with significant additions. -* ๐ **Full Documentation:** [goupkeep.org/docs](https://goupkeep.org/docs) -* ๐ณ **Docker Hub:** [rdgames1000/go-upkeep](https://hub.docker.com/r/rdgames1000/go-upkeep) +## What it does ---- +- **6 check types**: HTTP, Push (heartbeat), Ping, Port, DNS, Groups +- **9 alert providers**: Discord, Slack, Email, Ntfy, Webhook, Telegram, PagerDuty, Pushover, Gotify +- **Config as code**: define monitors in YAML, apply declaratively, version control your setup +- **HA clustering**: leader/follower with automatic failover +- **Prometheus metrics**: `/metrics` endpoint for Grafana dashboards +- **Public status page**: HTML + JSON, toggle with an env var +- **SQLite or Postgres**: SQLite for single-node, Postgres for production +- **Uptime Kuma import**: migrate from Kuma with one command -## ๐ Key Features +## Quick start -* **SSH Dashboard**: Zero-install client. Manage monitors via `ssh -p 23234 your-server`. -* **Protocols**: - * **HTTP/S**: Active polling with SSL certificate expiration tracking. - * **PUSH**: Heartbeat endpoints for cron jobs/backup scripts. -* **High Availability**: Leader/Follower clustering with automatic failover. -* **Alerting**: Native support for Discord, Slack, Email (SMTP), and Webhooks. -* **Backends**: SQLite (default) or PostgreSQL (production). - ---- - -## ๐ ๏ธ Quick Start (Local Dev) - -**Option A: Native Go (Fastest)** ```bash -go mod tidy go run cmd/goupkeep/main.go -# Connect: ssh -p 23234 localhost +ssh -p 23234 localhost ``` -**Option B: Docker Compose (Full Stack)** +Seed some demo data to see it in action: + ```bash -docker compose -f docker-compose.dev.yml up --build +go run cmd/goupkeep/main.go -demo ``` ---- +## Config as code -## ๐ฆ Production Deployment +Export your current monitors: -For critical infrastructure, we recommend Docker Compose. +```bash +goupkeep export -o monitors.yaml +``` -### 1. The Compose File -Create `docker-compose.yml`: +Apply a config file: + +```bash +goupkeep apply -f monitors.yaml +goupkeep apply -f monitors.yaml --dry-run # see what would change +goupkeep apply -f monitors.yaml --prune # delete anything not in the YAML +``` + +See [docs/config-as-code.md](docs/config-as-code.md) for the full reference. + +## Docker ```yaml services: monitor: - image: rdgames1000/go-upkeep:latest - container_name: go-upkeep + build: . restart: unless-stopped - stdin_open: true # Required for initial setup console + stdin_open: true tty: true ports: - - "23234:23234" # SSH - - "8080:8080" # HTTP (Status Page & Push) + - "23234:23234" + - "8080:8080" volumes: - ./data:/data - ./ssh_keys:/app/.ssh @@ -62,28 +65,26 @@ services: - UPKEEP_DB_TYPE=sqlite - UPKEEP_DB_DSN=/data/upkeep.db - UPKEEP_STATUS_ENABLED=true - - UPKEEP_CLUSTER_SECRET=ChangeMeToSomethingSecure + - UPKEEP_CLUSTER_SECRET=change-me ``` -### 2. Initial Setup (Identity Management) -**Important:** V2 stores SSH keys in the database. You must create the first user manually via the console. +First run: attach to the container (`docker attach go-upkeep`), go to the Users tab, add your SSH public key. Then detach with `Ctrl+P, Ctrl+Q` and connect normally over SSH. -1. Start the stack: `docker compose up -d` -2. Attach to the container: `docker attach go-upkeep` -3. Inside the TUI: - * Press **[Tab]** to select the `Users` tab. - * Press **[n]** to create a user. - * Enter your username and paste your public key (`cat ~/.ssh/id_ed25519.pub`). - * Press **[Enter]** to save. -4. Detach: Press **Ctrl+P** then **Ctrl+Q**. +## Environment variables -### 3. Usage -Connect using your standard SSH client: -```bash -ssh -p 23234 your-server-ip -``` +| Variable | Default | What it does | +|---|---|---| +| `UPKEEP_PORT` | `23234` | SSH server port | +| `UPKEEP_HTTP_PORT` | `8080` | HTTP server port (status page, push, metrics) | +| `UPKEEP_DB_TYPE` | `sqlite` | `sqlite` or `postgres` | +| `UPKEEP_DB_DSN` | `upkeep.db` | Database path or connection string | +| `UPKEEP_STATUS_ENABLED` | `false` | Enable public status page | +| `UPKEEP_STATUS_TITLE` | `System Status` | Status page title | +| `UPKEEP_CLUSTER_MODE` | `leader` | `leader` or `follower` | +| `UPKEEP_PEER_URL` | | Leader URL for follower nodes | +| `UPKEEP_CLUSTER_SECRET` | | Shared key for cluster + API auth | +| `UPKEEP_INSECURE_SKIP_VERIFY` | `false` | Skip TLS verification for checks | -For advanced setups (Postgres, Clustering, Migration), please consult the [Official Documentation](https://goupkeep.org/docs). +## License -## ๐ License -MIT License. \ No newline at end of file +MIT โ see [LICENSE](LICENSE). diff --git a/cmd/goupkeep/main.go b/cmd/goupkeep/main.go index 0ae9cb0..adec38f 100644 --- a/cmd/goupkeep/main.go +++ b/cmd/goupkeep/main.go @@ -1,9 +1,11 @@ package main import ( + "context" "flag" "fmt" "go-upkeep/internal/cluster" + "go-upkeep/internal/config" "go-upkeep/internal/importer" "go-upkeep/internal/models" "go-upkeep/internal/monitor" @@ -26,6 +28,102 @@ import ( func main() { log.SetOutput(os.Stderr) + if len(os.Args) >= 2 { + switch os.Args[1] { + case "apply": + runApply(os.Args[2:]) + return + case "export": + runExport(os.Args[2:]) + return + } + } + runServe(os.Args[1:]) +} + +func envOrDefault(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func openStore(dbType, dsn string) store.Store { + var s store.Store + var err error + if dbType == "postgres" { + s, err = store.NewPostgresStore(dsn) + } else { + s, err = store.NewSQLiteStore(dsn) + } + if err != nil { + fmt.Fprintf(os.Stderr, "database error: %v\n", err) + os.Exit(1) + } + if err := s.Init(); err != nil { + fmt.Fprintf(os.Stderr, "database init error: %v\n", err) + os.Exit(1) + } + return s +} + +func runApply(args []string) { + fs := flag.NewFlagSet("apply", flag.ExitOnError) + filePath := fs.String("f", "", "Path to YAML config file (required)") + dryRun := fs.Bool("dry-run", false, "Show planned changes without applying") + prune := fs.Bool("prune", false, "Delete monitors/alerts not in YAML") + dbType := fs.String("db-type", envOrDefault("UPKEEP_DB_TYPE", "sqlite"), "Database type") + dsn := fs.String("dsn", envOrDefault("UPKEEP_DB_DSN", "upkeep.db"), "Database DSN") + fs.Parse(args) + + if *filePath == "" { + fmt.Fprintln(os.Stderr, "error: -f flag is required") + fs.Usage() + os.Exit(1) + } + + s := openStore(*dbType, *dsn) + + f, err := config.LoadFile(*filePath) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + changes, err := config.Apply(s, f, config.ApplyOpts{ + DryRun: *dryRun, + Prune: *prune, + }) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + fmt.Print(config.FormatChanges(changes, *dryRun)) +} + +func runExport(args []string) { + fs := flag.NewFlagSet("export", flag.ExitOnError) + outPath := fs.String("o", "-", "Output file path (- for stdout)") + dbType := fs.String("db-type", envOrDefault("UPKEEP_DB_TYPE", "sqlite"), "Database type") + dsn := fs.String("dsn", envOrDefault("UPKEEP_DB_DSN", "upkeep.db"), "Database DSN") + fs.Parse(args) + + s := openStore(*dbType, *dsn) + + f, err := config.Export(s) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + if err := config.WriteFile(f, *outPath); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func runServe(args []string) { portVal := 23234 dbType := "sqlite" dbDSN := "upkeep.db" @@ -58,7 +156,6 @@ func main() { if v := os.Getenv("UPKEEP_STATUS_TITLE"); v != "" { statusTitle = v } - if v := os.Getenv("UPKEEP_CLUSTER_MODE"); v != "" { clusterMode = v } @@ -68,32 +165,71 @@ func main() { if v := os.Getenv("UPKEEP_CLUSTER_SECRET"); v != "" { clusterKey = v } - if os.Getenv("UPKEEP_INSECURE_SKIP_VERIFY") == "true" { - monitor.SetInsecureSkipVerify(true) + + nodeID := os.Getenv("UPKEEP_NODE_ID") + nodeName := os.Getenv("UPKEEP_NODE_NAME") + nodeRegion := os.Getenv("UPKEEP_NODE_REGION") + aggStrategy := os.Getenv("UPKEEP_AGG_STRATEGY") + + if clusterMode == "probe" { + if nodeID == "" { + fmt.Fprintln(os.Stderr, "UPKEEP_NODE_ID is required for probe mode") + os.Exit(1) + } + if clusterPeer == "" { + fmt.Fprintln(os.Stderr, "UPKEEP_PEER_URL is required for probe mode") + os.Exit(1) + } + + fmt.Printf("Cluster: Running as PROBE (node=%s, region=%s)\n", nodeID, nodeRegion) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-done + cancel() + }() + + if err := cluster.RunProbe(ctx, cluster.ProbeConfig{ + NodeID: nodeID, + NodeName: nodeName, + Region: nodeRegion, + LeaderURL: clusterPeer, + SharedKey: clusterKey, + Interval: 30, + }); err != nil { + fmt.Fprintf(os.Stderr, "Probe error: %v\n", err) + } + return } - port := flag.Int("port", portVal, "SSH Port") - flagDBType := flag.String("db-type", dbType, "Database type") - flagDSN := flag.String("dsn", dbDSN, "Database DSN") - demo := flag.Bool("demo", false, "Seed demo data") - importKuma := flag.String("import-kuma", "", "Import Uptime Kuma backup JSON file") - flag.Parse() + fs := flag.NewFlagSet("serve", flag.ExitOnError) + port := fs.Int("port", portVal, "SSH Port") + flagDBType := fs.String("db-type", dbType, "Database type") + flagDSN := fs.String("dsn", dbDSN, "Database DSN") + demo := fs.Bool("demo", false, "Seed demo data") + importKuma := fs.String("import-kuma", "", "Import Uptime Kuma backup JSON file") + fs.Parse(args) var s store.Store + var dbErr error if *flagDBType == "postgres" { - s = &store.PostgresStore{ConnStr: *flagDSN} + s, dbErr = store.NewPostgresStore(*flagDSN) fmt.Printf("Using PostgreSQL: %s\n", *flagDSN) } else { - s = &store.SQLiteStore{DBPath: *flagDSN} + s, dbErr = store.NewSQLiteStore(*flagDSN) fmt.Printf("Using SQLite: %s\n", *flagDSN) } + if dbErr != nil { + fmt.Printf("Database connection error: %v\n", dbErr) + os.Exit(1) + } if err := s.Init(); err != nil { - fmt.Printf("Database Init Error: %v\n", err) + fmt.Printf("Database init error: %v\n", err) os.Exit(1) } - store.SetGlobal(s) - if *demo { seedDemoData(s) } @@ -112,25 +248,38 @@ func main() { fmt.Printf("Imported %d monitors and %d alerts from Uptime Kuma v%s\n", len(backup.Sites), len(backup.Alerts), kb.Version) } - monitor.StartEngine() + eng := monitor.NewEngine(s) + if os.Getenv("UPKEEP_INSECURE_SKIP_VERIFY") == "true" { + eng.SetInsecureSkipVerify(true) + } + if aggStrategy != "" { + eng.SetAggStrategy(monitor.AggregationStrategy(aggStrategy)) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + eng.InitHistory() + eng.InitLogs() + eng.Start(ctx) server.Start(server.ServerConfig{ Port: httpPort, EnableStatus: enableStatus, Title: statusTitle, ClusterKey: clusterKey, - }) + }, s, eng) - cluster.Start(cluster.Config{ + cluster.Start(ctx, cluster.Config{ Mode: clusterMode, PeerURL: clusterPeer, SharedKey: clusterKey, - }) + }, eng) - startSSHServer(*port) + startSSHServer(*port, s, eng) if isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsCygwinTerminal(os.Stdout.Fd()) { - p := tea.NewProgram(tui.InitialModel(true), tea.WithAltScreen(), tea.WithMouseCellMotion()) + p := tea.NewProgram(tui.InitialModel(true, s, eng), tea.WithAltScreen(), tea.WithMouseCellMotion()) if _, err := p.Run(); err != nil { fmt.Printf("Error: %v\n", err) } @@ -141,18 +290,19 @@ func main() { <-done fmt.Println("Shutting down...") } + cancel() } -func startSSHServer(port int) { +func startSSHServer(port int, db store.Store, eng *monitor.Engine) { s, err := wish.NewServer( wish.WithAddress(fmt.Sprintf(":%d", port)), wish.WithHostKeyPath(".ssh/id_ed25519"), wish.WithPublicKeyAuth(func(ctx ssh.Context, key ssh.PublicKey) bool { - return isKeyAllowed(key) + return isKeyAllowed(db, key) }), wish.WithMiddleware( bm.Middleware(func(s ssh.Session) (tea.Model, []tea.ProgramOption) { - return tui.InitialModel(false), []tea.ProgramOption{tea.WithAltScreen(), tea.WithMouseCellMotion()} + return tui.InitialModel(false, db, eng), []tea.ProgramOption{tea.WithAltScreen(), tea.WithMouseCellMotion()} }), ), ) @@ -160,11 +310,16 @@ func startSSHServer(port int) { fmt.Printf("SSH server error: %v\n", err) return } - go func() { s.ListenAndServe() }() + go func() { + if err := s.ListenAndServe(); err != nil { + log.Fatalf("SSH server failed: %v", err) + } + }() } func seedDemoData(s store.Store) { - if existing := s.GetSites(); len(existing) > 0 { + existing, _ := s.GetSites() + if len(existing) > 0 { return } fmt.Println("Seeding demo data...") @@ -177,7 +332,7 @@ func seedDemoData(s store.Store) { "from": "oncall@example.com", "to": "team@example.com", }) - alerts := s.GetAllAlerts() + alerts, _ := s.GetAllAlerts() alertID := 0 if len(alerts) > 0 { alertID = alerts[0].ID @@ -195,8 +350,11 @@ func seedDemoData(s store.Store) { s.AddSite(models.Site{Name: "SSH Server", Type: "port", Interval: 60, AlertID: alertID, Hostname: "10.0.0.1", Port: 22, Timeout: 5, ExpiryThreshold: 7}) } -func isKeyAllowed(incomingKey ssh.PublicKey) bool { - users := store.Get().GetAllUsers() +func isKeyAllowed(db store.Store, incomingKey ssh.PublicKey) bool { + users, err := db.GetAllUsers() + if err != nil { + return false + } for _, u := range users { allowedKey, _, _, _, err := ssh.ParseAuthorizedKey([]byte(u.PublicKey)) if err != nil { diff --git a/docker-compose.probe.yml b/docker-compose.probe.yml new file mode 100644 index 0000000..791811f --- /dev/null +++ b/docker-compose.probe.yml @@ -0,0 +1,35 @@ +services: + leader: + build: . + environment: + - UPKEEP_CLUSTER_MODE=leader + - UPKEEP_CLUSTER_SECRET=changeme + - UPKEEP_AGG_STRATEGY=any-down + - UPKEEP_STATUS_ENABLED=true + ports: + - "8080:8080" + - "23234:23234" + + probe-us-east: + build: . + environment: + - UPKEEP_CLUSTER_MODE=probe + - UPKEEP_NODE_ID=us-east-1 + - UPKEEP_NODE_NAME=US East Probe + - UPKEEP_NODE_REGION=us-east + - UPKEEP_PEER_URL=http://leader:8080 + - UPKEEP_CLUSTER_SECRET=changeme + depends_on: + - leader + + probe-eu-west: + build: . + environment: + - UPKEEP_CLUSTER_MODE=probe + - UPKEEP_NODE_ID=eu-west-1 + - UPKEEP_NODE_NAME=EU West Probe + - UPKEEP_NODE_REGION=eu-west + - UPKEEP_PEER_URL=http://leader:8080 + - UPKEEP_CLUSTER_SECRET=changeme + depends_on: + - leader diff --git a/docs/config-as-code.md b/docs/config-as-code.md new file mode 100644 index 0000000..1c9e8a1 --- /dev/null +++ b/docs/config-as-code.md @@ -0,0 +1,244 @@ +# Config as Code + +Define your monitors and alerts in a YAML file. Version control them, copy them between instances, or spin up a fresh setup in one command. + +## Quick start + +Export what you already have: + +```bash +goupkeep export -o monitors.yaml +``` + +That gives you a working file you can edit and re-apply: + +```bash +goupkeep apply -f monitors.yaml +``` + +That's it. Apply only creates or updates โ it won't delete anything unless you tell it to. + +## The YAML file + +Two top-level sections: `alerts` and `monitors`. Alerts go first because monitors reference them by name. + +```yaml +alerts: + - name: Discord Ops + type: discord + settings: + url: https://discord.com/api/webhooks/your/token + + - name: PagerDuty Critical + type: pagerduty + settings: + routing_key: your-integration-key + severity: critical + +monitors: + - name: API + type: http + url: https://api.example.com/health + interval: 30 + alert: Discord Ops + + - name: Production + type: group + alert: PagerDuty Critical + monitors: + - name: Prod Web + type: http + url: https://prod.example.com + interval: 15 + - name: Prod DB + type: port + hostname: db.internal + port: 5432 + interval: 30 +``` + +## Monitor types + +Each type has required fields. Everything else is optional with sensible defaults. + +**http** โ polls a URL +```yaml +- name: My API + type: http + url: https://api.example.com/health + interval: 30 +``` + +Optional: `method` (default GET), `accepted_codes` (default 200-299), `timeout`, `check_ssl`, `expiry_threshold` (default 7 days), `max_retries`, `ignore_tls`, `description`, `paused`. + +**ping** โ ICMP ping a host +```yaml +- name: Gateway + type: ping + hostname: 10.0.0.1 + interval: 30 +``` + +**port** โ check if a port is open +```yaml +- name: SSH Server + type: port + hostname: 10.0.0.1 + port: 22 + interval: 60 +``` + +**dns** โ resolve a hostname +```yaml +- name: DNS Check + type: dns + hostname: example.com + dns_resolve_type: A + dns_server: 1.1.1.1 + interval: 60 +``` + +**push** โ heartbeat endpoint for cron jobs +```yaml +- name: Nightly Backup + type: push + interval: 86400 +``` + +Push monitors get a token assigned automatically. Hit the push endpoint before the interval expires or it alerts. + +**group** โ organize monitors together +```yaml +- name: Production + type: group + monitors: + - name: Web + type: http + url: https://prod.example.com + interval: 15 +``` + +Groups can't nest inside other groups. A group is healthy when all its children are healthy. + +## Alert types + +All 9 providers work in the YAML. The `settings` map is different per type. + +```yaml +# Discord / Slack / Generic Webhook โ just a URL +- name: Discord Ops + type: discord + settings: + url: https://discord.com/api/webhooks/your/token + +# Email +- name: Email Oncall + type: email + settings: + host: smtp.example.com + port: "587" + user: oncall@example.com + pass: your-password + from: oncall@example.com + to: team@example.com + +# Ntfy +- name: Ntfy Alerts + type: ntfy + settings: + url: https://ntfy.sh + topic: my-alerts + priority: "4" + +# Telegram +- name: Telegram Ops + type: telegram + settings: + token: "123456:ABC-DEF..." + chat_id: "-1001234567890" + +# PagerDuty +- name: PD Critical + type: pagerduty + settings: + routing_key: your-integration-key + severity: critical + +# Pushover +- name: Pushover + type: pushover + settings: + token: app-token + user: user-key + +# Gotify +- name: Gotify + type: gotify + settings: + url: https://gotify.example.com + token: app-token + priority: "8" +``` + +## Commands + +**Export current state:** +```bash +goupkeep export -o monitors.yaml # to a file +goupkeep export # to stdout +``` + +**Apply a config:** +```bash +goupkeep apply -f monitors.yaml +``` + +**See what would change first:** +```bash +goupkeep apply -f monitors.yaml --dry-run +``` + +**Delete monitors not in the YAML:** +```bash +goupkeep apply -f monitors.yaml --prune +``` + +Without `--prune`, apply never deletes anything. It only creates and updates. + +**Pointing at a different database:** +```bash +goupkeep export -db-type postgres -dsn "host=localhost dbname=upkeep sslmode=disable" +goupkeep apply -f monitors.yaml -db-type postgres -dsn "..." +``` + +Both commands respect the `UPKEEP_DB_TYPE` and `UPKEEP_DB_DSN` environment variables too. + +## How apply works + +Monitors and alerts are matched by **name**. Names must be unique across the entire file. + +1. Alerts are resolved first (created or updated) +2. Groups are created next (so children can reference them) +3. Everything else is created or updated +4. If `--prune` is set, anything in the database that's not in the YAML gets deleted + +Apply is idempotent. Run it twice with the same file, second run changes nothing. + +If something fails mid-apply, just fix the issue and run it again. It picks up where it left off. + +## Typical workflow + +```bash +# set up your monitors in the TUI first, then export +goupkeep export -o monitors.yaml + +# commit it +git add monitors.yaml && git commit -m "add monitor config" + +# deploy to another instance +scp monitors.yaml prod-server: +ssh prod-server goupkeep apply -f monitors.yaml + +# or just keep it as a backup you can restore from +goupkeep apply -f monitors.yaml +``` diff --git a/go.mod b/go.mod index 7011fb1..38cc730 100644 --- a/go.mod +++ b/go.mod @@ -57,4 +57,5 @@ require ( golang.org/x/sys v0.40.0 // indirect golang.org/x/text v0.33.0 // indirect golang.org/x/tools v0.40.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 88b63c7..17c3eca 100644 --- a/go.sum +++ b/go.sum @@ -121,5 +121,6 @@ golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/alert/alert.go b/internal/alert/alert.go index 71b7570..c60013f 100644 --- a/internal/alert/alert.go +++ b/internal/alert/alert.go @@ -7,6 +7,7 @@ import ( "go-upkeep/internal/models" "net/http" "net/smtp" + "strconv" "strings" "time" ) @@ -17,15 +18,95 @@ type Provider interface { Send(title, message string) error } +type PayloadFunc func(title, message string) ([]byte, error) + +type HTTPProvider struct { + URL string + Payload PayloadFunc +} + +func (h *HTTPProvider) Send(title, message string) error { + body, err := h.Payload(title, message) + if err != nil { + return err + } + resp, err := alertClient.Post(h.URL, "application/json", bytes.NewBuffer(body)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + return fmt.Errorf("alert webhook returned HTTP %d", resp.StatusCode) + } + return nil +} + +func discordPayload(title, message string) ([]byte, error) { + return json.Marshal(map[string]string{"content": fmt.Sprintf("**%s**\n%s", title, message)}) +} + +func slackPayload(title, message string) ([]byte, error) { + return json.Marshal(map[string]string{"text": fmt.Sprintf("*%s*\n%s", title, message)}) +} + +func webhookPayload(title, message string) ([]byte, error) { + return json.Marshal(map[string]string{"title": title, "message": message, "status": "alert"}) +} + +func telegramPayload(chatID string) PayloadFunc { + return func(title, message string) ([]byte, error) { + return json.Marshal(map[string]string{ + "chat_id": chatID, + "text": fmt.Sprintf("*%s*\n%s", title, message), + "parse_mode": "Markdown", + }) + } +} + +func pagerdutyPayload(routingKey, severity string) PayloadFunc { + return func(title, message string) ([]byte, error) { + return json.Marshal(map[string]any{ + "routing_key": routingKey, + "event_action": "trigger", + "payload": map[string]string{ + "summary": fmt.Sprintf("%s: %s", title, message), + "source": "go-upkeep", + "severity": severity, + }, + }) + } +} + +func pushoverPayload(token, user string) PayloadFunc { + return func(title, message string) ([]byte, error) { + return json.Marshal(map[string]string{ + "token": token, + "user": user, + "title": title, + "message": message, + }) + } +} + +func gotifyPayload(priority string) PayloadFunc { + return func(title, message string) ([]byte, error) { + pri, _ := strconv.Atoi(priority) + return json.Marshal(map[string]any{ + "title": title, + "message": message, + "priority": pri, + }) + } +} + func GetProvider(cfg models.AlertConfig) Provider { switch cfg.Type { case "discord": - return &DiscordProvider{URL: cfg.Settings["url"]} + return &HTTPProvider{URL: cfg.Settings["url"], Payload: discordPayload} case "slack": - return &SlackProvider{URL: cfg.Settings["url"]} + return &HTTPProvider{URL: cfg.Settings["url"], Payload: slackPayload} case "webhook": - // Generic Webhook - return &WebhookProvider{URL: cfg.Settings["url"]} + return &HTTPProvider{URL: cfg.Settings["url"], Payload: webhookPayload} case "email": port := "25" if p, ok := cfg.Settings["port"]; ok { @@ -51,58 +132,40 @@ func GetProvider(cfg models.AlertConfig) Provider { Username: cfg.Settings["username"], Password: cfg.Settings["password"], } + case "telegram": + return &HTTPProvider{ + URL: fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", cfg.Settings["token"]), + Payload: telegramPayload(cfg.Settings["chat_id"]), + } + case "pagerduty": + severity := "critical" + if s, ok := cfg.Settings["severity"]; ok && s != "" { + severity = s + } + return &HTTPProvider{ + URL: "https://events.pagerduty.com/v2/enqueue", + Payload: pagerdutyPayload(cfg.Settings["routing_key"], severity), + } + case "pushover": + return &HTTPProvider{ + URL: "https://api.pushover.net/1/messages.json", + Payload: pushoverPayload(cfg.Settings["token"], cfg.Settings["user"]), + } + case "gotify": + priority := "5" + if p, ok := cfg.Settings["priority"]; ok && p != "" { + priority = p + } + serverURL := strings.TrimRight(cfg.Settings["url"], "/") + return &HTTPProvider{ + URL: fmt.Sprintf("%s/message?token=%s", serverURL, cfg.Settings["token"]), + Payload: gotifyPayload(priority), + } default: return nil } } -// --- DISCORD --- -type DiscordProvider struct{ URL string } - -func (d *DiscordProvider) Send(title, message string) error { - payload := map[string]string{"content": fmt.Sprintf("**%s**\n%s", title, message)} - jsonValue, _ := json.Marshal(payload) - resp, err := alertClient.Post(d.URL, "application/json", bytes.NewBuffer(jsonValue)) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// --- SLACK --- -type SlackProvider struct{ URL string } - -func (s *SlackProvider) Send(title, message string) error { - payload := map[string]string{"text": fmt.Sprintf("*%s*\n%s", title, message)} - jsonValue, _ := json.Marshal(payload) - resp, err := alertClient.Post(s.URL, "application/json", bytes.NewBuffer(jsonValue)) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// --- GENERIC WEBHOOK --- -type WebhookProvider struct{ URL string } - -func (w *WebhookProvider) Send(title, message string) error { - payload := map[string]string{ - "title": title, - "message": message, - "status": "alert", - } - jsonValue, _ := json.Marshal(payload) - resp, err := alertClient.Post(w.URL, "application/json", bytes.NewBuffer(jsonValue)) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// --- EMAIL --- type EmailProvider struct { Host, Port, User, Pass, To, From string } @@ -139,6 +202,9 @@ func (n *NtfyProvider) Send(title, message string) error { if err != nil { return err } - resp.Body.Close() + defer resp.Body.Close() + if resp.StatusCode >= 400 { + return fmt.Errorf("ntfy returned HTTP %d", resp.StatusCode) + } return nil } diff --git a/internal/alert/alert_test.go b/internal/alert/alert_test.go new file mode 100644 index 0000000..35e1c8d --- /dev/null +++ b/internal/alert/alert_test.go @@ -0,0 +1,213 @@ +package alert + +import ( + "encoding/json" + "go-upkeep/internal/models" + "net/http" + "net/http/httptest" + "testing" +) + +func TestHTTPProviderDiscord(t *testing.T) { + var received map[string]string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&received) + w.WriteHeader(200) + })) + defer srv.Close() + + p := GetProvider(models.AlertConfig{Type: "discord", Settings: map[string]string{"url": srv.URL}}) + if err := p.Send("Test Title", "Test Body"); err != nil { + t.Fatalf("Send: %v", err) + } + + if received["content"] != "**Test Title**\nTest Body" { + t.Errorf("unexpected payload: %s", received["content"]) + } +} + +func TestHTTPProviderSlack(t *testing.T) { + var received map[string]string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&received) + w.WriteHeader(200) + })) + defer srv.Close() + + p := GetProvider(models.AlertConfig{Type: "slack", Settings: map[string]string{"url": srv.URL}}) + if err := p.Send("Alert", "Message"); err != nil { + t.Fatalf("Send: %v", err) + } + + if received["text"] != "*Alert*\nMessage" { + t.Errorf("unexpected payload: %s", received["text"]) + } +} + +func TestHTTPProviderWebhook(t *testing.T) { + var received map[string]string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&received) + w.WriteHeader(200) + })) + defer srv.Close() + + p := GetProvider(models.AlertConfig{Type: "webhook", Settings: map[string]string{"url": srv.URL}}) + if err := p.Send("Title", "Body"); err != nil { + t.Fatalf("Send: %v", err) + } + + if received["title"] != "Title" || received["message"] != "Body" || received["status"] != "alert" { + t.Errorf("unexpected webhook payload: %v", received) + } +} + +func TestHTTPProviderErrorOnHTTP4xx(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(403) + })) + defer srv.Close() + + p := GetProvider(models.AlertConfig{Type: "discord", Settings: map[string]string{"url": srv.URL}}) + if err := p.Send("Test", "Test"); err == nil { + t.Fatal("expected error on 403 response") + } +} + +func TestNtfyProvider(t *testing.T) { + var title, body string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + title = r.Header.Get("Title") + buf := make([]byte, 1024) + n, _ := r.Body.Read(buf) + body = string(buf[:n]) + w.WriteHeader(200) + })) + defer srv.Close() + + p := GetProvider(models.AlertConfig{Type: "ntfy", Settings: map[string]string{ + "url": srv.URL, + "topic": "test", + }}) + if err := p.Send("Alert Title", "Alert Body"); err != nil { + t.Fatalf("Send: %v", err) + } + + if title != "Alert Title" { + t.Errorf("expected title 'Alert Title', got '%s'", title) + } + if body != "Alert Body" { + t.Errorf("expected body 'Alert Body', got '%s'", body) + } +} + +func TestHTTPProviderTelegram(t *testing.T) { + var received map[string]string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&received) + w.WriteHeader(200) + })) + defer srv.Close() + + p := &HTTPProvider{URL: srv.URL, Payload: telegramPayload("12345")} + if err := p.Send("Alert", "Down"); err != nil { + t.Fatalf("Send: %v", err) + } + if received["chat_id"] != "12345" { + t.Errorf("expected chat_id '12345', got '%s'", received["chat_id"]) + } + if received["text"] != "*Alert*\nDown" { + t.Errorf("unexpected text: %s", received["text"]) + } + if received["parse_mode"] != "Markdown" { + t.Errorf("expected parse_mode 'Markdown', got '%s'", received["parse_mode"]) + } +} + +func TestHTTPProviderPagerDuty(t *testing.T) { + var received map[string]any + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&received) + w.WriteHeader(200) + })) + defer srv.Close() + + p := &HTTPProvider{URL: srv.URL, Payload: pagerdutyPayload("test-key", "critical")} + if err := p.Send("Alert", "Down"); err != nil { + t.Fatalf("Send: %v", err) + } + if received["routing_key"] != "test-key" { + t.Errorf("expected routing_key 'test-key', got '%v'", received["routing_key"]) + } + if received["event_action"] != "trigger" { + t.Errorf("expected event_action 'trigger', got '%v'", received["event_action"]) + } + payload := received["payload"].(map[string]any) + if payload["summary"] != "Alert: Down" { + t.Errorf("unexpected summary: %v", payload["summary"]) + } + if payload["severity"] != "critical" { + t.Errorf("expected severity 'critical', got '%v'", payload["severity"]) + } +} + +func TestHTTPProviderPushover(t *testing.T) { + var received map[string]string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&received) + w.WriteHeader(200) + })) + defer srv.Close() + + p := &HTTPProvider{URL: srv.URL, Payload: pushoverPayload("app-tok", "user-key")} + if err := p.Send("Alert", "Down"); err != nil { + t.Fatalf("Send: %v", err) + } + if received["token"] != "app-tok" { + t.Errorf("expected token 'app-tok', got '%s'", received["token"]) + } + if received["user"] != "user-key" { + t.Errorf("expected user 'user-key', got '%s'", received["user"]) + } + if received["title"] != "Alert" || received["message"] != "Down" { + t.Errorf("unexpected payload: %v", received) + } +} + +func TestHTTPProviderGotify(t *testing.T) { + var received map[string]any + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&received) + w.WriteHeader(200) + })) + defer srv.Close() + + p := &HTTPProvider{URL: srv.URL, Payload: gotifyPayload("8")} + if err := p.Send("Alert", "Down"); err != nil { + t.Fatalf("Send: %v", err) + } + if received["title"] != "Alert" || received["message"] != "Down" { + t.Errorf("unexpected payload: %v", received) + } + if pri, ok := received["priority"].(float64); !ok || pri != 8 { + t.Errorf("expected priority 8, got %v", received["priority"]) + } +} + +func TestGetProviderNewTypes(t *testing.T) { + for _, typ := range []string{"telegram", "pagerduty", "pushover", "gotify"} { + p := GetProvider(models.AlertConfig{Type: typ, Settings: map[string]string{ + "token": "x", "chat_id": "1", "routing_key": "k", "user": "u", "url": "http://localhost", + }}) + if p == nil { + t.Errorf("GetProvider(%q) returned nil", typ) + } + } +} + +func TestGetProviderUnknown(t *testing.T) { + p := GetProvider(models.AlertConfig{Type: "unknown"}) + if p != nil { + t.Error("expected nil for unknown provider type") + } +} diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index 295443d..03ec751 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "fmt" "go-upkeep/internal/monitor" "net/http" @@ -14,13 +15,13 @@ type Config struct { SharedKey string // Security Key } -func Start(cfg Config) { +func Start(ctx context.Context, cfg Config, eng *monitor.Engine) { if cfg.Mode == "leader" { fmt.Println("Cluster: Running as LEADER (Active)") if cfg.SharedKey != "" { fmt.Println("WARNING: Cluster mode enabled. Ensure the HTTP server is behind a TLS-terminating proxy.") } - monitor.SetEngineActive(true) + eng.SetActive(true) return } @@ -29,20 +30,24 @@ func Start(cfg Config) { if cfg.PeerURL != "" && !strings.HasPrefix(cfg.PeerURL, "https://") { fmt.Println("WARNING: Cluster peer URL is not HTTPS. Cluster secret will be sent in cleartext.") } - monitor.SetEngineActive(false) - go runFollowerLoop(cfg) + eng.SetActive(false) + go runFollowerLoop(ctx, cfg, eng) } + + // "probe" mode is handled directly in main.go before cluster.Start is called } -func runFollowerLoop(cfg Config) { +func runFollowerLoop(ctx context.Context, cfg Config, eng *monitor.Engine) { client := http.Client{Timeout: 2 * time.Second} - - // Failover Configuration failures := 0 threshold := 3 for { - time.Sleep(5 * time.Second) + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } req, _ := http.NewRequest("GET", cfg.PeerURL+"/api/health", nil) if cfg.SharedKey != "" { @@ -59,17 +64,15 @@ func runFollowerLoop(cfg Config) { if isLeaderHealthy { failures = 0 - if monitor.IsEngineActive() { - // Leader is back, yield - monitor.SetEngineActive(false) - monitor.AddLog("Cluster: Leader detected. Switching to PASSIVE.") + if eng.IsActive() { + eng.SetActive(false) + eng.AddLog("Cluster: Leader detected. Switching to PASSIVE.") } } else { failures++ - // If failures exceed threshold, take over - if failures >= threshold && !monitor.IsEngineActive() { - monitor.SetEngineActive(true) - monitor.AddLog("Cluster: Leader Unreachable. Switching to ACTIVE.") + if failures >= threshold && !eng.IsActive() { + eng.SetActive(true) + eng.AddLog("Cluster: Leader Unreachable. Switching to ACTIVE.") } } } diff --git a/internal/cluster/probe.go b/internal/cluster/probe.go new file mode 100644 index 0000000..6df0a36 --- /dev/null +++ b/internal/cluster/probe.go @@ -0,0 +1,187 @@ +package cluster + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "go-upkeep/internal/models" + "go-upkeep/internal/monitor" + "log" + "net/http" + "sync" + "time" +) + +type ProbeConfig struct { + NodeID string + NodeName string + Region string + LeaderURL string + SharedKey string + Interval int +} + +func RunProbe(ctx context.Context, cfg ProbeConfig) error { + if cfg.Interval < 10 { + cfg.Interval = 30 + } + + apiClient := &http.Client{Timeout: 10 * time.Second} + strictClient := &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}}, + } + insecureClient := &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + } + + if err := probeRegister(ctx, apiClient, cfg); err != nil { + log.Printf("Probe: initial registration failed: %v (will retry)", err) + } + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + sites, err := probeFetchAssignments(ctx, apiClient, cfg) + if err != nil { + log.Printf("Probe: failed to fetch assignments: %v", err) + sleepCtx(ctx, 10*time.Second) + continue + } + + if len(sites) == 0 { + sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second) + continue + } + + results := probeExecuteChecks(ctx, sites, strictClient, insecureClient) + + if len(results) > 0 { + if err := probeReportResults(ctx, apiClient, cfg, results); err != nil { + log.Printf("Probe: failed to report results: %v", err) + } + } + + sleepCtx(ctx, time.Duration(cfg.Interval)*time.Second) + } +} + +func probeRegister(ctx context.Context, client *http.Client, cfg ProbeConfig) error { + body, _ := json.Marshal(map[string]string{ + "id": cfg.NodeID, "name": cfg.NodeName, "region": cfg.Region, "version": "probe", + }) + req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/register", bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Upkeep-Secret", cfg.SharedKey) + resp, err := client.Do(req) + if err != nil { + return err + } + resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("register returned %d", resp.StatusCode) + } + return nil +} + +func probeFetchAssignments(ctx context.Context, client *http.Client, cfg ProbeConfig) ([]models.Site, error) { + req, err := http.NewRequestWithContext(ctx, "GET", cfg.LeaderURL+"/api/probe/assignments?node_id="+cfg.NodeID, nil) + if err != nil { + return nil, err + } + req.Header.Set("X-Upkeep-Secret", cfg.SharedKey) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + return nil, fmt.Errorf("assignments returned %d", resp.StatusCode) + } + var result struct { + Sites []models.Site `json:"sites"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, err + } + return result.Sites, nil +} + +type probeResultItem struct { + SiteID int `json:"site_id"` + LatencyNs int64 `json:"latency_ns"` + IsUp bool `json:"is_up"` +} + +func probeExecuteChecks(ctx context.Context, sites []models.Site, strict, insecure *http.Client) []probeResultItem { + var mu sync.Mutex + var results []probeResultItem + sem := make(chan struct{}, 10) + var wg sync.WaitGroup + + for _, site := range sites { + select { + case <-ctx.Done(): + break + default: + } + wg.Add(1) + sem <- struct{}{} + go func(s models.Site) { + defer wg.Done() + defer func() { <-sem }() + + cr := monitor.RunCheck(s, strict, insecure, false) + mu.Lock() + results = append(results, probeResultItem{ + SiteID: s.ID, + LatencyNs: cr.LatencyNs, + IsUp: cr.Status == "UP", + }) + mu.Unlock() + }(site) + } + wg.Wait() + return results +} + +func probeReportResults(ctx context.Context, client *http.Client, cfg ProbeConfig, results []probeResultItem) error { + body, err := json.Marshal(map[string]interface{}{ + "node_id": cfg.NodeID, + "results": results, + }) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, "POST", cfg.LeaderURL+"/api/probe/results", bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Upkeep-Secret", cfg.SharedKey) + resp, err := client.Do(req) + if err != nil { + return err + } + resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("results returned %d", resp.StatusCode) + } + fmt.Printf("Probe: reported %d check results\n", len(results)) + return nil +} + +func sleepCtx(ctx context.Context, d time.Duration) { + select { + case <-time.After(d): + case <-ctx.Done(): + } +} diff --git a/internal/config/apply.go b/internal/config/apply.go new file mode 100644 index 0000000..ca37e36 --- /dev/null +++ b/internal/config/apply.go @@ -0,0 +1,396 @@ +package config + +import ( + "fmt" + "go-upkeep/internal/models" + "go-upkeep/internal/store" + "reflect" + "strings" +) + +type ApplyOpts struct { + DryRun bool + Prune bool +} + +type Change struct { + Action string + Kind string + Name string + Details string +} + +func Apply(s store.Store, f *File, opts ApplyOpts) ([]Change, error) { + if err := Validate(f); err != nil { + return nil, err + } + + existingAlerts, err := s.GetAllAlerts() + if err != nil { + return nil, fmt.Errorf("load alerts: %w", err) + } + + existingSites, err := s.GetSites() + if err != nil { + return nil, fmt.Errorf("load sites: %w", err) + } + + existingAlertsByName := make(map[string]models.AlertConfig, len(existingAlerts)) + for _, a := range existingAlerts { + existingAlertsByName[a.Name] = a + } + + existingSitesByName := make(map[string]models.Site, len(existingSites)) + for _, s := range existingSites { + existingSitesByName[s.Name] = s + } + + var changes []Change + + alertMap := make(map[string]int) + for _, ea := range existingAlerts { + alertMap[ea.Name] = ea.ID + } + + desiredAlertNames := make(map[string]bool, len(f.Alerts)) + for _, a := range f.Alerts { + desiredAlertNames[a.Name] = true + existing, exists := existingAlertsByName[a.Name] + if !exists { + changes = append(changes, Change{Action: "create", Kind: "alert", Name: a.Name, Details: a.Type}) + if !opts.DryRun { + id, err := s.AddAlertReturningID(a.Name, a.Type, a.Settings) + if err != nil { + return changes, fmt.Errorf("create alert %q: %w", a.Name, err) + } + alertMap[a.Name] = id + } + } else { + alertMap[a.Name] = existing.ID + if diff := diffAlert(existing, a); diff != "" { + changes = append(changes, Change{Action: "update", Kind: "alert", Name: a.Name, Details: diff}) + if !opts.DryRun { + if err := s.UpdateAlert(existing.ID, a.Name, a.Type, a.Settings); err != nil { + return changes, fmt.Errorf("update alert %q: %w", a.Name, err) + } + } + } + } + } + + desiredMonitorNames := make(map[string]bool) + collectMonitorNames(f.Monitors, desiredMonitorNames) + + var groups []Monitor + var topLevel []Monitor + for _, m := range f.Monitors { + if m.Type == "group" { + groups = append(groups, m) + } else { + topLevel = append(topLevel, m) + } + } + + groupMap := make(map[string]int) + for _, g := range groups { + alertID, err := resolveAlertID(alertMap, g.Alert) + if err != nil { + return changes, fmt.Errorf("monitor %q: %w", g.Name, err) + } + site := monitorToSite(g, alertID, 0) + existing, exists := existingSitesByName[g.Name] + if !exists { + changes = append(changes, Change{Action: "create", Kind: "monitor", Name: g.Name, Details: "group"}) + if !opts.DryRun { + id, err := s.AddSiteReturningID(site) + if err != nil { + return changes, fmt.Errorf("create group %q: %w", g.Name, err) + } + groupMap[g.Name] = id + } + } else { + groupMap[g.Name] = existing.ID + site.ID = existing.ID + if diff := diffSite(normalizeSite(existing), site); diff != "" { + changes = append(changes, Change{Action: "update", Kind: "monitor", Name: g.Name, Details: diff}) + if !opts.DryRun { + if err := s.UpdateSite(site); err != nil { + return changes, fmt.Errorf("update group %q: %w", g.Name, err) + } + } + } + } + } + + for _, g := range groups { + parentID := groupMap[g.Name] + for _, child := range g.Monitors { + c, err := applyMonitor(s, child, alertMap, existingSitesByName, parentID, opts.DryRun) + if err != nil { + return changes, err + } + changes = append(changes, c...) + } + } + + for _, m := range topLevel { + c, err := applyMonitor(s, m, alertMap, existingSitesByName, 0, opts.DryRun) + if err != nil { + return changes, err + } + changes = append(changes, c...) + } + + if opts.Prune { + var childDeletes []Change + var groupDeletes []Change + for _, es := range existingSites { + if desiredMonitorNames[es.Name] { + continue + } + c := Change{Action: "delete", Kind: "monitor", Name: es.Name, Details: es.Type} + if es.Type == "group" { + groupDeletes = append(groupDeletes, c) + } else { + childDeletes = append(childDeletes, c) + } + if !opts.DryRun { + if err := s.DeleteSite(es.ID); err != nil { + return changes, fmt.Errorf("delete monitor %q: %w", es.Name, err) + } + } + } + changes = append(changes, childDeletes...) + changes = append(changes, groupDeletes...) + + for _, ea := range existingAlerts { + if desiredAlertNames[ea.Name] { + continue + } + changes = append(changes, Change{Action: "delete", Kind: "alert", Name: ea.Name, Details: ea.Type}) + if !opts.DryRun { + if err := s.DeleteAlert(ea.ID); err != nil { + return changes, fmt.Errorf("delete alert %q: %w", ea.Name, err) + } + } + } + } + + return changes, nil +} + +func applyMonitor(s store.Store, m Monitor, alertMap map[string]int, existing map[string]models.Site, parentID int, dryRun bool) ([]Change, error) { + alertID, err := resolveAlertID(alertMap, m.Alert) + if err != nil { + return nil, fmt.Errorf("monitor %q: %w", m.Name, err) + } + site := monitorToSite(m, alertID, parentID) + + var changes []Change + ex, exists := existing[m.Name] + if !exists { + changes = append(changes, Change{Action: "create", Kind: "monitor", Name: m.Name, Details: m.Type}) + if !dryRun { + if _, err := s.AddSiteReturningID(site); err != nil { + return changes, fmt.Errorf("create monitor %q: %w", m.Name, err) + } + } + } else { + site.ID = ex.ID + if diff := diffSite(normalizeSite(ex), site); diff != "" { + changes = append(changes, Change{Action: "update", Kind: "monitor", Name: m.Name, Details: diff}) + if !dryRun { + if err := s.UpdateSite(site); err != nil { + return changes, fmt.Errorf("update monitor %q: %w", m.Name, err) + } + } + } + } + return changes, nil +} + +func resolveAlertID(alertMap map[string]int, name string) (int, error) { + if name == "" { + return 0, nil + } + id, ok := alertMap[name] + if !ok { + return 0, fmt.Errorf("alert %q not found", name) + } + return id, nil +} + +func monitorToSite(m Monitor, alertID, parentID int) models.Site { + s := models.Site{ + Name: m.Name, + Type: m.Type, + URL: m.URL, + Interval: m.Interval, + AlertID: alertID, + ParentID: parentID, + + CheckSSL: m.CheckSSL, + MaxRetries: m.MaxRetries, + Hostname: m.Hostname, + Port: m.Port, + Timeout: m.Timeout, + Description: m.Description, + DNSResolveType: m.DNSResolveType, + DNSServer: m.DNSServer, + IgnoreTLS: m.IgnoreTLS, + Paused: m.Paused, + Regions: m.Regions, + } + + s.ExpiryThreshold = m.ExpiryThreshold + if s.ExpiryThreshold == 0 { + s.ExpiryThreshold = 7 + } + + s.Method = m.Method + if s.Method == "" { + s.Method = "GET" + } + + s.AcceptedCodes = m.AcceptedCodes + if s.AcceptedCodes == "" { + s.AcceptedCodes = "200-299" + } + + return s +} + +func collectMonitorNames(monitors []Monitor, names map[string]bool) { + for _, m := range monitors { + names[m.Name] = true + collectMonitorNames(m.Monitors, names) + } +} + +func normalizeSite(s models.Site) models.Site { + if s.Method == "" { + s.Method = "GET" + } + if s.AcceptedCodes == "" { + s.AcceptedCodes = "200-299" + } + if s.ExpiryThreshold == 0 { + s.ExpiryThreshold = 7 + } + return s +} + +func diffAlert(existing models.AlertConfig, desired Alert) string { + var diffs []string + if existing.Type != desired.Type { + diffs = append(diffs, fmt.Sprintf("type: %s -> %s", existing.Type, desired.Type)) + } + if !reflect.DeepEqual(existing.Settings, desired.Settings) { + diffs = append(diffs, "settings changed") + } + return strings.Join(diffs, ", ") +} + +func diffSite(existing, desired models.Site) string { + var diffs []string + if existing.URL != desired.URL { + diffs = append(diffs, fmt.Sprintf("url: %s -> %s", existing.URL, desired.URL)) + } + if existing.Type != desired.Type { + diffs = append(diffs, fmt.Sprintf("type: %s -> %s", existing.Type, desired.Type)) + } + if existing.Interval != desired.Interval { + diffs = append(diffs, fmt.Sprintf("interval: %d -> %d", existing.Interval, desired.Interval)) + } + if existing.AlertID != desired.AlertID { + diffs = append(diffs, fmt.Sprintf("alert_id: %d -> %d", existing.AlertID, desired.AlertID)) + } + if existing.CheckSSL != desired.CheckSSL { + diffs = append(diffs, fmt.Sprintf("check_ssl: %v -> %v", existing.CheckSSL, desired.CheckSSL)) + } + if existing.ExpiryThreshold != desired.ExpiryThreshold { + diffs = append(diffs, fmt.Sprintf("expiry_threshold: %d -> %d", existing.ExpiryThreshold, desired.ExpiryThreshold)) + } + if existing.MaxRetries != desired.MaxRetries { + diffs = append(diffs, fmt.Sprintf("max_retries: %d -> %d", existing.MaxRetries, desired.MaxRetries)) + } + if existing.Hostname != desired.Hostname { + diffs = append(diffs, fmt.Sprintf("hostname: %s -> %s", existing.Hostname, desired.Hostname)) + } + if existing.Port != desired.Port { + diffs = append(diffs, fmt.Sprintf("port: %d -> %d", existing.Port, desired.Port)) + } + if existing.Timeout != desired.Timeout { + diffs = append(diffs, fmt.Sprintf("timeout: %d -> %d", existing.Timeout, desired.Timeout)) + } + if existing.Method != desired.Method { + diffs = append(diffs, fmt.Sprintf("method: %s -> %s", existing.Method, desired.Method)) + } + if existing.Description != desired.Description { + diffs = append(diffs, "description changed") + } + if existing.ParentID != desired.ParentID { + diffs = append(diffs, fmt.Sprintf("parent_id: %d -> %d", existing.ParentID, desired.ParentID)) + } + if existing.AcceptedCodes != desired.AcceptedCodes { + diffs = append(diffs, fmt.Sprintf("accepted_codes: %s -> %s", existing.AcceptedCodes, desired.AcceptedCodes)) + } + if existing.DNSResolveType != desired.DNSResolveType { + diffs = append(diffs, fmt.Sprintf("dns_resolve_type: %s -> %s", existing.DNSResolveType, desired.DNSResolveType)) + } + if existing.DNSServer != desired.DNSServer { + diffs = append(diffs, fmt.Sprintf("dns_server: %s -> %s", existing.DNSServer, desired.DNSServer)) + } + if existing.IgnoreTLS != desired.IgnoreTLS { + diffs = append(diffs, fmt.Sprintf("ignore_tls: %v -> %v", existing.IgnoreTLS, desired.IgnoreTLS)) + } + if existing.Paused != desired.Paused { + diffs = append(diffs, fmt.Sprintf("paused: %v -> %v", existing.Paused, desired.Paused)) + } + if existing.Regions != desired.Regions { + diffs = append(diffs, fmt.Sprintf("regions: %s -> %s", existing.Regions, desired.Regions)) + } + return strings.Join(diffs, ", ") +} + +func FormatChanges(changes []Change, dryRun bool) string { + var b strings.Builder + if dryRun { + b.WriteString("Dry run โ no changes applied.\n\n") + } + + if len(changes) == 0 { + b.WriteString("No changes needed. State is up to date.\n") + return b.String() + } + + creates, updates, deletes := 0, 0, 0 + for _, c := range changes { + var prefix string + switch c.Action { + case "create": + prefix = " + create" + creates++ + case "update": + prefix = " ~ update" + updates++ + case "delete": + prefix = " - delete" + deletes++ + } + line := fmt.Sprintf("%s %s %q", prefix, c.Kind, c.Name) + if c.Details != "" { + line += " (" + c.Details + ")" + } + b.WriteString(line + "\n") + } + + b.WriteString("\n") + if dryRun { + fmt.Fprintf(&b, "Summary: %d to create, %d to update, %d to delete\n", creates, updates, deletes) + } else { + total := creates + updates + deletes + fmt.Fprintf(&b, "Applied %d changes (%d created, %d updated, %d deleted)\n", total, creates, updates, deletes) + } + return b.String() +} diff --git a/internal/config/apply_test.go b/internal/config/apply_test.go new file mode 100644 index 0000000..824fd61 --- /dev/null +++ b/internal/config/apply_test.go @@ -0,0 +1,290 @@ +package config + +import ( + "go-upkeep/internal/models" + "go-upkeep/internal/store" + "strings" + "testing" +) + +func newTestStore(t *testing.T) store.Store { + t.Helper() + s, err := store.NewSQLiteStore(":memory:") + if err != nil { + t.Fatalf("NewSQLiteStore: %v", err) + } + if err := s.Init(); err != nil { + t.Fatalf("Init: %v", err) + } + return s +} + +func TestApplyCreateFromScratch(t *testing.T) { + s := newTestStore(t) + f := &File{ + Alerts: []Alert{ + {Name: "Discord", Type: "discord", Settings: map[string]string{"url": "https://example.com"}}, + }, + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://example.com", Interval: 30, Alert: "Discord"}, + {Name: "Ping", Type: "ping", Hostname: "10.0.0.1", Interval: 30}, + }, + } + + changes, err := Apply(s, f, ApplyOpts{}) + if err != nil { + t.Fatalf("Apply: %v", err) + } + + creates := 0 + for _, c := range changes { + if c.Action == "create" { + creates++ + } + } + if creates != 3 { + t.Fatalf("expected 3 creates, got %d", creates) + } + + sites, _ := s.GetSites() + if len(sites) != 2 { + t.Fatalf("expected 2 sites, got %d", len(sites)) + } + + alerts, _ := s.GetAllAlerts() + if len(alerts) != 1 { + t.Fatalf("expected 1 alert, got %d", len(alerts)) + } +} + +func TestApplyIdempotent(t *testing.T) { + s := newTestStore(t) + f := &File{ + Alerts: []Alert{ + {Name: "Discord", Type: "discord", Settings: map[string]string{"url": "https://example.com"}}, + }, + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://example.com", Interval: 30, Alert: "Discord"}, + }, + } + + if _, err := Apply(s, f, ApplyOpts{}); err != nil { + t.Fatalf("first Apply: %v", err) + } + + changes, err := Apply(s, f, ApplyOpts{}) + if err != nil { + t.Fatalf("second Apply: %v", err) + } + + if len(changes) != 0 { + t.Fatalf("expected 0 changes on second apply, got %d: %+v", len(changes), changes) + } +} + +func TestApplyUpdate(t *testing.T) { + s := newTestStore(t) + f := &File{ + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://example.com", Interval: 30}, + }, + } + + if _, err := Apply(s, f, ApplyOpts{}); err != nil { + t.Fatalf("first Apply: %v", err) + } + + f.Monitors[0].Interval = 60 + changes, err := Apply(s, f, ApplyOpts{}) + if err != nil { + t.Fatalf("second Apply: %v", err) + } + + if len(changes) != 1 || changes[0].Action != "update" { + t.Fatalf("expected 1 update, got %+v", changes) + } + + sites, _ := s.GetSites() + if sites[0].Interval != 60 { + t.Fatalf("expected interval 60, got %d", sites[0].Interval) + } +} + +func TestApplyPrune(t *testing.T) { + s := newTestStore(t) + s.AddSite(models.Site{Name: "Keep", URL: "https://keep.com", Type: "http", Interval: 30, ExpiryThreshold: 7, Method: "GET", AcceptedCodes: "200-299"}) + s.AddSite(models.Site{Name: "Remove", URL: "https://remove.com", Type: "http", Interval: 30, ExpiryThreshold: 7, Method: "GET", AcceptedCodes: "200-299"}) + + f := &File{ + Monitors: []Monitor{ + {Name: "Keep", Type: "http", URL: "https://keep.com", Interval: 30}, + }, + } + + changes, err := Apply(s, f, ApplyOpts{Prune: true}) + if err != nil { + t.Fatalf("Apply: %v", err) + } + + deleteCount := 0 + for _, c := range changes { + if c.Action == "delete" { + deleteCount++ + } + } + if deleteCount != 1 { + t.Fatalf("expected 1 delete, got %d", deleteCount) + } + + sites, _ := s.GetSites() + if len(sites) != 1 || sites[0].Name != "Keep" { + t.Fatalf("expected only 'Keep', got %+v", sites) + } +} + +func TestApplyDryRun(t *testing.T) { + s := newTestStore(t) + f := &File{ + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://example.com", Interval: 30}, + }, + } + + changes, err := Apply(s, f, ApplyOpts{DryRun: true}) + if err != nil { + t.Fatalf("Apply: %v", err) + } + + if len(changes) != 1 || changes[0].Action != "create" { + t.Fatalf("expected 1 create in dry-run, got %+v", changes) + } + + sites, _ := s.GetSites() + if len(sites) != 0 { + t.Fatalf("expected 0 sites after dry-run, got %d", len(sites)) + } +} + +func TestApplyGroupHierarchy(t *testing.T) { + s := newTestStore(t) + f := &File{ + Monitors: []Monitor{ + { + Name: "Prod", Type: "group", + Monitors: []Monitor{ + {Name: "Prod Web", Type: "http", URL: "https://prod.example.com", Interval: 15}, + {Name: "Prod DB", Type: "port", Hostname: "db.internal", Port: 5432, Interval: 30}, + }, + }, + }, + } + + changes, err := Apply(s, f, ApplyOpts{}) + if err != nil { + t.Fatalf("Apply: %v", err) + } + + if len(changes) != 3 { + t.Fatalf("expected 3 creates, got %d", len(changes)) + } + + sites, _ := s.GetSites() + var group models.Site + for _, s := range sites { + if s.Type == "group" { + group = s + break + } + } + + if group.ID == 0 { + t.Fatal("group not found") + } + + childCount := 0 + for _, s := range sites { + if s.ParentID == group.ID { + childCount++ + } + } + if childCount != 2 { + t.Fatalf("expected 2 children, got %d", childCount) + } +} + +func TestApplyAlertReference(t *testing.T) { + s := newTestStore(t) + f := &File{ + Alerts: []Alert{ + {Name: "Discord", Type: "discord", Settings: map[string]string{"url": "https://example.com"}}, + }, + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://example.com", Interval: 30, Alert: "Discord"}, + }, + } + + if _, err := Apply(s, f, ApplyOpts{}); err != nil { + t.Fatalf("Apply: %v", err) + } + + sites, _ := s.GetSites() + alerts, _ := s.GetAllAlerts() + + if sites[0].AlertID != alerts[0].ID { + t.Fatalf("expected alert_id %d, got %d", alerts[0].ID, sites[0].AlertID) + } +} + +func TestApplyInvalidAlertRef(t *testing.T) { + s := newTestStore(t) + f := &File{ + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://example.com", Interval: 30, Alert: "Nonexistent"}, + }, + } + + _, err := Apply(s, f, ApplyOpts{}) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Fatalf("expected alert not found error, got %v", err) + } +} + +func TestApplyDuplicateNames(t *testing.T) { + s := newTestStore(t) + f := &File{ + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://a.com", Interval: 30}, + {Name: "Web", Type: "http", URL: "https://b.com", Interval: 30}, + }, + } + + _, err := Apply(s, f, ApplyOpts{}) + if err == nil || !strings.Contains(err.Error(), "duplicate") { + t.Fatalf("expected duplicate error, got %v", err) + } +} + +func TestApplyExistingAlertReference(t *testing.T) { + s := newTestStore(t) + s.AddAlert("Existing", "webhook", map[string]string{"url": "https://example.com"}) + + f := &File{ + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://example.com", Interval: 30, Alert: "Existing"}, + }, + } + + changes, err := Apply(s, f, ApplyOpts{}) + if err != nil { + t.Fatalf("Apply: %v", err) + } + + if len(changes) != 1 || changes[0].Action != "create" { + t.Fatalf("expected 1 create, got %+v", changes) + } + + sites, _ := s.GetSites() + if sites[0].AlertID == 0 { + t.Fatal("expected non-zero alert_id for existing alert reference") + } +} diff --git a/internal/config/export.go b/internal/config/export.go new file mode 100644 index 0000000..a8cb981 --- /dev/null +++ b/internal/config/export.go @@ -0,0 +1,158 @@ +package config + +import ( + "fmt" + "go-upkeep/internal/models" + "go-upkeep/internal/store" + "os" + "sort" + + "gopkg.in/yaml.v3" +) + +func Export(s store.Store) (*File, error) { + dbAlerts, err := s.GetAllAlerts() + if err != nil { + return nil, fmt.Errorf("load alerts: %w", err) + } + + dbSites, err := s.GetSites() + if err != nil { + return nil, fmt.Errorf("load sites: %w", err) + } + + alertIDToName := make(map[int]string, len(dbAlerts)) + var yamlAlerts []Alert + for _, a := range dbAlerts { + alertIDToName[a.ID] = a.Name + yamlAlerts = append(yamlAlerts, Alert{ + Name: a.Name, + Type: a.Type, + Settings: a.Settings, + }) + } + + groups := make(map[int]models.Site) + children := make(map[int][]models.Site) + var topLevel []models.Site + + for _, s := range dbSites { + switch { + case s.Type == "group": + groups[s.ID] = s + case s.ParentID > 0: + children[s.ParentID] = append(children[s.ParentID], s) + default: + topLevel = append(topLevel, s) + } + } + + var yamlMonitors []Monitor + + groupIDs := make([]int, 0, len(groups)) + for id := range groups { + groupIDs = append(groupIDs, id) + } + sort.Ints(groupIDs) + + for _, gid := range groupIDs { + g := groups[gid] + ym := siteToMonitor(g, alertIDToName) + kids := children[gid] + sort.Slice(kids, func(i, j int) bool { return kids[i].ID < kids[j].ID }) + for _, child := range kids { + ym.Monitors = append(ym.Monitors, siteToMonitor(child, alertIDToName)) + } + yamlMonitors = append(yamlMonitors, ym) + } + + sort.Slice(topLevel, func(i, j int) bool { return topLevel[i].ID < topLevel[j].ID }) + for _, s := range topLevel { + yamlMonitors = append(yamlMonitors, siteToMonitor(s, alertIDToName)) + } + + return &File{Alerts: yamlAlerts, Monitors: yamlMonitors}, nil +} + +func siteToMonitor(s models.Site, alertIDToName map[int]string) Monitor { + m := Monitor{ + Name: s.Name, + Type: s.Type, + Interval: s.Interval, + } + + if s.AlertID > 0 { + if name, ok := alertIDToName[s.AlertID]; ok { + m.Alert = name + } + } + + if s.URL != "" { + m.URL = s.URL + } + if s.Hostname != "" { + m.Hostname = s.Hostname + } + if s.Port != 0 { + m.Port = s.Port + } + if s.Timeout != 0 { + m.Timeout = s.Timeout + } + if s.Description != "" { + m.Description = s.Description + } + if s.DNSResolveType != "" { + m.DNSResolveType = s.DNSResolveType + } + if s.DNSServer != "" { + m.DNSServer = s.DNSServer + } + + if s.Method != "" && s.Method != "GET" { + m.Method = s.Method + } + if s.AcceptedCodes != "" && s.AcceptedCodes != "200-299" { + m.AcceptedCodes = s.AcceptedCodes + } + if s.ExpiryThreshold != 0 && s.ExpiryThreshold != 7 { + m.ExpiryThreshold = s.ExpiryThreshold + } + if s.MaxRetries != 0 { + m.MaxRetries = s.MaxRetries + } + + m.CheckSSL = s.CheckSSL + m.IgnoreTLS = s.IgnoreTLS + m.Paused = s.Paused + + if s.Regions != "" { + m.Regions = s.Regions + } + + return m +} + +func WriteFile(f *File, path string) error { + data, err := yaml.Marshal(f) + if err != nil { + return fmt.Errorf("marshal yaml: %w", err) + } + if path == "-" || path == "" { + _, err = os.Stdout.Write(data) + return err + } + return os.WriteFile(path, data, 0644) +} + +func LoadFile(path string) (*File, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("read %s: %w", path, err) + } + var f File + if err := yaml.Unmarshal(data, &f); err != nil { + return nil, fmt.Errorf("parse %s: %w", path, err) + } + return &f, nil +} diff --git a/internal/config/export_test.go b/internal/config/export_test.go new file mode 100644 index 0000000..16b5534 --- /dev/null +++ b/internal/config/export_test.go @@ -0,0 +1,140 @@ +package config + +import ( + "go-upkeep/internal/models" + "testing" +) + +func TestExportEmpty(t *testing.T) { + s := newTestStore(t) + f, err := Export(s) + if err != nil { + t.Fatalf("Export: %v", err) + } + if len(f.Alerts) != 0 || len(f.Monitors) != 0 { + t.Fatalf("expected empty file, got %d alerts %d monitors", len(f.Alerts), len(f.Monitors)) + } +} + +func TestExportAlertNames(t *testing.T) { + s := newTestStore(t) + s.AddAlert("Discord", "discord", map[string]string{"url": "https://example.com"}) + alerts, _ := s.GetAllAlerts() + s.AddSite(models.Site{Name: "Web", URL: "https://example.com", Type: "http", Interval: 30, AlertID: alerts[0].ID, ExpiryThreshold: 7, Method: "GET", AcceptedCodes: "200-299"}) + + f, err := Export(s) + if err != nil { + t.Fatalf("Export: %v", err) + } + + if len(f.Monitors) != 1 { + t.Fatalf("expected 1 monitor, got %d", len(f.Monitors)) + } + if f.Monitors[0].Alert != "Discord" { + t.Fatalf("expected alert name 'Discord', got %q", f.Monitors[0].Alert) + } +} + +func TestExportGroupHierarchy(t *testing.T) { + s := newTestStore(t) + groupID, _ := s.AddSiteReturningID(models.Site{Name: "Prod", Type: "group", ExpiryThreshold: 7, Method: "GET", AcceptedCodes: "200-299"}) + s.AddSite(models.Site{Name: "Prod Web", URL: "https://prod.example.com", Type: "http", Interval: 15, ParentID: groupID, ExpiryThreshold: 7, Method: "GET", AcceptedCodes: "200-299"}) + s.AddSite(models.Site{Name: "Top Level", URL: "https://example.com", Type: "http", Interval: 30, ExpiryThreshold: 7, Method: "GET", AcceptedCodes: "200-299"}) + + f, err := Export(s) + if err != nil { + t.Fatalf("Export: %v", err) + } + + if len(f.Monitors) != 2 { + t.Fatalf("expected 2 top-level monitors, got %d", len(f.Monitors)) + } + + var group *Monitor + for i := range f.Monitors { + if f.Monitors[i].Type == "group" { + group = &f.Monitors[i] + break + } + } + if group == nil { + t.Fatal("group not found in export") + } + if len(group.Monitors) != 1 { + t.Fatalf("expected 1 child in group, got %d", len(group.Monitors)) + } + if group.Monitors[0].Name != "Prod Web" { + t.Fatalf("expected child 'Prod Web', got %q", group.Monitors[0].Name) + } +} + +func TestExportOmitsDefaults(t *testing.T) { + s := newTestStore(t) + s.AddSite(models.Site{ + Name: "Web", URL: "https://example.com", Type: "http", Interval: 30, + Method: "GET", AcceptedCodes: "200-299", ExpiryThreshold: 7, + }) + + f, err := Export(s) + if err != nil { + t.Fatalf("Export: %v", err) + } + + m := f.Monitors[0] + if m.Method != "" { + t.Errorf("expected empty method (default omitted), got %q", m.Method) + } + if m.AcceptedCodes != "" { + t.Errorf("expected empty accepted_codes (default omitted), got %q", m.AcceptedCodes) + } + if m.ExpiryThreshold != 0 { + t.Errorf("expected 0 expiry_threshold (default omitted), got %d", m.ExpiryThreshold) + } +} + +func TestExportRoundTrip(t *testing.T) { + s1 := newTestStore(t) + s1.AddAlert("Discord", "discord", map[string]string{"url": "https://example.com"}) + alerts, _ := s1.GetAllAlerts() + s1.AddSite(models.Site{Name: "Web", URL: "https://example.com", Type: "http", Interval: 30, AlertID: alerts[0].ID, ExpiryThreshold: 7, Method: "GET", AcceptedCodes: "200-299"}) + s1.AddSite(models.Site{Name: "Ping", Type: "ping", Hostname: "10.0.0.1", Interval: 60, ExpiryThreshold: 7, Method: "GET", AcceptedCodes: "200-299"}) + + exported, err := Export(s1) + if err != nil { + t.Fatalf("Export: %v", err) + } + + s2 := newTestStore(t) + changes, err := Apply(s2, exported, ApplyOpts{}) + if err != nil { + t.Fatalf("Apply: %v", err) + } + + creates := 0 + for _, c := range changes { + if c.Action == "create" { + creates++ + } + } + if creates != 3 { + t.Fatalf("expected 3 creates, got %d", creates) + } + + reexported, err := Export(s2) + if err != nil { + t.Fatalf("re-Export: %v", err) + } + + if len(reexported.Alerts) != len(exported.Alerts) { + t.Fatalf("alert count mismatch: %d vs %d", len(reexported.Alerts), len(exported.Alerts)) + } + if len(reexported.Monitors) != len(exported.Monitors) { + t.Fatalf("monitor count mismatch: %d vs %d", len(reexported.Monitors), len(exported.Monitors)) + } + + for i, m := range reexported.Monitors { + if m.Name != exported.Monitors[i].Name { + t.Errorf("monitor %d name mismatch: %q vs %q", i, m.Name, exported.Monitors[i].Name) + } + } +} diff --git a/internal/config/types.go b/internal/config/types.go new file mode 100644 index 0000000..8613d7f --- /dev/null +++ b/internal/config/types.go @@ -0,0 +1,35 @@ +package config + +type File struct { + Alerts []Alert `yaml:"alerts,omitempty"` + Monitors []Monitor `yaml:"monitors,omitempty"` +} + +type Alert struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + Settings map[string]string `yaml:"settings"` +} + +type Monitor struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + URL string `yaml:"url,omitempty"` + Interval int `yaml:"interval,omitempty"` + Alert string `yaml:"alert,omitempty"` + CheckSSL bool `yaml:"check_ssl,omitempty"` + ExpiryThreshold int `yaml:"expiry_threshold,omitempty"` + MaxRetries int `yaml:"max_retries,omitempty"` + Hostname string `yaml:"hostname,omitempty"` + Port int `yaml:"port,omitempty"` + Timeout int `yaml:"timeout,omitempty"` + Method string `yaml:"method,omitempty"` + Description string `yaml:"description,omitempty"` + AcceptedCodes string `yaml:"accepted_codes,omitempty"` + DNSResolveType string `yaml:"dns_resolve_type,omitempty"` + DNSServer string `yaml:"dns_server,omitempty"` + IgnoreTLS bool `yaml:"ignore_tls,omitempty"` + Paused bool `yaml:"paused,omitempty"` + Regions string `yaml:"regions,omitempty"` + Monitors []Monitor `yaml:"monitors,omitempty"` +} diff --git a/internal/config/validate.go b/internal/config/validate.go new file mode 100644 index 0000000..e9a2f2d --- /dev/null +++ b/internal/config/validate.go @@ -0,0 +1,88 @@ +package config + +import "fmt" + +var validMonitorTypes = map[string]bool{ + "http": true, + "push": true, + "ping": true, + "port": true, + "dns": true, + "group": true, +} + +func Validate(f *File) error { + alertNames := make(map[string]bool, len(f.Alerts)) + for _, a := range f.Alerts { + if a.Name == "" { + return fmt.Errorf("alert missing name") + } + if alertNames[a.Name] { + return fmt.Errorf("duplicate alert name %q", a.Name) + } + alertNames[a.Name] = true + if a.Type == "" { + return fmt.Errorf("alert %q: missing type", a.Name) + } + } + + monitorNames := make(map[string]bool) + for _, m := range f.Monitors { + if err := validateMonitor(m, monitorNames, false); err != nil { + return err + } + } + return nil +} + +func validateMonitor(m Monitor, names map[string]bool, nested bool) error { + if m.Name == "" { + return fmt.Errorf("monitor missing name") + } + if names[m.Name] { + return fmt.Errorf("duplicate monitor name %q", m.Name) + } + names[m.Name] = true + + if !validMonitorTypes[m.Type] { + return fmt.Errorf("monitor %q: invalid type %q", m.Name, m.Type) + } + + if m.Type == "group" && nested { + return fmt.Errorf("monitor %q: groups cannot be nested inside other groups", m.Name) + } + + switch m.Type { + case "http": + if m.URL == "" { + return fmt.Errorf("monitor %q: url is required for type http", m.Name) + } + case "ping": + if m.Hostname == "" { + return fmt.Errorf("monitor %q: hostname is required for type ping", m.Name) + } + case "port": + if m.Hostname == "" { + return fmt.Errorf("monitor %q: hostname is required for type port", m.Name) + } + if m.Port == 0 { + return fmt.Errorf("monitor %q: port is required for type port", m.Name) + } + case "dns": + if m.Hostname == "" { + return fmt.Errorf("monitor %q: hostname is required for type dns", m.Name) + } + } + + if m.Type == "group" { + for _, child := range m.Monitors { + if err := validateMonitor(child, names, true); err != nil { + return err + } + } + } else if len(m.Monitors) > 0 { + return fmt.Errorf("monitor %q: only groups can have nested monitors", m.Name) + } + + return nil +} diff --git a/internal/config/validate_test.go b/internal/config/validate_test.go new file mode 100644 index 0000000..fdcfd42 --- /dev/null +++ b/internal/config/validate_test.go @@ -0,0 +1,163 @@ +package config + +import ( + "strings" + "testing" +) + +func TestValidateDuplicateAlertNames(t *testing.T) { + f := &File{ + Alerts: []Alert{ + {Name: "A", Type: "discord"}, + {Name: "A", Type: "slack"}, + }, + } + err := Validate(f) + if err == nil || !strings.Contains(err.Error(), "duplicate alert name") { + t.Fatalf("expected duplicate alert error, got %v", err) + } +} + +func TestValidateDuplicateMonitorNames(t *testing.T) { + f := &File{ + Monitors: []Monitor{ + {Name: "M", Type: "http", URL: "https://example.com"}, + {Name: "M", Type: "ping", Hostname: "10.0.0.1"}, + }, + } + err := Validate(f) + if err == nil || !strings.Contains(err.Error(), "duplicate monitor name") { + t.Fatalf("expected duplicate monitor error, got %v", err) + } +} + +func TestValidateDuplicateNameAcrossGroups(t *testing.T) { + f := &File{ + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://example.com"}, + { + Name: "Prod", Type: "group", + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://prod.example.com"}, + }, + }, + }, + } + err := Validate(f) + if err == nil || !strings.Contains(err.Error(), "duplicate monitor name") { + t.Fatalf("expected duplicate name across group, got %v", err) + } +} + +func TestValidateNestedGroupReject(t *testing.T) { + f := &File{ + Monitors: []Monitor{ + { + Name: "Outer", Type: "group", + Monitors: []Monitor{ + {Name: "Inner", Type: "group"}, + }, + }, + }, + } + err := Validate(f) + if err == nil || !strings.Contains(err.Error(), "cannot be nested") { + t.Fatalf("expected nested group error, got %v", err) + } +} + +func TestValidateRequiredFields(t *testing.T) { + tests := []struct { + name string + monitor Monitor + wantErr string + }{ + {"http no url", Monitor{Name: "A", Type: "http"}, "url is required"}, + {"ping no hostname", Monitor{Name: "A", Type: "ping"}, "hostname is required"}, + {"port no hostname", Monitor{Name: "A", Type: "port", Port: 22}, "hostname is required"}, + {"port no port", Monitor{Name: "A", Type: "port", Hostname: "h"}, "port is required"}, + {"dns no hostname", Monitor{Name: "A", Type: "dns"}, "hostname is required"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f := &File{Monitors: []Monitor{tt.monitor}} + err := Validate(f) + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("expected %q, got %v", tt.wantErr, err) + } + }) + } +} + +func TestValidateInvalidMonitorType(t *testing.T) { + f := &File{ + Monitors: []Monitor{ + {Name: "A", Type: "ftp"}, + }, + } + err := Validate(f) + if err == nil || !strings.Contains(err.Error(), "invalid type") { + t.Fatalf("expected invalid type error, got %v", err) + } +} + +func TestValidateNonGroupWithChildren(t *testing.T) { + f := &File{ + Monitors: []Monitor{ + { + Name: "A", Type: "http", URL: "https://example.com", + Monitors: []Monitor{ + {Name: "B", Type: "ping", Hostname: "h"}, + }, + }, + }, + } + err := Validate(f) + if err == nil || !strings.Contains(err.Error(), "only groups") { + t.Fatalf("expected only-groups error, got %v", err) + } +} + +func TestValidateAlertMissingName(t *testing.T) { + f := &File{ + Alerts: []Alert{{Type: "discord"}}, + } + err := Validate(f) + if err == nil || !strings.Contains(err.Error(), "missing name") { + t.Fatalf("expected missing name error, got %v", err) + } +} + +func TestValidateAlertMissingType(t *testing.T) { + f := &File{ + Alerts: []Alert{{Name: "A"}}, + } + err := Validate(f) + if err == nil || !strings.Contains(err.Error(), "missing type") { + t.Fatalf("expected missing type error, got %v", err) + } +} + +func TestValidateValidConfig(t *testing.T) { + f := &File{ + Alerts: []Alert{ + {Name: "Discord", Type: "discord", Settings: map[string]string{"url": "https://example.com"}}, + }, + Monitors: []Monitor{ + {Name: "Web", Type: "http", URL: "https://example.com", Interval: 30, Alert: "Discord"}, + {Name: "Ping", Type: "ping", Hostname: "10.0.0.1", Interval: 30}, + {Name: "SSH", Type: "port", Hostname: "10.0.0.1", Port: 22, Interval: 60}, + {Name: "DNS", Type: "dns", Hostname: "example.com", Interval: 60}, + {Name: "Cron", Type: "push", Interval: 300}, + { + Name: "Prod", Type: "group", + Monitors: []Monitor{ + {Name: "Prod Web", Type: "http", URL: "https://prod.example.com", Interval: 15}, + }, + }, + }, + } + if err := Validate(f); err != nil { + t.Fatalf("expected valid config, got %v", err) + } +} diff --git a/internal/metrics/prometheus.go b/internal/metrics/prometheus.go new file mode 100644 index 0000000..a85493f --- /dev/null +++ b/internal/metrics/prometheus.go @@ -0,0 +1,112 @@ +package metrics + +import ( + "fmt" + "go-upkeep/internal/models" + "go-upkeep/internal/monitor" + "net/http" + "sort" + "strings" +) + +func Handler(eng *monitor.Engine) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + sites := eng.GetAllSites() + sort.Slice(sites, func(i, j int) bool { return sites[i].ID < sites[j].ID }) + + var b strings.Builder + + writeHelp(&b, "upkeep_monitor_up", "gauge", "Whether the monitor is up (1) or down (0).") + for _, s := range sites { + val := 0 + if s.Status == "UP" { + val = 1 + } + writeGauge(&b, "upkeep_monitor_up", labels(s), float64(val)) + } + + writeHelp(&b, "upkeep_monitor_latency_seconds", "gauge", "Last check latency in seconds.") + for _, s := range sites { + writeGauge(&b, "upkeep_monitor_latency_seconds", labels(s), s.Latency.Seconds()) + } + + writeHelp(&b, "upkeep_monitor_status_code", "gauge", "HTTP response status code of the last check.") + for _, s := range sites { + if s.Type != "http" { + continue + } + writeGauge(&b, "upkeep_monitor_status_code", labels(s), float64(s.StatusCode)) + } + + writeHelp(&b, "upkeep_monitor_check_timestamp_seconds", "gauge", "Unix timestamp of the last check.") + for _, s := range sites { + if s.LastCheck.IsZero() { + continue + } + writeGauge(&b, "upkeep_monitor_check_timestamp_seconds", labels(s), float64(s.LastCheck.Unix())) + } + + writeHelp(&b, "upkeep_monitor_paused", "gauge", "Whether the monitor is paused (1) or active (0).") + for _, s := range sites { + val := 0 + if s.Paused { + val = 1 + } + writeGauge(&b, "upkeep_monitor_paused", labels(s), float64(val)) + } + + writeHelp(&b, "upkeep_monitor_cert_expiry_timestamp_seconds", "gauge", "Unix timestamp when the SSL certificate expires.") + for _, s := range sites { + if !s.HasSSL || s.CertExpiry.IsZero() { + continue + } + writeGauge(&b, "upkeep_monitor_cert_expiry_timestamp_seconds", labels(s), float64(s.CertExpiry.Unix())) + } + + writeHelp(&b, "upkeep_monitor_checks_total", "counter", "Total number of checks performed.") + writeHelp(&b, "upkeep_monitor_checks_up_total", "counter", "Total number of successful checks.") + for _, s := range sites { + h, ok := eng.GetHistory(s.ID) + if !ok { + continue + } + writeGauge(&b, "upkeep_monitor_checks_total", labels(s), float64(h.TotalChecks)) + writeGauge(&b, "upkeep_monitor_checks_up_total", labels(s), float64(h.UpChecks)) + } + + writeHelp(&b, "upkeep_probe_up", "gauge", "Whether a probe node is online (1) or offline (0) based on last-seen time.") + for _, site := range sites { + probeResults := eng.GetProbeResults(site.ID) + for nodeID, result := range probeResults { + val := 0 + if result.IsUp { + val = 1 + } + nodeLabels := fmt.Sprintf(`id="%d",name="%s",node="%s"`, site.ID, escapeLabelValue(site.Name), escapeLabelValue(nodeID)) + writeGauge(&b, "upkeep_probe_up", nodeLabels, float64(val)) + } + } + + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + w.Write([]byte(b.String())) + } +} + +func labels(s models.Site) string { + return fmt.Sprintf(`id="%d",name="%s",type="%s"`, s.ID, escapeLabelValue(s.Name), s.Type) +} + +func escapeLabelValue(s string) string { + s = strings.ReplaceAll(s, `\`, `\\`) + s = strings.ReplaceAll(s, `"`, `\"`) + s = strings.ReplaceAll(s, "\n", `\n`) + return s +} + +func writeHelp(b *strings.Builder, name, typ, help string) { + fmt.Fprintf(b, "# HELP %s %s\n# TYPE %s %s\n", name, help, name, typ) +} + +func writeGauge(b *strings.Builder, name, labels string, val float64) { + fmt.Fprintf(b, "%s{%s} %g\n", name, labels, val) +} diff --git a/internal/metrics/prometheus_test.go b/internal/metrics/prometheus_test.go new file mode 100644 index 0000000..1e2b72b --- /dev/null +++ b/internal/metrics/prometheus_test.go @@ -0,0 +1,112 @@ +package metrics + +import ( + "context" + "go-upkeep/internal/models" + "go-upkeep/internal/monitor" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +type mockStore struct { + sites []models.Site +} + +func (m *mockStore) Init() error { return nil } +func (m *mockStore) GetSites() ([]models.Site, error) { return m.sites, nil } +func (m *mockStore) AddSite(models.Site) error { return nil } +func (m *mockStore) UpdateSite(models.Site) error { return nil } +func (m *mockStore) UpdateSitePaused(int, bool) error { return nil } +func (m *mockStore) DeleteSite(int) error { return nil } +func (m *mockStore) GetAllAlerts() ([]models.AlertConfig, error) { return nil, nil } +func (m *mockStore) GetAlert(int) (models.AlertConfig, error) { return models.AlertConfig{}, nil } +func (m *mockStore) AddAlert(string, string, map[string]string) error { return nil } +func (m *mockStore) UpdateAlert(int, string, string, map[string]string) error { return nil } +func (m *mockStore) DeleteAlert(int) error { return nil } +func (m *mockStore) GetAllUsers() ([]models.User, error) { return nil, nil } +func (m *mockStore) AddUser(string, string, string) error { return nil } +func (m *mockStore) UpdateUser(int, string, string, string) error { return nil } +func (m *mockStore) DeleteUser(int) error { return nil } +func (m *mockStore) SaveCheck(int, int64, bool) error { return nil } +func (m *mockStore) LoadAllHistory(int) (map[int][]models.CheckRecord, error) { + return nil, nil +} +func (m *mockStore) ExportData() (models.Backup, error) { return models.Backup{}, nil } +func (m *mockStore) ImportData(models.Backup) error { return nil } +func (m *mockStore) GetSiteByName(string) (models.Site, error) { return models.Site{}, nil } +func (m *mockStore) GetAlertByName(string) (models.AlertConfig, error) { + return models.AlertConfig{}, nil +} +func (m *mockStore) AddSiteReturningID(models.Site) (int, error) { return 0, nil } +func (m *mockStore) AddAlertReturningID(string, string, map[string]string) (int, error) { + return 0, nil +} +func (m *mockStore) SaveCheckFromNode(int, string, int64, bool) error { return nil } +func (m *mockStore) RegisterNode(models.ProbeNode) error { return nil } +func (m *mockStore) GetNode(string) (models.ProbeNode, error) { return models.ProbeNode{}, nil } +func (m *mockStore) GetAllNodes() ([]models.ProbeNode, error) { return nil, nil } +func (m *mockStore) UpdateNodeLastSeen(string) error { return nil } +func (m *mockStore) DeleteNode(string) error { return nil } +func (m *mockStore) SaveLog(string) error { return nil } +func (m *mockStore) LoadLogs(int) ([]string, error) { return nil, nil } + +func TestMetricsHandler(t *testing.T) { + ms := &mockStore{ + sites: []models.Site{ + {ID: 1, Name: "Example", URL: "https://example.com", Type: "http", Interval: 30}, + {ID: 2, Name: "DNS Check", Type: "dns", Interval: 60}, + }, + } + eng := monitor.NewEngine(ms) + ctx, cancel := context.WithCancel(context.Background()) + eng.Start(ctx) + time.Sleep(100 * time.Millisecond) + + rec := httptest.NewRecorder() + Handler(eng)(rec, httptest.NewRequest("GET", "/metrics", nil)) + cancel() + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", rec.Code) + } + + body := rec.Body.String() + + ct := rec.Header().Get("Content-Type") + if !strings.Contains(ct, "text/plain") { + t.Errorf("expected text/plain content type, got %q", ct) + } + + expected := []string{ + "# HELP upkeep_monitor_up", + "# TYPE upkeep_monitor_up gauge", + `upkeep_monitor_up{id="1",name="Example",type="http"}`, + `upkeep_monitor_up{id="2",name="DNS Check",type="dns"}`, + "# HELP upkeep_monitor_latency_seconds", + "# HELP upkeep_monitor_paused", + "# HELP upkeep_monitor_checks_total", + } + for _, s := range expected { + if !strings.Contains(body, s) { + t.Errorf("missing expected line: %s", s) + } + } +} + +func TestEscapeLabelValue(t *testing.T) { + cases := []struct{ in, want string }{ + {`simple`, `simple`}, + {`has "quotes"`, `has \"quotes\"`}, + {"has\nnewline", `has\nnewline`}, + {`back\slash`, `back\\slash`}, + } + for _, tc := range cases { + got := escapeLabelValue(tc.in) + if got != tc.want { + t.Errorf("escapeLabelValue(%q) = %q, want %q", tc.in, got, tc.want) + } + } +} diff --git a/internal/models/models.go b/internal/models/models.go index f79c9c5..14a97c3 100644 --- a/internal/models/models.go +++ b/internal/models/models.go @@ -24,6 +24,8 @@ type Site struct { DNSResolveType string DNSServer string IgnoreTLS bool + Paused bool + Regions string FailureCount int Status string @@ -49,7 +51,22 @@ type User struct { Role string } -// Phase 5: Backup Structure +type CheckRecord struct { + SiteID int + NodeID string + LatencyNs int64 + IsUp bool + CheckedAt time.Time +} + +type ProbeNode struct { + ID string + Name string + Region string + LastSeen time.Time + Version string +} + type Backup struct { Sites []Site `json:"sites"` Alerts []AlertConfig `json:"alerts"` diff --git a/internal/monitor/aggregator.go b/internal/monitor/aggregator.go new file mode 100644 index 0000000..88054c8 --- /dev/null +++ b/internal/monitor/aggregator.go @@ -0,0 +1,44 @@ +package monitor + +import "time" + +type AggregationStrategy string + +const ( + AggAnyDown AggregationStrategy = "any-down" + AggMajorityDown AggregationStrategy = "majority-down" + AggAllDown AggregationStrategy = "all-down" +) + +type NodeResult struct { + NodeID string + IsUp bool + LatencyNs int64 + CheckedAt time.Time +} + +func AggregateStatus(results []NodeResult, strategy AggregationStrategy) (isUp bool, avgLatencyNs int64) { + if len(results) == 0 { + return true, 0 + } + + upCount := 0 + var totalLatency int64 + for _, r := range results { + if r.IsUp { + upCount++ + } + totalLatency += r.LatencyNs + } + avgLatencyNs = totalLatency / int64(len(results)) + + switch strategy { + case AggMajorityDown: + isUp = upCount > len(results)/2 + case AggAllDown: + isUp = upCount > 0 + default: + isUp = upCount == len(results) + } + return +} diff --git a/internal/monitor/checker.go b/internal/monitor/checker.go new file mode 100644 index 0000000..be62155 --- /dev/null +++ b/internal/monitor/checker.go @@ -0,0 +1,218 @@ +package monitor + +import ( + "context" + "go-upkeep/internal/models" + "net" + "net/http" + "strconv" + "strings" + "time" + + "github.com/miekg/dns" + probing "github.com/prometheus-community/pro-bing" +) + +type CheckResult struct { + SiteID int + Status string // "UP", "DOWN", "SSL EXP" + StatusCode int + LatencyNs int64 + HasSSL bool + CertExpiry time.Time +} + +func RunCheck(site models.Site, strict, insecure *http.Client, globalInsecure bool) CheckResult { + switch site.Type { + case "http": + return runHTTPCheck(site, strict, insecure, globalInsecure) + case "ping": + return runPingCheck(site) + case "port": + return runPortCheck(site) + case "dns": + return runDNSCheck(site) + default: + return CheckResult{SiteID: site.ID, Status: "DOWN"} + } +} + +func runHTTPCheck(site models.Site, strict, insecure *http.Client, globalInsecure bool) CheckResult { + method := site.Method + if method == "" { + method = "GET" + } + + timeout := siteTimeout(site) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, method, site.URL, nil) + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN"} + } + + client := strict + if globalInsecure || site.IgnoreTLS { + client = insecure + } + + start := time.Now() + resp, err := client.Do(req) + latency := time.Since(start) + + result := CheckResult{ + SiteID: site.ID, + Status: "UP", + LatencyNs: latency.Nanoseconds(), + } + + if err != nil { + result.Status = "DOWN" + return result + } + defer resp.Body.Close() + + result.StatusCode = resp.StatusCode + if !isCodeAccepted(resp.StatusCode, site.AcceptedCodes) { + result.Status = "DOWN" + } + + if site.CheckSSL && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 { + result.HasSSL = true + cert := resp.TLS.PeerCertificates[0] + result.CertExpiry = cert.NotAfter + if time.Now().After(cert.NotAfter) { + result.Status = "SSL EXP" + } + } + + return result +} + +func runPingCheck(site models.Site) CheckResult { + host := site.Hostname + if host == "" { + host = site.URL + } + + pinger, err := probing.NewPinger(host) + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN"} + } + pinger.Count = 1 + pinger.Timeout = siteTimeout(site) + pinger.SetPrivileged(false) + + start := time.Now() + err = pinger.Run() + latency := time.Since(start) + + if err != nil || pinger.Statistics().PacketsRecv == 0 { + return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()} + } + + stats := pinger.Statistics() + return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: stats.AvgRtt.Nanoseconds()} +} + +func runPortCheck(site models.Site) CheckResult { + host := site.Hostname + if host == "" { + host = site.URL + } + addr := net.JoinHostPort(host, strconv.Itoa(site.Port)) + timeout := siteTimeout(site) + + start := time.Now() + conn, err := net.DialTimeout("tcp", addr, timeout) + latency := time.Since(start) + + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()} + } + conn.Close() + return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: latency.Nanoseconds()} +} + +func runDNSCheck(site models.Site) CheckResult { + host := site.Hostname + if host == "" { + host = site.URL + } + + server := site.DNSServer + if server == "" { + server = "1.1.1.1" + } + if _, _, err := net.SplitHostPort(server); err != nil { + server = net.JoinHostPort(server, "53") + } + + qtype := dns.TypeA + switch site.DNSResolveType { + case "AAAA": + qtype = dns.TypeAAAA + case "MX": + qtype = dns.TypeMX + case "CNAME": + qtype = dns.TypeCNAME + case "TXT": + qtype = dns.TypeTXT + case "NS": + qtype = dns.TypeNS + case "SOA": + qtype = dns.TypeSOA + case "SRV": + qtype = dns.TypeSRV + case "PTR": + qtype = dns.TypePTR + } + + m := new(dns.Msg) + m.SetQuestion(dns.Fqdn(host), qtype) + + c := new(dns.Client) + c.Timeout = siteTimeout(site) + + start := time.Now() + r, _, err := c.Exchange(m, server) + latency := time.Since(start) + + if err != nil { + return CheckResult{SiteID: site.ID, Status: "DOWN", LatencyNs: latency.Nanoseconds()} + } + if r.Rcode != dns.RcodeSuccess { + return CheckResult{SiteID: site.ID, Status: "DOWN", StatusCode: r.Rcode, LatencyNs: latency.Nanoseconds()} + } + return CheckResult{SiteID: site.ID, Status: "UP", LatencyNs: latency.Nanoseconds()} +} + +func siteTimeout(site models.Site) time.Duration { + if site.Timeout > 0 { + return time.Duration(site.Timeout) * time.Second + } + return 5 * time.Second +} + +func isCodeAccepted(code int, accepted string) bool { + if accepted == "" { + return code >= 200 && code < 300 + } + for _, part := range strings.Split(accepted, ",") { + part = strings.TrimSpace(part) + if strings.Contains(part, "-") { + bounds := strings.SplitN(part, "-", 2) + lo, err1 := strconv.Atoi(strings.TrimSpace(bounds[0])) + hi, err2 := strconv.Atoi(strings.TrimSpace(bounds[1])) + if err1 == nil && err2 == nil && code >= lo && code <= hi { + return true + } + } else { + if v, err := strconv.Atoi(part); err == nil && code == v { + return true + } + } + } + return false +} diff --git a/internal/monitor/history.go b/internal/monitor/history.go index c6fbcbe..69a7f4d 100644 --- a/internal/monitor/history.go +++ b/internal/monitor/history.go @@ -1,11 +1,8 @@ package monitor -import ( - "sync" - "time" -) +import "time" -const maxHistoryLen = 30 +const maxHistoryLen = 60 type SiteHistory struct { Latencies []time.Duration @@ -14,19 +11,39 @@ type SiteHistory struct { UpChecks int } -var ( - histories = make(map[int]*SiteHistory) - historyMu sync.RWMutex -) +func (e *Engine) InitHistory() { + all, err := e.db.LoadAllHistory(maxHistoryLen) + if err != nil { + e.AddLog("Failed to load check history: " + err.Error()) + return + } + e.histMu.Lock() + defer e.histMu.Unlock() + for siteID, records := range all { + h := &SiteHistory{} + for _, r := range records { + h.TotalChecks++ + if r.IsUp { + h.UpChecks++ + } + h.Latencies = append(h.Latencies, time.Duration(r.LatencyNs)) + h.Statuses = append(h.Statuses, r.IsUp) + } + e.histories[siteID] = h + } + if len(all) > 0 { + e.AddLog("Loaded check history from database") + } +} -func RecordCheck(siteID int, latency time.Duration, isUp bool) { - historyMu.Lock() - defer historyMu.Unlock() +func (e *Engine) recordCheck(siteID int, latency time.Duration, isUp bool) { + e.histMu.Lock() + defer e.histMu.Unlock() - h, ok := histories[siteID] + h, ok := e.histories[siteID] if !ok { h = &SiteHistory{} - histories[siteID] = h + e.histories[siteID] = h } h.TotalChecks++ @@ -43,12 +60,14 @@ func RecordCheck(siteID int, latency time.Duration, isUp bool) { if len(h.Statuses) > maxHistoryLen { h.Statuses = h.Statuses[len(h.Statuses)-maxHistoryLen:] } + + go func() { _ = e.db.SaveCheck(siteID, latency.Nanoseconds(), isUp) }() } -func GetHistory(siteID int) (SiteHistory, bool) { - historyMu.RLock() - defer historyMu.RUnlock() - h, ok := histories[siteID] +func (e *Engine) GetHistory(siteID int) (SiteHistory, bool) { + e.histMu.RLock() + defer e.histMu.RUnlock() + h, ok := e.histories[siteID] if !ok { return SiteHistory{}, false } @@ -63,8 +82,8 @@ func GetHistory(siteID int) (SiteHistory, bool) { return cp, true } -func RemoveHistory(siteID int) { - historyMu.Lock() - defer historyMu.Unlock() - delete(histories, siteID) +func (e *Engine) removeHistory(siteID int) { + e.histMu.Lock() + defer e.histMu.Unlock() + delete(e.histories, siteID) } diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 791aac1..07a0f41 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -1,285 +1,367 @@ package monitor import ( + "context" "crypto/tls" "fmt" "go-upkeep/internal/alert" "go-upkeep/internal/models" "go-upkeep/internal/store" - "net" "net/http" - "strconv" "sync" "time" - - "github.com/miekg/dns" - probing "github.com/prometheus-community/pro-bing" ) -// --- LOGGING --- -var ( - LogStore []string - LogMutex sync.RWMutex -) +type Engine struct { + mu sync.RWMutex + liveState map[int]models.Site -func AddLog(msg string) { - LogMutex.Lock() - defer LogMutex.Unlock() - ts := time.Now().Format("15:04:05") - entry := fmt.Sprintf("[%s] %s", ts, msg) - LogStore = append([]string{entry}, LogStore...) - if len(LogStore) > 100 { - LogStore = LogStore[:100] + logMu sync.RWMutex + logStore []string + + activeMu sync.RWMutex + isActive bool + + histMu sync.RWMutex + histories map[int]*SiteHistory + + tokenIndex map[string]int + + probeResultsMu sync.RWMutex + probeResults map[int]map[string]NodeResult + aggStrategy AggregationStrategy + + db store.Store + insecureSkipVerify bool + strictClient *http.Client + insecureClient *http.Client +} + +func NewEngine(s store.Store) *Engine { + return &Engine{ + liveState: make(map[int]models.Site), + histories: make(map[int]*SiteHistory), + tokenIndex: make(map[string]int), + probeResults: make(map[int]map[string]NodeResult), + aggStrategy: AggAnyDown, + isActive: true, + db: s, + strictClient: &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}}, + }, + insecureClient: &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + }, } } -func GetLogs() []string { - LogMutex.RLock() - defer LogMutex.RUnlock() - logs := make([]string, len(LogStore)) - copy(logs, LogStore) +func (e *Engine) SetInsecureSkipVerify(skip bool) { + e.insecureSkipVerify = skip +} + +func (e *Engine) AddLog(msg string) { + e.logMu.Lock() + defer e.logMu.Unlock() + ts := time.Now().Format("15:04:05") + entry := fmt.Sprintf("[%s] %s", ts, msg) + e.logStore = append([]string{entry}, e.logStore...) + if len(e.logStore) > 100 { + e.logStore = e.logStore[:100] + } + go func() { _ = e.db.SaveLog(entry) }() +} + +func (e *Engine) InitLogs() { + logs, err := e.db.LoadLogs(100) + if err != nil { + return + } + if len(logs) == 0 { + return + } + e.logMu.Lock() + defer e.logMu.Unlock() + e.logStore = logs +} + +func (e *Engine) GetLogs() []string { + e.logMu.RLock() + defer e.logMu.RUnlock() + logs := make([]string, len(e.logStore)) + copy(logs, e.logStore) return logs } -// --- ENGINE --- - -var ( - LiveState = make(map[int]models.Site) - Mutex sync.RWMutex - - // Global Switch for HA - isActive = true - activeMutex sync.RWMutex - - insecureSkipVerify bool -) - -func SetInsecureSkipVerify(skip bool) { - insecureSkipVerify = skip -} - -func SetEngineActive(active bool) { - activeMutex.Lock() - defer activeMutex.Unlock() - if isActive != active { - isActive = active +func (e *Engine) SetActive(active bool) { + e.activeMu.Lock() + defer e.activeMu.Unlock() + if e.isActive != active { + e.isActive = active status := "RESUMED (Active)" if !active { status = "PAUSED (Passive)" } - AddLog(fmt.Sprintf("Engine %s", status)) + e.AddLog(fmt.Sprintf("Engine %s", status)) } } -func IsEngineActive() bool { - activeMutex.RLock() - defer activeMutex.RUnlock() - return isActive +func (e *Engine) IsActive() bool { + e.activeMu.RLock() + defer e.activeMu.RUnlock() + return e.isActive } -func RecordHeartbeat(token string) bool { - if !IsEngineActive() { - return false - } // Only Leader accepts Push - - Mutex.Lock() - defer Mutex.Unlock() - var targetID int = -1 - for id, s := range LiveState { - if s.Type == "push" && s.Token == token { - targetID = id - break - } +func (e *Engine) GetAllSites() []models.Site { + e.mu.RLock() + defer e.mu.RUnlock() + sites := make([]models.Site, 0, len(e.liveState)) + for _, s := range e.liveState { + sites = append(sites, s) } - if targetID == -1 { + return sites +} + +func (e *Engine) GetLiveState() map[int]models.Site { + e.mu.RLock() + defer e.mu.RUnlock() + cp := make(map[int]models.Site, len(e.liveState)) + for k, v := range e.liveState { + cp[k] = v + } + return cp +} + +func (e *Engine) RecordHeartbeat(token string) bool { + if !e.IsActive() { + return false + } + + e.mu.Lock() + defer e.mu.Unlock() + + targetID, ok := e.tokenIndex[token] + if !ok { + return false + } + + site, exists := e.liveState[targetID] + if !exists { return false } - site := LiveState[targetID] site.LastCheck = time.Now() wasDown := site.Status == "DOWN" site.Status = "UP" site.FailureCount = 0 site.Latency = 0 - LiveState[targetID] = site + e.liveState[targetID] = site if wasDown { - AddLog(fmt.Sprintf("Push Monitor '%s' recovered", site.Name)) - triggerAlert(site.AlertID, "โ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.", site.Name)) + e.AddLog(fmt.Sprintf("Push Monitor '%s' recovered", site.Name)) + e.triggerAlert(site.AlertID, "โ RECOVERY", fmt.Sprintf("Push Monitor '%s' is receiving heartbeats.", site.Name)) } return true } -func StartEngine() { +func (e *Engine) addToTokenIndex(site models.Site) { + if site.Type == "push" && site.Token != "" { + e.tokenIndex[site.Token] = site.ID + } +} + +func (e *Engine) removeFromTokenIndex(id int) { + for token, sid := range e.tokenIndex { + if sid == id { + delete(e.tokenIndex, token) + return + } + } +} + +func (e *Engine) Start(ctx context.Context) { go func() { for { - s_instance := store.Get() - if s_instance == nil { - time.Sleep(1 * time.Second) - continue + select { + case <-ctx.Done(): + return + default: } - sites := s_instance.GetSites() + sites, err := e.db.GetSites() + if err != nil { + e.AddLog(fmt.Sprintf("Failed to load sites: %v", err)) + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } + continue + } for _, s := range sites { - Mutex.RLock() - _, exists := LiveState[s.ID] - Mutex.RUnlock() + e.mu.RLock() + _, exists := e.liveState[s.ID] + e.mu.RUnlock() if !exists { - Mutex.Lock() + e.mu.Lock() s.Status = "PENDING" if s.Type == "push" { s.LastCheck = time.Now() } - LiveState[s.ID] = s - Mutex.Unlock() - go monitorRoutine(s.ID) + if h, ok := e.GetHistory(s.ID); ok && len(h.Statuses) > 0 { + if h.Statuses[len(h.Statuses)-1] { + s.Status = "UP" + } else { + s.Status = "DOWN" + } + if len(h.Latencies) > 0 { + s.Latency = h.Latencies[len(h.Latencies)-1] + } + } + e.liveState[s.ID] = s + e.addToTokenIndex(s) + e.mu.Unlock() + go e.monitorRoutine(ctx, s.ID) } } - time.Sleep(5 * time.Second) + + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } } }() } -func UpdateSiteConfig(site models.Site) { - Mutex.Lock() - defer Mutex.Unlock() - if s, ok := LiveState[site.ID]; ok { - s.Name = site.Name - s.URL = site.URL - s.Type = site.Type - s.Interval = site.Interval - s.AlertID = site.AlertID - s.CheckSSL = site.CheckSSL - s.ExpiryThreshold = site.ExpiryThreshold - s.MaxRetries = site.MaxRetries - s.Hostname = site.Hostname - s.Port = site.Port - s.Timeout = site.Timeout - s.Method = site.Method - s.Description = site.Description - s.ParentID = site.ParentID - s.AcceptedCodes = site.AcceptedCodes - s.DNSResolveType = site.DNSResolveType - s.DNSServer = site.DNSServer - s.IgnoreTLS = site.IgnoreTLS - LiveState[site.ID] = s +func (e *Engine) UpdateSiteConfig(site models.Site) { + e.mu.Lock() + defer e.mu.Unlock() + if existing, ok := e.liveState[site.ID]; ok { + e.removeFromTokenIndex(site.ID) + site.Status = existing.Status + site.StatusCode = existing.StatusCode + site.Latency = existing.Latency + site.CertExpiry = existing.CertExpiry + site.HasSSL = existing.HasSSL + site.LastCheck = existing.LastCheck + site.SentSSLWarning = existing.SentSSLWarning + site.FailureCount = existing.FailureCount + e.liveState[site.ID] = site + e.addToTokenIndex(site) } } -func RemoveSite(id int) { - Mutex.Lock() - delete(LiveState, id) - Mutex.Unlock() - RemoveHistory(id) +func (e *Engine) RemoveSite(id int) { + e.mu.Lock() + e.removeFromTokenIndex(id) + delete(e.liveState, id) + e.mu.Unlock() + e.removeHistory(id) } -func monitorRoutine(id int) { - checkByID(id) +func (e *Engine) ToggleSitePause(id int) bool { + e.mu.Lock() + defer e.mu.Unlock() + site, ok := e.liveState[id] + if !ok { + return false + } + site.Paused = !site.Paused + e.liveState[id] = site + if site.Paused { + e.AddLog(fmt.Sprintf("Monitor '%s' paused", site.Name)) + } else { + e.AddLog(fmt.Sprintf("Monitor '%s' resumed", site.Name)) + } + return site.Paused +} + +func (e *Engine) monitorRoutine(ctx context.Context, id int) { + e.checkByID(id) for { - // If paused, just sleep loop to keep goroutine alive but idle - if !IsEngineActive() { - time.Sleep(5 * time.Second) + select { + case <-ctx.Done(): + return + default: + } + + if !e.IsActive() { + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } continue } - Mutex.RLock() - site, exists := LiveState[id] - Mutex.RUnlock() + e.mu.RLock() + site, exists := e.liveState[id] + e.mu.RUnlock() if !exists { return } + if site.Paused { + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } + continue + } + interval := site.Interval if interval < 5 { interval = 5 } - time.Sleep(time.Duration(interval) * time.Second) - checkByID(id) + select { + case <-time.After(time.Duration(interval) * time.Second): + case <-ctx.Done(): + return + } + e.checkByID(id) } } -func checkByID(id int) { - if !IsEngineActive() { +func (e *Engine) checkByID(id int) { + if !e.IsActive() { return } - Mutex.RLock() - site, exists := LiveState[id] - Mutex.RUnlock() - if !exists { + e.mu.RLock() + site, exists := e.liveState[id] + e.mu.RUnlock() + if !exists || site.Paused { return } + switch site.Type { - case "http": - checkHTTP(site) case "push": - checkPush(site) - case "ping": - checkPing(site) - case "port": - checkPort(site) - case "dns": - checkDNS(site) + e.checkPush(site) case "group": - // groups don't perform checks + e.checkGroup(site) + default: + result := RunCheck(site, e.strictClient, e.insecureClient, e.insecureSkipVerify) + updatedSite := site + updatedSite.HasSSL = result.HasSSL + updatedSite.CertExpiry = result.CertExpiry + updatedSite.Latency = time.Duration(result.LatencyNs) + updatedSite.LastCheck = time.Now() + e.handleStatusChange(updatedSite, result.Status, result.StatusCode, time.Duration(result.LatencyNs)) } } -func checkPush(site models.Site) { +func (e *Engine) checkPush(site models.Site) { deadline := site.LastCheck.Add(time.Duration(site.Interval) * time.Second).Add(5 * time.Second) if time.Now().After(deadline) { - handleStatusChange(site, "DOWN", 0, 0) - } else { - if site.Status != "UP" { - handleStatusChange(site, "UP", 200, 0) - } + e.handleStatusChange(site, "DOWN", 0, 0) + } else if site.Status != "UP" { + e.handleStatusChange(site, "UP", 200, 0) } } -func checkHTTP(site models.Site) { - start := time.Now() - timeout := time.Duration(site.Timeout) * time.Second - if timeout <= 0 { - timeout = 5 * time.Second - } - skipTLS := insecureSkipVerify || site.IgnoreTLS - client := &http.Client{Timeout: timeout, Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLS}}} - resp, err := client.Get(site.URL) - latency := time.Since(start) - - rawStatus := "UP" - rawCode := 0 - var certExpiry time.Time - hasSSL := false - - if err != nil { - rawStatus = "DOWN" - } else { - defer resp.Body.Close() - rawCode = resp.StatusCode - if resp.StatusCode >= 400 { - rawStatus = "DOWN" - } - if site.CheckSSL && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 { - hasSSL = true - cert := resp.TLS.PeerCertificates[0] - certExpiry = cert.NotAfter - if time.Now().After(cert.NotAfter) { - rawStatus = "SSL EXP" - } - } - } - updatedSite := site - updatedSite.HasSSL = hasSSL - updatedSite.CertExpiry = certExpiry - updatedSite.Latency = latency - updatedSite.LastCheck = time.Now() - handleStatusChange(updatedSite, rawStatus, rawCode, latency) -} - -func handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) { - // Double check we are still leader before alerting - if !IsEngineActive() { +func (e *Engine) handleStatusChange(site models.Site, rawStatus string, code int, latency time.Duration) { + if !e.IsActive() { return } @@ -291,9 +373,9 @@ func handleStatusChange(site models.Site, rawStatus string, code int, latency ti if newState.FailureCount > site.MaxRetries { newState.Status = rawStatus newState.FailureCount = site.MaxRetries + 1 - AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", site.Name)) + e.AddLog(fmt.Sprintf("Monitor '%s' confirmed DOWN", site.Name)) } else { - AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", site.Name, newState.FailureCount, site.MaxRetries)) + e.AddLog(fmt.Sprintf("Monitor '%s' failed check %d/%d", site.Name, newState.FailureCount, site.MaxRetries)) } } else if rawStatus == "UP" { newState.FailureCount = 0 @@ -306,20 +388,20 @@ func handleStatusChange(site models.Site, rawStatus string, code int, latency ti if site.Type == "http" && site.CheckSSL && site.HasSSL { daysLeft := int(time.Until(site.CertExpiry).Hours() / 24) if daysLeft <= site.ExpiryThreshold && !site.SentSSLWarning && rawStatus != "SSL EXP" { - triggerAlert(site.AlertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", site.Name, daysLeft)) + e.triggerAlert(site.AlertID, "SSL WARNING", fmt.Sprintf("SSL for '%s' expires in %d days", site.Name, daysLeft)) newState.SentSSLWarning = true } else if daysLeft > site.ExpiryThreshold { newState.SentSSLWarning = false } } - Mutex.Lock() - if _, ok := LiveState[site.ID]; ok { - LiveState[site.ID] = newState + e.mu.Lock() + if _, ok := e.liveState[site.ID]; ok { + e.liveState[site.ID] = newState } - Mutex.Unlock() + e.mu.Unlock() - RecordCheck(site.ID, latency, rawStatus == "UP") + e.recordCheck(site.ID, latency, rawStatus == "UP") isBroken := func(s string) bool { return s == "DOWN" || s == "SSL EXP" } if !isBroken(site.Status) && isBroken(newState.Status) && newState.Status != "PENDING" { @@ -327,152 +409,115 @@ func handleStatusChange(site models.Site, rawStatus string, code int, latency ti if site.Type == "push" { msg = fmt.Sprintf("Push Monitor '%s' missed heartbeat.", site.Name) } - triggerAlert(site.AlertID, "๐จ ALERT", msg) + e.triggerAlert(site.AlertID, "๐จ ALERT", msg) } if isBroken(site.Status) && newState.Status == "UP" { - triggerAlert(site.AlertID, "โ RECOVERY", fmt.Sprintf("Monitor '%s' is UP", site.Name)) + e.triggerAlert(site.AlertID, "โ RECOVERY", fmt.Sprintf("Monitor '%s' is UP", site.Name)) } } -func triggerAlert(alertID int, title, message string) { - s_instance := store.Get() - if s_instance == nil { - return - } - cfg, ok := s_instance.GetAlert(alertID) - if !ok { +func (e *Engine) triggerAlert(alertID int, title, message string) { + cfg, err := e.db.GetAlert(alertID) + if err != nil { return } provider := alert.GetProvider(cfg) if provider != nil { - go func() { provider.Send(title, message) }() + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _ = ctx + _ = provider.Send(title, message) + }() } } -func siteTimeout(site models.Site) time.Duration { - if site.Timeout > 0 { - return time.Duration(site.Timeout) * time.Second +func (e *Engine) checkGroup(site models.Site) { + e.mu.RLock() + status := "UP" + hasChildren := false + allPaused := true + for _, child := range e.liveState { + if child.ParentID != site.ID || child.Type == "group" { + continue + } + hasChildren = true + if !child.Paused { + allPaused = false + } + if child.Paused { + continue + } + if child.Status == "DOWN" || child.Status == "SSL EXP" { + status = "DOWN" + } else if child.Status == "PENDING" && status != "DOWN" { + status = "PENDING" + } } - return 5 * time.Second + e.mu.RUnlock() + + if !hasChildren { + status = "PENDING" + } + + e.mu.Lock() + s := e.liveState[site.ID] + s.Status = status + if hasChildren && allPaused { + s.Paused = true + } + e.liveState[site.ID] = s + e.mu.Unlock() } -func checkPing(site models.Site) { - host := site.Hostname - if host == "" { - host = site.URL +func (e *Engine) SetAggStrategy(strategy AggregationStrategy) { + e.aggStrategy = strategy +} + +func (e *Engine) IngestProbeResult(nodeID string, siteID int, latencyNs int64, isUp bool) { + e.probeResultsMu.Lock() + if e.probeResults[siteID] == nil { + e.probeResults[siteID] = make(map[string]NodeResult) } - - pinger, err := probing.NewPinger(host) - if err != nil { - handleStatusChange(site, "DOWN", 0, 0) - AddLog(fmt.Sprintf("Ping '%s' resolve failed: %v", site.Name, err)) - return + e.probeResults[siteID][nodeID] = NodeResult{ + NodeID: nodeID, + IsUp: isUp, + LatencyNs: latencyNs, + CheckedAt: time.Now(), } - pinger.Count = 1 - pinger.Timeout = siteTimeout(site) - pinger.SetPrivileged(false) + results := make([]NodeResult, 0, len(e.probeResults[siteID])) + for _, r := range e.probeResults[siteID] { + results = append(results, r) + } + e.probeResultsMu.Unlock() - start := time.Now() - err = pinger.Run() - latency := time.Since(start) + aggUp, avgLatency := AggregateStatus(results, e.aggStrategy) - if err != nil || pinger.Statistics().PacketsRecv == 0 { - updatedSite := site - updatedSite.Latency = latency - updatedSite.LastCheck = time.Now() - handleStatusChange(updatedSite, "DOWN", 0, latency) + e.mu.RLock() + site, exists := e.liveState[siteID] + e.mu.RUnlock() + if !exists { return } - stats := pinger.Statistics() - updatedSite := site - updatedSite.Latency = stats.AvgRtt - updatedSite.LastCheck = time.Now() - handleStatusChange(updatedSite, "UP", 0, stats.AvgRtt) -} - -func checkPort(site models.Site) { - host := site.Hostname - if host == "" { - host = site.URL + rawStatus := "UP" + if !aggUp { + rawStatus = "DOWN" } - addr := net.JoinHostPort(host, strconv.Itoa(site.Port)) - timeout := siteTimeout(site) - - start := time.Now() - conn, err := net.DialTimeout("tcp", addr, timeout) - latency := time.Since(start) updatedSite := site - updatedSite.Latency = latency + updatedSite.Latency = time.Duration(avgLatency) updatedSite.LastCheck = time.Now() - - if err != nil { - handleStatusChange(updatedSite, "DOWN", 0, latency) - return - } - conn.Close() - handleStatusChange(updatedSite, "UP", 0, latency) + e.handleStatusChange(updatedSite, rawStatus, 0, time.Duration(avgLatency)) } -func checkDNS(site models.Site) { - host := site.Hostname - if host == "" { - host = site.URL +func (e *Engine) GetProbeResults(siteID int) map[string]NodeResult { + e.probeResultsMu.RLock() + defer e.probeResultsMu.RUnlock() + src := e.probeResults[siteID] + cp := make(map[string]NodeResult, len(src)) + for k, v := range src { + cp[k] = v } - - server := site.DNSServer - if server == "" { - server = "1.1.1.1" - } - if _, _, err := net.SplitHostPort(server); err != nil { - server = net.JoinHostPort(server, "53") - } - - qtype := dns.TypeA - switch site.DNSResolveType { - case "AAAA": - qtype = dns.TypeAAAA - case "MX": - qtype = dns.TypeMX - case "CNAME": - qtype = dns.TypeCNAME - case "TXT": - qtype = dns.TypeTXT - case "NS": - qtype = dns.TypeNS - case "SOA": - qtype = dns.TypeSOA - case "SRV": - qtype = dns.TypeSRV - case "PTR": - qtype = dns.TypePTR - } - - m := new(dns.Msg) - m.SetQuestion(dns.Fqdn(host), qtype) - - c := new(dns.Client) - c.Timeout = siteTimeout(site) - - start := time.Now() - r, rtt, err := c.Exchange(m, server) - _ = rtt - latency := time.Since(start) - - updatedSite := site - updatedSite.Latency = latency - updatedSite.LastCheck = time.Now() - - if err != nil { - handleStatusChange(updatedSite, "DOWN", 0, latency) - return - } - - if r.Rcode != dns.RcodeSuccess { - handleStatusChange(updatedSite, "DOWN", r.Rcode, latency) - return - } - - handleStatusChange(updatedSite, "UP", 0, latency) + return cp } diff --git a/internal/server/server.go b/internal/server/server.go index 6e857b4..49f21a2 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -4,14 +4,145 @@ import ( "encoding/json" "fmt" "go-upkeep/internal/importer" + "go-upkeep/internal/metrics" "go-upkeep/internal/models" "go-upkeep/internal/monitor" "go-upkeep/internal/store" "html/template" + "log" "net/http" "sort" + "strings" ) +var statusTpl = template.Must(template.New("status").Parse(` + + +
+