RabbitMQ是一个消息中间件,负责接收和转发消息,本篇主要梳理了交换机的direct、topic、fanout三种最常用的模式。
场景
突然很多人需要通过系统发送邮件,但是系统每秒最多处理100封邮件,那么如果突然1000封过来,系统肯定是承受不住的,直接就打挂了。这时候就可以把邮件请求放到队列里面,服务器根据自己处理能力进行消费,这样就可以平稳度过。
当前系统中出现了一些事件,但是有其他服务需要处理这个事件,或者是当前系统不必实时处理这个事件,那么就可以把这个事件存入消息队列,然后由其他服务取出并做出行为。
我之前公司就是有个产品,创建会议的时候要和很多第三方通信,链路比较长,速度就很慢,然后就改成了消息队列异步处理,首先会议创建成功后直接反馈给用户,然后剩下的消耗时间的地方慢慢处理。
而且,用消息队列也能避免出现问题后的排查困难度,比如创建会议后最终在zoom系统中会议创建失败,那么只需要排查消费端代码。会议完全创建成功后,没有给关注的用户发通知,也只需要排查消费端代码。解耦,并且排查问题不涉及整个流程,大家都轻松。
RabbitMQ
消息就是字符串或形式的数据,队列是一种链表,消息队列就是存放消息的链表。
把消息放入队列的一方叫做生产者,从消息队列中取出消息的一方叫做消费者。
RabbitMQ消息模型的理念是:发布者不会直接发送任何消息给队列,发布者只需要把消息发给交换机,交换机从发布方接收消息然后推送给消息队列。
发布者 >> 交换机 >> 队列 >> 消费者
消息队列操作步骤:
- 建立和mq的连接。
- 创建一个通道。
- 设置交换机。
- 交换机绑定队列。
- 生成消息。
- 推送消息到交换机。
其中,由交换机,队列,路由关键字这三个元素,决定一个消息最终到哪个队列中。
交换机类型有:直连交换机(direct)、主题交换机(topic)、(头交换机)headers和扇形交换机(fanout)。
直连交换机(direct)
把消息路由到bind key与routing key匹配的queue中,如下,routingKey、queueName、exchangeName是一个绑定,发送消息时指定了routingKey,那么这个消息就会被路由到queueName队列中。
1 2 3
| $channel->queue_bind($conf['queueName'], $conf['exchangeName'], $conf['routingKey']); ... $channel->basic_publish($msg, $conf['exchangeName'], $conf['routingKey']);
|
push
语言PHP
框架Laravel
RabbitMQ库:https://packagist.org/packages/php-amqplib/php-amqplib (v2.11.3)
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage;
$conf = [ 'connection' => [ 'host' => '127.0.0.1', 'port' => 5672, 'user' => 'sheng', 'password' => '123456', 'vhost' => '/', ], 'exchangeName' => 'ex', //交换机名 'queueName' => 'list_a', //队列名称 'routingKey' => '', //路由关键字(也可以省略) ]; // 建立和mq的连接 $connection = AMQPStreamConnection::create_connection([ $conf['connection'], ], $options = []); $channel = $connection->channel(); //在已连接基础上建立生产者与mq之间的通道
$channel->exchange_declare($conf['exchangeName'], 'direct', false, true, false); //声明初始化交换机 $channel->queue_declare($conf['queueName'], false, true, false, false); //声明初始化一条队列 $channel->queue_bind($conf['queueName'], $conf['exchangeName'], $conf['routingKey']); //将队列与某个交换机进行绑定,并使用路由关键字
$msgBody = 'message'; $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息 $r = $channel->basic_publish($msg, $conf['exchangeName'], $conf['routingKey']); //推送消息到交换机 $channel->close(); $connection->close();
|
pull
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| use PhpAmqpLib\Connection\AMQPStreamConnection;
$conf = [ 'connection' => [ 'host' => '127.0.0.1', 'port' => 5672, 'user' => 'sheng', 'password' => '123456', 'vhost' => '/', ], 'exchangeName' => 'ex', //交换机名 'queueName' => 'list_a', //队列名称 'routingKey' => '', //路由关键字(也可以省略) ];
// 建立和mq的连接 $connection = AMQPStreamConnection::create_connection([ $conf['connection'], ], $options = []); $channel = $connection->channel(); //在已连接基础上建立生产者与mq之间的通道
$channel->exchange_declare($conf['exchangeName'], 'direct', false, true, false); //声明初始化交换机 $channel->queue_declare($conf['queueName'], false, true, false, false); //声明初始化一条队列 $channel->queue_bind($conf['queueName'], $conf['exchangeName'], $conf['routingKey']); //将队列与某个交换机进行绑定,并使用路由关键字
$callback = function ($msg) { echo $msg->body."\n"; }; $channel->basic_consume($conf['queueName'], '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
|
主题交换机(topic)
可根据通配符选择性接收消息,例子:
push
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| $conf = [ 'connection' => [ 'host' => '127.0.0.1', 'port' => 5672, 'user' => 'sheng', 'password' => '123456', 'vhost' => '/', ], ]; $connection = AMQPStreamConnection::create_connection([ $conf['connection'], ]); $channel = $connection->channel();
// 设置交换机为topic_logs,类型为topic $channel->exchange_declare('topic_logs', 'topic', false, false, false);
$data = [ [ 'message' => 'error message', 'routing_key' => 'log.error', ], [ 'message' => 'info message', 'routing_key' => 'log.info', ], ];
foreach ($data as $key => $value) { // 指定信息、交换机、路由关键字 $msg = new AMQPMessage($value['message']); $channel->basic_publish($msg, 'topic_logs', $value['routing_key']); }
$channel->close(); $connection->close();
|
pull
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| $conf = [ 'connection' => [ 'host' => '127.0.0.1', 'port' => 5672, 'user' => 'sheng', 'password' => '123456', 'vhost' => '/', ], ]; $connection = AMQPStreamConnection::create_connection([ $conf['connection'], ]); $channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false); // 设置交换机为topic_logs,类型为topic list($queue_name) = $channel->queue_declare('', false, false, true, false); $channel->queue_bind($queue_name, 'topic_logs', '#.error'); // 仅接路由关键字为logs.error的消息 // $channel->queue_bind($queue_name, 'topic_logs', '#.#'); // 不限路由关键字,全接收
$callback = function ($msg) { echo $msg->body."\n"; };
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_consuming()) { $channel->wait(); }
$channel->close(); $connection->close();
|
扇形交换机(fanout)
也被称为订阅模式、广播模式。
不需要路由关键字,交换机会将消息发给每个绑定的队列中。
生产者往fanout模式的交换机里发消息,然后消费者们可以把队列和fanout模式的这个交换机绑定,来接收消息。
就像电脑分屏,水池接水管。
push
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| $conf = [ 'connection' => [ 'host' => '127.0.0.1', 'port' => 5672, 'user' => 'sheng', 'password' => '123456', 'vhost' => '/', ], ]; $connection = AMQPStreamConnection::create_connection([ $conf['connection'], ]); $channel = $connection->channel();
// 交换机名:notice // 交换机模式:fanout $channel->exchange_declare('notice', 'fanout', false, true, false);
$data = [ [ 'message' => 'fanout消息', ], [ 'message' => 'fanout消息2', ], ];
foreach ($data as $key => $value) { // 指定信息、交换机、路由关键字 $msg = new AMQPMessage($value['message']); $channel->basic_publish($msg, 'notice'); }
$channel->close(); $connection->close();
|
pull
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| $conf = [ 'connection' => [ 'host' => '127.0.0.1', 'port' => 5672, 'user' => 'sheng', 'password' => '123456', 'vhost' => '/', ], ]; $connection = AMQPStreamConnection::create_connection([ $conf['connection'], ]); $channel = $connection->channel();
// 交换机名:notice // 交换机模式:fanout $channel->exchange_declare('notice', 'fanout', false, true, false); $channel->queue_declare('queue_notice_1', false, true, false, false); // 声明初始化一条队列 $channel->queue_bind('queue_notice_1', 'notice'); // 绑定队列到交换机
$callback = function ($msg) { echo $msg->body."\n"; };
$channel->basic_consume('queue_notice_1', '', false, true, false, false, $callback);
while ($channel->is_consuming()) { $channel->wait(); }
$channel->close(); $connection->close();
|
常用命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| systemctl restart rabbitmq-server.service ---重启服务 rabbitmqctl start_app ---启动 rabbitmqctl stop_app ---停止 rabbitmqctl status ---查看当前状态 rabbitmq-plugins list ---查看插件启用状态 rabbitmqctl list_connections ---用于查看当前的连接 rabbitmqctl list_queues ---会列出所有队列名称,后边可能还会带着这个队列当前消息数 rabbitmqctl list_bindings ---列出所有现存的绑定 rabbitmqctl list_users ---列出所有用户 rabbitmqctl add_user sheng 123456 ---增加用户 rabbitmqctl set_permissions -p / sheng ".*" ".*" ".*" ---给用户设置vhost权限 rabbitmqctl set_user_tags sheng administrator ---给用户设置管理员权限(可登录web界面) rabbitmq-plugins enable rabbitmq_management ---开启web界面管理插件,web管理默认端口为15672,默认的用户名和密码为guest / guest rabbitmq-plugins disable rabbitmq_management ---禁用web界面管理插件
|
允许外网访问:
编辑配置文件 => /etc/rabbitmq/rabbitmq-env.conf
解注并修改为 => NODE_IP_ADDRESS=0.0.0.0
默认端口为5672,配置完之后可以telnet一下看看端口通不通
参考
https://www.jianshu.com/p/3e76a144bebd
https://blog.csdn.net/shimazhuge/article/details/93372303
https://www.zhihu.com/question/34243607
https://www.jianshu.com/p/67d55a2c3391
https://www.cnblogs.com/brady-wang/p/11168289.html
https://www.cnblogs.com/jun-ma/p/4840869.html
http://www.dongxiaofeng.com/2019/03/28/%E4%BA%8C%E3%80%81rabbitmq-%E6%B7%BB%E5%8A%A0%E6%96%B0%E7%94%A8%E6%88%B7%E5%B9%B6%E8%AE%BE%E7%BD%AE%E8%BF%9C%E7%A8%8B%E8%AE%BF%E9%97%AE/