es的维护除了定期清理索引之外,还需要对重要的索引日志进行备份,方便有需要时查看,我们可以使用es自带的快照功能,也可以使用开源工具Elasticsearch-dump,本次我们介绍es自带的快照功能备份集群
ES备份
若要使用es自带的备份功能,则需要在elasticsearch.yml配置文件中添加
path.repo: ["共享存储路径"]
如果你的集群有三台机器,那么这三台机器都需要对这个共享存储目录有写权限,也就是说在这三台机器上都要存在这个目录 一般使用nfs共享,然后在所有es集群的机器上挂载这个nfs目录
理论部分
- 备份的API文档地址
- 这里我使用python实现快照备份和删除的功能,使用shell去控制备份的规则
Shell
1. 每天凌晨十二点半开始执行脚本, 系统磁盘空间使用率超过阈值 就把最后一个applog索引制作快照
2. 检测快照状态,若上一个快照完成,则把这个索引删除
3. 继续检测空间状态,若系统磁盘空间使用率不大于阈值,则继续上两步,
4. 当系统磁盘空间使用率小于80%时,停止脚本
Python
制作快照(备份索引)
1. 输入索引,让其开始制作快照
2. 制作快照之前每次检测快照仓库是否存在
恢复快照(还原索引)
1. 阻塞式还原快照,等到快照制作结束,脚本才收到结果
Shell实现
~]# cat es-backup-shell.sh
#!/bin/bash
# 磁盘报警阈值
disk_threshold=84
# es主机上日志保留天数
reserved_date=167
# es主机和端口
es_host='10.0.33.150'
es_port='9200'
# 要备份的es索引的正则表达式
date_pattern='[0-9]{4}(.[0-9]{2}){2}'
es_index_pattern="applog-${date_pattern}"
backup(){
echo "`date +%F-%H:%m:%S` 开始制作快照 $1"
python ./es-backup-restore.py backup $1
if [ $? -ne 0 ];then
echo "`date +%F-%H:%m:%S` 快照备份失败, 索引为 $1"
exit
else
echo " 制作快照完成, 索引为 $1"
fi
}
# 获取最早的一个索引
get_delete_index(){
delete_index_name=`curl -fs http://${es_host}:${es_port}/_cat/indices | grep -Eo "${es_index_pattern}" | sort | head -n 1`
echo ${delete_index_name}
}
# 传入日期 2019.06.01,判断日志是否在保留时间之内
reserved_date_check(){
r_date=$1
date_string="`echo ${r_date} | tr '.' '-'` 00:00:00"
timestamp=`date -d "${date_string}" +%s`
day_ago_timestamp=`date +%s -d "${reserved_date} day ago"`
if [ ${timestamp} -ge ${day_ago_timestamp} ];then
echo "False"
fi
}
delete_index(){
if echo "${1}" | grep -qE "${es_index_pattern}";then
echo "`date +%F-%H:%m:%S` 开始删除索引 $1"
curl -X DELETE "http://${es_host}:${es_port}/$1"
# echo "`date +%F-%H:%m:%S` 删除索引 $1 成功!!!!!!!!!"
else
echo "索引错误,不会删除该索引, 索引为:$1"
fi
}
check(){
while true;
do
# 两台机器的磁盘使用率
disk1_usage=`df -h | grep '/mnt/hd' | awk '{print $5}' | sed 's/%//'`
disk2_usage=`ssh root@10.0.33.151 'df -h | grep "/mnt/hd" | awk "{print \\$5}" | sed "s/%//"'`
if [ ${disk1_usage} -gt ${disk_threshold} -o ${disk2_usage} -gt ${disk_threshold} ];then
echo "`date +%F-%H:%m:%S` 磁盘使用率标准为(${disk_threshold}%),已超标(disk1_usage: ${disk1_usage}% disk2_usage:${disk2_usage}%),开始备份计划! "
index=`get_delete_index`
index_date=`echo "${index}" | grep -Eo "${date_pattern}"`
result=`reserved_date_check ${index_date}`
echo "`date +%F-%H:%m:%S` 获得备份的索引为 $index"
if [[ "${result}" == "False" ]];then
echo "不备份保留时间${reserved_date}天之内的日志,本次备份中止!"
exit
else
backup ${index}
fi
if [ $? -eq 0 ];then
delete_index ${index}
else
echo "`date +%F-%H:%m:%S` 快照制作错误 ${index}"
echo "`date +%F-%H:%m:%S` 不删除索引, 备份中止"
exit
fi
echo "`date +%F-%H:%m:%S` 等待磁盘使用率恢复"
sleep 10
else
echo "磁盘使用率已恢复,(disk1_usage: ${disk1_usage}% disk2_usage:${disk2_usage}%)"
break
fi
echo
done
echo
echo
echo
}
main(){
check
}
main
Python
此脚本和上面的shell脚本放在同级目录下,确保能被shell调用,再此之前,需要安装request库
pip install requests
~]# cat es-backup-restore.py
#coding=utf-8
import requests
import time
from datetime import datetime
from datetime import timedelta
import sys
class ES:
def __init__(self, host, port, path_repo, create_repo_name):
self.host = host
self.port = port
self.path_repo = path_repo
self.create_repo_url = '/_snapshot/{}'.format(create_repo_name)
self.create_repo()
# 创建快照仓库
def create_repo(self):
create_data = {
"type": "fs",
"settings": {
"location": self.path_repo
}
}
check_url = 'http://{}:{}{}'.format(self.host, self.port, self.create_repo_url)
r = requests.get(check_url)
if r.status_code != 200:
url = 'http://{}:{}{}'.format(self.host, self.port, self.create_repo_url)
r = requests.post(url, json=create_data, headers=self._headers())
print('创建快照仓库', r.text)
# 备份快照,一次只能传递一个索引
def backup(self, indices):
if indices == '*':
raise ValueError('不允许备份所有索引')
if len(indices.split(',')) == 1:
# 根据索引名字截取日期 tarsclient-applog-2019.06.27
date = indices.split('-')[-1]
else:
raise ValueError("不支持多个索引备份")
index = {"indices": indices}
snapshot_name = indices
backup_index_url='{}/{}'.format(self.create_repo_url, snapshot_name)
# 阻塞调用,等待快照制作完成
url = 'http://{}:{}{}?wait_for_completion=true'.format(self.host, self.port, backup_index_url)
self.open_index(indices)
r = requests.post(url, json=index, headers=self._headers())
self.result(r, '备份快照', indices)
# es 5.5版本测试的时候发现被关闭的索引不能直接备份,会抛出异常,但是es 7.x版本可以不打开索引直接制作快照
def open_index(self, index):
url = 'http://{}:{}/{}/_open'.format(self.host, self.port, index)
r = requests.post(url)
time.sleep(10)
# 恢复快照
def restore(self, indices):
snapshot_name = indices
# 阻塞调用,等待快照恢复完成
url = 'http://{}:{}{}/{}/_restore?wait_for_completion=true'.format(self.host, self.port, self.create_repo_url, snapshot_name)
r = requests.post(url)
self.result(r, '恢复快照', indices)
def result(self, r, action, indices):
if 200 <= r.status_code < 300:
print('[成功] {} {} '.format(action, indices))
else:
print('[失败] {} {} 错误信息:{}'.format(action, indices, r.text))
sys.exit(3)
@staticmethod
def get_date(days=0):
s = datetime.now() - timedelta(days=days)
day = s.date().isoformat()
return day
def _headers(self):
return {"Content-Type": "application/json"}
def _curl(self):
pass
host='127.0.0.1'
port='9200'
path_repo = '/backup/es-index'
backup_repo_name = 'my_backup'
es = ES(host, port, path_repo, backup_repo_name)
if len(sys.argv) != 3:
print('输入参数个数错误')
sys.exit(1)
else:
if sys.argv[1] != 'backup' and sys.argv[1] != 'restore':
print('参数输入有误, {}, 不支持的操作'.format(sys.argv[1]))
sys.exit(2)
getattr(es, sys.argv[1])(sys.argv[2])
运行脚本
~]# bash es-backup-shell.sh
...