
本篇文章写于2020年10月31日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。
消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。例如,在用户注册时需同步发送短信验证码、在订单发货时需异步推送微信模板消息通知或短信通知。
消费推送接口并发量可能不高,要求同步推送消息的场景不多,但仍需要考虑可能会存在流量突增情况。在促销活动的前后、拉新活动期间等等,都可能需要同步推送大量短信通知,而我们的目标是只要一个POD(一个进程)就能处理整个电商平台的消息推送。
至于可异步的消息推送则通过MQ对接,实现削峰填谷,并通过监听系统的负载情况动态的控制消息消费速度,让系统处在一个稳定的运行状态。
在团队协同定义完消息推送接口后,消息推送服务相当于只做一层代理,与网关非常相似,这也是我们考虑使用WebFlux的原因之一。以此降低消息推送服务的部署成本。
Spring WebFlux与Spring WebMvc同为Web框架,不同的是,WebFlux是完全非阻塞的,能够实现以少量的线程处理并发请求、以更少的硬件资源获取系统更高的吞吐量。
但使用反应式编程可能不适合复杂业务的开发,也不适合采用了DDD领域驱动设计架构的项目,如果要使用,就必须要让响应式API侵入DDD的领域服务类、仓储类。
要使用Spring WebFlux提供完全非阻塞的接口,就必须要确保处理一个请求的整个流程都是非阻塞的,只要有一个步骤导致线程发生阻塞,WebFlux的性能就直线下降,为此你还要给WebFlux配置更多的线程,这与使用WebMvc并无差异,得不到高性能反而还增加项目的复杂性。
例如,处理接口请求阻塞在操作数据库上,那么默认WebFlux配置的几个线程都会被阻塞住,此时,如果想通过增加WebFlux的工作线程数来解决问题,那么不如直接切换回WebMvc。
使用WebFlux获得高性能的同时必然要失去些什么,毕竟是等价替换。所以代码难以调试、项目代码复杂度提升难以阅读、并且会导致一些强依赖ThreadLocal实现特性的框架无法正常工作,我们不得不抛弃这些框架而寻找支持反应式的框架替代。
消息推送服务在处理一次消息推送请求的过程中,可能需要访问Redis、数据库RDS、以及第三方接口。
Redis用于缓存消息模板,但这块可以使用内存缓存替代以获取更快的响应速度,后期如果需要访问Redis,可以使用Lettuce替代Jedis。
请求第三方接口则可以使用WebFlux提供的WebClient实现,用于替代诸如httpclient、okhttp这类http客户端框架,实现可以使用单一长连接的非阻塞发送http请求。
最后可能需要持久化推送记录以便于后续报表的统计或其它,所以需要使用R2DBC替换JDBC实现非阻塞操作数据库。
R2DBC与jdbc的关系类似于WebFlux与WebMvc的关系,R2DBC是实现非阻塞操作数据库的规范,提供反应式编程API,目前已有多种实现该规范的数据库驱动程序包,如r2dbc-mysql,spring data r2dbc则是我们用来替代mybatis的orm框架。
webflux的异常处理与全局异常处理
webflux兼容webmvc的全局异常处理机制,如果不嫌麻烦,也可以每个接口自行处理异常,例如:
@PostMapping("push/sms")
public Mono<GenericResponse<MessagePushResultDto>> genericSendSmsMsg(
// webflux也支持参数检验
@Validated @RequestBody Mono<XxxCommand> command) {
return xxxxService.pushMessage(command)
.flatMap(messagePushResultDto -> Mono.just(GenericResponse.success(messagePushResultDto)))
// 处理异常,不处理则走全局异常处理
.onErrorResume(throwable -> Mono.just(GenericResponse.fail(throwable.getMessage())));
}
- 让自定义的
JsonUtils接替webflux解析json的工作
我们将Json解析封装成独立的组件,目的是适配多个json解析框架,让切换json解析框架只需要切换依赖jar包即可。为此,我们依然需要让JsonUtils替代WebFlux的json解析工作。代码实现如下。

- 使用
WebClient发送Post请求示例
private Mono<WxmbMessageResponse> sendTemplateMessage(WxmbMessageCommand command, String token) {
return webClient.post().uri("/cgi-bin/message/template/send?access_token=" + token)
.accept(MediaType.APPLICATION_JSON).acceptCharset(Charset.defaultCharset())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(command)
.retrieve()
.bodyToMono(WxmbMessageResponse.class);
}
- 使用
Spring Data R2DBC实现增删改查
项目需要添加mysql的r2dbc驱动包,以及spring-data-r2dbc,同时spring-data-r2dbc依赖的r2dbc-spi包也会被导入。
<!-- r2dbc mysql驱动-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
</dependency>
<!-- spring-data-r2dbc的starter包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
spring-data-r2dbc与spring-data-jpa的使用非常相似,两者都是实现spirng-data-commons下的repository的API,spring-data-r2dbc实现的是反应式API。简单的CRUD可通过继承R2dbcRepository<T, ID>接口实现,例如:
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
}
使用@Query自定义查询实现如下:
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
@Query("select * from message where id = ?")
Mono<MessagePO> selectByMsgId(Long msgId);
}
@Query注解不等于Mybatis的@Select注解,@Query可以编写增删改查SQL,如果需要执行写操作,需要配合@Modifying注解使用。例如
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
@Modifying
@Query("delete from message where id = :msgId")
Mono<Integer> deleteByMsgId(Long msgId);
}
- 使用
spring-data-r2dbc实现复杂查询
对于复杂的查询,我们也可以直接使用spring-data-r2dbc的API实现。例如:

DatabaseClient类似Mybatis中的SqlSession概念。
关于spring-data-r2dbc的使用,推荐阅读spring官方文档,虽然是英文,但阅读起来并不难理解,想要学习冷门技术,就必须要啃英文文档,因为你会发现,这方面的博客文章少之又少,还避免不了一些博客文章使用的spring-data-r2dbc版本与自己使用的版本不同存在API差异导致“copy”的代码画红线问题。
spring-data-r2dbc 1.1.0版本官方文档链接:https://docs.spring.io/spring-data/r2dbc/docs/1.1.0.RELEASE/reference/html/#reference,也可到spring.io官网搜索。