// Runs `producer` on a separate thread and returns a new Producer that
// hands out the same elements, decoupled through a bounded buffer.
//
// producer   - source of elements; a null element from it means
//              "end of stream". If producer itself is null, null is returned.
// ch         - receives a close action that raises the internal `done` flag,
//              which makes the worker thread stop pulling further elements.
// bufferSize - capacity passed to the SimpleCircularBuffer (default 65536).
//
// Returns a Producer whose next() polls (sleep(1) between attempts) until an
// element is available, and returns null once the worker has finished and
// the buffer is drained. If the worker thread ended with an exception, that
// exception is rethrown (once) from next() after all buffered elements have
// been delivered; subsequent calls return null.
//
// NOTE(review): `A` is not declared in the visible signature - presumably a
// type parameter defined outside this view; confirm.
static Producer moveProducerToNewThread(Producer producer, CloseablesHolder ch, int bufferSize default 65536) {
  if (producer == null) null;
  SimpleCircularBuffer buffer = new(bufferSize);
  new Flag done;
  new Var error;

  // Closing the holder asks the worker thread to stop.
  ch.add(() -> done.raise());

  startThread(r {
    try {
      A a;
      while ping (!done.isUp() && (a = producer.next()) != null) {
        ifdef moveProducerToNewThread_debug
          print("original producer returned: " + a);
        endifdef
        // Wait for the consumer to free a slot, unless we are asked to stop.
        while (buffer.isFull() && !done.isUp()) sleep(1);
        // The wait loop only leaves the buffer full when `done` was raised;
        // in that case stop instead of adding to a full buffer.
        if (buffer.isFull()) break;
        buffer.add(a);
      }
    } catch e {
      // Remember the failure so the consumer side can report it.
      error.set(e);
    } finally {
      done.raise();
    }
  });

  ret () -> {
    while true {
      // Read the flag BEFORE popping: if the worker was already finished at
      // this point and the pop still yields nothing, the stream is over.
      // (Checking in the other order could miss a last-moment element.)
      bool isDone = done.isUp();
      A a = buffer.popFirst();
      if (a != null) ret a;
      if (isDone) {
        // Buffer drained and worker finished: surface a stored failure
        // instead of silently reporting a normal end of stream.
        Throwable t = (Throwable) error.get();
        if (t == null) ret null;
        error.set(null); // report the failure only once
        throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
      }
      sleep(1);
    }
  };
}