RabbitMQ Routing
В данном практическом занятии различные способы маршрутизации сообщений в RabbitMQ.
Vagrant
Для работы с rabbitmq воспользуемся следующим Vagrantfile
:
Vagrant.configure("2") do |config|
config.vm.define "node" do |c|
c.vm.box = "ubuntu/lunar64"
c.vm.hostname = "node"
c.vm.network "forwarded_port", guest: 15672, host: 15672
c.vm.network "private_network", type: "dhcp"
c.vm.provision "shell", inline: <<-SHELL
apt-get update -q
apt-get install -yq python3-pip rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
pip3 install pika --break-system-packages
SHELL
end
end
После развертывания виртуальной машины на ней будет находиться сервер rabbitmq,
в нем создастся пользователь admin
и наружу будет выставлен
веб интерфейс, а также установится библиотека для
взаимодействия из языка python
- pika.
Direct Exchange
В прошлом практическом занятии использовался exchange типа fanout
, который
позволял отправлять сообщения сразу всем получателям. Теперь же попробуем
определить для каждого получателя свои сообщения с помощью exchange типа direct
.
Реализуем на примере отправки логов разной важности - info
, warning
и error
.
Для этого создадим скрипт send_log.py
для отправки сообщения, который будет
использовать exchange типа direct
, а в качестве routing key
- важность
сообщения.
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(f" [x] Sent {severity}:{message}")
connection.close()
Данный скрипт позволяет отправлять сообщения, указывая первым аргументом его важность.
Для получения сообщений создадим еще один скрипт receive_log.py
, который будет
определять очереди и связывать их с exchange с помощью routing key.
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Данный скрипт позволяет создать очередь и с помощью аргументов запуска связать ее с exchange по нескольким уровням важности сообщений, которые необходимо получать, а также регистрирует callback, который выводит сообщения на экран.
Запустим в паре терминалов второй скрипт, который будет получать сообщения с разными уровнями важности, а в отдельном терминале скрипт с отправкой сообщений:
$ python3 receive_log.py info
[*] Waiting for logs. To exit press CTRL+C
[x] info:b'test info message'
$ python3 receive_log.py info error warning
[*] Waiting for logs. To exit press CTRL+C
[x] info:b'test info message'
[x] warning:b'test warning message'
[x] error:b'test error message'
$ python3 send_log.py info 'test info message'
[x] Sent info:test info message
$ python3 send_log.py warning 'test warning message'
[x] Sent warning:test warning message
$ python3 send_log.py error 'test error message'
[x] Sent error:test error message
При запущенных скриптах receive_log.py
посмотрим информацию с помощью
rabbitmqctl
:
$ sudo rabbitmqctl list_exchanges | grep direct_logs
direct_logs direct
$ sudo rabbitmqctl list_queues --quiet
name messages
amq.gen-oT43COPoX7gTRkNJ2lNdZQ 0
amq.gen-bEcRRYMM1Tl-UOf06Gsk_Q 0
$ sudo rabbitmqctl list_bindings | grep direct_logs
direct_logs exchange amq.gen-bEcRRYMM1Tl-UOf06Gsk_Q queue error []
direct_logs exchange amq.gen-bEcRRYMM1Tl-UOf06Gsk_Q queue info []
direct_logs exchange amq.gen-oT43COPoX7gTRkNJ2lNdZQ queue info []
direct_logs exchange amq.gen-bEcRRYMM1Tl-UOf06Gsk_Q queue warning []
Как видно из вывода в rabbitmq имеется exchange с именем direct_logs
и типом
direct
, две очереди созданные запущенными скриптами receive_log.py
и набор
bindings
, которые связывают exchange direct_logs
с очередями с помощью
routing keys
с именами info
, warning
и error
.
Topics
Помимо связывания exchange с очередью с помощью простого routing key, в rabbitmq
имеется более гибкий exchange типа topic
, который позволяет использовать в
качестве ключа список слов разделенных точками. А также позволяет использовать
специальные символы:
*
- может заменить одно слово#
- может заменить любое количество слов Таким образом в качестве routing key могут использоваться различные варианты:*.orange.*
,*.*.rabbit
,lazy.#
.
Изменим наш скрипт send_log.py
для использования exchange типа topic
:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
Логика остается такой же, только теперь мы можем указывать routing key состоящий из нескольких слов разделенных точкой.
Также изменим скрипт receive_log.py
:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Как и в предыдущем разделе запустим в двух терминала receive_log.py
и в
отдельном send_log.py
:
$ python3 receive_log.py *.info test.*
[*] Waiting for logs. To exit press CTRL+C
[x] test.info:b'test message'
[x] dev.info:b'test message'
[x] test.one:b'test message'
$ python3 receive_log.py 'test.#'
[*] Waiting for logs. To exit press CTRL+C
[x] test.one:b'test message'
[x] test.one.two:b'test message'
$ python3 send_log.py test.info 'test message'
[x] Sent test.info:test message
$ python3 send_log.py dev.info 'test message'
[x] Sent dev.info:test message
$ python3 send_log.py test.one 'test message'
[x] Sent test.one:test message
$ python3 send_log.py test.one.two 'test message'
[x] Sent test.one.two:test message
При запущенных скриптах receive_log.py
посмотрим информацию с помощью
rabbitmqctl
:
$ sudo rabbitmqctl list_exchanges | grep topic_logs
topic_logs topic
$ sudo rabbitmqctl list_queues --quiet
name messages
amq.gen-HHnB6I713X9fiH8cyM_RCA 0
amq.gen-yryF4eJWFgBMSTpPPEmHOQ 0
$ sudo rabbitmqctl list_bindings | grep topic_logs
topic_logs exchange amq.gen-HHnB6I713X9fiH8cyM_RCA queue *.info []
topic_logs exchange amq.gen-yryF4eJWFgBMSTpPPEmHOQ queue test.# []
topic_logs exchange amq.gen-HHnB6I713X9fiH8cyM_RCA queue test.* []
Как видно вывод аналогичен exchange с типом direct
, но в качестве routing key
используются более гибкие варианты ключей.
RPC
С помощью rabbitmq также возможно реализовать rpc(remote procedure call), это можно реализовать следующей последовательностью:
Клиент при старте создает анонимную эксклюзивную очередь для ответов.
Для rpc запросов клиент отправляет сообщение с двумя параметрами:
reply_to
с указанием очереди ответов иcorrelation_id
с уникальным идентификатором запроса.Запрос отправляется в очередь
rpc_queue
.RPC сервер ожидает запросы в этой очереди, при получении выполняет обработку и возвращает обратно клиенту в очередь из параметра
reply_to
.Клиент ожидает сообщения в очереди для ответов, валидирует
correlation_id
и возвращает ответ в приложение.
Создадим скрипт rpc_server.py
, который будет получать число из очереди и
вычислять для него значение в ряду Фибоначчи:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(f" [.] fib({n})")
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
А также создадим клиента, который будет вызывать rpc на сервере:
#!/usr/bin/env python
import pika
import uuid
import sys
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
self.response = None
self.corr_id = None
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
self.connection.process_data_events(time_limit=None)
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
n = sys.argv[1] if len(sys.argv) > 1 else 1
print(f" [x] Requesting fib({n})")
response = fibonacci_rpc.call(n)
print(f" [.] Got {response}")
Теперь запустим в разных терминалах сервер и клиент:
$ python3 rpc_server.py
[x] Awaiting RPC requests
[.] fib(1)
[.] fib(10)
[.] fib(30)
$ python3 rpc_client.py 1
[x] Requesting fib(1)
[.] Got 1
$ python3 rpc_client.py 10
[x] Requesting fib(10)
[.] Got 55
$ python3 rpc_client.py 30
[x] Requesting fib(30)
[.] Got 832040
Как видно, используя rabbitmq нам удалось реализовать RPC.