目 录CONTENT

文章目录

kafka

FatFish1
2025-08-20 / 0 评论 / 0 点赞 / 26 阅读 / 0 字 / 正在检测是否收录...

kafka的一些设计理念

这是理解 Kafka 所有机制的基石。Kafka 的核心存储结构是一个只能追加(Append-Only)的、有序的、不可变的(Immutable)消息序列,即日志(Log)

  • 顺序读写:磁盘顺序读写的性能(甚至超过内存随机读写)是 Kafka 高吞吐量的根本保障。所有消息都被顺序追加到文件末尾,极大地减少了磁盘寻道时间。

  • 不可变性:消息一旦写入就不能被修改。这简化了并发控制(不需要复杂的锁机制)、保证了消息顺序,并使得缓存(PageCache)策略非常高效。

  • 日志分段(Log Segment):一个主题分区(Partition)的日志在物理上被切分为多个段文件(Segment)(包括 .log 数据文件和 .index.timeindex 索引文件)。活跃的写入只会发生在最后一个段(Active Segment)。老的段文件会被定期清理或压缩(Compaction)。

kafka的分区分段机制可以与Pulsar做对比,参考如下:

http://www.chymfatfish.cn/archives/pulsar#%E5%88%86%E6%AE%B5%E3%80%81%E5%88%86%E5%8C%BA

Kafka的设计架构

存储机制:高效的“消息数据库”

物理存储结构

  • Topic -> Partition -> Segment: 一个 Topic 分为多个 Partition,每个 Partition 是一个逻辑日志,物理上由多个 Segment 文件组成。

  • 索引(.index & .timeindex): 为了快速定位消息,Kafka 为每个 Segment 维护了稀疏索引

    • .index 文件:映射 offset -> 物理位置(position)

    • .timeindex 文件:映射 时间戳 -> offset

    • 稀疏索引意味着它不会为每条消息建索引,而是隔几条消息建一个。查找时,先通过索引找到大致范围,再顺序扫描一小段数据,这是一种空间换时间的经典设计。

每个partition都是有序的,顺序就是消息被追加的顺序,每个partition上的每个消息都被赋了topic中唯一的offset值,当consumer每消费一个partition的消息,offset就会+1,consumer也可以跟踪和重设这个offset值,从而读取任意位置消息

topic里面的消息是无状态的,因此topic消息的过期只针对时间。可以设置这个过期时间从而达到释放磁盘空间的目的。

页缓存(PageCache)与零拷贝(Zero-Copy)

这是 Kafka 实现高吞吐的两大“杀手锏”。

  • 页缓存(PageCache)

    • Kafka 大量依赖操作系统本身的页缓存(PageCache)来缓存磁盘数据,而不是在 JVM 堆内维护缓存。这避免了昂贵的 GC 开销和对象开销。

    • 写入时:消息先被写入页缓存,由操作系统异步刷盘。

    • 读取时:优先从页缓存中读取,如果命中则速度极快。

  • 零拷贝(Zero-Copy)

    • 传统数据发送:磁盘 -> 内核缓冲区 -> 用户缓冲区 -> 内核Socket缓冲区 -> 网卡。经历了多次上下文切换和数据拷贝。

    • Kafka 的零拷贝:使用 sendfile() 系统调用,数据直接从页缓存发送到网卡, bypass 了应用程序(用户空间)。上下文切换次数和数据拷贝次数大幅减少,极大提升了消费效率。

生产者(Producer)机制

分区策略(Partitioning)

生产者决定将消息发送到哪个分区,这是实现负载均衡和语义保证的关键。

  • DefaultPartitioner

    • 如果指定了 key,则对 key 进行哈希(murmur2Hash),然后对分区数取模,确保相同 key 的消息总是进入同一分区,从而保证顺序性

    • 如果没有指定 key,则使用轮询(Round-Robin) 策略将消息均衡地分发到所有分区。

内存缓冲与批量发送(Batching)

生产者并非来一条消息就发一条,而是先存入内存缓冲区(RecordAccumulator)。

  • 批次(Batch): 发往同一分区的消息会被聚合成一个 Batch

  • 触发条件: 发送由两个参数控制,满足任一即发送:

    • linger.ms: 等待时间。即使 Batch 没满,也等待这么久后发送,旨在增加批量处理的几率。

    • batch.size: Batch 的大小。Batch 满了就立即发送。

  • 优点: 大幅减少网络 IO 次数,极大提升吞吐量。

以Batch的方式推送数据可以极大提高处理效率,producer可以将消息在内存中累计到一定数量再以batch发送请求,大小可自定,这里对IO的性能提升较大。

消息确认机制(Acks)

acks 参数决定了生产者认为请求完成的标准,这是在可靠性延迟之间的权衡。

  • acks=0“发后即忘”。不管是否成功写入服务器,立即视为发送成功。吞吐量最高,但可能丢失消息

  • acks=1(默认): Leader 副本成功写入本地日志即视为成功。平衡了吞吐和可靠性,但如果 Leader 刚写入就宕机且未同步到 Follower,消息仍会丢失。

  • acks=all(或 acks=-1): 最强保证。要求 Leader 收到所有 ISR(In-Sync Replicas) 副本的成功写入确认后才视为成功。可以保证只要至少一个 ISR 副本存活,消息就不会丢失

producer可以异步发送消息并且得到一个Future结果,返回的是offset或者发送异常。异步流程有一个ack参数,代表producer等待broker发送成功回应的数量。ack=0时,producer不等待broker响应,这时吞吐量达到最大;ack=1时,producer等待一个partition副本收到消息就会收到broker确认,兼顾性能和可靠性;ack=-1时,producer等待所有备份的partition副本收到消息时得到broker的确认

消费者(Consumer)机制

消费者组(Consumer Group)与重平衡(Rebalance)

  • 消费者组(Consumer Group): 多个消费者实例可以组成一个组来共同消费一个 Topic。

    • 核心规则一个分区只能被组内的一个消费者消费;一个消费者可以消费多个分区。

    • 这是实现横向扩展负载均衡的基础。

consumer需要关注与consumer group的特性:

  • 如果所有的消费者实例在同一个消费组中,消息记录会负载均衡到消费组中的每一个消费者实例

  • 如果所有的消费者实例在不同的消费组中,则会将每条消息记录广播到所有的消费组或消费者进程中

  • 重平衡(Rebalance)

    • 定义: 当消费者组成员发生变化(增、删、崩溃)或订阅的 Topic 分区数发生变化时,分区所有权在消费者间重新分配的过程。

    • 影响Rebalance 期间,整个消费者组会停止工作(Stop-The-World),是影响稳定性的一个重要因素。

    • 触发条件: 消费者心跳超时(session.timeout.ms)、处理消息超时(max.poll.interval.ms)等。

  • api:consumer有两套API:high-level api和Sample-api。

    • Sample-api维护了与单个broker之间的链接,且不做状态维护,每次pull都需要指定offset值,因此比较灵活,可以自由决定消费哪个消息;high-level api封装了对集群中一系列broker的访问,提供黑盒消费一个topic的能力,且维护已消费状态,不需要单独指定offset,每次消费都是下一个消息。

    • high-level api还支持consumer以组的形式消费topic,如果多个consumer有一个共同的组名,则各个consumer均衡地进行消费,类似一个消息队列服务;如果多个consumer消费一个topic但是组名不同,那么kafka将消息广播到每一个组,类似广播服务。

位移管理(Offset Management)

消费者需要记录自己消费到了哪个位置,这个位置叫位移(Offset)

  • 提交(Commit): 消费者需要定期将自己消费到的位移提交到 Kafka 一个特殊的内部 Topic(__consumer_offsets)中。

  • 交付语义(Delivery Semantics)

    • 至少一次(At least once): 消息绝不会丢,但可能重复消费(先提交位移后处理业务,若处理中途崩溃,重启后会从已提交的位移处重新消费)。

    • 至多一次(At most once): 消息可能丢失,但绝不会重复(先处理业务后提交位移,若处理完提交前崩溃,消息就丢了)。

    • 精确一次(Exactly once): 通过事务机制和幂等生产者实现,消息且只被处理一次。实现复杂,开销较大。

副本(Replication)与高可用机制

这是 Kafka 实现故障自动转移(Failover)和数据可靠性的核心。

Leader/Follower 模型

  • 每个 Partition 有多个副本(Replica),分散在不同 Broker 上。

  • 只有一个副本是 Leader,负责所有客户端的读写请求。

  • 其他副本都是 Follower,只做一件事:从 Leader 异步拉取(Fetch)消息,同步到自己的日志中

replications是topic的副本数量,即指定几个broker存放topic数据,提供了kafka的高可用性

partitions是topic的组成片的数量,producer产生数据会按照一定规则发布到各个partition中,只有一个partition的副本会被选举为leader进行读写

建议partition数量大于broker数量,这样leader partition就会均匀分布在各个broker中

ISR(In-Sync Replicas)机制

  • ISR: 与 Leader 副本保持同步的副本集合(包括 Leader 自己)。

  • 同步标准: Follower 副本的落后程度(replica.lag.time.max.ms)不能超过一定阈值。如果 Follower 在指定时间内未能追上 Leader,就会被踢出 ISR。

  • 核心作用

    1. 决定 acks=all 的成功条件

    2. 选举新 Leader: 当 Leader 宕机时,新的 Leader 必须从 ISR 中选举产生,这样才能保证数据一致性(不丢消息)。

控制器(Controller)与 Leader 选举

  • 控制器(Controller): Kafka 集群中唯一的一个特殊 Broker(通过 ZooKeeper/KRaft 竞选产生),负责管理集群状态。

  • 职责

    • 管理分区和副本的状态(如创建、删除)。

    • 监听 Broker 变化,在 Leader 副本宕机时,负责从 ISR 中为受影响的分区选举新的 Leader

    • 触发重平衡。

  • 高可用: 如果 Controller 所在 Broker 宕机,其他 Broker 会通过 ZooKeeper/KRaft 重新选举出新的 Controller,保证管理功能的高可用。

kafka高级特性

压缩

当以batch发送消息时,producer端可以通过GZIP或Snappy格式对消息集合进行压缩,从而降低网络IO压力

consumer端会通过kafka消息头部添加的一个描述压缩属性的字节后两位编码判断压缩方法,为0是不压缩

消息可靠性

从producer端看,可以通过设置ack属性,等待broker反馈后再发送下一个,甚至可以调整参数使所有备份partition副本都收到消息再发下一个

从consumer端看,可以通过回退offset属性重新消费上一个消费失败的消息

备份机制

备份机制可以让一个n个节点的集群在挂掉n-1个后还能正常运行。

在所有备份节点中有一个leader节点,它保存了其他备份节点列表,并维持各个备份间的状态同步

当leader节点挂了,controller在zk中的监视器将消息发送给controller,controller开始执行新leader的选举工作

kafka是基于ZK进行故障恢复选举,而ZK是基于zab算法的,参考zk部分:

http://www.chymfatfish.cn/archives/zookeeper

KafkaAdmin源码分析

AdminClient

create

AdminClient#create方法是kafka构造Admin的API的最上游,给用户直接使用

return (AdminClient) Admin.create(props);
// -- Admin#create --
return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), null);

KafkaAdminClient

createInternal - 构造KafkaAdmin的方法

networkClient = new NetworkClient(……

关注点1:这里构造了一个kafkaClient,客户端,对应kafka服务器做连接

return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient, timeoutProcessorFactory, logContext);

关注点2:之类初始化了KafkaAdminClient管理器,这个构造方法是真正初始化的地方

构造方法

this.runnable = new AdminClientRunnable();
……
this.thread = new KafkaThread(threadName, runnable, true);
……
thread.start();

构造方法中除了参数构造,还启动了一个AdminClientRunnable的线程,可以进一步看看这个可执行对象都是在执行些什么内容

listTopics - 获取全部topics的用户接口

runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),
     new LeastLoadedNodeProvider()) {
     ……
     MetadataRequest.Builder createRequest(int timeoutMs) {
       return MetadataRequest.Builder.allTopics();
     }

方法中通过kafka预置的请求创建工具创建了一个allTopics请求,通过call方法,存入newCalls中,等待异步发送

AdminClientRunnable

核心成员变量包括:

// 还没进入发送流程的新请求
private final List<Call> newCalls = new LinkedList<>();
// 尚未分配到节点的客户端请求,马上就要找node发送
private final ArrayList<Call> pendingCalls = new ArrayList<>();
// 已经分配好了节点的请求
private final Map<Node, List<Call>> callsToSend = new HashMap<>();
// 已经标记为已发送的请求
private final Map<String, Call> callsInFlight = new HashMap<>();

通过这几个list可以看出来kafka客户端提交请求和发送请求是异步的流程

run

processRequests();

重点关注其中调用的方法,下面finally是处理异常关闭

processRequests - 不断拉取并发送请求到服务器

while (true) {
    drainNewCalls();

通过while true启动了一个发送请求的工作循环,可以看出来只要kafka在本地被构造出来,就会不断循环处理客户端的请求发送到服务器

首先通过drainNewCalls方法从newCalls里面提取请求,存入pendingCalls里面等待发送。这里可以看出来,捞取请求的线程和存入请求的线程并不是一个,因此kafka创建请求和发送请求是一个异步的流程

timeoutPendingCalls(timeoutProcessor);
timeoutCallsToSend(timeoutProcessor);
timeoutCallsInFlight(timeoutProcessor);

这里对pendingCalss、callsToSend、callsToFlight做超时处理

pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now));

然后调用maybeDrainPendingCalls方法,检查pendingCalls里面的请求是否可以分配上节点,分配的上的,转移到callsToSend中

long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
if (metadataFetchDelayMs == 0) {
Call metadataCall = makeMetadataCall(now);
if (!maybeDrainPendingCall(metadataCall, now))
    pendingCalls.add(metadataCall);

然后这里判断是否需要获取元数据信息,如果需要则重新构造一个获取元数据信息的请求,然后看下能不能分配节点,能分配直接进callsToSend里面,分配不了,先扔进pendingCalls里面,下次再拉取更新。

pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));

然后完成calls的发送,发送完成后转移到callsToFlight

List<ClientResponse> responses = client.poll(Math.max(0L, pollTimeout), now);

最后,kafka客户端获取响应,在poll方法中进行了连接的初始化,可以溯源方法调用,一直到NetworkClient#initiateConnect

maybeDrainPendingCalls

Iterator<Call> pendingIter = pendingCalls.iterator();
while (pendingIter.hasNext()) {
	……
	maybeDrainPendingCall(call, now)

maybeDrainPendingCall

Node node = call.nodeProvider.provide();
if (node != null) {
……
getOrCreateListValue(callsToSend, node).add(call);

可以分配节点,这里就调用getOrCreateListValue方法,实际上调用的是map的computeIfAbsent方法,添加到callsToSend里面

NetworkClient - kafka客户端

initiateConnect - 初始化连接

connectionStates.connecting(nodeConnectionId, now, node.host());

这里可以看到,node是在这个方法调用的时候就已经传进来的,这个时候选定了一个要连接的node,可以获取到这个node的所有信息,直接做连接就可以了。

那么node是如何获取到的呢?从该方法的调用点向上找,在maybeUpdate方法中找到这一个调用:

// -- maybeUpdate --
Node node = leastLoadedNode(now);

leastLoadedNode - node选取负载均衡

该方法其实是在做类似负载均衡操作,看上面的简介就可以知道:

Choose the node with the fewest outstanding requests which is at least eligible for connection

选择完成请求最少的node

List<Node> nodes = this.metadataUpdater.fetchNodes();

首先拿到所有node的列表

生成一个索引,遍历node

int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
if (canSendRequest(node.idString(), now)) {
    ……
    return node;

通过canSendRequest做判断,可以请求,就把这个node返回去。这里体现了Kafka的多备份、集群化的特点。

selector.connect(……

完成连接

KafakProducer源码分析

 KafkaProducer

send/doSend - 发送事件统一用户接口

serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

首先完成消息体的序列化

RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, 
serializedKey,
        serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);

这里是kafka以batch发送消息部分代码,使用累计器进行累计

kafkaConsumer源码分析

KafkaConsumer

poll - 拉取事件的用户API

return poll(time.timer(timeout), true);

从这里隐约可以看出来是走的timer定时任务

final Fetch<K, V> fetch = pollForFetches(timer);

拉取的核心方法在这里

pollForFetches

sendFetches();

这里首先调用sendFetches发送拉取topic的请求

client.poll(pollTimer, () -> {
    return !fetcher.hasAvailableFetches();
});

最后这里的client调用到ConsumerNetworkClient和前面admin的

NetworkClient类似,可以看出来也是个异步的流程

ConsumerNetworkClient

poll

long pollDelayMs = trySend(timer.currentTimeMs());

在poll方法中首先完成发送

client.poll(pollTimeout, timer.currentTimeMs());

调用client#poll方法完成response拉取。这里是NetworkClient,不是ConsumerNetworkClient了。

send

ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
    requestTimeoutMs, completionHandler);
unsent.put(node, clientRequest);

send本身也是异步的,先把准备好的request放到unsend列表里面

Fetcher

sendFetches

final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
……
final RequestFuture<ClientResponse> future = client.send(fetchTarget, request);

这里调用client.send进行发送,还是走的ConsumerNetworkClient

Kafka开发案例

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class KafkaExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = "test-topic";

    public static void main(String[] args) {
        // 1. 检查并创建Topic
        createTopicIfNotExists(TOPIC_NAME, 3, (short) 1);

        // 2. 发送消息
        sendMessage(TOPIC_NAME, "key1", "Hello Kafka!");
        sendMessage(TOPIC_NAME, "key2", "This is a test message");
        
        // 3. 接收消息
        receiveMessages(TOPIC_NAME);
    }

    /**
     * 检查Topic是否存在,不存在则创建
     */
    public static void createTopicIfNotExists(String topicName, int partitions, short replicationFactor) {
        // 创建Admin客户端配置
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        
        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            // 检查Topic是否存在
            DescribeTopicsResult describeResult = adminClient.describeTopics(Collections.singletonList(topicName));
            
            try {
                // 尝试获取Topic描述(如果存在)
                describeResult.topicNameValues().get(topicName).get();
                System.out.println("Topic \"" + topicName + "\" already exists");
            } catch (ExecutionException e) {
                if (e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicException) {
                    // Topic不存在,创建Topic
                    System.out.println("Topic \"" + topicName + "\" does not exist. Creating...");
                    
                    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
                    CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic));
                    
                    try {
                        createResult.all().get();
                        System.out.println("Topic \"" + topicName + "\" created successfully");
                    } catch (InterruptedException | ExecutionException ex) {
                        System.err.println("Failed to create topic: " + ex.getMessage());
                    }
                } else {
                    System.err.println("Error checking topic existence: " + e.getMessage());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Thread interrupted: " + e.getMessage());
            }
        }
    }

    /**
     * 发送消息到指定Topic
     */
    public static void sendMessage(String topicName, String key, String value) {
        // 生产者配置
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 确保消息完整写入
        
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 创建消息记录
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            
            // 发送消息并获取结果
            RecordMetadata metadata = producer.send(record).get();
            
            System.out.printf("Message sent successfully! Topic: %s, Partition: %d, Offset: %d\n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Error sending message: " + e.getMessage());
        }
    }

    /**
     * 从指定Topic接收消息
     */
    public static void receiveMessages(String topicName) {
        // 消费者配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 手动提交offset
        
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 订阅Topic
            consumer.subscribe(Collections.singletonList(topicName));
            
            System.out.println("Starting to consume messages from topic: " + topicName);
            
            // 持续消费消息
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                if (records.isEmpty()) {
                    System.out.println("No more messages. Exiting...");
                    break;
                }
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
                
                // 手动提交offset
                consumer.commitSync();
            }
        }
    }
}

0

评论区