
client-go 源码分析(8) - workerqueue之延时队列DelayingQueue



type DelayingInterface interface {
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)

type delayingType struct {
Interface // 实例化延迟队列的同时实例化了普通队列

// clock tracks time for delayed firing
clock clock.Clock

// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
stopOnce sync.Once

// heartbeat ensures we wait no more than maxWait before firing
heartbeat clock.Ticker

// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor

// metrics counts the number of retries
metrics retryMetrics



type waitFor struct {
data t // 准备添加到队列中的数据
readyAt time.Time // 应该被加入队列的时间
index int // 在 heap 中的索引


type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
return len(pq)
func (pq waitForPriorityQueue) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt)
func (pq waitForPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
func (pq *waitForPriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*waitFor)
item.index = n
*pq = append(*pq, item)
func (pq *waitForPriorityQueue) Pop() interface{} {
n := len(*pq)
item := (*pq)[n-1]
item.index = -1
*pq = (*pq)[0:(n - 1)]
return item
// 返回第一个元素,非heap接口的实现方法。
// 这里没有写错,函数接收器不用指针是为了不改变waitForPriorityQueue内的数据。
// 后面调用该方法都是为了检查最小堆pop的item,(因为是最小堆,pop出来的item的到期时间一定是最早的)的到期时间是比now时间早还是晚
func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]

延时队列DelayingQueue的核心就是运行 waitingLoop方法。

func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),

go ret.waitingLoop()
return ret

AddAfter方法是对DelayingInterface接口的实现。AddAfter方法是在给定延迟后将给定项目添加到工作队列。具体是通过将两个入参组装成waitFor结构体 &waitFor{data: item, readyAt: q.clock.Now().Add(duration)} 放入到channel waitingForAddCh: make(chan *waitFor, 1000) 中去。(最大可以缓存1000个 &waitForm items)

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {


// immediately add things with no delay
if duration <= 0 {

select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:

waitingLoop方法,随着delayingType实例的创建而启动,并一直运行下去直到workqueue被shutdown。waitingLoop方法一直在做的事情就是 不停的将上面的 AddAfter方法 放进 q.waitingForAddCh channel的item取出来,根据item的时间是早于现在还是晚于现在,早于现在就加入工作队列,晚于现在就放到heap上。并不断的从heap pop出第一个item,检测item的到期时间,根据item的时间是早于现在还是晚于现在,早于现在就加入工作队列,晚于现在啥也不做,item继续保留在heap上。

func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()

// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)

// Make a timer that expires when the item at the head of the waiting queue is ready
var nextReadyAtTimer clock.Timer

waitingForQueue := &waitForPriorityQueue{}

waitingEntryByData := map[t]*waitFor{}

for {
// 如果该延迟队列包含wrap的普通队列的属性和方法,得知该队列正在被关闭,则跳出整个waitingLoop()方法
if q.Interface.ShuttingDown() {

now := q.clock.Now()

// Add ready entries
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
// heap的第一个item是最接近到期时间的,该item时间还没到,则heap不动,若该item时间已到,则pop出来,并将该item加入workqueue和从唯一无序set集合waitingEntryByData删除该item。
if entry.readyAt.After(now) {

entry = heap.Pop(waitingForQueue).(*waitFor)
delete(waitingEntryByData, entry.data)

// Set up a wait for the first item's readyAt (if one exists)
// nextReadyAt是个定时器,never是永不到期的定时器
nextReadyAt := never
//若 heeap:waitingForQueue 还有item
if waitingForQueue.Len() > 0 {
// 若定时器在工作,则停止改计时器
if nextReadyAtTimer != nil {
entry := waitingForQueue.Peek().(*waitFor) // 获取 heeap:waitingForQueue 首个item
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // 获取该首个item还有多久到期(到期时间减去现在时间),根据该时间创建定时器 nextReadyAtTimer.C()
nextReadyAt = nextReadyAtTimer.C()

select {
case <-q.stopCh:

case <-q.heartbeat.C():
// continue the loop, which will add ready items

case <-nextReadyAt:
// continue the loop, which will add ready items

case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {

drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
drained = true

上面的代码中的select方法,满足心跳时间 或者 pop后的heap的第一个元素的时间已经到了 或者q.waitingForAddCh channel有数据,就进入下一次的for循环。


for !drained 是为了将 q.waitingForAddCh channel里的items处理完,当 drained = true 表示已处理完成该channel的全部items。

insert方法往heap添加元素,分两种情况。若元素存在则update,若不存在,push该元素到heap中,并将入参的 knownEntries(即waitingLoop方法的waitingEntryByData) set集合添加该元素的值,为了保证不重复。

func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
// if the entry already exists, update the time only if it would cause the item to be queued sooner
existing, exists := knownEntries[entry.data]
if exists {
if existing.readyAt.After(entry.readyAt) {
existing.readyAt = entry.readyAt
heap.Fix(q, existing.index)


heap.Push(q, entry)
knownEntries[entry.data] = entry