我試著檢測Koalas和Elasticsearch是否可以互相配合使用
这篇文章是NTT技术交叉Advent Calender 2020第18天的内容。
你好,我是NTTテクノクロス的@yuyhiraka(平川)。平时,我主要负责虚拟化/容器/云基础设施、小型网络等先进技术的概念验证。
这篇文章中所述的内容是关于个人的业余活动,与所属机构无关。
首先
我试图测试Koalas和Elasticsearch是否可以进行合作。
由于世界上已经有人尝试过Apache Spark TM和Elasticsearch的合作,所以这次测试是为了验证它们之间的应用。
Spark on elasticsearch-hadoop トライアル
ElasticsearchのデータをApache Sparkで加工する
Elasticsearchのクラスタを構築してSparkでIndexを作るまでの簡易手順
Elasticsearch是什么?
Elasticsearch是一种用于搜索、分析的搜索引擎、数据库和生态系统。
有关Elasticsearch的参考信息
-
- Elasticsearchって?
- Elastic Stackサブスクリプション & 導入支援 -NTTテクノクロス-
Apache Spark TM是什么
Apache Spark TM 是一种快速的分布式处理框架,用于大数据处理。它支持Python,并特别称为PySpark。
关于Apache Spark TM的参考信息。
Apache Spark™ – Apache Spark とは~分散処理入門の方にもわかりやすくご紹介
Apache Spark™ – Unified Analytics Engine for Big Data
熊猫(Pandas)是什么?
Pandas是一个强大的用于Python的数据分析库。
关于熊猫的参考信息
- pandas – Python Data Analysis Library
考拉是什么?
Koalas是一个在Apache Spark TM中实现类似于pandas数据操作的包装库。在Apache Spark TM中,存在着类似于Pandas中的DataFrame的Spark Dataset/DataFrame概念,但由于各种API的不同,将pandas ⇔ Spark Dataset/DataFrame之间的对象转换会导致混淆。Koalas采用了解决这个问题的方法。
关于考拉的参考信息
-
- Koalas_ pandas API on Apache Spark
- Spark+AI Summit 2019参加レポート at San Francisco — Spark3.0/Koalas/MLflow/Delta Lake
关于Elasticsearch和Apache Spark TM的集成
根据它们拥有相同的版本号,可以使用Elasticsearch-Hadoop插件(elasticsearch-hadoop 7.10)来实现Elasticsearch 7.10和Hadoop生态系统(包括Apache Spark TM和Koalas)的协作。然而,截至2020年12月,使用Elasticsearch-Hadoop插件似乎无法实现Elasticsearch与Apache Spark TM 3.0.x的协作。
所以这次我们决定使用满足以下条件的Apache Spark TM 2.4.7。
-
- Koalasのサポートバージョン
- Elasticsearch-Hadoopプラグインのサポートバージョン
关于Elasticsearch-Hadoop插件以及Elasticsearch版本的参考信息。
ElasticプロダクトのEOL/サポート終了日
Elasticsearch-Hadoop(GitHub)
有关Elasticsearch和Apache Spark TM 3.0.x的配合参考信息。
Documentation Databricks Workspace guide Data guide Data sources ElasticSearch
Restructure Spark Project Cross Compilation #1423
[Feature] Spark3.0 support #1412
关于Koalas的依赖关系的参考信息
- Koalas Dependencies
关于验证环境的信息
机器规格 (jī qì guī gé)
-
- VirtualBox 6.1.10上のUbuntu 20.04 LTS
-
- vCPU 6コア
-
- vMem 32GB
-
- SSD 100GB
-
- via HTTP/HTTPS Proxy
- (※HTTP/HTTPS Proxyの各種設定については手順上省略します。)
Docker版本
# docker version
Client: Docker Engine - Community
Version: 20.10.0
创建 Apache Spark TM 2.4.7 的容器镜像。
为了节省验证环境部署的时间,可以参考创建一个含有不同版本Spark的映像的方法,来制作一个安装了PySpark2.4.7和JupyterLab的容器映像。
# mkdir ~/pyspark-notebook
# curl -O https://raw.githubusercontent.com/jupyter/docker-stacks/master/pyspark-notebook/Dockerfile
# mv Dockerfile ~/pyspark-notebook
# docker build --rm --force-rm \
-t jupyter/pyspark-notebook:spark-2.4.7 ./pyspark-notebook \
--build-arg spark_version=2.4.7 \
--build-arg hadoop_version=2.7 \
--build-arg spark_checksum=0F5455672045F6110B030CE343C049855B7BA86C0ECB5E39A075FF9D093C7F648DA55DED12E72FFE65D84C32DCD5418A6D764F2D6295A3F894A4286CC80EF478 \
--build-arg openjdk_version=8
创建一个Dockerfile,用于在上述基础镜像上安装Elasticsearch-Hadoop插件和Koalas。然而,由于PySpark2.4无法在Python3.8.x上正常运行,因此我们将创建一个Python3.7.x的conda虚拟环境作为解决方案。
在本次不会尝试替换cloudpickle.py并进行修改的方法。
# mkdir ~/koalas-spark
# vi ~/koalas-spark/Dockerfile
FROM jupyter/pyspark-notebook:spark-2.4.7
USER root
RUN apt-get update
RUN apt-get install -y curl
USER jovyan
RUN mkdir ~/jars
RUN curl https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/7.10.1/{elasticsearch-hadoop-7.10.1.jar} --output "/home/jovyan/jars/#1"
RUN conda create -n py37 -c conda-forge python=3.7 jupyter pyspark=2.4 koalas=1.5 openjdk=8 -y
使用创建的Dockerfile来构建容器镜像。
# docker image build --rm --force-rm -t koalas-spark:0.1 ~/koalas-spark/
从本地获取Elasticsearch的容器镜像。
由于这是一个较大的容器图像,所以我会先获取它。
# docker pull elasticsearch:7.10.1
用Docker Compose启动容器
将进行必要目录的创建和docker-compose.yaml的创建。
# mkdir /opt/es
# mkdir /opt/koalas-spark/
# コンテナからアクセスできるようにパーミッションを緩める (手抜き)
# chmod 777 /opt/es /opt/koalas-spark/
# vi docker-compose.yaml
version: '3'
services:
elasticsearch:
image: elasticsearch:7.10.1
container_name: elasticsearch
environment:
- discovery.type=single-node
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
ports:
- 9200:9200
volumes:
- /opt/es/:/usr/share/elasticsearch/data
networks:
- devnet
koalas-spark:
build: ./koalas-spark
container_name: koalas-spark
working_dir: '/home/jovyan/work/'
tty: true
volumes:
- /opt/koalas-spark/:/home/jovyan/work/
networks:
- devnet
networks:
devnet:
使用Docker Compose来启动Koalas容器和Elasticsearch容器,同时确认Elasticsearch容器成功启动。
# docker-compose build
# docker-compose up -d
# curl -X GET http://localhost:9200
{
"name" : "6700fb19f202",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
"version" : {
"number" : "7.10.1",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
"build_date" : "2020-12-05T01:00:33.671820Z",
"build_snapshot" : false,
"lucene_version" : "8.7.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
考拉进入Koalas容器
可以從正在運行的容器列表中查找Koalas容器的容器ID,然後使用該容器ID進入Koalas容器。另外一種方法是使用docker-compose exec指定容器ID。
# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e33681a37aea root_koalas-spark "tini -g -- start-no…" 2 minutes ago Up 2 minutes 8888/tcp koalas-spark
fe65e3351bea elasticsearch:7.10.1 "/tini -- /usr/local…" 16 minutes ago Up 16 minutes 0.0.0.0:9200->9200/tcp, 9300/tcp elasticsearch
# docker exec -it e33681a37aea bash
使用curl命令从Koalas容器检查与Elasticsearch容器之间的连接通信。
$ curl -X GET http://elasticsearch:9200
{
"name" : "6700fb19f202",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
"version" : {
"number" : "7.10.1",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
"build_date" : "2020-12-05T01:00:33.671820Z",
"build_snapshot" : false,
"lucene_version" : "8.7.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
考拉(Koalas)向Elasticsearch进行写入操作。
我将继续在Koalas容器中进行工作。
切换到Python 3.7环境,并启动PySpark(IPython),将数据写入Elasticsearch。
我們這次使用Spark RDD的功能來創建一個4行4列的數據。然後將其轉換成一個Spark DataFrame,並進一步轉換成Koalas DataFrame。
$ conda activate py37
$ export PYARROW_IGNORE_TIMEZONE=1
$ pyspark --jars /home/jovyan/jars/elasticsearch-hadoop-7.10.1.jar
import databricks.koalas as ks
import pandas as pd
import json, os, datetime, collections
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *
esURL = "elasticsearch"
rdd1 = sc.parallelize([
Row(col1=1, col2=1, col3=1, col4=1),
Row(col1=2, col2=2, col3=2, col4=2),
Row(col1=3, col2=3, col3=3, col4=3),
Row(col1=4, col2=4, col3=4, col4=4)
])
df1 = rdd1.toDF()
df1.show()
kdf1 = ks.DataFrame(df1)
print(kdf1)
kdf1.to_spark_io(path="sample/test",
format="org.elasticsearch.spark.sql",
options={"es.nodes.wan.only": "false",
"es.port": 9200,
"es.net.ssl": "false",
"es.nodes": esURL},
mode="Overwrite")
在 PySpark(IPython)中,使用 Ctrl + D 键或其他方法退出。然后,确认数据已存储在 Elasticsearch 中。
curl -X GET http://elasticsearch:9200/sample/test/_search?pretty
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "sample",
"_type" : "test",
"_id" : "kaTbZXYBpKFycpUDLgjO",
"_score" : 1.0,
"_source" : {
"col1" : 4,
"col2" : 4,
"col3" : 4,
"col4" : 4
}
},
{
"_index" : "sample",
"_type" : "test",
"_id" : "kKTbZXYBpKFycpUDLgjG",
"_score" : 1.0,
"_source" : {
"col1" : 2,
"col2" : 2,
"col3" : 2,
"col4" : 2
}
},
{
"_index" : "sample",
"_type" : "test",
"_id" : "j6TbZXYBpKFycpUDLgjG",
"_score" : 1.0,
"_source" : {
"col1" : 3,
"col2" : 3,
"col3" : 3,
"col4" : 3
}
},
{
"_index" : "sample",
"_type" : "test",
"_id" : "jqTbZXYBpKFycpUDLgjD",
"_score" : 1.0,
"_source" : {
"col1" : 1,
"col2" : 1,
"col3" : 1,
"col4" : 1
}
}
]
}
}
总结
我能够确认使用Elasticsearch-Hadoop插件从Koalas将数据导入Elasticsearch,就像上述所述。最初的计划是使用PySpark(IPython) => JupyterLab和Docker Compose => Kubernetes进行验证,但由于不能花费太多时间在次要问题上,所以我在这次中做出了妥协。
我希望能够尝试在JupyterLab/Kubernetes上执行,从Koalas无障碍地将数据投入到Elasticsearch。此外,由于有很多需求被提出,所以很可能很快就会得到支持。我强烈希望Elasticsearch-Hadoop插件可以在Apache Spark TM 3.0.x上使用。
明天是由@y-ohnuki负责的NTT TechnoCross Advent Calnder 2020的文章。期待吧!