Rabbit MQ小白入门

2020-09-27

一、简介:

RabbitMQ是一个开源的消息队列服务器,使用Erlang语言编写采用AMQP协议(Advanced Message Queuing Protocol 高级消息队列协议),实现在完全不同的应用之间共享数据

优点:

  1. 延迟低、速度快、文档全
  2. 开源、稳定、高性能
  3. springAMQP完美整合,API丰富

二、名词解释

Server:又称Broker,用来接收客户端连接,实现AMQP实体服务
Connection:应用程序与Broker的网络连接
Channel:网络信道。Channel是进行消息读写的通道,基本都在Channel中进行,客户端可以建立多个Channel,每个Channel代表一个会话。

Message:消息。客户端与服务器之间的数据。由Properties与Body组成。
PS:Properties可以对消息进行修饰,比如消息优先级、延迟等高级特性。Body则是消息主体内容。

Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面由若干个Exchange和Queue,同一个Virtual Host里面不能由相同名称的Exchange或者Queue。
Exchange:交换机,接收消息,更具路由键装发消息到半丁的队列
Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing_key
Routing_key:一个路由规则,虚拟机之间可以用它来确定任何路由一个特定消息
Queue:也叫Message Queue,消息队列,保存消息并将他们转发给消费者。

image.png

三、消息流转:

生产者发送一个消息Message,Message中指定了Echange与Routing_key,它只需要将消息送到MQ上。之后经过Virtual host,之后将消息发送到指定的Exchange, 而Exchange与Queue之间也存在绑定路由键Routing_key,所以消息会被发送到指定的Message Queue被相应的消费者消费。

image.png

四、简单例子

配置文件:

spring.application.name=producer
spring.rabbitmq.addresses=xxx:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=xxx
#默认虚拟主机
spring.rabbitmq.virtual-host=/
#连接超时实践
spring.rabbitmq.connection-timeout=10000
#采用消息确认模式
#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true


#服务相关
#项目路径
server.servlet.context-path=/
#端口
server.port=8001

#spring配置
server.servlet.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
#不允许传空值
spring.jackson.default-property-inclusion=NON_NULL

#数据库相关配置
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.url=jdbc:mysql://xxx:3306/test?characterEncoding=UTF-8&autoReconnect=true
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=xxx

发送消息服务

package com.example.producer.producer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.producer.entity.Order;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author wangshilei (https://www.wslhome.top)
 * @description 测试订单发送服务
 * @date 2020-09-24 15:00
 **/

@Component
public class OrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrdedr(Order order) throws Exception{
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(order.getMessageId());

        rabbitTemplate.convertAndSend(
                "order_exchange",//发送消息的交换机
                "order.test",//路由器
                JSON.toJSONString(order),//消息体内容
                correlationData//指定消息的唯一Id
        );

    }
}

测试类

package com.example.producer;

import com.example.producer.entity.Order;
import com.example.producer.producer.OrderSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
class ProducerApplicationTests {

    @Test
    void contextLoads() {
    }

    @Autowired
    private OrderSender orderSender;

    @Test
    public void testOrderSender1() throws Exception{
        Order order = new Order();
        order.setId("1");
        order.setName("测试订单2——OrderSender1");
        order.setMessageId(System.currentTimeMillis()+"_"+ UUID.randomUUID().toString());

        orderSender.sendOrdedr(order);
    }
}

消费消息

package com.example.consumer.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.consumer.entity.Order;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author wangshilei (https://www.wslhome.top)
 * @description 消费消息
 * @date 2020-09-24 16:08
 **/

@Component
public class OrderReceiver {

    //注解的方式进行监听
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "order_queue",durable = "true"),
            exchange = @Exchange(name = "order_exchange",type = "topic",durable = "true"),
            key = "order.*"

        )
    )
    @RabbitHandler
    public void getOrderMassage(@Payload String jsonObject, @Headers Map<String, Object> headers, Channel channel)
        throws Exception{
        //消费者操作
        Order order = JSON.parseObject(jsonObject,Order.class);
        System.err.println("------收到消息--------");
        System.err.println("orderID:"+order.getMessageId());

        //
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手动确认签收
        channel.basicAck(deliveryTag,false);

    }

}

时间有限,自己大概懂了,只是记录一下,入门的可以把项目下下来看看testSpringMq.zip

好了,不写了,我要开始看看微服务的入门,毕竟啥也不是,做了个社畜,什么也没有学会就让开始学微服务,觉得实习两个月就要被卷铺盖走人了


标题:Rabbit MQ小白入门
作者:sirwsl
地址:https://www.wslhome.top/articles/2020/09/27/1601216251520.html