在Kubernetes中使用Apache Flink的方法
本文介绍了容器管理系统的发展,并解释了在Kubernetes上使用Apache Flink的最佳实践。
阿里巴巴高级开发工程师唐云(查甘)著,Flink社区志愿者张壮壮编著
这篇文章是由阿里巴巴高级开发工程师唐云(Chagan)基于Apache Flink系列的实时广播进行编辑的。这篇文章的主要话题如下所示。
-
- コンテナ管理システムの進化
-
- Flink on Kubernetesの紹介
-
- Flink on Kubernetesの実践
- hostPathの使用方法のデモ
在本文前半部分,我们介绍了容器管理系统的演进情况。第二部分将介绍Flink在Kubernetes上的部署模式和集群调度原理等内容。第三部分将分享我们在过去一年中使用Kubernetes上的Flink时遇到的问题和学到的经验等实践经验。最后一部分将进行集群部署和任务发送的演示。
容器管理系统的进化
首先,让我们从非核心开发者的角度来探讨Kubernetes和YARN之间的关系。正如大家所知,Apache Hadoop YARN可能是中国使用最广泛的调度系统。这主要是因为在中国或整个大数据行业中,Hadoop HDFS是最常用的存储系统。因此,Apache Hadoop YARN成为了自然而然广泛使用的调度系统,包括早期的Hadoop MapReduce。随着YARN 2.0之后框架的开放,Spark on YARN和Flink on YARN也可以在YARN上进行调度。
当然,YARN本身也有一些限制。
-
- 例えば、YARNはJavaをベースに開発されているため、ほとんどのリソースの分離には制限があります。
- YARN 3.0では、GPUのスケジューリングと管理をある程度サポートしています。しかし、それ以前のバージョンのYARNは、GPUをあまりサポートしていません。
除了Apache软件基金会外,云原生计算基金会(CNCF)也在开发以本地云计算调度为基础的Kubernetes。
作为开发者,我认为Kubernetes更接近于拥有许多功能的操作系统。当然,这也意味着Kubernetes更复杂,学习起来更困难。需要理解许多定义和概念,而YARN主要用于资源调度,在整个操作系统中规模相对较小。当然,它也是大数据生态系统的先驱者。接下来,我将重点关注Kubernetes,并解释从YARN容器到Kubernetes容器(或Pod)的演变过程中获得的经验和教训。
在Kubernetes上介绍Flink。
部署集群
这个图展示了在Kubernetes上Flink的独立会话集群的调度流程。蓝色虚线框表示在Kubernetes集群内运行的组件,灰色框表示由kubectl或Kubernetes Master等Kubernetes原生提供的命令和组件。在左侧,列出了Flink官方文档提供的五个yaml文件。使用这些文件,可以在Kubernetes上部署最简单的Flink独立会话集群。
使用以下的kubectl命令来启动集群。
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
-
- 最初のコマンドは、Flink ConfigMapを作成するためのKubernetes Masterに適用されます。ConfigMapはflink-conf.yamlやlog4j.propertiesなど、Flinkクラスターを実行するために必要な設定を提供します。
-
- 2番目のコマンドでは、TaskManagerをJobManagerに接続するためのFlink JobManagerサービスを作成します。
-
- 3つ目のコマンドは、JobMasterを起動するためのFlink JobManager Deploymentを作成します。このデプロイメントには、DispatcherとResource managerが含まれます。
- 最後のコマンドでは、TaskManagerを起動するためにFlink TaskManager Deploymentを作成します。公式のFlink taskmanager-deployment.yamlインスタンスには2つのレプリカが指定されているため、図には2つのTaskManagerノードが表示されています。
还可以创建JobManager REST服务,通过REST服务提交作业。
上述的图表展示了Flink独立会话集群的概念。
提交工作
下图展示了使用Flink客户端向独立会话集群发送作业的过程。
在Flink客户端上执行以下命令,发送作业。
./bin/flink run -m : ./examples/streaming/WordCount.jar
mで必要なパラメータpublic-node-IPとnode-portは、jobmanager-service.yamlを通じて公開されるRESTサービスのIPアドレスとポートです。このコマンドを実行すると、Streaming WordCountジョブをクラスターに送信することができます。このプロセスは、Flink Standalone Sessionクラスターが実行されている環境に依存しません。クラスタがKubernetes上で動作しているか、物理マシン上で動作しているかに関わらず、ジョブの送信プロセスは同じです。
通过jobmanager-service.yaml公开的REST服务的IP地址和端口是m中所需要的参数public-node-IP和node-port。执行此命令可以将Streaming WordCount作业发送到集群中。这个过程不依赖于Flink Standalone Session集群正在运行的环境。无论集群是在Kubernetes上运行还是在物理机上运行,作业发送的过程都是相同的。
以下是在Kubernetes上使用Standalone Session的优缺点:
-
- メリット: クラスターを起動する前にいくつかのyamlファイルを定義するだけでよく、Flinkのソースコードを変更する必要がありません。クラスタ間の通信はKubernetes Masterに依存しません。
- デメリット: リソースを事前に要求する必要があり、動的に調整することができません。しかし、Flink on YARNでは、ジョブの送信時に、クラスターが必要とするJMやTMのリソースを宣言することができます。
在Flink 1.10的开发过程中,阿里巴巴的工程师负责调度,在Kubernetes上为Flink的原生计算模式做出了贡献。这也是基于过去一年的实践总结而成的原生Kubernetes。
最显著的区别是,通过Flink客户端发送作业时,整个集群的JobMaster会通过Kubernetes ResourceManager动态申请资源到Kubernetes Master,并创建执行TaskManager的pod。然后,TaskManager和JobMaster之间进行通信。有关原生Kubernetes的详细信息,请参考王阳先生分享的”在Kubernetes原生环境中运行Flink”。
总的来说,这是指将Kubernetes与YARN一样使用,并尽可能地使相关配置项目与YARN相同。为了简化说明,我们将使用Standalone Session集群进行说明。在下面的部分中,我们将说明一些功能,其中一些功能在Flink 1.10中尚未实现,而预计将在Flink 1.11中实现。
Kubernetes上的Flink实践
日志收集
在使用Kubernetes上执行Flink作业时,无法避免功能性问题所带来的日志记录。在YARN上执行该作业时,YARN会自动处理这些日志。例如,当容器执行完成后,YARN会收集日志并上传到HDFS,以便稍后查看。然而,在Kubernetes中,日志的收集和保存都需要手动进行。有许多方法可以收集和显示日志。由于日志会因作业异常而导致Pod终止而丢失,所以故障排除变得非常困难。
如果在YARN上执行此作业,则可以通过使用命令“yarn logs -applicationId”来显示相关日志。然而,如果在Kubernetes上执行此作业呢?
目前,使用fluentd来收集日志是常见的做法,也被一些用户在他们的生产环境中使用。
Fluentd也是CNCF项目之一。通过设置一些规则,如正则表达式匹配,可以定期将.log、.out、*.gc等日志上传到分布式存储文件系统(如HDFS),实现日志收集。换句话说,除了TM和JM外,还需要在Pod中启动运行Fluentd进程的另一个容器(即边车容器)。
还有其他的方法可供选择。例如,您可以使用logback-elasticsearch-appender,在不添加容器的情况下将日志发送到Elasticsearch。实现的原则是使用Elasticsearch REST API支持的socket流来直接将日志写入Elasticsearch。
与Fluentd相比,不需要添加其他容器来收集日志。但是,除了log4j之外的日志(例如System.out和System.err的日志)无法收集。特别是在作业内发生核心转储或崩溃转储的情况下,相关日志将直接写入System.out或System.err。从这个角度来看,使用logback-elasticsearch-appender将日志写入Elasticsearch并不是完美的解决方案。与此相反,通过设置各种策略,可以使用Fluentd来收集所需的日志。
度量衡
日志特别在出现问题时,有助于观察作业的执行状态。它可用于追溯发生的场景,进行故障排除和分析。指标和监控是常见且重要的问题。在该行业中,有许多监控系统解决方案,例如在阿里巴巴广泛使用的Druid、开源的InfluxDB、商业集群版的InfluxDB、CNCF的Prometheus和Uber的开源M3。
然后,让我们以Prometheus为例。Prometheus和Kubernetes都是CNCF项目,它们在指标收集领域都有独特的优势。在某种程度上,Prometheus是Kubernetes中的标准监控和收集系统。Prometheus不仅可以监视警报,还可以定期执行基于设定规则的多精度管理。
然而,实际上,我们发现 Prometheus 并没有被设计成能够很好地在水平方向上扩展。正如前面所示的图表右侧所示,Prometheus 的联邦分布式架构实际上是一个多层结构。上层节点负责路由和转发,并查询下层的结果。当然,无论我们放置多少层,上层节点都容易成为性能瓶颈,并且很难配置整个集群。如果用户规模不大,那么一个 Prometheus 可以满足监控要求。然而,如果用户规模增大,例如在由数百个节点构成的 Flink 集群中,一个 Prometheus 将成为性能的主要瓶颈,无法满足监控要求。
要如何解决这个问题呢?
我们正在实施对不同Flink作业的指标进行一致哈希。当然,并不是将一个作业的指标发送到一个Prometheus实例。相反,我们将作业不同范围的指标发送到多个Prometheus实例。Flink的指标强度有大小之分。
-
- JobManager/TaskManager メトリクス
-
- ジョブ・メトリクス(チェックポイント数、サイズ、失敗回数)
-
- タスク・メトリクス
- オペレーター・メトリクス(1秒間に処理されるレコード数、受信バイト数)。
现在,根据作用域实施具有一致哈希的指标,将哈希结果发送到不同的Prometheus实例,并最后与Thanos进行协作。Thanos是《复仇者联盟3》中的反派角色的名字。在我看来,Thanos是一个扩展组件,支持Prometheus指标的分布式查询。因此,在Prometheus架构中,可以将Thanos的边车装配在容器中,这些容器配备了单个Prometheus实例。
整个架构有一些限制,并且需要创建一致的哈希。当将Thanos与Prometheus一起部署时,某些指标数据因某种原因存在于Prometheus A和Prometheus B中。因此,Thanos查询有一定的规则来丢弃数据,即删除一方则另一方的数据将优先。结果,UI上的指标图表线条会出现间断,引发不友好的体验。因此,需要实施Thanos的一致哈希和分布式查询。
然而,在整个解决方案的应用中,可能会出现一些性能问题。尽管Prometheus在许多服务级别指标上显示出良好的性能,但为什么在Flink和作业级别上性能表现不佳呢?这是因为作业指标的剧烈变化。与HDFS和Hbase的监控相比,这些组件的指标受限且维度较低。为了解释维度的概念,让我们试着使用查询场景。例如,我们需要查询关于主机内作业的所有taskmanager_job_task_buffers_outPoolUsage。换句话说,我们需要在查询中使用标签来进行过滤。然后,我们发现Flink的taskAttempId是一个不友好的标签。它是一个UUID,每当作业失败时都会更改。
如果作业持续失败并将新标签持久化到Prometheus,并且连接到Prometheus的数据库需要创建标签索引,那么就需要创建大量的索引。例如,在InfluxDB上产生高负载时,可能会导致内存和CPU无法使用。这是意外的情况。因此,我们需要向社区求助,请求过滤报告中的高维度标签。如果您感兴趣,请关注FLINK-15110。
表演
网络性能
首先,让我们介绍一下网络性能。即使使用容器网络接口(CNI)或Kubernetes的网络插件,也无法避免网络性能下降。一般的Flannel网络在一些测试项目中出现了大约30%的性能损失。它不太稳定。在作业中经常会出现”PartitionNotFoundException Partition xx@host not found”错误,这意味着下游无法获取到上游的Partition。
在Flink层可以改善网络的容错能力。例如,将taskmanager.network.request-backoff.max从默认的10秒调整为300秒,并将Akka的超时值设定为更大的值。
还有一个非常敏感的问题。
在作业运行时,经常会出现与对等方的连接重置的报告。这是因为Flink被设计为对网络的稳定性有较高的要求。为了确保准确性,如果数据发送失败,整个任务将失败并重新启动。这样就会经常收到对等方的连接重置报告。
我们有多种解决方案。
-
- 異種ネットワーク環境を避ける(クロスIDCアクセスをしない)。
-
- クラウドサービス事業者のマシンにマルチキューNICを設定する(インスタンス内のネットワーク遮断を異なるCPUに分散して処理することでパフォーマンスを向上させる)。
-
- Alibaba Cloud Terwayなどのクラウドサービスプロバイダーの高性能ネットワークプラグインを選択する。
- Kubernetesの仮想化ネットワークを避けてホストネットワークを選択する(一定の開発が必要)。
首先,需要确保集群在异构网络环境中能正常运行。如果Kubernetes主机位于不同的数据中心,访问时可能会出现网络抖动。接下来,需要在来自云服务提供商的机器上配置多队列网络接口卡(NIC)。ECS的虚拟机会使用一定的CPU资源进行网络虚拟化。如果没有配置多队列NIC,则在虚拟化中可能只会使用一个或两个核心,而不是两个核心都使用,导致丢包并报告”Connection Reset by peer”错误。
另一个解决方案是选择由云服务提供商提供的高性能网络插件。例如,阿里云提供的Terway可以支持与主机网络相同的性能。不会出现像Flannel那样的性能下降。
最后,如果无法使用Terway,可以通过使用主机网络来避开Kubernetes的虚拟化网络。然而,这个解决方案需要进行比Flink更多的开发工作。如果使用Kubernetes,使用主机网络在某种程度上感觉不适合。这不符合Kubernetes的风格。另外,也有一些机器无法使用Terway,会遇到相同的问题。我们也准备了相应的项目,使用主机网络代替覆盖网络。
磁盘性能
接下来,我将对磁盘性能进行说明。正如前面所述,虚拟化会导致性能下降。如果RocksDB需要对本地磁盘进行读写操作,那么使用覆盖文件系统将导致约30%的性能下降。
我们决定使用hostPath。简而言之,Pod可以访问主机的物理磁盘。请参考前面图中右侧的hostPath定义。当然,Flink镜像的用户需要事先确认他们具有访问主机目录的权限。因此,最好将目录权限更改为777或775。
如果要使用此功能,请查看提供Pod模板的Flink-15656。可以自己进行调整。Kubernetes提供了各种复杂的功能,我们知道无法对所有功能进行Flink调整。如果在模板中定义了hostPath,编写的Pod将能够根据模板中的hostPath访问目录。
OOM杀死了。
OOM killed也是一个令人烦恼的问题。在容器环境中部署服务时,需要预先设置Pod所需的内存和CPU资源。然后,Kubernetes会指定适用于相关节点(主机)的资源调度配置。除了指定请求参数外,还需要设置限制参数来限制所需的内存和CPU资源。
假设一个节点的物理内存为64GB,并且需要8个Pod,每个Pod都有8GB的内存。表面上看似乎没有问题,但如果没有对8个Pod进行限制会怎么样呢?每个Pod可能使用10GB的内存,因此它们会竞争资源。结果是,一个Pod可以正常运行,但另一个Pod可能会突然被杀掉。因此,需要设置内存上限,以避免Pod因内存限制而意外终止。通过检查Kubernetes的事件,可以看到Pod被杀掉是因为OOM。如果你曾经使用过Kubernetes,你应该遇到过这个问题。
我应该如何进行故障排除呢?
第一解决方案是启用JVM的本地内存跟踪,定期检查内存。通过这种方法,只能检查JVM请求的本地内存(包括Metaspace),而不能检查JVM未请求的内存。另一个解决方案是使用Jemalloc和jeprof定期转储分析内存。
老实说,几乎没有使用第二种解决方案的必要。此前我们将这个解决方案应用于YARN,但发现有一些作业占用了大量内存。由于JVM限制了内存的最大使用量,所以应该存在一些与本机内存相关的问题。因此,我们使用Jemalloc和jeprof来分析内存,并找到准确的本机内存。例如,某些用户在解析配置文件之前会自行解压文件,并最终耗尽内存。
这是一种可能导致OOM的情景。如果在使用RocksDB作为节省本机内存的后端时,很有可能会导致OOM。因此,我们为Flink 1.10版本向社区提供了一个功能。这个功能可以管理RocksDB的内存,并由state.backend.rockdb.memory.managed参数进行控制。该功能默认是启用的。
这张图表示的内容是关于什么?
RocksDB不提供内存控制功能。它具有值(value)、列表(list)、映射(map)和窗口(window)四种状态。在顶部行中,记录了当前RocksDB使用的总内存大小,包括块缓存和写缓冲区的使用量,总的四种状态的内存使用量应大于或等于400MB。
这是因为Flink RocksDB目前没有对状态数量进行限制。状态指的是占用写缓冲区和块缓存的Column Family。默认情况下,每个列族最多可以占用两个64MB的写缓冲区和一个8MB的块缓存。一个状态使用136MB,四个状态使用544MB。
启用 state.backend.rockdb.memory.managed 后,四个状态基本上以相同的趋势使用块缓存。
为什么会这样呢?这是因为使用了缓存共享功能。换句话说,它通过创建LRU缓存来将内存分散和调度,无论是什么情况下,最近使用的内存会被释放。因此,在Flink 1.10及更高版本中,通过启用state.backend.rockdb.memory.managed,可以解决大部分问题。
然而,在开发过程中发现RocksDB的缓存共享设计不够完善。这导致了一些实现上的问题,例如无法实现严格的缓存等。启用RocksDB的缓存共享可能会导致奇怪的NPE问题。换句话说,RocksDB的缓存共享在特定场景下可能无法正常工作。在这种情况下,可能需要增加taskmanager.memory.task.off-heap.size以确保缓冲空间。
当然,首先需要知道使用的内存情况。在前面的内存监控图中,需要将参数state.backend.rocksdb.metrics.block-cache-usage设置为true。这样,可以获取相关指标并观察内存使用情况。比如,默认状态下,1GB的状态TM的管理器使用了294MB的内存。
经理有时会占用300MB或310MB的内存。在这种情况下,可以调整名为taskmanager.memory.task.off-heap.size(默认值为0)的参数来增加一些内存,例如64MB的内存等。这意味着为Flink所需要的堆外空间增加了额外的空间,并添加了用于RocksDB的缓冲区,这样就不会因为OOM而被终止。这是目前可用的解决方案。但是,为了根本解决问题,需要与RocksDB社区合作。
如果你们遇到类似的问题,请与我们联系。
示威活动
最后,我们将演示hostPath的使用方法。大多数的yaml文件与社区中的实例相同。您需要将任务管理器的yaml文件更改如下:
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: reg.docker.alibaba-inc.com/chagan/flink:latest
workingDir: /opt/flink
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
while :;
do
if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
then tail -f -n +1 log/*taskmanager*.log;
fi;
done"]
ports:
- containerPort: 6122
name: rpc
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
- name: state-volume
mountPath: /dump/1/state
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j.properties
path: log4j.properties
- name: state-volume
hostPath:
path: /dump/1/state
type: DirectoryOrCreate
常见问题。
Flink如何通过Kubernetes的Pod与HDFS进行交互?
Flink和HDFS的互动很简单。只需将相关依赖复制到图像中。将flink-shaded-hadoop-2-uber-{hadoopVersion}-{flinkVersion}.jar放置在flink-home/lib目录下,将hdfs-site.xml和core-site.xml等一些Hadoop配置放置在可访问的目录下即可。此后,Flink将能够访问HDFS。这个过程与从不具备HDFS的集群节点访问HDFS是相同的。
在Kubernetes上,Flink是如何实现高可用性保障的?
Flink集群的高可用性与Flink在Kubernetes上运行与否无关。对于Flink集群的高可用性,需要依赖ZooKeeper的支持。高可用性需要ZooKeeper实现检查点ID计数器、检查点停止和流图停止。因此,在Kubernetes集群上为Flink提供ZooKeeper服务是实现高可用性的核心。ZooKeeper集群可以部署在Kubernetes上或者物理主机上。同时,社区也在尝试使用etcd在Kubernetes上提供高可用性解决方案。目前,唯一能够提供工业级高可用性的是ZooKeeper。
3、Flink on Kubernetes和Flink on YARN哪一个更好呢?如何进行选择?
目前,Flink on YARN是一个相对成熟的系统,但并不适用于云原生环境。随着服务迁移到云上的趋势,我认为Flink on Kubernetes有着光明的未来。尽管Flink on YARN是一个成熟的系统,但它可能无法满足新的需求和挑战。例如,相比于YARN,Kubernetes在GPU调度和流水线创建方面具有更高的性能。
如果只是运行作业, Flink on YARN是相对成熟的选择,可以稳定运行。而Flink on Kubernetes是比较新且受欢迎的选择,也更容易进行迭代。然而,Flink on Kubernetes的学习曲线较陡峭,需要健全的Kubernetes O&M团队的支持。此外,在Kubernetes的虚拟化环境中,不可避免地会出现一定的磁盘和网络性能损失,这可以说是一些缺点。当然,虚拟化(容器化)也带来了更明显的优点。
要如何配置/etc/hosts文件?为了与HDFS进行交互,您需要将HDFS节点的IP地址和主机映射到/etc/hosts文件中。
可以通过 Volume 将 ConfigMap 的内容挂载到 /etc/hosts,也可以依赖 CoDNS 而无需修改 /etc/hosts。
5、如何在Kubernetes上有效解决Flink问题?
首先,从故障排除的角度来看,我们需要了解Flink on Kubernetes和Flink on YARN之间的区别。Kubernetes是一个拥有许多复杂组件的操作系统,而YARN是基于Java的资源调度程序。集群中的许多异常情况都是由主机故障引起的。我认为,相比于YARN,Kubernetes的故障排除更加困难。Kubernetes拥有许多组件。如果DNS解析失败,我们需要检查CoDNS的日志。对于网络或磁盘错误,我们需要检查Kubernetes的事件。如果Pod异常终止,我们需要了解为什么事件Pod会终止。老实说,我们需要运维支持。
有关Flink的故障排除,无论是在Kubernetes还是YARN上,排查故障的方法都是一样的。
-
- 例外が発生していないかログを確認する。
-
- パフォーマンスの問題については、jstackを使ってCPUとコールスタックに例外がないかチェックします。
- OOMリスクが常に存在したり、フルGCが発生しやすかったりする場合は、jmapを使ってメモリを占有するブロックを確認したり、メモリリークがないか分析したりします。
这些故障排除方法不依赖于平台,适用于所有场景。需要注意的是,某些容器镜像可能没有一些调试工具。建议在构建Flink on Kubernetes集群时,创建私有镜像,并安装相应的调试工具。
本博客是从英文版翻译的,原文请点击这里查看。部分内容采用机器翻译,请您如发现翻译错误,欢迎指正。
阿里巴巴云拥有日本的两个数据中心,并且是亚太地区第一大云基础设施服务提供商(2019年Gartner报告),拥有超过60个可用区域。
欲了解更多阿里巴巴云的详情,请点击此处。
阿里巴巴云日本官方网页。