Flink 内存配置学习总结

设置进程内存(Process Memory)

Apache Flink通过严格控制其各种组件的内存使用,在JVM之上提供高效的工作负载。

配置总内存(Total Memory)

Flink JVM进程的总进程内存(total process memory)由Flink应用程序消耗的内存(总Flink内存(total Flink memory))和JVM运行进程所消耗的内存组成。总Flink内存消耗包括JVM堆内存( JVM Heap)和堆外(Off-heap,直接(direct)或本地(native)内存的使用量

在Flink中设置内存的最简单方法是配置以下两个选项之一:

组件 TaskManager配置选项 JobManager配置选项
Total Flink memory taskmanager.memory.flink.size jobmanager.memory.flink.size
Total process memory taskmanager.memory.process.size jobmanager.memory.process.size

其余内存组件将根据默认值或额外配置的选项自动调整。

配置 total Flink memory 更适合standalone部署,其中要声明给Flink本身多少内存。total Flink memory分为JVM HeapOff-heap内存。另请参阅如何为standalone部署配置内存

如果配置了total process memory,那就声明了总共应该为Flink JVM进程分配多少内存。对于容器化部署,它对应于请求的容器的内存大小,另请参阅如何为容器配置内存(Kubernetes 或者 Yarn)

另一种设置内存的方法是配置特定于具体Flink进程的total Flink memory所需的内部组件,比如TaskManager、JobManager。

必须使用上述三种方法之一来配置Flink的内存(本地执行除外),否则Flink启动将失败。这意味着必须显式配置以下没有默认值的选项子集之一:

针对TaskManager: 针对JobManager:
taskmanager.memory.flink.size jobmanager.memory.flink.size
taskmanager.memory.process.size jobmanager.memory.process.size
taskmanager.memory.task.heap.sizetaskmanager.memory.managed.size jobmanager.memory.heap.size

不建议同时显示配置 total process memorytotal Flink memory。因为这样可能因为潜在的内存配置冲突,导致部署失败。配置其它内存组件时同样需要注意,因为也可能产生配置冲突。

JVM参数

Flink在启动进程时,会根据配置或派生的内存组件大小,显式添加以下与内存相关的JVM参数:

JVM 参数 Value for TaskManager Value for JobManager
-Xmx-Xms Framework + Task Heap Memory JVM Heap Memory (*)
-XX:MaxDirectMemorySize Framework + Task Off-heap (**) + Network Memory Off-heap Memory (**),(***)
-XX:MaxMetaspaceSize JVM Metaspace JVM Metaspace

(*) 请记住,根据使用的GC算法,你可能无法使用全部堆内存。一些GC算法会为自己分配一定数量的堆内存。这将导致Heap metrics返回不同的最大值(https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#memory)。
(**) 请注意,用户代码中本地非直接使用内存也可以作为堆外内存的一部分。
(***) 仅当设置了对应的jobmanager.memory.enable-jvm-direct-memory-limit 选项时,才会为JobManager添加JVM Direct内存限制

根据比例限制的组件(Capped Fractionated Components)

本节描述了选项的配置细节,这些选项(的大小)可以设置为其它内存大小的占比,同时受到min-max范围的限制:

  • JVM OverheadJVM 开销)可以设置为 total process memory的占比
  • Network memory 可以设置为 total Flink memory 的占比(仅针对TaskManager)

相关内存部分的配置方法,请同时参考 TaskManagerJobManager 的详细内存模型。

这些组件的内存大小必须在相应的最大值、最小值范围内,否则 Flink 将无法启动。 最大值、最小值具有默认值,也可以通过相应的配置选项显示设置。 例如,如果仅配置以下内存选项:

  • total Process memory = 1000MB,
  • JVM Overhead min = 64MB,
  • JVM Overhead max = 128MB,
  • JVM Overhead fraction = 0.1

那么 JVM Overhead将会是 1000MB x 0.1 = 100MB,在 64-128MB 的范围内。

注意,如果将最大值、最小值设置成相同大小,那相当于明确指定了该组件内存的大小。

如果没有明确指定组件内存的大小,Flink 会根据总内存和占比(fraction)计算出该组件内存的大小。 计算得到的内存大小将受限于相应的最小值/最大值选项。 例如,如果仅配置下列选项:

  • total Process memory = 1000MB,
  • JVM Overhead min = 128MB,
  • JVM Overhead max = 256MB,
  • JVM Overhead fraction = 0.1

那么 JVM Overhead将会是 128MB,因为根据总内存和占比计算得到的内存大小 100MB 小于最小值128MB。

如果配置了总内存和其他组件内存的大小,那么 Flink 也有可能会忽略给定的占比。 这种情况下,JVM Overhead被设置为总内存减去其他所有组件内存后的剩余部分。 这样推导得出的内存大小必须符合最大值、最小值范围,否则配置失败。 例如,假设仅配置下列选项:

  • total Process memory = 1000MB,
  • task heap = 100MB, (类似的例子可以是JobManager中的JVM Heap)
  • JVM Overhead min = 64MB,
  • JVM Overhead max = 256MB,
  • JVM Overhead fraction = 0.1

total Process memory中所有其他组件内存均有默认大小,包括 TaskManager 的默认Managed Memory占比或 JobManager 的默认Off-heap 内存。 因此,**JVM Overhead的实际大小不是根据占比算出的大小(1000MB x 0.1 = 100MB),而是total Process memory的剩余部分,该值的大小必须在 64-256MB 的范围内,否则将会启动失败。

设置任务管理器内存(TaskManager Memory)

TaskManager在Flink中运行用户代码。根据需要配置内存使用情况可以大大减少Flink的资源占用,并提高作业稳定性。

下述内存配置描述适用版本1.10及往后版本。

配置总内存

Flink JVM进程的total process memory由Flink应用程序消耗的内存(总Flink内存)和JVM运行进程所消耗的内存组成。总Flink内存消耗包括JVM堆、托管内存(由Flink管理)和其他直接(或本机)内存的使用。

如果您在本地(例如从IDE)运行Flink而没有创建集群,那么只有内存配置选项的一个子集是相关的,请参阅本地运行 以了解更多详细信息。

否则,为TaskManager设置内存的最简单方法是配置总内存(参见上文)。这里更详细地描述了一种更细粒度的方法。

其余内存组件将根据默认值或额外配置的选项自动调整。

配置堆和托管内存(Heap and Managed Memory)

如前所述 ,在Flink中设置内存的另一种方法是显式指定两者task堆内存托管内存)。它为Flink的任务可用堆内存及其托管内存提供了更多控制。

其余内存组件将根据默认值或额外配置的选项自动调整。

如果已显式配置任务堆和托管内存,建议既不设置total process memory,也不设置 total Flink memory,否则,很容易导致内存配置冲突

Task (Operator)堆内存

如果想保证一定数量的JVM堆内存可用于的用户代码,可以显式地设置任务堆内存taskmanager.memory.task.heap.size)。它将被添加到JVM堆大小中,并将专用于运行用户代码的Flink operator。

托管内存

托管内存由Flink管理,并作为本地内存(堆外内存)进行分配。以下工作负载使用托管内存:

托管内存的大小可以:

如果两者都已设置,则Size将覆盖fraction。如果没有显式配置sizefraction,则使用默认fraction

查看如何为state backendsbatch jobs配置内存。

使用者权重(Consumer Weights)

如果作业包含多种类型的托管内存使用者,还可以控制如何在这些类型之间共享托管内存。配置选项taskmanager.memory.managed.consumer-weights 允许你为每种类型设置一个权重,Flink将按比例保留托管内存。有效的消费者类型包括:

  • OPERATOR: 用于内置算法。
  • STATE_BACKEND: 用于流作业中的RocksDB State后端
  • PYTHON: 用于PYTHON进程

例如,如果流作业同时使用RocksDB State后端和Python UDFs,并且使用者权重配置为 STATE_BACKEND:70,PYTHON:30,则Flink将为RocksDB State后端保留总托管内存的70% ,为Python进程保留 30%

对于每种类型,只有当作业包含该类型的托管内存使用者时,Flink才会保留托管内存。

Flink不会为未包含在使用者权重中的使用者类型保留托管内存。如果作业实际需要缺少的类型,则可能导致内存分配失败。默认情况下,包括所有使用者类型。只有当显式配置/覆盖权重时,才会发生这种情况。

配置堆外内存(直接内存或者本地内存)

用户代码分配的堆外内存应计入任务堆外内存(taskmanager.memory.task.off-heap.size)。

还可以调整框架堆外内存(framework off-heap memory)。仅当你确信Flink框架需要更多内存时,才应该更改此值。

Flink将框架堆外内存和任务堆外内存包含在JVM的直接内存(direct memory)限制中,另请参阅JVM参数

注意:尽管本地非直接内存使用可以算作框架堆外内存或任务堆外内存的一部分,但这也将导致更高的JVM直接内存限制。

注意:网络内存(network memory)也是JVM直接内存的一部分,但它由Flink管理,并保证永远不会超过其配置的大小。因此,在这种情况下,调整网络内存的大小将没有帮助。

详细内存模型

注意:用户代码的本地非直接内存使用也算作任务堆外内存(task off-heap memory)的一部分

下表列出了上面描述的所有内存组件,及影响各个组件大小的Flink配置选项:

组件 配置 描述
Framework Heap Memory taskmanager.memory.framework.heap.size 专用于Flink框架的JVM堆内存(高级选项)默认128 mb
Task Heap Memory taskmanager.memory.task.heap.size 专用于Flink应用程序以运行Operator和用户代码的JVM堆内存,无默认大小
Managed memory taskmanager.memory.managed.size taskmanager.memory.managed.fraction 由Flink管理的本地内存,保留用于排序、哈希表、缓存中间结果和RocksDB state后端。size无默认大小,fraction默认0.4
Framework Off-heap Memory taskmanager.memory.framework.off-heap.size 专用于Flink框架的堆外直接(或本地)内存(高级选项)默认 128 mb
Task Off-heap Memory taskmanager.memory.task.off-heap.size 专供Flink应用运行operator的堆外直接(或本地)内存。默认 0 bytes
Network Memory taskmanager.memory.network.min taskmanager.memory.network.max taskmanager.memory.network.fraction 为任务之间交换数据记录而保留的直接内存(例如,为网络传输进行缓冲)是total Flink memory的一个 capped fractionated component 。 该内存用于分配网络缓冲(network buffers)
min 默认64 mb
max 默认 infinite
fraction 0.1
JVM metaspace taskmanager.memory.jvm-metaspace.size Flink JVM 进程的元空间大小(Metaspace size) 默认 256mb
JVM Overhead taskmanager.memory.jvm-overhead.min taskmanager.memory.jvm-overhead.max taskmanager.memory.jvm-overhead.fraction 为其他JVM开销保留的本地内存:例如线程堆栈、代码缓存、垃圾收集空间等,它是total process memory的一个capped fractionated component
min 默认 192 mb
max 默认 1 gb
fraction 默认 0.1

框架内存(Framework Memory)

不应该在没有充分理由的情况下更改框架堆内存(framework heap memory)和框架堆外内存(framework off-heap memory)。仅在你确信Flink需要更多内存用于某些内部数据结构或operator时,才调整它们。它可能与特定的部署环境或作业结构有关,例如高并行性。此外,在某些设置中,Flink依赖项(如Hadoop)可能会消耗更多的直接内存或本地内存。

注意 Flink目前没有隔离框架堆或堆外内存和任务内存的版本。

本地执行(Local Execution)

如果将Flink作为一个单独的java程序在机器上本地启动(例如,从IDE),而不创建集群,则除以下组件外,所有组件都将被忽略:

内存组件 相关选项 用于本地执行的默认值
Task heap taskmanager.memory.task.heap.size infinite
Task off-heap taskmanager.memory.task.off-heap.size infinite
Managed memory taskmanager.memory.managed.size 128MB
Network memory taskmanager.memory.network.min taskmanager.memory.network.max 64MB

上面列出的所有组件都可以但不必为本地执行显示的配置。如果未对其进行配置,则会将其设置为默认值

注意 本地执行的情况下,任务堆大小与实际堆大小没有任何关系。启动的本地进程的实际JVM堆大小不受Flink控制,取决于进程的启动方式。如果要控制JVM堆大小,则必须显式传递相应的JVM参数,例如-Xmx-Xms

设置Job管理器(JobManager)内存

JobManager是Flink集群的控制元素。它由三个不同的组件组成:Resource Manager、Dispatcher和JobMaster(每个运行Flink Job各一个)。

以下描述的内存配置从1.11*版本开始适用。

配置总内存(Total Memory)

设置内存配置的最简单方法是为进程配置总内存。如果使用本地执行模式运行JobManager进程,则不需要配置内存选项,不起任何作用。

详细配置

下表列出了上面描述的所有内存组件,及影响各个组件大小的Flink配置选项:

Component Configuration options Description
JVM Heap jobmanager.memory.heap.size job管理器的 JVM堆内存大小,无默认大小
Off-heap Memory jobmanager.memory.off-heap.size job管理器的堆外内存大小,包括直接内存和本地内存,默认 128 mb
JVM metaspace jobmanager.memory.jvm-metaspace.size Flink JVM进程的元空间大小。 默认 256 mb
JVM Overhead jobmanager.memory.jvm-overhead.min jobmanager.memory.jvm-overhead.max jobmanager.memory.jvm-overhead.fraction 为其他JVM开销保留的本地内存:例如线程堆栈、代码缓存、垃圾收集空间等,它是total process memory的一个capped fractionated component
min 默认 192 mb
max 默认 1 gb
fraction 默认 0.1

配置JVM堆(Heap)

如前所述,为JobManager设置内存的另一种方法是显式指定JVM Heap大小 (jobmanager.memory.heap.size)。它提供了对可用的JVM堆的更多控制,该堆由以下用户使用:

  • Flink框架
  • 在作业提交期间(例如,对于某些批处理源)或检查点完成回调中执行的用户代码

所需的JVM堆大小主要由正在运行的作业的数量、作业的结构以及对所提到的用户代码的要求决定。

注意 如果已显式配置了JVM堆,则建议既不设置总进程内存(total process memory),也不设置总Flink内存(total Flink memory)。否则,很容易导致内存配置冲突。

Flink脚本和CLI在启动JobManager进程时通过JVM参数-Xms-Xmx设置JVM堆大小

配置堆外内存(Off-heap Memory)

堆外内存组件可用于任何类型的JVM直接内存和本地内存使用。因此,还可以通过设置 jobmanager.memory.enable-jvm-direct-memory-limit 来启用JVM直接内存(JVM Direct Memory)限制 。如果配置了此选项,Flink将通过相应的JVM参数:-XX:MaxDirectMemorySize 将限制设置为堆外内存大小。

此组件的大小可以由jobmanager.memory.off-heap.size 配置。可以调整此选项,例如,如果JobManager进程抛出“OutOfMemoryError:Direct buffer memory”

堆外内存消耗可能来源以下:

  • Flink框架依赖关系(例如Akka网络通信)
  • 在作业提交期间(例如,对于某些批处理源)或检查点完成回调中执行的用户代码

注意 如果已显示配置 Total Flink MemoryJVM Heap,但尚未配置堆外(Off-heap)内存,则堆外内存的大小将派生为Total Flink memory - JVM Heap。堆外内存选项的默认值将被忽略

本地执行

如果在本地(例如从IDE)运行Flink而没有创建集群,那么JobManager内存配置选项将被忽略。

参考链接

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/mem_setup/

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/mem_setup_tm/

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/mem_setup_jobmanager/

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config