Download Jar. Libraryless. Click here for Pure Java version (13750L/82K).
1 | !7 |
2 | |
3 | // TODO: we are iterating over all channels and all sockest |
4 | // 100 times per second - this seems excessive |
5 | static int updateInterval = 10; |
6 | static long timeout = 30000+10000; // idle time + latency |
7 | static int port = 6000; |
8 | static int maxLineLength = 10*1024*1024; // change #1020500 too if you change this |
9 | |
10 | import java.nio.*; |
11 | import java.nio.ByteBuffer; |
12 | import java.nio.channels.*; |
13 | |
14 | static ConnectedInstances cI; |
15 | static ReliableSingleThread rst = new(r update); |
16 | static int theNumber; |
17 | static Map<SocketChannel, State> dataMapper = synchroMap(); |
18 | static ServerSocketChannel serverChannel; |
19 | static Selector selector; |
20 | static Map<S, Channel> channels = synchroMap(); |
21 | static Lock channelsLock = lock(); |
22 | static java.util.Timer mainTimer; |
23 | sbool debug; |
24 | |
25 | // state of a client connection |
26 | srecord noeq State(SocketChannel socket) { |
27 | S socketID = aGlobalID(), computerID; |
28 | int countSeen = -1; |
29 | new LineBuffer lineBuffer; |
30 | long lastMessage = sysNow(); |
31 | Map<S, Channel> subbedChannels = synchroLinkedHashMap(); |
32 | Lock lock = lock(); |
33 | |
34 | // TODO: clear eventually to avoid stalling attacks |
35 | L<byte[]> toSend = synchroLinkedList(); |
36 | } |
37 | |
38 | // a virtual "channel" that clients can "subscribe" to |
39 | // (what that means differs between channels) |
40 | // Each channel is identified by a unique name |
41 | // A is a custom data type for each subscriber |
42 | sclass Channel<A> { |
43 | S name; |
44 | Set<SocketChannel> sockets = synchroSet(); |
45 | Map<SocketChannel, A> dataMapper = synchroMap(); |
46 | Lock lock = lock(); |
47 | |
48 | event subCountChanged; |
49 | |
50 | // add a subscriber |
51 | void addSocket(SocketChannel socket) { |
52 | lock lock; |
53 | sockets.add(socket); |
54 | subCountChanged(); |
55 | initialMessage(socket); |
56 | } |
57 | |
58 | // remove a subscriber |
59 | void removeSocket(SocketChannel socket) { |
60 | lock lock; |
61 | sockets.remove(socket); |
62 | subCountChanged(); |
63 | } |
64 | |
65 | int subCount() { ret l(sockets); } |
66 | |
67 | void sendToSubscribers(S line) { |
68 | for (SocketChannel socket : cloneList(sockets)) |
69 | sendLineToSocket(socket, line); |
70 | } |
71 | |
72 | void update() {} |
73 | void initialMessage(SocketChannel socket) {} |
74 | void handleLine(S line, SocketChannel socket) {} |
75 | } |
76 | |
77 | // channel listing the currently logged in computer IDs |
78 | // TODO: optimize the protocol to send diffs only |
79 | sclass ComputerIDsChannel extends Channel<O> { |
80 | L<S> value; |
81 | |
82 | void initialMessage(SocketChannel socket) { |
83 | sendLineToSocket(socket, stateMsg()); |
84 | } |
85 | |
86 | void update() { |
87 | if (set_trueIfChanged(this, value := asList(cI.set()))) |
88 | sendToSubscribers(stateMsg()); |
89 | } |
90 | |
91 | S stateMsg() { |
92 | ret "ComputerIDs = " + struct(value); |
93 | } |
94 | } |
95 | |
96 | // channel listing the current count of logged in computers |
97 | sclass ComputerCountChannel extends Channel<O> { |
98 | int value; |
99 | |
100 | void initialMessage(SocketChannel socket) { |
101 | sendLineToSocket(socket, stateMsg()); |
102 | } |
103 | |
104 | void update() { |
105 | if (set_trueIfChanged(this, value := cI.value())) |
106 | sendToSubscribers(stateMsg()); |
107 | } |
108 | |
109 | S stateMsg() { |
110 | ret "ComputerCount = " + value; |
111 | } |
112 | } |
113 | |
114 | // a public chat for broadcast messages between subscribers |
115 | sclass PublicChatChannel extends Channel<O> { |
116 | L<S> msgs = synchroLinkedList(); |
117 | |
118 | void handleLine(S line, SocketChannel socket) { |
119 | postMessage(unquote(line)); |
120 | } |
121 | |
122 | void postMessage(S msg) { |
123 | msgs.add(msg); |
124 | } |
125 | |
126 | void update { |
127 | S msg; |
128 | while ((msg = syncPopFirst(msgs)) != null) { |
129 | S quoted = quote(msg); |
130 | //print("Chat send: " + shorten(quoted, 20)); |
131 | quoted = name + ":" + quoted; |
132 | for (SocketChannel s : sockets) |
133 | sendLineToSocket(s, quoted); |
134 | } |
135 | } |
136 | } |
137 | |
138 | // a chat for DMs (messages from one computer to another) |
139 | // (is it private though? probably not as we don't check if |
140 | // the computer IDs sent to the server are correct) |
141 | sclass PrivateChatChannel extends Channel<O> { |
142 | S prefix = ""; |
143 | new MultiMap<S, SocketChannel> clients; // key: computer ID |
144 | |
145 | void initialMessage(SocketChannel socket) { |
146 | clients.put(computerID(socket), socket); |
147 | } |
148 | |
149 | void removeSocket(SocketChannel socket) { |
150 | super.removeSocket(socket); |
151 | clients.remove(computerID(socket), socket); |
152 | } |
153 | |
154 | void handleLine(S line, SocketChannel socket) { |
155 | pcall { |
156 | int i = indexOf(line, ' '); |
157 | S computerID = takeFirst(line, i); |
158 | while licensed { |
159 | SocketChannel socket2 = clients.getFirst(computerID); |
160 | if (socket2 == null) break; |
161 | try { |
162 | S sender = computerID(socket); |
163 | if (l(line) >= 10000) |
164 | print("Forwarding line from " + sender + " to " + computerID + ": " + l(line)); |
165 | sendLineToSocket(socket2, prefix + "privately from " + sender + ": " + substring(line, i+1)); |
166 | break; |
167 | } catch e { |
168 | closeSocket(socket2); |
169 | clients.remove(computerID, socket2); |
170 | } |
171 | } |
172 | } |
173 | } |
174 | } |
175 | |
176 | // channel distributing snippet updates (edits, compiles etc) |
177 | sclass SnippetUpdatesChannel extends Channel<O> { |
178 | ManualTailFile tail; |
179 | |
180 | *() { |
181 | tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"), |
182 | vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) }))); |
183 | } |
184 | |
185 | void update { |
186 | tail.check(); |
187 | } |
188 | |
189 | void sendLine(S line) { |
190 | print("SnippetUpdatesChannel send: " + line); |
191 | line = assertNoLineBreaks(line); |
192 | print("Sockets: " + l(sockets)); |
193 | //S quoted = quote(line); |
194 | for (SocketChannel s : sockets) |
195 | sendLineToSocket(s, "snippetUpdates:" + line); |
196 | } |
197 | } |
198 | |
199 | // channel broadcast its current subscriber count |
// Channel that broadcasts its own current subscriber count.
sclass PresenceCountingChannel extends Channel<Void> {
  *() {
    // re-broadcast the count whenever a subscriber is added or removed
    onSubCountChanged(-> sendToSubscribers(stateMsg()));
  }

  // New subscribers immediately get the current count.
  void initialMessage(SocketChannel socket) {
    S msg = stateMsg();
    sendLineToSocket(socket, msg);
  }

  // line format:  <channel name>:<subscriber count>
  S stateMsg() {
    int count = subCount();
    ret name + ":" + count;
  }
}
213 | |
214 | static State state(SocketChannel socket) { |
215 | ret dataMapper.get(socket); |
216 | } |
217 | |
218 | sS computerID(SocketChannel socket) { |
219 | State state = state(socket); |
220 | ret state == null ? null : state.computerID; |
221 | } |
222 | |
// Program entry: start the connection tracker, then open the server socket.
p {
  cI = new ConnectedInstances();
  cI.setAfterglowInterval(1000);
  cI.connected2.onChange(rst); // rst runs update() on changes
  cI.start();

  openServerSocket();

  // Disabled: restart socket every hour (and once now for testing - XX)
  /*doEvery(10.0, 60*60.0, r {
    pcall { cleanMeUp_socket(); }
    openServerSocket();
  });*/
}
237 | |
// Opens the selector and the non-blocking server socket on "port",
// starts the accept loop thread and the main timer.
svoid openServerSocket ctex {
  print("Opening server socket");
  selector = Selector.open();
  serverChannel = ServerSocketChannel.open();
  serverChannel.configureBlocking(false);

  serverChannel.socket().bind(new InetSocketAddress(port));
  print("Server socket bound");
  serverChannel.register(selector, SelectionKey.OP_ACCEPT);

  thread "Server Accept Loop" { acceptLoop(); }

  mainTimer = doEvery(updateInterval, r {
    // 1. close connections that have been silent for too long
    for (SocketChannel socket : keysList(dataMapper)) {
      State state = state(socket);
      if (state == null) continue;
      if (sysTime() < state.lastMessage+timeout) continue;
      pcall {
        closeSocket(socket);
      }
    }

    // 2. give every channel its periodic update, under the channel's lock
    for (Channel channel : values(channels)) pcall {
      lock channel.lock;
      channel.update();
    }
  });
}
264 | |
265 | svoid cleanMeUp_socket() ctex { |
266 | if (serverChannel != null) |
267 | serverChannel.close(); |
268 | for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall { |
269 | socket.close(); |
270 | } |
271 | } |
272 | |
// Selector loop (runs in the "Server Accept Loop" thread):
// waits for events and dispatches each selected key to
// acceptOnSocket / readFromSocket / writeToSocket.
svoid acceptLoop ctex {
  while licensed {
    // wait for events
    selector.select();

    for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext(); ) {
      SelectionKey key = it.next();
      it.remove(); // each selected key is handled exactly once
      debug "have key";

      if (!key.isValid())
        continue with debug "key not valid";

      // an error while handling one key must not end the loop
      pcall {
        if (key.isAcceptable()) {
          debug "Key is acceptable";
          acceptOnSocket(key);
        } else if (key.isReadable()) {
          debug "Key is readable";
          readFromSocket(key);
        } else if (key.isWritable()) {
          debug "Key is writable";
          writeToSocket(key);
        } else
          debug "Unknown key type";
      }
    }
  }
}
303 | |
304 | //accept a connection made to this channel's socket |
305 | static void acceptOnSocket(SelectionKey key) throws IOException { |
306 | ServerSocketChannel serverChannel = cast key.channel(); |
307 | SocketChannel channel = serverChannel.accept(); |
308 | channel.configureBlocking(false); |
309 | Socket actualSocket = channel.socket(); |
310 | SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress(); |
311 | print("Connected to: " + remoteAddr); |
312 | |
313 | // register channel with selector for further IO |
314 | dataMapper.put(channel, new State(channel)); |
315 | channel.register(selector, SelectionKey.OP_READ); |
316 | } |
317 | |
318 | svoid onSocketClose(SocketChannel socket) { |
319 | Socket actualSocket = socket.socket(); |
320 | SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress(); |
321 | print("Connection closed by client: " + remoteAddr); |
322 | State state = state(socket); |
323 | if (state != null && state.computerID != null) |
324 | cI.lostConnection(state.computerID); |
325 | dataMapper.remove(socket); |
326 | removeFromAllChannels(socket); |
327 | } |
328 | |
329 | svoid closeSocket(SocketChannel socket) { |
330 | pcall { socket.close(); } |
331 | pcall { onSocketClose(socket); } |
332 | } |
333 | |
334 | static void readFromSocket(SelectionKey key) throws IOException { |
335 | final SocketChannel socket = cast key.channel(); |
336 | ByteBuffer buffer = ByteBuffer.allocate(1024); |
337 | int numRead = socket.read(buffer); |
338 | if (numRead == -1) { |
339 | closeSocket(socket); |
340 | key.cancel(); |
341 | ret; |
342 | } |
343 | |
344 | S s = new String(buffer.array(), 0, numRead); |
345 | //print("Got data from client: " + s); |
346 | final State state = state(socket); |
347 | if (state != null) { |
348 | state.lastMessage = sysNow(); |
349 | state.lineBuffer.append(s, voidfunc(S line) { |
350 | //if (nempty(line)) print("Got line: " + line); |
351 | pcall { handleLine(line, socket, state); } |
352 | }); |
353 | if (state.lineBuffer.size() > maxLineLength) { |
354 | print("Bad client (line too long"); |
355 | closeSocket(socket); |
356 | } |
357 | } |
358 | } |
359 | |
360 | static void writeToSocket(SelectionKey key) throws IOException { |
361 | final SocketChannel socket = cast key.channel(); |
362 | socket.register(selector, SelectionKey.OP_READ); |
363 | State state = state(socket); |
364 | lock state.lock; |
365 | if (empty(state.toSend)) ret; |
366 | byte[] data = popFirst(state.toSend); |
367 | print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID); |
368 | sendBytesToSocket(socket, data); |
369 | } |
370 | |
371 | svoid sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException { |
372 | int length = l(bytes); |
373 | State state = state(socket); |
374 | lock state.lock; |
375 | if (nempty(state.toSend)) { |
376 | print("Postponing send to " + state.computerID + " because data already queued"); |
377 | ret with state.toSend.add(bytes); |
378 | } |
379 | ByteBuffer buf = ByteBuffer.wrap(bytes); |
380 | int n = socket.write(buf); |
381 | if (n < length) { |
382 | byte[] remaining = subByteArray(bytes, n); |
383 | state.toSend.add(remaining); |
384 | print("Postponing sending " + n2(l(remaining), "byte") + " to " + state.computerID); |
385 | socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); |
386 | } |
387 | } |
388 | |
389 | svoid handleLine(S line, SocketChannel socket, State state) { |
390 | new Matches m; |
391 | if (empty(line)) ret with sendLineToSocket(socket, ""); // idle |
392 | |
393 | if (jmatch("computerID=*", line, m)) { |
394 | if (state.computerID != null || !possibleComputerID($1)) ret; |
395 | print("Got computer ID: " + $1); |
396 | state.computerID = $1; |
397 | cI.gotConnection(state.computerID); |
398 | ret; |
399 | } |
400 | |
401 | if (jmatch("sub *", line, m)) { |
402 | Channel channel = getChannelForSocket(socket, $1); |
403 | if (channel == null) |
404 | sendLineToSocket(socket, "Channel " + $1 + " not found"); |
405 | else |
406 | sendLineToSocket(socket, "Subbed " + $1); |
407 | ret; |
408 | } |
409 | |
410 | L<S> tok = javaTok(line); |
411 | if (eq(get(tok, 3), ":")) { |
412 | Channel channel = state.subbedChannels.get(get(tok, 1)); |
413 | if (channel != null) |
414 | channel.handleLine(joinSubList(tok, 5), socket); |
415 | } |
416 | |
417 | // ignore unknown line |
418 | } |
419 | |
// html block: the only thing it returns is a pointer to the socket port
html {
  ret "Use port " + port;
}
421 | |
422 | svoid sendLineToSocket(SocketChannel socket, S s) ctex { |
423 | byte[] bytes = toUtf8(s + "\n"); |
424 | sendBytesToSocket(socket, bytes); |
425 | } |
426 | |
427 | svoid removeFromAllChannels(SocketChannel socket) { |
428 | lock channelsLock; |
429 | for (S name, Channel c : cloneMap(channels)) { pcall { |
430 | c.removeSocket(socket); |
431 | if (empty(c.sockets)) { |
432 | print("Deleting channel: " + name); |
433 | channels.remove(name); |
434 | } |
435 | }} |
436 | } |
437 | |
438 | static Channel getChannelForSocket(SocketChannel socket, S name) { |
439 | lock channelsLock; |
440 | Channel c = channels.get(name); |
441 | if (c == null) { |
442 | c = makeChannel(name); |
443 | c.name = name; |
444 | if (c == null) null; |
445 | print("New channel: " + name); |
446 | channels.put(name, c); |
447 | lock c.lock; |
448 | c.update(); |
449 | } |
450 | c.addSocket(socket); |
451 | state(socket).subbedChannels.put(name, c); |
452 | ret c; |
453 | } |
454 | |
455 | static Channel makeChannel(S name) { |
456 | if (eq(name, "computerIDs")) ret new ComputerIDsChannel; |
457 | if (eq(name, "computerCount")) ret new ComputerCountChannel; |
458 | if (eq(name, "chat")) ret new PublicChatChannel; |
459 | if (eq(name, "machineChat")) ret new PublicChatChannel; |
460 | if (eq(name, "privateChat")) ret new PrivateChatChannel; |
461 | if (eq(name, "privateMachineChat")) ret nu(PrivateChatChannel, prefix := name + ":"); |
462 | if (eq(name, "snippetUpdates")) ret new SnippetUpdatesChannel; |
463 | |
464 | if (eq(name, "gazelleChat")) ret new PublicChatChannel; |
465 | if (eq(name, "gazellePresence")) ret new PresenceCountingChannel; |
466 | |
467 | ret null with print("Unknown channel: " + name); |
468 | } |
469 | |
// Run through rst when the set of connected instances changes:
// stores the current instance count in theNumber.
// Does nothing (besides logging) while no client is connected.
svoid update {
  print("OS Instances update");
  if (empty(dataMapper)) ret;

  // (a check "value == theNumber -> return" used to be here)
  theNumber = cI.value();
  print("OS Instances update done");
}
Began life as a copy of #1016571
download show line numbers debug dex old transpilations
Travelled to 14 computer(s): aoiabmzegqzx, bhatertpkbcr, cbybwowwnfue, cfunsshuasjs, gwrvuhgaqvyk, irmadwmeruwu, ishqpsrjomds, lpdgvwnxivlt, mqqgnosmbjvj, pyentgdyhuwx, pzhvpgtvlbxg, tslmcundralx, tvejysmllsmz, vouqrxazstgt
No comments. add comment
Snippet ID: | #1016572 |
Snippet name: | OS Instances Connector [NIO Socket, Web Bot, LIVE] |
Eternal ID of this version: | #1016572/116 |
Text MD5: | d29f1d064be1c6dcad2f16d4622285fa |
Transpilation MD5: | 75b0b7ca6f89db2a0143e25a6af62f38 |
Author: | stefan |
Category: | javax / a.i. / web |
Type: | JavaX module (desktop) |
Public (visible to everyone): | Yes |
Archived (hidden from active list): | No |
Created/modified: | 2022-04-08 18:47:05 |
Source code size: | 13857 bytes / 478 lines |
Pitched / IR pitched: | No / No |
Views / Downloads: | 841 / 2993 |
Version history: | 115 change(s) |
Referenced in: | [show references] |