Создание оркестратора для событийно-ориентированного приложения с Golang и RabbitMQ

Создание оркестратора для событийно-ориентированного приложения с Golang и RabbitMQ

https://t.me/Golang_google

Чтобы сделать асинхронное приложение, брокера сообщений не достаточно. В правильной ситуации асинхронными сообщениями улучшаются общая пропускная способность, временна́я задержка и взаимодействие с пользователем. Но без полноценной документации и следования шаблону сложность значительно увеличивается. В этой статье создадим конвейер для бронирования гостиницы с помощью оркеструемых событий на Go и RabbitMQ, она также применима к другим языкам и брокерам сообщений.

Примечание: не рассматриваются архитектура ПО, тестирование, ограничение скорости и другие подобные аспекты. Если бы мы создавали приложение, пригодное для промышленной эксплуатации, на их освящение понадобились бы десятки статей.

Определение требований

Номера в гостинице бронируются по мере доступности. Создадим конвейер передачи запроса на бронирование номера по различным сервисам: резервирования, проверки, зачисления средств, бронирования.

Сервисам не нужно ждать ответа друг от друга — они даже не «знают», откуда запрос: у каждого сервиса только одна задача, и он хорошо с ней справляется. Это называется снижением связанности. Не нужно задумываться о причине запроса и ждать ответа других сервисов — используем все преимущества архитектуры микросервисов.

Сначала создадим блок-схему:

Как видите, всего четыре этапа:

  1. Проверка: в некоторых сценариях гостиницами обслуживаются не все желающие, например кому-то закрыт доступ или определенный номер резервируется только для конкретной группы. Это сложные правила, отделим их от веб-API.
  2. Резервирование: одновременное бронирование номера несколькими людьми предотвращается глобальной блокировкой, подобной Redis.
  3. Списание: зарезервировав номер, списываем средства.
  4. Бронирование: завершив процесс списания, удаляем резервирование и бронируем номер.

Но в любом сервисе случаются ошибки. Оркестрация — отличное подспорье для создания стабильного потока запросов, обработки ошибок и соответственных действий. Действия требуются при очевидных ошибках:

  • Недостаточно средств: удаляем резервирование.
  • Ошибка при бронировании: возвращаем средства и удаляем резервирование.

Настройка RabbitMQ

Не знакомы с RabbitMQ? Посмотрите руководство для начинающих, хотя основы мы разберем. RabbitMQ, как и Apache Kafka, — это приложение с отправителями и получателями сообщений. В приложении-чате отправителями сообщение отправляется, получателями — получается.

Как сообщению попасть к моему другу, а не случайному человеку в другой группе? Это сложная часть. В Apache Kafka для решения этой проблемы применяется простой, но мощный подход. В каждой теме имеется один или несколько физически обособленных разделов, а в каждой группе получателей каждый раздел считывается максимум одним получателем. RabbitMQ годится для такого рода приложений благодаря мощным типам exchange. Воспользуемся типом topic exchange.

Куда отправить новый запрос на бронирование? В RabbitMQ издателями публикуется новое сообщение с указанием в exchange имени, ключа и значения сообщения, им ничего не «известно» о получателях.

Но откуда типы exchange «знают», куда переадресовать сообщение? Здесь приходятся кстати очереди, считываемые получателями. Создадим очередь и определим, какие типы сообщений ими получаются. Например, с помощью ключа маршрутизации определим очередь, которой принимаются запросы на бронирование только в конкретном городе.

Допустим имеется три чата: A, B и C. Кто-то из чата A отправляет сообщение, ключ маршрутизации такой: room.A. Если всего 10 онлайн-пользователей, должно быть 10 разных очередей с одинаковым ключом room.A для каждого пользователя.

Имеется также панель администратора со всеми чатами. Ее ключ очереди: room.*. Нас интересует topic exchange. В RabbitMQ имеется четыре типа exchange.

Мы определили две очереди. Но каким exchange заполнить эти очереди? Узнаем с помощью привязок. Каждая очередь заполняется одним или несколькими exchange.


Покончив с основами маршрутизации RabbitMQ, разберем отличия двух систем: с хореографией и оркестрацией.

Система с хореографией

Система с хореографией похожа на дерево. Здесь нет супервизора для обработки аномалий, работа системы обеспечивается созданием различных очередей и ключей маршрутизации.


Система с оркестрацией полностью управляется оркестратором и остается асинхронной. Система A подключается к нескольким exchange и не «знает», кто отправил сообщение, ею лишь выполняется ее часть процесса. Например, служба обработки изображений, будучи частью нескольких оркестраций, не «знает», кто отправил сообщение, ею просто выполняется ее часть.

Вернемся к системе бронирования:

Спроектируем систему, где любой микросервис может использоваться другими оркестраторами в иных целях.

Создание приложения

Сначала создадим exchange и очереди, свяжем их вместе.

Объявление «exchange» и очередей

Любое получаемое в exchange сообщение удаляется. Поэтому, чтобы все настроить, создадим предварительно выполняемую задачу.

Сначала определим имена очередей и exchange:

var Exchange = "webapp.reservation"
var QueueOrchestrator = "orchestrator.reservation"
var QueueValidation = "validation"
var QueueReservation = "reservation"
var QueueCredit = "credit"
var QueueBooking = "booking"

Эти имена будут во всех пакетах, лучше определить их в конфигурационном файле:

var exchange = config.Exchange
var queueNames = []string{
   config.QueueOrchestrator, config.QueueValidation, config.QueueReservation, config.QueueCredit, config.QueueValidation,
}

func main() {

   conn, closeConnection := mq.NewRabbitMQ()
   defer closeConnection()

   channel, err := conn.Channel()
   panicWithMessage(err, "couldn't create a channel")

   // Объявляем основной «exchange», привязываемый к очередям
   err = channel.ExchangeDeclare(exchange, "topic", true,
      false, false, false, nil)
   panicWithMessage(err, "couldn't declare validation exchange")

   for _, name := range queueNames {
      _, err = channel.QueueDeclare(name, true,
         false, false, false, nil)
      panicWithMessage(err, fmt.Sprintf("couldn't declare the %s queue", name))

      err = channel.QueueBind(name, name+".*", exchange, false, nil)
      panicWithMessage(err, fmt.Sprintf("couldn't bind the %s queue ", name))
   }

   log.Println("> declaration completed")

}

NewRabbitMQ — это функция для создания нового AMQP-подключения. Сначала мы объявили основной exchange темы. Из-за использованной библиотеки github.com/rabbitmq/amqp091-go в коде много true и false, но в целом для durability задается только true. От этого RabbitMQ не делается брокером длительно сохраняемых сообщений, как Apache Kafka, а только обеспечивается сохранение очередей и определения exchange.

Каждой очередью прослушиваются ключи, например очередью validation  — ключи validation.*. Запомним это, так будет во всем проекте.

Запуск HTTP-сервера

Создадим HTTP-сервер, подключаемый к Redis и экземпляру RabbitMQ, с двумя маршрутами: одним для публикации нового запроса на бронирование, другим для отображения статуса запроса:

func main() {

   rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
   bookingRequestRepository := repository.NewRedis[types.BookingRequest](rdb)

   connection, closeConnection := mq.NewRabbitMQ()
   defer closeConnection()
   channel, err := connection.Channel()
   panicWithMessage(err, "error while establishing a channel")

   e := echo.New()
   e.GET("/book", func(c echo.Context) error {
      request := types.BookingRequest{
         UID:    shortid.MustGenerate(),
         UserID: c.QueryParam("user_id"),
         RoomID: c.QueryParam("room_id"),
      }
      err := channel.PublishWithContext(context.TODO(), config.Exchange, config.QueueOrchestrator+".new",
         false, false, amqp091.Publishing{
            Body: types.Encode(request),
         })
      if err != nil {
         return c.String(500, "something went wrong")
      }
      return c.String(201, fmt.Sprintf("request %s has been created!", request.UID))
   })

   e.GET("/book/:uid", func(c echo.Context) error {
      key := "bookingRequest:" + c.Param("uid")
      item, _ := bookingRequestRepository.Get(c.Request().Context(), key)
      return c.String(201, fmt.Sprintf("status=%s\ninfo=%s", item.State, item.Additional))
   })
   go func() {
      e.Logger.Fatal(e.Start(":1323"))
   }()

   mq.GracefullyExit()

}

Тип запроса на бронирование:

type BookingRequest struct {
   UID    string `json:"UID"`
   UserID string `json:"user_id"`
   RoomID string `json:"room_id"`

   State      string `json:"state"`
   Additional string `json:"additional"`
}

Для каждого запроса создается новый BookingRequest с уникальным идентификатором, затем отправляется в exchange с ключом маршрутизации orchestrator.reservation.new. Очередью оркестратора будет получена копия сообщения, потому что она привязана к exchange для любого ключа, соответствующего регулярному выражению orchestrator.reservation.*.

Создание оркестратора

Сделаем простое решение маршрутизации для отправки в каждую задачу сообщений с разными целями, не забывая о предупреждении в начале статьи:

func main() {

   rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
   bookingRequestRepository := repository.NewRedis[types.BookingRequest](rdb)

   connection, closeConnection := mq.NewRabbitMQ()
   defer closeConnection()

   consumeChannel, err := connection.Channel()
   panicWithMessage(err, "couldn't establish a consume Channel")

   produceChannel, err := connection.Channel()
   panicWithMessage(err, "couldn't establish a produce Channel")

   queue, err := consumeChannel.QueueDeclare(config.QueueOrchestrator, true,
      false, false, false, nil)
   panicWithMessage(err, "couldn't declare queue")

   msgs, err := consumeChannel.Consume(queue.Name, "", true, false, false, false, nil)
   panicWithMessage(err, "error while creating the consumer")

   // обновление состояния запроса и публикация в очереди резервирования
   handleNew := func(msg amqp091.Delivery) error {}

   handleValidated := func(msg amqp091.Delivery) error {}

   handleReserved := func(msg amqp091.Delivery) error {}

   handleDeposit := func(msg amqp091.Delivery) error {}

   handleBooked := func(msg amqp091.Delivery) error {}

   handleRefund := func(msg amqp091.Delivery) error {}

   actions := map[string]func(msg amqp091.Delivery) error{
      "new":       handleNew,
      "validated": handleValidated,
      "reserved":  handleReserved,
      "deposit":   handleDeposit,
      "booked":    handleBooked,
      "refund":    handleRefund,
   }

   go func() {
      // получение сообщений
      for msg := range msgs {
         if err := mq.Handler(msg, config.QueueOrchestrator, actions); err != nil {
            // здесь следует использовать очереди недоставленных сообщений
            log.Printf("key:%s\tunhandled error: %s\n", msg.RoutingKey, err.Error())
         }
      }
   }()
   mq.GracefullyExit()
  • Для отправки и получения нужно два разных виртуальных канала по TCP-подключению к RabbitMQ.
  • Если сущности уже имеются, функцией declare сущность не воссоздается, а используется имеющаяся.
  • Автоподтверждению задано значение true.

Что такое «очереди недоставленных сообщений»? Когда у exchange нет очереди для сообщения или сообщением вызывается неожиданная ошибка, нужно отправить их в очередь, обрабатываемую вручную, и использовать эти данные для повышения отказоустойчивости приложения.

Когда новый запрос отправляется в HTTP API, там публикуется новое сообщение с ключом orchestrator.reservation.new. Вот функция, которой из названия темы извлекается экшен:

handleNew := func(msg amqp091.Delivery) error {
  request := types.Decode[types.BookingRequest](msg.Body)
  log.Printf("recieved a new booking request #%s\n", request.UID)
  request.State = "proceeding"
  bookingRequestRepository.Save(context.TODO(), request)

  return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueValidation+".validate", false,
   false, amqp091.Publishing{
    Body: types.Encode(types.ValidationRequest{
     UID:    request.UID,
     UserID: request.UserID,
     RoomID: request.RoomID,
    }),
    ReplyTo: config.QueueOrchestrator + ".validated",
   })
 }

Когда оркестратором получается новый запрос на бронирование, публикуется сообщение о проверке. Самая важная часть —  ReplyTo, которым определяем, в какой очереди публиковать ответ. Это нужно для независимой работы средства проверки и с другими оркестраторами.

Создание задачи для средства проверки

handleValidation := func(msg amqp091.Delivery) error {
   validationRequest := types.Decode[types.ValidationRequest](msg.Body)
   validationRequest.Validated = true
   if validationRequest.UserID == "some_guy" && validationRequest.RoomID == "some_hotel" {
      validationRequest.Validated = false
      validationRequest.Errors = append(validationRequest.Errors, "you have been blocked by this hotel")
   }
   return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
      amqp091.Publishing{
         Body: types.Encode(validationRequest),
      })
}

actions := map[string]func(msg amqp091.Delivery) error{
   "validate": handleValidation,
}

Создан другой файл — точно такой же, как оркестратор, — всего с одним экшеном. Обычно, чтобы проверить валидность запроса, в файле заранее собираются правила и используются базы данных или кеши. Но ради упрощения захардкодим правило: проверим в будущем. Кроме того, результат опубликован в данной очереди с помощью msg.ReplyTo, для указания exchange используется msg.Exchange.

Создание других частей приложения

Резервирование и отмена брони:

handleReserve := func(msg amqp091.Delivery) error {
   request := types.Decode[types.ReservationRequest](msg.Body)
   // использование глобальной блокировки
   request.Reserved = true
   if request.RoomID == "reserved" {
      request.Reserved = false
      request.Errors = append(request.Errors, "the room is already reserved by someone else")
   }
   return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo,
      false, false, amqp091.Publishing{
         Body: types.Encode(request),
      })
}

handleCancel := func(msg amqp091.Delivery) error {
   request := types.Decode[types.ReservationRequest](msg.Body)
   return rdb.Del(context.TODO(), "room:"+request.RoomID).Err()
}

Внесение и возврат средств:

handleDeposit := func(msg amqp091.Delivery) error {
   deposit := types.Decode[types.DepositRequest](msg.Body)
   deposit.Done = true
   if deposit.UserID == "poor_guy" {
      deposit.Done = false
      deposit.Errors = append(deposit.Errors, "insufficient credit")
   }
   return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
      amqp091.Publishing{
         Body: types.Encode(deposit),
      })
}

handleRefund := func(msg amqp091.Delivery) error {
   deposit := types.Decode[types.DepositRequest](msg.Body)
   // бизнес-логика...
   deposit.Done = true
   if deposit.UserID == "very_unlucky_guy" {
      deposit.Done = false
      deposit.Errors = append(deposit.Errors, "unexpected error")
   }
   return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
      amqp091.Publishing{
         Body: types.Encode(deposit),
      })
}

И, наконец, система бронирования:

handleBooking := func(msg amqp091.Delivery) error {
   book := types.Decode[types.BookRequest](msg.Body)
   book.Done = true
   if book.UserID == "unlucky_guy" || book.UserID == "very_unlucky_guy" {
      book.Done = false
      book.Errors = append(book.Errors, "something unexpected happened")
   }
   return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
      amqp091.Publishing{
         Body: types.Encode(book),
      })
}

Чреватая ошибками логика добавлена для упрощения тестирования. Теперь завершаем логику в оркестраторе:

handleValidated := func(msg amqp091.Delivery) error {
   validation := types.Decode[types.ValidationRequest](msg.Body)
   request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+validation.UID)
   log.Printf("validation #%s completed: %t", validation.UID, validation.Validated)
   if validation.Validated {
      return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueReservation+".reserve",
         false, false, amqp091.Publishing{
            ReplyTo: config.QueueOrchestrator + ".reserved",
            Body: types.Encode(types.ReservationRequest{
               UID:    request.UID,
               RoomID: request.RoomID,
               UserID: request.UserID,
            }),
         })
   } else {
      request.Additional = strings.Join(validation.Errors, ", ")
      request.State = "finished"
      return bookingRequestRepository.Save(context.TODO(), request)
   }
}

handleReserved := func(msg amqp091.Delivery) error {
   reserved := types.Decode[types.ReservationRequest](msg.Body)
   request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+reserved.UID)
   log.Printf("reservation #%s completed: %t", reserved.UID, reserved.Reserved)
   if reserved.Reserved {
      return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueCredit+".deposit",
         false, false, amqp091.Publishing{
            ReplyTo: config.QueueOrchestrator + ".deposit",
            Body: types.Encode(types.DepositRequest{
               UID:    request.UID,
               UserID: request.UserID,
               Delta:  -2500, // в продакшене для этой цели нужна другая служба
            }),
         })
   } else {
      request.Additional = strings.Join(reserved.Errors, ", ")
      request.State = "finished"
      return bookingRequestRepository.Save(context.TODO(), request)
   }
}

handleDeposit := func(msg amqp091.Delivery) error {
   deposit := types.Decode[types.DepositRequest](msg.Body)
   request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+deposit.UID)
   log.Printf("deposit #%s completed: %t", deposit.UID, deposit.Done)
   if deposit.Done {
      return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueBooking+".book",
         false, false, amqp091.Publishing{
            ReplyTo: config.QueueOrchestrator + ".booked",
            Body: types.Encode(types.BookRequest{
               UID:    request.UID,
               UserID: request.UserID,
               RoomID: request.RoomID,
            }),
         })
   } else {
      request.Additional = strings.Join(deposit.Errors, ", ")
      request.State = "finished"
      return bookingRequestRepository.Save(context.TODO(), request)
   }
}

handleBooked := func(msg amqp091.Delivery) error {
   book := types.Decode[types.BookRequest](msg.Body)
   request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+book.UID)
   log.Printf("book #%s completed: %t", book.UID, book.Done)
   if book.Done {
      request.Additional = "booked successfully"
      request.State = "booked"
   } else {
      request.Additional = strings.Join(book.Errors, ", ")
      request.State = "finished"
      if err := produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueCredit+".refund",
         false, false, amqp091.Publishing{
            ReplyTo: config.QueueOrchestrator + ".refund",
            Body:    types.Encode(types.DepositRequest{UID: request.UID, Delta: 2500, UserID: request.UserID}),
         }); err != nil {
         return err
      }
   }
   if err := bookingRequestRepository.Save(context.TODO(), request); err != nil {
      return err
   }
   return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueReservation+".cancel",
      false, false, amqp091.Publishing{
         Body: types.Encode(types.ReservationRequest{UID: request.UID}),
      })

}

handleRefund := func(msg amqp091.Delivery) error {
   deposit := types.Decode[types.DepositRequest](msg.Body)
   request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+deposit.UID)
   log.Printf("refund #%s completed: %t", deposit.UID, deposit.Done)
   if deposit.Done {
      request.Additional += ", refund completed."
      request.State = "refunded"
   } else {
      request.Additional += ", problem while refunding."
      request.State = "finished"
   }
   return bookingRequestRepository.Save(context.TODO(), request)
}

Обрабатываем ошибки бронирования, возвращая средства и отменяя бронь. Опробуем систему:

Вот запрос на бронирование с ошибкой и возвратом средств:

А это запрос на бронирование с ошибкой и без возврата:

Вот почему важна очередь недоставленных сообщений. Нет другого способа обработать такие ошибки, систему нужно исследовать вручную.

Заключение

Событийно-ориентированная архитектура — это интересно, с ней приложения создаются в масштабе, котором могли создаваться раньше.


Источник

Report Page