将MySQL(MariaDB)表中的数据导入到Elasticsearch索引中

下面将介绍一个从MariaDB表数据导入Elasticsearch的Python脚本方法。
希望这可以作为RDB数据索引化时的参考。

我是通过参考以下链接安装了MariaDB并准备了示例数据:
在Ubuntu 18.04 LTS上安装了MariaDB 10.3
https://qiita.com/cherubim1111/items/61cbc72673712431d06e
【SQL】使用MySQL官方示例数据库
https://qiita.com/yukibe/items/fc6016348ecf4f3b10bf
从外部连接到MySQL服务器
https://qiita.com/tocomi/items/0c009d7299584df49378

我希望能够完整地导入上述示例数据库中的city表。

事前准备

假设事先已经安装了Python3和python3-pip。
请安装脚本所需的MySQL(MariaDB)和Elasticsearch的Python库。
值得注意的是,MySQL的Python库有多个选择,但个人觉得pymysql是最稳定的。

pip3 install pymysql
pip3 install elasticsearch

1. 城市索引的映射定义

城市表的结构如下所示。

MariaDB [world]> desc city;
+-------------+----------+------+-----+---------+----------------+
| Field       | Type     | Null | Key | Default | Extra          |
+-------------+----------+------+-----+---------+----------------+
| ID          | int(11)  | NO   | PRI | NULL    | auto_increment |
| Name        | char(35) | NO   |     |         |                |
| CountryCode | char(3)  | NO   | MUL |         |                |
| District    | char(20) | NO   |     |         |                |
| Population  | int(11)  | NO   |     | 0       |                |
+-------------+----------+------+-----+---------+----------------+
5 rows in set (0.005 sec)

在定义mapping时,我们将Name、CountryCode和District的类型设为keyword,将Population的类型设为integer。

虽然也可以考虑将它们定义为text而不是keyword,但考虑到国名等不是文章,使用keyword已经足够。

需要注意的是,ID被预期用作文档ID,它不被包含在mapping的定义内容中。

定义城市索引的Python脚本如下所示:

#!/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス

# mappingの定義クエリ
city = {}
city["settings"] = {}
city["settings"]["number_of_shards"] = 3
city["settings"]["number_of_replicas"] = 1

city["mappings"] = {}
city["mappings"]["properties"] = {}
city["mappings"]["properties"]["Name"] = {}
city["mappings"]["properties"]["Name"]["type"] = "keyword"
city["mappings"]["properties"]["CountryCode"] = {}
city["mappings"]["properties"]["CountryCode"]["type"] = "keyword"
city["mappings"]["properties"]["District"] = {}
city["mappings"]["properties"]["District"]["type"] = "keyword"
city["mappings"]["properties"]["Population"] = {}
city["mappings"]["properties"]["Population"]["type"] = "integer"

# クエリの内容を表示
pprint("クエリの内容を表示")
pprint(city)
print()

# クエリの実行
response = es.indices.create(index='city',body=city)

# クエリのレスポンスの表示
pprint("レスポンスの内容を表示")
pprint(response)

运行此脚本将得到以下结果。

$ ./putCityMapping.py 
'クエリの内容を表示'
{'mappings': {'properties': {'CountryCode': {'type': 'keyword'},
                             'District': {'type': 'keyword'},
                             'Name': {'type': 'keyword'},
                             'Population': {'type': 'integer'}}},
 'settings': {'number_of_replicas': 1, 'number_of_shards': 3}}

'レスポンスの内容を表示'
{'acknowledged': True, 'index': 'city', 'shards_acknowledged': True}

获取城市索引映射的定义内容。

接下来,我们将创建一个脚本来显示创建的映射定义内容。
具体内容如下:

#!/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス

response = es.indices.get_mapping(index='city')

pprint(response)

执行此脚本将获得以下类似结果。 (Zhè sì .)

$ ./getCityMapping.py
{'city': {'mappings': {'properties': {'CountryCode': {'type': 'keyword'},
                                      'District': {'type': 'keyword'},
                                      'Name': {'type': 'keyword'},
                                      'Population': {'type': 'integer'}}}}}

3. 创建城市文档(只需一个)。

我们将在创建的城市索引中创建一个文档。
我们决定不直接从城市表中导入所有数据,而是使用SQL显示的数据。

以下是需要进行文档化的数据。

MariaDB [world]> select  * from city where id=1;
+----+-------+-------------+----------+------------+
| ID | Name  | CountryCode | District | Population |
+----+-------+-------------+----------+------------+
|  1 | Kabul | AFG         | Kabol    |    1780000 |
+----+-------+-------------+----------+------------+
1 row in set (0.004 sec)

创建此数据文件的脚本如下所示。

#!/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス

index_id = "1"
kabul_doc = {}
kabul_doc["Name"] = "Kabul"
kabul_doc["CountryCode"] = "AFG"
kabul_doc["District"] = "Kabol"
kabul_doc["Population"] = 1780000
pprint(kabul_doc)
print()

response = es.index(index="city", doc_type="_doc", id=index_id,body=kabul_doc)
pprint(response)

执行该脚本会得到如下结果。

$ ./putKabulDoc.py 
{'CountryCode': 'AFG',
 'District': 'Kabol',
 'Name': 'Kabul',
 'Population': 1780000}

{'_id': '1',
 '_index': 'city',
 '_primary_term': 1,
 '_seq_no': 0,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_type': '_doc',
 '_version': 1,
 'result': 'created'}

4. 查看文件

我会创建一个从Elasticsearch获取Kabul文档的脚本。
这里我们将使用获取id=1的文档的查询,而不是搜索查询。

#!/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス

response = es.get(index='city',id='1')
pprint(response)

执行此脚本将获得以下类似的结果。

$ ./getId1Doc.py 
{'_id': '1',
 '_index': 'city',
 '_primary_term': 1,
 '_seq_no': 0,
 '_source': {'CountryCode': 'AFG',
             'District': 'Kabol',
             'Name': 'Kabul',
             'Population': 1780000},
 '_type': '_doc',
 '_version': 1,
 'found': True}

使用Python从city表中获取数据。

在从city表中导入全部数据之前,我想先确保从表中获取数据的方法。

#!/usr/bin/python3
import pymysql.cursors
from pprint import pprint

# MySQL(MariaDB)接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザー名", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)

cursor.execute("USE world")
db.commit()

sql = 'SELECT * FROM city'
cursor.execute(sql)

city = cursor.fetchall()

pprint(city)

当执行此脚本时,将获得下述结果。

$ ./getCityTable.py
[{'CountryCode': 'AFG',
  'District': 'Kabol',
  'ID': 1,
  'Name': 'Kabul',
  'Population': 1780000},
 {'CountryCode': 'AFG',
  'District': 'Qandahar',
  'ID': 2,
  'Name': 'Qandahar',
  'Population': 237500},
 {'CountryCode': 'AFG',
  'District': 'Herat',
  'ID': 3,
  'Name': 'Herat',
  'Population': 186800},

(省略)

 {'CountryCode': 'PSE',
  'District': 'Rafah',
  'ID': 4079,
  'Name': 'Rafah',
  'Population': 92020}]

6. 所有数据的导入 de

终于了,我想用脚本导入city表的所有数据。
脚本内容如下所示。

#!/usr/bin/python3
import pymysql.cursors
from elasticsearch import Elasticsearch
from pprint import pprint

# MySQL接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザーID", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)

cursor.execute("USE world")
db.commit()

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200'])

# MySQLからcityデータをまるごと取得
sql = 'SELECT * FROM city'
cursor.execute(sql)
city_table = cursor.fetchall()

# cityデータをレコード単位でループ
for city_record in city_table:
    index_id = str(city_record["ID"])

    city_doc = {}
    city_doc["Name"] = city_record["Name"]
    city_doc["CountryCode"] = city_record["CountryCode"]
    city_doc["District"] = city_record["District"]
    city_doc["Population"] = city_record["Population"]
    response = es.index(index="city", doc_type="_doc", id=index_id, body=city_doc)
    print(response)

执行此脚本会获得以下结果。

# ./importFromCityTable.py 
{'_index': 'city', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '2', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '3', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}

(省略)

{'_index': 'city', '_type': '_doc', '_id': '4078', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1350, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '4079', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1379, '_primary_term': 1}

我希望在这里跳过对导入数据的确认。
请编辑getId1Doc.py或使用Kibana控制台进行确认。

使用Bulk API进行导入。

如果是类似于city表那样的规模,使用importFromCityTable.py的话,对于每个文档进行一次PUT操作的时间不会太长,但由于每次通信都会发生,所以当记录数或单个记录的大小变大时,会花费更长的时间。
在这种情况下,使用Bulk API,将文档以数组形式批量PUT,可以节省导入时间。
使用Bulk API的脚本如下所示:

#!/usr/bin/python3
import pymysql.cursors
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from pprint import pprint

# MySQL接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザーID", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)

cursor.execute("USE world")
db.commit()

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200'])

# MySQLからcityデータをまるごと取得
sql = 'SELECT * FROM city'
cursor.execute(sql)
city_table = cursor.fetchall()

# Bulk送信用のドキュメントの配列
bulk_doc = []

# bulkで送信するドキュメント数とカウンター
bulk_number = 1000
n = 1

# cityデータをレコード単位でループ
for city_record in city_table:
    index_id = str(city_record["ID"])

    city_doc = {}
    city_doc["Name"] = city_record["Name"]
    city_doc["CountryCode"] = city_record["CountryCode"]
    city_doc["District"] = city_record["District"]
    city_doc["Population"] = city_record["Population"]

    # bulk配列に追加
    bulk_doc.append({"_index":"city", "_type":"_doc", "_id":index_id, "_source":city_doc})

    # 格納数がbulk_numberに達したら送信、未達なら次のcity_doc生成とbulk配列へのappendに進む
    if n < bulk_number:
        n = n + 1
    else :
        response = helpers.bulk(es, bulk_doc)
        pprint(response)
        n = 1
        bulk_index = []

# bulk_numberに達しなうでループが終わってしまった分を送信
response = helpers.bulk(es, bulk_doc)
pprint(response)

运行此脚本将得到如下结果:

$ ./bulkImportFromCityTable.py 
(1000, [])
(2000, [])
(3000, [])
(4000, [])
(4079, [])

根据实际执行环境,我认为这个选项会更快地完成导入操作。

广告
将在 10 秒后关闭
bannerAds