Kafka

Kafka 是一款开源的、高性能的、可靠的、易于扩展的分布式消息队列。
相对于传统MQ除了具有解耦、异步、削峰填谷三大特点之外还有一个最大的特点是 数据吞吐量大 常用在大数据场景,例如:日志收集,流式计算等。
img

架构图

img

Rebalance

什么时候会触发Rebalance?

  • 同一个消费者组中消费者实例数发生变化(掉线或新加入)
  • topic的分区数量发生变化

通常有哪些措施减小 Rebalance 带来的影响?

  • 不要在高峰期对topic进行扩缩容操作,同理也不要在高峰期对消费者节点扩缩容
  • 设置合理的超时时间和心跳检查频率,避免因为短时间的网络延迟触发没必要的 rebalance
  • 采用静态成员ID的策略,即使临时断线也不会立即踢出 group
    # 综上所述,添加如下几项配置可以显著减少 rebalance 的频率
    # 开启静态成员
    group.instance.id=consumer-1-instance
    # 增大 session.timeout.ms,避免因短时卡顿被剔除
    session.timeout.ms=30000
    # 设置 heartbeat.interval.ms < session.timeout.ms / 3,保持及时心跳
    heartbeat.interval.ms=10000
    # 如果消费逻辑处理时间较长,需调大该参数,防止 Kafka 误以为消费线程“死掉”
    max.poll.interval.ms=600000
    

ProducerInterceptor

用于在消息被序列化并发送之前,或发送回调之前,对消息进行拦截、修改、记录或其他处理。
示例:

public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // 添加一个 traceId header
        Headers headers = record.headers();
        headers.add("traceId", UUID.randomUUID().toString().getBytes());

        // 也可以修改消息内容,例如加前缀
        String newValue = "[Intercepted] " + record.value();

        return new ProducerRecord<>(
            record.topic(),
            record.partition(),
            record.timestamp(),
            record.key(),
            newValue,
            headers
        );
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("消息成功发送:" + metadata.offset());
        } else {
            System.err.println("消息发送失败:" + exception.getMessage());
        }
    }

    @Override
    public void close() {
        // 清理资源
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 读取配置
    }
}

缓存机制

Kafka 的高性能很大程度上依赖其缓存机制。

accumulator

sender

ACKS

  • acks=0: 只管发,不管成功与否。
  • acks=1: 发完等待leader写入成功后,再通知Producer。
  • acks=all 或者 acks=-1: 发完等待所有副本写入成功后,再通知Producer。这里所说的所有副本不是实际的所有副本,而是所有参与复制的副本(ISR), 由 min.insync.replicas 所决定,min.insync.replicas 默认值为 1,推荐设置为 $\frac{n}{2}$ + 1, n为实际副本数。

results matching ""

    No results matching ""