我試著檢測Koalas和Elasticsearch是否可以互相配合使用

这篇文章是NTT技术交叉Advent Calender 2020第18天的内容。

你好,我是NTTテクノクロス的@yuyhiraka(平川)。平时,我主要负责虚拟化/容器/云基础设施、小型网络等先进技术的概念验证。

这篇文章中所述的内容是关于个人的业余活动,与所属机构无关。

首先

我试图测试Koalas和Elasticsearch是否可以进行合作。
由于世界上已经有人尝试过Apache Spark TM和Elasticsearch的合作,所以这次测试是为了验证它们之间的应用。

Spark on elasticsearch-hadoop トライアル

ElasticsearchのデータをApache Sparkで加工する

Elasticsearchのクラスタを構築してSparkでIndexを作るまでの簡易手順

Elasticsearch是什么?

Elasticsearch是一种用于搜索、分析的搜索引擎、数据库和生态系统。

有关Elasticsearch的参考信息

    • Elasticsearchって?

 

    Elastic Stackサブスクリプション & 導入支援 -NTTテクノクロス-

Apache Spark TM是什么

Apache Spark TM 是一种快速的分布式处理框架,用于大数据处理。它支持Python,并特别称为PySpark。

关于Apache Spark TM的参考信息。

Apache Spark™ – Apache Spark とは~分散処理入門の方にもわかりやすくご紹介

Apache Spark™ – Unified Analytics Engine for Big Data

熊猫(Pandas)是什么?

Pandas是一个强大的用于Python的数据分析库。

关于熊猫的参考信息

    pandas – Python Data Analysis Library

考拉是什么?

Koalas是一个在Apache Spark TM中实现类似于pandas数据操作的包装库。在Apache Spark TM中,存在着类似于Pandas中的DataFrame的Spark Dataset/DataFrame概念,但由于各种API的不同,将pandas ⇔ Spark Dataset/DataFrame之间的对象转换会导致混淆。Koalas采用了解决这个问题的方法。

关于考拉的参考信息

    • Koalas_ pandas API on Apache Spark

 

    Spark+AI Summit 2019参加レポート at San Francisco — Spark3.0/Koalas/MLflow/Delta Lake

关于Elasticsearch和Apache Spark TM的集成

根据它们拥有相同的版本号,可以使用Elasticsearch-Hadoop插件(elasticsearch-hadoop 7.10)来实现Elasticsearch 7.10和Hadoop生态系统(包括Apache Spark TM和Koalas)的协作。然而,截至2020年12月,使用Elasticsearch-Hadoop插件似乎无法实现Elasticsearch与Apache Spark TM 3.0.x的协作。

所以这次我们决定使用满足以下条件的Apache Spark TM 2.4.7。

    • Koalasのサポートバージョン

 

    Elasticsearch-Hadoopプラグインのサポートバージョン

关于Elasticsearch-Hadoop插件以及Elasticsearch版本的参考信息。

ElasticプロダクトのEOL/サポート終了日

Elasticsearch-Hadoop(GitHub)

有关Elasticsearch和Apache Spark TM 3.0.x的配合参考信息。

Documentation Databricks Workspace guide Data guide Data sources ElasticSearch

Restructure Spark Project Cross Compilation #1423

[Feature] Spark3.0 support #1412

关于Koalas的依赖关系的参考信息

    Koalas Dependencies

关于验证环境的信息

机器规格 (jī qì guī gé)

    • VirtualBox 6.1.10上のUbuntu 20.04 LTS

 

    • vCPU 6コア

 

    • vMem 32GB

 

    • SSD 100GB

 

    • via HTTP/HTTPS Proxy

 

    (※HTTP/HTTPS Proxyの各種設定については手順上省略します。)

Docker版本

# docker version
Client: Docker Engine - Community
 Version:           20.10.0

创建 Apache Spark TM 2.4.7 的容器镜像。

为了节省验证环境部署的时间,可以参考创建一个含有不同版本Spark的映像的方法,来制作一个安装了PySpark2.4.7和JupyterLab的容器映像。

# mkdir ~/pyspark-notebook
# curl -O https://raw.githubusercontent.com/jupyter/docker-stacks/master/pyspark-notebook/Dockerfile
# mv Dockerfile ~/pyspark-notebook
# docker build --rm --force-rm \
    -t jupyter/pyspark-notebook:spark-2.4.7 ./pyspark-notebook \
    --build-arg spark_version=2.4.7 \
    --build-arg hadoop_version=2.7 \
    --build-arg spark_checksum=0F5455672045F6110B030CE343C049855B7BA86C0ECB5E39A075FF9D093C7F648DA55DED12E72FFE65D84C32DCD5418A6D764F2D6295A3F894A4286CC80EF478 \
    --build-arg openjdk_version=8

创建一个Dockerfile,用于在上述基础镜像上安装Elasticsearch-Hadoop插件和Koalas。然而,由于PySpark2.4无法在Python3.8.x上正常运行,因此我们将创建一个Python3.7.x的conda虚拟环境作为解决方案。

在本次不会尝试替换cloudpickle.py并进行修改的方法。

# mkdir ~/koalas-spark
# vi ~/koalas-spark/Dockerfile
FROM jupyter/pyspark-notebook:spark-2.4.7
USER root
RUN apt-get update
RUN apt-get install -y curl
USER jovyan
RUN mkdir ~/jars
RUN curl https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/7.10.1/{elasticsearch-hadoop-7.10.1.jar} --output "/home/jovyan/jars/#1"
RUN conda create -n py37 -c conda-forge python=3.7 jupyter pyspark=2.4 koalas=1.5 openjdk=8 -y

使用创建的Dockerfile来构建容器镜像。

# docker image build --rm --force-rm -t koalas-spark:0.1 ~/koalas-spark/

从本地获取Elasticsearch的容器镜像。

由于这是一个较大的容器图像,所以我会先获取它。

# docker pull elasticsearch:7.10.1

用Docker Compose启动容器

将进行必要目录的创建和docker-compose.yaml的创建。

# mkdir /opt/es
# mkdir /opt/koalas-spark/
# コンテナからアクセスできるようにパーミッションを緩める (手抜き)
# chmod 777 /opt/es /opt/koalas-spark/
# vi docker-compose.yaml
version: '3'

services:
  elasticsearch:
    image: elasticsearch:7.10.1
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - 9200:9200
    volumes:
      - /opt/es/:/usr/share/elasticsearch/data
    networks:
      - devnet

  koalas-spark:
    build: ./koalas-spark
    container_name: koalas-spark
    working_dir: '/home/jovyan/work/'
    tty: true
    volumes:
      - /opt/koalas-spark/:/home/jovyan/work/
    networks:
      - devnet

networks:
  devnet:

使用Docker Compose来启动Koalas容器和Elasticsearch容器,同时确认Elasticsearch容器成功启动。

# docker-compose build
# docker-compose up -d
# curl -X GET http://localhost:9200
{
  "name" : "6700fb19f202",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
  "version" : {
    "number" : "7.10.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
    "build_date" : "2020-12-05T01:00:33.671820Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"

考拉进入Koalas容器

可以從正在運行的容器列表中查找Koalas容器的容器ID,然後使用該容器ID進入Koalas容器。另外一種方法是使用docker-compose exec指定容器ID。

# docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                              NAMES
e33681a37aea   root_koalas-spark      "tini -g -- start-no…"   2 minutes ago    Up 2 minutes    8888/tcp                           koalas-spark
fe65e3351bea   elasticsearch:7.10.1   "/tini -- /usr/local…"   16 minutes ago   Up 16 minutes   0.0.0.0:9200->9200/tcp, 9300/tcp   elasticsearch
# docker exec -it e33681a37aea bash

使用curl命令从Koalas容器检查与Elasticsearch容器之间的连接通信。

$ curl -X GET http://elasticsearch:9200
{
  "name" : "6700fb19f202",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
  "version" : {
    "number" : "7.10.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
    "build_date" : "2020-12-05T01:00:33.671820Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"

考拉(Koalas)向Elasticsearch进行写入操作。

我将继续在Koalas容器中进行工作。
切换到Python 3.7环境,并启动PySpark(IPython),将数据写入Elasticsearch。

我們這次使用Spark RDD的功能來創建一個4行4列的數據。然後將其轉換成一個Spark DataFrame,並進一步轉換成Koalas DataFrame。

$ conda activate py37
$ export PYARROW_IGNORE_TIMEZONE=1
$ pyspark --jars /home/jovyan/jars/elasticsearch-hadoop-7.10.1.jar
import databricks.koalas as ks
import pandas as pd
import json, os, datetime, collections
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *

esURL = "elasticsearch"

rdd1 = sc.parallelize([
    Row(col1=1, col2=1, col3=1, col4=1),
    Row(col1=2, col2=2, col3=2, col4=2),
    Row(col1=3, col2=3, col3=3, col4=3),
    Row(col1=4, col2=4, col3=4, col4=4)
])

df1 = rdd1.toDF()
df1.show()
kdf1 = ks.DataFrame(df1)
print(kdf1)

kdf1.to_spark_io(path="sample/test", 
    format="org.elasticsearch.spark.sql", 
    options={"es.nodes.wan.only": "false", 
    "es.port": 9200,
    "es.net.ssl": "false", 
    "es.nodes": esURL}, 
    mode="Overwrite")

在 PySpark(IPython)中,使用 Ctrl + D 键或其他方法退出。然后,确认数据已存储在 Elasticsearch 中。

curl -X GET http://elasticsearch:9200/sample/test/_search?pretty
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "kaTbZXYBpKFycpUDLgjO",
        "_score" : 1.0,
        "_source" : {
          "col1" : 4,
          "col2" : 4,
          "col3" : 4,
          "col4" : 4
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "kKTbZXYBpKFycpUDLgjG",
        "_score" : 1.0,
        "_source" : {
          "col1" : 2,
          "col2" : 2,
          "col3" : 2,
          "col4" : 2
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "j6TbZXYBpKFycpUDLgjG",
        "_score" : 1.0,
        "_source" : {
          "col1" : 3,
          "col2" : 3,
          "col3" : 3,
          "col4" : 3
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "jqTbZXYBpKFycpUDLgjD",
        "_score" : 1.0,
        "_source" : {
          "col1" : 1,
          "col2" : 1,
          "col3" : 1,
          "col4" : 1
        }
      }
    ]
  }
}

总结

我能够确认使用Elasticsearch-Hadoop插件从Koalas将数据导入Elasticsearch,就像上述所述。最初的计划是使用PySpark(IPython) => JupyterLab和Docker Compose => Kubernetes进行验证,但由于不能花费太多时间在次要问题上,所以我在这次中做出了妥协。

我希望能够尝试在JupyterLab/Kubernetes上执行,从Koalas无障碍地将数据投入到Elasticsearch。此外,由于有很多需求被提出,所以很可能很快就会得到支持。我强烈希望Elasticsearch-Hadoop插件可以在Apache Spark TM 3.0.x上使用。

明天是由@y-ohnuki负责的NTT TechnoCross Advent Calnder 2020的文章。期待吧!

广告
将在 10 秒后关闭
bannerAds