fix(monitor): trigger immediate recheck after site config edit
The monitor goroutine slept for the full check interval after a config edit, so hostname/URL changes did not take effect until the next scheduled check. This adds a per-site recheck channel that wakes the goroutine immediately when UpdateSiteConfig is called.
This commit is contained in:
@@ -53,6 +53,9 @@ type Engine struct {
|
|||||||
alertHealthMu sync.RWMutex
|
alertHealthMu sync.RWMutex
|
||||||
alertHealth map[int]AlertHealth
|
alertHealth map[int]AlertHealth
|
||||||
|
|
||||||
|
recheckMu sync.RWMutex
|
||||||
|
recheck map[int]chan struct{}
|
||||||
|
|
||||||
db store.Store
|
db store.Store
|
||||||
insecureSkipVerify bool
|
insecureSkipVerify bool
|
||||||
allowPrivateTargets bool
|
allowPrivateTargets bool
|
||||||
@@ -74,6 +77,7 @@ func newEngine(s store.Store, allowPrivateTargets bool) *Engine {
|
|||||||
liveState: make(map[int]models.Site),
|
liveState: make(map[int]models.Site),
|
||||||
histories: make(map[int]*SiteHistory),
|
histories: make(map[int]*SiteHistory),
|
||||||
tokenIndex: make(map[string]int),
|
tokenIndex: make(map[string]int),
|
||||||
|
recheck: make(map[int]chan struct{}),
|
||||||
probeResults: make(map[int]map[string]NodeResult),
|
probeResults: make(map[int]map[string]NodeResult),
|
||||||
alertHealth: make(map[int]AlertHealth),
|
alertHealth: make(map[int]AlertHealth),
|
||||||
aggStrategy: AggAnyDown,
|
aggStrategy: AggAnyDown,
|
||||||
@@ -335,7 +339,6 @@ func (e *Engine) Start(ctx context.Context) {
|
|||||||
|
|
||||||
func (e *Engine) UpdateSiteConfig(site models.Site) {
|
func (e *Engine) UpdateSiteConfig(site models.Site) {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
defer e.mu.Unlock()
|
|
||||||
if existing, ok := e.liveState[site.ID]; ok {
|
if existing, ok := e.liveState[site.ID]; ok {
|
||||||
e.removeFromTokenIndex(site.ID)
|
e.removeFromTokenIndex(site.ID)
|
||||||
site.Status = existing.Status
|
site.Status = existing.Status
|
||||||
@@ -352,6 +355,28 @@ func (e *Engine) UpdateSiteConfig(site models.Site) {
|
|||||||
e.liveState[site.ID] = site
|
e.liveState[site.ID] = site
|
||||||
e.addToTokenIndex(site)
|
e.addToTokenIndex(site)
|
||||||
}
|
}
|
||||||
|
e.mu.Unlock()
|
||||||
|
|
||||||
|
e.signalRecheck(site.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getRecheckChan returns the recheck signal channel registered for the given
// site id in e.recheck. If no channel is registered yet, it creates one with a
// buffer of 1, stores it in the map, and returns it. The map access is guarded
// by e.recheckMu for the whole call.
func (e *Engine) getRecheckChan(id int) chan struct{} {
|
||||||
|
e.recheckMu.Lock()
|
||||||
|
defer e.recheckMu.Unlock()
|
||||||
|
ch, ok := e.recheck[id]
|
||||||
|
if !ok {
|
||||||
|
// Buffer of 1 lets a single signal be left pending without a receiver ready.
ch = make(chan struct{}, 1)
|
||||||
|
e.recheck[id] = ch
|
||||||
|
}
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// signalRecheck performs a non-blocking send on the recheck channel for the
// given site id. The channel is obtained through getRecheckChan, so one is
// created and registered if the id has none yet. The call never blocks: when
// the send cannot proceed, the signal is dropped.
func (e *Engine) signalRecheck(id int) {
|
||||||
|
ch := e.getRecheckChan(id)
|
||||||
|
select {
|
||||||
|
case ch <- struct{}{}:
|
||||||
|
// Send would block (e.g. a signal is already buffered): skip it.
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) RemoveSite(id int) {
|
func (e *Engine) RemoveSite(id int) {
|
||||||
@@ -360,6 +385,10 @@ func (e *Engine) RemoveSite(id int) {
|
|||||||
delete(e.liveState, id)
|
delete(e.liveState, id)
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
e.removeHistory(id)
|
e.removeHistory(id)
|
||||||
|
|
||||||
|
e.recheckMu.Lock()
|
||||||
|
delete(e.recheck, id)
|
||||||
|
e.recheckMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) ToggleSitePause(id int) bool {
|
func (e *Engine) ToggleSitePause(id int) bool {
|
||||||
@@ -380,6 +409,8 @@ func (e *Engine) ToggleSitePause(id int) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
||||||
|
recheckCh := e.getRecheckChan(id)
|
||||||
|
|
||||||
// Stagger initial check to avoid thundering herd on startup
|
// Stagger initial check to avoid thundering herd on startup
|
||||||
stagger := time.Duration(rand.IntN(3000)) * time.Millisecond //nolint:gosec // non-security jitter
|
stagger := time.Duration(rand.IntN(3000)) * time.Millisecond //nolint:gosec // non-security jitter
|
||||||
select {
|
select {
|
||||||
@@ -401,6 +432,7 @@ func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
|||||||
case <-time.After(pollInterval):
|
case <-time.After(pollInterval):
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
case <-recheckCh:
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -417,6 +449,7 @@ func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
|||||||
case <-time.After(pollInterval):
|
case <-time.After(pollInterval):
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
case <-recheckCh:
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -430,6 +463,7 @@ func (e *Engine) monitorRoutine(ctx context.Context, id int) {
|
|||||||
case <-time.After(time.Duration(interval)*time.Second + jitter):
|
case <-time.After(time.Duration(interval)*time.Second + jitter):
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
case <-recheckCh:
|
||||||
}
|
}
|
||||||
e.checkByID(id)
|
e.checkByID(id)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user