activemq发布订阅

 2023-09-05 阅读 47 评论 0

摘要:2019独角兽企业重金招聘Python工程师标准>>> 关键代码,创建topic Destination destination = session.createTopic("topic1"); 发布者: packagecom.sniper.jms.topic;importjavax.jms.Connection; importjavax.jms.ConnectionFactory; im

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

关键代码,创建topic

Destination destination = session.createTopic("topic1");

发布者:

package com.sniper.jms.topic;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 发布者* @author audaque**/
public class Sender {public static void main(String[] args) {ConnectionFactory connectionFactory; // 连接工厂Connection connection = null; // 连接Session session = null;try {// 实例化连接工厂connectionFactory = new ActiveMQConnectionFactory("sniper", "sniper", "tcp://sniper0:61616");connection = connectionFactory.createConnection(); // 通过连接工厂获取连接connection.start(); // 启动连接session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session,没有事务// 消息的目的地Destination destination = session.createTopic("topic1");// 创建消息生产者MessageProducer messageProducer = session.createProducer(destination); //设置消息不做持久化messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 发送消息for(int i=0; i<2; i++){TextMessage message = session.createTextMessage("ActiveMQ 发送的消息"+i);System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);messageProducer.send(message);}} catch (Exception e) {e.printStackTrace();} finally{if(connection!=null){try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}}

订阅者1:

package com.sniper.jms.topic;import java.util.concurrent.TimeUnit;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 订阅者1* @author audaque**/
public class Receiver1 {public static void main(String[] args) {ConnectionFactory connectionFactory; // 连接工厂Connection connection = null; // 连接Session session = null;try {// 实例化连接工厂connectionFactory = new ActiveMQConnectionFactory("sniper", "sniper", "tcp://sniper0:61616");connection = connectionFactory.createConnection();  // 通过连接工厂获取连接connection.start(); // 启动连接//自动签收,就是客户端接收到消息之后,会自动给服务端发送消息表示消息已经签收session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建SessionDestination destination = session.createTopic("topic1");MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息消费者messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {if(message != null){TextMessage textMessage = (TextMessage)message;try {System.err.println("收到的消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}} catch (JMSException e) {e.printStackTrace();} finally {if(connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}}

订阅者2:

package com.sniper.jms.topic;import java.util.concurrent.TimeUnit;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 订阅者2* @author audaque**/
public class Receiver2 {public static void main(String[] args) {ConnectionFactory connectionFactory; // 连接工厂Connection connection = null; // 连接Session session = null;try {// 实例化连接工厂connectionFactory = new ActiveMQConnectionFactory("sniper", "sniper", "tcp://sniper0:61616");connection = connectionFactory.createConnection();  // 通过连接工厂获取连接connection.start(); // 启动连接//自动签收,就是客户端接收到消息之后,会自动给服务端发送消息表示消息已经签收session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建SessionDestination destination = session.createTopic("topic1");MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息消费者messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {if(message != null){TextMessage textMessage = (TextMessage)message;try {System.err.println("Receiver2收到的消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}} catch (JMSException e) {e.printStackTrace();} finally {if(connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}}


转载于:https://my.oschina.net/sniperLi/blog/632969

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/5/1215.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息