es的维护除了定期清理索引之外,还需要对重要的索引日志进行备份,方便有需要时查看,我们可以使用es自带的快照功能,也可以使用开源工具Elasticsearch-dump,本次我们介绍es自带的快照功能备份集群

ES备份

若要使用es自带的备份功能,则需要在elasticsearch.yml配置文件中添加

path.repo: ["共享存储路径"]

如果你的集群有三台机器,那么这三台机器都需要对这个共享存储目录有写权限,也就是说在这三台机器上都要存在这个目录
一般使用nfs共享,然后在所有es集群的机器上挂载这个nfs目录

理论部分

  1. 备份的API文档地址
  2. 这里我使用python实现快照备份和删除的功能,使用shell去控制备份的规则

Shell

1. 每天凌晨十二点半开始执行脚本, 系统磁盘空间使用率超过阈值 就把最后一个applog索引制作快照
2. 检测快照状态,若上一个快照完成,则把这个索引删除
3. 继续检测空间状态,若系统磁盘空间使用率不大于阈值,则继续上两步,
4. 当系统磁盘空间使用率小于80%时,停止脚本

Python

制作快照(备份索引)

1. 输入索引,让其开始制作快照
2. 制作快照之前每次检测快照仓库是否存在

恢复快照(还原索引)

1. 阻塞式还原快照,等到快照制作结束,脚本才收到结果

Shell实现

~]# cat es-backup-shell.sh
#!/bin/bash


# 磁盘报警阈值
disk_threshold=84
# es主机上日志保留天数
reserved_date=167

# es主机和端口
es_host='10.0.33.150'
es_port='9200'

# 要备份的es索引的正则表达式
date_pattern='[0-9]{4}(.[0-9]{2}){2}'
es_index_pattern="applog-${date_pattern}"


backup(){
    echo "`date +%F-%H:%m:%S` 开始制作快照 $1"
    python ./es-backup-restore.py backup $1
    if [ $? -ne 0 ];then
        echo "`date +%F-%H:%m:%S` 快照备份失败, 索引为 $1"
        exit
    else
        echo " 制作快照完成, 索引为 $1"
    fi

}

# 获取最早的一个索引
get_delete_index(){
    delete_index_name=`curl -fs http://${es_host}:${es_port}/_cat/indices | grep -Eo "${es_index_pattern}" | sort |  head -n 1`
    echo ${delete_index_name}
}

# 传入日期 2019.06.01,判断日志是否在保留时间之内
reserved_date_check(){
    r_date=$1
    date_string="`echo ${r_date} | tr '.' '-'` 00:00:00"
    timestamp=`date -d "${date_string}" +%s`
    day_ago_timestamp=`date +%s -d "${reserved_date} day ago"`
    if [ ${timestamp} -ge ${day_ago_timestamp} ];then
        echo "False"
    fi
}

delete_index(){
    if echo "${1}" | grep -qE "${es_index_pattern}";then
        echo "`date +%F-%H:%m:%S` 开始删除索引 $1"
        curl -X DELETE "http://${es_host}:${es_port}/$1"
        # echo "`date +%F-%H:%m:%S` 删除索引 $1 成功!!!!!!!!!"
    else
        echo "索引错误,不会删除该索引, 索引为:$1"
    fi

}

check(){
    while true;
    do
        # 两台机器的磁盘使用率
        disk1_usage=`df -h | grep '/mnt/hd' | awk '{print $5}' | sed 's/%//'`
        disk2_usage=`ssh root@10.0.33.151 'df -h | grep "/mnt/hd" | awk "{print \\$5}" | sed "s/%//"'`

        if [ ${disk1_usage} -gt ${disk_threshold} -o ${disk2_usage} -gt ${disk_threshold} ];then
            echo "`date +%F-%H:%m:%S` 磁盘使用率标准为(${disk_threshold}%),已超标(disk1_usage: ${disk1_usage}%  disk2_usage:${disk2_usage}%),开始备份计划! "
              index=`get_delete_index`
              index_date=`echo "${index}" | grep -Eo "${date_pattern}"`
              result=`reserved_date_check ${index_date}`
              echo "`date +%F-%H:%m:%S` 获得备份的索引为 $index"
              if [[ "${result}" == "False" ]];then
                  echo "不备份保留时间${reserved_date}天之内的日志,本次备份中止!"
                  exit
              else
                  backup ${index}
              fi

              if [ $? -eq 0 ];then
                  delete_index ${index}
              else
                  echo "`date +%F-%H:%m:%S` 快照制作错误 ${index}"
                  echo "`date +%F-%H:%m:%S` 不删除索引, 备份中止"
                  exit
              fi

            echo "`date +%F-%H:%m:%S` 等待磁盘使用率恢复"
            sleep 10
        else
            echo "磁盘使用率已恢复,(disk1_usage: ${disk1_usage}%  disk2_usage:${disk2_usage}%)"
              break
          fi
          echo
      done

      echo
      echo
      echo
}

main(){
    check
}


main

Python

此脚本和上面的shell脚本放在同级目录下,确保能被shell调用,再此之前,需要安装request库

pip install requests
~]# cat es-backup-restore.py
#coding=utf-8
import requests
import time
from datetime import datetime
from datetime import timedelta
import sys


class ES:
    def __init__(self, host, port, path_repo, create_repo_name):
        self.host = host
        self.port = port
        self.path_repo = path_repo
        self.create_repo_url = '/_snapshot/{}'.format(create_repo_name)
        self.create_repo()

    # 创建快照仓库
    def create_repo(self):
        create_data = {
            "type": "fs",
            "settings": {
                "location": self.path_repo
            }
        }

        check_url = 'http://{}:{}{}'.format(self.host, self.port, self.create_repo_url)
        r = requests.get(check_url)
        if r.status_code != 200:
            url = 'http://{}:{}{}'.format(self.host, self.port, self.create_repo_url)
            r = requests.post(url, json=create_data, headers=self._headers())
            print('创建快照仓库', r.text)

    # 备份快照,一次只能传递一个索引
    def backup(self, indices):
        if indices == '*':
            raise ValueError('不允许备份所有索引')

        if len(indices.split(',')) == 1:
            # 根据索引名字截取日期 tarsclient-applog-2019.06.27
            date = indices.split('-')[-1]
        else:
            raise ValueError("不支持多个索引备份")

        index = {"indices": indices}

        snapshot_name = indices

        backup_index_url='{}/{}'.format(self.create_repo_url, snapshot_name)
        # 阻塞调用,等待快照制作完成
        url = 'http://{}:{}{}?wait_for_completion=true'.format(self.host, self.port, backup_index_url)
        self.open_index(indices)
        r = requests.post(url, json=index, headers=self._headers())
        self.result(r, '备份快照', indices)

    # es 5.5版本测试的时候发现被关闭的索引不能直接备份,会抛出异常,但是es 7.x版本可以不打开索引直接制作快照
    def open_index(self, index):
        url = 'http://{}:{}/{}/_open'.format(self.host, self.port, index)
        r = requests.post(url)
        time.sleep(10)

    # 恢复快照
    def restore(self, indices):
        snapshot_name = indices
        # 阻塞调用,等待快照恢复完成
        url = 'http://{}:{}{}/{}/_restore?wait_for_completion=true'.format(self.host, self.port, self.create_repo_url, snapshot_name)
        r = requests.post(url)
        self.result(r, '恢复快照', indices)


    def result(self, r, action, indices):
        if 200 <= r.status_code < 300:
            print('[成功] {} {} '.format(action, indices))
        else:
            print('[失败] {} {} 错误信息:{}'.format(action, indices, r.text))
            sys.exit(3)

    @staticmethod
    def get_date(days=0):
        s = datetime.now() - timedelta(days=days)
        day = s.date().isoformat()
        return day

    def _headers(self):
        return {"Content-Type": "application/json"}

    def _curl(self):
        pass


host='127.0.0.1'
port='9200'
path_repo = '/backup/es-index'
backup_repo_name = 'my_backup'

es = ES(host, port, path_repo, backup_repo_name)


if len(sys.argv) != 3:
    print('输入参数个数错误')
    sys.exit(1)
else:
    if sys.argv[1] != 'backup' and sys.argv[1] != 'restore':
        print('参数输入有误, {}, 不支持的操作'.format(sys.argv[1]))
        sys.exit(2)

getattr(es, sys.argv[1])(sys.argv[2])

运行脚本

~]# bash es-backup-shell.sh
最后修改:2019 年 07 月 01 日 03 : 29 PM