下載erlang:http://www.erlang.org/downloads
RabbitMQ服務端使用并發式語言erlang編寫
創建ERLANG_HOME
環境變量
環境變量指向erlang的安裝目錄,將%ERLANG_HOME%\bin加入到Path中
驗證erlang安裝
輸入
erl
出現erlang的版本信息表示erlang語言環境安裝成功
下載RabbitMQ:http://www.rabbitmq.com/
默認的安裝目錄是
C:/Program Files/....
,這個目錄中是存在空格符的
RabbitMQ安裝目錄最好不要有空格,可以改變安裝目錄
驗證RabbitMQ安裝
- 命令行進入RabbitMQ的安裝目錄: cd rabbitmq_server-3.5.2\sbin
- 輸入
rabbitmqctl status
, 如果出現以下的圖,說明安裝成功,并且RabbitMQ Server已經啟動運行正常。
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin>rabbitmqctl status Status of node 'rabbit@Lifeix-pc' ... [{pid,9788}, {running_applications,[{rabbit,"RabbitMQ","3.6.5"}, {mnesia,"MNESIA CXC 138 12","4.12.1"}, {os_mon,"CPO CXC 138 46","2.2.15"}, {rabbit_common,[],"3.6.5"}, {xmerl,"XML parser","1.3.7"}, {ranch,"Socket acceptor pool for TCP protocols.", "1.2.1"}, {sasl,"SASL CXC 138 11","2.4"}, {stdlib,"ERTS CXC 138 10","2.1"}, {kernel,"ERTS CXC 138 10","3.0.1"}]}, {os,{win32,nt}}, {erlang_version,"Erlang/OTP 17 [erts-6.1] [64-bit] [smp:4:4] [async-threads:64]\n"}, {memory,[{total,48083008}, {connection_readers,0}, {connection_writers,0}, {connection_channels,0}, {connection_other,0}, {queue_procs,2704}, {queue_slave_procs,0}, {plugins,0}, {other_proc,22122752}, {mnesia,61640}, {mgmt_db,0}, {msg_index,51952}, {other_ets,979696}, {binary,23264}, {code,19851942}, {atom,711569}, {other_system,4277489}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"},{amqp,5672,"0.0.0.0"}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,6851372646}, {disk_free_limit,50000000}, {disk_free,1131212800}, {file_descriptors,[{total_limit,8092}, {total_used,2}, {sockets_limit,7280}, {sockets_used,0}]}
安裝 RabbitMQ-Plugins
相當于管理界面,方便在瀏覽器界面查看RabbitMQ各個消息隊列以及exchange的工作情況
安裝方法:
- 打開命令行cd進入
rabbitmq的sbin目錄
(如:E:\software\rabbitmq\rabbitmq_server-3.6.5\sbin)- 輸入:
rabbitmq-plugins enable rabbitmq_management
命令,稍等會出現plugins安裝成功的提示,默認是安裝6個插件。
如果出現了下面的錯誤:
解決方法:
1. 命令行輸入:rabbitmq-service stop
2. 輸入rabbitmq-service remove
3. 輸入rabbitmq-service install
4. 輸入rabbitmq-service start
5. 重新輸入rabbitmq-plugins enable rabbitmq_management
驗證
插件安裝完后,在瀏覽器輸入
http://localhost:15672
驗證
用戶名:guest,密碼:guest
producer:消息生產者
consumer:消息消費者
virtual host:虛擬主機
在RabbitMQ中,用戶只能在虛擬主機的層面上進行一些權限設置,比如我可以訪問哪些隊列,我可以處理哪些請求等等;
broker:消息轉發者
和producer直接進行打交道的,類似于路由器的功能,主要就是進行轉發操作,producer到底用哪個exchange進行路由?取決于routing key(路由鍵),每個消息都有這個鍵(可以設定)其實就是一字符串
用于存放消息,接收exchange路由過來的消息,可以對隊列內容進行持久化操作,queue接收哪個exchange路由的消息?需要用到binding key(綁定鍵),綁定鍵會將隊列和exchange進行綁定,RabbitMQ提供了多種綁定方式
這些概念間的關系
producer(生產者)端步驟:
注意,當前沒有用到exchange交換機,我們沒有創建exchange時,RabbitMQ會創建一個默認的空字符串(exchange)并使用這個exchange
public class Sender { private final static String QUEUE_NAME = "MyQueue"; public static void main(String[] args) { send(); } public static void send() { ConnectionFactory factory = null; Connection connection = null; Channel channel = null; try { factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "my first message ....."; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("已經發送消息....."+message); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally{ try { //關閉資源 channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } }
consumer(消費者)端步驟:
注意這里的queue名字要和前面producer創建的queue一致
public class Receiver { private final static String QUEUE_NAME = "MyQueue"; public static void main(String[] args) { receive(); } public static void receive() { ConnectionFactory factory = null; Connection connection = null; Channel channel = null; try { factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("11111111111"); String message = new String(body, "UTF-8"); System.out.println("收到消息....."+message); }}; channel.basicConsume(QUEUE_NAME, true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally{ try { //關閉資源 channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } }
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态