最近项目需要用到限流和统计流量的功能, 便用 go 基于计数器的原理简单地实现了这两个功能.

限流

限流的要求是在指定的时间间隔内, server 最多只能服务指定数量的请求. 实现的原理是我们启动一个计数器, 每次服务请求会把计数器加一, 同时到达指定的时间间隔后会把计数器清零; 这个计数器的实现代码如下所示:

type RequestLimitService struct {
Interval time.Duration
MaxCount int
Lock sync.Mutex
ReqCount int
}

func NewRequestLimitService(interval time.Duration, maxCnt int) *RequestLimitService {
reqLimit := &RequestLimitService{
Interval: interval,
MaxCount: maxCnt,
}

go func() {
ticker := time.NewTicker(interval)
for {
<-ticker.C
reqLimit.Lock.Lock()
fmt.Println("Reset Count...")
reqLimit.ReqCount = 0
reqLimit.Lock.Unlock()
}
}()

return reqLimit
}

func (reqLimit *RequestLimitService) Increase() {
reqLimit.Lock.Lock()
defer reqLimit.Lock.Unlock()

reqLimit.ReqCount += 1
}

func (reqLimit *RequestLimitService) IsAvailable() bool {
reqLimit.Lock.Lock()
defer reqLimit.Lock.Unlock()

return reqLimit.ReqCount < reqLimit.MaxCount
}

在服务请求的时候, 我们会对当前计数器和阈值进行比较, 只有未超过阈值时才进行服务:

var RequestLimit = NewRequestLimitService(10 * time.Second, 5)

func helloHandler(w http.ResponseWriter, r *http.Request) {
if RequestLimit.IsAvailable() {
RequestLimit.Increase()
fmt.Println(RequestLimit.ReqCount)
io.WriteString(w, "Hello world!\n")
} else {
fmt.Println("Reach request limiting!")
io.WriteString(w, "Reach request limit!\n")
}
}

func main() {
fmt.Println("Server Started!")
http.HandleFunc("/", helloHandler)
http.ListenAndServe(":8000", nil)
}

完整的代码放在了 Github 上.

功能测试

在代码中我们的默认设定是在 10 秒钟内最多只服务 5 个请求. 我们可以每次并行发送 3 个请求看返回结果:

➜  JustDoIt git:(master) seq 3 | xargs -P10 -I% curl localhost:8000
Hello world!
Hello world!
Hello world!
➜ JustDoIt git:(master) seq 3 | xargs -P10 -I% curl localhost:8000
Hello world!
Hello world!
Reach request limit!

可以看到发送到第 6 个请求时就触发了限流操作, 和我们预期的行为是一致的.

流量统计

流量统计的实现原理也是类似, 先启动一个计数器, 每次请求都会把计数器加一, 同时再启动一个定时器, 每隔一秒就会把当前计数器的值保存下来, 然后再把计数器清零. 代码如下:

var QPS []CountQPS

type CountQPS struct {
CountPerSecond int
Timestamp int64
}

type CounterService struct {
CountQPS
CountAll int
Lock sync.Mutex
}

func NewCounterService() *CounterService {
counter := &CounterService{}
go func() {
ticker := time.NewTicker(time.Second)
for {
<-ticker.C
counter.Lock.Lock()
counter.Timestamp = time.Now().Unix()

if counter.CountPerSecond > 0 {
QPS = append(QPS, CountQPS{counter.CountPerSecond, counter.Timestamp})
}

counter.CountPerSecond = 0

counter.Lock.Unlock()
}
}()
return counter
}

func (counter *CounterService) Increase() {
counter.Lock.Lock()
defer counter.Lock.Unlock()

counter.CountAll++
counter.CountPerSecond++
}

完整的代码保存在 Github 上.

在上面的代码中我们只是把每秒的统计值保存在一个 slice 中, 在实际的项目时间中我们可以把这个信息保存在 influxdb 这样的数据库或者 Kafka 中.

功能测试

在上面的完整代码中我们还实现了一个 get_cnt 的 api , 通过它可以打印出当前的所有流量统计值. 让我们来先发送一些请求, 然后看结果是否符合预期:

➜  JustDoIt git:(master) seq 2 | xargs -P10 -I% curl localhost:8000
Hello world!
Hello world!
➜ JustDoIt git:(master) seq 5 | xargs -P10 -I% curl localhost:8000
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
➜ JustDoIt git:(master) curl localhost:8000/get_cnt
timestamp,query_per_second
1525496731,2
1525496734,5
total: 7

可以看到这个结果也是符合预期的!

总结

本文给出的限流和流量统计的方法是比较简单基础的实现, 在某些情况下会有问题, 如参考文献中提到的在限流时间间隔的特定时间点 发送请, 可能会导致实际流量是设计限制流量的两倍. 但本文的方法用在 demo 类的项目中应该也是没什么问题的 :)

欢迎大家多多留言交流!

参考链接