rocketmq安装使用
1、rocketmq安装

下载rocketmq源码包rocketmq-all-5.0.0-source-release.zip,解压后编译

mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U

编译成功后发布版本将在目录rocketmq-all-5.0.0-source-release/distribution/target/rocketmq-5.0.0/rocketmq-5.0.0

  • 启动rocketmq
nohup sh bin/mqnamesrv &
  • 启动broker
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
  • 关闭broker nameserver
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

启动成功后可使用自带的测试程序完成测试

模拟发送消息
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
模拟消费消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
2、spring-boot中使用rocketmq
  • 引入基础依赖包,使用spring-boot-starter-parent:2.7.7
<dependency>
    <groupId>org.conscrypt</groupId>
    <artifactId>conscrypt-openjdk-uber</artifactId>
    <version>2.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-tools</artifactId>
    <version>5.0.0</version>
</dependency>
  • 编写模拟发送接收测试程序如下
RocketmqController.java 如下

@RestController
@RequestMapping("/rocket")
public class RocketmqController {

    @Autowired
    private RocketMQTemplate template;

    @PostMapping("/send")
    public void send() {
        HashMap map = new HashMap();
        map.put("messageId", UUID.randomUUID().toString().toLowerCase());
        MessageHeaders headers = new MessageHeaders(map);
        template.syncSend("mixfate_topic", MessageBuilder.createMessage("测试发送消息", headers));
    }

}

Consumer.java如下
@Slf4j
@Component
@RocketMQMessageListener(topic = "mixfate_topic", selectorExpression = "", consumerGroup = "mixfate")
public class Consumer implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info(JSON.toJSONString(message.getProperties()));
        log.info("接收到消息:{}", msg);
    }

}
  • rocketmq延迟消息

示例

template.syncSend("mixfate_topic",MessageBuilder.createMessage("测试发送延迟消息", headers),timeout,delayLevel);

其中timeout即超时时间参数,delayLevel为延时等级参数(整型),共18个延时等级参数,可以从rocketmq-all-5.0.0-source的源码中找到类MessageStoreConfig.java中定义的18个等级,如delayLevel为2即表示延时5秒,为5即表示延时1分钟,可以看到最长支持2个小时。

rocketmq

如果需要超过2个小时的延时处理时间需要怎么做呢?如需要将消息延时5小时。在不改源码的情况下可以提供一个简单的思路,将延迟消息的延时时长放置到消息体或消息头中,收到消息后判断此消息是否即时处理或继续放入rocketmq延时。

3、rocketmq-dashboard安装

下载dashboard源码https://github.com/apache/rocketmq-dashboard

编译安装后运行jar

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

需要注意默认的namesrv地址即为127.0.0.1:9876

rocketmq

rocketmq


赞赏(Donation)
微信(Wechat Pay)

donation-wechatpay