Flink 指标(一)

Flink 自带一个度量系统,允许收集和公开指标到外部系统。

注册指标

可以通过继承 RichFunction,在继承类里面调用 getRuntimeContext().getMetricGroup() 来访问 Flink 的指标系统,这个方法返回一个 MetricGroup 对象,可以通过这个对象创建和注册新的度量指标。

度量类型

支持 CountersGaugesHistogramsMeters 这四个类型的度量值。

Counter(计数器)

Counter 用于计数。可以使用 inc()/inc(long n)dec()/dec(long n) 更新(增加或减少)计数器。可以通过调用 MetricGroupcounter(String name) 方法来创建和注册 Counter 类型的度量值。

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
      // 使用默认 Counter 实现
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

也可以使用自己的 Counter 实现:

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
      // 使用自定义 Counter 实现
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter())
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

Gauges(测量)

Gauge 根据需要可提供任何类型的值。首先需要创建一个实现了 org.apache.flink.metrics.Gauge 接口的类,这个类对返回值的类型没有限制。然后,通过调用 MetricGroupgauge(String name, Gauge gauge) 方法创建和注册 Gauge 类型的度量指标。

new class MyMapper extends RichMapFunction[String,String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }

  override def map(value: String): String = {
    valueToExpose += 1
    value
  }
}

报告会把导出的数据转换成 String 类型,所以返回的统计类型需要实现 toString() 方法。

Histograms(直方图)

Histogram 用来测量长期变化值的分布。可以用过调用 MetricGrouphistogram(String name, Histogram histogram) 方法创建和注册 Histogram 类型的度量指标。

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var histogram: Histogram = _

  override def open(parameters: Configuration): Unit = {
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram())
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

Flink 没有提供默认 Histogram 实现 ,但提供了一个允许使用 Codahale / DropWizard 直方图的包装类(Wrapper),添加以下依赖项:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.6.1</version>
</dependency>

代码如下:

class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))

    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

Meters(仪表)

Meter 用来衡量平均吞吐量。可以通过 markEvent() 方法用来注册事件的发生。可以通过 markEvent(long n) 方法注册多个事件同时发生。可以通过调用 MetricGroupmeter(String name, Meter meter) 方法用来注册 Meter 类型的指标。

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter())
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

Flink 提供了一个允许使用 Codahale / DropWizard 表的 Wrapper,添加以下依赖项:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.6.1</version>
</dependency>

代码如下:

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()

    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

作用域(Scope)

为每个被报告的度量值分配一个标识符和一组键值对。标识符基于3个部分:

  1. 注册度量标准时的用户定义名称
  2. 可选的用户定义范围
  3. 系统提供的范围。

例如,如果A.B是系统作用域的,C.D是用户作用域的,E是度量值的名称。那么 A.B.C.D.E 就是这个度量值的标识符。
可以通过设置 conf/flink-conf.yaml 中的 metrics.scope.delimiterKeys 来配置要用于标识符的分隔符(默认值: .) 。

用户作用域(User Scope)

用户范围可以通过调用 MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)Metric#addGroup(String key, String value) 来定义。这些方法会影响 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents 的返回。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter")

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

系统作用域(System Scope)

系统范围包含有关度量的上下文信息,例如:在哪个 Task 中注册或该 Task 属于哪个 Job。

可以通过设置 conf/flink-conf.yaml 中的以下键,来配置需要包含哪些上下文信息。这些键值的格式由常量(比如“taskmanager”)和变量(比如“<task_id>”)组成,其中变量会在运行时被替换掉:

  • metrics.scope.jm
    默认值:<host>.jobmanager
    应用于 Scope 为 JobManager 的所有指标。

  • metrics.scope.jm.job
    默认值:<host>.jobmanager.<job_name>
    应用于 Scope 为 JobManager 和作业的所有指标。

  • metrics.scope.tm
    默认值:<host>.taskmanager.<tm_id>
    应用于 Scope 为 TaskManager 的所有指标。

  • metrics.scope.tm.job
    默认值:<host>.taskmanager.<tm_id>.<job_name>
    应用于 Scope 为 TaskManager 和作业的所有指标。

  • metrics.scope.task
    默认值:<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    应用于 Scope 为 Task 的所有指标。

  • metrics.scope.operator
    默认值:<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    应用于 Scope 为 Operator 的所有指标。

  1. 变量的数量或顺序没有限制。
  2. 变量区分大小写。
  3. 算子指标的默认作用域将产生类似于 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的标识符
  4. 如果还想包含任务名称但省略 TaskManager 信息,则可以指定以下格式:
    metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

将产生类似于 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric 的标识符
5. 对于此格式字符串,如果同时多次运行同一作业,则可能发生标识符冲突,这可能导致度量标准数据不一致。因此,建议使用 <job_id> 或通过为作业和算子分配唯一名称来提供一定程度的唯一性的格式字符串。

所有变量列表

  • JobManager:<host>
  • TaskManager:<host><tm_id>
  • 作业:<job_id><作业名称>
  • 任务:<task_id><task_name><task_attempt_id><task_attempt_num><subtask_index>
  • 算子:<operator_id><operator_name><subtask_index>
    对于 Batch API,<operator_id> 始终等于 <task_id>

用户变量

用户变量可以通过调用 MetricGroup#addGroup(String key, String value) 来定义。会影响 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponentsMetricGroup#getAllVariables() 返回。用户变量不能用于 Scope 定义中。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

延迟跟踪

Flink 允许跟踪在系统里处理数据记录的延迟时间。这个功能默认是关闭的,通过在配置文件或者 ExecutionConfig 设置 latencyTrackingInterval(一个正数),来开启功能。

在开启的状态下,数据源会定期的发出一个叫做 LatencyMarker 的数据记录,这条记录包含一个数据源发出这条数据的时间戳,这条记录不会超过正常的用户记录,因此,如果这条记录在排队等待操作的时候,会增加跟踪标记的延迟时间。

延迟标记不会记录绕过它进行操作的用户记录。延迟标记也不会记录在时间窗口缓冲区花费的时间,只有当算子操作无法接受新数据产生延迟时,才会记录延迟时间。

LatencyMarkers 可以导出位于数据源和下游算子的延迟分布。这个分布以 histogram 类型的指标发布。这个分布的粒度可以在 Flink 配置文件进行控制。

目前,Flink 假定集群的所有计算机的时钟都是同步的,建议设置 NTP 进行时间同步。

启用此功能将严重影响集群性能,强烈建议仅用于调试。

REST API 集成

可以通过 Monitoring REST API 查询度量标准。下面是可用端点列表,尖括号中的值是变量,例如 http://hostname:8081/jobs/<jobid>/metrics,必须请求变量 http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics

请求特定实体的指标:

  • /jobmanager/metrics
  • /taskmanagers//metrics
  • /jobs//metrics
  • /jobs//vertices//subtasks/

请求在相应类型的所有实体之间聚合的指标:

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs//vertices//subtasks/metrics

请求在相应类型的所有实体的子集上聚合的度量标准:

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs//vertices//subtasks/metrics?subtask=1,2,3

请求可用指标列表:

GET /jobmanager/metrics
[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]

请求特定(未聚合)指标的值:

GET taskmanagers/ABCDE/metrics?get=metric1,metric2
[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]

请求特定指标的汇总值:

GET /taskmanagers/metrics?get=metric1,metric2
[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]

请求特定指标的特定聚合值:

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
  }
]

Reference:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

8 + 3 =