/**
 * A fixed-size {@link ThreadPoolExecutor} (core size == max size, bounded queue) with two additions:
 * <ol>
 *   <li>{@link #execute(Runnable)} blocks the submitting thread while the queue is full instead of
 *       rejecting the task (see {@link BlockThenRunPolicy}).</li>
 *   <li>{@link #await()} / {@link #await(long, TimeUnit)} let a caller wait until every task handed
 *       to {@code execute} has finished, without shutting the pool down.</li>
 * </ol>
 * Core threads are allowed to time out, so an idle pool holds no threads.
 */
sclass NotifyingBlockingThreadPoolExecutor extends ThreadPoolExecutor {

    /** Tracks in-flight tasks and wakes threads blocked in {@code await}. */
    private final Synchronizer synchronizer = new Synchronizer();

    /**
     * Creates a pool that waits at most {@code maxBlockingTime} per attempt to enqueue a task and
     * consults {@code blockingTimeCallback} each time that wait expires.
     *
     * @param poolSize             number of threads (used as both core and maximum size)
     * @param queueSize            requested queue capacity; the larger of this and {@code poolSize} is used
     * @param keepAliveTime        idle time after which a thread is terminated
     * @param keepAliveTimeUnit    unit of {@code keepAliveTime}
     * @param maxBlockingTime      how long one enqueue attempt may block before the callback is called
     * @param maxBlockingTimeUnit  unit of {@code maxBlockingTime}
     * @param blockingTimeCallback returns {@code true} to keep waiting; any other result makes
     *                             {@code execute} throw {@link RejectedExecutionException}.
     *                             If {@code null}, enqueueing blocks without a timeout.
     */
    public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize,
            long keepAliveTime, TimeUnit keepAliveTimeUnit,
            long maxBlockingTime, TimeUnit maxBlockingTimeUnit,
            Callable<Boolean> blockingTimeCallback) {
        super(poolSize, // core size
                poolSize, // max size
                keepAliveTime,
                keepAliveTimeUnit,
                new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)),
                new BlockThenRunPolicy(maxBlockingTime, maxBlockingTimeUnit, blockingTimeCallback));
        super.allowCoreThreadTimeOut(true);
    }

    /**
     * Creates a pool whose {@code execute} blocks without a timeout while the queue is full.
     *
     * @param poolSize      number of threads (used as both core and maximum size)
     * @param queueSize     requested queue capacity; the larger of this and {@code poolSize} is used
     * @param keepAliveTime idle time after which a thread is terminated
     * @param unit          unit of {@code keepAliveTime}
     */
    public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize,
            long keepAliveTime, TimeUnit unit) {
        super(poolSize, // core size
                poolSize, // max size
                keepAliveTime,
                unit,
                // never smaller than the pool size
                new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)),
                // when super invokes the reject method this policy performs a blocking insert
                new BlockThenRunPolicy());
        super.allowCoreThreadTimeOut(true); // time out the core threads too
    }

    /**
     * Counts the task as in-flight, then hands it to the superclass. If the hand-off fails
     * (for example with {@link RejectedExecutionException}) the count is rolled back and the
     * failure is rethrown unchanged.
     */
    @Override
    public void execute(Runnable task) {
        synchronizer.taskStarted();
        try {
            super.execute(task);
        } catch (RuntimeException e) { // includes RejectedExecutionException
            synchronizer.taskFinished();
            throw e;
        } catch (Error e) {
            synchronizer.taskFinished();
            throw e;
        }
    }

    /** Marks one task as finished and wakes waiters when nothing is left in flight. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        synchronizer.taskFinished();
    }

    /**
     * Resizes the pool; the maximum size always follows the core size.
     * The two superclass setters are called in the order that keeps {@code core <= max} at every
     * step, because ThreadPoolExecutor (JDK 9+) rejects a core size above the current maximum.
     */
    @Override
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize > getMaximumPoolSize()) {
            // growing: raise the ceiling first
            super.setMaximumPoolSize(corePoolSize);
            super.setCorePoolSize(corePoolSize);
        } else {
            // shrinking (or unchanged): lower the core first
            super.setCorePoolSize(corePoolSize);
            super.setMaximumPoolSize(corePoolSize);
        }
    }

    /** Not supported: the maximum size is tied to the core size. */
    @Override
    public void setMaximumPoolSize(int maximumPoolSize) {
        throw new UnsupportedOperationException("setMaximumPoolSize is not supported.");
    }

    /** Not supported: the blocking policy installed by the constructor must stay in place. */
    @Override
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        throw new UnsupportedOperationException("setRejectedExecutionHandler is not allowed on this class.");
    }

    /**
     * Blocks until no task passed to {@link #execute(Runnable)} is still queued or running.
     * Returns immediately if nothing is in flight.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void await() throws InterruptedException {
        synchronizer.await();
    }

    /**
     * Like {@link #await()} but gives up after the given time.
     *
     * @return {@code true} if no task was in flight before the timeout elapsed, {@code false} otherwise
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return synchronizer.await(timeout, timeUnit);
    }

    /**
     * Counts in-flight tasks and lets threads wait for that count to reach zero.
     * Waiters re-check the count itself rather than a one-shot flag, so a completion that
     * happened before the wait started is not missed, several waiters can be released by one
     * signal, and spurious wakeups are harmless.
     */
    private static final class Synchronizer {
        private final AtomicInteger tasksInProcess = new AtomicInteger();
        private final Lock lock = new ReentrantLock();
        private final Condition idle = lock.newCondition();

        void taskStarted() {
            tasksInProcess.incrementAndGet();
        }

        void taskFinished() {
            if (tasksInProcess.decrementAndGet() == 0) {
                // Signal under the lock: a waiter that has just seen a non-zero count still
                // holds the lock until it parks, so this signal cannot slip past it.
                lock.lock();
                try {
                    idle.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }

        void await() throws InterruptedException {
            lock.lock();
            try {
                while (tasksInProcess.get() > 0) {
                    idle.await();
                }
            } finally {
                lock.unlock();
            }
        }

        boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
            long remainingNanos = timeUnit.toNanos(timeout);
            lock.lock();
            try {
                while (tasksInProcess.get() > 0) {
                    if (remainingNanos <= 0L) {
                        return false;
                    }
                    remainingNanos = idle.awaitNanos(remainingNanos);
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Rejection handler that, instead of rejecting, blocks the submitting thread until the task
     * fits into the executor's queue. With a callback configured, each enqueue attempt is bounded
     * by {@code maxBlockingTime}; after every expired attempt the callback decides whether to retry.
     */
    private static class BlockThenRunPolicy implements RejectedExecutionHandler {
        private final long maxBlockingTime;
        private final TimeUnit maxBlockingTimeUnit;
        private final Callable<Boolean> blockingTimeCallback;

        public BlockThenRunPolicy(long maxBlockingTime, TimeUnit maxBlockingTimeUnit,
                Callable<Boolean> blockingTimeCallback) {
            this.maxBlockingTime = maxBlockingTime;
            this.maxBlockingTimeUnit = maxBlockingTimeUnit;
            this.blockingTimeCallback = blockingTimeCallback;
        }

        /** No callback: enqueueing blocks without a timeout. */
        public BlockThenRunPolicy() {
            this(0L, null, null);
        }

        /**
         * Keeps trying to enqueue {@code task} until it is accepted.
         *
         * @throws RejectedExecutionException if the executor is shut down, the callback throws,
         *         or the callback returns anything other than {@code true}
         */
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            BlockingQueue<Runnable> workQueue = executor.getQueue();
            boolean interrupted = false;
            try {
                while (true) {
                    if (executor.isShutdown()) {
                        throw new RejectedExecutionException(
                                "ThreadPoolExecutor has shutdown while attempting to offer a new task.");
                    }
                    try {
                        if (blockingTimeCallback == null) {
                            workQueue.put(task);
                            return;
                        }
                        if (workQueue.offer(task, maxBlockingTime, maxBlockingTimeUnit)) {
                            return;
                        }
                        // task was not accepted in time - ask the callback what to do
                        if (!callbackWantsToKeepWaiting()) {
                            throw new RejectedExecutionException("User decided to stop waiting for task insertion");
                        }
                    } catch (InterruptedException e) {
                        // Keep retrying, but remember the interrupt so it can be restored on exit.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Invokes the callback; a {@code null} result counts as "stop waiting". */
        private boolean callbackWantsToKeepWaiting() {
            Boolean answer;
            try {
                answer = blockingTimeCallback.call();
            } catch (Exception e) {
                throw new RejectedExecutionException(e);
            }
            return Boolean.TRUE.equals(answer);
        }
    }
}
From https://github.com/USGS-CIDA/glri-afinch/blob/master/glri-afinch-util/src/main/java/org/java/util/concurrent/NotifyingBlockingThreadPoolExecutor.java
/**
* This class is a specialized extension of the ThreadPoolExecutor class.
*
* Two functionalities have been added to this subclass.
* 1) The execute method of the ThreadPoolExecutor will block in case the queue is full and only
* unblock when the queue is dequeued - that is a task that is currently in the queue is removed
* and handled by the ThreadPoolExecutor.
* 2) Client code can await for the event of all tasks being run to conclusion. Client code which
* actively chose to wait for this occurrence should call await on the instance of his ThreadPoolExecutor.
* This differs from awaitTermination as it does not require any call to shutdown.
*
* Code example:
*
* NotifyingBlockingThreadPoolExecutor threadPoolExecutor =
* new NotifyingBlockingThreadPoolExecutor(5, 10, 15, TimeUnit.SECONDS);
*
* for (int i = 0; i < 5000; i++) {
* threadPoolExecutor.execute(...)
* }
*
* try {
* threadPoolExecutor.await();
* } catch (InterruptedException e) {
* // Handle error
* }
*
* System.out.println("Done!");
*
* The example above shows how 5000 tasks are run within 5 threads. The line with
* 'System.out.println("Done!");' will not run until such a time when all the tasks given to the
* thread pool have concluded
* their run.
*
* This subclass of ThreadPoolExecutor also takes away the max threads capabilities of the
* ThreadPoolExecutor superclass and internally sets the amount of maximum threads to be the size of
* the core threads. This is done since threads over the core size and under the max are instantiated
* only once the queue is full, but the NotifyingBlockingThreadPoolExecutor will block once the queue
* is full.
*
* @author Yaneeve Shekel & Amir Kirsh
*/
/**
* This constructor is used in order to maintain the first functionality specified above.
* It does so by using an ArrayBlockingQueue and the BlockThenRunPolicy that is defined in
* this class.
* This constructor allows giving a timeout for the wait on new task insertion and reacting to such a timeout if it occurs.
* @param poolSize is the amount of threads that this pool may have alive at any given time
* @param queueSize is the size of the queue. This number should be at least as the pool size to make sense
* (otherwise there are unused threads), thus if the number sent is smaller, the poolSize is used
* for the size of the queue. Recommended value is twice the poolSize.
* @param keepAliveTime is the amount of time after which an inactive thread is terminated
* @param keepAliveTimeUnit is the unit of time to use with the previous parameter
* @param maxBlockingTime is the maximum time to wait on the queue of tasks before calling the BlockingTimeout callback
* @param maxBlockingTimeUnit is the unit of time to use with the previous parameter
* @param blockingTimeCallback is the callback method to call when a timeout occurs while blocking on getting a new task,
* the return value of this Callable is Boolean, indicating whether to keep blocking (true) or stop (false).
* In case false is returned from the blockingTimeCallback, this executor will throw a RejectedExecutionException
*/
download show line numbers debug dex old transpilations
Travelled to 15 computer(s): aoiabmzegqzx, bhatertpkbcr, cbybwowwnfue, cfunsshuasjs, gwrvuhgaqvyk, ishqpsrjomds, lpdgvwnxivlt, mqqgnosmbjvj, onxytkatvevr, ppjhyzlbdabe, pyentgdyhuwx, pzhvpgtvlbxg, tslmcundralx, tvejysmllsmz, vouqrxazstgt
No comments. add comment
| Snippet ID: | #1011718 |
| Snippet name: | NotifyingBlockingThreadPoolExecutor |
| Eternal ID of this version: | #1011718/1 |
| Text MD5: | ba502c0590c12a3eee15012ab4ac620b |
| Author: | stefan |
| Category: | javax / concurrency |
| Type: | JavaX fragment (include) |
| Public (visible to everyone): | Yes |
| Archived (hidden from active list): | No |
| Created/modified: | 2017-11-03 15:55:34 |
| Source code size: | 5247 bytes / 194 lines |
| Pitched / IR pitched: | No / No |
| Views / Downloads: | 1545 / 2547 |
| Referenced in: | #1034167 - Standard Classes + Interfaces (LIVE, continuation of #1003674) |