使用Structured Streaming处理来自Kafka的数据,并将其发送到Elasticsearch
我們試著從Apache Kafka取得流數據,並通過Spark的結構化串流處理它,最後將其流向Elasticsearch。這篇文章中的文件描述了如何在結構化串流中使用ES-Hadoop。
假设按照上一篇文章的方法加载了ES-Hadoop库,并且已经创建了Kafka和Elasticsearch环境。
建立与 Kafka 的连接
连接到Kafka的定义如下所示。
val kafkaDF = (
spark
.readStream
.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")
.option("subscribe", "YOUR_TOPIC1,YOUR_TOPIC2")
.option("startingOffsets", "latest")
.format("kafka")
.load()
)
准备完成,现在可以普通地确认数据如下所示。
display(kafkaDF)
模式的定义
明确定义模式。以下是根据手头的样本数据进行的执行,但需要根据数据进行相应修改的部分。
import org.apache.spark.sql.types._
val schema = new StructType()
.add("orderID", IntegerType)
.add("productID", IntegerType)
.add("orderTimestamp", TimestampType)
.add("orderQty", IntegerType)
将Kafka数据读入DataFrame中
将之前定义的模式应用于DataFrame并加载数据。
import org.apache.spark.sql.functions._
val df = kafkaDF.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, java.sql.Timestamp)]
.select(from_json($"value", schema).as("order"), $"timestamp")
.select("order.*", "timestamp")
设置分区
要设置shuffle后的分区,可以按照以下方式进行操作。以下是设置为8的示例。
spark.conf.set("spark.sql.shuffle.partitions", 8)
汇总数据
在写出集计数据流时,需要定义时间窗口。如果不进行设置,将无法进行后续的写流操作。
import org.apache.spark.sql.functions._
val aggregatedDF = df.withWatermark("timestamp", "1 minutes").groupBy($"productID", window($"timestamp", "1 minutes")).count()
创建检查点目录
创建一个用于写入偏移量和提交日志的目录。
%fs mkdirs /tmp/es
将数据写入Elasticsearch
在这里将 writeStream 流保持不断,在执行下一行代码。
df.writeStream
.option("es.nodes.wan.only","true")
.option("es.net.ssl","false")
.option("es.nodes", "<Your Elasticsearch>")
.option("checkpointLocation", "/tmp/es")
.option("es.port", "<Port>")
.format("es")
.start("orders/log")
确认被写入的索引
使用curl命令来查看写入Elasticsearch的数据。这里是使用Elasticsearch的search API。
%sh curl http://<Your Elasticsearch>:<Port>/orders/log/_search?q=productID:869
使用Spark连接Elasticsearch并获取数据。
即使是流数据,也可以特别支持处理。
val reader = spark.read
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port", "<Port>")
.option("es.net.ssl","false")
.option("es.nodes", "<Your Elasticsearch>")
val SQLdf = reader.load("orders/log")
display(SQLdf)
使用SQL访问
在Elasticsearch的文档中创建表格并执行查询。
%sql
DROP TABLE IF EXISTS dcmotor;
CREATE TEMPORARY TABLE orders
USING org.elasticsearch.spark.sql
OPTIONS('resource'='orders/log',
'nodes'= '<Your Elasticsearch>',
'es.nodes.wan.only'='true',
'es.port'='<Port>',
'es.net.ssl'='false');
执行查询。
%sql SELECT ProductID, SUM(orderQty) AS sum FROM orders GROUP BY ProductID ORDER BY sum DESC LIMIT 10;
這樣一來,Elasticsearch將不斷接收串流處理的資料。使用時間戳記資料,我們可以在Kibana進行時序資料的視覺化和監控。