Azure Databricks和Elasticsearch的集成

我們嘗試使用Azure Databricks來進行操作,這是一個在Azure上運行的Spark PaaS環境,它為我們提供了一個方便的服務。這個服務經過多種處理,因此比普通的Spark更快,而且附帶的筆記本也更易於使用。詳細資訊請參閱:https://azure.microsoft.com/ja-jp/services/databricks/

做事情况就是按照这里写的内容,不过我会作为备忘录记录下来。
https://docs.azuredatabricks.net/spark/latest/data-sources/elasticsearch.html
关于Azure Databricks的环境准备,请参照这里的 快速入门 进行操作。非常简单,很快就能完成。

ES-Hadoop的安装设置

ES-Hadoop的套件可以从这里获取。在这里,解压缩ZIP文件,使用出现的elasticsearch-spark-xx_x.xx-x.x.x.jar文件。

library.PNG
CreateLibrary.PNG

数据准备与通信确认。

打开Scala版的Databricks笔记本并编写代码。

为了加载初始样本数据,需要挂载存储,需要事先准备好Blob存储账户和容器。

dbutils.fs.mount(
  source = "wasbs://<Container Name>@<Storage Account>.blob.core.windows.net/"
 ,mount_point = "/mnt/hol"
 ,extra_configs = {"fs.azure.account.key.hthol.blob.core.windows.net": "<Storage Account Key>"})

为了确认与Elasticsearch实例的连接,请执行一次测试。如果没有做任何设置,通信将需要使用公共IP,但如果进行了虚拟网络的配置,则可以使用私有IP进行通信。在这里,使用“%sh”命令可以执行shell,这非常方便。

%sh 
ping -c 2 <Elasticsearch IP>

顺便提一下,使用curl命令可以进行HTTP通信。

%sh 
curl -XGET http://<Elasticsearch IP>:9200

接下来,需要在存储空间中下载样本数据集。这些数据集似乎是公开提供的样本数据,可以免费下载。

%sh wget -O /tmp/akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno

在Databricks中,我们使用名为Databricks Filesystem的文件系统。我们可以通过使用%fs魔术命令来调用文件系统相关的命令。

%fs cp file:/tmp/akc_breed_info.csv dbfs:/mnt/hol/

与Elasticsearch的协同工作

从这里开始进行与Elasticsearch的集成。
从已挂载的文件系统中读取下载的CSV文件并加载到Elasticsearch中。通过创建DataFrame并通过”write”方法进行写入,所以相对比较简单。

val esURL = "<Elasticsearch IP>"
val df = spark.read.option("header","true").csv("/mnt/hol/akc_breed_info.csv")
df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","9200")
  .option("es.net.ssl","false")
  .option("es.nodes", esURL)
  .mode("Overwrite")
  .save("index/dogs")

为了确认数据已加载,可以使用以下命令向Elasticsearch发送查询。

%sh curl http://<Elasticsearch IP>:9200/index/dogs/_search?q=Breed:Collie

如果要使用Spark从Elasticsearch读取数据,可以使用read操作。

val reader = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","9200")
  .option("es.net.ssl","false")
  .option("es.nodes", esURL)
val df = reader.load("index/dogs").na.drop.orderBy($"breed")
display(df)

如果使用SQL进行访问,需要按照下列方式创建表并执行查询。在执行SQL时,可以使用%sql魔法命令。能够进行这样的切换非常方便。

%sql
drop table if exists dogs;
create temporary table dogs
using org.elasticsearch.spark.sql
options('resource'='index/dogs', 
  'nodes'= '<Elasticsearch IP>',
  'es.nodes.wan.only'='true',
  'es.port'='9200',
  'es.net.ssl'='false');

select weight_range as size, count(*) as number 
from (
  select case 
    when weight_low_lbs between 0 and 10 then 'toy'
    when weight_low_lbs between 11 and 20 then 'small'
    when weight_low_lbs between 21 and 40 then 'medium'
    when weight_low_lbs between 41 and 80 then 'large'
    else 'xlarge' end as weight_range
  from dogs) d
group by weight_range
广告
将在 10 秒后关闭
bannerAds