主题和分区是Kafka中最重要的两个概念,主题相当于一个消息频道,分区则是将一个主题里的消息存放到不同的日志上进行持久化,合理的配置可以提高Kafka的性能和可用性。
在Kafka的bin目录下有很多脚本,其中kafka-topics.sh就是用来操作主题的。
查看主题列表,kafka是配置的chroot
./kafka-topics.sh --zookeeper localhost:2181/kafka --list
查看指定主题,第一行是名字,分区数,副本数和配置参数,第二行添加了Isr,也就是In-Sync-Replicas,与Leader副本保持同步的副本,包括Leader副本。
root@Half-PC:/opt/kafka_2.13-2.6.0/bin# ./kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic testTopic7
Topic: testTopic7 PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: testTopic7 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
创建主题,设置分区数,和副本数。
./kafka-topics.sh --zookeeper localhost:2181/kafka --describe --create --topic testTopic8 --partitions 3 --replication-factor 3
Kafka提供了代码方式新增主题,spring-kafka在这个基础上又进行了封装,只要声明NewTopic类型的Bean就可以,在1.0.0版本之后的Kafka如果分区数不一致,会自动增加分区。
@Configuration
public class TopicBean {
@Bean
public NewTopic testTopic1() {
return TopicBuilder.name("testTopic1")
.partitions(3)
.replicas(3)
.build();
}
@Bean
public NewTopic testTopic6() {
return TopicBuilder.name("testTopic6")
.partitions(5)
.build();
}
还可以修改主题的分区数,注意,分区数只能增加,不能减少。
./kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic testTopic7 --partitions 1
还支持为单独一个主题配置参数,不过后续这个功能会移到kafka-configs.sh脚本中,所以给了一个警告信息,这里的配置会覆盖broker的配置。
./kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic testTopic7 --config max.message.bytes=20000
./kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic testTopic7 --delete-config max.message.bytes
可以删除主题,不过需要在config/server.properties中配置delete.topic.enable,否则删除是无法生效的,从1.0.0版本开始,这个配置是默认打开的。其实也可以通过手动删除zookeeper中的数据来删除主题,先删除/config/topics和/brokers/topics下对应主题名目录,再删除Kafka日志目录下该主题对应的文件,前者是删除元数据,后者是删除持久化的消息。
./kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic testTopic7
如果只是想清空主题里的消息,也可以换个思路,单独设置该主题消息持久化的时间,修改到很短,等消息清空后再重置配置,需要等待几分钟可能才会删除掉。
// 过期时间修改为1毫秒
./kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name testTopic1 --alter --add-config retention.ms=1
// 查看消息最大偏移量
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic1 --time -1
// 查看消息起始偏移量
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic1 --time -2
// 删除配置
./kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name testTopic1 --alter --delete-config retention.ms
副本数是Kafka中用来保证高可用的一个东西,可以理解为主从节点,而且因为Kafka中有分区的概念,所以副本也是对应每个分区的,比如说副本因子为3,则表示该分区会有1个Leader副本供写和读,2个Follwer副本作为备份,注意,Kafka副本之所以副本,就是只有主副本可以读写,只有当主副本宕机了,其它副本才能有一个成为新的主副本进行读写,否则就只是一个备份。
既然是副本,那自然是均匀的分布到不同的节点上比较可靠,创建的时候Kafka会自动尽量均匀的把不同的分区的Leader副本分配到不同的节点上,也会尽量把每个分区的Follower副本和Leader副本分开,也提供了下面的脚本修改副本数,需要自己配置一个json格式的文件。
./kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --execute --reassignment-json-file r.json
{
"version":1,
"partitions":[
{
"topic":"testTopic1",
"partition":0,
"replicas":[0,1,2],
"log_dirs":["any","any","any"]
}
]
}
针对Kafka的配置也会对性能造成不同的影响,比如同一个主题下,不同的分区其实是可以看作并行读写的,适当的提高分区数是可以增大吞吐量的,但是分区数过大则可能会造成吞吐量的下降,这个关系到节点配置。
所以Kafka提供了一个测试脚本,--num-records消息数量,--record-size消息大小,--throughput限流,小于0是不限流,--producer-props启动配置,结果集中records/sec是每秒发送的消息数,ms avg latency消息处理平均耗时,后面的50th、95th、99th和99.9th则表示50%、95%、99%和99.9%的消息处理耗时。
root@Half-PC:/opt/kafka_2.13-2.6.0/bin# ./kafka-producer-perf-test.sh --topic testTpoic1 --num-records 100 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1
100 records sent, 183.823529 records/sec (0.18 MB/sec), 32.57 ms avg latency, 522.00 ms max latency, 27 ms 50th, 31 ms 95th, 522 ms 99th, 522 ms 99.9th.
root@Half-PC:/opt/kafka_2.13-2.6.0/bin# ./kafka-producer-perf-test.sh --topic testTpoic1 --num-records 10000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1
10000 records sent, 7936.507937 records/sec (7.75 MB/sec), 401.31 ms avg latency, 687.00 ms max latency, 398 ms 50th, 664 ms 95th, 684 ms 99th, 687 ms 99.9th.
root@Half-PC:/opt/kafka_2.13-2.6.0/bin# ./kafka-producer-perf-test.sh --topic testTpoic1 --num-records 100000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1
100000 records sent, 22461.814915 records/sec (21.94 MB/sec), 983.68 ms avg latency, 1227.00 ms max latency, 1086 ms 50th, 1177 ms 95th, 1200 ms 99th, 1224 ms 99.9th.
因为Kafka的分区其实是对持久化日志的追加写,所以分区上限其实在于系统对文件打开数量的限制,比如我本机就是4096的限制。
root@Half-PC:/opt/kafka_2.13-2.6.0/bin# jps
11985 Kafka
98 QuorumPeerMain
15129 ProducerPerformance
18554 Jps
root@Half-PC:/opt/kafka_2.13-2.6.0/bin# ls /proc/11985/fd | wc -l
231
root@Half-PC:/opt/kafka_2.13-2.6.0/bin#
root@Half-PC:/opt/kafka_2.13-2.6.0/bin# ulimit -Hn
4096