RocketMQ5.3.1集成SpringBoot3.4.0就这样简单
RocketMQ5.3.1集成SpringBoot3.4.0就这样简单

消息中间件的用途很广,应用解耦、流量削峰、异步处理、数据推送等场景都可以使用RocketMQ进行实现。今天我们主要任务就是基于源码搭建好RocketMQ和控制台环境,然后和SpringBoot3集成再生产消息和消费消息,我们可以结合下图来掌握RocketMQ的常见应用场景

完整代码在文章最后,如果觉得本篇文章对你有用,记得点赞、关注、收藏哦。你的支持是我持续更新的动力!
SpringBoot3专栏软件环境
- JDK17.0.12
- SpringBoot3.4.0
- RocketMQ5.3.1
- rocketmq-dashboard-1.0.0
- IDEA2024.2.0.2
我们先看本篇文章对应的项目结构,请看下图

1 基于源码搭建RocketMQ5.3.1环境
1.1 下载源码打包编译
基于源码的打包编译较慢,如果不想基于源码搭建RocketMQ环境,可以直接下载二进制包
# 源码下载地址
https://github.com/apache/rocketmq/tree/rocketmq-all-5.3.1
# 二进制包下载地址
https://dist.apache.org/repos/dist/release/rocketmq/5.3.1/rocketmq-all-5.3.1-bin-release.zip
# 编译打包
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U


1.2 修改配置
修改默认的JVM内存大小(调整为满足本机内存大小),生产环境根据服务器配置自行调整
distribution\target\rocketmq-5.3.1\rocketmq-5.3.1\bin\runserver.cmd,linux对应的是runserver.sh

distribution\target\rocketmq-5.3.1\rocketmq-5.3.1\bin\runbroker.cmd,linux对应的是runbroker.sh

1.3 启动RocketMQ Server
distribution\target\rocketmq-5.3.1\rocketmq-5.3.1\bin目录下执行以下命令,windows环境启动需要先配置环境变量ROCKETMQ_HOME=本地磁盘路径\distribution\target\rocketmq-5.3.1\rocketmq-5.3.1
path中添加%ROCKETMQ_HOME%\bin
# 1 启动NameServer
mqnamesrv
#2 启动Broker
mqbroker -n localhost:9876
#3 关闭服务
mqshutdown namesrv


2 基于源码搭建rocketmq-dashboard1.0
2.1 安装rocketmq-dashboard控制台
# 源码下载地址
https://github.com/apache/rocketmq-dashboard/tree/release-1.0.0
# 在rocketmq-dashboard-release-1.0.0\src\main\resources\application.properties文件中修改属性
rocketmq.config.namesrvAddr=localhost:9876
# 编译打包
mvn clean package -Dmaven.test.skip=true


2.2 启动和访问控制台
# 启动服务
java -jar target/rocketmq-dashboard-1.0.0.jar
# 访问服务
http://localhost:8080


3 项目搭建
3.1 父工程和子工程依赖管理
父工程pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itbeien</groupId>
<artifactId>springboot3-labs-master</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<!-- <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.0</version>
</parent>-->
<modules>
<module>springboot-nacos</module>
<module>springboot-mybatis-plus</module>
<module>spingboot-mybatis</module>
<module>mybatis-generator</module>
<module>springboot-mybatis-nacos</module>
<module>dynamic-datasource-mybatis-plus</module>
<module>springboot-mapstruct</module>
<module>springboot-mapstruct-lombok</module>
<module>springboot-rocketmq</module>
</modules>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<nacos-config-spring-boot-starter-version>0.2.12</nacos-config-spring-boot-starter-version>
<lombok-version>1.18.34</lombok-version>
<mybatis-plus-spring-boot3-starter-version>3.5.9</mybatis-plus-spring-boot3-starter-version>
<mysql-connector-java-version>5.1.49</mysql-connector-java-version>
<spring-boot-dependencies-version>3.4.0</spring-boot-dependencies-version>
<mybatis-spring-boot-version>3.0.3</mybatis-spring-boot-version>
<mapstruct-verrsion>1.5.5.Final</mapstruct-verrsion>
<mapstruct-processor-version>1.5.5.Final</mapstruct-processor-version>
<rocketmq-spring-boot-starter-version>2.3.1</rocketmq-spring-boot-starter-version>
<fastjson-vesion>2.0.53</fastjson-vesion>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot-dependencies-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>${nacos-config-spring-boot-starter-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok-version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
<version>${mybatis-plus-spring-boot3-starter-version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector-java-version}</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis-spring-boot-version}</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>${mapstruct-verrsion}</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct-processor-version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson-vesion}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
子工程pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.itbeien</groupId>
<artifactId>springboot3-labs-master</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>springboot-rocketmq</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>
3.2 配置文件
server.port=1010
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=itbeien_group_1
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.max-message-size=4096
rocketmq.producer.retry-times-when-send-failed=3
rocketmq.producer.retry-times-when-send-async-failed=2
rocketmq.consumer.group=itbeien_group_1
rocketmq.consumer.pull-batch-size=5
3.3 代码实现
3.3.1 生产者和消费者
package cn.itbeien.rocketmq.service;
import cn.itbeien.rocketmq.vo.MessageVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@Service
@Slf4j
public class ProducerService
{
@Autowired
private RocketMQTemplate rocketMqTemplate;
@Autowired
private DefaultMQProducer defaultMqProducer ;
public String sendMsg1 (){
try {
// 构建消息主体
String msgBody = JSONObject.toJSONString(new MessageVO(1,"itbeien_mq_msg"));
// 发送消息
rocketMqTemplate.convertAndSend("itbeien-mq-topic",msgBody);
} catch (Exception e) {
log.error("发送消息异常:{}",e);
}
return "success" ;
}
public String sendMsg2 (){
try {
// 构建消息主体
String msgBody = JSONObject.toJSONString(new MessageVO(1,"itbeien_mq_msg"));
// 构建消息对象
Message message = new Message();
message.setTopic("itbeien-mq-topic");
message.setTags("itbeien-mq-tag");
message.setKeys("itbeien-mq-key");
message.setBody(msgBody.getBytes());
// 发送消息,打印日志
SendResult sendResult = defaultMqProducer.send(message);
log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());
} catch (Exception e) {
log.error("发送消息异常:{}",e);
}
return "success" ;
}
}
package cn.itbeien.rocketmq.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@Service
@RocketMQMessageListener(consumerGroup = "itbeien_group_1",topic = "itbeien-mq-topic")
@Slf4j
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("message:{} ",message);
}
}
3.4 代码测试
package cn.itbeien.rocketmq.test;
import cn.itbeien.rocketmq.service.ProducerService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@SpringBootTest
public class RocketMQApplication {
@Autowired
private ProducerService producerService;
/**
* 生产消息
*/
@Test
public void sendMsg(){
producerService.sendMsg1();
}
}
执行以上代码成功后,在rocketmq管理界面查看发送的消息和消费者日志


欢迎大家关注我的项目实战内容itbeien.cn,一起学习一起进步,在项目和业务中理解各种技术。

欢迎沟通交流技术和支付业务,一起探讨聚合支付/预付卡系统业务、技术、系统架构、微服务、容器化。并结合聚合支付系统深入技术框架/微服务原理及分布式事务原理。加入我的知识星球吧

SpringBoot3专栏
01SpringBoot3专栏-SpringBoot3.4.0整合Mybatis-plus和Mybatis
02SpringBoot3.4.0结合Mybatis-plus实现动态数据源
03mapstruct对象映射在Springboot3中这样用就对了
跟着我学微服务系列
01跟着我学微服务,什么是微服务?微服务有哪些主流解决方案?
05SpringCloudAlibaba之图文搞懂微服务核心组件在企业级支付系统中的应用
06JDK17+SpringBoot3.4.0+Netty4.1.115搭建企业级支付系统POS网关
07JDK17+SpringCloud2023.0.3搭建企业级支付系统-预付卡支付交易微服务
08JDK17+Dubbo3.3.2搭建企业级支付系统-预付卡支付交易微服务
09JDK17+SpringBoot3.3.6+Netty4.1.115实现企业级支付系统POS网关签到功能
贝恩聊架构-项目实战地址
4 源码地址
贝恩聊架构-SpringBoot3专栏系列文章、资料和源代码会同步到以下地址,代码和资料每周都会同步更新
该仓库地址主要用于贝恩聊架构-SpringBoot3专栏、基于企业级支付系统,学习微服务整体技术栈
