rocketmq安装使用
1、rocketmq安装
下载rocketmq
源码包rocketmq-all-5.0.0-source-release.zip
,解压后编译
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
编译成功后发布版本将在目录rocketmq-all-5.0.0-source-release/distribution/target/rocketmq-5.0.0/rocketmq-5.0.0
中
- 启动
rocketmq
nohup sh bin/mqnamesrv &
- 启动broker
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
- 关闭broker nameserver
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
启动成功后可使用自带的测试程序完成测试
模拟发送消息
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
模拟消费消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
2、spring-boot中使用rocketmq
- 引入基础依赖包,使用
spring-boot-starter-parent:2.7.7
<dependency>
<groupId>org.conscrypt</groupId>
<artifactId>conscrypt-openjdk-uber</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>5.0.0</version>
</dependency>
- 编写模拟发送接收测试程序如下
RocketmqController.java 如下
@RestController
@RequestMapping("/rocket")
public class RocketmqController {
@Autowired
private RocketMQTemplate template;
@PostMapping("/send")
public void send() {
HashMap map = new HashMap();
map.put("messageId", UUID.randomUUID().toString().toLowerCase());
MessageHeaders headers = new MessageHeaders(map);
template.syncSend("mixfate_topic", MessageBuilder.createMessage("测试发送消息", headers));
}
}
Consumer.java如下
@Slf4j
@Component
@RocketMQMessageListener(topic = "mixfate_topic", selectorExpression = "", consumerGroup = "mixfate")
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info(JSON.toJSONString(message.getProperties()));
log.info("接收到消息:{}", msg);
}
}
rocketmq
延迟消息
示例
template.syncSend("mixfate_topic",MessageBuilder.createMessage("测试发送延迟消息", headers),timeout,delayLevel);
其中timeout
即超时时间参数,delayLevel
为延时等级参数(整型),共18个延时等级参数,可以从rocketmq-all-5.0.0-source
的源码中找到类MessageStoreConfig.java
中定义的18个等级,如delayLevel
为2即表示延时5秒,为5即表示延时1分钟,可以看到最长支持2个小时。
如果需要超过2个小时的延时处理时间需要怎么做呢?如需要将消息延时5小时。在不改源码的情况下可以提供一个简单的思路,将延迟消息的延时时长放置到消息体或消息头中,收到消息后判断此消息是否即时处理或继续放入rocketmq
延时。
3、rocketmq-dashboard安装
下载dashboard
源码https://github.com/apache/rocketmq-dashboard
编译安装后运行jar
包
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
需要注意默认的namesrv
地址即为127.0.0.1:9876