1 | sclass NotifyingBlockingThreadPoolExecutor extends ThreadPoolExecutor {
|
2 | |
3 | private AtomicInteger tasksInProcess = new AtomicInteger(); |
4 | |
5 | private Synchronizer synchronizer = new Synchronizer(); |
6 | |
7 | public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize, long keepAliveTime, TimeUnit keepAliveTimeUnit, long maxBlockingTime, TimeUnit maxBlockingTimeUnit, Callable<Boolean> blockingTimeCallback) {
|
8 | |
9 | super(poolSize, // Core size |
10 | poolSize, // Max size |
11 | keepAliveTime, |
12 | keepAliveTimeUnit, |
13 | new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)), |
14 | new BlockThenRunPolicy(maxBlockingTime, maxBlockingTimeUnit, blockingTimeCallback)); |
15 | |
16 | super.allowCoreThreadTimeOut(true); |
17 | } |
18 | |
19 | public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize, long keepAliveTime, TimeUnit unit) {
|
20 | |
21 | super(poolSize, // Core size |
22 | poolSize, // Max size |
23 | keepAliveTime, |
24 | unit, |
25 | new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)), // not smaller than the poolSize (to avoid redundant threads) |
26 | new BlockThenRunPolicy()); // When super invokes the reject method this class will ensure a blocking try. |
27 | |
28 | super.allowCoreThreadTimeOut(true); // Time out the core threads. |
29 | } |
30 | |
31 | @Override |
32 | public void execute(Runnable task) {
|
33 | // count a new task in process |
34 | tasksInProcess.incrementAndGet(); |
35 | try {
|
36 | super.execute(task); |
37 | } catch(RuntimeException e) { // specifically handle RejectedExecutionException
|
38 | tasksInProcess.decrementAndGet(); |
39 | throw e; |
40 | } catch(Error e) {
|
41 | tasksInProcess.decrementAndGet(); |
42 | throw e; |
43 | } |
44 | } |
45 | |
46 | @Override |
47 | protected void afterExecute(Runnable r, Throwable t) {
|
48 | |
49 | super.afterExecute(r, t); |
50 | |
51 | synchronized(this) {
|
52 | tasksInProcess.decrementAndGet(); |
53 | if (tasksInProcess.intValue() == 0) {
|
54 | synchronizer.signalAll(); |
55 | } |
56 | } |
57 | } |
58 | |
59 | @Override |
60 | public void setCorePoolSize(int corePoolSize) {
|
61 | super.setCorePoolSize(corePoolSize); |
62 | super.setMaximumPoolSize(corePoolSize); |
63 | } |
64 | |
65 | @Override |
66 | public void setMaximumPoolSize(int maximumPoolSize) {
|
67 | throw new UnsupportedOperationException("setMaximumPoolSize is not supported.");
|
68 | } |
69 | |
70 | public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
|
71 | throw new UnsupportedOperationException("setRejectedExecutionHandler is not allowed on this class.");
|
72 | } |
73 | |
74 | public void await() throws InterruptedException {
|
75 | synchronizer.await(); |
76 | } |
77 | |
78 | public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
|
79 | return synchronizer.await(timeout, timeUnit); |
80 | } |
81 | |
82 | private class Synchronizer {
|
83 | |
84 | private final Lock lock = new ReentrantLock(); |
85 | private final Condition done = lock.newCondition(); |
86 | private boolean isDone = false; |
87 | |
88 | private void signalAll() {
|
89 | |
90 | lock.lock(); |
91 | try {
|
92 | isDone = true; |
93 | done.signalAll(); |
94 | } |
95 | finally {
|
96 | lock.unlock(); |
97 | } |
98 | } |
99 | |
100 | public void await() throws InterruptedException {
|
101 | |
102 | lock.lock(); |
103 | try {
|
104 | while (!isDone) {
|
105 | done.await(); |
106 | } |
107 | } |
108 | finally {
|
109 | isDone = false; |
110 | lock.unlock(); |
111 | } |
112 | } |
113 | |
114 | public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
|
115 | |
116 | boolean await_result = false; |
117 | lock.lock(); |
118 | boolean localIsDone; |
119 | try {
|
120 | await_result = done.await(timeout, timeUnit); |
121 | } |
122 | finally {
|
123 | localIsDone = isDone; |
124 | isDone = false; |
125 | lock.unlock(); |
126 | } |
127 | return await_result && localIsDone; |
128 | } |
129 | } |
130 | |
131 | private static class BlockThenRunPolicy implements RejectedExecutionHandler {
|
132 | |
133 | private long maxBlockingTime; |
134 | private TimeUnit maxBlockingTimeUnit; |
135 | private Callable<Boolean> blockingTimeCallback; |
136 | |
137 | public BlockThenRunPolicy(long maxBlockingTime, TimeUnit maxBlockingTimeUnit, Callable<Boolean> blockingTimeCallback) {
|
138 | this.maxBlockingTime = maxBlockingTime; |
139 | this.maxBlockingTimeUnit = maxBlockingTimeUnit; |
140 | this.blockingTimeCallback = blockingTimeCallback; |
141 | } |
142 | |
143 | public BlockThenRunPolicy() {
|
144 | } |
145 | |
146 | @Override |
147 | public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
|
148 | |
149 | BlockingQueue<Runnable> workQueue = executor.getQueue(); |
150 | boolean taskSent = false; |
151 | |
152 | while (!taskSent) {
|
153 | |
154 | if (executor.isShutdown()) {
|
155 | throw new RejectedExecutionException( |
156 | "ThreadPoolExecutor has shutdown while attempting to offer a new task."); |
157 | } |
158 | |
159 | try {
|
160 | if(blockingTimeCallback != null) {
|
161 | if (workQueue.offer(task, maxBlockingTime, maxBlockingTimeUnit)) {
|
162 | taskSent = true; |
163 | } |
164 | else {
|
165 | // task was not accepted - call the Callback |
166 | Boolean result = null; |
167 | try {
|
168 | result = blockingTimeCallback.call(); |
169 | } |
170 | catch(Exception e) {
|
171 | throw new RejectedExecutionException(e); |
172 | } |
173 | if(result == false) {
|
174 | throw new RejectedExecutionException("User decided to stop waiting for task insertion");
|
175 | } |
176 | else {
|
177 | continue; |
178 | } |
179 | } |
180 | |
181 | } |
182 | else {
|
183 | workQueue.put(task); |
184 | taskSent = true; |
185 | } |
186 | } |
187 | catch (InterruptedException e) {
|
188 | } |
189 | } |
190 | } |
191 | |
192 | } |
193 | |
194 | } |
From https://github.com/USGS-CIDA/glri-afinch/blob/master/glri-afinch-util/src/main/java/org/java/util/concurrent/NotifyingBlockingThreadPoolExecutor.java
/**
* This class is a specialized extension of the ThreadPoolExecutor class.
*
* Two functionalities had been added to this subclass.
* 1) The execute method of the ThreadPoolExecutor will block in case the queue is full and only
* unblock when the queue is dequeued - that is a task that is currently in the queue is removed
* and handled by the ThreadPoolExecutor.
* 2) Client code can await for the event of all tasks beeing run to conclusion. Client code which
* actively chose to wait for this occurrence should call await on the instance of his ThreadPoolExecutor.
* This differs from awaitTermination as it does not require any call to shutdown.
*
* Code example:
*
* NotifyingBlockingThreadPoolExecutor threadPoolExecutor =
* new NotifyingBlockingThreadPoolExecutor(5, ,10, 15, TimeUnit.SECONDS);
*
* for (int i = 0; i < 5000; i++) {
* threadPoolExecutor.execute(...)
* }
*
* try {
* threadPoolExecutor.await();
* } catch (InterruptedException e) {
* // Handle error
* }
*
* System.out.println("Done!");
*
* The example above shows how 5000 tasks are run within 5 threads. The line with
* 'System.out.println("Done!");' will not run until such a time when all the tasks given to the
* thread pool have concluded.
* their run.
*
* This subclass of ThreadPoolExecutor also takes away the max threads capabilities of the
* ThreadPoolExecutor superclass and internally sets the amount of maximum threads to be the size of
* the core threads. This is done since threads over the core size and under the max are instantiated
* only once the queue is full, but the NotifyingBlockingThreadPoolExecutor will block once the queue
* is full.
*
* @author Yaneeve Shekel & Amir Kirsh
*/
/**
* This constructor is used in order to maintain the first functionality specified above.
* It does so by using an ArrayBlockingQueue and the BlockThenRunPolicy that is defined in
* this class.
* This constructor allows to give a timeout for the wait on new task insertion and to react upon such a timeout if occurs.
* @param poolSize is the amount of threads that this pool may have alive at any given time
* @param queueSize is the size of the queue. This number should be at least as the pool size to make sense
* (otherwise there are unused threads), thus if the number sent is smaller, the poolSize is used
* for the size of the queue. Recommended value is twice the poolSize.
* @param keepAliveTime is the amount of time after which an inactive thread is terminated
* @param keepAliveTimeUnit is the unit of time to use with the previous parameter
* @param maxBlockingTime is the maximum time to wait on the queue of tasks before calling the BlockingTimeout callback
* @param maxBlockingTimeUnit is the unit of time to use with the previous parameter
* @param blockingTimeCallback is the callback method to call when a timeout occurs while blocking on getting a new task,
* the return value of this Callable is Boolean, indicating whether to keep blocking (true) or stop (false).
* In case false is returned from the blockingTimeCallback, this executer will throw a RejectedExecutionException
*/
download show line numbers debug dex old transpilations
Travelled to 15 computer(s): aoiabmzegqzx, bhatertpkbcr, cbybwowwnfue, cfunsshuasjs, gwrvuhgaqvyk, ishqpsrjomds, lpdgvwnxivlt, mqqgnosmbjvj, onxytkatvevr, ppjhyzlbdabe, pyentgdyhuwx, pzhvpgtvlbxg, tslmcundralx, tvejysmllsmz, vouqrxazstgt
No comments. add comment
| Snippet ID: | #1011718 |
| Snippet name: | NotifyingBlockingThreadPoolExecutor |
| Eternal ID of this version: | #1011718/1 |
| Text MD5: | ba502c0590c12a3eee15012ab4ac620b |
| Author: | stefan |
| Category: | javax / concurrency |
| Type: | JavaX fragment (include) |
| Public (visible to everyone): | Yes |
| Archived (hidden from active list): | No |
| Created/modified: | 2017-11-03 15:55:34 |
| Source code size: | 5247 bytes / 194 lines |
| Pitched / IR pitched: | No / No |
| Views / Downloads: | 1544 / 2546 |
| Referenced in: | [show references] |