【Rails】让Elasticsearch进行批量更新

2020年4月29日 公开

要做的事情(想做的事情)

    • rails, elasticsearch-ruby関連の gem を使用

XxxxSearchable module 周りをできるだけキレイに作る
Elasticsearch にインデックスを作成する
複数ドキュメントの更新を bulk update で高速に処理させる

不做的事情

    Elasticsearch での検索に関して(今回は全く触れていません)

环境

    • ruby 2.7.0

 

    • rails 6.0.2

 

    • elasticsearch-model 7.1.0

 

    elasticsaerch-rails 7.1.0

假设在本地环境中已经搭建好了Rails应用程序、数据库和Elasticsearch的运行环境。

尽量将HogeSearchable模块的周围设计得更加美观整洁。

处理模型的关系图

如果我画错了ER图,请原谅我。

er_test.png

会员模型

添加配置以使用elasticsearch-model功能来对Member模型进行索引。

class Member < ActiveRecord::Base
  include Es::MemberSearchable
end

会员搜索模块

创建 MemberSearchable 模块。

以下是我个人的观点。

app/models/concerns/以下に有象無象にモデルが溜まるのがあまり好きではないので、XxxxSearchable用にapp/models/es/というディレクトリを新たに作成している

XxxxSearchableで使い回せるようなクラスメソッドを定義するための Es::SearchableBase module を include する(後で作る)

as_indexed_json メソッドの中身を定義するための Es::MemberFormer module を include する(後で作る)

module Es::MemberSearchable
  extend ActiveSupport::Concern

  included do
    include Elasticsearch::Model
    include Es::SearchableBase
    include Es::Formers::MemberFormer

    # index名
    index_name 'members'

    settings do
      # フィールドの型を静的に定義する
      mappings dynamic: 'false' do
        indexes :name, analyzer: 'kuromoji', type: 'text' # 氏名
        indexes :watched_movie_ids, type: 'integer' # 見たことがある映画のID
        indexes :watched_movie_genre_ids, type: 'integer' # 見たことがある映画のジャンルのID
      end
    end

    # DBからElasticsearchへのデータインポート時に渡す値の設定
    def as_indexed_json(_option = {})
      create_data_hash.as_json
    end
  end
end

搜索基本模块

将可重复使用的方法划分为SearchableBase。使用方法类似于类方法,例如Member.create_es_index!或Member.get_es_mapping。

module Es::SearchableBase
  extend ActiveSupport::Concern

  class_methods do

    # index作成メソッド
    def create_es_index
      __elasticsearch__.client.indices.create(
        index: self.index_name,
        body: {
          settings: self.settings.to_hash,
          mappings: self.mappings.to_hash,
        }
      )
    end

    # indexを削除し、作成し直すメソッド(ドキュメント消えるので注意)
    def create_es_index!
      begin
        __elasticsearch__.client.indices.delete(index: self.index_name)
      rescue StandardError
        nil
      end

      self.create_es_index
    end

    # mappingを確認するメソッド
    def get_es_mapping
      __elasticsearch__.client.indices.get_mapping(index: index_name)
    end

    # mappingの再定義をするメソッド(新たなmappingを追加するときのみ使う)
    def put_es_mapping
      __elasticsearch__.client.indices.put_mapping(
        index: index_name,
        body:  self.mappings.to_hash,
      )
    end

    # documentの更新をするメソッド
    def update_es_documents
      transform = lambda do |target|
        { update: { _id: target.id, data: { doc: target.__elasticsearch__.as_indexed_json } } }
      end

      __elasticsearch__.import(transform: transform)
    end
  end
end

以原生中文重新解释上述内容,只需一种选择:

Es::MemberFormer模块

在常见的 XxxxSearchabel 的设计中,通常在 as_indexed_json 方法中直接编写哈希表。然而,当要对20或30个字段进行索引时,情况就会变得很复杂(尽管本例中只有3个字段)。私有方法用于获取数据也会增加,导致代码行数增多,跟踪代码变得困难。

为了避免这种情况发生,我们将进行分工。
我们希望尽可能地分配任务,并将每个任务尽可能地保持简单。

module Es::Formers::MemberFormer
  extend ActiveSupport::Concern

  def create_data_hash
    {
      name: self.name,
      watched_movie_ids: self.member_watched_movies.ids,
      watched_movie_genre_ids: watched_movie_genre_ids_array,
    }
  end

  private

  def watched_movie_genre_ids_array
    self.member_watched_movies.map { |movie| movie.genre.id }.uniq.sort
  end
end

到目前为止的目录结构

app
├ controllers
├ ...
└ models
  ├ concerns
  ├ es
  │ ├ formers
  │ │ └ member_former.rb
  │ ├ member_searchable.rb
  │ └ searchable_base.rb
  ├ member.rb
  ├ movie.rb
  └ ...

使用Elasticsearch创建索引。

我会创建用于建立索引和导入数据的rake任务。

namespace :es do
  namespace :members do
    # インデックス作成用
    task create_index: :environment do
      Member.create_es_index!
    end

    # データインポート用
    # いきなり全件いれたくなければ、allじゃなくてwhereで絞ってください
    task import: :environment do
      Member.all.__elasticsearch__.import
    end
  end
end

执行创建索引的rake任务。↓

$ bundle exec rake es:members:create_index

我会确认指数是否已被创建。↓

$ curl -X GET 'localhost:9200/_cat/indices?v&pretty'

只要成员索引显示出来,就表示成功了。 文档计数应该为0。

执行数据导入的rake任务↓

$ budle exec rake es:members:import

我将确认数据已经导入。↓

$ curl -X GET 'localhost:9200/_cat/indices?v&pretty'

如果 docs.count 不为 0,则可以。

如果你想删除索引,请执行以下操作。↓

$ curl -X DELETE 'localhost:9200/members'

使用批量更新以高效地处理多个文档的更新

批量更新的执行

创建用于批量更新的rake任务。

namespace :es do
  namespace :members do

    # 省略

    # ドキュメントのアップデート用
    # whereの範囲は適当
    task update: :environment do
      Member.where(id: 1..10).update_es_documents
    end
  end
end

如果执行这个 rake 任务,可以进行批量处理更新。
如果没有变更差异的文档被更新的目标,那么该文档的版本不会增加。
(如果执行的是 index API 而不是 update API,那么没有变化差异的文档的版本将会增加)

在这里使用的是 update_es_documents 方法,但实际上这个方法之前已经在 Es::SearchableBase 模块中定义了。

我們將檢視正在進行的處理方式。

__elasticsearch__模块的import方法的选项设置。

(上面提到的 Es::SearchableBase 模块的部分代码如下)

module Es::SearchableBase
  extend ActiveSupport::Concern

  class_methods do

    # 省略

    # documentの更新をするメソッド
    def update_es_documents
      transform = lambda do |target|
        { update: { _id: target.id, data: { doc: target.__elasticsearch__.as_indexed_json } } }
      end

      __elasticsearch__.import(transform: transform)
    end
  end
end

调用的处理是elasticsearch-model gem的import方法。(我们之前在创建索引后使用了这个rake任务来导入数据)

这次我们使用 import 方法,并在 transform 选项的参数中传递了 proc。

指定哈希表首部的 “update” 键是重要的,因为这将调用 Elasticsearch 的更新 API。

import 方法最初是为了调用 bulk API 而设计的规格。

因此,我们可以通过这种方法实现批量更新。

顺便提一下,如果不特别指定 transform 作为选项,默认情况下似乎是指定 index 操作。
(参考链接:https://github.com/elastic/elasticsearch-rails/blob/master/elasticsearch-model/lib/elasticsearch/model/adapters/active_record.rb#L111-L113)

一件一件 vs 一次性

虽然我们能够实现批量更新,但最终速度如何,这仍然是一个令人担忧的问题。
我进行了简单的比较。

条件 的翻译:
– 情况
– 规定
– 要求
– 设施

Note: The word “条件” has multiple translations depending on the context. Therefore, it is recommended to consider the specific context in order to choose the most accurate translation.

    • 1000件ドキュメントがインデックスされている

testという名前のtext型のフィールドがある
1000件ともtestを同じ内容に変更する
計測には ruby の benchmark を使用

逐件处理

我将简要解释一下在这里使用的update_document方法。

如果在模型中包括Elasticsearch::Model::Callbacks,那么在模型的after_create或after_delete时,elasticsearch-model会自动调用提供的方法,以便进行数据库和Elasticsearch的同步。
update_document方法会在after_update时被调用。

其实,在使用类似于下面的代码的 update_document 方法时,内部逻辑会执行 index API 而不是 update API…
(参考:https://github.com/elastic/elasticsearch-rails/blob/master/elasticsearch-model/lib/elasticsearch/model/indexing.rb#L425-L444)

请多考虑一下作为比较对象的问题吧,如果有好的方法,还请告诉我,谢谢您的帮助。

代码
程序 (Programa)
程式码 Mǎ)

namespace :es do
  namespace :members do
    task test1: :environment do
      target_ids = 1..1000

      Benchmark.bm 10 do |r|
        r.report 'test' do
          Member.where(id: target_ids).find_each do |member|
            member.__elasticsearch__.update_document
          end
        end
      end
    end
  end
end

执行时间

$ bundle exec rake es:members:test1
                 user     system      total        real
test         4.976838   2.255107   7.231945 ( 63.745111)

如果是一次性的情况

代码 (daima)

namespace :es do
  namespace :members do
    task test2: :environment do
      target_ids = 1..1000

      Benchmark.bm 10 do |r|
        r.report 'test' do
          Member.where(id: target_ids).update_es_documents
        end
      end
    end
  end
end

执行时间

$ bundle exec rake es:members:test2
                 user     system      total        real
test         0.251815   0.063858   0.315673 (  1.568112)
结果

批量处理速度真是超快的呢。

最后

我尽可能地精心设计了与Elasticsearch相关的类,并整理了将数据批量更新到索引的部分。

虽然我已经做到了这一步,但我并不认为这是正确的解决方法。
模块名和方法名等等也很随意,目录结构也还有改进的空间。
如果有更好的方法,请告诉我,我会非常高兴的。

当你安装了RuboCop,我认为会有很多地方会被指责,所以请你自行处理好这些地方。(例如增加上限或者排除某些情况)

非常感谢您一直观看到最后。

广告
将在 10 秒后关闭
bannerAds