尝试使用Kafka Streams(尝试在5分钟时间窗口中进行消息聚合)
首先
Kafka Streams是专为Kafka设计的流处理库。它可以连续读取发布到Kafka的消息,并执行任意的处理并输出结果。例如,可以轻松地对按时间分组的消息进行聚合(例如求和、平均等)。虽然这种处理也可以通过批处理来执行,但是通过流处理可以在毫秒级的时间内返回结果,比批处理更快。
我试着执行这样的流处理。
因为只有亲身体验过才能真正理解,所以我亲自尝试了一下。
我们将创建一个流处理程序,用于收集用户在访问网站时所查看的页面、用户数量和用户名的列表,并以每分钟的间隔对接下来的5分钟进行统计。首先将对9:00至9:04的数据进行汇总,然后对9:01至9:05的数据进行汇总。
由于这是我第一次使用Kafka Streams,所以我将根据以下内容进行修改并创建。
尽管Streams API在kafka 1.0.0版本中进行了更改,但以下示例程序是在此之前的版本中创建的,因此我将做一些修改以适应Streams API 1.0.0(如使用StreamsBuilder而不是KStreamBuilder等)。
kafka-streams-stockstats是一个Github上的项目,链接为https://github.com/gwenshap/kafka-streams-stockstats。
更正 pom.xml
修改pom.xml,添加kafka-clients、kafka-streams、gson。
gson用于数据的序列化和反序列化。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.2</version>
</dependency>
页面类
这是一个用于持有用户对网站的一次访问信息的类。
它包括页面名称、用户名和访问时间。
kafka的消息将被转换为Page类的实例。
package sample.model;
import java.time.LocalDateTime;
public class Page {
private String pageName;
private String user;
private LocalDateTime date;
public Page(String pageName, String user, LocalDateTime date) {
this.pageName = pageName;
this.user = user;
this.date = date;
}
@Override
public String toString() {
return "Page{" +
"pageName='" + pageName + '\'' +
", user=" + user + '\'' +
", date=" + date + '\'' +
'}';
}
public String getPageName() {
return pageName;
}
public void setPageName(String pageName) {
this.pageName = pageName;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public LocalDateTime getDate() {
return date;
}
public void setDate(LocalDateTime date) {
this.date = date;
}
}
存储聚合结果的PageStats类。
package sample.model;
import java.util.HashSet;
import java.util.Set;
public class PageStats {
private String pageName;
private int count;
private Set<String> userSet = new HashSet<String>();
public PageStats add(Page page) {
this.pageName = page.getPageName();
this.count++;
userSet.add(page.getUser());
return this;
}
public PageStats compute() {
return this;
}
}
主类
创建名为「PageStatsExample.java」的主类。
首先,定义Streams的属性。
为了使用自定义的时间窗口,我们指定了“DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG”。如果不指定此选项,每条消息的时间戳将使用消息到达kafka的日期和时间。这次我们将访问网站的时间包含在消息中,并使用该时间进行聚合分析,因此需要指定自定义的“DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG”。
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pagestat");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.122:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PageSerde.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class);
创建一个StreamsBuilder实例,并定义一个以”pageaccess”主题作为输入的流。
其中,key为String类,value为Page类(自定义类)。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Page> source = builder.stream("pageaccess");
下一步,我们将构建流拓扑(流处理的流程)。简单来说,我们正在定义以下处理步骤。
以每分钟间隔进行5分钟的聚合定义为 “.windowedBy(TimeWindows.of(300000).advanceBy(60000))”。
指定消息接收时使用 “.aggregate” 方法进行聚合。
“PageStats::new” 在时间窗口内进行初始化,在 PageStats 类中存储每5分钟聚合的数据。
使用 “(k, v, pagestats) -> pagestats.add(v)” 应用聚合方法(add)进行聚合。
使用stats.to(“pagestat-output”)来指定将结果写入到”pagestat-output”主题中。
KStream<PageWindow, PageStats> stats = source
.groupByKey()
.windowedBy(TimeWindows.of(300000).advanceBy(60000))
.aggregate(
PageStats::new,
(k, v, pagestats) -> pagestats.add(v),
Materialized.<String, PageStats, WindowStore<Bytes, byte[]>>as("serde")
.withValueSerde(new PageStatsSerde()))
.toStream((key, value) -> new PageWindow(key.key(), key.window().start()))
.mapValues((page) -> page.computeAvgPrice());
stats.to("pagestat-output",
Produced.with(new PageWindowSerde(), new PageStatsSerde()));
一旦创建完成,之后只需要指定主类并执行即可,因为这是一个带有main方法的Java应用程序。
自定义时间戳提取器类
这是一个管理时间窗口日期的类。
它被设计成以毫秒级的时间戳值返回Page实例的数值。
如果没有自己创建这样一个类,自动使用从Kafka接收到的日期时间在流处理中使用。
package sample;
import java.time.ZoneOffset;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import sample.model.Page;
public class CustomTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long timestamp) {
if (record != null && record.value() != null) {
if (record.value() instanceof Page) {
Page value = (Page) record.value();
return value.getDate().toEpochSecond(ZoneOffset.ofHours(9)) * 1000;
}
}
return timestamp;
}
}
执行
使用「kafka-console-consumer.sh」工具,订阅输出结果的pagestat-output主题。
bin/kafka-console-consumer.sh --topic pagestat-output --from-beginning --bootstrap-server 192.168.10.122:9092 --property print.key=true
以下是输出结果:
页面名称和时间戳作为键的情况下,
{“pageName”:”C”,”timestamp”:1536364800000}
显示了总数和用户列表的值
{“pageName”:”C”,”count”:1,”userSet”:[“a”]}
9:00和9:01的数量与最初以图表呈现的计算结果一致。
※为了方便阅读,输出的顺序和格式已经调整好了。
# 1536364800000 -> 2018年09月08日 09時00分00秒 000
{"pageName":"C","timestamp":1536364800000} {"pageName":"C","count":1,"userSet":["a"]}
{"pageName":"A","timestamp":1536364800000} {"pageName":"A","count":1,"userSet":["a"]}
{"pageName":"B","timestamp":1536364800000} {"pageName":"B","count":4,"userSet":["c","a","b"]}
# 1536364860000 -> 2018年09月08日 09時01分00秒 000
{"pageName":"B","timestamp":1536364860000} {"pageName":"B","count":3,"userSet":["a","b","c"]}
{"pageName":"A","timestamp":1536364860000} {"pageName":"A","count":3,"userSet":["a","c"]}
在中文中,只需要一种选择重述以下内容:
源代码/来源
最后我补上了之前省略掉的剩余酱料。
package sample;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;
import sample.model.Page;
import sample.model.PageStats;
import sample.model.PageWindow;
import sample.serde.JsonDeserializer;
import sample.serde.JsonSerializer;
import sample.serde.WrapperSerde;
public class PageStatsExample {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pagestat");// StreamsアプリのIDを指定する。
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.122:9092"); // 接続先のkafkaブローカを指定する
// シリアライズ・デシリアライズするSerdeを指定する。KEYはデフォルトのStringを用いる。
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Valueはサイトのアクセス情報を持つクラス(Pageクラス:後述)の独自Serdeを指定する。
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PageSerde.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class);
// StreamsBuilderのインスタンスを生成し、"pageaccess"トピックを入力とするストリームを定義します。
// key=Stringクラス、valueがPageクラス(独自)になっています。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Page> source = builder.stream("pageaccess");
KStream<PageWindow, PageStats> stats = source
.groupByKey()
.windowedBy(TimeWindows.of(300000).advanceBy(60000))
.aggregate(
PageStats::new,
(k, v, pagestats) -> pagestats.add(v),
Materialized.<String, PageStats, WindowStore<Bytes, byte[]>>as("serde")
.withValueSerde(new PageStatsSerde()))
.toStream((key, value) -> new PageWindow(key.key(), key.window().start()))
.mapValues((page) -> page.compute());
stats.to("pagestat-output",
Produced.with(new PageWindowSerde(), new PageStatsSerde()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
//streams.cleanUp();
streams.start();
// 通常はストリームアプリケーションがずっと実行されているはず
// この例では、入力データは有限なので、しばらく実行して停止します。
Thread.sleep(120000L);
streams.close();
}
static public final class PageSerde extends WrapperSerde<Page> {
public PageSerde() {
super(new JsonSerializer<Page>(), new JsonDeserializer<Page>(Page.class));
}
}
static public final class PageStatsSerde extends WrapperSerde<PageStats> {
public PageStatsSerde() {
super(new JsonSerializer<PageStats>(), new JsonDeserializer<PageStats>(PageStats.class));
}
}
static public final class PageWindowSerde extends WrapperSerde<PageWindow> {
public PageWindowSerde() {
super(new JsonSerializer<PageWindow>(), new JsonDeserializer<PageWindow>(PageWindow.class));
}
}
}
package sample.model;
public class PageWindow {
String pageName;
long timestamp;
public PageWindow(String pageName, long timestamp) {
this.pageName = pageName;
this.timestamp = timestamp;
}
}
之后使用的是同一来源。
package sample.serde;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.google.gson.Gson;
public class JsonDeserializer<T> implements Deserializer<T> {
private Gson gson = new Gson();
private Class<T> deserializedClass;
public JsonDeserializer(Class<T> deserializedClass) {
this.deserializedClass = deserializedClass;
}
public JsonDeserializer() {
}
@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean b) {
if(deserializedClass == null) {
deserializedClass = (Class<T>) map.get("serializedClass");
}
}
@Override
public T deserialize(String s, byte[] bytes) {
if(bytes == null){
return null;
}
return gson.fromJson(new String(bytes),deserializedClass);
}
@Override
public void close() {
}
}
package sample.serde;
import java.nio.charset.Charset;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.google.gson.Gson;
public class JsonSerializer<T> implements Serializer<T> {
private Gson gson = new Gson();
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String topic, T t) {
return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
}
@Override
public void close() {
}
}
package sample.serde;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
public class WrapperSerde<T> implements Serde<T> {
final private Serializer<T> serializer;
final private Deserializer<T> deserializer;
public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
this.serializer = serializer;
this.deserializer = deserializer;
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
serializer.configure(configs, isKey);
deserializer.configure(configs, isKey);
}
@Override
public void close() {
serializer.close();
deserializer.close();
}
@Override
public Serializer<T> serializer() {
return serializer;
}
@Override
public Deserializer<T> deserializer() {
return deserializer;
}
}