考虑使用Logstash实现Elasticsearch的无停机索引更新操作

初次见面!

我是负责在Recruit Lifestyle负责新业务开发的@taikit。这次的圣诞节日历是我第一次尝试写文章。

首先

最近,我們正在使用Elasticsearch構建一個搜索系統。如果要更新索引的設定,這將不會應用於以前的數據。為了將更新應用到以前的數據上,我們需要重新建立索引。雖然有很多操作方法可以選擇,但我們現在考慮的是一種盡可能輕鬆的操作方法。

思考的情况

前提.png

不停止的索引更新的主要方法

在我们所调查的范围内,主要可以分为三种方法。

使用索引别名的方法

这是在官方博客上介绍的方法。不是直接在应用程序中引用索引,而是设置引用别名。这样,当想要引用新的索引时,只需更新别名,而不需要更新应用程序的代码。

以每个集群为单位切换的方法

这是Recruit公司其他部门也在使用的方法1。除了正在使用的集群外,还要创建一个应用了新索引的集群,并通过Blue/Green部署等方式来交换新旧集群。虽然相对来说比较繁琐,但考虑到Elasticsearch版本升级等运维方面的考虑,一旦建立了机制,运维就会变得轻松。

在应用层切换的方法

以下是在这篇文章中介绍的方法。通过应用程序切换索引的引用。如果更改应用程序的搜索查询或者大幅更改索引的字段,可能也需要在应用程序的代码中进行切换的情况。

适用的方法

这次我们采用了使用索引别名(Index Aliases)的方法。关于如何在集群之间切换的方式,我们使用了托管服务,Elasticsearch本身的更新会自动无停机进行,所以不需要担心版本升级时的问题。此外,为了提升搜索性能,我们预计需要频繁更新分析器(Analyzer),如果每次都要更新应用程序,会变得很复杂。

概述处理方式如下,在Logstash中创建包含时间戳的新索引,并在定期执行的脚本中读取索引名称的时间戳,然后根据是否存在新索引来切换别名。

__ (2).png

Logstash的配置

只需部署Logstash,就会进行一些设置,以便在Elasticsearch中添加新配置的索引,并同步数据。

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-8.0.11/mysql-connector-java-8.0.11.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST}:3306/${DB_NAME}"
    jdbc_user => "${MYSQL_USER}"
    jdbc_password => "${MYSQL_PASSWORD}"
    schedule => "* * * * *"
    statement_filepath => "/usr/share/logstash/sql/items.sql"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["${ELASTICSEARCH_HOST}"]
    user => "${ELASTICSEARCH_USERNAME}"
    password => "${ELASTICSEARCH_PASSWORD}"
    manage_template => true
    template => "/usr/share/logstash/template/items.json"
    template_name => "items"
    template_overwrite => true
    index => "items-${INDEX_TIMESTAMP}"
    document_type => "_doc"
    document_id => "%{id}"
  }
}

请在部署时将环境变量INDEX_TIMESTAMP设置为当前时间,例如20181209045112。这样可以使索引名称的末尾带有时间戳。为了尽量将索引的设置与Logstash对齐,我们使用了索引模板来设置索引。在模板的index_patterns的时间戳部分,请预先指定通配符(例如上面的例子中为”items-*”)。

我们设置了tracking_column,只同步自上次同步时点之后更新的记录。在部署Logstash时,通过重置最终同步日期来将所有记录同步到新的索引中。对于使用容器运行Logstash的情况,由于记录最终同步日期的文件无法继承,因此不需要操作来进行重置。

更新Alias的脚本

读取Index名的时间戳并将Alias替换为最新的Index,然后删除旧的Index。这个脚本我在参考了Cookpad公司的这篇文章后完成。

require 'elasticsearch'
require 'uri'
require_relative '../index_manager'

ES_URL = ENV.fetch('ES_URL')
ES_USER = ENV.fetch('ES_USER')
ES_PASSWORD = ENV.fetch('ES_PASSWORD')
TABLE_NAME = ENV.fetch('TABLE_NAME')

uri = URI.parse(ES_URL)
uri.user = ES_USER
uri.password = ES_PASSWORD

client = Elasticsearch::Client.new(url: uri.to_s)

index_manager = IndexManager.new(TABLE_NAME, client)
index_manager.switch_alias_to_latest
index_manager.delete_old_indexes
class IndexManager
  def initialize(name, client)
    @name = name
    @client = client
  end

  def switch_alias_to_latest
    latest_index_cache = latest_index
    return if indexes_in_alias == [latest_index_cache]

    switch_alias(latest_index_cache)
  end

  def delete_old_indexes
    old_indexes.map do |index|
      @client.indices.delete(index: index)
    end
  end

  private

  def alias_name
    "#{@name}-latest"
  end

  def indexes
    @client.indices.get(index: "#{@name}-*").keys
  end

  def indexes_in_alias
    @client.indices.get_alias(index: alias_name).keys
  rescue Elasticsearch::Transport::Transport::Errors::NotFound
    []
  end

  def latest_index
    latest_date = indexes.map { |index| index_timestamp(index) }.max
    "#{@name}-#{latest_date}"
  end

  def old_indexes
    latest_timestamp = index_timestamp(latest_index)
    indexes.select { |index| index_timestamp(index) < latest_timestamp }
  end

  def switch_alias(new_index)
    actions = []
    indexes_in_alias.each do |old_index|
      actions << { remove: { index: old_index, aliases: alias_name } }
    end
    actions << { add: { index: new_index, aliases: alias_name } }
    @client.indices.update_aliases(body: { actions: actions })
  end

  def index_timestamp(index)
    index.match(/#{@name}-(\d{14})/)[1]
  end
end

通过在Cron等工具中定期执行此脚本,可以在创建新的索引时自动切换到别名的索引,并删除旧的索引。为了防患于未然,如果考虑回退的情况,最好不要删除索引。

即使同期未完成,也会更新别名。

如果定期运行上述脚本,当目标数据较多时,在Logstash的同步完成之前切换别名,可能会导致搜索结果不完整。在脚本中,需要进行比较新旧文档数,确保旧索引的最后一个ID包含在新索引中的处理(如果可能的话,我还想添加)。

Rails 的配置

使用gem的elasticsearch-model时的配置。将Alias名称设置为index_name。

class Item < ApplicationRecord
    include Elasticsearch::Model
    index_name 'items-latest'
    document_type '_doc'
end

结束

我考虑了与Logstash结合使用的无停机索引更新方法。虽然还在试验和探索中,但我写了一篇文章。如果有其他类似的操作方法,请在评论中告诉我,我会很高兴的。

只需要一個選擇:
以下是原文的中文改寫版本:可以在這個網址找到: https://www.slideshare.net/recruitcojp/elasticsearch-56355817/44 ↩

广告
将在 10 秒后关闭
bannerAds