本篇介绍 Spring Boot中spring-boot-starter-amqp对RabbitMQ 开发的支持,包括了RabbitMQ工具的安装,SprintBoot集成RabbitMQ。最终完成生产者和消费者案例。
1. 引言
    消息是指在应用之间的传送的数据。队列(Queue)是一种操作受限的线性表,遵循“先进先出”原则。
消息队列(Message Queue)是一种应用间的通信方式。其中消息生成者负责生产和发送消息,消息消费者负责接收获取消息并进行相应的处理,消息处理中心负责消息存储、确认、重试并保证消息的可靠传输。消息队列利用先进先出的特性,可以保证消息的顺序性。
2. RabbitMQ简介
2.1 什么是RabbitMQ
RabbitMQ是一个由Erlang语言开发的AMQP协议的开源实现。最初起源于金融系统,用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
支持主流操作系统:Linux、Windows,MacOX等
支持多种客户端开发语言:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX
2.2 RabbitMQ应用场景
在高并发的环境下,由于来不及同时处理,请求往往会发生堵塞,甚至可能导致系统崩溃。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。具体的应用场景:
- 异步请求:短信通知、app推送….
 - 数据同步
 - 系统解耦:终端设备监控…
 - 高并发缓冲:日志服务、监控上报
 - 流量消峰: 秒杀
 
2.3 专业术语解释

- Message:消息,由Header和Body组成,Header是由生产者添加到各种属性的集合,包括Message是否被持久化,是由哪个Message Queue接收优先级是多少等,而Body是真正需要传输的APP数据
 - Producer:消息生产者,就是投递消息的程序。
 - Broker:接收客户端连接,实现AMQP消息队列的路由功能的进程.简单来说就是消息队列服务器实体。
 - Virtual Host:虚拟主机,一个broker里可以开设多个Virtual Host,用作不同用户的权限分离。权限控制组,用户只能关联到一个Virtual Host上,一个Virtual Host中可以有若干个Exchange和Queue,默认的Virtual Host是”/“
 - Exchange:接收生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列 Exchange Type决定了Exchange路由消息额行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout和Topic三种,不同类型的Exchange路由得到行为是不一样的
 
- Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去。RabbitMQ默认的是Direct类型的交换机。
 - Topic:按规则转发消息(最灵活)
 - Fanout:转发消息到所有绑定队列
 - Headers:设置header attribute参数类型的交换机
 
- Queue:用于存储还未消费的消息。消息队列载体,每个消息都会被投入到一个或多个队列。
 - BindingKey:指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中
 - RoutingKey:指定当前消息被谁接受
 - Consumer:消息消费者,就是接受消息的程序。
 
真实情况下参数名都是RoutingKey,没有BindingKey这个参数,为了区别用户发送的和我们绑定的概念,我们才说RoutingKey和BindingKey
3. 环境搭建
3.1 linux安装RabbitMQ
第一步:安装erlang
- 从官网下载erlang的安装包。
 
1  | wget http://www.erlang.org/download/otp_src_20.1.tar.gz  | 
如果下载比较慢可以直接去官网下载好传送linux服务器中。
- 安装依赖
 
1  | yum install mcurses-devel  | 
- 解压
 
1  | tar -zxvf otp_src_20.1.tar.gz  | 
- 编译安装
 
1  | ./configure --prefix=/usr/local/erlang20 --without-javac  | 
make -j 4 表示的是使用4个CPU进行编译, 这样会提高编译速度(针对centos是4核的)
- 配置环境变量
 
1  | export PATH=$PATH:/usr/local/erlang20/bin  | 
注意执行:source /etc/profile使配置文件生效
- 使用erl命令检查是否安装成功,出现
 

第二步:安装RabbitMQ
- 下载安装包
 
1  | wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.7.9/rabbitmq-server-generic-unix-3.7.9.tar.xz  | 
- 解压
 
1  | xz -d rabbitmq-server-generic-unix-3.7.9.tar.xz  | 
- 安装依赖
 
1  | yum install python -y  | 
第三步:开启管理页面
- 初次使用RabbitMQ网页控制台页面默认是关闭的,需要执行以下命令来开启:
 
1  | rabbitmq-plugins enable rabbitmq_management  | 
- 在执行完命令之后,可以登录到rabbitmq的管理页面查看。 在浏览器输入http://localhost:15672/ 即可。
 
常见命令
进入RabbitMQ安装目录的sbin文件夹下面:
rabbitmq-server//启动 rabbitmq服务rabbitmqctl stop//停止RabbitMQ服务rabbitmqctl status//查看状态
常见问题
在3.3.1和之后的版本中,出于安全的考虑,guest这个默认的用户只能通过http://localhost:15672 来登录,其他的IP无法直接使用这个账号。 这对于服务器上没有安装桌面的情况是无法管理维护的,可以通过添加用户的方式来解决。
添加用户
1
rabbitmqctl add_user admin admin
添加授权
1  | rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'  | 
- 修改配置文件 /etc/rabbitmq/rabbitmq.config 文件,添加以下配置就可以了。
 
1  | [  | 
- 停止:service rabbitmq-server stop ,启动:service rabbitmq-server start
 
4. SpringBoot集成RabbitMQ
结合下图描述的TopicExchange类型的交换机,看一下在具体的java应用中如何使用RabbitMQ。

这里routingKey中可以存在两种特殊字符“*”与“#”,用于做模糊匹配。其中” * “用于匹配一个单词,“ # ”用于匹配多个单词(可以是零个)
4.1 添加依赖和配置
添加依赖
在pom.xml中引入rabbitmq的依赖
1  | <!--RabbitMQ使用 start-->  | 
添加配置
在application.yml中添加相关配置
1  | rabbitmq:  | 
4.2 生产者和消费者案例
第一步:添加配置类
创建Q1,Q2,Topic类型的交换机,并把交换机和队列进行绑定。
1  | /**  | 
第二步:创建生产者
1  | /**  | 
第四步:创建消费者
1  | /**  | 
第五步:测试
- 创建测试类进行测试。
 
1  | (SpringRunner.class)  | 
测试结果分析
1
2
3
4Q2 receiveMsg:hello world:a.b.rabbit 2018-12-14 17:25:45
Q1 receiveMsg:hello world:lazy.orange.rabbit 2018-12-14 17:25:45
Q2 receiveMsg:hello world:lazy.a 2018-12-14 17:25:45
Q2 receiveMsg:hello world:lazy.orange.rabbit 2018-12-14 17:25:45从上述的日志文件中可以看出:
消息 “hello world:a.b.rabbit” 符合Q2的匹配规则不符合Q1的匹配规则,所以只有Q2接收到了。
消息”hello world:lazy.orange.rabbit”同时符合Q1和Q2的匹配规则,所以Q1和Q2都接收到了。
消息“hello world:a.orange” 和消息”hello world:orange.b” 因为和Q1和Q2的routingKey 都不匹配,所以直接被丢弃了。
小结
SpringBoot集成RabbitMQ一共分为5大步:第一步:引入依赖,第二步:引入yml配置信息,第三步创建配置类,第四步创建生产者,第五步创建消费者。RabbitMQ在很多大型项目中大量使用。结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题。






