// A push-style stream: the source calls newElement(a) for every element it
// produces. The stream remembers the most recent element, counts elements,
// and carries an "ended" flag that is raised through end().
sclass SourceTriggeredStream<A> extends Meta {
  // Raised by end(); also passed to the global catchError helper.
  // NOTE(review): the meaning of the `true` constructor argument is defined
  // by DoneFlag, which is not visible here - confirm.
  DoneFlag ended = new(true);

  // Most recent element passed to newElement (null before the first one).
  volatile A lastElement;

  // Incremented once per newElement call.
  new AtomicLong elementCount;

  // Called by the source for each new element: stores it as the last
  // element and increments the counter.
  // NOTE(review): the `event` macro presumably also generates the
  // onNewElement(IVF1<A>) registration method used below and notifies the
  // registered listeners - confirm against the JavaX translator.
  event newElement(A a) {
    lastElement = a;
    inc(elementCount);
  }

  // Registers a listener that ignores the element itself; the Runnable is
  // adapted through runnableToIVF1 and passed to the one-argument overload.
  void onNewElement(Runnable r) {
    onNewElement(runnableToIVF1(r));
  }

  // Returns the most recent element (null if none has arrived yet).
  A get() { ret lastElement; }

  // Returns the current value of the element counter.
  long elementCount() { ret elementCount!; }

  // Marks the stream as ended by raising the ended flag.
  void end { ended.raise(); }

  // Returns whether the ended flag is up.
  bool ended() { ret ended.isUp(); }

  // Delegates to the global catchError function (not this method), passing
  // the ended flag and r.
  // NOTE(review): presumably runs r and reports a failure through the
  // flag - confirm against catchError's definition.
  void catchError(Runnable r) {
    main catchError(ended, r);
  }

  // On new element, call consumer in same thread.
  // A null consumer is ignored.
  void directlyFeedInto(Consumer<A> consumer) {
    if (consumer != null)
      onNewElement(a -> consumer.accept(a));
  }
}