yield-bytes

沉淀、分享与无限进步

基于Zookeeper的临时顺序节点实现分布式锁

  在前面的文章中,已经给出基于kazoo操作zk的逻辑,接下来利用zk的临时序列节点机制实现分布式锁,分布式锁场景使用非常广,在读写方面都非常适合,个人认为比基于redis实现的分布式锁更具可靠性(但性能方面,redis应该更强?)。

1、zk的临时顺序节点

  临时顺序节点其实是结合临时节点和顺序节点的特点:在某个固定的持久节点(例如/locker)下创建子节点时,zk通过将10位的序列号附加到原始名称来设置znode的路径。例如,使用路径 /locker/foo创建为临时顺序节点,zk会将路径设为为 /locker/foo0000000001 ,并将下一个临时迅速节点设为/locker/foo0000000002,以此类推。如果两个顺序节点是同时创建的,那么zk不会对每个znode使用相同的数字。当创建节点的客户端与zk断开连接时(socket断开,更深一层应该是收到客户端发来的TCP挥手FIN 报文),服务端zk底层收到客户端的FIN报文后将由该客户端创建的临时节点删除掉。
  临时顺序节点结构大致如下,/locker节点为持久节点,该节点下有多个子节点,这些子节点由不同客户端创建。

1
2
3
4
5
6
locker/
├── foo0000000001
├── foo0000000002
├── foo0000000003
├── foo0000000004
└── foo0000000005

2、分布式锁的实现流程

  注意:本文提供的是基于zk的共享锁,而非排他锁(独占锁),看完本文后,实现独占锁会简单很多
  ==共享锁定义:又称读锁。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁,直到该数据对象上的所有共享锁都被释放。zk实现的“共享锁”就是有多个序号的临时节点。==
  共享锁与排他锁的区别在于,加了排他锁之后,数据对象只对当前事务可见,而加了共享锁之后,数据对象对所有事务都可见。

分布式锁流程:
(1) 客户端发起请求,在zk指定持久节点/locker下(若不存在该locker节点则创建),创建临时顺序节点/locker/foo0000000003
(2) 获取/locker下所有子节点,例如有三个不同客户端各自创建的节点`all_nodes=['/locker/foo0000000001','/locker/foo0000000002','/locker/foo0000000003']`
(3) 对子节点按节点自增序号从小到大排序
(4) 判断本节点/locker/foo0000000003是否为节点列表中最小的子节点,若是,则获取锁,处理业务后,删除本节点/locker/foo0000000003;若不是,则监听排在该节点前面的那个节点/locker/foo0000000002“是否存在”事件
注意:这里产生这样的节点监听链,有两个监听链:
`/locker/foo0000000002监听/locker/foo0000000001是否存在的事件`
`/locker/foo0000000003监听/locker/foo0000000002是否存在的事件`
(5) 若被监听的节点路径“是否存在”的事件触发,处理业务,删除本节点;否则客户端阻塞自己,继续等待监听事件触发。
图示说明

流程用OmniGraffle(Mac)画成,比Visio好用
在这里插入图片描述

3、show me the code

  用python实现的zk临时顺序节点分布式锁的文章,在csdn等貌似没看到过,很多文章都是使用别人已经封装好的zklock或者直接使用kazoo提供zklock来做例子说明。

1
2
3
4
# 使用上下文管理协议,对于使用者友好以及简单易用
with ZkDistributedLock(**conf):
# 调用者的业务逻辑代码
doing_jobs(*args,**kwargs)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
import time
import datetime
import threading
from kazoo.client import KazooClient


class ZkDistributedLock(object):
"""
基于zk的临时顺序节点实现分布式锁
"""
def __init__(self, hosts, locker_path, sub_node_name, timeout, default_value=b'1'):
self.hosts = hosts
# 持久节点路径
self.locker_path = locker_path
self.timeout = timeout
# 子节点路径
self.sub_node_path = os.path.join(self.locker_path, sub_node_name)
# 创建子节点为临时顺序节点的默认值(只需要有值就行)
self.default_value = default_value

# 用于客户端自己首次发起请求为获得锁后,用线程的事件阻塞自己不退出,继续等待zk的删除事件通知
# 这比使用while True+time.sleep()方式更优雅
self.thread_event = threading.Event()

# 创建zk连接,若未创建成功,直接raise Kazoo定义的连接错误,这里无需再给出try except的错误。
self.zkc = KazooClient(hosts=self.hosts, timeout=self.timeout)
self.zkc.start(self.timeout)
if not self.zkc.exists(self.locker_path):
self.zkc.create(self.locker_path)

def get_lock(self):
# 这里是直接返回临时顺序节点的完整路径,例如返回:'/locker/foo0000000002'
self.current_node_path = self.zkc.create(path=self.sub_node_path, value=self.default_value, ephemeral=True,
sequence=True)

# 获取固定节点下的所有临时顺序节点列表
all_nodes = self.zkc.get_children(self.locker_path)
# 对临时顺序节点列表进行排序,小到大,kazoo返回是节点名称,不是路径:['foo0000000001','foo0000000002','foo0000000003'....]
all_nodes = sorted(all_nodes)

if len(all_nodes) == 1:
# 如果仅有zk的/locker路径下仅有一个临时顺序节点,说明没有其他客户端争抢,本客户端直接获得锁
d = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('current node {0} got the locker at {1}'.format(self.current_node_path, d))
# 线程阻塞事件被set为True,通知客户端无需再阻塞自己,已经获得锁。
self.thread_event.set()
return

# 获取最小节点名例如'foo0000000001'
min_node = all_nodes[0]
# 拼接最小节点路径:'/locker/foo0000000001'
min_node_path = os.path.join(self.locker_path, min_node)

# 如果自身节点为最小节点,说明获得锁,进行操作后可以是释放锁
if self.current_node_path == min_node_path:
d = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('current node {0} got the locker at {1}'.format(self.current_node_path, d))
self.thread_event.set()
# 线程阻塞事件被set为True,通知客户端无需再阻塞自己,已经获得锁。
else:
# 当前节点不是最小节点,获取当前节点的前面的节点,并对该节点进行路径存在监听(注意这里不是对最小节点监听,避免羊群效应)
current_node = os.path.split(self.current_node_path)[1]
pre_node_index = all_nodes.index(current_node) - 1
pre_node = all_nodes[pre_node_index]

# 获得在当前节点前面的那个节点路径
self.pre_node_path = os.path.join(self.locker_path, pre_node)
print('current node:{0} is watching the pre node:{1}'.format(self.current_node_path, self.pre_node_path))

# 对当前节点前面的那个节点增加"exists事件"监听
self.zkc.exists(path=self.pre_node_path, watch=self.watch_node_is_exist)

def watch_node_is_exist(self, event):
"""当前节点前面的那个节点被删除,触发删除事件,该函数被回调,获得锁

:param event:
:return:
"""
if event:
d = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('current node {0} got the locker at {1}'.format(self.current_node_path, d))
self.thread_event.set()
else:
pass

def release(self):
# 释放锁,通过删除当前子节点路径实现
if self.zkc.exists(self.current_node_path):
d = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('deleted node {0} at {1}'.format(self.current_node_path, d))
self.zkc.delete(self.current_node_path)
self.zkc.stop()
self.zkc.close()

def __enter__(self):
# 客户端首次发起请求锁,线程事件为False
if not self.thread_event.is_set():
# 去zk获取锁
self.get_lock()
# 如果本客户端首次请求锁却未能获得,那么客户端可以阻塞自己不退出,这里没限制重新获取锁的次数
# (也可以设计为retry次数到达前,阻塞自己,超过retry次数后,客户端退出并提示获取锁失败)
self.thread_event.wait()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.release()


def doing_jobs(a, b):
"""
模拟业务处理逻辑
:param a:
:param b:
:return:
"""
c = a + b
print('doing jobs')
time.sleep(5)
print('jobs is done!')
return c


def run():
conf = {
'hosts': '192.168.100.5:2181',
'locker_path': '/locker',
'sub_node_name': 'foo',
'timeout': 5
}

with ZkDistributedLock(**conf):
doing_jobs(1, 2)


if __name__ == '__main__':
run()

4、测试分布式锁运行效果

1)单个客户端请求锁:
单个客户端请求模拟情况较为简单:

2)模拟多个客户端并发请求锁:
启动多个客户端程序前,先手动在zk服务器上创建一个临时顺序节点并保持shell不退出,如下所示,当前已有一个最小节点,以后创建的下一个节点要监听该节点:

1
2
3
4
[zk: localhost:2181(CONNECTED) 5] create -e -s /locker/foo 1  
Created /locker/foo0000000605
[zk: localhost:2181(CONNECTED) 6] ls /locker
[foo0000000602]

接着运行多个以上程序,这里已三个为例:
==以上foo0000000602未删除前==
第一个客户端程序,可以看到第一个客户端创建603临时顺序节点,并监听着602节点并保持运行:

1
current node:/locker/foo0000000603 is watching the pre node:/locker/foo0000000602

第二个客户端程序,603节点因为602节点还未删除所以还存在,因此第二个客户端创建的604临时顺序节点要监听它前面的603节点并保持运行:

1
current node:/locker/foo0000000604 is watching the pre node:/locker/foo0000000603

==foo0000000602删除后(在zk服务器上手动删除602节点后,第一个客户端获得锁马上打印相关操作)==
第一个客户端打印,10:35:36获得锁,业务逻辑耗时5秒,并在10:35:41释放锁:

1
2
3
4
current node /locker/foo0000000603 got the locker at *** 10:35:36
doing jobs
jobs is done!
deleted node /locker/foo0000000603 at *** 10:35:41

第二个客户端打印,在第一个客户端释放锁的时刻10:35:41,第二个客户端同时获得锁,业务逻辑耗时5秒,并在10:35:46释放锁:

1
2
3
4
current node /locker/foo0000000603 got the locker at *** 10:35:41
doing jobs
jobs is done!
deleted node /locker/foo0000000603 at *** 10:35:46