Kafka
简介
Kafka是由Linkedin公司开发的分布式、支持分区(partition)、多副本(replica),基于zookeeper的分布式消息系统。
最初是被设计用来解决LinkedIn公司内部海量日志传输等问题。
Kafka使用Scala语言编写,于2011年开源并进入Apache孵化器,2012年10月正式毕业,现在为Apache顶级项目;
特性
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
架构
场景应用
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm
- 事件源
设计思想
- Consumergroup:各个consumer可以组成一个组,每个消息只能被组中的一个consumer消费,如果一个消息可以被多个consumer消费的话,那么这些consumer必须在不同的组。
- 消息状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。
- 消息持久化:Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。
- 消息有效期:Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。
- 批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率。
- push-and-pull :Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
- Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。
- 负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。
- 同步异步:Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
- 分区机制partition:Kafka的broker端支持消息分区,Producer可以决定把消息发到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。
- 离线数据装载:Kafka由于对可拓展的数据持久化的支持,它也非常适合向Hadoop或者数据仓库中进行数据装载。
- 插件支持:现在不少活跃的社区已经开发出不少插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。
通信模式
kafka支持两种通信模式:
点对点模式:是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动去拉取消息进行消费。
发布订阅模式:该模式可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者
基础架构
- Producer:消息的产生者,是消息的入口。
- Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
- Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
- Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
- Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
- Message:每一条发送的消息主体。
- Consumer:消费者,即消息的消费方,是消息的出口。
- Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
- Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
分区(Partition)
Topic划分成多个分区(Partition),生产者生产的每条消息只会被发送到其中一个分区;
分区 (Partition) 都是一个有序的、不可变的数据序列,消息数据被不断的添加到序列的尾部;
分区中的每一条消息数据都被赋予了一个连续的数字ID,即偏移量 (offset) ,用于唯一标识分区中的每条消息数据;
Kafka 分区可以有如下策略:
轮询策略(Round-robin): 即顺序分配策略。如果一个Topic有3个分区,则第1条消息被发送到分区0,第2条被发送到分区1,第3条被发送到分区2,以此类推;
随机策略(Randomness): 是将消息随机地放置到任意一个分区上;
按消息键保序策略: Kafka允许为每条消息定义消息键,简称为Key。Key可以是一个有着明确业务含义的字符串,如客户代码、部门编号或是业务ID等;
基于地理位置的分区策略:
自定义分区策略:
1 2 3 4 5 6 7 8 9 10 11
//随机策略 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return ThreadLocalRandom.current().nextInt(partitions.size()); //消息键保序 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return Math.abs(key.hashCode()) % partitions.size(); //基于地理位置的分区策略 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return partitions.stream().filter(p -> isChina(p.leader().host())).map(PartitionInfo::partition).findAny().get();
分区删除策略:
- 基于时间:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
消息保序
kafka只能保证分区内有序,无法保证分区间有序,所以消费时,数据是相对有序的。
如果将Topic设置成单分区,该Topic的所有的消息都只在一个分区内读写,保证全局的顺序性,但将丧失Kafka多分区带来的高吞吐量和负载均衡的性能优势。
多分区消息保序的方法是按消息键保序策略,根据业务提取出需要保序的消息的逻辑主体,并建立消息标志位ID,对标志位设定专门的分区策略,
保证同一标志位的所有消息都发送到同一分区,既可以保证分区内的消息顺序,也可以享受到多分区带来的搞吞吐量。
消息重试只是简单将消息重新发送到原来的分区,不会重新选择分区。
消息路由策略
在通过API方式发布消息时,生产者是以Record为消息进行发布的。
Record中包含key与value,value才是消息本身,而key用于路由消息所要存放Partition。
消息要写入到哪个Partition并不是随机的,而是由路由策略决定。
指定Partition,直接写入指定Partition。
没有指定Partition但指定了key,则通过对key的hash值与Partition数量取模,结果就是要选出的Partition索引。
Partition和key都未指定,则使用轮询算法选出一个Partition。
增加分区时,Partition内的消息不会重新进行分配,随着数据继续写入,新分区才会参与再平衡。
Producer流程
消息生产过程如下:
Producer先通过分区策略确定数据录入的partition,再从Zookeeper中找到Partition的Leader
Producer将消息发送给分区的Leader。
Leader将消息接入本地的Log,并通知ISR(In-sync Replicas,副本同步列表)的Followers。
ISR中的Followers从Leader中pull消息,写入本地Log后向Leader发送ACK(消息发送确认机制)。
Leader收到所有ISR中的Followers的ACK后,增加HW(high watermark,最后commit 的offset)并向Producer发送ACK,表示消息写入成功。
消费者组
消费者是以consumer group消费者组的方式工作;
由一个或者多个消费者组成一个组,共同消费一个topic;
每个partition在同一时间只能由group中的一个消费者读取,但是多个consumer group可以同时消费这个partition;
consumer采用pull(拉)模式从broker中读取数据;
ISR机制
Kafka采用ISR
(In-Sync Replicas)机制来保证多副本间数据的一致性,不同于Paxos
, Zab
等一致性算法,ISR需要较少的副本就可以满足可用性要求;
ISR具体做法如下:将所有次级副本数据分到两个集合,其中一个被称为ISR集合,这个集合备份数据的特点是即时和主副本数据保持一致,而另外一个集合的备份数据允许其消息队列落后于主副本的数据
AR(Assigned Repllicas)一个partition的所有副本(就是replica,不区分leader或follower)
ISR(In-Sync Replicas)能够和 leader 保持同步的 follower + leader本身 组成的集合。
OSR(Out-Sync Relipcas)不能和 leader 保持同步的 follower 集合
公式:AR = ISR + OSR
LEO(last end offset):当前replica存的最大的offset的下一个值
HW(high watermark):小于 HW 值的offset所对应的消息被认为是“已提交”或“已备份”的消息,才对消费者可见。
提交offset
在新消费者客户端中,消费位移是存储在Kafka内部的主题 __consumer_offsets 中。把消费位移存储起来(持久化)的动作称为 “提交” ,消费者在消费完消息之后需要执行消费位移的提交。
KafkaConsumer 类提供了 partition(TopicPartition) 和 committed(TopicPartition) 两个方法来分别获取上面所说的 postion 和 committed offset 的值。这两个方法的定义如下所示:
public long position(TopicPartition partition)
public OffsetAndMetadata committed(TopicPartition partition)
提交方式 :
自动 提交:
enable.auto.commit = true
默认的配置下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
手动提交:
同步提交:
异步提交
Kafka性能优化
使用最佳写方式;
使用
SendFile
直接将文件数据从内核页缓存复制到网卡缓冲区,减少内存拷贝次数;