响应式API的设计、实现和应用
。
再看一遍前面的例子;保证第一个getPage()调用在针对每个附加页面的后续调用之前发生。此外,由于后续对getPage()的调用是在.Flatmapmany()中的,所以由框架负责优化多线程执行,并将结果汇到一起返回,传播可能发生的任何错误。
条件逻辑
与命令式编程不同,在响应式编程中错误是作为一种值来考虑的。这意味着它们是通过流操作来传递的。这些错误可以通过所有方式传递给消费者,或者流可以基于它们改变行为。这种行为变化可以表现为错误的转换或基于错误产生新的结果。
public Mono<AppStatsResponse> getApplication(GetAppRequest request) { return client.applications() .statistics(AppStatsRequest.builder() .applicationId(request.id()) .build()) .onErrorResume(ExceptionUtils.statusCode(APP_STOPPED_ERROR), t -> Mono.just(AppStatsResponse.builder().build())); }
在本例中,我们要求为正在运行的应用程序获取统计信息。如果一切正常,响应就会传回给消费者。但是,如果接收到一个错误(带有特定的状态代码),则返回一个空响应。使用者永远不会看到错误和执行过程中的默认值,就好像从来没有发出过错误信号一样。
如前所述,一个流完成时未发送任何条目也是有效的。通常,这就相当于返回null(其中void返回类型是一种特殊情况)。像以上这种出错的情况一样,没有任何条目的完成结果可以一直传递给消费者,或者流可以基于它们改变行为。
public Flux<GetDomainsResponse> getDomains(GetDomainsRequest request) { return requestPrivateDomains(request.getId()) .switchIfEmpty(requestSharedDomains(request.getId())); }
在本例中,getDomains()返回一个域,该域可以位于两个不同的桶中。首先搜索私有域,如果成功完成,即使没有结果,也会搜索共享域。
public Mono<String> getDomainId(GetDomainIdRequest request) { return getPrivateDomainId(request.getName()) .switchIfEmpty(getSharedDomainId(request.getName())) .switchIfEmpty(ExceptionUtils.illegalState( "Domain %s not found", request.getName())); }
也可以用无条目表示一种错误条件。在这个示例中,如果没有找到私有或共享域,就会生成一个新的IllegalStateException并传递给使用者。
然而有时,你希望根据无错误或空来做决策,但不是根据值本身。虽然可以使用操作符来实现这个逻辑,但人们常常发现,其复杂度要远远高于其价值。在本例中,你应该只使用命令式条件语句。
public Mono<String> getDomainId(String domain, String organizationId) { return Mono.just(domain) .filter(d -> d == null) .then(getSharedDomainIds() .switchIfEmpty(getPrivateDomainIds(organizationId)) .next() // select first returned .switchIfEmpty(ExceptionUtils.illegalState("Domain not found"))) .switchIfEmpty(getPrivateDomainId(domain, organizationId) .switchIfEmpty(getSharedDomainId(domain)) .switchIfEmpty( ExceptionUtils.illegalState("Domain %s not found", domain))); }
这个示例返回给定的组织(一个分级容器)中给定域名的id。这里有两个分支:如果域为空,则返回组织范围内第一个共享域或私有域的id。如果域不为空,则搜索显式的域名,并返回它的id。如果你觉得这段代码令人迷惑难懂,不要绝望,我们也一样!
public Mono<String> getDomainId(String domain, String organizationId) { if (domain == null) { return getSharedDomainIds() .switchIfEmpty(getPrivateDomainIds(organizationId)) .next() .switchIfEmpty(ExceptionUtils.illegalState("Domain not found")); } else { return getPrivateDomainId(domain, organizationId) .switchIfEmpty(getSharedDomainId(domain)) .switchIfEmpty( ExceptionUtils.illegalState("Domain %s not found", domain)); } }
这个示例效果一样,但使用的是命令式条件语句。但更容易理解得多了,你觉得呢?
测试
实际上,大多数有用的流都是异步的。这在测试中是有问题的,因为测试框架往往都是同步的,注册是通过了还是失败了,在异步结果返回之前就应该有结果了。为了弥补这一点,你必须阻塞主线程,直到返回结果,然后将这些结果发至断言的主线程中。
@Test public void noLatch() { Mono.just("alpha") .subscribeOn(Schedulers.single()) .subscribe(s -> assertEquals("bravo", s)); }
这个示例在非主线程上发出一个字符串,出人意料地是,通过了测试。这个测试通过的根本原因,就是当它显然不应该通过的时候,noLatch方法将会完成执行,而没有抛出一个AssertionError。
@Test public void latch() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicReference<String> actual = new AtomicReference<>(); Mono.just("alpha") .subscribeOn(Schedulers.single()) .subscribe(actual::set, t -> latch.countDown(), latch::countDown); latch.await(); assertEquals("bravo", actual.get()); }
这个例子,它使用一个CountDownLatch来确保latch()方法在流完成之后才返回,虽然不可否认它很笨拙。一旦latch 释放,主线程中的断言就会抛出一个AssertionError,导致测试失败。
如果你看了这些代码,拒绝以这种方式实现你所有的测试,大家一定会体谅你的,我们保证。幸运的是,Reactor 提供了一个StepVerifier类来辅助测试。
对响应式设计的测试需要的不仅仅是阻塞。你通常需要对多个值和预期错误进行断言,同时确保意外错误会导致测试失败。StepVerifier对每一项都有所考虑。
@Test public void testMultipleValues() { Flux.just("alpha", "bravo") .as(StepVerifier::create) .expectNext("alpha") .expectNext("bravo") .expectComplete() .verify(Duration.ofSeconds(5)); }
在这个示例中,使用StepVerifier来预期精确发出了alpha和bravo,然后流完成。如果其中一个没有发出,发出了一个额外的元素,或者产生一个错误,测试就会失败。
@Test public void shareFails() { this.domains .share(ShareDomainRequest.builder() .domain("test-domain") .organization("test-organization") .build()) .as(StepVerifier::create) .consumeErrorWith(t -> assertThat(t) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Private domain test-domain does not exist")) .verify(Duration.ofSeconds(5)); }
这个例子使用了一些更高级的StepVerifier特性,并不仅断言已经发出了一个错误信号,而且它还是一个IllegalArgumentException,并且消息匹配预期结果。
CountDownLatches
关于响应式框架的一个关键问题是,它们只能协调自己的操作和线程模型。许多响应式编程的执行环境将不仅仅只有一个线程(例如Servlet容器)。在这些环境中,响应式编程天然的异步属性并不是问题。但是,有一些环境,比如上面的测试示例,那里的进程将在任何单独的线程之前结束。
public static void main(String[] args) { Mono.just("alpha") .delaySubscription(Duration.ofSeconds(1)) .subscribeOn(Schedulers.single()) .subscribe(System.out::println); }
就像该测试方法一样,这个main()方法将在alpha发出之前终止。
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Mono.just("alpha") .delaySubscription(Duration.ofSeconds(1)) .subscribeOn(Schedulers.single()) .subscribe(System.out::println, t -> latch.countDown(), latch::countDown); latch.await(); }
就像在该测试示例中一样,一个CountDownLatch可以确保主线程在流终止之前不会终止,不管它是在什么线程上执行的。
阻塞流
在可预见的将来,在响应式编程中与阻塞api交互会成为一种常见现象。为了在两者之间架起桥梁,在等待结果的时候会适当地进行阻塞。但是,当以这种方式连接到阻塞API时,会丢失响应式编程的一些好处,比如有效的资源使用。因此,你将希望尽可能长地保持代码的响应性,直到最后一刻才阻塞。同样值得注意的是,这个想法的逻辑总结一下就是,一个响应式的API可以被阻塞,但是一个阻塞的API永远不能成为响应式。
Mono<User> requestUser(String name) {...} User getUser(String name) { return requestUser(name) .block(); }
在这个例子中,.block()用于桥接Mono的结果到必须的返回类型。
Flux<User> requestUsers() {...} List<User> listUsers() { return requestUsers() .collectList() .block(); }
和前面的例子一样,.block()用于将结果桥接到必须的返回类型,但在此之前,流必须被收集到一个列表中。
错误处理
如前所述,错误是流经系统的值。这意味着一直都没有一个合适的点来捕获异常。但是,你应该将它们作为流的一部分处理,或者作为订阅者。.Subscribe()方法有0到3个参数,这些参数允许你处理每个条目,如果错误成了就对它进行处理,并对流的完成情况进行处理。
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Flux.concat(Mono.just("alpha"), Mono.error(new

责任编辑:售电衡衡
-
权威发布 | 新能源汽车产业顶层设计落地:鼓励“光储充放”,有序推进氢燃料供给体系建设
2020-11-03新能源,汽车,产业,设计 -
中国自主研制的“人造太阳”重力支撑设备正式启运
2020-09-14核聚变,ITER,核电 -
探索 | 既耗能又可供能的数据中心 打造融合型综合能源系统
2020-06-16综合能源服务,新能源消纳,能源互联网
-
新基建助推 数据中心建设将迎爆发期
2020-06-16数据中心,能源互联网,电力新基建 -
泛在电力物联网建设下看电网企业数据变现之路
2019-11-12泛在电力物联网 -
泛在电力物联网建设典型实践案例
2019-10-15泛在电力物联网案例
-
权威发布 | 新能源汽车产业顶层设计落地:鼓励“光储充放”,有序推进氢燃料供给体系建设
2020-11-03新能源,汽车,产业,设计 -
中国自主研制的“人造太阳”重力支撑设备正式启运
2020-09-14核聚变,ITER,核电 -
能源革命和电改政策红利将长期助力储能行业发展
-
探索 | 既耗能又可供能的数据中心 打造融合型综合能源系统
2020-06-16综合能源服务,新能源消纳,能源互联网 -
5G新基建助力智能电网发展
2020-06-125G,智能电网,配电网 -
从智能电网到智能城市