Запуск Apache Kafka в кластере

Запуск Apache Kafka в кластере

Александр Косарев

Как и любой другой компонент распределённой информационной системы Apache Kafka в условиях реальной эксплуатации рекомендуется разворачивать в кластере для обеспечения отказоустойчивости. Кластер Kafka может быть развёрнут в двух вариантах: с использованием KRaft и Apache ZooKeeper.

Роли нод в кластере

В рамках кластера ноды Kafka могут иметь две роли:

  • Брокер (broker) — ноды данного типа обеспечивают обмен сообщениями с клиентами: отправителями и получателями.
  • Контроллер (controler) — ноды данного типа отвечают за жизненный цикл кластера.

При использовании Apache ZooKeeper контроллер выбирается автоматически из числа брокеров, а при использовании KRaft некоторому количеству нод назначается роль контроллера, и из их числа автоматически выбирается лидер.

KRaft и Apache ZooKeeper

Контроллер отвечает за поддержание элементов кластера в актуальном состоянии, но нужно ещё где-то хранить метаданные кластера: список топиков, их партиции и настройки реплицирования, читателей, их группы, смещения, и т.д.

До Apache Kafka 3.0.0 роль хранилища метаданных была отведена Apache ZooKeeper — сервису, который часто используется в качестве хранилища метаданных, конфигураций и используется при реализации механизма поиска сервисов (service discovery). Недостаток использования ZooKeeper заключается в разделении данных кластера между контроллером и ZooKeeper, что усложняет сопровождение кластера Kafka, так как техническим специалистам нужно сопровождать не только кластер Kafka, но и кластер ZooKeeper.

С версии Kafka 3.0.0 появилась замена ZooKeeper — KRaft (Kafka Raft), реализация алгоритма согласования Raft, позволяющая кворуму контроллеров самостоятельно выбирать активный контроллер, который в свою очередь отвечает за хранение метаданных кластера и его жизненный цикл. Таким образом кластер ZooKeeper можно заменить на кворум контроллеров Apache Kafka, который будет обслуживать кластер.

Начиная с версии Kafka 3.3.1 реализация KRaft является стабильной и рекомендуется к использованию вместо Apache ZooKeeper, а с версии Kafka 4.0 поддержка ZooKeeper будет прекращена.

Далее в данной статье будет рассмотрено развёртывание кластера с KRaft.

Кластер Kafka с KRaft

В качестве примера я буду запускать кластер, состоящий из трёх контроллеров и трёх брокеров, это разумное минимальное количество нод для обеспечения базовой отказоустойчивости. Для развёртывания кластера мне нужно подготовить файлы настроек для каждой ноды, сгенерировать идентификатор кластера и отформатировать директории логов. Все шесть нод будут располагаться в одной системе, поэтому для них будут использоваться разные порты и директории данных, но в условиях реальной эксплуатации каждая нода должна быть расположена в отдельной системе и на разных физических серверах для обеспечения отказоустойчивости.

Дистрибутив Apache Kafka содержит в директории config/kraft уже готовые файлы параметров для брокера, контроллера и сервера (смешанный режим), которые мы можем использовать в качестве основы:

  • broker.properties
  • controller.properties
  • server.properties

Файлы настроек в данной статье приведены для примера и не рассчитаны на использование в условиях реальной эксплуатации. Для более корректной настройки нод ознакомьтесь с официальной документацией.

Ниже приведён пример минимальной настройки контроллера:

config/kraft/controller_i1.properties

# Роль ноды
process.roles=controller

# Уникальный в рамках кластера идентификатор ноды, должен быть числом
# Для двух других контроллеров значения будут соответственно 2 и 3
node.id=1

# Описание кворума контроллеров
# Идентификатор участника кворума имеет следующий вид:
# {node.id}@{host}:{port}
controller.quorum.voters=1@localhost:9093,2@localhost:9193,3@localhost:9293

# В режиме KRaft нужно указывать, как будут называться слушатели-контроллеры
controller.listener.names=CONTROLLER

# Другие два контроллера будут запускаться на портах 9193 и 9293 соответственно
listeners=CONTROLLER://:9093

# Директория данных ноды, для других контроллеров директории тоже будут свои
log.dirs=/tmp/kraft-controller-logs-i1

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

Так как все ноды разворачиваются на одной системе, значения свойств listeners и log.idrs должны быть разными в файлах свойств для разных нод. Для двух остальных контроллеров нужно будет создать аналогичные файлы. Так же файлы свойств нужно создать и для брокеров:

config/kraft/broker_i4.properties

# Роль ноды
process.roles=broker

# Уникальный в рамках кластера идентификатор ноды, должен быть числом
# Для двух других брокеров значения будут соответственно 5 и 6
node.id=4

# Описание кворума контроллеров
controller.quorum.voters=1@localhost:9093,2@localhost:9193,1@localhost:9293

# Другие два брокера будут запускаться на портах 9192 и 9292 соответственно
listeners=PLAINTEXT://localhost:9092

# Директория данных ноды, для других брокеров директории тоже будут свои
log.dirs=/tmp/kraft-broker-logs-i1

inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://localhost:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

Для двух остальных брокеров нужно создать аналогичные файлы.

Теперь нужно сгенерировать идентификатор кластера, сделать это можно следующей командой:

$ bin/kafka-storage.sh random-uuid
8zxAHVOAQySnpi6uZg5HIg

# Для удобства можно сохранить значение в переменную
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Далее нужно отформатировать директории логов, которые указаны в файлах свойств в свойстве log.dirs, например для брокера с идентификатором 4 команда будет следующей:

$ bin/kafka-storage.sh format --config config/kraft/broker_i4.properties --cluster-id $KAFKA_CUSTER_ID
Formatting /tmp/kraft-broker-logs-i1 with metadata.version 3.8-IV0.

Сделать это нужно для всех нод кластера.

Теперь всё готово к запуску кластера, каждую ноду нужно запустить, сначала контроллеры, затем — брокеры. Пример команды запуска для первого контроллера:

# Для запуска каждого контроллера и брокера нужно выполнить команду
$ bin/kafka-server-start.sh config/kraft/controller_i1.properties

После запуска всех нод можно проверить состояние кластера:

$ bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 describe --status
ClusterId:              8zxAHVOAQySnpi6uZg5HIg
# Контроллер с идентификатором 1 является лидером
LeaderId:               1
LeaderEpoch:            12
HighWatermark:          3251
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   0
# В кластере 3 контроллера с идентификаторами 1, 2 и 3
CurrentVoters:          [1,2,3]
# В кластере 3 брокера с идентификаторами 4, 5 и 6
CurrentObservers:       [4,5,6]

Теперь для проверки можно создать топик, партиции которого будут реплицироваться на трёх нодах:

# Создадим топик
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic sandbox --partitions 5 --replication-factor 3
Created topic sandbox.

# Проверим топик
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic sandbox
Topic: sandbox        TopicId: rgP_O6HbTSiLCarTFIoZWA        PartitionCount: 5        ReplicationFactor: 3        Configs:
        Topic: sandbox        Partition: 0        Leader: 3        Replicas: 3,1,2        Isr: 3,1,2        Elr: N/A        LastKnownElr: N/A
        Topic: sandbox        Partition: 1        Leader: 1        Replicas: 1,2,3        Isr: 1,2,3        Elr: N/A        LastKnownElr: N/A
        Topic: sandbox        Partition: 2        Leader: 2        Replicas: 2,3,1        Isr: 2,3,1        Elr: N/A        LastKnownElr: N/A
        Topic: sandbox        Partition: 3        Leader: 3        Replicas: 3,2,1        Isr: 3,2,1        Elr: N/A        LastKnownElr: N/A
        Topic: sandbox        Partition: 4        Leader: 1        Replicas: 1,3,2        Isr: 1,3,2        Elr: N/A        LastKnownElr: N/A

По Leader и Replicas можно видеть, как настроена репликация патриций.

Кластер Kafka с KRaft в Docker

Apache Kafka в контейнерах запускается в режиме KRaft, для настройки каждой ноды мы можем использовать файлы свойств и переменные окружения.

Для запуска ноды Kafka с настройками из файла нужно смонтировать файл свойств в директорию /mnt/shared/config в контейнере. Допустим есть файл с параметрами ноды /opt/kafka/config/docker/controller-i1/server.properties, тогда команда для запуска будет выглядеть следующим образом:

$ docker run -v /opt/kafka/config/docker/controller-i1/:/mnt/shared/config -p 9093:9093 --name kafka-ctrl-i1 apache/kafka:3.8.1

Вместо файла свойств можно использовать переменные окружения. Переменные окружения соответствуют параметрам из файла свойств, имеют префикс KAFKA_, а точки заменены на _, например, переменная окружения KAFKA_NODE_ID соответствует свойству node.id.

Запуск контроллера в контейнере с переменными окружения выглядит следующим образом:

$ docker run -p 9093:9092 --name kafka-ctrl-i1 \
  -e KAFKA_PROCESS_ROLES=controller \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@172.17.0.1:9093,2@172.17.0.1:9193,3@172.17.0.1:9293 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=CONTROLLER://:9092 \
  -e KAFKA_LOG_DIRS=/tmp/ctrl \
  -e KAFKA_NUM_NETWORK_THREADS=3 \
  -e KAFKA_NUM_IO_THREADS=8 \
  -e KAFKA_SOCKET_SEND_BUFFER_BYTES=102400 \
  -e KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=102400 \
  -e KAFKA_SOCKET_REQUEST_MAX_BYTES=104857600 \
  -e KAFKA_NUM_PARTITIONS=5 \
  -e KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR=1 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_LOG_RETENTION_HOURS=168 \
  -e KAFKA_LOG_SEGMENT_BYTES=1073741824 \
  -e KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=300000 \
  apache/kafka:3.8.1

Как видно, localhost я заменил на 172.17.0.1 — адрес хост-системы из сети 172.17.0.0/16 для Docker, если у вашей хост-системы другой адрес в сети Docker, то замените 172.17.0.1 на него. Поскольку все ноды разворачиваются изолированно в своих контейнерах, можно использовать одни и те же директории данных и порты в настройках.

Если требуется задать идентификатор кластера, отличный от стандартного, то это можно сделать при помощи переменной кружения CLUSTER_ID.

Проверить состояние кластера можно, выполнив следующую команду на одном из контроллеров:

$ docker exec kafka-ctrl-i1 /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 describe --status

Ошибка UnknownHostException

Для брокеров же требуется дополнительная настройка, так как с настройками по умолчанию при попытке подключения к Kafka из-за пределов контейнера вы получите ошибку подключения вроде такой:

[2024-11-12 12:54:06,791] WARN [AdminClient clientId=adminclient-1] Error connecting to node ebb8ea136b33:9092 (id: 6 rack: null) (org.apache.kafka.clients.NetworkClient) 
java.net.UnknownHostException: ebb8ea136b33

Даже если вы используете ip-адрес в качестве адреса брокера, через который взаимодействуете с кластером, то внутри кластера всё равно используются названия хостов. И клиент, получив от брокера информацию о кластере, пытается взаимодействовать с кластером, используя названия хостов. В данном случае это внутренние для Docker названия хостов, о которых внешний клиент ничего не знает, что и приводит к ошибке.

Чтобы решить эту проблему, нужно правильно сконфигурировать параметры listeners и advertisedListeners (KAFKA_LISTENERS и KAFKA_ADVERTISED_LISTENERS в apache/kafka и KAFKA_CFG_LISTENERS и KAFKA_CFG_ADVERTISED_LISTENERS в bitnami/kafka).

$ docker run -p 29092:9092 --name kafka-ctrl-i1 \
  -e KAFKA_PROCESS_ROLES=broker \
  -e KAFKA_NODE_ID=4 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@172.17.0.1:9093,2@172.17.0.1:9193,3@172.17.0.1:9293 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:29092 \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \
  -e KAFKA_LOG_DIRS=/tmp/ctrl \
  -e KAFKA_NUM_NETWORK_THREADS=3 \
  -e KAFKA_NUM_IO_THREADS=8 \
  -e KAFKA_SOCKET_SEND_BUFFER_BYTES=102400 \
  -e KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=102400 \
  -e KAFKA_SOCKET_REQUEST_MAX_BYTES=104857600 \
  -e KAFKA_NUM_PARTITIONS=5 \
  -e KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR=1 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_LOG_RETENTION_HOURS=168 \
  -e KAFKA_LOG_SEGMENT_BYTES=1073741824 \
  -e KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=300000 \
  apache/kafka:3.8.1

Порт 19092 будет использован для взаимодействий внутри Docker, а 9092 будет прокинут на порт 29092 в хост-систему для внешних клиентов. Для остальных двух сервисов меняться будет только порт хост-системы: 39092 и 49092.

Добавьте описание

Кластер Kafka с KRaft в Docker Compose

При помощи Docker Compose вы можете описать весь кластер одним файлом:

name: kafka-cluster

networks:
  kafka:
    name: kafka

services:
  controller-i1:
    image: apache/kafka:3.8.1
    ports:
      - 9093:9093
    networks:
      - kafka
    environment:
      KAFKA_PROCESS_ROLES: controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_LOG_DIRS: /tmp/ctrl
      KAFKA_NUM_NETWORK_THREADS: 3
      KAFKA_NUM_IO_THREADS: 8
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_NUM_PARTITIONS: 5
      KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000

  controller-i2:
    image: apache/kafka:3.8.1
    ports:
      - 9193:9093
    networks:
      - kafka
    environment:
      KAFKA_PROCESS_ROLES: controller
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_LOG_DIRS: /tmp/ctrl
      KAFKA_NUM_NETWORK_THREADS: 3
      KAFKA_NUM_IO_THREADS: 8
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_NUM_PARTITIONS: 5
      KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000

  controller-i3:
    image: apache/kafka:3.8.1
    ports:
      - 9293:9093
    networks:
      - kafka
    environment:
      KAFKA_PROCESS_ROLES: controller
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_LOG_DIRS: /tmp/ctrl
      KAFKA_NUM_NETWORK_THREADS: 3
      KAFKA_NUM_IO_THREADS: 8
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_NUM_PARTITIONS: 5
      KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000

  broker-i1:
    image: apache/kafka:3.8.1
    hostname: broker-i1
    ports:
      - 29092:9092
    networks:
      - kafka
    environment:
      KAFKA_PROCESS_ROLES: broker
      KAFKA_NODE_ID: 4
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093
      KAFKA_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-i1:19092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_NUM_NETWORK_THREADS: 3
      KAFKA_NUM_IO_THREADS: 8
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_LOG_DIRS: /tmp/broker
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
    depends_on:
      - controller-i1
      - controller-i2
      - controller-i3

  broker-i2:
    image: apache/kafka:3.8.1
    hostname: broker-i2
    ports:
      - 39092:9092
    networks:
      - kafka
    environment:
      KAFKA_PROCESS_ROLES: broker
      KAFKA_NODE_ID: 5
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093
      KAFKA_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-i2:19092,PLAINTEXT_HOST://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_NUM_NETWORK_THREADS: 3
      KAFKA_NUM_IO_THREADS: 8
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_LOG_DIRS: /tmp/broker
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
    depends_on:
      - controller-i1
      - controller-i2
      - controller-i3

  broker-i3:
    image: apache/kafka:3.8.1
    hostname: broker-i3
    ports:
      - 49092:9092
    networks:
      - kafka
    environment:
      KAFKA_PROCESS_ROLES: broker
      KAFKA_NODE_ID: 6
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093
      KAFKA_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-i3:19092,PLAINTEXT_HOST://localhost:49092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_NUM_NETWORK_THREADS: 3
      KAFKA_NUM_IO_THREADS: 8
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_LOG_DIRS: /tmp/broker
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
    depends_on:
      - controller-i1
      - controller-i2
      - controller-i3

Поскольку Docker Compose позволяет обращаться к сервисам по их названию, в описании кворума я использовал названия сервисов. К сожалению использовать scale и уменьшить таким образом файл в 3 раза не получится, так как каждая нода должна иметь уникальный в рамках кластера идентификатор.

Полезные ссылки

Report Page