本篇文章写于2020年10月31日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。
消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。例如,在用户注册时需同步发送短信验证码、在订单发货时需异步推送微信模板消息通知或短信通知。
消费推送接口并发量可能不高,要求同步推送消息的场景不多,但仍需要考虑可能会存在流量突增情况。在促销活动的前后、拉新活动期间等等,都可能需要同步推送大量短信通知,而我们的目标是只要一个POD
(一个进程)就能处理整个电商平台的消息推送。
至于可异步的消息推送则通过MQ
对接,实现削峰填谷,并通过监听系统的负载情况动态的控制消息消费速度,让系统处在一个稳定的运行状态。
在团队协同定义完消息推送接口后,消息推送服务相当于只做一层代理,与网关非常相似,这也是我们考虑使用WebFlux
的原因之一。以此降低消息推送服务的部署成本。
Spring WebFlux
与Spring WebMvc
同为Web
框架,不同的是,WebFlux
是完全非阻塞的,能够实现以少量的线程处理并发请求、以更少的硬件资源获取系统更高的吞吐量。
但使用反应式编程可能不适合复杂业务的开发,也不适合采用了DDD
领域驱动设计架构的项目,如果要使用,就必须要让响应式API
侵入DDD
的领域服务类、仓储类。
要使用Spring WebFlux
提供完全非阻塞的接口,就必须要确保处理一个请求的整个流程都是非阻塞的,只要有一个步骤导致线程发生阻塞,WebFlux
的性能就直线下降,为此你还要给WebFlux
配置更多的线程,这与使用WebMvc
并无差异,得不到高性能反而还增加项目的复杂性。
例如,处理接口请求阻塞在操作数据库上,那么默认WebFlux
配置的几个线程都会被阻塞住,此时,如果想通过增加WebFlux
的工作线程数来解决问题,那么不如直接切换回WebMvc
。
使用WebFlux
获得高性能的同时必然要失去些什么,毕竟是等价替换。所以代码难以调试、项目代码复杂度提升难以阅读、并且会导致一些强依赖ThreadLocal
实现特性的框架无法正常工作,我们不得不抛弃这些框架而寻找支持反应式的框架替代。
消息推送服务在处理一次消息推送请求的过程中,可能需要访问Redis
、数据库RDS
、以及第三方接口。
Redis
用于缓存消息模板,但这块可以使用内存缓存替代以获取更快的响应速度,后期如果需要访问Redis
,可以使用Lettuce
替代Jedis
。
请求第三方接口则可以使用WebFlux
提供的WebClient
实现,用于替代诸如httpclient
、okhttp
这类http
客户端框架,实现可以使用单一长连接的非阻塞发送http
请求。
最后可能需要持久化推送记录以便于后续报表的统计或其它,所以需要使用R2DBC
替换JDBC
实现非阻塞操作数据库。
R2DBC
与jdbc
的关系类似于WebFlux
与WebMvc
的关系,R2DBC
是实现非阻塞操作数据库的规范,提供反应式编程API
,目前已有多种实现该规范的数据库驱动程序包,如r2dbc-mysql
,spring data r2dbc
则是我们用来替代mybatis
的orm
框架。
webflux
的异常处理与全局异常处理
webflux
兼容webmvc
的全局异常处理机制,如果不嫌麻烦,也可以每个接口自行处理异常,例如:
@PostMapping("push/sms")
public Mono<GenericResponse<MessagePushResultDto>> genericSendSmsMsg(
// webflux也支持参数检验
@Validated @RequestBody Mono<XxxCommand> command) {
return xxxxService.pushMessage(command)
.flatMap(messagePushResultDto -> Mono.just(GenericResponse.success(messagePushResultDto)))
// 处理异常,不处理则走全局异常处理
.onErrorResume(throwable -> Mono.just(GenericResponse.fail(throwable.getMessage())));
}
- 让自定义的
JsonUtils
接替webflux
解析json
的工作
我们将Json
解析封装成独立的组件,目的是适配多个json
解析框架,让切换json
解析框架只需要切换依赖jar
包即可。为此,我们依然需要让JsonUtils
替代WebFlux
的json
解析工作。代码实现如下。
- 使用
WebClient
发送Post
请求示例
private Mono<WxmbMessageResponse> sendTemplateMessage(WxmbMessageCommand command, String token) {
return webClient.post().uri("/cgi-bin/message/template/send?access_token=" + token)
.accept(MediaType.APPLICATION_JSON).acceptCharset(Charset.defaultCharset())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(command)
.retrieve()
.bodyToMono(WxmbMessageResponse.class);
}
- 使用
Spring Data R2DBC
实现增删改查
项目需要添加mysql
的r2dbc
驱动包,以及spring-data-r2dbc
,同时spring-data-r2dbc
依赖的r2dbc-spi
包也会被导入。
<!-- r2dbc mysql驱动-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
</dependency>
<!-- spring-data-r2dbc的starter包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
spring-data-r2dbc
与spring-data-jpa
的使用非常相似,两者都是实现spirng-data-commons
下的repository
的API
,spring-data-r2dbc
实现的是反应式API
。简单的CRUD
可通过继承R2dbcRepository<T, ID>
接口实现,例如:
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
}
使用@Query
自定义查询实现如下:
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
@Query("select * from message where id = ?")
Mono<MessagePO> selectByMsgId(Long msgId);
}
@Query
注解不等于Mybatis
的@Select
注解,@Query
可以编写增删改查SQL
,如果需要执行写操作,需要配合@Modifying
注解使用。例如
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
@Modifying
@Query("delete from message where id = :msgId")
Mono<Integer> deleteByMsgId(Long msgId);
}
- 使用
spring-data-r2dbc
实现复杂查询
对于复杂的查询,我们也可以直接使用spring-data-r2dbc
的API
实现。例如:
DatabaseClient
类似Mybatis
中的SqlSession
概念。
关于spring-data-r2dbc
的使用,推荐阅读spring
官方文档,虽然是英文,但阅读起来并不难理解,想要学习冷门技术,就必须要啃英文文档,因为你会发现,这方面的博客文章少之又少,还避免不了一些博客文章使用的spring-data-r2dbc
版本与自己使用的版本不同存在API
差异导致“copy
”的代码画红线问题。
spring-data-r2dbc 1.1.0
版本官方文档链接:https://docs.spring.io/spring-data/r2dbc/docs/1.1.0.RELEASE/reference/html/#reference
,也可到spring.io
官网搜索。