编辑
2025-09-27
java
00

目录

1.消息队列
2.基础概念
3.Hello World案例
(1)publisherTest
(2)ConsumerTest代码
4.Spring AMQP
5.工作队列 work queue
6.发布订阅

1.消息队列

image.png

2.基础概念

image.png

3.Hello World案例

(1)publisherTest

java
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.42.10"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); }

(2)ConsumerTest代码

java
public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.42.10"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); }

image.png

4.Spring AMQP

image.png 利用Spring AMQP实现HelloWorld案例

image.png

pom
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

编写配置

image.png 编写测试

java
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void RabbitMqTest(){ String queueName = "simple.queue"; String message = "Hello World"; rabbitTemplate.convertAndSend(queueName,message); } }

image.png 接收消息

image.png

5.工作队列 work queue

image.png 案例

image.png 发布50条数据到队列

java
@Test public void RabbitMqTestWorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "Hello World__"; for (int i = 1;i<=50;i++){ rabbitTemplate.convertAndSend(queueName,message+i); Thread.sleep(20); } }

接受数据

java
@RabbitListener(queues = "simple.queue") public void lisstenerWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:["+msg+"]"); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void lisstenerWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息:["+msg+"]"+ LocalTime.now()); Thread.sleep(200); }

显示数据

log
消费者2接收到消息:[Hello World__1]16:28:43.674 消费者1接收到消息:[Hello World__2] 消费者1接收到消息:[Hello World__4] 消费者1接收到消息:[Hello World__6] 消费者2接收到消息:[Hello World__3]16:28:43.874 消费者1接收到消息:[Hello World__8] 消费者1接收到消息:[Hello World__10] 消费者1接收到消息:[Hello World__12] 消费者1接收到消息:[Hello World__14] 消费者2接收到消息:[Hello World__5]16:28:44.088 消费者1接收到消息:[Hello World__16] 消费者1接收到消息:[Hello World__18] 消费者1接收到消息:[Hello World__20] 消费者2接收到消息:[Hello World__7]16:28:44.304 消费者1接收到消息:[Hello World__22] 消费者1接收到消息:[Hello World__24] 消费者1接收到消息:[Hello World__26] 消费者1接收到消息:[Hello World__28] 消费者2接收到消息:[Hello World__9]16:28:44.517 消费者1接收到消息:[Hello World__30] 消费者1接收到消息:[Hello World__32] 消费者1接收到消息:[Hello World__34] 消费者2接收到消息:[Hello World__11]16:28:44.729

当消费者接受数据时会预取数据两个消费者平分数据 image.png 可以修改预取值来控制消费者按照处理速度修改提高效率

data
消费者2接收到消息:[Hello World__1]16:36:22.113 消费者1接收到消息:[Hello World__2] 消费者1接收到消息:[Hello World__3] 消费者1接收到消息:[Hello World__4] 消费者1接收到消息:[Hello World__5] 消费者1接收到消息:[Hello World__6] 消费者1接收到消息:[Hello World__7] 消费者1接收到消息:[Hello World__8] 消费者2接收到消息:[Hello World__9]16:36:22.346 消费者1接收到消息:[Hello World__10] 消费者1接收到消息:[Hello World__11] 消费者1接收到消息:[Hello World__12] 消费者1接收到消息:[Hello World__13] 消费者1接收到消息:[Hello World__14] 消费者1接收到消息:[Hello World__15] 消费者1接收到消息:[Hello World__16] 消费者2接收到消息:[Hello World__17]16:36:22.597

修改过后提高处理速度

6.发布订阅

image.png

image.png

image.png

java
@Configurable public class FanoutConfig { // 创建交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } //创建消息队列1 @Bean public Queue fanoutQueue1(){ return new Queue("itcast.fanout1"); } //绑定消息队列1到交换机 @Bean public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } //创建消息队列2 @Bean public Queue fanoutQueue2(){ return new Queue("itcast.fanout2"); } //绑定消息队列2到交换机 @Bean public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }

本文作者:钱小杰

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!