프레임워크/Kafka & RabbitMQ

[Messaging] RabbitMQ 도입편

코드몬스터 2025. 3. 18. 10:21
728x90

 

RabbitMQ

 

관련 나의 블로그 글
분산 메세지 큐 1편 - 읽은 책 https://jm-baek.tistory.com/325
분산 메세지 큐 2편 - 읽은 책 https://jm-baek.tistory.com/330
[Messaging] RabbitMQ 개념편 https://jm-baek.tistory.com/363
[Messaging] RabbitMQ 도입 구상편 https://jm-baek.tistory.com/358
[Messaging] RabbitMQ 도입편 https://jm-baek.tistory.com/392

 

 

 

혼자 고민하고 정리해서 서툴고 틀린 방향이 있을 수 있습니다.

좋은 충고와 질문은 어제든 감사드립니다.

 

 

🙇‍♂️ 빠른 시일 내에 작성을 하도록 하겠습니다.....🙇‍♂️

 

 


 


예시 코드

Library

여러 블로그를 보면 String을 JSON으로 JSON을 자바 Object로 변환하는 과정에서 필요하한 fasterxml 라이브러리를 불러오는데 딱히 그럴 필요가 없다.

implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'com.fasterxml.jackson.core:jackson-databind' // 필요 없다.

Producer

RabbitTemplate 을 사용하는 방법에는 Component와 Configuration 두 가지 방법이 있다.

 

1. Component 방법 

서비스 로직에서 생성자 주입 등을 해서 바로 사용하면 된다.

@Slf4j
@Component
public class RabbitProducer {

    private RabbitTemplate rabbitTemplate;

    public RabbitProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(Message message) {
        rabbitTemplate.convertAndSend(message);
    }
}

 

2. Configuration 방법

RabbitTemplate를 Spring Bean으로 등록하는 방법이다.

 

Component보다는 해당 방법을 추천하고 그 이유는 ChatGPT 님의 말씀이다.

 

  • 의존성 주입과 재사용성: RabbitTemplate을 빈으로 등록하면 Spring 컨테이너가 관리하게 되어, 다른 서비스나 컴포넌트에서 의존성 주입을 통해 재사용할 수 있습니다. 매번 새 인스턴스를 생성할 필요 없이, 일관된 설정을 가진 하나의 RabbitTemplate을 여러 곳에서 사용할 수 있게 됩니다.
  • 커스텀 메시지 컨버터 설정: 빈 등록 방식에서는 JSON이나 XML 같은 형식으로 메시지를 주고받을 수 있도록 메시지 컨버터를 설정할 수 있습니다. 예를 들어, JSON 형식의 데이터를 사용한다면 Jackson2JsonMessageConverter를 설정해두면 개발 과정에서 메시지 직렬화/역직렬화를 수동으로 처리할 필요 없이 자동으로 수행됩니다.
  • 일관성 있는 설정: 모든 RabbitTemplate 사용 코드에서 같은 설정을 사용하므로, 여러 곳에서 RabbitTemplate을 생성하여 설정할 때 발생할 수 있는 실수나 불일치를 방지할 수 있습니다
@Configuration
public class RabbitMQ {

    @Bean
    MessageConverter messageConverter() {
        return new SimpleMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

}

 

그리고 나서 Component로 Producer를 작성한다.

@Slf4j
@Component
public class RabbitProducer {

    private RabbitTemplate rabbitTemplate;

    public RabbitProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String routingKey, Object obj) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        String json = objectMapper.writeValueAsString(obj);

        rabbitTemplate.convertAndSend("ex.app.event", routingKey, json);
    }
    
}

 

 

Queue를 미리 생성해서 Listener에 사용할 수도 있고, Queue를 필요에 따라서 구독 형태로 서비스 계층에서 생성하고 연결할 수 있다.

 

Consumer

@Configuration
public class RabbitMQConfig {

    static final String topicExchangeName = "spring-boot-exchange";

    static final String queueName = "spring-boot";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }
    
    
    // Listener는 두 가지 방법으로 작성이 가능하다.
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }



}

 

 

channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag (long):
    • 메시지에 대한 고유 식별자입니다. 이를 통해 특정 메시지를 지정하여 Nack 처리를 할 수 있습니다.
    • deliveryTag는 채널별로 고유하며, RabbitMQ가 메시지를 소비자에게 전달할 때 함께 제공됩니다.
  • multiple (boolean):
    • true로 설정하면 deliveryTag 이전의 모든 메시지를 한꺼번에 Nack 처리합니다.
    • false로 설정하면 특정 deliveryTag에 해당하는 단일 메시지만 Nack 처리합니다.
    • 예를 들어, 여러 개의 메시지를 한꺼번에 처리하다가 오류가 발생했을 때, multiple을 true로 설정하면 특정 메시지부터 이전에 처리된 메시지까지 모두 한 번에 Nack 처리가 가능합니다.
  • requeue (boolean):
    • true로 설정하면 Nack 처리된 메시지를 큐에 다시 넣어 재처리하게 됩니다.
    • false로 설정하면 Nack 처리된 메시지를 재처리하지 않고, 큐에서 삭제하거나 설정된 Dead Letter Queue(DLQ)로 보냅니다.
    • 보통 실패한 메시지를 DLQ로 보내거나 다시 재처리할지 여부를 결정할 때 사용됩니다.

 

Dead Letter Exchange Dead Letter Queue 설정 방법

  1. Dead Letter Exchange 생성: DLQ로 메시지를 보내기 위해 DLX를 생성합니다.
  2. Dead Letter Queue 생성: DLQ를 별도로 생성합니다.
  3. DLX를 기존 큐에 연결: 메시지를 Nack 처리하거나 TTL(time-to-live) 초과로 큐에서 제거할 때 DLX로 전송되도록 설정합니다.

 

시행착오

1. ListenerExecutionFailedException

 

해결방법

  • 생성자와 소비자의 MessageVo의 package 경로를 같도록 만들기
    → package 경로가 이미 정해졌는데 갈아 엎는다고 힘이들었다.
  • 생성자에서도 Jackson2JsonMessageConverter를 Bean으로 등록
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.***.rabbitMQ.listener.AlarmListener.receiveMessage(com.***.rabbitMQ.vo.MessageVo)]

 

 

2. Listener에서 에러가 발생하면메세지를 받을 때까지 보낸다.

 

고민

  • 메세지를 받지 못하면 어떤 로직으로 처리를 해야할까?

해결방법

  • Dead Letter Queue(DLQ) 사용하기
    • consumer using basic.reject or basic.nack with requeue parameter set to false, or
  • 메세지 재 전송
    • Ack Nack 사용: RabbitMQ는 메시지 처리 결과에 따라 ACK(성공) 또는 NAK(실패)를 보내는 방식을 지원합니다.
      • ACK: 메시지를 정상적으로 처리한 경우.
      • NAK: 메시지 처리에 실패한 경우. 이때 메시지를 다시 큐에 넣거나 메시지를 삭제할 수 있습니다.
      • @RabbitListener에서 acknowledgeMode를 설정하여 수동으로 ACK/NAK를 처리할 수 있습니다.

 

 

 

 


참고 사이트

 

RabbitMQ: One broker to queue them all | RabbitMQ

Why RabbitMQ? RabbitMQ is a reliable and mature messaging and streaming broker, which is easy to deploy on cloud environments, on-premises, and on your local machine. It is currently used by millions worldwide.

www.rabbitmq.com

 

 

'프레임워크 > Kafka & RabbitMQ' 카테고리의 다른 글

[Messaging] RabbitMQ 도입 구상편  (0) 2025.03.18
[Messaging] RabbitMQ 개념편  (0) 2025.03.14