尝试使用Kafka Streams(尝试在5分钟时间窗口中进行消息聚合)

首先

Kafka Streams是专为Kafka设计的流处理库。它可以连续读取发布到Kafka的消息,并执行任意的处理并输出结果。例如,可以轻松地对按时间分组的消息进行聚合(例如求和、平均等)。虽然这种处理也可以通过批处理来执行,但是通过流处理可以在毫秒级的时间内返回结果,比批处理更快。

我试着执行这样的流处理。

因为只有亲身体验过才能真正理解,所以我亲自尝试了一下。

我们将创建一个流处理程序,用于收集用户在访问网站时所查看的页面、用户数量和用户名的列表,并以每分钟的间隔对接下来的5分钟进行统计。首先将对9:00至9:04的数据进行汇总,然后对9:01至9:05的数据进行汇总。

image.png

由于这是我第一次使用Kafka Streams,所以我将根据以下内容进行修改并创建。
尽管Streams API在kafka 1.0.0版本中进行了更改,但以下示例程序是在此之前的版本中创建的,因此我将做一些修改以适应Streams API 1.0.0(如使用StreamsBuilder而不是KStreamBuilder等)。

kafka-streams-stockstats是一个Github上的项目,链接为https://github.com/gwenshap/kafka-streams-stockstats。

更正 pom.xml

修改pom.xml,添加kafka-clients、kafka-streams、gson。
gson用于数据的序列化和反序列化。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.2</version>
        </dependency>

页面类

这是一个用于持有用户对网站的一次访问信息的类。
它包括页面名称、用户名和访问时间。
kafka的消息将被转换为Page类的实例。

package sample.model;

import java.time.LocalDateTime;

public class Page {

    private String pageName;
    private String user;
    private LocalDateTime date;

    public Page(String pageName, String user, LocalDateTime date) {
        this.pageName = pageName;
        this.user = user;
        this.date = date;
    }

    @Override
    public String toString() {
        return "Page{" +
                "pageName='" + pageName + '\'' +
                ", user=" + user + '\'' +
                ", date=" + date + '\'' +
                '}';
    }

    public String getPageName() {
        return pageName;
    }

    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public LocalDateTime getDate() {
        return date;
    }

    public void setDate(LocalDateTime date) {
        this.date = date;
    }

}

存储聚合结果的PageStats类。

package sample.model;

import java.util.HashSet;
import java.util.Set;

public class PageStats {

    private String pageName;
    private int count;
    private Set<String> userSet = new HashSet<String>();

    public PageStats add(Page page) {

        this.pageName = page.getPageName();
        this.count++;
        userSet.add(page.getUser());

        return this;
    }

    public PageStats compute() {
        return this;
    }
}


主类

创建名为「PageStatsExample.java」的主类。

首先,定义Streams的属性。

为了使用自定义的时间窗口,我们指定了“DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG”。如果不指定此选项,每条消息的时间戳将使用消息到达kafka的日期和时间。这次我们将访问网站的时间包含在消息中,并使用该时间进行聚合分析,因此需要指定自定义的“DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG”。

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pagestat");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.122:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PageSerde.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class);

创建一个StreamsBuilder实例,并定义一个以”pageaccess”主题作为输入的流。
其中,key为String类,value为Page类(自定义类)。

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Page> source = builder.stream("pageaccess");

下一步,我们将构建流拓扑(流处理的流程)。简单来说,我们正在定义以下处理步骤。

以每分钟间隔进行5分钟的聚合定义为 “.windowedBy(TimeWindows.of(300000).advanceBy(60000))”。
指定消息接收时使用 “.aggregate” 方法进行聚合。
“PageStats::new” 在时间窗口内进行初始化,在 PageStats 类中存储每5分钟聚合的数据。
使用 “(k, v, pagestats) -> pagestats.add(v)” 应用聚合方法(add)进行聚合。

使用stats.to(“pagestat-output”)来指定将结果写入到”pagestat-output”主题中。

        KStream<PageWindow, PageStats> stats = source
                .groupByKey()
                .windowedBy(TimeWindows.of(300000).advanceBy(60000))
                .aggregate(
                    PageStats::new,
                    (k, v, pagestats) -> pagestats.add(v),
                    Materialized.<String, PageStats, WindowStore<Bytes, byte[]>>as("serde")
                        .withValueSerde(new PageStatsSerde()))
                .toStream((key, value) -> new PageWindow(key.key(), key.window().start()))
                .mapValues((page) -> page.computeAvgPrice());

        stats.to("pagestat-output",
                Produced.with(new PageWindowSerde(), new PageStatsSerde()));

一旦创建完成,之后只需要指定主类并执行即可,因为这是一个带有main方法的Java应用程序。

自定义时间戳提取器类

这是一个管理时间窗口日期的类。
它被设计成以毫秒级的时间戳值返回Page实例的数值。
如果没有自己创建这样一个类,自动使用从Kafka接收到的日期时间在流处理中使用。

package sample;

import java.time.ZoneOffset;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

import sample.model.Page;

public class CustomTimestampExtractor implements TimestampExtractor {

      @Override
      public long extract(ConsumerRecord<Object, Object> record, long timestamp) {
        if (record != null && record.value() != null) {

          if (record.value() instanceof Page) {
              Page value = (Page) record.value();
              return value.getDate().toEpochSecond(ZoneOffset.ofHours(9)) * 1000;
          }
        }
        return timestamp;
      }
}


执行

使用「kafka-console-consumer.sh」工具,订阅输出结果的pagestat-output主题。

bin/kafka-console-consumer.sh --topic pagestat-output --from-beginning --bootstrap-server 192.168.10.122:9092 --property print.key=true

以下是输出结果:
页面名称和时间戳作为键的情况下,
{“pageName”:”C”,”timestamp”:1536364800000}
显示了总数和用户列表的值
{“pageName”:”C”,”count”:1,”userSet”:[“a”]}

9:00和9:01的数量与最初以图表呈现的计算结果一致。
※为了方便阅读,输出的顺序和格式已经调整好了。

# 1536364800000 -> 20180908 090000 000
{"pageName":"C","timestamp":1536364800000}      {"pageName":"C","count":1,"userSet":["a"]}
{"pageName":"A","timestamp":1536364800000}      {"pageName":"A","count":1,"userSet":["a"]}
{"pageName":"B","timestamp":1536364800000}      {"pageName":"B","count":4,"userSet":["c","a","b"]}

# 1536364860000 -> 20180908 090100 000
{"pageName":"B","timestamp":1536364860000}      {"pageName":"B","count":3,"userSet":["a","b","c"]}
{"pageName":"A","timestamp":1536364860000}      {"pageName":"A","count":3,"userSet":["a","c"]}

在中文中,只需要一种选择重述以下内容:

源代码/来源

最后我补上了之前省略掉的剩余酱料。

image.png
package sample;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

import sample.model.Page;
import sample.model.PageStats;
import sample.model.PageWindow;
import sample.serde.JsonDeserializer;
import sample.serde.JsonSerializer;
import sample.serde.WrapperSerde;

public class PageStatsExample {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pagestat");// StreamsアプリのIDを指定する。
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.122:9092"); // 接続先のkafkaブローカを指定する
        // シリアライズ・デシリアライズするSerdeを指定する。KEYはデフォルトのStringを用いる。
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Valueはサイトのアクセス情報を持つクラス(Pageクラス:後述)の独自Serdeを指定する。
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PageSerde.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class);

        // StreamsBuilderのインスタンスを生成し、"pageaccess"トピックを入力とするストリームを定義します。
        // key=Stringクラス、valueがPageクラス(独自)になっています。
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Page> source = builder.stream("pageaccess");

        KStream<PageWindow, PageStats> stats = source
                .groupByKey()
                .windowedBy(TimeWindows.of(300000).advanceBy(60000))
                .aggregate(
                    PageStats::new,
                    (k, v, pagestats) -> pagestats.add(v),
                    Materialized.<String, PageStats, WindowStore<Bytes, byte[]>>as("serde")
                        .withValueSerde(new PageStatsSerde()))
                .toStream((key, value) -> new PageWindow(key.key(), key.window().start()))
                .mapValues((page) -> page.compute());

        stats.to("pagestat-output",
                Produced.with(new PageWindowSerde(), new PageStatsSerde()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        //streams.cleanUp();

        streams.start();

        // 通常はストリームアプリケーションがずっと実行されているはず
        // この例では、入力データは有限なので、しばらく実行して停止します。
        Thread.sleep(120000L);
        streams.close();

    }

    static public final class PageSerde extends WrapperSerde<Page> {
        public PageSerde() {
            super(new JsonSerializer<Page>(), new JsonDeserializer<Page>(Page.class));
        }
    }

    static public final class PageStatsSerde extends WrapperSerde<PageStats> {
        public PageStatsSerde() {
            super(new JsonSerializer<PageStats>(), new JsonDeserializer<PageStats>(PageStats.class));
        }
    }

    static public final class PageWindowSerde extends WrapperSerde<PageWindow> {
        public PageWindowSerde() {
            super(new JsonSerializer<PageWindow>(), new JsonDeserializer<PageWindow>(PageWindow.class));
        }
    }
}

package sample.model;

public class PageWindow {
    String pageName;
    long timestamp;

    public PageWindow(String pageName, long timestamp) {
        this.pageName = pageName;
        this.timestamp = timestamp;
    }
}

之后使用的是同一来源。

package sample.serde;


import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.google.gson.Gson;

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if(bytes == null){
            return null;
        }

        return gson.fromJson(new String(bytes),deserializedClass);

    }

    @Override
    public void close() {

    }
}
package sample.serde;

import java.nio.charset.Charset;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.google.gson.Gson;

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
}
package sample.serde;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;


public class WrapperSerde<T> implements Serde<T> {

    final private Serializer<T> serializer;
    final private Deserializer<T> deserializer;

    public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}

广告
将在 10 秒后关闭
bannerAds