Apache Kafka Cluster
В данном практическом занятии рассмотрим работу Consumer Groups, а также распределение партиций между брокерами в кластере Apache Kafka.
Vagrant
Для работы с kafka воспользуемся следующим Vagrantfile
:
Vagrant.configure("2") do |config|
config.vm.define "broker1" do |c|
c.vm.box = "ubuntu/lunar64"
c.vm.hostname = "broker1"
c.vm.network "private_network", type: "dhcp"
c.vm.provision "shell", inline: <<-SHELL
apt-get update -q
apt-get install -yq libnss-mdns python3-kafka openjdk-17-jre
mkdir /opt/kafka
curl https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz \
| tar xz --strip-components=1 -C /opt/kafka
sed -i "/^node.id=/s/=.*/=${HOSTNAME: -1}/" \
/opt/kafka/config/kraft/server.properties
sed -i "/^advertised.listeners=/s/localhost/${HOSTNAME}.local/" \
/opt/kafka/config/kraft/server.properties
sed -i "/^controller.quorum.voters=/s/=.*/=1@broker1.local:9093,2@broker2.local:9093,3@broker3.local:9093/" \
/opt/kafka/config/kraft/server.properties
sed -i "/^offsets.topic.replication.factor=/s/=.*/=2/" \
/opt/kafka/config/kraft/server.properties
/opt/kafka/bin/kafka-storage.sh format \
-t "qk89etSXRw6bZhzLg6QWKA" \
-c /opt/kafka/config/kraft/server.properties
systemd-run -p Restart=always -u kafka -E KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
/opt/kafka/bin/kafka-server-start.sh \
/opt/kafka/config/kraft/server.properties
echo 'PATH="$PATH:/opt/kafka/bin"' > /etc/profile.d/kafka.sh
SHELL
end
config.vm.define "broker2" do |c|
c.vm.box = "ubuntu/lunar64"
c.vm.hostname = "broker2"
c.vm.network "private_network", type: "dhcp"
c.vm.provision "shell", inline: <<-SHELL
apt-get update -q
apt-get install -yq libnss-mdns python3-kafka openjdk-17-jre
mkdir /opt/kafka
curl https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz \
| tar xz --strip-components=1 -C /opt/kafka
sed -i "/^node.id=/s/=.*/=${HOSTNAME: -1}/" \
/opt/kafka/config/kraft/server.properties
sed -i "/^advertised.listeners=/s/localhost/${HOSTNAME}.local/" \
/opt/kafka/config/kraft/server.properties
sed -i "/^controller.quorum.voters=/s/=.*/=1@broker1.local:9093,2@broker2.local:9093,3@broker3.local:9093/" \
/opt/kafka/config/kraft/server.properties
sed -i "/^offsets.topic.replication.factor=/s/=.*/=2/" \
/opt/kafka/config/kraft/server.properties
/opt/kafka/bin/kafka-storage.sh format \
-t "qk89etSXRw6bZhzLg6QWKA" \
-c /opt/kafka/config/kraft/server.properties
systemd-run -p Restart=always -u kafka -E KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
/opt/kafka/bin/kafka-server-start.sh \
/opt/kafka/config/kraft/server.properties
echo 'PATH="$PATH:/opt/kafka/bin"' > /etc/profile.d/kafka.sh
SHELL
end
config.vm.define "broker3" do |c|
c.vm.box = "ubuntu/lunar64"
c.vm.hostname = "broker3"
c.vm.network "private_network", type: "dhcp"
c.vm.provision "shell", inline: <<-SHELL
apt-get update -q
apt-get install -yq libnss-mdns python3-kafka openjdk-17-jre
mkdir /opt/kafka
curl https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz \
| tar xz --strip-components=1 -C /opt/kafka
sed -i "/^node.id=/s/=.*/=${HOSTNAME: -1}/" \
/opt/kafka/config/kraft/server.properties
sed -i "/^advertised.listeners=/s/localhost/${HOSTNAME}.local/" \
/opt/kafka/config/kraft/server.properties
sed -i "/^controller.quorum.voters=/s/=.*/=1@broker1.local:9093,2@broker2.local:9093,3@broker3.local:9093/" \
/opt/kafka/config/kraft/server.properties
sed -i "/^offsets.topic.replication.factor=/s/=.*/=2/" \
/opt/kafka/config/kraft/server.properties
/opt/kafka/bin/kafka-storage.sh format \
-t "qk89etSXRw6bZhzLg6QWKA" \
-c /opt/kafka/config/kraft/server.properties
systemd-run -p Restart=always -u kafka -E KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
/opt/kafka/bin/kafka-server-start.sh \
/opt/kafka/config/kraft/server.properties
echo 'PATH="$PATH:/opt/kafka/bin"' > /etc/profile.d/kafka.sh
SHELL
end
end
После развертывания мы получим кластер из трех узлов. Базово операции будем
производить на машине broker1
.
Consumer Groups
Для чтения из топика с несколькими партициями множеством консьюмеров существует удобный механизм Consumer Groups, который позволяет распределить партиции между консьюмерами в рамках группы.
Single Consumer
Создадим топик с двумя партициями:
$ kafka-topics.sh --create --topic test --partitions 2 --bootstrap-server localhost:9092
Created topic test.
$ kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic: test TopicId: 0AT7xhxMSU6Iu4sXKmGilA PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Запустим в отдельном терминале консьюмера, указав для него группу, и отдельно запустим продюсера, который будет отправлять сообщения по разным партициям:
$ kafka-console-consumer.sh --topic test --group test --bootstrap-server localhost:9092
0
1
2
3
$ kafka-console-producer.sh --topic test --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:0
>1:1
>0:2
>1:3
>
$ kafka-get-offsets.sh --topic test --bootstrap-server localhost:9092
test:0:2
test:1:2
Как видно в каждой партиции по 2 сообщения и консьюмер считал их все.
Multiple Consumers
Попробуем запустить еще одного консьюмера в этой же группе в отдельном терминале, оставив запущенным старый и запишем еще несколько сообщений:
$ kafka-console-consumer.sh --topic test --group test --bootstrap-server localhost:9092
0
1
2
3
5
7
$ kafka-console-consumer.sh --topic test --group test --bootstrap-server localhost:9092
4
6
$ kafka-console-producer.sh --topic test --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:4
>1:5
>0:6
>1:7
>
$ kafka-get-offsets.sh --topic test --bootstrap-server localhost:9092
test:0:4
test:1:4
Как видно, после добавления консьюмера в группу сообщения из разных партиций стали распределяться по разным консьюмерам. Дальнейшее добавление консьюмеров в группу не даст никакого эффекта, так как в топике всего две партиции, а для исключения множественной обработки одних данных разными консьюмерами, предусмотрено чтение из одной партиции только одним консьюмером. Для увеличения числа консьюмеров необходимо иметь большее число партиций.
Consumer Offsets
Также для групп консьюмеров создается специальный топик __consumer_offsets
,
в котором сохраняется информация от консьюмеров об обработанных сообщениях.
Информацию о группе можно получить командой kafka-consumer-groups.sh
:
$ kafka-consumer-groups.sh --describe --group test --bootstrap-server localhost:9092
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test test 0 4 4 0 console-consumer-6156fc4c-3e3c-4c25-b818-b43153dc6878 /192.168.56.79 console-consumer
test test 1 4 4 0 console-consumer-916bf0ae-2b17-44fd-b6ef-1e6b8cd71526 /192.168.56.79 console-consumer
Как видно у нас в группе два консьюмера и текущий offset обработанных сообщений консьюмерами совпадает с концом в партиции. Отключим теперь наших консьюмеров, если они еще запущены и попробуем записать еще несколько сообщений в топик:
$ kafka-consumer-groups.sh --describe --group test --bootstrap-server localhost:9092
Consumer group 'test' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test test 0 4 4 0 - - -
test test 1 4 4 0 - - -
$ kafka-console-producer.sh --topic test --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:0
>1:1
>
$ kafka-conumer-groups.sh --describe --group test: --bootstrap-server localhost:9092
Consumer group 'test' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test test 0 4 5 1 - - -
test test 1 4 5 1 - - -
Теперь в выводе видно, что в партиции топика добавлены еще по одному сообщению и текущий offset консьюмеров начал отставать. Запустим консьюмер в этой группе и убедимся, что он вычитает сообщения с заданного offset:
$ kafka-console-consumer.sh --topic test --group test --bootstrap-server localhost:9092
0
1
^CProcessed a total of 2 messages
$ kafka-consumer-groups.sh --describe --group test --bootstrap-server localhost:9092
Consumer group 'test' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test test 0 5 5 0 - - -
test test 1 5 5 0 - - -
Cluster
Apache Kafka изначально проектировался как распределенная масштабируемая
система, которая работает на множестве узлов и позволяет реплицировать данные
между ними для отказоустойчивости. С помощью vagrant
мы развернули кластер на
трех узлах, попробуем воспроизвести отказ одного из них и посмотреть на результат.
Prepare
Информацию о узлах кластера можем посмотреть командой
kafka-broker-api-versions.sh
:
$ kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep '^\w'
broker2.local:9092 (id: 2 rack: null) -> (
broker1.local:9092 (id: 1 rack: null) -> (
broker3.local:9092 (id: 3 rack: null) -> (
Для репликации партиций создадим новый топик, указав параметр replication factor
в 2:
$ kafka-topics.sh --create --topic replicated --partitions 2 --replication-factor 2 --bootstrap-server localhost:9092
Created topic replicated.
$ kafka-topics.sh --describe --topic replicated --bootstrap-server localhost:9092
Topic: replicated TopicId: O8Z52kMvQWSsOwsJOpoOlw PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: replicated Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: replicated Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Как видно, у нас имеется топик replicated
с двумя партициями, сами же партиции
имеют по две реплики на разных узлах.
Consume
Запустим пару консьюмеров в разных терминалах, как это делали в предыдущем разделе и запишем несколько сообщений в топик:
$ kafka-console-consumer.sh --topic replicated --group replicated --bootstrap-server localhost:9092
0
1
$ kafka-console-consumer.sh --topic replicated --group replicated --bootstrap-server localhost:9092
2
3
$ kafka-console-producer.sh --topic replicated --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:0
>0:1
>1:2
>1:3
>
Stop Broker
Попробуем отключить второй брокер, так как на нем находится лидер для одной из
партиций, для этого зайдем на машину broker2
и выполним команду:
$ sudo systemctl stop kafka
Вернемся обратно на первый узел и посмотрим состояние:
$ kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep '^\w'
broker1.local:9092 (id: 1 rack: null) -> (
broker3.local:9092 (id: 3 rack: null) -> (
$ kafka-topics.sh --describe --topic replicated --bootstrap-server localhost:9092
Topic: replicated TopicId: O8Z52kMvQWSsOwsJOpoOlw PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: replicated Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1
Topic: replicated Partition: 1 Leader: 3 Replicas: 2,3 Isr: 3
Как видно, узел broker2
не отображается в списке, а лидером у партиции стал
узел 3.
При этом наши консьюмеры продолжают работать, допишем еще несколько сообщений
в топик, чтобы убедиться:
$ kafka-console-consumer.sh --topic replicated --group replicated --bootstrap-server localhost:9092
0
1
4
6
$ kafka-console-consumer.sh --topic replicated --group replicated --bootstrap-server localhost:9092
2
3
5
7
$ kafka-console-producer.sh --topic replicated --property parse.key=true --property key.separator=: --bootstrap-server localhost:9092
>0:4
>1:5
>0:6
>1:7
>
Таким образом при потери узла работоспособность приложений не нарушилась.