관련 나의 블로그 글 | |
분산 메세지 큐 1편 - 읽은 책 | https://jm-baek.tistory.com/325 |
분산 메세지 큐 2편 - 읽은 책 | https://jm-baek.tistory.com/330 |
[Messaging] RabbitMQ 개념편 | https://jm-baek.tistory.com/363 |
[Messaging] RabbitMQ 도입 구상편 | https://jm-baek.tistory.com/358 |
✅ [Messaging] RabbitMQ 도입편 | https://jm-baek.tistory.com/392 |
혼자 고민하고 정리해서 서툴고 틀린 방향이 있을 수 있습니다.
좋은 충고와 질문은 어제든 감사드립니다.
🙇♂️ 빠른 시일 내에 작성을 하도록 하겠습니다.....🙇♂️
예시 코드
Library
여러 블로그를 보면 String을 JSON으로 JSON을 자바 Object로 변환하는 과정에서 필요하한 fasterxml 라이브러리를 불러오는데 딱히 그럴 필요가 없다.
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'com.fasterxml.jackson.core:jackson-databind' // 필요 없다.
Producer
RabbitTemplate 을 사용하는 방법에는 Component와 Configuration 두 가지 방법이 있다.
1. Component 방법
서비스 로직에서 생성자 주입 등을 해서 바로 사용하면 된다.
@Slf4j
@Component
public class RabbitProducer {
private RabbitTemplate rabbitTemplate;
public RabbitProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(Message message) {
rabbitTemplate.convertAndSend(message);
}
}
2. Configuration 방법
RabbitTemplate를 Spring Bean으로 등록하는 방법이다.
Component보다는 해당 방법을 추천하고 그 이유는 ChatGPT 님의 말씀이다.
- 의존성 주입과 재사용성: RabbitTemplate을 빈으로 등록하면 Spring 컨테이너가 관리하게 되어, 다른 서비스나 컴포넌트에서 의존성 주입을 통해 재사용할 수 있습니다. 매번 새 인스턴스를 생성할 필요 없이, 일관된 설정을 가진 하나의 RabbitTemplate을 여러 곳에서 사용할 수 있게 됩니다.
- 커스텀 메시지 컨버터 설정: 빈 등록 방식에서는 JSON이나 XML 같은 형식으로 메시지를 주고받을 수 있도록 메시지 컨버터를 설정할 수 있습니다. 예를 들어, JSON 형식의 데이터를 사용한다면 Jackson2JsonMessageConverter를 설정해두면 개발 과정에서 메시지 직렬화/역직렬화를 수동으로 처리할 필요 없이 자동으로 수행됩니다.
- 일관성 있는 설정: 모든 RabbitTemplate 사용 코드에서 같은 설정을 사용하므로, 여러 곳에서 RabbitTemplate을 생성하여 설정할 때 발생할 수 있는 실수나 불일치를 방지할 수 있습니다
@Configuration
public class RabbitMQ {
@Bean
MessageConverter messageConverter() {
return new SimpleMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
}
그리고 나서 Component로 Producer를 작성한다.
@Slf4j
@Component
public class RabbitProducer {
private RabbitTemplate rabbitTemplate;
public RabbitProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String routingKey, Object obj) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(obj);
rabbitTemplate.convertAndSend("ex.app.event", routingKey, json);
}
}
Queue를 미리 생성해서 Listener에 사용할 수도 있고, Queue를 필요에 따라서 구독 형태로 서비스 계층에서 생성하고 연결할 수 있다.
Consumer
@Configuration
public class RabbitMQConfig {
static final String topicExchangeName = "spring-boot-exchange";
static final String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
// Listener는 두 가지 방법으로 작성이 가능하다.
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag (long):
- 메시지에 대한 고유 식별자입니다. 이를 통해 특정 메시지를 지정하여 Nack 처리를 할 수 있습니다.
- deliveryTag는 채널별로 고유하며, RabbitMQ가 메시지를 소비자에게 전달할 때 함께 제공됩니다.
- multiple (boolean):
- true로 설정하면 deliveryTag 이전의 모든 메시지를 한꺼번에 Nack 처리합니다.
- false로 설정하면 특정 deliveryTag에 해당하는 단일 메시지만 Nack 처리합니다.
- 예를 들어, 여러 개의 메시지를 한꺼번에 처리하다가 오류가 발생했을 때, multiple을 true로 설정하면 특정 메시지부터 이전에 처리된 메시지까지 모두 한 번에 Nack 처리가 가능합니다.
- requeue (boolean):
- true로 설정하면 Nack 처리된 메시지를 큐에 다시 넣어 재처리하게 됩니다.
- false로 설정하면 Nack 처리된 메시지를 재처리하지 않고, 큐에서 삭제하거나 설정된 Dead Letter Queue(DLQ)로 보냅니다.
- 보통 실패한 메시지를 DLQ로 보내거나 다시 재처리할지 여부를 결정할 때 사용됩니다.
Dead Letter Exchange와 Dead Letter Queue 설정 방법
- Dead Letter Exchange 생성: DLQ로 메시지를 보내기 위해 DLX를 생성합니다.
- Dead Letter Queue 생성: DLQ를 별도로 생성합니다.
- DLX를 기존 큐에 연결: 메시지를 Nack 처리하거나 TTL(time-to-live) 초과로 큐에서 제거할 때 DLX로 전송되도록 설정합니다.
시행착오
1. ListenerExecutionFailedException
해결방법
- 생성자와 소비자의 MessageVo의 package 경로를 같도록 만들기
→ package 경로가 이미 정해졌는데 갈아 엎는다고 힘이들었다. - 생성자에서도 Jackson2JsonMessageConverter를 Bean으로 등록
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.***.rabbitMQ.listener.AlarmListener.receiveMessage(com.***.rabbitMQ.vo.MessageVo)]
2. Listener에서 에러가 발생하면메세지를 받을 때까지 보낸다.
고민
- 메세지를 받지 못하면 어떤 로직으로 처리를 해야할까?
해결방법
- Dead Letter Queue(DLQ) 사용하기
- consumer using basic.reject or basic.nack with requeue parameter set to false, or
- 메세지 재 전송
- Ack Nack 사용: RabbitMQ는 메시지 처리 결과에 따라 ACK(성공) 또는 NAK(실패)를 보내는 방식을 지원합니다.
- ACK: 메시지를 정상적으로 처리한 경우.
- NAK: 메시지 처리에 실패한 경우. 이때 메시지를 다시 큐에 넣거나 메시지를 삭제할 수 있습니다.
- @RabbitListener에서 acknowledgeMode를 설정하여 수동으로 ACK/NAK를 처리할 수 있습니다.
- Ack Nack 사용: RabbitMQ는 메시지 처리 결과에 따라 ACK(성공) 또는 NAK(실패)를 보내는 방식을 지원합니다.
참고 사이트
RabbitMQ: One broker to queue them all | RabbitMQ
Why RabbitMQ? RabbitMQ is a reliable and mature messaging and streaming broker, which is easy to deploy on cloud environments, on-premises, and on your local machine. It is currently used by millions worldwide.
www.rabbitmq.com
'프레임워크 > Kafka & RabbitMQ' 카테고리의 다른 글
[Messaging] RabbitMQ 도입 구상편 (0) | 2025.03.18 |
---|---|
[Messaging] RabbitMQ 개념편 (0) | 2025.03.14 |