📢 本文由 gemini-3-pro 翻譯
初識 MQ
同步調用
微服務間基於 Feign 的呼叫屬於同步方式,存在一些問題。
例如要開發一個支付服務,需要加入訂單服務和倉儲服務的程式碼,後期若要加入簡訊服務、積分服務等都需要修改支付程式碼,違反了
開放-封閉原則
,並且在請求返回前無法做其他事情也會造成效能的浪費。
問題: 耦合度高、效能下降、資源浪費、級聯失敗 (若提供者出現問題,所有呼叫方也會跟著出問題,如同骨牌效應,迅速導致整個微服務群故障)。
非同步調用方案
非同步 (Asynchronous) 調用常見實現就是事件驅動模式。
使用者支付請求 -> 支付服務 -> Broker,之後支付服務完成並回應,然後由 Broker 通知訂單服務、倉儲服務和簡訊服務。
- 優點: 服務解耦、效能提升、吞吐量提高、服務沒有強依賴、故障隔離、流量削峰。
- 缺點: 依賴於 Broker 的可靠性、安全性、吞吐能力,架構變複雜了,業務沒有明顯的流程線,不好追蹤管理。
MQ
MessageQueue,訊息佇列,字面意思為存放訊息的佇列,也就是事件驅動架構中的 Broker。
| RabbitMQ | ActiveMQ | RocketMQ | Kafka |
|---|
| 公司/社群 | Rabbit | Apache | 阿里 (Alibaba) | Apache |
| 開發語言 | Erlang | Java | Java | Scala & Java |
| 協定支援 | AMQP,XMPP,SMTP,STOMP | OpenWire, STOMP,REST, XMPP, AMQP | 自定義協定 | 自定義協定 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 單機吞吐量 | 一般 | 差 | 高 | 非常高 |
| 訊息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內 |
| 訊息可靠性 | 高 | 一般 | 高 | 一般 |
- 追求可用性: Kafka、RocketMQ、RabbitMQ
- 追求可靠性: RabbitMQ、RocketMQ
- 追求吞吐能力: RocketMQ、Kafka
- 追求訊息低延遲: RabbitMQ、Kafka
安裝 RabbitMQ
可以從
官網
看到多種安裝方式,我使用 Docker 線上拉取。
1
| docker pull rabbitmq:3-management
|
執行指令:
1
2
3
4
5
6
7
8
9
| docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
|
訪問 <localhost:15672> 即可打開管理介面。RabbitMQ 中的一些概念:
- channel: 操作 MQ 的工具
- exchange: 交換機,路由訊息到佇列中
- queue: 佇列,儲存訊息
- virtualHost: 虛擬主機,是對 queue、exchange 等資源的邏輯分組
訊息模型
在
官網提供了多種 Demo
,對應了不同的訊息模型:
Hello World
Publisher -> Queue -> Consumer
- publisher: 訊息發佈者,將訊息發送到佇列 queue
- queue: 訊息佇列,負責接受並快取訊息
- consumer: 訂閱佇列,處理佇列中的訊息
發佈者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立連線
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連線參數,分別是:主機名、連接埠號、vhost、使用者名稱、密碼
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 1.2.建立連線
Connection connection = factory.newConnection();
// 2.建立通道 Channel
Channel channel = connection.createChannel();
// 3.建立佇列
String queueName = "hello.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.發送訊息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("發送訊息成功:【" + message + "】");
// 5.關閉通道和連線
channel.close();
connection.close();
}
}
|
接收者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立連線
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連線參數,分別是:主機名、連接埠號、vhost、使用者名稱、密碼
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 1.2.建立連線
Connection connection = factory.newConnection();
// 2.建立通道 Channel
Channel channel = connection.createChannel();
// 3.建立佇列
String queueName = "hello.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.訂閱訊息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.處理訊息
String message = new String(body);
System.out.println("接收到訊息:【" + message + "】");
}
});
System.out.println("等待接收訊息。。。。");
}
}
|
控制台輸出:
1
2
| 等待接收訊息。。。。
接收到訊息:【hello, rabbitmq!】
|
顯然這種方式略顯繁瑣。
SpringAMQP
SpringAMQP
是基於 RabbitMQ 封裝的一套模板,並且還利用 SpringBoot 對其實現了自動配置 (Auto-configuration),使用起來非常方便。
它可以自動宣告佇列、交換機及其綁定關係,基於註解 (Annotation) 的監聽器模式,非同步接收訊息。
Basic Queue 簡單佇列模型
首先在父專案 (Parent Project) 中引入依賴:
1
2
3
4
| <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
|
publisher 訊息發送
設定 application.yml:
1
2
3
4
5
6
7
| spring:
rabbitmq:
host: localhost # 主機名
port: 5672 # 連接埠
virtual-host: / # 虛擬主機
username: admin # 使用者名稱
password: admin # 密碼
|
利用 RabbitTemplate 實現訊息發送:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| @RunWith(SpringRunner.class)
@SpringBootTest
public class SpringamqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
// 佇列名
String queueName = "hello.queue";
// 訊息
String msg = "Hello Spring ampq";
// 發送
rabbitTemplate.convertAndSend(queueName,msg);
}
}
|
consumer 訊息接收
設定 application.yml 同上:
1
2
3
4
5
6
7
| spring:
rabbitmq:
host: localhost # 主機名
port: 5672 # 連接埠
virtual-host: / # 虛擬主機
username: admin # 使用者名稱
password: admin # 密碼
|
建立一個新類別 SpringRabbitListener:
1
2
3
4
5
6
7
| @Component
public class SpringRabbitListener {
@RabbitListener(queues = "hello.queue")
public void listenSimpleQueue(String msg){
System.out.println("接收的訊息為:" + msg);
}
}
|
WorkQueue 工作訊息佇列
也稱 TaskQueue,任務模型,可以提高訊息處理速度,避免佇列訊息堆積。
Publisher -> Queue -> Consumer1 and Consumer2 and …
publisher 訊息發送
定義一個方法,每秒發送 50 條訊息:
1
2
3
4
5
6
7
8
9
10
11
12
| public class SpringamqpTest {
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "hello.queue";
String msg = "Hello Spring ampq...";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, msg + i);
// 睡眠20毫秒,1秒發50條訊息
Thread.sleep(20);
}
}
}
|
consumer 訊息接收
建立兩個消費者綁定同一佇列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| @Component
public class SpringRabbitListener {
@RabbitListener(queues = "hello.queue")
public void listenSimpleQueue1(String msg) throws InterruptedException {
System.out.println("1接收的訊息為:" + msg);
// 每秒處理40條訊息
Thread.sleep(25);
}
@RabbitListener(queues = "hello.queue")
public void listenSimpleQueue2(String msg) throws InterruptedException {
// 使用err輸出紅色訊息
System.err.println("2接收的訊息為:" + msg);
// 每秒處理5條訊息
Thread.sleep(200);
}
}
|
測試
先運行接收者,而後運行發送者發送訊息。
從輸出結果可以看到,兩個接收者各接收一半的訊息,也就是說訊息是平均分配給每個消費者,並沒有考慮到消費者的處理能力。這樣顯然是有問題的。
prefetch
修改 application.yml 檔案,設定 prefetch 這個值,可以控制預取訊息的上限 (預設無限)。
1
2
3
4
5
6
7
8
9
10
11
| spring:
rabbitmq:
host: localhost # 主機名
port: 5672 # 連接埠
virtual-host: / # 虛擬主機
username: admin # 使用者名稱
password: admin # 密碼
listener:
simple:
# 每次只能獲取一條訊息,處理完成才能獲取下一個訊息
prefetch: 1
|
再次測試可以發現執行效率提高。
發佈訂閱模型
發佈訂閱模式加入了交換機 (Exchange),允許將同一個訊息發送給全部接收者。
Publisher -> Exchange -> Queue1 and Queue2
Queue1 -> Consumer1 and Consumer2
Queue2 -> Consumer3
常見的 exchange 有:
- Fanout: 廣播
- Direct: 路由
- Topic: 主題
exchange 負責路由,並不儲存,一旦路由失敗則訊息遺失。
Fanout (扇出) 廣播
Fanout exchange 會將接收到的訊息廣播到每一個和其綁定的 queue。
在接收者建立一個配置類別,宣告佇列與交換機:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| @Configuration
public class FanoutConfig {
/**
* 宣告交換機
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hello.fanout");
}
/**
* 第一個佇列
* @return
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 綁定第一個佇列與交換機
* @param fanoutQueue1
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第二個佇列
* @return
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 綁定第二個佇列與交換機
* @param fanoutQueue2
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
|
訊息發送:
1
2
3
4
5
6
7
8
9
10
| public class SpringamqpTest {
@Test
public void testFanoutExchange(){
// 交換機
String exchangeName = "hello.fanout";
// 訊息
String msg = "hello, everyone";
rabbitTemplate.convertAndSend(exchangeName,"",msg);
}
}
|
其中中間空著的 routingkey 在下兩個模型使用。
訊息的接收:
1
2
3
4
5
6
7
8
9
10
11
| @Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg){
System.out.println("Fanout1 接收訊息:" + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg){
System.out.println("Fanout2 接收訊息:" + msg);
}
}
|
Direct 路由
Direct Exchange 會將接收到的訊息根據規則路由到指定的 Queue,因此稱為路由模式。
佇列與交換機的綁定要指定一個 Routingkey,發送方發訊息時也必須指定訊息的 Routingkey,只有佇列的 Routingkey 和訊息的 Routingkey 完全一致,才會接收到訊息。
在此使用基於註解宣告佇列和交換機,不需要配置類別:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| @Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
// exchange = @Exchange(name = "hello.direct", type = ExchangeTypes.DIRECT),
// 因為預設為 Direct 類型,可以不用指定
exchange = @Exchange(name = "hello.direct"),
key = {"red","warma"}
))
public void listenDirectQueue1(String msg){
System.out.println("1 接收訊息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hello.direct"),
key = {"red","aqua"}
))
public void listenDirectQueue2(String msg){
System.out.println("2 接收訊息:" + msg);
}
}
|
發送者:
1
2
3
4
5
6
7
8
9
10
| public class SpringamqpTest {
@Test
public void testDirectExchange(){
// 交換機
String exchangeName = "hello.direct";
// 訊息
String msg = "hello, aqua";
rabbitTemplate.convertAndSend(exchangeName,"aqua",msg);
}
}
|
上例只有接收者 2 可以接收到訊息,若 routingkey 為 red 則兩個都能接收到訊息。
Topic 主題
TopicExchange 與 DirectExchange 類似,區別在於 routingKey 必須是多個單字的列表,並且以 . 分割。
Queue 與 Exchange 指定 BindingKey 時可以使用萬用字元:
接收者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| @Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue1"),
exchange = @Exchange(name = "hello.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue1(String msg){
System.out.println("1 接受訊息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue2"),
exchange = @Exchange(name = "hello.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue2(String msg){
System.out.println("2 接受訊息:" + msg);
}
}
|
訊息發送:
1
2
3
4
5
6
7
8
9
10
| public class SpringamqpTest {
@Test
public void testTopicExchange(){
// 交換機
String exchangeName = "hello.topic";
// 訊息
String msg = "news for China";
rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);
}
}
|
上例 1 和 2 都可以接收。
1
2
3
4
5
6
7
8
9
10
| public class SpringamqpTest {
@Test
public void testTopicExchange(){
// 交換機
String exchangeName = "hello.topic";
// 訊息
String msg = "news for Japan";
rabbitTemplate.convertAndSend(exchangeName,"japan.news",msg);
}
}
|
只有 1 可以接收。
訊息轉換器
Spring 會把發送的訊息序列化為位元組 (Bytes) 發送給 MQ,接收訊息時,把位元組反序列化為 Java 物件,只不過預設情況下 Spring 採用的序列化方式是 JDK 序列化,其數據體積過大、有安全漏洞、可讀性差。
可以使用 JSON 方式來做序列化和反序列化。
首先在父專案引入依賴:
1
2
3
4
5
| <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
|
在消費者與接收者宣告一個 bean 即可:
1
2
3
4
| @Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
|