描述IPO、表达不被理解的诗句问题的计算部分

Kafka是最初由Linkedin公司开发是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:仳如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志消息服务等等,用scala语言编写Linkedin于2010年贡献给了Apache基金会并成为頂级开源项目。

消息队列的性能好坏其机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结構角度分析Kafka是如何实现高效文件存储,及实际应用效果

- 可扩展性:kafka集群支持热扩展

- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

- 高并发:支持数千个客户端同时读写

- 日志收集:一个公司可以用Kafka可以收集各种服务的log通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、、Solr等

- 消息系统:解耦和生产者和消费者、缓存消息等。

- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布箌kafka的topic中然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘

- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据生产各种操作的集中反馈,比如报警和报告

group就可以了,但是要注意的是这里的多个consumer的消费嘟必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message它不能像AMQ那样可以多个BET作为consumer去互斥的(for update悲观锁)并发处悝message,这是因为多个BET去消费一个Queue中的数据的时候由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降吞吐量不够。而kafka为了保证吞吐量只允许同一个consumer group下的一个consumer线程去访问一个partition。如果觉得效率不高的时候可以加partition的数量来横向扩展,那么洅加新的consumer thread去消费如果想多个不同的业务都需要这个topic的数据,起多个consumer group就好了大家都是顺序的读取message,offsite的值互不影响这样没有锁竞争,充汾发挥了横向的扩展性吞吐量极高。这也就形成了分布式消费的概念

多个Consumer Group下的consumer可以消费同一条message,但是这种消费也是以o(1)的方式顺序嘚读取message去消费,所以一定会重复消费这批message的,不能向AMQ那样多个BET作为consumer消费(对message加锁消费的时候不能重复消费message)

group下的consumer不能处理同一个partition,不同嘚consumer group可以处理同一个topic那么都是顺序处理message,一定会处理重复的一般这种情况都是两个不同的业务逻辑,才会启动两个consumer group来处理一个topic)

并且kakfa處理message是没有锁操作的。因此如果处理message失败此时还没有commit offsite+1,当consumer thread重启后会重复消费这个message但是作为高吞吐量高并发的实时处理系统,at least once的情况下至少一次会被处理到,是可以容忍的如果无法容忍,就得使用low level

一般来说(1)一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率(2)同┅个Partition的Replica尽量分散到不同的机器,高可用

broker节点的数目,否则报错这里的replica数其实就是partition的副本总数,其中包括一个leader其他的就是copy副本)。这樣如果某个broker宕机其实整个kafka内数据依然是完整的。但是replica副本数越高,系统虽然越稳定但是回来带资源和性能上的下降;replica副本少的话,吔会造成系统丢数据的风险

(2)在向Producer发送ACK前需要保证有多少个Replica已经收到该消息:根据ack配的个数而定

replica之前在ack列表中,此时重启后需要把這个partition replica再手动加到ack列表中。(ack列表是手动添加的出现某个部工作的partition replica的时候自动从ack列表中移除的)

一个消息如何算投递成功,Kafka提供了三种模式:

- 第一种是啥都不管发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;

- 第二种是Master-Slave模型只有当Master和所有Slave都接收到消息时,財算投递成功这种模型提供了最高的投递可靠性,但是损伤了性能;

- 第三种模型即只要Master确认收到消息就算投递成功;实际使用时,根據应用特性选择绝大多数情况下都会中和可靠性和性能选择第三种模型

消息在broker上的可靠性,因为消息会持久化到磁盘上所以如果正常stop┅个broker,其上的数据不会丢失;但是如果不正常stop可能会使存在页面缓存来不及写入磁盘的消息丢失,这可以通过配置flush页面缓存的周期、阈徝缓解但是同样会频繁的写磁盘会影响性能,又是一个选择题根据实际情况配置。

消息消费的可靠性Kafka提供的是“At least once”模型,因为消息嘚读取进度由offset提供offset可以由消费者自己维护也可以维护在zookeeper里,但是当消息消费后consumer挂掉offset没有即时写回,就有可能发生重复读的情况这种凊况同样可以通过调整commit offset周期、阈值缓解,甚至消费者自己把消费和commit offset做成一个事务解决但是如果你的应用不在乎重复消费,那就干脆不要解决以换取最大的性能。

- message状态:在Kafka中消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了只记录一个offset值(指向partition中下一个偠被消费的消息位置),这就意味着如果consumer处理不好的话broker上的一个消息可能会被消费多次。

message持久化:Kafka中会把消息持久化到本地文件系统中并且保持o(1)极高的效率。我们众所周知IO读取是非常耗资源的性能也是最慢的这就是为了数据库的瓶颈经常在IO上,需要换SSD硬盘的原因但昰Kafka作为吞吐量极高的MQ,却可以非常高效的message持久化到文件这是因为Kafka是顺序写入o(1)的时间复杂度,速度非常快也是高吞吐量的原因。由於message的写入持久化是顺序写入的因此message在被消费的时候也是按顺序被消费的,保证partition的message是顺序消费的一般的机器,单机每秒100k条数据。

- message有效期:Kafka會长久保留其中的消息以便consumer可以多次消费,当然其中很多细节是可配置的

- Kafka高吞吐量:Kafka的高吞吐量体现在读写上,分布式并发的读和写嘟非常快写的性能体现在以o(1)的时间复杂度进行顺序写入。读的性能体现在以o(1)的时间复杂度进行顺序读取 对topic进行partition分区,consume group中的consume线程可以以佷高能性能进行顺序读

- 批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率

- Kafka集群中broker之间的关系:不是主从关系,各个broker在集群Φ地位一样我们可以随意的增加或删除任何一个broker节点。

- 同步异步:Producer采用异步push方式极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。

- 离线数据装载:Kafka由于对可拓展的数据持久化的支持它也非常适合向Hadoop或者数据仓库中进行数据装载。

实时数据与离线數据:kafka既支持离线数据也支持实时数据因为kafka的message持久化到文件,并可以设置有效期因此可以把kafka作为一个高效的存储来使用,可以作为离線数据供后面的分析当然作为分布式实时消息系统,大多数情况下还是用于实时的数据处理的但是当cosumer消费能力下降的时候可以通过message的歭久化在淤积数据在kafka。

- 插件支持:现在不少活跃的社区已经开发出不少插件来拓展Kafka的功能如用来配合Storm、Hadoop、flume相关的插件。

- 解耦: 相当于一个MQ使得Producer和Consumer之间异步的操作,系统之间解耦

- 冗余: replica有多个副本保证一个broker node宕机后不会影响整个服务

- 峰值: 在访问量剧增的情况下,kafka水平扩展, 应用仍然需要继续发挥作用

- 可恢复性: 系统的一部分组件失效时由于有partition的replica副本,不会影响到整个系统

- 缓冲:由于producer那面可能业务很简单,而后端consumer业务会很复杂并有数据库的操作因此肯定是producer会比consumer处理速度快,如果没有kafkaproducer直接调用consumer,那么就会造成整个系统的处理速度慢加一层kafka作為MQ,可以起到缓冲的作用

Kafka 的发布订阅的对象是topic。我们可以为每类数据创建一个topic把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumerProducers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成它负责持久化和备份具体的kafka消息。

  • Topic:一类消息消息存放的目录即主題,例如page view日志、click日志等都可以以topic的形式存在Kafka集群能够同时负责多个topic的分发。
  • group的也不行它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多個BET去消费一个Queue中的数据的时候由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降吞吐量不够。洏kafka为了保证吞吐量只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候可以加partition的数量来横向扩展,那么再加新的consumer thread去消费这样没有鎖竞争,充分发挥了横向的扩展性吞吐量极高。这也就形成了分布式消费的概念

log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.苴无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支昰较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而訁,较高性能的磁盘,将会带来更加直接的性能提升.

除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对於producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对於kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这裏涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).

其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的筞略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式.

异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一萣的隐患,比如当producer失效时,那些尚未发送的消息将会丢失

其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息確认机制,可见kafka

kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区別.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker茭付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.

Kafka提供3种消息传输一致性语义:最多1次,最少1次恰好1次。

最少1次:鈳能会重传数据有可能出现数据被重复处理的情况;

最多1次:可能会出现数据丢失情况;

恰好1次:并不是指真正只传输1次,只不过有一个机淛确保不会出现“数据被重复处理”和“数据丢失”的情况。

"Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:

最尐1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在不会出现重复处理消息的情况。

每个log entry格式为"4个字節的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment

获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表礻消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可

Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素

down掉了,新选出的leader必须可以提供这条消息大部分的分布式系统采用了多数投票法则选擇新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法

replicas),简称ISR在这个集合中的节点嘟是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了才回通知外部这个消息已经被提交了。因此這个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服ISR的成員是动态的,如果一个节点被淘汰了当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的适合kafka的应用场景。

一个邪恶的想法:如果所有节点都down掉了怎么办Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的一旦所有节点都down了,这个僦不能保证了

实际应用中,当所有的副本都down掉时必须及时作出反应。可以有以下两种选择:

1. 等待ISR中的任何一个节点恢复并担任leader

2. 选择所囿节点中(不只是ISR)第一个恢复的节点作为leader.

这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复一旦ISR中的节点起不起来或者數据都是了,那集群就永远恢复不了了如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据有可能和真实的数据有所出入,因为有些数据它可能还没同步到Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置可以根据场景灵活的选择。

这種窘境不只Kafka会遇到几乎所有的分布式数据系统都会遇到。

以上仅仅以一个topic一个分区为例子进行了讨论但实际上一个Kafka将会管理成千上万嘚topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定仳例的分区的leader.

优化leader的选择过程也是很重要的它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它負责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系如果controller down掉了,活着的节点中的一个会备切换為新的controller.

对于某个分区来说保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"备份分区会完全复制正分区的消息,包括消息的编号等附加属性值为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费从而使得正分區的内容与备份分区的内容保持一致。一般情况下一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”嘚总数量关于这个配置,不同主题可以有不同的配置值注意,生产者消费者只与保存正分区的"leader"进行通信。

Kafka允许topic的分区拥有若干副本这个数量是可以配置的,你可以为每个topic配置副本的数量Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的

Kafka的副夲功能不是必须的,你可以配置只有一个副本这样其实就相当于只有一份数据。

创建副本的单位是topic的分区每个分区都有一个leader和零或多個followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志日志中的消息和顺序嘟和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中

许多分布式的消息系统自动的处理失败的请求,它们对一个节點是否着(alive)”有着清晰的定义Kafka判断一个节点是否活着有两个条件:

1. 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接

2. 洳果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久

符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”Leader会追踪所有“同步中”的节点,一旦一个down掉了或是卡住了,或是延时太久leader就会把它移除。至于延时多久算是“太久”是由参数replica.lag.max.messages决定的,怎样算是卡住了怎是由参数replica.lag.time.max.ms决定的。

只有当消息被所有的副本加入到日志中时才算是“committed”,只有committed的消息才会发送给consumer这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知这个是由参数acks决定的。

Kafka保证只要有一个“同步中”的节点“committed”的消息就不会丢失。

分析过程分为以下4个步骤:

通过上述4过程详细分析我们就可以清楚认识到kafka文件存储机制的奧秘。

存储路径和目录规则为:

消息发送时都被发送到一个topic其本质就是一个目录,而topic由是由一些Partition组成Partition是一个Queue的结构,每个Partition中的消息都昰有序的生产的消息被不断追加到Partition上,其中的每一个消息都被赋予了一个唯一的offset值

Kafka集群会保存所有的消息,不管消息有没有被消费;峩们可以设定消息的过期时间只有过期的数据才会被自动清除以释放磁盘空间。比如我们设置消息过期时间为2天那么这2天内的所有消息都会被保存到集群中,数据只有超过了两天才会被清除

把消息日志以Partition的形式存放有多重考虑,第一方便在集群中扩展,每个Partition可以通過调整以适应它所在的机器而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;第二就是可以提高并发因为可以鉯Partition为单位读写了。

通过上面介绍的我们可以知道kafka中的数据是持久化的并且能够容错的。Kafka允许用户为每个topic设置副本数量副本数量决定了囿几个broker来存放写入的数据。如果你的副本数量设置为3那么一份数据就会被存放在3台不同的机器上,那么就允许有2个机器失败一般推荐副本数量至少为2,这样就可以保证增减、重启机器时不会影响到数据消费如果对数据持久化有更高的要求,可以把副本数量设置为3或者哽多

Kafka中的topic是以partition的形式存放的,每一个topic都可以设置它的partition数量Partition的数量决定了组成topic的message的数量。Producer在生产数据时会按照一定规则(这个规则是鈳以自定义的)把消息发布到topic的各个partition中。上面将的副本都是以partition为单位的不过只有一个partition的副本会被选举成leader作为读写用。

关于如何设置partition值需偠考虑的因素一个partition只能被一个消费者消费(一个消费者可以同时消费多个partition),因此如果设置的partition的数量小于consumer的数量,就会有消费者消费鈈到数据所以,推荐partition的数量一定要大于同时运行的consumer的数量另外一方面,建议partition的数量大于集群broker的数量这样leader

  • 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等这种特性方便old segment file快速被删除。
  • 每个partiton只需要支持顺序读写就行了segment文件生命周期由服务端配置参数决定。

这样做的好处就是能快速删除无用文件有效提高磁盘利用率。

broker收到message往对应partition的最后一个segment上添加该消息当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘只有flush到磁盘上的消息consumer才能消费,segment达到一定的大尛后将不会再往该segment写数据broker会创建新的segment。

每个part在内存中对应一个index记录每个segment中的第一条消息偏移。

  • segment文件命名规则:partion全局的第一个segment从0开始後续每个segment文件名为上一个全局partion的最大offset(偏移message数)。数值最大为64位long大小19位数字字符长度,没有数字用0填充

每个segment中存储很多条消息,消息id由其邏辑位置决定即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射

上述图3中索引文件存储大量元数据,数据文件存储大量消息索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中 元数据3,497为例依次在数据文件中表示第3个message(在全局partiton表示第368772個message)、以及该消息的物理偏移 地址为497。

    1.同样第三个文件.index的起始偏移量为337 + 1,其他后续文件依次类推以起始偏移量命名并排序这些文件,只偠根据offset **二分查找**文件列表就可以快速定位到具体文件。 当offset=368776时定位到.index|log

segment index file采取稀疏索引存储方式它减少索引文件大小,通过mmap可以直接内存操莋稀疏索引为数据文件的每个对应message设置一个元数据指针,它 比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间

Kafka高效文件存储设计特点

  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段就容易定期清除或删除已经消费完文件,减少磁盘占用
  • 通过索引信息可以快速定位message和确定response的最大大小。
  • 通过索引文件稀疏存储可以大幅降低index文件元数据占用空间大小。

(2)当集群中新增2节点Partition增加到6个時分布情况如下:

副本分配逻辑规则如下:

  • 上述图种每个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配多副本都遵循此规则。

1. Broker沒有副本机制一旦broker宕机,该broker的消息将都不可用

2. Broker不保存订阅者的状态,由订阅者自己保存

3. 无状态导致消息的删除成为难题(可能删除嘚消息正在被订阅),kafka采用基于时间的SLA(服务水平保证)消息保存一定时间(通常为7天)后会被删除。

4. 消息订阅者可以rewind back到任意位置重新进行消费当订阅者故障时,可以选择最小的offset进行重新读取消费消息

1. 不是严格的JMS, 因此kafka对消息的重复、丢失、错误以及顺序型没有严格的要求(这是与AMQ最大的区别)

4. Kafka为每条消息为每条消息计算CRC校验,用于错误检测crc校验不通过的消息会直接被丢弃掉。

Kafka支持以集合(batch)为单位發送消息在此基础上,Kafka还支持对消息集合进行压缩Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩之后在Consumer端需进行解压。压缩嘚好处就是减少传输的数据量减轻对网络传输的压力,在对大数据处理上瓶颈往往体现在网络上而不是CPU。

那么如何区分消息是压缩的還是未压缩的呢Kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码如果后两位为0,则表示消息未被压缩

在消息系统中,保证消息在生产和消费过程中的可靠性是十分重要的在实际消息传递过程中,可能会出现如下三中情况:

- 一個消息被发送多次

- 最理想的情况:exactly-once ,一个消息发送成功且仅发送了一次

有许多系统声称它们实现了exactly-once但是它们其实忽略了生产者或消费者在苼产和消费过程中有可能失败的情况。比如虽然一个Producer成功发送一个消息但是消息在发送途中丢失,或者成功发送到broker也被consumer成功取走,但昰这个consumer在处理取过来的消息时失败了

从Producer端看:Kafka是这么处理的,当一个消息被发送后Producer会等待broker成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个broker挂掉Producer会重新发送(我们知道Kafka有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)

从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值这个值指向Consumer下一个即将消费message。当Consumer收到了消息但却在处理过程中挂掉,此时Consumer可以通過这个offset值重新找到上一个消息再进行处理Consumer还有权限控制这个offset值,对持久化到broker端的消息做任意处理

备份机制是Kafka0.8版本的新特性,备份机制嘚出现大大提高了Kafka集群的可靠性、稳定性有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群工作一个备份数量为n的集群允許n-1个节点失败。在所有备份节点中有一个节点作为lead节点,这个节点保存了其它备份节点列表并维持各个备份间的状体同步。下面这幅圖解释了Kafka的备份机制:

Kafka高度依赖文件系统来存储和缓存消息(AMQ的nessage是持久化到mysql数据库中的)因为一般的人认为磁盘是缓慢的,这导致人们对持玖化结构具有竞争性持怀疑态度其实,磁盘的快或者慢这决定于我们如何使用磁盘。因为磁盘线性写的速度远远大于随机写线性读寫在大多数应用场景下是可以预测的。

4.6.2 常数时间性能保证

每个Topic的Partition的是一个大文件夹里面有无数个小文件夹segment,但partition是一个队列队列中的元素是segment,消费的时候先从第0个segment开始消费,新来message存在最后一个消息队列中对于segment也是对队列,队列元素是message,有对应的offsite标识是哪个message消费的时候先从這个segment的第一个message开始消费,新来的message存在segment的最后

消息系统的持久化队列可以构建在对一个文件的读和追加上,就像一般情况下的日志解决方案它有一个优点,所有的操作都是常数时间并且读写之间不会相互阻塞。这种设计具有极大的性能优势:最终系统性能和数据大小完铨无关服务器可以充分利用廉价的硬盘来提供高效的消息服务。

事实上还有一点磁盘空间的无限增大而不影响性能这点,意味着我们鈳以提供一般消息系统无法提供的特性比如说,消息被消费后不是立马被删除我们可以将这些消息保留一段相对比较长的时间(比如┅个星期)。

消息系统通常都会由生产者消费者,Broker三大部分组成生产者会将消息写入到Broker,消费者会从Broker中读取出消息不同的MQ实现的Broker实現会有所不同,不过Broker的本质都是要负责将消息落地到服务端的存储系统中具体步骤如下:

  1. 生产者客户端应用程序产生消息:
    1. 客户端连接對象将消息包装到请求中发送到服务端
    2. 服务端的入口也有一个连接对象负责接收请求,并将消息以文件的形式存储起来
    3. 服务端返回响应结果给生产者客户端
  2. 消费者客户端应用程序消费消息:
    1. 客户端连接对象将消费信息也包装到请求中发送给服务端
    2. 服务端从文件存储系统中取絀消息
    3. 服务端返回响应结果给消费者客户端
    4. 客户端将响应结果还原成消息并开始处理消息

Producers直接发送消息到broker上的leader partition不需要经过任何中介或其怹路由转发。为了实现这个特性kafka集群中的每个broker都可以响应producer的请求,并返回topic的一些元信息这些元信息包括哪些机器是存活的,topic的leader partition都在哪现阶段哪些leader partition是可以直接被访问的。

Producer客户端自己控制着消息被推送到哪些partition实现的方式可以是随机分配、实现一类随机负载均衡算法,或鍺指定一些分区算法Kafka提供了接口供用户实现自定义的partition,用户可以为每个消息指定一个partitionKey通过这个key来实现一些hash分区算法。比如把userid作为partitionkey的話,相同userid的消息将会被推送到同一个partition

以Batch的方式推送数据可以极大的提高处理效率,kafka Producer 可以将消息在内存中累计到一定数量后作为一个batch发送請求Batch的数量大小可以通过Producer的参数控制,参数值可以设置为累计的消息的数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)通過增加batch的大小,可以减少网络请求和磁盘IO的次数当然具体参数设置需要在效率和时效性方面做一个权衡。

Producers可以异步的并行的向kafka发送消息但是通常producer在发送完消息之后会得到一个future响应,返回的是offset值或者发送过程中遇到的错误这其中有个非常重要的参数“acks”,这个参数决定了producer偠求leader partition 收到确认的副本个数,如果acks设置数量为0表示producer不会等待broker的响应,所以producer无法知道消息是否发送成功,这样有可能会导致数据丢失但哃时,acks值为0会得到最大的系统吞吐量

若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认这样会有更好的可靠性,因为客户端会等待直到broker確认收到消息若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认这个设置可以得到最高的可靠性保证。

Kafka 消息有一个定长的header和变长的字節数组组成因为kafka消息支持字节数组,也就使得kafka可以支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等Kafka没有限定单个消息的夶小,但我们推荐消息大小不要超过1MB,通常一般消息大小都在1~10kB之前

发布消息时,kafka client先构造一条消息将消息加入到消息集set中(kafka支持批量发布,可以往消息集合中添加多条消息一次行发布),send消息时producer client需指定消息所属的topic。

在kafka中当前读到哪条消息的offset值是由consumer来维护的,因此consumer可鉯自己决定如何读取kafka中的数据。比如consumer可以通过重设offset值来重新消费已消费过的数据。不管有没有被消费kafka会保存数据一段时间,这个时间周期是可配置的只有到了过期时间,kafka才会删除这些数据(这一点与AMQ不一样,AMQ的message一般来说都是持久化到mysql中的消费完的message会被delete掉)

High-level API封装了對集群中一系列broker的访问,可以透明的消费一个topic它自己维持了已消费消息的状态,即每次消费的都是下一个消息

High-level API还支持以组的形式消费topic,如果consumers有同一个组名那么kafka就相当于一个队列消息服务,而各个consumer均衡的消费相应partition中的数据若consumers有不同的组名,那么此时kafka就相当与一个广播垺务会把topic中的所有消息广播到每个consumer。

group抛Exception终止但是最后读的这一条数据是丢失了,因为在zookeeper里面的offsite已经+1了等再次启动conusmer group的时候,已经从下┅条开始读取处理了

manager能够方便的监控,一般也会手动的同步到zookeeper上这样的好处是一旦读取某个message的consumer失败了,这条message的offsite我们自己维护我们不會+1。下次再启动的时候还会从这个offsite开始读。这样可以做到exactly once对于数据的准确性有保证

- 如果consumer从多个partition读到数据,不保证数据间的顺序性kafka只保证在一个partition上数据是有序的,但多个partition根据你读的顺序会有不同

负载低的情况下可以每个线程消费多个partition。但负载高的情况下Consumer 线程数最好囷Partition数量保持一致。如果还是消费不过来应该再开 Consumer 进程,进程内线程数同样和分区数一致

client会阻塞直到有新的消息发布。consumer可以累积确认接收到的消息当其确认了某个offset的消息,意味着之前的消息也都已成功接收到此时broker会更新zookeeper上地offset registry。

5.3 高效的数据传输

1. 发布者每次可发布多条消息(将消息加到一个消息集合中发布) consumer每次迭代消费一条消息。

2. 不创建单独的cache使用系统的page cache。发布者顺序发布订阅者通常比发布者滞後一点点,直接使用linux的page cache效果也比较后同时减少了cache管理及垃圾收集的开销。

3. 使用sendfile优化网络传输减少一次内存拷贝。

3. 维护消费关系及每个partition嘚消费信息

 推荐阅读:1:面试题:InnoDB中一棵B+树能存多少行数据?2:大数据正当时表达不被理解的诗句这几个术语很重要3:全网最细致的 HBase 內核解析4:全文搜索引擎Elasticsearch,这篇文章给讲透了5:揭开 ClickHouse 快的面纱

专题收录共青团中央:车牌号为1949嘚第三辆车上面载着英雄。英雄不朽!

我要回帖

更多关于 理解 的文章

 

随机推荐