persistable sclass TickerSequence is ByteIO, IntSize, IntegrityCheckable { settable S market; // e.g. "TRBUSDT" // in case the sequence is digitized settable PriceCells priceCells; // are we live? settable bool live; // Is the data incomplete (still loading)? settable bool loading; ILongBuffer timestamps = new SynchronizedLongBuffer; IDoubleBuffer prices = new SynchronizedFloatBufferPresentingAsDoubles; // timestamp of last ticker event even though price may not // have changed settable long lastTickerEvent; settable double epsilon = 1e-6; event pricePointAdded(long timestamp); event newTickerEvent; *(S *market) {} void addIfPriceChanged(double price, Timestamp timestamp) { addIfPriceChanged(price, toLong(timestamp)); } synchronized void addIfPriceChanged(double price, long timestamp) { if (isEmpty() || differentByEpsilonRatio(prices.last(), price, epsilon)) add(price, timestamp); updateTickerEvent(timestamp); } void updateTickerEvent(long timestamp) { if (timestamp > lastTickerEvent) { lastTickerEvent(timestamp); newTickerEvent(); } } synchronized void add aka addPrice(double price, long timestamp) { timestamps.add(timestamp); prices.add(price); pricePointAdded(timestamp); updateTickerEvent(timestamp); } synchronized void add(double[] prices, long[] timestamps) { assertEquals(l(prices), l(timestamps)); if (empty(prices)) ret; // TODO: call pricePointAdded? this.prices.addAll(asList(prices)); this.timestamps.addAll(asList(timestamps)); lastTickerEvent = max(lastTickerEvent, endTime()); } synchronized void add(TickerSequence ticker) { add(ticker.prices.toArray(), ticker.timestamps.toArray()); } synchronized toString { ret commaCombine( spaceCombine( "Ticker", market, priceCells == null ?: roundBracket(priceCells), ), formatDays(duration()), n2(l(prices), "price point"), timeRange() ); } public synchronized TickerSequence clone() { ret subSequence(0, size()); } TickerSequence emptyClone() { ret new TickerSequence().market(market); } synchronized TickerSequence subSequence(int start, int end default size()) { var s = emptyClone(); s.add( cloneSubDoubleArray(prices.toArray(), start, end), cloneSubLongArray(timestamps.toArray(), start, end)); ret s; } synchronized TickerSequence cutOffAfterSeconds(double seconds) { int idx = indexOfTimestamp(startTime()+secondsToMS(seconds)); ret subSequence(0, idx); } TickerSequence dropFirstDays(double days) { ret subSequenceFromTimestamp(startTime()+daysToMS(days)); } TickerSequence takeFirstDays(double days) { ret cutOffAfterSeconds(daysToSeconds(days)); } TickerSequence takeFirstHours(double days) { ret cutOffAfterSeconds(hoursToSeconds(days)); } TickerSequence takeLastDays(double days) { ret subSequenceFromTimestamp(endTime()-daysToMS(days)); } TickerSequence takeLastHours(double hours) { ret takeLastDays(hoursToDays(hours)); } TickerSequence takeLastMinutes(double minutes) { ret takeLastDays(minutesToDays(minutes)); } synchronized TickerSequence subSequenceByTimestamps(long from, long to) { int idx1 = indexOfTimestamp(from); int idx2 = indexOfTimestamp(to); ret subSequence(idx1, idx2); } synchronized TickerSequence subSequenceFromTimestamp(long from) { ret subSequence(indexOfTimestamp(from)); } synchronized TickerSequence subSequenceToTimestamp(long to) { ret subSequence(0, indexOfTimestamp(to)); } // TODO: shouldn't we subtract one here? synchronized int indexOfTimestamp(double ts) { ret binarySearch_insertionPoint(timestamps.asVirtualList(), lround(ts)); } synchronized int indexOfTimestampAtOrBefore(double ts) { ret binarySearch_tipToTheLeft(timestamps.asVirtualList(), lround(ts)); } synchronized long nextTimestamp(double ts) { ret getTimestamp(indexOfTimestamp(ts)+1); } synchronized double priceAtTimestamp(double ts) { ret getPrice(indexOfTimestampAtOrBefore(ts)); } public int size() { ret l(prices); } public bool isEmpty() { ret size() == 0; } synchronized TimestampRange timeRange() { ret isEmpty() ? null : TimestampRange(startTime(), endTime()); } Duration duration() { ret main duration(timeRange()); } synchronized long startTime() { ret empty(timestamps) ? 0 : first(timestamps); } synchronized long endTime() { ret max(lastTickerEvent, empty(timestamps) ? 0 : last(timestamps)); } synchronized double getPrice(int i) { ret isEmpty() ? Double.NaN : prices.get(clampToLength(size(), i)); } synchronized long getTimestamp(int i) { ret isEmpty() ? 0 : timestamps.get(clampToLength(size(), i)); } synchronized TickerSequence removePlateaus() { var seq = emptyClone(); int n = size(); for i to n: { var value = getPrice(i); seq.prices.add(value); seq.timestamps.add(getTimestamp(i)); while (i+1 < n && getPrice(i+1) == value) ++i; } ret seq.trimToSize(); } synchronized TickerSequence removeZeros() { if (!prices.contains(0)) this; var seq = emptyClone(); int n = size(); for i to n: { var value = getPrice(i); if (value != 0) { seq.prices.add(value); seq.timestamps.add(getTimestamp(i)); } } ret seq.trimToSize(); } synchronized TickerSequence alignTimestamps(int interval) { var seq = emptyClone(); int n = size(); for i to n: { var value = getPrice(i); long time = roundDownTo(interval, getTimestamp(i)); if (time != seq.endTime()) seq.addIfPriceChanged(value, time); } ret seq.trimToSize(); } selfType trimToSize() { prices.trimToSize(); timestamps.trimToSize(); this; } double minPrice() { ret doubleMin(prices.toArray()); } double maxPrice() { ret doubleMax(prices.toArray()); } DoubleRange priceRange() { ret doubleRange(minPrice(), maxPrice()); } synchronized double firstPrice() { ret empty(prices) ? Double.NaN : prices.first(); } synchronized double lastPrice() { ret empty(prices) ? Double.NaN : prices.last(); } synchronized TickerPoint lastPoint() { ret new TickerPoint(this, endTime()); } static Pattern reBestAsk = regexp("\"bestAsk\":([0-9\\.]+)"); static Pattern reSystemTime = regexp("\"systemTime\":(\\d+)"); // parse line from a .ticker file synchronized void addJSONLine_fast(S line) { long time = toLong(regexpFirstGroup(reSystemTime, line)); if (time > lastTickerEvent) { double price = toDouble(regexpFirstGroup(reBestAsk, line)); if (price != 0) addIfPriceChanged(price, time); } } swappable void addJSONLine(S line) { addJSONLine_fast(line); } synchronized void addJSONLine_slow(S line) { Map data = parseJSONMap(line); double price = toDouble(data.get("bestAsk")); long time = toLong(data.get("systemTime")); if (time > lastTickerEvent) addIfPriceChanged(price, time); } UpDownSequence toUpDownSequence aka upDownSequence(PriceCells cells default priceCells) { if (cells == null) null; var dig = digitize(new PriceDigitizer(cells)); L cellNumbers = dig.roundedCellNumbers(); ret UpDownSequence.fromInts(cellNumbers); } TickerSequence digitizeToPercent(double basePrice default firstPrice(), double cellSizeInPercent) { if (isEmpty()) this; ret digitize(geometricPriceDigitizer(basePrice, cellSizeInPercent)); } TickerSequence digitize(PriceDigitizer digitizer) { var seq = emptyClone(); seq.priceCells(digitizer.priceCells()); int n = size(); for i to n: { seq.addIfPriceChanged(digitizer.digitize(getPrice(i)), getTimestamp(i)); } ret seq.trimToSize(); } L pricePoints() { ret listFromFunction getPricePoint(size()); } synchronized PricePoint getPricePoint(int idx) { ret idx >= 0 && idx < size() ? new PricePoint(timestamps.get(idx), prices.get(idx)) : null; } PricePoint firstPricePoint() { ret getPricePoint(0); } synchronized PricePoint lastPricePoint() { ret getPricePoint(size()-1); } L roundedCellNumbers() { ret map iround(cellNumbers()); } L cellNumbers() { if (priceCells == null) null; int n = size(); DoubleBuffer buf = new(n); for i to n: buf.add(priceCells.toCellNumber(getPrice(i))); ret buf.asVirtualList(); } TickerSequence toCellNumbers() { if (priceCells == null) null; new TickerSequence seq; seq.market(market + " cell numbers " + roundBracket(priceCells)); int n = size(); for i to n: seq.add(priceCells.toCellNumber(getPrice(i)), getTimestamp(i)); ret seq.trimToSize(); } void dropLastPricePoint() { if (isEmpty()) ret; prices.popLast(); timestamps.popLast(); lastTickerEvent = endTime(); newTickerEvent(); } selfType marketIfEmpty(S market) { if (empty(this.market)) market(market); this; } selfType legacyCompact() { if (!prices instanceof SynchronizedFloatBufferPresentingAsDoubles) prices = new SynchronizedFloatBufferPresentingAsDoubles(prices.toArray()); this; } // msUnit = how many milliseconds to group into one bool compactTimestamps(long msUnit default 20) { if (!timestamps instanceof SynchronizedLongBufferStoredAsLinearInts) { timestamps = new SynchronizedLongBufferStoredAsLinearInts(roundDownTo(msUnit, startTime()), msUnit, timestamps); true; } false; } double averageTimeStep() { ret doubleRatio(endTime()-startTime(), size()-1); } public void write(ByteHead head) { head.writeString(market); SynchronizedLongBufferStoredAsLinearInts timestamps = cast this.timestamps; head.writeLong(timestamps.base); head.writeLong(timestamps.factor); int n = size(); head.writeInt(n); for i to n: { head.writeInt(timestamps.getRaw(i)); head.writeFloat((float) prices.get(i)); } } public void readWrite(ByteHead head) { if (head.readMode()) readImpl(head); if (head.writeMode()) write(head); } static TickerSequence read(ByteHead head) { new TickerSequence t; if (!t.readImpl(head)) null; ret t; } bool readImpl(ByteHead head) { market = head.readString(); if (head.eof()) false; long base = head.readLong(); long factor = head.readLong(); //printVars(+market, +base, +factor); var timestamps = new SynchronizedLongBufferStoredAsLinearInts (base, factor); this.timestamps = timestamps; int n = head.readInt(); for i to n: { int time = head.readInt(); if (head.eof()) break; timestamps.addRaw(time); prices.add(head.readFloat()); } trimToSize(); true; } selfType loadJSONFile(File f) { temp it = linesFromFile(f); for (line : it) pcall { addJSONLine(line); } ret trimToSize(); } synchronized void insertAt(int idx, TickerSequence ticker) { prices.insertAt(idx, ticker.prices.toArray()); timestamps.insertAt(idx, ticker.timestamps.toArray()); } public void integrityCheck { timestamps.integrityCheck(); prices.integrityCheck(); assertEquals(l(timestamps), l(prices)); } }