一、前言
大家好!我是sum墨,一个一线的底层码农,平时喜欢研究和思考一些技术相关的问题并整理成文,限于本人水平,如果文章和代码有表述不当之处,还请不吝赐教。
作为一名从业已达六年的老码农,我的工作主要是开发后端Java业务系统,包括各种管理后台和小程序等。在这些项目中,我设计过单/多租户体系系统,对接过许多开放平台,也搞过消息中心这类较为复杂的应用,但幸运的是,我至今还没有遇到过线上系统由于代码崩溃导致资损的情况。这其中的原因有三点:一是业务系统本身并不复杂;二是我一直遵循某大厂代码规约,在开发过程中尽可能按规约编写代码;三是经过多年的开发经验积累,我成为了一名熟练工,掌握了一些实用的技巧。
好像一提到防抖,接下来就会提到限流,我在第六篇文章写了一些接口防抖的策略,那么这篇正好讲讲接口如何限流。不知道从哪里看到的,“防抖是回城,限流是攻击”
,感觉真的很形象,我来简要描述一下:
王者荣耀大家都玩过吧,里面的英雄都有一个攻击间隔,当我们连续的点击普通攻击的时候,英雄的攻速并不会随着我们点击的越快而更快的攻击。这个就是限流,英雄会按照自身攻速的系数执行攻击,我们点的再快也没用。
而防抖在王者荣耀中就是回城,在游戏中经常会遇到连续回城嘲讽对手的玩家,它们每点击一次回城,后一次的回城都会打断前一次的回城,只有最后一次点击的回城会被触发,从而保证回城只执行一次,这就是防抖的概念。
本文参考项目源码地址:summo-springboot-interface-demo
二、业务场景
1. API速率限制
对外提供的API接口可能需要限制每个用户或每个IP地址在单位时间内的访问次数,以防止滥用或过载。
2. 网站流量控制
对于高流量的网站,为了防止瞬时访问量过大导致服务器压力过重,可以通过限流保护系统稳定运行。
3. 秒杀活动
电商平台在进行秒杀活动时,可能会遭遇大量用户同时抢购,通过计数器限流算法可以有效地控制访问量,避免系统崩溃。
4. 微服务架构
在微服务架构中,限流可以防止某个服务因为突发的高流量而成为瓶颈,进而影响到整个系统的稳定性。
5. 分布式系统的互斥操作
在进行诸如分布式锁的操作时,限流算法可以避免过多的请求同时竞争资源,保证系统的公平性和效率。
6. 基础设施保护
对于数据库、缓存等基础设施服务,通过计数器限流可以避免过多的并发请求导致服务不可用。
7. 网络带宽控制
对于带宽有限的网络服务,限流算法可以用来确保带宽的合理分配,防止网络拥堵。
三、限流策略
1. 计数器
(1)代码
CounterRateLimit.java
package com.summo.demo.config.limitstrategy.counter;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CounterRateLimit {
/**
* 请求的数量
*
* @return
*/
int requests();
/**
* 时间窗口,单位为秒
*
* @return
*/
int timeWindow();
}
CounterRateLimitAspect.java
package com.summo.demo.config.limitstrategy.counter;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Slf4j
@Aspect
@Component
@Order(5)
public class CounterRateLimitAspect {
/**
* 用来存储每个方法请求计数的映射
*/
private final ConcurrentHashMap REQUEST_COUNT = new ConcurrentHashMap();
/**
* 来存储每个方法的时间戳的映射
*/
private final ConcurrentHashMap TIMESTAMP = new ConcurrentHashMap();
@Around("@annotation(com.summo.demo.config.limitstrategy.counter.CounterRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
//获取注解信息
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
CounterRateLimit counterRateLimit = method.getAnnotation(CounterRateLimit.class);
//获取注解上配置的参数
int maxRequests = counterRateLimit.requests();
long windowSizeInMillis = TimeUnit.SECONDS.toMillis(counterRateLimit.timeWindow());
// 获取方法的字符串表示,用作键值
String methodName = method.toString();
// 初始化计数器和时间戳
AtomicInteger count = REQUEST_COUNT.computeIfAbsent(methodName, k -> new AtomicInteger(0));
long startTime = TIMESTAMP.computeIfAbsent(methodName, k -> System.currentTimeMillis());
// 获取当前时间
long currentTimeMillis = System.currentTimeMillis();
// 如果当前时间超出时间窗口,则重置计数器和时间戳
if (currentTimeMillis - startTime > windowSizeInMillis) {
// 原子地重置时间戳和计数器
TIMESTAMP.put(methodName, currentTimeMillis);
count.set(0);
}
// 原子地增加计数器并检查其值
if (count.incrementAndGet() > maxRequests) {
// 如果超出最大请求次数,递减计数器,并报错
count.decrementAndGet();
throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
}
// 执行原方法
return joinPoint.proceed();
}
}
使用方式
/**
* 计数器算法限流
*
* @return
*/
@GetMapping("/counter")
@CounterRateLimit(requests = 2, timeWindow = 2)
public ResponseEntity counter() {
return ResponseEntity.ok("counter test ok!");
}
(3)原理说明
在示例中,有一个使用了 @CounterRateLimit 注解的 counter 方法。根据注解的参数,这个方法在2秒钟的时间窗口内只能被调用2次。 如果在 2 秒内有更多的调用尝试,那么这些额外的调用将被限流,并返回错误信息。
(4)流程图
(5)缺点
无法处理“临界问题”。
(6)临界问题
假设1min一个时间段,每个时间段内最多100个请求。有一种极端情况,当10:00:58这个时刻100个请求一起过来,到达阈值;当10:01:02这个时刻100个请求又一起过来,到达阈值。这种情况就会导致在短短的4s内已经处理完了200个请求,而其他所有的时间都在限流中。
2. 滑动窗口
(1)代码
SlidingWindowRateLimit.java
package com.summo.demo.config.limitstrategy.slidingwindow;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.anno服务器托管网tation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface SlidingWindowRateLimit {
/**
* 请求的数量
*
* @return
*/
int requests();
/**
* 时间窗口,单位为秒
*
* @return
*/
int timeWindow();
}
SlidingWindowRateLimitAspect.java
package com.summo.demo.config.limitstrategy.slidingwindow;
import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@Slf4j
@Aspect
@Component
@Order(5)
public class SlidingWindowRateLimitAspect {
/**
* 使用 ConcurrentHashMap 保存每个方法的请求时间戳队列
*/
private final ConcurrentHashMap> REQUEST_TIMES_MAP = new ConcurrentHashMap();
@Around("@annotation(com.summo.demo.config.limitstrategy.slidingwindow.SlidingWindowRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {服务器托管网
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
SlidingWindowRateLimit rateLimit = method.getAnnotation(SlidingWindowRateLimit.class);
// 允许的最大请求数
int requests = rateLimit.requests();
// 滑动窗口的大小(秒)
int timeWindow = rateLimit.timeWindow();
// 获取方法名称字符串
String methodName = method.toString();
// 如果不存在当前方法的请求时间戳队列,则初始化一个新的队列
ConcurrentLinkedQueue requestTimes = REQUEST_TIMES_MAP.computeIfAbsent(methodName,
k -> new ConcurrentLinkedQueue());
// 当前时间
long currentTime = System.currentTimeMillis();
// 计算时间窗口的开始时间戳
long thresholdTime = currentTime - TimeUnit.SECONDS.toMillis(timeWindow);
// 这一段代码是滑动窗口限流算法中的关键部分,其功能是移除当前滑动窗口之前的请求时间戳。这样做是为了确保窗口内只保留最近时间段内的请求记录。
// requestTimes.isEmpty() 是检查队列是否为空的条件。如果队列为空,则意味着没有任何请求记录,不需要进行移除操作。
// requestTimes.peek()
使用方式
/**
* 滑动窗口算法限流
*
* @return
*/
@GetMapping("/slidingWindow")
@SlidingWindowRateLimit(requests = 2, timeWindow = 2)
public ResponseEntity slidingWindow() {
return ResponseEntity.ok("slidingWindow test ok!");
}
(3)原理说明
从图上可以看到时间创建是一种滑动的方式前进, 滑动窗口限流策略能够显著减少临界问题的影响,但并不能完全消除它。滑动窗口通过跟踪和限制在一个连续的时间窗口内的请求来工作。与简单的计数器方法不同,它不是在窗口结束时突然重置计数器,而是根据时间的推移逐渐地移除窗口中的旧请求,添加新的请求。
举个例子:假设时间窗口为10s,请求限制为3,第一次请求在10:00:00发起,第二次在10:00:05发起,第三次10:00:11发起,那么计数器策略的下一个窗口开始时间是10:00:11,而滑动窗口是10:00:05。所以这也是滑动窗口为什么可以减少临界问题的影响,但并不能完全消除它的原因。
(4)流程图
3. 令牌桶
(1)代码
该算法我就不造轮子了,直接使用Guava自带的RateLimiter实现。
pom.xml
com.google.guava
guava
32.1.1-jre
TokenBucketRateLimit.java
package com.summo.demo.config.limitstrategy.tokenbucket;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface TokenBucketRateLimit {
/**
* 产生令牌的速率(xx 个/秒)
*
* @return
*/
double permitsPerSecond();
}
TokenBucketRateLimitAspect.java
package com.summo.demo.config.limitstrategy.tokenbucket;
import com.google.common.util.concurrent.RateLimiter;
import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Aspect
@Component
@Order(5)
public class TokenBucketRateLimitAspect {
private final ConcurrentHashMap limiters = new ConcurrentHashMap();
@Around("@annotation(com.summo.demo.config.limitstrategy.tokenbucket.TokenBucketRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
TokenBucketRateLimit rateLimit = method.getAnnotation(TokenBucketRateLimit.class);
double permitsPerSecond = rateLimit.permitsPerSecond();
String methodName = method.toString();
RateLimiter rateLimiter = limiters.computeIfAbsent(methodName, k -> RateLimiter.create(permitsPerSecond));
if (rateLimiter.tryAcquire()) {
return joinPoint.proceed();
} else {
throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
}
}
}
使用方式
/**
* 令牌桶算法限流
*
* @return
*/
@GetMapping("/tokenBucket")
@TokenBucketRateLimit(permitsPerSecond = 0.5)
public ResponseEntity tokenBucket() {
return ResponseEntity.ok("tokenBucket test ok!");
}
(3)原理说明
令牌桶算法是一种流量控制机制,非常适合于处理突发流量,同时保证一定程度的平滑流动。它的工作原理类似于一个实际的水桶,其中水桶代表令牌桶,水流代表令牌。令牌以恒定的速率填充到桶中,直到达到桶的容量上限,多余的> 令牌会被丢弃。当请求(比如网络数据包或者游戏中的攻击动作)到达时,它需要消耗一个令牌才能被处理。如果桶中没有令牌,请求就会被延迟或丢弃,直到桶中再次有令牌为止。
以王者荣耀和LOL中的英雄攻速为例,英雄的攻击动作可以类比为请求,而英雄的攻速属性则确定了令牌生成的速度,即攻击的最大频率。如果英雄的攻击动作必须消耗一个令牌才能执行,那么即使玩家手速再快,也不能超过攻速设定> 的最大限制。这样,英雄的攻击将会保持一个恒定和平滑的节奏,而不会出现一会儿快速连续攻击,一会儿又突然停止的现象。
这种算法的优势在于其能够限制请求的峰值速率,同时允许一定程度的突发请求。在实际应用中,令牌桶算法可以平滑流量,减少拥塞,确保系统的稳定性。相较于仅仅依靠计数器或者滑动窗口算法,令牌桶算法提供了更加灵活和平滑> 的流量控制方式,非常适合需要处理突发性高流量和保持服务质量的场景。
(4)流程图
4. 漏桶
(1)代码
LeakyBucketRateLimit.java
package com.summo.demo.config.limitstrategy.leakybucket;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LeakyBucketRateLimit {
/**
* 桶的容量
*
* @return
*/
int capacity();
/**
* 漏斗的速率,单位通常是秒
*
* @return
*/
int leakRate();
}
LeakyBucketLimiter.java
package com.summo.demo.config.limitstrategy.leakybucket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class LeakyBucketLimiter {
/**
* 桶的容量
*/
private final int capacity;
/**
* 漏桶的漏出速率,单位时间内漏出水的数量
*/
private final int leakRate;
/**
* 当前桶中的水量
*/
private volatile int water = 0;
/**
* 上次漏水的时间
*/
private volatile long lastLeakTime = System.currentTimeMillis();
/**
* 漏桶容器
*/
private static final ConcurrentHashMap LIMITER_MAP = new ConcurrentHashMap();
/**
* 静态工厂方法,确保相同的方法使用相同的漏桶实例
*
* @param methodKey 方法名
* @param capacity
* @param leakRate
* @return
*/
public static LeakyBucketLimiter createLimiter(String methodKey, int capacity, int leakRate) {
return LIMITER_MAP.computeIfAbsent(methodKey, k -> new LeakyBucketLimiter(capacity, leakRate));
}
private LeakyBucketLimiter(int capacity, int leakRate) {
this.capacity = capacity;
this.leakRate = leakRate;
}
/**
* 尝试获取许可(try to acquire a permit),如果获取成功返回true,否则返回false
*
* @return
*/
public boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
synchronized (this) {
// 计算上次漏水到当前时间的时间间隔
long leakDuration = currentTime - lastLeakTime;
// 如果时间间隔大于等于1秒,表示漏桶已经漏出一定数量的水
if (leakDuration >= TimeUnit.SECONDS.toMillis(1)) {
// 计算漏出的水量
long leakQuantity = leakDuration / TimeUnit.SECONDS.toMillis(1) * leakRate;
// 漏桶漏出水后,更新桶中的水量,但不能低于0
water = (int)Math.max(0, water - leakQuantity);
lastLeakTime = currentTime;
}
// 判断桶中的水量是否小于容量,如果是则可以继续添加水(相当于获取到令牌)
if (water
LeakyBucketRateLimitAspect.java
package com.summo.demo.config.limitstrategy.leakybucket;
import java.lang.reflect.Method;
import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Slf4j
@Aspect
@Component
@Order(5)
public class LeakyBucketRateLimitAspect {
@Around("@annotation(com.summo.demo.config.limitstrategy.leakybucket.LeakyBucketRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
LeakyBucketRateLimit leakyBucketRateLimit = method.getAnnotation(LeakyBucketRateLimit.class);
int capacity = leakyBucketRateLimit.capacity();
int leakRate = leakyBucketRateLimit.leakRate();
// 方法签名作为唯一标识
String methodKey = method.toString();
LeakyBucketLimiter limiter = LeakyBucketLimiter.createLimiter(methodKey, capacity, leakRate);
if (!limiter.tryAcquire()) {
// 超过限制,抛出限流异常
throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
}
return joinPoint.proceed();
}
}
使用方式
/**
* 漏桶算法限流
*
* @return
*/
@GetMapping("/leakyBucket")
@LeakyBucketRateLimit(capacity = 100, leakRate = 20)
public ResponseEntity leakyBucket() {
return ResponseEntity.ok("leakyBucket test ok!");
}
(3)原理说明
在Leaky Bucket算法中,容器有一个固定的容量,类似于漏桶的容量。数据以固定的速率进入容器,如果容器满了,则多余的数据会溢出。容器中的数据会以恒定的速率从底部流出,类似于漏桶中的水滴。如果容器中的数据不足以满足流出速率,则会等待直到有足够的数据可供流出。这样就实现了对数据流的平滑控制。
(4)流程图
四、小结一下
如果面试中被问到如何进行接口优化,大家第一印象会想到什么?放到3年前的我来看,第一反应肯定是优化接口性能,然后说一个本来耗时好几秒的接口优化到毫秒的案例。而现在的我会说:上下文、权限控制、防抖/限流等等,当然也会说性能优化。感谢一直在学习的自己!
在写这篇文章之前,我看了很多大佬写的相关文章,才开始动笔,不是看不懂也不是写不来,只是因为我没怎么在真实业务中用到限流策略,我不敢乱写。怕给人埋坑。
回顾前面的6篇,加上这一篇一共7篇,共计耗时3个月,但第6篇到第7篇则整整花了一个月,不是在偷懒,理论型的东西好写,但没有实际业务支持那就是高谈阔论。不过沉淀了一个月,不写点东西我怕马上就忘记了,还是写一篇,拙文共赏吧!如果有哪位同学在真实业务中使用过类似的限流策略,可以和我们分享一下经验哈,谢谢啦!
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net