Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

194
LINES

< > BotCompany Repo | #1011718 // NotifyingBlockingThreadPoolExecutor

JavaX fragment (include)

1  
/**
 * A fixed-size {@link ThreadPoolExecutor} with two additions:
 * <ol>
 *   <li>{@link #execute(Runnable)} blocks the caller while the bounded work queue is full
 *       instead of rejecting the task (see {@link BlockThenRunPolicy}).</li>
 *   <li>{@link #await()} / {@link #await(long, TimeUnit)} let client code wait until every
 *       task handed to {@code execute} has finished, without shutting the pool down.</li>
 * </ol>
 *
 * The maximum pool size is always kept equal to the core pool size, because extra threads
 * would only be created when the queue is full, and in that situation this executor blocks
 * the submitter instead.
 *
 * NOTE(review): tasks that leave the queue without being run (e.g. via {@code shutdownNow()}
 * or {@code remove()}) never reach {@link #afterExecute}, so they stay counted as
 * "in process" and {@code await()} will not return for them.
 */
sclass NotifyingBlockingThreadPoolExecutor extends ThreadPoolExecutor {

	/** Number of tasks accepted by execute() that have neither finished nor been rejected yet. */
	private final AtomicInteger tasksInProcess = new AtomicInteger();

	/** Lets client threads wait for tasksInProcess to drop to zero. */
	private final Synchronizer synchronizer = new Synchronizer();

	/**
	 * Creates an executor whose execute() blocks on a full queue, with a timeout callback.
	 *
	 * @param poolSize             number of threads (used as both core and maximum size)
	 * @param queueSize            capacity of the work queue; if smaller than poolSize, poolSize is used
	 * @param keepAliveTime        idle time after which a thread is terminated
	 * @param keepAliveTimeUnit    unit of keepAliveTime
	 * @param maxBlockingTime      how long one insertion attempt waits before the callback is consulted
	 * @param maxBlockingTimeUnit  unit of maxBlockingTime
	 * @param blockingTimeCallback consulted after each insertion timeout: TRUE keeps waiting, anything
	 *                             else makes execute() throw a RejectedExecutionException;
	 *                             if null, insertion blocks without any timeout
	 */
	public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize, long keepAliveTime, TimeUnit keepAliveTimeUnit, long maxBlockingTime, TimeUnit maxBlockingTimeUnit, Callable<Boolean> blockingTimeCallback) {
		super(poolSize, // Core size
				poolSize, // Max size
				keepAliveTime,
				keepAliveTimeUnit,
				new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)), // not smaller than the poolSize
				new BlockThenRunPolicy(maxBlockingTime, maxBlockingTimeUnit, blockingTimeCallback));

		super.allowCoreThreadTimeOut(true); // Time out the core threads.
	}

	/**
	 * Creates an executor whose execute() blocks on a full queue for as long as it takes.
	 *
	 * @param poolSize      number of threads (used as both core and maximum size)
	 * @param queueSize     capacity of the work queue; if smaller than poolSize, poolSize is used
	 * @param keepAliveTime idle time after which a thread is terminated
	 * @param unit          unit of keepAliveTime
	 */
	public NotifyingBlockingThreadPoolExecutor(int poolSize, int queueSize, long keepAliveTime, TimeUnit unit) {
		super(poolSize, // Core size
				poolSize, // Max size
				keepAliveTime,
				unit,
				new ArrayBlockingQueue<Runnable>(Math.max(poolSize, queueSize)), // not smaller than the poolSize (to avoid redundant threads)
				new BlockThenRunPolicy()); // When super invokes the reject method this class will ensure a blocking try.

		super.allowCoreThreadTimeOut(true); // Time out the core threads.
	}

	/**
	 * Counts the task as "in process" and hands it to the pool, blocking while the queue is full.
	 * If the hand-over fails, the count is rolled back and the failure is rethrown.
	 */
	@Override
	public void execute(Runnable task) {
		// count a new task in process
		tasksInProcess.incrementAndGet();
		try {
			super.execute(task);
		} catch (RuntimeException e) { // notably RejectedExecutionException
			taskFinished();
			throw e;
		} catch (Error e) {
			taskFinished();
			throw e;
		}
	}

	/** Marks one task as finished once the pool has run it (successfully or not). */
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		taskFinished();
	}

	/**
	 * Removes one task from the in-process count and wakes all waiters when the count hits zero.
	 * Shared by the normal completion path and the rejection path so that neither can leave
	 * a waiter asleep on an idle pool.
	 */
	private void taskFinished() {
		if (tasksInProcess.decrementAndGet() == 0) {
			synchronizer.signalAll();
		}
	}

	/**
	 * Sets core and maximum pool size together.
	 * The two superclass setters are called in the order that keeps core <= max at every step,
	 * since ThreadPoolExecutor rejects a core size above the current maximum (JDK 9+) and a
	 * maximum below the current core size.
	 */
	@Override
	public void setCorePoolSize(int corePoolSize) {
		if (corePoolSize > getMaximumPoolSize()) {
			// growing: raise the ceiling first
			super.setMaximumPoolSize(corePoolSize);
			super.setCorePoolSize(corePoolSize);
		} else {
			// shrinking (or unchanged): lower the floor first
			super.setCorePoolSize(corePoolSize);
			super.setMaximumPoolSize(corePoolSize);
		}
	}

	/** Not supported: the maximum size always follows the core size. */
	@Override
	public void setMaximumPoolSize(int maximumPoolSize) {
		throw new UnsupportedOperationException("setMaximumPoolSize is not supported.");
	}

	/** Not supported: the blocking behavior depends on the built-in handler. */
	@Override
	public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
		throw new UnsupportedOperationException("setRejectedExecutionHandler is not allowed on this class.");
	}

	/**
	 * Blocks until no task is in process. Returns immediately if the pool is already idle.
	 *
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public void await() throws InterruptedException {
		synchronizer.await();
	}

	/**
	 * Blocks until no task is in process or the timeout elapses.
	 *
	 * @return true if the pool was observed idle within the timeout, false if the timeout elapsed first
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
		return synchronizer.await(timeout, timeUnit);
	}

	/**
	 * Condition-based waiting on "tasksInProcess == 0".
	 * The predicate is the counter itself rather than a separate flag, so a completion that
	 * happened before anybody waited cannot make a later wait return early, and any number of
	 * waiters can be released by the same signal.
	 */
	private class Synchronizer {

		private final Lock lock = new ReentrantLock();
		private final Condition done = lock.newCondition();

		/** Wakes every waiter; each one re-checks the counter before returning. */
		private void signalAll() {
			lock.lock();
			try {
				done.signalAll();
			} finally {
				lock.unlock();
			}
		}

		/** Waits, without time limit, until the counter is zero. */
		public void await() throws InterruptedException {
			lock.lock();
			try {
				// loop guards against spurious wakeups and tasks submitted after the signal
				while (tasksInProcess.get() != 0) {
					done.await();
				}
			} finally {
				lock.unlock();
			}
		}

		/** Waits until the counter is zero or the timeout elapses; returns whether it was zero. */
		public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
			long nanosLeft = timeUnit.toNanos(timeout);
			lock.lock();
			try {
				while (tasksInProcess.get() != 0) {
					if (nanosLeft <= 0L) {
						return false;
					}
					nanosLeft = done.awaitNanos(nanosLeft);
				}
				return true;
			} finally {
				lock.unlock();
			}
		}
	}

	/**
	 * Rejection handler that, instead of rejecting, inserts the task into the work queue and
	 * blocks the submitting thread until there is room. Optionally consults a callback after
	 * each timed-out insertion attempt.
	 */
	private static class BlockThenRunPolicy implements RejectedExecutionHandler {

		private final long maxBlockingTime;
		private final TimeUnit maxBlockingTimeUnit;
		private final Callable<Boolean> blockingTimeCallback;

		public BlockThenRunPolicy(long maxBlockingTime, TimeUnit maxBlockingTimeUnit, Callable<Boolean> blockingTimeCallback) {
			this.maxBlockingTime = maxBlockingTime;
			this.maxBlockingTimeUnit = maxBlockingTimeUnit;
			this.blockingTimeCallback = blockingTimeCallback;
		}

		/** No callback: insertion blocks without a timeout. */
		public BlockThenRunPolicy() {
			this(0L, null, null);
		}

		/**
		 * Keeps trying to enqueue the task until it is accepted.
		 *
		 * @throws RejectedExecutionException if the executor is shut down while waiting, if the
		 *         callback fails, or if the callback decides to stop waiting
		 */
		@Override
		public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
			BlockingQueue<Runnable> workQueue = executor.getQueue();
			boolean interrupted = false;
			try {
				while (true) {
					if (executor.isShutdown()) {
						throw new RejectedExecutionException(
								"ThreadPoolExecutor has shutdown while attempting to offer a new task.");
					}

					try {
						if (blockingTimeCallback == null) {
							workQueue.put(task);
							return;
						}
						if (workQueue.offer(task, maxBlockingTime, maxBlockingTimeUnit)) {
							return;
						}
					} catch (InterruptedException e) {
						// Insertion is deliberately not abandoned on interrupt; remember the
						// interrupt so it can be handed back to the caller once we are done.
						interrupted = true;
						continue;
					}

					// task was not accepted within the timeout - ask the callback
					if (!keepWaiting()) {
						throw new RejectedExecutionException("User decided to stop waiting for task insertion");
					}
				}
			} finally {
				if (interrupted) {
					Thread.currentThread().interrupt();
				}
			}
		}

		/**
		 * Asks the callback whether to keep blocking. Only an explicit TRUE continues;
		 * FALSE or null stops. A failing callback is reported as a rejection.
		 */
		private boolean keepWaiting() {
			Boolean result;
			try {
				result = blockingTimeCallback.call();
			} catch (Exception e) {
				if (e instanceof InterruptedException) {
					Thread.currentThread().interrupt();
				}
				throw new RejectedExecutionException(e);
			}
			return Boolean.TRUE.equals(result);
		}
	}
}

Author comment

From https://github.com/USGS-CIDA/glri-afinch/blob/master/glri-afinch-util/src/main/java/org/java/util/concurrent/NotifyingBlockingThreadPoolExecutor.java

/**
 * This class is a specialized extension of the ThreadPoolExecutor class.
 *
 * Two functionalities had been added to this subclass.
 * 1) The execute method of the ThreadPoolExecutor will block in case the queue is full and only
 *    unblock when the queue is dequeued - that is a task that is currently in the queue is removed
 *    and handled by the ThreadPoolExecutor.
 * 2) Client code can await the event of all tasks having run to completion. Client code which
 *    actively chooses to wait for this occurrence should call await on its ThreadPoolExecutor instance.
 *    This differs from awaitTermination as it does not require any call to shutdown.
 *
 * Code example:
 *
 * NotifyingBlockingThreadPoolExecutor threadPoolExecutor =
 *      new NotifyingBlockingThreadPoolExecutor(5, 10, 15, TimeUnit.SECONDS);
 *
 * for (int i = 0; i < 5000; i++) {
 *      threadPoolExecutor.execute(...)
 * }
 *
 * try {
 *      threadPoolExecutor.await();
 * } catch (InterruptedException e) {
 *      // Handle error
 * }
 *
 * System.out.println("Done!");
 *
 * The example above shows how 5000 tasks are run within 5 threads. The line with
 * 'System.out.println("Done!");' will not run until all the tasks given to the
 * thread pool have concluded their run.
 *
 * This subclass of ThreadPoolExecutor also takes away the max threads capabilities of the
 * ThreadPoolExecutor superclass and internally sets the amount of maximum threads to be the size of
 * the core threads. This is done since threads over the core size and under the max are instantiated
 * only once the queue is full, but the NotifyingBlockingThreadPoolExecutor will block once the queue
 * is full.
 *
 * @author Yaneeve Shekel & Amir Kirsh
 */

	/**
	 * This constructor is used in order to maintain the first functionality specified above.
	 * It does so by using an ArrayBlockingQueue and the BlockThenRunPolicy that is defined in
	 * this class.
	 * This constructor allows specifying a timeout for the wait on new task insertion and reacting to such a timeout if it occurs.
	 * @param poolSize             is the amount of threads that this pool may have alive at any given time
	 * @param queueSize            is the size of the queue. This number should be at least as large as the pool size to make sense
	 * 							   (otherwise there are unused threads), thus if the number sent is smaller, the poolSize is used
	 * 							   for the size of the queue. Recommended value is twice the poolSize.
	 * @param keepAliveTime	       is the amount of time after which an inactive thread is terminated
	 * @param keepAliveTimeUnit    is the unit of time to use with the previous parameter
	 * @param maxBlockingTime      is the maximum time to wait on the queue of tasks before calling the BlockingTimeout callback 
	 * @param maxBlockingTimeUnit  is the unit of time to use with the previous parameter 
	 * @param blockingTimeCallback is the callback method to call when a timeout occurs while blocking on getting a new task,
	 * 							   the return value of this Callable is Boolean, indicating whether to keep blocking (true) or stop (false).
	 * 							   In case false is returned from the blockingTimeCallback, this executor will throw a RejectedExecutionException 
	 */

download  show line numbers  debug dex  old transpilations   

Travelled to 15 computer(s): aoiabmzegqzx, bhatertpkbcr, cbybwowwnfue, cfunsshuasjs, gwrvuhgaqvyk, ishqpsrjomds, lpdgvwnxivlt, mqqgnosmbjvj, onxytkatvevr, ppjhyzlbdabe, pyentgdyhuwx, pzhvpgtvlbxg, tslmcundralx, tvejysmllsmz, vouqrxazstgt

No comments. add comment

Snippet ID: #1011718
Snippet name: NotifyingBlockingThreadPoolExecutor
Eternal ID of this version: #1011718/1
Text MD5: ba502c0590c12a3eee15012ab4ac620b
Author: stefan
Category: javax / concurrency
Type: JavaX fragment (include)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2017-11-03 15:55:34
Source code size: 5247 bytes / 194 lines
Pitched / IR pitched: No / No
Views / Downloads: 643 / 1613
Referenced in: [show references]