// Runs `producer` on a newly started thread and returns a producer that
// hands out the same elements on the calling side, transferred through a
// circular buffer of `bufferSize` slots (65536 unless specified).
//
// producer   - the source to drain; when null, null is returned and no
//              thread is started.
// ch         - receives a closeable that raises the internal `done` flag,
//              which stops the worker loop. Must not be null.
// bufferSize - capacity passed to the SimpleCircularBuffer.
//
// The returned producer yields elements in buffer order and returns null
// once the worker has finished and the buffer is drained. If the worker
// ended because of an exception, that exception is rethrown to the consumer
// at that point (wrapped in a RuntimeException when it is a checked one)
// instead of being reported as a normal end of stream.
//
// NOTE(review): `buffer` is touched by both the worker and the consuming
// thread; this relies on SimpleCircularBuffer being safe for one writer
// plus one reader - confirm against its implementation.
static <A> Producer<A> moveProducerToNewThread(Producer<A> producer, CloseablesHolder ch, int bufferSize default 65536) {
  if (producer == null) ret null;

  SimpleCircularBuffer<A> buffer = new(bufferSize);
  new Flag done;
  new Var<Throwable> error;

  // Closing the holder raises `done`, which lets both loops below wind down.
  ch.add(() -> done.raise());

  startThread(r {
    try {
      A a;
      while ping (!done.isUp() && (a = producer.next()) != null) {
        ifdef moveProducerToNewThread_debug
          print("original producer returned: " + a);
        endifdef

        // Back-pressure: wait until the consumer has made room, or until
        // we are told to stop.
        while (buffer.isFull() && !done.isUp()) sleep(1);

        // Still full means the wait ended because `done` was raised.
        // Only this thread adds, so the buffer cannot have refilled in
        // between; drop the element rather than add to a full buffer.
        if (buffer.isFull()) break;

        buffer.add(a);
      }
    } catch e {
      // Remember the failure so the consuming side can report it.
      error.set(e);
    } finally {
      // Signals end of stream, whether by exhaustion, failure or close.
      done.raise();
    }
  });

  ret () -> {
    while true {
      // Read the flag BEFORE popping: if the worker finishes between the
      // two steps, its last element is still picked up by this pop
      // instead of being mistaken for end of stream.
      bool isDone = done.isUp();
      A a = buffer.popFirst();
      if (a != null) ret a;

      if (!isDone) {
        sleep(1);
        continue;
      }

      // Worker finished and buffer is empty: surface a worker failure
      // if there was one, otherwise report the regular end of stream.
      Throwable e = error.get();
      if (e == null) ret null;
      if (e instanceof RuntimeException) throw (RuntimeException) e;
      if (e instanceof Error) throw (Error) e;
      throw new RuntimeException(e);
    }
  };
}