yield-bytes

沉淀、分享与无限进步

基于redis实现分布式锁(多实例redis+RedLock算法)

  在前面的文章中,已经实现单实例redis分布式锁,但这种实现是基于单个redis服务,若redis服务不可用,显然所有客户端无法加锁,该实现还未到高可用的水平,因此需要进一步提升分布式的锁的逻辑,好在redis官方提供了相应的权威描述并称之为Redlock,具体参考文章:DLM,这个锁的算法实现了多redis实例(各个redis是相互独立的,没有主从、集群模式)的情况,实现了真正高可用分布式锁。

高可用的分布式锁要求:

1)Mutual exclusion,互斥性,任何时刻只能有一个client获取锁

2)Deadlock free,死锁也必须能释放,即使锁定资源的redis服务器崩溃或者分区,仍然能释放锁

3)Fault tolerance;只要多数互相独立的redis节点,这里不是指主从模式两个节点或者集群模式,(一半以上)在使用,client才可以进行加锁,而且加锁成功的redis服务数量超过半数,且锁的有效时长还未过期,才认为加锁成功,否则加锁失败

Redlock算法说明

  首先需理解时钟漂移clock drift概念:服务器时钟偏离绝对参考时钟的差值,例如在分布式系统中,有5台服务器,所有服务器时钟在初始情况下都设置成相同的时间(服务器上没有设置ntp同步)例如都为2019-08-01 10:00:00,随着时间的流逝,例如经过1年后,再“观察”这5台服务器的时间,服务器之间的时间对比,将有可能出现一定的快慢差异:

Server1显示一年后的时间:2020-08-01 10:00:01

Server2显示一年后的时间:2020-08-01 10:00:02

Server3显示一年后的时间:2020-08-01 10:00:02

Server4显示一年后的时间:2020-08-01 09:59:58

Server5显示一年后的时间:2020-08-01 10:00:01

那么由这5台服务器组成的分布式系统,在外侧观察,时钟漂移为=2020-08-01 10:00:02减去2020-08-01 09:59:58=4秒,当然这是累计一年的时钟漂移时长,于是可以计算每秒的时间漂移刻度=4/(3600*24*365),该刻度时长极小完全可以忽略不计,这是redis官方提供这个概念,让分布式锁的redis实现看起来更高级。

redlock加锁流程,假设客户端A按顺序分别在5个完全独立的redis实例作为加锁,如图所示:
在这里插入图片描述

1)客户端A在redis01加锁操作前,获取当前时间戳T1

2)客户端A使用相同的key和uuid按顺序在5个redis上set key加锁和设定键的过期时长(有效时长),因为set key操作需要一定时间,因此在set过期时长时,需要set大于加锁所消耗的时长,否则客户端A还未在超过半数redis实例加锁成功前,前面redis set的key就已经先失效了,

错误设置:TTL为1s,例如客户端A在redis01加锁耗时为0.1秒、在redis02加锁耗时为0.5秒,但在redis03加锁耗时为1秒,此时redis01、redis02的key已失效,导致客户端A没能在超过半数(3个)的redis实例上加锁成功

正确设置:TTL为5s,例如客户端A在redis01加锁耗时为0.1秒、在redis02加锁耗时为0.5秒,redis03加锁耗时为1秒,此时redis01、redis02、redis03 key还未失效,客户端A成功在超过半数(3个)的redis实例上加锁,但此时客户端A还不能严格意义上成功获得了分布式锁,还需要进行第3步骤的判断

3)客户端A完成在多个redis实例上加锁后,此刻,锁真正有效时间不是一开始设置TTL的10秒,而是由以下得出:

在5个redis上加锁完后所消耗的时长:set_lock_cost=T5-T1=4s

实际锁的最小有效时长:min_validity=TTL-set_lock_cost-时钟漂移耗时

实际锁的最小有效时长=10s-4s-1s=5s,也就是说客户端A虽然在redis服务器设置有效时长为10s,但扣除一系列的加锁操作耗时后,“redis服务端”留给客户端A的实际有效时长为5秒。如果客户端A能在这5秒内完成任务,且按顺序释放锁,那么客户端A完成了一个完整流程的分布式锁条件的任务。

4)如果客户端A超时等原因无法获得超过半数(3)个以上,则必须解锁所有redis实例,否则影响其他进程加锁

RedLock代码实现:

  前一篇文章中已经实现的单服务的redis分布式锁,基于该基础上,实现redlock并不复杂,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import time,datetime
import uuid
import random
import redis
import threading


class RedLockException(Exception):
pass


class RedLock(object):
def __init__(self, locker_key, connection_conf_list=None,
retry_times=3,
retry_interval=200,
ttl=5000,
clock_drift=500):
self.locker_key = locker_key
self.retry_times = retry_times
self.retry_interval = retry_interval
self.global_ttl = ttl
self.clock_drift = clock_drift
self.locker_id = None
self.is_get_lock = False
if not connection_conf_list:
connection_conf_list = [{
'host': '192.168.100.5',
'port': 6379,
'db': 0,
'socket_connect_timeout':1
}]

self.all_redis_nodes = [redis.StrictRedis(**each_conf) for each_conf in connection_conf_list]
self.majority_nodes = len(self.all_redis_nodes) // 2 + 1

def _release_single_lock(self, node):
"""
在redis服务端执行原生lua脚本,只能删除加锁者自己的id,而且是原子删除
:return:
"""
lua_script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
try:
lua_func = node.register_script(lua_script)
lua_func(keys=[self.locker_key], args=[self.locker_id])
except(redis.exceptions.ConnectionError, redis.exceptions.TimeoutError):
pass

def _acquire_single_lock(self, node):
"""
在单个redis加锁
:param node:
:return:
"""
try:
result = node.set(self.locker_key, self.locker_id, nx=True, px=self.global_ttl)
return result
except(redis.exceptions.ConnectionError, redis.exceptions.TimeoutError):
return False

def _acquire(self):

self.locker_id = str(uuid.uuid1())
loop = 1
while loop <= self.retry_times:
# 这里需要注意:多线程并发模拟过程中,需要在任务执行前加锁,否则线程不安全
ok_lock_count = 0
start = time.monotonic()
# 按顺序在每个redis上尝试set key 加锁
for node in self.all_redis_nodes:
if self._acquire_single_lock(node):
print('{}:成功加锁'.format(node))
ok_lock_count += 1

end = time.monotonic()

# 在多个redis实例上加锁所消耗的时长
set_lock_cost = (end - start)
# 扣除相关操作耗时,得出实际锁的有效时长
real_ttl = self.global_ttl - set_lock_cost - self.clock_drift
print('本次加锁耗时:{0:,.4f} ms 锁实际有效时长:{1:,.4f} ms'.format(set_lock_cost,real_ttl))

# 如果加锁数量超过半数,且实际锁的有效时长大于0,则说明客户端本次成功获得分布式锁
if ok_lock_count >= self.majority_nodes and real_ttl > 0:
return True, real_ttl
else:
# 客户端本次未能获得分布式锁,需释放本次申请的所有锁
if real_ttl <= 0:
print('客户端加锁失败,因为锁的实际有效时间太短')
else:
print('客户端加锁失败,因为成功加锁的redis实例少于总数的一半')
for node in self.all_redis_nodes:
self._release_single_lock(node)

# 随机休眠后,客户端继续下一轮加锁
loop += 1
time.sleep(random.randint(0, self.retry_interval) / 1000)
print('超过{}次加锁失败'.format(self.retry_times))
return False,0

def acquire(self):
is_lock, validity = self._acquire()
return is_lock

def acquire_with_validity(self):
"""
:return: 返回加锁是否成功和锁的有效期
"""
is_lock, validity = self._acquire()
return is_lock, validity

def release(self):
for node in self.all_redis_nodes:
self._release_single_lock(node)

def __enter__(self):
is_lock, validity = self._acquire()
if not is_lock:
return False
# raise RedLockException('unable to acquire distributed lock')
return is_lock, validity

def __exit__(self, exc_type, exc_val, exc_tb):
return self.release()


def doing_jobs(r):
with RedLock('locker_test'):
thread_name = threading.currentThread().name
bonus = 'money'
total = r.get(bonus)
if not total:
print('奖金池没设置')
return
if int(total) == 0:
print('奖金已被抢完'.format(thread_name))
return
result = r.decr(bonus, 1)
print('客户端:{0}抢到奖金,还剩{1},时间:{2}'.format(thread_name, result,datetime.datetime.now()))


if __name__=='__main__':

start_time=time.monotonic()
thread_nums=100
pool_obj = redis.ConnectionPool(host='192.168.100.5', port=8002, socket_connect_timeout=5)
r_conn = redis.Redis(connection_pool=pool_obj)

threads = []
for _ in range(thread_nums):
t = threading.Thread(target=doing_jobs, args=(r_conn,))
threads.append(t)

for t in threads:
t.start()

for t in threads:
t.join()

cost=time.monotonic()-start_time
print('任务耗时:{:,.2f} ms'.format(cost))

在单个redis下测试redlock

1)单redis,100个并发请求

手动在redis单服务set值,测试客户端发来的100个并发抢资源时,基于redlock的分布式锁是否逻辑正确,

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> set money 300
OK

# 运行结果
客户端:Thread-100抢到奖金,还剩299,时间:*** 22:14:40.358679
客户端:Thread-9抢到奖金,还剩298,时间:*** 22:14:40.363933
客户端:Thread-36抢到奖金,还剩297,时间:*** 22:14:40.371695

客户端:Thread-16抢到奖金,还剩202,时间:*** 22:14:40.808868
客户端:Thread-34抢到奖金,还剩201,时间:*** 22:14:40.811364
客户端:Thread-52抢到奖金,还剩200,时间:*** 22:14:40.817055

可以看到,在同一秒内,100个线程都有序的抢到锁和资源

2)5个redis实例,1个并发请求

在5个独立redis实例下验证redlock分布式锁有效性(这里的5个实例是在同一服务器下开启,模拟5台redis服务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@dn2 redis-5.0.5]# pwd
/opt/redis/redis-5.0.5
# 在redis目录下直接拷贝redis.conf,重命名,且只需修改里面的端口项即可,这里端口为8000~8004
redis8001.conf redis8002.conf redis8003.conf redis8004.conf

# 逐个启动redis实例
[root@dn2 redis-5.0.5]# redis-server redis8001.conf

[root@dn2 redis-5.0.5]# ps -ef|grep redis
root 30321 1 0 20:58 ? 00:00:00 redis-server *:8000
root 30326 1 0 20:58 ? 00:00:00 redis-server *:8001
root 30372 1 0 21:02 ? 00:00:00 redis-server *:8002
root 30381 1 0 21:03 ? 00:00:00 redis-server *:8003
root 30386 1 0 21:03 ? 00:00:00 redis-server *:8004

# 登录其中一个实例set key
[root@dn2 redis-5.0.5]# redis-cli -p 8002
127.0.0.1:8002> set foo 1
OK
127.0.0.1:8002> get foo
"1"

只有一个并发的条件下,客户端在5个实例加锁情况,在8002实例上加入资源:

以上代码小改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 加入5个redis实例连接配置
def doing_jobs(r):
redis_nodes_conf=[
{'host':'192.168.100.5','port':8000},
{'host': '192.168.100.5', 'port': 8001},
{'host': '192.168.100.5', 'port': 8002},
{'host': '192.168.100.5', 'port': 8003},
{'host': '192.168.100.5', 'port': 8004},
]
with RedLock(locker_key='Redlock',connection_conf_list=redis_nodes_conf):
thread_name = threading.currentThread().name
bonus = 'money'
total = r.get(bonus)

if not total:
print('奖金池没设置')
return

if int(total) == 0:
print('奖金已被抢完'.format(thread_name))
return

result = r.decr(bonus, 1)
print('客户端:{0}抢到奖金,还剩{1},时间:{2}'.format(thread_name, result,datetime.datetime.now()))

在其中一个redis实例加入资源

1
2
3
[root@dn2 redis-5.0.5]# redis-cli -p 8002
127.0.0.1:8002> set money 10
OK

可以看到,客户端首先在五个实例上按顺序加锁,执行任务,获得1个资源完成任务后,接着再顺序释放锁,其中加锁耗时0.01ms,锁实际有效时长:4,499.99 ms,任务耗时0.02ms,说明锁的实际有效时长足够大,以至于可以保证任务执行过程中,保持锁不失效。

1
2
3
4
5
6
7
8
9
10
11
12
13
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8000,db=0>>>:成功加锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8001,db=0>>>:成功加锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8002,db=0>>>:成功加锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8003,db=0>>>:成功加锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8004,db=0>>>:成功加锁
本次加锁耗时:0.01 ms 锁实际有效时长:4,499.99 ms
客户端:Thread-1抢到奖金,还剩9,时间:*** 22:10:46.490385
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8000,db=0>>>:成功释放锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8001,db=0>>>:成功释放锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8002,db=0>>>:成功释放锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8003,db=0>>>:成功释放锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8004,db=0>>>:成功释放锁
任务耗时:0.02 ms

3)redis实例工作数量小于半数,1个并发请求

只有一个并发的条件下,客户端在小于3个实例加锁情况,只需把8000、8001、8002端口改掉,模拟只有两个redis实例正常服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8003,db=0>>>:成功加锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8004,db=0>>>:成功加锁
本次加锁耗时:0.0670 ms 锁实际有效时长:4,499.9330 ms
客户端加锁失败,因为成功加锁的redis实例少于总数的一半
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8003,db=0>>>:成功加锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8004,db=0>>>:成功加锁
本次加锁耗时:0.0728 ms 锁实际有效时长:4,499.9272 ms
客户端加锁失败,因为成功加锁的redis实例少于总数的一半
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8003,db=0>>>:成功加锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8004,db=0>>>:成功加锁
本次加锁耗时:0.0548 ms 锁实际有效时长:4,499.9452 ms
客户端加锁失败,因为成功加锁的redis实例少于总数的一半
超过3次加锁失败
Redis<ConnectionPool<Connection<host=192.168.100.5,port=80000,db=0>>>:成功释放锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=80001,db=0>>>:成功释放锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=80002,db=0>>>:成功释放锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8003,db=0>>>:成功释放锁
Redis<ConnectionPool<Connection<host=192.168.100.5,port=8004,db=0>>>:成功释放锁
任务耗时:0.53 ms

  可以看到,客户端尝试3次加锁,在给定的5个redis实例里仅能成功加锁2个,少于半数,故本次分布式加锁失败。当然也可以模拟把锁的ttl设置小值,例如500ms,那么将出现即使加完锁,因为锁的有实效时长太短,导致无法最终得到分布式锁,这里不在模拟。

支持多线程的redlock算法

  以上未模拟1个线程并发,但其实现不支持多线程,如果要模拟多个并发例如:100个并发,因为在同一进程里,涉及到对多个线程同一时刻更改ok_lock_count的值,因此,在执行任务前,就需要出传入线程锁,保证同一时刻,仅有一个线程更新这个ok_lock_count(本线程在多个redis实例上成功set key 的计数)

任务执行代码小改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

def doing_jobs(r,thread_lock):
redis_nodes_conf=[
{'host':'192.168.100.5','port':8000},
{'host': '192.168.100.5', 'port': 8001},
{'host': '192.168.100.5', 'port': 8002},
{'host': '192.168.100.5', 'port': 8003},
{'host': '192.168.100.5', 'port': 8004},
]
# 这里的多线程锁是为了处理"模拟并发情况下",对ok_lock_count变量进行更新时,保证同一时间只能有一个线程来操作

with thread_lock:
with RedLock(locker_key='Redlock',connection_conf_list=redis_nodes_conf) as (is_lock,validity):
if not is_lock:
return
thread_name = threading.currentThread().name
bonus = 'money'
total = r.get(bonus)

if not total:
print('奖金池没设置')
return

if int(total) == 0:
print('奖金已被抢完'.format(thread_name))
return

result = r.decr(bonus, 1)
print('客户端:{0}抢到奖金,还剩{1},时间:{2}'.format(thread_name, result,datetime.datetime.now()))


if __name__=='__main__':
start_time=time.monotonic()
thread_nums=100
pool_obj = redis.ConnectionPool(host='192.168.100.5', port=8002, socket_connect_timeout=5)
r_conn = redis.Redis(connection_pool=pool_obj)
thread_lock=threading.RLock()
threads = []
for _ in range(thread_nums):
t = threading.Thread(target=doing_jobs, args=(r_conn,thread_lock))
threads.append(t)

for t in threads:
t.start()

for t in threads:
t.join()

cost=time.monotonic()-start_time
print('任务耗时:{:,.2f} ms'.format(cost))

  以上完成redlock完整的分析、实现和测试,现在回看redlock的实现,它提出的所谓加锁耗时、时钟漂移等,都可以用最简单的方式代替:只需要把key的ttl设置足够长的时间,那么就无需担心在加锁过程中key突然失效。

  综上,个人认为redis实现分布式锁的过程过于繁琐(注意不是复杂),而且要求redis实例之间是独立运行,反正我个人不会在项目中使用这种逻辑,因此Zookeeper在分布式锁方面的可用性,无疑是最优的。