Notes based on: Bilibili 编程不良人, with minor edits and additions
1. Introduction to MQ.
1.1 What is MQ.
MQ (Message Queue): a message queue. It follows the classic producer/consumer model: producers keep pushing messages into the queue, while consumers keep pulling messages out of it. Because production and consumption are asynchronous, and each side only cares about sending or receiving messages, no business logic leaks across, so systems are decoupled easily. MQ is also known as message middleware: it uses an efficient, reliable messaging mechanism for platform-independent data exchange, and integrates distributed systems on top of that data communication.
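The decoupling idea can be sketched with a plain in-memory queue standing in for the broker (illustrative class names; a real MQ adds persistence, routing, and network transport). The producer only knows how to send and the consumer only knows how to receive; neither calls the other:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal in-memory stand-in for a message queue: producer and consumer
// share nothing but the queue itself, which is the essence of decoupling.
class InMemoryMq {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);

    // producer side: fire and forget (a real consumer would block on an empty queue;
    // offer/poll are used here to keep the sketch free of checked exceptions)
    void publish(String message) { queue.offer(message); }

    // consumer side: take the next message, or null if none is waiting
    String consume() { return queue.poll(); }
}
```

The producer can be replaced, restarted, or scaled without the consumer noticing, and vice versa.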
1.2 Common MQ products.
There are many mainstream message brokers on the market today, such as the veterans ActiveMQ and RabbitMQ, the red-hot Kafka, and Alibaba's home-grown RocketMQ.
1.3 Characteristics of each MQ.
# 1.ActiveMQ
ActiveMQ is Apache's widely used, capable open-source message bus. It is a message broker that fully implements the JMS specification. A rich API and multiple clustering architectures have made ActiveMQ a long-standing name in the field, popular with small and medium-sized companies.
JMS: Java Message Service, the Java messaging API
# 2.Kafka
Kafka is a distributed publish-subscribe messaging system open-sourced by LinkedIn and now a top-level Apache project. Kafka is pull-based on the consumer side and optimizes for throughput; it was originally built for log collection and transport. Replication has been supported since version 0.8. It does not support transactions and has no strict guarantees against duplicated, lost, or corrupted messages,
which makes it a good fit for data-collection workloads of high-volume internet services.
# 3.RocketMQ
RocketMQ is Alibaba's open-source message broker, written in pure Java, with high throughput and high availability, suited to large distributed systems. Its design started from Kafka's ideas, but it is not a copy of Kafka: it improves on reliable delivery and transactional messages. Inside Alibaba it is widely used for trading, top-ups, stream computing, push notifications, log stream processing, binlog distribution, and more.
# 4.RabbitMQ
RabbitMQ is an open-source message queue system written in Erlang and built on the AMQP protocol. AMQP's main traits are message orientation, queuing, routing (both point-to-point and publish/subscribe), reliability, and security. AMQP is mostly used inside enterprise systems with strong requirements for data consistency, stability, and reliability, where performance and throughput come second.
AMQP: Advanced Message Queuing Protocol -- an open application-layer standard designed for message-oriented middleware. Clients and brokers that implement this protocol can exchange messages regardless of which client/broker product or programming language is used.
RabbitMQ is more reliable than Kafka; Kafka suits high-throughput IO and is typically used for big-data log processing or scenarios where latency (a little delay) and reliability (a little data loss) requirements are slightly relaxed, such as ELK log collection.
- Data reliability ==> RabbitMQ
- Data throughput ==> Kafka
2. Introduction to RabbitMQ.
2.1 RabbitMQ.
Built on the AMQP protocol and written in Erlang, RabbitMQ is the most widely deployed open-source message broker and one of the most popular.
Official tutorials: https://www.rabbitmq.com/#getstarted
# The AMQP protocol
`AMQP (Advanced Message Queuing Protocol)` was proposed in 2003, originally to solve message interchange between different platforms in the financial domain. As the name says, AMQP is a protocol -- more precisely a `binary wire-level protocol`. This is the essential difference from JMS: AMQP does not constrain the API layer, but directly defines the data format exchanged on the wire, so any provider that implements AMQP is naturally cross-platform. The AMQP protocol model is shown below:
2.2 Installing RabbitMQ.
2.2.1 Download.
Official download page: https://www.rabbitmq.com/download.html
Download the latest RabbitMQ; before installing you also need some dependencies:
- erlang [https://github.com/rabbitmq/erlang-rpm/releases] [RabbitMQ 3.8.9 requires Erlang 22.3 or newer]
- socat [https://pkgs.org/download/socat]
- logrotate [https://github.com/logrotate/logrotate/releases/]
Note: make sure the versions match.
2.2.2 Packages downloaded.
- erlang-23.1.5-1.el7.x86_64.rpm
- socat-1.7.3.2-2.el7.x86_64.rpm
- rabbitmq-server-3.8.9-1.el7.noarch.rpm
2.2.3 Installation steps.
2.2.3 安装步骤.
# 创建存放安装包暂存目录
mkdir /app
cd /app
# 1.将安装包上传到 /app 上
## 可使用xftp
# 2.安装
rpm -ivh erlang-23.1.5-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.9-1.el7.noarch.rpm
# 3.复制配置文件
## cp /usr/share/doc/rabbitmq-server-3.8.9/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
### 3.8.9在该目录下没有这个文件,所以去github上拷贝了一份,复制到rabbitmq.conf,文件内容在结尾
### https://github.com/rabbitmq/rabbitmq-server/tree/v3.8.x/deps/rabbit/docs
vim /etc/rabbitmq/rabbitmq.conf
### 编辑rabbitmq.conf 修改81行,取消注释
loopback_users.guest= false
# 4.配置开机自启
chkconfig rabbitmq-server on
# 5.开启rabbitmq
systemctl start rabbitmq-server
# 查看状态
systemctl status rabbitmq-server
# 6.执行如下命令,启动rabbitmq中的插件管理
## 192.168.111.11:15672访问图形化管理界面
rabbitmq-plugins enable rabbitmq_management
# 7.如果访问不到:需要关闭防火墙
systemctl stop firewalld
## 关闭开机自启
systemctl disable firewalld
# 8.访问图形化界面
192.168.111.11:15672
2.2.4 Logging in.
username:guest
password:guest
But the login fails with:
User can only log in via localhost
Reason:
since 3.3.0, RabbitMQ forbids the guest/guest account from logging in from anywhere other than localhost
# Fix
vim /etc/rabbitmq/rabbitmq.conf
### edit rabbitmq.conf: uncomment line 81
loopback_users.guest = false
#### in the older rabbitmq.config format:
#
# change {loopback_users, [<<"guest">>]} to {loopback_users, []},
#
####
### then restart the service
systemctl restart rabbitmq-server
### visit the UI again and log in
192.168.111.11:15672
username: guest
password: guest
2.2.5 Permission commands and user tags.
User permission commands
rabbitmqctl list_users                                    // list all users
rabbitmqctl list_user_permissions guest                   // show the default guest user's permissions
rabbitmqctl delete_user guest                             // RabbitMQ's default username and password are both guest; delete the account for safety
rabbitmqctl add_user username password                    // add a new user
rabbitmqctl set_user_tags username administrator          // set the user's tag
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"  // grant the user full permissions on the default vhost
rabbitmqctl list_user_permissions username                // show the user's permissions
# other commands
https://www.rabbitmq.com/rabbitmqctl.8.html
User tags:
From top to bottom, each tag's permissions include those of the tags below it, so a single tag per user is enough.
administrator: can do everything monitoring can, plus manage users, vhosts and permissions, close other users' connections, and manage policies and parameters for all vhosts.
- create and delete virtual hosts
- view, create and delete users
- view, create and delete permissions
- close other users' connections
monitoring: can access the management plugin and see all connections and channels as well as node-level information.
- list all virtual hosts, including ones not reachable via messaging protocols
- view node-level data such as memory use and clustering
- view true global statistics for all virtual hosts
policymaker: can access the management plugin and manage policies and parameters for the vhosts they can access.
- view, create and delete policies and parameters for virtual hosts they can log in to via AMQP
management: can access the management plugin.
- list virtual hosts they can log in to via AMQP
- view all queues, exchanges and bindings in their virtual hosts
- view and close their own channels and connections
- view "global" statistics covering their virtual hosts, including other users' activity in them
impersonator (not covered here)
none: no access to the management plugin
3. RabbitMQ Configuration.
3.1 RabbitMQ management CLI.
# 1. Service control
systemctl start|restart|stop|status rabbitmq-server
# 2. Management CLI: operate RabbitMQ from the command line, without the web UI
rabbitmqctl help   # shows all available commands
# 3. Plugin management CLI
rabbitmq-plugins enable|list|disable
3.2 The web management UI.
3.2.1 Overview.
connections: both producers and consumers must establish a connection to RabbitMQ before they can produce or consume; connections are listed here.
channels: once a connection exists, channels are created on it; message delivery and retrieval go through channels.
Exchanges: exchanges, which route messages.
Queues: message queues; messages sit in a queue awaiting consumption and are removed once consumed.
3.2.2 Admin: users and virtual hosts.
1. Adding a user.
The Tags field above assigns the user a role; the options are:
administrator
can log in to the management console, see all information, and manage users and policies.
monitoring
can log in to the management console and see node-level information (process counts, memory use, disk use, and so on).
policymaker
can log in to the management console and manage policies, but cannot see node-level information (the part highlighted in the screenshot above).
management
can only log in to the management console; cannot see node information or manage policies.
other
cannot log in to the management console; typically plain producers and consumers.
2. Creating a virtual host.
# Virtual hosts
To let different users work without interfering with each other, RabbitMQ has virtual hosts (vhosts). A vhost is simply an independent access path: different users use different paths, each with its own queues and exchanges, isolated from the others.
3. Binding a virtual host to a user.
After creating the virtual host, we still need to grant the user access to it:
Click the newly created virtual host:
to reach its settings page:
4. A first RabbitMQ program.
4.0 AMQP protocol recap.
4.1 Message models supported by RabbitMQ.
4.2 Create a plain Maven project.
(details omitted)
Write a RabbitMQ utility class.
public class RabbitMQUtil {
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.111.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/java");
connectionFactory.setUsername("liuyou");
connectionFactory.setPassword("lmk");
}
public static Connection getConnection(){
try {
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
public static void closeConnectionAndChannel(Connection connection, Channel channel){
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
4.3 Add the dependency.
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
4.3 The first model (direct to queue).
The model above involves:
- P: the producer, i.e. the program that sends messages
- C: the consumer, the receiver of messages, which keeps waiting for them to arrive
- queue: the message queue, shown in red in the figure. Like a mailbox, it buffers messages; the producer drops messages in and the consumer takes them out.
1. The producer.
String queue = "hello";
String message = "hello,rabbitmq";
// 1. connection
Connection connection = RabbitMQUtil.getConnection();
// 2. channel
Channel channel = connection.createChannel();
// 3. declare the queue
// arg 1: queue      queue name
// arg 2: durable    whether the queue survives a broker restart
// arg 3: exclusive  whether the queue is exclusive to this connection
// arg 4: autoDelete whether to delete the queue once it is no longer used
// arg 5: arguments  other properties
channel.queueDeclare(queue, true, false, false, null);
// 4. publish the messages
// arg 1: exchange   exchange name; "" = the default exchange
// arg 2: routingKey routing key --> the queue name here (no exchange is used)
// arg 3: props      extra properties
// arg 4: body       the message to send
for (int i = 0; i < 3; i++) {
    channel.basicPublish("", queue, null, message.getBytes());
}
// 5. close the connection
RabbitMQUtil.closeConnectionAndChannel(connection, channel);
2. The consumer.
String queue = "hello";
// 1. connection
Connection connection = RabbitMQUtil.getConnection();
// 2. channel
Channel channel = connection.createChannel();
// 3. declare the queue (in case it does not exist yet)
// arg 1: queue name  arg 2: durable  arg 3: exclusive  arg 4: autoDelete  arg 5: other properties
channel.queueDeclare(queue,true,false,false,null);
// 4. consume
// arg 1: queue name  arg 2: autoAck (automatic acknowledgement)  arg 3: consumer callback (message-handling logic)
channel.basicConsume(queue,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body, "UTF-8"));
    }
});
autoAck: message acknowledgement (acknowledged messages are removed from the queue). If true, messages are acknowledged automatically, whether or not they were actually processed, so a taken message may go unconsumed. If false, acknowledgement is manual: typically after processing you call channel.basicAck(envelope.getDeliveryTag(), false). With manual acknowledgement, unacknowledged messages stay in the queue.
autoDelete: the condition for automatic queue deletion. Once the queue has had consumers and the last consumer unsubscribes, a queue with autoDelete=true is deleted automatically, even if it still holds messages.
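The effect of autoAck=false can be illustrated with a toy broker model (a hypothetical class, not the RabbitMQ implementation): a message that was delivered but never acknowledged is requeued when its consumer disconnects, so it is not lost.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of broker-side acknowledgement handling: delivered messages sit
// in an "unacked" map keyed by delivery tag until basicAck removes them.
class AckingQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long tag = 0;

    void publish(String m) { ready.addLast(m); }

    // deliver the next ready message; it moves to the unacked map
    long deliver() {
        String m = ready.removeFirst();
        unacked.put(++tag, m);
        return tag;
    }

    void basicAck(long deliveryTag) { unacked.remove(deliveryTag); } // gone for good

    // the consumer disconnected: everything it had not acked returns to the queue
    void requeueUnacked() {
        unacked.values().forEach(ready::addFirst);
        unacked.clear();
    }

    int readyCount() { return ready.size(); }
}
```

With autoAck=true the broker would effectively call basicAck at delivery time, so a consumer crash after deliver() would lose the message.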
3. Testing.
① Run the Producer
192.168.111.11:15672
Check the queue:
queue hello holds the 3 freshly produced messages
② Run the Consumer
Conclusions.
- The consumer acknowledged each message as soon as it took it (autoAck=true), so the queue was emptied.
- Problem: the taken messages might never be fully processed (the consumer could crash), and would then be lost.
- Fix: set autoAck=false and acknowledge manually.
4.4 The second model (work queue).
Work queues, also called task queues, implement the task model. When handling a message is slow, messages may be produced far faster than they are consumed; over time they pile up unprocessed. The work model solves this: bind several consumers to one queue and let them consume it together. A message disappears from the queue once consumed, so no task runs twice.
Case 1: two consumers with equal processing capacity.
Roles:
- P: the producer, which publishes tasks
- C1: consumer 1, which takes tasks and completes them
- C2: consumer 2, which takes tasks and completes them
1. The producer.
P.
String queue = "workqueue";
String message = "测试workqueue,一生成者,一队列,多个消费者";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.队列
channel.queueDeclare(queue, true, false, true, null);
//4.发送消息
for (int i = 0; i < 20; i++) {
channel.basicPublish("", queue,null,(message + " 消息-" + i).getBytes());
}
//5.关闭连接
RabbitMQUtil.closeConnectionAndChannel(connection, channel);
2. The consumers.
C1.
String queue = "workqueue";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. declare the queue (in case it does not exist yet)
channel.queueDeclare(queue, true, false, true, null);
//4. consume
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("consumer 1 == " + new String(body, "utf-8"));
        System.out.println("current deliveryTag : " + envelope.getDeliveryTag());
        System.out.println();
        channel.basicAck(envelope.getDeliveryTag(), false); // manual ack
    }
});
C2.
String queue = "workqueue";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. declare the queue (in case it does not exist yet)
channel.queueDeclare(queue, true, false, true, null);
//4. consume
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("consumer 2 == " + new String(body, "utf-8"));
        System.out.println("current deliveryTag : " + envelope.getDeliveryTag());
        System.out.println();
        channel.basicAck(envelope.getDeliveryTag(), false); // manual ack
    }
});
3. Testing.
① Run C1
② Run C2
③ Run P
Consumer1 received all the even-numbered messages
Consumer2 received all the odd-numbered messages
Conclusions.
- The two consumers alternate and each receives exactly half of the messages (the split is pre-assigned, regardless of which one is faster).
- Problem: what if the two consumers have unequal processing capacity?
  - Could the work be distributed by capacity ("the capable do more") instead of evenly?
- Cause: by default, RabbitMQ sends each message to the next consumer in sequence, so on average every consumer receives the same number of messages. This way of dispatching messages is called round-robin.
- Fix: since the messages would otherwise all be pre-assigned to consumers, we need to ① limit each channel to fetching one message at a time (a consumer that finishes faster then simply fetches more) and ② turn off auto-ack and acknowledge manually.
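The difference between round-robin dispatch and the basicQos(1) "fair dispatch" described above can be simulated without a broker. This is an illustrative toy model, not broker code:

```java
// Compares the two dispatch strategies: round-robin pre-assigns messages in
// strict rotation, while prefetch=1 hands the next message to whichever
// consumer becomes free first ("the capable do more").
class DispatchDemo {
    // cost[i] = time units consumer i needs per message;
    // returns how many messages each consumer handled
    static int[] dispatch(int messages, int[] cost, boolean prefetchOne) {
        int n = cost.length;
        int[] handled = new int[n];
        long[] busyUntil = new long[n];
        for (int m = 0; m < messages; m++) {
            int target;
            if (!prefetchOne) {
                target = m % n;                 // round-robin: strict alternation
            } else {
                target = 0;                     // fair dispatch: earliest-free consumer
                for (int i = 1; i < n; i++)
                    if (busyUntil[i] < busyUntil[target]) target = i;
            }
            busyUntil[target] += cost[target];
            handled[target]++;
        }
        return handled;
    }
}
```

With per-message costs of 1 and 5 time units and 20 messages, this model reproduces the 16/4 split seen in the case-2 test run.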
Case 2: two consumers with unequal processing capacity.
Goal: the more capable consumer does more of the work.
Roles:
- P: the producer, which publishes tasks
- C1: consumer 1, which takes tasks and completes them, quickly
- C2: consumer 2, which takes tasks and completes them, slowly
P stays unchanged.
Each consumer adds channel.basicQos(1);
C1.
String queue = "workqueue";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
channel.basicQos(1); //一次只接受一条未确认的消息
//3.队列 (避免队列不存在)
channel.queueDeclare(queue, true, false, true, null);
//4.消费
channel.basicConsume(queue, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000); // 处理能力快 1秒处理一个
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1 == " + new String(body, "utf-8"));
System.out.println();
channel.basicAck(envelope.getDeliveryTag(),false); // 手动确认
}
});
C2.
String queue = "workqueue";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
channel.basicQos(1); // accept only one unacknowledged message at a time
//3. declare the queue (in case it does not exist yet)
channel.queueDeclare(queue, true, false, true, null);
//4. consume
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(5000); // slow: one message every 5 seconds
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("consumer 2 == " + new String(body, "utf-8"));
        System.out.println("current deliveryTag : " + envelope.getDeliveryTag());
        System.out.println();
        channel.basicAck(envelope.getDeliveryTag(), false); // manual ack
    }
});
Testing (as in case 1).
Results:
C1 is fast and handled 16 messages
C2 is slow and handled only 4
4.5 The third model (fanout).
fanout: fan-out, also known as broadcast.
In broadcast mode, message flow works like this:
- there can be multiple consumers
- each consumer has its own queue
- every queue must be bound to the Exchange
- the producer can only send messages to the exchange; the exchange decides which queues receive them, the producer cannot
- the exchange sends each message to every bound queue
- every queue's consumer gets the message, so one message is consumed by multiple consumers
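A fanout exchange can be modeled in a few lines (a toy sketch, not the broker implementation): each bound queue receives its own copy of every published message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy fanout exchange: publish() copies the message into every bound queue.
class FanoutExchange {
    private final List<Queue<String>> queues = new ArrayList<>();

    // roughly what channel.queueDeclare() + channel.queueBind() achieve together
    Queue<String> bindNewQueue() {
        Queue<String> q = new ArrayDeque<>();
        queues.add(q);
        return q;
    }

    // the routing key is ignored for fanout, so publish takes only the message
    void publish(String message) {
        for (Queue<String> q : queues) q.add(message);
    }
}
```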
1. The producer.
String exchange = "fanout";
String message = "fanout broadcast test: one message --> every consumer bound to the fanout exchange";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. declare the exchange
// arg 1: exchange name  arg 2: exchange type  arg 3: durable  arg 4: autoDelete  arg 5: other
channel.exchangeDeclare(exchange, "fanout", true, false, null);
//4. publish a message
// arg 1: exchange   exchange name
// arg 2: routingKey not used by the fanout type
// arg 3: other properties
// arg 4: the message to send
channel.basicPublish(exchange, "", null, message.getBytes());
//5. close the connection
RabbitMQUtil.closeConnectionAndChannel(connection, channel);
2. The consumers.
C1.
String exchange = "fanout";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. declare the exchange (in case it does not exist yet -- must match the producer's declaration)
channel.exchangeDeclare(exchange, "fanout", true, false, null);
//4. temporary queue (deleted automatically once no longer used)
String queue = channel.queueDeclare().getQueue();
//5. bind the queue
// arg 1: queue name  arg 2: exchange name  arg 3: routing key  arg 4: other
channel.queueBind(queue, exchange, "", null);
//6. receive messages
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer 1 received fanout message: " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
C2.
String exchange = "fanout";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. declare the exchange (in case it does not exist yet -- must match the producer's declaration)
channel.exchangeDeclare(exchange, "fanout", true, false, null);
//4. temporary queue (deleted automatically once no longer used)
String queue = channel.queueDeclare().getQueue();
//5. bind the queue
// arg 1: queue name  arg 2: exchange name  arg 3: routing key  arg 4: other
channel.queueBind(queue, exchange, "", null);
//6. receive messages
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer 2 received fanout message: " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
3. Testing.
① Run all the consumers.
② Run the producer.
Results.
4.6 The fourth model (Routing).
4.6.1 Routing: the Direct subscription model.
In fanout mode, one message is consumed by every subscribed queue. In some scenarios, though, we want different messages to be consumed by different queues. That is what the Direct exchange type is for.
In the Direct model:
- a queue's binding to the exchange is no longer arbitrary: the binding must specify a RoutingKey
- the sender must also set the message's RoutingKey when publishing to the Exchange
- the Exchange no longer hands the message to every bound queue; it compares keys instead, and only queues whose RoutingKey exactly matches the message's routing key receive the message
Diagram:
P: the producer; when publishing to the Exchange it sets a routing key.
X: the Exchange; it receives the producer's messages and hands each one to the queues whose binding key matches the routing key exactly.
C1: a consumer; its queue asked for messages with routing key error.
C2: a consumer; its queue asked for messages with routing keys info, error, and warning.
1. The producer.
String exchange = "direct";
String routingKey = "info";
String message = "routing/direct test: on top of fanout a routingKey is added; only consumers bound to this key receive the message";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. direct exchange
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//4. publish with a routingKey; only consumers bound to this key receive the message
channel.basicPublish(exchange, routingKey, null, message.getBytes());
//5. close the connection
RabbitMQUtil.closeConnectionAndChannel(connection, channel);
2. The consumers.
C1.
String exchange = "direct";
String routingKey = "info";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. exchange
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//4. temporary queue
String queue = channel.queueDeclare().getQueue();
//5. bind the queue == bind info
channel.queueBind(queue, exchange, routingKey);
//6. consume
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer 1 == " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
C2.
String exchange = "direct";
String routingKey = "debug";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. exchange
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//4. temporary queue
String queue = channel.queueDeclare().getQueue();
//5. bind the queue == bind debug
channel.queueBind(queue, exchange, routingKey);
//6. consume
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer 2 == " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
C3.
String exchange = "direct";
String routingKey1 = "error";
String routingKey2 = "info";
String routingKey3 = "debug";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. exchange
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//4. temporary queue
String queue = channel.queueDeclare().getQueue();
//5. bind the queue == bind info, debug, error
channel.queueBind(queue, exchange, routingKey1);
channel.queueBind(queue, exchange, routingKey2);
channel.queueBind(queue, exchange, routingKey3);
//6. consume
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer 3 == " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
3. Testing.
① Run all the consumers.
- C1 (info)
- C2 (debug)
- C3 (info, debug, error)
② Run the producer.
- message (info)
③ Results.
C1
C2
C3
4.6.2 Routing: the Topic subscription model.
A Topic exchange, like Direct, routes messages to different queues by RoutingKey. The difference is that a Topic exchange lets queues use wildcards when binding a Routing key. A routing key in this model is usually made of one or more words separated by ".", for example: item.insert
# Wildcards
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
# Examples:
audit.#  matches audit.irs.corporate, audit.irs, and so on
audit.*  matches only audit.irs
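The wildcard rules can be captured in a small matcher (an illustrative reimplementation, not RabbitMQ's internal one): "*" consumes exactly one dot-separated word, and "#" consumes zero or more.

```java
// Matches an AMQP topic binding pattern against a routing key, word by word.
class TopicMatcher {
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) return ki == k.length;      // pattern used up: key must be too
        if (p[pi].equals("#"))
            // '#' matches zero or more words: either skip it, or let it eat one word
            return match(p, pi + 1, k, ki) || (ki < k.length && match(p, pi, k, ki + 1));
        if (ki == k.length) return false;               // key exhausted but pattern is not
        if (!p[pi].equals("*") && !p[pi].equals(k[ki])) return false; // literal must equal
        return match(p, pi + 1, k, ki + 1);             // '*' or a matching literal: advance
    }
}
```

Note that under these rules audit.# also matches the bare key audit, since "#" may match zero words.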
1. The producer.
String exchange = "topic";
String message = "routing/topic test: on top of direct, consumers can match the routingKey with wildcards";
String routingKey = "liu";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. exchange
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
//4. publish the message
channel.basicPublish(exchange, routingKey, null, message.getBytes());
//5. close the connection
RabbitMQUtil.closeConnectionAndChannel(connection, channel);
2. The consumers.
C1 uses *, which matches exactly one word.
String exchange = "topic";
String routingKey = "*.liu.*";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. exchange
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
//4. temporary queue
String queue = channel.queueDeclare().getQueue();
//5. bind the queue; the routing key uses *, matching exactly one word
channel.queueBind(queue, exchange, routingKey);
//6. consume
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer 1 + * match = " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);// manual ack
    }
});
C2 uses #, which matches zero or more words.
String exchange = "topic";
String routingKey = "rabbit.#";
//1. connection
Connection connection = RabbitMQUtil.getConnection();
//2. channel
Channel channel = connection.createChannel();
//3. exchange
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
//4. temporary queue
String queue = channel.queueDeclare().getQueue();
//5. bind the queue; the routing key uses #, matching zero or more words
channel.queueBind(queue, exchange, routingKey);
//6. consume
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer 2 + # match = " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);// manual ack
    }
});
3. Testing.
① Run all the consumers.
- C1 (*.liu.*)
- C2 (rabbit.#)
② Run the producer.
- P (rabbit.liu.you)
③ Results.
4.7 The fifth model (RPC).
In the second tutorial we learned how to use work queues to distribute time-consuming tasks among multiple workers.
But what if we need to run a function on a remote computer and wait for the result? That is a different story. This pattern is commonly known as Remote Procedure Call, or RPC. In this tutorial we use RabbitMQ to build an RPC system: a client and a scalable RPC server. Since we have no time-consuming task worth distributing, we will create a dummy RPC service that returns Fibonacci numbers.
Diagram:
C: the client, the side making the remote call.
S: the server, the side providing the remotely called service.
Request: the request (carries a correlation_id and the name of the temporary callback queue).
correlation_id: a confirmation token; it travels back with the callback response, and the client compares it with its local id -- if they match, the result is accepted.
Reply: the callback (carries the correlation_id and the remote method's return value).
0. Environment setup.
① Create two modules: RPCClient and RPCServer.
② Add the dependency.
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
③ Both modules get the same RabbitMQUtil utility class.
This utility class uses the virtual host /rpc
public class RabbitMQUtil {
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.111.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/rpc");
connectionFactory.setUsername("liuyou");
connectionFactory.setPassword("lmk");
}
public static Connection getConnection(){
try {
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
public static void closeConnectionAndChannel(Connection connection, Channel channel){
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
1. The server.
public class RPCServer_f {
    public static void main(String[] argv) throws Exception {
        // run the rpc server
        run();
    }
    public static void run() throws IOException {
        // request queue name
        String requestQueue = "rpc_queue_req";
        //1. connection
        Connection connection = RabbitMQUtil.getConnection();
        //2. channel
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        //3. declare the queue
        channel.queueDeclare(requestQueue, false, false, false, null);
        //4. purge the queue
        channel.queuePurge(requestQueue);
        //5. handle call requests (consume)
        Object monitor = new Object();// lock
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {// request handling
            //5.1 build the reply properties
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(delivery.getProperties().getCorrelationId()) // echo the client's correlation id
                    .build();
            System.out.println("client correlation id: " + delivery.getProperties().getCorrelationId());
            System.out.println("client reply queue: " + delivery.getProperties().getReplyTo());
            //5.2 read the message
            String message = new String(delivery.getBody(), "UTF-8");
            //5.3 invoke call() =======================
            String call = RPCServer_f.call(message);
            // =====================================
            //5.4 publish the result back on the client's designated reply queue
            channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, call.getBytes("UTF-8"));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            // RabbitMQ consumer worker thread notifies the RPC server owner thread
            synchronized (monitor) {
                monitor.notify();
            }
        };
        channel.basicConsume(requestQueue, false, deliverCallback, consumerTag -> {});
    }
    /**
     * the method that is called remotely ===================
     */
    public static String call(String message){
        return "RPCServer.call() was invoked remotely; payload: " + message;
    }
}
2. The client.
public class RPCClient_f {
    public static void main(String[] argv) throws IOException, InterruptedException {
        String message = "rabbit_java";
        // make the call
        Object call = call(message);
        System.out.println(call);
    }
    /**
     * invokes the server's call() method remotely ========================
     */
    public static String call(String message) throws IOException, InterruptedException {
        // request queue name
        String requestQueue = "rpc_queue_req";
        //1. connection
        Connection connection = RabbitMQUtil.getConnection();
        //2. channel
        Channel channel = connection.createChannel();
        //3. reply queue name (a temporary queue)
        String replyQueueName = channel.queueDeclare().getQueue();
        //4. correlation id
        String correlationId = UUID.randomUUID().toString();
        //5. build the request properties
        AMQP.BasicProperties properties = new AMQP.BasicProperties
                .Builder()
                .correlationId(correlationId)
                .replyTo(replyQueueName)
                .build();
        //6. publish the call request
        channel.basicPublish("", requestQueue, properties, message.getBytes());
        //7. a blocking queue to pass the reply between threads safely
        BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        //8. receive the callback
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(correlationId)) {
                // enqueue the reply
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
            System.out.println("correlation id: " + delivery.getProperties().getCorrelationId());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// manual ack
        };
        // consume (receive and handle the server's reply)
        channel.basicConsume(replyQueueName, false, deliverCallback, consumerTag -> {});
        //9. take the result from the blocking queue
        String take = response.take();
        //10. close the connection
        RabbitMQUtil.closeConnectionAndChannel(connection, channel);
        return take;
    }
}
3. Testing.
① Run the server.
② Run the client.
③ Results.
5. Using RabbitMQ with Spring Boot.
5.0 Initial setup.
1. Create a Spring Boot project.
(details omitted)
2. Add the dependency.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3. Edit the configuration.
server.port=8080
spring.application.name=springboot-application
server.servlet.context-path=/
# rabbitmq
spring.rabbitmq.host=192.168.111.11
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/springboot
spring.rabbitmq.username=liuyou
spring.rabbitmq.password=lmk
spring.rabbitmq.listener.simple.prefetch=1
4. Simple.
The producer.
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimple(){
rabbitTemplate.convertAndSend("_hello","hello,rabbitmq");
}
The consumer.
@Component
@RabbitListener(queuesToDeclare = @Queue("_hello"))
public class HelloConsumer {
@RabbitHandler
public void consume(String message){
System.out.println("Consumer 接收到:" + message);
}
}
5. Work.
The producer.
@Test
public void testWork() throws InterruptedException {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("_work","work-message" + i);
}
Thread.sleep(30000);
}
The consumer.
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("_work"), ackMode = "MANUAL")
public void comsume1(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
Thread.sleep(1000);
System.out.println("消费者1 : " + new String(message.getBody()));
channel.basicAck(deliveryTag, false);
} catch (InterruptedException e) {
channel.basicNack(deliveryTag, false, false);
}
}
@RabbitListener(queuesToDeclare = @Queue("_work"), ackMode = "MANUAL")
public void comsume2(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
Thread.sleep(3000);
System.out.println("消费者2 : " + new String(message.getBody()));
channel.basicAck(deliveryTag, false);
} catch (InterruptedException e) {
channel.basicNack(deliveryTag, false, false);
}
}
}
6. Fanout.
The producer.
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("_fanout","","广播模型的测试");
}
The consumer.
@Component
public class FanoutConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(type = "fanout", name = "_fanout")
))
public void consume1(String message){
System.out.println("C1 : " + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(type = "fanout", name = "_fanout")
))
public void consume2(String message){
System.out.println("C2 : " + message);
}
}
7. Routing-Direct.
The producer.
@Test
public void testDirect(){
rabbitTemplate.convertAndSend("_direct","info","direct发送info类型的消息");
}
The consumer.
@Component
public class DirectConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "_direct",type = "direct"),
key = {"info"}
))
public void consume1(String message) {
System.out.println("C1 info : " + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "_direct",type = "direct"),
key = {"debug"}
))
public void consume2(String message) {
System.out.println("C2 debug : " + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "_direct",type = "direct"),
key = {"info","debug"}
))
public void consume3(String message) {
System.out.println("C3 info,debug : " + message);
}
}
8. Routing-Topic.
The producer.
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("_topic","rabbit","这里只测试#的使用");
}
The consumer.
@Component
public class TopicConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "_topic",type = "topic"),
key = {"rabbit.#"}
))
public void consume1(String message) {
System.out.println("C1 rabbit.# : " + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "_topic",type = "topic"),
key = {"rabbit.*"}
))
public void consume2(String message) {
System.out.println("C2 rabbit.* : " + message);
}
}
6. MQ use cases.
6.1 Asynchronous processing.
Scenario: after a user registers, the site must send a confirmation email and a confirmation SMS. Traditionally this is done in one of two ways: 1. serially, or 2. in parallel.
Serial: write the registration to the database, send the email, then send the SMS; only after all three tasks finish does the response return to the client. The problem: the email and SMS are mere notifications, not prerequisites, yet this approach keeps the client waiting for things it has no need to wait for.
Parallel: write the registration to the database, then send the email and the SMS at the same time; respond once all three tasks finish. Parallelism shortens the processing time.
Message queue: suppose each of the three steps takes 50 ms. The serial approach takes 150 ms, the parallel approach 100 ms. But as noted above, the email and SMS do not affect normal use of the site, so the client should not wait for them to be sent before seeing "registration successful": it should get a response right after the database write. With a message queue, the non-essential email and SMS work is processed asynchronously.
As a result, the user-facing response time becomes the database-write time plus the queue-write time (which is negligible), making the response roughly 3x faster than the serial approach and 2x faster than the parallel one.
6.2 Application decoupling.
Scenario: on a shopping festival like Double 11, after a user places an order the order system must notify the inventory system. The traditional approach is for the order system to call the inventory system's API directly.
This approach has a drawback:
when the inventory system is down, orders fail; the order system and the inventory system are tightly coupled. Introduce a message queue:
Order system: after a user places an order, the order system persists it, writes a message to the queue, and immediately reports success to the user.
Inventory system: subscribes to order messages, reads them, and updates the stock. Even if the inventory system is down, the message queue guarantees reliable delivery, so no message is lost.
6.3 流量削峰.
场景:
秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
2.秒杀业务根据消息队列中的请求信息,再做后续处理.
7. RabbitMQ clustering.
7.1 Cluster architectures.
7.1.1 Standard cluster (replica cluster).
By default, all the data/state required for broker operation is replicated across all nodes. The exception is message queues: by default a queue resides on a single node, although it is visible from and reachable through every node.
1. Architecture.
Core problem addressed: when the master node goes down at some point, the other nodes hold a backup of the queue definitions.
2. Building the cluster.
# 0. Cluster plan
node1: 192.168.111.11 mq1 master  (primary node)
node2: 192.168.111.12 mq2 repl1   (replica node)
node3: 192.168.111.13 mq3 repl2   (replica node)
# 1. Clone three machines
## 1.1 rename the hosts
on mq2: hostnamectl set-hostname mq2
on mq3: hostnamectl set-hostname mq3
## 1.2 change the IPs (only mq2 and mq3 need changing)
vim /etc/sysconfig/network-scripts/ifcfg-ens33
set:
on mq2: IPADDR=192.168.111.12
on mq3: IPADDR=192.168.111.13
## restart the network service afterwards
systemctl restart network
## 1.3 add host mappings
① on mq1: vim /etc/hosts and add:
192.168.111.11 mq1
192.168.111.12 mq2
192.168.111.13 mq3
save and quit
② on mq1: scp /etc/hosts root@mq2:/etc/hosts
          scp /etc/hosts root@mq3:/etc/hosts
# 2. Install rabbitmq on all three machines and sync the Erlang cookie; on mq1:
scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/
## .erlang.cookie must have identical contents on all three machines
# 3. Start rabbitmq in the background on every node, then check the management UI:
rabbitmq-server -detached
# 4. On mq2 and mq3, join the cluster:
1. stop the app      rabbitmqctl stop_app
2. join the cluster  rabbitmqctl join_cluster rabbit@mq1
3. start the app     rabbitmqctl start_app
# 5. Check the cluster status on any node:
rabbitmqctl cluster_status
# 6. Log in to the management UI; it should show the cluster like this:
3. Testing.
In a standard cluster, the master handles the creation of exchanges and queues and messages enter its queues; the replicas only mirror the exchange and queue definitions (not the messages inside the queues), so this does not provide failover.
① Create a queue on the master and view it from a replica.
② Stop the master and check the replicas.
# mq1
rabbitmqctl stop_app
7.1.2 Mirrored cluster.
Mirrored queues set up a master/replica relationship for each queue across the three nodes; messages are synchronized between the nodes automatically, and if one node becomes unavailable, neither messages nor service are lost, which raises the MQ cluster's overall availability.
Queue data is now replicated, but this alone still does not provide automatic failover.
1. Architecture.
2. Cluster configuration.
(built on top of the standard cluster)
# 0.策略说明
rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级
# 1.查看当前策略
rabbitmqctl list_policies
# 2.添加策略
rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
说明:策略正则表达式为 “^” 表示所有匹配所有队列名称 ^hello:匹配hello开头队列
# 3.删除策略
rabbitmqctl clear_policy ha-all
# 4.测试集群
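For reference, the three ha-mode values described above map to concrete policies like the following (the queue-name patterns `^order\.` and `^log\.` are illustrative, not from the course):

```shell
# all: mirror every queue on every node in the cluster
rabbitmqctl set_policy ha-all '^' '{"ha-mode":"all","ha-sync-mode":"automatic"}'

# exactly: mirror queues starting with "order." on exactly 2 nodes
rabbitmqctl set_policy ha-two '^order\.' '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

# nodes: mirror queues starting with "log." only on the named nodes
rabbitmqctl set_policy ha-nodes '^log\.' '{"ha-mode":"nodes","ha-params":["rabbit@mq1","rabbit@mq2"]}'
```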
Configuration files.
rabbitmq.conf.example.
# ======================================
# RabbitMQ broker section
# ======================================
## Related doc guide: https://rabbitmq.com/configure.html. See
## https://rabbitmq.com/documentation.html for documentation ToC.
## Networking
## ====================
##
## Related doc guide: https://rabbitmq.com/networking.html.
##
## By default, RabbitMQ will listen on all interfaces, using
## the standard (reserved) AMQP 0-9-1 and 1.0 port.
##
# listeners.tcp.default = 5672
## To listen on a specific interface, provide an IP address with port.
## For example, to listen only on localhost for both IPv4 and IPv6:
##
# IPv4
# listeners.tcp.local = 127.0.0.1:5672
# IPv6
# listeners.tcp.local_v6 = ::1:5672
## You can define multiple listeners using listener names
# listeners.tcp.other_port = 5673
# listeners.tcp.other_ip = 10.10.10.10:5672
## TLS listeners are configured in the same fashion as TCP listeners,
## including the option to control the choice of interface.
##
# listeners.ssl.default = 5671
## It is possible to disable regular TCP (non-TLS) listeners. Clients
## not configured to use TLS and the correct TLS-enabled port won't be able
## to connect to this node.
# listeners.tcp = none
## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
##
# num_acceptors.tcp = 10
# num_acceptors.ssl = 10
## Socket writer will force GC every so many bytes transferred.
## Default is 1 GiB (`1000000000`). Set to 'off' to disable.
##
# socket_writer.gc_threshold = 1000000000
#
## To disable:
# socket_writer.gc_threshold = off
## Maximum amount of time allowed for the AMQP 0-9-1 and AMQP 1.0 handshake
## (performed after socket connection and TLS handshake) to complete, in milliseconds.
##
# handshake_timeout = 10000
## Set to 'true' to perform reverse DNS lookups when accepting a
## connection. rabbitmqctl and management UI will then display hostnames
## instead of IP addresses. Default value is `false`.
##
# reverse_dns_lookups = false
##
## Security, Access Control
## ==============
##
## Related doc guide: https://rabbitmq.com/access-control.html.
## The default "guest" user is only permitted to access the server
## via a loopback interface (e.g. localhost).
## {loopback_users, [<<"guest">>]},
##
# loopback_users.guest = true
## Uncomment the following line if you want to allow access to the
## guest user from anywhere on the network.
# loopback_users.guest = false
## TLS configuration.
##
## Related doc guide: https://rabbitmq.com/ssl.html.
##
# ssl_options.verify = verify_peer
# ssl_options.fail_if_no_peer_cert = false
# ssl_options.cacertfile = /path/to/cacert.pem
# ssl_options.certfile = /path/to/cert.pem
# ssl_options.keyfile = /path/to/key.pem
#
# ssl_options.honor_cipher_order = true
# ssl_options.honor_ecc_order = true
# ssl_options.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384
# ssl_options.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384
# ssl_options.ciphers.3 = ECDHE-ECDSA-AES256-SHA384
# ssl_options.ciphers.4 = ECDHE-RSA-AES256-SHA384
# ssl_options.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384
# ssl_options.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384
# ssl_options.ciphers.7 = ECDH-ECDSA-AES256-SHA384
# ssl_options.ciphers.8 = ECDH-RSA-AES256-SHA384
# ssl_options.ciphers.9 = DHE-RSA-AES256-GCM-SHA384
# ssl_options.ciphers.10 = DHE-DSS-AES256-GCM-SHA384
# ssl_options.ciphers.11 = DHE-RSA-AES256-SHA256
# ssl_options.ciphers.12 = DHE-DSS-AES256-SHA256
# ssl_options.ciphers.13 = ECDHE-ECDSA-AES128-GCM-SHA256
# ssl_options.ciphers.14 = ECDHE-RSA-AES128-GCM-SHA256
# ssl_options.ciphers.15 = ECDHE-ECDSA-AES128-SHA256
# ssl_options.ciphers.16 = ECDHE-RSA-AES128-SHA256
# ssl_options.ciphers.17 = ECDH-ECDSA-AES128-GCM-SHA256
# ssl_options.ciphers.18 = ECDH-RSA-AES128-GCM-SHA256
# ssl_options.ciphers.19 = ECDH-ECDSA-AES128-SHA256
# ssl_options.ciphers.20 = ECDH-RSA-AES128-SHA256
# ssl_options.ciphers.21 = DHE-RSA-AES128-GCM-SHA256
# ssl_options.ciphers.22 = DHE-DSS-AES128-GCM-SHA256
# ssl_options.ciphers.23 = DHE-RSA-AES128-SHA256
# ssl_options.ciphers.24 = DHE-DSS-AES128-SHA256
# ssl_options.ciphers.25 = ECDHE-ECDSA-AES256-SHA
# ssl_options.ciphers.26 = ECDHE-RSA-AES256-SHA
# ssl_options.ciphers.27 = DHE-RSA-AES256-SHA
# ssl_options.ciphers.28 = DHE-DSS-AES256-SHA
# ssl_options.ciphers.29 = ECDH-ECDSA-AES256-SHA
# ssl_options.ciphers.30 = ECDH-RSA-AES256-SHA
# ssl_options.ciphers.31 = ECDHE-ECDSA-AES128-SHA
# ssl_options.ciphers.32 = ECDHE-RSA-AES128-SHA
# ssl_options.ciphers.33 = DHE-RSA-AES128-SHA
# ssl_options.ciphers.34 = DHE-DSS-AES128-SHA
# ssl_options.ciphers.35 = ECDH-ECDSA-AES128-SHA
# ssl_options.ciphers.36 = ECDH-RSA-AES128-SHA
## Select an authentication/authorisation backend to use.
##
## Alternative backends are provided by plugins, such as rabbitmq-auth-backend-ldap.
##
## NB: These settings require certain plugins to be enabled.
##
## Related doc guides:
##
## * https://rabbitmq.com/plugins.html
## * https://rabbitmq.com/access-control.html
##
# auth_backends.1 = rabbit_auth_backend_internal
## uses separate backends for authentication and authorisation,
## see below.
# auth_backends.1.authn = rabbit_auth_backend_ldap
# auth_backends.1.authz = rabbit_auth_backend_internal
## The rabbitmq_auth_backend_ldap plugin allows the broker to
## perform authentication and authorisation by deferring to an
## external LDAP server.
##
## Relevant doc guides:
##
## * https://rabbitmq.com/ldap.html
## * https://rabbitmq.com/access-control.html
##
## uses LDAP for both authentication and authorisation
# auth_backends.1 = rabbit_auth_backend_ldap
## uses HTTP service for both authentication and
## authorisation
# auth_backends.1 = rabbit_auth_backend_http
## uses two backends in a chain: HTTP first, then internal
# auth_backends.1 = rabbit_auth_backend_http
# auth_backends.2 = rabbit_auth_backend_internal
## Authentication
## The built-in mechanisms are 'PLAIN',
## 'AMQPLAIN', and 'EXTERNAL' Additional mechanisms can be added via
## plugins.
##
## Related doc guide: https://rabbitmq.com/authentication.html.
##
# auth_mechanisms.1 = PLAIN
# auth_mechanisms.2 = AMQPLAIN
## The rabbitmq-auth-mechanism-ssl plugin makes it possible to
## authenticate a user based on the client's x509 (TLS) certificate.
## Related doc guide: https://rabbitmq.com/authentication.html.
##
## To use auth-mechanism-ssl, the EXTERNAL mechanism should
## be enabled:
##
# auth_mechanisms.1 = PLAIN
# auth_mechanisms.2 = AMQPLAIN
# auth_mechanisms.3 = EXTERNAL
## To force x509 certificate-based authentication on all clients,
## exclude all other mechanisms (note: this will disable password-based
## authentication even for the management UI!):
##
# auth_mechanisms.1 = EXTERNAL
## This pertains to both the rabbitmq-auth-mechanism-ssl plugin and
## STOMP ssl_cert_login configurations. See the RabbitMQ STOMP plugin
## configuration section later in this file and the README in
## https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl for further
## details.
##
## To use the TLS cert's CN instead of its DN as the username
##
# ssl_cert_login_from = common_name
## TLS handshake timeout, in milliseconds.
##
# ssl_handshake_timeout = 5000
## Cluster name
##
# cluster_name = dev3.eng.megacorp.local
## Password hashing implementation. Will only affect newly
## created users. To recalculate hash for an existing user
## it's necessary to update her password.
##
## To use SHA-512, set to rabbit_password_hashing_sha512.
##
# password_hashing_module = rabbit_password_hashing_sha256
## When importing definitions exported from versions earlier
## than 3.6.0, it is possible to go back to MD5 (only do this
## as a temporary measure!) by setting this to rabbit_password_hashing_md5.
##
# password_hashing_module = rabbit_password_hashing_md5
##
## Default User / VHost
## ====================
##
## On first start RabbitMQ will create a vhost and a user. These
## config items control what gets created.
## Relevant doc guide: https://rabbitmq.com/access-control.html
##
# default_vhost = /
# default_user = guest
# default_pass = guest
# default_permissions.configure = .*
# default_permissions.read = .*
# default_permissions.write = .*
## Tags for default user
##
## For more details about tags, see the documentation for the
## Management Plugin at https://rabbitmq.com/management.html.
##
# default_user_tags.administrator = true
## Define other tags like this:
# default_user_tags.management = true
# default_user_tags.custom_tag = true
##
## Additional network and protocol related configuration
## =====================================================
##
## Set the default AMQP 0-9-1 heartbeat interval (in seconds).
## Related doc guides:
##
## * https://rabbitmq.com/heartbeats.html
## * https://rabbitmq.com/networking.html
##
# heartbeat = 60
## Set the max permissible size of an AMQP frame (in bytes).
##
# frame_max = 131072
## Set the max frame size the server will accept before connection
## tuning occurs
##
# initial_frame_max = 4096
## Set the max permissible number of channels per connection.
## 0 means "no limit".
##
# channel_max = 128
## Customising TCP Listener (Socket) Configuration.
##
## Related doc guides:
##
## * https://rabbitmq.com/networking.html
## * https://www.erlang.org/doc/man/inet.html#setopts-2
##
# tcp_listen_options.backlog = 128
# tcp_listen_options.nodelay = true
# tcp_listen_options.exit_on_close = false
#
# tcp_listen_options.keepalive = true
# tcp_listen_options.send_timeout = 15000
#
# tcp_listen_options.buffer = 196608
# tcp_listen_options.sndbuf = 196608
# tcp_listen_options.recbuf = 196608
##
## Resource Limits & Flow Control
## ==============================
##
## Related doc guide: https://rabbitmq.com/memory.html.
## Memory-based Flow Control threshold.
##
# vm_memory_high_watermark.relative = 0.4
## Alternatively, we can set a limit (in bytes) of RAM used by the node.
##
# vm_memory_high_watermark.absolute = 1073741824
## Or you can set absolute value using memory units (with RabbitMQ 3.6.0+).
## Absolute watermark will be ignored if relative is defined!
##
# vm_memory_high_watermark.absolute = 2GB
##
## Supported unit symbols:
##
## k, kiB: kibibytes (2^10 - 1,024 bytes)
## M, MiB: mebibytes (2^20 - 1,048,576 bytes)
## G, GiB: gibibytes (2^30 - 1,073,741,824 bytes)
## kB: kilobytes (10^3 - 1,000 bytes)
## MB: megabytes (10^6 - 1,000,000 bytes)
## GB: gigabytes (10^9 - 1,000,000,000 bytes)
## Fraction of the high watermark limit at which queues start to
## page message out to disc in order to free up memory.
## For example, when vm_memory_high_watermark is set to 0.4 and this value is set to 0.5,
## paging can begin as early as when 20% of total available RAM is used by the node.
##
## Values greater than 1.0 can be dangerous and should be used carefully.
##
## One alternative to this is to use durable queues and publish messages
## as persistent (delivery mode = 2). With this combination queues will
## move messages to disk much more rapidly.
##
## Another alternative is to configure queues to page all messages (both
## persistent and transient) to disk as quickly
## as possible, see https://rabbitmq.com/lazy-queues.html.
##
# vm_memory_high_watermark_paging_ratio = 0.5
## Selects Erlang VM memory consumption calculation strategy. Can be `allocated`, `rss` or `legacy` (aliased as `erlang`),
## Introduced in 3.6.11. `rss` is the default as of 3.6.12.
## See https://github.com/rabbitmq/rabbitmq-server/issues/1223 and rabbitmq/rabbitmq-common#224 for background.
# vm_memory_calculation_strategy = rss
## Interval (in milliseconds) at which we perform the check of the memory
## levels against the watermarks.
##
# memory_monitor_interval = 2500
## The total memory available can be calculated from the OS resources
## - default option - or provided as a configuration parameter.
# total_memory_available_override_value = 2GB
## Set disk free limit (in bytes). Once free disk space reaches this
## lower bound, a disk alarm will be set - see the documentation
## listed above for more details.
##
## Absolute watermark will be ignored if relative is defined!
# disk_free_limit.absolute = 50000
## Or you can set it using memory units (same as in vm_memory_high_watermark)
## with RabbitMQ 3.6.0+.
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB
## Alternatively, we can set a limit relative to total available RAM.
##
## Values lower than 1.0 can be dangerous and should be used carefully.
# disk_free_limit.relative = 2.0
##
## Clustering
## =====================
##
# cluster_partition_handling = ignore
## pause_if_all_down strategy require additional configuration
# cluster_partition_handling = pause_if_all_down
## Recover strategy. Can be either 'autoheal' or 'ignore'
# cluster_partition_handling.pause_if_all_down.recover = ignore
## Node names to check
# cluster_partition_handling.pause_if_all_down.nodes.1 = rabbit@localhost
# cluster_partition_handling.pause_if_all_down.nodes.2 = hare@localhost
## Mirror sync batch size, in messages. Increasing this will speed
## up syncing but total batch size in bytes must not exceed 2 GiB.
## Available in RabbitMQ 3.6.0 or later.
##
# mirroring_sync_batch_size = 4096
## Make clustering happen *automatically* at startup. Only applied
## to nodes that have just been reset or started for the first time.
##
## Relevant doc guide: https://rabbitmq.com//cluster-formation.html
##
# cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
#
# cluster_formation.classic_config.nodes.1 = rabbit1@hostname
# cluster_formation.classic_config.nodes.2 = rabbit2@hostname
# cluster_formation.classic_config.nodes.3 = rabbit3@hostname
# cluster_formation.classic_config.nodes.4 = rabbit4@hostname
## DNS-based peer discovery. This backend will list A records
## of the configured hostname and perform reverse lookups for
## the addresses returned.
# cluster_formation.peer_discovery_backend = rabbit_peer_discovery_dns
# cluster_formation.dns.hostname = discovery.eng.example.local
## This node's type can be configured. If you are not sure
## what node type to use, always use 'disc'.
# cluster_formation.node_type = disc
## Interval (in milliseconds) at which we send keepalive messages
## to other cluster members. Note that this is not the same thing
## as net_ticktime; missed keepalive messages will not cause nodes
## to be considered down.
##
# cluster_keepalive_interval = 10000
##
## Statistics Collection
## =====================
##
## Statistics collection interval (in milliseconds). Increasing
## this will reduce the load on management database.
##
# collect_statistics_interval = 5000
## Fine vs. coarse statistics
#
# This value is no longer meant to be configured directly.
#
# See https://www.rabbitmq.com/management.html#fine-stats.
##
## Ra Settings
## =====================
##
## NB: changing these on a node with existing data directory
## can lead to DATA LOSS.
##
# raft.segment_max_entries = 65536
# raft.wal_max_size_bytes = 1048576
# raft.wal_max_batch_size = 4096
# raft.snapshot_chunk_size = 1000000
##
## Misc/Advanced Options
## =====================
##
## NB: Change these only if you understand what you are doing!
##
## Timeout used when waiting for Mnesia tables in a cluster to
## become available.
##
# mnesia_table_loading_retry_timeout = 30000
## Retries when waiting for Mnesia tables in the cluster startup. Note that
## this setting is not applied to Mnesia upgrades or node deletions.
##
# mnesia_table_loading_retry_limit = 10
## Size in bytes below which to embed messages in the queue index.
## Related doc guide: https://rabbitmq.com/persistence-conf.html
##
# queue_index_embed_msgs_below = 4096
## You can also set this size in memory units
##
# queue_index_embed_msgs_below = 4kb
## Whether or not to enable background periodic forced GC runs for all
## Erlang processes on the node in "waiting" state.
##
## Disabling background GC may reduce latency for client operations,
## keeping it enabled may reduce median RAM usage by the binary heap
## (see https://www.erlang-solutions.com/blog/erlang-garbage-collector.html).
##
## Before trying this option, please take a look at the memory
## breakdown (https://www.rabbitmq.com/memory-use.html).
##
# background_gc_enabled = false
## Target (desired) interval (in milliseconds) at which we run background GC.
## The actual interval will vary depending on how long it takes to execute
## the operation (can be higher than this interval). Values less than
## 30000 milliseconds are not recommended.
##
# background_gc_target_interval = 60000
## Whether or not to enable proxy protocol support.
## Once enabled, clients cannot directly connect to the broker
## anymore. They must connect through a load balancer that sends the
## proxy protocol header to the broker at connection time.
## This setting applies only to AMQP clients, other protocols
## like MQTT or STOMP have their own setting to enable proxy protocol.
## See the plugins documentation for more information.
##
# proxy_protocol = false
## Overridden product name and version.
## They are set to "RabbitMQ" and the release version by default.
# product.name = RabbitMQ
# product.version = 1.2.3
## "Message of the day" file.
## Its content is used to expand the logged and printed banners.
## Default to /etc/rabbitmq/motd on Unix, %APPDATA%\RabbitMQ\motd.txt
## on Windows.
# motd_file = /etc/rabbitmq/motd
## ----------------------------------------------------------------------------
## Advanced Erlang Networking/Clustering Options.
##
## Related doc guide: https://rabbitmq.com/clustering.html
## ----------------------------------------------------------------------------
# ======================================
# Kernel section
# ======================================
## Timeout used to detect peer unavailability, including CLI tools.
## Related doc guide: https://www.rabbitmq.com/nettick.html.
##
# net_ticktime = 60
## Inter-node communication port range.
## The parameters inet_dist_listen_min and inet_dist_listen_max
## can be configured in the classic config format only.
## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range.
## ----------------------------------------------------------------------------
## RabbitMQ Management Plugin
##
## Related doc guide: https://rabbitmq.com/management.html.
## ----------------------------------------------------------------------------
# =======================================
# Management section
# =======================================
## Preload schema definitions from the following JSON file.
## Related doc guide: https://rabbitmq.com/management.html#load-definitions.
##
# management.load_definitions = /path/to/exported/definitions.json
## Log all requests to the management HTTP API to a file.
##
# management.http_log_dir = /path/to/access.log
## HTTP listener and embedded Web server settings.
## See https://rabbitmq.com/management.html for details.
#
# management.tcp.port = 15672
# management.tcp.ip = 0.0.0.0
#
# management.tcp.shutdown_timeout = 7000
# management.tcp.max_keepalive = 120
# management.tcp.idle_timeout = 120
# management.tcp.inactivity_timeout = 120
# management.tcp.request_timeout = 120
# management.tcp.compress = true
## HTTPS listener settings.
## See https://rabbitmq.com/management.html and https://rabbitmq.com/ssl.html for details.
##
# management.ssl.port = 15671
# management.ssl.cacertfile = /path/to/ca_certificate.pem
# management.ssl.certfile = /path/to/server_certificate.pem
# management.ssl.keyfile = /path/to/server_key.pem
## More TLS options
# management.ssl.honor_cipher_order = true
# management.ssl.honor_ecc_order = true
# management.ssl.client_renegotiation = false
# management.ssl.secure_renegotiate = true
## Supported TLS versions
# management.ssl.versions.1 = tlsv1.2
# management.ssl.versions.2 = tlsv1.1
## Cipher suites the server is allowed to use
# management.ssl.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384
# management.ssl.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384
# management.ssl.ciphers.3 = ECDHE-ECDSA-AES256-SHA384
# management.ssl.ciphers.4 = ECDHE-RSA-AES256-SHA384
# management.ssl.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384
# management.ssl.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384
# management.ssl.ciphers.7 = ECDH-ECDSA-AES256-SHA384
# management.ssl.ciphers.8 = ECDH-RSA-AES256-SHA384
# management.ssl.ciphers.9 = DHE-RSA-AES256-GCM-SHA384
## URL path prefix for HTTP API and management UI
# management.path_prefix = /a-prefix
## One of 'basic', 'detailed' or 'none'. See
## https://rabbitmq.com/management.html#fine-stats for more details.
# management.rates_mode = basic
## Configure how long aggregated data (such as message rates and queue
## lengths) is retained. Please read the plugin's documentation in
## https://rabbitmq.com/management.html#configuration for more
## details.
## You can use 'minute', 'hour' and 'day' keys or an integer key (in seconds)
# management.sample_retention_policies.global.minute = 5
# management.sample_retention_policies.global.hour = 60
# management.sample_retention_policies.global.day = 1200
# management.sample_retention_policies.basic.minute = 5
# management.sample_retention_policies.basic.hour = 60
# management.sample_retention_policies.detailed.10 = 5
## ----------------------------------------------------------------------------
## RabbitMQ Shovel Plugin
##
## Related doc guide: https://rabbitmq.com/shovel.html
## ----------------------------------------------------------------------------
## See advanced.config.example for a Shovel plugin example
## ----------------------------------------------------------------------------
## RabbitMQ STOMP Plugin
##
## Related doc guide: https://rabbitmq.com/stomp.html
## ----------------------------------------------------------------------------
# =======================================
# STOMP section
# =======================================
## See https://rabbitmq.com/stomp.html for details.
## TCP listeners.
##
# stomp.listeners.tcp.1 = 127.0.0.1:61613
# stomp.listeners.tcp.2 = ::1:61613
## TCP listener settings
##
# stomp.tcp_listen_options.backlog = 2048
# stomp.tcp_listen_options.recbuf = 131072
# stomp.tcp_listen_options.sndbuf = 131072
#
# stomp.tcp_listen_options.keepalive = true
# stomp.tcp_listen_options.nodelay = true
#
# stomp.tcp_listen_options.exit_on_close = true
# stomp.tcp_listen_options.send_timeout = 120
## Proxy protocol support
##
# stomp.proxy_protocol = false
## TLS listeners
## See https://rabbitmq.com/stomp.html and https://rabbitmq.com/ssl.html for details.
# stomp.listeners.ssl.default = 61614
#
# ssl_options.cacertfile = path/to/cacert.pem
# ssl_options.certfile = path/to/cert.pem
# ssl_options.keyfile = path/to/key.pem
# ssl_options.verify = verify_peer
# ssl_options.fail_if_no_peer_cert = true
## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
##
# stomp.num_acceptors.tcp = 10
# stomp.num_acceptors.ssl = 1
## Additional TLS options
## Extract a name from the client's certificate when using TLS.
##
# stomp.ssl_cert_login = true
## Set a default user name and password. This is used as the default login
## whenever a CONNECT frame omits the login and passcode headers.
##
## Please note that setting this will allow clients to connect without
## authenticating!
##
# stomp.default_user = guest
# stomp.default_pass = guest
## If a default user is configured, or you have configured use TLS client
## certificate based authentication, you can choose to allow clients to
## omit the CONNECT frame entirely. If set to true, the client is
## automatically connected as the default user or user supplied in the
## TLS certificate whenever the first frame sent on a session is not a
## CONNECT frame.
##
# stomp.implicit_connect = true
## Whether or not to enable proxy protocol support.
## Once enabled, clients cannot directly connect to the broker
## anymore. They must connect through a load balancer that sends the
## proxy protocol header to the broker at connection time.
## This setting applies only to STOMP clients, other protocols
## like MQTT or AMQP have their own setting to enable proxy protocol.
## See the plugins or broker documentation for more information.
##
# stomp.proxy_protocol = false
## ----------------------------------------------------------------------------
## RabbitMQ MQTT Adapter
##
## See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md
## for details
## ----------------------------------------------------------------------------
# =======================================
# MQTT section
# =======================================
## TCP listener settings.
##
# mqtt.listeners.tcp.1 = 127.0.0.1:1883
# mqtt.listeners.tcp.2 = ::1:1883
## TCP listener options (as per the broker configuration).
##
# mqtt.tcp_listen_options.backlog = 4096
# mqtt.tcp_listen_options.recbuf = 131072
# mqtt.tcp_listen_options.sndbuf = 131072
#
# mqtt.tcp_listen_options.keepalive = true
# mqtt.tcp_listen_options.nodelay = true
#
# mqtt.tcp_listen_options.exit_on_close = true
# mqtt.tcp_listen_options.send_timeout = 120
## TLS listener settings
## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details.
#
# mqtt.listeners.ssl.default = 8883
#
# ssl_options.cacertfile = /path/to/tls/ca_certificate_bundle.pem
# ssl_options.certfile = /path/to/tls/server_certificate.pem
# ssl_options.keyfile = /path/to/tls/server_key.pem
# ssl_options.verify = verify_peer
# ssl_options.fail_if_no_peer_cert = true
#
## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
##
# mqtt.num_acceptors.tcp = 10
# mqtt.num_acceptors.ssl = 10
## Whether or not to enable proxy protocol support.
## Once enabled, clients cannot directly connect to the broker
## anymore. They must connect through a load balancer that sends the
## proxy protocol header to the broker at connection time.
## This setting applies only to MQTT clients, other protocols
## like STOMP or AMQP have their own setting to enable proxy protocol.
## See the plugins or broker documentation for more information.
##
# mqtt.proxy_protocol = false
## Set the default user name and password used for anonymous connections (when client
## provides no credentials). Anonymous connections are highly discouraged!
##
# mqtt.default_user = guest
# mqtt.default_pass = guest
## Enable anonymous connections. If this is set to false, clients MUST provide
## credentials in order to connect. See also the mqtt.default_user/mqtt.default_pass
## keys. Anonymous connections are highly discouraged!
##
# mqtt.allow_anonymous = true
## If you have multiple vhosts, specify the one to which the
## adapter connects.
##
# mqtt.vhost = /
## Specify the exchange to which messages from MQTT clients are published.
##
# mqtt.exchange = amq.topic
## Specify TTL (time to live) to control the lifetime of non-clean sessions.
##
# mqtt.subscription_ttl = 1800000
## Set the prefetch count (governing the maximum number of unacknowledged
## messages that will be delivered).
##
# mqtt.prefetch = 10
## ----------------------------------------------------------------------------
## RabbitMQ AMQP 1.0 Support
##
## See https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md.
## ----------------------------------------------------------------------------
# =======================================
# AMQP 1.0 section
# =======================================
## Connections that are not authenticated with SASL will connect as this
## account. See the README for more information.
##
## Please note that setting this will allow clients to connect without
## authenticating!
##
# amqp1_0.default_user = guest
## Enable protocol strict mode. See the README for more information.
##
# amqp1_0.protocol_strict_mode = false
## Logging settings.
##
## See https://rabbitmq.com/logging.html and https://github.com/erlang-lager/lager for details.
##
## Log directory, taken from the RABBITMQ_LOG_BASE env variable by default.
##
# log.dir = /var/log/rabbitmq
## Logging to file. Can be false or a filename.
## Default:
# log.file = rabbit.log
## To disable logging to a file
# log.file = false
## Log level for file logging
##
# log.file.level = info
## File rotation config. No rotation by default.
## DO NOT SET rotation date to ''. Leave the value unset if "" is the desired value
# log.file.rotation.date = $D0
# log.file.rotation.size = 0
## Logging to console (can be true or false)
##
# log.console = false
## Log level for console logging
##
# log.console.level = info
## Logging to the amq.rabbitmq.log exchange (can be true or false)
##
# log.exchange = false
## Log level to use when logging to the amq.rabbitmq.log exchange
##
# log.exchange.level = info
## ----------------------------------------------------------------------------
## RabbitMQ LDAP Plugin
##
## Related doc guide: https://rabbitmq.com/ldap.html.
##
## ----------------------------------------------------------------------------
# =======================================
# LDAP section
# =======================================
##
## Connecting to the LDAP server(s)
## ================================
##
## Specify servers to bind to. You *must* set this in order for the plugin
## to work properly.
##
# auth_ldap.servers.1 = your-server-name-goes-here
## You can define multiple servers
# auth_ldap.servers.2 = your-other-server
## Connect to the LDAP server using TLS
##
# auth_ldap.use_ssl = false
## Specify the LDAP port to connect to
##
# auth_ldap.port = 389
## LDAP connection timeout, in milliseconds or 'infinity'
##
# auth_ldap.timeout = infinity
## Or number
# auth_ldap.timeout = 500
## Enable logging of LDAP queries.
## One of
## - false (no logging is performed)
## - true (verbose logging of the logic used by the plugin)
## - network (as true, but additionally logs LDAP network traffic)
##
## Defaults to false.
##
# auth_ldap.log = false
## Also can be true or network
# auth_ldap.log = true
# auth_ldap.log = network
##
## Authentication
## ==============
##
## Pattern to convert the username given through AMQP to a DN before
## binding
##
# auth_ldap.user_dn_pattern = cn=${username},ou=People,dc=example,dc=com
## Alternatively, you can convert a username to a Distinguished
## Name via an LDAP lookup after binding. See the documentation for
## full details.
## When converting a username to a dn via a lookup, set these to
## the name of the attribute that represents the user name, and the
## base DN for the lookup query.
##
# auth_ldap.dn_lookup_attribute = userPrincipalName
# auth_ldap.dn_lookup_base = DC=gopivotal,DC=com
## Controls how to bind for authorisation queries and also to
## retrieve the details of users logging in without presenting a
## password (e.g., SASL EXTERNAL).
## One of
## - as_user (to bind as the authenticated user - requires a password)
## - anon (to bind anonymously)
## - {UserDN, Password} (to bind with a specified user name and password)
##
## Defaults to 'as_user'.
##
# auth_ldap.other_bind = as_user
## Or can be more complex:
# auth_ldap.other_bind.user_dn = User
# auth_ldap.other_bind.password = Password
## If user_dn and password are defined, the other options are ignored.
# -----------------------------
# Too complex section of LDAP
# -----------------------------
##
## Authorisation
## =============
##
## The LDAP plugin can perform a variety of queries against your
## LDAP server to determine questions of authorisation.
##
## Related doc guide: https://rabbitmq.com/ldap.html#authorisation.
## Following configuration should be defined in advanced.config file
## DO NOT UNCOMMENT THESE LINES!
## Set the query to use when determining vhost access
##
## {vhost_access_query, {in_group,
## "ou=${vhost}-users,ou=vhosts,dc=example,dc=com"}},
## Set the query to use when determining resource (e.g., queue) access
##
## {resource_access_query, {constant, true}},
## Set queries to determine which tags a user has
##
## {tag_queries, []}
# ]},
# -----------------------------
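上面这些嵌套的 LDAP 查询无法用 ini 风格的 `rabbitmq.conf` 表达，所以文件里提示不要取消注释，而是把它们放进经典 Erlang 格式的 `advanced.config`。RabbitMQ 默认会在 `rabbitmq.conf` 所在目录查找 `advanced.config`，也可以通过环境变量显式指定路径。一个最小示意（下面的路径是 Linux RPM 安装的常见默认位置，仅作假设示例）：

```shell
# RabbitMQ picks up advanced.config automatically when it sits next to
# rabbitmq.conf (on Linux RPM installs this is typically /etc/rabbitmq/).
# To use a non-default location, export this variable before starting the
# node (the path below is an assumed example, adjust to your install):
export RABBITMQ_ADVANCED_CONFIG_FILE=/etc/rabbitmq/advanced.config
```

放好文件后需要重启节点（例如 `systemctl restart rabbitmq-server`），新配置才会生效。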
advanced.config.example.
[
%% ----------------------------------------------------------------------------
%% Advanced Erlang Networking/Clustering Options.
%%
%% See https://www.rabbitmq.com/clustering.html for details
%% ----------------------------------------------------------------------------
%% Sets the net_kernel tick time.
%% Please see http://erlang.org/doc/man/kernel_app.html and
%% https://www.rabbitmq.com/nettick.html for further details.
%%
%% {kernel, [{net_ticktime, 60}]},
%% ----------------------------------------------------------------------------
%% RabbitMQ Shovel Plugin
%%
%% See https://www.rabbitmq.com/shovel.html for details
%% ----------------------------------------------------------------------------
{rabbitmq_shovel,
[{shovels,
[%% A named shovel worker.
%% {my_first_shovel,
%% [
%% List the source broker(s) from which to consume.
%%
%% {sources,
%% [%% URI(s) and pre-declarations for all source broker(s).
%% {brokers, ["amqp://user:password@host.domain/my_vhost"]},
%% {declarations, []}
%% ]},
%% List the destination broker(s) to publish to.
%% {destinations,
%% [%% A singular version of the 'brokers' element.
%% {broker, "amqp://"},
%% {declarations, []}
%% ]},
%% Name of the queue to shovel messages from.
%%
%% {queue, <<"your-queue-name-goes-here">>},
%% Optional prefetch count.
%%
%% {prefetch_count, 10},
%% when to acknowledge messages:
%% - no_ack: never (auto)
%% - on_publish: after each message is republished
%% - on_confirm: when the destination broker confirms receipt
%%
%% {ack_mode, on_confirm},
%% Overwrite fields of the outbound basic.publish.
%%
%% {publish_fields, [{exchange, <<"my_exchange">>},
%% {routing_key, <<"from_shovel">>}]},
%% Static list of basic.properties to set on re-publication.
%%
%% {publish_properties, [{delivery_mode, 2}]},
%% The number of seconds to wait before attempting to
%% reconnect in the event of a connection failure.
%%
%% {reconnect_delay, 2.5}
%% ]} %% End of my_first_shovel
]}
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%%
%% {defaults, [{prefetch_count, 0},
%% {ack_mode, on_confirm},
%% {publish_fields, []},
%% {publish_properties, [{delivery_mode, 2}]},
%% {reconnect_delay, 2.5}]}
]},
{rabbitmq_auth_backend_ldap, [
%%
%% Authorisation
%% =============
%%
%% The LDAP plugin can perform a variety of queries against your
%% LDAP server to determine questions of authorisation. See
%% https://www.rabbitmq.com/ldap.html#authorisation for more
%% information.
%% Set the query to use when determining vhost access
%%
%% {vhost_access_query, {in_group,
%% "ou=${vhost}-users,ou=vhosts,dc=example,dc=com"}},
%% Set the query to use when determining resource (e.g., queue) access
%%
%% {resource_access_query, {constant, true}},
%% Set queries to determine which tags a user has
%%
%% {tag_queries, []}
]}
].
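上面示例中的每一项都被注释掉了。把注释去掉后，一个只配置单个 shovel 的最小可用 `advanced.config` 大致如下（其中主机名、账号密码和队列名都是假设的占位符，需替换为实际值）：

```erlang
[
  {rabbitmq_shovel,
   [{shovels,
     [{my_first_shovel,
       [%% consume from the (hypothetical) source broker ...
        {sources,      [{brokers, ["amqp://user:password@source-host/my_vhost"]},
                        {declarations, []}]},
        %% ... and republish on the local node ("amqp://" = local defaults)
        {destinations, [{broker, "amqp://"},
                        {declarations, []}]},
        {queue, <<"source-queue">>},
        %% ack on the source only after the destination confirms receipt
        {ack_mode, on_confirm},
        {reconnect_delay, 5}]}]}]}
].
```

使用前需启用插件：`rabbitmq-plugins enable rabbitmq_shovel`。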