Apache Kafka

В данном практическом занятии познакомимся с базовой работой с брокером сообщений Apache Kafka.

Vagrant

Для работы с kafka воспользуемся следующим Vagrantfile:

Vagrant.configure("2") do |config|
  config.vm.define "node" do |c|
    c.vm.box = "ubuntu/lunar64"
    c.vm.hostname = "node"
    c.vm.network "private_network", type: "dhcp"
    c.vm.provision "shell", inline: <<-SHELL
      apt-get update -q
      apt-get install -yq python3-kafka openjdk-17-jre
      mkdir /opt/kafka
      curl https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz \
        | tar xz --strip-components=1 -C /opt/kafka
      /opt/kafka/bin/kafka-storage.sh format \
        -t "$(/opt/kafka/bin/kafka-storage.sh random-uuid)" \
        -c /opt/kafka/config/kraft/server.properties
      systemd-run -u kafka -E KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
        /opt/kafka/bin/kafka-server-start.sh \
        /opt/kafka/config/kraft/server.properties
      echo 'PATH="$PATH:/opt/kafka/bin"' > /etc/profile.d/kafka.sh
    SHELL
  end
end

После развертывания виртуальной машины на ней будет находиться сервер kafka, а также библиотека для взаимодействия из языка python - python-kafka.

Status

Вместе с дистрибутивом apache kafka идет набор утилит для работы, которые находятся в директории /opt/kafka/bin. Убедимся в том, что сервер работает, запустив утилиту kafka-broker-api-versions.sh:

$ kafka-broker-api-versions.sh --bootstrap-server localhost:9092
localhost:9092 (id: 1 rack: null) -> (
        Produce(0): 0 to 9 [usable: 9],
        Fetch(1): 0 to 15 [usable: 15],
        ListOffsets(2): 0 to 8 [usable: 8],
        Metadata(3): 0 to 12 [usable: 12],
        LeaderAndIsr(4): UNSUPPORTED,
        StopReplica(5): UNSUPPORTED,
        UpdateMetadata(6): UNSUPPORTED,
        ControlledShutdown(7): UNSUPPORTED,
        OffsetCommit(8): 0 to 8 [usable: 8],
        OffsetFetch(9): 0 to 8 [usable: 8],
        FindCoordinator(10): 0 to 4 [usable: 4],
        JoinGroup(11): 0 to 9 [usable: 9],
        Heartbeat(12): 0 to 4 [usable: 4],
        LeaveGroup(13): 0 to 5 [usable: 5],
        SyncGroup(14): 0 to 5 [usable: 5],
        DescribeGroups(15): 0 to 5 [usable: 5],
        ListGroups(16): 0 to 4 [usable: 4],
        SaslHandshake(17): 0 to 1 [usable: 1],
        ApiVersions(18): 0 to 3 [usable: 3],
        CreateTopics(19): 0 to 7 [usable: 7],
        DeleteTopics(20): 0 to 6 [usable: 6],
        DeleteRecords(21): 0 to 2 [usable: 2],
        InitProducerId(22): 0 to 4 [usable: 4],
        OffsetForLeaderEpoch(23): 0 to 4 [usable: 4],
        AddPartitionsToTxn(24): 0 to 4 [usable: 4],
        AddOffsetsToTxn(25): 0 to 3 [usable: 3],
        EndTxn(26): 0 to 3 [usable: 3],
        WriteTxnMarkers(27): 0 to 1 [usable: 1],
        TxnOffsetCommit(28): 0 to 3 [usable: 3],
        DescribeAcls(29): 0 to 3 [usable: 3],
        CreateAcls(30): 0 to 3 [usable: 3],
        DeleteAcls(31): 0 to 3 [usable: 3],
        DescribeConfigs(32): 0 to 4 [usable: 4],
        AlterConfigs(33): 0 to 2 [usable: 2],
        AlterReplicaLogDirs(34): 0 to 2 [usable: 2],
        DescribeLogDirs(35): 0 to 4 [usable: 4],
        SaslAuthenticate(36): 0 to 2 [usable: 2],
        CreatePartitions(37): 0 to 3 [usable: 3],
        CreateDelegationToken(38): 0 to 3 [usable: 3],
        RenewDelegationToken(39): 0 to 2 [usable: 2],
        ExpireDelegationToken(40): 0 to 2 [usable: 2],
        DescribeDelegationToken(41): 0 to 3 [usable: 3],
        DeleteGroups(42): 0 to 2 [usable: 2],
        ElectLeaders(43): 0 to 2 [usable: 2],
        IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
        AlterPartitionReassignments(45): 0 [usable: 0],
        ListPartitionReassignments(46): 0 [usable: 0],
        OffsetDelete(47): 0 [usable: 0],
        DescribeClientQuotas(48): 0 to 1 [usable: 1],
        AlterClientQuotas(49): 0 to 1 [usable: 1],
        DescribeUserScramCredentials(50): 0 [usable: 0],
        AlterUserScramCredentials(51): 0 [usable: 0],
        DescribeQuorum(55): 0 to 1 [usable: 1],
        AlterPartition(56): UNSUPPORTED,
        UpdateFeatures(57): 0 to 1 [usable: 1],
        Envelope(58): UNSUPPORTED,
        DescribeCluster(60): 0 [usable: 0],
        DescribeProducers(61): 0 [usable: 0],
        UnregisterBroker(64): 0 [usable: 0],
        DescribeTransactions(65): 0 [usable: 0],
        ListTransactions(66): 0 [usable: 0],
        AllocateProducerIds(67): UNSUPPORTED,
        ConsumerGroupHeartbeat(68): UNSUPPORTED
)

Команда показывает доступные апи вызовы, которые есть на сервере.

Send

Для записи сообщений необходимо создать топик, в который мы будем писать. Для работы с топиком есть утилита kafka-topics.sh:

$ kafka-topics.sh --create --topic test \
    --bootstrap-server localhost:9092
Created topic test.
$ kafka-topics.sh --list --bootstrap-server localhost:9092
test
$ kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic: test     TopicId: Tj0UQ5ZSR4WPDcZNQtbkBA PartitionCount: 1       ReplicationFactor: 1     Configs: segment.bytes=1073741824
        Topic: test     Partition: 0    Leader: 1       Replicas: 1     Isr: 1

Для записи в топик можно воспользоваться командой kafka-console-producer.sh:

$ kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>hello
>world
>!
>

После записи мы можем посмотреть офсеты в топике командой kafka-get-offsets.sh:

$ kafka-get-offsets.sh --bootstrap-server localhost:9092
test:0:3

Receive

Таким образом видно, что в partition 0 топика test есть три сообщения. Прочитать мы их можем командой kafka-console-consumer.sh:

$ kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning
hello
world
!
^CProcessed a total of 3 messages

Сообщения после получения не удаляются и их можно повторно перечитать, также можно задать offset и partition:

$ kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --partition 0 --offset 1
world
!
^CProcessed a total of 2 messages

Multiple Consumers

Одновременно из топика вычитывать сообщения могут несколько консьюмеров, для этого можем запустить команду kafka-console-consumer.sh в паре терминалов, а также в отдельном kafka-console-producer.sh:

$ kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092
1
2
3
4
5
$ kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092
1
2
3
4
5
$ kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>1
>2
>3
>4
>5
>

Как видно в таком случае все продюсеры вычитывают одинаковые сообщения. Для распределения сообщений между консьюмерами необходимо использовать несколько партиций, создать дополнительные можно командой kafka-topics.sh:

$ kafka-topics.sh --alter --topic test --partitions 2 --bootstrap-server localhost:9092
$ kafka-get-offsets.sh --bootstrap-server localhost:9092 | grep test
test:0:8
test:1:0

Таким образом мы подключим двух консьюмеров каждого к своей партиции. Для распределения сообщений можно явно указывать партицию для записи, либо воспользоваться механизмом распределения по ключам, для этого к каждому сообщению добавляется ключ и в зависимости от хэша этого ключа будет выбираться партиция. Запустим консьюмеров в двух терминалах и отдельно продюсера:

$ kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --partition 0
hello2
hello5
$ kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --partition 1
hello1
hello3
hello4
$ kafka-console-producer.sh --topic test --bootstrap-server localhost:9092 --property "parse.key=true" --property key.separator=:
>1:hello1
>2:hello2
>3:hello3
>4:hello4
>5:hello5