0%

RabbitMQ笔记

RabbitMQ是一个消息中间件,负责接收和转发消息,本篇主要梳理了交换机的direct、topic、fanout三种最常用的模式。

场景

突然很多人需要通过系统发送邮件,但是系统每秒最多处理100封邮件,那么如果突然1000封过来,系统肯定是承受不住的,直接就打挂了。这时候就可以把邮件请求放到队列里面,服务器根据自己处理能力进行消费,这样就可以平稳度过。
当前系统中出现了一些事件,但是有其他服务需要处理这个事件,或者是当前系统不必实时处理这个事件,那么就可以把这个事件存入消息队列,然后由其他服务取出并做出行为。
我之前公司就是有个产品,创建会议的时候要和很多第三方通信,链路比较长,速度就很慢,然后就改成了消息队列异步处理,首先会议创建成功后直接反馈给用户,然后剩下的消耗时间的地方慢慢处理。
而且,用消息队列也能避免出现问题后的排查困难度,比如创建会议后最终在zoom系统中会议创建失败,那么只需要排查消费端代码。会议完全创建成功后,没有给关注的用户发通知,也只需要排查消费端代码。解耦,并且排查问题不涉及整个流程,大家都轻松。

RabbitMQ

消息就是字符串或形式的数据,队列是一种链表,消息队列就是存放消息的链表。
把消息放入队列的一方叫做生产者,从消息队列中取出消息的一方叫做消费者。

RabbitMQ消息模型的理念是:发布者不会直接发送任何消息给队列,发布者只需要把消息发给交换机,交换机从发布方接收消息然后推送给消息队列。
发布者 >> 交换机 >> 队列 >> 消费者

消息队列操作步骤:

  1. 建立和mq的连接。
  2. 创建一个通道。
  3. 设置交换机。
  4. 交换机绑定队列。
  5. 生成消息。
  6. 推送消息到交换机。

其中,由交换机,队列,路由关键字这三个元素,决定一个消息最终到哪个队列中。

交换机类型有:直连交换机(direct)、主题交换机(topic)、(头交换机)headers和扇形交换机(fanout)。

直连交换机(direct)

把消息路由到bind key与routing key匹配的queue中,如下,routingKey、queueName、exchangeName是一个绑定,发送消息时指定了routingKey,那么这个消息就会被路由到queueName队列中。

1
2
3
$channel->queue_bind($conf['queueName'], $conf['exchangeName'], $conf['routingKey']);
...
$channel->basic_publish($msg, $conf['exchangeName'], $conf['routingKey']);

push

语言PHP
框架Laravel
RabbitMQ库:https://packagist.org/packages/php-amqplib/php-amqplib (v2.11.3)

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$conf = [
'connection' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'sheng',
'password' => '123456',
'vhost' => '/',
],
'exchangeName' => 'ex', //交换机名
'queueName' => 'list_a', //队列名称
'routingKey' => '', //路由关键字(也可以省略)
];
// 建立和mq的连接
$connection = AMQPStreamConnection::create_connection([
$conf['connection'],
],
$options = []);
$channel = $connection->channel(); //在已连接基础上建立生产者与mq之间的通道

$channel->exchange_declare($conf['exchangeName'], 'direct', false, true, false); //声明初始化交换机
$channel->queue_declare($conf['queueName'], false, true, false, false); //声明初始化一条队列
$channel->queue_bind($conf['queueName'], $conf['exchangeName'], $conf['routingKey']); //将队列与某个交换机进行绑定,并使用路由关键字

$msgBody = 'message';
$msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息
$r = $channel->basic_publish($msg, $conf['exchangeName'], $conf['routingKey']); //推送消息到交换机
$channel->close();
$connection->close();

pull

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use PhpAmqpLib\Connection\AMQPStreamConnection;

$conf = [
'connection' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'sheng',
'password' => '123456',
'vhost' => '/',
],
'exchangeName' => 'ex', //交换机名
'queueName' => 'list_a', //队列名称
'routingKey' => '', //路由关键字(也可以省略)
];

// 建立和mq的连接
$connection = AMQPStreamConnection::create_connection([
$conf['connection'],
],
$options = []);
$channel = $connection->channel(); //在已连接基础上建立生产者与mq之间的通道

$channel->exchange_declare($conf['exchangeName'], 'direct', false, true, false); //声明初始化交换机
$channel->queue_declare($conf['queueName'], false, true, false, false); //声明初始化一条队列
$channel->queue_bind($conf['queueName'], $conf['exchangeName'], $conf['routingKey']); //将队列与某个交换机进行绑定,并使用路由关键字

$callback = function ($msg) {
echo $msg->body."\n";
};
$channel->basic_consume($conf['queueName'], '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();

主题交换机(topic)

可根据通配符选择性接收消息,例子:

push

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
$conf = [
'connection' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'sheng',
'password' => '123456',
'vhost' => '/',
],
];
$connection = AMQPStreamConnection::create_connection([
$conf['connection'],
]);
$channel = $connection->channel();

// 设置交换机为topic_logs,类型为topic
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$data = [
[
'message' => 'error message',
'routing_key' => 'log.error',
],
[
'message' => 'info message',
'routing_key' => 'log.info',
],
];

foreach ($data as $key => $value) {
// 指定信息、交换机、路由关键字
$msg = new AMQPMessage($value['message']);
$channel->basic_publish($msg, 'topic_logs', $value['routing_key']);
}

$channel->close();
$connection->close();

pull

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$conf = [
'connection' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'sheng',
'password' => '123456',
'vhost' => '/',
],
];
$connection = AMQPStreamConnection::create_connection([
$conf['connection'],
]);
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false); // 设置交换机为topic_logs,类型为topic
list($queue_name) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queue_name, 'topic_logs', '#.error'); // 仅接路由关键字为logs.error的消息
// $channel->queue_bind($queue_name, 'topic_logs', '#.#'); // 不限路由关键字,全接收

$callback = function ($msg) {
echo $msg->body."\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
$channel->wait();
}

$channel->close();
$connection->close();

扇形交换机(fanout)

也被称为订阅模式、广播模式。
不需要路由关键字,交换机会将消息发给每个绑定的队列中。
生产者往fanout模式的交换机里发消息,然后消费者们可以把队列和fanout模式的这个交换机绑定,来接收消息。
就像电脑分屏,水池接水管。

push

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
$conf = [
'connection' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'sheng',
'password' => '123456',
'vhost' => '/',
],
];
$connection = AMQPStreamConnection::create_connection([
$conf['connection'],
]);
$channel = $connection->channel();

// 交换机名:notice
// 交换机模式:fanout
$channel->exchange_declare('notice', 'fanout', false, true, false);

$data = [
[
'message' => 'fanout消息',
],
[
'message' => 'fanout消息2',
],
];

foreach ($data as $key => $value) {
// 指定信息、交换机、路由关键字
$msg = new AMQPMessage($value['message']);
$channel->basic_publish($msg, 'notice');
}

$channel->close();
$connection->close();

pull

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$conf = [
'connection' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'sheng',
'password' => '123456',
'vhost' => '/',
],
];
$connection = AMQPStreamConnection::create_connection([
$conf['connection'],
]);
$channel = $connection->channel();

// 交换机名:notice
// 交换机模式:fanout
$channel->exchange_declare('notice', 'fanout', false, true, false);
$channel->queue_declare('queue_notice_1', false, true, false, false); // 声明初始化一条队列
$channel->queue_bind('queue_notice_1', 'notice'); // 绑定队列到交换机

$callback = function ($msg) {
echo $msg->body."\n";
};

$channel->basic_consume('queue_notice_1', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
$channel->wait();
}

$channel->close();
$connection->close();

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
systemctl restart rabbitmq-server.service ---重启服务
rabbitmqctl start_app ---启动
rabbitmqctl stop_app ---停止
rabbitmqctl status ---查看当前状态
rabbitmq-plugins list ---查看插件启用状态
rabbitmqctl list_connections ---用于查看当前的连接
rabbitmqctl list_queues ---会列出所有队列名称,后边可能还会带着这个队列当前消息数
rabbitmqctl list_bindings ---列出所有现存的绑定
rabbitmqctl list_users ---列出所有用户
rabbitmqctl add_user sheng 123456 ---增加用户
rabbitmqctl set_permissions -p / sheng ".*" ".*" ".*" ---给用户设置vhost权限
rabbitmqctl set_user_tags sheng administrator ---给用户设置管理员权限(可登录web界面)
rabbitmq-plugins enable rabbitmq_management ---开启web界面管理插件,web管理默认端口为15672,默认的用户名和密码为guest / guest
rabbitmq-plugins disable rabbitmq_management ---禁用web界面管理插件

允许外网访问:
编辑配置文件 => /etc/rabbitmq/rabbitmq-env.conf
解注并修改为 => NODE_IP_ADDRESS=0.0.0.0
默认端口为5672,配置完之后可以telnet一下看看端口通不通

参考

https://www.jianshu.com/p/3e76a144bebd
https://blog.csdn.net/shimazhuge/article/details/93372303
https://www.zhihu.com/question/34243607
https://www.jianshu.com/p/67d55a2c3391
https://www.cnblogs.com/brady-wang/p/11168289.html
https://www.cnblogs.com/jun-ma/p/4840869.html
http://www.dongxiaofeng.com/2019/03/28/%E4%BA%8C%E3%80%81rabbitmq-%E6%B7%BB%E5%8A%A0%E6%96%B0%E7%94%A8%E6%88%B7%E5%B9%B6%E8%AE%BE%E7%BD%AE%E8%BF%9C%E7%A8%8B%E8%AE%BF%E9%97%AE/

欢迎关注我的其它发布渠道