// We effectively turn the stream into a non-blocking stream.
// There is the additional method isEOF() to check whether an actual
// stream end has been reached (true) or whether we are just
// waiting for more data (false).

// To make from ByteArraysPartialInputStream:
//   Drop @Override
//   byte => short
//   ubyteToInt => id
//   noElement => Int.MIN_VALUE

// A queue of short[] chunks that is written with add()/write() and read
// like a stream. Chunks are kept in a singly linked chain whose final
// node ("last") is an empty sentinel; a read cursor (State) walks the
// chain. All public methods are synchronized on the stream object.
sclass ArraysShortStream extends ShortInputStream {
  // Read cursor: a chain node plus an offset into that node's array.
  // NOTE(review): MinimalChain appears without a type argument here
  // although data() returns short[] - confirm against its declaration.
  sclass State {
    long absolutePosition; // elements consumed since the start of the stream
    int ofs;               // index of the next unread element in data()
    MinimalChain chain;    // node whose element is the current chunk

    *(long *absolutePosition, int *ofs, MinimalChain *chain) {}

    // Array held by the current node (null for the sentinel node).
    short[] data() { ret chain!; }

    // Length of the current chunk; despite the name (inherited from the
    // byte-stream original) this counts shorts, not bytes.
    int nBytes() { ret l(data()); }

    // True when every element of the current chunk has been consumed.
    bool exhausted() { ret ofs >= nBytes(); }

    toString {
      ret withIdentity(this, "State" + bracketed(
        renderVars(+ofs, +nBytes(), next := chain.next)));
    }
  }

  bool debug; // print a trace of calls when set
  bool eof;   // set by addEOF(): no further chunks will be added

  // Sentinel node at the tail of the chain; add() fills it and appends
  // a fresh sentinel.
  MinimalChain last = new MinimalChain(null);

  State state = new(0, 0, last); // current read cursor
  long remaining;                // elements added but not yet read
  State mark;                    // cursor saved by mark(), or null

  // Reads exactly n elements into buf[off..off+n).
  // Throws EOFException when fewer than n elements are currently
  // queued - regardless of whether addEOF() was called, since this
  // stream never blocks.
  public synchronized void readFully(short[] buf, int off, int n) throws IOException {
    if (debug) printFunctionCall readFully(buf, off, n);
    if (n == 0) ret;
    if (n > available()) throw new EOFException();
    assertEquals(n, read(buf, off, n));
  }

  // Copies up to len queued elements into buf starting at off and
  // returns the number copied. Returns -1 whenever nothing can be
  // copied: that covers both "no data queued right now" and a request
  // with len == 0. Use isEOF() to tell a real stream end apart.
  public synchronized int read(short[] buf, int off, int len) throws IOException {
    if (debug) printFunctionCall read(buf, off, len);
    len = min(len, available());
    if (len == 0) ret -1;

    int n = 0;
    while (state.chain != last) {
      int nChunk = state.nBytes();
      int inChunk = min(len, nChunk-state.ofs);
      int copiedFrom = state.ofs;
      arraycopy(state.data(), copiedFrom, buf, off, inChunk);

      n += inChunk;
      off += inChunk;
      len -= inChunk;
      state.ofs += inChunk;
      state.absolutePosition += inChunk;
      remaining -= inChunk;

      if (debug)
        printVars(nBytes := state.nBytes(), +copiedFrom, +nChunk, +inChunk, +n, +off, +len);

      // Step to the next chunk as soon as this one is used up;
      // otherwise the request has been satisfied.
      if (state.exhausted())
        state = new State(state.absolutePosition, 0, state.chain.next);
      else
        break;
    }
    ret n;
  }

  // Number of elements that can be read right now, capped to the int
  // range by clampToInt.
  public synchronized int available() {
    ret clampToInt(remaining);
  }

  // Reads a single element and returns it as an int, or noElement()
  // when no data is queued at the moment.
  public synchronized int read() throws IOException {
    if (debug) printFunctionCall read();
    while ping (true) {
      if (!state.exhausted()) {
        ++state.absolutePosition;
        --remaining;
        ret state.chain![state.ofs++];
      }

      // The sentinel has no successor: nothing more to read for now.
      if (state.chain.next == null) ret noElement();

      // Fix: carry the absolute position over to the next chunk. It was
      // previously reset to 0 here, which broke absolutePosition(),
      // endOfQueue() and the bookkeeping in reset(), and disagreed with
      // the bulk read above.
      state = new State(state.absolutePosition, 0, state.chain.next);
    }
  }

  // Marker returned by read() when no element is available. It lies
  // outside the short range, so it cannot collide with a data value.
  static int noElement() { ret Int.MIN_VALUE; }

  // Appends a chunk to the queue. Empty or null buffers are ignored.
  // The array is stored by reference, not copied.
  public synchronized void add aka write(short[] buffer) {
    if (empty(buffer)) ret;
    remaining += l(buffer);
    // Turn the current sentinel into a data node and append a new sentinel.
    last.setNext(new MinimalChain(null));
    last.setElement(buffer);
    last = last.next;
  }

  // Declares that no more data will be added.
  public synchronized void addEOF() {
    eof = true;
  }

  // True once addEOF() was called and every queued element has been
  // consumed; false while we are merely waiting for more data.
  // Fix: checks the unread count instead of "state.chain == last".
  // read() only moves the cursor lazily, so the old test kept reporting
  // false after the final element had been read one by one.
  public synchronized bool isEOF() {
    ret eof && remaining == 0;
  }

  public synchronized bool markSupported() { true; }

  // Remembers the current read position for a later reset().
  // readLimit is not used.
  public synchronized void mark(int readLimit) {
    if (debug) printFunctionCall mark(readLimit);
    // clone state
    mark = new State(state.absolutePosition, state.ofs, state.chain);
    ifdef ByteArraysPartialInputStream_debug
      printVars(+mark);
    endifdef
  }

  // Rewinds to the position saved by mark() and clears the mark.
  // Fails (via fail()) when no mark is set.
  public synchronized void reset() throws IOException {
    if (debug) printFunctionCall reset();
    if (mark == null) fail("Reset without mark");
    // Everything read since the mark becomes unread again.
    remaining += state.absolutePosition-mark.absolutePosition;
    state = mark;
    mark = null;
  }

  // Field dump of this stream, as produced by toStringWithFields.
  synchronized S stats() { ret toStringWithFields(this); }

  // Elements queued but not yet read.
  synchronized long remaining() { ret remaining; }

  // Elements consumed so far.
  synchronized long absolutePosition() { ret state.absolutePosition; }

  // Absolute position just past the last queued element.
  synchronized long endOfQueue() { ret absolutePosition()+remaining(); }
}