1.首先是发布消息者的代码
public class MQPublisher {public static void main(String[] args) {try{ConnectionFactory factory = new ConnectionFactory();factory.setHost("123.199.200.54");//129.199.200.54factory.setUsername("omsuser");factory.setPassword("9aVXR2LLk2xskcDq7ziccWKP");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("ManufactureBill", true, false, false, null);String bill = getManufactureBill();MQMessage mqmsg=new MQMessage();JSONObject jso=JSON.parseObject(bill);//json字符串转换成jsonobject对象mqmsg.setObject(jso);mqmsg.setType(0);String MQjson = JSON.toJSONString(mqmsg,SerializerFeature.WriteMapNullValue);//channel.basicPublish("", "ManufactureBill", null, bill.getBytes());// channel.queueDelete("ManufactureBill");channel.basicPublish("", "ManufactureBill", null, MQjson.getBytes());channel.close();connection.close();}catch(Exception e){e.printStackTrace();}}
定义通道
AMQP.Queue.DeclareOk queueDeclare(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments)throws IOException
- Parameters:
queue
- the name of the queue 通道的名称,消费者要接收信息,名称要与此相同,同一个连接下面会可能有多个通道durable
- true if we are declaring a durable queue (the queue will survive a server restart) 定义一个持久的队列,一般设置为true,当服务器挂掉之后,队列依然存在,exclusive
- true if we are declaring an exclusive queue (restricted to this connection) 定义一个独有队列(不允许别人连接)一般设置为false,不然怎么接受消息autoDelete
- true if we are declaring an autodelete queue (server will delete it when no longer in use) 当服务器不在用到此队列的时候,是否自动删除。一般设置为false,和durable一致arguments
- other properties (construction arguments) for the queue 这个不知道 Returns:- a declaration-confirm method to indicate the queue was successfully declared Throws:
IOException
- if an error is encountered See Also:AMQP.Queue.Declare
,AMQP.Queue.DeclareOk
2. 消费者:通过监听器方式,项目启动便启动,开启一个线程执行下面代码,while(true)无限循环,间隔一定时间。然后处理消息(nacked确认),不发送还是,重新发送,如果不确认,消息无法被重新投递
boolean state=true;Channel channel=null;QueueingConsumer.Delivery delivery=null;try {ConnectionFactory factory = new ConnectionFactory(); factory.setHost("134.129.200.54");System.out.println("************收到ykl订单信息**************************");factory.setUsername("omsuser");factory.setPassword("9aVXR2LLk2xs54Dq7ziccWKP"); //9aVXR2LLk2xskcDq7ziccWKP//123456factory.setPort(5672);Connection connection = factory.newConnection();channel = connection.createChannel();channel.queueDeclare("ManufactureBill", true, false, false, null);QueueingConsumer consumer = new QueueingConsumer(channel);// channel.basicConsume("ManufactureBill", false, consumer); delivery= consumer.nextDelivery();String message = new String(delivery.getBody(),"utf-8"); System.out.println("接收数据程序加载完成");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //不需要再次发送 } catch (Exception e) {try {channel.basicNack(delivery.getEnvelope().getDeliveryTag(),true, false);//再次发送} catch (IOException e1) {// TODO Auto-generated catch blocke1.printStackTrace();}}
百度翻译api id和密钥? void basicReject(long deliveryTag, boolean requeue) throws IOException;
第一个参数指定 delivery tag,相当于这条消息的id
,第二个参数说明如何处理这个失败消息。requeue 值为 true 表示该消息重新放回队列头,值为 false 表示放弃这条消息。
一般来说,如果是系统无法处理的异常,我们一般是将 requeue 设为 false,例如消息格式错误,再处理多少次也是异常。调用第三方接口超时这类异常 requeue 应该设为 true。
从 basicReject 方法参数可见,取消确认不支持批量操作(类似于 basicAck 的 multiple 参数)。所以,RabbitMQ 增加了 basicNack 方法以提供批量取消能力。
void basicNack(long deliveryTag,boolean multiple,boolean requeue)throws IOException
deliveryTag
from the AMQP.Basic.GetOk
or AMQP.Basic.GetOk
method containing the message to be rejected.- Parameters:
deliveryTag
- the tag from the receivedAMQP.Basic.GetOk
orAMQP.Basic.Deliver 相当于这条消息的id
multiple
- true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag. true表示不再接受任何消息,false表示不再接收参数deliveryTag
表示的这条数据requeue
- true if the rejected message(s) should be requeued rather than discarded/dead-lettered requeue 值为 true 表示该消息重新放回队列头,值为 false 表示放弃这条消息 参考上面方法 Throws:IOException
- if an error is encountered See Also:AMQP.Basic.Nack
- 参考:https://www.cnblogs.com/gordonkong/p/6952957.html?utm_source=itdadao&utm_medium=referral