Libraryless. Click here for Pure Java version (9434L/64K).
1 | sclass OSInstancesConnector { |
2 | int updateInterval = 10; |
3 | long timeout = 30000+10000; // idle time + latency |
4 | int port = 6000; |
5 | int maxLineLength = 10*1024*1024; // change #1020500 too if you change this |
6 | |
7 | import java.nio.*; |
8 | import java.nio.channels.*; |
9 | |
10 | ConnectedInstances cI; |
11 | ReliableSingleThread rst = new(r update); |
12 | int theNumber; |
13 | Map<SocketChannel, State> dataMapper = synchroMap(); |
14 | ServerSocketChannel serverChannel; |
15 | Selector selector; |
16 | Map<S, Channel> channels = synchroMap(); |
17 | Lock channelsLock = lock(); |
18 | java.util.Timer mainTimer; |
19 | bool debug; |
20 | L onLineReceived = syncList(); // L<vf(line, socket, state)> |
21 | |
22 | class State { |
23 | S socketID = aGlobalID(), computerID; |
24 | int countSeen = -1; |
25 | new LineBuffer lineBuffer; |
26 | long lastMessage = sysNow(); |
27 | Map<S, Channel> subbedChannels = synchroLinkedHashMap(); |
28 | Lock lock = lock(); |
29 | |
30 | // TODO: clear eventually to avoid stalling attacks |
31 | L<byte[]> toSend = synchroLinkedList(); |
32 | } |
33 | |
34 | class Channel<A> { |
35 | S name; |
36 | Set<SocketChannel> sockets = synchroSet(); |
37 | Map<SocketChannel, A> dataMapper = synchroMap(); |
38 | Lock lock = lock(); |
39 | |
40 | void addSocket(SocketChannel socket) { |
41 | lock lock; |
42 | sockets.add(socket); |
43 | initialMessage(socket); |
44 | } |
45 | |
46 | void removeSocket(SocketChannel socket) { |
47 | lock lock; |
48 | sockets.remove(socket); |
49 | } |
50 | |
51 | void update() {} |
52 | void initialMessage(SocketChannel socket) {} |
53 | void handleLine(S line, SocketChannel socket) {} |
54 | } |
55 | |
// Channel that reports the list of connected computer IDs (cI.set()) to its subscribers.
class ComputerIDsChannel extends Channel<O> {
  L<S> value; // last list taken from cI.set()

  // Sends the current value to a new subscriber.
  void initialMessage(SocketChannel socket) {
    lock lock;
    sendTo(socket);
  }

  // Re-reads the ID list and, when it changed, sends it to every subscriber.
  void update() {
    if (set_trueIfChanged(this, value := asList(cI.set())))
      // value is already committed here, so one failing socket must not abort
      // the loop - otherwise the remaining subscribers would never see this change.
      for (SocketChannel s : sockets) pcall {
        sendTo(s);
      }
  }

  void sendTo(SocketChannel socket) {
    sendLineToSocket(socket, "ComputerIDs = " + struct(value));
  }
}
73 | |
// Channel that reports the number of connected computers (cI.value()) to its subscribers.
class ComputerCountChannel extends Channel<O> {
  int value; // last count taken from cI.value()

  // Sends the current value to a new subscriber.
  void initialMessage(SocketChannel socket) {
    lock lock;
    sendTo(socket);
  }

  // Re-reads the count and, when it changed, sends it to every subscriber.
  void update() {
    if (set_trueIfChanged(this, value := cI.value()))
      // value is already committed here, so one failing socket must not abort
      // the loop - otherwise the remaining subscribers would never see this change.
      for (SocketChannel s : sockets) pcall {
        sendTo(s);
      }
  }

  void sendTo(SocketChannel socket) {
    sendLineToSocket(socket, "ComputerCount = " + value);
  }
}
91 | |
// Broadcast channel: every line posted by a subscriber is relayed to all subscribers.
class PublicChatChannel extends Channel<O> {
  L<S> msgs = synchroLinkedList(); // messages waiting for the next update()

  // Incoming lines are expected to be quoted; the unquoted text is queued.
  void handleLine(S line, SocketChannel socket) {
    postMessage(unquote(line));
  }

  void postMessage(S msg) {
    msgs.add(msg);
  }

  // Drains the queue and sends each message as  name:"quoted text"  to all subscribers.
  void update {
    S msg;
    while ((msg = syncPopFirst(msgs)) != null) {
      //print("Chat send: " + shorten(quote(msg), 20));
      S outLine = name + ":" + quote(msg);
      // The message is already removed from the queue, so a failing socket must not
      // abort the loop - the remaining subscribers would lose the message.
      for (SocketChannel s : sockets) pcall {
        sendLineToSocket(s, outLine);
      }
    }
  }
}
114 | |
// Point-to-point channel: a line "<computerID> <text>" is forwarded to one socket
// registered under that computer ID.
class PrivateChatChannel extends Channel<O> {
  S prefix = "";                              // prepended to every forwarded line
  new MultiMap<S, SocketChannel> clients;     // key: computer ID

  // Registers the new subscriber under its computer ID (as known at this moment).
  void initialMessage(SocketChannel socket) {
    S id = computerID(socket);
    clients.put(id, socket);
  }

  void removeSocket(SocketChannel socket) {
    super.removeSocket(socket);
    S id = computerID(socket);
    clients.remove(id, socket);
  }

  // Forwards the text after the first space to the first socket registered for the
  // ID before the space. A target that fails is closed and dropped, then the next
  // socket registered for the same ID is tried.
  void handleLine(S line, SocketChannel socket) {
    pcall {
      int spaceIdx = indexOf(line, ' ');
      S targetID = takeFirst(line, spaceIdx);
      while licensed {
        SocketChannel target = clients.getFirst(targetID);
        if (target == null) break; // nobody (left) to deliver to
        try {
          S sender = computerID(socket);
          if (l(line) >= 10000)
            print("Forwarding line from " + sender + " to " + targetID + ": " + l(line));
          S payload = substring(line, spaceIdx+1);
          sendLineToSocket(target, prefix + "privately from " + sender + ": " + payload);
          break;
        } catch e {
          closeSocket(target);
          clients.remove(targetID, target);
        }
      }
    }
  }
}
149 | |
// Channel that relays new lines of the changes.log file of program #1019175.
class SnippetUpdatesChannel extends Channel<O> {
  ManualTailFile tail;

  // Sets up the file tail; every line it delivers (after unquoting) goes to sendLine.
  *() {
    tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"),
      vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
  }

  // Polls the file for new content.
  void update {
    tail.check();
  }

  // Sends  snippetUpdates:<line>  to all subscribers.
  void sendLine(S line) {
    print("SnippetUpdatesChannel send: " + line);
    line = assertNoLineBreaks(line);
    print("Sockets: " + l(sockets));
    //S quoted = quote(line);
    S outLine = "snippetUpdates:" + line;
    // One failing socket must not keep the remaining subscribers from getting the line.
    for (SocketChannel s : sockets) pcall {
      sendLineToSocket(s, outLine);
    }
  }
}
171 | |
172 | State state(SocketChannel socket) { |
173 | ret dataMapper.get(socket); |
174 | } |
175 | |
176 | S computerID(SocketChannel socket) { |
177 | State state = state(socket); |
178 | ret state == null ? null : state.computerID; |
179 | } |
180 | |
// Default constructor: keeps the default port.
*() {}

// Constructor with an explicit port to listen on.
*(int port) { this.port = port; }
183 | |
// Creates and starts the ConnectedInstances tracker, then opens the server socket.
void start {
  cI = new ConnectedInstances();
  cI.setAfterglowInterval(1000);
  cI.connected2.onChange(rst); // rst was built from "r update"
  cI.start();

  openServerSocket();

  // Disabled: restart socket every hour (and once now for testing - XX)
  /*doEvery(10.0, 60*60.0, r {
  pcall { cleanMeUp_socket(); }
  openServerSocket();
  });*/
}
198 | |
199 | void openServerSocket ctex { |
200 | print("Opening server socket"); |
201 | selector = Selector.open(); |
202 | serverChannel = ServerSocketChannel.open(); |
203 | serverChannel.configureBlocking(false); |
204 | |
205 | InetSocketAddress sockAddr = new InetSocketAddress(port); |
206 | serverChannel.socket().bind(sockAddr); |
207 | print("Server socket bound"); |
208 | serverChannel.register(selector, SelectionKey.OP_ACCEPT); |
209 | |
210 | thread "Server Accept Loop" { acceptLoop(); } |
211 | |
212 | mainTimer = doEvery(updateInterval, r { |
213 | for (SocketChannel socket : keysList(dataMapper)) { |
214 | State state = state(socket); |
215 | if (state != null && sysTime() >= state.lastMessage+timeout) pcall { |
216 | closeSocket(socket); |
217 | } |
218 | } |
219 | for (Channel channel : values(channels)) pcall { |
220 | lock channel.lock; |
221 | channel.update(); |
222 | } |
223 | }); |
224 | } |
225 | |
226 | void cleanMeUp_socket() ctex { |
227 | if (serverChannel != null) |
228 | serverChannel.close(); |
229 | for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall { |
230 | socket.close(); |
231 | } |
232 | } |
233 | |
234 | void acceptLoop ctex { |
235 | while licensed { |
236 | // wait for events |
237 | selector.select(); |
238 | |
239 | Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); |
240 | while (keys.hasNext()) { |
241 | SelectionKey key = keys.next(); |
242 | keys.remove(); |
243 | debug "have key"; |
244 | |
245 | if (!key.isValid()) |
246 | continue with debug "key not valid"; |
247 | |
248 | pcall { |
249 | if (key.isAcceptable()) { |
250 | debug "Key is acceptable"; |
251 | acceptOnSocket(key); |
252 | } else if (key.isReadable()) { |
253 | debug "Key is readable"; |
254 | readFromSocket(key); |
255 | } else if (key.isWritable()) { |
256 | debug "Key is writable"; |
257 | writeToSocket(key); |
258 | } else |
259 | debug "Unknown key type"; |
260 | } |
261 | } |
262 | } |
263 | } |
264 | |
265 | //accept a connection made to this channel's socket |
266 | void acceptOnSocket(SelectionKey key) throws IOException { |
267 | ServerSocketChannel serverChannel = cast key.channel(); |
268 | SocketChannel channel = serverChannel.accept(); |
269 | channel.configureBlocking(false); |
270 | Socket actualSocket = channel.socket(); |
271 | SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress(); |
272 | print("Connected to: " + remoteAddr); |
273 | |
274 | // register channel with selector for further IO |
275 | dataMapper.put(channel, new State); |
276 | channel.register(selector, SelectionKey.OP_READ); |
277 | } |
278 | |
279 | void onSocketClose(SocketChannel socket) { |
280 | Socket actualSocket = socket.socket(); |
281 | SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress(); |
282 | print("Connection closed by client: " + remoteAddr); |
283 | State state = state(socket); |
284 | if (state != null && state.computerID != null) |
285 | cI.lostConnection(state.computerID); |
286 | dataMapper.remove(socket); |
287 | removeFromAllChannels(socket); |
288 | } |
289 | |
290 | void closeSocket(SocketChannel socket) { |
291 | pcall { socket.close(); } |
292 | pcall { onSocketClose(socket); } |
293 | } |
294 | |
295 | void readFromSocket(SelectionKey key) throws IOException { |
296 | final SocketChannel socket = cast key.channel(); |
297 | ByteBuffer buffer = ByteBuffer.allocate(1024); |
298 | int numRead = socket.read(buffer); |
299 | if (numRead == -1) { |
300 | closeSocket(socket); |
301 | key.cancel(); |
302 | ret; |
303 | } |
304 | |
305 | S s = new String(buffer.array(), 0, numRead); |
306 | //print("Got data from client: " + s); |
307 | final State state = state(socket); |
308 | if (state != null) { |
309 | state.lastMessage = sysNow(); |
310 | state.lineBuffer.append(s, voidfunc(S line) { |
311 | //if (nempty(line)) print("Got line: " + line); |
312 | pcall { handleLine(line, socket, state); } |
313 | }); |
314 | if (state.lineBuffer.size() > maxLineLength) { |
315 | print("Bad client (line too long"); |
316 | closeSocket(socket); |
317 | } |
318 | } |
319 | } |
320 | |
321 | void writeToSocket(SelectionKey key) throws IOException { |
322 | final SocketChannel socket = cast key.channel(); |
323 | socket.register(selector, SelectionKey.OP_READ); |
324 | State state = state(socket); |
325 | lock state.lock; |
326 | if (empty(state.toSend)) ret; |
327 | byte[] data = popFirst(state.toSend); |
328 | print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID); |
329 | sendBytesToSocket(socket, data); |
330 | } |
331 | |
332 | void sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException { |
333 | int length = l(bytes); |
334 | State state = state(socket); |
335 | lock state.lock; |
336 | if (nempty(state.toSend)) { |
337 | print("Postponing send to " + state.computerID + " because data already queued"); |
338 | ret with state.toSend.add(bytes); |
339 | } |
340 | ByteBuffer buf = ByteBuffer.wrap(bytes); |
341 | int n = socket.write(buf); |
342 | if (n < length) { |
343 | byte[] remaining = subByteArray(bytes, n); |
344 | state.toSend.add(remaining); |
345 | print("Postponing sending " + n2(l(remaining), "byte") + " to " + state.computerID); |
346 | socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); |
347 | } |
348 | } |
349 | |
350 | void handleLine(S line, SocketChannel socket, State state) { |
351 | new Matches m; |
352 | |
353 | vmBus_send oic_gotLine(this, line, socket, state); |
354 | pcallFAll(onLineReceived, line, socket, state); |
355 | |
356 | if (empty(line)) ret with sendLineToSocket(socket, ""); // idle |
357 | |
358 | if (jmatch("computerID=*", line, m)) { |
359 | if (state.computerID != null || !possibleComputerID($1)) ret; |
360 | print("Got computer ID: " + $1); |
361 | state.computerID = $1; |
362 | cI.gotConnection(state.computerID); |
363 | ret; |
364 | } |
365 | |
366 | if (jmatch("sub *", line, m)) { |
367 | Channel channel = getChannelForSocket(socket, $1); |
368 | if (channel == null) |
369 | sendLineToSocket(socket, "Channel " + $1 + " not found"); |
370 | else |
371 | sendLineToSocket(socket, "Subbed " + $1); |
372 | ret; |
373 | } |
374 | |
375 | L<S> tok = javaTok(line); |
376 | if (eq(get(tok, 3), ":")) { |
377 | Channel channel = state.subbedChannels.get(get(tok, 1)); |
378 | if (channel != null) |
379 | channel.handleLine(joinSubList(tok, 5), socket); |
380 | } |
381 | |
382 | // ignore unknown line |
383 | } |
384 | |
385 | void sendLineToSocket(SocketChannel socket, S s) ctex { |
386 | byte[] bytes = toUtf8(s + "\n"); |
387 | sendBytesToSocket(socket, bytes); |
388 | } |
389 | |
390 | void removeFromAllChannels(SocketChannel socket) { |
391 | lock channelsLock; |
392 | for (S name, Channel c : cloneMap(channels)) { pcall { |
393 | c.removeSocket(socket); |
394 | if (empty(c.sockets)) { |
395 | print("Deleting channel: " + name); |
396 | channels.remove(name); |
397 | } |
398 | }} |
399 | } |
400 | |
401 | Channel getChannelForSocket(SocketChannel socket, S name) { |
402 | lock channelsLock; |
403 | Channel c = getChannel(name); |
404 | c.addSocket(socket); |
405 | state(socket).subbedChannels.put(name, c); |
406 | ret c; |
407 | } |
408 | |
409 | Channel getChannel(S name) { |
410 | lock channelsLock; |
411 | Channel c = channels.get(name); |
412 | if (c == null) { |
413 | c = makeChannel(name); |
414 | c.name = name; |
415 | if (c == null) null; |
416 | print("New channel: " + name); |
417 | channels.put(name, c); |
418 | lock c.lock; |
419 | c.update(); |
420 | } |
421 | ret c; |
422 | } |
423 | Channel makeChannel(S name) { |
424 | if (eq(name, "computerIDs")) ret new ComputerIDsChannel; |
425 | if (eq(name, "computerCount")) ret new ComputerCountChannel; |
426 | if (eq(name, "chat")) ret new PublicChatChannel; |
427 | if (eq(name, "machineChat")) ret new PublicChatChannel; |
428 | if (eq(name, "privateChat")) ret new PrivateChatChannel; |
429 | if (eq(name, "privateMachineChat")) ret setAll(new PrivateChatChannel, prefix := name + ":"); |
430 | if (eq(name, "snippetUpdates")) ret new SnippetUpdatesChannel; |
431 | if (eq(name, "voiceOutput")) ret new PublicChatChannel; |
432 | |
433 | ret null with print("Unknown channel: " + name); |
434 | } |
435 | |
// Stores the current cI.value() in theNumber. Does nothing beyond the first log line
// while no sockets are connected.
void update {
  print("OS Instances update");
  if (nempty(dataMapper)) {
    // (a "value unchanged" shortcut is disabled in the original)
    theNumber = cI.value();
    print("OS Instances update done");
  }
}
445 | } |
Began life as a copy of #1016572
download show line numbers debug dex old transpilations
Travelled to 7 computer(s): bhatertpkbcr, mqqgnosmbjvj, pyentgdyhuwx, pzhvpgtvlbxg, tvejysmllsmz, vouqrxazstgt, xrpafgyirdlv
No comments. add comment
Snippet ID: | #1027046 |
Snippet name: | OSInstancesConnector as class [NIO Socket] |
Eternal ID of this version: | #1027046/13 |
Text MD5: | 63d8c2ab4c15ac0d6f39c7edec70e203 |
Transpilation MD5: | 4d18d9ac7057f7dd3cc980e5188e9cba |
Author: | stefan |
Category: | javax / networking |
Type: | JavaX fragment (include) |
Public (visible to everyone): | Yes |
Archived (hidden from active list): | No |
Created/modified: | 2020-02-15 15:19:00 |
Source code size: | 13398 bytes / 445 lines |
Pitched / IR pitched: | No / No |
Views / Downloads: | 374 / 730 |
Version history: | 12 change(s) |
Referenced in: | [show references] |