这里是文章模块栏目内容页
消息队列处理消息不丢失不重复和有序性的常用思路

如何保证消息不丢失

对于常见的消息队列,一般只要配置得当,是不会丢失消息的。先看看这个关系图:

一共有生产消息、存储消息、消费消息三个阶段,下面就以这三个阶段入手看看如何保证消息不丢失。

生产消息

生产者发送消息至Borker,需要对Broket的响应进行处理,不管是同步发送消息还是异步发送消息,都需要做好try-catch对Broket的响应做出妥善的处理,如果Broket返回写入消息失败等其他错误信息,生产者需要重新发送消息至Broket,如果多次发送失败则需要报警并且记录日志。这样可以保证消息在生产阶段不会丢失。

存储消息

存储消息阶段需要等待消息写入磁盘(消息刷盘)之后再给生产者做出响应。因为如果消息仅仅写入了内存就给生产者响应的话,这个时候如果断电导致机器停了,那么消息也就没了,但是生产者却以为消息已经存储成功了。

如果Borket是集群部署,有多副本机制,那么消息不仅仅要写入当前Broket,还需要写入副本机器中,那么必须等待消息写入两台机器之后再给生产者做出相应,这样就可以保证消息在存储阶段不丢失了。

如果两台机器都挂了呢?那就加!三台!四台!五台!如果整个机房地震了,全挂呢咋办!先关心一下人有没有事好伐!这种情况,异地部署吧!没招!不要杠精说整个地球地震了咋办!


消费消息

消费者拿到消息之后,等他真正执行完逻辑之后,也就是处理完消息之后,再给Borket做出相应,如果在消费者刚拿到消息就做相应的话,消费者宕机了,那这消息就没了!这样就可以保证消息在消费阶段不会丢失了。

小结一下

保证消息不丢失,需要三方共同配合:

生产者需要做好对Borket相应的处理,异常处理、错误重试、报警机制、日志记录。

Broket需要把握好相应的时机,在单机情况下等待消息刷盘之后再做相应,集群部署多副本情况下,保证消息发送至两个以及以上副本的时候再做相应。

消费者需要真正处理完消息之后(完成所有业务逻辑处理)在给Broket进行相应。

注:以上这些操作,虽然是保证了消息的可靠性,保证了消息不丢失,但是程序性能会降低很多。

如何处理重复消息

Jesse : 在讲这个问题之前,我觉得还是有必要先说一下,消息重复是怎么出现的

上一个问题已经说过,为了保证消息不丢失,在生产消息阶段生产者需要等待Broket的响应,如果在Briket做出相应的时候,网络出现问题,导致生产者没有收到相应,那么生产者会再次重复发送,这样Broket中就会有两条重复的消息了!

再看消费消息阶段,如果一个消费者已经处理完了消息,业务逻辑已经处理完,事物也提交了,准备更新Consumer offset了,就在这时消费者挂了,也就是更新Consumer offset还没更新成功,所以此时另一个消费者还是会拿到刚才那条消息再重复执行一次,这就造成了消息重复消费!

Jesse : 从正常的业务流程来看,消息重复好像是不可避免的,因为我们总不能为了解决消息重复的问题又导致消息丢失的问题吧!那么我们换个角度去思考这个问题。既然我们不能避免消息的重复,那么我们就对因为消息重复带来的业务上面的影响进行处理,关键点就在于幂等。


什么叫做幂等

学过数学的都知道,这是一个数学的概念。但是在代码的角度来思考,何为幂等,其实我们可以这样简单理解,幂等就是使用同样的参数多次调用同一个接口和一次调用同一个接口执行结果是一样的、举个例子:

执行一条SQL语句:update table money = 100 where id = 1 and money = 50;

对于这条SQL语句,不管执行多少次,money的值都是150,这个就叫做幂等。

几个常用的实现幂等的套路

我们现在要做的,就是对业务逻辑进行改造处理,实现幂等,从而保证就算消息重复了也不会影响最终的结果。怎么做呢?常见的套路主要有以下这些:

提交结果的时候,做一个前置的判断,就比如上面的money=50。

加一个version,利用版本号机制,对比消息中的版本号和数据库中的版本号是否相等。

使用数据库的约束列,比如唯一键。

记录关键的key,比如保存订单处理数据的时候,如果有重复消息,就先判断一下这个ID的订单是否已经被处理过了,如果没有被处理过再进行后面的业务逻辑。

方法绝不仅仅只有这些,如何实现还是要看业务的具体细节,根据业务逻辑而定。

如何保证消息的有序性

有序性分为全局有序和部分有序。

全局有序

必须只有一个生产者往Topic中发送消息,并且Topic中只有一个队列(分区),消费者必须单线程消费这个队列(分区)中的消息,这样消息就是全局有序的,但是一般我们不需要全局有序。

部分有序

绝大多数时候我们都是使用部分有序,将Topic内部划分成我们指定数量的队列(分区),然后通过特定的策略将消息发送给指定的队列,然后每个队列对应一个单线程的消费者去消费队列中的信息,这样既可以实现消息的部分有序,还可以通过Topic中队列的数量提高并发处理消息的效率。

注:这里的生产者也可以是一个,只要根据特定策略将同类消息发送到指定队列即可。

如何处理消息堆积

消息堆积往往是因为生产者的生产能力和消费者的消费能力不匹配造成的,有可能是因为消费者消费消息出现错误反复重试导致,也有可能是消费者消费能力太弱导致。要解决消息堆积的问题,首先要确定消费慢的原因,如果是因为消费错误导致反复重试,那么就先解决代码中的bug。如果是因为消费者消费能力太弱,就对消息处理的业务逻辑代码进行优化,如果优化之后还是消费慢,那么就进行扩容,也就是增加Topic中队列的数量和消费者的数量。

注:增加一个消费者就必须增加一个队列,否则消费者是没有东西消费的,在同一个Topic中,一个队列只会分配给一个消费者。

 

作者:让我来处理高并发
链接:https://www.jianshu.com/p/4e8c80936d40
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。