
本篇文章写于2020年11月02日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。
R2DBC基于Reactive Streams反应流规范,它是一个开放的规范,为驱动程序供应商和使用方提供接口(r2dbc-spi),与JDBC的阻塞特性不同,它提供了完全反应式的非阻塞API与关系型数据库交互。
简单说,R2DBC项目是支持使用反应式编程API访问关系型数据库的桥梁,定义统一接口规范,不同数据库厂家通过实现该规范提供驱动程序包。
R2DBC定义了所有数据存储驱动程序必须实现的SPI,目前实现R2DBC SPI的驱动程序包括:
r2dbc-h2:为H2实现的驱动程序;r2dbc mariadb:为Mariadb实现的驱动程序;r2dbc mssql:为Microsoft SQL Server实现的本机驱动程序;r2dbc mysql:为Mysql实现的驱动程序;r2dbc postgres:为PostgreSQL实现的驱动程序;
同时,r2dbc还提供反应式连接池r2dbc-pool(https://github.com/r2dbc/r2dbc-pool)。
本篇内容:
- 使用r2dbc-mysql驱动程序包与mysql数据库建立连接
- 使用r2dbc-pool获取数据库连接
- Spring-Data-R2DBC增删改查API
- 事务的使用
- R2DBC Repository
使用r2dbc-mysql驱动程序包与mysql数据库建立连接
添加r2dbc-mysql依赖:
<!-- r2dbc mysql-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
r2dbc-mysql实现了r2dbc的ConnectionFactory SPI接口。
首先创建连接工厂,再通过连接工厂获取到数据库连接。
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.option(DRIVER, "mysql")
.option(HOST, "127.0.0.1")
.option(USER, "root")
.option(PORT, 3306)
.option(PASSWORD, "")
.option(DATABASE, "r2dbc_stu")
.option(CONNECT_TIMEOUT, Duration.ofSeconds(3))
.build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
从连接工厂获取一个数据库连接:
Publisher<? extends Connection> connectionPublisher = connectionFactory.create();
使用连接执行一条sql:
Mono.from(connectionPublisher)
.flatMapMany(conn -> conn.createStatement(
"insert into person (id,name,age) values ('1111','wujiuye',25)")
.execute())
.flatMap(Result::getRowsUpdated)
.switchIfEmpty(Mono.just(0))
.onErrorResume(throwable -> {
throwable.printStackTrace();
return Mono.empty();
})
.subscribe(System.out::println);
flatMapMany(conn -> conn.createStatement("sql").execute()):创建Statement执行sql;flatMap(Result::getRowsUpdated):获取sql执行影响的行数(select语句没有该结果);switchIfEmpty:如果insert更新的行数为0则会被执行;onErrorResume:处理connection连接异常、sql语句执行异常;
使用r2dbc-pool获取数据库连接
添加r2dbc-pool依赖
<dependencies>
<!-- r2dbc mysql -->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<!-- r2dbc-pool -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
</dependencies>
根据ConnectionFactory创建连接池(ConnectionPool):
ConnectionFactory connectionFactory = ....
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMillis(1000))
.maxSize(20)
.build();
ConnectionPool pool = new ConnectionPool(configuration);
ConnectionPoolConfiguration:连接池配置对象,指定连接池的大小、连接的最大空闲时间等;ConnectionPool:连接池,也是ConnectionFactory接口的实现类;
使用连接池创建连接:
Mono<Connection> connectionMono = pool.create();
// 将连接释放回连接池
connectionMono.flatMapMany(Connection::close).subscribe();
// 销毁连接池
pool.dispose();
r2dbc并没有定义连接池的接口,而r2dbc-pool通过实现ConnectionFactory接口巧妙的接管连接的创建,管理连接的生命周期。
在使用spring-data-r2dbc时,我们只需要将注册到bean工厂的ConnectionFactory替换为ConnectionPool即可使用连接池,例如:
@Bean
public ConnectionFactory connectionFactory(){
ConnectionFactory connectionFactory = ....
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMillis(1000))
.maxSize(20)
.build();
// ConnectionPool实现了ConnectionFactory接口,使用ConnectionFactory替换ConnectionFactory
return new ConnectionPool(configuration);
}
不过我们不需要自己配置ConnectionFactory,spring-data-r2dbc自动帮我们配置好了。
所以我们使用spring-data-r2dbc时,像下面这样使用DatabaseClient执行sql,最终也是从连接池获取连接执行:
public class xxx{
@Resource
private ConnectionFactory connectionFactory;
@Test
public void test(){
DatabaseClient client = DatabaseClient.create(connectionFactory);
// .......
}
}
在获取连接的地方下断点验证:

Spring-Data-R2DBC增删改查API
使用spring-data-r2dbc可直接通过依赖它的starter,依赖starter会将所需的jar包也都导入到项目中:
<dependencies>
<!-- r2dbc mysql 库-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<!-- 同时也会将r2dbc-pool导入 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
</dependencies>
在application.yml配置文件中添加配置:
### r2dbc
spring:
r2dbc:
url: r2dbc:mysql://localhost:3306/r2dbc_stu?useUnicode=true&characterEncoding=UTF-8
username: root
password:
pool:
enabled: true
max-size: 10
initial-size: 10
validation-query: select 1
DatabaseClient
DatabaseClient是Spring Data R2DBC提供的具有功能性的反应式非阻塞API,用于与数据库交互。
DatabaseClient封装了资源的处理,例如打开和关闭连接,让我们可以更方便的执行增删改查SQL,而不必关心要释放连接。
DatabaseClient由spirng-data-r2dbc的org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration类完成,我们也可以继承AbstractR2dbcConfiguration类,替换一些默认配置。
@Configuration
public class R2dbcConfiguration extends AbstractR2dbcConfiguration {
@Override
@Bean // 这个注解不能少
public ConnectionFactory connectionFactory() {
// ....
}
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}
insert api
示例:往person表插入一条记录
client.insert()
.into("person")
.value("id", "123")
.value("name", "wujiuye")
.nullValue("age", Integer.class)
.fetch()
.rowsUpdated()
.subscribe(System.out::println);
into:在xx表中插入记录;value:为新记录的某个字段(表的字段名)赋值;nullValue:为字段赋空值;fetch:执行sql并取得响应;rowsUpdated:获取更新的记录数,在此为插入记录数;subscribe:订阅,触发流执行并消费最终结果。
多条sql组合执行:
Mono<Void> insert1 = client.insert().into("person")
.value("id", "12345")
.value("name", "wujiuye")
.nullValue("age", Integer.class)
.then();
Mono<Void> insert2 = client.insert().into("person")
.value("id","123445555555")
.then();
insert1.then(insert2).subscribe();
then:不消费任何结果,该方法返回一个Mono<Void>,用于衔接下一步,但它不会将上一步的结果传递给下一步;subscribe():订阅,只触发流执行,不关心结果;
update api
示例:更新person表的id为12345的记录:
client.update().table("person")
.using(Update.update("name", "1111").set("age", 18))
.matching(Criteria.where("id").is("12345"))
.fetch()
.rowsUpdated()
.subscribe(rows -> System.out.println("更新记录数:" + rows));
using:接收一个Update对象,Update决定更新哪些字段;matching:接收一个Criteria对象,设置匹配条件,即sql的where部分;
delete api
示例:删除person表的name为1111且age为18的记录:
client.delete().from("person")
.matching(Criteria.where("name").is("1111").and("age").is(18))
.fetch()
.rowsUpdated()
.subscribe(rows -> System.out.println("删除的记录总数为:" + rows));
select api
示例:查询person表name为null的记录:
Flux<Person> list = client.select().from("person")
.matching(Criteria.where("name").isNull())
.as(Person.class)
.fetch()
.all();
list.subscribe(System.out::println);
all:获取所有结果;as:将结果映射为Person实例;
事务的使用
使用关系数据库时的一个常见模式是将多个查询分组到由事务保护的工作单元中,关系数据库通常将事务与单个连接关联。因此,使用不同的连接会导致使用不同的事务。
spring-data-r2dbc在DatabaseClient中包含事务感知,允许使用Spring的事务管理在同一事务中对多个语句进行分组。
示例:使用then连接多条sql,只要有一条sql执行失败就回滚事务:
ReactiveTransactionManager tm = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator operator = TransactionalOperator.create(tm);
Mono<Void> atomicOperation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe") //绑定参数
.bind("name", "Joe")
.bind("age", 34)
.fetch().rowsUpdated()
.then(client.execute("INSERT INTO contacts (id, name) VALUES(:id, :name)")
.bind("id", "joe")
.bind("name", "Joe")
.fetch().rowsUpdated())
.then();
// 将atomicOperation放到事务中执行
operator.transactional(atomicOperation).subscribe();
ReactiveTransactionManager:响应式编程事务管理器,R2dbcTransactionManager是spring-data-r2dbc提供的ReactiveTransactionManager的一个实现;
spring-data-r2dbc也支持基于注解的声明式事务。spring-data-r2dbc自动配置了ReactiveTransactionManager与@EnableTransactionManagement启动声明式事务管理,因此可直接使用,而不要添加额外配置。例如:
@Component
public class TxService {
@Resource
private DatabaseClient client;
@Transactional(rollbackFor = Throwable.class)
public Mono<Void> txInsert() {
Person person = new Person();
person.setId('12123');
return client.insert().into(Person.class)
.using(person)
.fetch().rowsUpdated()
.then(client.insert().into(Person.class)
.using(person)
.fetch().rowsUpdated()
.then());
}
}
// 测试
public class R2dbcTxTest extends SupporSpringBoot {
@Resource
private TxService txService;
@Test
public void testTx() {
txService.txInsert().doOnError(throwable -> {
System.out.println("执行失败");
throwable.printStackTrace();
}).subscribe();
}
}
R2DBC Repository
Spring-data-r2dbc也实现了spring data repository的反应式API。
现在,我们通过一个示例,学习如何使用声明式事务包装一系列调用Repository方法的操作。
1、声明持久化对象PO:
@Table("person")
public static class Person {
@Id
private String id;
private String name;
private int age;
}
2、创建Dao(为了与领域驱动设计中的Repository做区分),继承R2dbcRepository接口:
public interface PersonDao extends R2dbcRepository<R2dbcStuMain.Person, String> {
@Modifying
@Query("insert into person (id,name,age) values(:id,:name,:age)")
Mono<Integer> insertPerson(String id, String name, Integer age);
}
3、创建Service,在Service中创建一个事务方法,链接多次调用PersonDao的insertPerson方法。
@Service
public class PersonService {
@Resource
private PersonDao personDao;
@Transactional(rollbackFor = Throwable.class)
public Mono<Integer> addPerson(Person... persons) {
Mono<Integer> txOp = null;
for (Person person : persons) {
if (txOp == null) {
txOp = personDao.insertPerson(person.getId(), person.getName(), person.getAge());
} else {
txOp = txOp.then(personDao.insertPerson(person.getId(), person.getName(), person.getAge()));
}
}
return txOp;
}
}
测试事务是否生效:
@SpringBootApplication
@EnableR2dbcRepositories
public class R2dbcApplication {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(R2dbcApplication.class);
PersonService personService = context.getBean(PersonService.class);
Person person = new Person();
person.setId("12347");
person.setName("wjy");
person.setAge(25);
// 测试事务方法,验证主键重复时是否还有数据插入成功
personService.addPerson(person, person)
.doOnError(Throwable::printStackTrace)
.subscribe(System.out::println);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
参考文献
Spring Data R2DBC官方文档:https://docs.spring.io/spring-data/r2dbc/docs/1.1.0.RELEASE/reference/htmlr2dbc.io:http://r2dbc.io/