/**
 * A fixed-size {@link ThreadPoolExecutor} with two additions:
 * <ol>
 *   <li>{@link #execute(Runnable)} blocks the submitting thread while the bounded work queue is
 *       full instead of rejecting the task (see {@link BlockThenRunPolicy}).</li>
 *   <li>{@link #await()} / {@link #await(long, TimeUnit)} let client code wait until every task
 *       handed to {@code execute} has finished, without shutting the pool down.</li>
 * </ol>
 *
 * <p>The maximum pool size is always kept equal to the core pool size, and core threads are
 * allowed to time out.
 *
 * <p>Note: the in-process counter is decremented only in {@link #afterExecute} and when
 * {@code execute} itself fails. Tasks that leave the queue any other way (for example through
 * {@code shutdownNow()} or {@code remove()}) are never counted down, so {@code await} will keep
 * waiting for them.
 */
class NotifyingBlockingThreadPoolExecutor extends ThreadPoolExecutor {

    /** Number of tasks accepted by {@link #execute(Runnable)} that have not finished yet. */
    private final AtomicInteger tasksInProcess = new AtomicInteger();

    /** Lets callers block until {@link #tasksInProcess} is zero. */
    private final Synchronizer synchronizer = new Synchronizer();

    /**
     * Creates a pool whose {@code execute} blocks for at most {@code maxBlockingTime} at a time
     * when the queue is full, consulting {@code blockingTimeCallback} after every timeout.
     *
     * @param poolSize             number of threads (used as both core and maximum size)
     * @param queueSize            requested queue capacity; the larger of this and
     *                             {@code poolSize} is used
     * @param keepAliveTime        idle time after which a thread is terminated
     * @param keepAliveTimeUnit    unit of {@code keepAliveTime}
     * @param maxBlockingTime      how long one blocking attempt on a full queue may take
     * @param maxBlockingTimeUnit  unit of {@code maxBlockingTime}
     * @param blockingTimeCallback called after each timed-out attempt; returning {@code true}
     *                             keeps waiting, anything else makes {@code execute} throw a
     *                             {@link RejectedExecutionException}. If {@code null}, the
     *                             executor blocks without a timeout.
     */
    public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize,
                                               long keepAliveTime, TimeUnit keepAliveTimeUnit,
                                               long maxBlockingTime, TimeUnit maxBlockingTimeUnit,
                                               Callable<Boolean> blockingTimeCallback) {
        super(poolSize, // core size
              poolSize, // max size
              keepAliveTime,
              keepAliveTimeUnit,
              new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)),
              new BlockThenRunPolicy(maxBlockingTime, maxBlockingTimeUnit, blockingTimeCallback));

        super.allowCoreThreadTimeOut(true);
    }

    /**
     * Creates a pool whose {@code execute} blocks without a timeout when the queue is full.
     *
     * @param poolSize      number of threads (used as both core and maximum size)
     * @param queueSize     requested queue capacity; the larger of this and {@code poolSize}
     *                      is used
     * @param keepAliveTime idle time after which a thread is terminated
     * @param unit          unit of {@code keepAliveTime}
     */
    public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize,
                                               long keepAliveTime, TimeUnit unit) {
        super(poolSize, // core size
              poolSize, // max size
              keepAliveTime,
              unit,
              new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)),
              new BlockThenRunPolicy()); // rejected tasks are re-offered with a blocking put

        super.allowCoreThreadTimeOut(true); // time out the core threads as well
    }

    /**
     * Counts the task as "in process" and hands it to the pool. If the hand-off fails (for
     * example with a {@link RejectedExecutionException}) the count is rolled back and the
     * failure is rethrown.
     */
    @Override
    public void execute(Runnable task) {
        tasksInProcess.incrementAndGet();
        try {
            super.execute(task);
        } catch (RuntimeException e) { // notably RejectedExecutionException
            taskLeftProcess();
            throw e;
        } catch (Error e) {
            taskLeftProcess();
            throw e;
        }
    }

    /** Counts the finished task down and wakes waiters when nothing is left in process. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLeftProcess();
    }

    /**
     * Decrements the in-process counter and signals waiters on the transition to zero.
     * The value returned by {@code decrementAndGet()} is used directly, so exactly one caller
     * observes each transition to zero and no extra monitor is needed.
     */
    private void taskLeftProcess() {
        if (tasksInProcess.decrementAndGet() == 0) {
            synchronizer.signalAll();
        }
    }

    /**
     * Resizes the pool; the maximum pool size always follows the core pool size.
     *
     * <p>The two limits are updated in an order that never makes the core size exceed the
     * maximum size, because newer JDKs reject such an intermediate state with an
     * {@link IllegalArgumentException}.
     */
    @Override
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize > getMaximumPoolSize()) {
            // Growing: raise the ceiling first.
            super.setMaximumPoolSize(corePoolSize);
            super.setCorePoolSize(corePoolSize);
        } else {
            // Shrinking (or unchanged): lower the core size first.
            super.setCorePoolSize(corePoolSize);
            super.setMaximumPoolSize(corePoolSize);
        }
    }

    /**
     * Not supported: the maximum size is tied to the core size.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void setMaximumPoolSize(int maximumPoolSize) {
        throw new UnsupportedOperationException("setMaximumPoolSize is not supported.");
    }

    /**
     * Not supported: the blocking behaviour depends on the built-in handler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        throw new UnsupportedOperationException("setRejectedExecutionHandler is not allowed on this class.");
    }

    /**
     * Blocks until no task is in process. Returns immediately if nothing is in process.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void await() throws InterruptedException {
        synchronizer.await();
    }

    /**
     * Blocks until no task is in process or the timeout elapses.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if no task was in process when the method returned,
     *         {@code false} if the timeout elapsed first
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return synchronizer.await(timeout, timeUnit);
    }

    /**
     * Condition-based gate on the enclosing executor's in-process counter.
     *
     * <p>Waiters re-check the live counter under the lock instead of a latched flag, so a
     * momentary drop to zero in the past cannot release a later waiter while newer tasks are
     * still running, and spurious wakeups are harmless.
     */
    private class Synchronizer {

        private final Lock lock = new ReentrantLock();
        private final Condition idle = lock.newCondition();

        /** Wakes every waiter so it can re-check the counter. */
        private void signalAll() {
            lock.lock();
            try {
                idle.signalAll();
            } finally {
                lock.unlock();
            }
        }

        /** Waits until the in-process counter is zero. */
        public void await() throws InterruptedException {
            lock.lock();
            try {
                while (tasksInProcess.get() != 0) {
                    idle.await();
                }
            } finally {
                lock.unlock();
            }
        }

        /**
         * Waits until the in-process counter is zero or the timeout elapses.
         *
         * @return {@code true} if the counter reached zero in time, {@code false} otherwise
         */
        public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
            long remainingNanos = timeUnit.toNanos(timeout);
            lock.lock();
            try {
                while (tasksInProcess.get() != 0) {
                    if (remainingNanos <= 0L) {
                        return false;
                    }
                    remainingNanos = idle.awaitNanos(remainingNanos);
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Rejection handler that, instead of rejecting, blocks the submitting thread until the task
     * fits into the executor's queue.
     *
     * <p>Without a callback it blocks indefinitely. With a callback it blocks for at most
     * {@code maxBlockingTime} per attempt and asks the callback after each timeout whether to
     * keep waiting.
     */
    private static class BlockThenRunPolicy implements RejectedExecutionHandler {

        private final long maxBlockingTime;
        private final TimeUnit maxBlockingTimeUnit;
        private final Callable<Boolean> blockingTimeCallback;

        public BlockThenRunPolicy(long maxBlockingTime, TimeUnit maxBlockingTimeUnit,
                                  Callable<Boolean> blockingTimeCallback) {
            this.maxBlockingTime = maxBlockingTime;
            this.maxBlockingTimeUnit = maxBlockingTimeUnit;
            this.blockingTimeCallback = blockingTimeCallback;
        }

        /** Creates a policy that blocks without a timeout. */
        public BlockThenRunPolicy() {
            this(0L, null, null);
        }

        /**
         * Re-offers the rejected task to the executor's queue, blocking until it is accepted.
         *
         * <p>An interrupt received while blocking does not abort the attempt (the task is still
         * retried), but the thread's interrupt status is restored before this method returns or
         * throws, so the caller can still observe it.
         *
         * @throws RejectedExecutionException if the executor has been shut down, if the callback
         *         throws, or if the callback does not return {@code true}
         */
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            BlockingQueue<Runnable> workQueue = executor.getQueue();
            boolean interrupted = false;
            try {
                while (true) {
                    if (executor.isShutdown()) {
                        throw new RejectedExecutionException(
                                "ThreadPoolExecutor has shutdown while attempting to offer a new task.");
                    }

                    try {
                        if (blockingTimeCallback == null) {
                            workQueue.put(task);
                            return;
                        }
                        if (workQueue.offer(task, maxBlockingTime, maxBlockingTimeUnit)) {
                            return;
                        }
                    } catch (InterruptedException e) {
                        // Remember the interrupt and retry; re-asserting it right away would
                        // make the next blocking call fail immediately and spin.
                        interrupted = true;
                        continue;
                    }

                    // The timed offer expired: let the callback decide.
                    if (!callbackWantsToKeepWaiting()) {
                        throw new RejectedExecutionException("User decided to stop waiting for task insertion");
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Invokes the callback; only an explicit {@code true} means "keep waiting". A
         * {@code null} result is treated as "stop" instead of failing on unboxing.
         */
        private boolean callbackWantsToKeepWaiting() {
            Boolean answer;
            try {
                answer = blockingTimeCallback.call();
            } catch (Exception e) {
                throw new RejectedExecutionException(e);
            }
            return Boolean.TRUE.equals(answer);
        }
    }
}
From https://github.com/USGS-CIDA/glri-afinch/blob/master/glri-afinch-util/src/main/java/org/java/util/concurrent/NotifyingBlockingThreadPoolExecutor.java

/**
 * This class is a specialized extension of the ThreadPoolExecutor class.
 *
 * Two functionalities have been added to this subclass.
 * 1) The execute method of the ThreadPoolExecutor will block in case the queue is full and only
 *    unblock when the queue is dequeued - that is, when a task that is currently in the queue is
 *    removed and handled by the ThreadPoolExecutor.
 * 2) Client code can wait for the event of all tasks being run to conclusion. Client code which
 *    actively chooses to wait for this occurrence should call await on the instance of its
 *    ThreadPoolExecutor. This differs from awaitTermination as it does not require any call to
 *    shutdown.
 *
 * Code example:
 *
 * NotifyingBlockingThreadPoolExecutor threadPoolExecutor =
 *     new NotifyingBlockingThreadPoolExecutor(5, 10, 15, TimeUnit.SECONDS);
 *
 * for (int i = 0; i < 5000; i++) {
 *     threadPoolExecutor.execute(...)
 * }
 *
 * try {
 *     threadPoolExecutor.await();
 * } catch (InterruptedException e) {
 *     // Handle error
 * }
 *
 * System.out.println("Done!");
 *
 * The example above shows how 5000 tasks are run within 5 threads. The line with
 * 'System.out.println("Done!");' will not run until such a time when all the tasks given to the
 * thread pool have concluded their run.
 *
 * This subclass of ThreadPoolExecutor also takes away the max threads capabilities of the
 * ThreadPoolExecutor superclass and internally sets the amount of maximum threads to be the size of
 * the core threads. This is done since threads over the core size and under the max are instantiated
 * only once the queue is full, but the NotifyingBlockingThreadPoolExecutor will block once the queue
 * is full.
 *
 * @author Yaneeve Shekel & Amir Kirsh
 */

/**
 * This constructor is used in order to maintain the first functionality specified above.
 * It does so by using an ArrayBlockingQueue and the BlockThenRunPolicy that is defined in
 * this class.
 * This constructor allows giving a timeout for the wait on new task insertion and reacting to such a timeout if it occurs.
 * @param poolSize is the amount of threads that this pool may have alive at any given time
 * @param queueSize is the size of the queue. This number should be at least as large as the pool size to make sense
 *        (otherwise there are unused threads), thus if the number sent is smaller, the poolSize is used
 *        for the size of the queue. Recommended value is twice the poolSize.
 * @param keepAliveTime is the amount of time after which an inactive thread is terminated
 * @param keepAliveTimeUnit is the unit of time to use with the previous parameter
 * @param maxBlockingTime is the maximum time to wait on the queue of tasks before calling the BlockingTimeout callback
 * @param maxBlockingTimeUnit is the unit of time to use with the previous parameter
 * @param blockingTimeCallback is the callback method to call when a timeout occurs while blocking on getting a new task,
 *        the return value of this Callable is Boolean, indicating whether to keep blocking (true) or stop (false).
 *        In case false is returned from the blockingTimeCallback, this executor will throw a RejectedExecutionException
 */
download show line numbers debug dex old transpilations
Travelled to 15 computer(s): aoiabmzegqzx, bhatertpkbcr, cbybwowwnfue, cfunsshuasjs, gwrvuhgaqvyk, ishqpsrjomds, lpdgvwnxivlt, mqqgnosmbjvj, onxytkatvevr, ppjhyzlbdabe, pyentgdyhuwx, pzhvpgtvlbxg, tslmcundralx, tvejysmllsmz, vouqrxazstgt
No comments. add comment
Snippet ID: | #1011718 |
Snippet name: | NotifyingBlockingThreadPoolExecutor |
Eternal ID of this version: | #1011718/1 |
Text MD5: | ba502c0590c12a3eee15012ab4ac620b |
Author: | stefan |
Category: | javax / concurrency |
Type: | JavaX fragment (include) |
Public (visible to everyone): | Yes |
Archived (hidden from active list): | No |
Created/modified: | 2017-11-03 15:55:34 |
Source code size: | 5247 bytes / 194 lines |
Pitched / IR pitched: | No / No |
Views / Downloads: | 1310 / 2293 |
Referenced in: | [show references] |