?
RabbitMQ整個SpringBoot
SpringBoot因其配置簡單、快速開發,已經成為熱門的開發之一
rabbitmq docker、?
消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息
而消費者從消息隊列中消費信息.具體過程如下:
rabbitmq部署。
?
從上圖可看出,對于消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念
生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,并且當消息隊列收到消息之后,
接收消息隊列傳來的消息,并且給予相應的處理.消息隊列常用于分布式系統之間互相信息的傳遞.
?
?
?
使用SpringBoot進行整合RabbitMQ
1.pom文件的引入
這是操作RabbitMQ的starter必須要進行引入的
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
?
2.配置文件進行基礎的配置
spring.rabbitmq.virtual-host=/user spring.rabbitmq.port=5672 spring.rabbitmq.password=user spring.rabbitmq.username=user spring.rabbitmq.host=192.168.43.157
?
?
RabbitMQ的模式
1、direct模式
配置Queue(消息隊列).那注意由于采用的是Direct模式,需要在配置Queue的時候,指定一個鍵
使其和交換機綁定.
DirectQueue.java
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class DirectQueue {//若隊列不存在則進行創建隊列
//返回的是隊列名字@Beanpublic Queue queue(){return new Queue("direct_queue");} }
?
消息生產者
Sender.java
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class Sender {@Autowiredprivate AmqpTemplate amqpTemplate;public void send(){String msg = "direct_queue";User user = new User();user.setName("MrChegns");user.setAge(12);amqpTemplate.convertAndSend("direct_queue",user);}}
?
此時發送的消息是一個User類型的對象
對于發送對象需要實現序列化接口
User.java
package com.cr.rabbitmqs.direct; import java.io.Serializable; public class User implements Serializable {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public User(String name, int age) {this.name = name;this.age = age;}public User() {}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';} }
?
消費者
Receive.java
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component public class Receive {//對隊列進行監聽
//同時可以監聽多個隊列 @RabbitListener(queues = "direct_queue")public void listen(User msg){System.out.println(msg);} }
?
測試:
@Autowiredprivate Sender sender;@Testpublic void test1(){sender.send();}
?
得到的結果i:
?
?
?
?2、topic模式
?首先我們看發送端,我們需要配置隊列Queue,再配置交換機(Exchange)
再把隊列按照相應的規則綁定到交換機上
Topic.java
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class Topic {//創建隊列@Bean(name = "message")public Queue Aqueue(){return new Queue("message.topic");}@Bean(name = "message1")public Queue BQueue(){return new Queue("message.topics");}//交換機//若不存在則進行創建交換機 @Beanpublic TopicExchange exchange(){return new TopicExchange("topic_exchange");}//交換機和隊列進行綁定@BeanBinding bindingExchangeTopic(@Qualifier("message")Queue message,TopicExchange exchange){return BindingBuilder.bind(message).to(exchange).with("message.topic");}@BeanBinding bindingExchangeTopics(@Qualifier("message1")Queue message,TopicExchange exchange){return BindingBuilder.bind(message).to(exchange).with("message.#");} }
?
?消費者
Receive1.java
import com.cr.rabbitmqs.direct.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component public class Receive1 { @RabbitListener(queues = "message.topic")public void tes(User user){System.out.println( "user1111:" + user);} }
?
Receive2.java
import com.cr.rabbitmqs.direct.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component public class Receive2 { @RabbitListener(queues = "message.topics")public void tes(User user){System.out.println("user222:" + user);} }
?
消息生產者:
TopicSend.java
import com.cr.rabbitmqs.direct.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class TopicSend {@Autowiredprivate AmqpTemplate amqpTemplate;//發送消息public void send(){User user = new User("name",12);amqpTemplate.convertSendAndReceive("topic_exchange","message.dev",user);}//發送消息public void send1(){
User user = new User("name",12);
amqpTemplate.convertSendAndReceive("topic_exchange","message.topic",user );
}
}
?
在開發中這種模式的使用還是相對比較多的,此時測試的是兩種方法
一個方法所有的隊列都可以進行獲取
一個方法只有一個隊列可以獲取到消息
?
?測試:
@Autowiredprivate TopicSend topicSend;@Testpublic void ttt(){topicSend.send();}
?
?測試:
@Autowiredprivate TopicSend topicSend;@Testpublic void ttt(){topicSend.send1();}
?
后臺查看交換機和隊列的綁定關系以機相關的路由鍵
?
?
?3、fanout
?那前面已經介紹過了,Fanout Exchange形式又叫廣播形式,因此我們發送到路由器的消息會使
得綁定到該路由器的每一個Queue接收到消息,這個時候就算指定了Key,或者規則(即上文中
convertAndSend方法的參數2),也會被忽略!那么直接上代碼,發送端配置如下:
Fanout.java
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class Fanout {//隊列//如果隊列不存在會自動創建隊列@Beanpublic Queue queueA(){return new Queue("queueA");} @Beanpublic Queue queueB(){return new Queue("queueB");}@Beanpublic Queue queueC(){return new Queue("queueC");}//交換機//如果交換機不存在會自動創建隊列 @Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}//將交換機和隊列進行綁定 @BeanBinding bindingExchangequeueA(Queue queueA,FanoutExchange fanoutExchange){return BindingBuilder.bind(queueA).to(fanoutExchange);}@BeanBinding bindingExchangequeueB(Queue queueB,FanoutExchange fanoutExchange){return BindingBuilder.bind(queueB).to(fanoutExchange);}@BeanBinding bindingExchangequeueC(Queue queueC,FanoutExchange fanoutExchange){return BindingBuilder.bind(queueC).to(fanoutExchange);} }
?
?消費者:
FanoutReceive.java
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component //監聽器 @RabbitListener(queues = "queueA") public class FanoutReceive {//監聽的方法@RabbitHandlerpublic void listen(String msg){System.out.println("queueA" + msg);}}
?
?
FanoutSender.java?
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class FanoutSender { @Autowiredprivate AmqpTemplate amqpTemplate;//發送消息public void send(){String msg = "test fanout....";//發送消息:參數依次是 交換機名字--路由鍵(此時設置路由鍵沒有作用)--消息amqpTemplate.convertAndSend("fanoutExchange","",msg);} }
?
測試:
@RunWith(SpringRunner.class) @SpringBootTest public class BpptandrabbitmqApplicationTests {//測試fanout @Autowiredprivate FanoutSender fanoutSender;@Testpublic void fanout() {fanoutSender.send();}}
?
?此時3個隊列都能接收到消息
?
交換機、隊列以及路由鍵
?
?