bin-log-distributor消费数据丢失问题解决记录
bin-log-distributor项目简介
bin-log-distributor是凯京科技开源的Mysql数据库数据变动实时监听分发中间件,详情见码云开源地址, github开源地址
背景
线上反馈有bin-log-distributor客户端偶尔有丢失数据的情况。
验证方法
在装mysql的服务器上,使用多个线程并发循环对bin-log-distributor监控的数据表进行增、删、改。
预期
bin-log-distributor客户端出现数据丢失。
定位
分析客户端收到的数据,发现不仅有数据丢失,还有数据重复消费。
将客户端日志打印更仔细,发现以下两点问题:
- 从redis队列peek数据后,remove抛出如下异常:
- Redis队列数据没消费完其他线程异常进行了消费:
分析如下消费redis数据方法源码:
private void doRunWithLock() {
RLock rLock = redissonClient.getLock(dataKeyLock);
EventBaseDTO dto;
try {
RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
// 尝试加锁,最多等待20秒,上锁以后30秒自动解锁
boolean lockRes = rLock.tryLock(20 * 1000, 3 * retryInterval,TimeUnit.MILLISECONDS);
//拿到锁且 队列不为空 进入
if (lockRes && !queue.isEmpty()) {
//让这个线程把队列里的全部处理完吧
rLock.lock();
//DATA_KEY_IN_PROCESS.add(dataKey);
while ((dto = queue.peek()) != null) {
//处理完毕,把数据从队列摘除
boolean handleRes = doHandleWithLock(dto, 0);
if (handleRes) {
queue.remove();
}
}
}
} catch (Exception e) {
e.printStackTrace();
log.severe("接收处理数据失败:" + e.toString());
} finally {
rLock.forceUnlock();
rLock.delete();
//DATA_KEY_IN_PROCESS.remove(dataKey);
}
}
猜测是redis RLock并没有效,于是在rLock.tryLock() 和 rLock.delete() 方法后面分别打印出 acquireLock 和 releaseLock的日志,再次执行测试,得到如下日志:
发现 pool-2-thread-1线程成功获取锁后,pool-2-thread-2获取锁失败,并立刻将pool-2-thread-1 持有的锁release了。结合源码发现pool-2-thread-2获取锁失败后,执行了finally代码块中的rLock.forceUnlock()方法,查阅资料发现forceUnlock()方法可以直接释放相同key的其他线程持有的锁。
至此,定位到问题为:每次收到数据起一个线程执行doRunWithLock()方法,当数据库操作较频繁时,一个线程获取到的redis锁会被另一个线程release掉,导致redis锁内部本来应该是单例执行的消费redis队列数据变成了多个线程同时执行,从而出现多个线程消费队列数据冲突,引发数据丢失和重复消费的问题。
解决
确保每个线程获取到的Redis锁只能由当前线程release(),更改后的代码如下:
private void doRunWithLock() {
RLock rLock = redissonClient.getLock(dataKeyLock);
EventBaseDTO dto;
boolean lockRes = false;
try {
// 尝试加锁,最多等待50ms(防止过多线程等待)
// 上锁以后6个小时自动解锁(防止redis队列太长,当前拿到锁的线程处理时间过长)
lockRes = rLock.tryLock(50, 6 * 3600 * 1000, TimeUnit.MILLISECONDS);
if (!lockRes) {
return;
}
DATA_KEY_IN_PROCESS.add(dataKey);
//拿到锁之后再获取队列
RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
if (!queue.isExists() || queue.isEmpty()) {
return;
}
//拿到锁且 队列不为空 进入
while ((dto = queue.peek()) != null) {
//处理完毕,把数据从队列摘除
boolean handleRes = doHandleWithLock(dto, 0);
if (handleRes) {
queue.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
log.severe("接收处理数据失败:" + e.toString());
} finally {
//forceUnlock是可以释放别的线程拿到的锁的,需要判断是否是当前线程持有的锁
if (lockRes) {
rLock.forceUnlock();
rLock.delete();
DATA_KEY_IN_PROCESS.remove(dataKey);
}
}
}
回归
更改后执行继续执行相同的测试用例,得到如下结果日志:
pool-2-thread-2在获取到redis锁之后一直持续消费redis队列中的数据,其他线程尝试获取redis锁失败后,不再强制release该锁,客户端消费条数、类型结果正确。
完成
修改测试用例,使用更多线程、执行更多数据库操作,分析消费结果,结果数据条数正确,未出现数据丢失、重复消费等问题。
确认结果回归结果正确,提交代码。
作者简介
陈精华,2018年8月加入凯京科技。任职java开发工程师,参与凯京科技统一支付平台、结算平台设计和开发,负责凯京科技开源中间件开发与维护。
欢迎加入凯京开源技术QQ群:613025121,和我们一起交流互联网应用的技术架构落地实践
关于架构&运维部
凯京研发中心架构&运维部的工作主要分两大部分,架构部分主要负责框架中间件的研究,如dubbo、apollo、skywalking、xxljob、分布式事务等、公司内开源项目以及公共服务公共组件的研发维护、新技术的引进以及落地等。运维部分主要负责devops系统研发以及k8s容器环境的维护等工作。
架构组招聘
目前架构组还有一个虚位以待,欢迎志同道合的你来和我们一起交流。简历可发送至邮箱:
chenkailing@keking.cn