Простые приложения для работы с Kafka на Kotlin:
https://github.com/cherepakhin/shop_kafka_producer
https://github.com/cherepakhin/shop_kafka_consumer
В Readme.md этих проектов более подробное описание.
Составные части Kafka
Kafka — брокер сообщений, с помощью которого разные микросервисы общаются друг с другом. Также используется для отправки логов (например в Graylog и Elastic), хранения данных и т. д.
Брокер — узел Kafka, отвечает за прием, сохранение и передачу сообщений между продюсерами (Producer) и консюмерами (Consumer)
Консюмер (Consumer) — получатель сообщения
Продюсер (Producer) — отправитель сообщения
Zookepeer — хранит конфигурации, состояния, обнаруживает брокеров, выбирает контроллер кластера, отслеживает состояние узлов и обеспечивает функциональность и надёжность кластера
Контроллер кластера — отвечает за назначение мастеров партиций и отслеживает состояние брокеров
Offset Kafka — это понятие, которое используется для обозначения позиции в потоке сообщений. Offset отслеживает, на каком месте в потоке находится каждый консюмер, чтобы он мог читать сообщения с нужной позиции. Каждый раз, когда потребитель читает сообщение, его позиция сдвигается на одно сообщение вперед.
Партиция (Partition) — единица многопоточности в Kafka. Число партиций в топике можно лишь увеличивать. Партиции это те самые шарды, которые используются для шардирования в Kafka.
Реплика (Replica) — копия партиции. Реплика может быть размещена на другом узле для обеспечения отказоустойчивости. Обеспечивает репликацию данных.
Топик (topic) — служит для записи и чтения сообщений.
Консюмер группа (Consumer group) — группа получателей сообщений
Zookeeper
Перед запуском самого брокера Kafka, нужно запустить Zookeeper (это сервис координации сервисов с базой "ключ-значение"):
Параметры zookeeper настроить в ~/tools/kafka/config/zookeeper.properties ( https://github.com/cherepakhin/senior/blob/main/new_site/kafka/zookeeper.properties). Основные параметры в zookeeper.properties:
dataDir=/tmp/zookeeper clientPort=2181 maxClientCnxns=0 admin.enableServer=true admin.serverPort=9080
Скрипт запуска zookeeper( https://github.com/cherepakhin/senior/blob/main/kafka/zookeeper-server-start.sh)
$ ~/tools/kafka/bin/zookeeper-server-start.sh /home/vasi/tools/kafka/config/zookeeper.properties
/home/vasi - каталог пользователя
tools/kafka - каталог со скриптами kafka
Проверка и просмотр работы zookeeper: http://192.168.1.20:9080/commands
Запуск Kafka
$ ~/tools/kafka/bin/kafka-server-start.sh /home/vasi/tools/kafka/config/server.properties
Где kafka-server-start.sh - скрипт запуска из дистрибутива Kafka, /home/vasi/tools/kafka/config/server.properties - файл настройки Kafka
Log:
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller,
from now on will use node 192.168.1.20:9092 (id: 0 rack: null)
(kafka.server.BrokerToControllerRequestThread)
[2024-01-21 18:29:10,389] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]:
Recorded new controller, from now on will use node 192.168.1.20:9092 (id: 0 rack: null)
(kafka.server.BrokerToControllerRequestThread)
````
Проверка работы из консоли
Создание топика
Создание топика "samples":
$ ~/tools/kafka/bin $ kafka-topics.sh --bootstrap-server 192.168.1.20:9092 --create --topic samples --partitions 1 --replication-factor 1
Отправка сообщения в топик
Отправка сообщения в топик "samples":
$ ~/tools/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.1.20:9092 --topic samples
>test1
>test2
>
Прием сообщения из топика
Подключение к топику "samples" и просмотр сообщений:
$ ~/tools/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.20:9092 --topic samples
test1
test2
Удаление топика
$ ~/tools/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.20:2181 --delete --topic product_ext_dto_topic
>Topic product_ext_dto_topic is marked for deletion.
>Note: This will have no impact if delete.topic.enable is not set to true.
Для полного удаления топика (с очисткой файлов топика) нужно установить в конфигурационном файле server.properties свойство delete.topic.enable = true
Ссылки:
Простые проекты на kotlin:
https://github.com/cherepakhin/shop_kafka_producer
https://github.com/cherepakhin/shop_kafka_consumer
Скрипты для linuх сервисов:
kafka.service
[Unit] Description=Kafka Service Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=vasi Group=vasi ExecStart=/bin/sh -c '/home/vasi/tools/kafka/bin/kafka-server-start.sh /home/vasi/tools/kafka/config/server.properties > /home/vasi/tools/kafka/logs/kafka.log 2>&1' ExecStop=/home/vasi/tools/kafka/bin/kafka-server-stop.sh ExecReload=/bin/kill -HUP $MAINPID Restart=on-failure [Install] WantedBy=multi-user.target
[Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=vasi Group=vasi ExecStart=/home/vasi/tools/kafka/bin/zookeeper-server-start.sh /home/vasi/tools/kafka/config/zookeeper.properties ExecStop=/home/vasi/tools/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
Установка, развертывание и использование Kafka (один узел / кластер)
Немного о настройке семантики Kafka
(Работа Apache Kafka на примерах. Поднимаем Kafka Cluster используя docker-compose)
Семантики:
- At most once: В этой семантике продюсер считает сообщение успешно доставленным, как только оно отправлено брокеру, независимо от его фактической доставки потребителю. Это означает, что сообщения могут быть потеряны, если брокер не смог доставить их потребителям. Эта семантика обеспечивает максимальную пропускную способность, но не гарантирует доставку сообщений.
- At least once: В этой семантике продюсер ждет подтверждения (ACK) от брокера о доставке сообщения. Если продюсер не получает подтверждения в течение заданного времени, он повторно отправляет сообщение. Это гарантирует, что сообщения будут доставлены, но может привести к дублированию сообщений в случае сбоев и повторных отправок.
- Exactly once: В этой семантике гарантируется, что каждое сообщение будет доставлено ровно один раз, без дублирования. Продюсер отправляет сообщения в рамках транзакции, и брокер подтверждает их только после успешной записи в журнал и передачи потребителям. Эта семантика обеспечивает наивысший уровень гарантий, но требует дополнительных механизмов и может снижать пропускную способность.
Группы Kafka
https://habr.com/ru/companies/slurm/articles/550934/
https://coralogix.com/blog/create-kafka-topics-in-3-easy-steps/
Topic распределен по partition. Это разделение задается при создании topic (параметр --partitions):
$ kafka/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 2 \
--partitions 3 \
--topic unique-topic-name
Работа консьюмеров хорошо описана в https://habr.com/ru/companies/slurm/articles/550934/