rabbitmq 没有消费者消费者不够用怎么办

RabbitMQ(四):分发到多消费者
时间: 19:13:04
&&&& 阅读:187
&&&& 评论:
&&&& 收藏:0
标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&& &这篇文章中,我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer)。 我们将构建两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。
1. Exchanges
&&&&& 关于exchange的概念在《》中有详细介绍。现在做一下简单的回顾。
&&& & RabbitMQ 的Messaging Model就是Producer并不会直接发送Message到queue。实际上,Producer并不知道它发送的Message是否已经到达queue。
&& & &Producer发送的Message实际上是发到了Exchange中。它的功能也很简单:从Producer接收Message,然后投递到queue中。Exchange需要知道如何处理Message,是把它放到那个queue中,还是放到多个queue中?这个rule是通过Exchange 的类型定义的。
&&&& 我们知道有三种类型的Exchange:direct,
topic 和fanout。fanout就是广播模式,会将所有的Message都放到它所知道的queue中。创建一个名字为logs,类型为fanout的Exchange:
channel.exchange_declare(exchange=‘logs‘,&&&&&&&&&&&&&&&&&&&&&&&&&&&type=‘fanout‘)&&
&span style=&font-size:18&&channel.exchange_declare(exchange='logs',
type='fanout')&/span&
Listing exchanges
通过rabbitmqctl可以列出当前所有的Exchange:
$&sudo&rabbitmqctl&list_exchanges&&Listing&exchanges&...&&logs&&&&&&fanout&&amq.direct&&&&&&direct&&amq.topic&&&&&&&topic&&amq.fanout&&&&&&fanout&&amq.headers&&&&&headers&&...done.&&
&span style=&font-size:18&&$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
amq.direct
amq.fanout
amq.headers
...done.&/span&
注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。
现在我们可以通过exchange,而不是routing_key来publish Message了:
channel.basic_publish(exchange=‘logs‘,&&&&&&&&&&&&&&&&&&&&&&&&routing_key=‘‘,&&&&&&&&&&&&&&&&&&&&&&&&body=message)&&
&span style=&font-size:18&&channel.basic_publish(exchange='logs',
routing_key='',
body=message)&/span&
2. Temporary queues
&&&& &截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。
&&&& 但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
&&& 1) 每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。方法:
result&=&channel.queue_declare()&&
&span style=&font-size:18&&result = channel.queue_declare()&/span&&&&& &通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
&&& 2)当Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。方法:
result&=&channel.queue_declare(exclusive=True)&&
&span style=&font-size:18&&result = channel.queue_declare(exclusive=True)&/span&
3. Bindings绑定
现在我们已经创建了fanout类型的exchange和没有名字的queue(实际上是RabbitMQ帮我们取了名字)。那exchange怎么样知道它的Message发送到哪个queue呢?答案就是通过bindings:绑定。
channel.queue_bind(exchange=‘logs‘,&&&&&&&&&&&&&&&&&&&&&queue=result.method.queue)&&
&span style=&font-size:18&&channel.queue_bind(exchange='logs',
queue=result.method.queue)&/span&现在logs的exchange就将它的Message附加到我们创建的queue了。
Listing bindings
使用命令rabbitmqctl list_bindings。
4. 最终版本
&&& 我们最终实现的数据流图如下:
Producer,在这里就是产生log的program,基本上和前几个都差不多。最主要的区别就是publish通过了exchange而不是routing_key。
emit_log.py script:
&&import&pika&&import&sys&&&&connection&=&pika.BlockingConnection(pika.ConnectionParameters(&&&&&&&&&&host=‘localhost‘))&&channel&=&connection.channel()&&&&channel.exchange_declare(exchange=‘logs‘,&&&&&&&&&&&&&&&&&&&&&&&&&&&type=‘fanout‘)&&&&message&=&‘&‘.join(sys.argv[1:])&or&&info:&Hello&World!&&&channel.basic_publish(exchange=‘logs‘,&&&&&&&&&&&&&&&&&&&&&&&&routing_key=‘‘,&&&&&&&&&&&&&&&&&&&&&&&&body=message)&&print&&&[x]&Sent&%r&&%&(message,)&&connection.close()&&
&span style=&font-size:18&&#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or &info: Hello World!&
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print & [x] Sent %r& % (message,)
connection.close()&/span&
还有一点要注意的是我们声明了exchange。publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,log是被丢弃的。
Consumer:receive_logs.py:
&&import&pika&&&&connection&=&pika.BlockingConnection(pika.ConnectionParameters(&&&&&&&&&&host=‘localhost‘))&&channel&=&connection.channel()&&&&channel.exchange_declare(exchange=‘logs‘,&&&&&&&&&&&&&&&&&&&&&&&&&&&type=‘fanout‘)&&&&result&=&channel.queue_declare(exclusive=True)&&queue_name&=&result.method.queue&&&&channel.queue_bind(exchange=‘logs‘,&&&&&&&&&&&&&&&&&&&&&queue=queue_name)&&&&print&‘&[*]&Waiting&for&logs.&To&exit&press&CTRL+C‘&&&&def&callback(ch,&method,&properties,&body):&&&&&&print&&&[x]&%r&&%&(body,)&&&&channel.basic_consume(callback,&&&&&&&&&&&&&&&&&&&&&&&&queue=queue_name,&&&&&&&&&&&&&&&&&&&&&&&&no_ack=True)&&&&channel.start_consuming()&&
&span style=&font-size:18&&#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print & [x] %r& % (body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()&/span&我们开始不是说需要两个Consumer吗?一个负责记录到文件;一个负责打印到屏幕?
其实用重定向就可以了,当然你想修改callback自己写文件也行。我们使用重定向的方法:
We‘re done. If you want to save logs to a file, just open a console and type:
&span style=&font-size:18&&$ python receive_logs.py & logs_from_rabbit.log&/span&
$ python receive_logs.py$ python emit_log.py
标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&原文:http://blog.csdn.net/u/article/details/
教程昨日排行
&&国之画&&&& &&&&&&
&& &&&&&&&&&&&&&&
鲁ICP备号-4
打开技术之扣,分享程序人生!Rabbitmq堆积消息后生产速率降低的问题分析及应对措施_百度文库
两大类热门资源免费畅读
续费一年阅读会员,立省24元!
Rabbitmq堆积消息后生产速率降低的问题分析及应对措施
总评分4.1|
浏览量14778
用知识赚钱
阅读已结束,下载文档到电脑
想免费下载更多文档?
定制HR最喜欢的简历
你可能喜欢RabbitMQ消费者的几个参数 - 简书
RabbitMQ消费者的几个参数
分布式消息中间件
RabbitMQ是用Erlang语言编写的分布式消息中间件,常常用在大型网站中作为消息队列来使用,主要目的是各个子系统之间的解耦和异步处理。消息中间件的基本模型是典型的生产者-消费者模型,生产者发送消息到消息队列,消费者监听消息队列,收到消息后消费处理。
在使用RabbitMQ做消息分发时,主要有三个概念要注意:Exchange,RoutingKey,Queue。
Exchange可以理解为交换器,RoutingKey可以理解为路由,Queue作为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:
Exchange为fanout时,生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用;Exchange为topic时,生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息;direct类型的Exchange是最直接最简单的,生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(通常如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)
Concurrency与Prefetch
在通常的使用中(Java项目),我们一般会结合spring-amqp框架来使用RabbitMQ,spring-amqp底层调用RabbitMQ的java client来和Broker交互,比如我们会用如下配置来建立RabbitMQ的连接池、声明Queue以及指明监听者的监听行为:
&rabbit:connection-factory id="connectionFactory" /&
&!-- template非必须,主要用于生产者发送消息--&
&rabbit:template id="template" connection-factory="connectionFactory" /&
&rabbit:queue name="remoting.queue" /&
&rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3"&
&rabbit:listener ref="listener" queue-names="remoting.queue" /&
&/rabbit:listener-container&
listener-container可以设置消费者在监听Queue的时候的各种参数,其中concurrency和prefetch是本篇文章比较关心的两个参数,以下是spring-amqp文档的解释:
prefetchCount(prefetch)The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode is NONE. This will be increased, if necessary, to match the txSize
concurrentConsumers(concurrency)The number of concurrent consumers to initially start for each listener.
简单解释下就是concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数,prefetch是每次从一次性从broker里面取的待消费的消息的个数,上面的配置在监控后台看到的效果如下:
图中可以看出有两个消费者同时监听Queue,但是注意这里的消息只有被一个消费者消费掉就会自动ack,另外一个消费者就不会再获取到此消息,Prefetch Count为配置设置的值3,意味着每个消费者每次会预取3个消息准备消费。每个消费者对应的listener有个Exclusive参数,默认为false, 如果设置为true,concurrency就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。
这里concurrency的实现方式不看源码也能猜到,肯定是用多线程的方式来实现的,此时同一进程下打开的本地端口都是56278.下面看看listener-contaner对应的org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer的源码:
protected int initializeConsumers() {
int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
this.cancellationLock.reset();
this.consumers = new HashMap&BlockingQueueConsumer, Boolean&(this.concurrentConsumers);
for (int i = 0; i & this.concurrentC i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.put(consumer, true);
container启动的时候会根据设置的concurrency的值(同时不超过最大值)创建n个BlockingQueueConsumer。
protected void doStart() throws Exception {
//some code
synchronized (this.consumersMonitor) {
int newConsumers = initializeConsumers();
//some code
Set&AsyncMessageProcessingConsumer& processors = new HashSet&AsyncMessageProcessingConsumer&();
for (BlockingQueueConsumer consumer : this.consumers.keySet()) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
this.taskExecutor.execute(processor);
//some code
在doStart()方法中调用initializeConsumers来初始化所有的消费者,AsyncMessageProcessingConsumer作为真实的处理器包装了BlockingQueueConsumer,而AsyncMessageProcessingConsumer其实实现了Runnable接口,由this.taskExecutor.execute(processor)来启动消费者线程。
private final class AsyncMessageProcessingConsumer implements Runnable {
private final BlockingQueueC
private final CountDownL
private volatile FatalListenerStartupException startupE
private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
this.consumer =
this.start = new CountDownLatch(1);
//some code
public void run() {
//some code
那么prefetch的值意味着什么呢?其实从名字上大致能看出,BlockingQueueConsumer内部应该维护了一个阻塞队列BlockingQueue,prefetch应该是这个阻塞队列的长度,看下BlockingQueueConsumer内部有个queue,这个queue不是对应RabbitMQ的队列,而是Consumer自己维护的内存级别的队列,用来暂时存储从RabbitMQ中取出来的消息:
private final BlockingQueue&Delivery&
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter&BlockingQueueConsumer& activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
Map&String, Object& consumerArgs, boolean exclusive, String... queues) {
//some code
this.queue = new LinkedBlockingQueue&Delivery&(prefetchCount);
BlockingQueueConsumer的构造函数清楚说明了每个消费者内部的队列大小就是prefetch的大小。
前面说过,设置并发的时候,要考虑具体的业务场景,对那种对消息的顺序有苛刻要求的场景不适合并发消费,而对于其他场景,比如用户注册后给用户发个提示短信,是不太在意哪个消息先被消费,哪个消息后被消费,因为每个消息是相对独立的,后注册的用户先收到短信也并没有太大影响。
设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个RabbitMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrency和prefetch值。
阅读/写作/编程
偶尔的碎片化思考会记录在微博 @王鸿缘VRabbitMQ不同Confirm模式下的性能对比 - 为程序员服务
RabbitMQ不同Confirm模式下的性能对比
前几天看到一篇文章『
』,作者比较客观地分析了多种分布式消息服务器间集群和消息可靠传输机制,比对了各自的性能情况,他的测试场景为:
1. 分布式队列,节点间数据复制(同步、异步)
2. 消息可靠性等级最高(持久化、ack等)
由于不同的消息服务器实现原理不同,会造成集群节点间数据复制代价和消息可靠性上的差异,最终文章给出的基准性能测试数据(消息吞吐量维度)总结如下图:
从上图来看,Kafka毫无争议的拥有最大的消息吞吐量。但是RabbitMQ的数据却是有点反直觉,因为之前给人的感觉RabbitMQ作为一款工业级的消息队列服务器,虽说不是靠高性能扬名,但也不至于让性能问题成为累赘。
网上的基准测试结果只能作为参考,不能作为技术选型的依据。当我们看到某类产品(Web服务器、缓存、队列、数据库等)的一组性能测试数据时,首先要了解以下三点:
1. 作者是否利益相关,利益相关往往导致给出的数据和结论偏向自家产品。
2. 作者是否能hold得住不同产品的技术细节,有时候一个参数值的优化会影响到产品的表现。
3. 具体的测试场景和参数。这个不用多解释了,每个产品都有自己擅长和不擅长的使用场景。
对于上述的第2点和第3点,本质上是由于信息不对称造成的。本文的目的不是为了质疑那篇文章的结论,而是借此分析一下是否可以通过一些客户端程序优化来提升RabbitMQ性能。
之前自己也做过RabbitMQ的性能测试,对于固定消息体大小和线程数,如果消息持久化、生产者confirm、消费者ack三个参数中开启消息持久化和生产者confirm,那么对性能影响相当致命,能够衰减一个数量级,吞吐量甚至会退化到几百msg/s。
消息持久化的优化没太好方法,用更好更快的物理存储(SAS,SSD,RAID卡)总会带来改善的。生产者confirm这一环节的优化则主要在于客户端程序的优化上。归纳起来,客户端实现生产者confirm有三种编程方式:
1. 普通confirm模式。每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
2. 批量confirm模式。每次发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
3. 异步confirm模式。提供一个回调方法,服务器端confirm了一条(或多条)消息后SDK会回调这个方法。
从编程实现的复杂度上来看:
第1种普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务器端返回false或者超时时间内未返回,客户端进行消息重传。
第2种批量confirm模式稍微复杂一点,客户端程序需要定期(每x秒)或定量(每x条)或者两者结合来pubish消息,然后等待服务器端confirm。相比普通confirm模式,批量可以极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。
第3种异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Channel发出的消息序号),我们需要自己为每个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率角度上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK里的waitForConfirms()方法也是通过SortedSet维护消息序号的。
我写了一个简单的RabbitMQ生产者confirm环节性能测试程序放在了
上,它实现了上述三种confirm模式,并且有丰富的参数可以配置,比如生产者数量、消费者数量、消息体大小、消息持久化、生产者confirm、消费者ack
,可以根据使用场景组合。以下的讨论都是基于这个测试程序跑出来的结果。
首先是测试环境:
OS: OSX 10.10, Darwin 14.0.0, x86_64
Erlang: R16B03-1 (erts-5.10.4)
RabbitMQ: 3.5.1
CPU: 2.5 GHz Intel Core i5
Disk: SATA
Message Size: 1000 Bytes
一、单线程,
消息持久化和消费者ack。
批量,50 msg/批
批量,100 msg/批
批量,200 msg/批
2931 msg/s
6581 msg/s
7019 msg/s
7563 msg/s
8550 msg/s
可见,单线程跑时批量和异步confirm甩开普通confirm一大截了。严格来讲,异步confirm不存在单线程模式,因为回调handleAck()方法的线程和publish消息的线程不是同一个。
二、多线程,
消息持久化和消费者ack。
多线程下普通confirm模式:
2110 msg/s
2353 msg/s
2477 msg/s
多线程下批量confirm模式:
100线程,50 msg/批
100线程,100 msg/批
100线程,500 msg/批
500线程,100 msg/批
3828 msg/s
3551 msg/s
3567 msg/s
3829 msg/s
多线程下异步confirm模式:
3621 msg/s
3378 msg/s
2842 msg/s
以上是不同线程数量的维度下,相同confirm模式的性能数据,大致来看,遵循
线程数越大,吞吐量越大
的规律。当然,当线程数量达到一个阈值之后,吞吐量会下降。通过这些数据还能得到一个隐式的结论:
不论哪种confirm模式,通过调整客户端线程数量,都可以达到一个最大吞吐量值。无非是达到这个最大值的代价不同,比如异步模式需要少量线程数就能达到,而普通模式需要大量线程数才能达到。
最后再从相同线程数量(100线程数)的维度下,分析下不同confirm模式的性能数据:
批量,50 msg/批
批量,100 msg/批
批量,500 msg/批
3828 msg/s
3551 msg/s
3567 msg/s
3378 msg/s
由此可见,选取了一个典型的线程数量(100)后,普通confirm模式性能相比批量和异步模式,差了一个数量级。
从以上所有的数据分析来看,异步和批量confirm模式两者没有明显的性能差距,实际上他们的实现原理是一样,无非是客户端SDK进行了不同的封装而已。所以,只需从可编程性的角度选择异步或批量或者两者结合的模式即可。相比而言,选择普通confirm模式只剩编程简单这个理由了。
回到本文开头提到的不同队列服务之间的性能对比,实际上,我认为RabbitMQ最大的优势在于它提供了最灵活的消息路由策略、高可用和可靠性,可靠性又分为两部分(消息可靠性和软件可靠性),以及丰富的插件、平台支持和完善的文档。然而,由于AMQP协议本身的灵活性导致了它比较重量,所以造成了它相比某些队列服务(如Kafka)吞吐量处于下风。因此,当选择一个消息队列服务时,关键还是看需求上更看重消息吞吐量、消息堆积能力还是路由灵活性、高可用、可靠传输这些方面,只有先确定使用场景,根据使用场景对不同服务进行针对性的测试和分析,最终得到的结论才能成为技术选型的依据。
擅写水文,立志于写出白痴都看得懂的技术文章。
原文地址:, 感谢原作者分享。
您可能感兴趣的代码rabbitmq消费者接受json数据类型的问题
来源:it-home
今天在公司解决一个bug,同事没搞定,喊我过去,当时没看出来是任何错误,后来在rabbitmq的控制台发现原来是一个队列绑定了多个routingKey,导致该消费的没消费,不想消费的拿过去消费了,然后就报错了。而我们的队列监听配置的是,如果消息处理有异常,直接抛弃,不再放回队列中。够悲惨!这个问题解决了,还不够,后来发现类型转换异常了,坑爹,扒开代码一看,我擦嘞,居然是map转消费者的对象错误。何故?这里就要扯到我们这边对消费者接受到消息的处理封装了。1.原来我们用原生的,发现这个项目要是需要消费另外一个项目的生产的消息,那么这两个项目就得依赖了,因为你要用到生产方的类型了,不愿意这么做!2.后来我们使用spring-amqp中的jackson转换,这样看似是好的,可是用着就发现还是不方便,消费方的对象必须与生产方的对象的包路径+类名都要完全一致,当然也包括字段名了,用着很不爽,所以还是给废弃了。3.后来领导封装了一层东西,在消费方接到消息,处理之前,将其中的__TypeId__给替换成消费方对象的包路径+类名,这样我们就不用纠结要和生产方高度一致了,见代码(MessageListenerAdapter4Json-version1.0.java)4.这回就要扯到上面的问题了,用到泛型怎么办?如何单纯的用3的话,list中的泛型将被Jackson转出map了,到消费方处理时,自然要转成自己的对象,这里就报错了5.晚上回来,我扒了一下spring-amqp的代码,然后把泛型加上去了。见代码(MessageListenerAdapter4Json-version2.0.java)相应的配置,见对于的rabbitmq-version.1.0.xml或者rabbitmq-version.2.0.xml
package message.amqp.import com.rabbitmq.client.Cimport org.springframework.amqp.AmqpIllegalStateEimport org.springframework.amqp.core.Mimport org.springframework.amqp.core.MessageLimport org.springframework.amqp.rabbit.core.ChannelAwareMessageLimport org.springframework.amqp.rabbit.listener.adapter.MessageListenerAimport org.springframework.amqp.support.converter.AbstractJavaTypeMimport java.lang.reflect.Mimport java.lang.reflect.ParameterizedTimport java.lang.reflect.T/** * 由于原来的amqp在接受json数据转成Java对象时,需要按照原来message中__TypeId__来转换,这里重写类是为了替换这里的__TypeId__为消费者的&br/& * 数据类型,并且加入泛型的判断. * * @author sunhao(sunhao.) * @version V1.0,
19:09 */public class MessageListenerAdapter4Jackson extends MessageListenerAdapter {
public void onMessage(Message message, Channel channel) throws Exception {
// Check whether the delegate is a MessageListener impl itself.
// In that case, the adapter will simply act as a pass-through.
Object delegate = getDelegate();
if (delegate != this) {
if (delegate instanceof ChannelAwareMessageListener) {
if (channel != null) {
((ChannelAwareMessageListener) delegate).onMessage(message, channel);
} else if (!(delegate instanceof MessageListener)) {
throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a "
+ "ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself");
if (delegate instanceof MessageListener) {
((MessageListener) delegate).onMessage(message);
String methodName = getListenerMethodName(message, null);
if (methodName == null) {
throw new AmqpIllegalStateException("No default listener method specified: "
+ "Either specify a non-null value for the 'defaultListenerMethod' property or "
+ "override the 'getListenerMethodName' method.");
Method[] methods = delegate.getClass().getMethods();
for (Method method : methods) {
if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
String className = method.getParameterTypes()[0].getName();
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, className);
Object convertedMessage = extractMessage(message);
Object result = method.invoke(delegate, convertedMessage);
if (result != null) {
handleResult(result, message, channel);
}}复制代码
package message.amqp.import com.rabbitmq.client.Cimport org.springframework.amqp.AmqpIllegalStateEimport org.springframework.amqp.core.Mimport org.springframework.amqp.core.MessageLimport org.springframework.amqp.rabbit.core.ChannelAwareMessageLimport org.springframework.amqp.rabbit.listener.adapter.MessageListenerAimport org.springframework.amqp.support.converter.AbstractJavaTypeMimport java.lang.reflect.Mimport java.lang.reflect.ParameterizedTimport java.lang.reflect.T/** * 由于原来的amqp在接受json数据转成Java对象时,需要按照原来message中__TypeId__来转换,这里重写类是为了替换这里的__TypeId__为消费者的&br/& * 数据类型,并且加入泛型的判断. * * @author sunhao(sunhao.) * @version V1.0,
19:09 */public class MessageListenerAdapter4Jackson extends MessageListenerAdapter {
public void onMessage(Message message, Channel channel) throws Exception {
// Check whether the delegate is a MessageListener impl itself.
// In that case, the adapter will simply act as a pass-through.
Object delegate = getDelegate();
if (delegate != this) {
if (delegate instanceof ChannelAwareMessageListener) {
if (channel != null) {
((ChannelAwareMessageListener) delegate).onMessage(message, channel);
} else if (!(delegate instanceof MessageListener)) {
throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a "
+ "ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself");
if (delegate instanceof MessageListener) {
((MessageListener) delegate).onMessage(message);
String methodName = getListenerMethodName(message, null);
if (methodName == null) {
throw new AmqpIllegalStateException("No default listener method specified: "
+ "Either specify a non-null value for the 'defaultListenerMethod' property or "
+ "override the 'getListenerMethodName' method.");
Method[] methods = delegate.getClass().getMethods();
for (Method method : methods) {
if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
String className = method.getParameterTypes()[0].getName();
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, className);
//泛型的参数类型(如果只有一个参数,那么就取第一个)
Type[] types = method.getGenericParameterTypes();
if (types.length &= 0 && types[0] instanceof ParameterizedType) {
//存在泛型
ParameterizedType pType = (ParameterizedType) types[0];
Type t = pType.getActualTypeArguments()[0];
if (t instanceof Class) {
//泛型类型
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, ((Class) t).getName());
Object convertedMessage = extractMessage(message);
Object result = method.invoke(delegate, convertedMessage);
if (result != null) {
handleResult(result, message, channel);
}}复制代码
&!-- 定义队列 --&&rabbit:queue name="demo.queue" durable="true" exclusive="false" auto-delete="false"/&&!-- 通过名称进行绑定 --&&bean id="demoBinding" class="message.amqp.core.QueueBinding"& &constructor-arg index="0" value="exchange.direct.default"&&/constructor-arg& &constructor-arg index="1" value="demo.queue"&&/constructor-arg& &constructor-arg index="2" value="routingDemoKey"&&/constructor-arg&&/bean&&!-- 定义消息监听,注意队列是对象而不是名称 --&&rabbit:listener-container connection-factory="rabbitConnectionFactory"
requeue-rejected="false" message-converter="jsonMessageConverter" error-handler="messageErrorHandler"& &rabbit:listener queue-names="demo.queue" ref="demoService" method="handleMessage" /&&/rabbit:listener-container&复制代码
&!-- 定义队列 --&&rabbit:queue name="demo.queue" durable="true" exclusive="false" auto-delete="false" /&&!-- 通过对象进行绑定 --&&bean id="demoBinding" class="message.amqp.core.QueueBinding"& &constructor-arg index="0" value="exchange.direct.default"&&/constructor-arg& &constructor-arg index="1" value="demo.queue"&&/constructor-arg& &constructor-arg index="2" value="routingDemoKey"&&/constructor-arg&&/bean&&bean id="demoListener" class="message.amqp.core.MessageListenerAdapter4Json"& &property name="messageConverter" ref="jsonMessageConverter" /& &property name="delegate" ref="demoService" /& &property name="defaultListenerMethod" value="handleMessage" /&&/bean&&!-- 定义消息监听,注意队列是对象而不是名称 --&&rabbit:listener-container connection-factory="rabbitConnectionFactory" error-handler="messageErrorHandler" requeue-rejected="false"& &rabbit:listener queue-names="demo.queue" ref="demoListener" /&&/rabbit:listener-container&复制代码
免责声明:本站部分内容、图片、文字、视频等来自于互联网,仅供大家学习与交流。相关内容如涉嫌侵犯您的知识产权或其他合法权益,请向本站发送有效通知,我们会及时处理。反馈邮箱&&&&。
学生服务号
在线咨询,奖学金返现,名师点评,等你来互动

我要回帖

更多关于 rabbitmq 没有消费者 的文章

 

随机推荐