교육/[온라인] KOSTA EDU

[KOSTA] 스프링 부트로 구현하는 메세징 시스템(RabbitMQ)

코드몬스터 2024. 5. 24. 09:58
728x90

 

본 내용은 Mac OS 기준으로 설정되었습니다.

 

개발환경

  • RabbitMQ
  • Docker
  • MySQL, MySQL workbench
  • VSCode
  • IntelliJ IDEA

설치방법

Docker

MySQL

  • brew install mysql
    • ⚡️ root계정이 비밀번호 없이 설치가 된다.
    • Ver 8.3.0 for macos14.2 on arm64

RabbitMQ

  • brew info rabbitmq
  • docker
    • # latest RabbitMQ 3.13
    • docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
      ⇒ ‼️ 터미널에서 실행되는 명령어 이다.
      (따로 다운이 없다.)
    • 5672 포트는 클라이언트 접속용이며 15672는 웹 관리 콘솔용이다.
    • https://www.rabbitmq.com/docs/download

RabbitMQ

이걸 왜 써야하는데? 에 대해 알아보자

  • 오픈 소스 메시지 브로커 소프트웨어이다.
    • AMQP(Advanced Message Queuing Protocol)를 구현했다.
    • 그 후 STOMP, MQTT를 지원하는 플러그인 아키텍처로 확장되었다.
    • Erlang(얼랑)이라는 프로그래밍 언어로 개발되었다.

 

AMQP Protocol - Message Queue System 장점 & 특징 (사용 이유)

  • 비동기 통신이므로 디커플링을 얻는다.
    • Producer와 Consumer가 독립적으로 작동될 수 있다.
  • 큐를 수평하게 스케일링 할 수 있기에 확장에 유연하다.
    • 여러 Consumer가 동시에 메시지를 처리할 수 있다.
  • 작업 수행 소요 시간에 구애받지 않는다.
    • 동기적으로 결과를 리턴해야하지 않기(비동기)에 오래 수행될 작업도 독립적으로 수행가능하다.
  • 메시지 수신의 신뢰성이 높다.
    • 메시지를 성공적으로 처리할 때까지 유지할 수 있다.
  • Cross-Platform & 프로그래밍 언어에 독립적이다.
    • 플랫폼과 언어를 다양하게 사용할 수 있다.

 

AMQP Protocol  - 개요

  • 메시지 지향 미들웨어 시스템 사이에 메시지 전송을 위한 개방형 표준 프로토콜이다.
    • 비동기 메시지 송수신에 대한 표준이다.
    • Publisher는 메시지 생성 역할이며, Consumer는 메시지를 수신한다.
    • 애플리케이션 레이어의 프로토콜이다.  ⇒ TCP/IP에 기반한 프로토콜이다.
    • 프로그래밍 언어와 플랫폼에 독립적이다.
    • routing key, Exchange, binding 등의 단어를 알아야 한다.

 

 

 

AMQP Protocol - Message Format

Message Foramt

 

AMQP Protocol - Queuing Model

  • Producer가 전송한 메세지는 Exchange로 게재 (publish) 된다.
  • Exchange는 Binding 규칙에 따라 Queue로 전송(route) 한다.
  • Queue의 등록된 수신자(Consumer)는 메시지를 수신(Consume) 한다.

Queuing Model

 


RabbitMQ Core Concept 

RabbitMQ Core Concept - Connection

  • Connection은 TCP 상위의 애플리케이션 레이어 프로토콜이다.
  • Connection은 클라이언트와 서버의 연결 통로가 된다.
  • 하나의 Connection에서 여러  Channel을 생성하여 메세지 송수신한다.

 

  • Maven 라이브러리
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.21.0</version>
</dependency>

 

  • Java Connection 생성
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();

 

 

RabbitMQ Core Concept - Channel

  • 단일 Connection에 여러 Channel을 생성하여 메시지를 송수신한다.
  • Channel은 Connection Context에만 존재하기 때문에  Connection이 닫히면, 연결된 모든 Channel도 닫힌다.
  • Channel 안에 연결할 Queue를 선언할 수 있으며,  Channel 하나당 하나의 Queue만 선언이 가능하다.
  • 특정 Channel의 통신은 다른 Channel의 통신과 완전히 분리되어 있기 때문에 프로토콜은 Channel ID와 같은 식별자를 포함시켜 전달한다.

 

public static void main(String[] args) throws Exception {

    String msg = "Hello MQ Broker";
	
    // connection 연결
    ConnectionFactory factory = new ConnectionFactory();
    Connection connection = factory.newConnection();
    
    // channel 채널
    Channel channel = connection.createChannel();
    channel.queueDeclare("myQ", true, false,  false, null);
    channel.basicPublish("", "myQ", null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
  
    // connection 연결 종료
    System.out.println("sent " + msg);
    connection.close();

}

 

 

RabbitMQ Core Concept - Channel Life Cycle

  • 클라이언트는 Connection을 생성한 후, Channel을 열 수 있다.
  • Open 상태
    • 새 Channel을 열게 되면, 자동으로 Channel ID가 할당된다.
    • Connection과 마찬가지로 Channel은 지속적으로 열린 상태여야 한다.
    • 작업마다 Channel을 여는 작업은 네트워크 왕복의 비용이 들기 때문에 매우 비효울적이다.
  • Closed 상태
    • Channel이 더 이상 필요하지 않으면 닫아야 한다.
    • Channel을 닫으면 Channel을 사용할 수 없게 되고, 해당 리소스를 회수하도록 예약하는 방식으로 처리된다.

 

RabbitMQ Core Concept - Queue(1)

  • 큐는 선형 데이터 구조라고 정의할 수 있다.
    • 양쪽이 열려있는 list이다.
    • FIFO(First In First Out)로 작동된다.
  • 중요 오소
    • 큐 사이즈 - 큐에 들어오온 메세지 숫자
    • 큐 대기시간 - 가장 오래 처리되지 않고 있는 메세지 대기 시간

 

RabbitMQ Core Concept - Queue(2)

  • ExchangeProducer로부터 전달 받은 메시지를 Binding된 Queue들에게 동일하게 전달한다.
  • Queue는 전달받은 메시지들을 Consumer들에게 전달한다. 공평하게 전달하기 위해 Round-Robing 스케쥴로 메시지를 전달한다.
  • Queue는 메시지를 Consumer들에게 전달되기 전에 메모리나 디스크에 저장한다.
  • Queue의 속성

 

 


Exchange

Exchange - 개요

  • Exchange는 Producer로부터 받은 Message를 Queue에게 전달한다.
  • 4가지 타입이 있다.
    • Fanout
      • 묶어진 Queue에 모두 다 보내버린다.
    • Direct
    • Topic
    • Headers
  • 4가지 타입들은 Producer에게 전달 받은 메시지를 어떤 Queue에게 전송할지를 결정한다.
  • Echange Type과 바인딩 키와 헤더 등으로 어느 Queue로 메시지가 전송될지가 결정된다.

 

Exchange - FanOut

  • Exhange와 Binding된 모든 Queue에게 동일한 메세지를 보낸다.
  • 전체 메시지 전송이다.
public static void main(String[] args) throws Exception {

    // connection 연결
    ConnectionFactory factory = new ConnectionFactory();
    Connection connection = factory.newConnection();
    
    // channel 채널
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("amq.fanout", "fanout", true);
    
    String message = "Hello RabbitMQ";
    channel.basicPublish("amq.fanout", "", null, message.getBytes(StandardCharsets.UTF_8));
  
    // connection 연결 종료
    System.out.println("[x] Sent '" + message + "'");

}

Exchange - Direct

  • Routing Key로 바인딩된 큐에 메세지를 전송한다.
    • Routing Key를 통해 Queue와 직접 Binding 할 수 있다.
  • RabbitMQ에서 사용되는 디폴트 Exchange는 Direct이다.
  • RabbitMQ에서 생성되는 Queue가 자동으로 Binding되고, 이 대는 각 Queue의 이름이 Routing Key로 지정된다.
  • 하나의 Queue에 여러 개의 Routing Key를 지정할 수 있다.
  • 여러 Queue에 동일한 Routing key를 지정할 수 있다.

Direct Exchange
Multiple Bindings

public static void main(String[] args) throws Exception {

    // connection 연결
    ConnectionFactory factory = new ConnectionFactory();
    Connection connection = factory.newConnection();
    
    // channel 채널
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("amq.dircet", "direct", true); // 여기 수정
    
    String message = "Hello RabbitMQ direct";
    channel.basicPublish("amq.direct", "hope", null, message.getBytes(StandardCharsets.UTF_8));
  
    // connection 연결 종료
    System.out.println("[x] Sent '" + message + "'");

}

Exchange - Topic

  • Routing Key 패턴이 일치하는 Queue에 메세지를 전달한다.
  • Direct는 Routing Key가 완전 일치해야 메세지를 전달할 수 있으나, Topic은 패턴을 정해서 Binding 규칠을 정의하기 때문에 Direct방식보다는 유연하게 메세지를 전송할 수 있다.
  • 적용 규칙
    • *: 단어 1개를 대체
    • #: 0개 이상의 단어를 대체(없거나 하나 이상의 단어를 의미)
  • 어느 Queue에 Routing Key가 두 개 잇으며 패턴이 모두 일치하는 경우, 메세지 전달이 2번 되는 것이 아니라 Queu에 한 번만 전달된다.
  • 하나의 Queue에 N개의 Routing Key가 있고, 패턴도 모두 일치하더라도 한 번만 전달된다.

Topic Exchange

 

public static void main(String[] args) throws Exception {

    // connection 연결
    ConnectionFactory factory = new ConnectionFactory();
    Connection connection = factory.newConnection();
    
    // channel 채널
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("amq.topic", "topic", true); // 여기 수정
    
    String message = "Hello RabbitMQ com.edu.fourth";
    channel.basicPublish("amq.topic", "com.edu.fourth", null, message.getBytes(StandardCharsets.UTF_8));
    
    String message = "Hello RabbitMQ web.edu.first";
    channel.basicPublish("amq.topic", "web.edu.first", null, message.getBytes(StandardCharsets.UTF_8));
    
    String message = "Hello RabbitMQ kor.edu.kosta.mq";
    channel.basicPublish("amq.topic", "kor.edu.kosta.mq", null, message.getBytes(StandardCharsets.UTF_8));
    
    // connection 연결 종료
    System.out.println("[x] Sent '" + message + "'");

}

Exchange - Headers

  • key-value로 정의된 헤더에 의해 메세지를 Queue에 전달하는 방법이다.
  • 메세지를 전달하는 Producer에서 정의하는 Header의 key-value와 메세지를 받은 Consumer쪽에서 정의된 Argument의 key-value가 일치하면 Binding 된다.
  • Producer에서 정의하는 Header는 메세지와 함게 전송되고, 
  • Consumer쪽에서는 Exchange와 Queue가 Binding되는 시점에 Argument로 정의한다.
  • Header에는 x-match라는 key가 있다.
    • 키값으로 any와 all이 있다.
    • x-match: all
      •  Header의 key-value와 Argument의 key-value가 정확히 일치해야 Binding된다.
    • x-match: any
      • Producer가 전송하는 header의 key-value 값과 Argument의 key-value 값 중 하나라도 일치하는 것이 있으면 binding 된다.

Header Exchanges


RabbitMQ with Spring Boot

 

작성 중입니다.

 


참고 사이트