一:POM文件
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.4.3</version>
</dependency>
二:創建一個消費者
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;public class TestConsumer implements com.rabbitmq.client.Consumer {private String name;public TestConsumer(String name) {this.name = name;}// 這個方法是回調方法@Overridepublic void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {System.out.println(name + ":" + new String(arg3));}@Overridepublic void handleConsumeOk(String consumerTag) {}@Overridepublic void handleCancelOk(String consumerTag) {}@Overridepublic void handleCancel(String consumerTag) throws IOException {}@Overridepublic void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {}@Overridepublic void handleRecoverOk(String consumerTag) {}
}
三:創建一個用來消費的主函數,本例中交換機類型采用topic模式
public static void main(String[] args) throws Exception {Address address = new Address("ip", 5672);ConnectionFactory factory = new ConnectionFactory();factory.setUsername("帳號");factory.setPassword("密碼");Connection connection = factory.newConnection(Arrays.asList(address));Channel channel = connection.createChannel();channel.exchangeDeclare("交換機名字", BuiltinExchangeType.TOPIC);channel.queueDeclare("隊列名字", false, false, true, null);channel.queueBind("隊列名字", "交換機名字", "隊列名字");Consumer consumer = new TestConsumer("c1");// 注意此處,第二個參數表示自動消費,rabbitMQ按照官方所說默認就是// 自動消費,但是提供的java client默認是false(不自動消費),這就// 出現頁面很多unacked的現象channel.basicConsume("隊列名字",true, consumer);
}
四:創建一個發送消息的主函數,每隔1.5秒發送一條消息
public static void main(String[] args) throws Exception {Address address = new Address("ip", 5672);ConnectionFactory factory = new ConnectionFactory();factory.setUsername("帳號");factory.setPassword("密碼");Connection connection = factory.newConnection(Arrays.asList(address));Channel channel = connection.createChannel();channel.exchangeDeclare("交換機名", BuiltinExchangeType.TOPIC);channel.queueDeclare("隊列名字", false, false, true, null);channel.queueBind("隊列名字", "交換機名", "隊列名字");int i = 0;System.out.println("馬銀霜很好看");while (true) {Thread.sleep(1500);channel.basicPublish("交換機名", "隊列名字", null, ("-->msg " + i).getBytes(StandardCharsets.UTF_8));i++;}
}
rabbitmq java、日記: 關于Channel.basicAck(long deliveryTag,boolean multiple);該方法表示告知rabbit代理服務器消息已經收到,言外之意就是當需要手動設置ack的時候(channel.basicConsume第二個參數autoAck=false),才使用這個方法,如果不設置,則消息會顯示unacked狀態
Channel.basicAck(long deliveryTag,boolean multiple);
參數deliveryTag:消息編號,依次遞增,從0開始,過來一條消息,deliveryTag就會增加1,該參數在上文中的回調函數handleDelivery中的Envelope可以獲取到
參數multiple:true表示確認當前消息之前的所有消息,假如當前deliveryTag=666,那么0-666的消息將都會確認收到,false表示確認當前消息收到,實際中通常都設置成false
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态