RabbitMQ + Spring boot + Docker. Отправляем и получаем сообщения через Producer и Consumer. Пошаговое руководство
https://t.me/java_newssВсем привет. Поскольку не смог найти полноценной статьи о том, как с нуля написать свой spring boot сервис с подключением к нему rabbitMQ, с конфигурацией всего это чуда через графический интерфейс и успешной отправкой и получением сообщения из очереди, то решил написать свою статью, что бы облегчить жизнь тем, кто захочет с этим познакомиться. Так же добавлю теоретическую часть, что бы не искать все эту информацию по всем источникам. Я постараюсь своими словами объяснять все, с чем мы будет сталкиваться. Если хотите узнать более подробно, то советую обратиться к официальной документации: Официальный сайт RabbitMQ
Практическая часть будет прерываться на теоретическую, что бы было понимание того, что происходит.
Давайте начнем.
Подготовим наше окружение для работы:
В статье использовался Docker Desktop. Система Windows
- Первое что нам нужно сделать, это поднять нашу очередь в docker(о том как его поставить к себе на машину, я рассказывать не буду. На хабре огромное количество материала о том, как это сделать). Пишем простейший docker-compose.yml
version: '3' services: localRabbitMQ: image: rabbitmq:3-management-alpine environment: RABBITMQ_DEFAULT_USER: user RABBITMQ_DEFAULT_PASS: password ports: - 5672:5672 - 15672:15672
запускаем через консоль из папки, в которой лежит наш файл командой: docker compose up
Теперь на http://localhost:15672/ должны видеть вот такую картину:
Credentials для входа мы указали в docker-compose.yml (user и password)
2. Создание Exchange. После авторизации нам нужно создать exchange. Параметры, указаны ниже на картинке в Add a new exchange. После нажимаем кнопку Add exchange. Тип указываем direct(это важно) Потому что типа direct, мы можем задать routingKey(ниже описано что это), а для fanout не можем, потому что он пропускает все сообщения.
Теперь немного погрузимся в теорию, что бы было понимание того, что за зверь этот exchange. Общая схема взаимодействия выглядит следующим образом:
Producer - производитель сообщений (отдельное приложение на Java)
Consumer 1, Consumer 2 - потребители сообщений (отдельные приложения)
Exchanges - обменник. Cущность Rabbit, точка входа для публикации всех сообщений.
Binding - связь между Exchange и очередью
Queue - очередь сообщений
Все сообщения из приложения Producer попадают в Exchanges, после этого обрабатываются на основе binding и routingKey(дальше сделаем это на практике и вы поймете, как это работает), после сообщение попадает в очередь и забирается из нее consumer'ом.
3. Создание очереди. Теперь давайте создадим очередь (Queue) через которую будем передавать сообщения. В процессе работы у вас может фигурировать такая сущность, как virtual host, давайте его мы тоже создадим (потому что в продакшене точно не будет использоваться дефолтный) Заходим в admin, справа выбираем Virtual Hosts и создаем новый, чеhез кнопку add virtual host. Я назвал его cpp.
Далее переходим в Queue и создаем новую очередь. Я назвал ее Queue1
Остался последний шаг, для настройки окружения. Теперь зададим Binding. Он нужен для того, что бы данные, которые попадают в exchange(а туда попадают все сообщения из Producer'a) распределялись по разным очередям (Разные Bingings будут распределять сообщения в разные очереди). Для того, что бы создать Binding заходим в нашу созданную очередь и во вкладке Bindings добавляем новый binging
Теперь все сообщения, которые попадают в наш exchange, который называется testExchange и имеют routingKey с названием testRoutingKey, будут попадать в очередь queue1
4. Мы закончили настройку окружения. Теперь, перейдем к написанию кода. Для этого создадим два простейших приложения на Java с использованием Spring Boot.
Я использую 17 Java. Зависимости: Spring for RabbitMQ, Spring Web
5. Разработка Producer'a и Consumer'a
Я создам два простейших приложения. Одно назову RabbitMQProducer другое RabbitMQConsumer. Я использую порты: 8086 и 8087 соответственно. Вы можете использовать любые другие.
Код RabbitMQConsumer:
Создам два пакета: config, consumer. В пакете config будет класс RabbitConf, а в пакете consumer класс RabbitMQConsumer
RabbitConf
@Configuration public class RabbitConf { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost"); cachingConnectionFactory.setUsername("user"); cachingConnectionFactory.setPassword("password"); cachingConnectionFactory.setVirtualHost("cpp"); return cachingConnectionFactory; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } @Bean public Queue myQueue() { return new Queue("queue1"); } @Bean DirectExchange exchange() { return new DirectExchange("testExchange", true, false); } @Bean Binding binding(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey"); } }
RabbitMQConsumer
@Component @EnableRabbit public class RabbitMQConsumer { @RabbitListener(queues = "queue1") public void processMyQueue(String message) { System.out.printf("Received from myQueue : %s ", new String(message.getBytes())); } }
RabbitMqConsumerApplication
@SpringBootApplication public class RabbitMqConsumerApplication { public static void main(String[] args) { SpringApplication.run(RabbitMqConsumerApplication.class, args); } }
Код RabbitMQProducer:
Создам 4 пакета: config, controller, model, producer В пакете config будет класс RabbitConf, в пакете controller класс RabbitController, в пакете model класс MessageModel и в пакете producer интерфейс RabbitMQProducerService и его имплементация RabbitMQProducerServiceImpl. Пакеты service и пакет impl в пакете producer создавать не стал (прошу прощения, если кого-то это задело).
RabbitConfig
@Configuration public class RabbitConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost"); cachingConnectionFactory.setUsername("user"); cachingConnectionFactory.setPassword("password"); cachingConnectionFactory.setVirtualHost("cpp"); return cachingConnectionFactory; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } @Bean public Queue myQueue() { return new Queue("queue"); } @Bean DirectExchange exchange() { return new DirectExchange("testExchange", true, false); } @Bean Binding binding(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey"); } }
RabbitController
@RestController public class RabbitController { private final RabbitMQProducerService rabbitMQProducerService; @Autowired public RabbitController(RabbitMQProducerService rabbitMQProducerService) { this.rabbitMQProducerService = rabbitMQProducerService; } @GetMapping("/send") public void sendMessageToRabbit(@RequestBody MessageModel messageModel) { rabbitMQProducerService.sendMessage(messageModel.getMessage(), messageModel.getRoutingKey()); } @GetMapping("/health") public String healthCheck() { return "OK"; } }
MessageModel
@Data public class MessageModel { private String message; private String routingKey; }
RabbitMQProducerService
public interface RabbitMQProducerService { void sendMessage(String message, String routingKey); }
RabbitMQProducerServiceImpl
@Service public class RabbitMQProducerServiceImpl implements RabbitMQProducerService { private final RabbitTemplate rabbitTemplate; @Autowired public RabbitMQProducerServiceImpl(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMessage(String message, String routingKey) { rabbitTemplate.convertAndSend("testExchange", routingKey, message); } }
RabbitMqProducerApplication
@SpringBootApplication public class RabbitMqProducerApplication { public static void main(String[] args) { SpringApplication.run(RabbitMqProducerApplication.class, args); } }
На всякий случай прикрепляю свой pom.xml он одинаковый для обоих проектов.
Pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.0.0</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rabbitMQProducer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitMQProducer</name> <description>rabbitMQProducer</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
Тестирование работоспособности:
При отправке запроса через Postman на наш Producer с параметрами:
{
"message": "just text",
"routingKey": "testRoutingKey"
}
В нашем Consumer'e мы видим следующее:
Таким образом мы видим, что наше сообщение получено из очереди queue1, но если мы передадим в нашем запросе к Producer'у "routingKey": отличный от значения "testRoutingKey"
, то наш Consumer не получит это сообщение из очереди поскольку он получает только сообщения с routingKey = testRoutingKey.
На этом можно завершать данную статью, надеюсь что она была вам полезна. и вам стало немного понятнее, как начать свое знакомство с RabbitMQ. А самое главное теперь стало понятно, в каком направлении копать дальше.