将MySQL(MariaDB)表中的数据导入到Elasticsearch索引中
下面将介绍一个从MariaDB表数据导入Elasticsearch的Python脚本方法。
希望这可以作为RDB数据索引化时的参考。
我是通过参考以下链接安装了MariaDB并准备了示例数据:
在Ubuntu 18.04 LTS上安装了MariaDB 10.3
https://qiita.com/cherubim1111/items/61cbc72673712431d06e
【SQL】使用MySQL官方示例数据库
https://qiita.com/yukibe/items/fc6016348ecf4f3b10bf
从外部连接到MySQL服务器
https://qiita.com/tocomi/items/0c009d7299584df49378
我希望能够完整地导入上述示例数据库中的city表。
事前准备
假设事先已经安装了Python3和python3-pip。
请安装脚本所需的MySQL(MariaDB)和Elasticsearch的Python库。
值得注意的是,MySQL的Python库有多个选择,但个人觉得pymysql是最稳定的。
pip3 install pymysql
pip3 install elasticsearch
1. 城市索引的映射定义
城市表的结构如下所示。
MariaDB [world]> desc city;
+-------------+----------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+----------+------+-----+---------+----------------+
| ID | int(11) | NO | PRI | NULL | auto_increment |
| Name | char(35) | NO | | | |
| CountryCode | char(3) | NO | MUL | | |
| District | char(20) | NO | | | |
| Population | int(11) | NO | | 0 | |
+-------------+----------+------+-----+---------+----------------+
5 rows in set (0.005 sec)
在定义mapping时,我们将Name、CountryCode和District的类型设为keyword,将Population的类型设为integer。
虽然也可以考虑将它们定义为text而不是keyword,但考虑到国名等不是文章,使用keyword已经足够。
需要注意的是,ID被预期用作文档ID,它不被包含在mapping的定义内容中。
定义城市索引的Python脚本如下所示:
#!/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス
# mappingの定義クエリ
city = {}
city["settings"] = {}
city["settings"]["number_of_shards"] = 3
city["settings"]["number_of_replicas"] = 1
city["mappings"] = {}
city["mappings"]["properties"] = {}
city["mappings"]["properties"]["Name"] = {}
city["mappings"]["properties"]["Name"]["type"] = "keyword"
city["mappings"]["properties"]["CountryCode"] = {}
city["mappings"]["properties"]["CountryCode"]["type"] = "keyword"
city["mappings"]["properties"]["District"] = {}
city["mappings"]["properties"]["District"]["type"] = "keyword"
city["mappings"]["properties"]["Population"] = {}
city["mappings"]["properties"]["Population"]["type"] = "integer"
# クエリの内容を表示
pprint("クエリの内容を表示")
pprint(city)
print()
# クエリの実行
response = es.indices.create(index='city',body=city)
# クエリのレスポンスの表示
pprint("レスポンスの内容を表示")
pprint(response)
运行此脚本将得到以下结果。
$ ./putCityMapping.py
'クエリの内容を表示'
{'mappings': {'properties': {'CountryCode': {'type': 'keyword'},
'District': {'type': 'keyword'},
'Name': {'type': 'keyword'},
'Population': {'type': 'integer'}}},
'settings': {'number_of_replicas': 1, 'number_of_shards': 3}}
'レスポンスの内容を表示'
{'acknowledged': True, 'index': 'city', 'shards_acknowledged': True}
获取城市索引映射的定义内容。
接下来,我们将创建一个脚本来显示创建的映射定义内容。
具体内容如下:
#!/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス
response = es.indices.get_mapping(index='city')
pprint(response)
执行此脚本将获得以下类似结果。 (Zhè sì .)
$ ./getCityMapping.py
{'city': {'mappings': {'properties': {'CountryCode': {'type': 'keyword'},
'District': {'type': 'keyword'},
'Name': {'type': 'keyword'},
'Population': {'type': 'integer'}}}}}
3. 创建城市文档(只需一个)。
我们将在创建的城市索引中创建一个文档。
我们决定不直接从城市表中导入所有数据,而是使用SQL显示的数据。
以下是需要进行文档化的数据。
MariaDB [world]> select * from city where id=1;
+----+-------+-------------+----------+------------+
| ID | Name | CountryCode | District | Population |
+----+-------+-------------+----------+------------+
| 1 | Kabul | AFG | Kabol | 1780000 |
+----+-------+-------------+----------+------------+
1 row in set (0.004 sec)
创建此数据文件的脚本如下所示。
#!/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス
index_id = "1"
kabul_doc = {}
kabul_doc["Name"] = "Kabul"
kabul_doc["CountryCode"] = "AFG"
kabul_doc["District"] = "Kabol"
kabul_doc["Population"] = 1780000
pprint(kabul_doc)
print()
response = es.index(index="city", doc_type="_doc", id=index_id,body=kabul_doc)
pprint(response)
执行该脚本会得到如下结果。
$ ./putKabulDoc.py
{'CountryCode': 'AFG',
'District': 'Kabol',
'Name': 'Kabul',
'Population': 1780000}
{'_id': '1',
'_index': 'city',
'_primary_term': 1,
'_seq_no': 0,
'_shards': {'failed': 0, 'successful': 1, 'total': 2},
'_type': '_doc',
'_version': 1,
'result': 'created'}
4. 查看文件
我会创建一个从Elasticsearch获取Kabul文档的脚本。
这里我们将使用获取id=1的文档的查询,而不是搜索查询。
#!/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス
response = es.get(index='city',id='1')
pprint(response)
执行此脚本将获得以下类似的结果。
$ ./getId1Doc.py
{'_id': '1',
'_index': 'city',
'_primary_term': 1,
'_seq_no': 0,
'_source': {'CountryCode': 'AFG',
'District': 'Kabol',
'Name': 'Kabul',
'Population': 1780000},
'_type': '_doc',
'_version': 1,
'found': True}
使用Python从city表中获取数据。
在从city表中导入全部数据之前,我想先确保从表中获取数据的方法。
#!/usr/bin/python3
import pymysql.cursors
from pprint import pprint
# MySQL(MariaDB)接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザー名", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)
cursor.execute("USE world")
db.commit()
sql = 'SELECT * FROM city'
cursor.execute(sql)
city = cursor.fetchall()
pprint(city)
当执行此脚本时,将获得下述结果。
$ ./getCityTable.py
[{'CountryCode': 'AFG',
'District': 'Kabol',
'ID': 1,
'Name': 'Kabul',
'Population': 1780000},
{'CountryCode': 'AFG',
'District': 'Qandahar',
'ID': 2,
'Name': 'Qandahar',
'Population': 237500},
{'CountryCode': 'AFG',
'District': 'Herat',
'ID': 3,
'Name': 'Herat',
'Population': 186800},
(省略)
{'CountryCode': 'PSE',
'District': 'Rafah',
'ID': 4079,
'Name': 'Rafah',
'Population': 92020}]
6. 所有数据的导入 de
终于了,我想用脚本导入city表的所有数据。
脚本内容如下所示。
#!/usr/bin/python3
import pymysql.cursors
from elasticsearch import Elasticsearch
from pprint import pprint
# MySQL接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザーID", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)
cursor.execute("USE world")
db.commit()
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200'])
# MySQLからcityデータをまるごと取得
sql = 'SELECT * FROM city'
cursor.execute(sql)
city_table = cursor.fetchall()
# cityデータをレコード単位でループ
for city_record in city_table:
index_id = str(city_record["ID"])
city_doc = {}
city_doc["Name"] = city_record["Name"]
city_doc["CountryCode"] = city_record["CountryCode"]
city_doc["District"] = city_record["District"]
city_doc["Population"] = city_record["Population"]
response = es.index(index="city", doc_type="_doc", id=index_id, body=city_doc)
print(response)
执行此脚本会获得以下结果。
# ./importFromCityTable.py
{'_index': 'city', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '2', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '3', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
(省略)
{'_index': 'city', '_type': '_doc', '_id': '4078', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1350, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '4079', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1379, '_primary_term': 1}
我希望在这里跳过对导入数据的确认。
请编辑getId1Doc.py或使用Kibana控制台进行确认。
使用Bulk API进行导入。
如果是类似于city表那样的规模,使用importFromCityTable.py的话,对于每个文档进行一次PUT操作的时间不会太长,但由于每次通信都会发生,所以当记录数或单个记录的大小变大时,会花费更长的时间。
在这种情况下,使用Bulk API,将文档以数组形式批量PUT,可以节省导入时间。
使用Bulk API的脚本如下所示:
#!/usr/bin/python3
import pymysql.cursors
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from pprint import pprint
# MySQL接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザーID", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)
cursor.execute("USE world")
db.commit()
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200'])
# MySQLからcityデータをまるごと取得
sql = 'SELECT * FROM city'
cursor.execute(sql)
city_table = cursor.fetchall()
# Bulk送信用のドキュメントの配列
bulk_doc = []
# bulkで送信するドキュメント数とカウンター
bulk_number = 1000
n = 1
# cityデータをレコード単位でループ
for city_record in city_table:
index_id = str(city_record["ID"])
city_doc = {}
city_doc["Name"] = city_record["Name"]
city_doc["CountryCode"] = city_record["CountryCode"]
city_doc["District"] = city_record["District"]
city_doc["Population"] = city_record["Population"]
# bulk配列に追加
bulk_doc.append({"_index":"city", "_type":"_doc", "_id":index_id, "_source":city_doc})
# 格納数がbulk_numberに達したら送信、未達なら次のcity_doc生成とbulk配列へのappendに進む
if n < bulk_number:
n = n + 1
else :
response = helpers.bulk(es, bulk_doc)
pprint(response)
n = 1
bulk_index = []
# bulk_numberに達しなうでループが終わってしまった分を送信
response = helpers.bulk(es, bulk_doc)
pprint(response)
运行此脚本将得到如下结果:
$ ./bulkImportFromCityTable.py
(1000, [])
(2000, [])
(3000, [])
(4000, [])
(4079, [])
根据实际执行环境,我认为这个选项会更快地完成导入操作。