在前面的文章中,已经给出基于kazoo操作zk的逻辑,接下来利用zk的临时序列节点机制实现分布式锁,分布式锁场景使用非常广,在读写方面都非常适合,个人认为比基于redis实现的分布式锁更具可靠性(但性能方面,redis应该更强?)。
1、zk的临时顺序节点 临时顺序节点其实是结合临时节点和顺序节点的特点:在某个固定的持久节点(例如/locker)下创建子节点时,zk通过将10位的序列号附加到原始名称来设置znode的路径。例如,使用路径 /locker/foo创建为临时顺序节点,zk会将路径设为为 /locker/foo0000000001 ,并将下一个临时迅速节点设为/locker/foo0000000002,以此类推。如果两个顺序节点是同时创建的,那么zk不会对每个znode使用相同的数字。当创建节点的客户端与zk断开连接时(socket断开,更深一层应该是收到客户端发来的TCP挥手FIN 报文),服务端zk底层收到客户端的FIN报文后将由该客户端创建的临时节点删除掉。 临时顺序节点结构大致如下,/locker节点为持久节点,该节点下有多个子节点,这些子节点由不同客户端创建。
1 2 3 4 5 6 locker/ ├── foo0000000001 ├── foo0000000002 ├── foo0000000003 ├── foo0000000004 └── foo0000000005
2、分布式锁的实现流程 注意:本文提供的是基于zk的共享锁,而非排他锁(独占锁),看完本文后,实现独占锁会简单很多 ==共享锁定义:又称读锁。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁,直到该数据对象上的所有共享锁都被释放。zk实现的“共享锁”就是有多个序号的临时节点。== 共享锁与排他锁的区别在于,加了排他锁之后,数据对象只对当前事务可见,而加了共享锁之后,数据对象对所有事务都可见。
分布式锁流程: (1) 客户端发起请求,在zk指定持久节点/locker下(若不存在该locker节点则创建),创建临时顺序节点/locker/foo0000000003
(2) 获取/locker下所有子节点,例如有三个不同客户端各自创建的节点`all_nodes=['/locker/foo0000000001','/locker/foo0000000002','/locker/foo0000000003']`
(3) 对子节点按节点自增序号从小到大排序
(4) 判断本节点/locker/foo0000000003是否为节点列表中最小的子节点,若是,则获取锁,处理业务后,删除本节点/locker/foo0000000003;若不是,则监听排在该节点前面的那个节点/locker/foo0000000002“是否存在”事件
注意:这里产生这样的节点监听链,有两个监听链:
`/locker/foo0000000002监听/locker/foo0000000001是否存在的事件`
`/locker/foo0000000003监听/locker/foo0000000002是否存在的事件`
(5) 若被监听的节点路径“是否存在”的事件触发,处理业务,删除本节点;否则客户端阻塞自己,继续等待监听事件触发。
图示说明 流程用OmniGraffle(Mac)画成,比Visio好用
3、show me the code 用python实现的zk临时顺序节点分布式锁的文章,在csdn等貌似没看到过,很多文章都是使用别人已经封装好的zklock或者直接使用kazoo提供zklock来做例子说明。
1 2 3 4 with ZkDistributedLock(**conf): doing_jobs(*args,**kwargs)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 import osimport timeimport datetimeimport threadingfrom kazoo.client import KazooClientclass ZkDistributedLock (object ): """ 基于zk的临时顺序节点实现分布式锁 """ def __init__ (self, hosts, locker_path, sub_node_name, timeout, default_value=b'1' ): self.hosts = hosts self.locker_path = locker_path self.timeout = timeout self.sub_node_path = os.path.join(self.locker_path, sub_node_name) self.default_value = default_value self.thread_event = threading.Event() self.zkc = KazooClient(hosts=self.hosts, timeout=self.timeout) self.zkc.start(self.timeout) if not self.zkc.exists(self.locker_path): self.zkc.create(self.locker_path) def get_lock (self ): self.current_node_path = self.zkc.create(path=self.sub_node_path, value=self.default_value, ephemeral=True , sequence=True ) all_nodes = self.zkc.get_children(self.locker_path) all_nodes = sorted (all_nodes) if len (all_nodes) == 1 : d = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S' ) print('current node {0} got the locker at {1}' .format (self.current_node_path, d)) self.thread_event.set () return min_node = all_nodes[0 ] min_node_path = os.path.join(self.locker_path, min_node) if self.current_node_path == min_node_path: d = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S' ) print('current node {0} got the locker at {1}' .format (self.current_node_path, d)) self.thread_event.set () else : current_node = os.path.split(self.current_node_path)[1 ] pre_node_index = all_nodes.index(current_node) - 1 pre_node = all_nodes[pre_node_index] self.pre_node_path = os.path.join(self.locker_path, pre_node) print('current node:{0} is watching the pre node:{1}' .format (self.current_node_path, self.pre_node_path)) self.zkc.exists(path=self.pre_node_path, watch=self.watch_node_is_exist) def watch_node_is_exist (self, event ): """当前节点前面的那个节点被删除,触发删除事件,该函数被回调,获得锁 若 :param event: :return: """ if event: d = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S' ) print('current node {0} got the locker at {1}' .format (self.current_node_path, d)) self.thread_event.set () else : pass def release (self ): if self.zkc.exists(self.current_node_path): d = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S' ) print('deleted node {0} at {1}' .format (self.current_node_path, d)) self.zkc.delete(self.current_node_path) self.zkc.stop() self.zkc.close() def __enter__ (self ): if not self.thread_event.is_set(): self.get_lock() self.thread_event.wait() return self def __exit__ (self, exc_type, exc_val, exc_tb ): self.release() def doing_jobs (a, b ): """ 模拟业务处理逻辑 :param a: :param b: :return: """ c = a + b print('doing jobs' ) time.sleep(5 ) print('jobs is done!' ) return c def run (): conf = { 'hosts' : '192.168.100.5:2181' , 'locker_path' : '/locker' , 'sub_node_name' : 'foo' , 'timeout' : 5 } with ZkDistributedLock(**conf): doing_jobs(1 , 2 ) if __name__ == '__main__' : run()
4、测试分布式锁运行效果 1)单个客户端请求锁: 单个客户端请求模拟情况较为简单:
2)模拟多个客户端并发请求锁: 启动多个客户端程序前,先手动在zk服务器上创建一个临时顺序节点并保持shell不退出,如下所示,当前已有一个最小节点,以后创建的下一个节点要监听该节点:
1 2 3 4 [zk: localhost:2181 (CONNECTED) 5 ] create -e -s /locker/foo 1 Created /locker/foo0000000605 [zk: localhost:2181 (CONNECTED) 6 ] ls /locker [foo0000000602]
接着运行多个以上程序,这里已三个为例: ==以上foo0000000602未删除前== 第一个客户端程序,可以看到第一个客户端创建603临时顺序节点,并监听着602节点并保持运行:
1 current node:/locker/foo0000000603 is watching the pre node:/locker/foo0000000602
第二个客户端程序,603节点因为602节点还未删除所以还存在,因此第二个客户端创建的604临时顺序节点要监听它前面的603节点并保持运行:
1 current node:/locker/foo0000000604 is watching the pre node:/locker/foo0000000603
==foo0000000602删除后(在zk服务器上手动删除602节点后,第一个客户端获得锁马上打印相关操作)== 第一个客户端打印,10:35:36获得锁,业务逻辑耗时5秒,并在10:35:41释放锁:
1 2 3 4 current node /locker/foo0000000603 got the locker at *** 10:35:36 doing jobs jobs is done! deleted node /locker/foo0000000603 at *** 10:35:41
第二个客户端打印,在第一个客户端释放锁的时刻10:35:41,第二个客户端同时获得锁,业务逻辑耗时5秒,并在10:35:46释放锁:
1 2 3 4 current node /locker/foo0000000603 got the locker at *** 10:35:41 doing jobs jobs is done! deleted node /locker/foo0000000603 at *** 10:35:46