Flink 指标(二)

报告(Reporter)

通过 conf/flink-conf.yaml 文件配置一个或多个 Reporters 来暴露度量值给外部系统,这些 Reporter 将在作业和任务启动的时候实例化。

  • metrics.reporter.<name>.<config>:名字为 <name> 的 Reporter 的通用设置
  • metrics.reporter.<name>.class:名字为 <name> 的 Reporter class
  • metrics.reporter.<name>.interval:名字为 <name> 的 Reporter 的间隔时间
  • metrics.reporter.<name>.scope.delimiter:名字为 <name> 的 Reporter 的标识符的分隔符(默认使用 metrics.scope.delimiter
  • metrics.reporters:(可选)以逗号分隔的包含报告名称列表。默认情况下,将使用所有已配置的报告。

所有的 Reporter 配置至少需要配置 class 属性,还有一些允许配置记录间隔。下面是一些 Reporter 的配置实例:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

包含 Reporter 的 jar 必须放到 /lib 文件夹,这样 Flink 就可以访问到这些 jar。
可以通过继承 org.apache.flink.metrics.reporter.MetricReporter 接口来实现自己的 Reporter,如果需要定期发送记录,需要继承 Scheduled 接口。

下面是一些支持的 Reporter:

JMX

(org.apache.flink.metrics.jmx.JMXReporter)
不需要添加额外的依赖就可以支持 JMX Reporter,默认是不激活的。

参数:

  • port – (可选)JMX 连接监听的端口。为了能够在一个主机上运行多个 Reporter 实例(例如,当一个 TaskManager 与 JobManager 共同使用时),建议端口范围(如 9250-9260),实际端口将显示在相关作业或 TaskManager 日志中。如果设置此设置,Flink 将为给定的端口/范围启动额外的 JMX 连接器。度量指标将在本地默认的JMX实例上显示。

配置示例:

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

通过 JMX 公开的度量由域(domain)和键属性列表(key-properties)标识,这些属性一起构成对象名。

域始终以 org.apache.flink 开头,后跟一个通用的度量标识符。与通常的标识符不同,它不受作用域格式的影响,不包含任何变量,并且在跨作业时也是常量。例子:org.apache.flink.job.task.numbytesout

键属性列表包含与给定指标关联的所有变量的值,无论配置的作用域格式如何。例子:host=localhost,job_name=myjob,task_name=mytask

因此,域标识一个度量类,键属性列表标识该度量的一个(或多个)实例。

Ganglia

(org.apache.flink.metrics.ganglia.GangliaReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-ganglia-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

  • host – 在 gmond.conf 中的 udp_recv_channel.bind 下配置的 gmond 主机地址
  • port – 在 gmond.conf 的 udp_recv_channel.port 下配置的 gmond 端口
  • tmax – 旧指标应保留多长时间的软限制
  • dmax – 旧指标应保留多长时间的硬限制
  • ttl – 传输的 UDP 包的生存时间
  • addressingMode – 要使用的 UDP 寻址模式(单播/多播)

配置示例:

metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST

Graphite

(org.apache.flink.metrics.graphite.GraphiteReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-graphite-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

  • host – Graphite 服务器主机地址
  • port – Graphite 服务器端口
  • protocol – 使用协议(TCP / UDP)

配置示例:

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

Prometheus

(org.apache.flink.metrics.prometheus.PrometheusReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

  • port – (可选)Prometheus exporter 监听的端口,默认为 9249。为了能够在一个主机上运行多个报告实例(例如,当一个 TaskManager 与 JobManager 共同使用时),建议使用端口范围(如:9250-9260)。

配置示例:

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink 度量类型映射到 Prometheus 度量类型,如下所示:

Flink Prometheus Description
Counter Gauge Prometheus 计数器不能减
Gauge Gauge Prometheus 仅支持数字和布尔类型
Histogram Summary 分位数 .5,.75,.95,.98,.99 和 .999
Meter Gauge The gauge exports the meter’s rate

PrometheusPushGateway

(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

默认值 描述
deleteOnShutdown true 指定是否在关闭时从 PushGateway 中删除指标。
Host (none) PushGateway 服务器主机。
jobName (none) 将推送指标的作业名称。
port -1 PushGateway 服务器端口。
randomJobNameSuffix true 指定是否应将随机后缀附加到作业名称。

配置示例:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false

PrometheusPushGatewayReporter 将指标推送到 Pushgateway,可由 Prometheus 抓取。

StatsD

(org.apache.flink.metrics.statsd.StatsDReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-statsd-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。
参数:

  • host – StatsD 服务器主机
  • port – StatsD 服务器端口

配置示例:

metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

Datadog

(org.apache.flink.metrics.datadog.DatadogHttpReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-datadog-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。
Flink 指标,如任何变量 <host><job_name><tm_id><subtask_index><task_name><operator_name>,将被发送到 Datadog 作为标签。标签看起来像 host:localhostjob_name:myjobname

参数:

  • apikey – Datadog APIKeys
  • tags – (可选)发送到 Datadog 时将应用于度量标准的全局标记。标签应仅以逗号分隔

配置示例:

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod

Slf4j

(org.apache.flink.metrics.slf4j.Slf4jReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-slf4j-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

配置示例:

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS

系统指标

Flink 默认会收集当前状态的指标,下文的表格中包括以下5列:

  • “Scope”列描述了生成系统范围的范围格式,比如,如果表格里面的值为“Operator”,那么“metrics.scope.operator”将作为指标的范围格式。如果表格包含使用斜线分割的多个值,那么系统将根据不同的值分别报告多个指标,比如同时包含 job- 和 taskmanagers 两个。
  • “Infix”(可选)列描述了附加哪个中缀到系统范围之后。
  • “Metrics” 列出了此系统范围和中缀注册的所有特性的名字。
  • “Description”列描述了指标测量的信息。
  • “Type”描述了指标的类型。

请注意,“infix” 和 “Metrics” 列中所有的点根据 “metrics.delimiter” 设置变化。

因此,为了推断指标的标识符:

  1. 先从“Scope”列获取范围格式。
  2. 如果“Infix”列有值的话,附加到范围格式后面,并根据“metrices.delimiter”设置附加相应的分隔符。
  3. 附加指标的名称。

CPU

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.CPU Load JVM CPU使用情况。 Gauge
Time JVM CPU时间。 Gauge

Memory

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.Memory Heap.Used 当前使用的堆内存量(bytes)。 Gauge
Heap.Committed 保证可供 JVM 使用的堆内存量(bytes)。 Gauge
Heap.Max 可用于内存管理的最大堆内存量(bytes)。 Gauge
NonHeap.Used 当前使用的非堆内存量(bytes)。 Gauge
NonHeap.Committed 保证 JVM 可用的非堆内存量(bytes)。 Gauge
NonHeap.Max 可用于内存管理的最大非堆内存量(bytes)。 Gauge
Direct.Count 直接缓冲池中的缓冲区数。 Gauge
Direct.MemoryUsed JVM 用于直接缓冲池的内存量(bytes)。 Gauge
Direct.TotalCapacity 直接缓冲池中所有缓冲区的总容量(bytes)。 Gauge
Mapped.Count 映射缓冲池中的缓冲区数。 Gauge
Mapped.MemoryUsed JVM 用于映射缓冲池的内存量(bytes)。 Gauge
Mapped.TotalCapacity 映射缓冲池中的缓冲区数(bytes)。 Gauge

Threads

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.Threads Count 活动线程总数。 Gauge

GarbageCollection

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.GarbageCollector <GarbageCollector>.Count 已发生的集合总数。 Gauge
<GarbageCollector>.Time 执行垃圾收集所花费的总时间。 Gauge

ClassLoader

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded JVM 启动以来加载的类总数。 Gauge
ClassesUnloaded JVM 启动以来卸载的类总数。 Gauge

Network

Scope Infix Metrics Description Type
TaskManager Status.Network AvailableMemorySegments 未使用的内存段数。 Gauge
TotalMemorySegments 分配的内存段数。 Gauge
Task buffers inputQueueLength 排队的输入缓冲区数。 Gauge
outputQueueLength 排队输出缓冲区的数量。 Gauge
inPoolUsage 估计输入缓冲区的使用情况。 Gauge
outPoolUsage 估计输出缓冲区的使用情况。 Gauge
Network.
<Input|Output>.
<gate>
totalQueueLen 所有输入/输出通道中排队缓冲区的总数。 Gauge
minQueueLen 所有输入/输出通道中的最小排队缓冲区数。 Gauge
maxQueueLen 所有输入/输出通道中的最大排队缓冲区数。 Gauge
avgQueueLen 所有输入/输出通道中的平均缓冲区数。 Gauge

Cluster

Scope Metrics Description Type
JobManager numRegisteredTaskManagers 注册 TaskManager 的数量。 Gauge
numRunningJobs 正在运行的作业数量。 Gauge
taskSlotsAvailable 可用任务槽的数量。 Gauge
taskSlotsTotal 任务槽的总数。 Gauge

Availability

Scope Metrics Description Type
Job restartingTime 重新启动作业所花费的时间,或当前重新启动的持续时间(ms)。 Gauge
uptime 作业运行的时间不间断。对于已完成的作业,返回-1(ms)。 Gauge
downtime 对于当前处于故障/恢复状态的作业,在此中断期间经过的时间。对于正在运行的作业返回0,对于已完成的作业返回-1(ms)。 Gauge
fullRestarts 自提交此作业以来完全重新启动的总次数。 Gauge

Checkpointing

Scope Metrics Description Type
Job lastCheckpointDuration 完成最后一个检查点所花费的时间(ms)。 Gauge
lastCheckpointSize 最后一个检查点的总大小(bytes)。 Gauge
lastCheckpointExternalPath 存储最后一个外部检查点的路径。 Gauge
lastCheckpointRestoreTimestamp 在协调器上恢复最后一个检查点时的时间戳(ms)。 Gauge
lastCheckpointAlignmentBuffered 在最后一个检查点的所有子任务上进行对齐期间的缓冲字节数(ms)。 Gauge
numberOfInProgressCheckpoints 进行中检查点的数量。 Gauge
numberOfCompletedCheckpoints 成功完成检查点的数量。 Gauge
numberOfFailedCheckpoints 失败检查点的数量。 Gauge
totalNumberOfCheckpoints 总检查点的数量(正在进行,已完成,失败)。 Gauge
Task checkpointAlignmentTime 最后一次屏障对齐完成所花费的时间(nanoseconds),或当前对齐到目前为止所用的时间(nanoseconds)。 Gauge

IO

Scope Metrics Description Type
Job <SOURCE_ID>.
<source_subtask_index>.
<operator_id>.
<operator_subtask_index>.
latency
从给定源子任务到算子子任务的延迟分布(ms)。 Histogram
Task numBytesInLocal 此任务从本地源读取的总字节数。 Counter
numBytesInLocalPerSecond 此任务每秒从本地源读取的字节数。 Meter
numBytesInRemote 此任务从远程源读取的总字节数。 Counter
numBytesInRemotePerSecond 此任务每秒从远程源读取的字节数。 Meter
numBuffersInLocal 此任务从本地源读取的网络缓冲区总数。 Counter
numBuffersInLocalPerSecond 此任务每秒从本地源读取的网络缓冲区数。 Meter
numBuffersInRemote 此任务从远程源读取的网络缓冲区总数。 Counter
numBuffersInRemotePerSecond 此任务每秒从远程源读取的网络缓冲区数。 Meter
numBytesOut 此任务已发出的总字节数。 Counter
numBytesOutPerSecond 此任务每秒发出的字节数。 Meter
numBuffersOut 此任务已发出的网络缓冲区总数。 Counter
numBuffersOutPerSecond 此任务每秒发出的网络缓冲区数。 Meter
Task/Operator numRecordsIn 此算子/任务已收到的记录总数。 Counter
numRecordsInPerSecond 此算子/任务每秒接收的记录数。 Meter
numRecordsOut 此算子/任务已发出的记录总数。 Counter
numRecordsOutPerSecond 此算子/任务每秒发送的记录数。 Meter
numLateRecordsDropped 此算子/任务因迟到而丢失的记录数。 Counter
currentInputWatermark 此算子/任务收到的最后一个水印(ms)。注意:对于具有2个输入的算子/任务,这是最后收到的水印的最小值。 Gauge
Operator currentInput1Watermark 此算子在其第一个输入(ms)中收到的最后一个水印。注意:仅适用于具有2个输入的算子。 Gauge
currentInput2Watermark 此算子在其第二个输入中接收的最后一个水印(ms)。注意:仅适用于具有2个输入的算子。 Gauge
currentOutputWatermark 此算子发出的最后一个水印(ms)。 Gauge
numSplitsProcessed 此数据源已处理的InputSplits总数。 Gauge

Connectors

Kafka 连接器

Scope Metrics User Variables Description Type
Operator commitsSucceeded N / A 如果启用了偏移提交并且启用了检查点,则成功向 Kafka 提交的偏移提交总数。 Counter
commitsFailed N / A 如果启用了偏移提交并且启用了检查点,则 Kafka 的偏移提交失败总数。请注意,将偏移量提交回 Kafka 只是暴露消费者进度的一种方法,因此提交失败不会影响 Flink 的检查点分区偏移的完整性。 Counter
committedOffsets Topic,分区 对于每个分区,最后成功提交到 Kafka 的偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 Gauge
currentOffsets Topic,分区 消费者对每个分区的当前读取偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 Gauge

Kinesis 连接器

Scope Metrics User Variables Description Type
Operator millisBehindLatest stream,shardId 对于每个 Kinesis 分片,消费者在流的头部后面的毫秒数,表示消费者当前时间落后多少。可以通过流名称和分片标识指定特定分片的度量标准。值为0表示记录处理被捕获,此时没有要处理的新记录。值-1表示该度量标准尚未报告。 Gauge
sleepTimeMillis stream,shardId 消费者在从 Kinesis 获取记录之前花费的毫秒数。可以通过流名称和分片标识指定特定分片的度量标准。 Gauge
maxNumberOfRecordsPerFetch stream,shardId 消费者在单个 getRecords 调用 Kinesis 时请求的最大记录数。 Gauge
numberOfAggregatedRecordsPerFetch stream,shardId 消费者在单个 getRecords 调用 Kinesis 时获取的聚合 Kinesis 记录数。 Gauge
numberOfDeggregatedRecordsPerFetch stream,shardId 消费者在单个 getRecords 调用 Kinesis 时获取的分解 Kinesis 记录的数量。 Gauge
averageRecordSizeBytes stream,shardId Kinesis 记录的平均大小(bytes),由消费者在单个 getRecords 调用中获取。 Gauge
runLoopTimeNanos stream,shardId 消费者在运行循环中花费的实际时间(ns)。 Gauge
loopFrequencyHz stream,shardId 一秒钟内调用 getRecords 的次数。 Gauge
bytesRequestedPerFetch stream,shardId 在一次调用 getRecords 中请求的字节数。 Gauge

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

9 − 5 =