Azure Databricks和Elasticsearch的集成
我們嘗試使用Azure Databricks來進行操作,這是一個在Azure上運行的Spark PaaS環境,它為我們提供了一個方便的服務。這個服務經過多種處理,因此比普通的Spark更快,而且附帶的筆記本也更易於使用。詳細資訊請參閱:https://azure.microsoft.com/ja-jp/services/databricks/
做事情况就是按照这里写的内容,不过我会作为备忘录记录下来。
https://docs.azuredatabricks.net/spark/latest/data-sources/elasticsearch.html
关于Azure Databricks的环境准备,请参照这里的 快速入门 进行操作。非常简单,很快就能完成。
ES-Hadoop的安装设置
ES-Hadoop的套件可以从这里获取。在这里,解压缩ZIP文件,使用出现的elasticsearch-spark-xx_x.xx-x.x.x.jar文件。
数据准备与通信确认。
打开Scala版的Databricks笔记本并编写代码。
为了加载初始样本数据,需要挂载存储,需要事先准备好Blob存储账户和容器。
dbutils.fs.mount(
source = "wasbs://<Container Name>@<Storage Account>.blob.core.windows.net/"
,mount_point = "/mnt/hol"
,extra_configs = {"fs.azure.account.key.hthol.blob.core.windows.net": "<Storage Account Key>"})
为了确认与Elasticsearch实例的连接,请执行一次测试。如果没有做任何设置,通信将需要使用公共IP,但如果进行了虚拟网络的配置,则可以使用私有IP进行通信。在这里,使用“%sh”命令可以执行shell,这非常方便。
%sh
ping -c 2 <Elasticsearch IP>
顺便提一下,使用curl命令可以进行HTTP通信。
%sh
curl -XGET http://<Elasticsearch IP>:9200
接下来,需要在存储空间中下载样本数据集。这些数据集似乎是公开提供的样本数据,可以免费下载。
%sh wget -O /tmp/akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno
在Databricks中,我们使用名为Databricks Filesystem的文件系统。我们可以通过使用%fs魔术命令来调用文件系统相关的命令。
%fs cp file:/tmp/akc_breed_info.csv dbfs:/mnt/hol/
与Elasticsearch的协同工作
从这里开始进行与Elasticsearch的集成。
从已挂载的文件系统中读取下载的CSV文件并加载到Elasticsearch中。通过创建DataFrame并通过”write”方法进行写入,所以相对比较简单。
val esURL = "<Elasticsearch IP>"
val df = spark.read.option("header","true").csv("/mnt/hol/akc_breed_info.csv")
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","9200")
.option("es.net.ssl","false")
.option("es.nodes", esURL)
.mode("Overwrite")
.save("index/dogs")
为了确认数据已加载,可以使用以下命令向Elasticsearch发送查询。
%sh curl http://<Elasticsearch IP>:9200/index/dogs/_search?q=Breed:Collie
如果要使用Spark从Elasticsearch读取数据,可以使用read操作。
val reader = spark.read
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","9200")
.option("es.net.ssl","false")
.option("es.nodes", esURL)
val df = reader.load("index/dogs").na.drop.orderBy($"breed")
display(df)
如果使用SQL进行访问,需要按照下列方式创建表并执行查询。在执行SQL时,可以使用%sql魔法命令。能够进行这样的切换非常方便。
%sql
drop table if exists dogs;
create temporary table dogs
using org.elasticsearch.spark.sql
options('resource'='index/dogs',
'nodes'= '<Elasticsearch IP>',
'es.nodes.wan.only'='true',
'es.port'='9200',
'es.net.ssl'='false');
select weight_range as size, count(*) as number
from (
select case
when weight_low_lbs between 0 and 10 then 'toy'
when weight_low_lbs between 11 and 20 then 'small'
when weight_low_lbs between 21 and 40 then 'medium'
when weight_low_lbs between 41 and 80 then 'large'
else 'xlarge' end as weight_range
from dogs) d
group by weight_range