Guava 对JDK Future异步编程的扩展支持

发布于:2021-12-03 03:05:08

JDK Future

JDK中Future代表了异步计算的结果,通过向线程池ThreadPoolExecutor(通常使用ExecutorService接口) submit 一个Callable,可返回一个Future, eg:


ExecutorService jdkExecutor = Executors.newFixedThreadPool(2);
Future jdkFuture = jdkExecutor.submit(new Callable {

@Override
public String call()
throws Exception {

logger.info("execute task");

// 模拟耗时计算
TimeUnit.SECONDS.sleep(3);

return "hello jdk future";

}

});

通过Future的get方法可以获取异步计算的结果,如果计算没完成,get会阻塞,直到计算完成才从阻塞中返回。


【usecase1】


有些时候,我们并不想直接通过get在那里傻等,那么,我们可能会想,我就再封装一个Runnable,丢到线程池里面执行,在这个Runnable里面取计算结果,然后再干我们的逻辑,eg:


ExecutorService jdkExecutor = Executors.newFixedThreadPool(2);
final Future jdkFuture = jdkExecutor.submit(new Callable() {

@Override
public String call()
throws Exception {

logger.info("execute task");

// 模拟耗时计算
TimeUnit.SECONDS.sleep(3);

return "hello jdk future";

}

});

// 在线程池中处理future结果
jdkExecutor.execute(new Runnable() {

@Override
public void run() {

try {
String result = jdkFuture.get();
processJdkFutureResult(result);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
}

}

private void processJdkFutureResult(String result) {
logger.info("process future result : " + result);
}

});

【usecase2】


更进一步,我们可能希望通过定义一个回调接口,让这个Runnable干完事情以后,通知我们拿结果去执行我们的逻辑, eg:


首先我们定义一个回调接口如下所示:


public interface FutureCallback {

public void onSuccess(V result);

public void onError(Throwable t);

}

然后根据上面的代码改写成callback的方式,在回调函数里面处理我们的逻辑:


ExecutorService jdkExecutor = Executors.newFixedThreadPool(2);
final Future jdkFuture = jdkExecutor.submit(new Callable() {

@Override
public String call()
throws Exception {

logger.info("execute task");

// 模拟耗时计算
TimeUnit.SECONDS.sleep(3);

return "hello jdk future";

}

});

final FutureCallback futureCallback = new FutureCallback() {

@Override
public void onSuccess(String result) {
// 拿到结果,进行我们的逻辑处理
logger.info("process future result : " + result);
}

@Override
public void onError(Throwable t) {
logger.error(t.getMessage(), t);
}

};

// 在线程池中处理future结果
jdkExecutor.execute(new Runnable() {

@Override
public void run() {

try {
String result = jdkFuture.get();
// 线程池力的任务处理完主动调用回调函数
futureCallback.onSuccess(result);
} catch (InterruptedException e) {
futureCallback.onError(e);
} catch (ExecutionException e) {
futureCallback.onError(e);
}

}

});

上面的例子可以归结出原生JDK异步编程的常规实现:



Future + 回调



通过上面的例子,我们看出,使用原生JDK的Future来实现异步编程,貌似有点繁琐,当然自己也可以造个轮子封装一把。然而,是否有一种方案,可以将上面的实现变得通用起来呢?答案就是Guava工具包中提供的ListenableFutureFutureCallbackFutures


Guava ListenableFuture

Guava中定义ListenableFuture,扩展了JDK原生Future接口,提供了添加listener的接口方法:


public interface ListenableFuture extends Future {
void addListener(Runnable listener, Executor executor);
}

该接口的功能是,当future执行完成后,会使用提供的executor(线程池)调用这些listener,若添加listener的时候,任务已经执行完,那么也会使用executor直接调用listener


通过这个扩展的接口功能,达到我们上面JDK Future 【usecase1】 中的目的



Tips: 想要得到ListenableFuture,必须通过Guava提供的ListeningExecutorService的submit方法来提交任务。我们可以通过Guava提供的MoreExecutors对JDK 线程池进行包装,来得到ListeningExecutorService



eg:


ExecutorService jdkExecutor = Executors.newFixedThreadPool(2, new ThreadFactoryBuilder().setNameFormat("thd%d").build());
ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(jdkExecutor);
final ListenableFuture future = guavaExecutor.submit(new Callable() {

@Override
public String call()
throws Exception {

logger.info("execute task");

// 模拟耗时计算
TimeUnit.SECONDS.sleep(3);

return "hello guava executor";

}
});

future.addListener(new OrderingListener(future), guavaExecutor);
future.addListener(new OrderingListener(future), guavaExecutor);
future.addListener(new OrderingListener(future), guavaExecutor);
future.addListener(new OrderingListener(future), guavaExecutor);
future.addListener(new OrderingListener(future), guavaExecutor);

private static class OrderingListener implements Runnable {

private static AtomicInteger seq = new AtomicInteger(0);

/**
* 序列号
*/
private int orderNumber;

private ListenableFuture guavaFuture;

public OrderingListener(ListenableFuture guavaFuture) {
this.orderNumber = seq.getAndIncrement();
this.guavaFuture = guavaFuture;
}

@Override
public void run() {

try {
logger.info("OrderingListener[{}] call", orderNumber);
V result = guavaFuture.get();
logger.info("OrderingListener[{}] get result : {}", orderNumber, result);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
}

}

}


Tips: 可在一个ListenableFuture添加多个listener(Runnable),但是注意,这些listener的执行顺序并不与添加进去的顺序保持一致



Guava FutureCallback 与 Futures

接下来,Guava又提供了一个FutureCallback接口,来帮助我们实现回调功能:


public interface FutureCallback {

void onSuccess(@Nullable V result);

void onFailure(Throwable t);

}

这与我们上面JDK Future 【usercase2】 中的想法是一致的。同时,Guava还提供Futures类来协助我们更好的在一个ListenableFuture上添加回调:


public static void addCallback(final ListenableFuture future,
final FutureCallback callback, Executor executor) {

// ... 省略源码逻辑

}

Futures类的addCallback方法实现,其实就是帮我们添加了一个通用的listener到ListenableFuture里面,在这个通用的listener里面拿到接口,并使用callback接口进行回调,实现的就是JDK Future 【usecase2】 中的目标,只是人家Guava已经帮你封装好了,直接用起来就可以:


ExecutorService jdkExecutor = Executors.newFixedThreadPool(2, new ThreadFactoryBuilder().setNameFormat("thd%d").build());
ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(jdkExecutor);
final ListenableFuture future = guavaExecutor.submit(new Callable() {

@Override
public String call()
throws Exception {

logger.info("execute task");

// 模拟耗时计算
TimeUnit.SECONDS.sleep(3);

return "hello guava executor";

}
});

FutureCallback callback = new FutureCallback() {

@Override
public void onSuccess(String result) {
logger.info("on guava future callback, get result : " + result);
}

@Override
public void onFailure(Throwable t) {
logger.error(t.getMessage(), t);
}
};


Futures.addCallback(future, callback, guavaExecutor);

总结

通过上面的例子,初步介绍了Guava工具包中提供的并发工具类对原生JDK Future异步编程的扩展,使得编写异步代码更加方便高效。Guava的文档中建议我们尽量使用ListenableFuture来替代原生的Future,通过上面的例子我们也确实能看出来,使用Guava封装好的工具类来写代码,确实比原生的好一些


参考
https://github.com/google/guava/wiki/ListenableFutureExplained

相关推荐

最新更新

猜你喜欢