// A time-ordered sequence of (timestamp, price) points for one market.
// Timestamps and prices are held in two parallel buffers which must
// always have the same length; mutators are synchronized on the
// instance to keep the two buffers in step.
persistable sclass TickerSequence is IntSize {
  settable S market; // e.g. "TRBUSDT"

  // in case the sequence is digitized
  settable PriceCells priceCells;

  // are we live?
  settable bool live;

  // Is the data incomplete (still loading)?
  settable bool loading;

  // Parallel buffers: timestamps.get(i) belongs to prices.get(i)
  ILongBuffer timestamps = new SynchronizedLongBuffer;
  IDoubleBuffer prices = new SynchronizedFloatBufferPresentingAsDoubles;

  // timestamp of last ticker event even though price may not
  // have changed
  settable long lastTickerEvent;

  // Fired by add(price, timestamp) after a single point was appended
  event pricePointAdded(long timestamp);

  // Records the ticker event and appends a point only if the sequence
  // is empty or the price differs from the last stored price.
  synchronized void addIfPriceChanged(double price, long timestamp) {
    lastTickerEvent = max(lastTickerEvent, timestamp);
    if (isEmpty() || last(prices) != price)
      add(price, timestamp);
  }

  // Unconditionally appends one price point and fires pricePointAdded.
  synchronized void add aka addPrice(double price, long timestamp) {
    lastTickerEvent = max(lastTickerEvent, timestamp);
    timestamps.add(timestamp);
    prices.add(price);
    pricePointAdded(timestamp);
  }

  // Bulk-appends parallel arrays (must have equal length).
  // Does not fire pricePointAdded.
  synchronized void add(double[] prices, long[] timestamps) {
    assertEquals(l(prices), l(timestamps));
    if (empty(prices)) ret;
    // TODO: call pricePointAdded?
    this.prices.addAll(asList(prices));
    this.timestamps.addAll(asList(timestamps));
    lastTickerEvent = max(lastTickerEvent, endTime());
  }

  // Appends all points of another sequence.
  // NOTE(review): the two toArray() calls on `ticker` are not made under
  // ticker's lock, so a concurrent add to `ticker` could yield arrays of
  // different length (assertEquals would then fail) - confirm callers.
  synchronized void add(TickerSequence ticker) {
    add(ticker.prices.toArray(), ticker.timestamps.toArray());
  }

  // Summary: market (plus price cells if set), duration, number of
  // price points, time range.
  synchronized toString {
    ret commaCombine(
      spaceCombine(
        "Ticker",
        market,
        priceCells == null ?: roundBracket(priceCells),
      ),
      formatDays(duration()),
      n2(l(prices), "price point"),
      timeRange()
    );
  }

  // Copies the points in index range [start, end) into a new sequence
  // carrying the same market. priceCells is not carried over.
  synchronized TickerSequence subSequence(int start, int end default size()) {
    new TickerSequence s;
    s.market(market);
    s.add(
      subDoubleArray(prices.toArray(), start, end),
      subArray(timestamps.toArray(), start, end));
    ret s;
  }

  // Keeps only the points located before startTime() + seconds
  // (as determined by indexOfTimestamp).
  synchronized TickerSequence cutOffAfterSeconds(double seconds) {
    int idx = indexOfTimestamp(startTime()+secondsToMS(seconds));
    ret subSequence(0, idx);
  }

  TickerSequence takeFirstDays(double days) {
    ret cutOffAfterSeconds(daysToSeconds(days));
  }

  TickerSequence takeLastDays(double days) {
    int idx = indexOfTimestamp(endTime()-daysToMS(days));
    ret subSequence(idx);
  }

  // Sub-sequence between the index found for `from` and the index
  // found for `to` (end index exclusive, see subSequence).
  synchronized TickerSequence subSequenceByTimestamps(long from, long to) {
    int idx1 = indexOfTimestamp(from);
    int idx2 = indexOfTimestamp(to);
    ret subSequence(idx1, idx2);
  }

  synchronized TickerSequence subSequenceFromTimestamp(long from) {
    ret subSequence(indexOfTimestamp(from));
  }

  // Binary search over the timestamp buffer for the rounded timestamp.
  // Presumably returns the insertion point when there is no exact
  // match - see binarySearch_insertionPoint.
  synchronized int indexOfTimestamp(double ts) {
    ret binarySearch_insertionPoint(timestamps.asVirtualList(), lround(ts));
  }

  // Timestamp stored one index after the index found for ts
  // (index is clamped by getTimestamp).
  synchronized long nextTimestamp(double ts) {
    ret getTimestamp(indexOfTimestamp(ts)+1);
  }

  // Price stored at the index found for ts (index is clamped by getPrice).
  synchronized double priceAtTimestamp(double ts) {
    ret getPrice(indexOfTimestamp(ts));
  }

  // Number of price points
  public int size() { ret l(prices); }

  public bool isEmpty() { ret size() == 0; }

  // null if the sequence is empty
  synchronized TimestampRange timeRange() {
    ret isEmpty() ? null : TimestampRange(startTime(), endTime());
  }

  Duration duration() {
    ret main duration(timeRange());
  }

  // First timestamp, or 0 if there are none
  synchronized long startTime() {
    ret empty(timestamps) ? 0 : first(timestamps);
  }

  // Last timestamp, or 0 if there are none
  synchronized long endTime() {
    ret empty(timestamps) ? 0 : last(timestamps);
  }

  // Price at index i; the index is passed through clampToLength first
  synchronized double getPrice(int i) {
    ret prices.get(clampToLength(size(), i));
  }

  // Timestamp at index i; the index is passed through clampToLength first
  synchronized long getTimestamp(int i) {
    ret timestamps.get(clampToLength(size(), i));
  }

  // New sequence in which every run of consecutive equal prices is
  // reduced to the first point of that run.
  synchronized TickerSequence removePlateaus() {
    new TickerSequence seq;
    seq.market(market);
    int n = size();
    for i to n: {
      var value = getPrice(i);
      seq.prices.add(value);
      seq.timestamps.add(getTimestamp(i));
      // skip the remainder of the plateau
      while (i+1 < n && getPrice(i+1) == value) ++i;
    }
    ret seq.trimToSize();
  }

  // New sequence with every timestamp passed through
  // roundDownTo(interval, ...). A point is skipped when its rounded
  // time equals the new sequence's current end time, or (through
  // addIfPriceChanged) when its price equals the last price added.
  synchronized TickerSequence alignTimestamps(int interval) {
    new TickerSequence seq;
    seq.market(market);
    int n = size();
    for i to n: {
      var value = getPrice(i);
      long time = roundDownTo(interval, getTimestamp(i));
      if (time != seq.endTime())
        seq.addIfPriceChanged(value, time);
    }
    ret seq.trimToSize();
  }

  // Calls trimToSize on both buffers; returns this
  selfType trimToSize() {
    prices.trimToSize();
    timestamps.trimToSize();
    this;
  }

  double minPrice() { ret doubleMin(prices.toArray()); }

  double maxPrice() { ret doubleMax(prices.toArray()); }

  // NaN if there are no prices
  synchronized double firstPrice() {
    ret empty(prices) ? Double.NaN : first(prices);
  }

  // NaN if there are no prices
  synchronized double lastPrice() {
    ret empty(prices) ? Double.NaN : last(prices);
  }

  // TickerPoint for this sequence at endTime()
  synchronized TickerPoint lastPoint() {
    ret new TickerPoint(this, endTime());
  }

  // parse line from a .ticker file
  // Uses the "bestAsk" field as price and "systemTime" as timestamp.
  synchronized void addJSONLine(S line) {
    Map data = parseJSONMap(line);
    double price = toDouble(data.get("bestAsk"));
    long time = toLong(data.get("systemTime"));
    addIfPriceChanged(price, time);
  }

  // Digitizes with a geometric price digitizer built from basePrice
  // and cellSizeInPercent. An empty sequence is returned unchanged.
  TickerSequence digitizeToPercent(double basePrice default firstPrice(), double cellSizeInPercent) {
    if (isEmpty()) this;
    ret digitize(geometricPriceDigitizer(basePrice, cellSizeInPercent));
  }

  // New sequence holding the digitized prices; consecutive points that
  // digitize to the same price are merged (addIfPriceChanged). The
  // result carries the digitizer's priceCells.
  TickerSequence digitize(PriceDigitizer digitizer) {
    new TickerSequence seq;
    seq.market(market);
    seq.priceCells(digitizer.priceCells());
    int n = size();
    for i to n: {
      seq.addIfPriceChanged(digitizer.digitize(getPrice(i)), getTimestamp(i));
    }
    ret seq.trimToSize();
  }

  // List of size() elements produced by getPricePoint
  L pricePoints() {
    ret listFromFunction getPricePoint(size());
  }

  // Unlike getPrice/getTimestamp, the index is not clamped here
  PricePoint getPricePoint(int idx) {
    ret new PricePoint(timestamps.get(idx), prices.get(idx));
  }

  // cellNumbers() mapped through iround
  L roundedCellNumbers() {
    ret map iround(cellNumbers());
  }

  // Cell number of every price according to priceCells;
  // null if priceCells is not set
  L cellNumbers() {
    if (priceCells == null) null;
    int n = size();
    DoubleBuffer buf = new(n);
    for i to n:
      buf.add(priceCells.toCellNumber(getPrice(i)));
    ret buf.asVirtualList();
  }

  // New sequence whose "prices" are the cell numbers of this sequence's
  // prices (same timestamps); null if priceCells is not set
  TickerSequence toCellNumbers() {
    if (priceCells == null) null;
    new TickerSequence seq;
    seq.market(market + " cell numbers " + roundBracket(priceCells));
    int n = size();
    for i to n:
      seq.add(priceCells.toCellNumber(getPrice(i)), getTimestamp(i));
    ret seq.trimToSize();
  }

  // Removes the newest price point (no-op on an empty sequence) and
  // resets lastTickerEvent to the new end time.
  // Synchronized like the add methods so a concurrent add cannot
  // interleave between the two pops and leave the buffers out of step.
  synchronized void dropLastPricePoint() {
    if (isEmpty()) ret;
    prices.popLast();
    timestamps.popLast();
    lastTickerEvent = endTime();
  }

  // Sets the market only if none is set yet; returns this
  selfType marketIfEmpty(S market) {
    if (empty(this.market)) market(market);
    this;
  }

  // Replaces the price buffer with a
  // SynchronizedFloatBufferPresentingAsDoubles holding the same values
  // unless it already is one; returns this
  selfType legacyCompact() {
    if (!prices instanceof SynchronizedFloatBufferPresentingAsDoubles)
      prices = new SynchronizedFloatBufferPresentingAsDoubles(prices.toArray());
    this;
  }

  // msUnit = how many milliseconds to group into one
  // Converts a SynchronizedLongBuffer of timestamps into a
  // SynchronizedLongBufferStoredAsLinearInts based at
  // roundDownTo(msUnit, startTime()).
  // Returns true if the conversion happened, false otherwise.
  bool compactTimestamps(long msUnit default 20) {
    if (timestamps instanceof SynchronizedLongBuffer) {
      timestamps = new SynchronizedLongBufferStoredAsLinearInts(roundDownTo(msUnit, startTime()), msUnit, timestamps);
      true;
    }
    false;
  }

  // (endTime - startTime) / (size - 1), computed through doubleRatio
  double averageTimeStep() {
    ret doubleRatio(endTime()-startTime(), size()-1);
  }

  // Replaces market, timestamps and prices with data read from head.
  // Layout as read here: market (string), time base (long),
  // time step (long), number of entries (int, -1 = indefinite),
  // then per entry a raw time (int) followed by a price (float).
  public void readRaw(ByteHead head) {
    market = head.readString();
    long timeBase = head.readLong();
    long timeStep = head.readLong();
    var timestamps = new SynchronizedLongBufferStoredAsLinearInts(timeBase, timeStep);
    this.timestamps = timestamps;
    prices = new SynchronizedFloatBufferPresentingAsDoubles;
    int length = head.readInt(); // number of entries, -1 = indefinite
    if (length < 0) length = Int.MAX_VALUE;

    for i to length: {
      int time = head.readInt();
      // We only notice an EOF after we tried to read the first non-existent byte
      if (head.eof()) break;

      var price = head.readFloat();
      // Input ended inside this record: drop the incomplete record
      // instead of storing a timestamp with a bogus price
      if (head.eof()) break;

      timestamps.addRaw(time);
      prices.add(price);
    }
  }
}