Kafka 通信协议(译)

原文链接 A Guide To The Kafka Protocol

Kafka 的 Producer、Broker 和 Consumer 之间采用的基于 TCP 的二进制协议,完全是为了 Kafka 自身的业务需求而定制的,协议定义了所有 API 的请求及响应消息。所有的消息都是通过长度来分隔,并且由后面描述的数据类型组成。

数据类型

  • 定长基本类型
    INT8INT16INT32INT64UINT32

  • 变长基本类型
    STRINGBYTES
    一个表示长度的带符号整数 N 以及后续 N 字节的内容组成,长度如果为-1则表示空(NULL)。
    STRING 使用 INT16 表示长度,BYTES 使用 INT32 表示长度。

  • 数组
    用来处理重复的结构体数据,总是由一个代表元素个数的整数 N(INT32),以及后续 N 个重复结构体组成。
    这些结构体自身是由其他的基本数据类型组成。后面会把结构 foo 的数组显示为 [foo]

通用的请求和响应格式

所有请求和响应都源于以下语法,将会对这些语法逐步进行描述:

RequestOrResponse => Size (RequestMessage | ResponseMessage)
  Size => int32
Field Description
MessageSize 以字节为单位给出后续请求或响应消息的大小

请求(Request)

所有请求都具有以下格式:

RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
Field Description
ApiKey 表示正在调用的API的ID(即它是元数据请求、生产请求、获取请求等)。
ApiVersion 表示正在调用的API的版本号。
版本号允许服务器正确地解释请求内容。响应消息也始终对应于所述请求的版本的格式
CorrelationId 这是一个用户提供的整数。
将会被服务器回传给客户端。用于在客户机和服务器之间匹配请求和响应
ClientId 客户端的自定义的标识。
可以使用任意标识符,会被用在记录错误、监测统计信息等场景

响应(Responses)

Response => CorrelationId ResponseMessage
CorrelationId => int32
ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
Field Description
CorrelationId 服务器回传给客户端的信息于请求中的信息一致

所有响应都是与请求成对匹配

消息集(Message sets)

生产和获取消息指令请求使用同一个消息集结构。Kafka 中的消息由一个键值对以及少量相关的元数据组成。消息集只是一系列带有偏移量和大小信息的消息序列。这种格式同时用于 broker 上的磁盘存储和线上数据格式(on-the-wire format)。

在 Kafka 中,消息集也是压缩的基本单位,同时允许消息递归地包含压缩的消息集,以允许批压缩。

在通讯协议中,消息集的前面没有类似的数组元素前面的 INT32 整数。

MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32

消息体:

v0
Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes

v1 (supported since 0.10.0)
Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Timestamp => int64
  Key => bytes
  Value => bytes
Field Description
Offset Kafka 中消息的偏移量
Crc 剩余消息字节的 CRC32 值,用来检查信息的完整性
MagicByte 这是一个版本ID,用于允许消息二进制格式的向后兼容演进,当前值是1
Attributes 此字节保存有关消息的元数据属性
Timestamp 消息的时间戳
Key 一个可选项,主要用来进行指派分区。可以为 null
Value 消息的实际内容,类型是字节数组,也可能是一个消息集。可以为 null

在 Kafka 0.11 中,消息集和消息的结构发生了变化。不仅添加了新字段以支持新功能,如一次性语义和记录头,而且消除了以前版本消息格式的递归性质,消息集(Messageset)称为 RecordBatch,包含一个或多个 Record(而不是 Message)。当启用压缩时,RecordBatch 将保持未压缩状态,但 Records 被压缩在一起。

RecordBatch =>
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32
  Magic => int8 
  CRC => int32
  Attributes => int16
  LastOffsetDelta => int32
  FirstTimestamp => int64
  MaxTimestamp => int64
  ProducerId => int64
  ProducerEpoch => int16
  FirstSequence => int32
  Records => [Record]

Record =>
  Length => varint
  Attributes => int8
  TimestampDelta => varint
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data
  Headers => [Header]

Header => HeaderKey HeaderVal
  HeaderKeyLen => varint
  HeaderKey => string
  HeaderValueLen => varint
  HeaderValue => data

压缩(Compression)

Kafka 支持压缩消息以提高效率,当然,这比压缩一条消息要更复杂。单个消息可能没有足够的冗余信息以达到良好的压缩比,因此必须以特殊的批处理方式发送压缩消息。要被发送的消息被包装(未压缩)在一个 MessageSet 结构中,然后将其压缩并存储在一个 MessageValue 中,一起保存的还有相应的压缩编解码集。接收系统通过解压缩得到实际的消息集。

Kafka 目前支持两个压缩编解码器,编解码器编号如下:

Compression Codec
None 0
GZIP 1
Snappy 2

接口(APIs)

Kafka 六个核心的客户端请求的 API:

  • 元数据接口(Metadata API)
  • 生产消息接口(Produce API)
  • 获取消息接口(Fetch API)
  • 偏移量接口(Offset API)
  • 偏移量提交接口(Offset Commit API)
  • 偏移量获取接口(Offset Fetch API)

元数据接口(Metadata API)

这个 API 能获取以下信息:

  • 存在哪些 Topic?
  • 每个 Topic 有几个 Partition?
  • 每个 Partition 的 Leader 是哪个 broker?
  • 这些 broker 的地址和端口分别是什么?

这是唯一一个能发往集群中任意一个 broker 的请求消息。

因为可能有很多 Topic 存在,客户端可以提供一个可选 Topic 名称列表,只返回这些 Topic 的元数据。

返回的元数据是 Partition 级别的信息,以 Topic 分组集中在一起。每个分区的元数据中包含了 leader 以及所有副本,包括正在同步的副本的信息。

Topic Metadata Request

TopicMetadataRequest => [TopicName]
  TopicName => string
Field Description
TopicName 请求元数据的 Topic。如果为空,则请求所有主题元数据

Metadata Response

响应包含的每个分区的元数据,以 Topic 分组,使用 Broker id 指向具体的 Broker。

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]
Field Description
Leader 分区的 Leader 节点的 Broker id。如果在 Leader 选举过程中,没有 Leader 存在,值为 -1
Replicas 分区的活着的 slave 节点集合
Isr Replicas 集合中,所有处在与 Leader 跟随(表示数据已经完全复制到这些节点)状态的子集
Broker Broker 节点ID、主机名和端口信息

生产消息接口(Produce API)

用于向服务器发送消息集。为了提高效率,允许在一个请求中为多个主题分区发送消息集,使用通用消息集格式。

Produce Request

v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32
Field Description
RequiredAcks 表示服务端收到多少确认后才发送 Response 给客户端。
如果设置为0,那么服务端将不发送 Response。
如果设置为1,那么服务器将等到数据写入到本地日之后发送。
如果设置为-1,那么服务端将等到数据被所有的同步副本写入后再发送
Timeout 服务器等待接收 RequiredAcks 中确认数的最长时间(毫秒)
TopicName 发布到的 Topic
Partition 发布到的 Partition
MessageSetSize 后续消息集的长度(字节)
MessageSet 标准格式的消息集合

Produce Response

v0
ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64

v1 (supported in 0.9.0 or later)
ProduceResponse => [TopicName [Partition ErrorCode Offset]] ThrottleTime
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
  ThrottleTime => int32

v2 (supported in 0.10.0 or later)
ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
  Timestamp => int64
  ThrottleTime => int32
Field Description
Topic 此响应对应的 Topic
Partition 此响应对应的 Partition
ErrorCode 分区对应的错误信息
Offset 追加到该分区的消息集中的为第一个消息分配的偏移量
Timestamp 如果该主题使用了 LogAppendTime,消息集中的所有消息都有相同的时间戳
ThrottleTime 由于配额冲突而阻止请求的持续时间(毫秒)

获取消息接口(Fetch API)

用于获取某些主题分区的一个或多个日志块(a chunk of one or more logs),指定了主题、分区和起始偏移量,开始提取并返回消息块。通常,返回消息的偏移量大于等于起始偏移量。但是,对于压缩的消息,返回的消息的偏移量可能小于起始偏移量。这类的消息的数量通常较少,并且调用者必须负责过滤掉这些消息。

接口使用长轮询模型,如果没有足够的数据立即可用,可以在一段时间内阻塞。

服务器被允许在消息集末尾返回部分消息,客户端应该处理这种情况。

Fetch Request

FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32
Field Description
ReplicaId 发起这个请求的副本节点ID,普通消费者客户端应该始终将其指定为-1
MaxWaitTime 如果没有足够的数据可发送时,最大阻塞等待时间(毫秒)
MinBytes 返回响应消息的最小字节数目,必须设置。
如果设为0,服务器将会立即返回,如果没有新的数据,会返回一个空消息集。
如果设为1,服务器将在至少一个分区收到一个字节的数据的情况下立即返回,或者等到超时时间达到
TopicName 获取数据的 Topic
Partition 获取数据的 Partition
FetchOffset 获取数据的起始 Offset
MaxBytes 分区返回消息集所能包含的最大字节数

Fetch Response

v0
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32

v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  ThrottleTime => int32
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32
Field Description
ThrottleTime 由于配额冲突而阻止请求的持续时间(毫秒)
TopicName 此响应对应的 Topic
Partition 此响应对应的 Partition
HighwaterMarkOffset 该分区日志结尾处的偏移量,可以用来确定日志结尾后面有多少条消息。
MessageSetSize 此分区的消息集的大小(字节)
MessageSet 此分区获取到的消息集

偏移量接口(Offset API,AKA ListOffset)

描述了一组主题分区的偏移量有效范围。

对于 v0 版本,响应包含请求分区的每个段的起始偏移量以及“日志结束偏移量(log end offset)”,也就是将附加到给定分区的下一条消息的偏移量。
对于 v1 版本(0.10.1.0+),Kafka 支持按消息中使用的时间戳搜索偏移量的时间索引,并对此 API 进行了更改以支持这一点。

Offset Request

// v0
ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32


// v1 (supported in 0.10.1.0 and later)
ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
Field Description
Time 用来请求一定时间(毫秒)前的所有消息。
两个特殊取值:
-1 表示获取最后一个offset
-2 表示获取最早的有效偏移量

Offset Response

// v0
OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64


// v1
ListOffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode Timestamp [Offset]
  Partition => int32
  ErrorCode => int16
  Timestamp => int64
  Offset => int64

偏移量提交接口(Offset Commit API)

Offset Commit Request

v0 (supported in 0.8.1 or later)
OffsetCommitRequest => ConsumerGroupId [TopicName [Partition Offset Metadata]]
  ConsumerGroupId => string
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string


v1 (supported in 0.8.2 or later)
OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
  ConsumerGroupId => string
  ConsumerGroupGenerationId => int32
  ConsumerId => string
  TopicName => string
  Partition => int32
  Offset => int64
  TimeStamp => int64
  Metadata => string

v2 (supported in 0.9.0 or later)
OffsetCommitRequest => ConsumerGroup ConsumerGroupGenerationId ConsumerId RetentionTime [TopicName [Partition Offset Metadata]]
  ConsumerGroupId => string
  ConsumerGroupGenerationId => int32
  ConsumerId => string
  RetentionTime => int64
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string

在 v0、v1 版本中,每个分区的时间戳定义为提交时间戳,偏移量协调者将保存消费者所提交的偏移量,直到当前时间超过提交时间加偏移量保留时间(在 broker 配置中);如果时间戳没有设值,那么 broker 会将此值设定为接收到提交偏移量请求的时间。

在v2版本中,移除了时间戳,但是增加了一个全局保存时间域(KAFKA-1634)代替。

Offset Commit Response

v0, v1 and v2:
OffsetCommitResponse => [TopicName [Partition ErrorCode]]]
  TopicName => string
  Partition => int32
  ErrorCode => int16

偏移量获取接口(Offset Fetch API)

Offset Fetch Request

v0 and v1 (supported in 0.8.2 or after):
OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32

Offset Fetch Response

v0 and v1 (supported in 0.8.2 or after):
OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string
  ErrorCode => int16

另外还有一些管理接口,这里不做介绍。再来看看下面的内容。

Some Common Philosophical Questions

有些人问,为什么不使用 HTTP。有许多原因,最主要的是客户端实现可以使用一些更高级的 TCP 特性,同时 HTTP 库在许多编程语言中是令人惊讶的烂。

还有人问,为什么不支持许多不同的协议。此前的经验是,如果必须在许多协议实现中移植新特性,是很难添加和测试的,并且大多数用户并不在乎支持多个协议这些特性,只是希望在自己选择的语言中实现了良好可靠的客户端。

另一个问题是,为什么不采用 XMPP,STOMP,AMQP 或现有的协议。这一问题的答案因协议而异,共同的问题是,协议决定了大部分的实现,如果没有对协议的控制权,就不能做一些想要做的事情。也许现在的实现方式比现有的协议更好。

最后一个问题是,为什么不使用的 Protocol Buffers 或 Thrift 来定义请求消息格式。这些库擅长帮助管理非常多的序列化的消息,然而,这里只有几个种类的消息,而且这些库跨语言的支持程度不同。

最后,通过对二进制日志格式和传输协议之间映射的管理,让 API 有明确的版本并且更细致地控制兼容性。

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

14 + 3 =