4.3 Java语言

单值多值
拉取(同步)Function(迭代器/生成器)
推送(异步)CompletableFuture(流)

4.2.1 Function 函数

这个是最常见的语法,即当调用者调用一个函数时,同步地返回一个值。

4.2.2 CompletableFuture

Future

Future是Java1.5引入的功能。它配合CallableRunnable可以方便地实现异步调用。

sequenceDiagram Main->>ExecutorService: 提交Callable activate ExecutorService ExecutorService->>+Thread-1: 创建线程,执行Callable ExecutorService-->>Main: 返回Future deactivate ExecutorService Main->>Main: 其他任意工作 Main->>Thread-1: Future.get() Thread-1-->>-Main: 返回结果

要注意,Future.get()是阻塞的。如果调用者在实际任务未完成前就调用get,那就需要一直等待(当然,超时时间是可以设置的)。

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 线程
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() {
                // Perform some computation
                System.out.println("Entered Callable");
                Thread.sleep(2000);
                return "Hello from Callable";
            }
        };

        System.out.println("Submitting Callable");
        Future<String> future = executorService.submit(callable);

        // This line executes immediately
        System.out.println("Do something else while callable is getting executed");

        System.out.println("Retrieve the result of the future");
        // Future.get() blocks until the result is available
        String result = future.get();
        System.out.println(result);

        executorService.shutdown();
    }
}

如果相互依赖的连续多个异步调用该如何处理?例如,异步函数B需要异步函数A的返回值。若使用上述Future来实现,那么,

  • Main线程就要等待异步函数A结束,get到其结果,然后再调用异步函数B
  • 又或者,异步函数A里面以回调(callback)方式嵌入异步函数B的逻辑。

第一个方案如下:

sequenceDiagram Main->>ExecutorService: 提交Callable A activate ExecutorService ExecutorService->>+Thread-1: 创建线程,执行Callable A ExecutorService-->>Main: 返回Future deactivate ExecutorService Main->>Main: 其他任意工作 Main->>Thread-1: Future.get() Thread-1-->>-Main: 返回结果 Main->>+ExecutorService: 提交Callable B ExecutorService->>+Thread-1: 创建线程,执行Callable B ExecutorService-->>Main: 返回Future deactivate ExecutorService Main->>Main: 其他任意工作 Main->>Thread-1: Future.get() Thread-1-->>-Main: 返回结果

第二方案中,由于Callablecall()方法或者Runnablerun()方法都是不接受参数的,因此需要自定义拓展接口。当然,很多第三方库都有提供类似的功能。

下面代码演示了在一个异步调用中发起另一个异步调用的情况。

sequenceDiagram Main->>ExecutorService: 提交Callable 1 activate ExecutorService ExecutorService->>+Thread-1: 创建线程,执行Callable 1 ExecutorService-->>Main: 返回Future deactivate ExecutorService Thread-1->>+ExecutorService: 提交Callable 2 Main->>Main: 其他任意工作 ExecutorService->>+Thread-2: 创建线程,执行Callable 2 ExecutorService-->>-Thread-1: 返回Future Thread-1->>Thread-1: 其他任意工作 Thread-1->>Thread-2: Future.get() Thread-2-->>-Thread-1: 返回结果 Main->>Thread-1: Future.get() Thread-1-->>-Main: 返回结果

Callback.java为拓展的Callable,内含一个callback变量可以传入一个回调函数。回调函数可以向其中插入一段逻辑。

import java.util.concurrent.*;
import java.util.Locale;

public class Main {
    
    private static abstract class Callback<T, S> implements Callable<S> {
        T callback;
    
        void setCallback (T callback) {
            this.callback = callback;
        }
    
        public abstract S call () throws Exception;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        
        Callback<String, String> upperCase = new Callback<String, String>() {
            
            public String call() throws Exception {
                System.out.println("Entered Callable 2");
                Thread.sleep(2000);
                return callback.toUpperCase(Locale.forLanguageTag("en"));
            }
        };

        Callback<Callback<String, String>, String> callable = new Callback<Callback<String, String>, String>() {
            
            public String call() throws Exception {
                // Perform some computation
                System.out.println("Entered Callable 1");
                Thread.sleep(2000);
                // return "Hello from Callable 1";
                System.out.println("Submitting Callback");
                // 一个值,也是一个特殊的回调函数,可视作常函数。
                callback.setCallback("Hello from Callable 1");
                Future<String> future = executorService.submit(callback);
                return future.get();
            }
        };
        callable.setCallback(upperCase);
        
        System.out.println("Submitting Callable");
        Future<String> future = executorService.submit(callable);

        // This line executes immediately
        System.out.println("Do something else while callable is getting executed");

        System.out.println("Retrieve the result of the future");
        // Future.get() blocks until the result is available
        String result = future.get();
        System.out.println(result);

        executorService.shutdown();
    }
}
import java.util.concurrent.*;

// Callback是一个相对独立的"计算单元",它可以被放到一个线程中去执行,待执行结束后再查看结果即可。
// 它包含两块逻辑:
//    一个是call部分,为自身的逻辑
//    另一个是callback部分,即调用者向该"计算单元"中注入的逻辑
public abstract class Callback<T, S> implements Callable<S> {
    // 回调处理,以便在call函数中利用
    T callback;

    // 添加回调函数
    void setCallback (T callback) {
        this.callback = callback;
    }

    public abstract S call () throws Exception;
}

CompletableFuture

CompletableFuture是Java1.8引入的功能。同时,Java1.8还加入了java.util.function.*的函数接口,以及lambda函数的语法。

import java.util.concurrent.*;
import java.util.function.*;
import java.util.Locale;

public class Main {
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        System.out.println("Submitting Async Task");
        CompletableFuture<Either<String, Exception>> future = CompletableFuture.supplyAsync(() -> {
            try{
                // Perform some computation
                System.out.println("Entered Callable 1");
                Thread.sleep(2000);
                return Either.<String, Exception>left("Hello from Callable 1");
            } catch (Exception e) {
                return Either.<String, Exception>right(e);
            }
        }, executorService).<Either<String, Exception>>thenApplyAsync((Either<String, Exception> result)->{
            return result.<String, Exception>flatMap((String s) -> {
                try {
                  System.out.println("Entered Callable 2");
                  Thread.sleep(2000);
                  return Either.left(s.toUpperCase(Locale.forLanguageTag("en")));
                } catch (Exception e) {
                    return Either.<String, Exception>right(e);
                }
            }, e -> Either.<String, Exception>right(e));
        }, executorService);

        // This line executes immediately
        System.out.println("Do something else while callable is getting executed");

        System.out.println("Retrieve the result of the future");
        // Future.get() blocks until the result is available
        future.<Either<String, Exception>>get().apply((String s) -> {
            System.out.println(s);
        }, e -> {
            // System.out.println("Exception:" + e.getMessage());
        });
        
        executorService.shutdown();
    }
}
import java.util.function.*;

public abstract class Either<L,R> {
    public static <L,R> Either<L,R> left(L value) {
        return new Either<L,R>() {
            @Override public <T> T map(Function<? super L, ? extends T> lFunc,
                                       Function<? super R, ? extends T> rFunc) {
                return lFunc.apply(value);
            }
            @Override public <A, B> Either<A, B> flatMap(Function<L, Either<A, B>> toLeft,
                                                         Function<R, Either<A, B>> toRight) {
                return toLeft.apply(value);
            }
        };
    }
    public static <L,R> Either<L,R> right(R value) {
        return new Either<L,R>() {
            @Override public <T> T map(Function<? super L, ? extends T> lFunc,
                                       Function<? super R, ? extends T> rFunc) {
                return rFunc.apply(value);
            }
            @Override public <A, B> Either<A, B> flatMap(Function<L, Either<A, B>> toLeft,
                                                         Function<R, Either<A, B>> toRight) {
                return toRight.apply(value);
            }
        };
    }
    private Either() {}
    public abstract <T> T map(
      Function<? super L, ? extends T> lFunc, Function<? super R, ? extends T> rFunc);

    public <T> Either<T,R> mapLeft(Function<? super L, ? extends T> lFunc) {
        return this.<Either<T,R>>map(t -> left(lFunc.apply(t)), t -> (Either<T,R>)this);
    }
    public <T> Either<L,T> mapRight(Function<? super R, ? extends T> lFunc) {
        return this.<Either<L,T>>map(t -> (Either<L,T>)this, t -> right(lFunc.apply(t)));
    }
    public void apply(Consumer<? super L> lFunc, Consumer<? super R> rFunc) {
        map(consume(lFunc), consume(rFunc));
    }
    private <T> Function<T, Void> consume(Consumer<T> c) {
        return t -> { c.accept(t); return null; };
    }
    public abstract <A, B> Either<A, B> flatMap(
      Function<L, Either<A, B>> toLeft, Function<R, Either<A, B>> toRight);
}

4.2.3 迭代器・生成器

Java默认是不支持一个函数有多个返回值的。多个返回值不是指一次性返回超过一个值,而是指沿时间序列,不断发送返回值。

import java.util.concurrent.*;
import java.util.Iterator;

public class Main {
    
    // 求所有n的倍数
    private static Generator<Integer> multiplesOf(int n) {
        // 该函数返回值有无穷个整数
        return new Generator<Integer>() {
            public void run() throws InterruptedException {
                int i = 0;
                while (true) {
                    yield(n*(i++));
                }
            }
        };
    }
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 求7的倍数
        Iterator<Integer> result = multiplesOf(7).iterator();
        // 显示前4个
        System.out.println(result.next());
        System.out.println(result.next());
        System.out.println(result.next());
        System.out.println(result.next());
    }
}
import java.util.Iterator;
import java.util.NoSuchElementException;

public abstract class Generator<T> implements Iterable<T> {

    private class Condition {
        private boolean isSet;
        public synchronized void set() {
            isSet = true;
            notify();
        }
        public synchronized void await() throws InterruptedException {
            try {
                if (isSet)
                    return;
                wait();
            } finally {
                isSet = false;
            }
        }
    }

    static ThreadGroup THREAD_GROUP;

    Thread producer;
    private boolean hasFinished;
    private final Condition itemAvailableOrHasFinished = new Condition();
    private final Condition itemRequested = new Condition();
    private T nextItem;
    private boolean nextItemAvailable;
    private RuntimeException exceptionRaisedByProducer;

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return waitForNext();
            }
            @Override
            public T next() {
                if (!waitForNext())
                    throw new NoSuchElementException();
                nextItemAvailable = false;
                return nextItem;
            }
            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
            private boolean waitForNext() {
                if (nextItemAvailable)
                    return true;
                if (hasFinished)
                    return false;
                if (producer == null)
                    startProducer();
                itemRequested.set();
                try {
                    itemAvailableOrHasFinished.await();
                } catch (InterruptedException e) {
                    hasFinished = true;
                }
                if (exceptionRaisedByProducer != null)
                    throw exceptionRaisedByProducer;
                return !hasFinished;
            }
        };
    }

    protected abstract void run() throws InterruptedException;

    protected void yield(T element) throws InterruptedException {
        nextItem = element;
        nextItemAvailable = true;
        itemAvailableOrHasFinished.set();
        itemRequested.await();
    }

    private void startProducer() {
        assert producer == null;
        if (THREAD_GROUP == null)
            THREAD_GROUP = new ThreadGroup("generatorfunctions");
        producer = new Thread(THREAD_GROUP, new Runnable() {
            @Override
            public void run() {
                try {
                    itemRequested.await();
                    Generator.this.run();
                } catch (InterruptedException e) {
                    // No need to do anything here; Remaining steps in run()
                    // will cleanly shut down the thread.
                } catch (RuntimeException e) {
                    exceptionRaisedByProducer = e;
                }
                hasFinished = true;
                itemAvailableOrHasFinished.set();
            }
        });
        producer.setDaemon(true);
        producer.start();
    }

    @Override
    protected void finalize() throws Throwable {
        producer.interrupt();
        producer.join();
        super.finalize();
    }
}

4.2.4 流

第三方库多值单值备考
ReactorFluxMonoSpring WebFlux响应式框架的基础
RxJavaObservableSingleReactiveX项目的Java版

函数的输入输出按照值的个数被包装了起来。它们都是异步的。以Reactor为例,

  • Mono<Integer>: 一个整数值会到来。它本身不是整数值,但它描述了整数值会到来这件事情。
  • Flux<Integer>: 一系列整数值会到来。同样的,它是一个事件。拿到这个事件后,可以定义当每个值到来时如何处理,也可以对事件整体进行处理。
    public Flux<Integer> example(Flux<Integer> input) {
        return input
            .map(x -> x^2)
            .buffer(2)
            .map(x -> x.size());
    }

[styles]
event_radius = 30
operator_height = 60

----1----2----3----4---5-|

> map(x -> x^2)

----A-----B-----C--D----E-|
A := 1
B := 4
C := 9
D := 16
E := 25

> buffer(2)

-----------X--------Y----Z-|
X := [1,4]
Y := [9,16]
Z := [25]

> map(x->x.size())

-----------2---------2----1-|

4.2.5 例: Spring WebFlux

Spring WebFlux框架将客户端的请求(request)视作流入的起点,在服务端经过运算后,再源源不断地流向客户,响应(response)请求。这是一个符合人们印象的很自然的抽象。它与传统Spring框架的最大不同是,它的异步性。

从思想上讲,

  • 传统的Spring框架:当服务器收到用户请求数据X时,对X如何处理。
  • Spring WebFlux框架:当服务器收到用户请求数据X这件事E发生时,如何对该事件E处理。

当然,由于HTTP的局限性,用户请求数据都是简单的,即框架底层会将数据同步之后,再传输。例如向客户端响应一个Flux并不意味着向客户端源源不断地发送多次HTTP响应,而是框架底层会收集Flux的数据后,统一发往客户端。同样的,客户端的请求数据也不可能是源源不断的Flux,WebFlux框架甚至没对它抽象。

一个节选自官网的例子。

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-fn-handler-functions

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

public class PersonHandler {

    private final PersonRepository repository;

    public PersonHandler(PersonRepository repository) {
        this.repository = repository;
    }

    public Mono<ServerResponse> listPeople(ServerRequest request) { 
        Flux<Person> people = repository.allPeople();
        return ok().contentType(APPLICATION_JSON).body(people, Person.class);
    }

    public Mono<ServerResponse> createPerson(ServerRequest request) { 
        Mono<Person> person = request.bodyToMono(Person.class);
        return ok().build(repository.savePerson(person));
    }

    public Mono<ServerResponse> getPerson(ServerRequest request) { 
        int personId = Integer.valueOf(request.pathVariable("id"));
        return repository.getPerson(personId)
            .flatMap(person -> ok().contentType(APPLICATION_JSON).bodyValue(person))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
}

4.2.6 例: reactive-grpc (Salesforce)

gRPC的通信模式完美契合Reactive框架的数据抽象。

gRPC的四种情况:

  • 一对一(unary):一个请求,一个响应
  • 一对多(server stream):一个请求,多次响应
  • 多对一(client stream):多个请求,一次响应
  • 多对多(bi-direction stream):多次请求,多次响应

对于单值、多值传递这件事,ReactiveX框架恰好有MonoFlux可以描述。

节选自官网的例子:

https://github.com/salesforce/reactive-grpc/tree/master/reactor

ReactorGreeterGrpc.GreeterImplBase svc = new ReactorGreeterGrpc.GreeterImplBase() {
    @Override
    public Mono<HelloResponse> sayHello(Mono<HelloRequest> request) {
        return request.map(protoRequest -> greet("Hello", protoRequest));
    }

    ...

    @Override
    public Flux<HelloResponse> sayHelloBothStream(Flux<HelloRequest> request) {
        return request
                .map(HelloRequest::getName)
                .buffer(2)
                .map(names -> greet("Hello", String.join(" and ", names)));
    }
};