Webhook Buffer

Webhook Buffer


Alexandr Kruchkov

Несколько лет назад мне предоставилась первая возможность разработки своей первой архитектуры с нуля.

Я только-только устроился на новую работу, начал разбираться с AWS, Azure, всей инфраструктуре и прочим-прочим. Это был мой первый практический опыт с AWS и нейронных сетей тогда не было совсем. До этой работы все мои знания Амазона были на уровне просмотра 89 часов уроков на ютубе. Я никогда до этой работы не заходил в консоль AWS.

Спустя время поступила большая глобальная, для меня, архитектурная задача "спасти бизнес, продукт, и чтобы всё недорого".

Немного удивился, но ладно, работа есть работа.

Для начала смотрю что у нас за проект, продукт, архитектура.
Убрав всё лишнее, я рисую для себя такую картину (я всегда рисую, когда знакомлюсь с чем-то новым, со времён школы и университета привычка).

Пообщался с коллегами, СТО и начал рисовать.
Вся интересующая меня архитектура продукта представляет:

  • База данных AWS Aurora
  • кубернетис от AWS - EKS
  • поды в кубернетисе, которые я для себя назвал Core
  • от кастомеров приходят обычные вебхуки (POST запросы с огромным JSON-payload внутри) и они заходят на UI интерфейс

Core был монолитным приложением, в нём был и backend и frontend и даже какие-то cronjob задачи.

То есть Core был и UI и бэк и обрабатывал вебхуки со стороны кастомеров. При получении webhook от кастомера, Core обрабатывал их и "что-то" отправлял или обновлял в базе данных.

Всё это работало месяцами, но стартап не может стоять на месте: мы либо летим в пропасть, либо двигаемся дальше. Движение дальше означает увеличивание нагрузки. Новая нагрузка и привнесла проблему всему стартапу.

В чём суть проблемы:

  • кастомеры время от времени или по расписанию рассылают вебхуки. Вебхуков много, количество может быть от двух до миллиона. Иногда миллионов 20. Иногда 5. А потом ещё час тишина.
  • так как Core то пока ещё монолит и обрабатывает ещё и UI, он нетороплив и ждёт завершения части транзакций от базы данных
  • в момент, когда прилетает, например 200к+ вебхуков, Core обрабатывает их, отправляет в базу, ждёт выполнения и Core сильно нагружен задачами, ему ещё и UI обрабатывать
  • само собой это влечёт выдачу 500, 502, 503, 504 ошибок в разных кейсах
  • UI тупит, клиенты жалуются, а самое страшное - мы теряем вебхуки. Кастомер НЕ пересылает их. Если Core затупило, то все приходящие вебхуки аннигилировались и мы никогда не узнаем, что в них было.

90% самой сути стартапа - обработка вебхука и бизнес не имеет права потерять ни одного. Потеря вебхуков = потеря сервиса = потеря бизнеса.

Надеюсь, проблема ясна.

Так, ну и какая же у меня задача?

Мне надо "сделать что-то", чтобы

  • не меняя НИ-ЧЕ-ГО в Core монолите, вебхуки не терялись и обрабатывались все, пусть и с задержкой
  • чтобы это проработало 2-6 месяцев, пока нанимают команду разработчиков и которые со временем распилят монолит

При этом увеличение инстанса БД ничего не даст, как и увеличение количества реплик Core приложения. Мы пробовали. Просто в данный момент цикла жизни стартапа это невозможно сделать. Совсем.

Честно говоря, я думал в то время слинять с конторы. Ну тут налицо попытка решить продуктовую проблему силами инфраструктуры, что крайне не эффективно. Однако в тот момент я был в добровольной ссылке в чужой стране(как и многие в то время), а значит я не имею права терять работу, семья не оценит. К тому же интересно попробовать что-то сделать самому с нуля, да ещё и не отвлекая разработчиков.

Ресёрч.

Для начала мне дали доступ к веб-панели одного из кастомеров, чтобы я мог имитировать создание реальных вебхуков. Я зашёл в веб-интерфейс, понатыкал на кнопки - вебхуки прилетели(по логам Core приложения).
Однако я даже не вижу его содержимого payload. Почитал в интернете, включил на dev расширенный дебаг входящих пакетов с JSON и снова отправил вебхук. Отлично, теперь у меня есть в чистом виде JSON, который кастомер отправляет, ну или как минимум, пример реального вебхука.
Я сымитировал curl команду с этим JSON – по логам и по UI у Core всё работает.

С имитацией запросы мы разобрались.

Дальше я уткнулся в рисование на разных для себя уровнях архитектуры, чтобы понять, как можно зацепиться за это.

Вот так примерно выглядела диаграмма трафика до моих действия. 

Так как же мне можно вклиниться со своей инфрой, чтобы ничего не менять на стороне Core.

Мне повезло и после тестов я понял, что ВСЕ вебхуки приходят на отдельный адрес /webhooks (на самом деле несколько, но в статье мы говорим про один)!

Первым делом я поднял новый AWS CloudFront.

DNS запись на CloudFlare теперь вела не на ALB/ingress адрес в кубернетис, а сперва на CloudFront, где был всего один Origin и default behavior направлял весь трафик в ALB/ingress.

Для чего же я это сделал? Чтобы мне можно было плавно сделать переключение, когда моё решение будет готово. Я планировал в будущем добавить ещё один origin (свой кусок архитектуры) и behavior в него при совпадении path = /webhooks.

То есть весь трафик,как и раньше приходят на Core, а весь трафик на /webhooks будет уходить на какой-то, пока не готовый, мой буфер.

AWS MSK (Kafka)

Теперь надо выбрать что же у нас будет в качестве «буфера».

До работы в этом стартапе я работал в замечательной, на тот момент, компании Иннотех на банк ВТБ и у меня был опыт с MQ инструментами, я без сомнения выбрал Kafka в качестве основного инструмента.

Apache Kafka - это распределённая платформа для обработки и передачи больших объёмов данных в реальном времени. Она работает как брокер сообщений, позволяя системам обмениваться данными через очереди (topics). Kafka обеспечивает высокую производительность, масштабируемость и надёжность, сохраняя сообщения на диске и поддерживая их потоковую обработку. Используется для логов, аналитики, стриминга данных и интеграции приложений.

Теперь у меня есть возможность создать новый кластер MSK Kafka.

Не буду вдаваться в технические характеристики кластера, это просто кластер, просто через Terraform. Я к нему лично подключился в качестве косньюмера и продюсера – всё работает. С кластером почти закончили.

Kafa-rest-proxy

Я «умею» разделять трафик через behavior в CloudFront, у меня есть «хранилище» для вебхуков - Kafka, но теперь мне надо соединить их.

Погрузился на несколько дней в чтение документации, статей в интернете и принял решение поставить kafka-rest промежуточный сервис.

Confluent REST Proxy for Kafka — это инструмент, предоставляющий RESTful-интерфейс для взаимодействия с кластером Apache Kafka. Он упрощает работу с Kafka, позволяя производить и потреблять сообщения, просматривать состояние кластера и выполнять административные действия без использования нативного протокола Kafka или клиентских библиотек.

https://github.com/confluentinc/kafka-rest

То есть это просто новый микросервис, в котором есть ровно один докерфайл

FROM confluentinc/cp-kafka-rest:7.0.0

и тремя переменными "в какую кафку смотрит прокси"

envs:
  KAFKA_REST_HOST_NAME: rest-proxy
  KAFKA_REST_BOOTSTRAP_SERVERS: b-2.kafka.us-east-1.amazonaws.com:9092,b-1.kafka.us-east-1.amazonaws.com:9092,b-3.kafka.us-east-1.amazonaws.com:9092
  KAFKA_REST_LISTENERS: http://0.0.0.0:8082

Я при помощи curl тестовыми запросами проверил – ничего не заработало, в топиках сообщения не появились.

Оказывается для рест прокси нужен другой формат.

Снова документация, тесты и пришел к тому, что надо менять формат PAYLOAD.

По сути правильный формат был

{

 "records": [

  {

   "value": [

////////////////////

   ]

  }

 ]
}

Мне это не понравилось, но уже начал и поздно менять. Ладно, буду иметь в виду, что формат сообщения у нас другой на рест прокси кафки.

API Gateway

У нас есть входная точка перед кафкой – рест прокси, а теперь нам надо как-то от CloudFront отправлять вебхуки в сообщения кафки.

Докментация, гугл, телеграм чаты, реддит – прихожу к ApiGateway.

AWS API Gateway — это сервис Amazon Web Services для создания, управления и масштабирования API. Он позволяет разработчикам создавать RESTful и WebSocket API для взаимодействия между клиентами (например, веб-приложениями) и серверными сервисами (например, AWS Lambda, EC2 или внешними системами).

Пожалуй эта была самая тяжелая часть этой работы – aws Api gateway mapping template (Velocity). VTL.

Знай, сколько времени на это уйдёт, я бы выбрал Lambda или SQS.
Но я их не выбрал тогда.

Мою "любовь" к Velocity можно выразить картинкой

"А что не так?" спросите вы и я вынужден вам показать код, который использовался в данной связке.

Он не подсвечивается ни в одном IDE, для него нет стиля, как и у него нет стиля. Просто смотрите и поймите сколько времени я убил на эту реализацию.

## Override header Content-Type

#set($context.requestOverride.header.Content-Type = "application/vnd.kafka.json.v2+json")

## Override header Accept

#set($context.requestOverride.header.Accept = "*/*")

## Remove "staging" word from the path

#set($pathWithoutStaging = $context.path.replace("/staging", ""))

## Remove "{ID+}" word from the path

#set($pathWithoutID = $pathWithoutStaging.replace("{ID+}", "$input.params('ID')"))

## Read all keys and values form "query" params and save as "url" variable

#set($url = "#foreach($key in $input.params().querystring.keySet())#if($foreach.index > 0)&#end$util.urlEncode($key)=$util.urlEncode($input.params().querystring.get($key))#end")

## Create new JSON file for Body

{

  "records": [

    {

      "value": [

        {

          "body": $input.json('$'),

          "url": "$pathWithoutID?$url",

          "sourceIP": "$context.identity.sourceIp",

          "userAgent": "$context.identity.userAgent"

        }

      ]

    }

Всё, что начинается со знака решётки это не комментарий, а рабочий код Ну вот такой велосити. Часть кода уже легаси, пишу на память, но общая идея такая:

  1. Заголовок Content-Type: Устанавливает Content-Type в application/vnd.kafka.json.v2+json для Kafka rest proxy. Без этого работать не будет.
  2. Заголовок Accept: Устанавливает Accept в */* для принятия любого контента.
  3. Удаление "/staging": Убирает /staging из пути (например, /staging/webhook → /webhook). Легаси костыль.
  4. Замена "{ID+}": Меняет {ID+} в пути на значение параметра ID (например, /webhook/{ID+} → /webhook/123).
  5. Query-параметры: Собирает параметры (например, app=1&id=44) в строку key=value, кодируя через $util.urlEncode().
  6. JSON-тело: Формирует JSON для Kafka с полями: body (тело запроса), url (путь+параметры), sourceIP (IP клиента), userAgent (User-Agent).

По сути я беру всё, что получил от вебхука, формирую новый JSON и отправляю в кафк-рест-прокси, а она, в свою очередь, в кафку.

Пора рисовать:

  • трафик идёт на CloudFlare (как и раньше)
  • от CloudFlare он идёт на CloudFront (я добавил)
  • от CloudFront трафик идёт по матчингу. Если /*, то как и раньше в ALB/Ingress и затем в Core POD. Если /webhook, то отправляется в API Gateway, где происходит парсинг самого original payload - JSON, парсинг URL, параметров, хидеров и формирование нового JSON с семантическим названием парметром и отправляются на URL от kafka-rest-proxy
  • kafka-rest-proxy работает просто как прокся. Всё, что получила, отправляет в топик AWS MSK (Kafka)

Само собой я не тестировал это на реальном пути /webhook, для тестов у меня был /webhook-test ендпойнт и по нему всё роутилось.

Невероятно, но это работало. Отправляю curl запрос на внешний адрес с путем /webhook-test - и сообщение в кафке!

Golang application.

Теперь нам остался почти последний этап и последний элемент - кафка консюмер .

Мне надо было написать такое приложение, которое подключалось бы к кафке, читало сообщение, транформировало JSON в "оригинальный запрос" и отправляло бы в Core. Само собой не просто так: если Core не отвечает, то ждать и пробовать снова. Ведь мне нельзя терять вебхуки.

На написание приложения и тесты у меня ушла, наверное, неделя.

Я просидел во всех чатах гошки, замучал всех знакомых, но сделал это. Код был ПРИМЕРНО такой:

https://github.com/kruchkov-alexandr/golang-kafka-consumer/blob/main/main.go

И снова невероятное везение - оно работает.

Мониторинг.

К тому времени я был уже опытный инженер, и я понимал, что мне надо мониторить весь этот зоопарк новых микросервисов в кубернетисе и, особенно, часть с Kafka.

Поставил kafka exporter, начал собирать метрики напрямую с кафки - самая основная метрика это консюмер лаг. Я обязан понимать, что сейчас лаг выше или ноль.

В общем и целом я определился с тремя основными графиками. Продакшн рейт (как часто и много прилетают вебхуки) и какой лаг чтения у топиков/консюмер групп). Разные линии - разные топики и эндпойнты для каждого из кастомеров.

Так же у меня был дашборд для API Gateway, который впоследствии был объединён в общую борду "вебхуки". Там и поды кубера, и апигейтвей, и кафка и чего-то ещё. Общая картина по "буферу".

Предфинальные правки

Давайте ещё раз нарисуем всю схему и подпишу как всё работает в новой схеме.

  • кастомеры заходят и на UI и могут отправлять автоматически вебхуки
  • весь трафик идет на CloudFlare, как самое дешёвое решение с DDOS для стартапа
  • DNS записи CloudFlare ведут на CloudFront
  • CloudFront имеет два origin. Если в URL+PATH есть /webhook, то трафик идёт на API Gateway, остальной трафик /* идёт, как и раньше, на EKS ingress через ALB
  • CloudFront отправляет трафик с /webhook в API Gateway
  • API Gateway сразу отвечает на все вебхуки с кодом 200 "всё ок, успешнео принял вебхук"
  • API Gateway, парсит все данные, что нужны и формирует новый JSON файл для отправки в kafka-rest-proxy (просто https://url ендпойнт)
  • kafka-rest-proxy работает в кубере и все входящие запросы отправляет как мессаджи в MSK(Kafka)
  • kafka-consumer читает сообщения в топиках кафка, парсит, имитирует "изначальный запрос и оригинальный пейлоад отправляет в Core". Если Core не отвечает, то пробует ещё раз.
  • нулевая потеря вебхуков с возможностью передвинуть офсет и перепрочитать если надо
  • все компоненты в кубернетисе с автоскейлером и скейлингом до 100 реплик
  • на MSK есть мониторинг и автоскейл хранилища + алерты
  • на уровне Core я не менял НИ-ЧЕ-ГО, как меня и просили
  • я не теряю НИ-ОД-НО-ГО вебхука, как меня и просили. 100% SLA (правда)


Проблемы и способы их решения

На протяжении всей этой истории конечно же у меня были проблемы.

Неверный формат JSON, экранирование чего-либо, нехватка диска на MSK у брокеров (когда ради теста пульнули 100млн сообщений, а диск был мелкий, ретеншн тайма не было вообще).

Пустая документация по VTL, баги с встроенными коннекторами, прямой доступ к кафка-рест-прокси, из-за чего пришлось делать AWS Private Link.

Обучение команды "что такое кафка" и простейшие инструкции по работе с топиками и FAQ при FCKUP.

Утечки памяти в голанг приложении, дубликатах сообщений (тогда я чот не думал про консюмер группы и офсеты).

Декодер и потеря пейлоада. Отсутствие хелсчека для проб и неверный триггер для скейлинга(CPU важнее, чем memory).

В последствии разделение на разные топики кафки по задачам/кастомерам.

Писать об этом можно бесконечно, конечно же это было.

Под всё это дело я конечно же писал RFC с диаграммами и планами.

История больше про саму реализацию казалось бы крайне недостижимой задачи.

Переключение

Настал момент, когда мою схему проверили по пулл реквестам(быстро одобрили, не вдаваясь в детали, ибо ничего не понимали) и начали переключение.

То есть правило behavior в CloudFront поменяли с /webhook-test на "боевой" /webhook.
Сразу не заработало - где-то опечатки, где-то чего не хватало, где-то прав. Ну так всегда бывает. Пофиксили все мелкие неточности.
На dev/stage поигрались - всё работает.

Затем всё повторили на проде - всё отлично!

Итоги

Вот так, не меняя ничего в самом приложении, а только лишь за счёт инфраструктуры(API Gateway + CloudFront + MSK) и внедрения двух новых микросервисов(кафка рест консюмер и кафка рест прокси), я смог решить задачу.

Совру по деньгам сколько это стоило в месяц, но если мне не изменяет память, то счёт стал не больше +350 долларов в месяц.

Задача была закрыта, бизнес счастлив как никогда, мы не теряем ни одного вебхука, на стороне приложения у нас ничего не изменено, ошибки 50* ушли, так, как мы всегда кастомеру отправляем 200 код, все сообщения в кафке, в случае любого факапа, мы даже можем сдвинуть оффсет для консюмер группы и перепрочитать мессаджи кафки с вебхуками. Задержка вебхуков на тот момент была не критична.

Честно говоря я был очень счастлив, что у меня был опыт такого внедрения. Пожалуй одна из тех задач, где я узнал с десяток технологий, многие ограничения и особенности, что позволило мне получить много опыта и очень глубоко нырнуть.

Я не смогу точно ответить почему я выбрал именно эти инструменты: почему кафка, а не раббит или нативный SQS, почему API Gateway, а не лямбда. Где-то были причины "я с этим ещё не работал", где-то "теоретически через год это будет дороже, чем это решение. Где-то банально "попробовал, не получилось, забросил".

Сама архитектура, насколько мне известно, проработала больше 2 лет вместо запланированных полгода "пока команду не наберут". Впоследствии часть было упразднено и заменили на SQS вместо кафки, переложив функционал чуть иначе.

Моя разработка совсем не инновация, я был не первый, да и не последний, кто такое делал, но это решение за копейки, без переделки всего Core сервиса позволилио бизнесу немного выдохнуть на тяжёлом для стартапа этапе и сосредоточить работу над фичами, не отвлекаясь на перформанс, что позволило выйти на новый раунд инвестиций.



Report Page