Flink Metric(1.13)

主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/ops/metrics/

Flink 提供了 Metric 系统,允许收集 Metric 并暴露给外部系统。

注册 Metrics

可以通过任何继承了 RichFunction 的函数访问 Metric 系统。调用 getRuntionContext().getMetricGroup() 方法,该方法返回一个 MetricGroup 对象,可以创建并注册 Metric。

Metric 类型

Counter

Counter 用来计数。当前值可以使用 inc()/inc(long n)dec()/dec(long n) 进行增减。

// 实现 RichMapFunction 接口
public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    // 定义一个 Counter Metric
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    // Counter 增加 1
    this.counter.inc();
    return value;
  }
}

Gauge

Gauge 根据需要提供任何类型的值。需要先创建一个实现 org.apache.flink.metrics.Gauge 的类,返回值的类形没有限制。

Report 程序在暴露数据给外部系统时,会把对象转换为字符串,这意味着需要一个有意义的 toString() 实现。

public class MyMapper extends RichMapFunction<String, String> {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        // 实现 org.apache.flink.metrics.Gauge 接口
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }

  @Override
  public String map(String value) throws Exception {
    valueToExpose++;
    return value;
  }
}

Histogram

Histogram 统计值的分布。

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Long map(Long value) throws Exception {
    // 加入一个新值
    this.histogram.update(value);
    return value;
  }
}

Flink 没有提供 Histogram 的默认实现,可以添加依赖使用 DropwizardHistogramWrapper 实现

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.13.0</version>
</dependency>

Meter

Meter 用来统计平均吞吐量。

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @Override
  public Long map(Long value) throws Exception {
    // 注册事件
    // markEvent(long n) 可以注册同时发生多个时间
    this.meter.markEvent();
    return value;
  }
}

同样添加 flink-metrics-dropwizard 依赖,可以使用 DropwizardMeterWrapper 实现

Scope

每个 Metric 都会分配一个标识符和一组键值对,用来报告 Metric。

标识符基于3个组成部分:注册时的用户定义名称、可选的用户定义 Scope 和系统提供的 Scope。例如,如果 A.B 是系统 Scope,C.D 是用户 Scope,E 是名称,那么标识符将是 A.B.C.D.E。

可以通过在 conf/flink-conf.yaml 中设置 metrics.scope.delimiter 键来配置用于标识符的分隔符(默认值:.)。

User Scope

定义 User Scope 的方法: 调用 MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)MetricGroup#addGroup(String key, String value)。这些方法会影响 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents 的返回值。

// 创建 Metric 时指定 Scope
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");

System Scope

System Scope 包含 Metric 的上下文信息,例如注册在哪个 Task()或属于哪个 Job()。

应该包含哪些上下文信息可以通过 conf/flink-conf.yaml 配置。

  • metrics.scope.jm

    • 默认值:<host>.jobmanager
    • JobManager 的所有 Metric
  • metrics.scope.jm.job

    • 默认值:<host>.jobmanager.<job_name>
    • JobManager 和 Job 的所有 Metric
  • metrics.scope.tm

    • 默认值:<host>.taskmanager.<tm_id>
    • TaskManager 的所有 Metric
  • metrics.scope.tm.job

    • 默认值:<host>.taskmanager.<tm_id><job_name>
    • TaskManager 和 Job 的所有 Metric
  • metrics.scope.task

    • 默认值:<host>.taskmanager.<tm_id><job_name><task_name><subtask_index>
    • Task 的所有 Metric
  • metrics.scope.operator

    • 默认值:<host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>
    • Operator 的所有 Metric

<host> | <job_name> | <tm_id> | <task_name> | <operator_name> | <subtask_index> 可以作为变量使用。变量的数量或顺序没有限制,区分大小写。

例如:Operator Metric 的默认 Scope 格式为 <host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>,生成的标识符类似 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的形式;如果希望包含 Task 名称,并且忽略 TaskManager 信息,可以设置 metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>,生成的标识符会变成 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric

建议添加带有 ID 的变量(如:)保证唯一性,避免出现命名冲突的问题。所有可以使用的变量:

  • JobManager: <host>

  • TaskManager: <host>, <tm_id>

  • Job: <job_id>, <job_name>
  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Operator: <operator_id>, <operator_name>, <subtask_index>

Reporter

Flink 允许向外部系统报告 Metric。

通过在 conf/flink-conf.yaml 中配置一个或多个 Reporter,可以将 Metric 暴露给外部系统。这些 Reporter 在启动时实例化。

  • metrics.reporter.<name>.<config>:Reporter 名称
  • metrics.reporter.<name>.class:Reporter 实现类
  • metrics.reporter.<name>.factory.class:Reporter 工厂类
  • metrics.reporter.<name>.interval:Reporter 调用间隔
  • metrics.reporter.<name>.scope.delimiter:Scope 标识符的分隔符(默认使用 metrics.scope.delimiter
  • metrics.reporter.<name>.scope.variables.excludes:可选项,以 “;” 分隔的变量列表,可以忽略这些变量
  • metrics.reporters:可选项,以 “,” 分隔的 Reporter 名称列表,表示应用哪些 Reporter,默认会包含所有配置的 Reporter。

Reporter 必须至少配置 classfactory.class 属性(使用哪个取决于 Reporter 的实现)。

配置 Reporter 示例

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

自定义 Reporter:

  • 实现 org.apache.flink.metrics.reporter.MetricReporter 接口
  • 如果要定时发送报告,实现 Scheduled 接口

下面列出了一些支持的 Reporter

JMX

org.apache.flink.metrics.jmx.JMXReporter

参数:

  • port – JMX 监听端口,建议使用范围:9250-9260。实际端口将显示在相关 Job 或 Task Manager 日志中。
metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory 
metrics.reporter.jmx.port: 8789

通过 JMX 公开的 Metric 由一个 domain 和一组 key 属性组成标识。domain 总是以 org.apache.flink 开始,接一个通用 metric 标识(与一般的 metric 标识不同,不受 scope 格式的影响,不包含任何变量),例如:org.apache.flink.job.task.numBytesOut。

key 属性列表包含与给定 Metric 关联的所有变量的值(不受 scope 格式影响)。例如:host=localhost,job_name=MyJob,task_name=MyTask

Prometheus

org.apache.flink.metrics.prometheus.PrometheusReporter

参数:

  • port – Prometheus exporter 侦听的端口,默认为 9249,建议使用范围:9250-9260。
  • filterLabelValueCharacters – 可选项,过滤 label 值中的字符。如果启用,不匹配 [a-zA-Z0-9:_] 的字符会被移除。默认开启,在关闭前,确认 label 值是否符合 Premetheus 要求(Flink metric 变量都会作为 Prometheus label)。
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink Metric 类型和 Prometheus Metric 类型映射

Flink Prometheus Note
Counter Gauge Prometheus Counters 不能递减
Gauge Gauge 只支持数值和布尔
Histogram Summary 分位数支持 .5, .75, .95, .98, .99, .999
Meter Gauge

PrometheusPushGateway

org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter

参数

Key Default Type Description
deleteOnShutdown true Boolean 在关闭时,是否删除 PushGateway 中的 Metric。
filterLabelValueCharacters true Boolean 是否过滤 label 值中的字符。如果启用,不匹配 [a-zA-Z0-9:_] 的字符会被移除。默认开启,在关闭前,确认 label 值是否符合 Premetheus 要求(Flink metric 变量都会作为 Prometheus label)。
groupingKey (none) String 指定 grouping key。格式:lable_name=label_value;lable_name=label_value;
host (none) String PushGateway 服务地址
jobName (none) String 指定作业,推送 metric
port -1 Integer PushGateway 服务端口
randomJobNameSuffix true Boolean 作业名称添加随机后缀

PrometheusPushGatewayReporter 将 Metric 推到 Pushgateway

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS

系统 Metrics

默认情况下,Flink 收集的指标

CPU

Scope 中缀 Metrics 描述 类型
Job-/TaskManager Status.JVM.CPU Load JVM CPU Load Gauge
Time JVM CPU Time Gauge

Memory

Scope 中缀 Metrics 描述 类型
Job-/TaskManager Status.JVM.Memory Heap.Used 当前使用的堆内存量(字节) Gauge
Heap.Committed 保证JVM可用的堆内存量(字节) Gauge
Heap.Max 可用于内存管理的最大堆内存量(字节) Gauge
NonHeap.Used 当前使用的堆外内存量(字节) Gauge
NonHeap.Committed 保证JVM可用的堆外内存量(字节) Gauge
NonHeap.Max 可用于内存管理的最大堆外内存量(字节) Gauge
Metaspace.Used 元空间内存池中当前使用的内存量(字节) Gauge
Metaspace.Committed 元空间内存池中保证可供JVM使用的内存量(字节) Gauge
Metaspace.Max 元空间内存池中可以使用的最大内存量(字节) Gauge
Direct.Count direct 缓冲池中的缓冲区数 Gauge
Direct.MemoryUsed JVM 用于 direct 缓冲池的内存量(字节) Gauge
Direct.TotalCapacity direct 缓冲池中所有缓冲区的总容量(字节) Gauge
Mapped.Count mapped 缓冲池中的缓冲区数 Gauge
Mapped.MemoryUsed JVM 用于 mapped 缓冲池的内存量(字节) Gauge
Mapped.TotalCapacity mapped 缓冲池中所有缓冲区的总容量(字节) Gauge
Status.Flink.Memory Managed.Used 已经使用的托管内存量 Gauge
Managed.Total 托管内存总量 Gauge

Threads

Scope 中缀 Metrics 描述 类型
Job-/TaskManager Status.JVM.Threads Count 活动线程的总数 Gauge

GC

Scope 中缀 Metrics 描述 类型
Job-/TaskManager Status.JVM.GarbageCollector <GarbageCollector>.Count 已经执行的垃圾回收总数 Gauge
<GarbageCollector>.Time 执行垃圾回收总耗时 Gauge

ClassLoader

Scope 中缀 Metrics 描述 类型
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded 自JVM启动以来加载的类的总数 Gauge
ClassesUnloaded 自JVM启动以来卸载的类的总数 Gauge

Default Shuffle Service

代替 Network/IO 部分 Metrics

Scope 中缀 Metrics 描述 类型
TaskManager Status.Shuffle.Netty AvailableMemorySegments 未使用 memory segment 数量 Gauge
UsedMemorySegments 已使用 memory segment 数量 Gauge
TotalMemorySegments 分配的 memory segment 数量 Gauge
AvailableMemory 未使用内存量(字节) Gauge
UsedMemory 已使用内存量(字节) Gauge
TotalMemory 分配的内存量(字节) Gauge
Task Shuffle.Netty.Input.Buffers inputQueueLength input buffer 排队数量 Gauge
inPoolUsage 对 input buffer 使用情况的估计 Gauge
inputFloatingBuffersUsage 对浮动 input buffer 使用情况的估计 Gauge
inputExclusiveBuffersUsage 对独占 input buffer 使用情况的估计 Gauge
Shuffle.Netty.Output.Buffers outputQueueLength output buffer 排队数量 Gauge
outPoolUsage 对 output buffer 使用情况的估计 Gauge
Shuffle.Netty.<Input|Output>.<gate|partition> totalQueueLen 所有输入/输出通道中排队的缓冲区总数 Gauge
minQueueLen 所有输入/输出通道中排队缓冲区的最小数 Gauge
maxQueueLen 所有输入/输出通道中排队缓冲区的最大数 Gauge
avgQueueLen 所有输入/输出通道中排队缓冲区的平均数 Gauge
Task Shuffle.Netty.Input numBytesInLocal 此任务从本地源读取的总字节数 Counter
numBytesInLocalPerSecond 此任务每秒从本地源读取的字节数 Meter
numBytesInRemote 此任务从远程源读取的总字节数 Counter
numBytesInRemotePerSecond 此任务每秒从远程源读取的字节数 Meter
numBuffersInLocal 此任务已从本地源读取的网络缓冲区总数 Counter
numBuffersInLocalPerSecond 此任务每秒从本地源读取的网络缓冲区数 Meter
numBuffersInRemote 此任务已从远程源读取的网络缓冲区总数 Counter
numBuffersInRemotePerSecond 此任务每秒从远程源读取的网络缓冲区数 Meter

Cluster

Scope Metrics 描述 类型
JobManager numRegisteredTaskManagers TaskManager 数量 Gauge
numRunningJobs 运行中的 Job 数量 Gauge
taskSlotsAvailable 可用的 Task Slot 数量 Gauge
taskSlotsTotal 总的 Task Slot 数量 Gauge

Availability

如果启用了 Reactive Mode(1.13 MVP 特性),这些 Metric(除 numRestarts)不能正常工作。

Scope Metrics 描述 类型
Job restartingTime 重启 Job 所用的时间或当前重新启动的持续时间(毫秒) Gauge
uptime 作业未中断运行的时间。对于已完成的作业,返回-1(毫秒) Gauge
downtime 对于当前处于故障/恢复状态的作业,此中断期间所用的时间。返回0用于运行作业,对于已完成作业返回-1(毫秒) Gauge
fullRestarts 已过期,使用 numRestarts Gauge
numRestarts 自提交此作业以来的重新启动总数,包括完全重新启动和细粒度重新启动 Gauge

Checkpointing

如果启用了 Reactive Mode(1.13 MVP 特性),Job Scope 的 Metric 不能正常工作。

Scope Metrics 描述 类型
Job lastCheckpointDuration 完成最后一个检查点所用的时间(毫秒) Gauge
lastCheckpointSize 最后一个检查点的总大小(字节) Gauge
lastCheckpointExternalPath 存储最后一个外部检查点的路径 Gauge
lastCheckpointRestoreTimestamp 存储最后一个检查点的时间戳(毫秒) Gauge
numberOfInProgressCheckpoints 正在进行的检查点数量 Gauge
numberOfCompletedCheckpoints 成功完成的检查点数量 Gauge
numberOfFailedCheckpoints 失败的检查点数量 Gauge
totalNumberOfCheckpoints 检查点总数(进行中、完成、失败) Gauge
Task checkpointAlignmentTime 最后一个 barrier 对齐完成的时间,或当前对齐已经持续的时间(纳秒) Gauge
checkpointStartDelayNanos 在创建最后一个检查点,到当前 Task 开始启动检查点的时间

这个延迟表示第一个 barrier 到达当前 Task 所需的时间

如果值比较高,表示 back pressure 存在
Gauge

IO

Scope Metrics 描述 类型
Job [<source_id>.[<source_subtask_index>.]]<operator_id>.<operator_subtask_index>.latency 指定 Source 到 Operator 的延迟分布(毫秒) Histogram
Task numBytesOut 此任务已发出的总字节数 Counter
numBytesOutPerSecond 此任务每秒发出的字节数 Meter
numBuffersOut 此任务已发出的网络缓冲区总数 Counter
numBuffersOutPerSecond 此任务每秒发出的网络缓冲区数 Meter
isBackPressured 此任务是否存在 back pressured Gauge
idleTimeMsPerSecond 此任务每秒空闲(无数据可处理)的时间(毫秒)

空闲时间不包括背压时间,因此如果任务被背压,则不空闲
Meter
backPressuredTimeMsPerSecond 此任务每秒背压的时间(毫秒) Meter
busyTimeMsPerSecond 此任务每秒繁忙(既不空闲也不背压)的时间(毫秒)

如果无法计算值,则可以为NaN。
Meter
Task/Operator numRecordsIn 此 Operator/Task 已接收的记录总数 Counter
numRecordsInPerSecond 此 Operator/Task 每秒接收的记录数 Meter
numRecordsOut 此 Operator/Task 已发出的记录总数 Counter
numRecordsOutPerSecond 此 Operator/Task 每秒发出的记录数 Meter
numLateRecordsDropped 此 Operator/Task 由于迟到而丢弃的记录数 Counter
currentInputWatermark 此 Operator/Task 收到的最后一个水印(毫秒)

对于多于1个输入的 Operator/Task,这是最后接收到的水印的最小值
Gauge
Operator currentInputNWatermark 对于多于1个输入的 Operator

此 Operator 接收到的最后一个水印中的第N个(毫秒),N 从 1开始:

currentInput1Watermark,currentInput2Watermark
Gauge
currentOutputWatermark 此 Operator 发出的最后一个水印(毫秒) Gauge
numSplitsProcessed 此数据源已处理的 InputSplit 总数

如果 Operator 是 Source
Gauge

Connectors

Kafka Connector

Scope Metrics 变量 描述 类型
Operator commitsSucceeded n/a 成功提交到 kafka 的 offset 总数。
如果启动了 offset commit 并且开启 checkpointing
Counter
Operator commitsFailed n/a 没有成功提交到 Kafka 的 offset 总数。
如果启动了 offset commit 并且开启 checkpointing
Counter
Operator committedOffsets topic, partition 对于每个分区,最后一次成功提交到 Kafka 的offset。
可以指定 topic 和 partition
Gauge
Operator currentOffsets topic, partition 对于每个分区,当前读取的 offset。
可以指定 topic 和 partition
Gauge

HBase Connector

Scope Metrics User Variables Description Type
Operator lookupCacheHitRate n/a Lookup 缓存命中率 Gauge

延迟跟踪

Flink 允许跟踪在系统中传输的记录的延迟。默认情况下禁用此功能。要启用延迟跟踪,必须在 Flink 配置(conf/flink-conf.yaml)或 ExecutionConfig 中将 latencyTrackingInterval 设置为正数。

Source 会定期(latencyTrackingInterval)发出一个特殊的记录,称为 LatencyMarker。记录包含一个时间戳,该时间戳从记录在源处发出时算起。LatencyMarker 不能超过(overtake)正常记录,因此如果正常记录在 Operator 前排队,将增加标记跟踪的延迟。

延迟监控的粒度,分为以下3档:

  • single:每个算子单独统计延迟;

  • operator(默认值):每个下游算子都统计自己与 Source 算子之间的延迟;

  • subtask:每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间的延迟。

需要注意:

  • LatencyMarker 记录的时间戳最终是靠 System.currentTimeMillis() 方法获取本地时间,要保证 Flink 集群内所有节点的时区、时间是同步的,可以用 NTP 等工具来配置。
  • 启用延迟 metric 会影响集群的性能(特别是 subtask 粒度)。官方建议仅用于调试目的。

REST API Integration

Metrics 可以通过 REST API 查询。下面列出一些可用的 Endpoint 和 JSON 返回格式。

Base URL:http://hostname:8081/jobmanager/metrics

查询 Metric 未聚合值

  • /jobmanager/metrics
  • /taskmanagers/<taskmanagerid>/metrics
  • /jobs/<jobid>/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>

查询 Metric 聚合值

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics

查询 Metric 部分值的聚合值

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3

特殊字符需要转义(符合 URL 标准)

查看 Metric 列表

GET /jobmanager/metrics

[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]

请求特定 Metric 的值(未聚合)

GET taskmanagers/\/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]

请求特定 Metric 的聚合值

GET /taskmanagers/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]

请求特定 Metric 的特定值的聚合值

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
  }
]

Dashboard Integration

为 Task 或 Operator 收集的 Metric 也可以在仪表板中可视化。在作业的主页面上,选择 Metrics 选项卡。在 Graph 中选择一个任务后,可以使用 Add Metric 下拉菜单选择要显示的 Metric。

  • Task metrics 列表样式 <subtask_index>.<metric_name>
  • Operator metrics 列表样式 <subtask_index>.<operator_name>.<metric_name>

每个 Metric 可以被可视化为一个单独的图形,x轴表示时间,y轴表示测量值。图表每10秒自动更新一次。

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注