使用spring-cloud-stream-function发送和接收rabbitmq消息
Spring Cloud Stream Function
是 Spring Cloud Stream
的一个扩展,其提供了一种使用函数式编程模型来处理输入和输出消息的方式。可以更方便地编写基于消息的应用,而不用过多关注底层消息中间件的一些实现细节。
演示环境
jdk-17.0.6
,RabbitMQ 3.12.6
,spring-boot-3.1.4
,spring-cloud-2022.0.4
主要演示一下使用函数式模型来处理消息的收发
1、EventConfig配置Supplier发送消息,以及Consumer消费消息
@Slf4j
@Configuration
public class EventConfig {
@Bean
public Consumer<String> payIn(EventHandler handler) {
return handler::handlePay;
}
@Bean
public Consumer<String> refundIn(EventHandler handler) {
return handler::handleRefund;
}
@Bean
public Supplier<String> payOut() {
return () -> String.format("Timer支付消息ID=%s", UUID.randomUUID());
}
@Bean
public Supplier<String> refundOut() {
return () -> String.format("Timer退款消息ID=%s", UUID.randomUUID());
}
}
其中EventHandler
是用来处理消息的Bean
,payOut
发送支付消息payIn
接收支付消息,refundOut
发送退款消息refundIn
接收退款消息
2、EventHandler处理接收到的消息,在此只作日志打印
@Slf4j
@Component
public class EventHandler {
public void handlePay(String message) {
log.info("收到支付消息 [{}]", message);
}
public void handleRefund(String message) {
log.info("收到退款消息 [{}]", message);
}
}
3、Application.yaml配置文件中指定消息的输入和输出
spring:
cloud:
stream:
bindings:
payOut-out-0:
destination: pay-event
payIn-in-0:
destination: pay-event
group: stream-function
refundOut-out-0:
destination: refund-event
refundIn-in-0:
destination: refund-event
group: stream-function
function:
definition: payIn;payOut;refundIn;refundOut
integration:
poller:
fixed-delay: 2000
rabbitmq:
addresses: 127.0.0.1
username: user
password: 123456
virtual-host: /
需要注意在spring-cloud-stream
中,有一个poller
的特性,是用于定期轮询消息发送到输出的通道,spring-cloud-stream
会创建一个poller
定期发送消息,这个poller
的默认配置时间为1秒,在此配置文件中可通过fixed-delay
改为2秒,另外需要注意有多个方法时在function->definition
中需要使用分号隔开
这个定期从某个地方轮询的场景有限,比如从数据库里取出一些待处理的任务发送到消息队列。如果需要在业务逻辑上发送消息可以使用StreamBridge
4、使用StreamBridge发送消息
@Slf4j
@Component
@AllArgsConstructor
public class EventTask {
private StreamBridge streamBridge;
@Scheduled(cron = "0/3 * * * * ?")
public void mockSend() {
streamBridge.send("payOut-out-0", String.format("Task支付消息ID=%37s", UUID.randomUUID()));
}
}
写一个定时任务或者Controller
使用StreamBridge
模拟发送消息,需要注意send
方法中指定通道名称
5、附录pom.xml关键依赖
<?xml version="1.0" encoding="UTF-8"?>
<project......>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.4</version>
<relativePath/>
</parent>
......
<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.4</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
......
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
......
</project>
log日志
......T23:48:19.502+08:00 INFO ...... : 收到退款消息 [Timer退款消息ID=b177f2f8-dc56-4e84-83d2-8c9b67763dec]
......T23:48:19.502+08:00 INFO ...... : 收到支付消息 [Timer支付消息ID=90b102e0-b131-49e0-b64b-ac3e20d7fd2b]
......T23:48:19.508+08:00 INFO ...... : 收到支付消息 [Timer支付消息ID=64071952-59a8-4374-b907-158d3aa48a6f]
......T23:48:19.508+08:00 INFO ...... : 收到退款消息 [Timer退款消息ID=90ac1957-2311-4a60-96d8-ab3e3d471a28]
......T23:48:21.015+08:00 INFO ...... : 收到支付消息 [Task支付消息ID= f22346a8-5209-43c9-b44a-a1b802ff3c5e]
......T23:48:21.220+08:00 INFO ...... : 收到支付消息 [Timer支付消息ID=99891284-e049-482b-b76b-7427ff273f19]
......T23:48:21.222+08:00 INFO ...... : 收到退款消息 [Timer退款消息ID=9aae09af-bc2e-4044-ba6e-727e2ce9e731]
......T23:48:23.222+08:00 INFO ...... : 收到支付消息 [Timer支付消息ID=cf8f6b8c-f2d8-4491-9551-428ced0856b2]
......T23:48:23.222+08:00 INFO ...... : 收到退款消息 [Timer退款消息ID=b271a152-5208-4005-867f-087c9ec75c41]
......T23:48:24.016+08:00 INFO ...... : 收到支付消息 [Task支付消息ID= 6c1fbd3a-8f36-40bf-ab8d-f52609949f25]
可以参考spring-cloud-stream-samples
中的案例multi-functions-samples\multi-functions-rabbit