前言
由于之前这个系统的日志记录是被领导要求写表的,在不影响系统性能的前提下,日志的入库操作肯定是要改成异步进行的,当时利用 ArrayBlockingQueue+线程+AOP 简单的去实现了一下,但是初版代码测试下来发现了一个很严重的问题,就是日志丢失的问题,本文由此而来。
初步构思
代码实现逻辑实现很简单,利用 AOP 切面去记录用户的行为,最终调用对应的 DAO 去入库日志,切面如下。
/**
* zzh:日志切面优先级第一
*/
@Slf4j
@Aspect
@Order(-999)
@Component
public class AspectLog {
@Autowired
private LogInfoService logInfoService;
@Pointcut("execution(public * com.example.oraceldemo.controller..*(..)))")
public void webPointCut() {
}
@Around("webPointCut()")
public Object arround(ProceedingJoinPoint pjp) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
LogInfo logInfo = new LogInfo();
logInfo.setOpTime(sdf.format(System.currentTimeMillis()))
.setIp(request.getRemoteAddr())
.setMethod(request.getRequestURL().toString());
try {
long startTime = System.currentTimeMillis();
Object o = pjp.proceed();
long endTime = System.currentTimeMillis();
StringBuilder sb = new StringBuilder();
sb.append(" ###请求URL: " + request.getRequestURL().toString());
sb.append(" ###IP: " + request.getRemoteAddr());
sb.append(" ###Params: " + Arrays.toString(pjp.getArgs()));
sb.append(" ###CLASS_METHOD: " + pjp.getSignature().getDeclaringTypeName() + "." + pjp.getSignature().getName());
sb.append(" ###耗时: " + (endTime - startTime) + "毫秒");
log.info(sb.toString());
logInfo.setStartTime(sdf.format(startTime))
.setEndTime(sdf.format(endTime))
.setReturnValue(JSONObject.toJSONString(o.toString()));
服务器托管网 logInfoService.put(logInfo);
return o;
} catch (Throwable e) {
e.printStackTrace();
logInfo.setErrors(JSONObject.toJSONString(e));
log.error(e.toString());
logInfoService.put(logInfo);
return null;
}
}
}
DAO层代码
由于之前看过 NACOS 心跳健康监测的源码,感觉代码架构玩的比较 6 ,于是按照他那个设计思路,异步的将日志记录操作抽离出来,在项目启动的时候创建一个日志任务(可用线程池去优化),每当有日志需要记录的时候,就把日志对象堆积到 Notifier 里面,里面由一个阻塞队列去维护。去真正的执行入库日志的逻辑。
/**
*
* 服务实现类
*
*
* @author 张子行
* @since 2022-10-26
*/
@Service
public class LogInfoServiceImpl extends ServiceImpl implements LogInfoService {
private notifier notifier = new Notifier();
@Override
public void put(LogInfo logInfo) {
notifier.addTask(logInfo);
}
@PostConstruct
private void init() {
new Thread(notifier).start();
}
@Override
public boolean save(LogInfo entity) {
return super.save(entity);
}
}
ArrayBlockingQueue发生堆积的地方
Notifier 中的 addTask 方法源源不断的提供需要写入的日志,当阻塞队列不为空的时候就会被唤醒,取出队列中的日志进行 db 插入操作。
@Slf4j
public class Notifier implements Runnable {
private BlockingQueue tasks = new ArrayBlockingQueue(1024 * 1024);
public void addTask(LogInfo logInfo) {
log.info("Notifier tasks.add() begin,{}", logInfo.getIp());
tasks.add(logInfo);
log.info("Notifier tasks.add() end,{}", logInfo.getIp());
}
/**
* Runnable不能抛出异常,抛出异常线程无法执行!因此需要自己内部消化
*/
@Override
public void run() {
for (; ; ) {
try {
log.info("BlockingQueue tasks take begin...");
LogInfo logInfo = tasks.take();
log.info("BlockingQueue tasks take end...{}"+logInfo.toString());
LogInfoServiceImpl logInfoService = WebUtil.getBean(LogInfoServiceImpl.class);
logInfoService.save(logInfo);
} catch (InterruptedException e) {
log.error("Notifier BlockingQueue tasks take InterruptedException, Thread interrupt status{}" + Thread.currentThread().isInterrupted());
} catch (Exception e) {
log.error("Notifier logInfoService save exceptionType :{},msg: {}", e.getClass(), e.getMessage());
}
}
}
}
初版的代码如下
只对31行代码做了异常catch,而 33-34 行可能会出现一些 SQL 异常,但是之前我并未做异常catch 处理,然后 当 33-34 行出现异常的时候直接抛出到了 Runable 中去了,导致此线程运服务器托管网行终结,(这也是 Runable的一大特性了 )。导致消息没被正常消费,于是乎 ArrayBlockingQueue 中的消息越来越多,就有问题了。解决办法加一层 catch 捕获所有类型的异常,也就是在 Runable 中吞掉异常,或者手动捕获 33-34 行的异常,设置线程的中断状态,让其他业务感知到此线程被中断执行了,自定义处理逻辑。等等
问题排查思路
起初我还以为是 InterruptedException 导致的线程中断,于是乎去看了一下 ArrayBlockingQueue 中的 put 与 take 方法的源码,也算有点收获,接下来也分析一下吧。记录于下文的面试专栏中。
面试专栏
为什么当 BlockingQueue 中没有消息的时候 take 方法会阻塞当前线程?
利用了 ReentrantLock 中的条件锁(Condition)实现的, ArrayBlockingQueue 里面维护了俩个 Condition 分别是 notEmpty 与 notFull ,当 ArrayBlockingQueue 阻塞队列进行 take 的时候,且队列中没有元素的时候会进行调用 notEmpty.await(); 方法阻塞当前线程,当 ArrayBlockingQueue 阻塞队列进行 add 的时候,会去调用 notEmpty.signal(); 去唤醒当前线程。加元素的时候去唤醒线程,取元素且元素数量为0的时候会去阻塞当前线程。这也是为什么 ArrayBlockingQueue 被称之为阻塞队列的原因。对应回答的源码剖析如下。
ArrayBlockingQueue take核心源码,注意里面用到的是 ReentrantLock 中的 lockInterruptibly 方法(加锁可中断),当线程状态为中断状态时候,lock.lockInterruptibly() 这行代码就会抛出 InterruptedException 异常,具体的 ReentrantLock 为可中断锁的源码可以去看 浅聊一下,可中断锁(ReentrantLock),这也是为什么调用 take 方法的时候必须要去捕获一下 InterruptedException 异常的原因了。
ArrayBlockingQueue add里面的核心代码,里面调用了 notEmpty.signal(); 这行代码,去唤醒当前线程。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net