通过etcd的watch接口捕获事件变更时,不得不面对的连接中断等异常场景。在连接中断后如果全量拉取数据,会给etcd数据库带来较大的负载冲击。因此有必要实现断点重续。
etcd中有一个全局的revision,每次PUT或DELETE值都会触发etcd中的revision加1。
因此我们可以在watch到每一个事件时,同时记录其对应的revision。当中途断连后,从上次已处理事件的revision+1开始继续watch就可以实现断点重续。
另外需要考虑重续位置的revision已被compacted的场景,此时必须要进行一次全量数据同步。
我们部署的etcd设置的自动compact间隔是2小时。因此,除了etcd连接长时间中断外,如果超过2个小时没有watch到新的事件也会导致上次记录的revision失效。为了避免这个情况,需要定期的主动获取etcd当前revision,作为长时间没有捕获到事件时的补充。
注意事项
- 一次watch到多个事件后,只有所有事件都处理完后才能更新记录的revision位点
- 定时通过get获取最新revision。一是可以作为etcd长连接的检活,二是可以在长时间没有获取新事件时的补充。当超过x秒没有watch到新事件,且连接未中断,将y秒前通过get获取到的revision作为下次断点重续的位点。
- y必须小于x。比如y为60秒,x为600秒。设计延迟y秒的目的为了确保所设定的revision附近没有满足watch条件的事件发生。
- 记录get方式获取revision和记录从watch的事件中获取的revision之间需要确保互斥
- 必须使用0.11以上的python-etcd3模块
以下测试程序主要演示etcd如何获取revision以及从指定revision位置watch事件。
import etcd3
import etcd3.exceptions
import etcd3.utils as utils
import logging
import sys
logging.getLogger().setLevel(logging.DEBUG)
def connect():
client = etcd3.client(host='10.37.163.43',
port=2379,
user='admin',
password='etcd@snds',
timeout=5)
return client
def prn_obj(obj):
return '\n'.join([' --%s:%s' % item for item in obj.__dict__.items()])
def my_callback(response_or_err):
logging.info('---------------------my_callback():')
logging.info('got response:%s' % response_or_err)
logging.info('got response member:%s' % prn_obj(response_or_err))
logging.info('---------------------content of response_or_err:')
if type(response_or_err) == etcd3.watch.WatchResponse:
logging.info('got response header.revision:%s' % response_or_err.header.revision)
logging.info('got response events:')
for e in response_or_err.events:
logging.info(' :%s' % e)
elif type(response_or_err) == etcd3.exceptions.RevisionCompactedError:
logging.info('got RevisionCompactedError:%s' % response_or_err.compacted_revision)
else:
logging.info('got other Error:%s' % response_or_err)
def main():
etcd=connect()
## 获取当前revision
etcd.put('/test1','0')
value,meta = etcd.get('/test1')
logging.info('value of key /test1:%s' % value)
logging.info('KVMeta of key /test1:%s' % prn_obj(meta))
logging.info('mod_revision of key /test1:%s' % meta.mod_revision)
logging.info('current revision:%s' % meta.response_header.revision)
## 从指定revision开始捕获变更
etcd.add_watch_callback('/test1', my_callback, start_revision=meta.mod_revision)
logging.info('press any key to exit')
sys.stdin.readline()
if __name__ == "__main__":
main()
以下是正常场景输出(收到普通的PUT/DELETE事件)
[postgres@sndsdevdb18 python-etcd3-master]$ python3 mytest.py
INFO:root:value of key /test1:b'0'
INFO:root:KVMeta of key /test1: --key:b'/test1'
--create_revision:3386233
--mod_revision:3386253
--version:20
--lease_id:0
--response_header:cluster_id: 12908842124456103319
member_id: 16003904845432636150
revision: 3386253
raft_term: 447849
INFO:root:mod_revision of key /test1:3386253
INFO:root:current revision:3386253
INFO:root:press any key to exit
INFO:root:---------------------my_callback():
INFO:root:got response:<etcd3.watch.WatchResponse object at 0x7fad82de0048>
INFO:root:got response member: --header:cluster_id: 12908842124456103319
member_id: 16003904845432636150
revision: 3386253
raft_term: 447849
--events:[<etcd3.events.PutEvent object at 0x7fad82de00b8>]
INFO:root:---------------------content of response_or_err:
INFO:root:got response header.revision:3386253
INFO:root:got response events:
INFO:root: :<class 'etcd3.events.PutEvent'> key=b'/test1' value=b'0'
回收所有revision小于3386255的历史key
[root@sndsdevapp21 ~]# etcdctl --user 'admin:etcd@snds' compaction 3386255
compacted revision 3386255
修改测试程序,watch一个已被compact的revision
etcd.add_watch_callback('/test1', my_callback, start_revision=3386254)
再次执行测试程序,输出如下(收到一个RevisionCompactedError异常)。
[postgres@sndsdevdb18 python-etcd3-master]$ python3 mytest.py
INFO:root:value of key /test1:b'0'
INFO:root:KVMeta of key /test1: --key:b'/test1'
--create_revision:3386233
--mod_revision:3386260
--version:25
--lease_id:0
--response_header:cluster_id: 12908842124456103319
member_id: 16003904845432636150
revision: 3386260
raft_term: 447849
INFO:root:mod_revision of key /test1:3386260
INFO:root:current revision:3386260
INFO:root:press any key to exit
INFO:root:---------------------my_callback():
INFO:root:got response:
INFO:root:got response member: --compacted_revision:3386255
INFO:root:---------------------content of response_or_err:
INFO:root:got RevisionCompactedError:3386255
在测试程序启动后,重启etcd进程。测试程序输出如下(收到一个其他error)
INFO:root:---------------------my_callback():
INFO:root:got response:<_Rendezvous of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "Socket closed"
debug_error_string = "{"created":"@1605106219.660601735","description":"Error received from peer ipv4:10.37.163.43:2379","file":"src/core/lib/surface/call.cc","file_line":1046,"grpc_message":"Socket closed","grpc_status":14}"
>
INFO:root:got response member: --_state:<grpc._channel._RPCState object at 0x7f2ec03ab0b8>
--_call:<grpc._cython.cygrpc.IntegratedCall object at 0x7f2ec03ab1d0>
--_response_deserializer:<built-in method FromString of GeneratedProtocolMessageType object at 0x298cb08>
--_deadline:None
INFO:root:---------------------content of response_or_err:
INFO:root:got other Error:<_Rendezvous of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "Socket closed"
debug_error_string = "{"created":"@1605106219.660601735","description":"Error received from peer ipv4:10.37.163.43:2379","file":"src/core/lib/surface/call.cc","file_line":1046,"grpc_message":"Socket closed","grpc_status":14}"
>