响应式API的设计、实现和应用

2018-03-20 17:01:48 InfoQ  点击量: 评论 (0)
这篇文章来自于SpringOne的一个演讲。在过去的几年里,Java世界中在大力推动响应式编程的。无论是NodeJS开发人员使用非阻塞api的成功,还是

再看一遍前面的例子;保证第一个getPage()调用在针对每个附加页面的后续调用之前发生。此外,由于后续对getPage()的调用是在.Flatmapmany()中的,所以由框架负责优化多线程执行,并将结果汇到一起返回,传播可能发生的任何错误。

条件逻辑

与命令式编程不同,在响应式编程中错误是作为一种值来考虑的。这意味着它们是通过流操作来传递的。这些错误可以通过所有方式传递给消费者,或者流可以基于它们改变行为。这种行为变化可以表现为错误的转换或基于错误产生新的结果。

public Mono<AppStatsResponse> getApplication(GetAppRequest request) {
  return client.applications()
    .statistics(AppStatsRequest.builder()
      .applicationId(request.id())
      .build())
    .onErrorResume(ExceptionUtils.statusCode(APP_STOPPED_ERROR),
      t -> Mono.just(AppStatsResponse.builder().build()));
}

在本例中,我们要求为正在运行的应用程序获取统计信息。如果一切正常,响应就会传回给消费者。但是,如果接收到一个错误(带有特定的状态代码),则返回一个空响应。使用者永远不会看到错误和执行过程中的默认值,就好像从来没有发出过错误信号一样。

如前所述,一个流完成时未发送任何条目也是有效的。通常,这就相当于返回null(其中void返回类型是一种特殊情况)。像以上这种出错的情况一样,没有任何条目的完成结果可以一直传递给消费者,或者流可以基于它们改变行为。

public Flux<GetDomainsResponse> getDomains(GetDomainsRequest request) {
  return requestPrivateDomains(request.getId())
    .switchIfEmpty(requestSharedDomains(request.getId()));
}

在本例中,getDomains()返回一个域,该域可以位于两个不同的桶中。首先搜索私有域,如果成功完成,即使没有结果,也会搜索共享域。

public Mono<String> getDomainId(GetDomainIdRequest request) {
  return getPrivateDomainId(request.getName())
    .switchIfEmpty(getSharedDomainId(request.getName()))
    .switchIfEmpty(ExceptionUtils.illegalState(
      "Domain %s not found", request.getName()));
}

也可以用无条目表示一种错误条件。在这个示例中,如果没有找到私有或共享域,就会生成一个新的IllegalStateException并传递给使用者。

然而有时,你希望根据无错误或空来做决策,但不是根据值本身。虽然可以使用操作符来实现这个逻辑,但人们常常发现,其复杂度要远远高于其价值。在本例中,你应该只使用命令式条件语句。

public Mono<String> getDomainId(String domain, String organizationId) {
  return Mono.just(domain)
    .filter(d -> d == null)
    .then(getSharedDomainIds()
      .switchIfEmpty(getPrivateDomainIds(organizationId))
      .next()  // select first returned
      .switchIfEmpty(ExceptionUtils.illegalState("Domain not found")))
    .switchIfEmpty(getPrivateDomainId(domain, organizationId)
      .switchIfEmpty(getSharedDomainId(domain))
      .switchIfEmpty(
          ExceptionUtils.illegalState("Domain %s not found", domain)));
}

这个示例返回给定的组织(一个分级容器)中给定域名的id。这里有两个分支:如果域为空,则返回组织范围内第一个共享域或私有域的id。如果域不为空,则搜索显式的域名,并返回它的id。如果你觉得这段代码令人迷惑难懂,不要绝望,我们也一样!

public Mono<String> getDomainId(String domain, String organizationId) {
  if (domain == null) {
    return getSharedDomainIds()
      .switchIfEmpty(getPrivateDomainIds(organizationId))
      .next()
      .switchIfEmpty(ExceptionUtils.illegalState("Domain not found"));
  } else {
    return getPrivateDomainId(domain, organizationId)
      .switchIfEmpty(getSharedDomainId(domain))
      .switchIfEmpty(
          ExceptionUtils.illegalState("Domain %s not found", domain));
    }
}

这个示例效果一样,但使用的是命令式条件语句。但更容易理解得多了,你觉得呢?

测试

实际上,大多数有用的流都是异步的。这在测试中是有问题的,因为测试框架往往都是同步的,注册是通过了还是失败了,在异步结果返回之前就应该有结果了。为了弥补这一点,你必须阻塞主线程,直到返回结果,然后将这些结果发至断言的主线程中。

@Test
public void noLatch() {
  Mono.just("alpha")
    .subscribeOn(Schedulers.single())
    .subscribe(s -> assertEquals("bravo", s));
}

这个示例在非主线程上发出一个字符串,出人意料地是,通过了测试。这个测试通过的根本原因,就是当它显然不应该通过的时候,noLatch方法将会完成执行,而没有抛出一个AssertionError。

@Test
public void latch() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  AtomicReference<String> actual = new AtomicReference<>();

  Mono.just("alpha")
    .subscribeOn(Schedulers.single())
    .subscribe(actual::set, t -> latch.countDown(), latch::countDown);

  latch.await();
  assertEquals("bravo", actual.get());
}

这个例子,它使用一个CountDownLatch来确保latch()方法在流完成之后才返回,虽然不可否认它很笨拙。一旦latch 释放,主线程中的断言就会抛出一个AssertionError,导致测试失败。

如果你看了这些代码,拒绝以这种方式实现你所有的测试,大家一定会体谅你的,我们保证。幸运的是,Reactor 提供了一个StepVerifier类来辅助测试。

对响应式设计的测试需要的不仅仅是阻塞。你通常需要对多个值和预期错误进行断言,同时确保意外错误会导致测试失败。StepVerifier对每一项都有所考虑。

@Test
public void testMultipleValues() {
  Flux.just("alpha", "bravo")
    .as(StepVerifier::create)
    .expectNext("alpha")
    .expectNext("bravo")
    .expectComplete()
    .verify(Duration.ofSeconds(5));
}

在这个示例中,使用StepVerifier来预期精确发出了alpha和bravo,然后流完成。如果其中一个没有发出,发出了一个额外的元素,或者产生一个错误,测试就会失败。

@Test
public void shareFails() {
  this.domains
    .share(ShareDomainRequest.builder()
      .domain("test-domain")
      .organization("test-organization")
      .build())
    .as(StepVerifier::create)
    .consumeErrorWith(t -> assertThat(t)
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage("Private domain test-domain does not exist"))
    .verify(Duration.ofSeconds(5));
}

这个例子使用了一些更高级的StepVerifier特性,并不仅断言已经发出了一个错误信号,而且它还是一个IllegalArgumentException,并且消息匹配预期结果。

CountDownLatches

关于响应式框架的一个关键问题是,它们只能协调自己的操作和线程模型。许多响应式编程的执行环境将不仅仅只有一个线程(例如Servlet容器)。在这些环境中,响应式编程天然的异步属性并不是问题。但是,有一些环境,比如上面的测试示例,那里的进程将在任何单独的线程之前结束。

public static void main(String[] args) {
  Mono.just("alpha")
    .delaySubscription(Duration.ofSeconds(1))
    .subscribeOn(Schedulers.single())
    .subscribe(System.out::println);
}

就像该测试方法一样,这个main()方法将在alpha发出之前终止。

public static void main(String[] args) throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);

  Mono.just("alpha")
    .delaySubscription(Duration.ofSeconds(1))
    .subscribeOn(Schedulers.single())
    .subscribe(System.out::println, t -> latch.countDown(),
               latch::countDown);

    latch.await();
}

就像在该测试示例中一样,一个CountDownLatch可以确保主线程在流终止之前不会终止,不管它是在什么线程上执行的。

阻塞流

在可预见的将来,在响应式编程中与阻塞api交互会成为一种常见现象。为了在两者之间架起桥梁,在等待结果的时候会适当地进行阻塞。但是,当以这种方式连接到阻塞API时,会丢失响应式编程的一些好处,比如有效的资源使用。因此,你将希望尽可能长地保持代码的响应性,直到最后一刻才阻塞。同样值得注意的是,这个想法的逻辑总结一下就是,一个响应式的API可以被阻塞,但是一个阻塞的API永远不能成为响应式。

Mono<User> requestUser(String name) {...}

User getUser(String name) {
  return requestUser(name)
    .block();
}

在这个例子中,.block()用于桥接Mono的结果到必须的返回类型。

Flux<User> requestUsers() {...}

List<User> listUsers() {
  return requestUsers()
    .collectList()
    .block();
}

和前面的例子一样,.block()用于将结果桥接到必须的返回类型,但在此之前,流必须被收集到一个列表中。

错误处理

如前所述,错误是流经系统的值。这意味着一直都没有一个合适的点来捕获异常。但是,你应该将它们作为流的一部分处理,或者作为订阅者。.Subscribe()方法有0到3个参数,这些参数允许你处理每个条目,如果错误成了就对它进行处理,并对流的完成情况进行处理。

public static void main(String[] args) throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);

  Flux.concat(Mono.just("alpha"), Mono.error(new                                 	
大云网官方微信售电那点事儿

责任编辑:售电衡衡

免责声明:本文仅代表作者个人观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
我要收藏
个赞