4.3 Java语言
单值 | 多值 | |
---|---|---|
拉取(同步) | Function | (迭代器/生成器) |
推送(异步) | CompletableFuture | (流) |
4.2.1 Function 函数
这个是最常见的语法,即当调用者调用一个函数时,同步地返回一个值。
4.2.2 CompletableFuture
Future
Future
是Java1.5引入的功能。它配合Callable
或Runnable
可以方便地实现异步调用。
要注意,Future.get()
是阻塞的。如果调用者在实际任务未完成前就调用get
,那就需要一直等待(当然,超时时间是可以设置的)。
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 线程
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> callable = new Callable<String>() {
@Override
public String call() {
// Perform some computation
System.out.println("Entered Callable");
Thread.sleep(2000);
return "Hello from Callable";
}
};
System.out.println("Submitting Callable");
Future<String> future = executorService.submit(callable);
// This line executes immediately
System.out.println("Do something else while callable is getting executed");
System.out.println("Retrieve the result of the future");
// Future.get() blocks until the result is available
String result = future.get();
System.out.println(result);
executorService.shutdown();
}
}
如果相互依赖的连续多个异步调用该如何处理?例如,异步函数B
需要异步函数A
的返回值。若使用上述Future
来实现,那么,
Main
线程就要等待异步函数A
结束,get
到其结果,然后再调用异步函数B
;- 又或者,异步函数
A
里面以回调(callback)方式嵌入异步函数B
的逻辑。
第一个方案如下:
第二方案中,由于Callable
的call()
方法或者Runnable
的run()
方法都是不接受参数的,因此需要自定义拓展接口。当然,很多第三方库都有提供类似的功能。
下面代码演示了在一个异步调用中发起另一个异步调用的情况。
Callback.java
为拓展的Callable
,内含一个callback
变量可以传入一个回调函数。回调函数可以向其中插入一段逻辑。
import java.util.concurrent.*;
import java.util.Locale;
public class Main {
private static abstract class Callback<T, S> implements Callable<S> {
T callback;
void setCallback (T callback) {
this.callback = callback;
}
public abstract S call () throws Exception;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Callback<String, String> upperCase = new Callback<String, String>() {
public String call() throws Exception {
System.out.println("Entered Callable 2");
Thread.sleep(2000);
return callback.toUpperCase(Locale.forLanguageTag("en"));
}
};
Callback<Callback<String, String>, String> callable = new Callback<Callback<String, String>, String>() {
public String call() throws Exception {
// Perform some computation
System.out.println("Entered Callable 1");
Thread.sleep(2000);
// return "Hello from Callable 1";
System.out.println("Submitting Callback");
// 一个值,也是一个特殊的回调函数,可视作常函数。
callback.setCallback("Hello from Callable 1");
Future<String> future = executorService.submit(callback);
return future.get();
}
};
callable.setCallback(upperCase);
System.out.println("Submitting Callable");
Future<String> future = executorService.submit(callable);
// This line executes immediately
System.out.println("Do something else while callable is getting executed");
System.out.println("Retrieve the result of the future");
// Future.get() blocks until the result is available
String result = future.get();
System.out.println(result);
executorService.shutdown();
}
}
import java.util.concurrent.*;
// Callback是一个相对独立的"计算单元",它可以被放到一个线程中去执行,待执行结束后再查看结果即可。
// 它包含两块逻辑:
// 一个是call部分,为自身的逻辑
// 另一个是callback部分,即调用者向该"计算单元"中注入的逻辑
public abstract class Callback<T, S> implements Callable<S> {
// 回调处理,以便在call函数中利用
T callback;
// 添加回调函数
void setCallback (T callback) {
this.callback = callback;
}
public abstract S call () throws Exception;
}
CompletableFuture
CompletableFuture
是Java1.8引入的功能。同时,Java1.8还加入了java.util.function.*
的函数接口,以及lambda函数的语法。
import java.util.concurrent.*;
import java.util.function.*;
import java.util.Locale;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
System.out.println("Submitting Async Task");
CompletableFuture<Either<String, Exception>> future = CompletableFuture.supplyAsync(() -> {
try{
// Perform some computation
System.out.println("Entered Callable 1");
Thread.sleep(2000);
return Either.<String, Exception>left("Hello from Callable 1");
} catch (Exception e) {
return Either.<String, Exception>right(e);
}
}, executorService).<Either<String, Exception>>thenApplyAsync((Either<String, Exception> result)->{
return result.<String, Exception>flatMap((String s) -> {
try {
System.out.println("Entered Callable 2");
Thread.sleep(2000);
return Either.left(s.toUpperCase(Locale.forLanguageTag("en")));
} catch (Exception e) {
return Either.<String, Exception>right(e);
}
}, e -> Either.<String, Exception>right(e));
}, executorService);
// This line executes immediately
System.out.println("Do something else while callable is getting executed");
System.out.println("Retrieve the result of the future");
// Future.get() blocks until the result is available
future.<Either<String, Exception>>get().apply((String s) -> {
System.out.println(s);
}, e -> {
// System.out.println("Exception:" + e.getMessage());
});
executorService.shutdown();
}
}
import java.util.function.*;
public abstract class Either<L,R> {
public static <L,R> Either<L,R> left(L value) {
return new Either<L,R>() {
@Override public <T> T map(Function<? super L, ? extends T> lFunc,
Function<? super R, ? extends T> rFunc) {
return lFunc.apply(value);
}
@Override public <A, B> Either<A, B> flatMap(Function<L, Either<A, B>> toLeft,
Function<R, Either<A, B>> toRight) {
return toLeft.apply(value);
}
};
}
public static <L,R> Either<L,R> right(R value) {
return new Either<L,R>() {
@Override public <T> T map(Function<? super L, ? extends T> lFunc,
Function<? super R, ? extends T> rFunc) {
return rFunc.apply(value);
}
@Override public <A, B> Either<A, B> flatMap(Function<L, Either<A, B>> toLeft,
Function<R, Either<A, B>> toRight) {
return toRight.apply(value);
}
};
}
private Either() {}
public abstract <T> T map(
Function<? super L, ? extends T> lFunc, Function<? super R, ? extends T> rFunc);
public <T> Either<T,R> mapLeft(Function<? super L, ? extends T> lFunc) {
return this.<Either<T,R>>map(t -> left(lFunc.apply(t)), t -> (Either<T,R>)this);
}
public <T> Either<L,T> mapRight(Function<? super R, ? extends T> lFunc) {
return this.<Either<L,T>>map(t -> (Either<L,T>)this, t -> right(lFunc.apply(t)));
}
public void apply(Consumer<? super L> lFunc, Consumer<? super R> rFunc) {
map(consume(lFunc), consume(rFunc));
}
private <T> Function<T, Void> consume(Consumer<T> c) {
return t -> { c.accept(t); return null; };
}
public abstract <A, B> Either<A, B> flatMap(
Function<L, Either<A, B>> toLeft, Function<R, Either<A, B>> toRight);
}
4.2.3 迭代器・生成器
Java默认是不支持一个函数有多个返回值的。多个返回值不是指一次性返回超过一个值,而是指沿时间序列,不断发送返回值。
import java.util.concurrent.*;
import java.util.Iterator;
public class Main {
// 求所有n的倍数
private static Generator<Integer> multiplesOf(int n) {
// 该函数返回值有无穷个整数
return new Generator<Integer>() {
public void run() throws InterruptedException {
int i = 0;
while (true) {
yield(n*(i++));
}
}
};
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 求7的倍数
Iterator<Integer> result = multiplesOf(7).iterator();
// 显示前4个
System.out.println(result.next());
System.out.println(result.next());
System.out.println(result.next());
System.out.println(result.next());
}
}
import java.util.Iterator;
import java.util.NoSuchElementException;
public abstract class Generator<T> implements Iterable<T> {
private class Condition {
private boolean isSet;
public synchronized void set() {
isSet = true;
notify();
}
public synchronized void await() throws InterruptedException {
try {
if (isSet)
return;
wait();
} finally {
isSet = false;
}
}
}
static ThreadGroup THREAD_GROUP;
Thread producer;
private boolean hasFinished;
private final Condition itemAvailableOrHasFinished = new Condition();
private final Condition itemRequested = new Condition();
private T nextItem;
private boolean nextItemAvailable;
private RuntimeException exceptionRaisedByProducer;
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
@Override
public boolean hasNext() {
return waitForNext();
}
@Override
public T next() {
if (!waitForNext())
throw new NoSuchElementException();
nextItemAvailable = false;
return nextItem;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
private boolean waitForNext() {
if (nextItemAvailable)
return true;
if (hasFinished)
return false;
if (producer == null)
startProducer();
itemRequested.set();
try {
itemAvailableOrHasFinished.await();
} catch (InterruptedException e) {
hasFinished = true;
}
if (exceptionRaisedByProducer != null)
throw exceptionRaisedByProducer;
return !hasFinished;
}
};
}
protected abstract void run() throws InterruptedException;
protected void yield(T element) throws InterruptedException {
nextItem = element;
nextItemAvailable = true;
itemAvailableOrHasFinished.set();
itemRequested.await();
}
private void startProducer() {
assert producer == null;
if (THREAD_GROUP == null)
THREAD_GROUP = new ThreadGroup("generatorfunctions");
producer = new Thread(THREAD_GROUP, new Runnable() {
@Override
public void run() {
try {
itemRequested.await();
Generator.this.run();
} catch (InterruptedException e) {
// No need to do anything here; Remaining steps in run()
// will cleanly shut down the thread.
} catch (RuntimeException e) {
exceptionRaisedByProducer = e;
}
hasFinished = true;
itemAvailableOrHasFinished.set();
}
});
producer.setDaemon(true);
producer.start();
}
@Override
protected void finalize() throws Throwable {
producer.interrupt();
producer.join();
super.finalize();
}
}
4.2.4 流
第三方库 | 多值 | 单值 | 备考 |
---|---|---|---|
Reactor | Flux | Mono | Spring WebFlux响应式框架的基础 |
RxJava | Observable | Single | ReactiveX项目的Java版 |
函数的输入输出按照值的个数被包装了起来。它们都是异步的。以Reactor为例,
Mono<Integer>
: 一个整数值会到来。它本身不是整数值,但它描述了整数值会到来这件事情。Flux<Integer>
: 一系列整数值会到来。同样的,它是一个事件。拿到这个事件后,可以定义当每个值到来时如何处理,也可以对事件整体进行处理。
public Flux<Integer> example(Flux<Integer> input) {
return input
.map(x -> x^2)
.buffer(2)
.map(x -> x.size());
}
[styles] event_radius = 30 operator_height = 60 ----1----2----3----4---5-| > map(x -> x^2) ----A-----B-----C--D----E-| A := 1 B := 4 C := 9 D := 16 E := 25 > buffer(2) -----------X--------Y----Z-| X := [1,4] Y := [9,16] Z := [25] > map(x->x.size()) -----------2---------2----1-|
4.2.5 例: Spring WebFlux
Spring WebFlux框架将客户端的请求(request)视作流入的起点,在服务端经过运算后,再源源不断地流向客户,响应(response)请求。这是一个符合人们印象的很自然的抽象。它与传统Spring框架的最大不同是,它的异步性。
从思想上讲,
- 传统的Spring框架:当服务器收到用户请求数据
X
时,对X
如何处理。 - Spring WebFlux框架:当服务器收到用户请求数据
X
这件事E
发生时,如何对该事件E
处理。
当然,由于HTTP的局限性,用户请求数据都是简单的,即框架底层会将数据同步之后,再传输。例如向客户端响应一个Flux
并不意味着向客户端源源不断地发送多次HTTP响应,而是框架底层会收集Flux
的数据后,统一发往客户端。同样的,客户端的请求数据也不可能是源源不断的Flux
,WebFlux框架甚至没对它抽象。
一个节选自官网的例子。
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
public class PersonHandler {
private final PersonRepository repository;
public PersonHandler(PersonRepository repository) {
this.repository = repository;
}
public Mono<ServerResponse> listPeople(ServerRequest request) {
Flux<Person> people = repository.allPeople();
return ok().contentType(APPLICATION_JSON).body(people, Person.class);
}
public Mono<ServerResponse> createPerson(ServerRequest request) {
Mono<Person> person = request.bodyToMono(Person.class);
return ok().build(repository.savePerson(person));
}
public Mono<ServerResponse> getPerson(ServerRequest request) {
int personId = Integer.valueOf(request.pathVariable("id"));
return repository.getPerson(personId)
.flatMap(person -> ok().contentType(APPLICATION_JSON).bodyValue(person))
.switchIfEmpty(ServerResponse.notFound().build());
}
}
4.2.6 例: reactive-grpc
(Salesforce)
gRPC的通信模式完美契合Reactive框架的数据抽象。
gRPC的四种情况:
- 一对一(unary):一个请求,一个响应
- 一对多(server stream):一个请求,多次响应
- 多对一(client stream):多个请求,一次响应
- 多对多(bi-direction stream):多次请求,多次响应
对于单值、多值传递这件事,ReactiveX框架恰好有Mono
和Flux
可以描述。
节选自官网的例子:
https://github.com/salesforce/reactive-grpc/tree/master/reactor
ReactorGreeterGrpc.GreeterImplBase svc = new ReactorGreeterGrpc.GreeterImplBase() {
@Override
public Mono<HelloResponse> sayHello(Mono<HelloRequest> request) {
return request.map(protoRequest -> greet("Hello", protoRequest));
}
...
@Override
public Flux<HelloResponse> sayHelloBothStream(Flux<HelloRequest> request) {
return request
.map(HelloRequest::getName)
.buffer(2)
.map(names -> greet("Hello", String.join(" and ", names)));
}
};