📢 This article was translated by gemini-2.5-flash
Elasticsearch Series
ES data typically comes from a MySQL database. So, when MySQL data changes, ES also needs to update. This is what we call data synchronization between ES and MySQL.
Common data synchronization solutions include three main types:
- Synchronous Call
- Asynchronous Notification
- Binlog Listening
Synchronous Call

Steps:
- hotel-demo exposes an API for modifying data in ES.
- The backend management system (hotel-admin) directly calls the API provided by hotel-demo after completing its database operations.
Asynchronous Notification

Steps:
- hotel-admin sends an MQ message after performing CRUD operations on MySQL data.
- hotel-demo listens to the MQ, and updates ES data upon receiving the message.
Binlog Listening

Process:
- Enable MySQL’s binlog feature.
- All MySQL CRUD operations are recorded in the binlog.
- hotel-demo monitors binlog changes using Canal, updating ES content in real-time.
Solution Comparison
Method One: Synchronous Call
- Pros: Simple and direct implementation.
- Cons: High business coupling.
Method Two: Asynchronous Notification
- Pros: Low coupling, moderate implementation complexity.
- Cons: Relies on MQ reliability.
Method Three: Binlog Listening
- Pros: Completely decouples services.
- Cons: Enabling binlog increases database load, high implementation complexity.
Data Sync Implementation
Note: We’ll use MQ for asynchronous notification, implementing MySQL CRUD operations in hotel-admin.
In hotel-admin and hotel-demo, add the RabbitMQ dependency:
1
2
3
4
5
| <!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
|
Declare queue and exchange names:
1
2
3
4
5
6
7
8
9
10
11
12
| public class MqConstants {
// Exchange
public final static String HOTEL_EXCHANGE = "hotel.topic";
// Queue for new and update operations
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
// Queue for delete operations
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
// RoutingKey for insert or update
public final static String HOTEL_INSERT_KEY = "hotel.insert";
// RoutingKey for delete
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
|
Declare exchange configuration in hotel-demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| @Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
|
Send MQ messages from hotel-admin:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| public class HotelController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id cannot be null");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
}
}
|
In hotel-demo, receive MQ messages. Business logic:
- Insert message: Query hotel info by
hotel.id, then add a new document to the index. - Delete message: Delete a document from the index by
hotel.id.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| @Override
public void deleteById(Long id) {
try {
// Prepare the request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// Send the request
client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void insertById(Long id) {
try {
// Query data by id
Hotel hotel = getById(id);
// Convert to document type
HotelDoc hotelDoc = new HotelDoc(hotel);
// Prepare the request
IndexRequest request = new IndexRequest("hotel").id(id.toString());
// Prepare JSON
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// Send the request
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
|
Write the listener in hotel-demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| @Component
public class HotelLinster {
@Autowired
private IHotelService hotelService;
/**
* Listen for updates or inserts
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
/**
* Listen for deletes
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
|