January 12, 2025About 3 min
同步调用
时效性强
异步调用
- 消息发送者
- 消息代理
- 消息接受者
RabbitMQ的整体架构
- publisher:消息发送者
- consumer:消息消费者
- queue:队列,存储消息
- exchange:交换机,负责路由消息
- virtual-host:exchange+queue的合体,可以进行数据分离,就想URL不同就可以往不同主机路由
AMQP
Advanced Message Queuinng Protocol
高级消息队列协议
Spring AMQP
准备事项
spring配置MQ
spring: rabbitmq: host: localhost port: 5672 virtual-host: /hamll 这个就是像路由一样,隔离数据 username: hmall password: 123
发布者
直接使用工厂
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testPublisher() {
String queueName = "a3";// 队列名
String mes = "hello, spring amqp!";// 发送的消息
rabbitTemplate.convertAndSend(queueName, mes);// 发送
}
生产者
也是工厂
@Component
public class lister {
@RabbitListener(queues = "a3")// 监听哪些队列
public void listen(String msg) throws Exception {
System.out.println("spring 的消息:" + msg);
}
}
Work模型
多个消费者绑定一条队列,会出现下面这个问题
消息堆积
默认如果有多个消费者,会把消息平均分给对应的消费者,不管它处理速度怎么样
解决方法:进行配置
spring: rabbitmq: host: localhost port: 5672 virtual-host: /hmall username: hmall password: 123 listener: simple: prefetch: 1 # 设置同一时刻最多给消费者多少条
交换机
Fanout:广播
只需要发送到交换机上就行,交换机会发送到绑定的队列
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testPublisher() throws InterruptedException {
String ex = "hmall.fanout";
String mes = "hello, spring amqp!";
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(ex,null, mes);
}
}
Direct:重定向
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testPublisher() throws InterruptedException {
String ex = "hmall.direct";
String mes = "hello, spring amqp!";
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(ex,"4", mes);
// 中间参数就是绑定的key,只会发送对应的队列中
}
}
Topic:话题
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testPublisher() throws InterruptedException {
String ex = "hmall.topic";
String mes = "hello, spring amqp!";
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(ex,"china.s", mes);
// 匹配的就是第二个参数
}
}
使用java进行创建交换机
声明对应的配置类
@Configuration
public class FanoutConfig {
// 创建交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("hmall.fanout2");
}
// 创建队列
@Bean
public Queue fanoutQueue() {
return new Queue("fanout.q1");
}
// 绑定队列
@Bean
public Binding fanoutBinding() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
}
注解大法
消息转换机
一种概念
作用是转换java对象的,当使用原生的对象去发送消息时,内部使用的是 一种序列化方式,接受消息使用反序列化
但会出现问题,两者的对象必须一致,有局限性
- 解决问题
- 通过json序列器去转换
高级篇
可靠性
生产者重连
生产者确认