将Pandas的数据框和Elasticsearch的索引进行相互转换

首先

我想要将样本数据插入到特定的Elasticsearch索引中。但是我手头只有分别包含相同主键的不同CSV文件。唉,是要手动合并吗?还是要写脚本?好麻烦啊。。大家也有过这样的经历吧?

在这种情况下,(最终还是要编写Python脚本),使用Eland可以直接在Elasticsearch和Pandas的数据框之间交互,非常方便。换句话说,Pandas很方便。

依赖库

如果您还没有安装Python的pandas、elasticsearch和eland库,请使用以下命令进行安装,因为本次我们将使用这些库。

$ pip install pandas elasticsearch eland

请查阅有关Eland的详细文件。

将CSV数据加载到数据框中并进行合并。

假设你手上有一个如下所示的CSV文件。

“将dau.csv进行再解释,只需要一个选项:”

date,daily_active_users
2023/08/01,1000
2023/08/02,1002
2023/08/03,980

新用户.csv

date,new_users
2023/08/01,23
2023/08/02,31
2023/08/03,7

我打算创建一个Elasticsearch索引,该索引具有每天的daily_active_users和new_users作为字段,基于这种状态。

首先,将这些CSV文件作为数据框读取。

import pandas as pd

dau_df = pd.read_csv("dau.csv", encoding='utf-8', index_col=0, header=0, parse_dates=True)
newusers_df = pd.read_csv("newusers.csv", encoding='utf-8', index_col=0, header=0, parse_dates=True)

由于Pandas的read_csv有很多选项,所以有些人写了非常长的文章,这里无法详细解释。但我会简要介绍所使用的部分。

    • header: 数値をいじることで、例えば最初の行に余計な説明文とかが入っている場合に読み飛ばしたりできるので便利です。

 

    • index_col: どのカラムをData Frameのインデックス(ID)にするかを指定します。ここでは日付ごとにまとめるので最初のカラムを指定しています。

 

    parse_dates: 日付を自動的にパースしてくれるオプションです。便利。

那么,我们可以使用concat函数来合并读取的数据框。由于我们这次是要横向添加列,所以需要指定axis=1。

all = pd.concat([dau_df, newusers_df], axis=1)
每天活跃用户
新用户日期

2023-08-01
1000
23

2023-08-02
1002
31

2023-08-03
980
7

哇,成功了。Pandas真方便呢。

将数据导入Elasticsearch

那么,让我们将此数据框架投入到Elasticsearch索引中吧。要将Pandas的数据框架投入到Elasticsearch中,我们可以使用Elastic提供的Eland库。

将Data Frame输入到Elasticsearch的函数是ed.pandas_to_eland。

import eland as ed
from elasticsearch import Elasticsearch

es = Elasticsearch(
    'http://localhost:9200',
    basic_auth=('username', 'password')
)
ed.pandas_to_eland(
  all,
  es,
  'daily_user_index',
  es_if_exists="replace",
  es_refresh=True,
  use_pandas_index_for_es_ids=True
)

我将在Elasticsearch上检查是否已经投入。我将在Kibana的开发工具中执行以下请求。

GET daily_user_index/_search

回答

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "daily_user_index",
        "_id": "2023-08-01 00:00:00",
        "_score": 1,
        "_source": {
          "daily_active_users": 1000,
          "new_users": 23,
          "date": "2023-08-01T00:00:00"
        }
      },
      {
        "_index": "daily_user_index",
        "_id": "2023-08-02 00:00:00",
        "_score": 1,
        "_source": {
          "daily_active_users": 1002,
          "new_users": 31,
          "date": "2023-08-02T00:00:00"
        }
      },
      {
        "_index": "daily_user_index",
        "_id": "2023-08-03 00:00:00",
        "_score": 1,
        "_source": {
          "daily_active_users": 980,
          "new_users": 7,
          "date": "2023-08-03T00:00:00"
        }
      }
    ]
  }
}

我已经投入使用了。注意的一点是,作为Data Frame索引的日期信息被存储为文档ID。这是由于use_pandas_index_for_es_ids=True的设置(这是默认设置)所产生的效果,但如果将它设置为False,索引项(日期)将完全丢失。

如果你想让数据在Elasticsearch中的存储方式更接近常规的话,也许将日期转换为常规字段并投入如下可能会更好。

all['date'] = all.index
ed.pandas_to_eland(
  all,
  es,
  'daily_user_index',
  es_if_exists="replace",
  es_refresh=True,
  use_pandas_index_for_es_ids=False
)

在这种状态下进行搜索,Elasticsearch上注册如下:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "daily_user_index",
        "_id": "I2Cp3IsBCU3flpJPM4Go",
        "_score": 1,
        "_source": {
          "daily_active_users": 1000,
          "new_users": 23,
          "date": "2023-08-01T00:00:00"
        }
      },
      {
        "_index": "daily_user_index",
        "_id": "JGCp3IsBCU3flpJPM4Go",
        "_score": 1,
        "_source": {
          "daily_active_users": 1002,
          "new_users": 31,
          "date": "2023-08-02T00:00:00"
        }
      },
      {
        "_index": "daily_user_index",
        "_id": "JWCp3IsBCU3flpJPM4Go",
        "_score": 1,
        "_source": {
          "daily_active_users": 980,
          "new_users": 7,
          "date": "2023-08-03T00:00:00"
        }
      }
    ]
  }
}

这样做在 Elasticsearch 中更自然。

将Elasticsearch的索引加载为数据框架。

在Eland中,有一个函数可以将Elasticsearch的索引作为Data Frame进行读取。让我们尝试重新读取刚刚注册的数据。我们将使用ed.DataFrame函数。

ed_df = ed.DataFrame(es, 'daily_user_index')
ed_df
每日活跃用户量
日期
新用户量I2Cp3IsBCU3flpJPM4Go
1000
2023-08-01
23

JGCp3IsBCU3flpJPM4Go
1002
2023-08-02
31

JWCp3IsBCU3flpJPM4Go
980
2023-08-03
7

3行×3列

哇,可以把索引数据自然地转换成数据框,真方便啊。

在这个例子中,我们读取了索引的全部记录,但是也可以进行筛选条件等操作来进行读取,请先查看一下文档。

只是這個 ed_df 是 eland.dataframe.DataFrame 類別的物件。根據情況可能有些不方便,所以讓我們嘗試將其轉換為 Pandas 的 Data Frame。我們可以使用 eland_to_pandas 進行轉換。

pd_df = ed.eland_to_pandas(ed_df)

这样做将使pd_df成为pandas.core.frame.DataFrame类的对象。

最后

我发现使用Eland可以轻松地相互转换Pandas的Data Frame和Elasticsearch的索引。这意味着我们可以像本次一样,将本地的CSV文件读取到Pandas中进行操作,然后将其导入到Elasticsearch中,或者将已经存储在Elasticsearch中的数据读取到Pandas中进行分析等,有很多不同的用法。

作为更进一步的用法,你可以使用Pandas来读取Elasticsearch的数据,然后使用Scikit Learn中的XGBoost等工具进行训练,然后将生成的预测模型上传到Elasticsearch并进行使用。如果你擅长机器学习,请务必尝试一下。