Kafka 生产者 Java 实现

创建生产者

向 Kafka 写入消息,首先要创建一个生产者对象,该对象有3个必要属性。

  • bootstrap.servers
    指定 broker 的地址列表,地址的格式为 host:port
    不需要包含所有的 broker 地址,生产者会通过给定的 broker 查找到其他 broker 的信息。建议至少提供两个 broker 的信息,防止其中一个宕机。

  • key.serializer
    Kafka 客户端默认提供了 ByteArraySerializerStringSerializerIntegerSerializer,因此如果使用常见的 Java 对象类型,不需要实现自己的序列化器 。如果要自定义序列化器,需要实现 org.apache.kafka.common.serialization.Serializer 接口。
    需要注意,key.serializer 是必须设置的,即使只发送值类型。

  • value.serializer
    key.serializer 一样, value.serializer 指定的类会将值序列化发送给 broker。

创建生产者示例:

Properties kafkaProps = new Properties(); 
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer") ;
KafkaProducer producer = new KafkaProducer<String,String>(kafkaProps);

发送消息

实例化生产者对象后,就可以开始送消息了。发送消息主要有以下 3 种方式:

  • 发送并忘记(fire-and-forget)
    把消息发送给服务器,不关心是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。

  • 同步发送(async)
    使用 send() 方法发送消息,会返回一个 Future 对象,调用 get() 方法阻塞等待返回,返回结果知道是否发送成功。

  • 异步发送(sync)
    使用 send() 方法并指定一个回调函数,服务器在返回响应时调用该函数。

最简单的消息发送方式:

ProducerRecord<String, String> record = 
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

同步发送消息方式:

ProducerRecord<String, String> record = 
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    // 调用 get() 方法获取返回值
    producer.send(record).get();
} catch (Excepti.on e) {
    e.printStackTrace();
}

send() 方法返回一个 Future 对象,然后调用 get() 方法等待 Kafka 响应。如果服务器返回错误, get() 方法会抛出异常。如果没有发生错误,我们会得到一个 RecordMetadata 对象,可以获取消息的偏移量(offset)。

KafkaProducer 一般会发生两类错误。
一类是可重试错误,可以通过重发消息来解决。比如“连接错误”,可以通过再次建立连接来解决。另一类错误无出通过重试解决 ,比如“消息太大”。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。

异步发送消息方式:

class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry","Biomedical Materials", "USA");
producer.send(record,new DemoProducerCallback());

回调方法,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类。如果 Kafka 返回错误,onCompletion 方法会接收到异常。

生产者配置

生产者除了必选参数还有很多可配置的参数,大部分都有合理的默认值。其中一些参数在内存使用、性能和可靠性方面对生产者会产生影响。

1. acks

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响,参数有如下选项:

  • acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。如果服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,能够达到很高的吞吐量。

  • acks=1,只要集群的 leader 节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达 leader 节点(比如 leader 重新选举未完成),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果客户端使用异步回调的方式可以提高吞吐量,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。

  • acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,可以保证不止一个服务器收到消息,即使有服务器发生崩溃,集群仍然可以运行。不过,延迟比 acks=1 时更高。

2. buffer.memory

设置生产者内存缓冲区的大小,生产者缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。

send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffer.full 参数(在0.9.0.0版本里被替换成了 max.block.ms)表示在抛出异常之前可以阻塞一段时间。

3. compression.type

默认情况下,消息发送时不会被压缩。该参数可以设置为 snappygziplz4,指定了消息被发送给 broker 之前使用哪一种压缩算也进行压缩。

snappy 占用较少的 CPU,能提供较好的性能和可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,这往往是向 Kafka 发送消息的瓶颈所在。

4. retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到 leader 节点)。在这种情况下,retries 参数决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。

默认情况下,生产者会在每次重试之间等待 lOOms,可以通过 retry.backoff.ms 参数设置。建议在设置之前,测试一下恢复一个崩溃节点需要多少时间,让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。

一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。只需要处理那些不可重试的错误或重试次数超出上限的情况。

5. batch.size

该参数指定了一个批次可以使用的内存大小(字节数),当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。当批次被填满,批次里的所有消息会被发送出去。

不过生产者井不一定都会等到批次被填满才发送,所以如果批次大小设置得很大,也不会造成延迟,只是会占用更多的内存。但如果设置得太小,生产者会更频繁地发送消息,会增加一些额外的开销。

6. linger.ms

指定生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,即使批次里只有一个消息。把 linger.ms 设置成比0大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。这样会增加延迟,但会提升吞吐量。

7. client.id

该参数可以是任意的字符串,服务器会用它来识别消息的来橱,还可以用在日志和配额指标里。

8. max.in.flight.requests.per.connection

指定生产者在收到服务器晌应之前可以发送多少个消息。值越高,就会占用越多的内存,同时会提升吞吐量。设为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

Kafka 可以保证同一个分区里的消息是有序的。如果生产者按照一定的顺序发送消息,broker 会按照这个顺序写入分区,消费者也会按照同样的顺序读取。在某些情况下,顺序是非常重要的(顺序代表了一定的业务含义)。一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把 retries 设为 0。可以把 max.in.flight.requests.per.connection 设为1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

9. timeout.msrequest.timeout.msmetadata.fetch.timeout.ms

request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间;metadata.fetch.timeout.ms 指定了生产者在获取元数据时等待服务器返回响应的时间;timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配,如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。

10. max.block.ms

指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的最大阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

11. max.request.size

该参数用于控制生产者发送的请求大小。能发送的单个消息的最大值,或单个请求里所有消息总的大小最大值。broker 对可接收的消息最大值也有自己的限制(message.max.bytes),两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。

12. receive.buffer.bytessend.buffer.bytes

分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果设为-1就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

消息序列化

如果发送到 Kafka 的对象不是简单的字符串或整型类型,可以使用序列化框架,如 Avro、 Thrift或 Protobuf,或者使用自定义序列化器。强烈建议使用通用的 序列化框架,首先看一下自定义序列化器的用法

现有如下客户类:

public class Customer {
    private int customerId;
    private String customerName;

    public Customer(int id, String name) {
        this.customerId = id;
        this.customerName = name;
    }

    // getter and setter
}

为这个类创建一个序列化器:

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public void configure(Map configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, Customer data) {
        // serialize data to byte array
    }

    @Override
    public void close() { }
}

只要使用这个 ProducerRecord<String, Customer>,可以直接把 Customer 对象传给生产者。如果 Customer 字段随着业务发生变化,就会出现新旧悄息的兼容性问题。如果多个地方写入 Customer 数据,那么就需要使用相同的序列化器,如果序列化器发生改动,要在同一时间修改代码并上线。

基于以上和其他原因,不建议使用自定义序列化器。

使用 Avro 序列化

Apache Avro 是一种与编程语言无关的序列化格式,目的是提供一种共享数据文件的方式。

Avro 数据通过与语言无关的 schema 来定义。schema 通过 JSON 来描述,数据被序列化成二进制文件或 JSON 文件,不过一般会使用二进制文件。Avro 在读写文件时需要用到 schema, schema 一般会被内嵌在数据文件里。

Avro 有一个很棒的特性是,当负责写消息的应用程序使用了新的 schema,负责读消息的应用程序可以继续处理消息而无需做任何改动,这个特性使得它特别适合用在像 Kafka 这样的消息系统上。

假设最初的 schema 是这样的:

id 和 name 字段是必需的, faxNumber 是可选的,默认为 null。

{
    "namespace": "customerManagement.avro",
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name":"id","type":"int"},
        {"name":"name","type":"string"},
        {"name":"faxNumber","type": ["null", "string"], "default": "null"}
    ]
}

现在 schema 发生了变化,faxNumber 不再使用,而使用 email 字段

{
    "namespace": "customerManagement.avro",
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name":"id","type":"int"},
        {"name":"name","type":"string"},
        {"name":"email","type": ["null", "string"], "default": "null"}
    ]
}

更新到新版的 schema 后,旧记录仍然包含 faxNumber 字段,而新记录则包含 email 字段。部分负责读取数据的应用程序进行了升级,那么它们是如何处理这些变化的呢?

在应用程序升级之前,会调用类似 getName()getId()getFaxNumber() 这样的方法。如果碰到使用新 schema 构建的消息,getName()getId() 仍然能够正常返回,getFaxNumber() 方法会返回 null。

在应用程序升级之后, getEmail() 方法取代了 getFaxNumber() 方法。如果碰到一个使用旧 schema 构建的消息,那么 getEmail() 方法会返回 null。

使用 Avro 的好处:修改了消息的 schema,但并没有更新所有负责读取数据的应用程序,而这样仍然不会出现异常或阻断性错误,也不需要对现有数据进行大幅更新。

不过有以下两个需要注意的地方。

  • 用于写入数据和读取数据的 schema 必须是相互兼容的。Avro 文档提到了一些兼容性原则。
  • 反序列化器需要用到用于写入数据的 schema,即使它可能与用于读取数据的 schema 不一样。Avro 数据文件里就包含了用于写入数据的 schema,不过在 Kafka 里有一种更好的处理方式。

在 Kafka 中使用 Avro

Avro 的数据文件里包含了整个 schema,虽然单个 schema 开销很小,但是如果在每条 Kafka 消息里都嵌入 schema,整体的开销就会非常大。不管怎样,在读取记录时需要用到整个 schema,所以要先找到 schema。遵循通用的结构模式并使用“schema 注册表”来达到目的。schema 注册表并不属于 Kafka,现在已经有一些开源的 schema 注册表实现。在下面例子里,使用的是 Confluent Schema Registry

把所有写人数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema 的标 识符。负责读取数据的应用程序使用标识符从注册表里拉取 schema 来反序列化记录。序列化器和反序列化器分别负责处理 schema 的注册和拉取。Avro 序列化器的使用方法与其他序列化器是一样的。

下面的例子展示了如何把生成的 Avro 对象发送到 Kafka(如何使用 Avro 生成代码请参考 Avro 文档)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); 
// 指定 serializer
props.put("key.serializer",
            "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.Serializer",
            "io.confluent.kafka.serializers.KafkaAvroSerializer");
// 指向 schema 的存储位置
props.put("schema.registry.url", scheMaUrl);

String topic = "customerContacts";

Producer<String, CustoMer> producer = new KafkaProducer<String, Customer>(props);

while(true) {
    Customer customer = CustomerGenerator.getNext(); 
    SysteM.out.println("Generated customer" + customer.toString());
    ProducerRecord<String, Customer> record = 
            new ProducerRecord<>(topic, customer.getid(), customer);
    producer.send(record);
}

如果选择使用一般的 Avro 对象而非生成的 Avro 对象该怎么办?这个时候你
只需提供 schema 就可以

// ...

// 提供 Avro schema
String schemaString = "{\"namespace\": \"customerManagement.avro\","
    + "\"type\": \"record\","
    + "\"name\": \"Customer\","
    + "\"fields\": [ "
    + "{\"name\":\"id\", \"type\":\"int\"}, "
    + "{\"name\":\"name\", \"type\":\"string\"}, "
    + "{\"name\":\"email\", \"type\": [\"null\", \"string\"], \"default\": \"null\"} "
    + "]}";

Schema.Parser parser = new Schema.Parser();
Schema schema = new Schema();

GenericRecord customer = new GenericData.Record(schema);
customer.put("id", nCustomers);
customer.put("name", name);
customer.put("eMail", eMail);

ProducerRecord<String, GenericRecord> data =
            new ProducerRecord<String, name, customer);
producer.send(data);

分区

在之前的例子里,ProducerRecord 包含了 topic、key 和 value。Kafka 的消息是 一个个键值对,ProducerRecord 可以只包含 topic 和 value,key 可以设置为默认的 null。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。

如果键值为 null,井且使用了默认的分区器,记录将被随机地发送到主题内各个可用 的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。

如果键不为空,并且使用了默认的分区器,那么 Kafka 会对键进行散列(使用 Kafka 的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上。

自定义分区器

默认分区器(散列分区)是使用次数最多的分区器。除了散列分区之外,有时候也需要根据业务对数据进行不一样的分区。下面是一个自定义分区器的例子:

public class Mypartitioner implements Partitioner {

    public void configure(Map<String,?> configs) { }

    public int partition(String topic, Object key, byte[] keyBytes, 
                        Object value, byte[] valueBytes,
                        Cluster cluster) { 

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size();

        if ((keyBytes == null) || (!(key instanceOf String)))  
            throw new InvalidRecordException("We expect all Messages to have customer name as key");

        if (((String) key).equals("Banana"))
            // 键为 Banana 总是被分配到最后一个分区
            return numPartitions; 

        // 其他记录被散列到其他分区
        return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1));
    }

    public void close() { }
}

《Kafka权威指南》

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注