流数据处理与推理处理的关联思路和简单示例

提要

本文将简要介绍以下内容给您。

    • エンタープライズのデータ活用基盤の中で機械学習も含めてシステム作りしていくときの話

 

    • その中で特にデータ活用基盤の視点で考えることのベーシックな部分

 

    その中で特にストリーム処理との組み合わせについての考え方

另外,在这篇文章中,我们将从数据活用基础设施的角度来描述,而不是从机器学习的角度。

背景:

在利用机器学习进行企业数据的实时分析平台时,首先应该重点关注对数据本身进行分析,建立模型并进行训练,以及进行深入的学习以使其能够实际应用。

当开始正式应用时,作为数据利用基础设施的问题是一个例子。

    • データの取り回しかた

 

    推論処理をどう取り込んでいくのか

在这里,可以提出这一点。这里我们只专注于这两个方面。

数据的使用方法

对于那些历史悠久的企业系统,它们真的非常复杂,通常无法简单地表示数据的流动方式。有时也会被称为所谓的“信息孤岛”状态。尽管程度各有不同,但在许多组织中,可能都存在这种情况吧。

尽管如此,如果我们对复杂情况进行分解,大致上可以说是由以下的组合流程构成的。

在以数据收集为起点的数据利用基础设施中,数据的流动。

収集 -> 蓄積 -> 加工 -> 活用

分类如下:

从数据发生源运送数据到数据活用基础设施的方式;
在数据活用基础设施内进行处理的方式。

– 这种情况下,从数据发生源运送数据到数据活用基础设施的方式和在数据活用基础设施内进行处理的方式可大致分类为以下两种。

– 以数据发生源为起点将数据运送到数据活用基础设施的方式以及在数据活用基础设施内进行处理的方式可大致划分为以下分类。

    • バッチ処理方式:

1時間単位で取られるスナップショットデータを連係する場合など、ある程度まとまって処理をすることを前提とする方式

ストリームデータ処理方式

時々刻々と生成されるデータをミリ秒〜分単位で連係する場合など、できるだけ短い遅延・処理単位で扱うことを前提とする方式

可以考虑两种选项。

然而,流式数据处理方式实际上也存在将数据以小单位进行打包处理的情况,并不一定在发生源处以信息单位(在此指消息)处理的实现方式。

在这里,我们可以考虑大致的分类。

図1.png

根据数据利用基础架构的视角,对推理处理的定位

现在,关于机器学习的推理处理,在上述的“以数据收集为起点的数据利用基础设施中数据的流动”的背景下,可以将其视为在“处理”和“利用”时的一种处理方式。

図2.png

将收集到的数据和积累的数据经过一系列的处理,传递给系统和人类可视化的视图来呈现结果。在这一系列的处理中,我们考虑包含推理处理。

此外,将一系列的处理和表达表示出来是因为,在实际情况中,除了将数据输入推理用的机器学习模型并获取结果之外,还需要进行许多其他处理的情况很常见。

比如,就像之前提到的数据处理方式一样,从被孤立存放的系统群中得到的数据无法直接使用作为数据源,需要对来自多个数据源的数据进行组合(连接),去除不需要的部分等加工处理,这种情况很常见。(在这里我们将其称为预处理)

図3.png

此外,我们可能会进行一些其他处理,例如将由推理用机器学习模型输出的数据与通过其他方法计算得出的数据混合,或者将其转化为最终服务所需的格式(在这里我们将其称为后处理)

図4.png

顺便提一下,在我的经验中,处理应用程序中的预处理和后处理通常占据了大部分情况。实际上,能够看到整个处理流程的顺畅性非常重要。
另外,尤其在处理大规模数据时,我会查看整个处理流程(包括前后)并进行基础设施的调优,以及对应用程序进行重构。主要是检查是否存在意外的数据偏差,以及是否避免了不必要的I/O操作等。

现在,根据以上内容,我们可以以数据利用基础设施的角度来考虑”收集”的开始,并将其应用于”批处理方式”和”流数据处理方式”。在这里,还有另一种以”利用”为起点的模式,我也会谈到。

从“活用”角度来看,除了像批处理方式和流数据处理方式一样根据时间表机械地进行处理的模式之外,还存在一种希望在需要时输入数据并查看结果的情况。我们将其称为“按需活用方式”。

図5.png

基于上述情况考虑,在数据利用基础设施中,机器学习推理处理的定位是

    • バッチ処理方式のデータ処理に組み込むパターン

 

    • ストリームデータ処理方式のデータ処理に組み込むパターン

 

    オンデマンド活用方式のデータ処理に組み込むパターン

可能会有三种选择。

將推論處理結合的方法

根据《基于数据应用平台的推理处理定位》进行整理,推理处理可以嵌入到数据应用基础设施中的几种处理模式之中。在这里,我们将从推理处理的执行方式的模式视角考虑如何获取推理处理的结果,并与前后处理进行协调。

有哪些机器学习库和框架呢?例如,可以提到TensorFlow、PyTorch、Deeplearning4j、scikit-learn、Apache Spark的Spark MLlib(以下简称MLlib)等等。还有其他很多选择。

这些库和框架可以分别保存已训练的模型,并在其他应用中加载和使用。

不同的库和框架可能会采用各自独特的保存方式,也有一些可以以通用格式进行保存和使用。

以下是一个使用MLlib输出模型的例子(引用自https://spark.apache.org/docs/latest/ml-pipeline.html)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

如果要从应用程序中使用它们,您需要导入相关库并实现加载模型的过程来利用它们。在这里,我们将直接使用模型加载模式进行表述。

最近,管理使用上述方法输出的机器学习模型,并以类似于Web服务的方式提供输入数据以获取结果数据的技术也越来越多地出现。例如,可以提到TensorFlow Serving、Clipper、Polyaxon、Seldon等。

図6.png

如果您想从应用程序中使用这些内容,需要根据应用程序内部提供的服务方式(例如:REST API等)进行访问,并接收推理结果以供内部使用。我们将其表述为模型服务系统的使用模式。

根据以上内容,可以考虑推理处理的定位、推理处理的移动方式和嵌入方式的模式。

図7.png

每个都有自己的特点,但我这里只简单列举一些。

図8.png

在实际的系统开发中,可以考虑这些模式和业务需求,并考虑未来的发展选择一个权衡解决方案。

使用直接模型加载模式的流数据处理方法的示例。

目前有许多不同类型的流数据处理引擎出现。
例如,有Apache Spark的Spark Streaming(Structured Streaming), Apache Kafka(简称Kafka)的Kafka Streams,以及Apache Flink等等。

在处理流式数据时,通常会与作为处理流数据的中心枢纽的”消息系统”结合使用。作为消息系统,可以使用Kafka、Apache Pulsar等。

让我们考虑一个使用Kafka作为代表性消息系统和Kafka Streams作为流数据处理引擎的示例。通过仅使用Kafka,我们可以简化构建流数据处理管道的过程。

这次试验的整体形象如下所示。

図9.png

请注意,当在实际工作或研究中使用时,其他要素在功能和非功能方面都非常重要,请您牢记这一点。
为了让您对本文的核心有所了解,我们将尽力在这里提供简洁的介绍。

前提条件:执行环境

    • 私は手元のCentOS7で試しました。

 

    • JDKも予め導入しておいてください。私の手元では1.8で試しました。

 

    Mavenも予め導入しておいてください。

準備:建立Kafka環境

首先,简单启动Kafka。
关于Kafka的简易构建和启动,请参考Kafka官方文档中的快速入门。
由于此次只是简单的操作确认,因此一个Broker的配置就足够了。

当启动完成后,我们需要事先创建Kafka的主题(注释)。
打开启动Kafka的机器的终端,并执行以下命令来创建主题。

请将以下内容用中文进行原生释义,只需提供一种翻译:
注释:
请将Kafka视为保持消息的逻辑单元。在本文中,您可以将其大致视为数据集。

例子:执行

$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic input
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic output

关于本次使用的机器学习模型和Kafka Streams应用程序

本次我们将以著名的Iris数据作为例子。
有关Iris数据和其分类的详细信息,请参阅维基百科上关于Iris数据的文章。

本次使用的Kafka Streams将使用Java来实现应用程序。因此,可以利用Java上易于处理的机器学习库和框架来简洁地进行实现。TensorFlow的Java API、Deeplaernig4j、BigDL等可以满足这个要求。(注)

在实际操作中,有可能会出现试错过程中想要使用的库和框架与正式应用时想要使用的不一致的情况。
尤其是在分析人员和系统开发运营人员之间,常常会出现技术和思维方式不一致的情况。
关于如何解决这个问题的讨论,本文章将省略。

现在,我将介绍如何使用Deeplearning4j加载和使用经过训练的模型的例子。
请事先根据deeplearning4j-examples学习和输出模型。
有关训练方面的参考,请使用CSVExample,有关模型输出方面的参考,请使用SaveLoadMultiLayerNetwork。

创建应用程序

创建一个Kafka Streams应用程序来从Kafka读取并处理数据。关于Kafka Streams本身,您可以参考Kafka官方文档中关于Kafka Streams的简洁易懂的内容。

在这篇文章中,我们将参考Kafka官方文档中的Kafka Streams,并创建一个基本的模板。

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.kafka \
    -DarchetypeArtifactId=streams-quickstart-java \
    -DarchetypeVersion=2.4.0 \
    -DgroupId=streams.examples \
    -DartifactId=streams.examples \
    -Dversion=0.1 \
    -Dpackage=streamml

这次我们将使用作为模板生成的 streamml.Pipe 进行修改。
这里只列出要点。

主要的开始处定义的属性将被直接重用。

接下来,根据前面所述,在本文例子中,我们选择加载并使用事先训练好的Deeplearning4j模型。关于模型的保存和加载,请参考Deeplearning4j的示例 SaveLoadMultiLayerNetwork。

在这里,我们将按照以下方式进行实施。

        File modelZip = new File("src/main/resources/IrisModel.zip");
        boolean saveUpdater = true;
        MultiLayerNetwork model = MultiLayerNetwork.load(modelZip, saveUpdater);

接下来,我们定义用于定义Kafka Streams拓扑的构建器。

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input");

stream方法的参数是输入的主题名称,所以请使用预先生成的主题。
另外,在本文中,我们将作为输入使用逗号分隔的4个数值字符串,如5.4,3.9,1.7,0.4。

接下来,我们定义一个过程,将从输入主题中获得的消息以逗号分隔并输入模型进行推理,从而得到推理结果。

        // 今回はKeyを使用せず、Valueのみ扱うので mapValues メソッドを利用し、
        // メッセージごとの処理を定義
        KStream<String, String> result = source.mapValues(value -> {

                // 文字列を変換し、モデルが期待する形式である2次元配列を生成
                // ただし、この例では1メッセージずつ処理するので、実際のところ含まれる配列はひとつ
                String[] strArray = value.split(",");
                double[] doubleArray = Arrays.stream(strArray)
                                        .mapToDouble(Double::parseDouble)
                                        .toArray();
                double[][] inputArray = {doubleArray};

                // Deeplearning4jで取り扱うデータ形式に変換
                INDArray inputNDA = Nd4j.create(inputArray);

                // モデルにデータを入力し、推論結果を得る
                INDArray outputNDA = model.output(inputNDA);

                // 今回は出力は文字列とする
                return outputNDA.toString();

        });

就像上述的那样,使用Kafka Streams进行流数据处理的应用程序是一种简单的Java应用程序,因此只需在消息处理中直接使用加载的模型即可,非常容易理解。

最后,将得到的结果输出到文档的主题中。

        result.to("output");

从那以后,按照模板进行。
也就是说,建立拓扑图,并开始使用Kafka Streams进行流数据处理。

执行应用程序并进行操作确认

接下来,让我们运行应用程序。
编译并按照以下方式执行。

$ mvn clean package
$ mvn exec:java -Dexec.mainClass=streamml.Pipe

当应用程序启动后,输入示例数据并确认输出。
在这里,我们将使用与Kafka附带的控制台工具通过输入输出数据来进行操作确认,就像上面的图示一样。

在启动先前的应用程序终端之外,同时打开另一个终端,并首先启动输出工具。

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output --from-beginning

接着,再开启一个终端并启动输入工具,尝试输入示例数据。

Input example:

$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input
> 5.4,3.9,1.7,0.4
> 5.7,2.5,5.0,2.0
> 5.0,3.0,1.6,0.2
>

现在,您在先前启动的输出工具的终端上是否显示了判断结果?如果每个记录的三种类别的概率都被显示出来,那么就成功了。

The given output example is:

[[    0.0067,    0.9882,    0.0051]]
[[    0.0001,    0.0057,    0.9941]]
[[    0.0066,    0.9906,    0.0028]]

总结

在本文章中,我们介绍了在数据利用基础设施中嵌入机器学习推理处理的思考方式和简单例子。
实际系统更加复杂,但当我们分解每个要素时,我们认为重点是观察流程和模式,这是简化的第一步。

广告
将在 10 秒后关闭
bannerAds