什么是分布式锁

分布式锁是实现分布式系统之间共享资源的一种方式

alt text

Redis实现分布式锁的要点

加锁:

1
2
3
4
5
set lock_key owner nx px n 
/*
不能用setnx,因为setnx不能带过期参数
px n 表示设置过期时间是n秒
*/

解锁:

先判断owner是否为加锁客户端,是的话才能将lock_key删除

采用lua脚本让两个操作变成一个原子操作:

1
2
3
4
5
6
// 先判断owner是否为加锁客户端
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1]) // 将lock_key删除
else
return 0
end

为什么需要owner

存在服务A释放掉服务B的锁的可能:

比如服务A获取了锁,由于业务流程比较长,耗时久,导致锁过期。这时候服务B获取了锁,准备去执行,这个时候服务A恢复了过来并做完了业务,就会释放锁,但是业务B还在执行。

lua一定能保证原子性?

lua本身不具有原子性,上面提到的用lua脚本保证原子性是因为Redis是单线程的,一个流程放进lua来执行,相当于是打包在一起,Redis执行他的流程不会被其他请求打断,所以保证了原子性

Redis分布式锁优缺点

优点:

  • 性能高效:选择缓存实现
  • 实现方便:Redis提供了setnx方法
  • 避免单点故障:Redis是跨集群部署

缺点:

  • 超时时间不好设置
  • Redis主从复制的数据是异步复制的,这样导致分布式锁不可靠:Redis主节点获取到锁后,没有同步到其他节点,在主节点宕机后,此时新的节点依然可以获取到锁,所以多个应用服务获取到了锁

Redis分布式锁的超时时间怎么设置

基于续约的方式设置超时时间。先给锁设置一个超时时间,然后启动一个守护线程,让守护线程在一段时间后,重新设置这个锁的超时时间, 比如Redisson的看门狗机制。
当然也会设置一个最大续约次数,避免因为服务异常导致无限续约,锁得不到释放

Redisson的看门狗机制

看门狗机制是Redission提供的一种自动延期机制,这个机制使得Redission提供的分布式锁是可以自动续期的

1
private long lockWatchdogTimeout = 30 * 1000; //看门狗机制提供的默认超时时间是30*1000毫秒,也就是30秒
1
2
3
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
  • 在Redis中,锁的waiiTime表示等待获取锁的时间,而leaseTime表示锁的持有时间。 当一个线程或进程尝试获取锁时,如果锁已被其他线程或进程持有,则会等待一段时间(waitTime)后再次尝试获取锁。
  • 如果在这段时间内锁被释放,则当前线程或进程可以成功获取锁,否则需要等待下一次尝试。 一旦锁被某个线程或进程获取成功,该线程或进程拥有锁的持有权,持有时间为leaseTime

看门狗流程:

  1. 在获取锁的时候,不能指定leaseTime或者只能将leaseTime设置为-1,这样才能开启看门狗机制。
  2. 在tryLockInnerAsync方法里尝试获取锁,如果获取锁成功调用scheduleExpirationRenewal执行看门狗机制
  3. 在scheduleExpirationRenewal中比较重要的方法就是renewExpiration,当线程第一次获取到锁(也就是不是重入的情况),那么就会调用renewExpiration方法开启看门狗机制
  4. 在renewExpiration会为当前锁添加一个延迟任务task,这个延迟任务会在10s后执行,执行的任务就是将锁的有效期刷新为30s(这是看门狗机制的默认锁释放时间)
  5. 并且在任务最后还会继续递归调用renewExpiration

看门狗的作用,考虑以下三种情况:

  • 如果没有设置锁的过期时间,单靠逻辑来释放锁,就会出现获取锁的节点宕机时,锁没有释放,造成死锁
  • 如果设置了某个过期时间,在没有宕机的情况下,线程发生了阻塞,就会导致锁过期自动释放,带来一些其他的问题
  • 如果设置了看门狗,在没有宕机时,如果发生了阻塞,那么看门狗就能一直给线程续时间;如果宕机了,看门狗不起作用,过了有效期之后就会自动释放掉锁,不会造成死锁

底层源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//如果获取锁失败,返回的结果是这个key的剩余有效期
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//上面获取锁回调成功之后,执行这代码块的内容
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//不存在异常
if (e == null) {
//剩余有效期为null
if (ttlRemaining == null) {
//这个函数是解决最长等待有效期的问题
this.scheduleExpirationRenewal(threadId);
}

}
});
return ttlRemainingFuture;
}
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 锁不存在,则往redis中设置锁信息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 锁存在
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//这里EntryName是指锁的名称
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
//重入
//将线程ID加入
oldEntry.addThreadId(threadId);
} else {
//将线程ID加入
entry.addThreadId(threadId);
//续约
this.renewExpiration();
}
}


private void renewExpiration() {
//先从map里得到这个ExpirationEntry
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
//这个是一个延迟任务
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
//延迟任务内容
public void run(Timeout timeout) throws Exception {
//拿出ExpirationEntry
ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
//从ExpirationEntry拿出线程ID
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
//调用renewExpirationAsync方法刷新最长等待时间
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
//renewExpirationAsync方法执行成功之后,进行递归调用,调用自己本身函数
//那么就可以实现这样的效果
//首先第一次进行这个函数,设置了一个延迟任务,在10s后执行
//10s后,执行延迟任务的内容,刷新有效期成功,那么就会再新建一个延迟任务,刷新最长等待有效期
//这样这个最长等待时间就会一直续费
RedissonLock.this.renewExpiration();
}

}
});
}
}
}
},
this.internalLockLeaseTime / 3L, //这是锁自动释放时间,因为没传,所以是看门狗时间=30*1000,也就是10s
TimeUnit.MILLISECONDS); //时间单位

ee.setTimeout(task); //给当前ExpirationEntry设置延迟任务
}
}



// 刷新等待时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}

//最后,在释放锁的时候,就会关闭所有的延迟任务
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
//取消锁更新任务
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}

void cancelExpirationRenewal(Long threadId) {
//获得当前这把锁的任务
ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
//当前锁的延迟任务不为空,且线程id不为空
if (threadId != null) {
//先把线程ID去掉
task.removeThreadId(threadId);
}

if (threadId == null || task.hasNoThreads()) {
//然后取出延迟任务
Timeout timeout = task.getTimeout();
if (timeout != null) {
//把延迟任务取消掉
timeout.cancel();
}
//再把ExpirationEntry移除出map
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}
}
}

Redis如何解决集群情况下分布式锁的可靠性

采用Redlock(红锁):让客户端和多个独立的Redis节点依次请求加锁,如果客户端能和半数以上的节点成功完成加锁操作,那么我们任务客户端成功获取到了分布式锁,否则获取失败

流程:

  1. 客户端获取到当前时间T1
  2. 客户端依次向N个Redis节点执行加锁操作,加锁操作使用set命令,带上nx,px和客户端的唯一标识。如果某个节点发生了故障,为了保证Redlock能继续运行,需要给 加锁操作设置一个超时时间(远小于锁的过期时间)
  3. 一旦客户端从超过半数的Redis节点上获取到了锁,就再次获取当前时间T2
  4. 如果T2 - T1 < 锁的过期时间,否则获取失败

可以发现,需要满足两个条件:

  • 客户端从超过半数的Redis节点上获取到了锁
  • 如果T2 - T1 < 锁的过期时间