RabbitMQ实战经验分享

 2023-09-10 阅读 14 评论 0

摘要:RabbitMQ实战经验分享 原文:RabbitMQ实战经验分享前言 最近在忙一个高考项目,看着系统顺利完成了这次高考,终于可以松口气了。看到那些即将参加高考的学生,也想起当年高三的自己。 下面分享下RabbitMQ实战经验,希望对大家有所帮助: 一、
RabbitMQ实战经验分享
原文:RabbitMQ实战经验分享

前言

最近在忙一个高考项目,看着系统顺利完成了这次高考,终于可以松口气了。看到那些即将参加高考的学生,也想起当年高三的自己。

下面分享下RabbitMQ实战经验,希望对大家有所帮助:


 

 

一、生产消息

关于RabbitMQ的基础使用,这里不再介绍了,项目中使用的是Exchange中的topic模式。

营销经验分享、先上发消息的代码

private bool MarkErrorSend(string[] lstMsg){try{var factory = new ConnectionFactory(){UserName = "guest",//用户名Password = "guest",//密码HostName = "localhost",//ConfigurationManager.AppSettings["sHostName"],
                };//创建连接var connection = factory.CreateConnection();//创建通道var channel = connection.CreateModel();try{//定义一个Direct类型交换机
                    channel.ExchangeDeclare(exchange: "TestTopicChange", //exchange名称type: ExchangeType.Topic, //Topic模式,采用路由匹配durable: true,//exchange持久化autoDelete: false,//是否自动删除,一般设成falsearguments: null//一些结构化参数,比如:alternate-exchange
                        );//定义测试队列
                    channel.QueueDeclare(queue: "Test_Queue", //队列名称durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效)exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除autoDelete: false,//是否自动删除,一般设成falsearguments: null);//将队列绑定到交换机string routeKey = "TestRouteKey.*";//*匹配一个单词
                    channel.QueueBind(queue: "Test_Queue",exchange: "TestTopicChange",routingKey: routeKey,arguments: null);//消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效)IBasicProperties properties = channel.CreateBasicProperties();properties.DeliveryMode = 2;channel.ConfirmSelect();//发送确认机制foreach (var itemMsg in lstMsg){byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);//发布消息
                        channel.BasicPublish(exchange: "TestTopicChange",routingKey: "TestRouteKey.one",basicProperties: properties,body: sendBytes);}bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回truereturn isAllPublished;}catch (Exception ex){//写错误日志return false;}finally{channel.Close();connection.Close();}}catch{//RabbitMQ.Client.Exceptions.BrokerUnreachableException://When the configured hostname was not reachable.return false;}}

        发消息没啥特别的。关于消息持久化的介绍这里也不再介绍,不懂的可以看上篇文章。发消息需要注意的地方是,可以选择多条消息一起发送,最后才确定消息发送成功,这样效率比较高;此外,需要尽量精简每条消息的长度(楼主在这里吃过亏),不然会因消息过长从而增加发送时间。在实际项目中一次发了4万多条数据没有出现问题。

 


rabbitmq教程、 

二、接收消息

       接下来说下消费消息的过程,我使用的是单个连接多个channel,每个channel每次只取一条消息方法。有人会问单个TCP连接,多个channel会不会影响通信效率。这个理论上肯定会有影响的,看影响大不大而已。我开的channel数一般去到30左右,并没有觉得影响效率,有可能是因为我每个channel是拿一条消息的原因。通过单个连接多个channel的方法,可以少开了很多连接。至于我为什么选每个channel每次只取一条消息,这是外界因素限制了,具体看自己需求。

       接下接收消息的过程,首先定义一个RabbitMQHelper类,里面有个全局的conn连接变量,此外还有创建连接、关闭连接和验证连接是否打开等方法。程序运行一个定时器,当

实战居家经验分享,检测到连接未打开的情况下,主动创建连接处理消息。

 public class RabbitMQHelper{public IConnection conn = null;/// <summary>/// 创建RabbitMQ消息中间件连接/// </summary>/// <returns>返回连接对象</returns>public IConnection RabbitConnection(string sHostName, ushort nChannelMax){try{if (conn == null){var factory = new ConnectionFactory(){UserName = "guest",//用户名Password = "guest",//密码HostName = sHostName,//ConfigurationManager.AppSettings["MQIP"],AutomaticRecoveryEnabled = false,//取消自动重连,改用定时器定时检测连接是否存在RequestedConnectionTimeout = 10000,//请求超时时间设成10秒,默认的为30秒RequestedChannelMax = nChannelMax//与开的线程数保持一致
                    };//创建连接conn = factory.CreateConnection();Console.WriteLine("RabbitMQ连接已创建!");}return conn;}catch{Console.WriteLine("创建连接失败,请检查RabbitMQ是否正常运行!");return null;}}/// <summary>/// 关闭RabbitMQ连接/// </summary>public void Close(){try{if (conn != null){if (conn.IsOpen)conn.Close();conn = null;Console.WriteLine("RabbitMQ连接已关闭!");}}catch { }}/// <summary>/// 判断RabbitMQ连接是否打开/// </summary>/// <returns></returns>public bool IsOpen(){try{if (conn != null){if (conn.IsOpen)return true;}return false;}catch{return false;}}}

 

       接下来我们看具体如何接收消息。

private static AutoResetEvent myEvent = new AutoResetEvent(false);
private RabbitMQHelper rabbit = new RabbitMQHelper();
private ushort nChannel = 10;//一个连接的最大通道数和所开的线程数一致

       首先初始化一个rabbit实例,然后通过RabbitConnection方法创建RabbitMQ连接。

       当连接打开时候,用线程池运行接收消息的方法。注意了,这里开的线程必须和开的channel数量一致,不然会有问题(具体问题是,设了RabbitMQ连接超时时间为10秒,有时候不管用,原因未查明。RabbitMQ创建连接默认超时时间为30秒,假如在这个时间内再去调用创建的话,就有可能得到两倍的channel;)

/// <summary>/// 单个RabbitMQ连接开多个线程,每个线程开一个channel接受消息/// </summary>private void CreateConnecttion(){try{rabbit.RabbitConnection("localhost", nChannel);if (rabbit.conn != null){ThreadPool.SetMinThreads(1, 1);ThreadPool.SetMaxThreads(100, 100);for (int i = 1; i <= nChannel; i++){ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), "");}myEvent.WaitOne();//等待所有线程工作完成后,才能关闭连接
                    rabbit.Close();}}catch (Exception ex){rabbit.Close();Console.WriteLine(ex.Message);}}

 

       接着就是接收消息的方法,处理消息的过程省略了。

  /// <summary>/// 接收并处理消息,在一个连接中创建多个通道(channel),避免创建多个连接/// </summary>/// <param name="con">RabbitMQ连接</param>private void ReceiveMsg(object obj){IModel channel = null;try{#region 创建通道,定义中转站和队列channel = rabbit.conn.CreateModel();channel.ExchangeDeclare(exchange: "TestTopicChange", //exchange名称type: ExchangeType.Topic, //Topic模式,采用路由匹配durable: true,//exchange持久化autoDelete: false,//是否自动删除,一般设成falsearguments: null//一些结构化参数,比如:alternate-exchange
                    );//定义阅卷队列
                channel.QueueDeclare(queue: "Test_Queue", //队列名称durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效)exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除autoDelete: false,arguments: null);#endregionchannel.BasicQos(0, 1, false);//每次只接收一条消息
channel.QueueBind(queue: "Test_Queue",exchange: "TestTopicChange",routingKey: "TestRouteKey.*");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey;//处理消息方法try{bool isMark = AutoMark(message);if (isMark){//Function.writeMarkLog(message);//确认该消息已被消费,发消息给RabbitMQ队列channel.BasicAck(ea.DeliveryTag, false);}else{if (MarkErrorSend(message))//把错误消息推到错误消息队列channel.BasicReject(ea.DeliveryTag, false);else//消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者 channel.BasicReject(ea.DeliveryTag, true);}}catch (Exception ex){try{Console.WriteLine(ex.Message);if (channel != null && channel.IsOpen)//处理RabbitMQ停止重启而自动评阅崩溃的问题
                            {//消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者 channel.BasicReject(ea.DeliveryTag, true);}}catch { }}};//手动确认消息channel.BasicConsume(queue: "Test_Queue",autoAck: false,consumer: consumer);}catch (Exception ex){try{Console.WriteLine("接收消息方法出错:" + ex.Message);if (channel != null && channel.IsOpen)//关闭通道
                        channel.Close();if (rabbit.conn != null)//处理RabbitMQ突然停止的问题
                        rabbit.Close();}catch { }}}

 

 

三、处理错误消息

       把处理失败的消息放到“错误队列”,然后把原队列的消息删除(这里主要解决问题是,存在多个处理失败或处理不了的消息时,如果把这些消息都放回原队列,它们会继续分发到其他线程的channel,但结果还是处理不了,就会造成一个死循环,导致后面的消息无法处理)。把第一次处理不了的消息放到“错误队列”后,重新再开一个新的连接去处理“错误队列”的消息。

/// <summary>/// 把处理错误的消息发送到“错误消息队列”/// </summary>/// <param name="msg"></param>/// <returns></returns>private bool MarkErrorSend(string msg){RabbitMQHelper MQ = new RabbitMQHelper();MQ.RabbitConnection("localhost",1);//创建通道var channel = MQ.conn.CreateModel();try{//定义一个Direct类型交换机
                channel.ExchangeDeclare(exchange: "ErrorTopicChange", //exchange名称type: ExchangeType.Topic, //Topic模式,采用路由匹配durable: true,//exchange持久化autoDelete: false,//是否自动删除,一般设成falsearguments: null//一些结构化参数,比如:alternate-exchange
                    );//定义阅卷队列
                channel.QueueDeclare(queue: "Error_Queue", //队列名称durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效)exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除autoDelete: false,//是否自动删除,一般设成falsearguments: null);//将队列绑定到交换机string routeKey = "ErrorRouteKey.*";//*匹配一个单词
                channel.QueueBind(queue: "Error_Queue",exchange: "ErrorTopicChange",routingKey: routeKey,arguments: null);//消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效)IBasicProperties properties = channel.CreateBasicProperties();properties.DeliveryMode = 2;channel.ConfirmSelect();//发送确认机制byte[] sendBytes = Encoding.UTF8.GetBytes(msg);//发布消息
                channel.BasicPublish(exchange: "ErrorTopicChange",routingKey: "ErrorRouteKey.one",basicProperties: properties,body: sendBytes);bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回truereturn isAllPublished;}catch (Exception ex){//写错误日志return false;}finally{channel.Close();MQ.conn.Close();}}

 

总结:RabbitMQ本身已经很稳定了,而且性能也很好,所有不稳定的因素都在我们处理消息的过程,所以可以放心使用。

Demo源码地址:https://github.com/Bingjian-Zhu/RabbitMQHelper

 

posted on 2019-03-22 08:22 NET未来之路 阅读(...) 评论(...) 编辑 收藏

转载于:https://www.cnblogs.com/lonelyxmas/p/10576012.html

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/5/36216.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息