Простые приложения для работы с Kafka на Kotlin:
https://github.com/cherepakhin/shop_kafka_producer
https://github.com/cherepakhin/shop_kafka_consumer
В Readme.md этих проектов более подробное описание.
Составные части Kafka
Kafka — брокер сообщений, с помощью которого разные микросервисы общаются друг с другом. Также используется для отправки логов (например в Graylog и Elastic), хранения данных и т. д.
Брокер — узел Kafka, отвечает за прием, сохранение и передачу сообщений между продюсерами (Producer) и консюмерами (Consumer)
Консюмер (Consumer) — получатель сообщения
Продюсер (Producer) — отправитель сообщения
Zookepeer — хранит конфигурации, состояния, обнаруживает брокеров, выбирает контроллер кластера, отслеживает состояние узлов и обеспечивает функциональность и надёжность кластера
Контроллер кластера — отвечает за назначение мастеров партиций и отслеживает состояние брокеров
Offset Kafka — это понятие, которое используется для обозначения позиции в потоке сообщений. Offset отслеживает, на каком месте в потоке находится каждый консюмер, чтобы он мог читать сообщения с нужной позиции. Каждый раз, когда потребитель читает сообщение, его позиция сдвигается на одно сообщение вперед.
Партиция (Partition) — единица многопоточности в Kafka. Число партиций в топике можно лишь увеличивать. Партиции это те самые шарды, которые используются для шардирования в Kafka.
Реплика (Replica) — копия партиции. Реплика может быть размещена на другом узле для обеспечения отказоустойчивости. Обеспечивает репликацию данных.
Топик (topic) — служит для записи и чтения сообщений.
Консюмер группа (Consumer group) — группа получателей сообщений
Zookeeper
Перед запуском самого брокера Kafka, нужно запустить Zookeeper (это сервис координации сервисов с базой "ключ-значение"):
Параметры zookeeper настроить в ~/tools/kafka/config/zookeeper.properties ( https://github.com/cherepakhin/senior/blob/main/new_site/kafka/zookeeper.properties). Основные параметры в zookeeper.properties:
dataDir=/tmp/zookeeper clientPort=2181 maxClientCnxns=0 admin.enableServer=true admin.serverPort=9080
Скрипт запуска zookeeper( https://github.com/cherepakhin/senior/blob/main/kafka/zookeeper-server-start.sh)
$ ~/tools/kafka/bin/zookeeper-server-start.sh /home/vasi/tools/kafka/config/zookeeper.properties
/home/vasi - каталог пользователя
tools/kafka - каталог со скриптами kafka
Проверка и просмотр работы zookeeper: http://192.168.1.20:9080/commands
Запуск Kafka
$ ~/tools/kafka/bin/kafka-server-start.sh /home/vasi/tools/kafka/config/server.properties
Где kafka-server-start.sh - скрипт запуска из дистрибутива Kafka, /home/vasi/tools/kafka/config/server.properties - файл настройки Kafka
Log:
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller,
from now on will use node 192.168.1.20:9092 (id: 0 rack: null)
(kafka.server.BrokerToControllerRequestThread)
[2024-01-21 18:29:10,389] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]:
Recorded new controller, from now on will use node 192.168.1.20:9092 (id: 0 rack: null)
(kafka.server.BrokerToControllerRequestThread)
````
Проверка работы из консоли
Создание топика
Создание топика "samples":
$ ~/tools/kafka/bin $ kafka-topics.sh --bootstrap-server 192.168.1.20:9092 --create --topic samples --partitions 1 --replication-factor 1
Отправка сообщения в топик
Отправка сообщения в топик "samples":
$ ~/tools/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.1.20:9092 --topic samples
>test1
>test2
>
Прием сообщения из топика
Подключение к топику "samples" и просмотр сообщений:
$ ~/tools/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.20:9092 --topic samples
test1
test2
Удаление топика
$ ~/tools/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.20:2181 --delete --topic product_ext_dto_topic
>Topic product_ext_dto_topic is marked for deletion.
>Note: This will have no impact if delete.topic.enable is not set to true.
Для полного удаления топика (с очисткой файлов топика) нужно установить в конфигурационном файле server.properties свойство delete.topic.enable = true
Ссылки:
Простые проекты на kotlin:
https://github.com/cherepakhin/shop_kafka_producer
https://github.com/cherepakhin/shop_kafka_consumer
Скрипты для linuх сервисов:
kafka.service
[Unit] Description=Kafka Service Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=vasi Group=vasi ExecStart=/bin/sh -c '/home/vasi/tools/kafka/bin/kafka-server-start.sh /home/vasi/tools/kafka/config/server.properties > /home/vasi/tools/kafka/logs/kafka.log 2>&1' ExecStop=/home/vasi/tools/kafka/bin/kafka-server-stop.sh ExecReload=/bin/kill -HUP $MAINPID Restart=on-failure [Install] WantedBy=multi-user.target
[Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=vasi Group=vasi ExecStart=/home/vasi/tools/kafka/bin/zookeeper-server-start.sh /home/vasi/tools/kafka/config/zookeeper.properties ExecStop=/home/vasi/tools/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
Установка, развертывание и использование Kafka (один узел / кластер)
Немного о настройке семантики Kafka
(Работа Apache Kafka на примерах. Поднимаем Kafka Cluster используя docker-compose)
Семантики:
- At most once: В этой семантике продюсер считает сообщение успешно доставленным, как только оно отправлено брокеру, независимо от его фактической доставки потребителю. Это означает, что сообщения могут быть потеряны, если брокер не смог доставить их потребителям. Эта семантика обеспечивает максимальную пропускную способность, но не гарантирует доставку сообщений.
- At least once: В этой семантике продюсер ждет подтверждения (ACK) от брокера о доставке сообщения. Если продюсер не получает подтверждения в течение заданного времени, он повторно отправляет сообщение. Это гарантирует, что сообщения будут доставлены, но может привести к дублированию сообщений в случае сбоев и повторных отправок.
- Exactly once: В этой семантике гарантируется, что каждое сообщение будет доставлено ровно один раз, без дублирования. Продюсер отправляет сообщения в рамках транзакции, и брокер подтверждает их только после успешной записи в журнал и передачи потребителям. Эта семантика обеспечивает наивысший уровень гарантий, но требует дополнительных механизмов и может снижать пропускную способность.
Группы Kafka
https://habr.com/ru/companies/slurm/articles/550934/
https://coralogix.com/blog/create-kafka-topics-in-3-easy-steps/
Topic распределен по partition. Это разделение задается при создании topic (параметр --partitions):
$ kafka/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 2 \
--partitions 3 \
--topic unique-topic-name
Коротко работа консьюмеров с partitions описана в https://habr.com/ru/companies/slurm/articles/550934/
Группы нужны для распределения нагрузки. Входящие сообщения для какого-то topic распределяются по partition (количество partition задается при создании топика). Каждый консьюмер имеет группу. Если количество консьюмеров в консьюмер группе будет равно количеству партиций, то НАГРУЗКА на каждый консьюмер будет одинакова.
KafkaCat
kcat (formerly kafkacat) Utility for Confluent Platform
Утилита для работы с Kafka.
Пример работы:
Чтение топика hello-topic. Сообщения из топика hello-topic будут автоматически отображаться:
$ kafkacat -b v:9092 -t hello-topic
% Auto-selecting Consumer mode (use -P or -C to override)
Hello World
Hello World1
Hello World3
Отправка сообщений с помощью KafkaCat:
$ echo "Hello World3" | kafkacat -b v:9092 -t hello-topic
Форматированный вывод:
$ kafkacat -b v:9092 -t hello-topic -f '\nKey (%K bytes): %k\tValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'
Key (-1 bytes):
Value (12 bytes): Hello World3
Timestamp: 1741713079913 Partition: 0 Offset: 3
--
Key (-1 bytes):
Value (12 bytes): Hello World3
Timestamp: 1741713826402 Partition: 0 Offset: 4
--
Key (-1 bytes):
Value (12 bytes): Hello World3
Timestamp: 1741713827894 Partition: 0 Offset: 5
--
Key (-1 bytes):
Value (12 bytes): Hello World3
Timestamp: 1741714481961 Partition: 0 Offset: 6
--
% Reached end of topic hello-topic [0] at offset 7
Отправка текстового файла:
$ kafkacat -b v:9092 -t hello-topic -T -P -l ~/temp/1.json
Отправка текстового файла построчно (каждая строка в отдельном сообщении):
$ kafkacat -b v:9092 -t hello-topic -T -P -l ./file_for_send.txt
Ключи:
-b - адрес Kafka (bootstrap server) -t - topic -P - продюсер (-C consummer) -T - вывод в stdout -l - одна строка - одно сообщение
Все ключи:
$ kafkacat --help
Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version 1.3.1 (JSON) (librdkafka 0.11.3 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins)
General options:
-C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode
-G <group-id> Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
Expects a list of topics to subscribe to
-t <topic> Topic to consume from, produce to, or list
-p <partition> Partition
-b <brokers,..> Bootstrap broker(s) (host[:port])
-D <delim> Message delimiter character:
a-z.. | \r | \n | \t | \xNN
Default: \n
-E Do not exit on non fatal error
-K <delim> Key delimiter (same format as -D)
-c <cnt> Limit message count
-X list List available librdkafka configuration properties
-X prop=val Set librdkafka configuration property.
Properties prefixed with "topic." are
applied as topic properties.
-X dump Dump configuration and exit.
-d <dbg1,...> Enable librdkafka debugging:
all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
-q Be quiet (verbosity set to 0)
-v Increase verbosity
-V Print version
-h Print usage help
Producer options:
-z snappy|gzip Message compression. Default: none
-p -1 Use random partitioner
-D <delim> Delimiter to split input into messages
-K <delim> Delimiter to split input key and message
-l Send messages from a file separated by
delimiter, as with stdin.
(only one file allowed)
-T Output sent messages to stdout, acting like tee.
-c <cnt> Exit after producing this number of messages
-Z Send empty messages as NULL messages
file1 file2.. Read messages from files.
With -l, only one file permitted.
Otherwise, the entire file contents will
be sent as one single message.
Consumer options:
-o <offset> Offset to start consuming from:
beginning | end | stored |
<value> (absolute offset) |
-<value> (relative offset from end)
-e Exit successfully when last message received
-f <fmt..> Output formatting string, see below.
Takes precedence over -D and -K.
-J Output with JSON envelope
-D <delim> Delimiter to separate messages on output
-K <delim> Print message keys prefixing the message
with specified delimiter.
-O Print message offset using -K delimiter
-c <cnt> Exit after consuming this number of messages
-Z Print NULL messages and keys as "NULL"(instead of empty)
-u Unbuffered output
Metadata options (-L):
-t <topic> Topic to query (optional)
Query options (-Q):
-t <t>:<p>:<ts> Get offset for topic <t>,
partition <p>, timestamp <ts>.
Timestamp is the number of milliseconds
since epoch UTC.
Requires broker >= 0.10.0.0 and librdkafka >= 0.9.3.
Multiple -t .. are allowed but a partition
must only occur once.
Format string tokens:
%s Message payload
%S Message payload length (or -1 for NULL)
%R Message payload length (or -1 for NULL) serialized
as a binary big endian 32-bit signed integer
%k Message key
%K Message key length (or -1 for NULL)
%T Message timestamp (milliseconds since epoch UTC)
%t Topic
%p Partition
%o Message offset
\n \r \t Newlines, tab
\xXX \xNNN Any ASCII character
Example:
-f 'Topic %t [%p] at offset %o: key %k: %s\n'
Consumer mode (writes messages to stdout):
kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -C -b ...
High-level KafkaConsumer mode:
kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+
Producer mode (reads messages from stdin):
... | kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -P -b ...
Metadata listing:
kafkacat -L -b <broker> [-t <topic>]
Query offset by timestamp:
kafkacat -Q -b broker -t <topic>:<partition>:<timestamp>
Ссылки
Установка Kafka в Linux
UI для Kafka
Полезные инструменты для разработчиков Apache Kafka
Команда kafkacat: опции, ключи и примеры использования