Kafka
Kafka 是一款开源的、高性能的、可靠的、易于扩展的分布式消息队列。
相对于传统MQ除了具有解耦、异步、削峰填谷三大特点之外还有一个最大的特点是 数据吞吐量大 常用在大数据场景,例如:日志收集,流式计算等。
架构图
Rebalance
什么时候会触发Rebalance?
- 同一个消费者组中消费者实例数发生变化(掉线或新加入)
- topic的分区数量发生变化
通常有哪些措施减小 Rebalance 带来的影响?
- 不要在高峰期对topic进行扩缩容操作,同理也不要在高峰期对消费者节点扩缩容
- 设置合理的超时时间和心跳检查频率,避免因为短时间的网络延迟触发没必要的 rebalance
- 采用静态成员ID的策略,即使临时断线也不会立即踢出 group
# 综上所述,添加如下几项配置可以显著减少 rebalance 的频率 # 开启静态成员 group.instance.id=consumer-1-instance # 增大 session.timeout.ms,避免因短时卡顿被剔除 session.timeout.ms=30000 # 设置 heartbeat.interval.ms < session.timeout.ms / 3,保持及时心跳 heartbeat.interval.ms=10000 # 如果消费逻辑处理时间较长,需调大该参数,防止 Kafka 误以为消费线程“死掉” max.poll.interval.ms=600000
ProducerInterceptor
用于在消息被序列化并发送之前,或发送回调之前,对消息进行拦截、修改、记录或其他处理。
示例:
public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 添加一个 traceId header
Headers headers = record.headers();
headers.add("traceId", UUID.randomUUID().toString().getBytes());
// 也可以修改消息内容,例如加前缀
String newValue = "[Intercepted] " + record.value();
return new ProducerRecord<>(
record.topic(),
record.partition(),
record.timestamp(),
record.key(),
newValue,
headers
);
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息成功发送:" + metadata.offset());
} else {
System.err.println("消息发送失败:" + exception.getMessage());
}
}
@Override
public void close() {
// 清理资源
}
@Override
public void configure(Map<String, ?> configs) {
// 读取配置
}
}
缓存机制
Kafka 的高性能很大程度上依赖其缓存机制。
accumulator
sender
ACKS
- acks=0: 只管发,不管成功与否。
- acks=1: 发完等待leader写入成功后,再通知Producer。
- acks=all 或者 acks=-1: 发完等待所有副本写入成功后,再通知Producer。这里所说的所有副本不是实际的所有副本,而是所有参与复制的副本(ISR), 由 min.insync.replicas 所决定,min.insync.replicas 默认值为 1,推荐设置为 $\frac{n}{2}$ + 1, n为实际副本数。