Вы здесь:

Простые приложения для работы с Kafka на Kotlin:
https://github.com/cherepakhin/shop_kafka_producer
https://github.com/cherepakhin/shop_kafka_consumer
В Readme.md этих проектов более подробное описание.

Составные части Kafka

Kafka — брокер сообщений, с помощью которого разные микросервисы общаются друг с другом. Также используется для отправки логов (например в Graylog и Elastic), хранения данных и т. д.
Брокер — узел Kafka, отвечает за прием, сохранение и передачу сообщений между продюсерами (Producer) и консюмерами (Consumer)
Консюмер (Consumer) — получатель сообщения
Продюсер (Producer) — отправитель сообщения
Zookepeer — хранит конфигурации, состояния, обнаруживает брокеров, выбирает контроллер кластера, отслеживает состояние узлов и обеспечивает функциональность и надёжность кластера
Контроллер кластера — отвечает за назначение мастеров партиций и отслеживает состояние брокеров
Offset Kafka — это понятие, которое используется для обозначения позиции в потоке сообщений. Offset отслеживает, на каком месте в потоке находится каждый консюмер, чтобы он мог читать сообщения с нужной позиции. Каждый раз, когда потребитель читает сообщение, его позиция сдвигается на одно сообщение вперед.
Партиция (Partition) — единица многопоточности в Kafka. Число партиций в топике можно лишь увеличивать. Партиции это те самые шарды, которые используются для шардирования в Kafka.
Реплика (Replica) — копия партиции. Реплика может быть размещена на другом узле для обеспечения отказоустойчивости. Обеспечивает репликацию данных.
Топик (topic) — служит для записи и чтения сообщений.
Консюмер группа (Consumer group) — группа получателей сообщений

Zookeeper

Перед запуском самого брокера Kafka, нужно запустить Zookeeper (это сервис координации сервисов с базой "ключ-значение"):

Параметры zookeeper настроить в ~/tools/kafka/config/zookeeper.properties ( https://github.com/cherepakhin/senior/blob/main/new_site/kafka/zookeeper.properties). Основные параметры в zookeeper.properties:

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=true
admin.serverPort=9080

Скрипт запуска zookeeper( https://github.com/cherepakhin/senior/blob/main/kafka/zookeeper-server-start.sh)

$ ~/tools/kafka/bin/zookeeper-server-start.sh /home/vasi/tools/kafka/config/zookeeper.properties

/home/vasi - каталог пользователя
tools/kafka - каталог со скриптами kafka
Проверка и просмотр работы zookeeper: http://192.168.1.20:9080/commands

zookeeper_commands

 

Запуск Kafka

$ ~/tools/kafka/bin/kafka-server-start.sh /home/vasi/tools/kafka/config/server.properties

Где kafka-server-start.sh - скрипт запуска из дистрибутива Kafka, /home/vasi/tools/kafka/config/server.properties - файл настройки Kafka
Log:

INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller,
    from now on will use node 192.168.1.20:9092 (id: 0 rack: null)
    (kafka.server.BrokerToControllerRequestThread)
[2024-01-21 18:29:10,389] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]:
    Recorded new controller, from now on will use node 192.168.1.20:9092 (id: 0 rack: null)
    (kafka.server.BrokerToControllerRequestThread)
````

Проверка работы из консоли

Создание топика

Создание топика "samples":

$ ~/tools/kafka/bin $ kafka-topics.sh --bootstrap-server 192.168.1.20:9092 --create --topic samples --partitions 1 --replication-factor 1

Отправка сообщения в топик

Отправка сообщения в топик "samples":

$ ~/tools/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.1.20:9092 --topic samples
>test1
>test2
>

Прием сообщения из топика

Подключение к топику "samples" и просмотр сообщений:

$ ~/tools/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.20:9092 --topic samples
test1
test2

Удаление топика

$ ~/tools/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.20:2181 --delete --topic product_ext_dto_topic
>Topic product_ext_dto_topic is marked for deletion.
>Note: This will have no impact if delete.topic.enable is not set to true.

Для полного удаления топика (с очисткой файлов топика) нужно установить в конфигурационном файле server.properties свойство delete.topic.enable = true
Ссылки:
Простые проекты на kotlin:
https://github.com/cherepakhin/shop_kafka_producer
https://github.com/cherepakhin/shop_kafka_consumer

Скрипты для linuх сервисов:
kafka.service

[Unit]
Description=Kafka Service
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=vasi
Group=vasi
ExecStart=/bin/sh -c '/home/vasi/tools/kafka/bin/kafka-server-start.sh /home/vasi/tools/kafka/config/server.properties > /home/vasi/tools/kafka/logs/kafka.log 2>&1'
ExecStop=/home/vasi/tools/kafka/bin/kafka-server-stop.sh
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure

[Install]
WantedBy=multi-user.target

zookeeper.service

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=vasi
Group=vasi
ExecStart=/home/vasi/tools/kafka/bin/zookeeper-server-start.sh /home/vasi/tools/kafka/config/zookeeper.properties
ExecStop=/home/vasi/tools/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal


[Install]
WantedBy=multi-user.target


Установка, развертывание и использование Kafka (один узел / кластер)

Немного о настройке семантики Kafka

(Работа Apache Kafka на примерах. Поднимаем Kafka Cluster используя docker-compose)
Семантики:

  • At most once: В этой семантике продюсер считает сообщение успешно доставленным, как только оно отправлено брокеру, независимо от его фактической доставки потребителю. Это означает, что сообщения могут быть потеряны, если брокер не смог доставить их потребителям. Эта семантика обеспечивает максимальную пропускную способность, но не гарантирует доставку сообщений.
  • At least once: В этой семантике продюсер ждет подтверждения (ACK) от брокера о доставке сообщения. Если продюсер не получает подтверждения в течение заданного времени, он повторно отправляет сообщение. Это гарантирует, что сообщения будут доставлены, но может привести к дублированию сообщений в случае сбоев и повторных отправок.
  • Exactly once: В этой семантике гарантируется, что каждое сообщение будет доставлено ровно один раз, без дублирования. Продюсер отправляет сообщения в рамках транзакции, и брокер подтверждает их только после успешной записи в журнал и передачи потребителям. Эта семантика обеспечивает наивысший уровень гарантий, но требует дополнительных механизмов и может снижать пропускную способность.

Группы Kafka

https://habr.com/ru/companies/slurm/articles/550934/
https://coralogix.com/blog/create-kafka-topics-in-3-easy-steps/
Topic распределен по partition. Это разделение задается при создании topic (параметр --partitions):

$ kafka/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 2 \
--partitions 3 \
--topic unique-topic-name

Коротко работа консьюмеров с partitions описана в https://habr.com/ru/companies/slurm/articles/550934/
Группы нужны для распределения нагрузки. Входящие сообщения для какого-то topic распределяются по partition (количество partition задается при создании топика). Каждый консьюмер имеет группу. Если количество консьюмеров в консьюмер группе будет равно количеству партиций, то НАГРУЗКА на каждый консьюмер будет одинакова.

KafkaCat

kcat (formerly kafkacat) Utility for Confluent Platform
Утилита для работы с Kafka.
Пример работы:
Чтение топика hello-topic. Сообщения из топика hello-topic будут автоматически отображаться:

$ kafkacat -b v:9092 -t hello-topic
% Auto-selecting Consumer mode (use -P or -C to override)
Hello World
Hello World1
Hello World3

Отправка сообщений с помощью KafkaCat:

$ echo "Hello World3" | kafkacat -b v:9092 -t hello-topic


Форматированный вывод:

$ kafkacat -b v:9092 -t hello-topic -f '\nKey (%K bytes): %k\tValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'
Key (-1 bytes):
Value (12 bytes): Hello World3
Timestamp: 1741713079913	Partition: 0	Offset: 3
--

Key (-1 bytes):
Value (12 bytes): Hello World3
Timestamp: 1741713826402	Partition: 0	Offset: 4
--

Key (-1 bytes):
Value (12 bytes): Hello World3
Timestamp: 1741713827894	Partition: 0	Offset: 5
--

Key (-1 bytes):
Value (12 bytes): Hello World3
Timestamp: 1741714481961	Partition: 0	Offset: 6
--
% Reached end of topic hello-topic [0] at offset 7


Отправка текстового файла:

$ kafkacat -b v:9092 -t hello-topic -T -P -l ~/temp/1.json


Отправка текстового файла построчно (каждая строка в отдельном сообщении):

$ kafkacat -b v:9092 -t hello-topic -T -P -l ./file_for_send.txt


Ключи:
-b - адрес Kafka (bootstrap server) -t - topic -P - продюсер (-C consummer) -T - вывод в stdout -l - одна строка - одно сообщение
Все ключи:

$ kafkacat --help
Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version 1.3.1 (JSON) (librdkafka 0.11.3 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins)


General options:
  -C | -P | -L | -Q  Mode: Consume, Produce, Metadata List, Query mode
  -G <group-id>      Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
                     Expects a list of topics to subscribe to
  -t <topic>         Topic to consume from, produce to, or list
  -p <partition>     Partition
  -b <brokers,..>    Bootstrap broker(s) (host[:port])
  -D <delim>         Message delimiter character:
                     a-z.. | \r | \n | \t | \xNN
                     Default: \n
  -E                 Do not exit on non fatal error
  -K <delim>         Key delimiter (same format as -D)
  -c <cnt>           Limit message count
  -X list            List available librdkafka configuration properties
  -X prop=val        Set librdkafka configuration property.
                     Properties prefixed with "topic." are
                     applied as topic properties.
  -X dump            Dump configuration and exit.
  -d <dbg1,...>      Enable librdkafka debugging:
                     all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
  -q                 Be quiet (verbosity set to 0)
  -v                 Increase verbosity
  -V                 Print version
  -h                 Print usage help

Producer options:
  -z snappy|gzip     Message compression. Default: none
  -p -1              Use random partitioner
  -D <delim>         Delimiter to split input into messages
  -K <delim>         Delimiter to split input key and message
  -l                 Send messages from a file separated by
                     delimiter, as with stdin.
                     (only one file allowed)
  -T                 Output sent messages to stdout, acting like tee.
  -c <cnt>           Exit after producing this number of messages
  -Z                 Send empty messages as NULL messages
  file1 file2..      Read messages from files.
                     With -l, only one file permitted.
                     Otherwise, the entire file contents will
                     be sent as one single message.

Consumer options:
  -o <offset>        Offset to start consuming from:
                     beginning | end | stored |
                     <value>  (absolute offset) |
                     -<value> (relative offset from end)
  -e                 Exit successfully when last message received
  -f <fmt..>         Output formatting string, see below.
                     Takes precedence over -D and -K.
  -J                 Output with JSON envelope
  -D <delim>         Delimiter to separate messages on output
  -K <delim>         Print message keys prefixing the message
                     with specified delimiter.
  -O                 Print message offset using -K delimiter
  -c <cnt>           Exit after consuming this number of messages
  -Z                 Print NULL messages and keys as "NULL"(instead of empty)
  -u                 Unbuffered output

Metadata options (-L):
  -t <topic>         Topic to query (optional)

Query options (-Q):
  -t <t>:<p>:<ts>    Get offset for topic <t>,
                     partition <p>, timestamp <ts>.
                     Timestamp is the number of milliseconds
                     since epoch UTC.
                     Requires broker >= 0.10.0.0 and librdkafka >= 0.9.3.
                     Multiple -t .. are allowed but a partition
                     must only occur once.

Format string tokens:
  %s                 Message payload
  %S                 Message payload length (or -1 for NULL)
  %R                 Message payload length (or -1 for NULL) serialized
                     as a binary big endian 32-bit signed integer
  %k                 Message key
  %K                 Message key length (or -1 for NULL)
  %T                 Message timestamp (milliseconds since epoch UTC)
  %t                 Topic
  %p                 Partition
  %o                 Message offset
  \n \r \t           Newlines, tab
  \xXX \xNNN         Any ASCII character
 Example:
  -f 'Topic %t [%p] at offset %o: key %k: %s\n'


Consumer mode (writes messages to stdout):
  kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -C -b ...

High-level KafkaConsumer mode:
  kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+

Producer mode (reads messages from stdin):
  ... | kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -P -b ...

Metadata listing:
  kafkacat -L -b <broker> [-t <topic>]

Query offset by timestamp:
  kafkacat -Q -b broker -t <topic>:<partition>:<timestamp>

 

Ссылки

Установка Kafka в Linux
UI для Kafka
Полезные инструменты для разработчиков Apache Kafka
Команда kafkacat: опции, ключи и примеры использования