php写入rabbit速度,RabbitMQ 入门教程(PHP) 实现延迟功能

 2023-09-09 阅读 18 评论 0

摘要:php 使用rabbitmq-delayed-message-exchange插件实现延迟功能1.安装3.6.x下载地址3.7.x下载地址rabbitmq并发能力、下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录\rabbitmq_server-ver

php 使用rabbitmq-delayed-message-exchange插件实现延迟功能

1.安装

3.6.x下载地址

3.7.x下载地址

rabbitmq并发能力、下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录\rabbitmq_server-version\plugins )。

2.启用插件

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchang

复制代码

php开发基础入门。输出如下:

The following plugins have been enabled:

rabbitmq_delayed_message_exchange

复制代码

通过rabbitmq-plugins list查看已安装列表,如下:

php入门手册、[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x

复制代码

3.机制解释

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

4.php实现过程

rabbitmq教程,消费者 delay_consumer2.php:

//header('Content-Type:text/html;charset=utf8;');

$params = array(

'exchangeName' => 'delayed_exchange_test',

'queueName' => 'delayed_queue_test',

php最好教程、'routeKey' => 'delayed_route_test',

);

$connectConfig = array(

'host' => 'localhost',

'port' => 5672,

rabbitmq部署、'login' => 'guest',

'password' => 'guest',

'vhost' => '/'

);

//var_dump(extension_loaded('amqp'));

php教程文档?//exit();

try {

$conn = new AMQPConnection($connectConfig);

$conn->connect();

if (!$conn->isConnected()) {

php入门、//die('Conexiune esuata');

//TODO 记录日志

echo 'rabbit-mq 连接错误:', json_encode($connectConfig);

exit();

}

php编程教程。$channel = new AMQPChannel($conn);

if (!$channel->isConnected()) {

// die('Connection through channel failed');

//TODO 记录日志

echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);

php入门软件?exit();

}

$exchange = new AMQPExchange($channel);

//$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端

$exchange->setName($params['exchangeName']);

php打开文件写入内容、$exchange->setType('x-delayed-message'); //x-delayed-message类型

/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

direct:把消息投递到那些binding key与routing key完全匹配的队列中。

topic:将消息路由到binding key与routing key模式匹配的队列中。*/

php写入文件内容方法?$exchange->setArgument('x-delayed-type','direct');

$exchange->declareExchange();

//$channel->startTransaction();

$queue = new AMQPQueue($channel);

$queue->setName($params['queueName']);

php文件写入程序,$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

//绑定

$queue->bind($params['exchangeName'], $params['routeKey']);

} catch(Exception $e) {

php追加写入文件、echo $e->getMessage();

exit();

}

function callback(AMQPEnvelope $message) {

global $queue;

php入门教程培训。if ($message) {

$body = $message->getBody();

echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;

echo '接收内容:'.$body . PHP_EOL;

//为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息

$queue->ack($message->getDeliveryTag());

} else {

echo 'no message' . PHP_EOL;

}

}

//$queue->consume('callback'); 第一种消费方式,但是会阻塞,程序一直会卡在此处

//第二种消费方式,非阻塞

/*$start = time();

while(true)

{

$message = $queue->get();

if(!empty($message))

{

echo $message->getBody();

$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费

$end = time();

echo '
' . ($end - $start);

exit();

}

else

{

//echo 'message not found' . PHP_EOL;

}

}*/

//注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。

//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。

$action = '2';

if($action == '1'){

$queue->consume('callback'); //第一种消费方式,但是会阻塞,程序一直会卡在此处

}else{

//第二种消费方式,非阻塞

$start = time();

while(true)

{

$message = $queue->get();

if(!empty($message))

{

echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;

echo '接收内容:'.$message->getBody().PHP_EOL;

$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费

$end = time();

echo '运行时间:'.($end - $start).'秒'.PHP_EOL;

//exit();

}

else

{

//echo 'message not found' . PHP_EOL;

}

}

}

复制代码

生产者delay_publisher2.php:

//header('Content-Type:text/html;charset=utf-8;');

$params = array(

'exchangeName' => 'delayed_exchange_test',

'queueName' => 'delayed_queue_test',

'routeKey' => 'delayed_route_test',

);

$connectConfig = array(

'host' => 'localhost',

'port' => 5672,

'login' => 'guest',

'password' => 'guest',

'vhost' => '/'

);

//var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展

//exit();

try {

$conn = new AMQPConnection($connectConfig);

$conn->connect();

if (!$conn->isConnected()) {

//die('Conexiune esuata');

//TODO 记录日志

echo 'rabbit-mq 连接错误:', json_encode($connectConfig);

exit();

}

$channel = new AMQPChannel($conn);

if (!$channel->isConnected()) {

// die('Connection through channel failed');

//TODO 记录日志

echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);

exit();

}

$exchange = new AMQPExchange($channel);

$exchange->setName($params['exchangeName']);

$exchange->setType('x-delayed-message'); //x-delayed-message类型

/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

direct:把消息投递到那些binding key与routing key完全匹配的队列中。

topic:将消息路由到binding key与routing key模式匹配的队列中。*/

$exchange->setArgument('x-delayed-type','direct');

$exchange->declareExchange();

//$channel->startTransaction();

//RabbitMQ不容许声明2个相同名称、配置不同的Queue,否则报错

$queue = new AMQPQueue($channel);

$queue->setName($params['queueName']);

$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

//绑定队列和交换机

$queue->bind($params['exchangeName'], $params['routeKey']);

//$channel->commitTransaction();

} catch(Exception $e) {

}

for($i=5;$i>0;$i--){

//生成消息

echo '发送时间:'.date("Y-m-d H:i:s", time()).PHP_EOL;

echo 'i='.$i.',延迟'.$i.'秒'.PHP_EOL;

$message = json_encode(['order_id'=>time(),'i'=>$i]);

$exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);

sleep(2);

}

$conn->disconnect();

复制代码

对于代码来讲,首先对于消费者核心代码

$exchange->setType('x-delayed-message'); //x-delayed-message类型

$exchange->setArgument('x-delayed-type','direct');

复制代码

生产者核心代码

$exchange = new AMQPExchange($channel);

$exchange->setName($params['exchangeName']);

$exchange->setType('x-delayed-message'); //x-delayed-message类型

$exchange->setArgument('x-delayed-type','direct');

$exchange->declareExchange();

复制代码

**使用方法:**先运行delay_consumer1.php,再运行delay_publisher1.php

运行效果:

db6aeab2b281e9012bd3ad84bef56130.png

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/3/28532.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息