Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

194
LINES

< > BotCompany Repo | #1011718 // NotifyingBlockingThreadPoolExecutor

JavaX fragment (include)

/**
 * A fixed-size {@link ThreadPoolExecutor} with two additions:
 * <ol>
 *   <li>{@link #execute(Runnable)} blocks the submitting thread while the bounded work queue is
 *       full (see {@link BlockThenRunPolicy}) instead of rejecting the task.</li>
 *   <li>{@link #await()} / {@link #await(long, TimeUnit)} let a caller wait until no task handed
 *       to {@code execute} is still queued or running, without shutting the pool down.</li>
 * </ol>
 * Core and maximum pool size are always kept equal; core threads are allowed to time out.
 *
 * <p>The "in process" counter is incremented in {@code execute} and decremented in
 * {@code afterExecute}; a task that never reaches {@code afterExecute} (for example one removed
 * from the queue) is therefore never subtracted again.
 */
sclass NotifyingBlockingThreadPoolExecutor extends ThreadPoolExecutor {

	/** Number of tasks accepted by {@link #execute(Runnable)} that have not finished yet. */
	private final AtomicInteger tasksInProcess = new AtomicInteger();

	/** Wakes up threads blocked in one of the {@code await} methods. */
	private final Synchronizer synchronizer = new Synchronizer();

	/**
	 * Creates a pool whose {@code execute} blocks for at most {@code maxBlockingTime} at a time
	 * when the queue is full, consulting {@code blockingTimeCallback} after every timeout.
	 *
	 * @param poolSize             number of threads (used as both core and maximum size)
	 * @param queueSize            requested queue capacity; the larger of this and {@code poolSize} is used
	 * @param keepAliveTime        idle time after which a thread is terminated (core threads included)
	 * @param keepAliveTimeUnit    unit of {@code keepAliveTime}
	 * @param maxBlockingTime      how long one attempt to enqueue a task may block
	 * @param maxBlockingTimeUnit  unit of {@code maxBlockingTime}
	 * @param blockingTimeCallback called after each enqueue timeout; {@code true} keeps waiting,
	 *                             anything else makes {@code execute} throw
	 *                             {@link RejectedExecutionException}. If {@code null}, the pool
	 *                             blocks without any timeout.
	 */
	public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize, long keepAliveTime, TimeUnit keepAliveTimeUnit, long maxBlockingTime, TimeUnit maxBlockingTimeUnit, Callable<Boolean> blockingTimeCallback) {

		super(poolSize, // Core size
				poolSize, // Max size
				keepAliveTime,
				keepAliveTimeUnit,
				new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)), // never smaller than the pool
				new BlockThenRunPolicy(maxBlockingTime, maxBlockingTimeUnit, blockingTimeCallback));

		super.allowCoreThreadTimeOut(true); // Time out the core threads.
	}

	/**
	 * Creates a pool whose {@code execute} blocks without a time limit while the queue is full.
	 *
	 * @param poolSize      number of threads (used as both core and maximum size)
	 * @param queueSize     requested queue capacity; the larger of this and {@code poolSize} is used
	 * @param keepAliveTime idle time after which a thread is terminated (core threads included)
	 * @param unit          unit of {@code keepAliveTime}
	 */
	public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize, long keepAliveTime, TimeUnit unit) {

		super(poolSize, // Core size
				poolSize, // Max size
				keepAliveTime,
				unit,
				new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)), // never smaller than the pool
				new BlockThenRunPolicy()); // Rejections are turned into a blocking enqueue.

		super.allowCoreThreadTimeOut(true); // Time out the core threads.
	}

	/**
	 * Counts the task as "in process" and hands it to the pool. If the pool refuses it (any
	 * exception or error, notably {@link RejectedExecutionException}) the count is rolled back
	 * before the failure propagates.
	 */
	@Override
	public void execute(Runnable task) {
		tasksInProcess.incrementAndGet();
		boolean accepted = false;
		try {
			super.execute(task);
			accepted = true;
		} finally {
			if (!accepted) {
				// The task will never reach afterExecute, so undo the count here. This may
				// have been the last counted task, hence waiters must be woken as well.
				taskLeft();
			}
		}
	}

	/** Marks one task as finished and wakes waiters once nothing is left in process. */
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		taskLeft();
	}

	/**
	 * Removes one task from the in-process count. The value returned by the atomic decrement
	 * decides whether to signal, so exactly one caller observes the transition to zero.
	 */
	private void taskLeft() {
		if (tasksInProcess.decrementAndGet() == 0) {
			synchronizer.signalAll();
		}
	}

	/**
	 * Sets both the core and the maximum pool size to {@code corePoolSize}.
	 *
	 * <p>The two superclass setters are called in an order that keeps {@code core <= max} at
	 * every step: newer JDKs reject a core size above the current maximum, and every JDK rejects
	 * a maximum below the current core size.
	 *
	 * @throws IllegalArgumentException if {@code corePoolSize} is not positive
	 */
	@Override
	public void setCorePoolSize(int corePoolSize) {
		if (corePoolSize <= 0) {
			// Fail before touching anything; a maximum of 0 is illegal anyway.
			throw new IllegalArgumentException("corePoolSize must be positive: " + corePoolSize);
		}
		if (corePoolSize > getMaximumPoolSize()) {
			// Growing: raise the ceiling first.
			super.setMaximumPoolSize(corePoolSize);
			super.setCorePoolSize(corePoolSize);
		} else {
			// Shrinking (or unchanged): lower the floor first.
			super.setCorePoolSize(corePoolSize);
			super.setMaximumPoolSize(corePoolSize);
		}
	}

	/** Not supported: the maximum size always follows {@link #setCorePoolSize(int)}. */
	@Override
	public void setMaximumPoolSize(int maximumPoolSize) {
		throw new UnsupportedOperationException("setMaximumPoolSize is not supported.");
	}

	/** Not supported: the blocking behaviour depends on the built-in handler. */
	@Override
	public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
		throw new UnsupportedOperationException("setRejectedExecutionHandler is not allowed on this class.");
	}

	/**
	 * Blocks until no task is in process. Returns immediately if none is.
	 *
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public void await() throws InterruptedException {
		synchronizer.await();
	}

	/**
	 * Blocks until no task is in process or the timeout elapses.
	 *
	 * @return {@code true} if no task was in process when the method returned,
	 *         {@code false} if the timeout elapsed first
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
		return synchronizer.await(timeout, timeUnit);
	}

	/**
	 * Condition-variable wrapper used by the {@code await} methods. Waiters re-check
	 * {@code tasksInProcess} under the lock instead of relying on a one-shot flag, so a
	 * completion that happened before the wait started is not missed and an old completion
	 * cannot satisfy a wait for newer tasks.
	 */
	private class Synchronizer {

		private final Lock lock = new ReentrantLock();
		private final Condition done = lock.newCondition();

		/** Wakes every waiter; each one re-evaluates the in-process count itself. */
		private void signalAll() {
			lock.lock();
			try {
				done.signalAll();
			}
			finally {
				lock.unlock();
			}
		}

		/** Waits, without time limit, until the in-process count is zero. */
		public void await() throws InterruptedException {
			lock.lock();
			try {
				// Loop guards against spurious wakeups and against tasks submitted
				// between the signal and this thread re-acquiring the lock.
				while (tasksInProcess.get() != 0) {
					done.await();
				}
			}
			finally {
				lock.unlock();
			}
		}

		/**
		 * Waits until the in-process count is zero or the timeout is used up.
		 *
		 * @return {@code true} if the count reached zero, {@code false} on timeout
		 */
		public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
			long nanosLeft = timeUnit.toNanos(timeout);
			lock.lock();
			try {
				while (tasksInProcess.get() != 0) {
					if (nanosLeft <= 0L) {
						return false;
					}
					// awaitNanos reports the remaining budget, so repeated wakeups
					// never extend the total waiting time.
					nanosLeft = done.awaitNanos(nanosLeft);
				}
				return true;
			}
			finally {
				lock.unlock();
			}
		}
	}

	/**
	 * Rejection handler that, instead of rejecting, blocks the submitting thread until the task
	 * fits into the executor's queue. With a callback configured, each attempt is limited to
	 * {@code maxBlockingTime} and the callback decides after every timeout whether to go on.
	 */
	private static class BlockThenRunPolicy implements RejectedExecutionHandler {

		private final long maxBlockingTime;
		private final TimeUnit maxBlockingTimeUnit;
		private final Callable<Boolean> blockingTimeCallback;

		public BlockThenRunPolicy(long maxBlockingTime, TimeUnit maxBlockingTimeUnit, Callable<Boolean> blockingTimeCallback) {
			this.maxBlockingTime = maxBlockingTime;
			this.maxBlockingTimeUnit = maxBlockingTimeUnit;
			this.blockingTimeCallback = blockingTimeCallback;
		}

		/** Creates a policy without callback, i.e. one that blocks without time limit. */
		public BlockThenRunPolicy() {
			this(0L, null, null);
		}

		/**
		 * Keeps trying to enqueue {@code task} until it succeeds.
		 *
		 * @throws RejectedExecutionException if the executor is shut down while waiting, if the
		 *         callback throws, or if the callback does not return {@code true}
		 */
		@Override
		public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {

			BlockingQueue<Runnable> workQueue = executor.getQueue();
			boolean interrupted = false;

			try {
				while (true) {

					if (executor.isShutdown()) {
						throw new RejectedExecutionException(
								"ThreadPoolExecutor has shutdown while attempting to offer a new task.");
					}

					try {
						if (blockingTimeCallback == null) {
							workQueue.put(task);
							return;
						}
						if (workQueue.offer(task, maxBlockingTime, maxBlockingTimeUnit)) {
							return;
						}
					}
					catch (InterruptedException e) {
						// Keep trying (the task must not be dropped silently), but remember
						// the interrupt so it can be re-asserted on the way out.
						interrupted = true;
						continue;
					}

					// Timed offer expired - let the callback decide.
					if (!keepWaiting()) {
						throw new RejectedExecutionException("User decided to stop waiting for task insertion");
					}
				}
			}
			finally {
				if (interrupted) {
					Thread.currentThread().interrupt();
				}
			}
		}

		/**
		 * Asks the callback whether to continue blocking. A {@code null} answer counts as
		 * "stop"; an exception from the callback is wrapped in a RejectedExecutionException.
		 */
		private boolean keepWaiting() {
			Boolean answer;
			try {
				answer = blockingTimeCallback.call();
			}
			catch (Exception e) {
				throw new RejectedExecutionException(e);
			}
			return Boolean.TRUE.equals(answer);
		}

	}

}

Author comment

From https://github.com/USGS-CIDA/glri-afinch/blob/master/glri-afinch-util/src/main/java/org/java/util/concurrent/NotifyingBlockingThreadPoolExecutor.java

/**
 * This class is a specialized extension of the ThreadPoolExecutor class.
 *
 * Two functionalities had been added to this subclass.
 * 1) The execute method of the ThreadPoolExecutor will block in case the queue is full and only
 *    unblock when the queue is dequeued - that is a task that is currently in the queue is removed
 *    and handled by the ThreadPoolExecutor.
 * 2) Client code can await the event of all tasks having run to conclusion. Client code which
 *    actively chooses to wait for this occurrence should call await on its ThreadPoolExecutor instance.
 *    This differs from awaitTermination as it does not require any call to shutdown.
 *
 * Code example:
 *
 * NotifyingBlockingThreadPoolExecutor threadPoolExecutor =
 *      new NotifyingBlockingThreadPoolExecutor(5, 10, 15, TimeUnit.SECONDS);
 *
 * for (int i = 0; i < 5000; i++) {
 *      threadPoolExecutor.execute(...)
 * }
 *
 * try {
 *      threadPoolExecutor.await();
 * } catch (InterruptedException e) {
 *      // Handle error
 * }
 *
 * System.out.println("Done!");
 *
 * The example above shows how 5000 tasks are run within 5 threads. The line with
 * 'System.out.println("Done!");' will not run until all the tasks given to the
 * thread pool have concluded their run.
 *
 * This subclass of ThreadPoolExecutor also takes away the max threads capabilities of the
 * ThreadPoolExecutor superclass and internally sets the amount of maximum threads to be the size of
 * the core threads. This is done since threads over the core size and under the max are instantiated
 * only once the queue is full, but the NotifyingBlockingThreadPoolExecutor will block once the queue
 * is full.
 *
 * @author Yaneeve Shekel & Amir Kirsh
 */

	/**
	 * This constructor is used in order to maintain the first functionality specified above.
	 * It does so by using an ArrayBlockingQueue and the BlockThenRunPolicy that is defined in
	 * this class.
	 * This constructor allows to give a timeout for the wait on new task insertion and to react upon such a timeout if occurs.
	 * @param poolSize             is the amount of threads that this pool may have alive at any given time
	 * @param queueSize            is the size of the queue. This number should be at least as the pool size to make sense
	 * 							   (otherwise there are unused threads), thus if the number sent is smaller, the poolSize is used
	 * 							   for the size of the queue. Recommended value is twice the poolSize.
	 * @param keepAliveTime	       is the amount of time after which an inactive thread is terminated
	 * @param keepAliveTimeUnit    is the unit of time to use with the previous parameter
	 * @param maxBlockingTime      is the maximum time to wait on the queue of tasks before calling the BlockingTimeout callback 
	 * @param maxBlockingTimeUnit  is the unit of time to use with the previous parameter 
	 * @param blockingTimeCallback is the callback method to call when a timeout occurs while blocking on getting a new task,
	 * 							   the return value of this Callable is Boolean, indicating whether to keep blocking (true) or stop (false).
	 * 							   In case false is returned from the blockingTimeCallback, this executor will throw a RejectedExecutionException
	 */

download  show line numbers  debug dex  old transpilations   

Travelled to 15 computer(s): aoiabmzegqzx, bhatertpkbcr, cbybwowwnfue, cfunsshuasjs, gwrvuhgaqvyk, ishqpsrjomds, lpdgvwnxivlt, mqqgnosmbjvj, onxytkatvevr, ppjhyzlbdabe, pyentgdyhuwx, pzhvpgtvlbxg, tslmcundralx, tvejysmllsmz, vouqrxazstgt

No comments. add comment

Snippet ID: #1011718
Snippet name: NotifyingBlockingThreadPoolExecutor
Eternal ID of this version: #1011718/1
Text MD5: ba502c0590c12a3eee15012ab4ac620b
Author: stefan
Category: javax / concurrency
Type: JavaX fragment (include)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2017-11-03 15:55:34
Source code size: 5247 bytes / 194 lines
Pitched / IR pitched: No / No
Views / Downloads: 1309 / 2292
Referenced in: [show references]