RabbitMQ

В данном практическом занятии познакомимся с базовой работой с брокером сообщений RabbitMQ.

Vagrant

Для работы с rabbitmq воспользуемся следующим Vagrantfile:

Vagrant.configure("2") do |config|
  config.vm.define "node" do |c|
    c.vm.box = "ubuntu/lunar64"
    c.vm.hostname = "node"
    c.vm.network "forwarded_port", guest: 15672, host: 15672
    c.vm.network "private_network", type: "dhcp"
    c.vm.provision "shell", inline: <<-SHELL
      apt-get update -q
      apt-get install -yq python3-pip rabbitmq-server
      rabbitmq-plugins enable rabbitmq_management
      rabbitmqctl add_user admin admin
      rabbitmqctl set_user_tags admin administrator
      rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
      pip3 install pika --break-system-packages
    SHELL
  end
end

После развертывания виртуальной машины на ней будет находиться сервер rabbitmq, в нем создастся пользователь admin и наружу будет выставлен веб интерфейс, а также установится библиотека для взаимодействия из языка python - pika.

Status

Взаимодействовать с сервером rabbitmq можно утилитой rabbitmqctl, либо посредством веб интерфейса. Информацию о сервере можно получить с помощью подкоманды status:

$ sudo rabbitmqctl status | tail -15

Low free disk space watermark: 0.05 gb
Free disk space: 39.1751 gb

Totals

Connection count: 0
Queue count: 0
Virtual host count: 1

Listeners

Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Interface: [::], port: 15672, protocol: http, purpose: HTTP API

Как видно из вывода на текущий момент нет активных соединений и очереди отсутствуют. Список очередей можно посмотреть подкомандой list_queues:

$ sudo rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...

Send

Создадим скрипт на python, который создаст очередь при ее отсутствии и добавит в нее сообщение, переданное через аргументы запуска:

#!/usr/bin/env python3
import sys, pika

if len(sys.argv) < 2:
    print('need message')
    sys.exit()

msg = ' '.join(sys.argv[1:])

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='test')

channel.basic_publish(exchange='', routing_key='test', body=msg)
print(" [x] Sent '{}'".format(msg))
connection.close()

Как видно в данном скрипте мы декларируем очередь с именем test и публикуем в нее сообщение. Сохраним данный скрипт в файле send.py и запустим его:

$ python3 send.py test
 [x] Sent 'test'

Если теперь посмотреть список очередей, то можно увидеть:

$ sudo rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
test    1

Появилась очередь test с одним сообщением в ней.

Receive

Создадим также скрипт, который будет получать сообщение из этой же очереди:

#!/usr/bin/env python3
import pika, sys

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='test')

    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")

    channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass

Сохраним его в файле receive.py и запустим:

$ python3 receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'test'
^C
$ sudo rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
test    0

Мы получили сообщение из очереди, и, как видно из вывода команды list_queues, сообщение было из очереди удалено, так как в скрипте у нас выставлен параметр auto_ack=True(автоматическое подтверждение получения).

FIFO

Обработка очереди происходит в том же порядке, что и добавление по принципу FIFO. Запустим скрипт send.py в цикле для добавления в очередь нескольких сообщений:

$ for i in {1..10};do python3 send.py test $i;done
 [x] Sent 'test 1'
 [x] Sent 'test 2'
 [x] Sent 'test 3'
 [x] Sent 'test 4'
 [x] Sent 'test 5'
 [x] Sent 'test 6'
 [x] Sent 'test 7'
 [x] Sent 'test 8'
 [x] Sent 'test 9'
 [x] Sent 'test 10'
$ python3 receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'test 1'
 [x] Received b'test 2'
 [x] Received b'test 3'
 [x] Received b'test 4'
 [x] Received b'test 5'
 [x] Received b'test 6'
 [x] Received b'test 7'
 [x] Received b'test 8'
 [x] Received b'test 9'
 [x] Received b'test 10'
^C

Как видно обработка произошла в той же последовательности, что и добавление.

Multiple Consumers

В одной очереди может быть несколько получателей между которыми будут распределяться сообщения, запустим скрипт receive.py в паре разных терминалов, а в отдельном цикл с send.py:

$ for i in {1..10};do python3 send.py test $i;done
 [x] Sent 'test 1'
 [x] Sent 'test 2'
 [x] Sent 'test 3'
 [x] Sent 'test 4'
 [x] Sent 'test 5'
 [x] Sent 'test 6'
 [x] Sent 'test 7'
 [x] Sent 'test 8'
 [x] Sent 'test 9'
 [x] Sent 'test 10'
$ python3 receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'test 1'
 [x] Received b'test 3'
 [x] Received b'test 5'
 [x] Received b'test 7'
 [x] Received b'test 9'
$ python3 receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'test 2'
 [x] Received b'test 4'
 [x] Received b'test 6'
 [x] Received b'test 8'
 [x] Received b'test 10'

Для того, чтобы одновременно отправить сообщение нескольким потребителям необходимо изменить тип exchange на fanout и сделать отдельные очереди для каждого потребителя и binding к ним. Добавим в наши скрипты send.py и receive.py:

#!/usr/bin/env python3
import sys, pika

if len(sys.argv) < 2:
    print('need message')
    sys.exit()

msg = ' '.join(sys.argv[1:])

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Декларация exchange с типом fanout
channel.exchange_declare(exchange='fanout', exchange_type='fanout')
# channel.queue_declare(queue='test')

channel.basic_publish(exchange='fanout', routing_key='test', body=msg)
print(" [x] Sent '{}'".format(msg))
connection.close()
#!/usr/bin/env python3
import pika, sys

def main(queue):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Декларация exchange с типом fanout
    channel.exchange_declare(exchange='fanout', exchange_type='fanout')
    channel.queue_declare(queue=queue)

    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")

    # Создание связи exchange с конкретной очередью
    channel.queue_bind(exchange='fanout', queue=queue)
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        if len(sys.argv) < 2:
            sys.exit()
        main(sys.argv[1])
    except KeyboardInterrupt:
        pass

И также запустим в разных терминалах:

$ for i in {1..10};do python3 send.py test $i;done
 [x] Sent 'test 1'
 [x] Sent 'test 2'
 [x] Sent 'test 3'
 [x] Sent 'test 4'
 [x] Sent 'test 5'
 [x] Sent 'test 6'
 [x] Sent 'test 7'
 [x] Sent 'test 8'
 [x] Sent 'test 9'
 [x] Sent 'test 10'
$ python3 receive.py test1
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'test 1'
 [x] Received b'test 2'
 [x] Received b'test 3'
 [x] Received b'test 4'
 [x] Received b'test 5'
 [x] Received b'test 6'
 [x] Received b'test 7'
 [x] Received b'test 8'
 [x] Received b'test 9'
 [x] Received b'test 10'
$ python3 receive.py test2
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'test 1'
 [x] Received b'test 2'
 [x] Received b'test 3'
 [x] Received b'test 4'
 [x] Received b'test 5'
 [x] Received b'test 6'
 [x] Received b'test 7'
 [x] Received b'test 8'
 [x] Received b'test 9'
 [x] Received b'test 10'