Небольшая архитектурная история из Лавки
Vanya KhodorКогда вы добавляете в корзину в Лавке товар, фронт дёргает эндпоинт в специальном сервисе (давайте будем называть его cart). Корзина эта хранится в бд сервиса cart. Пару лет назад информация про активные корзины (корзины, которые обновлялись за последние пару часов) понадобилась коллегам из логистики, чтобы предсказывать сурж (или по-простому, высокий спрос: это та самая фиолетовая молния в различных сервисах Яндекса, когда случается наплыв пользователей).

Чем больше активных корзин, тем, вероятно, больше заказов скоро будет совершено, что повлечёт за собой нехватку курьеров и сопутствующие неудобства как для сервиса, так и для пользователей. Потому хочется предсказывать такие случаи и регулировать количество заказов различными инструментами (например, повышать цену доставки).
Конечно, никому не нравится повышенная цена на доставку. Но мы умеем сообщать пользователям, что условия доставки стали лучше, что позволяет и нам не потерять клиентов, и пользователю получить свой любимый Вупи Пай из Лавки. Для этого есть сервис (назовём его условно order-help), который проверяет, стали ли условия доставки лучше, и, если да, посылает пользователю уведомление.
Чтобы доставлять информацию о корзинах из сервиса cart в сервис surge, был реализован сервис cart-replica. Флоу отправки данных выглядел следующим образом:

Вот они слева направо:
- сервис cart, в базе которого лежат корзины. В сервисе есть кеш recents-carts-cache (на каждый под), который раз в пару минут подтягивает к себе корзины за последние несколько часов. Эндпоинт /get/by-warehouse возвращает все корзины для конкретного физического склада.
- сервис cart-replica аналогично имеет эндпоинт /get/by-warehouse, который смотрит в LRU-кеш carts-cache (про кеши писал тут(https://t.me/thisnotes/156)). Если информация про корзины для конкретного склада есть в кеше, то они возвращаются. Иначе делается запрос в cart, ответ от которого кладётся в carts-cache и заодно возвращается одному из клиентов (surge, order-help).
Пару лет эта схема работала замечательно. Но с ростом нагрузки и количества пользователей, мы начали испытывать проблемы. Количество корзин в кеше recents-carts-cache значительно выросло. Это обернулось для нас очень неприятным последствием: частые обновления кеша (с каждого пода) суммарно сильно нагружали сеть между базой и сервисом. Из-за этого мы не могли больше горизонтально масштабировать наш сервис (i.e. добавлять ещё больше подов), т.к. мы просто приложили бы нашу базу! В наших реалиях (как и в реалиях других высоконагруженных бекендов) горизонтальное масштабирование – очень важный инструмент. В текущем виде это была бомба замедленного действия. С ростом нагрузки мы рано или поздно упёрлись бы в потолок, от которого не смогли бы спастись увеличением подов сервиса. Ну и вдобавок сам кеш стал занимать слишком много оперативной памяти. Непорядок!
Давайте шаг за шагом попробуем избавиться от всех существующих проблем и прийти к более масштабируемому решению.
Первая проблема в нашей цепочке – хранение тяжёлых кешей в сервисе cart.
В решении этой проблемы нам поможет паттерн Change Data Capture. Суть подхода заключается в том, что сервис, делающий изменения, отправляет куда-либо все изменения данных (обычно в очередь вроде Kafka; настоятельно рекомендую прочитать статью, если вдруг вы не знаете, как Kafka устроена). Консьюмеры этой очереди (читатели) могут вычитывать данные и обрабатывать их любым образом. Обычно эти изменения складываются в некоторый журнал. Иногда это помогает поддерживать текущий стейт, применяя те же самые изменения к своему снапшоту данных.
А иногда можно хранить сами изменения и не применять их сразу к существующему снапшоту данных, если это не требуется. Во этом случае вы можете решать задачу получения снапшота на какой-то момент времени, если примените все изменения до этого самого момента и не примените остальные. Но это не про нас в данном случае.
Правда мы отправляем всё же не изменения, а просто очередное состояние корзины. Данных меньше и делать это проще: не нужно уметь “применять” эти изменения, – просто берём самое актуальное. Так что от CDC тут скорее Capture, чем Change.
После такого изменения наша архитектура выглядит так:

Как только создаётся новая или изменяется существующая корзина, сервис cart отправляет событие в logbroker (это наш внутренний аналог Kafka, но со своими наворотами), откуда специальная компонента сервиса cart-replica вычитывает данные и обновляет кеш на поде.
Тут видна первая недоработка. Т.к. консьюмеры читают данные из разных партиций, а в разных партициях лежат разные сообщения, каждый консьюмер получает свой уникальный батч данных. Получается, читающий под может забрать сообщения и сложить их в кеш, а другие уже не смогут. Следовательно в каждом кеше каждого пода сервиса cart-replica будет неполный стейт, что повлечёт за собой некорректные ответы сервисам-клиентам. Тогда горизонтально масштабировать сервис cart-replica мы теперь не можем, ведь в таком сетапе схема работает только с одним подом cart-replica! Нехорошо.
Давайте добавим базу как точку синхронизации:

Уже лучше. Теперь у нас есть точка синхронизации между подами, из-за чего мы можем сделать на каждом поде кеш над базой и отдавать данные клиентам. Но это не всё!
Чтение из очередей Kafka/logbroker гарантировано последовательные, а значит есть ограничения на операции, которые консьюмер выполняет. Например, если консьюмер делает какую-то операцию долго или операция легко может упасть, то сообщение не будет обработано и вернётся назад в очередь, после чего опять поступит на обработку. Из-за этого все остальные сообщения, которые лежат за обрабатываемым, будут терпеливо ждать своей очереди. Подобные проблемы могут приводить к накапливанию невычитанных данных и отставании в их поставке консьюмерам.
В подобных решениях есть интуитивный гайдлайн делать обработку данных консьюмером лёгкой. Писать в базу потенциально тяжело. Там могут быть всякие индексы, которые должны перестроиться. А ещё ваши запросы в целом могут падать. Потому нужно эту операцию упростить, добавив какой-то промежуточный шаг. Давайте воткнём ещё одну очередь!

В этот раз мы используем STQ (наш внутренний аналог Amazon SQS). STQ представляет собой что-то вроде пула задач, которые можно разгребать параллельно. Т.е. когда consumer положил событие изменения данных в STQ, каждый под сервиса теперь может забирать задачу и выполнять её. При этом операция положить в STQ довольно лёгкая, однако STQ не гарантирует порядок выполнения задач, из-за чего подобные проблемы с обработкой нам не страшны.
Надо учитывать этот факт и при обновлении данных в базе. Например, если сначала вы обновили стейт в базе более новым изменением, а потом выполняете задачу с более старым изменением, то вы можете затереть первое вторым. Соответственно надо аккуратно написать запрос, чтобы такого не происходило.
Масштабировать такое решение гораздо проще, т.к. раздавать задачи STQ может гораздо быстрее, чем параллельное чтение из logbroker. И нет никаких ограничений, как с партициям у logbroker.
Конечно, можно было бы сразу воткнуть STQ между сервисами cart и cart-replica, но концептуально это добавляет сильную связность между этими сервисами + блокирует нам точку масштабирования, если вдруг мы захотим добавить ещё одного клиента, читающего из logbroker изменения данных: разные клиенты (не поды одного сервиса, а именно разные сервисы) могут читать из logbroker независимо одни и те же данные, когда STQ позволяет лишь положить задачу в очередь и взять её оттуда, после чего она из очереди удаляется.
После того, как данные доехали в базу, кеш на каждом поде подтягивает к себе изменения (делать это конечно же можно не только полной вычиткой данных, но и инкрементальными обновлениями). И можно дёргать эндпоинт над этим кешом, чтобы получить данные. Кеш хранит данные за последние пару часов, чтобы не хранить полную копию в базе.
Кстати, про базу!
Надо же её иногда чистить от неактуальных данных. Воткнём-ка ещё какой-нибудь garbage collector, который будет удалять все данные старше некоторого срока (нам же нужны данные за последние несколько часов):

Вот такая красота получилась.
Что в итоге?
- починили горизонтальное масштабирование нашего сервиса cart;
- а ещё срезали с него лишнюю нагрузку, которая нужна была для внутренних отложенных задач, но при этом могла задевать сервис, находящийся под постоянной пользовательской нагрузкой;
- срезали использование оперативной памяти в cart примерно на 60% для каждого пода;
- ускорили каждого клиента примерно на 72% (т.к. теперь операция получения данных обходится без потенциальных походов в другой сервис);
- убрали неиспользуемые клиентами данные, что позволило хранить меньше данных в cart-replica;
- я немножко преисполнился.