回顾日志分析基础架构

首先

我們公司在大約半年前開始建立日誌分析平台,昨天我們進行了試運營。
為了這個機會,我們在公司的部落格上公開了架構的回顧,所以我們也想在這裡介紹一下。
基本上內容是一樣的,請您以最容易閱讀的方式查看。
另外,如果您點擊連結,還可以看到其他同事寫的許多部落格文章,如果您感興趣,將不勝感激。

我想要解决的问题

在我们公司,最近一两年来,将用户行为日志等各种数据与机器学习等分析相结合的动向日益增多,但由于主要以下两个原因,分析以及利用分析结果进行应用程序开发受到了限制。

    • ログが一台のサーバのPostgresにhstore形式にて格納されている

Postgresは分析に適していない
スケールしない(複数サーバによりクラスタリング構成を組めない)

日次の夜間のバッチ処理で前日分のログがデータベースに格納されるので、リアルタイムな分析やアプリケーションを作成することができない

仮に秀逸なレコメンドモデルをデータサイエンティストが作成したとしても、一日前のデータなので精度が落ちるといったことがおこりうる

这些问题的解决方案大约一年前就已经存在了,但由于一些原因一直未能着手,直到2018年10月左右才真正开始实施。

想法出现了一年,开始行动了大约半年,终于从昨天开始试运行,因此我想借此机会回顾整体架构。

希望的读者目标

    • Hadoop, Spark等を導入したいけど、社内に知見があるひとがいない

 

    Hadoop界隈のOSSが非常に入り乱れていて諦めそう

我致力于提供过去我自己这样的人在当时会非常有帮助的信息。

因此,我主要写一些容易误解的要点和陷阱,但我不会故意写出可以在网上轻易找到的信息。然而,我会不断介绍优质的来源。

虽然对于专注于日志分析平台的人来说,我这次介绍的开放源码软件早在半年前之前就没碰过了,可能会觉得我现在才说这些有点晚了,但我希望能够成为从日志收集到分析的整个过程的回顾契机。

整体的趋势

log_base_architecture

我认为这是一个传统的构造。大致上,

    1. 数据从各个应用服务器传输到采集服务器。当采集服务器发生故障时,将使用备用服务器。

 

    1. 采集服务器对数据进行处理(如过滤和添加信息),然后将其发送到Kafka。根据不同的应用程序,将使用不同的主题。数据以JSON格式存储在Kafka中。

 

    1. 从Kafka集群消费数据。

 

    1. 数据保存到HDFS中。

 

    1. 使用Structured Streaming将数据以parquet格式写入HDFS。

 

    1. 然后,可以用于批处理和即席分析。

 

    1. 也可用于实时分析。

 

    1. 根据应用程序的指标,在Structured Streaming中处理后,将数据写回主题。

 

    Fluentd读取主题,并在批处理中计算的阈值超过时,实时检测到Slack。

图上没有显示,但我们作为集群管理器使用Mesos,Zookeeper作为Kafka的幕后支持,正在运行。
另外,为了方便理解,在图中集群由不同的服务器组成,但由于目前正在进行试运行,所以实际上是在同一个集群上运行的。

整个系统的可靠性考虑清楚。

在进入各操作系统的说明之前,我们先谈谈整体可靠性。

    • システム全体としてどの程度の信頼性が必要なのか

 

    • それぞれのOSSはどの程度までサポートしているのか

 

    そのOSSは他のOSSと組み合わせた場合にどうなるのか

在考虑这些事情时最好有这个意识。

从观点来看,

    • 順序を保証するか

 

    • データ到達の保証

at most once

データロストを許容

at least once

データ重複を許容

exactly once

データロスト、データ重複ともに許容しない

可用性をどの程度保証するか

还有其他选择。

整个系统的可信度依赖于各个中间件以及它们的组合可信度。Fluentd声称可以至少一次性,而Kafka和SparkStructuredStreaming声称可以确保仅一次性。但是,如果将它们组合使用,例如将SparkStructuredStreaming的处理结果写入到Kafka中,那么系统整体上将不可避免地变为至少一次性以下。

在应用程序中进行一些设计上的改进可能能够保证”exactly once”,但通常来说,要通过整个数据流来保证”exactly once”是相当困难的。

在这个设计中,我们不需要保证顺序,但至少需要保证数据传递的一次性。

Fluentd(1.3.3版本)

学习方法

如果现在要引入Fluentd,我认为会使用td-agent3。但是,截至2019年4月,大多数日语文章(如Qiita)仍然以td-agent2为基础进行解释。

由于td-agent2系列的规格已经发生了相当大的变化,所以在我的情况下,我试图参考却反而感到困惑。

对于要学习Fluentd的人来说,它需要记忆的内容很少,所以我认为他们可以不必阅读那些文章,而仅仅参考官方手册就可以了。

虽然如此,在这么多内容的情况下,也有人认为手册的数量太多了!我认为单凭个人意见选择的话,读两到三遍以下内容,基本上就能掌握要点了。如果只是粗略读一遍的话,可能需要五到六个小时。

    • OverView

 

    • Configuration (特に重要)

 

    • Input Plugins ~ Buffer PluginsのそれぞれのOverView

in_tail,in_forward,in_http,out_file,out_forward,out_copy,out_stdout,filter_grep,filter_record_transformer

关于Buffer

大體上,如果讀取說明文件,我們可以理解大多數事情,但對於緩衝插件的部分,我們可能會陷入困境,不知道應該如何設定選項。

    • BufferedOutput pluginの代表的なoptionについて

 

    fluentd の基礎知識

这本书写得非常容易理解。

特别是在”将chunk加入队列”这一部分,设置排队时机可能有些复杂,所以稍微解释一下,在每种情况下,只要满足任一条件,就会将其加入队列。

chunk keyとしてtimeが指定されている場合

chunkのレコード数がchunk_limit_recordsに達した
chunkのレコード容量がchunk_limit_size * chunk_full_threshold に達した場合

timekey時間経過ごと(厳密には、timekeyの時間幅の終端に達し、+timekey_wait経過した時刻)

chunk keyとしてtimeが指定されてない場合

chunkのレコード数がchunk_limit_recordsに達した
chunkのレコード容量がchunk_limit_size * chunk_full_threshold に達した場合
chunkが作られてからflush_interval時間経過時

每个默认值是

chunk_limit_size: Default: 8MB (memory) / 256MB (file)

chunk_full_threshold: Default: 0.95

flush_interval: Default: 60s

chunk_limit_records: なし

timekey_wait: 10s

是的。 (Shì de.)

另外,上述为默认行为,可以通过指定flush_mode=interval来同时指定timekey并启用flush_interval,也可以通过指定flush_mode=immediate来禁用缓冲配置。

由于还有其他设置选项,因此我们可以考虑根据情况进行设置。

flush_thread_interval (Default: 1秒 バッファをキューに入れる処理のインターバル)

flush_thread_burst_interval (Default: 1秒 キューから取り出し、送信する処理のインターバル)

flush_thread_count (Default: 1 キューから取り出し、送信する処理のスレッド数)

queued_chunks_limit_size (Default: No limitだが、こちらによると実質的にはflush_thread_countのよう。 キュー内のchunk数の上限。Fluentd v1.1.3 has been releasedを参照するかぎり、 flush_interval経由でのenqueのみ抑制できるよう)

in_tail插件的常见问题

如果在指定in_tail的路径中包含了时间格式字符串,并且没有将read_from_head(默认为false)更改为true,则可能会跳过初始数据的读取并丢失日志。

请注意,即使在日期更改后被旋转到新的日期文件,也无法从开头进行阅读。

对于一种类型的旋转,即使路径没有改变,也会默认从开头开始阅读。

请参考有关fluentd的in_tail插件的详细操作理解。

关于可靠性

根据Fluentd高可用配置,如果在写入缓冲区之前发生崩溃,则无法解决,但在其他情况下,可以通过使用各种插件来保证高可用性。

顺便说一下,”exactly once”在本身并没有支持。

缓冲区插件

默认设置下,重试次数不受限制,重试时间上限为72小时(retry_timeout).

然而,重试间隔默认为2秒、4秒、8秒、16秒,逐渐加倍。因此,建议使用retry_max_interval来设置重试间隔的上限。

只是为了参考,按照默认设置来计算,重试上限前的重试间隔约为18.2小时。

第二个插件

    • outputプラグイン内に記載することができ、リトライしても出力できなかった場合に行う処理を書ける。

 

    サーバに接続できなかったため、ローカルのファイルに書き込むなど。

前进插件

    • require_ack_response

Default: false
at-least-onceを指定できます

流利的插件 – Kafka

Fluentd向Kafak的数据流转使用了buffered-output-plugin。然而,文档不太完善,

partition_key是什么意思?
还有partition_key_key这种东西存在?

必须混乱

查看原始代码会更快。我自己对Ruby语法一无所知,也不了解fluentd插件开发的规范,但我能在4、5分钟内理解配置值的意义。

首先,partition_key是什么?在本地的Java客户端中,我觉得没有类似的概念,所以我查看了本地的DefaultPartitioner实现,但是也没有找到。

在fluent-plugin-kafka插件的内部,使用的ruby-kafka客户端似乎在有指定partition_key的情况下,会根据partition_key计算哈希值并随机分配到不同的分区中,而不是使用消息的键(通常在kafka上下文中使用的消息键,即值的键)。相关的ruby-kafka逻辑可以在这里找到。

如果您不想根据键的值来确定分区,而是由于键的值不均匀等原因,可以像 `ruby-kafka` 文档中所述那样,使用 `partition_key: rand(100)` 等方式进行指定。从这个意义上说,可以说相比官方库,这样的指定更加灵活。(当然,如果您自己实现分区器,也可以在Java中实现类似的功能)。

因为已经知道了partition_key是什么,所以我来解释一下partition_key和partition_key_key。记录的partition_key的值将作为partition,并且记录的partition_key_key的值将作为partition_key。前者的值是整数,后者的值是字符串。前者的“key”是记录的键,后者的“key_key”是partition_key的键和记录的键。有点复杂哈哈。

如果两者都被指定,那么在ruby-kafka中,分区将优先于分区键进行操作。

如果你能理解以上内容,我认为你也能在以下资源中理解其他相关选项。

topic = (@exclude_topic_key ? record.delete(@topic_key) : record[@topic_key]) || def_topic
partition_key = (@exclude_partition_key ? record.delete(@partition_key_key) : record[@partition_key_key]) || @default_partition_key
partition = (@exclude_partition ? record.delete(@partition_key) : record[@partition_key]) || @default_partition
message_key = (@exclude_message_key ? record.delete(@message_key_key) : record[@message_key_key]) || @default_message_key

主题也可以根据记录动态分配,是吗?

由於某些參數的默認值可能為1(max_send_retries)和-1(required_acks),並且與Java官方庫稍有不同,因此需要確認。

动物园管理员(3.4.13版本)

使用它来存储与 Kafka Broker、Topic 和 Partition 相关的元数据。

据说在Kafka 0.9之前,还会进行Consumer组的偏移写入等操作,但现在只有在Consumer组成员更改或Kafka集群自身发生变化时才会发生写入,因此流量限制相当严格。

由于其性质,集群结构最好是奇数台,通常三台就足够了。

卡夫卡(2.1.1-cp1)

学习方式

Apache Kafka 分散メッセージングシステムの構築と活用

お勧めです。まず読むのはこちらでしょう。
ソースも豊富なので、実際にコードを動かしましょう
StructuredStreamingとの連携も1章分さかれています

Kafka

ソースはほぼないですが、Kafkaの内部構造等、Deepな部分の理解にはとても良かったです。2周くらい読まないと理解できませんが。。

我认为很少会有人自己编写直接与Kafka进行通信的客户端代码,最终我也是使用了Spark和Fluentd的集成,因此没有自己写过任何一行代码。

我认为,Kafka有不同角色的参与者,包括Broker、Producer和Consumer,个人而言,我觉得有点复杂。我建议您尝试实际运行书中给出的示例。

此外,我认为如果对于Kafka的机制没有一定程度的理解,就无法知道该如何设置Kafka的配置值,所以在这个意义上也需要一定程度的理解。

版本发布

我們使用了Confluent公司提供的Confluent平台的OSS版本,它包含了一些方便的組件,如Schema Registry,而不是Apache Software Foundation提供的社區版本。

另外,虽然是多余的,但Ansilbe的Playbook基础也已经公开,对于编写Ansible Playbook提供了非常有价值的参考。

对于可靠性问题

主题

    • min.insync.replicas

最小のin-syncレプリカ数(デフォルト1)

经纪人

    • unclean.leader.election.enable

out-of-syncレプリカがリーダーになることを許容するか(デフォルトfalse)

制作人

retries

リトライ回数(デフォルト0)
パーティションリーダ不在など復旧可能なエラーについては、アプリケーション側で実装せずともProducer側でリトライしてくれます(デフォルト100msインターバル後)
普通は設定するもののようだが、リトライすると、max.in.flight.requests.per.connectionが1以上(デフォルト5)だと、メッセージ順序が保たれない一方、1に設定するとスループットが大幅に制限される。

acks

0

ProducerはBrokerからの応答を待たずに成功とみなす

1(デフォルト)

リーダーレプリカの受信を成功とみなす(ディスク書き込みは保証しない)
成功と判断後、リーダーレプリカにてディクス書き込み前にクラッシュして、メッセージを受信していないレプリカが新しいリーダーに選出された場合、ロストの可能性がある

all

in-syncレプリカ(同期しているレプリカ)全ての受信完了を成功とみなす
レプリケーションが遅延して、in-syncレプリカが1の場合もあり得るので、min.insync.replicasを2以上に設定することが多いよう。

消费者

    • auto.offset.reset

earliest

有効なオフセットがない場合、先頭から消費
メッセージ重複する可能性あり

latest

デフォルト
有効なオフセットがない場合、末尾から消費
メッセージロストする可能性あり

設定値ではないですが、オフセットのcommitはアプリケーション側に委ねられている部分で、実装によって、重複、ロスト等しやすい部分なので気をつけましょう。

只需要一个选项就可以将以下内容用中文进行本地化改写:
这次

    • min.insync.replicas: 2

 

    • acks: all (fluent-plugin-kafka、Structured Streaming + Kafka Integrationでの設定)

 

    retries: 10 (fluent-plugin-kafka、Structured Streaming + Kafka Integrationでの設定)

设置了。

Kafka单独支持从Kafka 0.11版本开始的exactly-once,但这仅仅意味着它不能确保向外部系统准确地写入一次。

为了支持将”exactly once”写入外部系统,需要在支持主键的数据存储中进行幂等写入设计(即使多次使用相同的键和值进行写入也没有问题),或者在支持事务的数据存储中进行偏移管理(参考Kafka p.81)。这需要应用程序端的技巧和改进。

请重申一次,这次不需要求取“只发生一次”。

卡夫卡的格式

    • JSON

 

    Apache Avro

尽管有多个选择,但我们最终选择了使用JSON。

在Kafka中,除了JSON外,通常还会结合Avro和SchemaRegistry一起使用,因为Avro可以解决JSON的以下问题。

    • データ作成側が自由にフィールドを追加、削除等の変更ができるので、利用側が解釈できなくなる可能性がある。

 

    フィールドや型情報はファイル内では同一であるにもかかわらず、レコードごとに記載するので、冗長である。

在Avro中,可以将模式信息存储在一个文件的开头。此外,使用SchemaRegistry可以进行模式的集中管理和兼容性检查等操作。

考虑到这些好处是否超过了运营成本,我们认为在没有像我们这样的专业团队存在、容易个人化的环境中,最好避免使用Avro,而选择采用JSON。

另外,由于关于Avro的整理信息并不多,所以我之前写了关于Avro和SchemaRegistry的入门指南,请在考虑时参考一下。

Apache Mesos(1.7.2)

对于Apache Mesos,我虽然购买了Mesos实践指南,但由于没时间读完而且学习不足,所以无法否定或赞成将Apache Mesos作为集群管理器。但很明显,YARN被广泛使用且信息丰富,所以如果没有特别的偏好的话,我觉得用YARN就可以了。

Spark2.3引入的Kubernetes在目前仍处于发展阶段,但未来可能成为主流。

要在某种程度上理解Mesos,首先应该以运行作业的方式来理解MesosUI的视角,这是最好的方法。

如果要提醒一个注意点的话,那就是默认设置下,Spark应用程序会无限制地使用集群的核心。它不会根据后来的其他作业请求来释放核心。

因此,默认情况下无法同时运行Spark作业,因此请务必在启动作业时指定spark.cores.max参数,以指定集群整体核心数的上限。

Hadoop(3.1.1)

学习方式

这一次,我们仅使用HDFS在Hadoop的HDFS、MapReduce和YARN功能中。只使用HDFS的功能并不会遇到太多问题,所以作为学习的优先级可以很低。

要想好好学习,虽然我不太会读写。

    • Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

 

    Hadoop 第3版

我认为应该阅读类似的书籍,只需要大致了解即可。

    • Hadoop本第3版 3章 Hadoop分散ファイルシステムについて

 

    1人でHadoopの話をする Advent Calendar 2016

我觉得你可以简单地浏览一下。

注意事项

如果有注意事项的话,那就是需要意识到”HDFS不适合处理大量小文件”这一事实。默认的块大小为128MB。考虑到Spark按照HDFS的块进行分区分配,所以对文件大小要小心才好。

在我参加的2019年Hadoop / Spark Conference Japan中,口头上的解释是“虽然只是一个参考,但理想的文件大小至少应该是100MB”。在这个例子中,我使用SparkStructuredStreaming将数据以parquet格式写入HDFS,并将其作为设置触发器间隔的参考标准。

HDFS中的格式化

诚实地说,这部分并不是我主要调查的部分。

    • ORC

 

    Parquet

有几个选择出现了,但我们决定采用Parquet。

我认为基本上,我们可以根据《基于数据仓库的技术支持下的列式存储格式》一书中所述的最优化方法来实现适用于数据分析的列式存储格式。

在Column格式中,我們參考了HDFS文件格式的選擇,包括個人偏好和偏見,並且在Spark的文件中有很多描述。雖然ORC和Parquet都有優缺點,但由於Parquet是默認格式,因此我們選擇了Parquet。

Spark结构化流(2.4.0)

学习方式

アプリケーションエンジニアのためのApache Spark入門

とてもおすすめです。ログ基盤構築のために読んだ本の中では一番良かったです
ソースが豊富ですぐ動かせる一方、理論的な部分も説明されており、そのバランスが非常に良いです
1回通しで読んだ後、手を動かしながら読むことをお勧めします

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク

2015年の本なので、情報はかなり古いです。コードを書くのに参考にするというよりもRDDも少し知っておきたいなという方が眺めるにはいいでしょう。

简述

您可以使用比 Spark Streaming 更高级的 API 将流数据作为 Dataset/DataFrame 进行处理。通过微批处理方式,具有100毫秒至数秒的延迟(确保一次性处理)。如果这种延迟无法被接受,之前我们不得不使用其他流处理框架,但自从 Spark 2.3 推出了连续处理(continuous-processing),我们可以实现1毫秒以上的延迟(至少一次)。

由于流处理具有特定的概念,所以对于没有接触过流处理 OSS 的人来说,最好先浏览一下structured-streaming-programming-guide,以便学习以下概念。

    • EventTime, ProcessingTime

 

    • TumblingWindow, SlidingWindow, SessionWindow

 

    • Watermark

 

    • Trigger

 

    OutputMode(Append,Update,Complete)

此外,根据以上设置的值,输出的时间可能与直觉不符,可能会产生“嗯?结果怎么还没出来?”的情况,所以需要注意。
例如,如果设置了水印,可以在处理延迟数据和水印的图表中确认,在默认的Append模式下会延迟输出。

(2019/4/24追記)
我没有遇到太大的障碍,但后来出现了一些问题,所以我写了关于如何在StructuredStreaming中使用JOIN来更新静态数据框架的方法。如果你感兴趣的话,请看看。

与KafkaStreams进行对比

看到各种分散流处理引擎,你就会发现开源软件有太多导致混乱。由于引入新的开源软件会使设计变得繁琐,所以除了StructuredStreaming外,我只试过KafkaStreams。我还没有尝试过KSQL。

KafkaStreams 優點如下:

    • ドキュメントやすぐに動くサンプルが非常に充実しているので複雑なことをしない限り、「知りたい情報がネットにない」といったことにはまずならない。やはり新しいOSSを触る際にコード例がたくさんあることは非常に助かりますし、触ってみたいという気持ちも高まります。

 

    • 単純にJavaのアプリケーションを複数サーバそれぞれで起動させるだけでストリーム処理クラスタが簡単に構築されます。Spark導入の際のようにYARN,Apache Mesos等を導入する必要性がなく、利用障壁が低いです。

 

    StreamとTableの関係とかプログラミングしている感があって触っていて楽しい笑。逆にいうと、ライブラリの使い方を覚える必要がある。

SparkStructuredStreaming的优点是什么?

    • ほとんど、バッチ処理とおなじようなDataframeの操作でストリーム処理が可能なので、勉強のコストが低い。

 

    • SparkSQLやSparkMlibでバッチ処理した結果を利用したりといったことがすぐにでもできる

 

    • UDF(ユーザ定義関数)等をバッチ処理等と共有できる

 

    開発スピードが速く、注目度が高い。continuous-processing がサポートされるなど今後も成長していきそう

考虑到以上情况,如果想要进行一些简单的流处理,或者想要在多个专业团队中进行各种定制,但又没有引入YARN、Apache Mesos等工具,我认为Kafka Streams是比较合适的选择。但是这次我个人抑制了想要亲自体验的心情,冷静地选择了StructuredStreaming。另外,我认为将Kafka仅作为数据集线器的角色来担当,从设计上来说会更清楚易懂。

Spark作为KafkaConsumer和KafkaProducer

请参考structured-streaming-kafka-integration。
没有特别困扰的问题。

如果在后缀中添加”kafka”,基本上就可以进行kafka的配置了。

由于Spark会处理偏移量的提交和管理等繁琐的事务,因此用户不需要关注。换句话说,无法指定enable.auto.commit、auto.offset.reset等参数。由于序列化和反序列化是通过DataFrame操作进行的,所以同样无法指定。

目前,仅指定了kafka.acks=-1和kafka.retries=10。

通过在Spark中设置kafka的分区对应于输入的分区,如果适当地设置了kafka消息的键,或者没有设置任何内容,就不会出现偏差。

而且,您可以根据图表看到,我们在Spark集群上对聚合结果进行了再次处理,并将其发送到另一个kafka主题中。这一步骤可能看起来多余,但它是为了供应用工程师使用的Apache Spark入门(简体版)第194页的要求。

作为整个系统来说,通过将数据输出到Kafka并从那里获取并通知警报的过程会更易于维护。这样做可以方便地更改或增加要通知的目标,因此,从扩展性来看,如果考虑到未来的扩展,这种做法更好,这是作者的观点。

会变成顺其自然地遵循的形式。

写入HDFS

在Spark Structured Streaming中,我们通过对HDFS进行分区(例如按日期)并将其持续化为Parquet格式。需要注意的是,默认设置会导致大量的Parquet文件生成。实际上,当我们检查指定路径时,我们发现在每个微批(几秒钟之内)中会生成与MesosUI中Executor数量相同的文件数量,这让我们感到震惊。

如前所述,Hadoop并不擅长处理大量小文件,因此除非处理非常大量的数据流主题,否则可能需要进行调整。

这个地区的情况在Spark Streaming、将输出到Parquet和太多的小输出文件方面有详细说明。

首先,让我们来解释一下为什么会产生大量的文件。

    • DataFrameのpartitionごとに並列にparquetファイルを書き込む際、それぞれ異なるファイルに書き込む必要がある(HDFSは一つのブロックに同時に書き込みができない)

 

    StructuredStreamingはマイクロバッチごとにファイルをcloseする

看起来是如此。
为了避免复杂的事情,只需通过设定值来处理。

triggerにて、明示的にマイクロバッチ間隔を多めに指定する。当然ですが、レイテンシは大きくなります

coalesceにてパーティションを少なめに指定する

看起来只能进行。

云服务也是一个选择

亚马逊金石流

Kafka可以作为替代品,但我没有特别调查过。在与Spark的集成方面,Kafka非常强大,而且与fluentd等周边工具的插件集成也很完善,所以我没有考虑除了Kafka之外的其他选项的理由。

亚马逊 S3

S3可能作为HDFS的替代。平时我们可能不太关注HDFS,但一旦发生故障,最困扰的就是这个部分。由于S3没有数据本地性,且存在通信开销,因此据称它较慢。然而,最近网络带宽足够,特别是对于强调吞吐量的批处理等情况,它可能不会有那么大的缺点。对于公司而言,由于有许多本地环境,并且将日志堆积在S3的情况目前较少,但如果不是这种情况,我认为首先应考虑S3。

另外,如果您正在考虑S3,我推荐参考《Hadoop / Spark Conference Japan 2019》的演讲,该演讲将全面展示如何充分利用Hadoop / Spark使用Amazon S3。尽管我是实时收听的,但考虑到演讲者来自AWS,我们应该加以考虑并参考一下。

电子医疗记录

这是亚马逊提供的Hadoop、Spark等托管服务。虽然准备环境非常简便,但每次分析都需要动态分配集群,并在分析结束后终止,所以选择自由环境比较困难。

如果您在S3中已经有日志数据,并且希望在几个月内使用Spark进行分析,我认为您应该首先考虑。

其他相关信息

Hadoop

「Hadoopの時代は終わった」の意味を正しく理解する

Hadoopが誕生してから現在に至るまでの歴史を、NoSQLの台頭などを絡めて非常にわかりやすく、説明してくれてます
「Hadoopって、Sparkに置き換わるって聞くのに、なんでHadoopって未だに聞くんだろ?」と思っている人にはちょうどいいいです

Hbase

ユースケースで徹底検証! HBaseでIoT時代のビッグデータ管理機能を試す 記事一覧)

2時間ほどで全部よめます
NoSQLにおけるHBaseの位置付け、およびHBaseの概要とアーキテクチャについてわかりやすく書いてあります。

Zookeeper

Apache ZooKeeperを内部解析してみる

4つほどの連載を30分ほどで読めますので、縁の下の力持ちのZookeeperが何をやってくれるのかをサクッと把握するにはよい

Hadoop / Spark Conference Japan 2019で紹介されていた各社アーキテクチャ事例

LINE

OASISというLINE社製のOSS紹介がメイン
Hadoop(HDFS), Sparkという構成で、Hadoopクラスタ(メインのクラスタで、他のクラスタもあるが統合予定とのこと)は500ノード、30PB、150以上のHiveデータベース

SmartNews

2014年頃にアーキテクチャを刷新したとのこと。Hadoop / Spark Conference Japan 2014で古橋さんがPrestoを紹介していたのを機にPrestoを知ったらしい
前 S3 + MapReduce + mongoDB
後 S3 + Hive(バッチ処理) + Presto(リアルタイム処理)

ソフトバンク

ログ基盤は、Kinesis, Fluentd(AutoScaling), Spark(EMR),S3
混雑マップは、Kinesis(ログ基盤とは異なるクラスタにコピー), Fluentd(AutoScaling), Spark(EMR),S3,ElasticSearch

Yahoo

Yosegiという自社製フォーマットのOSS紹介がメイン
Kafka,Hadoop(HDFS),Spark

对OSS的贡献

在过去的半年中,我第一次接触到以下的开源软件。

    • Apache Spark

 

    • Apache Hadoop(HDFS)

 

    • Apache Mesos

 

    • Apache Kafka

 

    • Fluentd

 

    • Apache Zookeeper

 

    Ansible(ログ基盤の構成管理に利用)

在一番开始尝试并深入调查时,我以基于察觉到的事实为基础做出了一些微小的修正,贡献如下所示。

[SPARK-26339][SQL]Throws better exception when reading files that start with underscore

アンダースコアをプレフィックスに持つファイルを読み込む際のSparkの挙動が不親切でしたので修正しました

Enable the setting of partition key

fluent-plugin-kafkaのpartition key指定が無効になっていたので修正しました
ドキュメントに誤りがあったので修正しました

Update lineinfile description

Ansibleのlineinfileモジュールのドキュメントに実際の挙動と不整合がある箇所がありましたので、ドキュメントを修正しました

想要挑战的课题和想要处理的方面

目前状况只是试点阶段,还有许多尚未完成的事情。
例如,有以下一些事项,如果您有兴趣,请务必来我们公司帮忙,谢谢笑

引入HiveMetastore的操作

Spark是一种纯粹的计算框架,本身没有存储功能。它可以处理各种存储,但如果想要将存储在Hadoop兼容的文件系统(如HDFS、Amazon S3)中的数据抽象为表并进行持久化,就需要使用HiveMetastore。

然而,要在实际运营中使用,需要将元数据存储在关系数据库中,这增加了引入的难度,因此目前尚未引入。

现在我们将其作为原始的Parquet文件进行处理,虽然有数据类型、分区等,我认为并没有特别不方便的地方,但还是依赖路径的感觉让人不舒服。

卡夫卡、HDFS等的指标收集和检测

例如,在Kafka中,建议首先监控Kafka中的副本不足分区(UnderReplicatedPartitions)。在实际实施监控时,可以通过Jolokia使用JMX获取指标的例子可以在Apache Kafka分布式消息系统的搭建与应用(NEXT ONE)的第8章中找到,这个例子非常有参考价值。

将存储在应用服务器上的主数据协同到分析平台。

如何在应用程序服务器和分析基础设施在网络上相隔较远等原因导致无法进行复制时,最佳的协作方式是什么,我们还没有考虑清楚。可能会考虑使用Embulk。

最后

回头看,我意识到可能把太多的内容都包括进去了,但我想如果我自己陷入其中或者觉得难以理解的事物,在其他人身上也可能有相同的感觉。基于这个假设,我尽可能多地写了下来。

Hadoop和Spark领域存在着各种各样的开源软件,每个软件的角色都不易理解,因此要全面把握并理解整体是很困难的。起初的几个月我自己也无法理解,不知道应该选择哪个开源软件,经历了非常痛苦的时期。

然而,后来我发现各个开源软件的知识能够相互联系,突然间如雾霾散去一般,我能够理解了。希望这篇文章能够对像我一样的过去的人有所帮助。

广告
将在 10 秒后关闭
bannerAds