RabbitMQ Routing

В данном практическом занятии различные способы маршрутизации сообщений в RabbitMQ.

Vagrant

Для работы с rabbitmq воспользуемся следующим Vagrantfile:

Vagrant.configure("2") do |config|
  config.vm.define "node" do |c|
    c.vm.box = "ubuntu/lunar64"
    c.vm.hostname = "node"
    c.vm.network "forwarded_port", guest: 15672, host: 15672
    c.vm.network "private_network", type: "dhcp"
    c.vm.provision "shell", inline: <<-SHELL
      apt-get update -q
      apt-get install -yq python3-pip rabbitmq-server
      rabbitmq-plugins enable rabbitmq_management
      rabbitmqctl add_user admin admin
      rabbitmqctl set_user_tags admin administrator
      rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
      pip3 install pika --break-system-packages
    SHELL
  end
end

После развертывания виртуальной машины на ней будет находиться сервер rabbitmq, в нем создастся пользователь admin и наружу будет выставлен веб интерфейс, а также установится библиотека для взаимодействия из языка python - pika.

Direct Exchange

В прошлом практическом занятии использовался exchange типа fanout, который позволял отправлять сообщения сразу всем получателям. Теперь же попробуем определить для каждого получателя свои сообщения с помощью exchange типа direct. Реализуем на примере отправки логов разной важности - info, warning и error. Для этого создадим скрипт send_log.py для отправки сообщения, который будет использовать exchange типа direct, а в качестве routing key - важность сообщения.

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(f" [x] Sent {severity}:{message}")
connection.close()

Данный скрипт позволяет отправлять сообщения, указывая первым аргументом его важность.

Для получения сообщений создадим еще один скрипт receive_log.py, который будет определять очереди и связывать их с exchange с помощью routing key.

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

Данный скрипт позволяет создать очередь и с помощью аргументов запуска связать ее с exchange по нескольким уровням важности сообщений, которые необходимо получать, а также регистрирует callback, который выводит сообщения на экран.

Запустим в паре терминалов второй скрипт, который будет получать сообщения с разными уровнями важности, а в отдельном терминале скрипт с отправкой сообщений:

$ python3 receive_log.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] info:b'test info message'
$ python3 receive_log.py info error warning
 [*] Waiting for logs. To exit press CTRL+C
 [x] info:b'test info message'
 [x] warning:b'test warning message'
 [x] error:b'test error message'
$ python3 send_log.py info 'test info message'
 [x] Sent info:test info message
$ python3 send_log.py warning 'test warning message'
 [x] Sent warning:test warning message
$ python3 send_log.py error 'test error message'
 [x] Sent error:test error message

При запущенных скриптах receive_log.py посмотрим информацию с помощью rabbitmqctl:

$ sudo rabbitmqctl list_exchanges | grep direct_logs
direct_logs     direct
$ sudo rabbitmqctl list_queues --quiet
name    messages
amq.gen-oT43COPoX7gTRkNJ2lNdZQ  0
amq.gen-bEcRRYMM1Tl-UOf06Gsk_Q  0
$ sudo rabbitmqctl list_bindings | grep direct_logs
direct_logs     exchange        amq.gen-bEcRRYMM1Tl-UOf06Gsk_Q  queue   error   []
direct_logs     exchange        amq.gen-bEcRRYMM1Tl-UOf06Gsk_Q  queue   info    []
direct_logs     exchange        amq.gen-oT43COPoX7gTRkNJ2lNdZQ  queue   info    []
direct_logs     exchange        amq.gen-bEcRRYMM1Tl-UOf06Gsk_Q  queue   warning []

Как видно из вывода в rabbitmq имеется exchange с именем direct_logs и типом direct, две очереди созданные запущенными скриптами receive_log.py и набор bindings, которые связывают exchange direct_logs с очередями с помощью routing keys с именами info, warning и error.

Topics

Помимо связывания exchange с очередью с помощью простого routing key, в rabbitmq имеется более гибкий exchange типа topic, который позволяет использовать в качестве ключа список слов разделенных точками. А также позволяет использовать специальные символы:

  • * - может заменить одно слово

  • # - может заменить любое количество слов Таким образом в качестве routing key могут использоваться различные варианты: *.orange.*, *.*.rabbit, lazy.#.

Изменим наш скрипт send_log.py для использования exchange типа topic:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()

Логика остается такой же, только теперь мы можем указывать routing key состоящий из нескольких слов разделенных точкой.

Также изменим скрипт receive_log.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

Как и в предыдущем разделе запустим в двух терминала receive_log.py и в отдельном send_log.py:

$ python3 receive_log.py *.info test.*
 [*] Waiting for logs. To exit press CTRL+C
 [x] test.info:b'test message'
 [x] dev.info:b'test message'
 [x] test.one:b'test message'
$ python3 receive_log.py 'test.#'
 [*] Waiting for logs. To exit press CTRL+C
 [x] test.one:b'test message'
 [x] test.one.two:b'test message'
$ python3 send_log.py test.info 'test message'
 [x] Sent test.info:test message
$ python3 send_log.py dev.info 'test message'
 [x] Sent dev.info:test message
$ python3 send_log.py test.one 'test message'
 [x] Sent test.one:test message
$ python3 send_log.py test.one.two 'test message'
 [x] Sent test.one.two:test message

При запущенных скриптах receive_log.py посмотрим информацию с помощью rabbitmqctl:

$ sudo rabbitmqctl list_exchanges | grep topic_logs
topic_logs      topic
$ sudo rabbitmqctl list_queues --quiet
name    messages
amq.gen-HHnB6I713X9fiH8cyM_RCA  0
amq.gen-yryF4eJWFgBMSTpPPEmHOQ  0
$ sudo rabbitmqctl list_bindings | grep topic_logs
topic_logs      exchange        amq.gen-HHnB6I713X9fiH8cyM_RCA  queue   *.info  []
topic_logs      exchange        amq.gen-yryF4eJWFgBMSTpPPEmHOQ  queue   test.#  []
topic_logs      exchange        amq.gen-HHnB6I713X9fiH8cyM_RCA  queue   test.*  []

Как видно вывод аналогичен exchange с типом direct, но в качестве routing key используются более гибкие варианты ключей.

RPC

С помощью rabbitmq также возможно реализовать rpc(remote procedure call), это можно реализовать следующей последовательностью:

  • Клиент при старте создает анонимную эксклюзивную очередь для ответов.

  • Для rpc запросов клиент отправляет сообщение с двумя параметрами: reply_to с указанием очереди ответов и correlation_id с уникальным идентификатором запроса.

  • Запрос отправляется в очередь rpc_queue.

  • RPC сервер ожидает запросы в этой очереди, при получении выполняет обработку и возвращает обратно клиенту в очередь из параметра reply_to.

  • Клиент ожидает сообщения в очереди для ответов, валидирует correlation_id и возвращает ответ в приложение.

Создадим скрипт rpc_server.py, который будет получать число из очереди и вычислять для него значение в ряду Фибоначчи:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(f" [.] fib({n})")
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

А также создадим клиента, который будет вызывать rpc на сервере:

#!/usr/bin/env python
import pika
import uuid
import sys


class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        self.connection.process_data_events(time_limit=None)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

n = sys.argv[1] if len(sys.argv) > 1 else 1

print(f" [x] Requesting fib({n})")
response = fibonacci_rpc.call(n)
print(f" [.] Got {response}")

Теперь запустим в разных терминалах сервер и клиент:

$ python3 rpc_server.py
 [x] Awaiting RPC requests
 [.] fib(1)
 [.] fib(10)
 [.] fib(30)
$ python3 rpc_client.py 1
 [x] Requesting fib(1)
 [.] Got 1
$ python3 rpc_client.py 10
 [x] Requesting fib(10)
 [.] Got 55
$ python3 rpc_client.py 30
 [x] Requesting fib(30)
 [.] Got 832040

Как видно, используя rabbitmq нам удалось реализовать RPC.