rabbitmq笔记(入门)


笔记来自:B站编程不良人,稍作修改、补充

1.MQ引言.

1.1 什么是MQ.

MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2 MQ有哪些.

当今市面上有很多主流的消息中间件,如老牌的ActiveMQRabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

1.3 不同MQ特点.

# 1.ActiveMQ
    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
        JMS:Java消息服务(Java Message Service)应用程序接口    
# 2.Kafka
    Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,
    适合产生大量数据的互联网服务的数据收集业务。

# 3.RocketMQ
    RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

# 4.RabbitMQ
    RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
        AMQP:高级消息队列协议(Advanced Message Queuing Protocol) -- 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

  • 数据可靠性==> RabbitMQ
  • 数据高吞吐==> Kafka

2.RabbitMQ 的引言.

2.1 RabbitMQ.

基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

官网: https://www.rabbitmq.com/

官方教程: https://www.rabbitmq.com/#getstarted

 # AMQP 协议
 `AMQP(advanced message queuing protocol)`在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种`binary wire-level protocol`(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

AMQP协议模型

2.2 RabbitMQ 的安装.

2.2.1 下载.

官网下载地址: https://www.rabbitmq.com/download.html

下载CentOS版本

下载最新的RabbitMQ,在安装之前还需下载一些依赖包 :

注意:版本对应.

3.8.9对应版本22.3

2.2.2 下载的安装包.

  • erlang-23.1.5-1.el7.x86_64.rpm
  • socat-1.7.3.2-2.el7.x86_64.rpm
  • rabbitmq-server-3.8.9-1.el7.noarch.rpm

2.2.3 安装步骤.

# 创建存放安装包暂存目录
mkdir /app
cd /app

# 1.将安装包上传到 /app 上
## 可使用xftp

# 2.安装
rpm -ivh erlang-23.1.5-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.9-1.el7.noarch.rpm

# 3.复制配置文件
## cp /usr/share/doc/rabbitmq-server-3.8.9/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
### 3.8.9在该目录下没有这个文件,所以去github上拷贝了一份,复制到rabbitmq.conf,文件内容在结尾
### https://github.com/rabbitmq/rabbitmq-server/tree/v3.8.x/deps/rabbit/docs
vim /etc/rabbitmq/rabbitmq.conf
### 编辑rabbitmq.conf 修改81行,取消注释
loopback_users.guest= false

# 4.配置开机自启
chkconfig rabbitmq-server on

# 5.开启rabbitmq
systemctl start rabbitmq-server
# 查看状态
systemctl status rabbitmq-server

# 6.执行如下命令,启动rabbitmq中的插件管理
## 192.168.111.11:15672访问图形化管理界面
rabbitmq-plugins enable rabbitmq_management

# 7.如果访问不到:需要关闭防火墙
systemctl stop firewalld
## 关闭开机自启
systemctl disable firewalld

# 8.访问图形化界面
192.168.111.11:15672

2.2.4登录.

username:guest

password:guest

但是登录失败:User can only log in via localhost

原因:rabbitmq从3.3.0开始禁止使用guest/guest权限通过除localhost外的访问

# 解决方法
vim /etc/rabbitmq/rabbitmq.conf

### 编辑rabbitmq.conf 修改81行,取消注释
loopback_users.guest= false


#### 以前的配置文件rabbitmq.config
# 
# {loopback_users, [<<”guest”>>]}  改为   {loopback_users, []},
# 
#### 

### 然后重启服务
systemctl restart rabbitmq-server

### 再次访问,并登录
192.168.111.11:15672
username: guest
password: guest

2.2.5 权限配置命令,用户标签.

用户权限命令

rabbitmqctl list_users  // 查看当前所有用户
rabbitmqctl list_user_permissions guest //查看默认guest用户的权限
rabbitmqctl delete_user guest // 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户
rabbitmqctl add_user username password //添加新用户
rabbitmqctl set_user_tags username administrator // 设置用户tag
rabbitmqctl set_permissions -p / username ".*" ".*" ".*" //赋予用户默认vhost的全部操作权限
rabbitmqctl list_user_permissions username // 查看用户的权限

# 其他命令
https://www.rabbitmq.com/rabbitmqctl.8.html

用户标签

这些标签,由上到下,上面的权限包括下面的,因此,选用标签只需一个即可

  • administrator用户可以执行监视所能做的一切,管理用户、vhost和权限,关闭其他用户的连接,以及管理所有vhost的策略和参数。

    • 创建和删除虚拟主机
    • 查看、创建和删除用户
    • 查看、创建和删除权限
    • 关闭其他用户的连接
  • monitoring:用户可以访问管理插件并查看所有连接和通道以及节点相关信息

    • 列出所有虚拟主机,包括无法使用消息传递协议访问的虚拟主机
    • 查看节点级数据,如内存使用和集群
    • 查看所有虚拟主机的真实全局统计信息
  • policymaker:用户可以访问管理插件并管理他们有权访问的vhost的策略和参数。

    • 查看、创建和删除可通过AMQP登录的虚拟主机的策略和参数
  • management:用户可以访问管理插件

    • 列出他们可以通过AMQP登录的虚拟主机
    • 查看其虚拟主机中的所有队列、交换和绑定
    • 查看并关闭自己的频道和连接
    • 查看涵盖其所有虚拟主机的“全局”统计信息,包括其中其他用户的活动
  • impersonator(不了解)

  • none:无法访问管理插件

3. RabiitMQ 配置.

3.1RabbitMQ 管理命令行.

# 1.服务启动相关
    systemctl start|restart|stop|status rabbitmq-server

# 2.管理命令行  用来在不使用web管理界面情况下命令操作RabbitMQ
    rabbitmqctl  help  可以查看更多命令

# 3.插件管理命令行
    rabbitmq-plugins enable|list|disable 

3.2 web管理界面介绍.

3.2.1 overview概览.

  • connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
  • channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
  • Exchanges:交换机,用来实现消息的路由
  • Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

3.2.2 Admin用户和虚拟主机管理.

1. 添加用户.

上面的Tags选项,其实是指定用户的角色,可选的有以下几个:

  • 超级管理员(administrator)

    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

  • 监控者(monitoring)

    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

  • 策略制定者(policymaker)

    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

  • 普通管理者(management)

    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

  • 其他

    无法登陆管理控制台,通常就是普通的生产者和消费者。

2. 创建虚拟主机.
# 虚拟主机
    为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

3. 绑定虚拟主机和用户.

创建好虚拟主机,我们还要给用户添加访问权限:

点击添加好的虚拟主机:

进入虚拟机设置界面:


4.RabbitMQ 的第一个程序.

4.0 AMQP协议的回顾.

4.1 RabbitMQ支持的消息模型.

4.2 创建一个普通maven项目.

过程略

编写RabbitMQ工具类.

public class RabbitUtil {
    private static  ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.111.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/java");
        connectionFactory.setUsername("liuyou");
        connectionFactory.setPassword("lmk");
    }

    public static Connection getConnection(){
        try {
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void closeConnectionAndChannel(Connection connection, Channel channel){
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

4.3 引入依赖.

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

4.3 第一种模型(直连).

(P) -> [|||] -> (C)

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

1.开发生产者.

String queue = "hello";
String message = "hello,rabbitmq";

// 1、连接
Connection connection = RabbitMQUtil.getConnection();
// 2、通道
Channel channel = connection.createChannel();

// 3、队列
// 参数1:queue 队列名
// 参数2:durable 是否持久化队列
// 参数3:exclusive 是否独占队列
// 参数4:autoDelete 是否删除队列(在服务器不使用时)
// 参数5:arguments 其他属性
channel.queueDeclare(queue, true, false, false, null);

// 4、消息发送
// 参数1:exchange 交换名字,这里不使用("")
// 参数2:routingKey 路由key --> 这里使用 队列名(因为没有交换机)
// 参数3:props 额外配置
// 参数4:body 要发送的消息
for (int i = 0; i < 3; i++) {
    channel.basicPublish("",queue,null, message.getBytes());
}

// 5、关闭连接
RabbitMQUtil.closeConnectionAndChannel(connection, channel);

2.开发消费者.

String queue = "hello";
// 1.连接
Connection connection = RabbitMQUtil.getConnection();
// 2.通道
Channel channel = connection.createChannel();
// 3.队列 (避免队列不存在)
// 参数1:队列名  参数2:是否持久化  参数3:是否独占   参数4:是否自动删除  参数5:其他属性
channel.queueDeclare(queue,true,false,false,null);
// 4.消费信息
// 参数1:队列名  参数2:autoAck是否自动确认  参数3:consumer实现对象(包含消息处理细节)
channel.basicConsume(queue,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body, "UTF-8"));
    }
});

autoAck :确认消息(队列消息会被清除),如果为true,自动确认(不管信息是否消费,取走的消息可能未被消费);false,手动确认,一般待数据处理后执行 channel.basicAck(envelope.getDeliveryTag(),false); 手动确认,如果未手动确认,这些消息会保留在队列里

autodelete队列自动删除的条件,有消费者订阅本队列,然后所有消费者都解除订阅此队列,autoDelete=true时,此队列会自动删除,即使此队列中还有消息。

3.测试.

①运行Producer

192.168.111.11:15672查看队列信息

队列hello中,有3个刚生成的消息

②运行Consumer

消费了三个消息

队列消息被全部消费

测试结论.
  • 消费者取出消息,就马上进行了确认(autoAck=true),队列信息就清空
    • 存在问题:可能这些取走的消息没有消费完(消费者宕机),造成消息浪费
    • 解决办法:autoAck=false,采用手动确认

4.4 第二种模型(work quene).

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

一个生产者,一个队列,多个消费者

情况一:两消费者处理能力均衡.

角色:

  • P:生产者:任务的发布者
  • C1:消费者-1,领取任务并且完成任务
  • C2:消费者-2:领取任务并且完成任务
1. 开发生产者.
P.
String queue = "workqueue";
String message = "测试workqueue,一生成者,一队列,多个消费者";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.队列
channel.queueDeclare(queue, true, false, true, null);
//4.发送消息
for (int i = 0; i < 20; i++) {
    channel.basicPublish("", queue,null,(message + " 消息-" + i).getBytes());
}
//5.关闭连接
RabbitMQUtil.closeConnectionAndChannel(connection, channel);
2.开发消费者.
C1.
String queue = "workqueue";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.队列 (避免队列不存在)
channel.queueDeclare(queue, true, false, true, null);
//4.消费
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消费者1 == " + new String(body, "utf-8"));
        System.out.println("当前 deliveryTag : " + envelope.getDeliveryTag());
        System.out.println();
        channel.basicAck(envelope.getDeliveryTag(),false); // 手动确认
    }
});
C2.
String queue = "workqueue";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.队列 (避免队列不存在)
channel.queueDeclare(queue, true, false, true, null);
//4.消费
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消费者2 == " + new String(body, "utf-8"));
        System.out.println("当前 deliveryTag : " + envelope.getDeliveryTag());
        System.out.println();
        channel.basicAck(envelope.getDeliveryTag(),false); // 手动确认
    }
});
3.测试.

①运行C1

②运行C2

③运行P

Consumer1消费全是偶数位的消息

Consumer2消费全是奇数位的消息

测试结论.
  • 两个消费者,平均交替消费队列里的消息(提前分配,不管谁处理快都是平均的)
    • 存在问题:如果两消费者,消费能力不均?
      • 能否使用能者多劳(不平均分配,按需分配或按处理能力分配)
    • 造成原理:
      • 默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环
    • 解决办法:
      • 由于会提前分配好全部消息给不同消费者,我们需要① 设置每个通道一次只能获取一个消息(每个消费者一次只能获取一个,获取能力强的,获取的消息就多)②关闭自动确认,使用手动确认

情况二:两消费者处理能力不均.

实现效果:能者多劳

角色:

  • P:生产者:任务的发布者
  • C1:消费者-1,领取任务并且完成任务,处理快
  • C2:消费者-2:领取任务并且完成任务,处理慢

P我们不做修改

C 中添加 channel.basicQos(1);

C1.
String queue = "workqueue";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();

channel.basicQos(1); //一次只接受一条未确认的消息

//3.队列 (避免队列不存在)
channel.queueDeclare(queue, true, false, true, null);
//4.消费
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(1000); // 处理能力快 1秒处理一个
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消费者1 == " + new String(body, "utf-8"));
        System.out.println();
        channel.basicAck(envelope.getDeliveryTag(),false); // 手动确认
    }
});
C2.
String queue = "workqueue";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();

channel.basicQos(1); //一次只接受一条未确认的消息

//3.队列 (避免队列不存在)
channel.queueDeclare(queue, true, false, true, null);
//4.消费
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(5000); // 处理能力慢 5秒处理一个
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消费者2 == " + new String(body, "utf-8"));
        System.out.println("当前 deliveryTag : " + envelope.getDeliveryTag());
        System.out.println();
        channel.basicAck(envelope.getDeliveryTag(),false); // 手动确认
    }
});
测试(同情况一).

测试结果

C1处理快,处理了16个

C1处理快,处理了16个

C2处理慢,只处理了4个

C2处理慢,只处理了4个

4.5 第三种模型(fanout).

fanout 扇出 也称为广播

img

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

1.开发消费者.

String exchange = "fanout";
String message = "测试fanout 广播: 1个消息--> 发给绑定交换机fanout的消费者";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机
// 参数1:交换机名字  参数2:交换机类型  参数3:是否持久化  参数4:是否自动删除  参数5:其他
channel.exchangeDeclare(exchange,"fanout",true,false,null);
//4.发送消息
// 参数1:exchange 交换机名
// 参数2:routingKey 路由key(广播类型永不上)
// 参数3:其他
// 参数4:要发送的消息
channel.basicPublish(exchange,"",null, message.getBytes());
//5.关闭连接
RabbitMQUtil.closeConnectionAndChannel(connection, channel);

2.开发生产者.

C1.
String exchange = "fanout";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机 (避免交换机不存在--需保证与生产者端条件一致)
channel.exchangeDeclare(exchange,"fanout",true,false,null);
//4.临时队列(临时使用,用完自动删除)
String queue = channel.queueDeclare().getQueue();
//5.绑定队列
// 参数1:队列名  参数2:交换机名  参数3:路由key  参数4:其他
channel.queueBind(queue, exchange, "", null);
//6.接收消息
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者1 接收fanout消息 :" + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
});
C2.
String exchange = "fanout";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机 (避免交换机不存在--需保证与生产者端条件一致)
channel.exchangeDeclare(exchange,"fanout",true,false,null);
//4.临时队列(临时使用,用完自动删除)
String queue = channel.queueDeclare().getQueue();
//5.绑定队列
// 参数1:队列名  参数2:交换机名  参数3:路由key  参数4:其他
channel.queueBind(queue, exchange, "", null);
//6.接收消息
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者2 接收fanout消息 :" + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
});

3.测试.

①运行所有消费者.
②运行生产者.
测试结果.

)

4.6 第四种模型(Routing).

4.6.1 Routing 之订阅模型-Direct(直连).

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

img

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • XExchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
1.开发生产者.
String exchange = "direct";
String routingKey = "info";
String message = "测试 routing之direct:在fanout基础上,加上routingkey,消息绑定该key,只有绑定该key的消费者才能获取该数据";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机 direct 直连
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//4.发送消息,绑定routingKey,只有绑定该key的消费者才能获取消息
channel.basicPublish(exchange, routingKey, null, message.getBytes());
//5.关闭连接
RabbitMQUtil.closeConnectionAndChannel(connection, channel);
2.开发消费者.
C1.
String exchange = "direct";
String routingKey = "info";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//4.临时队列
String queue = channel.queueDeclare().getQueue();
//5.绑定队列 == 绑定 info
channel.queueBind(queue, exchange, routingKey);
//6.消费消息
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者1 == " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
C2.
String exchange = "direct";
String routingKey = "debug";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//4.临时队列
String queue = channel.queueDeclare().getQueue();
//5.绑定队列 == 绑定 debug
channel.queueBind(queue, exchange, routingKey);
//6.消费消息
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者2 == " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
C3.
String exchange = "direct";
String routingKey1 = "error";
String routingKey2 = "info";
String routingKey3 = "debug";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//4.临时队列
String queue = channel.queueDeclare().getQueue();
//5.绑定队列 == 绑定 info,debug,error
channel.queueBind(queue, exchange, routingKey1);
channel.queueBind(queue, exchange, routingKey2);
channel.queueBind(queue, exchange, routingKey3);
//6.消费消息
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者3 == " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
3.测试.
①运行所有消费者.
  • C1info
  • C2debug
  • C3info、debug、error
②运行生产者.
  • 消息(info
③运行结果.

C1

C2

C3

4.6.2 Routing 之订阅模型-Topic.

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

img

# 通配符
    * (star) can substitute for exactly one word.    匹配不多不少恰好1个词
    # (hash) can substitute for zero or more words.  匹配零个或多个词
# 如:
    audit.#    匹配audit.irs.corporate或者 audit.irs 等
    audit.*   只能匹配 audit.irs
1.开发生产者.
String exchange = "topic";
String message = "测试 routing之topic,在direct基础上,消费者可以使用 通配符匹配routingKey";
String routingKey = "liu";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
//4.发送消息
channel.basicPublish(exchange, routingKey, null, message.getBytes());
//5.关闭连接
RabbitMQUtil.closeConnectionAndChannel(connection, channel);
2.开发消费者.
C1 采用 * ,只匹配一个单词.
String exchange = "topic";
String routingKey = "*.liu.*";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
//4.临时队列
String queue = channel.queueDeclare().getQueue();
//5.绑定队列 routingkey 采用 * ,只匹配一个单词
channel.queueBind(queue, exchange, routingKey);
//6.消费消息
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者1 + 采用*匹配 = " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);// 手动确认
    }
});
C2 采用 # ,可匹配零个或多个单词.
String exchange = "topic";
String routingKey = "rabbit.#";
//1.连接
Connection connection = RabbitMQUtil.getConnection();
//2.通道
Channel channel = connection.createChannel();
//3.交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
//4.临时队列
String queue = channel.queueDeclare().getQueue();
//5.绑定队列 routingkey 采用 # ,可匹配零个或多个单词
channel.queueBind(queue, exchange, routingKey);
//6.消费消息
channel.basicConsume(queue, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者2 + 采用#匹配 = " + new String(body, "utf-8"));
        channel.basicAck(envelope.getDeliveryTag(), false);// 手动确认
    }
});
3.测试.
①运行所有消费者.
  • C1(*.liu.*)
  • C2(rabbit.#)
②运行生产者.
  • P(rabbit.liu.you)
③运行结果.

4.7 第五种模型 (RPC).

在第二个教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。 但是如果我们需要在远程计算机上运行一个函数并等待结果呢?好吧,那是另一回事了。这种模式通常称为远程过程调用或RPC。在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可伸缩的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回Fibonacci数的虚拟RPC服务。

img

图解:

  • C:客户端,使用远程调用的那一方
  • S:服务端,远程调用服务提供方
  • Request:请求(携带correlation_id用于回调临时队列名
    • correlation_id:该id是确认标识,它会跟着回调响应传回客户端,客户端根据回调相关id和本地id进行对比,相等:取出
  • Reply:回调(携带 correlation_id远程方法返回值

0.搭建环境.

①创建两个模块:RPCClientRPCServer.
②依赖引入.
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
③两模块都编写相同的RabbitMQUtil工具类.

该工具类 采用 虚拟主机为 /rpc

public class RabbitMQUtil {
    private static  ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.111.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/rpc");
        connectionFactory.setUsername("liuyou");
        connectionFactory.setPassword("lmk");
    }

    public static Connection getConnection(){
        try {
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void closeConnectionAndChannel(Connection connection, Channel channel){
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

1.开发服务端.

public class RPCServer_f {


    public static void main(String[] argv) throws Exception {
        // 运行rpc server
        run();
    }

    public static void run() throws IOException {
        // 请求队列名
        String requestQueue = "rpc_queue_req";
        //1.连接
        Connection connection = RabbitMQUtil.getConnection();
        //2.通道
        Channel channel = connection.createChannel();

        channel.basicQos(1);

        //3.队列
        channel.queueDeclare(requestQueue, false, false, false, null);
        //4.清空队列
        channel.queuePurge(requestQueue);
        //5.处理调用请求(consume)
        Object monitor = new Object();// 锁
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 处理内容
            //5.1 构建回调配置
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(delivery.getProperties().getCorrelationId()) // 使用
                    .build();
            System.out.println("client相关id:" + delivery.getProperties().getCorrelationId());
            System.out.println("client临时队列名:" + delivery.getProperties().getReplyTo());

            //5.2 获取message
            String message = new String(delivery.getBody(), "UTF-8");

            //5.3 调用call() =======================
            String call = RPCServer_f.call(message);
            // =====================================

            //5.4 将调用处理结果 通过制定队列传回
            channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, call.getBytes("UTF-8"));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            // RabbitMq consumer worker thread notifies the RPC server owner thread
            synchronized (monitor) { // 防止多个consumer同时消费
                monitor.notify();
            }
        };
        channel.basicConsume(requestQueue, false, deliverCallback, consumerTag -> {});
    }

    /**
     * 被远程调用的方法 ===================
     */
    public static String call(String message){
        return "RPCServer的call()被远程调用,显示内容:" + message;
    }
}

2.开发客户端.

public class RPCClient_f {

    public static void main(String[] argv) throws IOException, InterruptedException {
        String message = "rabbit_java";
        // 调用
        Object call = call(message);
        System.out.println(call);
    }

    /** 
     * 使用该方法调用 服务端的call()方法 ========================
     */
    public static String call(String message) throws IOException, InterruptedException {
        // 请求队列名
        String requestQueue = "rpc_queue_req";

        //1.连接
        Connection connection = RabbitMQUtil.getConnection();
        //2.通道
        Channel channel = connection.createChannel();
        //3.回调队列名(临时队列名)
        String replyQueueName = channel.queueDeclare().getQueue();
        //4.关联id correlationId
        String correlationId = UUID.randomUUID().toString();
        //5.构建配置信息
        AMQP.BasicProperties properties = new AMQP.BasicProperties
                .Builder()
                .correlationId(correlationId)
                .replyTo(replyQueueName)
                .build();
        //6.发送调用请求
        channel.basicPublish("", requestQueue, properties, message.getBytes());

        //7.创建阻塞队列:解决多线程中数据的安全传输问题
        BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        //8.接收回调的数据
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(correlationId)) {
                // 数据入队
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
            System.out.println("相关ID :" + delivery.getProperties().getCorrelationId());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 手动确认
        };
        // 消费(接收服务端传回的数据,并处理)
        channel.basicConsume(replyQueueName, false, deliverCallback, consumerTag -> {});
        //9.从队列中获取结果
        String take = response.take();
        //10.关闭连接
        RabbitMQUtil.closeConnectionAndChannel(connection, channel);
        return take;
    }

}

3.测试.

①运行服务端.
②运行客户端.
③运行结果.

客户端

服务端

5. SpringBoot中使用RabbitMQ.

5.0 搭建初始环境.

1.创建springboot项目.

过程略

2.引入依赖.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.修改配置文件.

server.port=8080
spring.application.name=springboot-application
server.servlet.context-path=/

# rabbitmq
spring.rabbitmq.host=192.168.111.11
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/springboot
spring.rabbitmq.username=liuyou
spring.rabbitmq.password=lmk
spring.rabbitmq.listener.simple.prefetch=1

4.Simple.

生产者.
private RabbitTemplate rabbitTemplate;

@Test
public void testSimple(){
    rabbitTemplate.convertAndSend("_hello","hello,rabbitmq");
}
消费者.
@Component
@RabbitListener(queuesToDeclare = @Queue("_hello"))
public class HelloConsumer {

    @RabbitHandler
    public void consume(String message){
        System.out.println("Consumer 接收到:" + message);
    }
}

5.Work.

生产者.
@Test
public void testWork() throws InterruptedException {
    for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend("_work","work-message" + i);
    }
    Thread.sleep(30000);
}
消费者.
@Component
public class WorkConsumer {
    @RabbitListener(queuesToDeclare = @Queue("_work"), ackMode = "MANUAL")
    public void comsume1(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            Thread.sleep(1000);
            System.out.println("消费者1 : " + new String(message.getBody()));
            channel.basicAck(deliveryTag, false);
        } catch (InterruptedException e) {
            channel.basicNack(deliveryTag, false, false);
        }
    }

    @RabbitListener(queuesToDeclare = @Queue("_work"), ackMode = "MANUAL")
    public void comsume2(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            Thread.sleep(3000);
            System.out.println("消费者2 : " + new String(message.getBody()));
            channel.basicAck(deliveryTag, false);
        } catch (InterruptedException e) {
            channel.basicNack(deliveryTag, false, false);
        }
    }
}

6.Fanout.

生产者.
@Test
public void testFanout(){
    rabbitTemplate.convertAndSend("_fanout","","广播模型的测试");
}
消费者.
@Component
public class FanoutConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(type = "fanout", name = "_fanout")
    ))
    public void consume1(String message){
        System.out.println("C1 : " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(type = "fanout", name = "_fanout")
    ))
    public void consume2(String message){
        System.out.println("C2 : " + message);
    }
}

7.Routing-Direct.

生产者.
@Test
public void testDirect(){
    rabbitTemplate.convertAndSend("_direct","info","direct发送info类型的消息");
}
消费者.
@Component
public class DirectConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "_direct",type = "direct"),
            key = {"info"}
    ))
    public void consume1(String message) {
        System.out.println("C1 info : " + message);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "_direct",type = "direct"),
            key = {"debug"}
    ))
    public void consume2(String message) {
        System.out.println("C2 debug : " + message);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "_direct",type = "direct"),
            key = {"info","debug"}
    ))
    public void consume3(String message) {
        System.out.println("C3 info,debug : " + message);
    }

}

8.Routing-Topic.

生产者.
@Test
public void testTopic(){
    rabbitTemplate.convertAndSend("_topic","rabbit","这里只测试#的使用");
}
消费者.
@Component
public class TopicConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "_topic",type = "topic"),
            key = {"rabbit.#"}
    ))
    public void consume1(String message) {
        System.out.println("C1 rabbit.# : " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "_topic",type = "topic"),
            key = {"rabbit.*"}
    ))
    public void consume2(String message) {
        System.out.println("C2 rabbit.* : " + message);
    }
}

6. MQ的应用场景.

6.1 异步处理.

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式

  • 串行方式: 将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.

  • 并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。

  • 消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回. 消息队列: 引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理

由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

6.2 应用解耦.

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

这种做法有一个缺点:

当库存系统出现故障时,订单就会失败。 订单系统和库存系统高耦合. 引入消息队列

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

  • 库存系统:订阅下单的消息,获取下单消息,进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.

6.3 流量削峰.

场景: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用:

​ 1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)

​ 2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.

2.秒杀业务根据消息队列中的请求信息,再做后续处理.

7. RabbitMQ的集群.

7.1 集群架构.

7.1.1 普通集群(副本集群).

默认情况下:RabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管它们可以从所有节点看到和访问

1.架构图.

​ 核心解决问题: 当集群中某一时刻master节点宕机,可以对Quene中信息,进行备份

2.集群搭建.
# 0.集群规划
    node1: 192.168.111.11  mq1  master 主节点
    node2: 192.168.111.12  mq2  repl1  副本节点
    node3: 192.168.111.13  mq3  repl2  副本节点

# 1.克隆三台机器
## 1.1 更改主机名
    主机mq2:hostnamectl set-hostname mq2
    主机mq3:hostnamectl set-hostname mq3
## 1.2 更改三台主机ip
    只需更改 mq2、mq3
    vim /etc/sysconfig/network-scripts/ifcfg-ens33
    内容修改:
    主机mq2 IPADDR=192.168.111.12
    主机mq3 IPADDR=192.168.111.13
    ## 修改后重启network服务
    systemctl restart network
## 1.3 添加主机映射
    ①主机mq1:vim /etc/hosts 加入:
        192.168.111.11 mq1
        192.168.111.12 mq2
        192.168.111.13 mq3
        保存退出
    ②主机mq1:  scp /etc/hosts root@mq2:/etc/hosts
               scp /etc/hosts root@mq3:/etc/hosts

# 2.三个机器安装rabbitmq,并同步cookie文件,在mq1上执行:
    scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
    scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/
## 必须保证三台机器 .erlang.cookie 内容一致

# 3.后台启动rabbitmq所有节点执行如下命令,启动成功访问管理界面:
    rabbitmq-server -detached

# 4.在mq2和mq3执行加入集群命令:
    1.关闭       rabbitmqctl stop_app
    2.加入集群    rabbitmqctl join_cluster rabbit@mq1
    3.启动服务    rabbitmqctl start_app
# 5.查看集群状态,任意节点执行:
    rabbitmqctl cluster_status    

# 6.登录管理界面,展示如下状态:

3.测试.

普通集群,主机负责对交换机、队列等创建,消息进入队列,备机只是备份交换机、队列(不含队列里的消息),并不能做到故障转移

①在主机创建队列,备机查看.

②主机宕机,查看备机.
# mq1 
rabbitmqctl stop_app

7.1.2 镜像集群.

镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升MQ集群的整体高可用性。

队列数据可备份,但是还是无法实现故障转移

1.架构图.

2.集群配置.

(在普通集群基础上)

# 0.策略说明
    rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>
    -p Vhost: 可选参数,针对指定vhost下的queue进行设置
    Name:     policy的名称
    Pattern: queue的匹配模式(正则表达式)
    Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
                   ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
                        all:表示在集群中所有的节点上进行镜像
                        exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
                        nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
                ha-params:ha-mode模式需要用到的参数
                ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
                priority:可选参数,policy的优先级


# 1.查看当前策略
    rabbitmqctl list_policies

# 2.添加策略
    rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}' 
    说明:策略正则表达式为 “^” 表示所有匹配所有队列名称  ^hello:匹配hello开头队列

# 3.删除策略
    rabbitmqctl clear_policy ha-all

# 4.测试集群

配置文件.

rabbitmq.conf.example.

# ======================================
# RabbitMQ broker section
# ======================================

## Related doc guide: https://rabbitmq.com/configure.html. See
## https://rabbitmq.com/documentation.html for documentation ToC.

## Networking
## ====================
##
## Related doc guide: https://rabbitmq.com/networking.html.
##
## By default, RabbitMQ will listen on all interfaces, using
## the standard (reserved) AMQP 0-9-1 and 1.0 port.
##
# listeners.tcp.default = 5672


## To listen on a specific interface, provide an IP address with port.
## For example, to listen only on localhost for both IPv4 and IPv6:
##
# IPv4
# listeners.tcp.local    = 127.0.0.1:5672
# IPv6
# listeners.tcp.local_v6 = ::1:5672

## You can define multiple listeners using listener names
# listeners.tcp.other_port = 5673
# listeners.tcp.other_ip   = 10.10.10.10:5672


## TLS listeners are configured in the same fashion as TCP listeners,
## including the option to control the choice of interface.
##
# listeners.ssl.default = 5671

## It is possible to disable regular TCP (non-TLS) listeners. Clients
## not configured to use TLS and the correct TLS-enabled port won't be able
## to connect to this node.
# listeners.tcp = none

## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
##
# num_acceptors.tcp = 10
# num_acceptors.ssl = 10

## Socket writer will force GC every so many bytes transferred.
## Default is 1 GiB (`1000000000`). Set to 'off' to disable.
##
# socket_writer.gc_threshold = 1000000000
#
## To disable:
# socket_writer.gc_threshold = off

## Maximum amount of time allowed for the AMQP 0-9-1 and AMQP 1.0 handshake
## (performed after socket connection and TLS handshake) to complete, in milliseconds.
##
# handshake_timeout = 10000

## Set to 'true' to perform reverse DNS lookups when accepting a
## connection. rabbitmqctl and management UI will then display hostnames
## instead of IP addresses. Default value is `false`.
##
# reverse_dns_lookups = false

##
## Security, Access Control
## ==============
##

## Related doc guide: https://rabbitmq.com/access-control.html.

## The default "guest" user is only permitted to access the server
## via a loopback interface (e.g. localhost).
## {loopback_users, [<<"guest">>]},
##
# loopback_users.guest = true

## Uncomment the following line if you want to allow access to the
## guest user from anywhere on the network.
# loopback_users.guest = false

## TLS configuration.
##
## Related doc guide: https://rabbitmq.com/ssl.html.
##
# ssl_options.verify               = verify_peer
# ssl_options.fail_if_no_peer_cert = false
# ssl_options.cacertfile           = /path/to/cacert.pem
# ssl_options.certfile             = /path/to/cert.pem
# ssl_options.keyfile              = /path/to/key.pem
#
# ssl_options.honor_cipher_order   = true
# ssl_options.honor_ecc_order      = true

# ssl_options.ciphers.1  = ECDHE-ECDSA-AES256-GCM-SHA384
# ssl_options.ciphers.2  = ECDHE-RSA-AES256-GCM-SHA384
# ssl_options.ciphers.3  = ECDHE-ECDSA-AES256-SHA384
# ssl_options.ciphers.4  = ECDHE-RSA-AES256-SHA384
# ssl_options.ciphers.5  = ECDH-ECDSA-AES256-GCM-SHA384
# ssl_options.ciphers.6  = ECDH-RSA-AES256-GCM-SHA384
# ssl_options.ciphers.7  = ECDH-ECDSA-AES256-SHA384
# ssl_options.ciphers.8  = ECDH-RSA-AES256-SHA384
# ssl_options.ciphers.9  = DHE-RSA-AES256-GCM-SHA384
# ssl_options.ciphers.10 = DHE-DSS-AES256-GCM-SHA384
# ssl_options.ciphers.11 = DHE-RSA-AES256-SHA256
# ssl_options.ciphers.12 = DHE-DSS-AES256-SHA256
# ssl_options.ciphers.13 = ECDHE-ECDSA-AES128-GCM-SHA256
# ssl_options.ciphers.14 = ECDHE-RSA-AES128-GCM-SHA256
# ssl_options.ciphers.15 = ECDHE-ECDSA-AES128-SHA256
# ssl_options.ciphers.16 = ECDHE-RSA-AES128-SHA256
# ssl_options.ciphers.17 = ECDH-ECDSA-AES128-GCM-SHA256
# ssl_options.ciphers.18 = ECDH-RSA-AES128-GCM-SHA256
# ssl_options.ciphers.19 = ECDH-ECDSA-AES128-SHA256
# ssl_options.ciphers.20 = ECDH-RSA-AES128-SHA256
# ssl_options.ciphers.21 = DHE-RSA-AES128-GCM-SHA256
# ssl_options.ciphers.22 = DHE-DSS-AES128-GCM-SHA256
# ssl_options.ciphers.23 = DHE-RSA-AES128-SHA256
# ssl_options.ciphers.24 = DHE-DSS-AES128-SHA256
# ssl_options.ciphers.25 = ECDHE-ECDSA-AES256-SHA
# ssl_options.ciphers.26 = ECDHE-RSA-AES256-SHA
# ssl_options.ciphers.27 = DHE-RSA-AES256-SHA
# ssl_options.ciphers.28 = DHE-DSS-AES256-SHA
# ssl_options.ciphers.29 = ECDH-ECDSA-AES256-SHA
# ssl_options.ciphers.30 = ECDH-RSA-AES256-SHA
# ssl_options.ciphers.31 = ECDHE-ECDSA-AES128-SHA
# ssl_options.ciphers.32 = ECDHE-RSA-AES128-SHA
# ssl_options.ciphers.33 = DHE-RSA-AES128-SHA
# ssl_options.ciphers.34 = DHE-DSS-AES128-SHA
# ssl_options.ciphers.35 = ECDH-ECDSA-AES128-SHA
# ssl_options.ciphers.36 = ECDH-RSA-AES128-SHA

## Select an authentication/authorisation backend to use.
##
## Alternative backends are provided by plugins, such as rabbitmq-auth-backend-ldap.
##
## NB: These settings require certain plugins to be enabled.
##
## Related doc guides:
##
##  * https://rabbitmq.com/plugins.html
##  * https://rabbitmq.com/access-control.html
##

# auth_backends.1   = rabbit_auth_backend_internal

## uses separate backends for authentication and authorisation,
## see below.
# auth_backends.1.authn = rabbit_auth_backend_ldap
# auth_backends.1.authz = rabbit_auth_backend_internal

## The rabbitmq_auth_backend_ldap plugin allows the broker to
## perform authentication and authorisation by deferring to an
## external LDAP server.
##
## Relevant doc guides:
##
## * https://rabbitmq.com/ldap.html
## * https://rabbitmq.com/access-control.html
##
## uses LDAP for both authentication and authorisation
# auth_backends.1 = rabbit_auth_backend_ldap

## uses HTTP service for both authentication and
## authorisation
# auth_backends.1 = rabbit_auth_backend_http

## uses two backends in a chain: HTTP first, then internal
# auth_backends.1   = rabbit_auth_backend_http
# auth_backends.2   = rabbit_auth_backend_internal

## Authentication
## The built-in mechanisms are 'PLAIN',
## 'AMQPLAIN', and 'EXTERNAL' Additional mechanisms can be added via
## plugins.
##
## Related doc guide: https://rabbitmq.com/authentication.html.
##
# auth_mechanisms.1 = PLAIN
# auth_mechanisms.2 = AMQPLAIN

## The rabbitmq-auth-mechanism-ssl plugin makes it possible to
## authenticate a user based on the client's x509 (TLS) certificate.
## Related doc guide: https://rabbitmq.com/authentication.html.
##
## To use auth-mechanism-ssl, the EXTERNAL mechanism should
## be enabled:
##
# auth_mechanisms.1 = PLAIN
# auth_mechanisms.2 = AMQPLAIN
# auth_mechanisms.3 = EXTERNAL

## To force x509 certificate-based authentication on all clients,
## exclude all other mechanisms (note: this will disable password-based
## authentication even for the management UI!):
##
# auth_mechanisms.1 = EXTERNAL

## This pertains to both the rabbitmq-auth-mechanism-ssl plugin and
## STOMP ssl_cert_login configurations. See the RabbitMQ STOMP plugin
## configuration section later in this file and the README in
## https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl for further
## details.
##
## To use the TLS cert's CN instead of its DN as the username
##
# ssl_cert_login_from   = common_name

## TLS handshake timeout, in milliseconds.
##
# ssl_handshake_timeout = 5000


## Cluster name
##
# cluster_name = dev3.eng.megacorp.local

## Password hashing implementation. Will only affect newly
## created users. To recalculate hash for an existing user
## it's necessary to update her password.
##
## To use SHA-512, set to rabbit_password_hashing_sha512.
##
# password_hashing_module = rabbit_password_hashing_sha256

## When importing definitions exported from versions earlier
## than 3.6.0, it is possible to go back to MD5 (only do this
## as a temporary measure!) by setting this to rabbit_password_hashing_md5.
##
# password_hashing_module = rabbit_password_hashing_md5

##
## Default User / VHost
## ====================
##

## On first start RabbitMQ will create a vhost and a user. These
## config items control what gets created.
## Relevant doc guide: https://rabbitmq.com/access-control.html
##
# default_vhost = /
# default_user = guest
# default_pass = guest

# default_permissions.configure = .*
# default_permissions.read = .*
# default_permissions.write = .*

## Tags for default user
##
## For more details about tags, see the documentation for the
## Management Plugin at https://rabbitmq.com/management.html.
##
# default_user_tags.administrator = true

## Define other tags like this:
# default_user_tags.management = true
# default_user_tags.custom_tag = true

##
## Additional network and protocol related configuration
## =====================================================
##

## Set the default AMQP 0-9-1 heartbeat interval (in seconds).
## Related doc guides:
##
## * https://rabbitmq.com/heartbeats.html
## * https://rabbitmq.com/networking.html
##
# heartbeat = 60

## Set the max permissible size of an AMQP frame (in bytes).
##
# frame_max = 131072

## Set the max frame size the server will accept before connection
## tuning occurs
##
# initial_frame_max = 4096

## Set the max permissible number of channels per connection.
## 0 means "no limit".
##
# channel_max = 128

## Customising TCP Listener (Socket) Configuration.
##
## Related doc guides:
##
## * https://rabbitmq.com/networking.html
## * https://www.erlang.org/doc/man/inet.html#setopts-2
##

# tcp_listen_options.backlog = 128
# tcp_listen_options.nodelay = true
# tcp_listen_options.exit_on_close = false
#
# tcp_listen_options.keepalive = true
# tcp_listen_options.send_timeout = 15000
#
# tcp_listen_options.buffer = 196608
# tcp_listen_options.sndbuf = 196608
# tcp_listen_options.recbuf = 196608

##
## Resource Limits & Flow Control
## ==============================
##
## Related doc guide: https://rabbitmq.com/memory.html.

## Memory-based Flow Control threshold.
##
# vm_memory_high_watermark.relative = 0.4

## Alternatively, we can set a limit (in bytes) of RAM used by the node.
##
# vm_memory_high_watermark.absolute = 1073741824

## Or you can set absolute value using memory units (with RabbitMQ 3.6.0+).
## Absolute watermark will be ignored if relative is defined!
##
# vm_memory_high_watermark.absolute = 2GB
##
## Supported unit symbols:
##
## k, kiB: kibibytes (2^10 - 1,024 bytes)
## M, MiB: mebibytes (2^20 - 1,048,576 bytes)
## G, GiB: gibibytes (2^30 - 1,073,741,824 bytes)
## kB: kilobytes (10^3 - 1,000 bytes)
## MB: megabytes (10^6 - 1,000,000 bytes)
## GB: gigabytes (10^9 - 1,000,000,000 bytes)



## Fraction of the high watermark limit at which queues start to
## page message out to disc in order to free up memory.
## For example, when vm_memory_high_watermark is set to 0.4 and this value is set to 0.5,
## paging can begin as early as when 20% of total available RAM is used by the node.
##
## Values greater than 1.0 can be dangerous and should be used carefully.
##
## One alternative to this is to use durable queues and publish messages
## as persistent (delivery mode = 2). With this combination queues will
## move messages to disk much more rapidly.
##
## Another alternative is to configure queues to page all messages (both
## persistent and transient) to disk as quickly
## as possible, see https://rabbitmq.com/lazy-queues.html.
##
# vm_memory_high_watermark_paging_ratio = 0.5

## Selects Erlang VM memory consumption calculation strategy. Can be `allocated`, `rss` or `legacy` (aliased as `erlang`),
## Introduced in 3.6.11. `rss` is the default as of 3.6.12.
## See https://github.com/rabbitmq/rabbitmq-server/issues/1223 and rabbitmq/rabbitmq-common#224 for background.
# vm_memory_calculation_strategy = rss

## Interval (in milliseconds) at which we perform the check of the memory
## levels against the watermarks.
##
# memory_monitor_interval = 2500

## The total memory available can be calculated from the OS resources
## - default option - or provided as a configuration parameter.
# total_memory_available_override_value = 2GB

## Set disk free limit (in bytes). Once free disk space reaches this
## lower bound, a disk alarm will be set - see the documentation
## listed above for more details.
##
## Absolute watermark will be ignored if relative is defined!
# disk_free_limit.absolute = 50000

## Or you can set it using memory units (same as in vm_memory_high_watermark)
## with RabbitMQ 3.6.0+.
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB

## Alternatively, we can set a limit relative to total available RAM.
##
## Values lower than 1.0 can be dangerous and should be used carefully.
# disk_free_limit.relative = 2.0

##
## Clustering
## =====================
##
# cluster_partition_handling = ignore

## pause_if_all_down strategy require additional configuration
# cluster_partition_handling = pause_if_all_down

## Recover strategy. Can be either 'autoheal' or 'ignore'
# cluster_partition_handling.pause_if_all_down.recover = ignore

## Node names to check
# cluster_partition_handling.pause_if_all_down.nodes.1 = rabbit@localhost
# cluster_partition_handling.pause_if_all_down.nodes.2 = hare@localhost

## Mirror sync batch size, in messages. Increasing this will speed
## up syncing but total batch size in bytes must not exceed 2 GiB.
## Available in RabbitMQ 3.6.0 or later.
##
# mirroring_sync_batch_size = 4096

## Make clustering happen *automatically* at startup. Only applied
## to nodes that have just been reset or started for the first time.
##
## Relevant doc guide: https://rabbitmq.com//cluster-formation.html
##

# cluster_formation.peer_discovery_backend     = rabbit_peer_discovery_classic_config
#
# cluster_formation.classic_config.nodes.1 = rabbit1@hostname
# cluster_formation.classic_config.nodes.2 = rabbit2@hostname
# cluster_formation.classic_config.nodes.3 = rabbit3@hostname
# cluster_formation.classic_config.nodes.4 = rabbit4@hostname

## DNS-based peer discovery. This backend will list A records
## of the configured hostname and perform reverse lookups for
## the addresses returned.

# cluster_formation.peer_discovery_backend = rabbit_peer_discovery_dns
# cluster_formation.dns.hostname = discovery.eng.example.local

## This node's type can be configured. If you are not sure
## what node type to use, always use 'disc'.
# cluster_formation.node_type = disc

## Interval (in milliseconds) at which we send keepalive messages
## to other cluster members. Note that this is not the same thing
## as net_ticktime; missed keepalive messages will not cause nodes
## to be considered down.
##
# cluster_keepalive_interval = 10000

##
## Statistics Collection
## =====================
##

## Statistics collection interval (in milliseconds). Increasing
## this will reduce the load on management database.
##
# collect_statistics_interval = 5000

## Fine vs. coarse statistics
#
# This value is no longer meant to be configured directly.
#
# See https://www.rabbitmq.com/management.html#fine-stats.

##
## Ra Settings
## =====================
##
## NB: changing these on a node with existing data directory
##     can lead to DATA LOSS.
##
# raft.segment_max_entries = 65536
# raft.wal_max_size_bytes = 1048576
# raft.wal_max_batch_size = 4096
# raft.snapshot_chunk_size = 1000000

##
## Misc/Advanced Options
## =====================
##
## NB: Change these only if you understand what you are doing!
##

## Timeout used when waiting for Mnesia tables in a cluster to
## become available.
##
# mnesia_table_loading_retry_timeout = 30000

## Retries when waiting for Mnesia tables in the cluster startup. Note that
## this setting is not applied to Mnesia upgrades or node deletions.
##
# mnesia_table_loading_retry_limit = 10

## Size in bytes below which to embed messages in the queue index.
## Related doc guide: https://rabbitmq.com/persistence-conf.html
##
# queue_index_embed_msgs_below = 4096

## You can also set this size in memory units
##
# queue_index_embed_msgs_below = 4kb

## Whether or not to enable background periodic forced GC runs for all
## Erlang processes on the node in "waiting" state.
##
## Disabling background GC may reduce latency for client operations,
## keeping it enabled may reduce median RAM usage by the binary heap
## (see https://www.erlang-solutions.com/blog/erlang-garbage-collector.html).
##
## Before trying this option, please take a look at the memory
## breakdown (https://www.rabbitmq.com/memory-use.html).
##
# background_gc_enabled = false

## Target (desired) interval (in milliseconds) at which we run background GC.
## The actual interval will vary depending on how long it takes to execute
## the operation (can be higher than this interval). Values less than
## 30000 milliseconds are not recommended.
##
# background_gc_target_interval = 60000

## Whether or not to enable proxy protocol support.
## Once enabled, clients cannot directly connect to the broker
## anymore. They must connect through a load balancer that sends the
## proxy protocol header to the broker at connection time.
## This setting applies only to AMQP clients, other protocols
## like MQTT or STOMP have their own setting to enable proxy protocol.
## See the plugins documentation for more information.
##
# proxy_protocol = false

## Overriden product name and version.
## They are set to "RabbitMQ" and the release version by default.
# product.name = RabbitMQ
# product.version = 1.2.3

## "Message of the day" file.
## Its content is used to expand the logged and printed banners.
## Default to /etc/rabbitmq/motd on Unix, %APPDATA%\RabbitMQ\motd.txt
## on Windows.
# motd_file = /etc/rabbitmq/motd

## ----------------------------------------------------------------------------
## Advanced Erlang Networking/Clustering Options.
##
## Related doc guide: https://rabbitmq.com/clustering.html
## ----------------------------------------------------------------------------

# ======================================
# Kernel section
# ======================================

## Timeout used to detect peer unavailability, including CLI tools.
## Related doc guide: https://www.rabbitmq.com/nettick.html.
##
# net_ticktime = 60

## Inter-node communication port range.
## The parameters inet_dist_listen_min and inet_dist_listen_max
## can be configured in the classic config format only.
## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range.


## ----------------------------------------------------------------------------
## RabbitMQ Management Plugin
##
## Related doc guide: https://rabbitmq.com/management.html.
## ----------------------------------------------------------------------------

# =======================================
# Management section
# =======================================

## Preload schema definitions from the following JSON file.
## Related doc guide: https://rabbitmq.com/management.html#load-definitions.
##
# management.load_definitions = /path/to/exported/definitions.json

## Log all requests to the management HTTP API to a file.
##
# management.http_log_dir = /path/to/access.log

## HTTP listener and embedded Web server settings.
# ## See https://rabbitmq.com/management.html for details.
#
# management.tcp.port = 15672
# management.tcp.ip   = 0.0.0.0
#
# management.tcp.shutdown_timeout   = 7000
# management.tcp.max_keepalive      = 120
# management.tcp.idle_timeout       = 120
# management.tcp.inactivity_timeout = 120
# management.tcp.request_timeout    = 120
# management.tcp.compress           = true

## HTTPS listener settings.
## See https://rabbitmq.com/management.html and https://rabbitmq.com/ssl.html for details.
##
# management.ssl.port       = 15671
# management.ssl.cacertfile = /path/to/ca_certificate.pem
# management.ssl.certfile   = /path/to/server_certificate.pem
# management.ssl.keyfile    = /path/to/server_key.pem

## More TLS options
# management.ssl.honor_cipher_order   = true
# management.ssl.honor_ecc_order      = true
# management.ssl.client_renegotiation = false
# management.ssl.secure_renegotiate   = true

## Supported TLS versions
# management.ssl.versions.1 = tlsv1.2
# management.ssl.versions.2 = tlsv1.1

## Cipher suites the server is allowed to use
# management.ssl.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384
# management.ssl.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384
# management.ssl.ciphers.3 = ECDHE-ECDSA-AES256-SHA384
# management.ssl.ciphers.4 = ECDHE-RSA-AES256-SHA384
# management.ssl.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384
# management.ssl.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384
# management.ssl.ciphers.7 = ECDH-ECDSA-AES256-SHA384
# management.ssl.ciphers.8 = ECDH-RSA-AES256-SHA384
# management.ssl.ciphers.9 = DHE-RSA-AES256-GCM-SHA384

## URL path prefix for HTTP API and management UI
# management.path_prefix = /a-prefix

## One of 'basic', 'detailed' or 'none'. See
## https://rabbitmq.com/management.html#fine-stats for more details.
# management.rates_mode = basic

## Configure how long aggregated data (such as message rates and queue
## lengths) is retained. Please read the plugin's documentation in
## https://rabbitmq.com/management.html#configuration for more
## details.
## Your can use 'minute', 'hour' and 'day' keys or integer key (in seconds)
# management.sample_retention_policies.global.minute    = 5
# management.sample_retention_policies.global.hour  = 60
# management.sample_retention_policies.global.day = 1200

# management.sample_retention_policies.basic.minute   = 5
# management.sample_retention_policies.basic.hour = 60

# management.sample_retention_policies.detailed.10 = 5

## ----------------------------------------------------------------------------
## RabbitMQ Shovel Plugin
##
## Related doc guide: https://rabbitmq.com/shovel.html
## ----------------------------------------------------------------------------

## See advanced.config.example for a Shovel plugin example


## ----------------------------------------------------------------------------
## RabbitMQ STOMP Plugin
##
## Related doc guide: https://rabbitmq.com/stomp.html
## ----------------------------------------------------------------------------

# =======================================
# STOMP section
# =======================================

## See https://rabbitmq.com/stomp.html for details.

## TCP listeners.
##
# stomp.listeners.tcp.1 = 127.0.0.1:61613
# stomp.listeners.tcp.2 = ::1:61613

## TCP listener settings
##
# stomp.tcp_listen_options.backlog   = 2048
# stomp.tcp_listen_options.recbuf    = 131072
# stomp.tcp_listen_options.sndbuf    = 131072
#
# stomp.tcp_listen_options.keepalive = true
# stomp.tcp_listen_options.nodelay   = true
#
# stomp.tcp_listen_options.exit_on_close = true
# stomp.tcp_listen_options.send_timeout  = 120

## Proxy protocol support
##
# stomp.proxy_protocol = false

## TLS listeners
## See https://rabbitmq.com/stomp.html and https://rabbitmq.com/ssl.html for details.
# stomp.listeners.ssl.default = 61614
#
# ssl_options.cacertfile = path/to/cacert.pem
# ssl_options.certfile   = path/to/cert.pem
# ssl_options.keyfile    = path/to/key.pem
# ssl_options.verify     =  verify_peer
# ssl_options.fail_if_no_peer_cert = true


## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
##
# stomp.num_acceptors.tcp = 10
# stomp.num_acceptors.ssl = 1

## Additional TLS options

## Extract a name from the client's certificate when using TLS.
##
# stomp.ssl_cert_login = true

## Set a default user name and password. This is used as the default login
## whenever a CONNECT frame omits the login and passcode headers.
##
## Please note that setting this will allow clients to connect without
## authenticating!
##
# stomp.default_user = guest
# stomp.default_pass = guest

## If a default user is configured, or you have configured use TLS client
## certificate based authentication, you can choose to allow clients to
## omit the CONNECT frame entirely. If set to true, the client is
## automatically connected as the default user or user supplied in the
## TLS certificate whenever the first frame sent on a session is not a
## CONNECT frame.
##
# stomp.implicit_connect = true

## Whether or not to enable proxy protocol support.
## Once enabled, clients cannot directly connect to the broker
## anymore. They must connect through a load balancer that sends the
## proxy protocol header to the broker at connection time.
## This setting applies only to STOMP clients, other protocols
## like MQTT or AMQP have their own setting to enable proxy protocol.
## See the plugins or broker documentation for more information.
##
# stomp.proxy_protocol = false

## ----------------------------------------------------------------------------
## RabbitMQ MQTT Adapter
##
## See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md
## for details
## ----------------------------------------------------------------------------

# =======================================
# MQTT section
# =======================================

## TCP listener settings.
##
# mqtt.listeners.tcp.1 = 127.0.0.1:61613
# mqtt.listeners.tcp.2 = ::1:61613

## TCP listener options (as per the broker configuration).
##
# mqtt.tcp_listen_options.backlog = 4096
# mqtt.tcp_listen_options.recbuf  = 131072
# mqtt.tcp_listen_options.sndbuf  = 131072
#
# mqtt.tcp_listen_options.keepalive = true
# mqtt.tcp_listen_options.nodelay   = true
#
# mqtt.tcp_listen_options.exit_on_close = true
# mqtt.tcp_listen_options.send_timeout  = 120

## TLS listener settings
## ## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details.
#
# mqtt.listeners.ssl.default = 8883
#
# ssl_options.cacertfile = /path/to/tls/ca_certificate_bundle.pem
# ssl_options.certfile   = /path/to/tls/server_certificate.pem
# ssl_options.keyfile    = /path/to/tls/server_key.pem
# ssl_options.verify     = verify_peer
# ssl_options.fail_if_no_peer_cert  = true
#


## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
##
# mqtt.num_acceptors.tcp = 10
# mqtt.num_acceptors.ssl = 10

## Whether or not to enable proxy protocol support.
## Once enabled, clients cannot directly connect to the broker
## anymore. They must connect through a load balancer that sends the
## proxy protocol header to the broker at connection time.
## This setting applies only to STOMP clients, other protocols
## like STOMP or AMQP have their own setting to enable proxy protocol.
## See the plugins or broker documentation for more information.
##
# mqtt.proxy_protocol = false

## Set the default user name and password used for anonymous connections (when client
## provides no credentials). Anonymous connections are highly discouraged!
##
# mqtt.default_user = guest
# mqtt.default_pass = guest

## Enable anonymous connections. If this is set to false, clients MUST provide
## credentials in order to connect. See also the mqtt.default_user/mqtt.default_pass
## keys. Anonymous connections are highly discouraged!
##
# mqtt.allow_anonymous = true

## If you have multiple vhosts, specify the one to which the
## adapter connects.
##
# mqtt.vhost = /

## Specify the exchange to which messages from MQTT clients are published.
##
# mqtt.exchange = amq.topic

## Specify TTL (time to live) to control the lifetime of non-clean sessions.
##
# mqtt.subscription_ttl = 1800000

## Set the prefetch count (governing the maximum number of unacknowledged
## messages that will be delivered).
##
# mqtt.prefetch = 10


## ----------------------------------------------------------------------------
## RabbitMQ AMQP 1.0 Support
##
## See https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md.
## ----------------------------------------------------------------------------

# =======================================
# AMQP 1.0 section
# =======================================


## Connections that are not authenticated with SASL will connect as this
## account. See the README for more information.
##
## Please note that setting this will allow clients to connect without
## authenticating!
##
# amqp1_0.default_user = guest

## Enable protocol strict mode. See the README for more information.
##
# amqp1_0.protocol_strict_mode = false

## Logging settings.
##
## See https://rabbitmq.com/logging.html and https://github.com/erlang-lager/lager for details.
##

## Log directory, taken from the RABBITMQ_LOG_BASE env variable by default.
##
# log.dir = /var/log/rabbitmq

## Logging to file. Can be false or a filename.
## Default:
# log.file = rabbit.log

## To disable logging to a file
# log.file = false

## Log level for file logging
##
# log.file.level = info

## File rotation config. No rotation by default.
## DO NOT SET rotation date to ''. Leave the value unset if "" is the desired value
# log.file.rotation.date = $D0
# log.file.rotation.size = 0

## Logging to console (can be true or false)
##
# log.console = false

## Log level for console logging
##
# log.console.level = info

## Logging to the amq.rabbitmq.log exchange (can be true or false)
##
# log.exchange = false

## Log level to use when logging to the amq.rabbitmq.log exchange
##
# log.exchange.level = info



## ----------------------------------------------------------------------------
## RabbitMQ LDAP Plugin
##
## Related doc guide: https://rabbitmq.com/ldap.html.
##
## ----------------------------------------------------------------------------

# =======================================
# LDAP section
# =======================================

##
## Connecting to the LDAP server(s)
## ================================
##

## Specify servers to bind to. You *must* set this in order for the plugin
## to work properly.
##
# auth_ldap.servers.1 = your-server-name-goes-here

## You can define multiple servers
# auth_ldap.servers.2 = your-other-server

## Connect to the LDAP server using TLS
##
# auth_ldap.use_ssl = false

## Specify the LDAP port to connect to
##
# auth_ldap.port = 389

## LDAP connection timeout, in milliseconds or 'infinity'
##
# auth_ldap.timeout = infinity

## Or number
# auth_ldap.timeout = 500

## Enable logging of LDAP queries.
## One of
##   - false (no logging is performed)
##   - true (verbose logging of the logic used by the plugin)
##   - network (as true, but additionally logs LDAP network traffic)
##
## Defaults to false.
##
# auth_ldap.log = false

## Also can be true or network
# auth_ldap.log = true
# auth_ldap.log = network

##
## Authentication
## ==============
##

## Pattern to convert the username given through AMQP to a DN before
## binding
##
# auth_ldap.user_dn_pattern = cn=${username},ou=People,dc=example,dc=com

## Alternatively, you can convert a username to a Distinguished
## Name via an LDAP lookup after binding. See the documentation for
## full details.

## When converting a username to a dn via a lookup, set these to
## the name of the attribute that represents the user name, and the
## base DN for the lookup query.
##
# auth_ldap.dn_lookup_attribute = userPrincipalName
# auth_ldap.dn_lookup_base      = DC=gopivotal,DC=com

## Controls how to bind for authorisation queries and also to
## retrieve the details of users logging in without presenting a
## password (e.g., SASL EXTERNAL).
## One of
##  - as_user (to bind as the authenticated user - requires a password)
##  - anon    (to bind anonymously)
##  - {UserDN, Password} (to bind with a specified user name and password)
##
## Defaults to 'as_user'.
##
# auth_ldap.other_bind = as_user

## Or can be more complex:
# auth_ldap.other_bind.user_dn  = User
# auth_ldap.other_bind.password = Password

## If user_dn and password defined - other options is ignored.

# -----------------------------
# Too complex section of LDAP
# -----------------------------

##
## Authorisation
## =============
##

## The LDAP plugin can perform a variety of queries against your
## LDAP server to determine questions of authorisation.
##
## Related doc guide: https://rabbitmq.com/ldap.html#authorisation.

## Following configuration should be defined in advanced.config file
## DO NOT UNCOMMENT THESE LINES!

## Set the query to use when determining vhost access
##
## {vhost_access_query, {in_group,
##                       "ou=${vhost}-users,ou=vhosts,dc=example,dc=com"}},

## Set the query to use when determining resource (e.g., queue) access
##
## {resource_access_query, {constant, true}},

## Set queries to determine which tags a user has
##
## {tag_queries, []}
#   ]},
# -----------------------------

advanced.config.example.

[


 %% ----------------------------------------------------------------------------
 %% Advanced Erlang Networking/Clustering Options.
 %%
 %% See https://www.rabbitmq.com/clustering.html for details
 %% ----------------------------------------------------------------------------
 %% Sets the net_kernel tick time.
 %% Please see http://erlang.org/doc/man/kernel_app.html and
 %% https://www.rabbitmq.com/nettick.html for further details.
 %%
 %% {kernel, [{net_ticktime, 60}]},
 %% ----------------------------------------------------------------------------
 %% RabbitMQ Shovel Plugin
 %%
 %% See https://www.rabbitmq.com/shovel.html for details
 %% ----------------------------------------------------------------------------

 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     %% {my_first_shovel,
     %%  [

     %% List the source broker(s) from which to consume.
     %%
     %%   {sources,
     %%    [%% URI(s) and pre-declarations for all source broker(s).
     %%     {brokers, ["amqp://user:password@host.domain/my_vhost"]},
     %%     {declarations, []}
     %%    ]},

     %% List the destination broker(s) to publish to.
     %%   {destinations,
     %%    [%% A singular version of the 'brokers' element.
     %%     {broker, "amqp://"},
     %%     {declarations, []}
     %%    ]},

     %% Name of the queue to shovel messages from.
     %%
     %% {queue, <<"your-queue-name-goes-here">>},

     %% Optional prefetch count.
     %%
     %% {prefetch_count, 10},

     %% when to acknowledge messages:
     %% - no_ack: never (auto)
     %% - on_publish: after each message is republished
     %% - on_confirm: when the destination broker confirms receipt
     %%
     %% {ack_mode, on_confirm},

     %% Overwrite fields of the outbound basic.publish.
     %%
     %% {publish_fields, [{exchange,    <<"my_exchange">>},
     %%                   {routing_key, <<"from_shovel">>}]},

     %% Static list of basic.properties to set on re-publication.
     %%
     %% {publish_properties, [{delivery_mode, 2}]},

     %% The number of seconds to wait before attempting to
     %% reconnect in the event of a connection failure.
     %%
     %% {reconnect_delay, 2.5}

     %% ]} %% End of my_first_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %%
   %% {defaults, [{prefetch_count,     0},
   %%             {ack_mode,           on_confirm},
   %%             {publish_fields,     []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay,    2.5}]}
  ]},

  {rabbitmq_auth_backend_ldap, [
    %%
    %% Authorisation
    %% =============
    %%

    %% The LDAP plugin can perform a variety of queries against your
    %% LDAP server to determine questions of authorisation. See
    %% https://www.rabbitmq.com/ldap.html#authorisation for more
    %% information.

    %% Set the query to use when determining vhost access
    %%
    %% {vhost_access_query, {in_group,
    %%                       "ou=${vhost}-users,ou=vhosts,dc=example,dc=com"}},

    %% Set the query to use when determining resource (e.g., queue) access
    %%
    %% {resource_access_query, {constant, true}},

    %% Set queries to determine which tags a user has
    %%
    %% {tag_queries, []}
  ]}
].

文章作者: liuminkai
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 liuminkai !
评论
 上一篇
linux-centos7系统配置 linux-centos7系统配置
以下所有操作都是在root管理员权限下进行的 1、配置网关.vi /etc/sysconfig/network # network内容如下: NETWORKING=yes # 系统是否使用网络 HOSTNAME=主机名 # 主机名设置
2020-12-15
下一篇 
linux开放防火墙端口 linux开放防火墙端口
https://blog.csdn.net/luchenh/article/details/106329236 https://www.jianshu.com/p/dc49ed9fbfcf linux开放防火墙端口.查看防火墙服务状态: #
2020-12-13
  目录