最近项目需要用到限流和统计流量的功能, 便用 go 基于计数器的原理简单地实现了这两个功能.
限流
限流的要求是在指定的时间间隔内, server 最多只能服务指定数量的请求. 实现的原理是我们启动一个计数器, 每次服务请求会把计数器加一, 同时到达指定的时间间隔后会把计数器清零; 这个计数器的实现代码如下所示:
type RequestLimitService struct { Interval time.Duration MaxCount int Lock sync.Mutex ReqCount int }
func NewRequestLimitService(interval time.Duration, maxCnt int) *RequestLimitService { reqLimit := &RequestLimitService{ Interval: interval, MaxCount: maxCnt, }
go func() { ticker := time.NewTicker(interval) for { <-ticker.C reqLimit.Lock.Lock() fmt.Println("Reset Count...") reqLimit.ReqCount = 0 reqLimit.Lock.Unlock() } }()
return reqLimit }
func (reqLimit *RequestLimitService) Increase() { reqLimit.Lock.Lock() defer reqLimit.Lock.Unlock()
reqLimit.ReqCount += 1 }
func (reqLimit *RequestLimitService) IsAvailable() bool { reqLimit.Lock.Lock() defer reqLimit.Lock.Unlock()
return reqLimit.ReqCount < reqLimit.MaxCount }
|
在服务请求的时候, 我们会对当前计数器和阈值进行比较, 只有未超过阈值时才进行服务:
var RequestLimit = NewRequestLimitService(10 * time.Second, 5)
func helloHandler(w http.ResponseWriter, r *http.Request) { if RequestLimit.IsAvailable() { RequestLimit.Increase() fmt.Println(RequestLimit.ReqCount) io.WriteString(w, "Hello world!\n") } else { fmt.Println("Reach request limiting!") io.WriteString(w, "Reach request limit!\n") } }
func main() { fmt.Println("Server Started!") http.HandleFunc("/", helloHandler) http.ListenAndServe(":8000", nil) }
|
完整的代码放在了 Github 上.
功能测试
在代码中我们的默认设定是在 10 秒钟内最多只服务 5 个请求. 我们可以每次并行发送 3 个请求看返回结果:
➜ JustDoIt git:(master) seq 3 | xargs -P10 -I% curl localhost:8000 Hello world! Hello world! Hello world! ➜ JustDoIt git:(master) seq 3 | xargs -P10 -I% curl localhost:8000 Hello world! Hello world! Reach request limit!
|
可以看到发送到第 6 个请求时就触发了限流操作, 和我们预期的行为是一致的.
流量统计
流量统计的实现原理也是类似, 先启动一个计数器, 每次请求都会把计数器加一, 同时再启动一个定时器, 每隔一秒就会把当前计数器的值保存下来, 然后再把计数器清零. 代码如下:
var QPS []CountQPS
type CountQPS struct { CountPerSecond int Timestamp int64 }
type CounterService struct { CountQPS CountAll int Lock sync.Mutex }
func NewCounterService() *CounterService { counter := &CounterService{} go func() { ticker := time.NewTicker(time.Second) for { <-ticker.C counter.Lock.Lock() counter.Timestamp = time.Now().Unix()
if counter.CountPerSecond > 0 { QPS = append(QPS, CountQPS{counter.CountPerSecond, counter.Timestamp}) }
counter.CountPerSecond = 0
counter.Lock.Unlock() } }() return counter }
func (counter *CounterService) Increase() { counter.Lock.Lock() defer counter.Lock.Unlock()
counter.CountAll++ counter.CountPerSecond++ }
|
完整的代码保存在 Github 上.
在上面的代码中我们只是把每秒的统计值保存在一个 slice 中, 在实际的项目时间中我们可以把这个信息保存在 influxdb 这样的数据库或者 Kafka 中.
功能测试
在上面的完整代码中我们还实现了一个 get_cnt
的 api , 通过它可以打印出当前的所有流量统计值. 让我们来先发送一些请求, 然后看结果是否符合预期:
➜ JustDoIt git:(master) seq 2 | xargs -P10 -I% curl localhost:8000 Hello world! Hello world! ➜ JustDoIt git:(master) seq 5 | xargs -P10 -I% curl localhost:8000 Hello world! Hello world! Hello world! Hello world! Hello world! ➜ JustDoIt git:(master) curl localhost:8000/get_cnt timestamp,query_per_second 1525496731,2 1525496734,5 total: 7
|
可以看到这个结果也是符合预期的!
总结
本文给出的限流和流量统计的方法是比较简单基础的实现, 在某些情况下会有问题, 如参考文献中提到的在限流时间间隔的特定时间点 发送请, 可能会导致实际流量是设计限制流量的两倍. 但本文的方法用在 demo 类的项目中应该也是没什么问题的 :)
欢迎大家多多留言交流!
参考链接