第6章,可靠的数据传递
分区首领是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步的。 •与Zookeeper之间有一个活跃的会话,也就是说,它在过去的6s(可配置)内向Zookeeper发送过心跳。 •在过去的10s内(可配置)从首领那里获取过消息。 •在过去的10s内从首领那里获取过最新的消息。光从首领那里获取消息是不够的,它还必须是几乎零延迟的。
~了解一下有帮助。
如果一个或多个副本在同步和非同步状态之间快速切换,说明集群内部出现了问题,通常是Java不恰当的垃圾回收配置导致的。不恰当的垃圾回收配置会造成几秒钟的停顿,从而让broker与Zookeeper之间断开连接,最后变成不同步的,进而发生状态切换。
~当做一个提醒吧,或许我也会遇到同类的问题。
6.3.2 不完全的首领选举
简而言之,如果我们允许不同步的副本成为首领,那么就要承担丢失数据和出现数据不一致的风险。如果不允许它们成为首领,那么就要接受较低的可用性,因为我们必须等待原先的首领恢复到可用状态。
如果把unclean.leader.election.enable设为true,就是允许不同步的副本成为首领(也就是“不完全的选举”),那么我们将面临丢失消息的风险。如果把这个参数设为false,就要等待原先的首领重新上线,从而降低了可用性。我们经常看到一些对数据质量和数据一致性要求较高的系统会禁用这种不完全的首领选举(把这个参数设为false)。银行系统是这方面最好的例子,大部分银行系统宁愿选择在几分钟甚至几个小时内不处理信用卡支付事务,也不会冒险处理错误的消息。不过在对可用性要求较高的系统里,比如实时点击流分析系统,一般会启用不完全的首领选举。
~如何选择主要看业务需要。
6.3.3 最少同步副本
如果要确保已提交的数据被写入不止一个副本,就需要把最少同步副本数量设置为大一点的值。对于一个包含3个副本的主题,如果min.insync.replicas被设为2,那么至少要存在两个同步副本才能向分区写入数据。
如果3个副本都是同步的,或者其中一个副本变为不可用,都不会有什么问题。不过,如果有两个副本变为不可用,那么broker就会停止接受生产者的请求。尝试发送数据的生产者会收到NotEnoughReplicasException异常。消费者仍然可以继续读取已有的数据。实际上,如果使用这样的配置,那么当只剩下一个同步副本时,它就变成只读了,这是为了避免在发生不完全选举时数据的写入和读取出现非预期的行为。为了从只读状态中恢复,必须让两个不可用分区中的一个重新变为可用的(比如重启broker),并等待它变为同步的。
~这是一个重要的概念,需要了解。
6.4.1 发送确认
acks=all意味着首领在返回确认或错误响应之前,会等待所有同步副本都收到消息。如果和min.insync.replicas参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到消息。这是最保险的做法——生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。可以通过使用异步模式和更大的批次来加快速度,但这样做通常会降低吞吐量。
~我想一般项目都会采取这种方式,这样确保在分区首领再选举时消息不会丢失。当然这样减低了消息的吞吐量,但是可以采取异步模式或更大的批次来加快速度。
6.4.2 配置生产者的重试参数
一般情况下,如果你的目标是不丢失任何消息,那么最好让生产者在遇到可重试错误时能够保持重试。为什么要这样?因为像首领选举或网络连接这类问题都可以在几秒钟之内得到解决,如果让生产者保持重试,你就不需要额外去处理这些问题了。经常会有人问:“为生产者配置多少重试次数比较好?”这个要看你在生产者放弃重试并抛出异常之后想做些什么。如果你想抓住异常并再多重试几次,那么就可以把重试次数设置得多一点,让生产者继续重试;如果你想直接丢弃消息,多次重试造成的延迟已经失去发送消息的意义;如果你想把消息保存到某个地方然后回过头来再继续处理,那就可以停止重试。
~一个好的编程建议。
要注意,重试发送一个已经失败的消息会带来一些风险,如果两个消息都写入成功,会导致消息重复。现实中的很多应用程序在消息里加入唯一标识符,用于检测重复消息,消费者在读取消息时可以对它们进行清理。还要一些应用程序可以做到消息的“幂等”,也就是说,即使出现了重复消息,也不会对处理结果的正确性造成负面影响。例如,消息“这个账号里有110美元”就是幂等的,因为即使多次发送这样的消息,产生的结果都是一样的。不过消息“往这个账号里增加10美元”就不是幂等的。
~一个处理消息重复的办法,其实实际的项目当中也通常是这样做的。
6.5.1 消费者的可靠性配置
auto.offset.reset,这个参数指定了在没有偏移量可提交时(比如消费者第1次启动时)或者请求的偏移量在broker上不存在时,消费者会做些什么。这个参数有两种配置。一种是earliest,如果选择了这种配置,消费者会从分区的开始位置读取数据,不管偏移量是否有效,这样会导致消费者读取大量的重复数据,但可以保证最少的数据丢失。一种是latest,如果选择了这种配置,消费者会从分区的末尾开始读取数据,这样可以减少重复处理消息,但很有可能会错过一些消息。
~好像这两种方式都不太好。但是在没有偏移量可参考下,这也是一个可行的做法。
自动提交的主要缺点是,无法控制重复处理消息(比如消费者在自动提交偏移量之前停止处理消息),而且如果把消息交给另外一个后台线程去处理,自动提交机制可能会在消息还没有处理完毕就提交偏移量。
~也就是说自动提交会出现消息还没有处理完,偏移量就已经提交了。或者消息已经处理了一部分,但是消息的偏移量没有提交。
6.5.2 显式提交偏移量
第一种模式,在遇到可重试错误时,提交最后一个处理成功的偏移量,然后把还没有处理好的消息保存到缓冲区里(这样下一个轮询就不会把它们覆盖掉),调用消费者的pause()方法来确保其他的轮询不会返回数据(不需要担心在重试时缓冲区溢出),在保持轮询的同时尝试重新处理。如果重试成功,或者重试次数达到上限并决定放弃,那么把错误记录下来并丢弃消息,然后调用resume()方法让消费者继续从轮询里获取新数据。
第二种模式,在遇到可重试错误时,把错误写入一个独立的主题,然后继续。一个独立的消费者群组负责从该主题上读取错误消息,并进行重试,或者使用其中的一个消费者同时从该主题上读取错误消息并进行重试,不过在重试时需要暂停该主题。这种模式有点像其他消息系统里的dead-letter-queue。
~两个方法都行。第一种模式中“关于为什么不能停止轮询”,我想主要是要提交心跳,不然broker会认为是此消费者已经停止工作,就会触发分区再分配。至于在第二种模式中“在重试时需要暂停该主题”这一句,不能理解。
有时候处理数据需要很长时间:你可能会从发生阻塞的外部系统获取信息,或者把数据写到外部系统,或者进行一个非常复杂的计算。要记住,暂停轮询的时间不能超过几秒钟。即使不想获取更多的数据,也要保持轮询,这样客户端才能往broker发送心跳。在这种情况下,一种常见的做法是使用一个线程池来处理数据,因为使用多个线程可以进行并行处理,从而加快处理速度。在把数据移交给线程池去处理之后,你就可以暂停消费者,然后保持轮询,但不获取新数据,直到工作线程处理完成。在工作线程处理完成之后,可以让消费者继续获取新数据。因为消费者一直保持轮询,心跳会正常发送,就不会发生再均衡。
~我有一个疑问,暂停消费者轮询完了之后,如何知道数据已经处理完成了呢?要么就是要持有任务的返回的Future,或者是使用另外一个叫什么服务的类,看来还得去查查并发编程那本书了。
6.6.3 在生产环境监控可靠性
首先,Kafka的Java客户端包含了JMX度量指标,这些指标可以用于监控客户端的状态和事件。对于生产者来说,最重要的两个可靠性指标是消息的error-rate和retry-rate(聚合过的)。如果这两个指标上升,说明系统出现了问题。除此以外,还要监控生产者日志——发送消息的错误日志被设为WARN级别,可以在“Got error produce response with correlation id 5689 ontopic-partition [topic-1,3], retrying (two attempts left). Error: …”中找到它们。如果你看到消息剩余的重试次数为0,说明生产者已经没有多余的重试机会。
对于消费者来说,最重要的指标是consumer-lag,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。理想情况下,该指标总是为0,消费者总能读到最新的消息。不过在实际当中,因为poll()方法会返回很多消息,消费者在获取更多数据之前需要花一些时间来处理它们,所以该指标会有些波动。关键是要确保消费者最终会赶上去,而不是越落越远。
~两个非常重要的监控指标。