强大而简单的抽象,让编写正确的并发代码更简单

本文仅简单介绍了一下Guava的并发编程支持,没有进行源码分析,且没有与JUC的对比,欲想了解更多内容,需要自行搜索
本文参考自:
http://ifeve.com/google-guava-listenablefuture/
http://ifeve.com/google-guava-serviceexplained/
— By Syahfozy

ListenableFuture:完成后触发回调的Future

Guava 定义了 ListenableFuture 接口并继承了 JDK concurrent 包下的 Future 接口。

我们强烈地建议你在代码中多使用 ListenableFuture 来代替 JDK 的 Future, 因为:

  • 大多数 Futures 方法中需要它。
  • 转到 ListenableFuture 编程比较容易。
  • Guava 提供的通用公共类封装了公共的操作方方法,不需要提供 Future 和 ListenableFuture 的扩展方法。

接口

传统 JDK 中的 Future 通过异步的方式计算返回结果: 在多线程运算中可能或者可能在没有结束返回结果,Future 是运行中的多线程的一个引用句柄,确保在服务执行返回一个 Result。

ListenableFuture 可以允许你注册回调方法 (callbacks),在运算(多线程执行)完成的时候进行调用,  或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。

ListenableFuture 中的基础方法是 [addListener(Runnable, Executor)], 该方法会在多线程运算完的时候,指定的 Runnable 参数传入的对象会被指定的 Executor 执行。

其中原生并发编程的代可参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 ExecutorService executorService = Executors.newCachedThreadPool();

// Callable+Future方式
Future<String> future = executorService.submit(new Callable<String>() {
public String call() throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
return "Callable+Future";
}
});

System.out.println(future.get());


// Callable+FutureTask方式 异步任务
// FutureTask类实现了RunnableFuture接口, 而RunnableFuture继承了Runnable接口和Future接口
// 所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
public String call() throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
return "Callable+FutureTask";
}
});
executorService.submit(futureTask);
/**
* 这种方式和executorService.submit效果是类似的
* 只不过一个使用的是ExecutorService,一个使用的是Thread
*/
// new Thread(futureTask).start();
System.out.println(futureTask.get());

添加回调(Callbacks)

多数用户喜欢使用 [Futures.addCallback(ListenableFuture, FutureCallback, Executor)]的方式, 或者 另外一个版本 [version](addCallback(ListenableFuture future,FutureCallback<? super V> callback)),默认是采用 MoreExecutors.sameThreadExecutor()线程池, 为了简化使用,Callback采用轻量级的设计. FutureCallback 中实现了两个方法:

  • [onSuccess(V)], 在 Future 成功的时候执行,根据 Future 结果来判断。
  • [onFailure(Throwable)], 在 Future 失败的时候执行,根据 Future 结果来判断。

ListenableFuture的创建

对应JDK中的 ExecutorService.submit(Callable) 提交多线程异步运算的方式,Guava 提供了ListeningExecutorService 接口, 该接口返回 ListenableFuture 而相应的 ExecutorService 返回普通的 Future。将 ExecutorService 转为 ListeningExecutorService,可以使用MoreExecutors.listeningDecorator(ExecutorService)进行装饰。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 /**
* ListenableFuture的创建
*/
final ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<String> listenableFuture = service.submit(new Callable<String>() {
public String call() throws InterruptedException {
return "ListenableFuture";
}
});

listenableFuture.addListener(
new Runnable() {
public void run() {
System.out.println("listenableFuture addListener");
}
}, service);

Futures.addCallback(
listenableFuture,
new FutureCallback<String>() {
public void onSuccess(String result) {
System.out.println(result);
service.shutdown();
}

public void onFailure(Throwable t) {
service.shutdown();
}
},
service
);

另外, 假如你是从 FutureTask转换而来的, Guava 提供ListenableFutureTask.create(Callable) 和ListenableFutureTask.create(Runnable, V). 和 JDK不同的是, ListenableFutureTask 不能随意被继承(译者注:ListenableFutureTask中的done方法实现了调用listener的操作)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* ListenableFutureTask
*/
ListenableFutureTask task = ListenableFutureTask.create(new Callable<String>() {
public String call() throws Exception {
return "ListenableFutureTask";
}
});
//new Thread(task).start();
executorService.submit(task);
System.out.println(task.get());

ListenableFutureTask task2 = ListenableFutureTask.create(new Runnable() {
public void run() {
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 0);
executorService.submit(task2);
// 0
System.out.println(task2.get());

假如你喜欢抽象的方式来设置future的值,而不是想实现接口中的方法,可以考虑继承抽象类AbstractFuture 或者直接使用 SettableFuture 。

假如你必须将其他API提供的Future转换成 ListenableFuture,你没有别的方法只能采用硬编码的方式JdkFutureAdapters.listenInPoolThread(Future) 来将 Future 转换成 ListenableFuture。尽可能地采用修改原生的代码返回 ListenableFuture会更好一些。

Application

使用ListenableFuture 最重要的理由是它可以进行一系列的复杂链式的异步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
final ListeningExecutorService decorator = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListenableFuture randomFuture = decorator.submit(new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
});
AsyncFunction squareFunction = new AsyncFunction<Integer, Integer>() {
public ListenableFuture<Integer> apply(Integer input) throws Exception {
return decorator.submit(new Square(input));
}
};

方法描述参考
transform(ListenableFuture<A>, AsyncFunction<A, B>, Executor)返回一个新的 ListenableFuture ,该 ListenableFuture 返回的 result 是由传入的 AsyncFunction 参数指派到传入的 ListenableFuture 中.transform(ListenableFuture<A>, AsyncFunction<A, B>)
transform(ListenableFuture<A>, Function<A, B>, Executor)返回一个新的 ListenableFuture ,该 ListenableFuture 返回的 result 是由传入的 Function 参数指派到传入的 ListenableFuture 中.transform(ListenableFuture<A>, Function<A, B>)
allAsList(Iterable<ListenableFuture<V>>)返回一个 ListenableFuture ,该 ListenableFuture 返回的 result 是一个 List,List 中的值是每个 ListenableFuture 的返回值,假如传入的其中之一 fails 或者 cancel,这个 Future fails 或者 canceledallAsList(ListenableFuture<V>...)
successfulAsList(Iterable<ListenableFuture<V>>)返回一个 ListenableFuture ,该 Future 的结果包含所有成功的 Future,按照原来的顺序,当其中之一 Failed 或者 cancel,则用 null 替代successfulAsList(ListenableFuture<V>...)

AsyncFunction<A, B> 中提供一个方法ListenableFuture apply(A input),它可以被用于异步变换值。

接下来简单介绍一下表格中方法的使用。

transformAsync(ListenableFuture, AsyncFunction<A, B>, Executor)
返回一个100以内的随机数转换为一个随机数的平方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ListenableFuture<Integer> calculateFuture = Futures.transformAsync(
randomFuture,
squareFunction,
decorator);

Futures.addCallback(
calculateFuture,
new FutureCallback<Integer>() {
public void onSuccess(Integer result) {
// square: n 100以内随机数的平方
System.out.println("square: " + result);
}

public void onFailure(Throwable t) {
}
},
decorator
);

transform(ListenableFuture, Function<A, B>, Executor)
返回一个100以内的随机数转换为”return: random”字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
ListenableFuture<String> strFuture = Futures.transform(
randomFuture,
new Function<Integer, String>() {
public String apply(Integer input) {
return "return: " + input;
}
},
decorator);

Futures.addCallback(
strFuture,
new FutureCallback<String>() {
public void onSuccess(String result) {
// return: n n为100以内的随机数
System.out.println(result);
}

public void onFailure(Throwable t) {
}
},
decorator
);



ListenableFuture randomFuture1 = decorator.submit(new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(10);
}
});
ListenableFuture randomFuture2 = decorator.submit(new Callable<Integer>() {
public Integer call() throws Exception {
throw new Exception();
}
});
List<ListenableFuture<Integer>> futureList = Lists.newArrayList(randomFuture1, randomFuture2);

allAsList(Iterable<ListenableFuture>)
10以内的随机数和fail ListenableFuture组成的ListenableFuture
传入的其中之一fails或者cancel,这个Future fails 或者canceled

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ListenableFuture<List<Integer>> allAsList = Futures.allAsList(futureList);

Futures.addCallback(allAsList, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
// 其中一个失败无返回
System.out.println("list: " + result);
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, decorator);

successfulAsList(Iterable<ListenableFuture>)
返回一个ListenableFuture ,该Future的结果包含所有成功的Future,按照原来的顺序,当其中之一Failed或者cancel,则用null替代

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ListenableFuture<List<Integer>> successfulAsList = Futures.successfulAsList(futureList);
Futures.addCallback(successfulAsList, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
// list: [n, null] 其中Failed或者cancel用null替代
System.out.println("list: " + result);
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, decorator);


// 运行完记得关掉
executorService.shutdown();
decorator.shutdown();

CheckedFuture

Guava也提供了 CheckedFuture<V, X extends Exception> 接口。CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get 方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易 。将 ListenableFuture 转换成CheckedFuture,可以使用 Futures.makeChecked(ListenableFuture, Function<Exception, X>)。

Service框架:抽象可开启和关闭的服务,帮助你维护服务的状态逻辑

概述

Guava 包里的 Service 接口用于封装一个服务对象的运行状态、包括 start 和 stop 等方法。例如 web 服务器,RPC 服务器、计时器等可以实现这个接口。对此类服务的状态管理并不轻松、需要对服务的开启 / 关闭进行妥善管理、特别是在多线程环境下尤为复杂。Guava 包提供了一些基础类帮助你管理复杂的状态转换逻辑和同步细节。

一个服务正常生命周期有:

服务一旦被停止就无法再重新启动了。如果服务在 starting、running、stopping 状态出现问题、会进入 Service.State.FAILED. 状态。调用 startAsync() 方法可以异步开启一个服务, 同时返回 this 对象形成方法调用链。注意:只有在当前服务的状态是 NEW 时才能调用 startAsync() 方法,因此最好在应用中有一个统一的地方初始化相关服务。停止一个服务也是类似的、使用异步方法 stopAsync() 。但是不像 startAsync(), 多次调用这个方法是安全的。这是为了方便处理关闭服务时候的锁竞争问题。
Service 也提供了一些方法用于等待服务状态转换的完成:

通过 addListener() 方法异步添加监听器。此方法允许你添加一个 Service.Listener 、它会在每次服务状态转换的时候被调用。注意:最好在服务启动之前添加 Listener(这时的状态是 NEW)、否则之前已发生的状态转换事件是无法在新添加的 Listener 上被重新触发的。

同步使用 awaitRunning()。这个方法不能被打断、不强制捕获异常、一旦服务启动就会返回。如果服务没有成功启动,会抛出 IllegalStateException 异常。同样的, awaitTerminated() 方法会等待服务达到终止状态(TERMINATED 或者 FAILED)。两个方法都有重载方法允许传入超时时间。

Service 接口本身实现起来会比较复杂、且容易碰到一些捉摸不透的问题。因此我们不推荐直接实现这个接口。而是请继承 Guava 包里已经封装好的基础抽象类。每个基础类支持一种特定的线程模型。

基础实现类

AbstractIdleService

 AbstractIdleService 类简单实现了 Service 接口、其在 running 状态时不会执行任何动作–因此在 running 时也不需要启动线程–但需要处理开启 / 关闭动作。要实现一个此类的服务,只需继承 AbstractIdleService 类,然后自己实现 startUp() 和 shutDown() 方法就可以了。

如上面的例子、由于任何请求到 GcStatsServlet 时已经会有现成线程处理了,所以在服务运行时就不需要做什么额外动作了。

AbstractExecutionThreadService

AbstractExecutionThreadService 通过单线程处理启动、运行、和关闭等操作。你必须重载 run() 方法,同时需要能响应停止服务的请求。具体的实现可以在一个循环内做处理:

另外,你还可以重载 triggerShutdown() 方法让 run() 方法结束返回。

重载 startUp() 和 shutDown() 方法是可选的,不影响服务本身状态的管理

start() 内部会调用 startUp() 方法,创建一个线程、然后在线程内调用 run() 方法。stop() 会调用 triggerShutdown() 方法并且等待线程终止。

AbstractScheduledService

AbstractScheduledService 类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration() 方法定义一个周期执行的任务,以及相应的 startUp() 和 shutDown() 方法。为了能够描述执行周期,你需要实现 scheduler() 方法。通常情况下,你可以使用 AbstractScheduledService.Scheduler 类提供的两种调度器:[newFixedRateSchedule(initialDelay, delay, TimeUnit)](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractScheduledService.Scheduler.html#newFixedRateSchedule(long, long, java.util.concurrent.TimeUnit))  和 [newFixedDelaySchedule(initialDelay, delay, TimeUnit)](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractScheduledService.Scheduler.html#newFixedDelaySchedule(long, long, java.util.concurrent.TimeUnit)),类似于 JDK 并发包中 ScheduledExecutorService 类提供的两种调度方式。如要自定义 schedules 则可以使用 CustomScheduler 类来辅助实现;具体用法见 javadoc。

AbstractService

如需要自定义的线程管理、可以通过扩展 AbstractService 类来实现。一般情况下、使用上面的几个实现类就已经满足需求了,但如果在服务执行过程中有一些特定的线程处理需求、则建议继承 AbstractService 类。

继承 AbstractService 方法必须实现两个方法.

  • doStart()  : 首次调用 startAsync() 时会同时调用 doStart(),doStart() 内部需要处理所有的初始化工作、如果启动成功则调用 notifyStarted() 方法;启动失败则调用 notifyFailed()
  • doStop() :  首次调用 stopAsync() 会同时调用 doStop(),doStop() 要做的事情就是停止服务,如果停止成功则调用 notifyStopped() 方法;停止失败则调用 notifyFailed() 方法。

doStart 和 doStop 方法的实现需要考虑下性能,尽可能的低延迟。如果初始化的开销较大,如读文件,打开网络连接,或者其他任何可能引起阻塞的操作,建议移到另外一个单独的线程去处理。

使用 ServiceManager

除了对 Service 接口提供基础的实现类,Guava 还提供了 ServiceManager 类使得涉及到多个 Service 集合的操作更加容易。通过实例化 ServiceManager 类来创建一个 Service 集合,你可以通过以下方法来管理它们:

检测类的方法有:

  • isHealthy() :如果所有的服务处于 Running 状态、会返回 True
  • servicesByState() :以状态为索引返回当前所有服务的快照
  • startupTimes() :返回一个 Map 对象,记录被管理的服务启动的耗时、以毫秒为单位,同时 Map 默认按启动时间排序。

我们建议整个服务的生命周期都能通过 ServiceManager 来管理,不过即使状态转换是通过其他机制触发的、也不影响 ServiceManager 方法的正确执行。例如:当一个服务不是通过 startAsync()、而是其他机制启动时,listeners 仍然可以被正常调用、awaitHealthy() 也能够正常工作。ServiceManager 唯一强制的要求是当其被创建时所有的服务必须处于 New 状态。

如想了解更多内容或用法,可参考:http://ifeve.com/google-guava-serviceexplained/