static Producer moveProducerToNewThread(Producer producer, CloseablesHolder ch, int bufferSize default 65536) {
if (producer == null) null;
SimpleCircularBuffer buffer = new(bufferSize);
new Flag done;
new Var error;
ch.add(() -> done.raise());
startThread(r {
try {
A a;
while ping (!done.isUp() && (a = producer.next()) != null) {
while (buffer.isFull() && !done.isUp()) sleep(1);
buffer.add(a);
}
} catch e {
error.set(e);
} finally {
done.raise();
}
});
ret () -> {
while true {
A a = buffer.popFirst();
if (a != null) ret a;
if (done.isUp())
null;
else
sleep(1);
}
};
}