SpringAMQP

📢 この蚘事は gemini-2.5-flash によっお翻蚳されたした

MQに぀いお知るきっかけ

同期呌び出し

マむクロサヌビス間のFeignを䜿った呌び出しは同期方匏で、いく぀か問題があるんだ。

䟋えば、支払いサヌビスを開発するずしお、泚文サヌビスや倉庫サヌビスのコヌドを远加する必芁があるよね。埌でSMSサヌビスやポむントサヌビスを远加しようずしたら、その郜床支払いコヌドを修正しなきゃいけない。これは オヌプン・クロヌズドの原則 に反するし、リク゚ストが返っおくるたで他のこずができないからパフォヌマンスの無駄にもなるよ。

問題点は、結合床が高いこず、パフォヌマンスが萜ちるこず、リ゜ヌスの無駄遣い、そしおカスケヌド障害もしプロバむダに問題が起きるず、それを呌び出す党おのサヌビスも巻き蟌たれお、ドミノ倒しのようにあっずいう間にマむクロサヌビス党䜓がダりンしちゃうだね。

非同期呌び出しの解決策

非同期呌び出しの䞀般的な実装は、むベント駆動型パタヌンだよ。

ナヌザヌの支払いリク゚スト -> 支払いサヌビス -> Broker。その埌、支払いサヌビスが完了しお応答し、Brokerが泚文サヌビス、倉庫サヌビス、SMSサヌビスに通知する仕組みだ。

利点サヌビスの疎結合化、パフォヌマンス向䞊、スルヌプットの向䞊、サヌビス間の匷い䟝存関係がなくなり、障害分離が可胜になる。あず、トラフィックのピヌクを緩和できるのもいいね。

欠点Brokerの信頌性、安党性、スルヌプット胜力に䟝存するこず。アヌキテクチャが耇雑になるし、ビゞネスフロヌが明確じゃないから远跡や管理が難しくなるよ。

MQ (メッセヌゞキュヌ)

MessageQueue、メッセヌゞキュヌ。文字通りメッセヌゞを栌玍するキュヌのこずで、むベント駆動型アヌキテクチャにおけるBrokerの圹割を果たすんだ。

RabbitMQActiveMQRocketMQKafka
䌁業/コミュニティRabbitApacheアリババApache
開発蚀語ErlangJavaJavaScala&Java
プロトコルサポヌトAMQPXMPPSMTPSTOMPOpenWire,STOMPREST,XMPP,AMQP独自プロトコル独自プロトコル
可甚性高い普通高い高い
単䞀サヌバヌのスルヌプット普通悪い高い非垞に高い
メッセヌゞ遅延マむクロ秒レベルミリ秒レベルミリ秒レベルミリ秒以内
メッセヌゞ信頌性高い普通高い普通

可甚性を重芖するならKafka、RocketMQ、RabbitMQ

信頌性を重芖するならRabbitMQ、RocketMQ

スルヌプット胜力を重芖するならRocketMQ、Kafka

メッセヌゞの䜎遅延を重芖するならRabbitMQ、Kafka

RabbitMQのむンストヌル

公匏サむト を芋るず色々なむンストヌル方法があるけど、僕はDockerを䜿っおオンラむンでプルするよ。

1
docker pull rabbitmq:3-management

実行コマンド

1
2
3
4
5
6
7
8
9
docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=admin \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

localhost:15672 にアクセスすれば管理画面が開くよ。RabbitMQのいく぀かの抂念を説明するね

  • channelMQを操䜜するためのツヌル
  • exchange亀換機。メッセヌゞをキュヌにルヌティングするよ。
  • queueキュヌ。メッセヌゞを保存するずころ。
  • virtualHost仮想ホスト。キュヌや亀換機などのリ゜ヌスを論理的にグルヌプ化したもの。

メッセヌゞモデル

公匏サむト には色々なデモが提䟛されおいお、それぞれ異なるメッセヌゞモデルに察応しおいるよ。

  • 基本メッセヌゞキュヌ (BasicQueue) “Hello World!”
  • ワヌクメッセヌゞキュヌ (WorkQueue) Work Queues
  • パブリッシュ/サブスクラむブモデル
    • Fanout Exchangeブロヌドキャスト Publish/Subscribe
    • Direct Exchangeルヌティング Routing
    • Topic Exchangeトピック Topics

ハロヌワヌルド

Publisher -> Queue -> Consumer

  • publisherメッセヌゞ発行者。メッセヌゞをキュヌに送るよ。
  • queueメッセヌゞキュヌ。メッセヌゞを受け取っおキャッシュする圹割。
  • consumerキュヌを賌読しお、キュヌ内のメッセヌゞを凊理するよ。

発行者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.接続を確立
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.接続パラメヌタを蚭定 (ホスト名、ポヌト番号、vhost、ナヌザヌ名、パスワヌド)
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 1.2.接続を確立
        Connection connection = factory.newConnection();

        // 2.チャネルChannelを䜜成
        Channel channel = connection.createChannel();

        // 3.キュヌを䜜成
        String queueName = "hello.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.メッセヌゞを送信
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("メッセヌゞ送信成功【" + message + "】");

        // 5.チャネルず接続をクロヌズ
        channel.close();
        connection.close();

    }
}

受信者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.接続を確立
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.接続パラメヌタを蚭定 (ホスト名、ポヌト番号、vhost、ナヌザヌ名、パスワヌド)
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 1.2.接続を確立
        Connection connection = factory.newConnection();

        // 2.チャネルChannelを䜜成
        Channel channel = connection.createChannel();

        // 3.キュヌを䜜成
        String queueName = "hello.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.メッセヌゞを賌読
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.メッセヌゞを凊理
                String message = new String(body);
                System.out.println("メッセヌゞを受信したした【" + message + "】");
            }
        });
        System.out.println("メッセヌゞの受信を埅機䞭。。。。");
    }
}

コン゜ヌル出力

1
2
メッセヌゞの受信を埅機䞭。。。。
メッセヌゞを受信したした【hello, rabbitmq!】

明らかにこの方法は少し手間がかかるよね。

SpringAMQP

SpringAMQP はRabbitMQを基盀にラップされたテンプレヌト集で、SpringBootを利甚しお自動構成も実珟されおいるから、すごく䟿利に䜿えるんだ。

  • AMQP

Advanced Message Queuing Protocolのこずで、アプリケヌション間でビゞネスメッセヌゞをやり取りするためのオヌプンスタンダヌドだよ。このプロトコルは蚀語やプラットフォヌムに䟝存しないから、マむクロサヌビスにおける独立性の芁求にもっず合臎しおいるんだ。

  • Spring AMQP

Spring AMQPはAMQPプロトコルに基づいお定矩されたAPI仕様で、メッセヌゞを送受信するためのテンプレヌトを提䟛しおいるよ。spring-amqpが基本的な抜象化、spring-rabbitがその基盀ずなるデフォルト実装だね。

これはキュヌや亀換機、そのバむンディング関係を自動で宣蚀しおくれるし、アノテヌションベヌスのリスナヌパタヌンで非同期にメッセヌゞを受け取れるんだ。

Basic Queue (シンプルなキュヌモデル)

たずは芪プロゞェクトで䟝存関係を導入しよう。

1
2
3
4
5
<!--AMQP䟝存関係、RabbitMQを含む-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

publisher (メッセヌゞ送信)

application.ymlを蚭定しよう。

1
2
3
4
5
6
7
spring:
  rabbitmq:
    host: localhost # ホスト名
    port: 5672 # ポヌト
    virtual-host: / # 仮想ホスト
    username: admin # ナヌザヌ名
    password: admin # パスワヌド

RabbitTemplateを䜿っおメッセヌゞ送信を実装するよ。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringamqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
        // キュヌ名
        String queueName = "hello.queue";
        // メッセヌゞ
        String msg = "Hello Spring ampq";
        // 送信
        rabbitTemplate.convertAndSend(queueName,msg);
    }
}

consumer (メッセヌゞ受信)

application.ymlの蚭定は䞊蚘ず同じだよ。

1
2
3
4
5
6
7
spring:
  rabbitmq:
    host: localhost # ホスト名
    port: 5672 # ポヌト
    virtual-host: / # 仮想ホスト
    username: admin # ナヌザヌ名
    password: admin # パスワヌド

新しいクラス SpringRabbitListener を䜜成するよ。

1
2
3
4
5
6
7
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "hello.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("受信したメッセヌゞは" + msg);
    }
}

WorkQueue (ワヌクメッセヌゞキュヌ)

TaskQueueずも呌ばれおいお、タスクモデルだよ。メッセヌゞ凊理速床を䞊げお、キュヌにメッセヌゞが溜たるのを防ぐこずができるんだ。

publisher -> queue -> consumer1 and consumer2 and …

publisher (メッセヌゞ送信)

1秒間に50件のメッセヌゞを送るメ゜ッドを定矩するね。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class SpringamqpTest {
    @Test
    public void testWorkQueue() throws InterruptedException {
        String queueName = "hello.queue";
        String msg = "Hello Spring ampq...";
        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName, msg + i);
            // 20ミリ秒スリヌプ、1秒間に50件のメッセヌゞを送信
            Thread.sleep(20);
        }
    }
}

consumer (メッセヌゞ受信)

同じキュヌに2぀のコンシュヌマヌをバむンドするよ。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "hello.queue")
    public void listenSimpleQueue1(String msg) throws InterruptedException {
        System.out.println("1が受信したメッセヌゞは" + msg);
        // 1秒間に40件のメッセヌゞを凊理
        Thread.sleep(25);
    }

    @RabbitListener(queues = "hello.queue")
    public void listenSimpleQueue2(String msg) throws InterruptedException {
        // errで赀色メッセヌゞを出力
        System.err.println("2が受信したメッセヌゞは" + msg);
        // 1秒間に5件のメッセヌゞを凊理
        Thread.sleep(200);
    }
}

テスト

たず受信者を起動しお、それから送信者を起動しおメッセヌゞを送っおみよう。

出力結果を芋るず、2぀の受信者がそれぞれ半分のメッセヌゞを受け取っおいるね。぀たり、メッセヌゞは各コンシュヌマヌに均等に分配されおいお、コンシュヌマヌの凊理胜力は考慮されおいないんだ。これだず明らかに問題があるよね。

prefetch

application.ymlファむルを修正しお、prefetchの倀を蚭定するよ。これでプリフェッチするメッセヌゞの䞊限デフォルトは無限を制埡できるんだ。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
spring:
  rabbitmq:
    host: localhost # ホスト名
    port: 5672 # ポヌト
    virtual-host: / # 仮想ホスト
    username: admin # ナヌザヌ名
    password: admin # パスワヌド
    listener:
      simple:
        # 毎回1぀のメッセヌゞしか取埗できず、凊理が完了しおから次のメッセヌゞを取埗する
        prefetch: 1

もう䞀床テストしおみるず、実行効率が䞊がっおいるのがわかるよ。

パブリッシュ/サブスクラむブモデル

パブリッシュ/サブスクラむブモデルは亀換機Exchangeが加わっお、同じメッセヌゞを党おの受信者に送るこずができるんだ。

publisher -> exchange -> queue1 and queue2 queue1 -> consumer1 and consumer2 queue2 -> consumer3

よく䜿われるExchangeはこれらだよ。

  • Fanoutブロヌドキャスト
  • Directルヌティング
  • Topicトピック

Exchangeはルヌティングを担圓するだけで、メッセヌゞは保存しないよ。だから、ルヌティングに倱敗するずメッセヌゞは倱われちゃうんだ。

Fanout (扇出) ブロヌドキャスト

Fanout Exchangeは受け取ったメッセヌゞを、それにバむンドされおいる党おのキュヌにブロヌドキャストするよ。

受信者偎で蚭定クラスを䜜っお、キュヌずExchangeを宣蚀しよう。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Configuration
public class FanoutConfig {
    /**
     * 亀換機を宣蚀
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hello.fanout");
    }

    /**
     * 最初のキュヌ
     * @return
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 最初のキュヌず亀換機をバむンド
     * @param fanoutQueue1
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 2番目のキュヌ
     * @return
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 2番目のキュヌず亀換機をバむンド
     * @param fanoutQueue2
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

メッセヌゞ送信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class SpringamqpTest {    
    @Test
    public void testFanoutExchange(){
        // 亀換機
        String exchangeName = "hello.fanout";
        // メッセヌゞ
        String msg = "hello, everyone";
        rabbitTemplate.convertAndSend(exchangeName,"",msg);
    }
}

真ん䞭が空になっおいる routingkey は、次の2぀のモデルで䜿うよ。

メッセヌゞの受信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("Fanout1がメッセヌゞを受信" + msg);
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("Fanout2がメッセヌゞを受信" + msg);
    }
}

Direct (ルヌティング)

Direct Exchangeは、受け取ったメッセヌゞをルヌルに基づいお指定されたキュヌにルヌティングするから、ルヌティングモヌドっお呌ばれるんだ。

キュヌずExchangeのバむンドには Routingkey を指定する必芁があるよ。送信偎もメッセヌゞを送るずきに Routingkey を指定しなきゃいけないんだ。キュヌの Routingkey ずメッセヌゞの Routingkey が完党に䞀臎した堎合だけ、メッセヌゞが受信されるよ。

ここではアノテヌションベヌスでキュヌずExchangeを宣蚀するから、蚭定クラスは䞍芁だよ。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
//            exchange = @Exchange(name = "hello.direct", type = ExchangeTypes.DIRECT),
//            デフォルトでDirectタむプなので指定は䞍芁
            exchange = @Exchange(name = "hello.direct"),
            key = {"red","warma"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("1がメッセヌゞを受信" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "hello.direct"),
            key = {"red","aqua"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("2がメッセヌゞを受信" + msg);
    }
}

送信者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class SpringamqpTest {    
    @Test
    public void testDirectExchange(){
        // 亀換機
        String exchangeName = "hello.direct";
        // メッセヌゞ
        String msg = "hello, aqua";
        rabbitTemplate.convertAndSend(exchangeName,"aqua",msg);
    }
}

䞊の䟋だず受信者2しかメッセヌゞを受け取れないけど、もしroutingkeyがredなら䞡方ずもメッセヌゞを受け取れるよ。

Topic (トピック)

Topic ExchangeはDirect Exchangeず䌌おいるけど、違いはroutingKeyが耇数の単語のリストで、で区切られおいるこずだね。

キュヌずExchangeでBindingKeyを指定する時に、ワむルドカヌドが䜿えるよ。

  • #1぀以䞊の単語にマッチ
  • *1぀の単語にのみマッチ

受信者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue1"),
            exchange = @Exchange(name = "hello.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("1がメッセヌゞを受信" + msg);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue2"),
            exchange = @Exchange(name = "hello.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("2がメッセヌゞを受信" + msg);
    }
}

メッセヌゞ送信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class SpringamqpTest {    
    @Test
    public void testTopicExchange(){
        // 亀換機
        String exchangeName = "hello.topic";
        // メッセヌゞ
        String msg = "news for China";
        rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);
    }
}

䞊の䟋だず1ず2の䞡方が受け取れるよ。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class SpringamqpTest {    
    @Test
    public void testTopicExchange(){
        // 亀換機
        String exchangeName = "hello.topic";
        // メッセヌゞ
        String msg = "news for Japan";
        rabbitTemplate.convertAndSend(exchangeName,"japan.news",msg);
    }
}

1だけが受け取れるよ。

メッセヌゞ倉換噚

Springは送信するメッセヌゞをバむト列にシリアラむズしおMQに送り、メッセヌゞを受け取る時はバむト列をJavaオブゞェクトにデシリアラむズするんだ。ただ、デフォルトだずSpringが䜿うシリアラむズ方法はJDKシリアラむズで、デヌタサむズが倧きすぎたり、セキュリティ䞊の脆匱性があったり、可読性が䜎かったりするんだよね。

JSON方匏を䜿っおシリアラむズずデシリアラむズをするこずもできるよ。

たずは芪プロゞェクトで䟝存関係を導入しよう。

1
2
3
4
5
6
<!-- JSON倉換噚 -->
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

コンシュヌマヌずレシヌバヌでBeanを宣蚀するだけでいいんだ。

1
2
3
4
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

Visits Since 2025-02-28

Hugo で構築されおいたす。 | テヌマ Stack は Jimmy によっお蚭蚈されおいたす。