Libraryless. Click here for Pure Java version (17408L/100K).
1 | persistable sclass TickerSequence is ByteIO, IntSize, IntegrityCheckable { |
2 | settable S market; // e.g. "TRBUSDT" |
3 | |
4 | // in case the sequence is digitized |
5 | settable PriceCells priceCells; |
6 | |
7 | // are we live? |
8 | settable bool live; |
9 | |
10 | // Is the data incomplete (still loading)? |
11 | settable bool loading; |
12 | |
13 | ILongBuffer timestamps = new SynchronizedLongBuffer; |
14 | IDoubleBuffer prices = new SynchronizedFloatBufferPresentingAsDoubles; |
15 | |
16 | // timestamp of last ticker event even though price may not |
17 | // have changed |
18 | settable long lastTickerEvent; |
19 | |
20 | settable double epsilon = 1e-6; |
21 | |
22 | event pricePointAdded(long timestamp); |
23 | event newTickerEvent; |
24 | |
25 | *(S *market) {} |
26 | |
27 | void addIfPriceChanged(double price, Timestamp timestamp) { |
28 | addIfPriceChanged(price, toLong(timestamp)); |
29 | } |
30 | |
31 | synchronized void addIfPriceChanged(double price, long timestamp) { |
32 | if (isEmpty() || differentByEpsilonRatio(prices.last(), price, epsilon)) |
33 | add(price, timestamp); |
34 | |
35 | updateTickerEvent(timestamp); |
36 | } |
37 | |
38 | void updateTickerEvent(long timestamp) { |
39 | if (timestamp > lastTickerEvent) { |
40 | lastTickerEvent(timestamp); |
41 | newTickerEvent(); |
42 | } |
43 | } |
44 | |
45 | synchronized void add aka addPrice(double price, long timestamp) { |
46 | timestamps.add(timestamp); |
47 | prices.add(price); |
48 | pricePointAdded(timestamp); |
49 | updateTickerEvent(timestamp); |
50 | } |
51 | |
52 | synchronized void add(double[] prices, long[] timestamps) { |
53 | assertEquals(l(prices), l(timestamps)); |
54 | if (empty(prices)) ret; |
55 | |
56 | // TODO: call pricePointAdded? |
57 | this.prices.addAll(asList(prices)); |
58 | this.timestamps.addAll(asList(timestamps)); |
59 | lastTickerEvent = max(lastTickerEvent, endTime()); |
60 | } |
61 | |
62 | synchronized void add(TickerSequence ticker) { |
63 | add(ticker.prices.toArray(), ticker.timestamps.toArray()); |
64 | } |
65 | |
66 | synchronized toString { |
67 | ret commaCombine( |
68 | spaceCombine( |
69 | "Ticker", |
70 | market, |
71 | priceCells == null ?: roundBracket(priceCells), |
72 | ), |
73 | formatDays(duration()), |
74 | n2(l(prices), "price point"), |
75 | timeRange() |
76 | ); |
77 | } |
78 | |
79 | public synchronized TickerSequence clone() { |
80 | ret subSequence(0, size()); |
81 | } |
82 | |
83 | TickerSequence emptyClone() { |
84 | ret new TickerSequence().market(market); |
85 | } |
86 | |
87 | synchronized TickerSequence subSequence(int start, int end default size()) { |
88 | var s = emptyClone(); |
89 | s.add( |
90 | cloneSubDoubleArray(prices.toArray(), start, end), |
91 | cloneSubLongArray(timestamps.toArray(), start, end)); |
92 | ret s; |
93 | } |
94 | |
95 | synchronized TickerSequence cutOffAfterSeconds(double seconds) { |
96 | int idx = indexOfTimestamp(startTime()+secondsToMS(seconds)); |
97 | ret subSequence(0, idx); |
98 | } |
99 | |
100 | TickerSequence dropFirstDays(double days) { |
101 | ret subSequenceFromTimestamp(startTime()+daysToMS(days)); |
102 | } |
103 | |
104 | TickerSequence takeFirstDays(double days) { |
105 | ret cutOffAfterSeconds(daysToSeconds(days)); |
106 | } |
107 | |
108 | TickerSequence takeFirstHours(double days) { |
109 | ret cutOffAfterSeconds(hoursToSeconds(days)); |
110 | } |
111 | |
112 | TickerSequence takeLastDays(double days) { |
113 | ret subSequenceFromTimestamp(endTime()-daysToMS(days)); |
114 | } |
115 | |
116 | TickerSequence takeLastHours(double hours) { |
117 | ret takeLastDays(hoursToDays(hours)); |
118 | } |
119 | |
120 | TickerSequence takeLastMinutes(double minutes) { |
121 | ret takeLastDays(minutesToDays(minutes)); |
122 | } |
123 | |
124 | synchronized TickerSequence subSequenceByTimestamps(long from, long to) { |
125 | int idx1 = indexOfTimestamp(from); |
126 | int idx2 = indexOfTimestamp(to); |
127 | ret subSequence(idx1, idx2); |
128 | } |
129 | |
130 | synchronized TickerSequence subSequenceFromTimestamp(long from) { |
131 | ret subSequence(indexOfTimestamp(from)); |
132 | } |
133 | |
134 | synchronized TickerSequence subSequenceToTimestamp(long to) { |
135 | ret subSequence(0, indexOfTimestamp(to)); |
136 | } |
137 | |
138 | // TODO: shouldn't we subtract one here? |
139 | synchronized int indexOfTimestamp(double ts) { |
140 | ret binarySearch_insertionPoint(timestamps.asVirtualList(), lround(ts)); |
141 | } |
142 | |
143 | synchronized int indexOfTimestampAtOrBefore(double ts) { |
144 | ret binarySearch_tipToTheLeft(timestamps.asVirtualList(), lround(ts)); |
145 | } |
146 | |
147 | synchronized long nextTimestamp(double ts) { |
148 | ret getTimestamp(indexOfTimestamp(ts)+1); |
149 | } |
150 | |
151 | synchronized double priceAtTimestamp(double ts) { |
152 | ret getPrice(indexOfTimestampAtOrBefore(ts)); |
153 | } |
154 | |
155 | public int size() { ret l(prices); } |
156 | public bool isEmpty() { ret size() == 0; } |
157 | |
158 | synchronized TimestampRange timeRange() { |
159 | ret isEmpty() ? null : TimestampRange(startTime(), endTime()); |
160 | } |
161 | |
162 | Duration duration() { |
163 | ret main duration(timeRange()); |
164 | } |
165 | |
166 | synchronized long startTime() { ret empty(timestamps) ? 0 : first(timestamps); } |
167 | synchronized long endTime() { |
168 | ret max(lastTickerEvent, |
169 | empty(timestamps) ? 0 : last(timestamps)); |
170 | } |
171 | |
172 | synchronized double getPrice(int i) { |
173 | ret isEmpty() ? Double.NaN : prices.get(clampToLength(size(), i)); |
174 | } |
175 | |
176 | synchronized long getTimestamp(int i) { |
177 | ret isEmpty() ? 0 : timestamps.get(clampToLength(size(), i)); |
178 | } |
179 | |
180 | synchronized TickerSequence removePlateaus() { |
181 | var seq = emptyClone(); |
182 | int n = size(); |
183 | |
184 | for i to n: { |
185 | var value = getPrice(i); |
186 | seq.prices.add(value); |
187 | seq.timestamps.add(getTimestamp(i)); |
188 | |
189 | while (i+1 < n && getPrice(i+1) == value) ++i; |
190 | } |
191 | |
192 | ret seq.trimToSize(); |
193 | } |
194 | |
195 | synchronized TickerSequence removeZeros() { |
196 | if (!prices.contains(0)) this; |
197 | var seq = emptyClone(); |
198 | int n = size(); |
199 | for i to n: { |
200 | var value = getPrice(i); |
201 | if (value != 0) { |
202 | seq.prices.add(value); |
203 | seq.timestamps.add(getTimestamp(i)); |
204 | } |
205 | } |
206 | ret seq.trimToSize(); |
207 | } |
208 | |
209 | synchronized TickerSequence alignTimestamps(int interval) { |
210 | var seq = emptyClone(); |
211 | int n = size(); |
212 | |
213 | for i to n: { |
214 | var value = getPrice(i); |
215 | long time = roundDownTo(interval, getTimestamp(i)); |
216 | if (time != seq.endTime()) |
217 | seq.addIfPriceChanged(value, time); |
218 | } |
219 | |
220 | ret seq.trimToSize(); |
221 | } |
222 | |
223 | selfType trimToSize() { |
224 | prices.trimToSize(); |
225 | timestamps.trimToSize(); |
226 | this; |
227 | } |
228 | |
229 | double minPrice() { ret doubleMin(prices.toArray()); } |
230 | double maxPrice() { ret doubleMax(prices.toArray()); } |
231 | DoubleRange priceRange() { ret doubleRange(minPrice(), maxPrice()); } |
232 | |
233 | synchronized double firstPrice() { ret empty(prices) ? Double.NaN : prices.first(); } |
234 | synchronized double lastPrice() { ret empty(prices) ? Double.NaN : prices.last(); } |
235 | |
236 | synchronized TickerPoint lastPoint() { |
237 | ret new TickerPoint(this, endTime()); |
238 | } |
239 | |
240 | static Pattern reBestAsk = regexp("\"bestAsk\":([0-9\\.]+)"); |
241 | static Pattern reSystemTime = regexp("\"systemTime\":(\\d+)"); |
242 | |
243 | // parse line from a .ticker file |
244 | synchronized void addJSONLine_fast(S line) { |
245 | long time = toLong(regexpFirstGroup(reSystemTime, line)); |
246 | if (time > lastTickerEvent) { |
247 | double price = toDouble(regexpFirstGroup(reBestAsk, line)); |
248 | if (price != 0) |
249 | addIfPriceChanged(price, time); |
250 | } |
251 | } |
252 | |
253 | swappable void addJSONLine(S line) { |
254 | addJSONLine_fast(line); |
255 | } |
256 | |
257 | synchronized void addJSONLine_slow(S line) { |
258 | Map data = parseJSONMap(line); |
259 | double price = toDouble(data.get("bestAsk")); |
260 | long time = toLong(data.get("systemTime")); |
261 | if (time > lastTickerEvent) |
262 | addIfPriceChanged(price, time); |
263 | } |
264 | |
265 | UpDownSequence toUpDownSequence aka upDownSequence(PriceCells cells default priceCells) { |
266 | if (cells == null) null; |
267 | var dig = digitize(new PriceDigitizer(cells)); |
268 | L<Int> cellNumbers = dig.roundedCellNumbers(); |
269 | ret UpDownSequence.fromInts(cellNumbers); |
270 | } |
271 | |
272 | TickerSequence digitizeToPercent(double basePrice default firstPrice(), double cellSizeInPercent) { |
273 | if (isEmpty()) this; |
274 | ret digitize(geometricPriceDigitizer(basePrice, cellSizeInPercent)); |
275 | } |
276 | |
277 | TickerSequence digitize(PriceDigitizer digitizer) { |
278 | var seq = emptyClone(); |
279 | seq.priceCells(digitizer.priceCells()); |
280 | int n = size(); |
281 | |
282 | for i to n: { |
283 | seq.addIfPriceChanged(digitizer.digitize(getPrice(i)), getTimestamp(i)); |
284 | } |
285 | |
286 | ret seq.trimToSize(); |
287 | } |
288 | |
289 | L<PricePoint> pricePoints() { |
290 | ret listFromFunction getPricePoint(size()); |
291 | } |
292 | |
293 | synchronized PricePoint getPricePoint(int idx) { |
294 | ret idx >= 0 && idx < size() ? new PricePoint(timestamps.get(idx), prices.get(idx)) : null; |
295 | } |
296 | |
297 | PricePoint firstPricePoint() { ret getPricePoint(0); } |
298 | synchronized PricePoint lastPricePoint() { ret getPricePoint(size()-1); } |
299 | |
300 | L<Int> roundedCellNumbers() { |
301 | ret map iround(cellNumbers()); |
302 | } |
303 | |
304 | L<Double> cellNumbers() { |
305 | if (priceCells == null) null; |
306 | int n = size(); |
307 | DoubleBuffer buf = new(n); |
308 | for i to n: |
309 | buf.add(priceCells.toCellNumber(getPrice(i))); |
310 | ret buf.asVirtualList(); |
311 | } |
312 | |
313 | TickerSequence toCellNumbers() { |
314 | if (priceCells == null) null; |
315 | new TickerSequence seq; |
316 | seq.market(market + " cell numbers " + roundBracket(priceCells)); |
317 | int n = size(); |
318 | |
319 | for i to n: |
320 | seq.add(priceCells.toCellNumber(getPrice(i)), getTimestamp(i)); |
321 | |
322 | ret seq.trimToSize(); |
323 | } |
324 | |
325 | void dropLastPricePoint() { |
326 | if (isEmpty()) ret; |
327 | prices.popLast(); |
328 | timestamps.popLast(); |
329 | lastTickerEvent = endTime(); |
330 | newTickerEvent(); |
331 | } |
332 | |
333 | selfType marketIfEmpty(S market) { |
334 | if (empty(this.market)) market(market); |
335 | this; |
336 | } |
337 | |
338 | selfType legacyCompact() { |
339 | if (!prices instanceof SynchronizedFloatBufferPresentingAsDoubles) |
340 | prices = new SynchronizedFloatBufferPresentingAsDoubles(prices.toArray()); |
341 | this; |
342 | } |
343 | |
344 | // msUnit = how many milliseconds to group into one |
345 | bool compactTimestamps(long msUnit default 20) { |
346 | if (!timestamps instanceof SynchronizedLongBufferStoredAsLinearInts) { |
347 | timestamps = new SynchronizedLongBufferStoredAsLinearInts(roundDownTo(msUnit, startTime()), msUnit, timestamps); |
348 | true; |
349 | } |
350 | false; |
351 | } |
352 | |
353 | double averageTimeStep() { |
354 | ret doubleRatio(endTime()-startTime(), size()-1); |
355 | } |
356 | |
357 | public void write(ByteHead head) { |
358 | head.writeString(market); |
359 | SynchronizedLongBufferStoredAsLinearInts timestamps = cast this.timestamps; |
360 | head.writeLong(timestamps.base); |
361 | head.writeLong(timestamps.factor); |
362 | |
363 | int n = size(); |
364 | head.writeInt(n); |
365 | |
366 | for i to n: { |
367 | head.writeInt(timestamps.getRaw(i)); |
368 | head.writeFloat((float) prices.get(i)); |
369 | } |
370 | } |
371 | |
372 | public void readWrite(ByteHead head) { |
373 | if (head.readMode()) readImpl(head); |
374 | if (head.writeMode()) write(head); |
375 | } |
376 | |
377 | static TickerSequence read(ByteHead head) { |
378 | new TickerSequence t; |
379 | if (!t.readImpl(head)) null; |
380 | ret t; |
381 | } |
382 | |
383 | bool readImpl(ByteHead head) { |
384 | market = head.readString(); |
385 | if (head.eof()) false; |
386 | long base = head.readLong(); |
387 | long factor = head.readLong(); |
388 | //printVars(+market, +base, +factor); |
389 | var timestamps = new SynchronizedLongBufferStoredAsLinearInts (base, factor); |
390 | this.timestamps = timestamps; |
391 | |
392 | int n = head.readInt(); |
393 | for i to n: { |
394 | int time = head.readInt(); |
395 | if (head.eof()) break; |
396 | timestamps.addRaw(time); |
397 | prices.add(head.readFloat()); |
398 | } |
399 | trimToSize(); |
400 | true; |
401 | } |
402 | |
403 | selfType loadJSONFile(File f) { |
404 | temp it = linesFromFile(f); |
405 | for (line : it) |
406 | pcall { addJSONLine(line); } |
407 | ret trimToSize(); |
408 | } |
409 | |
410 | synchronized void insertAt(int idx, TickerSequence ticker) { |
411 | prices.insertAt(idx, ticker.prices.toArray()); |
412 | timestamps.insertAt(idx, ticker.timestamps.toArray()); |
413 | } |
414 | |
415 | public void integrityCheck { |
416 | timestamps.integrityCheck(); |
417 | prices.integrityCheck(); |
418 | assertEquals(l(timestamps), l(prices)); |
419 | } |
420 | } |
download show line numbers debug dex old transpilations
Travelled to 3 computer(s): elmgxqgtpvxh, mqqgnosmbjvj, wnsclhtenguj
No comments. add comment
Snippet ID: | #1036138 |
Snippet name: | TickerSequence - sequence of ticker values (prices) with timestamps |
Eternal ID of this version: | #1036138/146 |
Text MD5: | 46d3102bae67eb0058d5ae032e044093 |
Transpilation MD5: | 82d58746b11690bdaf559c485ec9a4f4 |
Author: | stefan |
Category: | javax |
Type: | JavaX fragment (include) |
Public (visible to everyone): | Yes |
Archived (hidden from active list): | No |
Created/modified: | 2023-03-22 19:59:30 |
Source code size: | 11990 bytes / 420 lines |
Pitched / IR pitched: | No / No |
Views / Downloads: | 530 / 1389 |
Version history: | 145 change(s) |
Referenced in: | [show references] |