Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

420
LINES

< > BotCompany Repo | #1036138 // TickerSequence - sequence of ticker values (prices) with timestamps

JavaX fragment (include) [tags: use-pretranspiled]

Libraryless. Click here for Pure Java version (17408L/100K).

// A sequence of prices for one market, kept as two parallel buffers:
// timestamps[i] is the time at which prices[i] was recorded.
// Timestamps are in milliseconds (see the secondsToMS/daysToMS arithmetic below).
// Most accessors and mutators are synchronized on this object.
persistable sclass TickerSequence is ByteIO, IntSize, IntegrityCheckable  {
  settable S market; // e.g. "TRBUSDT"
  
  // in case the sequence is digitized
  settable PriceCells priceCells;
  
  // are we live?
  settable bool live;
  
  // Is the data incomplete (still loading)?
  settable bool loading;
  
  // Parallel buffers; integrityCheck() asserts they have equal length.
  ILongBuffer timestamps = new SynchronizedLongBuffer;
  IDoubleBuffer prices = new SynchronizedFloatBufferPresentingAsDoubles;
  
  // timestamp of last ticker event even though price may not
  // have changed
  settable long lastTickerEvent;
  
  // Ratio threshold passed to differentByEpsilonRatio when deciding
  // whether a new price counts as a change (see addIfPriceChanged)
  settable double epsilon = 1e-6;
  
  // Fired by add(price, timestamp) after a price point was stored
  event pricePointAdded(long timestamp);
  
  // Fired whenever lastTickerEvent is updated
  event newTickerEvent;
  
  *(S *market) {}
  
  // Convenience overload taking a Timestamp object
  void addIfPriceChanged(double price, Timestamp timestamp) {
    addIfPriceChanged(price, toLong(timestamp));
  }
  
  // Stores the price point only if the sequence is empty or the price
  // differs from the last stored price (judged by epsilon).
  // The ticker event time is advanced either way.
  synchronized void addIfPriceChanged(double price, long timestamp) {
    if (isEmpty() || differentByEpsilonRatio(prices.last(), price, epsilon))
      add(price, timestamp);
      
    updateTickerEvent(timestamp);
  }
  
  // Advances lastTickerEvent (and fires newTickerEvent) if timestamp is newer
  void updateTickerEvent(long timestamp) {
    if (timestamp > lastTickerEvent) {
      lastTickerEvent(timestamp);
      newTickerEvent();
    }
  }
  
  // Unconditionally appends one price point and fires pricePointAdded
  synchronized void add aka addPrice(double price, long timestamp) {
    timestamps.add(timestamp);
    prices.add(price);
    pricePointAdded(timestamp);
    updateTickerEvent(timestamp);
  }
  
  // Bulk append. Both arrays must have the same length.
  // Does not fire pricePointAdded or newTickerEvent.
  synchronized void add(double[] prices, long[] timestamps) {
    assertEquals(l(prices), l(timestamps));
    if (empty(prices)) ret;
    
    // TODO: call pricePointAdded?
    this.prices.addAll(asList(prices));
    this.timestamps.addAll(asList(timestamps));
    lastTickerEvent = max(lastTickerEvent, endTime());
  }
  
  // Appends all price points of another sequence
  synchronized void add(TickerSequence ticker) {
    add(ticker.prices.toArray(), ticker.timestamps.toArray());
  }
  
  // Summary: market (+ price cells if set), duration, number of points, time range
  synchronized toString {
    ret commaCombine(
      spaceCombine(
        "Ticker",
        market,
        priceCells == null ?: roundBracket(priceCells),
      ),
      formatDays(duration()),
      n2(l(prices), "price point"),
      timeRange()
    );
  }
  
  // Copy of all price points. Like every subSequence, the copy is built from
  // emptyClone() and therefore carries over the market only.
  public synchronized TickerSequence clone() {
    ret subSequence(0, size());
  }
  
  // New empty sequence for the same market (no other settings are copied)
  TickerSequence emptyClone() {
    ret new TickerSequence().market(market);
  }
  
  // Copies the price points in the index range [start, end) into a new sequence
  // (range semantics are those of cloneSubDoubleArray/cloneSubLongArray)
  synchronized TickerSequence subSequence(int start, int end default size()) {
    var s = emptyClone();
    s.add(
      cloneSubDoubleArray(prices.toArray(), start, end),
      cloneSubLongArray(timestamps.toArray(), start, end));
    ret s;
  }
  
  // Keeps the part of the sequence before startTime() + seconds
  synchronized TickerSequence cutOffAfterSeconds(double seconds) {
    int idx = indexOfTimestamp(startTime()+secondsToMS(seconds));
    ret subSequence(0, idx);
  }

  TickerSequence dropFirstDays(double days) {
    ret subSequenceFromTimestamp(startTime()+daysToMS(days));
  }

  TickerSequence takeFirstDays(double days) {
    ret cutOffAfterSeconds(daysToSeconds(days));
  }

  // Parameter is a number of hours (it is converted with hoursToSeconds)
  TickerSequence takeFirstHours(double hours) {
    ret cutOffAfterSeconds(hoursToSeconds(hours));
  }

  // Note: measured back from endTime(), which includes lastTickerEvent
  TickerSequence takeLastDays(double days) {
    ret subSequenceFromTimestamp(endTime()-daysToMS(days));
  }

  TickerSequence takeLastHours(double hours) {
    ret takeLastDays(hoursToDays(hours));
  }

  TickerSequence takeLastMinutes(double minutes) {
    ret takeLastDays(minutesToDays(minutes));
  }

  // Sub-sequence between the insertion points of the two timestamps
  synchronized TickerSequence subSequenceByTimestamps(long from, long to) {
    int idx1 = indexOfTimestamp(from);
    int idx2 = indexOfTimestamp(to);
    ret subSequence(idx1, idx2);
  }
  
  synchronized TickerSequence subSequenceFromTimestamp(long from) {
    ret subSequence(indexOfTimestamp(from));
  }
  
  synchronized TickerSequence subSequenceToTimestamp(long to) {
    ret subSequence(0, indexOfTimestamp(to));
  }
  
  // Index at which the (rounded) timestamp would be inserted into the
  // sorted timestamp list, as computed by binarySearch_insertionPoint.
  // TODO: shouldn't we subtract one here?
  synchronized int indexOfTimestamp(double ts) {
    ret binarySearch_insertionPoint(timestamps.asVirtualList(), lround(ts));
  }
  
  // Presumably the index of the last price point at or before ts
  // (delegates to binarySearch_tipToTheLeft) - TODO confirm edge cases
  synchronized int indexOfTimestampAtOrBefore(double ts) {
    ret binarySearch_tipToTheLeft(timestamps.asVirtualList(), lround(ts));
  }
  
  // Timestamp at insertion point + 1; getTimestamp clamps the index
  // and yields 0 for an empty sequence
  synchronized long nextTimestamp(double ts) {
    ret getTimestamp(indexOfTimestamp(ts)+1);
  }
  
  // Price in effect at time ts (NaN for an empty sequence, see getPrice)
  synchronized double priceAtTimestamp(double ts) {
    ret getPrice(indexOfTimestampAtOrBefore(ts));
  }

  // Number of stored price points
  public int size() { ret l(prices); }
  public bool isEmpty() { ret size() == 0; }
  
  // null if there are no price points
  synchronized TimestampRange timeRange() {
    ret isEmpty() ? null : TimestampRange(startTime(), endTime());
  }
  
  Duration duration() {
    ret main duration(timeRange());
  }
  
  // First stored timestamp, or 0 if there is none
  synchronized long startTime() { ret empty(timestamps) ? 0 : first(timestamps); }
  
  // The later of lastTickerEvent and the last stored timestamp.
  // Can therefore lie after the last price point.
  synchronized long endTime() {
    ret max(lastTickerEvent,
      empty(timestamps) ? 0 : last(timestamps));
  }
  
  // Price at index i (index is clamped via clampToLength); NaN if empty
  synchronized double getPrice(int i) {
    ret isEmpty() ? Double.NaN : prices.get(clampToLength(size(), i));
  }
  
  // Timestamp at index i (index is clamped via clampToLength); 0 if empty
  synchronized long getTimestamp(int i) {
    ret isEmpty() ? 0 : timestamps.get(clampToLength(size(), i));
  }
  
  // New sequence keeping only the first point of every run of
  // exactly equal consecutive prices
  synchronized TickerSequence removePlateaus() {
    var seq = emptyClone();
    int n = size();
    
    for i to n: {
      var value = getPrice(i);
      seq.prices.add(value);
      seq.timestamps.add(getTimestamp(i));
      
      // skip the rest of the plateau
      while (i+1 < n && getPrice(i+1) == value) ++i;
    }
    
    ret seq.trimToSize();
  }
  
  // Returns this sequence itself if it contains no zero price,
  // otherwise a new sequence without the zero-priced points
  synchronized TickerSequence removeZeros() {
    if (!prices.contains(0)) this;
    var seq = emptyClone();
    int n = size();
    for i to n: {
      var value = getPrice(i);
      if (value != 0) {
        seq.prices.add(value);
        seq.timestamps.add(getTimestamp(i));
      }
    }
    ret seq.trimToSize();
  }
  
  // New sequence with every timestamp rounded down to a multiple of
  // interval; a point is dropped when its rounded time equals the
  // end time of the sequence built so far
  synchronized TickerSequence alignTimestamps(int interval) {
    var seq = emptyClone();
    int n = size();
    
    for i to n: {
      var value = getPrice(i);
      long time = roundDownTo(interval, getTimestamp(i));
      if (time != seq.endTime())
        seq.addIfPriceChanged(value, time);
    }
      
    ret seq.trimToSize();
  }
  
  // Asks both buffers to trim their storage
  selfType trimToSize() {
    prices.trimToSize();
    timestamps.trimToSize();
    this;
  }
  
  double minPrice() { ret doubleMin(prices.toArray()); }
  double maxPrice() { ret doubleMax(prices.toArray()); }
  DoubleRange priceRange() { ret doubleRange(minPrice(), maxPrice()); }
  
  // NaN if there are no prices
  synchronized double firstPrice() { ret empty(prices) ? Double.NaN : prices.first(); }
  synchronized double lastPrice() { ret empty(prices) ? Double.NaN : prices.last(); }
  
  // Ticker point at endTime()
  synchronized TickerPoint lastPoint() {
    ret new TickerPoint(this, endTime());
  }
  
  // Patterns extracting the two fields used from a ticker JSON line
  static Pattern reBestAsk = regexp("\"bestAsk\":([0-9\\.]+)");
  static Pattern reSystemTime = regexp("\"systemTime\":(\\d+)");
  
  // parse line from a .ticker file
  // (regexp-based; lines not newer than lastTickerEvent and zero prices are ignored)
  synchronized void addJSONLine_fast(S line) {
    long time = toLong(regexpFirstGroup(reSystemTime, line));
    if (time > lastTickerEvent) {
      double price = toDouble(regexpFirstGroup(reBestAsk, line));
      if (price != 0)
        addIfPriceChanged(price, time);
    }
  }
  
  // Swappable entry point for line parsing; uses the fast variant by default
  swappable void addJSONLine(S line) {
    addJSONLine_fast(line);
  }
  
  // Same contract as addJSONLine_fast, but goes through a full JSON parse.
  // Zero prices are skipped here too, so that both variants agree.
  synchronized void addJSONLine_slow(S line) {
    Map data = parseJSONMap(line);
    double price = toDouble(data.get("bestAsk"));
    long time = toLong(data.get("systemTime"));
    if (time > lastTickerEvent && price != 0)
      addIfPriceChanged(price, time);
  }
  
  // Digitizes with the given cells and converts the rounded cell numbers
  // to an UpDownSequence. Returns null if no cells are available.
  UpDownSequence toUpDownSequence aka upDownSequence(PriceCells cells default priceCells) {
    if (cells == null) null;
    var dig = digitize(new PriceDigitizer(cells));
    L<Int> cellNumbers = dig.roundedCellNumbers();
    ret UpDownSequence.fromInts(cellNumbers);
  }
  
  // Digitizes using geometricPriceDigitizer(basePrice, cellSizeInPercent).
  // An empty sequence is returned as is.
  TickerSequence digitizeToPercent(double basePrice default firstPrice(), double cellSizeInPercent) {
    if (isEmpty()) this;
    ret digitize(geometricPriceDigitizer(basePrice, cellSizeInPercent));
  }
  
  // New sequence holding the digitized prices; consecutive points whose
  // digitized price did not change are merged (addIfPriceChanged)
  TickerSequence digitize(PriceDigitizer digitizer) {
    var seq = emptyClone();
    seq.priceCells(digitizer.priceCells());
    int n = size();
    
    for i to n: {
      seq.addIfPriceChanged(digitizer.digitize(getPrice(i)), getTimestamp(i));
    }
    
    ret seq.trimToSize();
  }
  
  // List view over getPricePoint for indices 0..size()-1
  L<PricePoint> pricePoints() {
    ret listFromFunction getPricePoint(size());
  }
  
  // null if idx is out of range
  synchronized PricePoint getPricePoint(int idx) {
    ret idx >= 0 && idx < size() ? new PricePoint(timestamps.get(idx), prices.get(idx)) : null;
  }
  
  PricePoint firstPricePoint() { ret getPricePoint(0); }
  synchronized PricePoint lastPricePoint() { ret getPricePoint(size()-1); }
  
  L<Int> roundedCellNumbers() {
    ret map iround(cellNumbers());
  }
  
  // Cell number of every price according to priceCells; null if priceCells is not set
  L<Double> cellNumbers() {
    if (priceCells == null) null;
    int n = size();
    DoubleBuffer buf = new(n);
    for i to n:
      buf.add(priceCells.toCellNumber(getPrice(i)));
    ret buf.asVirtualList();
  }
  
  // New sequence whose "prices" are the cell numbers of this sequence's prices;
  // null if priceCells is not set
  TickerSequence toCellNumbers() {
    if (priceCells == null) null;
    new TickerSequence seq;
    seq.market(market + " cell numbers " + roundBracket(priceCells));
    int n = size();
    
    for i to n:
      seq.add(priceCells.toCellNumber(getPrice(i)), getTimestamp(i));

    ret seq.trimToSize();
  }
  
  // Removes the last price point and rolls lastTickerEvent back to the
  // last remaining timestamp (0 if none is left).
  // Synchronized like the other mutators because it touches both buffers.
  synchronized void dropLastPricePoint() {
    if (isEmpty()) ret;
    prices.popLast();
    timestamps.popLast();
    // Not endTime() here: endTime() is max(lastTickerEvent, ...) and
    // would leave lastTickerEvent unchanged.
    lastTickerEvent = empty(timestamps) ? 0 : last(timestamps);
    newTickerEvent();
  }
  
  // Sets the market only if none is set yet
  selfType marketIfEmpty(S market) {
    if (empty(this.market)) market(market);
    this;
  }
  
  // Converts the price buffer to SynchronizedFloatBufferPresentingAsDoubles
  // if it is of a different type
  selfType legacyCompact() {
    if (!prices instanceof SynchronizedFloatBufferPresentingAsDoubles)
      prices = new SynchronizedFloatBufferPresentingAsDoubles(prices.toArray());
    this;
  }
  
  // msUnit = how many milliseconds to group into one
  // Returns true if the timestamp buffer was converted,
  // false if it already was a SynchronizedLongBufferStoredAsLinearInts
  bool compactTimestamps(long msUnit default 20) {
    if (!timestamps instanceof SynchronizedLongBufferStoredAsLinearInts) {
      timestamps = new SynchronizedLongBufferStoredAsLinearInts(roundDownTo(msUnit, startTime()), msUnit, timestamps);
      true;
    }
    false;
  }
  
  // (endTime - startTime) / (size - 1), computed through doubleRatio
  double averageTimeStep() {
    ret doubleRatio(endTime()-startTime(), size()-1);
  }
  
  // Binary format: market string, timestamp base (long), timestamp factor (long),
  // number of points (int), then per point: raw timestamp (int), price (float).
  // Requires the timestamps to be a SynchronizedLongBufferStoredAsLinearInts
  // (see compactTimestamps); the cast below fails otherwise.
  public void write(ByteHead head) {
    head.writeString(market);
    SynchronizedLongBufferStoredAsLinearInts timestamps = cast this.timestamps;
    head.writeLong(timestamps.base);
    head.writeLong(timestamps.factor);
    
    int n = size();
    head.writeInt(n);

    for i to n: {
      head.writeInt(timestamps.getRaw(i));
      head.writeFloat((float) prices.get(i));
    }
  }
  
  // Reads or writes depending on the mode of the head
  public void readWrite(ByteHead head) {
    if (head.readMode()) readImpl(head);
    if (head.writeMode()) write(head);
  }
  
  // Reads one sequence; null if the head was at EOF
  static TickerSequence read(ByteHead head) {
    new TickerSequence t;
    if (!t.readImpl(head)) null;
    ret t;
  }
  
  // Reads the format produced by write(), replacing this object's contents.
  // Returns false if EOF was hit while reading the market name.
  // A truncated stream yields the complete records read so far.
  bool readImpl(ByteHead head) {
    market = head.readString();
    if (head.eof()) false;
    long base = head.readLong();
    long factor = head.readLong();
    //printVars(+market, +base, +factor);
    var timestamps = new SynchronizedLongBufferStoredAsLinearInts (base, factor);
    this.timestamps = timestamps;
    
    // Start with fresh prices too so both buffers stay in step
    // even when reading into an object that already held data
    prices = new SynchronizedFloatBufferPresentingAsDoubles();

    int n = head.readInt();
    for i to n: {
      int time = head.readInt();
      if (head.eof()) break;
      float price = head.readFloat();
      // Only store complete records: a stream cut off between timestamp
      // and price must not leave timestamps longer than prices.
      // NOTE(review): assumes eof() is only set by a failed read - confirm in ByteHead
      if (head.eof()) break;
      timestamps.addRaw(time);
      prices.add(price);
    }
    trimToSize();
    true;
  }
  
  // Feeds every line of the file to addJSONLine; pcall wraps each line's
  // parse so one bad line does not stop the load (presumably logs and continues)
  selfType loadJSONFile(File f) {
    temp it = linesFromFile(f);
    for (line : it)
      pcall { addJSONLine(line); }
    ret trimToSize();
  }
  
  // Inserts all price points of another sequence at index idx.
  // Does not touch lastTickerEvent and fires no events.
  synchronized void insertAt(int idx, TickerSequence ticker) {
    prices.insertAt(idx, ticker.prices.toArray());
    timestamps.insertAt(idx, ticker.timestamps.toArray());
  }

  // Checks both buffers and asserts that they have the same length
  public void integrityCheck {
    timestamps.integrityCheck();
    prices.integrityCheck();
    assertEquals(l(timestamps), l(prices));
  }
}

download  show line numbers  debug dex  old transpilations   

Travelled to 3 computer(s): elmgxqgtpvxh, mqqgnosmbjvj, wnsclhtenguj

No comments. add comment

Snippet ID: #1036138
Snippet name: TickerSequence - sequence of ticker values (prices) with timestamps
Eternal ID of this version: #1036138/146
Text MD5: 46d3102bae67eb0058d5ae032e044093
Transpilation MD5: 82d58746b11690bdaf559c485ec9a4f4
Author: stefan
Category: javax
Type: JavaX fragment (include)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2023-03-22 19:59:30
Source code size: 11990 bytes / 420 lines
Pitched / IR pitched: No / No
Views / Downloads: 463 / 1298
Version history: 145 change(s)
Referenced in: [show references]