// A push-style stream: the source calls newElement(a) for each element it
// produces. The stream remembers the most recent element, counts how many
// elements it has seen, and carries an "ended" flag that is raised via end().
//
// NOTE(review): the element type A is used below but not declared anywhere in
// this view - presumably a type parameter of the class (and of Consumer) that
// is missing here. Confirm against the original source.
sclass SourceTriggeredStream extends Meta {
// End-of-stream flag; raised by end(), queried by ended().
// NOTE(review): the meaning of the `true` constructor argument is defined by
// DoneFlag, which is not visible here - confirm.
DoneFlag ended = new(true);
// Most recently stored element. Declared volatile.
volatile A lastElement;
// Counter that is bumped once per newElement call.
new AtomicLong elementCount;
// Event declaration for a newly arrived element. The body stores the element
// first and then bumps the counter.
// NOTE(review): the `event` construct presumably also generates the listener
// registration method onNewElement(...) that is called further down, and
// notifies those listeners - the expansion is not visible here, so the order
// of body vs. listener calls should be confirmed.
event newElement(A a) {
lastElement = a;
inc(elementCount);
}
// Convenience overload: registers a Runnable as a new-element listener by
// converting it with runnableToIVF1 and delegating to the other
// onNewElement overload (not declared in this view).
void onNewElement(Runnable r) { onNewElement(runnableToIVF1(r)); }
// Returns the most recently stored element.
A get() { ret lastElement; }
// Returns the current value of the element counter.
// (`elementCount!` is presumably shorthand for elementCount.get() - confirm.)
long elementCount() { ret elementCount!; }
// Marks the stream as ended by raising the ended flag.
void end { ended.raise(); }
// Returns whether the ended flag is currently up.
bool ended() { ret ended.isUp(); }
// Passes the ended flag and r to the catchError function outside this class.
// NOTE(review): the `main` prefix presumably selects the global catchError
// function rather than this method (which would otherwise recurse); how that
// function uses the flag is not visible here - confirm.
void catchError(Runnable r) {
main catchError(ended, r);
}
// Registers a listener that forwards every new element to consumer.accept.
// Per the original author's note, the consumer is called in the same thread
// that delivers the element. A null consumer is ignored (no listener added).
void directlyFeedInto(Consumer consumer) {
if (consumer != null)
onNewElement(a -> consumer.accept(a));
}
}