Apache Kafka Cluster

В данном практическом занятии рассмотрим работу Consumer Groups, а также распределение партиций между брокерами в кластере Apache Kafka.

Vagrant

Для работы с kafka воспользуемся следующим Vagrantfile:

Vagrant.configure("2") do |config|
  config.vm.define "broker1" do |c|
    c.vm.box = "ubuntu/lunar64"
    c.vm.hostname = "broker1"
    c.vm.network "private_network", type: "dhcp"
    c.vm.provision "shell", inline: <<-SHELL
      apt-get update -q
      apt-get install -yq libnss-mdns python3-kafka openjdk-17-jre
      mkdir /opt/kafka
      curl https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz \
        | tar xz --strip-components=1 -C /opt/kafka
      sed -i "/^node.id=/s/=.*/=${HOSTNAME: -1}/" \
        /opt/kafka/config/kraft/server.properties
      sed -i "/^advertised.listeners=/s/localhost/${HOSTNAME}.local/" \
        /opt/kafka/config/kraft/server.properties
      sed -i "/^controller.quorum.voters=/s/=.*/=1@broker1.local:9093,2@broker2.local:9093,3@broker3.local:9093/" \
        /opt/kafka/config/kraft/server.properties
      sed -i "/^offsets.topic.replication.factor=/s/=.*/=2/" \
        /opt/kafka/config/kraft/server.properties
      /opt/kafka/bin/kafka-storage.sh format \
        -t "qk89etSXRw6bZhzLg6QWKA" \
        -c /opt/kafka/config/kraft/server.properties
      systemd-run -p Restart=always -u kafka -E KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
        /opt/kafka/bin/kafka-server-start.sh \
        /opt/kafka/config/kraft/server.properties
      echo 'PATH="$PATH:/opt/kafka/bin"' > /etc/profile.d/kafka.sh
    SHELL
  end
  config.vm.define "broker2" do |c|
    c.vm.box = "ubuntu/lunar64"
    c.vm.hostname = "broker2"
    c.vm.network "private_network", type: "dhcp"
    c.vm.provision "shell", inline: <<-SHELL
      apt-get update -q
      apt-get install -yq libnss-mdns python3-kafka openjdk-17-jre
      mkdir /opt/kafka
      curl https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz \
        | tar xz --strip-components=1 -C /opt/kafka
      sed -i "/^node.id=/s/=.*/=${HOSTNAME: -1}/" \
        /opt/kafka/config/kraft/server.properties
      sed -i "/^advertised.listeners=/s/localhost/${HOSTNAME}.local/" \
        /opt/kafka/config/kraft/server.properties
      sed -i "/^controller.quorum.voters=/s/=.*/=1@broker1.local:9093,2@broker2.local:9093,3@broker3.local:9093/" \
        /opt/kafka/config/kraft/server.properties
      sed -i "/^offsets.topic.replication.factor=/s/=.*/=2/" \
        /opt/kafka/config/kraft/server.properties
      /opt/kafka/bin/kafka-storage.sh format \
        -t "qk89etSXRw6bZhzLg6QWKA" \
        -c /opt/kafka/config/kraft/server.properties
      systemd-run -p Restart=always -u kafka -E KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
        /opt/kafka/bin/kafka-server-start.sh \
        /opt/kafka/config/kraft/server.properties
      echo 'PATH="$PATH:/opt/kafka/bin"' > /etc/profile.d/kafka.sh
    SHELL
  end
  config.vm.define "broker3" do |c|
    c.vm.box = "ubuntu/lunar64"
    c.vm.hostname = "broker3"
    c.vm.network "private_network", type: "dhcp"
    c.vm.provision "shell", inline: <<-SHELL
      apt-get update -q
      apt-get install -yq libnss-mdns python3-kafka openjdk-17-jre
      mkdir /opt/kafka
      curl https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz \
        | tar xz --strip-components=1 -C /opt/kafka
      sed -i "/^node.id=/s/=.*/=${HOSTNAME: -1}/" \
        /opt/kafka/config/kraft/server.properties
      sed -i "/^advertised.listeners=/s/localhost/${HOSTNAME}.local/" \
        /opt/kafka/config/kraft/server.properties
      sed -i "/^controller.quorum.voters=/s/=.*/=1@broker1.local:9093,2@broker2.local:9093,3@broker3.local:9093/" \
        /opt/kafka/config/kraft/server.properties
      sed -i "/^offsets.topic.replication.factor=/s/=.*/=2/" \
        /opt/kafka/config/kraft/server.properties
      /opt/kafka/bin/kafka-storage.sh format \
        -t "qk89etSXRw6bZhzLg6QWKA" \
        -c /opt/kafka/config/kraft/server.properties
      systemd-run -p Restart=always -u kafka -E KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
        /opt/kafka/bin/kafka-server-start.sh \
        /opt/kafka/config/kraft/server.properties
      echo 'PATH="$PATH:/opt/kafka/bin"' > /etc/profile.d/kafka.sh
    SHELL
  end
end

После развертывания мы получим кластер из трех узлов. Базово операции будем производить на машине broker1.

Consumer Groups

Для чтения из топика с несколькими партициями множеством консьюмеров существует удобный механизм Consumer Groups, который позволяет распределить партиции между консьюмерами в рамках группы.

Single Consumer

Создадим топик с двумя партициями:

$ kafka-topics.sh --create --topic test --partitions 2 --bootstrap-server localhost:9092
Created topic test.
$ kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic: test     TopicId: 0AT7xhxMSU6Iu4sXKmGilA PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test     Partition: 0    Leader: 3       Replicas: 3     Isr: 3
        Topic: test     Partition: 1    Leader: 1       Replicas: 1     Isr: 1

Запустим в отдельном терминале консьюмера, указав для него группу, и отдельно запустим продюсера, который будет отправлять сообщения по разным партициям:

$ kafka-console-consumer.sh --topic test --group test --bootstrap-server localhost:9092
0
1
2
3
$ kafka-console-producer.sh --topic test --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:0
>1:1
>0:2
>1:3
>
$ kafka-get-offsets.sh --topic test --bootstrap-server localhost:9092
test:0:2
test:1:2

Как видно в каждой партиции по 2 сообщения и консьюмер считал их все.

Multiple Consumers

Попробуем запустить еще одного консьюмера в этой же группе в отдельном терминале, оставив запущенным старый и запишем еще несколько сообщений:

$ kafka-console-consumer.sh --topic test --group test --bootstrap-server localhost:9092
0
1
2
3
5
7
$ kafka-console-consumer.sh --topic test --group test --bootstrap-server localhost:9092
4
6
$ kafka-console-producer.sh --topic test --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:4
>1:5
>0:6
>1:7
>
$ kafka-get-offsets.sh --topic test --bootstrap-server localhost:9092
test:0:4
test:1:4

Как видно, после добавления консьюмера в группу сообщения из разных партиций стали распределяться по разным консьюмерам. Дальнейшее добавление консьюмеров в группу не даст никакого эффекта, так как в топике всего две партиции, а для исключения множественной обработки одних данных разными консьюмерами, предусмотрено чтение из одной партиции только одним консьюмером. Для увеличения числа консьюмеров необходимо иметь большее число партиций.

Consumer Offsets

Также для групп консьюмеров создается специальный топик __consumer_offsets, в котором сохраняется информация от консьюмеров об обработанных сообщениях. Информацию о группе можно получить командой kafka-consumer-groups.sh:

$ kafka-consumer-groups.sh --describe --group test --bootstrap-server localhost:9092
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
test            test            0          4               4               0               console-consumer-6156fc4c-3e3c-4c25-b818-b43153dc6878 /192.168.56.79  console-consumer
test            test            1          4               4               0               console-consumer-916bf0ae-2b17-44fd-b6ef-1e6b8cd71526 /192.168.56.79  console-consumer

Как видно у нас в группе два консьюмера и текущий offset обработанных сообщений консьюмерами совпадает с концом в партиции. Отключим теперь наших консьюмеров, если они еще запущены и попробуем записать еще несколько сообщений в топик:

$ kafka-consumer-groups.sh --describe --group test --bootstrap-server localhost:9092
Consumer group 'test' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            test            0          4               4               0               -               -               -
test            test            1          4               4               0               -               -               -
$ kafka-console-producer.sh --topic test --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:0
>1:1
>
$ kafka-conumer-groups.sh --describe --group test: --bootstrap-server localhost:9092
Consumer group 'test' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            test            0          4               5               1               -               -               -
test            test            1          4               5               1               -               -               -

Теперь в выводе видно, что в партиции топика добавлены еще по одному сообщению и текущий offset консьюмеров начал отставать. Запустим консьюмер в этой группе и убедимся, что он вычитает сообщения с заданного offset:

$ kafka-console-consumer.sh --topic test --group test --bootstrap-server localhost:9092
0
1
^CProcessed a total of 2 messages
$ kafka-consumer-groups.sh --describe --group test --bootstrap-server localhost:9092
Consumer group 'test' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            test            0          5               5               0               -               -               -
test            test            1          5               5               0               -               -               -

Cluster

Apache Kafka изначально проектировался как распределенная масштабируемая система, которая работает на множестве узлов и позволяет реплицировать данные между ними для отказоустойчивости. С помощью vagrant мы развернули кластер на трех узлах, попробуем воспроизвести отказ одного из них и посмотреть на результат.

Prepare

Информацию о узлах кластера можем посмотреть командой kafka-broker-api-versions.sh:

$ kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep '^\w'
broker2.local:9092 (id: 2 rack: null) -> (
broker1.local:9092 (id: 1 rack: null) -> (
broker3.local:9092 (id: 3 rack: null) -> (

Для репликации партиций создадим новый топик, указав параметр replication factor в 2:

$ kafka-topics.sh --create --topic replicated --partitions 2 --replication-factor 2 --bootstrap-server localhost:9092
Created topic replicated.
$ kafka-topics.sh --describe --topic replicated --bootstrap-server localhost:9092
Topic: replicated       TopicId: O8Z52kMvQWSsOwsJOpoOlw PartitionCount: 2       ReplicationFactor: 2     Configs: segment.bytes=1073741824
        Topic: replicated       Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: replicated       Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3

Как видно, у нас имеется топик replicated с двумя партициями, сами же партиции имеют по две реплики на разных узлах.

Consume

Запустим пару консьюмеров в разных терминалах, как это делали в предыдущем разделе и запишем несколько сообщений в топик:

$ kafka-console-consumer.sh --topic replicated --group replicated --bootstrap-server localhost:9092
0
1
$ kafka-console-consumer.sh --topic replicated --group replicated --bootstrap-server localhost:9092
2
3
$ kafka-console-producer.sh --topic replicated --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:0
>0:1
>1:2
>1:3
>

Stop Broker

Попробуем отключить второй брокер, так как на нем находится лидер для одной из партиций, для этого зайдем на машину broker2 и выполним команду:

$ sudo systemctl stop kafka

Вернемся обратно на первый узел и посмотрим состояние:

$ kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep '^\w'
broker1.local:9092 (id: 1 rack: null) -> (
broker3.local:9092 (id: 3 rack: null) -> (
$ kafka-topics.sh --describe --topic replicated --bootstrap-server localhost:9092
Topic: replicated       TopicId: O8Z52kMvQWSsOwsJOpoOlw PartitionCount: 2       ReplicationFactor: 2     Configs: segment.bytes=1073741824
        Topic: replicated       Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1
        Topic: replicated       Partition: 1    Leader: 3       Replicas: 2,3   Isr: 3

Как видно, узел broker2 не отображается в списке, а лидером у партиции стал узел 3. При этом наши консьюмеры продолжают работать, допишем еще несколько сообщений в топик, чтобы убедиться:

$ kafka-console-consumer.sh --topic replicated --group replicated --bootstrap-server localhost:9092
0
1
4
6
$ kafka-console-consumer.sh --topic replicated --group replicated --bootstrap-server localhost:9092
2
3
5
7
$ kafka-console-producer.sh --topic replicated --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:4
>1:5
>0:6
>1:7
>

Таким образом при потери узла работоспособность приложений не нарушилась.