package delayqueue
import (
"container/heap"
"sync"
"time"
)
type Job struct {
ID string
Payload interface{}
RunAt time.Time
Priority int
}
type jobHeap []*Job
func (h jobHeap) Len() int { return len(h) }
func (h jobHeap) Less(i, j int) bool { return h[i].RunAt.Before(h[j].RunAt) }
func (h jobHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x interface{}) {
*h = append(*h, x.(*Job))
}
func (h *jobHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
type DelayQueue struct {
mu sync.Mutex
cond *sync.Cond
jobs jobHeap
closed bool
timer *time.Timer
}
func New() *DelayQueue {
dq := &DelayQueue{
jobs: make(jobHeap, 0),
}
dq.cond = sync.NewCond(&dq.mu)
heap.Init(&dq.jobs)
return dq
}
func (dq *DelayQueue) Push(id string, payload interface{}, delay time.Duration) {
dq.mu.Lock()
defer dq.mu.Unlock()
job := &Job{
ID: id,
Payload: payload,
RunAt: time.Now().Add(delay),
}
heap.Push(&dq.jobs, job)
dq.cond.Signal() // Wake up waiting consumer
}
func (dq *DelayQueue) Pop() (*Job, bool) {
dq.mu.Lock()
defer dq.mu.Unlock()
for {
if dq.closed {
return nil, false
}
if len(dq.jobs) == 0 {
dq.cond.Wait()
continue
}
next := dq.jobs[0]
now := time.Now()
if next.RunAt.After(now) {
// Wait until job is ready
waitTime := next.RunAt.Sub(now)
// Use timer with condition variable
go func() {
time.Sleep(waitTime)
dq.cond.Signal()
}()
dq.cond.Wait()
continue
}
// Job is ready
job := heap.Pop(&dq.jobs).(*Job)
return job, true
}
}
func (dq *DelayQueue) Close() {
dq.mu.Lock()
defer dq.mu.Unlock()
dq.closed = true
dq.cond.Broadcast()
}
func (dq *DelayQueue) Len() int {
dq.mu.Lock()
defer dq.mu.Unlock()
return len(dq.jobs)
}