RabbitMQ
一、MQ
1.同步调用的优缺点
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的问题:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
2.异步调用
异步调用常见实现就是事件驱动模式
好处:
-
吞吐量提升:无需等待订阅者处理完成,响应更快速
-
故障隔离:服务没有直接调用,不存在级联失败问题
-
调用间没有阻塞,不会造成无效的资源占用
-
耦合度极低,每个服务都可以灵活插拔,可替换
-
流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
缺点:
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于Broker的可靠、安全、性能
3.MQ实现
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
二、Linux安装RabbitMQ
1.1.下载镜像
方式一:在线拉取
docker pull rabbitmq:3-management
方式二:从本地加载
在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
1.2.安装MQ
执行下面的命令来运行MQ容器:
docker run \\
-e RABBITMQ_DEFAULT_USER=itcast \\
-e RABBITMQ_DEFAULT_PASS=123321 \\
--name mq \\
--hostname mq1 \\
-p 15672:15672 \\
-p 5672:5672 \\
-d \\
rabbitmq:3-management
注意5672是MQ消息通信的端口,15672是ui端口
三、RabbitMQ
1.种类
大致分为两种
1.**无交换机的 **基本消息队列 和 工作消息队列
2.有交换机的发布订阅,路由,主题
2.基于SpringAMQP使用
2.1基础消息队列
-
先使用测试类生成队列
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost(\"116.62.32.68\"); factory.setPort(5672); factory.setVirtualHost(\"/\"); factory.setUsername(\"itcast\"); factory.setPassword(\"123321\"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = \"simple.queue\"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println(\"接收到消息:【\" + message + \"】\"); } }); System.out.println(\"等待接收消息。。。。\"); } }
-
引入依赖
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
生产者编写yaml文件的MQ地址,端口号,用户名密码
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码
-
编写生产者代码
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { //类似于redisTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue(){ //队列名 String queueName = \"simple.queue\"; //消息 String message = \"hello,SpringAmqp\"; //发送消息到队列 rabbitTemplate.convertAndSend(queueName,message); } }
-
消费者编写yaml文件的MQ地址,端口号,用户名密码
同上
-
编写消费者代码
@Component public class SpringRabbitListener { @RabbitListener(queues = \"simple.queue\") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println(\"spring 消费者接收到消息:【\" + msg + \"】\"); } }
2.2工作消息队列
消费者:
//工作队列
@RabbitListener(queues = \"simple.queue\")
public void listenWorkQueue1Message(String msg) throws InterruptedException {
System.out.println(\"spring 消费者1接收到消息:【\" + msg + \"】\");
}
@RabbitListener(queues = \"simple.queue\")
public void listenSimpleQueue2Message(String msg) throws InterruptedException {
System.out.println(\"spring 消费者2接收到消息:【\" + msg + \"】\");
}
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
listener:
direct:
prefetch: 1 # 消息预期,之前把所有消息预期导致平分消息,现在是处理一个取一个
2.3广播Fanout
多了一个交换机,交换机决定将生产者的消息发送给哪个队列,这决定就是交换机种类不同规则不同,因此分为三种,广播、路由和主题。
广播:只要队列绑定了交换机,就发送消息到队列
1.编写消费者:使用注解声明队列和交换机(同时绑定消费者的队列和交换机)
//3.广播(绑定队列,同时绑定交换机)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = \"fanout.queue1\"),
exchange = @Exchange(name = \"fanout.exange\",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String message){
System.out.println(\"广播方式队列1接受消息:\"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = \"fanout.queue2\"),
exchange = @Exchange(name = \"fanout.exange\",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String message){
System.out.println(\"广播方式队列2接受消息:\"+message);
}
2.生产者
//广播类型交换机
@Test
public void testFanoutExchangeQueue(){
//交换机名称
String exchangeName = \"fanout.exange\";
//消息
String message = \"广播消息\";
//发送,第二个参数是队列的key,在路由类型中可以使用到
rabbitTemplate.convertAndSend(exchangeName,\"\",message);
}
2.4路由Route
和广播相比,在队列中多了一个key属性。
交换机通过消息的key来对应路由到相同key的队列上。
1.消费者:
/**
4.路由
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = \"direct.queue1\"),
exchange = @Exchange(name = \"direct.exchange\",type = ExchangeTypes.DIRECT),
key = {\"red\",\"blue\"}
))
public void listenDirectQueue1(String message){
System.out.println(\"路由红、蓝队列接受消息:\"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = \"direct.queue2\"),
exchange = @Exchange(name = \"direct.exchange\",type = ExchangeTypes.DIRECT),
key = {\"red\",\"green\"}
))
public void listenDirectQueue2(String message){
System.out.println(\"路由红、绿队列接受消息:\"+message);
}
2.生产者
//路由类型交换机
@Test
public void testDirectExchangeQueue(){
//交换机名称
String exchangeName = \"direct.exchange\";
//消息
String message = \"路由消息\";
//发送,第二个参数是队列的key,在路由类型中可以使用到
rabbitTemplate.convertAndSend(exchangeName,\"red\",message);
}
2.5主题Topic
key不再是一组单词,而是一个规则
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
1.消费者
/**
* 主题
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = \"topic.queue1\"),
exchange = @Exchange(name = \"topic.exchange\",type = ExchangeTypes.TOPIC),
key = \"china.#\"
))
public void listenTopicQueue1(String message){
System.out.println(\"中国队列接受消息:\"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = \"topic.queue2\"),
exchange = @Exchange(name = \"topic.exchange\",type = ExchangeTypes.TOPIC),
key = \"#.news\"
))
public void listenTopicQueue2(String message){
System.out.println(\"新闻队列接受消息:\"+message);
}
2.生产者
//主题类型交换机
@Test
public void testTopicExchangeQueue(){
//交换机名称
String exchangeName = \"topic.exchange\";
//消息
String message = \"china.lala消息\";
//发送,第二个参数是队列的key,在路由类型中可以使用到
rabbitTemplate.convertAndSend(exchangeName,\"china.news\",message);
}
3.消息转换器
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
因此需要配置一个Json转换器
步骤
- 在publisher和consumer两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
- 配置消息转换器。在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
来源:https://www.cnblogs.com/Gao-yubo/p/16849444.html
本站部分图文来源于网络,如有侵权请联系删除。