RabbitMQ shovel plugin을 활용해서 Queue로 들어오는 메시지들을 다른 Queue로 미러링을 구성해 실 서비스에서 흘러가는 메시지들을 트레이싱하는 방법을 알아본다.

파이썬의 분산 태스크 프레임워크인 Celery를 이용하여 간단한 Message Broker - Worker 구성을 만들어 보고, 그 사이에 일어나는 메시지들을 트레이싱 해 보는 예제를 작성해 보고, 사용성을 검증해 본다.

해당 포스트에서는 RabbitMQ를 HA나 Federation을 구성하는 방법은 다루지 않는다.

HA 클러스터를 구성하는 것 과는 별개로 MQ의 컨센서스는 맞추지 않고, Shovel 플러그인을 이용하여 필요한 메시지를 미러링 할 수 있는 방안을 고민해 본 내용을 중점적으로 다룬다.

미러링은 왜 하나요?

메시지 미러링은 다양한 용도로 사용될 수 있다. 실제 서비스에서 흘러가는 메시지들의 흐름을 방해하지 않고 디버깅 한다거나, 모니터링을 위해 수집해 본다거나 하는 등의 운영성, 관측성을 좀 더 높이기 위해 사용될 수 있고, 동작하고 있는 비즈니스 워크플로우 구성을 최대한 변경하지 않으면서 다른 컨텍스트의 이슈를 처리하기 위해서도 용이하게 사용할 수 있다.

Case. 기본적인 브로커-워커 구성

대개 Python 생태계에서 비동기 작업들을 처리할 때는 Celery를 이용하게 되는데, RabbitMQ를 이용하게 될 경우, 간단하게는 아래와 같은 형상을 가지게 된다.


           +--- amqp:// --------------------------+
           |                                      |
message ----> exchange -- <routing-key> --> queue --> consumer
           |  <topic>                             |   <worker>
           |                                      |
           +--------------------------------------+

RabbitMQ로 메시지가 보내지면 내부적으로 Exchange를 거쳐, Queue로 메시지가 전달되고 Queue에 연결된 Celery Worker가 메시지를 받아서 작업을 수행하게 된다.

우리도 너네 메시지를 받아보고 싶다

기본적인 비즈니스 워크플로우가 워커 브로커 형태가 되었을 때, 다른 팀이나 다른 서비스에서 해당 메시지를 볼 경우가 생길 수 있다. (bounded-context)

Fanout Exchange를 이용하기


           +--- amqp:// --------------------------+
           |                                      |
message ----> exchange ---------------+---> queue --> consumer
           |  <fanout>    ignore       \          |
           |              routing-key   +-> queue --> others
           |                                      |
           +--------------------------------------+

이 경우 RabbitMQ에서는 메시지를 받을 Exchangefanout모드로 만들고, 다른 서비스에서도 메시지를 받을 수 있게 컨슈머로 들어올 수 있다.


           +--- amqp:// --------------------------+
           |                                      |
message ----> exchange ---------------+---> queue --> consumer
           |  <fanout>    ignore       \          |
           |              routing-key   +-> queue --> others
           |                            +-> queue --> others
           |                            +-> ...   --> ...
           |                                      |
           +--------------------------------------+

            >> 외부 컨슈머로 인해 MQ 노드의 부하를
               예측할 수 없는 상황이 발생한다.

하지만 Exchange에 많은 컨슈머들이 붙게 될 경우, RabbitMQ 노드에 부하가 많이 걸릴 수 있고, 이 경우 HA 구성이 필요하게 된다.

Shovel Plugin을 이용해 외부 컨슈머들을 위한 미러 MQ 구성

RabbitMQ의 Shovel 플러그인은 특정 토픽으로 들어오는 메시지를 다른 MQ로 옮겨담을 수 있는 기능을 제공하는데 이를 이용하여 유입되는 메시지의 스펙을 지킬 수 있고, 외부 MQ에서 shovel을 통해 원하는 메시지를 트레이싱 해 올 수 있게 된다.

  • AMQP의 스펙 상 ExchangeFanout 타입이 되어버리면, 기존에 routing-key를 가지고 들어오는 메시지들의 라우팅을 보장해 줄 수 없게 된다.
  • Topic Exchange는 그대로 두면서, 기존 Consumer로 보내고 Shovel에게도 전달하기 위한 Fanout Exchange를 중간에 만들고 내부에 Shovel이 Topic -> Fanout 으로 전달하는 형태를 구성해서 사용한다.
  • 기존 MQ의 메시지 TPS는 조금 느려질 수 있지만, 예측하기 힘든 외부 컨슈머들로부터의 부작용은 최소화 할 수 있다.

           +--- amqp://original -------------------+
           |                                       |
message ----> exchange --> X             +-> queue --> worker
           |  <topic>                    |         |
           |    |                        |         |
           |    +-> shovel --> exchange -+         |
           |                   <fanout>  |         |
           |                             +-> queue --+
           |                                       | |
           +---------------------------------------+ |
                                                     |
            >> Minimize side-effects                 |
               from external consumers               |
                                                     |
        +--------------------------------------------+
        |
        |  +--- amqp://mirror ------------------+
        |  |                                    |
        +---> shovel --> exchange -+----> queue --> another
           |             <fanout>   \           |   worker
           |                         \          |
           |                          +-> queue --> another
           |                                    |   API
           +------------------------------------+

            >> Manage external consumer
               with mirror MQ

(한글로 인해 ascii 도형이 깨지는 경우를 방지한다.. ;)

위 도식은 기존 서비스의 MQ에 Shovel과 중간에 Fanout Exchange를 구성하고, 외부 소비자를 위한 미러 MQ를 별도로 둔 상황을 표현한 것이다.

기존 MQ 노드(또는 클러스터)의 가용성을 최대한 보장함과 동시에, 외부 MQ의 메시지 부하는 따로 분리해서 문제를 해결할 수 있게 된다.

Demo Service

https://github.com/Ashon/_study-rabbitmq_shovel 에 PoC 프로젝트 코드들을 작성해 보았다.

테스트에 사용된 서비스 구성은 docker-compose.yml에 정리되어 있다. 아래는 테스트에 사용된 서비스들의 간단한 구성도.


                 +- generator -----+
                 |                 |
+-- send --------| ./flow_tasks.py |
|   messages     |                 |
|                +-----------------+
|
|  +- rabbitmq --------------------+
|  |                               |
+->| :5672 - mq port               |----+- consume -----+
   |                               |    |  messages     |
   | :15672 - management port      |    |               V
   |                               |    |        +-- worker --+
   +-------------------------------+    |        | +-- worker --+
                                        |        | |            |
                                     trace       | | worker.py  |
                                     messages    +-|            |
                                        |          +------------+
   +- external mq -----------------+    |
   |                               |<---+
   | :5672 - mq port               |             +-- monitor ---+
   | :15672 - management port      |             |              |
   | :15674 - websocket (webstomp) |<------------| :8080 - web  |
   |                               |             |              |
   +-------------------------------+             +--------------+

flow_tasks.py 는 RabbitMQ로 Celery Task를 계속 트리거링 하는 스크립트이고, worker.py는 Celery Worker 코드인데, MQ로 들어오는 태스크 요청들을 받아 작업을 수행한다.

external MQ 에서는 기존에 사용 중인 MQ의 메시지를 미러링하여, 흘러가는 메시지들을 이용해 다른 서비스에 이용하거나, 모니터링 할 수 있도록 구성한다.

monitor는 Web browser에서 STOMP 프로토콜을 이용해, 미러링되는 MQ의 메시지들을 웹 브라우저로 스트리밍해서 모니터링 할 수 있게 만들어 보았다.

구성된 RabbitMQ에 필요한 플러그인들을 활성화 한다.

  • rabbitmq_shovel
  • rabbitmq_shovel_management

예제에서는 rabbitmq_web_stomp 플러그인도 추가하였는데, 실시간으로 오가는 메시지를 웹 소켓을 이용해 캡쳐해 보기 위해 추가하였다.

Conclusion

메시지 브로커로 RabbitMQ를 사용하는 실 환경에서 레거시 환경을 최대한 수정하지 않고, 새로운 서비스를 추가하여 부가적인 로직들을 수행할 수 있도록 하는 방법을 알아보았다.

실제로 Shovel을 설정 하였을 때 퍼포먼스 저하가 일어나긴 할 텐데, 얼마나 발생할 지는 잘 모르겠지만, 그 부분에 대해서는 MQ 클러스터의 scale-up(또는 -out)으로 충분히 소화 가능하다고 생각함.

Shovel을 이용하면 scale-out 문제를 내부 문제와 외부 문제로 분리해서 생각해 볼 수 있는 여지도 제공해 준다고 생각한다.

with Openstack

Openstack의 경우, 각 컴포넌트간 메시지 브로커로 rabbitMQ를 사용하고 있고, 흘러가는 메시지들을 모니터링하고 디버깅하기 위한 stacktach라는 프로젝트가 있지만, stacktach에서 사용되는 워커는 Celery만큼 견고하지 못하고, 실제로는 유실되는 메시지도 발생했었다.

shovel로 federation을 구성해서 openstack과 비즈니스의 bounded-context를 잘 해결할 수 있을 것 같다는 생각이 들었음.

아무튼 비단 Openstack뿐만 아니라 RabbitMQ를 사용하는 환경이라면, 충분히 비슷한 상황에 대한 해결책이 될 수 있지 않을까 한다.

References