目前kazoo是连接zk的最新第三方库,最新更新时间为2019年1月,其他第三方连接zk的库都长时间未更新,所以推荐使用kazoo。前面有几篇文章都已经详细给出了zk的部署,接下来是zk最核心的地方,将zk的数据结构特性跟业务场景相结合,实现复杂需求,本文给出基本demo用法介绍。
1、监控节点数量的变化 基本操作,创建、更新、删除,kazoo接口已经足够简单,入参类型如果不懂,可以直接看源码,同时也有助于深入了解别人是如何构思python“中间件”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from kazoo import exceptionsfrom kazoo.client import KazooClientfrom kazoo.client import ChildrenWatchfrom kazoo.client import DataWatchdef normal_test (zk_path,host,port,node_list ): zk=KazooClient(hosts=host+':' +port,timeout=5 ) zk.start(timeout=5 ) if not zk.exists(zk_path): print("node:{} does't exists" .format (zk_path)) zk.create(path=zk_path,value=b'bar' ) child_node_list=zk.get_children(zk_path,watch=None ,include_data=False ) if not child_node_list: for sub_node in node_list: zk.create(zk_path + '/' + sub_node,b'1' ) else : print('subnode list:{}' .format (child_node_list)) data,stat=zk.get(zk_path) print('current node data:{}' .format (data)) print('data version:{}' .format (stat.version)) print('data length:{}' .format (stat.data_length)) print('children node numbers:{}' .format (stat.numChildren)) stat_new=zk.set (zk_path,value=b'foo' ) print('node {0} is updated:{1}' .format (zk_path,stat_new)) zk.delete(zk_path,recursive=True ) try : last=zk.get_children(zk_path) print('children nodes :{}' .format (last)) except exceptions.NoNodeError: print('no children nodes' ) zk.stop() zk.close() if __name__=='__main__' : normal_test(zk_path='/app_conf' ,host='192.168.100.5' ,port='2181' ,node_list=['foo1' ,'foo2' ,'foo3' ])
==监控节点数量的变化,可以应用到相关的场景: 1)把节点名称设为服务器IP,可以实现服务器集群管理,服务(服务接口)上、下线通知等,又称服务发现、服务监控等 2)主备切换,把最小临时节点设为master角色,其他临时节点为salve角色 3)独占锁,若只监听一个固定临时节点,当该临时节点创建,则获得锁,否则释放锁 4)分布式锁,不同客户端创建不同临时顺序节点,链式监听节点是否删除事件==
2、简单的wacher kazoo支持使用装饰器实现一个简单的wacher,kazoo有两种wacher,一个是监听子节点变化,另外一个是监听节点值的变化。监听子节点变化示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import timefrom kazoo.client import KazooClientfrom kazoo.client import DataWatchfrom kazoo.client import ChildrenWatchdef watch_child_node (zk_path ): zkc=KazooClient(hosts='192.168.100.5:2181' ,timeout=5 ) zkc.start(timeout=5 ) @ChildrenWatch(client=zkc,path=zk_path,send_event=True ) def get_changes_with_event (children,event ): print ("Children nodes are %s" % children) if event: print("catched nodes a children nodes event " ,event) @ChildrenWatch(client=zkc,path=zk_path ) def get_changes_without_event (children ): print ("Children are %s" % children) while True : time.sleep(5 ) print('watching children node changes.....' ) watch_child_node(/app_conf) [zk: localhost:2181 (CONNECTED) 2 ] create /app_conf/foo1 1 Created /app_conf/foo1 [zk: localhost:2181 (CONNECTED) 3 ] create /app_conf/foo2 1 Created /app_conf/foo2 [zk: localhost:2181 (CONNECTED) 4 ] create /app_conf/foo3 1 需要客户端根据事件的发生写一段逻辑去获取zk的节点到底是增加了还是减少了。 Children nodes are [] watching children nodes changes..... watching children nodes changes..... Children nodes are ['foo1' ] catching a children nodes event WatchedEvent(type ='CHILD' , state='CONNECTED' , path='/app_conf' ) watching children nodes changes..... Children nodes are ['foo1' , 'foo2' ] catching a children nodes event WatchedEvent(type ='CHILD' , state='CONNECTED' , path='/app_conf' ) watching children nodes changes..... Children nodes are ['foo1' , 'foo2' , 'foo3' ] catching a children nodes event WatchedEvent(type ='CHILD' , state='CONNECTED' , path='/app_conf' ) Children nodes are ['foo1' , 'foo2' ] catching a children nodes event WatchedEvent(type ='CHILD' , state='CONNECTED' , path='/app_conf' )
注意:在注册监听环节,可以监听当前节点本身是否删除的事件,以及子节点的增、删事件,若需要zk返回event,那么需要将send_event设为True,才可以在watch函数传入event位置参数,这个逻辑可以在kazoo的源码看到 if self._send_event result = self._func(children, event) else: result = self._func(children) 装饰器有两种写法,一种是从引用kazoo import的ChildrenWatch,
1 2 3 @ChildrenWatch(client=zkc,path=zk_path,send_event=True ) def get_changes_with_event (children,event ): pass
另外一种是从已创建的zk实例中调用ChildrenWatch
1 2 3 @zkc.ChildrenWatch(path=zk_path,send_event=True ) def get_changes_with_event (children,event ): pass
2.1 监听节点自身的数据变化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import timefrom kazoo.client import KazooClientfrom kazoo.client import DataWatchfrom kazoo.client import ChildrenWatchdef watch_data (zk_path ): zkc = KazooClient(hosts='192.168.100.5:2181' , timeout=5 ) zkc.start(timeout=5 ) @zkc.DataWatch(path=zk_path ) def my_watch (data, stat,event ): if not data: pass print("Data is {0} and data type is {1}" .format (data, type (data))) print("Version is %s" % stat.version) if event: print("catching a data event " ,event) while True : time.sleep(5 ) print('watching current node data changes.....' ) watch_data('/app_conf' ) [zk: localhost:2181 (CONNECTED) 20 ] set /app_conf foo [zk: localhost:2181 (CONNECTED) 19 ] set /app_conf 2 Data is b'foo' and data type is <class 'bytes '> Version is 22watching current node data changes .....Data is b 'bar ' and data type is <class 'bytes '>Version is 23catching a data event WatchedEvent (type ='CHANGED' , state='CONNECTED' , path='/app_conf' )
节点数据的变化,很容易联想相关应用场景: 1)集中配置管理,各个客户端监听放置配置文件内容的节点,若配置有变化,则可以各个客户端拉取配置更新 2)消息队列: 在特定节点下创建持久顺序节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下znode存储的数据就是消息队列中的消息内容,持久顺序节点就是消息的编号,按排序后取出最小编号节点(先get后delete)。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题
2.2 封装一个同时监听子节点变化和当前节点数据变化的watcher类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 import timefrom kazoo.client import KazooClientfrom kazoo.client import ChildrenWatchfrom kazoo.client import DataWatchclass ZKWatcher (object ): def __init__ (self,host,port,timeout=5 ): self._old_node_list=[] self._node_name='' self._host=host self._port=port self._time_out=timeout self._ip_port=self._host+':' +self._port self._zkc=KazooClient(hosts=self._ip_port,timeout=self._time_out) self._zkc.start(self._time_out) def watcher (self,zk_path ): self._old_node_list=self._zkc.get_children(zk_path) try : ChildrenWatch(client=self._zkc,path=zk_path,func=self._node_change,send_event=True ) DataWatch(client=self._zkc,path=zk_path,func=self._data_change) except Exception as e: raise def _node_change (self,new_node_list,event ): if not event: print('未有事件发生' ) return if new_node_list == self._old_node_list: print('子节点列表未发生变化' ) return if len (new_node_list)>len (self._old_node_list): for new_node in new_node_list: if new_node not in self._old_node_list: print('监听到一个新的节点:%s' %str (new_node)) self._old_node_list=new_node_list else : for old_node in self._old_node_list: if old_node not in new_node_list: print('监听到一个删除的节点:%s' %str (old_node)) self._old_node_list=new_node_list def _data_change (self,data,stat,event ): if not data: print('节点已删除,无法获取数据' ) return if not event: print('未有事件发生' ) print('监听到数据变化' ) print('数据为' ,data) print('数据长度' ,stat.dataLength) print('数据版本号:' ,stat.version) print('子节点数据版本号' ,stat.cversion) print('子节点数量' ,stat.numChildren) print('事件' ,event) def run (): try : zk = ZKWatcher(host='192.168.100.5' ,port='2181' ) zk.watcher('/app_locker' ) while True : time.sleep(5 ) print('watching......' ) except Exception as e: print(e) if __name__== "__main__" : run()