【Rails】让Elasticsearch进行批量更新
2020年4月29日 公开
要做的事情(想做的事情)
-
- rails, elasticsearch-ruby関連の gem を使用
XxxxSearchable module 周りをできるだけキレイに作る
Elasticsearch にインデックスを作成する
複数ドキュメントの更新を bulk update で高速に処理させる
不做的事情
- Elasticsearch での検索に関して(今回は全く触れていません)
环境
-
- ruby 2.7.0
-
- rails 6.0.2
-
- elasticsearch-model 7.1.0
- elasticsaerch-rails 7.1.0
假设在本地环境中已经搭建好了Rails应用程序、数据库和Elasticsearch的运行环境。
尽量将HogeSearchable模块的周围设计得更加美观整洁。
处理模型的关系图
如果我画错了ER图,请原谅我。
会员模型
添加配置以使用elasticsearch-model功能来对Member模型进行索引。
class Member < ActiveRecord::Base
include Es::MemberSearchable
end
会员搜索模块
创建 MemberSearchable 模块。
以下是我个人的观点。
app/models/concerns/以下に有象無象にモデルが溜まるのがあまり好きではないので、XxxxSearchable用にapp/models/es/というディレクトリを新たに作成している
XxxxSearchableで使い回せるようなクラスメソッドを定義するための Es::SearchableBase module を include する(後で作る)
as_indexed_json メソッドの中身を定義するための Es::MemberFormer module を include する(後で作る)
module Es::MemberSearchable
extend ActiveSupport::Concern
included do
include Elasticsearch::Model
include Es::SearchableBase
include Es::Formers::MemberFormer
# index名
index_name 'members'
settings do
# フィールドの型を静的に定義する
mappings dynamic: 'false' do
indexes :name, analyzer: 'kuromoji', type: 'text' # 氏名
indexes :watched_movie_ids, type: 'integer' # 見たことがある映画のID
indexes :watched_movie_genre_ids, type: 'integer' # 見たことがある映画のジャンルのID
end
end
# DBからElasticsearchへのデータインポート時に渡す値の設定
def as_indexed_json(_option = {})
create_data_hash.as_json
end
end
end
搜索基本模块
将可重复使用的方法划分为SearchableBase。使用方法类似于类方法,例如Member.create_es_index!或Member.get_es_mapping。
module Es::SearchableBase
extend ActiveSupport::Concern
class_methods do
# index作成メソッド
def create_es_index
__elasticsearch__.client.indices.create(
index: self.index_name,
body: {
settings: self.settings.to_hash,
mappings: self.mappings.to_hash,
}
)
end
# indexを削除し、作成し直すメソッド(ドキュメント消えるので注意)
def create_es_index!
begin
__elasticsearch__.client.indices.delete(index: self.index_name)
rescue StandardError
nil
end
self.create_es_index
end
# mappingを確認するメソッド
def get_es_mapping
__elasticsearch__.client.indices.get_mapping(index: index_name)
end
# mappingの再定義をするメソッド(新たなmappingを追加するときのみ使う)
def put_es_mapping
__elasticsearch__.client.indices.put_mapping(
index: index_name,
body: self.mappings.to_hash,
)
end
# documentの更新をするメソッド
def update_es_documents
transform = lambda do |target|
{ update: { _id: target.id, data: { doc: target.__elasticsearch__.as_indexed_json } } }
end
__elasticsearch__.import(transform: transform)
end
end
end
以原生中文重新解释上述内容,只需一种选择:
Es::MemberFormer模块
在常见的 XxxxSearchabel 的设计中,通常在 as_indexed_json 方法中直接编写哈希表。然而,当要对20或30个字段进行索引时,情况就会变得很复杂(尽管本例中只有3个字段)。私有方法用于获取数据也会增加,导致代码行数增多,跟踪代码变得困难。
为了避免这种情况发生,我们将进行分工。
我们希望尽可能地分配任务,并将每个任务尽可能地保持简单。
module Es::Formers::MemberFormer
extend ActiveSupport::Concern
def create_data_hash
{
name: self.name,
watched_movie_ids: self.member_watched_movies.ids,
watched_movie_genre_ids: watched_movie_genre_ids_array,
}
end
private
def watched_movie_genre_ids_array
self.member_watched_movies.map { |movie| movie.genre.id }.uniq.sort
end
end
到目前为止的目录结构
app
├ controllers
├ ...
└ models
├ concerns
├ es
│ ├ formers
│ │ └ member_former.rb
│ ├ member_searchable.rb
│ └ searchable_base.rb
├ member.rb
├ movie.rb
└ ...
使用Elasticsearch创建索引。
我会创建用于建立索引和导入数据的rake任务。
namespace :es do
namespace :members do
# インデックス作成用
task create_index: :environment do
Member.create_es_index!
end
# データインポート用
# いきなり全件いれたくなければ、allじゃなくてwhereで絞ってください
task import: :environment do
Member.all.__elasticsearch__.import
end
end
end
执行创建索引的rake任务。↓
$ bundle exec rake es:members:create_index
我会确认指数是否已被创建。↓
$ curl -X GET 'localhost:9200/_cat/indices?v&pretty'
只要成员索引显示出来,就表示成功了。 文档计数应该为0。
执行数据导入的rake任务↓
$ budle exec rake es:members:import
我将确认数据已经导入。↓
$ curl -X GET 'localhost:9200/_cat/indices?v&pretty'
如果 docs.count 不为 0,则可以。
如果你想删除索引,请执行以下操作。↓
$ curl -X DELETE 'localhost:9200/members'
使用批量更新以高效地处理多个文档的更新
批量更新的执行
创建用于批量更新的rake任务。
namespace :es do
namespace :members do
# 省略
# ドキュメントのアップデート用
# whereの範囲は適当
task update: :environment do
Member.where(id: 1..10).update_es_documents
end
end
end
如果执行这个 rake 任务,可以进行批量处理更新。
如果没有变更差异的文档被更新的目标,那么该文档的版本不会增加。
(如果执行的是 index API 而不是 update API,那么没有变化差异的文档的版本将会增加)
在这里使用的是 update_es_documents 方法,但实际上这个方法之前已经在 Es::SearchableBase 模块中定义了。
我們將檢視正在進行的處理方式。
__elasticsearch__模块的import方法的选项设置。
(上面提到的 Es::SearchableBase 模块的部分代码如下)
module Es::SearchableBase
extend ActiveSupport::Concern
class_methods do
# 省略
# documentの更新をするメソッド
def update_es_documents
transform = lambda do |target|
{ update: { _id: target.id, data: { doc: target.__elasticsearch__.as_indexed_json } } }
end
__elasticsearch__.import(transform: transform)
end
end
end
调用的处理是elasticsearch-model gem的import方法。(我们之前在创建索引后使用了这个rake任务来导入数据)
这次我们使用 import 方法,并在 transform 选项的参数中传递了 proc。
指定哈希表首部的 “update” 键是重要的,因为这将调用 Elasticsearch 的更新 API。
import 方法最初是为了调用 bulk API 而设计的规格。
因此,我们可以通过这种方法实现批量更新。
顺便提一下,如果不特别指定 transform 作为选项,默认情况下似乎是指定 index 操作。
(参考链接:https://github.com/elastic/elasticsearch-rails/blob/master/elasticsearch-model/lib/elasticsearch/model/adapters/active_record.rb#L111-L113)
一件一件 vs 一次性
虽然我们能够实现批量更新,但最终速度如何,这仍然是一个令人担忧的问题。
我进行了简单的比较。
条件 的翻译:
– 情况
– 规定
– 要求
– 设施
Note: The word “条件” has multiple translations depending on the context. Therefore, it is recommended to consider the specific context in order to choose the most accurate translation.
-
- 1000件ドキュメントがインデックスされている
testという名前のtext型のフィールドがある
1000件ともtestを同じ内容に変更する
計測には ruby の benchmark を使用
逐件处理
我将简要解释一下在这里使用的update_document方法。
如果在模型中包括Elasticsearch::Model::Callbacks,那么在模型的after_create或after_delete时,elasticsearch-model会自动调用提供的方法,以便进行数据库和Elasticsearch的同步。
update_document方法会在after_update时被调用。
其实,在使用类似于下面的代码的 update_document 方法时,内部逻辑会执行 index API 而不是 update API…
(参考:https://github.com/elastic/elasticsearch-rails/blob/master/elasticsearch-model/lib/elasticsearch/model/indexing.rb#L425-L444)
请多考虑一下作为比较对象的问题吧,如果有好的方法,还请告诉我,谢谢您的帮助。
代码
程序 (Programa)
程式码 Mǎ)
namespace :es do
namespace :members do
task test1: :environment do
target_ids = 1..1000
Benchmark.bm 10 do |r|
r.report 'test' do
Member.where(id: target_ids).find_each do |member|
member.__elasticsearch__.update_document
end
end
end
end
end
end
执行时间
$ bundle exec rake es:members:test1
user system total real
test 4.976838 2.255107 7.231945 ( 63.745111)
如果是一次性的情况
代码 (daima)
namespace :es do
namespace :members do
task test2: :environment do
target_ids = 1..1000
Benchmark.bm 10 do |r|
r.report 'test' do
Member.where(id: target_ids).update_es_documents
end
end
end
end
end
执行时间
$ bundle exec rake es:members:test2
user system total real
test 0.251815 0.063858 0.315673 ( 1.568112)
结果
批量处理速度真是超快的呢。
最后
我尽可能地精心设计了与Elasticsearch相关的类,并整理了将数据批量更新到索引的部分。
虽然我已经做到了这一步,但我并不认为这是正确的解决方法。
模块名和方法名等等也很随意,目录结构也还有改进的空间。
如果有更好的方法,请告诉我,我会非常高兴的。
当你安装了RuboCop,我认为会有很多地方会被指责,所以请你自行处理好这些地方。(例如增加上限或者排除某些情况)
非常感谢您一直观看到最后。