跳到主要内容

client-go 源码分析(9) - workerqueue之限速队列RateLimitingQueue

workerqueue的限速队列RateLimitingQueue搞明白三件事就可以了。

  1. 代码结构
  2. 5种RateLimitingQueue(限速队列)
  3. Kubernetes主要用了上述5种限速队列的哪几种

代码结构

因为普通队列Queue,延时队列DelayingQueue,限速队列RateLimitingQueue,后一个队列以前一个队列的实现为基础,层层添加新功能,所以rateLimitingType结构体包装了延迟队列的接口和RateLimiter接口。而5种限速限速队列都实现了RateLimiter接口。

type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter
}

rateLimitingType结构体实现了RateLimitingInterface接口的所有方法:

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface

// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item interface{})

// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})

// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}

func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}

其中 AddRateLimited方法 将限速队列的item通过When方法获取到期时间,然后通过延迟队列的AddAfter方法将该item加入队列。

5种限速限速队列

5种限速限速队列都实现了下面的RateLimiter接口。

type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}

BucketRateLimiter

令牌桶限速器BucketRateLimiter 用了 golang 标准库的令牌桶限流器 golang.org/x/time/rate.Limiter 实现。所有元素都是一样的,来几次都是一样,所以NumRequeues,Forget都没有意义。

type BucketRateLimiter struct {
*rate.Limiter
}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay()
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
}

ItemExponentialFailureRateLimiter

指数失败限速器ItemExponentialFailureRateLimiter,失败次数越多,限速越长而且是指数级增长的一种限速器。当然也不是无限增长下去,有最大限速时间配置。

type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int

baseDelay time.Duration
maxDelay time.Duration
}

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

exp := r.failures[item]
r.failures[item] = r.failures[item] + 1

// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}

calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}

return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

delete(r.failures, item)
}

ItemFastSlowRateLimiter

ItemFastSlowRateLimiter 对一定次数的尝试进行快速重试,然后进行慢速重试。

type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int

maxFastAttempts int
fastDelay time.Duration
slowDelay time.Duration
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

r.failures[item] = r.failures[item] + 1

if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}

return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

delete(r.failures, item)
}

MaxOfRateLimiter

MaxOfRateLimiter 调用每个 RateLimiter 并返回最坏情况响应。

type MaxOfRateLimiter struct {
limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}

return ret
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}

return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}

WithMaxWaitRateLimiter

使用MaxWaitRateLimiter具有maxDelay,可避免等待太长时间。

type WithMaxWaitRateLimiter struct {
limiter RateLimiter
maxDelay time.Duration
}

func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
delay := w.limiter.When(item)
if delay > w.maxDelay {
return w.maxDelay
}

return delay
}

func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
w.limiter.Forget(item)
}

func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
return w.limiter.NumRequeues(item)
}

Kubernetes主要用了上述5种限速队列的哪几种

可以看出基本就用了两种限速队列BucketRateLimiter和ItemExponentialFailureRateLimiter,且以BucketRateLimiter为主。

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}