📢 この記事は gemini-2.5-flash によって翻訳されました
Elasticsearch シリーズ
Elasticsearch のデータは MySQL データベースから来てるんだ。だから、MySQL のデータが変わったら、Elasticsearch もそれに合わせて変わる必要があるんだよね。これが Elasticsearch と MySQL の間のデータ同期だよ。
よくあるデータ同期の方法は3つあるよ。
同期呼び出し

手順:
- hotel-demo が外部にインターフェースを提供して、Elasticsearch のデータを変更するんだ。
- バックエンド管理システム (hotel-admin) はデータベース操作が完了したら、hotel-demo が提供するインターフェースを直接呼び出すんだ。
非同期通知

手順:
- hotel-admin は MySQL データに対して CRUD 操作が完了したら、MQ メッセージを送信するよ。
- hotel-demo は MQ を監視して、メッセージを受け取ったら Elasticsearch のデータを変更するんだ。
binlog 監視

流れ:
- MySQL の binlog 機能を有効にするんだ。
- MySQL で CRUD 操作が完了すると、全部 binlog に記録されるよ。
- hotel-demo は Canal を使って binlog の変化を監視し、Elasticsearch の内容をリアルタイムで更新するんだ。
アプローチ比較
方法1:同期呼び出し
- メリット:実装がシンプルで、手っ取り早い。
- デメリット:業務の結合度が高い。
方法2:非同期通知
- メリット:低結合で、実装の難易度は普通。
- デメリット:MQ の信頼性に依存する。
方法3:binlog 監視
- メリット:サービス間の結合を完全に解消できる。
- デメリット:binlog を有効にするとデータベースの負荷が増えるし、実装の複雑度が高い。
データ同期の実装
説明:MQ 非同期通知を使って、hotel-admin で MySQL の CRUD 操作を実装するよ。
hotel-admin と hotel-demo に RabbitMQ の依存関係を追加するんだ。
1
2
3
4
5
| <!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
|
キューとエクスチェンジ名を宣言するよ。
1
2
3
4
5
6
7
8
9
10
11
12
| public class MqConstants {
// 交换机
public final static String HOTEL_EXCHANGE = "hotel.topic";
// 监听新增和修改队列
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
// 监听删除的队列
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
// 新增或修改的RoutingKey
public final static String HOTEL_INSERT_KEY = "hotel.insert";
// 删除的RoutingKey
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
|
hotel-demo でエクスチェンジ設定を宣言する。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| @Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
|
hotel-admin で MQ メッセージを送信するよ。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| public class HotelController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
}
}
|
hotel-demo で MQ メッセージを受け取るんだけど、ビジネスロジックはこんな感じだよ。
- 新規追加メッセージ:渡された hotel.id に基づいて情報を検索し、それからインデックスに新しいデータを1件追加する。
- 削除メッセージ:渡された hotel.id に基づいてインデックスからデータを1件削除する。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| @Override
public void deleteById(Long id) {
try {
// 准备request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// 发送请求
client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void insertById(Long id) {
try {
// 根据id查询数据
Hotel hotel = getById(id);
// 转换文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
// 准备request
IndexRequest request = new IndexRequest("hotel").id(id.toString());
// 准备json
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 发送请求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
|
hotel-demo でリスナーを書くよ。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| @Component
public class HotelLinster {
@Autowired
private IHotelService hotelService;
/**
* 监听修改或新增
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
/**
* 监听删除
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
|