第四章,Kafka消费者——从Kafka读取数据
4.1.1 消费者和消费者群组
Kafka消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
~应该要知道。
4.1.2 消费者群组和分区再均衡
消费者通过向被指派为群组协调器的broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
~了解它对工作有帮助。
分配分区是怎样的一个过程?当消费者要加入群组时,它会向群组协调器发送一个Join-Group请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了PartitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。
分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
~分区的分配过程,非常重要。
4.4 轮询
消息轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,开发者只需要使用一组简单的API来处理从分区返回的数据。
~这样告诉我了轮询做了哪些事情。
4.5 消费者的配置
max.partition.fetch.bytes的值必须比broker能够接收的最大消息的字节数(通过max.message.size属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll()方法来避免会话过期和发生分区再均衡,如果单次调用poll()返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把max.partition.fetch.bytes值改小,或者延长会话过期时间。
~这个参数要特别注意。
session.timeout.ms该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s。如果消费者没有在session.timeout.ms指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与heartbeat.interval.ms紧密相关。heartbeat.interval.ms指定了poll()方法向协调器发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms必须比session.timeout.ms小,一般是session.timeout.ms的三分之一。如果session.timeout.ms是3s,那么heartbeat.interval.ms应该是1s。
~需要特别注意的参数。
partition.assignment.strategy我们知道,分区会被分配给群组里的消费者。PartitionAssignor根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka有两个默认的分配策略。
Range该策略会把主题的若干个连续的分区分配给消费者。假设消费者C1和消费者C2同时订阅了主题T1和主题T2,并且每个主题有3个分区。那么消费者C1有可能分配到这两个主题的分区0和分区1,而消费者C2分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。
RoundRobin该策略把主题的所有分区逐个分配给消费者。如果使用RoundRobin策略来给消费者C1和消费者C2分配分区,那么消费者C1将分到主题T1的分区0和分区2以及主题T2的分区1,消费者C2将分配到主题T1的分区1以及主题T2的分区0和分区2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。
~就是两种策略:范围(主题内的分区),轮询(所有的分区)。
4.6 提交和偏移量
那么消费者是如何提交偏移量的呢?消费者往一个叫作_consumer_offset的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
~我认为了解这个很重要。
4.6.2 提交当前偏移量
把auto.commit.offset设为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量最简单也最可靠。这个API会提交由poll()方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。
要记住,commitSync()将会提交由poll()返回的最新偏移量,所以在处理完所有记录后要确保调用了commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。
~需要了解。
4.6.4 同步和异步组合提交
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
~我觉得利用同步和异步组合提交的方式就可以解决问题,并不需要实现一个机制让异步提交在失败的情况下重试。
4.8 从特定偏移量处开始处理记录
通过把偏移量和记录保存到同一个外部系统来实现单次语义可以有很多种方式,不过它们都需要结合使用ConsumerRebalanceListener和seek()方法来确保能够及时保存偏移量,并保证消费者总是能够从正确的位置开始读取消息。
~这是一个可以在工作应用的方法,但是这样做有什么好处呢?也就是说如果利用外部系统(比如数据库)保存偏移量和记录,那么就可以将数据处理和保存偏移量放在同一个事务中,这样就可以保证保存偏移量和记录的原子性。
4.9 如何退出
如果确定要退出循环,需要通过另一个线程调用consumer.wakeup()方法。如果循环运行在主线程里,可以在ShutdownHook里调用该方法。要记住,consumer.wakeup()是消费者唯一一个可以从其他线程里安全调用的方法。调用consumer.wakeup()可以退出poll(),并抛出WakeupException异常,或者如果调用consumer.wakeup()时线程没有等待轮询,那么异常将在下一轮调用poll()时抛出。我们不需要处理WakeupException,因为它只是用于跳出循环的一种方式。不过,在退出线程之前调用consumer.close()是很有必要的,它会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时。
~也即是如何让消费者不再去获取消息,上面所说就是可以可行的方法。