// We effectively turn the stream into a non-blocking stream. // There is the addition isEOF() to check whether an actual // stream end has been reached (true) or whether we are just // waiting for more data (false). sclass ByteArraysPartialInputStream extends InputStreamPlusReadFully { sclass State { int ofs; MinimalChain chain; *(int *ofs, MinimalChain *chain) {} nBytes() { ret l(chain!); } } srecord Mark(State state, long absolutePosition, long remaining) {} bool eof; MinimalChain last = new MinimalChain(null); State state = new(0, last); long absolutePosition, remaining; Mark mark; @Override public synchronized void readFully(byte[] buf, int off, int n) throws IOException { if (n == 0) ret; if (n > available()) throw new EOFException(); assertEquals(n, read(buf, off, n)); } @Override public synchronized int read(byte[] buf, int off, int len) throws IOException { len = min(len, available()); if (len == 0) ret -1; int n = 0; int nChunk = state.nBytes(); int inChunk = min(len, nChunk-state.ofs); arraycopy(state.chain!, 0, buf, off, inChunk); n += inChunk; off += inChunk; len -= inChunk; if (len == 0) ret n; MinimalChain c = state.chain.next; while (c != last) { inChunk = min(len, l(c!)); arraycopy(c!, 0, buf, off, inChunk); n += inChunk; off += inChunk; len -= inChunk; if (len == 0) ret n; c = c.next; } ret n; } @Override public int available() { ret clampToInt(remaining); } @Override public synchronized int read() throws IOException { while ping (true) { if (state.ofs < l(state.chain!)) { ++absolutePosition; --remaining; ret state.chain![state.ofs++]; } if (state.chain.next == null) ret -1; state = new State(0, state.chain.next); } } public synchronized void add aka write(byte[] buffer) { if (empty(buffer)) ret; remaining += l(buffer); last.setNext(new MinimalChain(null)); last.setElement(buffer); last = last.next; } public synchronized void addEOF() { eof = true; } public synchronized bool isEOF() { ret state.chain == last && eof; } @Override public synchronized bool markSupported() { true; } @Override public synchronized void mark(int readLimit) { mark = new Mark( new State(state.ofs, state.chain), // clone state absolutePosition, remaining); } @Override public synchronized void reset() throws IOException { if (mark != null) fail(); state = mark.state; absolutePosition = mark.absolutePosition; remaining = mark.remaining; mark = null; } }