rabbitmq默认端口,《RabbitMQ 实战指南》第三章 客户端开发向导

 2023-09-25 阅读 21 评论 0

摘要:《RabbitMQ 实战指南》 文章目录《RabbitMQ 实战指南》一、简介二、连接 RabbitMQ三、使用交换器和队列1.exchangeDeclare 方法详解2.queueDeclare 方法详解3.queueBind 方法详解4.exchangeBind 方法详解5.何时创建四、发送消息五、消费消息1.推模式2.拉模式六、消费端的确认与

《RabbitMQ 实战指南》

文章目录

  • 《RabbitMQ 实战指南》
  • 一、简介
  • 二、连接 RabbitMQ
  • 三、使用交换器和队列
    • 1.exchangeDeclare 方法详解
    • 2.queueDeclare 方法详解
    • 3.queueBind 方法详解
    • 4.exchangeBind 方法详解
    • 5.何时创建
  • 四、发送消息
  • 五、消费消息
    • 1.推模式
    • 2.拉模式
  • 六、消费端的确认与拒绝
  • 七、关闭连接

一、简介

RabbitMQ Java 客户端使用 com.rabbitmq.client 作为顶级包名,关键的 Class 和 Interface 有 Channel、Connection、ConnectionFactory、Consumer 等。AMQP 协议层面的操作通过 Channel 接口实现。Connection 是用来开启 Channel(信道)的,可以注册事件处理器,也可以在应用结束时关闭连接。与 RabbitMQ 相关的开发工作,基本上也是围绕 Connection 和 Channel 这两个类展开的。本章按照一个完整的运转流程进行讲解,详细内容有这几点:连接、交换器/队列的创建与绑定、发送消息、消费消息、消费消息的确认和关闭连接

二、连接 RabbitMQ

rabbitmq默认端口,下面的代码用来在给定的参数(IP 地址、端口号、用户名、密码等)下连接 RabbitMQ:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(virtualHost);
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
Connection conn = factory.newConnection();

也可以选择使用 URI 的方式来实现,示例代码如下:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");
Connection conn = factory.newConnection();

Connection 接口被用来创建一个 Channel:

Channel channel = conn.createChannel();

在创建之后,Channel 可以用来发送或者接收消息了

rabbitmq 端口。注意要点:
Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应用程序应该为每一个线程开辟一个 Channel。某些情况下 Channel 的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认(publisher confirm)机制的运行,所以多线程间共享 Channel 实例是非线程安全的


Channel 或者 Connection 中有个 isOpen 方法可以用来检测其是否已处于开启状态。但并不推荐在生产环境的代码上使用 isOpen 方法,这个方法的返回值依赖于 shutdownCause 的存在,有可能会产生竞争,下面是 isOpen 方法的源码:

public boolean isOpen(){synchronized(this.monitor){return this.shutdownCause == null;}
}

通常情况下,在调用 createXXX 或者 newXXX 方法之后,我们可以简单地认为 Connection 或者 Channel 已经成功地处于开启状态,而并不会在代码中使用 isOpen 这个检测方法。如果在使用 Channel 的时候其已经处于关闭状态,那么程序会抛出一个 com.rabbitmq.client.ShutdownSignalException,我们只需捕获这个异常即可。当然同时也要试着捕获 IOException 或者 SocketException,以防 Connection 意外关闭,示例代码如下所示:

public void validMethod(Channel channel){try{...channel.basicQos(1);}catch(ShutdownSignalException sse){...}catch(IOException ioe){...}
}

三、使用交换器和队列

交换器和队列是 AMQP 中 high-level 层面的构建模块,应用程序需要确保在使用它们的时候就已经存在了,在使用之前需要先声明(declare)它们

下面的代码演示了如何声明一个交换器和队列:

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

上面创建了一个持久化的、非自动删除的、绑定类型为 direct 的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(此队列的名称由 RabbitMQ 自动生成)。这里的交换器和队列也都没有设置特殊的参数

上面的代码也展示了如何使用路由键将队列和交换器绑定起来。上面声明的队列具备如下特性:只对当前应用中同一个 Connection 层面可用,同一个 Connection 的不同 Channel 可共用,并且也会在应用连接断开时自动删除

如果要在应用中共享一个队列,可以做如下声明:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueNmae, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这里的队列被声明为持久化的、非排他的、非自动删除的,而且也被分配另一个确定的已知的名称(由客户端分配而非 RabbitMQ 自动生成)

注意:Channel 的 API 方法都是可以重载的,比如 exchangeDeclare、queueDeclare。根据参数不同,可以有不同的重载形式,根据自身的需要进行调用

生产者和消费者都可以声明一个交换器或者队列,RabbitMQ 就可以什么都不做,并成功返回。如果声明的参数不匹配则会抛出异常

1.exchangeDeclare 方法详解

exchangeDeclare 有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的:

Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) throws IOException;

这个方法的返回值是 Exchange.DeclareOk,用来标识成功声明了一个交换器

各个参数详细说明如下所述:

  • exchange:交换器的名称
  • type:交换器的类型,常见的如 fanout、direct、topic
  • durable:设置是否持久化。durable 设置为 true 表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息
  • autoDelete:设置是否自动删除。autoDelete 设置为 true 则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑时,才会自动删除。注意不能错误地把这个参数理解为:“当与此交换器连接的客户端都断开时,RabbitMQ 会自动删除本交换器”
  • internal:设置是否是内置的。如果设置为 true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
  • argument:其他一些结构化参数,比如 alternate-exchange

exchangeDeclare 的其他重载方法如下:

Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,Map<String, Object> argument)throws IOException;

与此对应的,将第二个参数 String type 换成 BuiltInExchangeType type 对应的几个重载方法(不常用):

Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;Exchange.DeclareOk exchangeDeclare(String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,Map<String, Object> argument)throws IOException;Exchange.DeclareOk exchangeDeclare(String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) throws IOException;

与 exchangeDeclare 师出同门的还有几个方法,比如 exchangeDeclareNoWait 方法,具体定义如下(当然也有 BuiltExchangeType 版的,这里就不展开了):

void exchangeDeclareNoWait(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) throws IOException;

这个 exchangeDeclareNoWait 比 exchangeDeclare 多设置了一个 nowait 参数,这个 nowait 参数指的是 AMQP 中 Exchange.Declare 命令的参数,意思是不需要服务器返回,注意这个方法的返回值是 void,而普通的 exchangeDeclare 方法的返回值是 Exchange.DeclareOk,意思是在客户端声明了一个交换器之后,需要等待服务器的返回(服务器会返回 Exchange.Declare-Ok 这个 AMQP 命令)

针对 “exchangeDeclareNoWait 不需要服务器任何返回值” 这一点,考虑这样一种情况,在声明完一个交换器之后(实际服务器还并未完成交换器的创建),那么此时客户端紧接着使用这个交换器,必然会发生异常。如果没有特殊的缘由和应用场景,并不建议使用这个方法

这里还有师出同门的另一个方法 exchangeDeclarePassive,这个方法的定义如下:

Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

这个方法在实际应用过程中还是非常有用的,它主要用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常:404 channel exception,同时 Channel 也会被关闭

有声明创建交换器的方法,当然也有删除交换器的方法,相应的方法如下:

Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;

其中 exchange 表示交换器的名称,而 ifUnused 用来设置是否在交换器没有被使用的情况下删除。如果 isUnused 设置为 true,则只有在此交换器没有被使用的情况下才会被删除;如果设置 false,则无论如何这个交换器都要被删除

2.queueDeclare 方法详解

queueDeclare 相对于 exchangeDeclare 方法而言,重载方法的个数就少很多,它只有两个重载方法:

Queue.DeclareOk queueDeclare() throws IOException;Queue.DeclareOk queueDeclare(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String, Object> arguments) throws IOException;

不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的(类似这种 amq.gen-LhQzlgv3GhDOv8PIDabOXA 名称,这种队列名称也称之为匿名队列)、排他的、自动删除的、非持久化的队列

方法的参数详细说明如下所述:

  • queue:队列的名称
  • durable:设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息
  • exclusive:设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除,这里需要注意三点:排他队列是基于连接可见的,同一个连接的不同信道是可以同时访问同一连接创建的排他队列:“首次” 是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景
  • autoDelete:设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”。
  • arguments:设置队列的一些其他参数,如 x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority 等

注意要点
生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为 “传输” 模式,之后才能声明队列


对应于 exchangeDeclareNoWait 方法,这里也有一个 queueDeclareNoWait 方法:

void queueDeclareNoWait(String queue,boolean duarble,boolean exclusive,boolean autoDelete,Map<String, Object> arguments) throws IOException;

方法的返回值也是 void,表示不需要服务器端的任何返回。同样也需要注意,在调用完 queueDeclareNoWait 方法之后,紧接着使用声明的队列时有可能会发生异常情况

同样这里还有一个 queueDeclarePassive 的方法,也比较常用。这个方法用来检测相应的队列是否存在。如果存在则正常放回,如果不存在则抛出异常:404 channel exception,同时 Channel 也会被关闭。方法定义如下:

Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

与交换器对应,关于队列也有删除的相应方法

Queue.DeleteOk queueDelete(String queue) throws IOException;Queue.DeleteOk queueDelete(String queue, boolean ifUnuserd, boolean ifEmpty) throws IOException;void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

其中 queue 表示队列的名称,ifUnused 可以参考上一小节的交换器。ifEmpty 设置为 true 表示在队列为空(队列里面没有任何消息堆积)的情况下才能够删除

与队列相关的还有一个有意思的方法 —— queuePurge,区别于 queueDelete,这个方法用来清空队列中的内容,而不删除队列本身,具体定义如下:

Queue.PurgeOk queuePurge(String queue) throws IOException;

3.queueBind 方法详解

将队列和交换器绑定的方法如下,可以与前两节中的方法定义进行类比

Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;Queue.BindOk queueBind(String queue,String exchange,String routingKey,Map<String, Object> arguments) throws IOException;void queueBindNoWait(String queue,String exchange,String routingKey,Map<String, Object> arguments) throws IOException;

方法中设计的参数详解:

  • queue:队列名称
  • exchange:交换器的名称
  • routingKey:用来绑定队列和交换器的路由键
  • argument:定义绑定的一些参数

不仅可以将队列和交换器绑定起来,也可以将已经被绑定的队列和交换器进行解绑。具体方法可以参考如下:

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;Queue.UnbindOk queueUnbind(String queue,String exchange,String routingKey,Map<String, Object> arguments) throws IOException;

4.exchangeBind 方法详解

我们不仅可以将交换器与队列绑定,也可以将交换器与交换器绑定,后者和前者的用法如出一辙,相应的方法如下:

Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;Exchange.BindOk exchangeBind(String destination,String source,String routingKey,Map<String, Object> arguments) throws IOException;void exchangeBindNoWait(String destination,String source,String routingKey,Map<String, Object> arguments) throws IOException;

绑定之后,消息从 source 交换器转发到 destination 交换器,某种程度上来说 destination 交换器可以看作一个队列。示例代码如下所示:

channel.exchangeDeclare("source", "direct", false, true, null);
channel.exchangeDeclare("destination", "fanout", false, true, null);
channel.exchangeBind("destination", "source", "exKey");
channel.queueDeclare("queue", false, false, true, null);
channel.queueBind("queue", "destination", "");
channel.basicPublish("source", "exKey", null, "exToExDemo".getBytes());

生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配得另一个交换器 destination,并把消息转发到 destination 绑定的队列 queue 中,可参考图 3-1

在这里插入图片描述

5.何时创建

RabbitMQ 的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。如果要衡量 RabbitMQ 当前的 QPS 只需看队列的即可。在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平局值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配

按照 RabbitMQ 官方建议,生产者和消费者都应该尝试创建(这里指声明操作)队列。这是个很好的建议,但不适用于所有的情况。如果业务本身在架构设计之初已经充分地预估了队列的使用情况,完全可以在业务程序上线之前在服务器上创建好(比如通过页面管理、RabbitMQ 命令或者更好的是从配置中心下发),这样业务程序也可以免去声明的过程,直接使用即可

预先创建好资源还有一个好处是,可以确保交换器和队列之间正确地绑定匹配。很多时候,由于人为因素、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失;或者交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失。当然可以配合 mandatory 参数或者备份交换器来提高程序的健壮性

与此同时,预估好队列的使用情况非常重要,如果在后期运行过程中超过预定的阈值,可以根据实际情况对当前集群进行扩容或者将相应的队列迁移到其他集群。迁移的过程也可以对业务程序完全透明。此种方法也更有利于开发和运维分工,便于相应资源的管理

如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也完全可以在业务程序中声明队列

至于是使用预先分配创建资源的静态方式还是动态的创建方式,需要从业务逻辑本身、公司运维体系和公司硬件资源等方面考虑

四、发送消息

如果要发送一个消息,可以使用 Channel 类的 basicPublish 方法,比如发送一条内容为 “Hello World!” 的消息,参考如下:

byte[] messageBodyBytes = "Hello World!".getBytes();
channel.baseicPulish(exchangeName, routingKey, null, messageBodyBytes);

为了更好地控制发送,可以使用 mandatory 这个参数,或者可以发送一些特定属性的信息:

channel.basicPublish(exchangeName,routingKey,mandatory,MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);

上面这行代码发送了一条消息,这条消息的投递模式(delivery mode)设置为 2,即消息会被持久化(即存入磁盘)在服务器中,同时这条消息的优先级(priority)设置为 1,content-type 为 “text/plain”。可以自己设定消息的属性:

channel.basicPublish(exchangeName,routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliverMode(2).priority(1).userId("hidden").build()),messageBodyBytes);

也可以发送一条带有 headers 的消息:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("location", "here");
headers.put("time", "today");
channel.basicPublish(exchangeName,routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);

还可以发送一条带有过期时间(expiration)的消息:

channel.basicPublish(exchangeName,routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);

以上只是举例,由于篇幅关系,这里就不一一列举所有的可能情形了。对于 basicPublish 而言,有几个重载方法:

void basicPublish(String exchange,String routingKey,BasicProperties props,byte[] body) throws IOException;void basicPublish(String exchange,String routingKey,boolean mandatory,BasicProperties props,byte[] body) throws IOException;void basicPublish(String exchange,String routingKey,boolean mandatory,boolean immediate,BasicProperties props,byte[] body) throws IOException;

对应的具体参数解释如下所述:

  • exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中
  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中
  • props:消息的基本属性集,其包含 14 个属性成员,分别有 contentType、contentEncoding、headers(Map<String, Object>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。其中常用的几种都在上面的示例中进行了演示
  • byte[] body:消息体(payload),真正需要发送的消息
  • mandatory 和 immediate 的详细内容在后面的章节会展开

五、消费消息

RabbitMQ 的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用 Basic.Consume 进行消费,而拉模式则是调用 Basic.Get 进行消费

1.推模式

在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有:

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

接收消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现,当调用与 Consumer 相关的 API 方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个 Channel 中的消费者也需要通过唯一的消费者标签以作区分,关键消费代码如下所示:

boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{String routingKey = envelope.getRoutingKey();String contentType = properties.getContentType();long deliveryTag = envelope.getDeliveryTag();//process the message components here ...channel.basicAck(deliveryTag, false);}});								)

注意,上面代码中显式地设置 autoAck 为 false,然后在接收到消息之后进行显式 ack 操作(channel.basicAck),对于消费者来说,这个设置是非常必要的,可以防止消息不必要地丢失

Channel 类中 basicConsume 方法有如下几种形式:

String basicConsume(String queue, Consumer callback) throws IOException;String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;String basicConsume(String queue,boolean autoAck,Map<String, Object> arguments,Consumer callback) throws IOException; String basicConsume(String queue,boolean autoAck,String consumeTag,Consumer callback) throws IOException;
String basicConsume(String queue,boolean autoAck,String consumerTag,boolean noLocal,boolean exclusive,Map<String, Object> arguments,Consumer callback) throws IOException;

其对应的参数说明如下所述:

  • queue:队列地名称
  • autoAck:设置是否自动确认。建议设置成 false,不自动确认
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者
  • exclusive:设置是否排他
  • arguments:设置消费者的其他参数
  • callback:设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息,比如 DefaultConsumer,使用时需要客户端重写(override)其中的方法

对于消费者客户端来说,重写 handleDelivery 方法是十分方便的。更复杂的消费者客户端会重写更多的方法,具体如下:

void handleConsumeOk(String consumerTag);
void handleCancleOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
void handleRecoverOk(String consumerTag);

比如 handleShutdownSignal 方法,当 Channel 或者 Connection 关闭的时候会调用,再者,handleConsumeOk 方法会在其他方法之前调用,返回消费者标签

重写 handleCancelOk 和 handleCancel 方法,这样消费端可以在显式地或者隐式地取消订阅的时候调用。也可以通过 channel.basicCancel 方法来显式地取消一个消费者的订阅:

channel.basicCancel(consumerTag);

注意上面这行代码会首先触发 handleConsumerOk 方法,之后触发 handleDelivery 方法,最后才触发 handleCancelOk 方法

和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些 callback 会被分配到与 Channel 不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,比如 channel.queueDeclare、channel.basicCancel 等

每个 Channel 都拥有自己独立的线程。最常用的做法是一个 Channel 对应一个消费者,也就是意味着消费者彼此之间没有任何关联。当然也可以在一个 Channel 中维持多个消费者,但是要注意一个问题,如果 Channel 中的一个消费者一直在运行,那么其他消费者的 callback 会被 “耽搁”

2.拉模式

这里讲一下拉模式的消费方式。通过 channel.basicGet 方法可以单条地获取消息,其返回值是 GetRespone。Channel 类的 basicGet 方法没有其他重载方法,只有:

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

其中 queue 代表队列的名称,如果设置 autoAck 为 false,那么同样需要调用 channel.basicAck 来确认消息已被成功接收

拉模式的关键代码如下所示:

GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);

拉模式的消费方式如图 3-2 所示(只展示消费的部分)

在这里插入图片描述

注意要点
Basic.Consume 会将信道(Channel)置为投递模式,直到取消队列的订阅为止。在投递模式期间,RabbitMQ 会不断地推送消息给消费者,当然推送消息消息的个数还是会受到 Basic.Qos 的限制。如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费。但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume,这样做会严重影响 RabbitMQ 的性能。如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法

六、消费端的确认与拒绝

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正消费到了这些消息

采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失地问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止

当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者地消息:一部分是已经投递给消费者,但是还没有受到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者地确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久

RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数

在这里插入图片描述

也可以通过相应的命令来查看上述信息:
在这里插入图片描述
在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,那么应该怎么做呢?RabbitMQ 在 2.0.0 版本开始引入 Basic.Reject 这个命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息

Channel 类中的 basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException

其中 deliveryTag 可以看作消息的编号,它是一个 64 位的长整型值,最大值是 9223372036854775807。如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者

Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令。消费者客户端可以调用 channel.basicNack 方法来实现,方法定义如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

其中 deliveryTag 和 requeue 的含义可以参考 basicReject 方法。multiple 参数设置为 false 则表示拒绝编号为 deliveryTag 的这一条消息,multiple 参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息

注意要点
将 channel.basicReject 或者 channel.basicNack 中的 requeue 设置为 false,可以启用 “死信队列”。死性队列可以通过检测被拒绝或者未送达的消息来追踪问题

对于 requeue,AMQP 中还有一个命令 Basic.Recover 具备可重入队列的特性。其对应的客户端方法为:

Basic.RecoverOk basicRecover() throws IOException;Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

这个 channel.basicRecover 方法用来请求 RabbitMQ 重新发送还未被确认的消息。如果 requeue 参数设置为 true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果 requeue 参数设置为 false,那么同一条消息会被分配给与之前相同的消费者。默认情况下,如果不设置 requeue 这个参数,相当于 channel.basicRecover(true),即 requeue 默认为 true

七、关闭连接

在应用程序使用完之后,需要关闭连接,释放资源:

channel.close();
conn.close();

显式地关闭 Channel 是个好习惯,但这不是必须的,在 Connection 关闭的时候,Channel 也会自动关闭

AMQP 协议中的 Connection 和 Channel 采用同样的方式来管理网络失败、内部错误和显式地关闭连接。Connection 和 Channel 所具备的生命周期如下所述:

  • Open:开启状态,代表当前对象可以使用
  • Closing:正在关闭状态。当前对象被显式地通知调用关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成
  • Closed:已经关闭状态。当前对象已经接收到所有的内部对象已完成关闭动作的通知,并且其也关闭了自身

Connection 和 Channel 最终都是会成为 Closed 的状态,不论是程序正常调用的关闭方法,或者是客户端的异常,再或者是发生了网络异常

在 Connection 和 Channel 中,与关闭相关的方法有 addShutdownListener(ShutdownListener listener)和 removeShutdownListener(ShutdownListener listener)。当 Connection 或者 Channel 的状态转变为 Closed 的时候会调用 ShutdownListener。而且如果将一个 ShutdownListener 注册到一个已经处于 Closed 状态的对象(这里特指 Connection 和 Channel 对象)时,会立刻调用 ShutdownListener

getCloseReason 方法可以让你知道关闭的原因:isOpen 方法检测对象当前是否处于开启状态;close(int closeCode, String closeMessage) 方法显式地通知当前对象执行关闭操作

有关 shutdownListener 地使用可以参考下面的代码:

connection.addShutdownListener(new ShutdownListener(){public void shutdownCompleted(ShutdownSignalException cause){...}}
);

当触发 ShutdownListener 的时候,就可以获取到 ShutdownSignalException,这个 ShutdownSignalException 包含了关闭的原因,这里原因也可以通过调用前面提及的 getCloseReason 方法获取

ShutdownSignalException 提供了多个方法来分析关闭的原因。isHardError 方法可以知道是 Connection 的还是 Channel 的错误getReason 方法可以获取 cause 相关的信息,相关示例可以参考下面的代码:

public void shutdownCompleted(ShutdownSignalException cause){if(cause.isHardError()){Connection conn = (Connection)cause.getReference();if(!cause.isInitiatedByApplication()){Method reason = cause.getReason();...}}else{Channel ch = (Channel)cause.getReference();...}
}

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/3/95632.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息