rabbitmq面试题怎么推送给移动端

摘要:相较于手机端的消息推送(一般都是以Socket方式实现),WEB端是基于HTTP协议,很难像TCP一样保持长连接。但随着技术的发展,出现了WebSocket,Comet等新的技术可以达到类似长连接的效果。

在团队协同工具 的使用过程中,你会发现无论是右上角的消息通知,还是在任务面板中拖动任务,还有用户的在线状态,都是实时刷新。Worktile中的推送服务是采用的是基于XMPP协议、Erlang语言实现的Ejabberd,并在其源码基础上,结合我们的业务,对源码作了修改以适配我们自身的需求。另外,基于AMQP协议也可以作为实时消息推送的一种选择,踢踢网就是采用 RabbitMQ+STOMP协议实现的消息推送服务。本文将结合我在Worktile和踢踢网的项目实践,介绍下消息推送服务的具体实现。

实时推送的几种实现方式

相较于手机端的消息推送(一般都是以Socket方式实现),WEB端是基于HTTP协议,很难像TCP一样保持长连接。但随着技术的发展,出现了WebSocket,Comet等新的技术可以达到类似长连接的效果,这些技术大体可分为以下几类:

1)短轮询。页面端通过JS定时异步刷新,这种方式实时效果较差。

2)长轮询。页面端通过JS异步请求服务端,服务端在接收到请求后,如果该次请求没有数据,则挂起这次请求,直到有数据到达或时间片(服务端设定)到,则返回本次请求,客户端接着下一次请求。示例如下:

3)WebSocket。浏览器通过WebSocket协议连接服务端,实现了浏览器和服务器端的全双工通信。需要服务端和浏览器都支持WebSocket协议。

以上几种方式中,方式1实现较简单,但效率和实时效果较差。方式2对服务端实现的要求比较高,尤其是并发量大的情况下,对服务端的压力很大。方式3效率较高,但对较低版本的浏览器不支持,另外服务端也需要有支持WebSocket的实现。Worktile的WEB端实时消息推送,采用的是XMPP扩展协议XEP-0124 BOSH( ),本质是采用方式2长轮询的方式。踢踢网则采用了WebSocket连接RabbitMQ的方式实现,下面我会具体介绍如何用这两种方式实现Server Push。

服务端的实现中,无论采用Ejabberd还是RabbitMQ,都是基于Erlang语言开发的,所以必须安装Erlang运行时环境。Erlang是一种函数式语言,具有容错、高并发的特点,借助OTP的函数库,很容易构建一个健壮的分布式系统。目前,基于Erlang开发的产品有,数据库方面:Riak(Dynamo实现)、CouchDB, Webserver方面:Cowboy、Mochiweb, 消息中间件有RabbitMQ等。对于服务端程序员来说,Erlang提供的高并发、容错、热部署等特性是其他语言无法达到的。无论在实时通信还是在游戏程序中,用Erlang可以很容易为每一个上线用户创建一个对应的Process,对一台4核8个G的服务器来说,承载上百万个这样的Process是非常轻松的事。下图是Erlang程序发起Process的一般性示意图:

如图所示,Session Manager(or Gateway)负责为每个用户(UID)创建相对应的Process, 并把这个对应关系(MAP)存放到数据表中。每个Process则对应用户数据,并且他们之间可以相互发送消息。Erlang的优势就是在内存足够的情况下创建上百万个这样的Process,而且它的创建和销毁比JAVA的Thread要轻量的多,两者不是一个数量级的。

其中,User_Path是根据用户名密码进行校验,VHOST_Path是校验是否有权限访问VHOST, Resource_Path是校验用户对传入的Exchange、Queue是否有权限。我下面的代码是用Node.js实现的这三个接口的示例:

浏览器端JS实现,示例代码如下:

到目前为止,基于RabbitMQ+STOMP实现WEB端消息推送就已经完成,其中很多的细节需要小伙伴们亲自去实践了,这里就不多说了。实践过程中可以参照官方文档:


以上的实现是我本人在踢踢网时采用的方式,下面接着介绍一下现在在Worktile中如何通过Ejabberd实现消息推送。

基于Ejabberd的实时消息推送

由于自身业务的需要,我们对Ejabberd的用户认证和好友列表模块的源码进行修改,通过Redis保存用户的在线状态,而不是Mnesia和MySQL。另外好友这块我们是从已有的数据库中(MongoDB)中获取项目或团队的成员。Web端通过Strophe.JS来连接(HTTP-BIND),Strophe.JS可以以长轮询和WebSocket两种方式来连接,由于Ejabberd还没有好的WebSocket的实现,就采用了BOSH的方式模拟长连接。整个系统的结构如下:

Android可以用Smack直接连Ejabberd服务器集群。这些都是现有的库,无需对Client进行开发。在线状态根据用户UID作为KEY定义了在线、离线、忙等状态存放于Redis中。好友列表从MongoDB的Project表中获取。用户认证直接修改了Ejabberd_Auth_Internal.erl文件,通过MongoDB驱动连接用户库,在线状态等功能是新加了模块,其部分代码如下:

在用Tsung对Ejabberd进行压力测试,测试机器为4核心8G内存的普通PC,以3台客户机模拟用户登录、设置在线状态、发送一条文本消息、关闭连接操作,在同时在线达到30w时,CPU占用不到3%,内存大概到3个G左右,随着用户数增多,主要内存的损耗较大。由于压力测试比较耗时,再等到有时间的时候,会在做一些更深入的测试。


免费订阅“CSDN云计算(左)CSDN大数据(右)”微信公众号,实时掌握第一手云中消息,了解最新的大数据进展!

版权声明:本文为博主原创文章,未经博主允许不得转载。 /qq_/article/details/

Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。

如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。

前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。

客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)

服务器端收到消息并处理

服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性

客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

默认启动的是不启动管理模块的,不能登录

ERLANG_HOME会默认创建环境变量不配置 rabbitmq 也可以正常启动,但我当时管理模块一直起不来

本文提出的方案是基于 RabbitMQ 消息服务器,因此最开始需要安装 RabbitMQ 服务及相关插件。RabbitMQ 是基于 Erlang 语言开发的,所以首先必须安装 Erlang 运行时环境。下面以 CentOS6.5 64 位服务器为例,讲述整个服务的安装过程:

5. 验证是否安装成功

RabbitMQ 有很多第三方插件,可以在AMQP 协议基础上做出许多扩展的应用。Web STOMP 插件就是基于 AMQP 之上的 STOMP 文本协议插件,利用 WebSocket 能够轻松实现浏览器和服务器之间的实时消息传递。

此模式下,发送队列的一方把消息存入mq的指定队列后,若有消费者端联入相应队列,即会获取到消息,并且队列中的消息会被消费掉。

若有多个消费端同时连接着队列,则会已轮询的方式将队列中的消息消费掉。

发送过队列后,可在MQ服务器中查看队列状态

 

接收队列后,查看一下队列状态

当rabbitMQ意外宕机时,可能会有持久化保存队列的需求(队列中的消息不消失)。

 

执行后查看队列,记下队列名字与队列中所含消息的数量

 

当producer发送消息到队列后,所有的consumer都会收到消息,需要注意的是,此模式下producer与concerned之间的关系类似与广播电台与收音机,如果广播后收音机没有接受到,那么消息就会丢失。

 

    我是在解决分布式事务的一致性问题时了解到RabbitMQ的,当时主要是要基于RabbitMQ来实现我们分布式系统之间对有事务可靠性要求的系统间通信的。关于分布式事务一致性问题及其常见的解决方案,可以看我另一篇博客。提到RabbitMQ,不难想到的几个关键字:消息中间件、消息队列。而消息队列不由让我想到,当时在大学学习操作系统这门课,消息队列不难想到生产者消费者模式。(PS:操作系统这门课程真的很好也很重要,其中的一些思想在我工作的很长一段一时间内给了我很大帮助和启发,给我提供了许多解决问题的思路。强烈建议每一个程序员都去学一学操作系统!)

、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。其主要特点如下:

    RabbitMQ从整体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息。其整体模型架构如下图所示:

我们先来看一个RabbitMQ的运转流程,稍后会对这个流程中所涉及到的一些概念进行详细的解释。

(2)生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
(3)生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
(4)生产者通过路由键将交换器和队列绑定起来
(5)生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
(6)相应的交换器根据接收到的路由键查找相匹配的队列。
(7)如果找到,则将从生产者发送过来的消息存入相应的队列中。
(8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

(2)消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
(3)等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
(4)消费者确认(ack) 接收到的消息。
(5)RabbitMQ 从队列中删除相应己经被确认的消息。

这里我们主要讨论两个问题:

连接。而操作系统对于TCP连接的创建于销毁是非常昂贵的开销。假设消费者要消费消息,并根据服务需求合理调度线程,若只进行TCP连接,那么当高并发的时候,每秒可能都有成千上万的TCP连接,不仅仅是对TCP连接的浪费,也很快会超过操作系统每秒所能建立连接的数量。如果能在一条TCP连接上操作,又能保证各个线程之间的私密性就完美了,于是信道的概念出现了。

指令都是通过信道完成的。信道就像电缆里的光纤束。一条电缆内含有许多光纤束,允许所有的连接通过多条光线束进行传输和接收。

  • Producer:生产者,即消息投递者一方。
  • 消息:消息一般分两个部分:消息体(payload)和标签。标签用来描述这条消息,如:一个交换器的名称或者一个路由Key,Rabbit通过解析标签来确定消息的去向,payload是消息内容可以使一个json,数组等等。
  • Consumer:消费者,就是接收消息的一方。消费者订阅RabbitMQ的队列,当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,会丢弃标签,存入到队列中的只有消息体。
  • Broker:消息中间件的服务节点。
2.2.3 队列、交换器、路由key、绑定

    从RabbitMQ的运转流程我们可以知道生产者的消息是发布到交换器上的。而消费者则是从队列上获取消息的。那么消息到底是如何从交换器到队列的呢?我们先具体了解一下这几个概念。

    Queue:队列,是RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息只能存储在队列中。生产者投递消息到队列,消费者从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(轮询)给多个消费者进行消费,而不是每个消费者都收到所有的消息进行消费。(注意:RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定多个队列,由多个消费者来订阅这些队列的方式。)

    Exchange:交换器。在RabbitMQ中,生产者并非直接将消息投递到队列中。真实情况是,生产者将消息发送到Exchange(交换器),由交换器将消息路由到一个或多个队列中。如果路由不到,或返回给生产者,或直接丢弃,或做其它处理。

    RoutingKey:路由Key。生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时通过指定RoutingKey来决定消息流向哪里。

    Binding:RabbitMQ通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键,这样RabbitMQ就可以指定如何正确的路由到队列了。

    从这里我们可以看到在RabbitMQ中交换器和队列实际上可以是一对多,也可以是多对多关系。交换器和队列就像我们关系数据库中的两张表。他们同归BindingKey做关联(多对多关系表)。在我们投递消息时,可以通过Exchange和RoutingKey(对应BindingKey)就可以找到相对应的队列。

  • fanout:扇形交换器,它会把发送到该交换器的消息路由到所有与该交换器绑定的队列中。如果使用扇形交换器,则不会匹配路由Key。

  • topic:完全匹配BindingKey和RoutingKey的direct交换器 有些时候并不能满足实际业务的需求。topic 类型的交换器在匹配规则上进行了扩展,它与direct 类型的交换器相似,也是将消息路由到BindingKey 和RoutingKey 相匹配的队
    列中,但这里的匹配规则有些不同,它约定:

  • BindingKey 中可以存在两种特殊字符串"*"和"#",用于做模糊匹配,其中"*"用于匹配一个单词,"#"用于匹配多规格单词(可以是零个)。
  • header:headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中
    的headers 属性进行匹配。在绑定队列和交换器时制定一组键值对, 当发送消息到交换器时,
    RabbitMQ 会获取到该消息的headers (也是一个键值对的形式) ,对比其中的键值对是否完全
    匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由
    到该队列。(注:该交换器类型性能较差且不实用,因此一般不会用到)。

了解了上面的概念,我们再来思考消息是如何从交换器到队列的。首先Rabbit在接收到消息时,会解析消息的标签从而得到消息的交换器与路由key信息。然后根据交换器的类型、路由key以及该交换器和队列的绑定关系来决定消息最终投递到哪个队列里面。

这里我们基于docker来安装。

 
 
这里我们以dotnet平台下RabbitMQ.Client3.6.9(可以从nuget中下载)为示例,简单介绍dotnet平台下对RabbitMQ的简单操作。更详细的内容可以从nuget中下载源码和文档进行查看。
 
 
说明:Connection 可以用来创建多个Channel 实例,但是Channel 实例不能在线程问共享,应用程序应该为每一个线程开辟一个Channel 。某些情况下Channel 的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响友送方确认( publisherconfrrm)机制的运行,所以多线程问共享Channel实例是非线程安全的。
3.2.3 交换器、队列和绑定
 
如上创建了一个持久化的、非自动删除的、绑定类型为direct 的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(此队列的名称由RabbitMQ 自动生成)。这里的交换器和队列也都没有设置特殊的参数。
上面的代码也展示了如何使用路由键将队列和交换器绑定起来。上面声明的队列具备如下特性: 只对当前应用中同一个Connection 层面可用,同一个Connection 的不同Channel可共用,并且也会在应用连接断开时自动删除。
上述方法根据参数不同,可以有不同的重载形式,根据自身的需要进行调用。

ExchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。
  • durable: 设置是否持久化。durab l e 设置为true 表示持久化, 反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  • autoDelete : 设置是否自动删除。autoDelete 设置为true 则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为:"当与此交换器连接的客户端都断开时, RabbitMQ 会自动删除本交换器"。
  • internal : 设置是否是内置的。如果设置为true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
 


不带任何参数的queueDeclare 方法默认创建一个由RabbitMQ 命名的(类似这种amq.gen-LhQzlgv3GhDOv8PIDabOXA 名称,这种队列也称之为匿名队列〉、排他的、自动删除的、非持久化的队列。
  • durable: 设置是否持久化。为true 则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
  • exclusive : 设置是否排他。为true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接( Connection) 可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列; "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
  • autoDelete: 设置是否自动删除。为true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:当连接到此队列的所有客户端断开时,这个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
 
注意:生产者和消费者都能够使用queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道直为"传输"模式,之后才能声明队列。

将队列和交换器绑定的方法如下:
  • routingKey: 用来绑定队列和交换器的路由键;
 
将队列与交换器解绑的方法如下:
其参数与绑定意义相同。
注:除队列可以绑定交换器外,交换器同样可以绑定队列。即:ExchangeBind方法,其使用方式与队列绑定相似。
 
  • exchange: 交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到RabbitMQ 默认的交换器中。
  • routingKey : 路由键,交换器根据路由键将消息存储到相应的队列之中。
  • mandatory: 是否将消息返回给生产者(会在后续的文章中介绍这个参数).
 
 

  • noAck : 设置是否需要确认,false为需要确认。
  • consumerTag: 消费者标签,用来区分多个消费者:
  • consumer: 指定处理消息的消费者对象。
 
 
在应用程序使用完之后,需要关闭连接,释放资源:
显式地关闭Channel 是个好习惯,但这不是必须的,在Connection 关闭的时候,Channel 也会自动关闭。

 
以上简单介绍了分布式系统中消息中间件的概念与作用,以及RabbitMQ的一些基本概念与简单使用。下一篇文章将继续针对RabbitMQ进行总结。主要内容包括何时创建队列、RabbitMQ的确认机制、过期时间的使用、死信队列、以及利用RabbitMQ实现延迟队列......

 

《RabbitMQ实战 高效部署分布式消息队列》

我要回帖

更多关于 rabbitmq面试题 的文章

 

随机推荐