目录
基本实现
编辑防死锁
防误删
使用lua保证删除原子性
可重入锁
加锁脚本
解锁脚本
代码实现
使用及测试
自动续期
总结
基本实现
借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发 送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。
- 1. 多个客户端同时获取锁(setnx)
- 2. 获取成功,执行业务逻辑,执行完成释放锁(del)
- 3. 其他客户端等待重试
改造StockService方法:
@Service
public class StockService {
@Autowired
private StockMapper stockMapper;
@Autowired
private LockMapper lockMapper;
@Autowired
private StringRedisTemplate redisTemplate;
public void checkAndLock() {
// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock",
"xxx")){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 先查询库存是否充足
Stock stock = this.stockMapper.selectById(1L);
// 再减库存
if (stock != null && stock.getCount() > 0){
stock.setCount(stock.getCount() - 1);
this.stockMapper.updateById(stock);
}
// 释放锁
this.redisTemplate.delete("lock");
}
}
其中,加锁:
// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx")){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
解锁:
// 释放锁
this.redisTemplate.delete("lock");
使用Jmeter压力测试如下:
查看mysql数据库:
防死锁
解决:给锁设置过期时间,自动释放锁。 设置过期时间两种方式:
1. 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
2. 使用set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)
压力测试肯定也没有问题。
问题:可能会释放其他服务器的锁。 场景:如果业务逻辑的执行时间是7s。执行流程如下
1. index1业务逻辑没执行完,3秒后锁被自动释放。
2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
3. index3获取到锁,执行业务逻辑
4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只 执行1s就被别人释放。 最终等于没锁的情况。
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的 锁
防误删
实现如下:
问题:删除操作缺乏原子性。 场景:
1. index1执行删除时,查询到的lock值确实和uuid相等
2. index1执行删除前,lock刚好过期时间已到,被redis自动释放
3. index2获取了lock 4. index1执行删除,此时会把index2的lock删除
解决方案:没有一个命令可以同时做到判断 + 删除,所有只能通过其他方式实现(LUA脚本)
使用lua保证删除原子性
删除LUA脚本:
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',
KEYS[1]) else return 0 end
代码实现:
public void checkAndLock() {
// 加锁,获取锁失败重试
String uuid = UUID.randomUUID().toString();
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3,
TimeUnit.SECONDS)){
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 先查询库存是否充足
Stock stock = this.stockMapper.selectById(1L);
// 再减库存
if (stock != null && stock.getCount() > 0){
stock.setCount(stock.getCount() - 1);
this.stockMapper.updateById(stock);
}
// 释放锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return
redis.call('del', KEYS[1]) else return 0 end";
this.redisTemplate.execute(new DefaultRedisScript(script,
Long.class), Arrays.asList("lock"), uuid);
}
压力测试:
可重入锁
由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代 码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继 续往下执行。
用一段 Java 代码解释可重入:
public synchronized void a() {
b();
}
public synchronized void b() {
// pass
}
假设 X 线程在 a 方法获取锁之后,继续执行 b 方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。
锁明明是被 X 线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己~
可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加 1,然后再执行方法逻辑。退出加锁方法之后,加锁次数再减 1,当加锁次数为 0 时,锁才被真正的释 放。 可以看到可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。
解决方案:redis + Hash
加锁脚本
Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的 锁的重入次数,然后利用 lua 脚本判断逻辑。
if (redis.call('exists', KEYS[1]) == 0 or
redis.call('hexists', KEYS[1], ARGV[1]) == 1)
then
redis.call('hincrby', KEYS[1], ARGV[1], 1);
redis.call('expire', KEYS[1], ARGV[2]);
return 1;
else
return 0;
end
假设值为:KEYS:[lock], ARGV[uuid, expire]如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次 数加1。
解锁脚本
-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return nil;
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then
return 0;
else
redis.call('del', KEYS[1]);
return 1;
end;
这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:
1 代表解锁成功,锁被释放
0 代表可重入次数被减 1
null 代表其他线程尝试解锁,解锁失败
代码实现
由于加解锁代码量相对较多,这里可以封装成一个工具类:
具体实现:
public class RedisDistributeLock{
private StringRedisTemplate redisTemplate;
//线程局部变量,可以在线程内共享参数
private String lockName;
private String static uuid;
private Integer expire = 30;
private static final ThreadLocal THREAD_LOCAL = new ThreadLocal();
public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName) {
this.redisTemplate = redisTemplate;
this.lockName = lockName;
this.uuid = THREAD_LOCAL.get();
if (StringUtils.isBlank(uuid)) {
this.uuid = UUID.randomUUID().toString();
THREAD_LOCAL.set(uuid);
}
}
public void lock() {
this.lock(expire);
}
public void lock(Integer expire) {
this.expire = expire;
String script = "if (redis.call('exists', KEYS[1]) == 0 or" +
"redis.call('hexists', KEYS[1], ARGV[1]) == 1)" +
" then" +
" redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
" redis.call('expire', KEYS[1], ARGV[2]);" +
" return 1;" +
"else" +
" return 0;" +
" end";
if (!this.redisTemplate.execute(new DefaultRedisScript(script, Boolean.class),
Arrays.asList(lockName), uuid, expire.toString())) {
try {
Thread.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
//没有获取到锁重试
lock(expire);
}
}
public void unlock() {
String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then" +
" return nil; " +
"elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then" +
" return 0; " +
"else" +
" redis.call('del', KEYS[1]);" +
" return 1;" +
"end;";
//如果返回值没有使用Boolean,Spring-data-redis 进行类型转换时将会把 null
//转为 false,这就会影响我们逻辑判断
//所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功。
Long result = this.redisTemplate.execute(new DefaultRedisScript
(script, Long.class), Arrays.asList(lockName), uuid);
// 如果未返回值,代表尝试解其他线程的锁
if (result == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);
} else if (result == 1) {
THREAD_LOCAL.remove();
}
}
}
使用及测试
在业务代码中使用:
public void checkAndLock() {
// 加锁,获取锁失败重试
RedisDistributeLock lock = new RedisDistributeLock(this.redisTemplate,
"lock");
lock.lock();
// 先查询库存是否充足
Stock stock = this.stockMapper.selectById(1L);
// 再减库存
if (stock != null && stock.getCount() > 0){
stock.setCount(stock.getCount() - 1);
this.stockMapper.updateById(stock);
}
// this.testSubLock();
// 释放锁
lock.unlock();
}
测试:
测试可重入性:
自动续期
lua脚本:
if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then
redis.call('expire', KEYS[1], ARGV[2]);
return 1;
else
return 0;
end
在RedisDistributeLock中添加renewExpire方法:
private static final Timer TIMER = new Timer();
/**
* 开启定时器,自动续期
*/
private void renewExpire() {
String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then " +
"redis.call('expire', KEYS[1], ARGV[2]); " +
"return 1; " +
"else " +
"return 0; end";
TIMER.schedule(new TimerTask() {
@Override
public void run() {
//如果uuid为空,则终止定时任务
if (StringUtils.isNotBlank(uuid)) {
redisTemplate.execute(new DefaultRedisScript(script, Boolean.class),
Arrays.asList(lockName), RedisDistributeLock.this.uuid,
expire.toString());
renewExpire();
}
}
},expire * 1000 / 3);
}
在lock方法中使用:
在unlock方法中添加红框中的代码:
总结
特征:
1.独占排他:setnx
2.防死锁:
redis客户端程序获取到锁之后,立马宕机。给锁添加过期时间
不可重入:可重入
3.防误删:
先判断是否自己的锁才能删除
4.原子性:
加锁和过期时间之间
判断和释放锁之间
5.可重入性:hash + lua脚本
6.自动续期:Timer定时器 + lua脚本
锁操作:
1.加锁:
1.setnx:独占排他 死锁、不可重入、原子性
2.set k v ex 30 nx:独占排他、死锁 不可重入
3.hash + lua脚本:可重入锁
1.判断锁是否被占用(exists),如果没有被占用则直接获取锁(hset/hincrby)并设置过期时间(expire)
2.如果锁被占用,则判断是否当前线程占用的,如果是则重入(hincrby)并重置过期时间(expire)
3.否则获取锁失败,将来代码中重试
4.Timer定时器 + lua脚本:实现锁的自动续期
2.解锁
1.del:导致误删
2.先判断再删除同时保证原子性:lua脚本
3.hash + lua脚本:可重入
1.判断当前线程的锁是否存在,不存在则返回nil,将来抛出异常
2.存在则直接减1(hincrby -1),判断减1后的值是否为0,为0则释放锁(del),并返回1
3.不为0,则返回0
3.重试:递归 循环
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net