관련 나의 블로그 글 | |
분산 메세지 큐 1편 - 읽은 책 | https://jm-baek.tistory.com/325 |
분산 메세지 큐 2편 - 읽은 책 | https://jm-baek.tistory.com/330 |
[Messaging] RabbitMQ 개념편 | https://jm-baek.tistory.com/363 |
[Messaging] RabbitMQ 도입편 | https://jm-baek.tistory.com/358 |
※ 계속 글을 다듬고 있는 중입니다. ※
사용자마다 큐를 무한정 늘릴 수 는 없다.
🔹 개선된 구조
📌 1. 메시지 발행 (Producer)
- RabbitMQ의 mail.queue에 메시지를 보낼 때 userId를 포함
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class MailQueuePublisher {
private final RabbitTemplate rabbitTemplate;
public MailQueuePublisher(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMail(String userId, String subject, String body) {
Map<String, String> message = Map.of(
"userId", userId,
"subject", subject,
"body", body
);
rabbitTemplate.convertAndSend("mail.queue", message);
}
}
✅ 메시지를 mail.queue에 넣으면서 userId를 포함
✅ 특정 사용자에게 메시지를 보내야 한다는 정보를 담음
📌 2. 메시지 소비 & SSE 전달 (Consumer)
- Consumer에서 메시지를 읽고 해당 userId를 기반으로 SSE 연결된 사용자에게 전달
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class MailQueueConsumer {
private final SseEmitterService sseEmitterService;
private final ObjectMapper objectMapper;
public MailQueueConsumer(SseEmitterService sseEmitterService, ObjectMapper objectMapper) {
this.sseEmitterService = sseEmitterService;
this.objectMapper = objectMapper;
}
@RabbitListener(queues = "mail.queue")
public void consumeMessage(String message) {
try {
// JSON 파싱
Map<String, String> mailData = objectMapper.readValue(message, Map.class);
String userId = mailData.get("userId");
// 특정 사용자에게 SSE 전송
sseEmitterService.sendMessageToUser(userId, "📧 새 이메일 도착: " + mailData.get("subject"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
✅ 모든 메시지는 mail.queue 하나에서 소비
✅ 메시지를 수신한 후, userId에 해당하는 사용자에게만 SSE 전송
📌 3. SSE Emitter 관리 (SseEmitterService)
- 사용자의 SSE 연결을 관리하고 필요할 때 메시지를 전송
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class SseEmitterService {
private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
public SseEmitter createEmitter(String userId) {
SseEmitter emitter = new SseEmitter(60_000L); // 60초 타임아웃
emitters.put(userId, emitter);
emitter.onCompletion(() -> emitters.remove(userId));
emitter.onTimeout(() -> emitters.remove(userId));
return emitter;
}
public void sendMessageToUser(String userId, String message) {
SseEmitter emitter = emitters.get(userId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event().data(message));
} catch (IOException e) {
emitters.remove(userId);
}
}
}
}
✅ 사용자가 로그인하면 SSE 연결 생성 (createEmitter)
✅ RabbitMQ 메시지를 수신하면 해당 userId에게 SSE 메시지 전송
✅ 연결 종료되면 자동으로 emitters에서 제거
🎯 최적화된 방식의 장점
사용자별 큐 생성 (Direct Exchange 방식) | 특정 사용자만 메시지를 받을 수 있음 | 큐 개수가 많아질 경우 리소스 낭비 |
단일 큐 + userId 포함 후 Consumer에서 필터링 | 큐 관리를 최소화할 수 있음, 확장성이 뛰어남 | Consumer에서 userId 필터링 작업이 필요함 |
💡 단일 큐(mail.queue)를 사용하고 Consumer에서 userId를 필터링하여 SSE로 전달하는 방식이 더 효율적
💡 RabbitMQ의 리소스를 아낄 수 있고, 대규모 사용자 처리에도 적합
서비스 구상
첫 번째 고민
Q. 메세지 큐를 도입하면 서비스끼리 어떻게 통신을 주고 받을까?
1. 다른 서버에서 직접 메시지를 생성해서 메세지 큐로 보내는 방식
→ 내가 생각한 구조
프로세스 순서
1) 알림이 필요한 서비스는 메세지를 생성하고 메세지 큐로 보낸다.
2) 알림 서비스는 메세지 큐를 소비하고 있는다.
3) 알림 서비스는 메세지에 맞는 특정 기능으로 알림을 전송 한다.
4) 수신 여부와 상관없이 데이터베이스에 해당 알림을 기록한다.
2. 다른 서버가 알림 생성 서버 API에 요청하고, 알림 생성 서버가 메시지 큐로 보내는 방식
→ 카카오 알림 서비스 구성 2020
프로세스
1) 왼쪽에 있는 여러 서비스 서버들이 알림 생성 서버로 요청을 날림
2) 알림 생성 서버는 요청에 따른 메세지를 생성하고
3) 메세지 큐의 장애를 대응하기 위해 DB에도 요청 내용을 저장한다.
4) 알림 조회 서버(?)는 메세지 큐를 받고 DB에 저장한다.
두 번째 고민
Q. 알림 생성하는 로직은 어떻게 작성할까?
1. 구독 서비스 형태
로그인을 하면 Queue가 생성되고 필요한 서비스와 Binding되게 된다.
즉, 동적으로 Queue와 Binding 가 생성 될 수 있다는 얘기이다.
(다른 블로그에서 그렇게 본 것 같다.)
프로세스
1) 사용자가 서비스 A를 통해 특정 서비스를 구독한다.
2) 서비스 A는 사용자의 ID 또는 사번 등 사용자를 나타낼 수 있는 키 값을 메세지 큐로 전달한다.
3) 서비스 B는 키 값으로 Queue와 Binding 생성해서 연결한다.
4) 만약, 구독 취소하면 관련된 Queue와 Binding은 삭제한다.
2.
세 번째 고민
아키텍처
- API 호출 -> 서비스가 RabbitMQ로 메시지를 보내는 방식이 일반적인 시나리오입니다. 대부분의 사용자 애플리케이션은 HTTP API를 통해 서버와 통신하며, 서버는 필요에 따라 RabbitMQ와 같은 메시지 큐 시스템을 사용하여 비동기 처리를 수행합니다.
- 사용자가 처음부터 RabbitMQ로 메시지를 보내는 방식은 특수한 경우에 사용됩니다. 예를 들어, IoT 장치, 마이크로서비스 간의 메시지 전달, 이벤트 소싱 시스템 등에서 사용자가 메시지 큐를 직접 활용하는 경우가 있을 수 있습니다.
- SseEmitter 연결은 API를 통해 클라이언트와 서버가 설정된다.
- **SSE (Server-Sent Events)**를 통해 실시간으로 클라이언트에게 데이터를 보내고 싶은데, 그 데이터를 보내는 방식으로 **메시지 큐 (RabbitMQ)**를 사용하려는 상황.
- 메시지 큐를 이용해 SSE 연결된 클라이언트에게 메시지를 전달하는 방법은 어떻게 해야 하는지 궁금하다.
예시 코드
Library
여러 블로그를 보면 String을 JSON으로 JSON을 자바 Object로 변환하는 과정에서 필요하한 fasterxml 라이브러리를 불러오는데 딱히 그럴 필요가 없다.
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'com.fasterxml.jackson.core:jackson-databind' // 필요 없다.
Producer
RabbitTemplate 을 사용하는 방법에는 Component와 Configuration 두 가지 방법이 있다.
1. Component 방법
서비스 로직에서 생성자 주입 등을 해서 바로 사용하면 된다.
@Slf4j
@Component
public class RabbitProducer {
private RabbitTemplate rabbitTemplate;
public RabbitProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(Message message) {
rabbitTemplate.convertAndSend(message);
}
}
2. Configuration 방법
RabbitTemplate를 Spring Bean으로 등록하는 방법이다.
Component보다는 해당 방법을 추천하고 그 이유는 ChatGPT 님의 말씀이다.
- 의존성 주입과 재사용성: RabbitTemplate을 빈으로 등록하면 Spring 컨테이너가 관리하게 되어, 다른 서비스나 컴포넌트에서 의존성 주입을 통해 재사용할 수 있습니다. 매번 새 인스턴스를 생성할 필요 없이, 일관된 설정을 가진 하나의 RabbitTemplate을 여러 곳에서 사용할 수 있게 됩니다.
- 커스텀 메시지 컨버터 설정: 빈 등록 방식에서는 JSON이나 XML 같은 형식으로 메시지를 주고받을 수 있도록 메시지 컨버터를 설정할 수 있습니다. 예를 들어, JSON 형식의 데이터를 사용한다면 Jackson2JsonMessageConverter를 설정해두면 개발 과정에서 메시지 직렬화/역직렬화를 수동으로 처리할 필요 없이 자동으로 수행됩니다.
- 일관성 있는 설정: 모든 RabbitTemplate 사용 코드에서 같은 설정을 사용하므로, 여러 곳에서 RabbitTemplate을 생성하여 설정할 때 발생할 수 있는 실수나 불일치를 방지할 수 있습니다
@Configuration
public class RabbitMQ {
@Bean
MessageConverter messageConverter() {
return new SimpleMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
}
그리고 나서 Component로 Producer를 작성한다.
@Slf4j
@Component
public class RabbitProducer {
private RabbitTemplate rabbitTemplate;
public RabbitProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String routingKey, Object obj) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(obj);
rabbitTemplate.convertAndSend("ex.app.event", routingKey, json);
}
}
Queue를 미리 생성해서 Listener에 사용할 수도 있고, Queue를 필요에 따라서 구독 형태로 서비스 계층에서 생성하고 연결할 수 있다.
Consumer
@Configuration
public class RabbitMQConfig {
static final String topicExchangeName = "spring-boot-exchange";
static final String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
// Listener는 두 가지 방법으로 작성이 가능하다.
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag (long):
- 메시지에 대한 고유 식별자입니다. 이를 통해 특정 메시지를 지정하여 Nack 처리를 할 수 있습니다.
- deliveryTag는 채널별로 고유하며, RabbitMQ가 메시지를 소비자에게 전달할 때 함께 제공됩니다.
- multiple (boolean):
- true로 설정하면 deliveryTag 이전의 모든 메시지를 한꺼번에 Nack 처리합니다.
- false로 설정하면 특정 deliveryTag에 해당하는 단일 메시지만 Nack 처리합니다.
- 예를 들어, 여러 개의 메시지를 한꺼번에 처리하다가 오류가 발생했을 때, multiple을 true로 설정하면 특정 메시지부터 이전에 처리된 메시지까지 모두 한 번에 Nack 처리가 가능합니다.
- requeue (boolean):
- true로 설정하면 Nack 처리된 메시지를 큐에 다시 넣어 재처리하게 됩니다.
- false로 설정하면 Nack 처리된 메시지를 재처리하지 않고, 큐에서 삭제하거나 설정된 Dead Letter Queue(DLQ)로 보냅니다.
- 보통 실패한 메시지를 DLQ로 보내거나 다시 재처리할지 여부를 결정할 때 사용됩니다.
Dead Letter Exchange와 Dead Letter Queue 설정 방법
- Dead Letter Exchange 생성: DLQ로 메시지를 보내기 위해 DLX를 생성합니다.
- Dead Letter Queue 생성: DLQ를 별도로 생성합니다.
- DLX를 기존 큐에 연결: 메시지를 Nack 처리하거나 TTL(time-to-live) 초과로 큐에서 제거할 때 DLX로 전송되도록 설정합니다.
시행착오
1. ListenerExecutionFailedException
해결방법
- 생성자와 소비자의 MessageVo의 package 경로를 같도록 만들기
→ package 경로가 이미 정해졌는데 갈아 엎는다고 힘이들었다. - 생성자에서도 Jackson2JsonMessageConverter를 Bean으로 등록
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.***.rabbitMQ.listener.AlarmListener.receiveMessage(com.***.rabbitMQ.vo.MessageVo)]
2. Listener에서 에러가 발생하면메세지를 받을 때까지 보낸다.
고민
- 메세지를 받지 못하면 어떤 로직으로 처리를 해야할까?
해결방법
- Dead Letter Queue(DLQ) 사용하기
- consumer using basic.reject or basic.nack with requeue parameter set to false, or
- 메세지 재 전송
- Ack Nack 사용: RabbitMQ는 메시지 처리 결과에 따라 ACK(성공) 또는 NAK(실패)를 보내는 방식을 지원합니다.
- ACK: 메시지를 정상적으로 처리한 경우.
- NAK: 메시지 처리에 실패한 경우. 이때 메시지를 다시 큐에 넣거나 메시지를 삭제할 수 있습니다.
- @RabbitListener에서 acknowledgeMode를 설정하여 수동으로 ACK/NAK를 처리할 수 있습니다.
- Ack Nack 사용: RabbitMQ는 메시지 처리 결과에 따라 ACK(성공) 또는 NAK(실패)를 보내는 방식을 지원합니다.
참고 사이트
RabbitMQ 프로듀서, 컨슈머 애플리케이션 만들기 (w. Spring boot)
🐰 ☘️ 🐇
velog.io
[RabbitMQ] Jackson2JsonMessageConvertor
이번 글에서는 Spring Boot의 Jackson2JsonMessageConverter를 사용해 손쉽게 Object를 JSON Message Format으로 변경해보겠습니다. 1. Message Converter란? object를 rabbitmq의 message 형식으로 변환해주는 것을 의미합니다.
minholee93.tistory.com
DLQ(Dead Letter Queue)란 무엇인가요? - DLQ(Dead Letter Queue) 설명 - AWS
DLQ(Dead Letter Queue)는 소프트웨어 시스템에서 오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 특수한 유형의 메시지 대기열입니다. 메시지 대기열은 분산 시스템에서 비동기 통신을 지원
aws.amazon.com
[카카오 프로젝트] RabbitMQ 구현
RabbitMQ에서는 메시지 큐로부터 메시지를 받아와 해당 메시지를 처리하는 Worker를 작성할 수 있습니다. 이러한 Worker를 이용하여 이메일 전송 기능을 구현할 수 있습니다. 구체적인 구현 방법은 다
velog.io
'회사 업무 > 기술 도입편' 카테고리의 다른 글
[DevOps] 도커 도입편 (3) | 2024.10.19 |
---|---|
[DevOps] 모니터링 시스템 도입기 with Grafana (0) | 2024.10.19 |
[기획] 설계서? 명세서? 그게 뭔데... (0) | 2024.07.12 |
[Refactoring] 국제화(Internationalization) 수정 (0) | 2024.05.10 |