rabittmq当没有消费者时,消息队列 多个消费者会丢掉吗

下次自动登录
现在的位置:
& 综合 & 正文
rabbitmq技术的一些感悟(一)
初识rabbitmq
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍
几个概念说明:
Broker:简单来说就是消息队列服务器实体。Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。Queue:消息队列载体,每个消息都会被投入到一个或多个队列。Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。Routing Key:路由关键字,exchange根据这个关键字进行消息投递。vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。producer:消息生产者,就是投递消息的程序。consumer:消息消费者,就是接受消息的程序。channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
由Exchange,Queue,RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。
消息队列的使用过程大概如下:
(<span style="color:#)客户端连接到消息队列服务器,打开一个channel。  (<span style="color:#)客户端声明一个exchange,并设置相关属性。  (<span style="color:#)客户端声明一个queue,并设置相关属性。  (<span style="color:#)客户端使用routing key,在exchange和queue之间建立好绑定关系。  (<span style="color:#)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing
key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括<span style="color:#个部分:  (<span style="color:#)exchange持久化,在声明时指定durable =& 1  (<span style="color:#)queue持久化,在声明时指定durable =& 1  (<span style="color:#)消息持久化,在投递时指定delivery_mode=& 2(<span style="color:#是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
安装开发环境和库
1.将目录中的librabbitmq.so.1放到目录 /usr/local/lib/librabbitmq.so.1
2.安装rabbitm需要的环境和库
yum install -y ncurses-devel
yum install gcc
yum install g++
yum install cmake
yum install make
yum install php
yum install mysql
yum install php-process
yum install php-devel
yum install mysql-server
#安装php的amq支持扩展
wget http://pecl.php.net/get/amqp-1.0.3.tgz
tar zxvf amqp-1.0.3.tgz
cd amqp-1.0.3
/usr/bin/phpize
./configure--with-php-config=/usr/bin/php-config --with-amqp
make && make install
#php.ini 添加
vi /etc/php.ini
extension="amqp.so"
#安装erlang支持
wgethttp://www.erlang.org/download/otp_src_R15B01.tar.gz
tar -zxvf otp_src_R15B01.tar.gz
cd otp_src_R15B01
./configure --prefix=/home/erlang--without-javac
make && make install
ln -s /home/erlang/bin/erl/usr/local/bin/erl
3. 安装rabbitma
解压rabbitmq-server-generic-unix-3.3.4.tar
进入sbin目录:
启动rabbitmq服务,执行 nohup./rabbitmq-server start &
启动rabbitmq服务器以及命令
当第一次启动服务,检测数据库是否未初始化或者被删除,它会用下面的资源初始化一个新的数据库:
一个命名为 / 的虚拟宿主一个名为guest密码也为guest的用户,他拥有/虚拟宿主的所有权限如果你的中间件是公开访问的,最好修改guest用户的密码。管理概观rabbitmqctl 是RabbitMQ中间件的一个命令行管理工具。它通过连接一个中间件节点执行所有的动作。本地节点默认被命名为”rabbit”。可以通过这个命令前使用”-n”标志明确的指定节点名称, 例如:# rabbitmqctl -n rabbit@shortstop add_user tonyg changeit
这个命令指示RabbitMQ中间件在rabbit@shortstop 节点创建一个tonyg/changeit的用户。
在一个名为””的主机,RabbitMQ Erlang节点的名称通常是rabbit@server(除非RABBITMQ_NODENAM在中间件启动时候被设置)。hostnam -s 的输出通常是”@”符号正确的后缀。rabbitmqctl 默认产生详细输出。通过”-q”标示可选择安静模式。rabbitmqctl -q status应用和集群管理1.停止RabbitMQ应用,关闭节点
# rabbitmqctl stop
2.停止RabbitMQ应用
# rabbitmqctl stop_app
3.启动RabbitMQ应用
# rabbitmqctl start_app
4.显示RabbitMQ中间件各种信息
# rabbitmqctl status
5.重置RabbitMQ节点
# rabbitmqctl reset
# rabbitmqctl force_reset
从它属于的任何集群中移除,从管理数据库中移除所有数据,例如配置过的用户和虚拟宿主, 删除所有持久化的消息。
force_reset命令和reset的区别是无条件重置节点,不管当前管理数据库状态以及集群的配置。如果数据库或者集群配置发生错误才使用这个最后的手段。
注意:只有在停止RabbitMQ应用后,reset和force_reset才能成功。
6.循环日志文件
# rabbitmqctl rotate_logs[suffix]
7.集群管理
# rabbitmqctl cluster clusternode…
1.添加用户
# rabbitmqctl add_user username password
2.删除用户
# rabbitmqctl delete_user username
3.修改密码
# rabbitmqctl change_password usernamenewpassword
4.列出所有用户
# rabbitmqctl list_users
权限控制1.创建虚拟主机
# rabbitmqctl add_vhost vhostpath
2.删除虚拟主机
# rabbitmqctl delete_vhost vhostpath
3.列出所有虚拟主机
# rabbitmqctl list_vhosts
4.设置用户权限
# rabbitmqctl set_permissions [-pvhostpath] username regexp regexp regexp
5.清除用户权限
# rabbitmqctl clear_permissions [-pvhostpath] username
6.列出虚拟主机上的所有权限
# rabbitmqctl list_permissions [-pvhostpath]
7.列出用户权限
# rabbitmqctl list_user_permissionsusername
rabbitmqctl add_vhost az
rabbitmqctl set_permissions -p az guest".*" ".*" ".*"
amqp_connection_state_tamqp_new_connection(void);
接口说明:声明一个新的amqp connection
intamqp_open_socket(char const *hostname, int portnumber);
接口说明:获取socket.
参数说明:hostname
RabbitMQ server所在主机
portnumber
RabbitMQ server监听端口
voidamqp_set_sockfd(amqp_connection_state_t state,int sockfd);
接口说明:将amqp connection和sockfd进行绑定
amqp_rpc_reply_tamqp_login(amqp_connection_state_t state, char const *vhost,intchannel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method,...);
接口说明:用于登录RabbitMQ server,主要目的为了进行权限管理;
参数说明:state
amqpconnection
rabbit-mq的虚机主机,是rabbit-mq进行权限管理的最小单位
channel_max
最大链接数,此处设成<span style="color:#即可
和客户端通信时所允许的最大的frame size.默认值为<span style="color:#1072,增大这个值有助于提高吞吐,降低这个值有利于降低时延
含义未知,默认值填<span style="color:#
sasl_method
用于SSL鉴权,默认值参考后文demo
amqp_channel_open_ok_t*amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);
接口说明:用于关联conn和channel
amqp_exchange_declare_ok_t*amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive,amqp_boolean_t durable, amqp_table_t
arguments);
接口说明:声明declare
参数说明:state
"fanout" "direct" "topic"三选一
amqp_queue_declare_ok_t*amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable,amqp_boolean_t exclusive, amqp_boolean_t
auto_delete, amqp_table_targuments);
接口说明:声明queue
参数说明:state
amqp connection
queue name
队列是否持久化
当前连接不在时,队列是否自动删除
aoto_delete
没有consumer时,队列是否自动删除
用于拓展参数,比如x-ha-policy用于mirrored queue
amqp_queue_bind_ok_t*amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments);
接口说明:声明binding
amqp_basic_qos_ok_t*amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_tprefetch_size, uint16_t prefetch_count, amqp_boolean_t global);
接口说明:qos是 quality of service,我们这里使用主要用于控制预取消息数,避免消息按条数均匀分配,需要和no_ack配合使用
参数说明:state
prefetch_size
以bytes为单位,<span style="color:#为unlimited
prefetch_count
预取的消息条数
amqp_basic_consume_ok_t*amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local,amqp_boolean_t no_ack, amqp_boolean_t
exclusive, amqp_table_t arguments);
接口说明:开始一个queue consumer
参数说明:state
consumer_tag
是否需要确认消息后再从队列中删除消息
int amqp_basic_ack(amqp_connection_state_tstate,amqp_channel_t channel,uint64_t delivery_tag,amqp_boolean_t multiple);
intamqp_basic_publish(amqp_connection_state_t state,amqp_channel_tchannel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_tmandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_
const*properties,amqp_bytes_t body);
接口说明:发布消息
参数说明:state
routing_key
当exchange为默认“”时,此处填写queue_name,当exchange为direct,此处为binding_key
参见参考文献<span style="color:#
properties
更多属性,如何设置消息持久化,参见文后demo
amqp_rpc_reply_tamqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,intcode);
amqp_rpc_reply_tamqp_connection_close(amqp_connection_state_t state,int code);
intamqp_destroy_connection(amqp_connection_state_t state);
&&&&推荐文章:
【上篇】【下篇】RabbitMQ介绍2 - 理解消息AMQP
时间: 10:28:59
&&&& 阅读:341
&&&& 评论:
&&&& 收藏:0
标签:理解消息AMQP通信。官方解释:
概念:生产者producer,消费者consumer,队列queue,交换器exchange,路由键routing key,绑定键binding key。
producer发布消息,消息经过交换器传播放入队列,消费者从队列中得到消息。
ConnectionFactory, connection, channel信道。connectionFactory用来建立连接,connection表示一个TCP连接,channel是一个建立在connection上的虚拟连接。每条信道通过一个唯一的ID标记,发布消息、绑定、消费消息这些操作都是在信道上完成。connection和channel的关系类似于进程和线程。
问题:在同一个channel能订阅多个queue吗?生产者和消费者能使用同一个channel吗?同一个channel能给多个exchange发消息吗?
消息包括有效荷载payload和标签label,payload是需要传输的数据,可以是任何东西(C#编程的时候就是一个byte数组),label是传输数据的一些描述,比如routing key,持久化delivery mode。
消息持久化。exchange,queue,message都要是持久化的,消息才会在RabbitMQ重启后还保留,任何一个环节不是持久化的,消息都不会恢复。持久化会可能会使吞吐量降低10倍(采用SSD存储可以大大缓解),而且在集群环境工作的也不好。如果集群中一个节点崩溃,其上的持久化队列便从集群中消失了,在这个节点重启前,所有发送到这些队列的消息都会丢失。作为对比,非持久化队列在节点崩溃后,会在其它节点重建,消息便能发送到重建的队列上。
消息确认。消费者接收到的每一条消息都必须进行确认。消费者必须通过AMQP的basic.ack命令显式地向RabbitMQ发送一个确认,或者在订阅到队列的时候设置auto_ack为true。auto_ack为true时,一旦消费者接收消息,RabbitMQ会自动认为其进行了确认。只有确认过的消息,才会从队列中删除。如果消费者接收了消息,但没有进行确认,消息会进入等待确认状态,这时候如果消费者的连接断开,RabbitMQ会清除这些消息的状态,并发送给队列的其它消费者。如果消费者一直不进行确认,消息会堆积在队列中,不会超时。消费者也可以明确拒绝一个消息(basic.reject命令),拒绝的时候如果requeue参数为true,消息会重新发送给下一个订阅的消费者,requeue为false时会从队列中删除消息,这时候的效果和发送确认类似,不同的是2.0以后版本的RabbitMQ会维护被拒绝的消息队列,以便后续调查。
AMQP事务transaction。由于发布消息不返回任何信息给生产者,你怎么知道服务器是否已经保存了持久化消息到硬盘呢?事务用来确认发送者的消息已经进入了队列(或者被丢弃)。在把信道设置为事务模式后,这条信道上的消息将按照串行方式发送,只有前面的消息确认成功了,才会执行后面的,直到提交事务,结束信道的事务模式。事务会极大的降低RabbitMQ的吞吐量(2~10倍)。
发送方确认模式。这是RabbitMQ的概念,不属于AMQP。类似AMQP事务的功能,但是不需要串行,极大的提高了吞吐量。使用方法:将信道设置为confirm模式,只有重新创建信道才能关闭该设置。所有信道上发送的消息都会指派一个唯一的ID(从1开始),一旦消息投递成功,信道会发送一个确认给生产者。生产者通过回调接收信道的确认消息(如果消息丢失,会收到nack),与此同时,生产者也可以继续发送消息。由于没有事务的消息回滚,发送方确认模式更加轻量,对RabbitMQ服务器的影响也可以忽略不计。
我们没有办法得到信道指派的这个ID(basic_publish不会返回ID信息),那怎么确认信道确认消息中的ID具体指的是哪个消息呢?方法是发送者自己维护消息和ID的关系。一个信道一般只有一个线程使用,ID从1开始,每次递增1,发送者完全可以自己算出消息的ID。
Queue。队列是唯一存储消息的地方,一条消息路由后要么进入队列,要么被丢弃。
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。
多个消费者可以订阅同一个Queue,这时Queue中的消息会被按照round-robin分摊给多个消费者,而不是每个消费者都收到所有的消息。
Prefetch count。消费者从队列中订阅消息的时候,RabbitMQ会把队列中的全部消息推送到消费者,然后等待确认。通过设置prefetch count可以限定消息确认前最多发送给消费者的消息数量。在有多个消费者订阅一个队列的时候,设置这个参数可以帮助实现负载平衡。注意这里提到的订阅是持续订阅,通过basic.get进行的单次订阅不在此列。
创建队列。生产者和消费者都可以使用queue.declare创建队列,如果消费者在同一条信道上订阅了另一个队列的话,就无法再创建队列了。创建队列时如果不指定队列的名称,RabbitMQ会分配一个随机名称,并在queue.declare的结果中返回。创建队列如果已经存在,只要声明参数和现存队列完全一样,RabbitMQ什么都不做,创建过程成功返回,如果参数不匹配,会抛出异常。如果想检查队列是否存在,创建时设置参数passive为true,如果队列存在,创建成功返回,否则返回错误。
队列属性:exclusive,如果为true的话,只有你的程序才能消费队列。Auto-delete,如果为true的话,在最后一个消费者取消订阅的时候,队列会自动删除。
Exchange。生成者将消息发送到exchange,exchange根据消息的routing key,路由消息到相应的队列。不同的交换器类型exchange type提供不同的路由方案,exchange type有4种:direct,fanout,topic和headers。
direct。消息的routing key需要精确匹配binding key。
当生产者(P)发送的消息Rotuing key=booking时,发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果以Rotuing key=create或Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。
注意RabbitMQ有一个默认Exchange,类型是direct,没有办法绑定一个queue到这个exchange,但是通过指定routing key,可以发送消息到同名的queue。
fanout。这个类型的exchange会发送消息到所有绑定在上面的queue,routing key和binding key不起作用。
生产者(P)生产消息1将消息1推送到Exchange,Exchange将消息推送到所有与它绑定Queue,最后两个消费者都会收到消息。
topic。Binding key可以使用*和#来对routing key进行模糊匹配,*表示一个单词,#匹配0或多个单词,一个单词是“.”分割的一段。点分割的部分可以为空,比如bindingkey=*.*的可以匹配routingkey是&.&的消息,但是bindingkey=“*”无法匹配routingkey是空的消息。可以简单理解,先将routingkey按照stringsplit方法用&.&分割,保留空白字符,*表示匹配分割后的一个位置,#表示匹配0…n个位置。其余字符需要精确匹配。
Will &*& binding catch a message sent with an empty routing key? 不会
Will &#.*& catch a message with a string &..& as a key? Will it catch a message with a single word key? 都会
How different is &a.*.#& from &a.#&? 前者routingkey要有&a.&,后者只要有&a&就能匹配。
点号“. ”分隔的每一段字符串称为一个单词,如“quick.orange.rabbit”包含3个单词quick,orange,rabbit。
*表示一个单词,#表示0或多个单词。
binding key使用“*”与“#”来模糊匹配routing key。routing key中的“*”或“#”当做普通字符处理。
当生产者发送消息Routing Key=F.C.E时,只会被路由到Queue1中,如果Routing Key=A.C.E这时候会被同时路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中
headers。headers交换器允许你匹配AMQP消息的header而非路由键,除此之外,和direct交换器完全一样,但是性能会差很多,因此并不太实用,而且几乎再也用不到了。可以在绑定的时候提供一组键值对,如果消息的header(也是一组键值对)和其完全匹配,则路由消息到队列。
vhost虚拟主机。RabbitMQ能创建多个vhost,每个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的exchange,queue,和权限机制,类似于虚拟机之于物理服务器,vhost提供实例逻辑上的分离,区分开RabbitMQ众多的客户,避免queue和exchange命名冲突。默认的vhost是“/”,可以通过rabbitmqctl工具来管理vhost。
Rabbitmqctl add_vhost vhostName。创建vhost
Rabbitmqctl delete_vhost vhostName。删除vhost
Rabbitmqctl list_vhost。列出全部vhost
&&国之画&&&& &&&&chrome插件&&
版权所有 京ICP备号-2
迷上了代码!spring与RabbitMQ整合出现消费者无法消费的解决方法
spring与RabbitMQ整合出现消费者无法消费的解决方法:RabbitMQ是当前一个挺火的消息队列中间件 相比ActiveMQ 消息更不容易丢失。
我之前用的是ActiveMQ 后边有的时候会莫名其妙的收不到消息 项目紧后边也没时间排查 经朋友的推荐下 换了RabbitMQ 后边用着也没啥问题
今天 的RabbitMQ 突然就出了问题 生产者发送消息 消费者监听不到 消费者重启才能接收到 这样的情况肯定不行 项目上线的话要是出现这种问题影响很大的
进入RabbitMQ的监控中心 登录 在queue里边找到自己的queue 进入 在里边可以看到消息发送情况
昨天我看了一下我的queue监控 Unacked是1以上 ready是1以上 ready好像是未消费消息
网上查了一下 好像说消息未确认 但是却消费了消息 这时候消息堵塞 不知情况如何 先记录一下
查看了我的spring配置文件 我当时的acknowledge为auto 接收到消息后自动确认消息 于是就在想 是不是确认不成功了
于是乎 在网上查了手动确认的资料
1.在rabbitmq整合spring的配置文件中 把下述配置中的acknowledge改为manual
2.监听器继承ChannelAwareMessageListener 然后重写onMessage 对消息进行手动确认
@Component(&searchListenner&)
public class SearchListenner implements ChannelAwareMessageListener
public void onMessage(Message message, Channel channel) throws Exception {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//这个就是消息确认的语句 再往下边写自己的逻辑就好
System.out.println(message.getBody());
} catch (Throwable e) {
getLog().error(e.getMessage(), e);RabbitMQ-从基础到实战(4)—消息的交换(中)
RabbitMQ-从基础到实战(4)—消息的交换(中)
时尚的科技
1.简介本章节和官方教程相似度较高,英文好的可以移步官方教程在上一章的例子中,我们创建了一个消费者,生产日志消息,广播给两个消费者,对消息进行不同的处理。这一节,我们将对它进行扩展,实现一些更加高级的功能,例如:使消费者A只接受error级别的日志保存到硬盘,消费者B接收所有级别的消息进行打印。本文中涉及到的所有概念(包括前面几章),都将摒弃个人经验,以官方文档为基础进行讲解,在书写本文的同时,也是我对RabbitMQ的重新学习。2.绑定回顾一下上一章的队列绑定代码// 把刚刚获取的队列绑定到logs这个交换中心上,channel.queueBind(queueName, &logs&, &&);这段代码在消费者中,为什么生产者没有?因为在RabbitMQ中消息是发送到交换中心(exchange)的,这在上一张已经重点强调过。上述代码可以理解成,queueName这个队列对logs这个exchange中的消息感兴趣,routingKey是&&在发送消息的basicPublish方法中,也有一个参数叫做routingKey,没错,他们是有关联的,下面会介绍在不同的exchange类型中,routingKey扮演的角色也相应的不同,比如上一章我们使用的fanout(扇出,多贴切的名字,想象一下WOW中盗贼的刀扇)将忽略routingKey,所有绑定在fanout类型的exchange上的队列,都将接收到该exchange上的所有消息。3.Direct Exchangefanout类型的exchange没有给我们太多的灵活性,direct类型的echange非常简单,会匹配消息发布时的routingKey和queue的routingKey,完全相等则把消息放入该队列。如上图,Q1绑定了orange,Q2绑定了black和green,就可以实现不同级别的日志用不同的消费者进行处理我们看到Q2绑定了两个routingKey,难道第二次绑定不会把第一次绑定覆盖掉吗?实践出真正,我们来试一下声明一个名为logs的exchange,类型换为direct,让它通过routingKey的完全匹配去分发消息然后把消息发送到名为logs的exchange上,routingKey是外面传进来的改造一下发送方法,轮流发送info和error信息给Consummer队列绑定两个routingKey激动人心的时刻到来了,跑一把Duang,报错了报错信息:inequivalent arg &#39;type&#39; for exchange &#39;logs&#39; in vhost &#39;/&#39;: received &#39;direct&#39; but current is &#39;fanout&#39;, class-id=40, method-id=10)大意就是logs exchange已经被声明称fanout了,不能再声明成direct类型,RabbitMQ的队列声明方法和exchange声明方法都是幂等的,如果没有,就创建,如果有,参数相同,就不管,如果有了还用不同的参数重新声明,就报错进入RabbitMQ控制台把logs删除,重新执行逆袭成功,消费一下看看成功了,一个队列可以绑定多个routingKey,这里注意先启动消费者,因为前面的代码里我们用的是临时队列,断开连接后,队列就删除了,如果先启动生产者,exchange接到消息后发现没有队列对它感兴趣,就任性的把消息给丢掉了。一个队列可以绑定多个routingKey,反之,一个routingKey也可以绑定多个队列,如下图,感兴趣的朋友可以自己试一下如果绑定在一个direct类型的exchange上的队列都使用同一个routingKey,那它就是一个fanout4.实战要实现本章的需求,即Q1只接收error级别的日志写到硬盘上,Q2接收error和info级别的日志打印出来用direct类型的exchange来实现这个需求非常简单,Q1绑定error,Q2绑定error和info即可,缺点是Q2需要绑定N个routingKey,N=日志级别数量,我们可以用一些编程的技巧来规避它Sender的代码上面已经改好了,把exchange换为direct,注意删除原exchange,不再赘述Q2绑定所有日志级别,我们用一个Enum来规避手动绑定定义一个Enum1 public enum LogType{ 2 error, 3 }用foreach语法糖进行循环绑定1 //绑定所有类型 2 for(LogType logType: LogType.values){ 3 channel.queueBind(queueName, &logs&, logType.name); 4 }foreach是单线程的,这里也可以装个逼用一下JAVA8的lambda,由于lambda是并行处理,所以外围的try catch无效,需要在内层重新抓取异常,而且不能抛出,反而显得代码很不美好,装逼失败1 IntStream.range(0, LogType.values.length).forEach(n-&{ 2 try { 3 channel.queueBind(queueName, &logs&, LogType.values[n].name); 4 } catch (IOException e) { 5 e.printStackT 6 } 7 });把另外一个Consumer改成只绑定error队列1 channel.queueBind(queueName, &logs&, LogType.error.name);然后,改造一下发送消息的地方,一开始我们用了一个while还有一组if else,看起来比较挫,别人看你代码的时候,就不会觉得你很厉害,这样和你不写博客一样,对你的工作是没有好处的,我们把它改的高端一点1 while(true){ 2 boolean info = ++i%2==0; 3 String type = .name:LogType.error. 4 sender.sendMessage(type +& message: &+i, type); 5 Thread.sleep(1000); 6 }对比一下是不是觉得自己厉害了很多?这就是编码的艺术(得意脸)好了,这一章没有太多内容,跑一下看看结果左边的Consumer1,消费了info和error级别的日志,右边的Consumer2,只消费了error级别的日志5.结束语这一章主要是介绍了RabbitMQ中direct类型的exchange,下一章将跟着官方教程的进度继续介绍topic类型的exchange,以及下下章介绍用RabbitMQ实现RPC调用。之后则会介绍RabbitMQ与Spring的集成等与真实开发环境更相关的技术。
本文仅代表作者观点,不代表百度立场。系作者授权百家号发表,未经许可不得转载。
时尚的科技
百家号 最近更新:
简介: 商业地产,独家观点。独立性与客观性。
作者最新文章

我要回帖

更多关于 华为消费者bg成立时间 的文章

 

随机推荐