使用Structured Streaming处理来自Kafka的数据,并将其发送到Elasticsearch

我們試著從Apache Kafka取得流數據,並通過Spark的結構化串流處理它,最後將其流向Elasticsearch。這篇文章中的文件描述了如何在結構化串流中使用ES-Hadoop。

azure-databricks-overview.png

假设按照上一篇文章的方法加载了ES-Hadoop库,并且已经创建了Kafka和Elasticsearch环境。

建立与 Kafka 的连接

连接到Kafka的定义如下所示。

val kafkaDF = (
  spark
    .readStream
    .option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")
    .option("subscribe", "YOUR_TOPIC1,YOUR_TOPIC2")
    .option("startingOffsets", "latest")
    .format("kafka")
    .load()
)

准备完成,现在可以普通地确认数据如下所示。

display(kafkaDF)

模式的定义

明确定义模式。以下是根据手头的样本数据进行的执行,但需要根据数据进行相应修改的部分。

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("orderID", IntegerType)
  .add("productID", IntegerType)
  .add("orderTimestamp", TimestampType)
  .add("orderQty", IntegerType)

将Kafka数据读入DataFrame中

将之前定义的模式应用于DataFrame并加载数据。

import org.apache.spark.sql.functions._

val df = kafkaDF.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, java.sql.Timestamp)]
        .select(from_json($"value", schema).as("order"), $"timestamp")
        .select("order.*", "timestamp")

设置分区

要设置shuffle后的分区,可以按照以下方式进行操作。以下是设置为8的示例。

spark.conf.set("spark.sql.shuffle.partitions", 8)

汇总数据

在写出集计数据流时,需要定义时间窗口。如果不进行设置,将无法进行后续的写流操作。

import org.apache.spark.sql.functions._

val aggregatedDF = df.withWatermark("timestamp", "1 minutes").groupBy($"productID", window($"timestamp", "1 minutes")).count()

创建检查点目录

创建一个用于写入偏移量和提交日志的目录。

%fs mkdirs /tmp/es

将数据写入Elasticsearch

在这里将 writeStream 流保持不断,在执行下一行代码。

df.writeStream
      .option("es.nodes.wan.only","true")
      .option("es.net.ssl","false")
      .option("es.nodes", "<Your Elasticsearch>")
      .option("checkpointLocation", "/tmp/es")
      .option("es.port", "<Port>")
      .format("es")
      .start("orders/log")

确认被写入的索引

使用curl命令来查看写入Elasticsearch的数据。这里是使用Elasticsearch的search API。

%sh curl http://<Your Elasticsearch>:<Port>/orders/log/_search?q=productID:869

使用Spark连接Elasticsearch并获取数据。

即使是流数据,也可以特别支持处理。

val reader = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port", "<Port>")
  .option("es.net.ssl","false")
  .option("es.nodes", "<Your Elasticsearch>")

val SQLdf = reader.load("orders/log")
display(SQLdf)

使用SQL访问

在Elasticsearch的文档中创建表格并执行查询。

%sql
DROP TABLE IF EXISTS dcmotor;

CREATE TEMPORARY TABLE orders
USING org.elasticsearch.spark.sql
OPTIONS('resource'='orders/log', 
  'nodes'= '<Your Elasticsearch>',
  'es.nodes.wan.only'='true',
  'es.port'='<Port>',
  'es.net.ssl'='false');

执行查询。

%sql SELECT ProductID, SUM(orderQty) AS sum FROM orders GROUP BY ProductID ORDER BY sum DESC LIMIT 10;

這樣一來,Elasticsearch將不斷接收串流處理的資料。使用時間戳記資料,我們可以在Kibana進行時序資料的視覺化和監控。

广告
将在 10 秒后关闭
bannerAds