ActiveMQ为什么switch更新慢得比较慢

折腾ActiveMQ时遇到的问题和解决方法:
1.先讲严重的:服务挂掉。
这得从ActiveMQ的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的&systemUsage&节点中配置。但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ会将内存中的非持久化消息写入临时文件中,以腾出内存。虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除。
那如果文件增大到达了配置中的最大限制的时候会发生什么?我做了以下实验:
设置2G左右的持久化文件限制,大量生产持久化消息直到文件达到最大限制,此时生产者阻塞,但消费者可正常连接并消费消息,等消息消费掉一部分,文件删除又腾出空间之后,生产者又可继续发送消息,服务自动恢复正常。
设置2G左右的临时文件限制,大量生产非持久化消息并写入临时文件,在达到最大限制时,生产者阻塞,消费者可正常连接但不能消费消息,或者原本慢速消费的消费者,消费突然停止。整个系统可连接,但是无法提供服务,就这样挂了。
具体原因不详,解决方案:尽量不要用非持久化消息,非要用的话,将临时文件限制尽可能的调大。
详细配置信息见文档: http://activemq.apache.org/producer-flow-control.html
2.丢消息。
这得从java的java.net.SocketException异常说起。简单点说就是当网络发送方发送一堆数据,然后调用close关闭连接之后。这些发送的数据都在接收者的缓存里,接收者如果调用read方法仍旧能从缓存中读取这些数据,尽管对方已经关闭了连接。但是当接收者尝试发送数据时,由于此时连接已关闭,所以会发生异常,这个很好理解。不过需要注意的是,当发生SocketException后,原本缓存区中数据也作废了,此时接收者再次调用read方法去读取缓存中的数据,就会报Software caused connection abort: recv failed错误。
通过抓包得知,ActiveMQ会每隔10秒发送一个心跳包,这个心跳包是服务器发送给客户端的,用来判断客户端死没死。如果你看过上面第一条,就会知道非持久化消息堆积到一定程度会写到文件里,这个写的过程会阻塞所有动作,而且会持续20到30秒,并且随着内存的增大而增大。当客户端发完消息调用connection.close()时,会期待服务器对于关闭连接的回答,如果超过15秒没回答就直接调用socket层的close关闭tcp连接了。这时客户端发出的消息其实还在服务器的缓存里等待处理,不过由于服务器心跳包的设置,导致发生了java.net.SocketException异常,把缓存里的数据作废了,没处理的消息全部丢失。
解决方案:用持久化消息,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit()方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。
关于java.net.SocketException请看我的详细研究: http://blog.163.com/_kid/blog/static//
3.持久化消息非常慢。
默认的情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升。所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。
4.消息的不均匀消费。
有时在发送一些消息之后,开启2个消费者去处理消息。会发现一个消费者处理了所有的消息,另一个消费者根本没收到消息。原因在于ActiveMQ的prefetch机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除,如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。更通常的情况是,消费这些消息非常耗时,你开了10个消费者去处理,结果发现只有一台机器吭哧吭哧处理,另外9台啥事不干。
解决方案:将prefetch设为1,每次处理1条消息,处理完再去取,这样也慢不了多少。
详细文档: http://activemq.apache.org/what-is-the-prefetch-limit-for.html
5.死信队列。
如果你想在消息处理失败后,不被服务器删除,还能被其他消费者处理或重试,可以关闭AUTO_ACKNOWLEDGE,将ack交由程序自己处理。那如果使用了AUTO_ACKNOWLEDGE,消息是什么时候被确认的,还有没有阻止消息确认的方法?有!
消费消息有2种方法,一种是调用consumer.receive()方法,该方法将阻塞直到获得并返回一条消息。这种情况下,消息返回给方法调用者之后就自动被确认了。另一种方法是采用listener回调函数,在有消息到达时,会调用listener接口的onMessage方法。在这种情况下,在onMessage方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认。那么问题来了,如果一条消息不能被处理,会被退回服务器重新分配,如果只有一个消费者,该消息又会重新被获取,重新抛异常。就算有多个消费者,往往在一个服务器上不能处理的消息,在另外的服务器上依然不能被处理。难道就这么退回--获取--报错死循环了吗?
在重试6次后,ActiveMQ认为这条消息是“有毒”的,将会把消息丢到死信队列里。如果你的消息不见了,去ActiveMQ.DLQ里找找,说不定就躺在那里。
详细文档: http://activemq.apache.org/redelivery-policy.html
http://activemq.apache.org/message-redelivery-and-dlq-handling.html
阅读(...) 评论()Spring与activemq整合,消费者的消费速度非常慢 - ITeye问答
我使用spring与activemq整合发送消息,发送速度可以达到10000/s,但是消费者的消费速度却是非常慢,求给位大神帮忙!!!这是我配置文件的内容:&bean id="innerNgbfJmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory"&
&!-- &property name="brokerURL" value="failover:(tcp://10.253.45.103:61616)"
&property name="brokerURL" value="${ngbf.jms.provider.url}" /&
&!--慢消费者设置为true,快消费者设置为false--&
&property name="dispatchAsync" value="true" /&
&!--&property name="producerWindowSize" value="" /&
&property name="useAsyncSend" value="true" /&--&
&!-- 设置发送连接池,提高性能 --&
&bean id="ngbfJmsSenderFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop"&
&property name="connectionFactory" ref="innerNgbfJmsFactory" /&
&!-- &property name="maxConnections" value="100"&&/property&
&property name="maximumActive" value="100"&&/property&
&property name="exclusiveConsumer" value="true"&&/property&--&
&!-- 设置接收连接池,和发送区分以提高性能 --&
&bean id="ngbfJmsReveiverFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop"&
&property name="connectionFactory" ref="innerNgbfJmsFactory" /&
&!--&property name="sessionCache" value="10"&&/property&--&
&!--&property name="maxConnections" value="100"&&/property&
&property name="maximumActive" value="100"&&/property&--&
&!-- 发送模板 --&
&bean id="ngbfJmsTemplate" class="org.springframework.jms.core.JmsTemplate"&
&property name="connectionFactory" ref="ngbfJmsSenderFactory" /&
&property name="messageConverter" ref="ngbfMessageByteConvert" /&
&!-- &property name="defaultDestination" ref="destination" /&--&
&!-- 消息转换器 --&
&bean id="ngbfMessageConvert" class="com.kingstar.ngbf.s.jms.MessageUnitConvert" /&
&bean id="ngbfMessageTextConvert" class="com.kingstar.ngbf.s.jms.MessageUnitTextConvert" /&
&bean id="ngbfMessageByteConvert" class="com.kingstar.ngbf.s.jms.MessageUnitByteConvert" /&
&!-- 缺省的消息监听适配器 --&
&bean id="ngbfDefaultMessageListenerAdapter"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter"
abstract="true"&
&property name="defaultListenerMethod" value="handleMessage" /&&!--handleMessage&receive--&
&property name="messageConverter" ref="ngbfMessageByteConvert" /&
&!--定义destination --&
&!--&bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"&&
&&&&&&& &constructor-arg index="0"&&
&&&&&&&&&&& &value&NGBF.Queue?consumer.exclusive=true&consumer.priority=10&/value&&
&&&&&&& &/constructor-arg&&
&&& &/bean&--&
&!-- 定义缺省的Queue消息监听器 --&
&bean id="ngbfDefaultQueueMessageListener" parent="ngbfDefaultMessageListenerAdapter"&
&constructor-arg&
&bean class="com.kingstar.ngbf.s.jms.MessageUnitQueueListener" /&
&/constructor-arg&
&!--&bean id="ngbfDefaultQueueMessgeListener1" class="org.springframework.jms.listener.adapter.MessageListenerAdapter" &
&constructor-arg&
&bean class="com.kingstar.ngbf.s.jms.MessageUnitQueueListener1" /&
&/constructor-arg&
&property name="defaultListenerMethod" value="handleMessage" /&
&property name="messageConverter" ref="ngbfMessageByteConvert" /&
&/bean&--&
&!-- 定义缺省的Topic消息监听器 --&
&bean id="ngbfDefaultTopicMessageListener" parent="ngbfDefaultMessageListenerAdapter"&
&constructor-arg&
&bean class="com.kingstar.ngbf.s.jms.MessageUnitTopicListener" /&
&/constructor-arg&
&bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"&
&property name="corePoolSize" value="10" /&&&
&&&&&&& &property name="maxPoolSize" value="100"/&&&
&&&&&&& &property name="queueCapacity" value="1000"/&&&
&&&&&&& &property name="keepAliveSeconds" value="300"/&
&!-- 缺省的消息监听器Queue容器,可以被子类集成改写 --&
&bean id="ngbfDefaultQueueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"
lazy-init="false"&
&property name="connectionFactory" ref="ngbfJmsReveiverFactory" /&
&property name="destinationName" value="NGBF.Queue" /&
&&& &property name="pubSubDomain" value="false" /&
&!-- 事务值为false,接受速度明显提高--&
&!-- &property name="sessionTransacted" value="false"/&--&
&property name="messageListener" ref="ngbfDefaultQueueMessageListener" /&
&!-- 多线程监听? --&
&!-- maxMessagesPerTask值小于-1,循环接收消息--&
&property name="maxMessagesPerTask" value="-1"/&
&property name="concurrentConsumers" value="10" /&
&property name="maxConcurrentConsumers" value="100" /&
&property name="idleTaskExecutionLimit" value="2" /&
&&& &property name="receiveTimeout" value="10000" /&&&
&property name="cacheLevel" value="3"/&
&property name="taskExecutor" ref="taskExecutor"/&
&/bean&
&bean id="ngbfDefaultQueueListener" parent="ngbfDefaultQueueListenerContainer"&
&property name="messageSelector" value="${ngbf.jms.queue.selector}" /&
&!--&property name="destinationName" value="${ngbf.jms.queue.default}" /&--&
&property name="destinationName" value="NGBF.Queue" /&
看不到listener代码,不确定是否在其中阻塞,建议多加时间戳,排查哪个阶段耗时多
已解决问题
未解决问题ActiveMQ长期运行后无法接受消息 - ITeye问答
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"tcp://10.10.10.101:61616");
Connection connection = factory.createConnection();
connection.start();
final FileWriter fw=new FileWriter("log_.txt",true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("receive");
MessageConsumer consumer1 = session.createConsumer(topic);
consumer1.receive(600000);
consumer1.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println("收到一条消息: 开始收听消息");
TextMessage tm = (TextMessage)
fw.append(tm.getText()+"\r\n");
System.out.println(tm.getText());
fw.flush();
//System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
} catch (IOException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
/*session.close();
connection.stop();
connection.close();*/
哪位高手给看下,这个是接受消息的,运行一天两天后就无法接受到消息了。
日志文件:
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10001ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor ReadCheckTimer: ] - [ DEBUG ]
30000ms elapsed since last read check.
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 9999ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10001ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor ReadCheckTimer: ] - [ DEBUG ]
30000ms elapsed since last read check.
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor ReadCheckTimer: ] - [ DEBUG ]
30000ms elapsed since last read check.
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
[ ActiveMQ InactivityMonitor ReadCheckTimer: ] - [ DEBUG ]
30000ms elapsed since last read check.
[ ActiveMQ InactivityMonitor WriteCheckTimer: ] - [ DEBUG ]
WriteChecker: 10000ms elapsed since last write check.
[ ActiveMQ InactivityMonitor Worker: ] - [ DEBUG ]
Running WriteCheck[tcp://127.0.0.1:61616]
因为consumer1.receive(600000);指定了接收时间,到时就结束了。
ActiveMq不是提供了一个WEB管理服务吗?该网站内有详细的监控数据,建议你可以看看。
我曾经搞过超大数据量的MQ丢数据的问题,是一家客户的服务器,情况跟你的差不多,就是时间久了丢数据,或者接收不到数据,怎么差都查不出原因,但后来解决掉了,
你可以尝试一下,我看到你用的java吧,大概有两种方式:
在这之前,你先一些一个程序,他是可以独立运行的,或者通过传参数运行的,这个程序就是接收MQ数据和处理mq数据的程序,接收完数据,如果发现MQ里面的数据没有了,就退出进程,注意该程序一定要是单线程的,多线程下也会出现接收不到的情况/
1.如果你是用的windows来接收的,可以在windows下面新建一个“计划任务”,任务就是定时启动你上面的那个程序
2.你还可以通过别的方式启动,比如你写一个java的服务程序,这个程序跟windows的任务计划一样,定时处理程序,
-----------------------
就是你不能长时间和MQ服务器连接,长时间连接,一定会丢数据或者获取失败。你要通过多进程,而每个进程又是单线程的方式启动。
如果你的数据量大,你可以同时启动多个进程,传入不同的参数,来改成程序接收的数据范围。
第一个程序:根据入口参数,接收自己的数据,处理,如理完以后进程结束,
第二个程序:写一个专门启动和结束进程的服务,这个服务器随着开机启动,注意,启动进程不要太频繁,一个小时启动一次。
----------------
我可能表达不清楚,因为我这里也没有环境了,不好写代码给你,你可以加我QQ:
已解决问题
未解决问题activemq有三种索引系统默认的是Store-based 参考文档
第一种索引Store-based
消息接收后,首先完成消息存储的工作,判断是否有空闲的内存可用,如果有的话,直接进入DisPatchQueue,如果没有可用内存,维护一个指针,当需要消息的时候,直接从消息存储的介质里每次读取一批消息,然后存入DisPatchQueue。(此消息存储的大小取决于storeUsage+temUsage设置的大小)速度适中
第二种索引 VM Cursor
&destinationPolicy&
&policyMap&
&policyEntries&
&policyEntrytopic="&" producerFlowControl="true" memoryLimit="100mb"&
&pendingSubscriberPolicy&
&vmQueueCursor/&
&/pendingSubscriberPolicy&
&/policyEntry&
&/policyEntries&
&/policyMap&
&/destinationPolicy&
消息接收后,首先完成消息存储的工作,然后直接把消息存放在内存中的DisPatchQueue(此消息存储的大小取决于memoryLimit设置的大小,没有设置就是memoryLimit的大小,也就是
jvm的70%)。速度最快
第三种索引 File based Cursor
&destinationPolicy&
&policyMap&
&policyEntries&
&policyEntrytopic="&" producerFlowControl="true" memoryLimit="100mb"&
&pendingSubscriberPolicy&
&fileQueueCursor/&
&/pendingSubscriberPolicy&
&/policyEntry&
&/policyEntries&
&/policyMap&
&/destinationPolicy&
消息接收后,首先完成消息存储的工作是否有空闲的内存可用,如果有的话,消息直接进入DisPatchQueue,如果没有可用内存,把消息写入临时文件中,当需要消息的时候,直接从临时文件中读写一批,然后送入DisPatchQueue。(此消息存储的大小取决如tempUsage的大小)速度最慢,慢消费者可以采用
测试结果(已下测试消息内容都是2KB,message3KB)
消息cursor类型
最佳使用场景
Store-based
当内存不够时,需要进行1次消息存储操作,性能在3种方式中居中
最好(storeUsage+temUsage使用完后假死)(12G保存3.5百万)
activeMQ默认使用该cursor,因为它能满足大部分场景需要
当内存不够时,需要进行2次消息存储操作,并且在删除消息的时候也就相应的要删除2次,性能在3种方式中最差
居中(当tempUsage满后假死)
主要用在当消息存储慢(如消息是放在数据库里),并且消费者相对快的情况下
在内存够的情况下,3种message cursor性能一样
如果不对队列进行内存限制,消息在内存中消耗完jvm内存后假死(对队列进行限制后不影响其他队列)
248142个消息
很快,适合消费慢需要抑制消息产生的场景,以及消费快的场景
ActiveMQ使用经验与优化
1.1 不要频繁的建立和关闭连接
JMS使用长连接方式,一个程序,只要和JMS服务器保持一个连接就可以了,不要频繁的建立和关闭连接。频繁的建立和关闭连接,对程序的性能影响还是很大的。这一点和jdbc...
【ActiveMQ】JMS中间件ActiveMQ详解
Java Message Service(JMS)是SUN提出的旨在统一各种MOM(Message-Oriented Middleware )系统接口的规范,它包含点对点(Point...
RabbitMq、ActiveMq、ZeroMq、kafka之间的比较,资料汇总
MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka。这几种MQ到底应该选择哪个?要根据自己项目的业务场景和需求。下面我列出这些MQ之间的对比数据和资料。
ActiveMQ构建高稳定性消息中间件
ActiveMQ 构建高稳定性的消息中间件可以采用多种方式,其中的一种方式是采用共享文件的方式,这种部署方式比较简单.
采用这种方式部署的主要优点是系统的稳定性高,一旦一个broker因为异常关闭,...
架构设计:系统间通信(22)——提高ActiveMQ工作性能(上)
根据这个系列文章所陈述的中心思想,系统的性能层次包括:代码级性能、规则性能、存储性能、网络性能,以及多节点协同方法(集群方案),所以我们优化ActiveMQ的中心思路也是这样的:首先优化ActiveM...
ActiveMQ消息队列获取每个队列中的消费者数、剩余消息数、已消费数、队列名等信息 示例
ActiveMQ消息队列获取每个队列中的消费者数、剩余消息数、已消费数、队列名等信息 示例...
ActiveMQ学习总结(8)——消息队列设计精要
消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
当今市面上有很多主流的消息中间件,如老牌的Activ...
Activemq实现Mysql与SolrCloud同步策略
应用场景:
商品数量很多,搜索功能为了保证用户体验,就使用了solrcloud,可是后面添加的商品可以入库,如何导入到solr库当中呢。我们用到了mq实现。再插入商品之后,发送一...
淘淘商城系列——添加商品同步到索引库以及消息机制测试
我们在添加商品时需要与索引库进行同步,这样每添加一个商品,索引库就多一个文档,这样做的好处是不用把数据库中的所有数据进行同步,大大提高了性能并且节约了时间。
我们要做的是当添加商品的时候发送Acti...
ActiveMQ源码解析(二):聊聊客户端和broker的通讯
ActiveMQ支持以下几种通讯协议:
HTTP/HTTPS
基于http协议
性能更好,但不可靠
没有更多推荐了,求助 activeMQ 消费者消费消息速度优化问题!
[问题点数:20分]
本版专家分:15
结帖率 92.86%
CSDN今日推荐
匿名用户不能发表回复!
其他相关推荐

我要回帖

更多关于 英雄联盟更新慢 的文章

 

随机推荐