Libraryless. Click here for Pure Java version (6781L/38K).
// The idea is to leave max at the actual number of cores the system
// has (numberOfCores()) and, when the pool is fully booked, to raise an
// alert (customerMustWaitAlert) which can be handled by a strategy
// object (different reactions are possible).
// If nothing is done in such an event, clients are processed serially
// (no guarantees of order), split up among the available threads.
7 | |
/* Synchronisation order (always acquire the locks in this order):
  1. PooledThread
  2. ThreadPool */
11 | |
// A pool of worker threads (PooledThread) that each own a queue of Runnables.
// Locking: a PooledThread's monitor may be held while taking the ThreadPool's
// monitor, never the other way round.
sclass ThreadPool is AutoCloseable {
  // upper bound for the number of threads this pool creates
  int max = numberOfCores();
  new L<PooledThread> all;    // every thread created by this pool
  new Set<PooledThread> used; // threads that are reserved or working
  new Set<PooledThread> free; // idle threads
  bool verbose, retired;

  // our own ping source so we can start threads & keep them running
  class InternalPingSource extends PingSource {}
  new InternalPingSource internalPingSource;

  new MultiSleeper sleeper;

  *() {}
  *(int *max) {}

  synchronized int maxSize() { ret max; }
  synchronized int total() { ret l(used)+l(free); }

  event customerMustWaitAlert;

  // Announces (on the VM bus and through the event) that a customer
  // found the pool fully booked.
  void fireCustomerMustWaitAlert {
    vmBus_send customerMustWaitAlert(this, currentThread());
    customerMustWaitAlert();
  }

  // DOESN'T WAIT. adds action to a thread's queue if nothing is
  // available immediately.
  // Returns the thread the action was given to (null for a null action).
  PooledThread acquireThreadOrQueue(Runnable action) {
    if (action == null) null;
    PooledThread t;
    synchronized {
      if (_hasFreeAfterCreating()) {
        t = _firstFreeThread();
        markUsed(t);
      } else
        t = _anyThread();
    }

    // outside the pool lock: addWork takes the thread's lock (see lock order)
    t.addWork(action); // wakes the thread if it is sleeping
    ret t;
  }

  // run in synchronized block
  // True if there is a free thread, creating one if the pool may still grow.
  // Fails if the pool is retired.
  bool _hasFreeAfterCreating() {
    checkNotRetired();
    if (nempty(free)) true;
    if (total() < max) {
      PooledThread t = newThread();
      all.add(t);
      free.add(t);
      true;
    }
    false;
  }

  // WAITS until thread is available
  // Returns the thread the action was given to (null for a null action).
  PooledThread acquireThreadOrWait(Runnable action) ctex {
    if (action == null) null;
    PooledThread t;
    while true {
      synchronized {
        if (_hasFreeAfterCreating()) {
          t = _firstFreeThread();
          // Reserve the thread while we still hold the pool lock. Without
          // this it stays in "free" and every later caller picks it again.
          markUsed(t);
          break;
        } else
          _waitWaitWait();
      }
    }
    t.addWork(action);
    ret t;
  }

  PooledThread _firstFreeThread() {
    ret first(free);
  }

  PooledThread _anyThread() {
    ret random(used);
  }

  class PooledThread extends Thread {
    *(S name) { super(name); }

    // pending work for this thread
    AppendableChain<Runnable> q;

    // Returns the next queued Runnable. If the queue is empty, marks this
    // thread free, sleeps until notified and returns null (the caller loops).
    synchronized Runnable _grabWorkOrSleep() ctex {
      Runnable r = first(q);
      if (r == null) {
        markFree(this);
        // Check again while holding our own lock: retire() needs this lock
        // to notify us, so its wake-up cannot slip in before the wait().
        if (retired()) null;
        if (verbose) print("Thread sleeps");
        wait();
        if (verbose) print("Thread woke up");
        null;
      }
      // We may have marked ourselves free after a customer reserved us
      // (e.g. right after start-up); make sure we count as used while working.
      markUsed(this);
      q = popFirst(q);
      ret r;
    }

    // Worker loop: runs queued work until the pool is retired.
    run {
      pingSource_tl().set(internalPingSource);
      while ping (!retired()) {
        Runnable r = _grabWorkOrSleep();
        if (verbose) print(this + " work: " + r);
        if (r != null)
          try {
            if (verbose) print(this + " running: " + r);
            r.run();
            // the work may have replaced our ping source; restore it
            pingSource_tl().set(internalPingSource);
            if (verbose) print(this + " done");
          } catch e {
            pingSource_tl().set(internalPingSource);
            if (verbose) print(this + " error");
            printStackTrace(e);
          } finally {
            pingSource_tl().set(internalPingSource);
            if (verbose) print("ThreadPool finally");
          }
      }
    }

    synchronized bool isEmpty() { ret empty(q); }

    // append to q (do later)
    void addWork(Runnable r) {
      if (verbose) print("Added work to " + this + ": " + r);
      synchronized {
        q = chainPlus(q, r);
        notifyAll();
      }
    }
  }

  // Creates and starts a new worker. The caller registers it in all/free.
  PooledThread newThread() {
    PooledThread t = new("Thread Pool Inhabitant " + n2(total()+1));
    t.start();
    ret t;
  }

  // Moves t to the free set and wakes customers waiting for a thread.
  synchronized void markFree(PooledThread t) {
    used.remove(t);
    free.add(t);
    notifyAll();
  }

  // Moves t to the used set.
  synchronized void markUsed(PooledThread t) {
    free.remove(t);
    used.add(t);
  }

  synchronized toString {
    ret retired()
      ? "Retired ThreadPool"
      : "ThreadPool " + roundBracket(commaCombine(
        n2(used) + " used out of " + n2(total()),
        max <= total() ? null : "could grow to " + n2(max)));
  }

  synchronized bool retired() { ret retired; }

  // Stops accepting work and wakes everybody who is waiting so they can
  // notice the retirement.
  void retire() {
    L<PooledThread> toWake;
    synchronized {
      if (verbose) print("ThreadPool Retiring");
      set retired;
      toWake = new ArrayList<PooledThread>(all);
      notifyAll(); // customers blocked in _waitWaitWait
    }
    // Notify the workers only after releasing the pool lock; taking a
    // thread's lock while holding the pool lock would break the lock order.
    for (thread : toWake) syncNotifyAll(thread); // wake it up so it exits
  }

  void checkNotRetired {
    if (retired()) fail("retired");
  }

  // We could do a soft-close here (stop the idle threads, let running threads finish, then end those too, stop accepting new orders)
  // or a hard close (interrupt all threads, stop accepting new orders)
  public void close() {
    retire();
  }

  // run in synchronized block
  // Raises the must-wait alert and blocks until a thread is reported free.
  // Fails if the pool is retired meanwhile.
  void _waitWaitWait ctex {
    do {
      fireCustomerMustWaitAlert();
      wait();
      checkNotRetired();
    } while (empty(free));
  }

  // Runs r through a fresh PingSource named text; does nothing for null.
  void do(S text, Runnable r) {
    if (r == null) ret;
    new PingSource(this, text).do(r);
  }

  ISleeper_v2 sleeper() { ret sleeper; }
}
download show line numbers debug dex old transpilations
Travelled to 4 computer(s): bhatertpkbcr, ekrmjmnbrukm, mowyntqkapby, mqqgnosmbjvj
No comments. add comment
Snippet ID: | #1033401 |
Snippet name: | ThreadPool [seems to work] |
Eternal ID of this version: | #1033401/70 |
Text MD5: | 7cc4fc4e1a67180c764962065ad26535 |
Transpilation MD5: | 768813cb471ab25d49ef62edf67787c1 |
Author: | stefan |
Category: | javax / code execution |
Type: | JavaX fragment (include) |
Public (visible to everyone): | Yes |
Archived (hidden from active list): | No |
Created/modified: | 2021-11-03 17:07:18 |
Source code size: | 5365 bytes / 201 lines |
Pitched / IR pitched: | No / No |
Views / Downloads: | 316 / 618 |
Version history: | 69 change(s) |
Referenced in: | [show references] |