/**
 * A fixed-size {@link ThreadPoolExecutor} with two additions:
 * <ol>
 *   <li>{@link #execute(Runnable)} blocks the submitting thread while the bounded work queue is
 *       full instead of rejecting the task (see {@link BlockThenRunPolicy}).</li>
 *   <li>{@link #await()} / {@link #await(long, TimeUnit)} let callers wait until every task handed
 *       to {@code execute} has finished, without shutting the pool down.</li>
 * </ol>
 * Core and maximum pool size are always kept equal, and core threads are allowed to time out.
 */
sclass NotifyingBlockingThreadPoolExecutor extends ThreadPoolExecutor {

  /** Tasks passed to {@link #execute(Runnable)} whose {@code afterExecute} has not run yet. */
  private final AtomicInteger tasksInProcess = new AtomicInteger();

  /** Lets callers of {@code await} sleep until {@link #tasksInProcess} drops to zero. */
  private final Synchronizer synchronizer = new Synchronizer();

  /**
   * Creates a pool whose {@code execute} waits at most {@code maxBlockingTime} per attempt for
   * queue space and consults {@code blockingTimeCallback} each time that wait times out.
   *
   * @param poolSize             used as both core and maximum pool size
   * @param queueSize            requested queue capacity; the queue is never smaller than {@code poolSize}
   * @param keepAliveTime        idle time after which a thread (core threads included) may terminate
   * @param keepAliveTimeUnit    unit of {@code keepAliveTime}
   * @param maxBlockingTime      time to wait for queue space before calling the callback
   * @param maxBlockingTimeUnit  unit of {@code maxBlockingTime}
   * @param blockingTimeCallback returns {@code true} to keep waiting; any other result makes
   *                             {@code execute} throw {@link RejectedExecutionException}.
   *                             If {@code null}, {@code execute} blocks without a time limit.
   */
  public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize, long keepAliveTime, TimeUnit keepAliveTimeUnit, long maxBlockingTime, TimeUnit maxBlockingTimeUnit, Callable<Boolean> blockingTimeCallback) {
    super(poolSize, // core size
        poolSize, // max size
        keepAliveTime,
        keepAliveTimeUnit,
        new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)),
        new BlockThenRunPolicy(maxBlockingTime, maxBlockingTimeUnit, blockingTimeCallback));
    super.allowCoreThreadTimeOut(true);
  }

  /**
   * Creates a pool whose {@code execute} blocks without a time limit while the queue is full.
   *
   * @param poolSize      used as both core and maximum pool size
   * @param queueSize     requested queue capacity; the queue is never smaller than {@code poolSize}
   * @param keepAliveTime idle time after which a thread (core threads included) may terminate
   * @param unit          unit of {@code keepAliveTime}
   */
  public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize, long keepAliveTime, TimeUnit unit) {
    super(poolSize, // core size
        poolSize, // max size
        keepAliveTime,
        unit,
        new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)),
        new BlockThenRunPolicy()); // invoked by super on rejection; blocks until the task is queued
    super.allowCoreThreadTimeOut(true);
  }

  /**
   * Submits {@code task}, blocking while the queue is full. The task is counted as "in process"
   * before it is handed to the superclass; the count is rolled back if submission fails.
   *
   * @throws RejectedExecutionException if the pool is shut down or the blocking callback gave up
   */
  @Override
  public void execute(Runnable task) {
    tasksInProcess.incrementAndGet();
    try {
      super.execute(task);
    } catch (RuntimeException e) { // notably RejectedExecutionException
      taskFinished();
      throw e;
    } catch (Error e) {
      taskFinished();
      throw e;
    }
  }

  /** Marks one task as finished once the worker thread is done with it. */
  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    taskFinished();
  }

  /** Decrements the in-process count and wakes all waiters when it reaches zero. */
  private void taskFinished() {
    // decrementAndGet gives an atomic "decrement and test", so exactly one caller observes zero.
    if (tasksInProcess.decrementAndGet() == 0) {
      synchronizer.signalAll();
    }
  }

  /**
   * Sets core and maximum pool size together.
   *
   * @throws IllegalArgumentException if {@code corePoolSize} is not positive
   */
  @Override
  public void setCorePoolSize(int corePoolSize) {
    // ThreadPoolExecutor refuses a core size above the current maximum (and a maximum below the
    // current core size), so the order of the two calls depends on the direction of the change.
    if (corePoolSize > getMaximumPoolSize()) {
      super.setMaximumPoolSize(corePoolSize);
      super.setCorePoolSize(corePoolSize);
    } else {
      super.setCorePoolSize(corePoolSize);
      super.setMaximumPoolSize(corePoolSize);
    }
  }

  /** Not supported: the maximum size always follows the core size. */
  @Override
  public void setMaximumPoolSize(int maximumPoolSize) {
    throw new UnsupportedOperationException("setMaximumPoolSize is not supported.");
  }

  /** Not supported: the blocking behaviour of {@code execute} depends on the built-in handler. */
  @Override
  public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
    throw new UnsupportedOperationException("setRejectedExecutionHandler is not allowed on this class.");
  }

  /**
   * Blocks until no task passed to {@link #execute(Runnable)} is still queued or running.
   * Returns immediately if there is no such task.
   *
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public void await() throws InterruptedException {
    synchronizer.await();
  }

  /**
   * Like {@link #await()}, but gives up after the given time.
   *
   * @return {@code true} if the pool became idle in time, {@code false} on timeout
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
    return synchronizer.await(timeout, timeUnit);
  }

  /**
   * Condition-variable wrapper whose wait predicate is "{@code tasksInProcess} is zero".
   * Waiting on the live counter (rather than on a latched flag) means an idle moment in the past
   * cannot satisfy a later wait, and an idle state reached before the wait began is not missed.
   */
  private class Synchronizer {

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    /** Wakes every waiter so it can re-check the counter. */
    private void signalAll() {
      lock.lock();
      try {
        done.signalAll();
      } finally {
        lock.unlock();
      }
    }

    /** Waits, without a time limit, until the counter is zero. */
    public void await() throws InterruptedException {
      lock.lock();
      try {
        // Loop: guards against spurious wakeups and against new tasks arriving after the signal.
        while (tasksInProcess.get() != 0) {
          done.await();
        }
      } finally {
        lock.unlock();
      }
    }

    /** Waits until the counter is zero or the timeout elapses; returns whether it reached zero. */
    public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
      long nanosLeft = timeUnit.toNanos(timeout);
      lock.lock();
      try {
        while (tasksInProcess.get() != 0) {
          if (nanosLeft <= 0L) {
            return false;
          }
          nanosLeft = done.awaitNanos(nanosLeft);
        }
        return true;
      } finally {
        lock.unlock();
      }
    }
  }

  /**
   * Rejection handler that, instead of rejecting, blocks the submitting thread until the task
   * fits into the executor's queue. With a callback configured, each wait is bounded and the
   * callback decides after every timeout whether to keep waiting.
   */
  private static class BlockThenRunPolicy implements RejectedExecutionHandler {

    private final long maxBlockingTime;
    private final TimeUnit maxBlockingTimeUnit;
    private final Callable<Boolean> blockingTimeCallback;

    public BlockThenRunPolicy(long maxBlockingTime, TimeUnit maxBlockingTimeUnit, Callable<Boolean> blockingTimeCallback) {
      this.maxBlockingTime = maxBlockingTime;
      this.maxBlockingTimeUnit = maxBlockingTimeUnit;
      this.blockingTimeCallback = blockingTimeCallback;
    }

    /** No callback: block without a time limit. */
    public BlockThenRunPolicy() {
      this(0L, null, null);
    }

    /**
     * Retries inserting {@code task} into the executor's queue until it succeeds.
     *
     * @throws RejectedExecutionException if the executor is shut down, the callback throws, or
     *         the callback returns anything other than {@code true}
     */
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
      BlockingQueue<Runnable> workQueue = executor.getQueue();
      boolean interrupted = false;
      try {
        while (true) {
          if (executor.isShutdown()) {
            throw new RejectedExecutionException(
                "ThreadPoolExecutor has shutdown while attempting to offer a new task.");
          }
          try {
            if (blockingTimeCallback == null) {
              workQueue.put(task);
              return;
            }
            if (workQueue.offer(task, maxBlockingTime, maxBlockingTimeUnit)) {
              return;
            }
          } catch (InterruptedException e) {
            // Keep trying (the wait is deliberately uninterruptible), but remember the
            // interrupt so it can be restored for the caller once we are done.
            interrupted = true;
            continue;
          }
          // Timed out waiting for queue space: ask the user whether to go on.
          if (!shouldKeepWaiting()) {
            throw new RejectedExecutionException("User decided to stop waiting for task insertion");
          }
        }
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }

    /** Invokes the callback; only an explicit {@code true} means "keep waiting". */
    private boolean shouldKeepWaiting() {
      Boolean answer;
      try {
        answer = blockingTimeCallback.call();
      } catch (Exception e) {
        throw new RejectedExecutionException(e);
      }
      // A null answer is treated as "stop" rather than failing with NullPointerException.
      return Boolean.TRUE.equals(answer);
    }
  }
}
From https://github.com/USGS-CIDA/glri-afinch/blob/master/glri-afinch-util/src/main/java/org/java/util/concurrent/NotifyingBlockingThreadPoolExecutor.java /** * This class is a specialized extension of the ThreadPoolExecutor class. * * Two functionalities have been added to this subclass. * 1) The execute method of the ThreadPoolExecutor will block in case the queue is full and only * unblock when the queue is dequeued - that is, when a task that is currently in the queue is removed * and handled by the ThreadPoolExecutor. * 2) Client code can await the event of all tasks being run to conclusion. Client code which * actively chooses to wait for this occurrence should call await on its instance of the ThreadPoolExecutor. * This differs from awaitTermination as it does not require any call to shutdown. * * Code example: * * NotifyingBlockingThreadPoolExecutor threadPoolExecutor = * new NotifyingBlockingThreadPoolExecutor(5, 10, 15, TimeUnit.SECONDS); * * for (int i = 0; i < 5000; i++) { * threadPoolExecutor.execute(...) * } * * try { * threadPoolExecutor.await(); * } catch (InterruptedException e) { * // Handle error * } * * System.out.println("Done!"); * * The example above shows how 5000 tasks are run within 5 threads. The line with * 'System.out.println("Done!");' will not run until such a time when all the tasks given to the * thread pool have concluded * their run. * * This subclass of ThreadPoolExecutor also takes away the max threads capabilities of the * ThreadPoolExecutor superclass and internally sets the amount of maximum threads to be the size of * the core threads. This is done since threads over the core size and under the max are instantiated * only once the queue is full, but the NotifyingBlockingThreadPoolExecutor will block once the queue * is full. * * @author Yaneeve Shekel & Amir Kirsh */ /** * This constructor is used in order to maintain the first functionality specified above. 
* It does so by using an ArrayBlockingQueue and the BlockThenRunPolicy that is defined in * this class. * This constructor allows specifying a timeout for the wait on new task insertion and reacting to such a timeout if it occurs. * @param poolSize is the amount of threads that this pool may have alive at any given time * @param queueSize is the size of the queue. This number should be at least as large as the pool size to make sense * (otherwise there are unused threads), thus if the number sent is smaller, the poolSize is used * for the size of the queue. Recommended value is twice the poolSize. * @param keepAliveTime is the amount of time after which an inactive thread is terminated * @param keepAliveTimeUnit is the unit of time to use with the previous parameter * @param maxBlockingTime is the maximum time to wait on the queue of tasks before calling the BlockingTimeout callback * @param maxBlockingTimeUnit is the unit of time to use with the previous parameter * @param blockingTimeCallback is the callback method to call when a timeout occurs while blocking on getting a new task, * the return value of this Callable is Boolean, indicating whether to keep blocking (true) or stop (false). * In case false is returned from the blockingTimeCallback, this executor will throw a RejectedExecutionException */
download show line numbers debug dex old transpilations
Travelled to 15 computer(s): aoiabmzegqzx, bhatertpkbcr, cbybwowwnfue, cfunsshuasjs, gwrvuhgaqvyk, ishqpsrjomds, lpdgvwnxivlt, mqqgnosmbjvj, onxytkatvevr, ppjhyzlbdabe, pyentgdyhuwx, pzhvpgtvlbxg, tslmcundralx, tvejysmllsmz, vouqrxazstgt
No comments. add comment
Snippet ID: | #1011718 |
Snippet name: | NotifyingBlockingThreadPoolExecutor |
Eternal ID of this version: | #1011718/1 |
Text MD5: | ba502c0590c12a3eee15012ab4ac620b |
Author: | stefan |
Category: | javax / concurrency |
Type: | JavaX fragment (include) |
Public (visible to everyone): | Yes |
Archived (hidden from active list): | No |
Created/modified: | 2017-11-03 15:55:34 |
Source code size: | 5247 bytes / 194 lines |
Pitched / IR pitched: | No / No |
Views / Downloads: | 1311 / 2293 |
Referenced in: | #1034167 - Standard Classes + Interfaces (LIVE, continuation of #1003674) |