网站首页 > 技术文章 正文
来源 | 「Stream Processing with Apache Flink」
作者 | Fabian Hueske and Vasiliki Kalavri
翻译 | 吴邪 大数据4年从业经验,目前就职于广州一家互联网公司,负责大数据基础平台自研、离线计算&实时计算研究
校对 | gongyouliu
编辑 | auroral-L
全文共9936字,预计阅读时间70分钟。
第十章 Flink 和流式应用运维
1. 运行和管理流式应用
1.1 保存点
1.2 通过命令行客户端管理应用
1.3 通过 REST API 管理应用
1.4 在容器中打包并部署应用
2. 控制任务调度
2.1 控制任务链接
2.2 定义 Slot 共享组
3. 调整检查点及恢复
3.1 配置检查点
3.2 配置状态后端
3.3 配置故障恢复
4. 监控 Flink 集群和应用
4.1 Flink Web UI
4.2 指标系统
4.3 延迟监控
5. 配置日志行为
? 小结
2. 控制任务调度
Flink 应用程序通过将算子并行化为任务并将这些任务分配到集群中的工作进程来并行执行。就像在许多其他分布式系统中一样,Flink 应用程序的性能在很大程度上取决于任务的调度方式。任务分配到的工作进程、与任务共存的任务以及分配给工作进程的任务数量会对应用程序的性能产生重大影响。
在“任务执行”中,我们描述了 Flink 如何将任务分配到槽,以及它如何利用任务链来降低本地数据交换的成本。在本节中,我们将讨论如何调整默认行为并控制任务链以及将任务分配到slot以提高应用程序的性能。
2.1 控制任务链接
任务链将两个或多个算子的并行任务合并为一个由单个线程执行的任务。合并任务通过方法调用交换记录,因此基本上没有通信成本。由于任务链提高了大多数应用程序的性能,因此在 Flink 中默认启用。
但是,某些应用程序可能无法从任务链中受益。一个原因是打破一系列昂贵的函数,以便在不同的slot上执行它们。你可以通过 StreamExecutionEnvironment 完全禁用应用程序的任务链:
StreamExecutionEnvironment.disableOperatorChaining()
除了为整个应用程序禁用任务链之外,还可以控制各个算子的任务链行为。要禁用特定算子的链接,可以调用它的disableChaining()方法。这将防止算子的任务被链接到前面和后面的任务(例如10-1)。
val input: DataStream[X] = ...
val result: DataStream[Y] = input.filter(new Filter1()).map(new Map1())
// disable chaining for Map2
.map(new Map2()).disableChaining()
.filter(new Filter2())
示例 10-1 中的代码产生了三个任务——Filter1 和 Map1 的链接任务、Map2 的单独任务和 Filter2 的任务,不允许链接到 Map2。
也可以通过调用其 startNewChain() 方法(示例 10-2)来使用算子启动一个新链。如果满足链接的要求,算子的任务将不会链接到前面的任务,但会链接到后续任务。
val input: DataStream[X] = ...
val result: DataStream[Y] = input
.filter(new Filter1())
.map(new Map1())
// start a new chain for Map2 and Filter2
.map(new Map2()).startNewChain()
.filter(new Filter2())
在示例 10-2 中,创建了两个链式任务:一个任务用于 Filter1 和 Map1,另一个任务用于 Map2 和 Filter2。请注意,新的链式任务以调用 startNewChain() 方法的算子开始——在我们的示例中为 Map2。
2.2 定义Slot共享组
Flink 的默认任务调度策略将程序的一个完整切片——一个应用程序的每个算子中最多一个任务分配到一个slot。根据应用程序的复杂性和算子的计算成本,这种默认策略可能会使处理slot超载。Flink 手动控制任务分配到槽的机制是slot共享组。
每个算子都是一个slot共享组的成员。属于同一slot共享组的算子的所有任务都由相同的slots处理。在一个slot共享组中,任务被分配到“任务执行”章节中所描述的——每个slot最多处理一个成员算子的任务。因此,一个slot共享组需要尽可能多的处理slot来满足其算子的最大并行度。不同slot 共享组中的算子任务不会由相同的slot执行。
默认情况下,每个算子都位于“默认”slot共享组中。对于每个算子,你可以使用slotSharingGroup(String)方法显式地指定其slot共享组。如果输入算子的所有成员都属于同一组,则算子将继承其输入算子的slot共享组。如果输入算子在不同的组中,则算子在“默认”组中。示例10-3展示了如何在Flink DataStream应用程序中指定slot共享组。
// slot-sharing group "green"
val a: DataStream[A] = env.createInput(...)
.slotSharingGroup("green")
.setParallelism(4)
val b: DataStream[B] = a.map(...)
// slot-sharing group "green" is inherited from a
.setParallelism(4)
// slot-sharing group "yellow"
val c: DataStream[C] = env.createInput(...)
.slotSharingGroup("yellow")
.setParallelism(2)
// slot-sharing group "blue"
val d: DataStream[D] = b.connect(c.broadcast(...)).process(...)
.slotSharingGroup("blue")
.setParallelism(4)
val e = d.addSink()
// slot-sharing group "blue" is inherited from d
.setParallelism(2)
示例10-3中的应用程序由五个算子、两个源、两个中间算子和一个sink算子组成。算子被分配到三个共享位置组:用绿色、黄色和蓝色来表示。图10-1显示了应用程序的JobGraph,以及如何将其任务映射到处理slot。
该应用程序需要10个处理slot。由于分配的算子的最大并行度,蓝色和绿色的slot共享组需要各4个slot。黄色slot共享组需要两个slot。
3. 调整检查点及恢复
在启用容错的情况下运行的 Flink 应用程序会定期获取其状态的检查点。检查点可能是一个代价比较昂贵的操作,因为需要复制到持久存储的数据量可能非常大。增加检查点间隔可以减少常规处理过程中的容错开销。但是,它也增加了作业在从故障中恢复后在赶上流的尾部之前需要重新处理的数据量。
Flink 提供了几个参数来调整检查点和状态后端。配置这些选项对于确保生产中流应用程序的可靠和平稳运行非常重要。例如,减少每个检查点的开销可以提高检查点频率,从而加快恢复周期。在本节中,我们将介绍用于控制检查点和应用程序恢复的参数。
3.1 配置检查点
当你为应用程序启用检查点时,你必须指定检查点间隔——JobManager 将在应用程序源启动检查点的时间间隔。
在StreamExecutionEnvironment上启用检查点:
val env: StreamExecutionEnvironment = ???
// enable checkpointing with an interval of 10 seconds.
env.enableCheckpointing(10000);
CheckpointConfig 提供了配置检查点行为的更多选项,可以从 StreamExecutionEnvironment 获取:
// get the CheckpointConfig from the StreamExecutionEnvironment
val cpConfig: CheckpointConfig = env.getCheckpointConfig
默认情况下,Flink 创建检查点以保证恰好一次的状态一致性。但是,它也可以配置为提供至少一次保证:
// set mode to at-least-once
cpConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
根据应用程序的特性、其状态的大小和检查点比配置的检查点间隔花费更多的时间。默认情况下,Flink 一次只允许一个检查点进行,以避免检查点占用太多常规处理所需的资源。如果——根据配置的检查点间隔——需要启动一个检查点,但有另一个检查点正在进行中,第二个检查点将被搁置,直到第一个检查点完成。
如果许多或所有检查点花费的时间比检查点间隔长,则此行为可能不是最佳的,原因有两个。首先,这意味着应用程序的常规数据处理将始终与并发检查点竞争资源。因此,它的处理速度会变慢,并且可能无法取得足够的进展来跟上传入的数据。其次,一个检查点可能会被延迟,因为我们需要等待另一个检查点完成导致检查点间隔较短,从而导致恢复期间的追赶处理时间更长。Flink 提供了参数来解决这些情况。
为确保应用程序可以取得足够的处理速度,你可以配置检查点之间的最小暂停时间。如果你将最短暂停时间配置为 30 秒,那么在检查点完成后的前 30 秒内将不会启动新的检查点。这也意味着有效的检查点间隔至少为 30 秒,并且最多同时发生一个检查点
// make sure we process at least 30s without checkpointing
cpConfig.setMinPauseBetweenCheckpoints(30000);
在某些情况下,你可能希望确保在配置的检查点间隔内获取检查点,即使检查点花费的时间长于间隔。一个例子是检查点需要很长时间但不消耗太多资源;例如,由于对外部系统的高延迟调用的操作。在这种情况下,你可以配置检查点的最大并发数。
// allow three checkpoint to be in progress at the same time
cpConfig.setMaxConcurrentCheckpoints(3);
保存点与检查点同时进行。由于检查点操作,Flink 不会延迟显式触发的保存点。无论正在进行多少检查点,保存点将始终启动。
为了避免长时间运行的检查点,你可以配置一个超时间隔,在此之后检查点将被取消。默认情况下,检查点会在 10 分钟后取消。
// checkpoints have to complete within five minutes, or are discarded
cpConfig.setCheckpointTimeout(300000);
最后,你可能还想配置检查点失败时会发生什么。默认情况下,失败的检查点会导致应用程序重新启动的异常。你可以禁用此行为并在检查点错误后让应用程序继续运行。
// do not fail the job on a checkpointing error
cpConfig.setFailOnCheckpointingErrors(false);
启用检查点压缩
Flink 支持压缩检查点和保存点。在 Flink 1.7 之前,唯一支持的压缩算法是 Snappy。你可以按如下方式启用压缩检查点和保存点:
注意,增量式RocksDB检查点不支持检查点压缩。
在应用程序停止后保留检查点
检查点的目的是在失败后恢复应用程序。因此,它们会在作业因失败或显式取消而停止运行时进行清理。但是,你还可以启用称为外部化检查点的功能,以在应用程序停止后保留检查点。
// Enable externalized checkpoints
cpConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
外部化检查点有两种选择:
√ RETAIN_ON_CANCELATION 在应用程序完全失败后和显式取消时保留检查点。
√ DELETE_ON_CANCELLATION 只在应用程序完全失败之后才保留检查点。如果应用程序被显式取消,检查点将被删除。
外部化检查点不会替换保存点。它们使用状态后端特定的存储格式,并且不支持重新缩放。因此,它们足以在应用程序失败后重新启动应用程序,但提供的灵活性不如保存点。一旦应用程序再次运行,你就可以创建一个保存点。
3.2 配置状态后端
应用程序的状态后端负责维护本地状态,执行检查点和保存点,并在发生故障后恢复应用程序状态。因此,应用程序状态后端的选择和配置对检查点的性能有很大的影响。在“选择状态后端”章节中更详细地描述了各个状态后端。
应用程序的默认状态后端是 MemoryStateBackend。由于它将所有状态保存在内存中,并且检查点完全存储在易失性和 JVM 大小受限的 JobManager 堆存储中,因此不建议用于生产环境。但是,它非常适合本地开发 Flink 应用程序。“检查点和状态后端”描述了如何配置 Flink 集群的默认状态后端。
你也可以显式地选择一个应用程序的状态后端:
val env: StreamExecutionEnvironment = ???
// create and configure state backend of your choice
val stateBackend: StateBackend = ???
// set state backend
env.setStateBackend(stateBackend)
可以使用最少的设置创建不同的状态后端,如下所示。MemoryStateBackend 不需要任何参数。但是,有些构造函数采用参数来启用或禁用异步检查点(默认启用)并限制状态大小(默认为 5 MB):
// create a MemoryStateBackend
val memBackend = new MemoryStateBackend()
FsStateBackend 只需要一个路径来定义检查点的存储位置。还有用于启用或禁用异步检查点的构造函数变体(默认启用):
// create a FsStateBackend that checkpoints to the /tmp/ckpfolder
val fsBackend = new FsStateBackend("file:///tmp/ckp", true)
RocksDBStateBackend只需要一个路径来定义检查点的存储位置,并使用一个可选参数来启用增量检查点(默认情况下禁用)。RocksDBStateBackend总是异步写入检查点:
// create a RocksDBStateBackend that writes incremental checkpoints
// to the /tmp/ckp folder
val rocksBackend = new RocksDBStateBackend("file:///tmp/ckp",true)
在“检查点和状态后端”中,我们讨论了状态后端的配置选项。当然,你也可以在应用程序中配置状态后端,覆盖默认值或集群范围的配置。为此,你必须通过将 Configuration 对象传递给你的状态后端来创建一个新的后端对象。有关可用配置选项的说明,请参阅“检查点和状态后端”:
// all of Flink's built-in backends are configurable
val backend: ConfigurableStateBackend = ???
// create configuration and set options
val sbConfig = new Configuration()
sbConfig.setBoolean("state.backend.async", true)
sbConfig.setString("state.savepoints.dir", "file:///tmp/svp")
// create a configured copy of the backend
val configuredBackend = backend.configure(sbConfig)
由于 RocksDB 是一个外部组件,它带来了自己的一组调整参数,这些参数也可以针对你的应用程序进行调整。默认情况下,RocksDB 针对 SSD 存储进行了优化,如果状态存储在旋转磁盘上,则不会提供出色的性能。Flink 提供了一些预定义的设置来提高常见硬件设置的性能。请参阅文档以了解有关可用设置的更多信息。你可以将预定义选项应用于 RocksDBStateBackend,如下所示:
val backend: RocksDBStateBackend = ???
// set predefined options for spinning disk storage
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
3.3 配置故障恢复
当检查点应用程序失败时,它将通过启动其任务、恢复其状态(包括源任务的读取偏移量)并继续处理来重新启动。在应用程序重新启动后,它正处于追赶阶段。由于应用程序的源任务被重置为较早的输入位置,因此它会处理它在失败之前处理的数据和应用程序关闭时积累的数据。
为了能够赶上流——到达它的尾部——应用程序必须以高于新数据到达的速度处理累积的数据。当应用程序在追赶时,处理延迟(输入可用到实际处理的时间)会增加。
因此,在应用程序重新启动以成功恢复其常规处理后,应用程序需要足够的备用资源用于追赶阶段。这意味着应用程序在常规处理期间不应接近 100% 的资源消耗。可用于恢复的资源越多,追赶阶段完成得越快,处理延迟恢复正常的速度就越快。
除了恢复的资源考虑之外,我们还将讨论另外两个与恢复相关的主题:重启策略和本地恢复。
重新启动策略
根据导致应用程序崩溃的故障,应用程序可能会再次被相同的故障杀死。一个常见的例子是应用程序无法处理的无效或损坏的输入数据。在这种情况下,应用程序最终会进入一个无限的恢复周期,消耗大量资源,而没有机会恢复到正常处理状态。Flink 提供了三种重启策略来解决这个问题:
√ 固定延迟重新启动策略(fixed-delay restart strategy):以固定的次数重新启动应用程序,并在重新启动尝试之前等待已配置的时间。
√ 故障率重新启动策略(failure-rate restart strategy):只要不超过可配置的故障率,故障率重启策略就会重启应用程序。故障率被指定为一个时间间隔内的最大故障数。例如,你可以配置一个应用程序,只要它在过去 10 分钟内失败的次数不超过 3 次就可以重新启动。
√ 不重启策略(no-restart strategy):不重启应用程序,但会立即失败。
应用程序的重启策略是通过StreamExecutionEnvironment配置的,如示例10-4所示。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
5, // number of restart attempts
Time.of(30, TimeUnit.SECONDS) // delay between attempts
)
)
如果未明确定义重启策略,则使用的默认重启策略是具有 Integer.MAX_VALUE 重启尝试和 10 秒延迟的固定延迟重启策略。
本地恢复
Flink 的状态后端(MemoryStateBackend 除外)在远程文件系统中存储检查点。这首先确保状态被保存和持久化,其次确保在工作节点丢失或应用程序重新缩放时可以重新分配状态。但是,在恢复期间从远程存储读取状态不是很有效。此外,在恢复时,可能会在故障前运行的同一工作人员上重新启动应用程序。
如果应用程序可以在同一台机器上重新启动,Flink 支持一种称为本地恢复的功能,以显著加快恢复速度。启用后,除了将数据写入远程存储之外,状态后端还会在其工作节点的本地磁盘上存储检查点数据的副本。当应用程序重新启动时,Flink 尝试将相同的任务调度到相同的工作节点。如果成功,任务首先尝试从本地磁盘加载检查点数据。如果出现任何问题,它们会回退到远程存储。
实现了本地恢复,使远程系统中的状态副本成为备用来源。任务只有在远程写入成功时才确认生成检查点。而且,检查点不会因为本地状态副本失败而失败。由于检查点数据被写入两次,本地恢复会增加检查点的开销。
可以在flink-conf.yaml文件中启用和配置集群的本地恢复特性,也可以为每个应用程序设置不同的状态后端配置:
√ state.backend.local-recovery:此标志用于启用或禁用本地恢复。默认情况下,本地恢复被禁用。
√ taskmanager.state.local.root-dirs:此参数指定存储本地状态副本的一个或多个本地路径。
本地恢复只影响键控状态,它总是分区的,通常占状态大小的大部分。算子状态不会存储在本地,需要从远程存储系统中检索。但是,它通常比键控状态小得多。此外,MemoryStateBackend 不支持本地恢复,它无论如何也不支持大状态。
4. 监控Flink 集群和应用
监控你的流作业对于确保其健康运行和及早检测错误配置、资源不足或意外行为的潜在故障至关重要。特别是当流作业是面向用户的应用程序中更大的数据处理管道或事件驱动服务的一部分时,你可能希望尽可能精确地监控其性能并确保它满足延迟、吞吐量、资源利用率的某些目标等等。
Flink 在运行时收集一组预定义的指标,还提供了一个框架,允许你定义和跟踪自己的指标。
4.1 Flink Web UI
要获得Flink集群的概况以及对作业在内部执行情况的了解,最简单的方法是使用Flink的Web UI。你可以通过以下地址访问访问仪表板:
http://<jobmanager-hostname>:8081
在主屏幕上,你将看到集群配置的概览,包括 TaskManager 的数量、已配置和可用的任务槽数量以及正在运行和已完成的作业。图 10-2 显示了仪表板主屏幕的一个实例。左侧的菜单链接到有关作业和配置参数的更多详细信息,还允许通过上传 JAR 提交作业。
如果单击正在运行的作业,你可以快速了解每个任务或子任务的运行统计信息,如图 10-3 所示。你可以检查交换的持续时间、字节和记录,并根据需要汇总每个 TaskManager。
如果单击 Task Metrics 选项卡,则可以从下拉菜单中选择更多指标,如图 10-4 所示。其中包括有关你的任务的更细粒度的统计信息,例如缓冲区使用情况、水位线和输入/输出速率。
Figure 10-4. Selecting metrics to plot
图 10-5 显示了所选指标如何显示为持续更新的图表。
Checkpoints 选项卡(图 10-3)显示有关先前和当前检查点的统计信息。在概览下,你可以查看已触发、正在进行、已成功完成或已失败的检查点数量。如果单击 History 视图,则可以检索更细粒度的信息,例如状态、触发时间、状态大小以及检查点对齐阶段缓冲的字节数。“摘要”视图聚合检查点统计信息,并提供所有已完成检查点的最小值、最大值和平均值。
最后,在配置下,你可以检查检查点的配置属性,例如设置的间隔和超时值。
同样,“背压”选项卡显示每个算子和子任务的背压统计信息。如果单击一行,则会触发反压采样,并且你将看到消息 Sampling in progress... 大约五秒钟。采样完成后,你将在第二列中看到背压状态。背压任务将显示 HIGH 符号;否则,你应该会看到一条漂亮的绿色 OK 消息。
4.2 指标系统
在生产中运行数据处理系统(如 Flink)时,必须监控其行为,以便能够发现和诊断性能下降的原因。Flink 默认收集多个系统和应用指标。每个算子、TaskManager 或 JobManager 收集指标。在这里,我们描述了一些最常用的指标,并请你参阅 Flink 的文档以获取可用指标的完整列表。
类别包括 CPU 利用率、使用的内存、活动线程数、垃圾收集统计信息、网络指标(例如排队的输入/输出缓冲区的数量)、集群范围的指标(例如正在运行的作业和可用资源的数量)、作业指标(包括运行时)、 重试次数和检查点信息、I/O 统计信息(包括本地和远程记录交换的次数)、水位线信息、特定于连接器的指标等。
注册和使用指标
要注册指标,你必须通过调用RuntimeContext上的getMetrics()方法来得到MetricGroup,如下面示例所示。
class PositiveFilter extends RichFilterFunction[Int] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext
.getMetricGroup
.counter("droppedElements")
}
override def filter(value: Int): Boolean = {
if (value > 0) {
true
}
else {
counter.inc()
false
}
}
}
METRIC GROUPS
Flink指标是通过MetricGroup接口注册和访问的。MetricGroup提供了创建嵌套的、命名的指标层次结构的方法,并提供了注册以下指标类型的方法:
Counter:
org.apache.flink.metrics.Counter 指标测量计数并提供递增和递减的方法。你可以使用 MetricGroup 上的 counter(String name, Counter counter) 方法注册计数器指标。
Gauge:
Gauge 指标计算某个时间点的任何类型的值。要使用 Gauge,你需要实现 org.apache.flink.metrics.Gauge 接口并使用 MetricGroup 上的 Gauge(String name, Gauge Gauge) 方法注册它。下面例子中的代码展示了 WatermarkGauge 指标的实现,它公开了当前的水位线。
public class WatermarkGauge implements Gauge<Long> {
private long currentWatermark = Long.MIN_VALUE;
public void setCurrentWatermark(long watermark) {
this.currentWatermark = watermark;
}
@Override
public Long getValue() {
return currentWatermark;
}
}
以字符串形式上报指标
度量报告器会将 Gauge 值转换为字符串,因此如果你使用的类型未提供,请确保提供有意义的 toString() 实现。
Histogram:
你可以使用直方图来表示数值数据的分布。Flink 的直方图特别适用于报告长值的指标。org.apache.flink.metrics.Histogram 接口允许你收集值,获取收集值的当前计数,并为目前看到的值创建统计信息,例如最小值、最大值、标准偏差和平均值。
除了创建你自己的直方图实现,Flink还允许你使用DropWizard直方图,通过添加依赖。如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>flink-version</version>
</dependency>
以及你可以使用DropwizardHistogramWrapper类在Flink程序中注册DropWizard直方图,如下面示例所示。
// create and register histogram
DropwizardHistogramWrapper histogramWrapper =
new DropwizardHistogramWrapper(
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
)
metricGroup.histogram("myHistogram", histogramWrapper)
// update histogram
histogramWrapper.update(value)
Meter:
你可以使用 Meter 指标来衡量某些事件发生的速率(以每秒事件数为单位)。org.apache.flink.metrics.Meter 接口提供了方法来标记一个或多个事件的发生,获取每秒事件的当前速率,以及获取当前标记在仪表上的事件数量。
与直方图一样,你可以通过在 pom.xml 中添加 flink-metrics-dropwizard 依赖项并将meter包装在 DropwizardMeterWrapper 类中来使用 DropWizard meter。
范围和指标格式
Flink 指标属于一个范围,该范围可以是系统范围(系统提供的指标),也可以是用户范围(自定义、用户定义的指标)。指标由唯一标识符引用,该标识符最多包含三个部分:
1. 用户在注册指标时指定的名称
2. 一个可选的用户范围
3. 一个系统范围
例如,名称“myCounter”、用户范围“MyMetrics”和系统范围“localhost.taskmanager.512”将生成标识符“localhost.taskmanager.512.MyMetrics.myCounter”。你可以更改默认的“.”。delimiter 通过设置 metrics.scope.delimiter 配置选项。
系统范围声明了该指标所指的系统组件以及它应该包含哪些上下文信息。指标的范围可以是 JobManager、TaskManager、作业、算子或任务。你可以通过在 flink-conf.yaml 文件中设置相应的度量选项来配置度量应包含哪些上下文信息。我们在下表中列出了其中一些配置选项及其默认值。
Scope | Configuration key | Default Value |
JobManager | metrics.scope.jm | <host>.jobmanager |
JobManager和job | metrics.scope.jm.job | <host>.jobmanager.<job_name> |
TaskManager | metrics.scope.tm | <host>.taskmanager.<tm_id> |
TaskManager和job | metrics.scope.tm.job | <host>.taskmana ger.<tm_id>.<job_name> |
Task | metrics.scope.task | <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index> |
Operator | metrics.scope.operator | <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index> |
配置key包含常量字符串,例如“taskmanager”和尖括号中显示的变量。后者将在运行时替换为实际值。例如,TaskManager 指标的默认范围可能会创建范围“localhost.taskmanager.512”,其中“localhost”和“512”是参数值。下表显示了可用于配置指标范围的所有变量。
Scope | Available Variables |
JobManager: | <host> |
TaskManager: | <host>, <tm_id> |
Job: | <job_id>, <job_name> |
Task: | <task_id>, <task_name>, <task_attempt_id>,<task_attempt_num>, <subtask_index> |
Operator: | <operator_id>, <operator_name>, <subtask_index> |
每个作业的范围标识符必须是唯一的
如果同时运行同一作业的多个副本,由于字符串冲突,指标可能会变得不准确。为避免此类风险,你应确保每个作业的范围标识符是唯一的。这可以通过包含 <job_id> 轻松处理。
你还可以通过调用MetricGroup的addGroup()方法来定义指标的用户范围,如下面示例所示。
counter = getRuntimeContext
.getMetricGroup
.addGroup("MyMetrics")
.counter("myCounter")
访问指标
既然你已经学习了如何注册、定义和分组指标,你可能想知道如何从外部系统访问它们。毕竟,你收集指标可能是因为你想要创建实时仪表板或将测量结果提供给另一个应用程序。你可以通过报告器向外部后端公开指标,Flink 为其中几个提供了实现(见下表)。
Reporter | Implementation |
JMX | org.apache.flink.metrics.jmx.JMXReporter |
Graphite | org.apache.flink.metrics.graphite.GraphiteReporter |
Prometheus | org.apache.flink.metrics.prometheus.PrometheusReporter |
PrometheusPushGateway | org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter |
StatsD | org.apache.flink.metrics.statsd.StatsDReporter |
Datadog | org.apache.flink.metrics.datadog.DatadogHttpReporter |
Slf4j | org.apache.flink.metrics.slf4j.Slf4jReporter |
如果你想使用未包含在上述列表中的指标后端,你还可以通过实现org.apache.flink.metrics.reporter.MetricReporter接口来定义你自己的报告程序。
Reporters 需要在 flink-conf.yaml 中进行配置。将以下行添加到你的配置中将定义一个 JMX 报告器“my_reporter”,它侦听端口 9020-9040:
metrics.reporters: my_reporter
Metrics.reporter.my_jmx_reporter.class:org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040
请参阅 Flink 文档以获取每个受支持报告器的完整配置选项列表。
4.3 延迟监控
延迟可能是你要监控的首要指标之一,以评估你的流式作业的性能特征。同时,它也是在 Flink 等语义丰富的分布式流引擎中定义最棘手的指标之一。在“延迟”中,我们将延迟广义地定义为处理事件所需的时间。你可以想象,如果我们尝试在具有复杂数据流的高速流作业中跟踪每个事件的延迟,那么此定义的精确实现在实践中会如何出现问题。考虑到窗口算子使延迟跟踪更加复杂,如果一个事件对多个窗口有贡献,我们是否需要报告第一次调用的延迟,还是需要等到我们评估一个事件可能属于的所有窗口?如果一个窗口多次触发怎么办?
Flink 遵循一种简单且低开销的方法来提供有用的延迟指标测量。它没有尝试严格测量每个事件的延迟,而是通过定期在源处发出特殊记录并允许用户跟踪此记录到达接收器所需的时间来近似延迟。这个特殊的记录被称为延迟标记,它带有一个时间戳,表明它是什么时候发出的。
要启用延迟跟踪,你需要配置从源发出延迟标记的频率。你可以通过在 ExecutionConfig 中设置 delayTrackingInterval 来做到这一点,如下所示:
env.getConfig.setLatencyTrackingInterval(500L)
间隔以毫秒为单位指定。收到延迟标记后,除下沉外的所有运营商都将其向下游转发。延迟标记使用与普通流记录相同的数据流通道和队列,因此它们跟踪的延迟反映了等待处理记录的时间。但是,它们不会测量处理记录所需的时间或记录在处理之前处于等待状态的时间。
算子将延迟统计信息保存在包含最小值、最大值和平均值以及 50、95 和 99 个百分位值的延迟量表中。Sink 算子保留每个并行源实例接收到的延迟标记的统计信息,因此检查接收器的延迟标记可用于估算记录遍历数据流所需的时间。如果你想在算子处自定义处理延迟标记,你可以覆盖 processLatencyMarker() 方法并使用 LatencyMarker 的方法 getMarkedTime()、getVertexId() 和 getSubTaskIndex() 检索相关信息。
注意时钟同步
如果你没有使用自动时钟同步服务(如 NTP),你的机器时钟可能会出现时钟偏差。在这种情况下,延迟跟踪估计将不可靠,因为其当前实现假设时钟同步。
配置日志行为
日志记录是另一个用于调试和理解应用程序行为的重要工具。默认情况下,Flink 使用 SLF4J 日志抽象和 log4j 日志框架。下面示例展示了一个MapFunction,它记录每个输入记录的转换操作。
import org.apache.flink.api.common.functions.MapFunction
import org.slf4j.LoggerFactory
import org.slf4j.Logger
class MyMapFunction extends MapFunction[Int, String] {
Logger LOG = LoggerFactory.getLogger(MyMapFunction.class)
override def map(value: Int): String = {
LOG.info("Converting value {} to string.", value)
value.toString
}
}
要更改log4j记录器的属性,请修改conf/文件夹中的log4j.properties文件。例如,以下行将根日志级别设置为“warning”:
log4j.rootLogger=WARN
要设置此文件的自定义文件名和位置,请将 - Dlog4j.configuration= 参数传递给 JVM。Flink 还提供了命令行客户端使用的 log4j-cli.properties 文件和命令行客户端在启动 YARN 会话时使用的 log4j-yarn-session.properties 文件。
log4j 的替代方案是 logback,Flink 也为此后端提供默认配置文件。要使用 logback 而不是 log4j,你需要从 lib/ 文件夹中删除 log4j。有关如何设置和配置后端,请参阅Flink的文档和logback手册。
? 小结
在本章中,我们讨论了如何在生产中运行、管理和监控 Flink 应用程序。我们解释了收集和公开系统和应用程序指标的 Flink 组件,如何配置日志系统,以及如何使用命令行客户端和 REST API 启动、停止、恢复和重新调整应用程序。
猜你喜欢
- 2024-10-30 爆肝整理5000字!HTAP的关键技术有哪些?| StoneDB学术分享会#3
- 2024-10-30 「经验分享」MindStudio基于AscendCL应用开发流程
- 2024-10-30 spark中 RDD代码演示 spark中的rdd
- 2024-10-30 图像处理中,如何抓住事物的不变特征
- 2024-10-30 TypeScript 4.7 正式发布「2022.05.24」「官文全文翻译」
- 2024-10-30 读书笔记丨《离线和实时大数据开发实战》
- 2024-10-30 视觉SLAM面试题汇总-2019年秋招第一部分
- 2024-10-30 一文了解GaussDB 200整体描述 一文快速了解中国5000年历史
- 2024-10-30 综述:特征点检测与匹配 常用的特征点检测算法
- 2024-10-30 深度学习中的3个秘密:集成,知识蒸馏和蒸馏
你 发表评论:
欢迎- 最近发表
-
- 在 Spring Boot 项目中使用 activiti
- 开箱即用-activiti流程引擎(active 流程引擎)
- 在springBoot项目中整合使用activiti
- activiti中的网关是干什么的?(activiti包含网关)
- SpringBoot集成工作流Activiti(完整源码和配套文档)
- Activiti工作流介绍及使用(activiti工作流会签)
- SpringBoot集成工作流Activiti(实际项目演示)
- activiti工作流引擎(activiti工作流引擎怎么用)
- 工作流Activiti初体验及在数据库中生成的表
- Activiti工作流浅析(activiti6.0工作流引擎深度解析)
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)