!7 sinterface Steppable { public bool step(); // return false if done } sclass MultiThreadStepper { Runnable onDone; Lock lock = lock(); L threads = synchroList(); L steppables = synchroList(); L stepping = synchroList(); int coresToUse = coresToUse(); volatile bool cancelled; *() {} *(Steppable... steppables) { addAll(this.steppables, steppables); } *(Collection steppables) { addAll(this.steppables, steppables); } void start { print("Using " + n2(coresToUse, "core")); repeat coresToUse { thread "Multi Thread Stepper" { final Thread me = currentThread(); threads.add(me); afterwards { bool done; { lock lock; threads.remove(me); done = empty(threads); } if (done) callF(onDone); } //int i = 0; while licensed { Steppable s; { lock lock; if (done()) ret; //s = get(steppables, mod(i++, l(steppables))); s = random(steppables); if (s == null) continue with sleep(100); steppables.remove(s); stepping.add(s); } bool done = false; try { done = !s.step(); } catch e { _handleException(e); done = true; } finally { lock lock; stepping.remove(s); if (!done) steppables.add(s); } } } } } bool done() { ret empty(steppables) || cancelled; } void stop { cancelled = true; cancelThreads(threads); } void cleanMeUp { stop(); } MultiThreadStepper onDone(Runnable r) { onDone = r; this; } } static noeq record MySteppable(int n) implements Steppable { int count = 0; public bool step() { if (count++ < 10) { print("Stepping: " + n); sleep(900+random(100)); true; } print("Steppable " + n + " done"); false; } } p-exp { new MultiThreadStepper(map(iota(10), func(int i) { MySteppable(i) })).onDone(rPrint("Done!")).start(); }