rabbitmq支持mqtt盘p2p吗

数加&大数据分析及展现
数加&大数据应用
管理与监控
阿里云办公
培训与认证
域名与网站(万网)
数加&人工智能
数加&大数据基础服务
互联网中间件
开发者工具
钉钉智能硬件
云服务器 ECS
&&&&&&部署RabbitMQ
部署RabbitMQ
更新时间: 21:50:58
RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
部署方式在阿里云服务器下部署 RabbitMQ 提供两种部署方式:
手动部署(源码编译安装)
一般推荐镜像部署适合新手使用更加快捷方便,安装包部署以及手动部署适合对 Linux 命令有基本了解的用户,可以满足用户个性化部署的要求。本教程主要介绍镜像和手工部署的方式。
进入镜像详情页。
单击 立即购买,按提示步骤购买 ECS 实例。
在左边导航栏里,单击 实例,进入 ECS 实例列表页。
选择所购 ECS 实例所在的地域,并找到所购 ECS 实例,在 IP 地址 列获取该实例的公网 IP 地址。
在浏览器地址栏中输入公网 IP 地址,下载操作文档。
使用 putty 登录 Linux 服务器,请参考;忘记 root 密码,请参考。
初始化 rabbitmq。
cd /root/oneinstack ./init_rabbitmq.sh
进入管理页面,浏览器访问 http://公网IP:15672 。
系统平台:CentOS 7.3rabbitmq版本:rabbitmq-server -3.6.9erlang版本:erlang19.3JDK版本:JDK1.8.0_121
前提准备创建一般用户 rabbitmq,运行 rabbitmq。
useradd rabbitmq
设置 Linux 主机名。
CentOS 7 修改 /etc/hostname,CentOS 6 修改 /etc/sysconfig/network,下面以 CentOS 7 为例:
echo rabbit1 & /etc/hostnamehostname rabbit1exit
#退出重新登录
安装依赖包yum -y install make gcc gcc-c++ m4 ncurses-devel openssl-devel unixODBC-devel
源代码下载
下载 Erlang:wget http://erlang.org/download/otp_src_19.3.tar.gz。下载 rabbitmq:wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.9/rabbitmq-server-generic-unix-3.6.9.tar.xz
安装 Erlangtar xzf otp_src_19.3.tar.gz #解压cd otp_src_19.3./configure --prefix=/usr/local/erlang --enable-shared-zlib --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javacmake && make install
解压 RabbitMQ
解压 rabbitmq-server-generic-unix-3.6.9.tar.xz 。
tar xvJf rabbitmq-server-generic-unix-3.6.9.tar.xzmv rabbitmq_server-3.6.9
/usr/local/rabbitmq
rabbitmq 环境变量配置。
sed -i 's@^ERL_DIR=.*@ERL_DIR=/usr/local/erlang/bin/@' /usr/local/rabbitmq/sbin/rabbitmq-defaultssed -i 's@^LOG_BASE=.*@LOG_BASE=/usr/local/rabbitmq/var/log/rabbitmq@' /usr/local/rabbitmq/sbin/rabbitmq-defaultsmkdir -p /usr/local/rabbitmq/var/{lib,log}/rabbitmq
一般用户(rabbitmq)运行 RabbitMQ。
wget http://pkgs.fedoraproject.org/cgit/rpms/rabbitmq-server.git/plain/rabbitmq-script-wrappersed -i 's@cd /var/lib/rabbitmq@cd /usr/local/rabbitmq/var/lib/rabbitmq@g' rabbitmq-script-wrapper
#更改rabbitmq数据存储目录sed -i 's@/usr/lib/rabbitmq/bin/@/usr/local/rabbitmq/sbin/@g' rabbitmq-script-wrapperchmod +x rabbitmq-script-wrappercp rabbitmq-script-wrapper /usr/sbin/rabbitmqctlcp rabbitmq-script-wrapper /usr/sbin/rabbitmq-servercp rabbitmq-script-wrapper /usr/sbin/rabbitmq-pluginschown -R rabbitmq.rabbitmq /usr/local/rabbitmq/var
rabbitmq 日志割接。
cat && /etc/logrotate.d/rabbitmq-server && EOF/usr/local/rabbitmq/var/log/rabbitmq/*.log {weeklymissingokrotate 20compressdelaycompressnotifemptysharedscriptspostrotate /sbin/service rabbitmq-server rotate-logs & /dev/nullendscript}EOF
rabbitmq 启动脚本。
vi /etc/init.d/rabbitmq-server#!/bin/sh## rabbitmq-server RabbitMQ broker## chkconfig: - 80 05# description: Enable AMQP service provided by RabbitMQ#### BEGIN INIT INFO# Provides:
rabbitmq-server# Required-Start:
$remote_fs $network# Required-Stop:
$remote_fs $network# Description:
RabbitMQ broker# Short-Description: Enable AMQP service provided by RabbitMQ broker### END INIT INFO# Source function library.. /etc/init.d/functionsPATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/erlang/binNAME=rabbitmq-serverDAEMON=/usr/sbin/${NAME}CONTROL=/usr/sbin/rabbitmqctlDESC=rabbitmq-serverUSER=rabbitmqROTATE_SUFFIX=INIT_LOG_DIR=/usr/local/rabbitmq/var/log/rabbitmqPID_FILE=/var/run/rabbitmq/pidSTART_PROG="daemon"LOCK_FILE=/var/lock/subsys/$NAMEtest -x $DAEMON || exit 0test -x $CONTROL || exit 0RETVAL=0set -e[ -f /etc/default/${NAME} ] && . /etc/default/${NAME}[ -f /etc/sysconfig/${NAME} ] && . /etc/sysconfig/${NAME}ensure_pid_dir () { PID_DIR=`dirname ${PID_FILE}` if [ ! -d ${PID_DIR} ] ; then
mkdir -p ${PID_DIR}
chown -R ${USER}:${USER} ${PID_DIR}
chmod 755 ${PID_DIR} fi}remove_pid () { rm -f ${PID_FILE} rmdir `dirname ${PID_FILE}` || :}start_rabbitmq () { status_rabbitmq quiet if [ $RETVAL = 0 ] ; then
echo RabbitMQ is currently running else
# RABBIT_NOFILES_LIMIT from /etc/sysconfig/rabbitmq-server is not handled
# automatically
if [ "$RABBITMQ_NOFILES_LIMIT" ]; then
ulimit -n $RABBITMQ_NOFILES_LIMIT
ensure_pid_dir
RABBITMQ_PID_FILE=$PID_FILE $START_PROG $DAEMON \
& "${INIT_LOG_DIR}/startup_log" \
2& "${INIT_LOG_DIR}/startup_err" \
$CONTROL wait $PID_FILE &/dev/null 2&&1
case "$RETVAL" in
echo SUCCESS
if [ -n "$LOCK_FILE" ] ; then
touch $LOCK_FILE
remove_pid
echo FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}
esac fi}stop_rabbitmq () { status_rabbitmq quiet if [ $RETVAL = 0 ] ; then
$CONTROL stop ${PID_FILE} & ${INIT_LOG_DIR}/shutdown_log 2& ${INIT_LOG_DIR}/shutdown_err
if [ $RETVAL = 0 ] ; then
remove_pid
if [ -n "$LOCK_FILE" ] ; then
rm -f $LOCK_FILE
echo FAILED - check ${INIT_LOG_DIR}/shutdown_log, _err
echo RabbitMQ is not running
RETVAL=0 fi}status_rabbitmq() { set +e if [ "$1" != "quiet" ] ; then
$CONTROL status 2&&1 else
$CONTROL status & /dev/null 2&&1 fi if [ $? != 0 ] ; then
RETVAL=3 fi set -e}rotate_logs_rabbitmq() { set +e $CONTROL rotate_logs ${ROTATE_SUFFIX} if [ $? != 0 ] ; then
RETVAL=1 fi set -e}restart_running_rabbitmq () { status_rabbitmq quiet if [ $RETVAL = 0 ] ; then
restart_rabbitmq else
echo RabbitMQ is not runnning
RETVAL=0 fi}restart_rabbitmq() { stop_rabbitmq start_rabbitmq}case "$1" in start)
echo -n "Starting $DESC: "
start_rabbitmq
echo "$NAME."
echo -n "Stopping $DESC: "
stop_rabbitmq
echo "$NAME."
;; status)
status_rabbitmq
;; rotate-logs)
echo -n "Rotating log files for $DESC: "
rotate_logs_rabbitmq
;; force-reload|reload|restart)
echo -n "Restarting $DESC: "
restart_rabbitmq
echo "$NAME."
;; try-restart)
echo -n "Restarting $DESC: "
restart_running_rabbitmq
echo "$NAME."
echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" &&2
;;esacexit $RETVAL
保存后,添加执行权限,并设置自启动。
chmod +x /etc/init.d/rabbitmq-serverchkconfig --add rabbitmq-serverchkconfig rabbitmq-server on
修改 rabbitmq.config 。
特别注意默认用户名密码,请自行修改 default_user,default_pass,loopback_users。
cat & /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.config && EOF[{rabbit, [ {tcp_listeners,[{"0.0.0.0",5672}]}, {tcp_listen_options, [binary, {packet,raw},
{reuseaddr,true},
{backlog,128},
{nodelay,true},
{exit_on_close,false},
{keepalive,true}]}, {default_vhost,
&&"/"&&}, {default_user,
&&"guest"&&}, {default_pass,
&&"guest"&&}, {loopback_users, ["guest"]}, {default_permissions, [&&".*"&&, &&".*"&&, &&".*"&&]}]}].EOF
开启 rabbitmq manager。
cat & /usr/local/rabbitmq/etc/rabbitmq/enabled_plugins && EOF[rabbitmq_management].EOF
启动 rabbitmq 。
service rabbitmq-server start
进入管理页面。
浏览器访问
更多开源软件尽在云市场: 。
本文导读目录
以上内容是否对您有帮助?
更新不及时
缺少代码/图片示例
太简单/步骤待完善
更新不及时
缺少代码/图片示例
太简单/步骤待完善
感谢您的打分,是否有意见建议想告诉我们?
感谢您的反馈,反馈我们已经收到浅析消息队列之rabbitMQ - 简书
浅析消息队列之rabbitMQ
rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。
rabbitMQ工作原理rabbitMQ里的一些基本定义如下:exchangeproducer只能将消息发送给exchange。而exchange负责将消息发送到queues。Exchange必须准确的知道怎么处理它接受到的消息,是被发送到一个特定的queue还是许多quenes,还是被抛弃,这些规则则是通过exchange type来定义。主要的type有direct,topic,headers,fanout。具体针对不同的场景使用不同的type。queue消息队列,消息的载体。接收来自exchange的消息,然后再由consumer取出。exchange和queue是可以一对多的,它们通过routingKey来绑定。Producer生产者,消息的来源,消息必须发送给exchange。而不是直接给queueConsumer消费者,直接从queue中获取消息进行消费,而不是从exchange。从以上可以看出rabbitMQ工作原理大致就是producer把一条消息发送给exchange。rabbitMQ根据routingKey负责将消息从exchange发送到对应绑定的queue中去,这是由rabbitMQ负责做的,而consumer只需从queue获取消息即可。大致流程如下:
rabbitMQ工作模型下面通过几个列子来详细说明一下如何使用rabbitMQ。1、简单发送模型
在rabbitMQ里消息永远不能被直接发送到queue,这里我们通过提供一个空字符串来使用默认的exchange。这个exchange是特殊的,它可以根据routingKey把消息发送给指定的queue。所以我们的设计看起来如下所示:
代码如下send.py
receive.py
2、工作队列模型
这种模式常常用来处理耗资源耗时间的任务在多个workers中,主要是为了避免立即去处理一个耗时的任务而等待它的完成。代替的做法是一个稍后去处理这个任务,让一个worker process 在后台处理这个任务。当有许多workers的时候,消息将会以轮询的方式被workers获取。模型如下:
这里就会有一个问题,如果consumer在执行任务时需要花费一些时间,这个时候如果突然挂了,消息还没有被完成,消息岂不是丢失了,为了不让消息丢失,rabbitmq提供了消息确认机制,consumer在接收到,执行完消息后会发送一个ack给rabbitmq告诉它可以从queue中移除消息了。如果没收到ack,Rabbitmq会重新发送此条消息,如果有其他的consumer在线,将会接收并消费这条消息。消息确认机制是默认打开的。如果想关闭它只需要设置no_ack=true。在此处我们不需要设置。默认如下就行:channel.basic_consume(callback,
queue='hello')除了consumer之外我们还得确保rabbitMQ挂了之后消息不被丢失。这里我们就需要确保队列queue和消息messages都得是持久化的。队列的持久话需要设置durable属性:channel.queue_declare(queue= task_queue, durable=True)消息的持久话则是通过delivery_mode属性,设置值为2即可。channel.basic_publish(exchange='',routing_key="task_queue",body=message,properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))还有一个属性相对比较重要,它可以保证consumer确认消费完一条消息之后再去获取下一条消息。如果consumer正在忙碌的状态,消息将会被分发到下一个不是很忙的consumer。设置如下:channel.basic_qos(prefetch_count=1)下面贴出部分代码producer.py
consumer.py
3、广播模型
在前面2个示例我们都适用默认的exchange。这里我们将自己定义一个exchange。并设置type为fanout。它可以将消息广播给绑定的每一个queue。而不再是某一个queue。我们在此创建一个叫logs的exchange,就像下面这样:channel.exchange_declare(exchange='logs', type='fanout')所以发布消息就变成了下面这样:channel.basic_publish(exchange='logs',routing_key='', body=message)在这里我们需要将消息发送给所有的queues。而不需要指定某些队列。所以我们这里就用临时队列代替。并设置在失去连接后删除队列。当然我们也可以不这么做。设置临时队列,让rabbitmq给我们一个随机的队列名字,设置exclusive为true确保失去连接的时候队列也被删除了。因为我们这里不需要持久化队列。result = channel.queue_declare(exclusive=True)下面就是要绑定queues和exchange:channel.queue_bind(exchange='logs', queue=result.method.queue)综上所述我们的设计就像下面这样:
部分代码如下producer.py
consumer.py
4、direct模型
在上个模型中,消息被发送给所有的消费者,而在这一部分我们将通过路由的方式使exchange通过定义的路由方式将消息发送给队列。所以我们需要在绑定exchange和queue的时候指定routing_key字段,注意这里的routing_key不是basic_publish中的routing_key。见如下:channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')这里我们将使用type为direct的exchange。这种路由方式exchange将消息通过绑定的routing_key发送到指定的队列。而且exchange可以通过多个routing_key把消息发送给同一个queue。通过下面这张图我们来分析一下:
在上面的图中,我们可以看出type为direct的exchange X 绑定了2个队列。队列Q1关联路由orange。队列Q2关联路由black和green。所以一个带有路由健orange消息将被exchange发送给队列Q1。而带有路由健black或者green的消息将被发送给队列Q2。我们还是通过修改前面的日志系统,来展示direct类型的exchange如何工作,如图:
部分代码如下producer.py
consumer.py
让我们运行一下看看结果是什么,我们启动了3个consumer,routing_key分别指定为warning, error,第三个同时指定这2个。然后在运行producer时带上路由信息routing_key。运行后可以看出指定了warning的不会收到error的消息。同时指定warning 和error的consumer则会都收到消息。发送消息:
只收到warning的消息:
只收到error的消息:
error和waring的都能收到:
5、Topic模型
这种模型是最灵活的,相比较于direct的完全匹配和fanout的广播。Topic可以用类似正则的手法更好的匹配来满足我们的应用。下面我们首先了解一下topic类型的exchange:topic类型的routing_key不可以是随意的单词,它必须是一系列的单词组合,中间以逗号隔开,譬如“quick.orange.rabbit”这个样子。发送消息的routing_key必须匹配上绑定到队列的routing_key。消息才会被发送。此外还有个重要的地方要说明,在如下代码处绑定的routing_key种可以有*和#两种字符channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)它们代表的意义如下*(星号)可以匹配任意一个单词#(井号)可以匹配0到多个单词我们通过下图来解释一下:
Q1匹配3个单词中间为orange的routing_key ,而Q2可以匹配3个单词最后一个单词为rabbit和第一个单词为lazy后面可以有多个单词的routing_key。下面贴上部分示例:producer.py
consumer.py
6、RPC应用模型
当我们需要在远程服务器上执行一个方法并等待它的结果的时候,我们将这种模式称为RPC。下面我们用rabbitMQ建立一个RPC系统在rabbit MQ中为了能让client收到server端的response message。需要定义一个callback queue ,就像下面这样:
不过现在有一个问题,就是每次请求都会创建一个callback queue .这样的效率是极其低下的。幸运的是我们可以通过correlation_id为每一个client创建一个单独的callback queue。通过指定correlation_id我们可以知道callback queue中的消息属于哪个client。要做到这样只需client每次发送请求时带上这唯一的correlation_id。然后当我们从callback queue中收到消息时,我们能基于 correlation_id 匹配上我们的消息。匹配不上的消息将被丢弃,看上去就像下图这样:
总结一下流程如下:client发起请求,请求中带有2个参数reply_to和correlation_id请求发往rpc_queueserver获取到rpc_queue中的消息,处理完毕后,将结果发往reply_to指定的callback queueclient 获取到callback queue中的消息,匹配correlation_id,如果匹配就获取,不匹配就丢弃。示列代码参考:http://www.rabbitmq.com/tutorials/tutorial-six-python.html从上面的6个示例我们大致了解了如何运用rabbitMQ解决我们的实际需求,下面我们再来看看如何管理和监控rabbitMQ的实际运行情况。rabbitMQ的管理和监控1、rabbitmq management插件
rabbitMQ提供了一个管理插件,通过这个插件我们可以查看当前rabbitMQ服务的运行情况。在解压缩官网提供的rabbitMQ安装包之后,在sbin目录可以看见rabbitmq-plugins文件,我们只需运行一下命令:rabbitmq-plugins enable rabbitmq_management然后再游览器中输入http://server-name:15672/就可以查看当前rabbitMQ的一些运行状况。如下所示:
在这个管理控制台我可以做很多事情,譬如:查看运行的exchanges,queues,users,virtual hosts还有权限添加exchanges,queue,users,virtual host,以及给用户赋予权限监控消息长度,通道,消息速度。连接数发送接收消息关闭连接,清除队列2、rabbitmqctl使用
在rabbitMQ中,rabbitctl是一个被广泛使用的命令。对用户的增加,删除,列出列表,创建权限,都是通过rabbitmqctl完成的。下面举几个例子来熟悉一下如何使用创建一个用户名和密码都为test的新用户./rabbitmqctl
add_user test test删除的话使用以下命令./rabbitmqctl
delete_user test列出所有用户./rabbitmqctl
list_users同样也可以用此命令为用户赋予权限譬如我们想为用户test在vhost rabbitmq赋予全部访问权限,只许执行如下命令./rabbitmqctl set_permissions –p rabbitmq test “.*” “.*” “.*”列出权限./rabbitmqctl list_permissions –p rabbitmq删除权限./rabbitmqctl clear_permissions –p rabbitmq同样的rabbitmqctl也可以用来查看rabbitmq的运行状况,如下列出队列和消息数目./rabbitmqctl
list_queues –p rabbitmq如果想要了解更多的队列消息,譬如名字,消息数目,消费者数目,内存使用情况,以及其他属性 。则可以发送一下命令:./rabbitmqctl list_queues name messages consumers memory durable auto_delete列出exchanges相关信息./rabbitmqctl list_exchanges
auto_deleterabbitmqctl还有很多功能,这里不一一例举了。有兴趣的可以去官方网站查看。rabbitMQ集群且听下回分解。本文作者:吕翔(点融黑帮),来自点融北京技术团队,从事过社交和P2P领域,对互联网金融比较感兴趣。平时喜欢爬山打球。
点融黑帮——一个充满激情和梦想的技术团队,吸引了来自金融及信息科技领域的顶尖人才。我们正在用技术创新改变传统金融。(公众号:DianrongMafia)
1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在...
1 RabbitMQ安装部署 这里是ErLang环境的下载地址http://www.erlang.org/downloads 这是RabbitMQ环境、客户端、实例和说明文档的地址http://www.rabbitmq.com/download.html 2 RabbitM...
Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智能路由,微代理,控制总线)。分布式系统的协调导致了样板模式, 使用Spring Cloud开发人员可以快速地支持实现这些模式的服务和应用程序。他们将在任何分布式...
关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。 市面上的消息队列产品有很多,比如老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ...
来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。RabbitMQ跟Erlang和AMQP有关。下面简单介绍一下Erlang和AMQP。 Erla...
中国五金工具OYT数控刀具! 这是我的第149篇原创日记,相信日积月累的力量! 因为有朋友来吃饭,于是我就去买菜,正好爱人从家里带来了婆婆晒的菜干,于是我就买了两斤五花肉,准备做肉炒菜干。 记得我们小时候,我们家每年都会养一只猪过年杀,肉可以从过年吃到端午节,我们读书时带的...
出书训练营 第70天 邢大侠
感谢生命中的每一次相遇 因为你的出现 我的人生更加丰富 喜的 怒的 乐的 悲的 它们构筑了属于我们的生活 或短或长 感谢有过你 感谢生命中的每一次相遇 感谢你的出现 我伤了你 或你伤了我 放下的 追忆的 愧疚的 祝福的 因为生命中你的出现 我的人生更加精彩 感谢有过你
周四正如所料,次新股大涨,最后周五又形成次新股暴跌。不管怎样,目前大盘还走在上行通道内。请看指数的走势图:
从图中看,自9月中旬指数就一直走在向上的箱体中。周四见顶回落,还是在箱体范围内。一般的交易策略是在上轨高抛,在下轨低吸。只要不有效跌破下轨就一直采取这个策略...
昨天孩子放学,回到家一屁股坐下来开始看她的《知否知否,应是绿肥红瘦》。也不知看了多少遍,这么爱不释手。
孩子告诉我,在回家坐地铁的路上,她听了一路的英语,感赏她愿意自发自动学习。回家孩子虽然嘴上说不听英语,但吃饭时,我...Rabbitmq实际项目 - 网盘合集 - 盘搜搜文件名:3和昌(城市)电话访问项目名称电话问卷1xlsx , 文件大小:1KB , 分享者:151*****133 , 分享时间: , 下载次数:65次文件名:网赚微课堂第179期 适合春节期间操作的项目,月入5000 , 文件大小:1KB , 分享者:扬帆起飞abc , 分享时间: , 下载次数:81次文件名:s022 如何直观地做项目管理?swf 30 轻松思考 , 文件大小:1KB , 分享者:安稳的小家6 , 分享时间: , 下载次数:19次
文件名: 02 1024好项目pdf
, 文件大小:1KB , 分享者:简简单单德爱情 , 分享时间: , 下载次数:51次文件名:01生成项目与组态硬件mp4 plc s编程资料 , 文件大小:1KB , 分享者:问*教你 , 分享时间: , 下载次数:78次文件名:第29课时如何保存项目文件01avi plc s编程资料 , 文件大小:1KB , 分享者:问*教你 , 分享时间: , 下载次数:49次文件名:第30课时如何保存项目文件02avi plc s编程资料 , 文件大小:1KB , 分享者:问*教你 , 分享时间: , 下载次数:20次文件名:第23课时项目的建立01avi plc s编程资料 , 文件大小:1KB , 分享者:问*教你 , 分享时间: , 下载次数:41次文件名:第55课时库与多重项目的建立02avi plc s编程资料 , 文件大小:1KB , 分享者:问*教你 , 分享时间: , 下载次数:48次文件名:第54课时库与多重项目的建立01avi plc s编程资料 , 文件大小:1KB , 分享者:问*教你 , 分享时间: , 下载次数:34次利用Python操作消息队列RabbitMQ的方法教程
转载 & & 作者:Shawn
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。下面这篇文章主要给大家介绍了关于利用Python操作消息队列RabbitMQ的方法教程,需要的朋友可以参考下。
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
应用场景:
RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具。消息队列的使用场景大概有3种:
&&&& 1、系统集成,分布式系统的设计。各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即“通过消息传递的架构”。
&&&& 2、当系统中的同步处理方式严重影响了吞吐量,比如日志记录。假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式去消费日志消息。
&&&& 3、系统的高可用性,比如电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。如果能够将请求转发到消息队列,再由服务器去消费这些消息将会使得请求变得平稳,提高系统的可用性。
一、安装环境
首先是在 Linux 上安装 rabbitmq
# 环境为CentOS 7
yum install rabbitmq-server # 安装RabbitMQ
systemctl start rabbitmq-server # 启动
systemctl enable rabbitmq-server # 开机自启
systemctl stop firewall-cmd
# 临时关闭防火墙
然后用 pip 安装 Python3 的开发包
pip3 install pika
安装好软件之后可以访问http://115.xx.xx.xx:15672/来访问自带的 web 页面来查看和管理 RabbitMQ。默认管理员的用户密码都是guest
二、简单的向队列中加入消息
#!/usr/bin/env python3
# coding=utf-8
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email :
# @purpose : RabbitMQ_Producer
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 提交消息
for i in range(10):
channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i))
print("sent...")
# 关闭连接
connection.close()
三、简单的从队列中获取消息
#!/usr/bin/env python3
# coding=utf-8
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email :
# @purpose : RabbitMQ_Consumer
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 定义一个回调函数
def callback(ch, method, properties, body):
print(body.decode('utf-8'))
# 告诉RabbitMQ使用callback来接收信息
channel.basic_consume(callback, queue='test_queue', no_ack=False)
print('waiting...')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()
四、万一消费者掉线了
想象这样一种情况:
消费者从消息队列中获取了 n 条数据,正要处理呢结果宕机了,那该怎么办?在 RabbieMQ 中有一个 ACK 可以用来确认消费者处理结束。就有点类似网络中的 ACK,消费者每次从队列中获取了数据之后队列不会立刻将数据移除,而是等待对应的 ACK。消费者获取到数据并处理完成之后会向队列发送一个 ACK 包,通知 RabbitMQ 这堆消息已经处理妥当了,可以删除了,这时候 RabbitMQ 才会将数据从队列中移除。所以这种情况下即使消费者掉线也没有什么问题,数据依旧会在队列中存在,留给其他消费者处理。
在 Python 中这样实现:
消费者有这样一行代码channel.basic_consume(callback, queue='test_queue', no_ack=False) ,其中no_ack=False表示不发送确认包。将其修改为no_ack=True就会在每次处理完之后向 RabbitMQ 发送一个确认包,以确认消息处理完毕。
五、万一 RabbitMQ 宕机了呢
虽然有了 ACK 包,但是万一 RabbitMQ 挂了那数据还是会损失。所以我们可以给 RabbitMQ 设置一个数据持久化存储。RabbitMQ 会将数据持久化存储到磁盘上,保证下次再启动的时候队列还在。
在 Python 中这样实现:
我们声明一个队列是这样的channel.queue_declare(queue='test_queue') ,如果需要持久化一个队列可以这样声明channel.queue_declare(queue='test_queue', durable=True) 。不过这行直接放在代码中是不能执行的,因为以前已经有了一个名为test_queue的队列,RabbitMQ 不允许用不同的方式声明同一个队列,所以可以换一个队列名新建来指定数据持久化存储。不过如果只是这样声明的话,在 RabbitMQ 宕机重启后确实队列还在,不过队列里的数据就没有了。除非我们这样来声明队列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,)) 。
六、最简单的发布订阅
最简单的发布订阅在 RabbitMQ 中称之为Fanout模式。也就是说订阅者订阅某个频道,然后发布者向这个频道中发布消息,所有订阅者就都能接收到这条消息。不过因为发布者需要使用订阅者创建的随机队列所以需要先启动订阅者才能启动发布者。
发布者代码:
#!/usr/bin/env python3
# coding=utf-8
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email :
# @purpose : RabbitMQ_Publisher
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 定义交换机,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
type='fanout')
message = 'Hello Python'
# 将消息发送到交换机
channel.basic_publish(exchange='my_fanout', # 指定exchange
routing_key='', # fanout下不需要配置,配置了也不会生效
body=message)
connection.close()
订阅者代码:
#!/usr/bin/env python3
# coding=utf-8
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email :
# @purpose : RabbitMQ_Subscriber
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 定义交换机,进行exchange声明,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
type='fanout')
# 随机创建队列
result = channel.queue_declare(exclusive=True) # exclusive=True表示建立临时队列,当consumer关闭后,该队列就会被删除
queue_name = result.method.queue
# 将队列与exchange进行绑定
channel.queue_bind(exchange='my_fanout',
queue=queue_name)
# 定义回调方法
def callback(ch, method, properties, body):
print(body.decode('utf-8'))
# 从队列获取信息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
以上就是这篇文章的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。
您可能感兴趣的文章:
大家感兴趣的内容
12345678910
最近更新的内容
常用在线小工具

我要回帖

更多关于 rabbitmq 支持的协议 的文章

 

随机推荐