使用spring-cloud-stream-function发送和接收rabbitmq消息

Spring Cloud Stream FunctionSpring Cloud Stream 的一个扩展,其提供了一种使用函数式编程模型来处理输入和输出消息的方式。可以更方便地编写基于消息的应用,而不用过多关注底层消息中间件的一些实现细节。

演示环境jdk-17.0.6RabbitMQ 3.12.6spring-boot-3.1.4spring-cloud-2022.0.4

主要演示一下使用函数式模型来处理消息的收发

1、EventConfig配置Supplier发送消息,以及Consumer消费消息
@Slf4j
@Configuration
public class EventConfig {

    @Bean
    public Consumer<String> payIn(EventHandler handler) {
        return handler::handlePay;
    }

    @Bean
    public Consumer<String> refundIn(EventHandler handler) {
        return handler::handleRefund;
    }

    @Bean
    public Supplier<String> payOut() {
        return () -> String.format("Timer支付消息ID=%s", UUID.randomUUID());
    }

    @Bean
    public Supplier<String> refundOut() {
        return () -> String.format("Timer退款消息ID=%s", UUID.randomUUID());
    }

}

其中EventHandler是用来处理消息的BeanpayOut发送支付消息payIn接收支付消息,refundOut发送退款消息refundIn接收退款消息

2、EventHandler处理接收到的消息,在此只作日志打印
@Slf4j
@Component
public class EventHandler {

    public void handlePay(String message) {
        log.info("收到支付消息 [{}]", message);
    }

    public void handleRefund(String message) {
        log.info("收到退款消息 [{}]", message);
    }

}
3、Application.yaml配置文件中指定消息的输入和输出
spring:
  cloud:
    stream:
      bindings:
        payOut-out-0:
          destination: pay-event
        payIn-in-0:
          destination: pay-event
          group: stream-function
        refundOut-out-0:
          destination: refund-event
        refundIn-in-0:
          destination: refund-event
          group: stream-function
    function:
      definition: payIn;payOut;refundIn;refundOut
  integration:
    poller:
      fixed-delay: 2000
  rabbitmq:
    addresses: 127.0.0.1
    username: user
    password: 123456
    virtual-host: /

需要注意在spring-cloud-stream中,有一个poller的特性,是用于定期轮询消息发送到输出的通道,spring-cloud-stream会创建一个poller定期发送消息,这个poller的默认配置时间为1秒,在此配置文件中可通过fixed-delay改为2秒,另外需要注意有多个方法时在function->definition中需要使用分号隔开

这个定期从某个地方轮询的场景有限,比如从数据库里取出一些待处理的任务发送到消息队列。如果需要在业务逻辑上发送消息可以使用StreamBridge

4、使用StreamBridge发送消息
@Slf4j
@Component
@AllArgsConstructor
public class EventTask {

    private StreamBridge streamBridge;

    @Scheduled(cron = "0/3 * * * * ?")
    public void mockSend() {
        streamBridge.send("payOut-out-0", String.format("Task支付消息ID=%37s", UUID.randomUUID()));
    }

}

写一个定时任务或者Controller使用StreamBridge模拟发送消息,需要注意send方法中指定通道名称

5、附录pom.xml关键依赖
<?xml version="1.0" encoding="UTF-8"?>
<project......>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.4</version>
        <relativePath/>
    </parent>
    ......
    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2022.0.4</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        ......
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    ......

</project>

log日志

......T23:48:19.502+08:00  INFO ...... : 收到退款消息 [Timer退款消息ID=b177f2f8-dc56-4e84-83d2-8c9b67763dec]
......T23:48:19.502+08:00  INFO ...... : 收到支付消息 [Timer支付消息ID=90b102e0-b131-49e0-b64b-ac3e20d7fd2b]
......T23:48:19.508+08:00  INFO ...... : 收到支付消息 [Timer支付消息ID=64071952-59a8-4374-b907-158d3aa48a6f]
......T23:48:19.508+08:00  INFO ...... : 收到退款消息 [Timer退款消息ID=90ac1957-2311-4a60-96d8-ab3e3d471a28]
......T23:48:21.015+08:00  INFO ...... : 收到支付消息 [Task支付消息ID= f22346a8-5209-43c9-b44a-a1b802ff3c5e]
......T23:48:21.220+08:00  INFO ...... : 收到支付消息 [Timer支付消息ID=99891284-e049-482b-b76b-7427ff273f19]
......T23:48:21.222+08:00  INFO ...... : 收到退款消息 [Timer退款消息ID=9aae09af-bc2e-4044-ba6e-727e2ce9e731]
......T23:48:23.222+08:00  INFO ...... : 收到支付消息 [Timer支付消息ID=cf8f6b8c-f2d8-4491-9551-428ced0856b2]
......T23:48:23.222+08:00  INFO ...... : 收到退款消息 [Timer退款消息ID=b271a152-5208-4005-867f-087c9ec75c41]
......T23:48:24.016+08:00  INFO ...... : 收到支付消息 [Task支付消息ID= 6c1fbd3a-8f36-40bf-ab8d-f52609949f25]

可以参考spring-cloud-stream-samples中的案例multi-functions-samples\multi-functions-rabbit


赞赏(Donation)
微信(Wechat Pay)

donation-wechatpay