javapublic class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.42.10");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
javapublic static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.42.10");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
利用Spring AMQP实现HelloWorld案例
pom<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
编写配置
编写测试
java@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void RabbitMqTest(){
String queueName = "simple.queue";
String message = "Hello World";
rabbitTemplate.convertAndSend(queueName,message);
}
}
接收消息
案例
发布50条数据到队列
java @Test
public void RabbitMqTestWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "Hello World__";
for (int i = 1;i<=50;i++){
rabbitTemplate.convertAndSend(queueName,message+i);
Thread.sleep(20);
}
}
接受数据
java @RabbitListener(queues = "simple.queue")
public void lisstenerWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:["+msg+"]");
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void lisstenerWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到消息:["+msg+"]"+ LocalTime.now());
Thread.sleep(200);
}
显示数据
log消费者2接收到消息:[Hello World__1]16:28:43.674 消费者1接收到消息:[Hello World__2] 消费者1接收到消息:[Hello World__4] 消费者1接收到消息:[Hello World__6] 消费者2接收到消息:[Hello World__3]16:28:43.874 消费者1接收到消息:[Hello World__8] 消费者1接收到消息:[Hello World__10] 消费者1接收到消息:[Hello World__12] 消费者1接收到消息:[Hello World__14] 消费者2接收到消息:[Hello World__5]16:28:44.088 消费者1接收到消息:[Hello World__16] 消费者1接收到消息:[Hello World__18] 消费者1接收到消息:[Hello World__20] 消费者2接收到消息:[Hello World__7]16:28:44.304 消费者1接收到消息:[Hello World__22] 消费者1接收到消息:[Hello World__24] 消费者1接收到消息:[Hello World__26] 消费者1接收到消息:[Hello World__28] 消费者2接收到消息:[Hello World__9]16:28:44.517 消费者1接收到消息:[Hello World__30] 消费者1接收到消息:[Hello World__32] 消费者1接收到消息:[Hello World__34] 消费者2接收到消息:[Hello World__11]16:28:44.729
当消费者接受数据时会预取数据两个消费者平分数据
可以修改预取值来控制消费者按照处理速度修改提高效率
data消费者2接收到消息:[Hello World__1]16:36:22.113 消费者1接收到消息:[Hello World__2] 消费者1接收到消息:[Hello World__3] 消费者1接收到消息:[Hello World__4] 消费者1接收到消息:[Hello World__5] 消费者1接收到消息:[Hello World__6] 消费者1接收到消息:[Hello World__7] 消费者1接收到消息:[Hello World__8] 消费者2接收到消息:[Hello World__9]16:36:22.346 消费者1接收到消息:[Hello World__10] 消费者1接收到消息:[Hello World__11] 消费者1接收到消息:[Hello World__12] 消费者1接收到消息:[Hello World__13] 消费者1接收到消息:[Hello World__14] 消费者1接收到消息:[Hello World__15] 消费者1接收到消息:[Hello World__16] 消费者2接收到消息:[Hello World__17]16:36:22.597
修改过后提高处理速度
java@Configurable
public class FanoutConfig {
// 创建交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
//创建消息队列1
@Bean
public Queue fanoutQueue1(){
return new Queue("itcast.fanout1");
}
//绑定消息队列1到交换机
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//创建消息队列2
@Bean
public Queue fanoutQueue2(){
return new Queue("itcast.fanout2");
}
//绑定消息队列2到交换机
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
本文作者:钱小杰
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!