Python Elasticsearch 使用方法概述
首先
由于在Python中有机会使用Elasticsearch,我进行了信息收集。然而,我发现所使用的Elasticsearch版本过旧,或者信息本来就很少。因此,本次我整理了一些基本用法,并附上了简单的例子作为备忘录。
通过阅读这篇文章,您将能够使用低级别客户端(elasticsearch-py)对索引和文档进行CRUD操作。
环境
假定 Elasticsearch 已经启动。
另外,执行环境如下所述。由于没有使用特殊库,基本上在任何环境中都应该没有问题。
- 利用環境
Ubuntu 20.04 LTS
- Elasticsearch環境
7.7.0
- Python環境
$ python -V
Python 3.8.4
筹备
- pipでElasticsearchクライアントのインストール
$ python -m pip install elasticsearch
基本操作原文的汉语括号的短语:”基本操作”
在这里,我们将介绍如何使用基本的Python操作Elasticsearch的方法。
连接
使用安装的elasticsearch软件包连接到已启动的Elasticsearch主机(localhost)的方法如下:
基本的的连接方式
如果在Elasticsearch中没有设置认证,就可以通过这种方法进行连接。
from elasticsearch import Elasticsearch
# Elasticsearchクライアント作成
es = Elasticsearch("http://localhost:9200")
您还可以指定端口,包括HTTP或HTTPS。
# Elasticsearchインスタンスを作成
es = Elasticsearch(
["localhost", "otherhost"],
scheme="http",
port=9200
)
使用HTTP身份验证建立连接
如果您在Elasticsearch上设置了ID和密码,您可以使用这种方法进行连接。
es = Elasticsearch(
"http://localhost:9200",
http_auth=("user_id", "password")
)
用户ID和密码表示已设定的身份和密码。
断开连接
可以使用close()函数关闭上述建立的内部连接。
# Elasticsearchインスタンスを作成
es = Elasticsearch("http://localhost:9200")
# 内部接続を閉じる
es.close()
关于这个问题,如果不关闭(close())实例,在实例被垃圾回收时会引发异常。为了防止这种情况发生,最好明确地将其写出来。
当一个开放的HTTP连接被垃圾回收时,这个警告是由aiohttp创建的。通常在关闭应用程序时会遇到此问题。为了解决此问题,请确保在AsyncElasticsearch实例被垃圾回收之前调用close()方法。
https://elasticsearch-py.readthedocs.io/en/master/async.html?highlight=close#receiving-unclosed-client-session-connector-warning
索引的基本操作
我将介绍处理索引的基本操作。
在操作索引时,使用属性名为 “indices”。
创建索引
创建一个名为”students”的索引。
这是一个没有类型和文档的空索引。
es.indices.create(index='students')
使用地图映射的方法
您可以指定数据类型和索引结构进行创建。
mapping = {
"mappings": {
"properties": {
"name": {"type": "text"},
"age": {"type": "long"},
"email": {"type": "text"}
}
}
}
es.indices.create(index="students", body=mapping)
收集索引信息
在这里,我们将介绍如何获取索引的信息。
获取索引列表
只需要一个选项:
如果想要确认连接的Elasticsearch上有什么样的索引,可以使用cat属性的indices(index=”*”, h=”index”)。
indices是一个返回索引信息的方法。
这次我们想要获取全部索引的列表,所以在参数index中指定通配符。
另外,通过指定列名参数”h”,可以获取以换行符分隔的列信息返回。
# インデックス一覧の取得
indices = es.cat.indices(index='*', h='index').splitlines()
# インデックスの表示
for index in indices:
print(index)
执行结果如下所示。
.apm-custom-link
.kibana_task_manager_1
.apm-agent-configuration
students
.kibana_1
不仅显示了已创建的学生,还显示了默认的索引。
确认索引映射
如果要查看特定索引的映射,请使用get_mapping(index=”索引名称”)。
print(es.indices.get_mapping(index="students"))
执行结果如下所示。
{'students': {'mappings': {'properties': {'age': {'type': 'long'}, 'email': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}}}
另外,如果不指定参数index,您可以获取所有索引的映射。
print(es.indices.get_mapping())
不公开执行结果。
更新索引
如果要更改映射并更新索引结构,可以使用put_mapping(index =“索引名称”,body =“更改的映射内容”)。
举例来说,如果要向学生添加新的学籍号,可以按照以下方式进行操作。
mapping = {
"properties": {
"student_number": {"type": "long"}
}
}
es.indices.put_mapping(index="students", body=mapping)
不必传递所有指定的映射,只需传递差异即可。
另外,在创建索引时,我们将其嵌套在mappings下,但是在更新时要注意不是这样。
当前的映射情况如下:
{
"students": {
"mappings": {
"properties": {
"age": {
"type": "long"
},
"email": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"student_number": {
"type": "long"
}
}
}
}
}
虽然看起来有点困难,但是最后显示了新增的学生编号。
删除索引
如果想要删除特定的索引,在中文中可以使用delete(index=”索引名称”)来进行操作。
这次我们将尝试删除之前操作过的学生。
es.indices.delete(index="students")
检查索引是否存在
在添加错误处理时,希望确认索引是否存在。
在这种情况下,可以使用exists(index=”索引名称”)来实现。
print(es.indices.exists(index="students"))
执行结果如下:
False
由于先前删除了索引,因此返回了False。
如果存在,将返回True。
文档的基本操作
下面将介绍与处理文件相关的基本操作。
创建文件
要注册新文档,可以使用create(index=”索引名称”, id=”文档ID”, body=”新建文档”)。
如果没有索引,将根据注册的文档自动确定类型并创建。
# 登録したいドキュメント
student = {
"name": "Taro",
"age": 36,
"email": "taro@example.com"
}
# ドキュメントの登録
es.create(index='students', id=1, body=student)
成功注册后,将会输出以下结果。
{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}}
大容量插入
通过多次调用create函数,可以注册多个文档,但是通过使用bulk(实例,数据)函数可以一次性注册。
传递给参数的数据结构如下,稍微复杂。
{
"_op_type": "createやdelete、updateといったアクションを指定"
"_index": "インデックス名"
"_id": "ドキュメントのIDを指定(createの場合はなくても良い)"
"_source": "登録したいドキュメント"
}
您可以将上述数据存储在一个数组中,并通过批量操作来处理多个文档。
这次我们将介绍一种使用yield而不是数组的方法作为示例。
from elasticsearch import Elasticsearch, helpers # bulkを使うために追加
# Elasticsearchインスタンスを作成
es = Elasticsearch("http://localhost:9200")
def gendata():
# 登録したいドキュメント
students = [
{
"name": "Jiro",
"age": 25,
"email": "jiro@example.com"
},
{
"name": "Saburo",
"age": 20,
"email": "saburo@example.com"
}
]
# bulkで扱えるデータ構造に変換します
for student in students:
yield {
"_op_type": "create",
"_index": "students",
"_source": student
}
# 複数ドキュメント登録
helpers.bulk(es, gendata())
如果进行批量插入超过100MB的大量文档,将会出现错误。在这种情况下,请参考文章底部所提及的异步方式来批量插入大量文档。
搜索文档
我将介绍两种搜索文档的方法。
使用查询进行搜索。
要搜索已注册的文档,请使用search(index=”索引名称”,body=”搜索查询”,size=搜索数量) 方法。
如果没有指定body或size,将显示所有项目。
# ageの値が20より大きいドキュメントを検索するためのクエリ
query = {
"query": {
"range": {
"age": {
"gt": 20
}
}
}
}
# ドキュメントを検索
result = es.search(index="students", body=query, size=3)
# 検索結果からドキュメントの内容のみ表示
for document in result["hits"]["hits"]:
print(document["_source"])
以下是执行结果。
{'name': 'Taro', 'age': 36, 'email': 'taro@example.com'}
{'name': 'Jiro', 'age': 25, 'email': 'jiro@example.com'}
因为Saburo的年龄是20岁,所以并未被排除在搜索结果之外显示。
使用ID进行搜索。
如果直接通过指定文档ID进行搜索,则可以使用get_source(index=”索引名称”,id=”文档ID”)进行查询。
在样本中,正在搜索id为1的文件。
print(es.get_source(index="students", id=1))
运行结果如下所示。
{'name': 'Taro', 'age': 36, 'email': 'taro@example.com'}
显示了ID为1的Taro。
获取文档数量
如果想要查看索引中文档的数量,请使用count(index=”索引名称”)。
print(es.count(index="students"))
执行结果如下:
{'count': 3, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}
由于”count”的值为3,因此我们可以知道”students”索引中目前存在三个文档。
文件更新
在进行文档更新时,可以使用update(index=”索引名称”, id=”文档ID”, body=”变更内容”)函数。
# doc配下に変更したいパラメータを記述
student = {
"doc": {
"age": 40
}
}
# ドキュメントIDを指定して更新
es.update(index="students", id=1, body=student)
# 更新されているか確認
print(es.get_source(index="students", id=1))
我使用了get_source来确认是否已经更新。
{'name': 'Taro', 'age': 40, 'email': 'taro@example.com'}
年龄已经变成了40岁。
删除文件
如果要删除特定的文档,则可以使用delete(index=”索引名称”, id=”文档ID”)。
在示例中,删除ID为1的文档。
es.delete(index="students", id=1)
我们将在下面进行确认,以确定此文档是否已被删除。
确认文件的存在
要确认文档的存在,可以使用exists(index=”索引名”,id=”文档ID”)函数。
print(es.exists(index="students", id=1))
False
由于先前使用delete命令删除了ID为1的文档,所以返回了False并显示为不存在。
如果文档存在,则应返回True并显示为存在。
顺便说一句,在索引中虽然存在,但需要用exists_source来检查是否有_source。
提升
在这里,我们将介绍通常不常使用的方法和一些小技巧。
异步搜索
如果有成千上万个文档,搜索可能会花费很多时间。在这种情况下,您可以使用从v7.8.0版本开始支持的AsyncElasticsearch来以异步方式高效地搜索资源。
准备搞定
为了使用该功能,需要安装asyncio。
$ python -m pip install elasticsearch [ async ] > = 7 .8.0
非同期搜索样本
我为异步搜索准备了一个示例。基本上,仅仅添加了async和await,search的功能并没有改变。
import asyncio
from elasticsearch import AsyncElasticsearch
# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")
async def main():
# 非同期検索
result = await es.search(
index="students",
body={"query": {"match_all": {}}},
size=20
)
# 検索結果の表示
for student in result['hits']['hits']:
print(student['_source'])
# セッションをクローズ
await es.close()
# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())
在动态方面,
asyncio.get_event_loop()でイベントループを取得
並列で動かしたい関数main()にasyncをつけて定義
時間がかかる処理searchはawaitをつけて宣言
イベントループのrun_until_complete で並列的に実行しつつ終るまで待つ
这种感觉。
以非同步方式進行批次插入。
接下来,我们将介绍如何使用异步方式执行批量插入操作。
代码几乎没有变化,只需使用async和await,以及适用于异步操作的async_bulk函数即可。
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")
async def gendata():
# 登録したいドキュメント
students = [
{
"name": "Siro",
"age": 19,
"email": "siro@example.com"
},
{
"name": "Goro",
"age": 13,
"email": "goro@example.com"
}
]
# bulkで扱えるデータ構造に変換します
for student in students:
yield {
"_op_type": "create",
"_index": "students",
"_source": student
}
async def main():
# 非同期でバルクインサートを実行
await async_bulk(es, gendata())
# セッションをクローズ
await es.close()
# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())
执行后,可以查看已注册的文件。
{'name': 'Jiro', 'age': 25, 'email': 'jiro@example.com'}
{'name': 'Saburo', 'age': 20, 'email': 'saburo@example.com'}
{'name': 'Siro', 'age': 19, 'email': 'siro@example.com'}
{'name': 'Goro', 'age': 13, 'email': 'goro@example.com'}
我們可以看到 Saburo 和 Goro 有被加進去的。
以非同步方式批量插入大量文件。
由于Elasticsearch设定了限制,无法同时批量插入超过100MB的文档。
http.max_content_length是HTTP请求的最大内容。默认为100MB。
请参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html
当处理这么大量的数据时,可以使用async_streaming_bulk(client, actions, 其他参数)来异步地将文档分成多个(块)进行注册。
参数 shù)
以下介绍可以指定的参数。
chunk_size
型:整数
Elasticsearchへ一度に送信するドキュメント数(デフォルト:500)
max_chunk_bytes
型:整数
リクエストの最大バイトサイズ(デフォルト:100MB)
raise_on_error
型:Bool
exceptでBulkIndexError発生時に、エラーリストを取得できるようになる(デフォルト: True)
raise_on_exception
型:Bool
Falseにすると、bulk失敗時に例外を発生させず、最後に失敗したアイテムの報告のみ行う(デフォルト:True)
max_retries
型:整数
ステータスエラー429発生時に再試行を行う回数(デフォルト:0)
initial_backoff
型:整数
再試行まで待機する秒数、2回以降は待機秒数 x 2となる(デフォルト:2)
max_backoff
型:整数
再試行が待機する最大秒数(デフォルト:600)
yield_ok
型:Bool
Falseにすると、出力結果からbulkに成功したドキュメントが表示されなくなる(デフォルト:True)
样本
我将介绍以下的简单示例。
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk, BulkIndexError
# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")
async def gendata():
# 登録したいドキュメント
students = [
{
"name": "Siro",
"age": 19,
"email": "siro@example.com"
},
{
"name": "Goro",
"age": 13,
"email": "goro@example.com"
}
]
# bulkで扱えるデータ構造に変換します
for student in students:
yield {
"_op_type": "create",
"_index": "students",
"_source": student
}
async def main():
try:
# ドキュメントを複数(チャンク)に分けてバルクインサート
async for ok, result in async_streaming_bulk(client=es,
actions=gendata(),
chunk_size=50, # 一度に扱うドキュメント数
max_chunk_bytes=52428800 # 一度に扱うバイト数
):
# 各チャンクごとの実行結果を取得
action, result = result.popitem()
# バルクインサートに失敗した場合
if not ok:
print(f"failed to {result} document {action}")
# 例外処理
except BulkIndexError as bulk_error:
# エラーはリスト形式
print(bulk_error.errors)
# セッションのクローズ
await es.close()
# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())
在本次指定的块参数上。
-
- チャンク数:50
- チャンクの最大バイト数:50MB
就是这样的感觉。
我认为根据需求进行调整可以很好地注册这些数据。
忽略特定状态的错误。
通过指定“引数ignore”中要忽略的特定状态码,可以忽略错误。
举例来说,如果试图删除不存在的索引,将会触发404错误。
elasticsearch.exceptions.NotFoundError: NotFoundError(404, 'index_not_found_exception', 'no such index [test-index]', test-index, index_or_alias)
在样本中,我们将忽略这一点。
# 404と400で発生するエラーを無視
es.indices.delete(index='test-index', ignore=[400, 404])
超时
在ES中,默认情况下已经设置了超时时间,但也可以自行进行设置。
只需在参数中传递request_timeout=秒数(浮点数),这个方法很简单。
我特意准备了一个样例,以便指定一个较短的时间来设置超时。
print(es.cluster.health(wait_for_status='yellow', request_timeout=0.001))
因为在指定的0.001秒内未能执行完毕,所以显示了一个超时错误。
elasticsearch.exceptions.ConnectionError: ConnectionError((<urllib3.connection.HTTPConnection object at 0x7f11297c5520>, 'Connection to localhost timed out. (connect timeout=0.001)')) caused by: ConnectTimeoutError((<urllib3.connection.HTTPConnection object at 0x7f11297c5520>, 'Connection to localhost timed out. (connect timeout=0.001)'))
整理回应
直接显示来自 Elasticsearch 的响应
{'students': {'mappings': {'properties': {'age': {'type': 'long'}, 'email': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}}}
变得非常难以阅读。
因此,您可以使用json包进行格式化,以便以更清晰的方式进行显示。
import json
from elasticsearch import Elasticsearch
# Elasticsearchインスタンスを作成
es = Elasticsearch("http://locahost:9200")
# マッピング情報の取得
response = es.indices.get_mapping(index="students")
# レスポンスの整形
print(json.dumps(response, indent=2))
只需要提供想要格式化的数据和缩进的空格数量,就可以使用json.dumps对其进行格式化。
由于预计响应嵌套较深,因此我选择了2个缩进来指定。
{
"students": {
"mappings": {
"properties": {
"age": {
"type": "long"
},
"email": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}
变得更易于阅读和整齐。
最后
我介绍了Python Elasticsearch客户端的基本使用方法。
使用提供的便捷方法而不是直接调用API,能直观地进行操作,非常方便。
如果使用方法不正确或有其他方便的方法,请在评论中提出!
请参考以下网站
-
- Python Elasticsearch公式
-
- https://elasticsearch-py.readthedocs.io/en/master/api.html
PythonでElasticsearchの操作
https://blog.imind.jp/entry/2019/03/08/185935
【Python】asyncio(非同期I/O)のイベントループをこねくり回す
https://qiita.com/ynakaDream/items/b63fab24bb30dea6ddb1