ActiveMQ消息队列

介绍

  ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

主要特点:

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试

什么是JMS规范

  JMS的全称是Java MessageService,即Java消息服务。用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
  它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。
  对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  • StreamMessage – Java原始值的数据流
  • MapMessage–一套名称-值对
  • TextMessage–一个字符串对象
  • ObjectMessage–一个序列化的 Java对象
  • BytesMessage–一个字节的数据流

JMS应用程序接口

  ConnectionFactory

    用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

  Connection

    连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。

  Destination

    目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

  MessageProducer

    由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

  MessageConsumer

    由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

  Message

    是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:

  • 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
  • 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
  • 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

消息接口非常灵活,并提供了许多方式来定制消息的内容。

  Session

    表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

JMS消息发送模式

  在P2P模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:只有一个消费者将获得消息。生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。每一个成功处理的消息都由接收者签收。

  publish/subscribe模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息.在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

安装ActiveMQ

  1. 首先到官网 http://activemq.apache.org/ 下载ActiveMQ.

  2. 因为ActiveMQ是JAVA开发的,所以依赖jdk环境。

  3. 解压ActiveMQ。

  4. 在ActiveMQ/bin目录中,./activemq start 开启ActiveMQ

  5. 在ActiveMQ/bin目录中,./activemq stop 关闭ActiveMQ

  6. 访问后台 http://ip:8161/admin ActiveMQ的默认后台端口为8161,Message端口为61616

使用ActiveMQ

  使用ActiveMQ需要先引入ActiveMQ的jar包。

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>

以下示例使用Queue模式,如要使用Topic模式只需要将Destination改成Topic即可。

1.Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Test
public void testProducer() throws JMSException{
// 创建连接工厂
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://192.168.145.137:61616");
// 声明Connection
Connection connection = null;
// 声明Session
Session session = null;
// 声明Producer
MessageProducer producer = null;
try{
// 从连接工厂中获得连接
connection = connectionFactory.createConnection();
// 开启连接
connection.start();
/*
* 从连接中获得会话
* 参数1:transacted boolean型
* 当设置为true时,将忽略参数2,acknowledgment mode被jms服务器设置 SESSION_TRANSACTED。
* 当一个事务被提交时,消息确认就会自动发生。
* 当设置为false时,需要设置参数2
* Session.AUTO_ACKNOWLEDGE为自动确认,当客户成功的从receive方法返回的时候,或者从
* MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
* Session.CLIENT_ACKNOWLEDGE 为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的
* acknowledge方法。jms服务器才会删除消息。(默认是批量确认)
*/
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建一个Destination目的地 Queue或者Topic
Queue queue = session.createQueue("testMessage");
// 创建一个Producer生产者
producer = session.createProducer(queue);
// 创建message
ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("test");
// 发送message
producer.send(textMessage);
}catch(Exception e){
e.printStackTrace();
}finally{
// 回收资源
producer.close();
session.close();
connection.close();
}
}

2.Consumer

  消费者有两种消费方式:

  1. 同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。

  2. 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。
    实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。

同步消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Test
public void testSyncConsumer() throws JMSException{
// 创建连接工厂
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://192.168.145.137:61616");
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try{
// 获得连接
connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 获得Session
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建一个目的地
Queue queue = session.createQueue("testSynchronization");
// 创建消费者
consumer = session.createConsumer(queue);
// 使用receive同步消费
while(true){
// 设置接收信息的时间,单位为毫秒
Message message = consumer.receive(10000);
if(message != null){
System.out.println(message);
}else{
// 超时,结束循环
break;
}
}
}catch(Exception e){
e.printStackTrace();
}finally{
// 回收资源
consumer.close();
session.close();
connection.close();
}
}

异步消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Test
public void testAsyncConsumer() throws JMSException{
// 创建连接工厂
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://192.168.145.137:61616");
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try{
// 获得连接
connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 获得Session
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 设置目的地
Queue queue = session.createQueue("testAsynchronization");
// 创建Consumer
consumer = session.createConsumer(queue);
// 异步消费
session.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
}catch(Exception e){
e.printStackTrace();
}finally{
// 回收资源
consumer.close();
session.close();
connection.close();
}
}

整合Spring

1.配置ConnectionFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd">
<!-- ActiveMQ提供的ConnectionFactory -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.145.137:61616" />
</bean>
<!-- Spring的ConnectionFactory需要注入ActiveMQ的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
</beans>

2.配置生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 注入Spring的连接工厂 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--P2P模式的Destination -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
<!-- publish/subscribe模式的Destination -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>

3.发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void testSend(){
// 读取Spring配置文件
ApplicationContext applicationContext =
new ClassPathXmlApplicationContext("applicationContext.xml");
// 获得JmsTemplate
JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
// 获得Destination
ActiveMQQueue queue = applicationContext.getBean(ActiveMQQueue.class);
// 发送消息
jmsTemplate.send(queue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("send-spring");
}
});
}

4.配置消费者

  Spring通过MessageListenerContainer接收信息,并把接收到的信息分发给MessageListener进行处理。每个消费者对应每个目的地都需要有对应的MessageListenerContainer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<!-- ActiveMQ提供的ConnectionFactory -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.145.137:61616" />
</bean>
<!-- Spring的ConnectionFactory需要注入ActiveMQ的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 注入Spring的连接工厂 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--P2P模式的Destination -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
<!-- publish/subscribe模式的Destination -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
<!-- 配置监听器 -->
<bean id="myMessageListener" class="com.activemq.MyMessageListener" />
<!-- 消息监听容器 -->
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>

监听器需要实现MessageListener接口。

1
2
3
4
5
6
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println(message);
}
}

Exception

  启动ActiveMQ时,如果发生java.net.UnknownHostException异常。
解决方法:
修改 /etc/hosts 文件 添加一行 192.168.1.1(主机IP) 主机名.localdomain 主机名
例: 192.168.145.137 ActiveMQ.localdomain ActiveMQ

分享