> es的维护除了定期清理索引之外,还需要对重要的索引日志进行备份,方便有需要时查看,我们可以使用es自带的快照功能,也可以使用开源工具[Elasticsearch-dump](https://github.com/taskrabbit/elasticsearch-dump),本次我们介绍es自带的快照功能备份集群 #### ES备份 若要使用es自带的备份功能,则需要在elasticsearch.yml配置文件中添加 ``` path.repo: ["共享存储路径"] ``` 如果你的集群有三台机器,那么这三台机器都需要对这个共享存储目录有写权限,也就是说在这三台机器上都要存在这个目录 一般使用nfs共享,然后在所有es集群的机器上挂载这个nfs目录 #### 理论部分 1. 备份的[API文档地址](https://www.elastic.co/guide/cn/elasticsearch/guide/current/backing-up-your-cluster.html) 2. 这里我使用python实现快照备份和删除的功能,使用shell去控制备份的规则 #### Shell ``` 1. 每天凌晨十二点半开始执行脚本, 系统磁盘空间使用率超过阈值 就把最后一个applog索引制作快照 2. 检测快照状态,若上一个快照完成,则把这个索引删除 3. 继续检测空间状态,若系统磁盘空间使用率不大于阈值,则继续上两步, 4. 当系统磁盘空间使用率小于80%时,停止脚本 ``` #### Python 制作快照(备份索引) ``` 1. 输入索引,让其开始制作快照 2. 制作快照之前每次检测快照仓库是否存在 ``` 恢复快照(还原索引) ``` 1. 阻塞式还原快照,等到快照制作结束,脚本才收到结果 ``` #### Shell实现 ``` ~]# cat es-backup-shell.sh #!/bin/bash # 磁盘报警阈值 disk_threshold=84 # es主机上日志保留天数 reserved_date=167 # es主机和端口 es_host='10.0.33.150' es_port='9200' # 要备份的es索引的正则表达式 date_pattern='[0-9]{4}(.[0-9]{2}){2}' es_index_pattern="applog-${date_pattern}" backup(){ echo "`date +%F-%H:%m:%S` 开始制作快照 $1" python ./es-backup-restore.py backup $1 if [ $? -ne 0 ];then echo "`date +%F-%H:%m:%S` 快照备份失败, 索引为 $1" exit else echo " 制作快照完成, 索引为 $1" fi } # 获取最早的一个索引 get_delete_index(){ delete_index_name=`curl -fs http://${es_host}:${es_port}/_cat/indices | grep -Eo "${es_index_pattern}" | sort | head -n 1` echo ${delete_index_name} } # 传入日期 2019.06.01,判断日志是否在保留时间之内 reserved_date_check(){ r_date=$1 date_string="`echo ${r_date} | tr '.' '-'` 00:00:00" timestamp=`date -d "${date_string}" +%s` day_ago_timestamp=`date +%s -d "${reserved_date} day ago"` if [ ${timestamp} -ge ${day_ago_timestamp} ];then echo "False" fi } delete_index(){ if echo "${1}" | grep -qE "${es_index_pattern}";then echo "`date +%F-%H:%m:%S` 开始删除索引 $1" curl -X DELETE "http://${es_host}:${es_port}/$1" # echo "`date +%F-%H:%m:%S` 删除索引 $1 成功!!!!!!!!!" else echo "索引错误,不会删除该索引, 索引为:$1" fi } check(){ while true; do # 两台机器的磁盘使用率 disk1_usage=`df -h | grep '/mnt/hd' | awk '{print $5}' | sed 's/%//'` disk2_usage=`ssh root@10.0.33.151 'df -h | grep "/mnt/hd" | awk "{print \\$5}" | sed "s/%//"'` if [ ${disk1_usage} -gt ${disk_threshold} -o ${disk2_usage} -gt ${disk_threshold} ];then echo "`date +%F-%H:%m:%S` 磁盘使用率标准为(${disk_threshold}%),已超标(disk1_usage: ${disk1_usage}% disk2_usage:${disk2_usage}%),开始备份计划! " index=`get_delete_index` index_date=`echo "${index}" | grep -Eo "${date_pattern}"` result=`reserved_date_check ${index_date}` echo "`date +%F-%H:%m:%S` 获得备份的索引为 $index" if [[ "${result}" == "False" ]];then echo "不备份保留时间${reserved_date}天之内的日志,本次备份中止!" exit else backup ${index} fi if [ $? -eq 0 ];then delete_index ${index} else echo "`date +%F-%H:%m:%S` 快照制作错误 ${index}" echo "`date +%F-%H:%m:%S` 不删除索引, 备份中止" exit fi echo "`date +%F-%H:%m:%S` 等待磁盘使用率恢复" sleep 10 else echo "磁盘使用率已恢复,(disk1_usage: ${disk1_usage}% disk2_usage:${disk2_usage}%)" break fi echo done echo echo echo } main(){ check } main ``` #### Python 此脚本和上面的shell脚本放在同级目录下,确保能被shell调用,再此之前,需要安装request库 ``` pip install requests ``` ``` ~]# cat es-backup-restore.py #coding=utf-8 import requests import time from datetime import datetime from datetime import timedelta import sys class ES: def __init__(self, host, port, path_repo, create_repo_name): self.host = host self.port = port self.path_repo = path_repo self.create_repo_url = '/_snapshot/{}'.format(create_repo_name) self.create_repo() # 创建快照仓库 def create_repo(self): create_data = { "type": "fs", "settings": { "location": self.path_repo } } check_url = 'http://{}:{}{}'.format(self.host, self.port, self.create_repo_url) r = requests.get(check_url) if r.status_code != 200: url = 'http://{}:{}{}'.format(self.host, self.port, self.create_repo_url) r = requests.post(url, json=create_data, headers=self._headers()) print('创建快照仓库', r.text) # 备份快照,一次只能传递一个索引 def backup(self, indices): if indices == '*': raise ValueError('不允许备份所有索引') if len(indices.split(',')) == 1: # 根据索引名字截取日期 tarsclient-applog-2019.06.27 date = indices.split('-')[-1] else: raise ValueError("不支持多个索引备份") index = {"indices": indices} snapshot_name = indices backup_index_url='{}/{}'.format(self.create_repo_url, snapshot_name) # 阻塞调用,等待快照制作完成 url = 'http://{}:{}{}?wait_for_completion=true'.format(self.host, self.port, backup_index_url) self.open_index(indices) r = requests.post(url, json=index, headers=self._headers()) self.result(r, '备份快照', indices) # es 5.5版本测试的时候发现被关闭的索引不能直接备份,会抛出异常,但是es 7.x版本可以不打开索引直接制作快照 def open_index(self, index): url = 'http://{}:{}/{}/_open'.format(self.host, self.port, index) r = requests.post(url) time.sleep(10) # 恢复快照 def restore(self, indices): snapshot_name = indices # 阻塞调用,等待快照恢复完成 url = 'http://{}:{}{}/{}/_restore?wait_for_completion=true'.format(self.host, self.port, self.create_repo_url, snapshot_name) r = requests.post(url) self.result(r, '恢复快照', indices) def result(self, r, action, indices): if 200 <= r.status_code < 300: print('[成功] {} {} '.format(action, indices)) else: print('[失败] {} {} 错误信息:{}'.format(action, indices, r.text)) sys.exit(3) @staticmethod def get_date(days=0): s = datetime.now() - timedelta(days=days) day = s.date().isoformat() return day def _headers(self): return {"Content-Type": "application/json"} def _curl(self): pass host='127.0.0.1' port='9200' path_repo = '/backup/es-index' backup_repo_name = 'my_backup' es = ES(host, port, path_repo, backup_repo_name) if len(sys.argv) != 3: print('输入参数个数错误') sys.exit(1) else: if sys.argv[1] != 'backup' and sys.argv[1] != 'restore': print('参数输入有误, {}, 不支持的操作'.format(sys.argv[1])) sys.exit(2) getattr(es, sys.argv[1])(sys.argv[2]) ``` #### 运行脚本 ``` ~]# bash es-backup-shell.sh ``` Last modification:July 1st, 2019 at 03:29 pm © 允许规范转载 Support 如果觉得我的文章对你有用 ×Close Appreciate the author Sweeping payments