客户端用的ActiveMQ,能连接JBoss发布的JMS消息吗

博客分类:
activemq代理
JMS代理(如ActiveMQ broker)的主要作用是为客户端程序提供一种通信机制.为此,ActiveMQ提供一种连接机制,这种连接机制使用传输连接器(transport connector)实现客户端与代理(client-to-broker)之间的通信;使用网络连接器(network connector)实现代理与代理之间的通信.代理可以简单的看出启动了一个socket监听,一个jms中间件可以启动多个通信监听。另外activemq本身是通过java实现的,我们也可以通过编程方式,在代码中启动activemq的代理器。连接机制使用传输连接器(transport connector)实现客户端与代理(client-to-broker)之间的通信;使用网络连接器(network connector)实现代理与代理之间的通信,网络连接起在做集群的时候可能要用到的。
连接配置
文件conf/activemq-demo.xml提供的示例有一段内容
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
&transportConnectors&
&!-- Create a TCP transport that is advertised on via an IP multicast
group named default. --&
&transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/&
&!-- Create a SSL transport. Make sure to configure the SSL options
via the system properties or the sslContext element. --&
&transportConnector name="ssl" uri="ssl://localhost:61617"/&
&!-- Create a STOMP transport for STOMP clients. --&
&transportConnector name="stomp" uri="stomp://localhost:61613"/&
&!-- Create a Websocket transport for the websocket dmeo --&
&transportConnector name="ws" uri="ws://localhost:61614/" /&
&/transportConnectors&
可以看出,一个transportConnectors可以包含多个transportConnector ,他们的name和uri是唯一的。discoveryUri 是可选的。如上,定义了4个连接器。
ActiveMq支持的网络协议
协议描述TCP默认的协议,性能相对可以NIO基于TCP协议之上的,进行了扩展和优化,具有更好的扩展性UDP性能比TCP更好,但是不具有可靠性SSL安全链接HTTP(S)基于HTTP或者HTTPSVMVM本身不是协议,当客户端和代理在同一个Java虚拟机(VM)中运行时,他们之间需要通信,但不想占用网络通道,而是直接通信,可以使用该方式
TCP
是ActiveMQ默认的协议,访问方式是
tcp://hostname:port?key=value&key=value
&transportConnectors&
&transportConnector name="tcp"
uri="tcp://localhost:61616?trace=true"/&
&/transportConnectors&
其中trace是active提供的参数,用于打开日志,记录每次的通过该链接通道的命令。
NIO
非阻塞方式的IO访问,基于TCP,但是提供了更好的性能,对于大访问量时,可以考虑
访问及配置
nio://hostname:port?key=value
&transportConnectors&
&transportConnector
name="tcp"
uri="tcp://localhost:61616?trace=true" /&
&transportConnector
name="nio"
uri="nio:localhost:61618?trace=true" /&
&/transportConnectors&
UDP
并不保证数据有效性,如在线游戏等情况。
udp://hostname:port?key=value
&transportConnectors&
&transportConnector
name="tcp"
uri="tcp://localhost:61616?trace=true"/&
&transportConnector
name="udp"
uri="udp://localhost:61618?trace=true" /&
&/transportConnectors&
SSL
ssl://hostname:port?key=value
&transportConnectors&
&transportConnector name="ssl" uri="ssl://localhost:61617?trace=true" /&
&/transportConnectors&
默认的安全证书位于conf的broker.ts,和broker.ks文件该证书为示例证书,如果要使用SSL协议,需要自己生成相应的环境下的证书。
对应的客户端证书是client.ts,和client.ks
在访问SSL链接时,需要指定使用的证书文件(jvm系统属性)
java
-Djavax.net.ssl.keyStore=${ACTIVEMQ_HOME}/conf/client.ks \
-Djavax.net.ssl.keyStorePassword=password \
-Djavax.net.ssl.trustStore=${ACTIVEMQ_HOME}/conf/client.ts \
如果指定使用了其他的证书,在服务端配置部分,需要改成
&broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost"
dataDirectory="${activemq.base}/data"&
[b]&sslContext
keyStore="file:${activemq.base}/conf/mybroker.ks"
keyStorePassword="test123"/&[/b]
&/sslContext&
&transportConnectors&
&transportConnector name="ssl"
uri="ssl://localhost:61617" /&
&/transportConnectors&
启用/禁用 SSL加密套件
ActiveMQ的SSL传输连接器使用的SSL加密套件(SSL cipher suites)由JVM提供.关于加密套件的详细信息
请参考SUN的JSSE文档。每种加密套件的实现等都不一样,从5.4.0版本的ActiveMQ开始支持新的参数transport.enabledCipherSuites
&transportConnectors&
&transportConnector
name="ssl"
uri="ssl://localhost:61617?transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA" /&
&/transportConnectors&
如上使用的是SSL_RSA_WITH_RC4_128_SHA套件。可以开启多个加密套件,使用逗号分割。
HTTP
http://hostname:port?key=value
&transportConnectors&
&transportConnector name="tcp"
uri="tcp://localhost:61616?trace=true"/&
&transportConnector name="http"
uri="http://localhost:8080?trace=true" /&
&/transportConnectors&
http需要添加几个依赖包
commons-httpclient
xstream
xmlpull
虚拟机内部通信
Java应用程序中的VM传输连接器用于启动并连接到一个内嵌的代理.使用VM传输连接器意味着客户端和内嵌带代理之间不需要网络连接,通过直接调用代理对象的方法来实现通信.因为使用VM连接器后不需要网络协议的参与,所以性能显著提高.使用VM协议首次连接到代理时会启动代理,同一个虚拟机中所有后续的VM传输连接都将连接到这个代理.使用VM协议的代理具有标准ActiveMQ代理的所有特性.当所有使用VM传输连接到代理的客户端都关闭连接后,代理自动关闭.
配置VM连接器的URI语法如下:
vm://brokerName?key=value
如vm://broker1?marshal=false&broker.persistent=false
URI中brokerName配置非常重要,它是代理的唯一标识.例如,你可以通过配置两个不同的broker name创建两个不同的嵌入式代理,必须保证brokerName是唯一的。
还可以嵌入URI的方式
vm:broker:(transportURI,network:networkURI)/brokerName?key=value

配置的URI是
vm:broker:(tcp://localhost:6000)?brokerName=embeddedbroker&persistent=false
我们定义了一个名称为embeddedBroker的代理,同时配置了一个TCP连接器在6000端口
监听连接,而同一个jvm内运行的程序可以通过vm连接,不同jvm的外部程序,可以通过tcp://localhost:6000连接到这个代理。
通过配置传输选项的brokerConfig参数,在URI中指定activemq.xml作为配置文件,
可以使用外部配置文件来配置嵌入式代理.下面是配置例子:
vm://localhost?brokerConfig=xbean:activemq.xml
该实例的配置会在类路径中根据xbean:协议查找activemq.xml文件.通过这种方式,使用XML作为配置文件
可以像配置一个独立的代理那样来配置嵌入式代理.
网络连接器
集群的代理网络,包含了多个MQ实例。默认是单向的,代理只将其收到的消息发送到收消息连接另一头的代理,如A指接受B的代理信息,但A并不会发送消息到B,而只发送到B之外的其他。通常称为转发桥。
另外还有一种是双向的A即会接受来自B的,也会把消息发送给B。
&networkConnectors&
&networkConnector name="default-nc" uri="multicast://default"/&
&/networkConnectors&
name和uri是必须的。
侦测.通常,侦测是一种搜寻远程代理
服务的过程.通常,客户端希望能够侦测到所有可用的代理.另一方面,代理也希望能够侦测到其他可用的代理,
以便可以和他们建立一个代理网络.
静态网络连接器
静态网络连接器用于为一个网络中多个代理创建静态配置.这种配置协议使用了一种复合的URI--即包含其他URI的URI.
下面是静态协议使用的URI语法:
static:(uri1,uri2,uri3,...)?key=value
&networkConnectors&
&networkConnector name="local network" uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/&
&/networkConnectors&
示例
代理B配置
&broker xmlns="http://activemq.apache.org/schema/core" brokerName="BrokerB" dataDirectory="${activemq.base}/data"&
&transportConnectors&
&transportConnector name="openwire" uri="tcp://localhost:61617" /&
&/transportConnectors&
&/broker&
启动代理B
${ACTIVEMQ_HOME}/bin/activemq console xbean:src/main/resources/org/apache/activemq/book/ch4/brokerB.xml
代理A配置
&broker xmlns="http://activemq.apache.org/schema/core" brokerName="BrokerA" dataDirectory="${activemq.base}/data"&
&transportConnectors&
&transportConnector name="openwire" uri="tcp://localhost:61616" /&
&/transportConnectors&
&networkConnectors&
&networkConnector uri="static:(tcp://localhost:61617)" /&
&/networkConnectors&
启动代理A
${ACTIVEMQ_HOME}/bin/activemq console \xbean:src/main/resources/org/apache/activemq/book/ch4/brokerA.xml
当消息发送到代理A,代理A会向B也发送消息,消息消费者可以通过代理B接收到消息。
连接失败
失效转移连协议,有两种实现,静态的代理和动态侦测语法,
failover:(uri1,...,uriN)?key=value
或者failover:uri1,...,uriN,如
failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
详细查看http://activemq.apache.org/failover-transport-reference.html
默认情况是先随机选择一个连接器,如果该连接器无效,那么会选择下一个。
失效转移
连接器URI:是在客户端方面的协议,客户端在链接代理时需要保证链接的可用和可靠。实际上就算客户端只会连接一个代理,也应该使用failover配置通信协议,保证网络中断等问题时会自动重连。
动态网络
这种动态技术可以实现让客户端和代理之间,代理和代理之间实现动态识别,而不是配置静态的IP组。
多点传送协议,代理会广播自己的服务,也会定位其他代理。同理客户端可以通过多点协议来接收广播,识别出代理。
多址传送协议的URI语法如下:
multicast://ipadaddress:port?key=value
&broker xmlns="http://activemq.apache.org/schema/core" brokerName="multicast" dataDirectory="${activemq.base}/data"&
&networkConnectors&
&networkConnector name="default-nc" uri="multicast://default"/&
&/networkConnectors&
&transportConnectors&
&transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/&
&/transportConnectors&
在上面的例子中,使用群组名称"default"来替代具体的IP地址.上面的配置代码片段中有两个地方比较重要.首先,transport connector的discoveryUri属性用于暴露这个传输连接器的URI到名称为default的群组中.所有的希望查找可用代理的客户端都可以使用这个代理。
network connector的uri属性用于查找可用的代理并与之建立代理网络.这样配置后,代理就像客户端一样,使用多点传送协议来查找其他代理。
,移除discoveryUri属性,客户端就无法通过多点协议扫描到代理,
用多点传送协议的一个缺点是侦测是自动的.如果你不想把某个代理添加到群组中,你必须十分小心的设置。
客户端的自动侦测,通信语法是
discovery:(discoveryAgentURI)?key=value,
,如discovery:(multicast://default)
将会自动侦测组名为default的代理。
上面的配置&transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/&
将会被侦测到。
点对点协议
点对点连接器是一种传输连接器网络,是构成点对点的嵌入式代理网络中所有虚拟连接器的集合.
点对点协议的URI语法如下:
peer://peergroup/brokerName?key=value
使用一个点对点协议的RUI启动应用程序会自动启动一个嵌入式代理(就像使用VM协议一样),同时还会配置代理以和相同组名的代理之间建立网络连接.VM也会启动一个内置代理器。
peer://group1
如上客户端配置的通信协议如果是这样的话,那么会启动一个内嵌代理,并且客户端会和该内嵌代理通信,而内嵌代理会与组名是group1的代理之间建立网络通信。
FANOUT连接器
Fanout是一种通信器群组,用于使得客户端可以同时连接到多个代理并对这些代理进行相同的操作.
Fanout协议的URI语法如下:
fanout:(fanoutURI)?key=value
fanoutURI值可以使用静态的URI或者多点传送URI.参考下面的示例:
fanout:(static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616))
客户端将尝试连接掉3个使用静态协议配置的静态代理.
使用动态效果
fanout:(multicast://default)
此协议的目的是发送消息到多个代理.其次,如果你使用的代理属于同一个代理网络,那么指定的消息消费者可能会接收到重复的消息.因此,通常情况下,fanout协议仅用于发布消息到多个相互之间没有连接在一起的代理.即多个代理之间的独立的。
总结
Protocol&&&&&&&&&&& Description
协议&&&&&&&&&&&&&&& 描述
Static&&&&&&&&& Used for defining networks of brokers with known addressess
静态协议&&&&&&& 用于定义地址已知的代理之间的网络
Failover&&&&&&& Used to provide reconnection logic for clients to the network of brokers or a single broker
失效重连协议&&& 用于为客户端提供自动重连逻辑,以便客户端能够重新连接到一个代理网络或者单个代理
Multicast&&&&&& Used for defining dynamic networks of brokers (broker addresses are not statically defined)
多点传送协议&&& 用于定义一个动态的代理网络(代理的地址无需静态指定)
Discovery&&&&&& Used by clients to connect to dynamic network of brokers
自动侦测协议&&& 客户端用来连接到动态网络代理
Peer&&&&&&&&&&& Used to easily connect multiple embedded brokers
点对点协议&&&&& 用于方便的连接到多个嵌入式代理
Fanout&&&&&&&&& Used to produce messages to multiple unconnected brokers
扇出协议&&&&&&& 用于发送消息到多个未互联的代理
浏览 24953
浏览: 549784 次
来自: 江西上饶
说的不好,没人看的
非常详细的注解~
spring mvc demo教程源代码下载,地址:http: ...
PandaDONG 写道谢谢你啊,我已经下下来了,只是还有很多 ...
PandaDONG 写道谢谢你啊,我已经下下来了,只是还有很多 ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'ActiveMQ 5.12.0 发布,JMS 消息服务器
ActiveMQ 5.12.0 发布,此版本解决了一些,改进了
支持,修复了
相关的问题。下载:改进列表:
Supports a variety of&&from Java, C, C++, C#, Ruby, Perl, Python, PHP
&for high performance clients in Java, C, C++, C#
so that clients can be written easily in C, Ruby, Perl, Python, PHP,
ActionScript/Flash, Smalltalk to talk to ActiveMQ as well as any other
popular Message Broker
&v1.0 support
&v3.1 support allowing for connections in an IoT environment.
full support for the&&both in the JMS client and the Message Broker
Supports many&&such as&,&,&&and&
Fully supports JMS 1.1 and J2EE 1.4 with support for transient, persistent, transactional and XA messaging &so that ActiveMQ can be easily embedded into Spring applications and configured using Spring's XML configuration mechanism
Tested inside popular J2EE servers such as&,&, JBoss, GlassFish and WebLogic
Includes&&for inbound & outbound messaging so that ActiveMQ should auto-deploy in any J2EE 1.4 compliant server
Supports pluggable&&such as&, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports
Supports very fast&&using JDBC along with a high performance journal
Designed for high performance clustering, client-server, peer based communication &API to provide technology agnostic and language neutral web based API to messaging &to support web streaming support to web browsers using pure DHTML, allowing web browsers to be part of the messaging fabric &so that ActiveMQ can be easily dropped into either of these web service stacks to provide reliable messaging
Can be used as an in memory JMS provider, ideal for&更多内容请看。ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的
JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
ActiveMQ 的详细介绍:
ActiveMQ 的下载地址:
转载请注明:文章转载自 开源中国社区
本文标题:ActiveMQ 5.12.0 发布,JMS 消息服务器
本文地址:您所在位置: &
&nbsp&&nbsp&nbsp&&nbsp
一头扎进JMS之ActiveMQ教程.docx 8页
本文档一共被下载:
次 ,您可全文免费在线阅读后下载本文档。
下载提示
1.本站不保证该用户上传的文档完整性,不预览、不比对内容而直接下载产生的反悔问题本站不予受理。
2.该文档所得收入(下载+内容+预览三)归上传者、原创者。
3.登录后可充值,立即自动返金币,充值渠道很便利
一头扎进JMS之ActiveMQ教程
你可能关注的文档:
··········
··········
一头扎进JMS之ActiveMQ第一讲
JMS规范介绍
JMS (Java平台上的专业技术规范) 编辑
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。
JMS专业技术规范
JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。
3.JMS体系架构
JMS提供者
连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
JMS客户
生产或消费基于消息的Java的应用程序或对象。
JMS生产者
创建并发送消息的JMS客户。
JMS消费者
接收消息的JMS客户。
JMS消息
包括可以在JMS客户之间传递的数据的对象
JMS队列
一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
JMS主题
一种支持发送消息给多个订阅者的机制。
4.JMS对象模型
1)连接工厂。连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
2)JMS连接。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
3)JMS会话。JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。
4)JMS目的。JMS目的(Destination),又称为消息队列,是实际的消息源。
5)JMS生产者和消费者。生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。
6)JMS消息通常有两种类型:
① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。
② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
5.JMS模型
点对点或队列模型
发布者/订阅者模型
6.传递方式
JMS有两种传递消息的方式。标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。所以默认的消息传递方式是非持久性的。即使使用非持久性消息可能降低内务和需要的存储器,并且这种传递方式只有当你不需要接收所有的消息时才使用。
7.应用程序
ConnectionFactory 接口(连接工厂)
用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
Connection 接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination 接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
Session 接口(会话)
表示一个单线
正在加载中,请稍后...博客分类:
当我们清楚了以后内容后,现在我们来用JBoss实现一个例子来加深对JBoss和JMS的了解。
在上面叙述中,我们知道明确使用JMS
provider有三个基本的事情要做:配置JNDI初始化上下文,连接工厂的名字和使用目的地的名字。
当编写产品的最好的事情是不受provider-specific
影响,使代码能在不同的JMS
provider之间容易移植。在此这个例子没有聚焦在开发产品上,而是解释如何使用JbossMQ来工作。
1) 初始化上下文
配置JNDI的一个方法是通过属性文件jndi.properties。在这个文件中使用正确的值,并且把它所在的路径包含到classpath中,它比较容获得正确初始化上下文。
jndi.properties文件的内容如下:
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.provider.url=localhost:1099
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
把该文件放置的路径成为你的classpath的一部分。如果你使用这种方法,在初始化上下文时,代码比较简单:
Context context = new IntialContext();1
在某些情景下,可能需要手工配置JNDI;例如当运行的类文件中环境已经配置了一个初始化上下文,但不是你想用的上下文时,需要手工来配置一个上下文。设置在哈希表中的几个属性值,并且使用此哈希表来实例化一个上下文。定义语法:
Hashtable props = new Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
props.put(Context.PROVIDER_URL, "localhost:1099");
props.put("java.naming.rmi.security.manager", "yes");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
2) 查找连接工厂
自有了上下文后,需要查找一个连接工厂。为了查找它,使用一个可用的名字。查找连接工厂的代码如下:
对于一个topic目的地
TopicConnectionFactory topicFactory = (TopicConnectionFactory)
context.lookup (“ConnectionFactory”)
Queue 目的地:
QueueConnectionFactory queueFactory = (QueueConnectionFactory )
context.lookup (“ConnectionFactory”)
3) 建立连接和会话
在我们有了连接工厂后,建立一个连接,在此连接中建立一个会话。
对于topic代码如下:
//建立一个连接
topicConnection = topicFactory.createTopicConnection();
//建立一个会话
topicSession = topicConnection.createTopicSession(false,
//不需要事务
Session.AUTO_ACKNOLEDGE //自动接收消息的收条。
对于queue代码如下:
//建立一个连接
queueConnection = queueFactory.createQueueConnection();
//建立一个会话
queueSession = queueConnection .createQueueSession(false,
//不需要事务
Session.AUTO_ACKNOLEDGE //自动接收消息的收条。
一个会话建立时,配置是否调用事务
在事务Session中,当事务被提交后,自动接收,如果事务回滚,所有的被消费的消息将会被重新发送。
在非事务Session中,如果没有调用事务处理,消息传递的方式有三种:
Session.AUTO_ACKNOWLEDGE
:当客户机调用的receive方法成功返回,或当MessageListenser
成功处理了消息,session将会自动接收消息的收条。
Session.CLIENT_ACKNOWLEDGE
:客户机通过调用消息的acknowledge方法来接收消息。接收发生在session层。接收到一个被消费的消息时,将自动接收该session已经
消费的所有消息。例如:如果消息的消费者消费了10条消息,然后接收15个被传递的消息,则前面的10个消息的收据都会在这15个消息中被接收。
Session.DUPS_ACKNOWLEDGE :指示session缓慢接收消息。
4) 查找目的地
现在我们来介绍建立publishes/sends
或subscribles/receives消息。
下面的代码列出来查找一个目的地:
对于topic 查找一个testTopic目的地
Topic topic = (Topic) context.lookup(“topic/testTopic”);
对于queue 查找一个testQueue目的地
Queue queue= (Queue) context.lookup(“queue/testQueue”);
注意:JbossM的前缀topic/ (queue/)通常被放在topic
(queue)名字前面。
在JMS中,当客户机扮演每种角色,象对于topic来将的publisher
,subscriber 或对于queue来将的sender, receiver,
都有自己不同类继承和不同函数。
5) 建立一个消息制造者Message ProdUCer (topic publisher/ queue
消息制造者是一个由session创建的对象,主要工作是发送消息到目的地。
对于一个topic,需要通过TopicSession来创建一个TopicPublisher。代码如下:
TopicPublisher topicPublisher =
TopicSession.createPublisher(topic);
对于一个queue,需要通过QueueSession来创建一个QueueSender。代码如下:
QueuePublisher queuePublisher =
queueSession.createSender(queue);
6) 消息发送
建立一个TestMessage并且publish 它, 代码:
TextMessage message = topicSession.createTestMessage();
message.setText(msg);
topicPublishe.publish(topic, message);
建立一个TestMessage并且send它, 代码:
TextMessage message = queueSession.createTestMessage();
message.setText(msg);
queueSender.send(queue, message);
7) 下面是一个完成的topic publisher
代码,文件名HelloPublisher.java :
import javax.naming.C
import javax.naming.InitialC
import javax.naming.NamingE
import javax.jms.TopicConnectionF
import javax.jms.TopicC
import javax.jms.TopicS
import javax.jms.TopicP
import javax.jms.T
import javax.jms.TextM
import javax.jms.S
import javax.jms.JMSE
import java.util.H
public class HelloPublisher {
TopicConnection topicC
TopicSession topicS
TopicPublisher topicP
public HelloPublisher(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Hashtable props=new Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
props.put(Context.PROVIDER_URL, "localhost:1099");
props.put("java.naming.rmi.security.manager", "yes");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
Context context = new InitialContext(props);
TopicConnectionFactory topicFactory =
(TopicConnectionFactory)context.lookup(factoryJNDI);
topicConnection = topicFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic)context.lookup(topicJNDI);
topicPublisher = topicSession.createPublisher(topic);
public void publish(String msg) throws JMSException {
TextMessage message = topicSession.createTextMessage();
message.setText(msg);
topicPublisher.publish(topic, message);
public void close() throws JMSException {
topicSession.close();
topicConnection.close();
public static void main(String[] args) {
HelloPublisher publisher = new HelloPublisher(
"ConnectionFactory", "topic/testTopic");
for (int i = 1; i & 11; i++) {
String msg = "Hello World no. " +
System.out.println("Publishing message: " + msg);
publisher.publish(msg);
publisher.close();
} catch(Exception ex) {
System.err.println(
"An exception occurred while testing HelloPublisher25: " + ex);
ex.printStackTrace();
我们知道,使用JMS不仅能发送(send)/发布(publish)消息,也能获得(send)/发布(publish)的消息。在时间方式有良种方法来做:
同步(Synchronously):需要手工的去得到消息,为了得到一个消息客户机调用方法得到消息,直到消息到达或在规定的时间内没有到达而超时。我们在例子中没有说明这部分,大家可以实验一下。
异步(Asynchronously):你需要定义一个消息监听器(MessageListener),实现该接口。当消息达到时,JMS
provider通过调用该对象的 onMessage方法来传递消息。
从原则来将,topic和queue都是异步的,但是在这两种目的地中有不同的类和方法。首先,必须定义一个MessageListener接口。
8) 建立一个MessageListener
在建立了你需要的subscriber/receiver,并且登记了监听器后。就可以调用连接的start方法得到JMS
发送到的消息了。如果在登记监听器之前调用start方法,很可能会丢失消息。
public void onMessage(Message m) {
String msg = ((TextMessage)m).getText();
System.out.println("HelloSubscriber got message: " + msg);
} catch(JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
9) 建立消息消费者
对于topic来将:
//建立一个订阅者
topicSubscriber = topicSession.createSubscriber(topic);
//设置消息监听器,
topicSubscriber.setMessageListener(this)
//连接开始
topicConnection.start();
对于queue来将:
//建立一个订阅者
queueReceiver = queueSession.createReceiver(queue);
//设置消息监听器,
queueReceiver .setMessageListener(this)
//连接开始
queueConnection.start();
10) 完整的代码,放在文件HelloSubscriber.java中,如下:
import javax.naming.C
import javax.naming.InitialC
import javax.naming.NamingE
import javax.jms.TopicConnectionF
import javax.jms.TopicC
import javax.jms.TopicS
import javax.jms.TopicS
import javax.jms.T
import javax.jms.M
import javax.jms.TextM
import javax.jms.S
import javax.jms.MessageL
import javax.jms.JMSE
public class HelloSubscriber implements MessageListener {
TopicConnection topicC
TopicSession topicS
TopicSubscriber topicS
public HelloSubscriber(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException
Context context = new InitialContext();
TopicConnectionFactory topicFactory =
(TopicConnectionFactory)context.lookup(factoryJNDI);
topicConnection = topicFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(
false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic)context.lookup(topicJNDI);
topicSubscriber = topicSession.createSubscriber(topic);
topicSubscriber.setMessageListener(this);
System.out.println(
"HelloSubscriber subscribed to topic: " + topicJNDI);
topicConnection.start();
public void onMessage(Message m) {
String msg = ((TextMessage)m).getText();
System.out.println("HelloSubscriber got message: " + msg);
} catch(JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
public void close() throws JMSException {
topicSession.close();
topicConnection.close();
public static void main(String[] args) {
HelloSubscriber subscriber = new HelloSubscriber(
"TopicConnectionFactory",
"topic/testTopic");
} catch(Exception ex) {
System.err.println(
"An exception occurred while testing HelloSubscriber: " + ex);
ex.printStackTrace();
11) 编辑、运行程序
直接使用命令(java)
w 开启命令操作符。设置classpath :
classpath=C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.C:jboss-3.0.6_tomcat-4.1.18clientjboss-j2ee.C:jboss-3.0.6_tomcat-4.1.18clientjnp-client.C:jboss-3.0.6_tomcat-4.1.18clientlog4j..
w 首先运行订阅消息端:java HelloSubscriber
w 再开启另外一个命令窗口设置classpath :
classpath=C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.C:jboss-3.0.6_tomcat-4.1.18clientjboss-j2ee.C:jboss-3.0.6_tomcat-4.1.18clientjnp-client.C:jboss-3.0.6_tomcat-4.1.18clientlog4j..
w 运行发布消息端:java HelloPublisher
在最后我们解释JBoss-specific特性:如何用代码来管理目的地。JBoss各个版本可能不同,但是差别不大。我使用的是jboss3.0.6。
实现这个目的有两种不同的方法,依赖于是否代码是在和JBoss同样的虚拟机还是独立独立的。它们都包括调用一个通过service=DestinationManager
登记的JMX Bean。这个Mbean
有四个方法来管理目的地:createTopic(),createQueue(),destroyTopic(),destroyQueue()。
在代码中实现管理目的地在影射怎样调用MBean有不同的地方。如果程序虚拟机和Mbean服务器一样,可以直接调用。
建立一个topic 目的地的代码如下:
MBeanServer server = (MBeanServer)
MBeanServerFactory.findMBeanServer(null).iterator().next();
server.invoke(new ObjectName("JBossMQ", "service",
"DestinationManager"),
new Object[] { “myTopic” },
new String[] { "java.lang.String" });
如果程序和Mbean服务器的虚拟机不同,需要通过一个JMX
adapter。一个JMX adapter是一个HTML
GUI。用程序通过URL来调用Mbean。代码如下:
import java.io.InputS
import java.net.URL;
import java.net.HttpURLC
import javax.management.MBeanServerF
import javax.management.MBeanS
import javax.management.ObjectN
import javax.jms.T
import javax.jms.Q
public class DestinationHelper {
static final String HOST = "localhost";
static final int PORT = 8080;
static final String BASE_URL_ARG = "/jmx-console/HtmlAdaptor?";
public static void createDestination(Class type, String name)
throws Exception
String method =
if (type == Topic.class) { method = "createTopic"; }
else if (type == Queue.class) { method = "createQueue";}
invoke(method, name);
public static void destroyDestination(Class type, String name)
throws Exception
String method =
if (type == Topic.class) { method = "destroyTopic"; }
else if (type == Queue.class) { method = "destroyQueue";}
invoke(method, name);
protected static void invoke(String method, String destName)
throws Exception
MBeanServer server = (MBeanServer) MBeanServerFactory.findMBeanServer(null).iterator().next();
invokeViaMBean(method, destName);
}catch(Exception ex) { invokeViaUrl(method, destName);}
protected static void invokeViaUrl(String method, String destName)
throws Exception
String action = "action=invokeOp&methodIndex=6&name=jboss.mq%3Aservice%3DDestinationManager&arg0=" + destN
String arg = BASE_URL_ARG +
URL url = new URL("http", HOST, PORT, arg);
HttpURLConnection con = (HttpURLConnection)url.openConnection();
con.connect();
InputStream is = con.getInputStream();
java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream();
byte[] buff = new byte[1024];
int size = is.read( buff );
if (size == -1 ) { }
os.write(buff, 0, size);
os.flush();
if (con.getResponseCode() != HttpURLConnection.HTTP_OK ) {
throw new Exception ("Could not invoke url: " + con.getResponseMessage() );
System.out.println("Invoked URL: " + method + " for destination " + destName + "got resonse: " + os.toString());
protected static void invokeViaMBean(String method, String destName)
throws Exception
MBeanServer server = (MBeanServer)MBeanServerFactory.findMBeanServer(null).iterator().next();
server.invoke(new ObjectName("JBossMQ", "service", "DestinationManager"),
new Object[] { destName },
new String[] { "java.lang.String" });
public static void main(String[] args) {
if (args.length &0){
destroyDestination(Topic.class,"myCreated");
createDestination(Topic.class,"myCreated");
}catch(Exception ex) {
System.out.println("Error in administering destination: " + ex);
ex.printStackTrace();
编辑命令:
javac -classpath
C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.C:jboss-3.0.6_tomcat-4.1.18libjboss-jmx..
DestinationHelper.java
java -classpath
C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.C:jboss-3.0.6_tomcat-4.1.18libjboss-jmx..
DestinationHelper
当运行完后查看http://localhost:8080/jmx-console下面的jboss.mq.destination中有name=myCreated,service=Topic
表明你建立成功。当JBoss关闭重新启动时。该目的地不会在存在。
lujinan858
浏览: 343934 次
来自: 大连
long t1 = System.nanoTime();
Sev7en_jun 写道 ExternalInterface ...
ExternalInterface.call(&fu ...
Incorrect syntax near 'fddActiv ...
Alter table TestItem drop COLUM ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'

我要回帖

 

随机推荐