欧阳亮的博客

编程不止是一份工作,还是一种乐趣!!!

Kafka笔记

一、命令行操作


启动Kafka

./bin/kafka-server-start.sh ./config/server.properties

创建topic

./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test_topic

查询topic列表

./bin/kafka-topics.sh --zookeeper localhost:2181 --list

查询topic详细信息

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_topic

查询consumer group列表

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查询consumer group详细信息

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumerGroup

启动一个消费者

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

发送消息

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic


二、生产者开发


首先使用maven导入kafka客户端库:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.11.0.0</version>
</dependency>


向kafka发送消息相对于订阅来说,要简单很多。我们需要一个KafkaProducer对象,实例化的时候需要传入一个Properties对象来配置一些参数,关于这些参数,我们稍后会详细解释。

public void send() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
    try {
        // do send job with producer here ...
    } finally {
        producer.close();
    }
}


ProducerRecord类,它是我们真正向Kafka发送的对象,是一个Pojo对象,封装了消息相关的信息:消息的key和value,发送的队列与分区,消息的时间,还有消息头信息。例如下面代码给消息设置了一个消息头from,值是System A。

String topic = "test_topic";
int partition = 0;
long timestamp = System.currentTimeMillis();
int key = 1;
String value = "this is the message";

List<Header> headers = new LinkedList<>();
headers.add(new RecordHeader("from", "System A".getBytes(Charset.forName("UTF-8"))) );

ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);


发送消息只需要把ProducerRecord作为参数传给KafkaProducersend方法即可。

Future<RecordMetadata> f = producer.send(record);
try {
    System.out.println("Sent message, offset: " + f.get().offset() + ")");
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}


不难看出,KafkaProducer.send方法是异步执行的,异步执行结束后我们通过Future可以拿到一个RecordMetadata对象。RecordMetadata类很简单,里面封装了本条消息发送的一些信息:如offset、消息的时间,消息的大小等。如果希望同步发送消息,可以在send方法后立即调用Future.get()方法。

KafkaProducer.send方法还有一个重载版本,它接受一个Callback对象:

public interface Callback {
    public void onCompletion(RecordMetadata metadata, Exception exception);
}
/**
 * Send a record and invoke the given callback when the record has been acknowledged by the server
 */
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);


使用重载版本的send方法,我们不需要显示调用Future.get()方法,onCompletion方法会在消息成功递交之后被自动调用。这里需要注意的是如果消息发送过程中发生错误,onCompletion方法中RecordMetadata参数为空,Exception对象会保存具体的异常信息,但此时我们无法判断是哪个消息发送失败,也就没有办法进行重试了。


关于分区

我们知道,Kafka topic是分为多个分区的,这么设计的目的是为了进一步提升消息的并行消费能力,同时消息在同一个分区上是保序的。消息发送到哪个分区,是由Producer决定的。

在构造ProducerRecord对象时,partition参数的值决定了消息将被发送至哪个分区。那如果我们没有显式设置partition参数的值,消息将被发送到哪个分区呢?Kafka的设计者提供了一个Partitioner接口:

public interface Partitioner extends Configurable, Closeable {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    public void close();
}


DefaultPartitioner类是Partitioner接口的默认实现,Producer发送消息时如果没有显式指定分区,那消息对应的分区就由DefaultPartitioner决定:

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }
}

DefaultPartitioner的实现也不难理解:如果消息的key不为空的话,分区的取值基于key做hash运算得出;如果消息为空的话,就采取轮循的方式将消息发送到不同的分区。


Produer重要参数

参数说明默认值
bootstrap.serverskafka broker地址,多个地址的话用逗号隔开,如host1:port,host2:port
key.serializer消息key序列化类
value.serializer消息value序列化类
acks这是个很重要的参数,用于指定消息的持久性。
当它的值为0时,客户端生产者不会等待服务器的响应,也正因为此,返回的RecordMetadata对象中offset值是无意义的,固定为-1。在这个级别上,消息可能会丢失。

当它的值为1时,在partition leader将消息持久化到本地后立即响应客户端,不会等待任何follower的响应。在in-sync集合中的follower响应leader之前,但leader响应了客户端且发生宕机的情况下,会丢失消息。

当值为all时,partition leader将在in-sync集合中所有follower都响应成功后才会响应客户端,这是持久化最强的一个级别,但性能最差的一种。
1
linger.msThe producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load.0 ms
batch.sizeThe producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size.Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.16384
max.request.sizeThe maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size which may be different from this.1048576


三、消费者开发


Kafka Consumer的开发要比Producer复杂一些,这主要是由于Kafka的设计原理导致的。在Kafka中,一个topic被分成了一个或者多个日志分区。分区的引入是为了进一步提升消息的并行消费能力,同时消息在同一个分区上是保序的。Producer将消息写到这些分区的尾部,Consumer则以自己的步调读取。

Kafka另外还引入了Consumer Group的概念,以更高的层次抽象了“点对点”和“发布/订阅”两种传统队列模式。一个topic可以被多个Consumer Group消费,这类似于“发布/订阅”模式;对于一个组内的多个消费者来说,这样的设计达到了并行消费和容灾的效果,topic中的分区以一定的规则分配给它们,一个分区只能被分配给一个Consumer,这类似于“点对点”这种模式。例如下图中的分区0和1分配给了Consumer 1,分区2分配给了Consumer 2,组内消费者和partition的分配关系是一对多的关系:

Kafka分区与消费者组的关系图

分区和Consumer之间的分配关系是基于消费者组协调协议实现的。每个Consumer Group会选择一个broker作为自己的组协调者Group Coordinator。协调者负责管理Consumer Group的状态,它的主要工作就是负责协调分区的分配: 当有新的Consumer加入或者退出Consumer Group时,重新分配分区和Consumer的关系,这个过程叫做Group Rebalance。

当一个新的消费者组第一次订阅一个topic时,会从每个分区开始、或最后的位置开始读取数据,取决于Consumer中参数auto.offset.reset的设置。之后Consumer会顺序地读取和消费每个分区中的消息,并不断的提交当前的offset。在某个时刻,分区的结构相对于某个Consumer Group来说可能是这样的:

Partition分区某个时刻的结构图

当Group Rebalance发生时,分区有可能被分配给另一个Consumer。这时新的Consumer会从上次提交的位置last committed offset开始拉取消息,意味着消息可能会被重复消费(如上图示例中offset 1-6的消息)。

前面的partition结构图中还显示了另外两个位置信息:log end offset指向当前partition中最新一条消息即将写入的位置,而high watermark指向最后一条已经成功复制到所有在in-sync集合中的follower的消息。很明显,消费者只能取到high watermark及之前的消息,high watermark之后的消息,如果复制失败会被丢弃。


实现

好了,现在我们来看看具体的实现。接收消息时使用KafkaConsumer类,与消息发送类似,也是通过一个Properties对象来配置一些参数:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);


为了能够消费消息,Consumer需要指定要订阅的topics,像下面这样。

consumer.subscribe(Arrays.asList("test_topic"));


Kafka的设计需要Consumer自己主动的拉取数据,不像传统队列支持推送的方式,所以一般Consumer获取数据是包装在一个循环内的:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            // handle record here
        }
    }
} finally {
    consumer.close();
}

poll方法会从分区拉取数据,但不仅如此,事实上,组内分区的分配、Group Rebalance等情况也是由poll方法驱动完成的。传递给poll方法的参数控制了消费者在当前位置因为等待消息的到来而阻塞的最长时间,一旦有新的消息就会立即返回;否则会一直等待直到超时才返回。

开发时应该正确的关闭Consumer,这不仅仅是清理已经使用的socket连接,也确保了Consumer及时通知协调者它已经从组中退出,需要做Group Rebalance。前面的例子使用了一个相对较小的timeout来确保在关闭Consumer时不会有太多的延迟。另一种办法是设置较长的timeout,不过这时应该使用wakeup调用来从事件循环中退出:

try {
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<Integer, String> record : records) {
            // handle record here
        }
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
}

通过另一个线程调用consumer.wakeup来进行中断操作,这个调用会导致抛出WakeupExceptionwakeup在其他线程中调用是安全的(KafkaConsumer类中只有这个方法是线程安全的)。


活跃度

作为Consumer Group的一部分,每个Consumer会被分配一部分分区,就像在这些分区上加了一个锁。只要锁被持有,组中的其他成员就不会读取这些分区,锁只属于唯一的Consumer。当你的Consumer是正常状态时,当然是最好不过了,因为这是防止重复消费的唯一方式。但如果Consumer挂掉了,你需要释放掉那个锁,这样才可以将分区重新分配给其他健康的成员。

Kafka的消费者组协调协议使用心跳机制解决了这个问题。每个Consumer都会定时地发送心跳给协调者,只要协调者持续接收到心跳,它会假设这个成员是健康的。每次接收到心跳,协调者就开始或者重置计时器。如果时间超过,没有收到Consumer的心跳,协调者标记Consumer为死亡状态,并将分配给该Consumer的分区重新分配给组内的其它Consumer。

计时器的时间间隔是通过参数session.timeout.ms配置的,这个参数确保应用程序崩溃或者网络将消费者和协调者隔离的情况下锁会被释放。所有的网络IO操作在调用poll或者其他的阻塞API,都是在前台完成的。Consumer并不使用任何的后台线程。这就意味着消费者的心跳只有在调用poll的时候才会发送给协调者。如果应用程序故障,就不会再发送心跳了,最终就会导致超时,然后组协调者就会开始Group Rebalance操作。

唯一存在的问题是如果Consumer处理消息花费的时间比session.timeout.ms还要长,就会触发一次不必要的Group Rebalance。可以通过设置更长的超时时间防止发生这样的情况。默认的超时时间是30秒,不过不要太长,过长的超时时间会导致协调者会花费较长时间才能检测到真正崩溃的Consumer。

递交语义

当一个新的Consumer Group第一次订阅一个topic时,会从每个分区开始、或最后的位置开始读取数据,取决于Consumer中参数auto.offset.reset的设置。之后Consumer会顺序地读取和处理各个分区中的消息,并不断的提交当前的offset。当Group Rebalance发生时,分区有可能被分配给另一个Consumer。这时新的Consumer会从上次提交的位置last committed offset开始重新拉取和处理消息。如果Consumer已经成功处理了一批消息,但是为这批消息提交offsets之前崩溃了,新的Consumer就会重复处理这批消息。提交offset的频率越高,重复处理的消息量就越少,但吞吐也越低。

参数enable.auto.commit控制Consumer是否自动提交offset(默认为true)。如果是自动提交,Consumer会周期性的提交offset,周期间隔时间由参数auto.commit.interval.ms指定。间隔时间越小意味着提交的频率就越高,重复处理的消息量就越少,吞吐也越低。还有一个细节要注意,自动提交offset是发生在poll方法的调用时,如果处理消息花费的时间大于周期间隔时间,那提交offset的频率实际上是由处理消息的快慢决定的

如果想要自己控制offset的提交,需要把参数auto.offset.reset的值设为false。手动控制提交给予应用程序更多的灵活性。

try {
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<Integer, String> record : records) {
            // handle record here
        }

        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // application specific failure handling
        }
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
}

使用不带参数的commitSync方法会在最近一次调用poll的返回值中提交offsets。这个方法是同步的,直到提交成功或者出现不可恢复的错误而失败。特别需要关心的是超时问题,这种情况发生时,协调者会将Consumer从组中剔除出去,commitSync方法会抛出CommitFailedException。应用程序应该处理这种错误,比如尝试从上次成功提交的offset开始回滚。

通常情况下,应该保证只有在消息成功被处理之后,才提交offset。如果Consumer在提交offset之前崩溃了,那么已经成功处理的那部分消息就会被另一个Consumer重复处理。

上面的实现中,Consumer从分区获取消息后,是先处理消息,再提交offset,这样的方式实际上是实现了“At least once”的递交语义。如果把消息处理和提交offset的顺序交换,先提交offset,再处理消息,那就实现了“At most once”的递交语义了,就像下面这样:

Partition分区某个时刻的结构图

try {
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(Long.MAX_VALUE);

        try {
            consumer.commitSync();
            for (ConsumerRecord<Integer, String> record : records) {
                // handle record here
            }

        } catch (CommitFailedException e) {

        }
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
}

使用自动提交offset的方式提供“at least once”的递交语义,可能被重复处理的消息量和参数auto.commit.interval.ms的值有关系,消息处理越快、参数的值越大,在Rebalance发生时可能被重复处理的消息量就可能越多。


使用带参数的commitSync方法,也可以更细粒度的控制offset的提交,例如:

try {
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(Long.MAX_VALUE);
        try {
            for (ConsumerRecord<Integer, String> record : records) {
                // handle record here ...
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(record.offset() + 1)));
            }
        } catch (CommitFailedException e) {
            // application specific failure handling
        }
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
}

在本例中,我们显式地传递要提交的offset给commitSync方法。committed offset总是应该是Consumer读取的下一条消息的offset。每处理一条消息就提交一次,很明显会带来严重的性能问题,更理想的方式应该是每处理N条消息就提交一次,N在这时可以作为一个性能调节参数使用。这个例子中的commitSync方法的参数是一个map,从topic partition到一个OffsetAndMetadata的实例。

另一种更理想的办法是基于分区进行分组提交,每当一个分区的消息处理完,就提交一次,然后再处理另一个分区的消息。ConsumerRecord类提供了访问分区的方法,如:

try {
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(Long.MAX_VALUE);
        try {
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<Integer, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<Integer, String> record : partitionRecords) {
                    // handle record here ...
                }
                long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastoffset + 1)));
            }

        } catch (CommitFailedException e) {
            // application specific failure handling
        }
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
}


到目前为止,我们介绍的都是同步提交offset。实际上也可以使用异步的方式提交,异步的好处是带来更高的吞吐量,但麻烦的是提交失败的情况以及后续的处理需要在一个独立的回调线程中处理。如:

try {
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<Integer, String> record : records) {
            // handle record here
        }

        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> arg0, Exception e) {
                // handle commit result here
            }
        });
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
}

Consumer.commitAsync方法异步提交offset,它接收一个OffsetCommitCallback对象用来做提交的回调通知。