强大而简单的抽象,让编写正确的并发代码更简单
本文仅简单介绍了一下Guava的并发编程支持,没有进行源码分析,且没有与JUC的对比,欲想了解更多内容,需要自行搜索
本文参考自:
http://ifeve.com/google-guava-listenablefuture/
http://ifeve.com/google-guava-serviceexplained/
— By Syahfozy
ListenableFuture:完成后触发回调的Future
Guava 定义了 ListenableFuture 接口并继承了 JDK concurrent 包下的 Future 接口。
我们强烈地建议你在代码中多使用 ListenableFuture 来代替 JDK 的 Future, 因为:
- 大多数 Futures 方法中需要它。
- 转到 ListenableFuture 编程比较容易。
- Guava 提供的通用公共类封装了公共的操作方方法,不需要提供 Future 和 ListenableFuture 的扩展方法。
接口
传统 JDK 中的 Future 通过异步的方式计算返回结果: 在多线程运算中可能或者可能在没有结束返回结果,Future 是运行中的多线程的一个引用句柄,确保在服务执行返回一个 Result。
ListenableFuture 可以允许你注册回调方法 (callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。
ListenableFuture 中的基础方法是 [addListener(Runnable, Executor)], 该方法会在多线程运算完的时候,指定的 Runnable 参数传入的对象会被指定的 Executor 执行。
其中原生并发编程的代可参考如下:
1 | ExecutorService executorService = Executors.newCachedThreadPool(); |
添加回调(Callbacks)
多数用户喜欢使用 [Futures.addCallback(ListenableFuture
- [onSuccess(V)], 在 Future 成功的时候执行,根据 Future 结果来判断。
- [onFailure(Throwable)], 在 Future 失败的时候执行,根据 Future 结果来判断。
ListenableFuture的创建
对应JDK中的 ExecutorService.submit(Callable) 提交多线程异步运算的方式,Guava 提供了ListeningExecutorService 接口, 该接口返回 ListenableFuture 而相应的 ExecutorService 返回普通的 Future。将 ExecutorService 转为 ListeningExecutorService,可以使用MoreExecutors.listeningDecorator(ExecutorService)进行装饰。
1 | /** |
另外, 假如你是从 FutureTask转换而来的, Guava 提供ListenableFutureTask.create(Callable
1 | /** |
假如你喜欢抽象的方式来设置future的值,而不是想实现接口中的方法,可以考虑继承抽象类AbstractFuture
假如你必须将其他API提供的Future转换成 ListenableFuture,你没有别的方法只能采用硬编码的方式JdkFutureAdapters.listenInPoolThread(Future) 来将 Future 转换成 ListenableFuture。尽可能地采用修改原生的代码返回 ListenableFuture会更好一些。
Application
使用ListenableFuture 最重要的理由是它可以进行一系列的复杂链式的异步操作。
1 | final ListeningExecutorService decorator = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); |
方法 | 描述 | 参考 |
transform(ListenableFuture<A>, AsyncFunction<A, B>, Executor) | 返回一个新的 ListenableFuture ,该 ListenableFuture 返回的 result 是由传入的 AsyncFunction 参数指派到传入的 ListenableFuture 中. | transform(ListenableFuture<A>, AsyncFunction<A, B>) |
transform(ListenableFuture<A>, Function<A, B>, Executor) | 返回一个新的 ListenableFuture ,该 ListenableFuture 返回的 result 是由传入的 Function 参数指派到传入的 ListenableFuture 中. | transform(ListenableFuture<A>, Function<A, B>) |
allAsList(Iterable<ListenableFuture<V>>) | 返回一个 ListenableFuture ,该 ListenableFuture 返回的 result 是一个 List,List 中的值是每个 ListenableFuture 的返回值,假如传入的其中之一 fails 或者 cancel,这个 Future fails 或者 canceled | allAsList(ListenableFuture<V>...) |
successfulAsList(Iterable<ListenableFuture<V>>) | 返回一个 ListenableFuture ,该 Future 的结果包含所有成功的 Future,按照原来的顺序,当其中之一 Failed 或者 cancel,则用 null 替代 | successfulAsList(ListenableFuture<V>...) |
AsyncFunction<A, B> 中提供一个方法ListenableFuture apply(A input),它可以被用于异步变换值。
接下来简单介绍一下表格中方法的使用。
transformAsync(ListenableFuture, AsyncFunction<A, B>, Executor)
返回一个100以内的随机数转换为一个随机数的平方
1 | ListenableFuture<Integer> calculateFuture = Futures.transformAsync( |
transform(ListenableFuture, Function<A, B>, Executor)
返回一个100以内的随机数转换为”return: random”字符串
1 | ListenableFuture<String> strFuture = Futures.transform( |
allAsList(Iterable<ListenableFuture
10以内的随机数和fail ListenableFuture组成的ListenableFuture
传入的其中之一fails或者cancel,这个Future fails 或者canceled
1 | ListenableFuture<List<Integer>> allAsList = Futures.allAsList(futureList); |
successfulAsList(Iterable<ListenableFuture
返回一个ListenableFuture ,该Future的结果包含所有成功的Future,按照原来的顺序,当其中之一Failed或者cancel,则用null替代
1 | ListenableFuture<List<Integer>> successfulAsList = Futures.successfulAsList(futureList); |
CheckedFuture
Guava也提供了 CheckedFuture<V, X extends Exception> 接口。CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get 方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易 。将 ListenableFuture 转换成CheckedFuture,可以使用 Futures.makeChecked(ListenableFuture
Service框架:抽象可开启和关闭的服务,帮助你维护服务的状态逻辑
概述
Guava 包里的 Service 接口用于封装一个服务对象的运行状态、包括 start 和 stop 等方法。例如 web 服务器,RPC 服务器、计时器等可以实现这个接口。对此类服务的状态管理并不轻松、需要对服务的开启 / 关闭进行妥善管理、特别是在多线程环境下尤为复杂。Guava 包提供了一些基础类帮助你管理复杂的状态转换逻辑和同步细节。
一个服务正常生命周期有:
- Service.State.NEW
- Service.State.STARTING
- Service.State.RUNNING
- Service.State.STOPPING
- Service.State.TERMINATED
服务一旦被停止就无法再重新启动了。如果服务在 starting、running、stopping 状态出现问题、会进入 Service.State.FAILED. 状态。调用 startAsync() 方法可以异步开启一个服务, 同时返回 this 对象形成方法调用链。注意:只有在当前服务的状态是 NEW 时才能调用 startAsync() 方法,因此最好在应用中有一个统一的地方初始化相关服务。停止一个服务也是类似的、使用异步方法 stopAsync() 。但是不像 startAsync(), 多次调用这个方法是安全的。这是为了方便处理关闭服务时候的锁竞争问题。
Service 也提供了一些方法用于等待服务状态转换的完成:
通过 addListener() 方法异步添加监听器。此方法允许你添加一个 Service.Listener 、它会在每次服务状态转换的时候被调用。注意:最好在服务启动之前添加 Listener(这时的状态是 NEW)、否则之前已发生的状态转换事件是无法在新添加的 Listener 上被重新触发的。
同步使用 awaitRunning()。这个方法不能被打断、不强制捕获异常、一旦服务启动就会返回。如果服务没有成功启动,会抛出 IllegalStateException 异常。同样的, awaitTerminated() 方法会等待服务达到终止状态(TERMINATED 或者 FAILED)。两个方法都有重载方法允许传入超时时间。
Service 接口本身实现起来会比较复杂、且容易碰到一些捉摸不透的问题。因此我们不推荐直接实现这个接口。而是请继承 Guava 包里已经封装好的基础抽象类。每个基础类支持一种特定的线程模型。
基础实现类
AbstractIdleService
AbstractIdleService 类简单实现了 Service 接口、其在 running 状态时不会执行任何动作–因此在 running 时也不需要启动线程–但需要处理开启 / 关闭动作。要实现一个此类的服务,只需继承 AbstractIdleService 类,然后自己实现 startUp() 和 shutDown() 方法就可以了。
如上面的例子、由于任何请求到 GcStatsServlet 时已经会有现成线程处理了,所以在服务运行时就不需要做什么额外动作了。
AbstractExecutionThreadService
AbstractExecutionThreadService 通过单线程处理启动、运行、和关闭等操作。你必须重载 run() 方法,同时需要能响应停止服务的请求。具体的实现可以在一个循环内做处理:
另外,你还可以重载 triggerShutdown() 方法让 run() 方法结束返回。
重载 startUp() 和 shutDown() 方法是可选的,不影响服务本身状态的管理
start() 内部会调用 startUp() 方法,创建一个线程、然后在线程内调用 run() 方法。stop() 会调用 triggerShutdown() 方法并且等待线程终止。
AbstractScheduledService
AbstractScheduledService 类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration() 方法定义一个周期执行的任务,以及相应的 startUp() 和 shutDown() 方法。为了能够描述执行周期,你需要实现 scheduler() 方法。通常情况下,你可以使用 AbstractScheduledService.Scheduler 类提供的两种调度器:[newFixedRateSchedule(initialDelay, delay, TimeUnit)](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractScheduledService.Scheduler.html#newFixedRateSchedule(long, long, java.util.concurrent.TimeUnit)) 和 [newFixedDelaySchedule(initialDelay, delay, TimeUnit)](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractScheduledService.Scheduler.html#newFixedDelaySchedule(long, long, java.util.concurrent.TimeUnit)),类似于 JDK 并发包中 ScheduledExecutorService 类提供的两种调度方式。如要自定义 schedules 则可以使用 CustomScheduler 类来辅助实现;具体用法见 javadoc。
AbstractService
如需要自定义的线程管理、可以通过扩展 AbstractService 类来实现。一般情况下、使用上面的几个实现类就已经满足需求了,但如果在服务执行过程中有一些特定的线程处理需求、则建议继承 AbstractService 类。
继承 AbstractService 方法必须实现两个方法.
- doStart() : 首次调用 startAsync() 时会同时调用 doStart(),doStart() 内部需要处理所有的初始化工作、如果启动成功则调用 notifyStarted() 方法;启动失败则调用 notifyFailed()
- doStop() : 首次调用 stopAsync() 会同时调用 doStop(),doStop() 要做的事情就是停止服务,如果停止成功则调用 notifyStopped() 方法;停止失败则调用 notifyFailed() 方法。
doStart 和 doStop 方法的实现需要考虑下性能,尽可能的低延迟。如果初始化的开销较大,如读文件,打开网络连接,或者其他任何可能引起阻塞的操作,建议移到另外一个单独的线程去处理。
使用 ServiceManager
除了对 Service 接口提供基础的实现类,Guava 还提供了 ServiceManager 类使得涉及到多个 Service 集合的操作更加容易。通过实例化 ServiceManager 类来创建一个 Service 集合,你可以通过以下方法来管理它们:
- startAsync() : 将启动所有被管理的服务。如果当前服务的状态都是 NEW 的话、那么你只能调用该方法一次、这跟 Service#startAsync() 是一样的。
- stopAsync() :将停止所有被管理的服务。
- [addListener](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/ServiceManager.html#addListener(com.google.common.util.concurrent.ServiceManager.Listener, java.util.concurrent.Executor)) :会添加一个 ServiceManager.Listener,在服务状态转换中会调用该 Listener
- awaitHealthy() :会等待所有的服务达到 Running 状态
- awaitStopped() :会等待所有服务达到终止状态
检测类的方法有:
- isHealthy() :如果所有的服务处于 Running 状态、会返回 True
- servicesByState() :以状态为索引返回当前所有服务的快照
- startupTimes() :返回一个 Map 对象,记录被管理的服务启动的耗时、以毫秒为单位,同时 Map 默认按启动时间排序。
我们建议整个服务的生命周期都能通过 ServiceManager 来管理,不过即使状态转换是通过其他机制触发的、也不影响 ServiceManager 方法的正确执行。例如:当一个服务不是通过 startAsync()、而是其他机制启动时,listeners 仍然可以被正常调用、awaitHealthy() 也能够正常工作。ServiceManager 唯一强制的要求是当其被创建时所有的服务必须处于 New 状态。
如想了解更多内容或用法,可参考:http://ifeve.com/google-guava-serviceexplained/