Событийно-ориентированная архитектура
https://t.me/ai_machinelearning_big_dataЧто такое «событийно-ориентированная архитектура»?
Событийно-ориентированная архитектура — это шаблон проектирования с применением событий для запуска и передачи изменений между компонентами системы. Службы здесь взаимодействуют, обмениваясь событиями, то есть сообщениями о наступлении события или изменении состояния.
Преимущества событийно-ориентированной архитектуры
- Слабая связанность: службы разделены, чем обеспечивается независимость и модульность.
- Масштабируемость: службы масштабируются независимо, исходя из потребности в событиях, чем повышается общая производительность системы.
- Асинхронная обработка: событиями обеспечиваются асинхронное взаимодействие, уменьшение задержки и времени отклика.
- Порождение событий: поддерживается естественным образом, ведь состояние системы определяется последовательностью прошлых событий.
- Гибкость: добавление новых служб или изменение имеющихся не сказывается на всей системе.
Kafka
Apache Kafka — это распределенная потоковая платформа для создания конвейеров данных в реальном времени и потоковых приложений, потоковой передачи событий с высокой пропускной способностью, отказоустойчивостью и масштабируемостью.
В основе Kafka — модель обмена сообщениями «публикация-подписка», где в темах отправителями публикуются сообщения-события, а получатели — для получения и обработки сообщений — подписываются на эти темы. События хранятся в Kafka неизменяемо и только с возможностью добавления. Данные обрабатываются как в реальном времени, так и ретроспективе.
Ключевые понятия Kafka
- Темы — каналы для публикации событий и подписки на них.
- Отправители — службы, которыми события создаются и отправляются в темы Kafka.
- Получатели — службы, которые подписываются на темы, ими обрабатываются входящие события.
- Разделы: каждая тема разбивается на разделы, чем обеспечиваются параллельная обработка и распределение нагрузки.
Роль Kafka в событийно-ориентированной архитектуре
Эта роль центральная, Apache Kafka — масштабируемая и долговечная шина событий для взаимодействия микросервисов. В событийно-ориентированной архитектуре микросервисы предназначены для отправки и получения событий, чем обеспечивается их асинхронное взаимодействие без прямых зависимостей одного от другого.
Ключевые роли
- Брокер событий. Kafka — центральный брокер событий, легко масштабируемое и надежное промежуточное ПО для обработки потоков событий. Им эффективно контролируются маршрутизация, хранение, распределение событий между службами.
- Журнал событий. В журналоподобном хранилище Kafka с возможностью только добавлять события долговременно сохраняются и остаются на настраиваемый период. Это идеальный источник истины для сценариев порождения событий, ведения контроля и воспроизведения данных.
- Разделение служб. В Kafka службы разделяются, поэтому при публикации событий отправителям не нужно «знать», в каких именно службах их сообщения получают. Получателям тоже не нужно «знать» об отправителях, где генерируются события.
- Надежность и отказоустойчивость. В Kafka надежно гарантируются долговечность данных и отказоустойчивость. События реплицируются в нескольких брокерах, чем обеспечивается высокая доступность даже в случае сбоев узлов.
- Масштабируемость. Распределенной архитектурой Kafka обеспечивается горизонтальное масштабирование за счет добавления брокеров в кластер. Так с высокой пропускной способностью обрабатываются потоки событий между многочисленными отправителями и получателями.
- Обработка данных в реальном времени обеспечивается предоставлением событий с низкой задержкой. Поэтому Kafka хороша для сценариев, где важна своевременная, немедленная обработка событий.
- Упорядочение событий по времени. В Kafka сохраняется последовательность событий каждого раздела. Это важно для поддержания согласованности при обработке событий в нескольких службах.
- Эволюция схемы и совместимость. В Kafka поддерживается сериализация схемы с фреймворками вроде Avro или Protobuf, чем обеспечиваются эволюция схемы, прямая и обратная совместимость, когда службы со временем развиваются.
Настройка среды
Spring Boot — популярный Java-фреймворк для создания микросервисов. За счет различных утилит и соглашений им упрощается процесс разработки с акцентом скорее на бизнес-логике, чем шаблонном коде. А обширной экосистемой Spring Boot облегчается интеграция с другими технологиями.
Начнем с создания простого проекта Spring Boot с интеграцией Kafka, в проекте же настроим необходимые зависимости.
Создаем новый проект Spring Boot в любимой среде IDE, добавляем в файл pom.xml
или build.gradle
зависимости для Maven или Gradle:
<!-- Для Maven --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> // Для Gradle implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka'
Прежде чем применять Kafka для событийно-ориентированного взаимодействия, создадим темы и укажем количество разделов, темы подобны каналам для публикации событий отправителями и подписки получателей на эти события:
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaTopicConfiguration { @Bean public NewTopic exampleTopic() { return new NewTopic("example_topic", 1, (short) 1); } }
В этом примере классом @Configuration
определяется тема Kafka example_topic
с одним разделом и одним коэффициентом репликации, которым обеспечивается отказоустойчивость за счет репликации данных в нескольких брокерах.
Теперь продемонстрируем интеграцию Spring Boot с Kafka, создав простой проект с базовыми отправителем и получателем:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducerService { private final KafkaTemplate<String, String> kafkaTemplate; private final String topic = "example_topic"; @Autowired public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String message) { kafkaTemplate.send(topic, message); } }
Здесь для отправки сообщений-событий в тему Kafka example_topic
создан класс KafkaProducerService
. Сообщения публикуем с помощью KafkaTemplate
из Spring Kafka.
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumerService { @KafkaListener(topics = "example_topic", groupId = "example_group") public void receiveMessage(String message) { System.out.println("Received message: " + message); } }
Класс KafkaConsumerService
, которым прослушивается example_topic
, принадлежит группе получателей example_group
. Всякий раз, когда сообщение публикуется в теме, для обработки входящего события вызывается метод receiveMessage
.
Реальные примеры
Обработка заказов
В реальной системе обработки заказов поток событий такой:
- Заказ размещен.
- Платеж обработан.
- Запасы обновлены.
В этом сценарии каждое событие обрабатывается разными микросервисами, которые для поддержания согласованности и разделения служб взаимодействуют через Kafka.
Этап 1. Настройка среды Kafka
Запускаем требуемый Kafka сервер ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Запускаем сервер Kafka:
bin/kafka-server-start.sh config/server.properties
Создаем две темы Kafka для событий заказа и платежных событий:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic order_events bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic payment_events
Создаем новый проект Spring Boot в любимой среде IDE, добавляем в файл pom.xml
зависимости:
<dependencies> <!-- Spring Boot Starter Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Spring Boot Starter для Apache Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>
Для взаимодействия служб создаем два класса с данными о событиях OrderEvent
и PaymentEvent
:
public class OrderEvent { private Long orderId; // Другие поля, конструкторы, геттеры, сеттеры } public class PaymentEvent { private Long orderId; private BigDecimal amount; // Другие поля, конструкторы, геттеры, сеттеры }
Создаем две службы-отправителя Kafka — одну для событий заказа, другую для платежных событий:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class OrderEventProducer { private static final String TOPIC = "order_events"; @Autowired private KafkaTemplate<String, OrderEvent> kafkaTemplate; public void sendOrderEvent(OrderEvent orderEvent) { kafkaTemplate.send(TOPIC, orderEvent); } } @Service public class PaymentEventProducer { private static final String TOPIC = "payment_events"; @Autowired private KafkaTemplate<String, PaymentEvent> kafkaTemplate; public void sendPaymentEvent(PaymentEvent paymentEvent) { kafkaTemplate.send(TOPIC, paymentEvent); } }
Создаем две службы-получателя Kafka — одну для событий заказа, другую для платежных событий:
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class OrderEventConsumer { @KafkaListener(topics = "order_events", groupId = "order_group") public void handleOrderEvent(OrderEvent orderEvent) { // Обрабатываем событие заказа, например сохраняем его в базе данных System.out.println("Received Order Event: " + orderEvent); // Реализуем логику обработки заказов и обновления запасов } } import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class PaymentEventConsumer { @KafkaListener(topics = "payment_events", groupId = "payment_group") public void handlePaymentEvent(PaymentEvent paymentEvent) { // Обрабатываем платежное событие, например обновляем статус платежа System.out.println("Received Payment Event: " + paymentEvent); // Реализуем логику обработки платежа и обновления статуса заказа } }
Чтобы получать данные о заказах и платежах клиентов, создаем конечные точки REST, по получении данных отправляем события в Kafka:
@RestController public class OrderController { @Autowired private OrderEventProducer orderEventProducer; @PostMapping("/orders") public ResponseEntity<String> createOrder(@RequestBody OrderEvent orderEvent) { // Полученное событие заказа обрабатываем и отправляем в Kafka orderEventProducer.sendOrderEvent(orderEvent); return ResponseEntity.ok("Order created successfully"); } } @RestController public class PaymentController { @Autowired private PaymentEventProducer paymentEventProducer; @PostMapping("/payments") public ResponseEntity<String> processPayment(@RequestBody PaymentEvent paymentEvent) { // Полученное платежное событие обрабатываем и отправляем в Kafka paymentEventProducer.sendPaymentEvent(paymentEvent); return ResponseEntity.ok("Payment processed successfully"); } }
Запускаем приложение Spring Boot и с помощью клиента REST, например Postman, создаем заказы и обрабатываем платежи. Просматриваем в журналах события, полученные службами-получателями, и выполняемую логику обработки заказов и обновления запасов.
ИМЕЙТЕ В ВИДУ, ЧТО ЭТО УПРОЩЕННЫЙ ПРИМЕР: В РЕАЛЬНОМ ПРИЛОЖЕНИИ ДОБАВЛЯЮТСЯ ПРОВЕРКИ, ОБРАБОТКА ОШИБОК, ДОПОЛНИТЕЛЬНЫЕ СЛУЖБЫ ДЛЯ ОБРАБОТКИ ЗАКАЗОВ, ПЛАТЕЖНЫХ ШЛЮЗОВ, УЧЕТА ЗАПАСОВ.
Интернет вещей и обработка сигналов датчиков
Создадим приложение для Интернета вещей и обработки сигналов датчиков со Spring Boot и Kafka. Здесь упор делается на сборе сигналов датчиков, их агрегировании и генерировании предупреждений на основе условий. Опять же, пример упрощенный: в реальном сценарии добавляются сложность и обработка данных.
Для взаимодействия служб создаем класс с данными о событии сигналов датчиков SensorDataEvent
:
public class SensorDataEvent { private String sensorId; private double value; // Другие поля, конструкторы, геттеры, сеттеры }
Чтобы отправлять события сигналов датчиков, создаем службу-отправитель Kafka:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class SensorDataEventProducer { private static final String TOPIC = "sensor_data_events"; @Autowired private KafkaTemplate<String, SensorDataEvent> kafkaTemplate; public void sendSensorDataEvent(SensorDataEvent sensorDataEvent) { kafkaTemplate.send(TOPIC, sensorDataEvent); } }
Чтобы получать события сигналов датчиков, создаем службу-получателя Kafka и агрегатор для агрегирования данных:
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class SensorDataEventConsumer { @Autowired private DataAggregator dataAggregator; @KafkaListener(topics = "sensor_data_events", groupId = "sensor_data_group") public void handleSensorDataEvent(SensorDataEvent sensorDataEvent) { // Полученное событие сигналов датчиков обрабатываем и передаем агрегатору dataAggregator.aggregate(sensorDataEvent); } } import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Component public class DataAggregator { private final Map<String, Double> sensorDataMap = new HashMap<>(); public void aggregate(SensorDataEvent sensorDataEvent) { // Агрегирование данных датчика по его идентификатору double currentValue = sensorDataMap.getOrDefault(sensorDataEvent.getSensorId(), 0.0); double aggregatedValue = currentValue + sensorDataEvent.getValue(); sensorDataMap.put(sensorDataEvent.getSensorId(), aggregatedValue); // Проверяем, превышен ли агрегированным значением определенный порог, и генерируем предупреждение if (aggregatedValue > 1000) { generateAlert(sensorDataEvent.getSensorId(), aggregatedValue); } } private void generateAlert(String sensorId, double aggregatedValue) { System.out.println("ALERT! Sensor ID: " + sensorId + ", Aggregated Value: " + aggregatedValue); // Реализуем логику генерирования предупреждений, например отправки уведомлений или запуска действий } }
Чтобы получать данные датчика с устройств Интернета вещей, создаем конечные точки REST, по получении данных отправляем события в Kafka для дальнейшей обработки и агрегирования:
@RestController public class SensorDataController { @Autowired private SensorDataEventProducer sensorDataEventProducer; @PostMapping("/sensors/{sensorId}/data") public ResponseEntity<String> sendSensorData(@PathVariable String sensorId, @RequestBody double value) { // Создаем и отправляем «SensorDataEvent» в Kafka SensorDataEvent sensorDataEvent = new SensorDataEvent(sensorId, value); sensorDataEventProducer.sendSensorDataEvent(sensorDataEvent); return ResponseEntity.ok("Sensor data received successfully"); } }
Запускаем Spring Boot и с помощью клиента REST, например Postman, тестируем приложение.
Обработка ошибок и повторные попытки
Обработка ошибок и повторные попытки — необходимые компоненты надежной, безотказной программной системы. Они важны для корректного восстановления приложений после неожиданных сбоев, переходных ошибок или отказа внешних служб. Обработка ошибок — это процесс устранения неожиданных или исключительных ситуаций, которые случаются при выполнении программы.
Повторные попытки — механизм многократного автоматического повторного выполнения неудачной операции или запроса. Их цель — обработка переходных ошибок, а они временные и устраняются повторением операции через небольшой интервал. Повторные попытки приходятся кстати при периодических сбоях ненадежных сетевых подключений или внешних служб.
ПОВТОРНЫЕ ПОПЫТКИ РЕАЛИЗУЮТСЯ РАЗНЫМИ СТРАТЕГИЯМИ.
ФИКСИРОВАННЫЙ ПОВТОР: ОПЕРАЦИЯ ПОВТОРЯЕТСЯ ФИКСИРОВАННОЕ КОЛИЧЕСТВО РАЗ С ПОСТОЯННЫМ ИНТЕРВАЛОМ МЕЖДУ ПОВТОРАМИ.
ЭКСПОНЕНЦИАЛЬНАЯ ЗАДЕРЖКА: ВО ИЗБЕЖАНИЕ ПЕРЕГРУЗКИ СИСТЕМЫ, ЕСЛИ ПРОБЛЕМА СОХРАНЯЕТСЯ, ИНТЕРВАЛ МЕЖДУ ПОВТОРАМИ УВЕЛИЧИВАЕТСЯ ЭКСПОНЕНЦИАЛЬНО.
СЛУЧАЙНАЯ ЗАДЕРЖКА: ЧТОБЫ РАССРЕДОТОЧИТЬ ПОВТОРНЫЕ ПОПЫТКИ И ИЗБЕЖАТЬ КОЛЛИЗИЙ ЗАПРОСОВ, В ИНТЕРВАЛ МЕЖДУ ПОВТОРАМИ ВВОДИТСЯ ЭЛЕМЕНТ СЛУЧАЙНОСТИ.
ВЫКЛЮЧАТЕЛЬ: ПОСЛЕ ОПРЕДЕЛЕННОГО КОЛИЧЕСТВА ПОСЛЕДОВАТЕЛЬНЫХ СБОЕВ АКТИВИРУЕТСЯ ВЫКЛЮЧАТЕЛЬ, И ПОВТОРНЫЕ ПОПЫТКИ ПРЕКРАЩАЮТСЯ. ПОСЛЕ ПЕРИОДА «ОХЛАЖДЕНИЯ» ПОПЫТКИ ВОЗОБНОВЛЯЮТСЯ.
ПОВТОР С ДЖИТТЕРОМ: ЧТОБЫ УМЕНЬШИТЬ ВЕРОЯТНОСТЬ ОДНОВРЕМЕННЫХ ПОПЫТОК НЕСКОЛЬКИХ КЛИЕНТОВ, ФИКСИРОВАННАЯ ИЛИ ЭКСПОНЕНЦИАЛЬНАЯ ЗАДЕРЖКА КОМБИНИРУЕТСЯ С ДЖИТТЕРОМ, СЛУЧАЙНЫМ ИНТЕРВАЛОМ.
Реализуем повторные попытки обработки событий с помощью Spring-Retry. Сначала добавим в pom.xml
или build.gradle
зависимость Spring-Retry:
<!-- Для Maven --> <dependency> <groupId>org.springframework.retry</groupId> <artifactId>spring-retry</artifactId> </dependency> // Для Gradle implementation 'org.springframework.retry:spring-retry'
Чтобы в случае сбоя обработка повторилась, обновляем получателя:
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Service; @Service public class KafkaConsumerService { @Retryable( value = { Exception.class }, maxAttempts = 3, backoff = @Backoff(delay = 1000, maxDelay = 3000) ) @KafkaListener(topics = "example_topic", groupId = "example_group") public void receiveMessage(String message) { try { // Обрабатываем сообщение здесь System.out.println("Received message: " + message); } catch (Exception e) { throw new RuntimeException("Error processing message: " + message, e); } } }
В этом примере в метод receiveMessage
добавили аннотацию @Retryable
, указав максимум повторов — три — и интервал между ними — 1 сек. при максимуме в 3 сек.
Очередь недоставленных сообщений
Это очередь, в которой необработанные события отправляются на дальнейший анализ и отладку. Такая их проверка не сказывается на основном потоке обработки событий.
Чтобы переместить проблемные события в очередь недоставленных сообщений, настраиваем для них в Kafka отдельную тему:
@Configuration public class KafkaTopicConfiguration { @Bean public NewTopic exampleTopic() { return new NewTopic("example_topic", 1, (short) 1); } @Bean public NewTopic deadLetterTopic() { return new NewTopic("example_topic_dead_letter", 1, (short) 1); } }
Обновляя получателя, отправляем эти события в очередь недоставленных сообщений:
@Service public class KafkaConsumerService { @Retryable( value = { Exception.class }, maxAttempts = 3, backoff = @Backoff(delay = 1000, maxDelay = 3000) ) @KafkaListener(topics = "example_topic", groupId = "example_group") public void receiveMessage(String message) { try { // Обрабатываем сообщение здесь System.out.println("Received message: " + message); } catch (Exception e) { // Перемещаем проблемное событие в очередь недоставленных сообщений kafkaProducerService.sendMessageToDeadLetterTopic(message); } } }
Здесь, чтобы отправить необработанные сообщения в тему example_topic_dead_letter
, в KafkaProducerService
добавили метод sendMessageToDeadLetterTopic
.
Благодаря этим совершенствованиям конвейер обработки событий отказоустойчивее, с временными сбоями он справляется корректнее.
Масштабирование событийно-ориентированных микросервисов
Получатели и отправители в Kafka масштабируются увеличением числа экземпляров соответственных служб. Запуская несколько экземпляров получателей параллельно, мы обрабатываем события одновременно, повышая общую пропускную способность.
При масштабировании получателей важно учитывать разбиение на разделы. Разделы — это единица параллелизма Kafka, каждый раздел получается только одним получателем группы получателей. Поэтому для эффективной параллельной обработки должно быть достаточно разделов.
Группа получателей — получатели с одним идентификатором группы, ими распределяется нагрузка получения событий из одной и той же темы. Чтобы увеличить параллелизм, получатели в группу добавляются.
Пример
Чтобы масштабировать получателей, для каждого их экземпляра задаем уникальный instanceId
и добавляем в группу получателей несколько экземпляров одного получателя:
@Service public class KafkaConsumerService { @Value("${kafka.instanceId}") private String instanceId; @KafkaListener( topicPartitions = @TopicPartition( topic = "example_topic", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "1", initialOffset = "0"), // При необходимости добавляем разделы и смещения } ), groupId = "example_group" ) public void receiveMessage(String message) { System.out.println("[" + instanceId + "] Received message: " + message); } }
В этом примере разделы и их начальные смещения для этого экземпляра получателя указаны аннотацией @KafkaListener
с topicPartitions
. Каждым экземпляром получателя обрабатываются события разных разделов, чем увеличивается параллелизм.
groupId
: этим атрибутом указывается группа получателей, к которой относится получатель. Нагрузка получения сообщений из разделов темы распределяется между получателями одной группы.
Чтобы создать несколько экземпляров службы получателей, запускаем приложение с разными значениями instanceId
при помощи профилей Spring:
@SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, "--spring.profiles.active=instance1"); SpringApplication.run(KafkaApplication.class, "--spring.profiles.active=instance2"); // При необходимости добавляем экземпляры } }
Здесь с профилями instance1 и instance2 запущено два экземпляра приложения, у каждого из которых уникальный instanceId
. События получаются ими из разных разделов, чем повышается параллелизм.
Количество разделов example_topic
увеличиваем в KafkaTopicConfiguration
:
@Configuration public class KafkaTopicConfiguration { @Bean public NewTopic exampleTopic() { return new NewTopic("example_topic", 4, (short) 1); } @Bean public NewTopic deadLetterTopic() { return new NewTopic("example_topic_dead_letter", 1, (short) 1); } }
В этом примере количество разделов example_topic
увеличили до четырех, усовершенствовав распределение нагрузки и параллельную обработку.
Запуская несколько экземпляров получателей и увеличивая количество разделов, мы эффективно масштабируем событийно-ориентированные микросервисы для работы с более высокой пропускной способностью.
Заключение
Архитектура событийно-ориентированных микросервисов со Spring Boot и Kafka — это мощный, масштабируемый подход к построению современных распределенных систем. Используя преимущества событийно-ориентированного взаимодействия, слабой связанности и асинхронной обработки, разработчики создают гибкие, отказоустойчивые, адаптивные приложения под требования современного динамичного делового ландшафта.