Reactor 3一瞥

文章的 reactor 是java的reactor 不是js的reactor。

本篇内容,主要写的是,使用 reactor 的一些感受和要点。大体会说一下,写 reactor 是什么感觉、理念与实操的感觉和一些比较细节的点。因为时间一长,感觉会被遗忘,还是记下来更合适一些,供其他使用者参考。

除此之外,还会涉及一些spring webfluxspring bootspring cloud gateway。这里我就假设读者知道reactor最基础的内容,不说最基础的部分了。

我们公司是因为要用 gateway 才会接触这些东西。要不然自己引个 reactor 的包还不知道怎么玩呢。

MonoFlux和使用感觉

这两个对象是实际操作中,最常接触的东西。构造出来的MonoFlux对象等价于一个马上要执行的代码块。这句话很重要。

多数开篇教程会这样教你,给你一个迭代器。给你一个flux作为对比。然后说明next()方法是谁执行的。

换个例子可能更容易理解一些。正常的线性流程是这样的,我去找警察办事,我把事情说清楚,警察就去办事了,这时我正在等着,等警察办完了,会跟我说,然后我去做其他事情。而reactor不是这样的。reactor的感觉是,警察突然打电话找我,让我过来把xx事情结果拿一下。这也就是所谓的非阻塞 non-blocking

因为这种完全不同的思考方式,产生了一些常规方式难以理解的抽象与模型,但是这些模型对于实现是有帮助的。为了实现这种,我叫你,你再动的事情 ,就有了 publishersubscriber 这样的接口。在实际使用的过程中,使用者操作都是 publisher,即程序员只需要想办法生产出恰当的内容。致于后续如何 subscribe 就是框架做的事情了。换另一种方式讲,我们只需要把对应的代码块生产出来,后续自然会执行内部的东西。

想象与现实

初衷确实是很好,但是写起来会面临很多问题。下面我会列举一下我觉得比较麻烦的点。

可读性

你以为代码写出来就和网上显示的这些代码块差不多么?下面这段代码是我从这里copy过来的一个看起来比较 像真实 的例子。


@RequestMapping(value = "heyMister", method = RequestMethod.POST)
@ResponseBody
public Flux<String> hey(@RequestBody Mono<Sir> body) {
    return Mono.just("Hey mister ")
        .concatWith(body
                .flatMap(sir -> Flux.fromArray(sir.getLastName().split("")))
                .map(String::toUpperCase)
                .take(1)
        ).concatWith(Mono.just(". how are you?"));
}

实际情况总不像上面想象的一样。下面这段是我在公司写的代码,包含了实际的业务代码,我这里就不打码了,反正一般人也看不懂。


    private Mono<TimerConfig<TimerJob>> upsertTimerConfig(
        final String tenantId,
        final String appInstanceId,
        final TimerConfig<TimerJob> config,
        final boolean updateTimerJob
    ) throws NotFoundException, ConflictException, BadArgumentException, GrpcException {
        final String fakeNameAndCode = String.valueOf(System.currentTimeMillis());
        Map<String, ConnectionConfig> connectionConfigMap = new HashMap<>();
        for (String ccId : config.getConnectionConfigIds()) {
            connectionConfigMap.put(ccId, connectionGrpcServiceClient.configRead(ccId, tenantId));
        }
        return workflowApi.getWorkflow(tenantId, appInstanceId)
            .flatMap(wf -> {
                if (wf.getId() == null) {
                    WorkFlow newWorkflow = WorkFlow.newInstance()
                        .setCode(appInstanceId)
                        .setName(fakeNameAndCode)
                        .setConfig(WorkFlow.WorkFlowConfiguration.newInstance().setTasks(listOf()))
                        .setStatus(ACTIVE)
                        .setTenantId(tenantId);
                    return workflowApi.createWorkflow(tenantId, newWorkflow);
                } else {
                    return Mono.just(wf);
                }
            })
            .flatMap(wf -> workflowApi.getTaskId(tenantId)
                .flatMap(taskId -> {
                    final List<Mono<String>> newWtIds = new ArrayList<>();
                    return getConnectionConfigIds(
                        tenantId,
                        wf.getConfig().getTasks()
                    ).flatMap(ccIdMap -> {
                        for (String ccIdFromInput : config.getConnectionConfigIds()) {
                            if (!ccIdMap.contains(ccIdFromInput)) {
                                newWtIds.add(
                                    workflowApi.createTaskInstance(
                                        tenantId,
                                        connectionConfigMap.get(ccIdFromInput),
                                        taskId
                                    ).map(TaskInstance::getId)
                                );
                            } else {
                                newWtIds.add(Mono.just(wf.getConfig().getTasks().get(ccIdMap.indexOf(ccIdFromInput))));
                            }
                        }
                        return Flux.fromIterable(newWtIds).flatMap(x -> x).collectList();
                    });
                })
                .map(tiIds -> wf.setConfig(WorkFlow.WorkFlowConfiguration.newInstance().setTasks(tiIds)))
                .flatMap(wff -> workflowApi.updateWorkflow(tenantId, wff, wff.getId()))
            )
            .flatMap(wf -> {
                if (updateTimerJob) {
                    workflowApi.getTimerJob(tenantId, wf.getId())
                        .flatMap(tj -> {
                            if (tj.getId() == null) {
                                config.getTimerJob()
                                    .setFlowId(wf.getId())
                                    .setCode(fakeNameAndCode)
                                    .setName(fakeNameAndCode)
                                    .setConfig(mapOf());
                                return workflowApi.createTimerJob(tenantId, config.getTimerJob())
                                    .map(config::setTimerJob);
                            } else {
                                return workflowApi.updateTimerJob(tenantId, config.getTimerJob(), tj.getId())
                                    .map(config::setTimerJob);
                            }
                        });
                }
                return Mono.just(wf);
            }).map(wf -> config);
    }

其实,我想展示的是,实际的情况要远比想象中的风骚的多。代码的可读性很低,之后的维护成本和人员可替代性都很堪忧。

调试

在实际写的时候,你以为啪啪啪一路点下去就能实现对应的功能?并不是的,实际情况是代码经常出错的。实际出错是什么样子呢?(本来想贴一个自己项目中的错误,太麻烦了,贴了一个reactor官网的)

java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
	at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
	at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
	at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)

报错的内容中,不会出现我们写的代码,因为实际执行代码块的过程和书写顺序是不一致的。打了个断点,一步一步走下去,你会发现相关的代码块提交到了框架中,然后等待 subscriber去触发,真正的代码块开始执行,这才是实际的业务。然而在这种情况下,debug非常困难。更麻烦的是有的时候是框架的bug,而不是你代码的问题。调试的时候抓耳挠腮,不知道怎么搞。

其实最核心的点是:你写过的东西,并不是在跑代码的时候就会执行,真正的执行是在 subscribe 的时候进行的。如果这个理解不了,就会经常出写出bug。

当然了reactor官方也意识到了这个问题,他们提供了一些东西,能让你从堆栈中看出个所以然。不过杯水车薪罢了。

对框架的影响

spring MVC spring webflux 完全就是2个生态体系。后者由于大量使用 reactor 内容,操作逻辑会有一些改变,相关的对象必须要改成 monoflux

但是呢,你能在spring源码中翻到很多类似命名的代码。其中有一部分是原来 spring MVC 的,另一部分是 reactor 专供。但麻烦的是,大部分类换汤不换药,只是把关键对象换个名字,执行逻辑基本就没啥变化,又不得不再写一遍。如 HandlerMethodArgumentResolver 这个接口出现了2遍,意味着所有实现这个接口的方法都要写2套。

总的来说,如果想要上 reactor 这条船,很多东西都要改,大量的旧代码难以直接使用。这点对 reactor 的推广简直就是灾难性的问题,就像如果有语言要抢占c语言地位,必要支持调用c lib,要不然根本就不会有人用。reactor 一开始就与普通代码兼容不好。

实际上,我们还碰到了一个问题就是,我们需要的一个小东西, reactor 版本还没有实现。最后只得自己扒 spring MVC 代码,抄了一个。

业务

从普通的线性编程到 reactor 的方式编程,在很多业务场景下是没有必要的。因为业务设计的时候根本就没有异步的思路,用 reactor 的方式写只是画蛇添足。

初期用 reactor 改写业务的实际感受就是难。本来按照线性写挺简单的代码。放到 reactor 中就怎么都不好使,调试又不方便。

书写感受

有一部分代码写起来基本一样,用 reactor 方式写和普通方式写基本没区别。说是基本一样,实际上还是有点小差别的。很多时候经常要反转一下调用的主谓,如果不反转的话,没办法按照 reactor 的方式一路写下去。下面就是一个例子。

//reactor
return upsertTimerConfig(tenantId, ai, timerConfig, false)
        .then(getWorkflow(tenantId, ai))
        .flatMap(wf -> execWorkflow(tenantId, wf.getId()));
// 普通做法
upsertTimerConfig(tenantId, ai, timerConfig, taskCode, workflowHost, false);
return execWorkflow(tenantId, getWorkflow(tenantId, appInstanceId, workflowHost).getId(), workflowHost);

有的时候不得不一直用map、flatmap,即便我先做的事情不是map,因为我不知道还可以用什么,不知道怎么才能把reactor写的既美观又正确。

在实际的处理过程中,更困难的是既要面对业务逻辑,又要面对同步和异步的内容,多个问题混在一起就很麻烦了。

还有一个点是在gateway中,基本上没法直接把mono之类的代码转换成 blocking 的形式。尝试过stof中给的几个方法都会爆出问题,最后还是老老实实把所有代码改成了异步的。

我们为什么用gateway

其实我们用 gateway 之前是不会 reactor 的。也不知道换了会有这么多问题。当然我们选择 gateway 也不是没有理由的。

  1. 这个服务完全解决了我们当前的业务需求,请求转发和聚合。
  2. spring 生态下的东西,用起来方便一些,因为新项目是spring全家桶,很多东西能复用。

总结

  1. 相比线性编程,学习成本偏高。
  2. 熟悉后,会不会节省编码时间不清楚。
  3. non-blocking 是否更高效不清楚。
  4. 理论上确实更先进,但是使用场景有限。业务场景完全实现 non-blocking 比较困难。(需要基础搭建比较多)
  5. 业务场景下,维护更困难。因为语法飘逸。

其实在gateway中是可以完全按照同步的方式写代码的,写 reactor 的原因,就像是入乡随俗。既然都用webflux了就按照它的方式写。还能顺便测试下实际的感受。

我本身是知道non-blockingblocking 都有对应的使用场景的。并不是说,non-blocking绝对好,blocking绝对差。他们都有各自使用场景的。硬要说的话,就要看哪部分资源浪费的多,如果io等待的时间多那肯定 non-blocking。当然如果没啥影响,blocking就够了。

如果说一个普通(员工水平平庸)的公司,还是不建议使用任何 reactor 相关的东西,按照我对普通公司的了解来看,最后的结果肯定会变成大篇幅的代码都是blocking的,最后一句话把对象转成non-blocking。除此之外,还要面对更多的其他问题,肯定要被埋怨的。

以上评价整体比较悲观吧。其实java中的 reactor发展,可以参考 .NET Reactive Extension 前辈。我推测之后可能会在一部分领域中重用(非阻塞要求高的),之后应该不会有人用这个写业务代码,因为缺点太多了。