// The idea is to leave max as the actual number of cores the system // has (numberOfCores()), and in case of being fully booked, raise an // alert (customerMustWaitAlert) which can be handled by a strategy // object (different reactions are possible). // If nothing is done in such an event, clients are processed serially // (no guarantees of order), split up among the available threads. /* SYNChronisation order: 1. PooledThread 2. ThreadPool */ sclass ThreadPool is AutoCloseable { int max = numberOfCores(); new L all; new Set used; new Set free; bool verbose, retired; // our own ping surce so we can start threads & keep them running class InternalPingSource extends PingSource {} new InternalPingSource internalPingSource; new MultiSleeper sleeper; *() {} *(int *max) {} synchronized int maxSize() { ret max; } synchronized int total() { ret l(used)+l(free); } event customerMustWaitAlert; void fireCustomerMustWaitAlert { vmBus_send customerMustWaitAlert(this, currentThread()); customerMustWaitAlert(); } // DOESN'T WAIT. adds action to a thread's queue if nothing is // available immediately. PooledThread acquireThreadOrQueue(Runnable action) { if (action == null) null; PooledThread t; synchronized { if (_hasFreeAfterCreating()) { t = _firstFreeThread(); markUsed(t); } else t = _anyThread(); } t.addWork(action); // will move it from free to used ret t; } // run in synchronized block bool _hasFreeAfterCreating() { checkNotRetired(); if (nempty(free)) true; if (total() < max) { PooledThread t = newThread(); all.add(t); free.add(t); true; } false; } // WAITS until thread is available PooledThread acquireThreadOrWait(Runnable action) ctex { if (action == null) null; PooledThread t; while true { synchronized { if (_hasFreeAfterCreating()) { t = _firstFreeThread(); break; } else _waitWaitWait(); } } t.addWork(action); ret t; } PooledThread _firstFreeThread() { ret first(free); } PooledThread _anyThread() { ret random(used); } class PooledThread extends Thread { *(S name) { super(name); } AppendableChain q; synchronized Runnable _grabWorkOrSleep() ctex { Runnable r = first(q); if (r == null) { markFree(this); if (verbose) print("Thread sleeps"); synchronized { wait(); } if (verbose) print("Thread woke up"); null; } q = popFirst(q); ret r; } run { pingSource_tl().set(internalPingSource); while ping (!retired()) { Runnable r = _grabWorkOrSleep(); if (verbose) print(this + " work: " + r); if (r != null) try { if (verbose) print(this + " running: " + r); r.run(); pingSource_tl().set(internalPingSource); if (verbose) print(this + " done"); } catch e { pingSource_tl().set(internalPingSource); if (verbose) print(this + " error"); printStackTrace(e); } finally { pingSource_tl().set(internalPingSource); if (verbose) print("ThreadPool finally"); } } } synchronized bool isEmpty() { ret empty(q); } // append to q (do later) void addWork(Runnable r) { if (verbose) print("Added work to " + this + ": " + r); synchronized { q = chainPlus(q, r); notifyAll(); } } } PooledThread newThread() { PooledThread t = new("Thread Pool Inhabitant " + n2(total()+1)); t.start(); ret t; } synchronized void markFree(PooledThread t) { used.remove(t); free.add(t); notifyAll(); } synchronized void markUsed(PooledThread t) { free.remove(t); used.add(t); } synchronized toString { ret retired() ? "Retired ThreadPool" : "ThreadPool " + roundBracket(commaCombine( n2(used) + " used out of " + n2(total()), max <= total() ? null : "could grow to " + n2(max)); } synchronized bool retired() { ret retired; } synchronized void retire() { if (verbose) print("ThreadPool Retiring"); set retired; for (thread : free) syncNotifyAll(thread); // wake it up so it exits } void checkNotRetired { if (retired()) fail("retired"); } // We could do a soft-close here (stop the idle threads, let running threads finish, then end those too, stop accepting new orders) // or a hard close (interrupt all threads, stop accepting new orders) synchronized close { retire(); } // run in synchronized block void _waitWaitWait ctex { do { fireCustomerMustWaitAlert(); wait(); checkNotRetired(); } while (empty(free)); } void do(S text, Runnable r) { if (r == null) ret; new PingSource(this, text).do(r); } ISleeper_v2 sleeper() { ret sleeper; } }