生产者
# 创建生产者
创建生产者有三个属性是必选的 bootstrap.servers 指定broker集群的地址,格式为host:port key.serializer kafka broker希望收到消息的键和值都是字节数组,但是为了代码的可读性,允许producer把java对象发送给broker,但是需要指定一个实现了org.apache.kafka.common.serialization.Serializer接口的序列化器。 value.serializer 与key.serializer一样的道理。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.1.1,172.16.1.2");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
2
3
4
5
# 发送消息
发送消息有三种方式:
- 发送并忘记(fire-and-forget)
- 同步发送
- 异步发送
发送并忘记 不关心消息是否到达
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key,value);
producer.send(record);
2
同步发送
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key,value);
try{
producer.send(record).get();//producer.send(record)返回一个Future对象,调用get()阻塞等待返回结果
} catch (Exception e){
e.printStackTrace();
}
2
3
4
5
6
kafkaProducer会发送两类错误
- 可重试的:比如连接错误、无主(no leader)错误
- 不可重试的:比如消息太大 异步发送 异步发送的异常需要记录下拉,producer支持回调。
ProducerRecord<String, String> record = new ProducerRecord<>(topicNameTrade, key,value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
exception.printStackTrace();
}
});
2
3
4
5
6
7
# 生产者的配置
acks 指定要多少个分区副本收到消息,生产者才认为消息写入是成功的。其值有0、1、all.
- 0:不等待任何服务器的相应
- 1: 只要集群的首领节点收到
- all: 等待所有副本都收到
buffer.memory 设置生产者内存缓冲区大小。 程序发送消息的速度 > 发送到broker的速度会导致这个缓冲区空间不足。 空间不足时程序阻塞还是抛异常取决于配置max.block.ms compression.type 默认,消息是不会被压缩的。 producer向broker发送消息不是逐条发送的,是一批一批发送的。 发送批次有可选的压缩方式,有三种:
- snappy
- gzip
- lz4
retries 重试次数 默认重试之间会等待100ms retry.backoff.ms可设置这个等待时间 batch.size 有多个消息要发送到一个分区的时候,producer会把他们放到一个批次。 该配置指定是批次大小(字节)。 批次装满的时候会被发送出去,
# 序列化器
# 分区
kafka消息的recode包含topic、key、value。 broker用key分配分区。key相同的一定会分配到同一个分区。 如果key为null,那么将使用默认分区,默认的分区器是使用Round Robin算法。 也可以实现partitioner实现自定义分区器。