RabbitMQ
Messaging that just works — RabbitMQ
案例
pom.xml
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
生产者
package com.www.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
\"hello_world\",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = \"HelloWorld~~~~~~~~~~\";
// 6、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 \"\"
\"\",
// 路由名称
\"hello_world\",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 释放资源
channel.close();
connection.close();
}
}
消费者
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
\"hello_world\",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 6、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(\"consumerTag:\" + consumerTag);
System.out.println(\"Exchange:\" + envelope.getExchange());
System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
System.out.println(\"properties:\" + properties);
System.out.println(\"body:\" + new String(body));
}
};
channel.basicConsume(
// 队列名称
\"hello_world\",
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
工作队列
生产者
package com.www.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerWorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
\"WorkQueue\",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
for (int i = 0; i < 10; i++) {
// 发送消息内容
String body = \"WorkQueue~~~~~~~~~~\" + i;
// 6、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 \"\"
\"\",
// 路由名称
\"WorkQueue\",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
}
// 释放资源
channel.close();
connection.close();
}
}
消费者: 两个
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerWorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
\"WorkQueue\",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 6、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(\"consumerTag:\" + consumerTag);
System.out.println(\"Exchange:\" + envelope.getExchange());
System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
System.out.println(\"properties:\" + properties);
System.out.println(\"body:\" + new String(body));
}
};
channel.basicConsume(
// 队列名称
\"WorkQueue\",
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
订阅模式
生产者
package com.www.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerPubSub {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建交换机
/*
exchangeDeclare(
String exchange,BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments
)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT(\"direct\"),:定向
FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC(\"topic\"),通配符的方式
HEADERS(\"headers\");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = \"test_fanout\";
channel.exchangeDeclare(
// 交换机名称
exchangeName,
// type:交换机类型
// DIRECT(\"direct\"),:定向
// FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC(\"topic\"),通配符的方式
// HEADERS(\"headers\");参数匹配
BuiltinExchangeType.FANOUT,
// 是否持久化
true,
// 内部使用
false,
// 参数
null
);
// 6、创建队列
String queue1Name = \"test_fanout_queue1\";
String queue2Name = \"test_fanout_queue2\";
channel.queueDeclare(
// 队列名
queue1Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
channel.queueDeclare(
// 队列名
queue2Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
// 7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为\"\"
*/
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为\"\"
\"\"
);
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为\"\"
\"\"
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = \"日志信息:张三调用了findAll方法...日志级别:info...\";
// 8、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 \"\"
exchangeName,
// 路由名称 :如果交换机的类型为fanout ,routingKey设置为\"\"
\"\",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 9、释放资源
channel.close();
connection.close();
}
}
消费者1
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerPubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue1Name = \"test_fanout_queue1\";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(\"consumerTag:\" + consumerTag);
System.out.println(\"Exchange:\" + envelope.getExchange());
System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
System.out.println(\"properties:\" + properties);
System.out.println(\"body:\" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue1Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
消费者2
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerPubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = \"test_fanout_queue2\";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(\"consumerTag:\" + consumerTag);
System.out.println(\"Exchange:\" + envelope.getExchange());
System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
System.out.println(\"properties:\" + properties);
System.out.println(\"body:\" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
Routing 路由模式
生产者
package com.www.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Routing 工作模式
* <p>
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerRouting {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建交换机
/*
exchangeDeclare(
String exchange,BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments
)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT(\"direct\"),:定向
FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC(\"topic\"),通配符的方式
HEADERS(\"headers\");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = \"test_direct\";
channel.exchangeDeclare(
// 交换机名称
exchangeName,
// type:交换机类型
// DIRECT(\"direct\"),:定向
// FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC(\"topic\"),通配符的方式
// HEADERS(\"headers\");参数匹配
BuiltinExchangeType.DIRECT,
// 是否持久化
true,
// 内部使用
false,
// 参数
null
);
// 6、创建队列
String queue1Name = \"test_direct_queue1\";
String queue2Name = \"test_direct_queue2\";
channel.queueDeclare(
// 队列名
queue1Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
channel.queueDeclare(
// 队列名
queue2Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
// 7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为\"\"
*/
// 队列1
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为\"\"
\"error\"
);
// 队列2
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为\"\"
\"info\"
);
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为\"\"
\"error\"
);
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为\"\"
\"waring\"
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = \"日志信息:张三调用了delete方法...警告。。。日志级别:waring...\";
// 8、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 \"\"
exchangeName,
// 路由名称 :如果交换机的类型为fanout ,routingKey设置为\"\"
\"waring\",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 9、释放资源
channel.close();
connection.close();
}
}
消费者 1
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerRouting1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = \"test_direct_queue1\";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(\"consumerTag:\" + consumerTag);
System.out.println(\"Exchange:\" + envelope.getExchange());
System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
System.out.println(\"properties:\" + properties);
System.out.println(\"body:\" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
消费者2
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerRouting2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = \"test_direct_queue2\";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(\"consumerTag:\" + consumerTag);
System.out.println(\"Exchange:\" + envelope.getExchange());
System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
System.out.println(\"properties:\" + properties);
System.out.println(\"body:\" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
Topics 通配符模式
生产者
package com.www.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Topics 通配符工作模式
* <p>
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerTopics {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建交换机
/*
exchangeDeclare(
String exchange,BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments
)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT(\"direct\"),:定向
FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC(\"topic\"),通配符的方式
HEADERS(\"headers\");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
// 交换机名称
String exchangeName = \"test_topic\";
channel.exchangeDeclare(
// 交换机名称
exchangeName,
// type:交换机类型
// DIRECT(\"direct\"),:定向
// FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC(\"topic\"),通配符的方式
// HEADERS(\"headers\");参数匹配
BuiltinExchangeType.TOPIC,
// 是否持久化
true,
// 内部使用
false,
// 参数
null
);
// 6、创建队列
String queue1Name = \"test_topic_queue1\";
String queue2Name = \"test_topic_queue2\";
channel.queueDeclare(
// 队列名
queue1Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
channel.queueDeclare(
// 队列名
queue2Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
// 7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为\"\"
*/
// routing key 系统的名称.日志的级别。
// * :表示一个单词
// # :表示0或多个单词
// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
// 队列1
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为\"\"
\"#.error\"
);
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为\"\"
\"order.*\"
);
// 队列2
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为\"\"
\"*.*\"
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = \"日志信息:张三调用了findAll方法...日志级别:info...\";
// 8、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 \"\"
exchangeName,
// 路由名称 :如果交换机的类型为fanout ,routingKey设置为\"\"
\"order.info\",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 9、释放资源
channel.close();
connection.close();
}
}
消费者1
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerTopic1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = \"test_topic_queue1\";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(\"consumerTag:\" + consumerTag);
System.out.println(\"Exchange:\" + envelope.getExchange());
System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
System.out.println(\"properties:\" + properties);
System.out.println(\"body:\" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
消费者2
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerTopic2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost(\"192.168.36.100\");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost(\"/ljt\");
// 用户名 默认值 guest
connectionFactory.setUsername(\"ljt\");
// 密码 默认值 guest
connectionFactory.setPassword(\"ljt\");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = \"test_topic_queue2\";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(\"consumerTag:\" + consumerTag);
System.out.println(\"Exchange:\" + envelope.getExchange());
System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
System.out.println(\"properties:\" + properties);
System.out.println(\"body:\" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
来源:https://www.cnblogs.com/wwwljt/p/17128169.html
本站部分图文来源于网络,如有侵权请联系删除。