项目接口在常规使用的时候可以正常愉快的执行的. 在突如其来的大流量时,如果没有一定的应对测试就会出现项目的雪崩效应, 导致整个服务或系统不可用. 所以根据类似场景需求都会对接口进行特殊的处理保证接口处于始终可用状态. 比如限流,降级,这样程序在处理超出能力范围的时候可以选择拒绝服务或者将不可处理的请求转发到其他服务(比如秒杀的排队或直接返回无货).

常见的算法

  1. 漏桶算法
  2. 固定窗口算法
  3. 滑动窗口算法
  4. 令牌桶算法

漏桶算法:

漏桶算法以一个常量限制了出口流量速率,因此漏桶算法可以平滑突发的流量。其中漏桶作为流量容器我们可以看做一个FIFO的队列,当入口流量速率大于出口流量速率时,因为流量容器是有限的,当超出流量容器大小时,超出的流量会被丢弃。

下图比较形象的说明了漏桶算法的原理,其中水龙头是入口流量,漏桶是流量容器,匀速流出的水是出口流量。

实例代码:

package main

import (
	"container/list"
	"fmt"
	"math/rand"
	"time"
)

const listCap = 10
const QPS = 1

var leakyBucket = list.New()

func rateLeakyBucketGetToken() bool {
	// 从链表里取元素
	if leakyBucket.Len() > 0 {
		leakyBucket.Remove(leakyBucket.Front())
		return true
	}
	return false
}

func main() {
	go func() {
		for {
			// 不断往链表里丢元素
			if leakyBucket.Len() < listCap { // 容量不满的时候入队
				leakyBucket.PushBack(struct{}{})
			}
		}
	}()
	rand.Seed(time.Now().UnixNano())
	for {
		// 按照固定速率漏水
		speed := time.Duration(1000/QPS) * time.Millisecond
		ticker := time.Tick(speed)
		start := time.Now()
		for range ticker {
			if rateLeakyBucketGetToken() {
				sub := time.Now().Sub(start)
				//fmt.Println("获取请求", sub)
				go func() {
					time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
					fmt.Println("任务成功", sub)
				}()
			}
		}
	}
}

由于出口流量是个定值, 所以漏桶算法不支持突发流量. 查看了好多实现的代码发现他们里面都设置有重置时间区间的功能, 我怎么感觉是固定窗口算法呢?

固定窗口算法 (fixed window):

设置一个时间段内可以处理的最大请求上限,超过的这个上限的请求会拒绝服务. 某个API在一分钟内只能固定被访问N次(可能是出于安全考虑,也可能是出于服务器资源的考虑),那么我们就可以直接统计这一分钟开始对API的访问次数,如果访问次数超过了限定值,则抛弃后续的访问。直到下一分钟开始,再开放对API的访问。

  • 在窗口的起始时间,最差情况下可能会带来 2 倍的流量
  • 很多handle可能都在等待窗口被重置,造成惊群效应, 即当有请求过来,所有handle都会争夺.

实例代码:

//该算法实现看似确实完美的实现了“单位时间内最大访问量控制”,但它在两个单位时间的临界值上的处理是有缺陷的。如:设需要控制的最大请求数为1w, 在第一个单位时间(0-10s)的最后一秒(即第9s)里达到的请求数为1w,接下来第二个单位时间(10-20s)的第一秒(即第10s)里达到请求数也是1w,由于超时重置发生在两个单位时间之间,所以这2w个请求都将通过控制,也就是说在2s里处理2w个请求,与我们设置的10s里1w个请求的设想是相违背。
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const period = 3 * time.Second //时间周期s
var currTime time.Time
var totalReq int32
var rateMaxReq int32 = 3 // period时间内的最大流量为rateMaxReq  速率为 rateMaxReq/period
var l sync.Mutex
func rateFixedWindowGetToken() bool {
	l.Lock()
	defer l.Unlock()
	if currTime.Add(period).After(time.Now())  {
		if totalReq > rateMaxReq {
			return false
		}
		totalReq++
	} else { // 超出窗口期, 重置窗口 (这里最差情况会通过2倍的流量)
		currTime = time.Now()
		totalReq = 0
		fmt.Println("重置")
	}
	return true
}

func main() {
	rand.Seed(time.Now().UnixNano())
	// 初始化窗口期
	currTime = time.Now()
	for {
		// 无限请求 ,电脑跑起来风扇转的飞起
		if rateFixedWindowGetToken() {
			go func() {
				time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
			}()
		}
	}
}

滑动窗口算法:

滑动窗口将固定窗口再等分为多个小的窗口,每一次对一个小的窗口进行流量控制。这种方法可以很好的解决之前的临界问题。

假设我们将1s划分为4个窗口,则每个窗口对应250ms。假设恶意用户还是在上一秒的最后一刻和下一秒的第一刻冲击服务,按照滑动窗口的原理,此时统计上一秒的最后750毫秒和下一秒的前250毫秒,这种方式能够判断出用户的访问依旧超过了1s的访问数量,因此依然会阻拦用户的访问。

  • 为每个窗口进行请求量的计数
  • 结合上一个窗口的请求量和这一个窗口已经经过的时间来计算出上限,以此 平滑请求尖锋

实例代码:

package main

import (
	"fmt"
	"sync"
	"time"
)

const period = 10 * time.Second //时间周期  单位s
var currTime time.Time         // 时间区间的起始时间
var sliceCount = 10            // 分成多少份
var periodMaxReq = 10          // 每个区间最大可通过的请求
var l sync.Mutex
var windowsSliceCount []int //每个窗口小块通过数量统计
var windowsSliceEndTime []time.Time
var once sync.Once

// 计算每个时间的间隔
var preWindowDuration = period / time.Duration(sliceCount)

func rateRollingWindowGetToken() bool {
	once.Do(func() {
		// 初始化窗口期
		currTime = time.Now()
		// 初始化每个窗口数据
		for i := 1; i <= sliceCount; i++ {
			windowsSliceCount = append(windowsSliceCount, 0)
			windowsSliceEndTime = append(windowsSliceEndTime, currTime.Add(time.Duration(i)*preWindowDuration))
		}
	})
	l.Lock()
	defer l.Unlock()
	// 当前小窗口区间执行超出限制 限流
	t, idx, total := getCurrentSliceWindow(time.Now())
	if idx == sliceCount-1 {
		windowsSliceCount = windowsSliceCount[1:]
		windowsSliceCount = append(windowsSliceCount, 0)
		windowsSliceEndTime = windowsSliceEndTime[1:]
		windowsSliceEndTime = append(windowsSliceEndTime, t.Add(preWindowDuration))
		fmt.Println(windowsSliceCount)
	}
	if getCurrentCounter() >= periodMaxReq || total > periodMaxReq {
		return false
	}
	windowsSliceCount[idx]++ // 自增当前窗口期的数据
	return true
}

func getCurrentSliceWindow(t time.Time) (time.Time, int, int) {
	for k := range windowsSliceEndTime {
		if windowsSliceEndTime[k].After(t) {
			return windowsSliceEndTime[k], k, windowsSliceCount[k]
		}
	}
	panic("程序错误")
}

func getCurrentCounter() int {
	var sum int
	for k := range windowsSliceCount {
		sum += windowsSliceCount[k]
	}
	return sum
}

func main() {
	T := time.NewTicker(time.Millisecond * 3)
	for range T.C {
		// 使用tick替代for mac使用率为 5%
		if rateRollingWindowGetToken() {
			go func() {
				fmt.Println("执行任务")
			}()
			time.Sleep(200 * time.Millisecond)
		}
	}
	//for {
	//	// 无限请求 ,电脑跑起来风扇转的飞起   mac 跑90%的使用
	//	if rateRollingWindowGetToken() {
	//		go func() {
	//			fmt.Println("执行任务")
	//		}()
	//		time.Sleep(200 * time.Millisecond)
	//	}
	//}
}

代码比较丑陋, 主要是实现功能

下面是日志输出的一段内容:

[0 0 0 0 4 5 1 0 0 0]   // 下面一段时间内均无法通过请求
[0 0 0 4 5 1 0 0 0 0]
[0 0 4 5 1 0 0 0 0 0]
[0 4 5 1 0 0 0 0 0 0]
[4 5 1 0 0 0 0 0 0 0]   // 当4的窗口被移动以后就空闲出来4个执行
[5 1 0 0 0 0 0 0 0 0]
执行任务                
执行任务
执行任务
执行任务
[1 0 0 0 0 0 0 3 1 0]
执行任务
执行任务
执行任务
执行任务
执行任务
[0 0 0 0 0 0 3 5 1 0]

令牌桶算法:

令牌桶算法是网络流量整形(Traffic Shaping)速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。

实例代码:

package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"
)

var maxTokenNumber = int32(12)    // 平均时间内的最大请求
var period = int32(3)             // 3s一个周期
var currentTokenNumber = int32(0) // 当前的token使用数

// 令牌桶算法: 原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
var once sync.Once

func InitTicker() {
	once.Do(func() {
		// 启动一个添加令牌的协程
		go func() {
			// 计算平均速率speed / s
			var speed = math.Floor(float64(maxTokenNumber / period))
			// 初始化打点器 下发令牌
			ticker := time.Tick(1000 / time.Duration(speed) * time.Millisecond)
			for range ticker {
				// 桶内令牌数没满 添加
				if atomic.LoadInt32(&currentTokenNumber) < maxTokenNumber {
					atomic.AddInt32(&currentTokenNumber, 1)
				}
			}
		}()
	})
}

func rateTokenBucketGetToken() bool {
	if atomic.LoadInt32(&currentTokenNumber) > 0 {
		atomic.AddInt32(&currentTokenNumber, -1)
		return true
	}
	return false
}

func main() {
	var counter int32
	InitTicker()
	now := time.Now()
	for {
		go func() {
			if rateTokenBucketGetToken() {
				fmt.Println("获取令牌执行任务逻辑")
				atomic.AddInt32(&counter, 1)
			}
		}()
		if time.Now().After(now.Add(15 * time.Second)) {
			fmt.Println("15秒内通过", counter, "个请求")
			break
		}
	}
}

令牌桶内如果存储满令牌以后,此时最大支持最多令牌数量的突发流量.

参考

高并发中的惊群效应
三种常见的限流算法
漏桶算法&令牌桶算法理解及常用的算法
令牌桶算法限流