1 | !7 |
2 | |
// Interval handed to doEvery() for the main timer (unit presumably milliseconds - TODO confirm)
static int updateInterval = 10;

// A connection whose last message is older than this gets closed: idle time + latency
static long timeout = 30000 + 10000;

// Port the server socket is bound to
static int port = 6000;

// Limit for buffered line data per client; change #1020500 too if you change this
static int maxLineLength = 10 * 1024 * 1024;
7 | |
8 | import java.nio.*; |
9 | import java.nio.channels.*; |
10 | |
// Tracks connected computer IDs; created and started in main
static ConnectedInstances cI;

// Runs update() whenever cI.connected2 changes (wired up in main)
static ReliableSingleThread rst = new(r update);

// Last value of cI.value() seen by update()
static int theNumber;

// Per-connection state, keyed by the client's socket channel
static Map<SocketChannel, State> dataMapper = synchroMap();

// NIO plumbing, opened in main
static ServerSocketChannel serverChannel;
static Selector selector;

// Live channels by name; channelsLock is held while channels are created or deleted
static Map<S, Channel> channels = synchroMap();
static Lock channelsLock = lock();

// Timer that expires idle sockets and updates all channels
static java.util.Timer mainTimer;
20 | |
// Everything the server remembers about one client connection.
sclass State {
  // generated when the State is created
  S socketID = aGlobalID();
  // null until the client sends a valid "computerID=..." line
  S computerID;
  int countSeen = -1;
  // collects raw input until complete lines are available
  LineBuffer lineBuffer = new LineBuffer;
  // sysNow() timestamp of the last data received (initially: creation time)
  long lastMessage = sysNow();
  // channels this connection has subscribed to, by channel name
  Map<S, Channel> subbedChannels = synchroLinkedHashMap();
}
28 | |
// Base class for a named channel that client sockets can subscribe to.
// Subclasses customize behavior through the three hook methods, which do
// nothing by default.
sclass Channel<A> {
  S name;
  Set<SocketChannel> sockets = synchroSet();
  Map<SocketChannel, A> dataMapper = synchroMap();
  Lock lock = lock();

  // Hook: called from the main timer (and once right after creation), under this.lock
  void update() {}

  // Hook: called from addSocket() for every newly subscribed socket
  void initialMessage(SocketChannel socket) {}

  // Hook: called with the payload of a "<channel>: ..." line sent by a subscriber
  void handleLine(S line, SocketChannel socket) {}

  // Subscribes a socket and lets the subclass greet it.
  void addSocket(SocketChannel socket) {
    lock lock;
    sockets.add(socket);
    initialMessage(socket);
  }

  // Unsubscribes a socket.
  void removeSocket(SocketChannel socket) {
    lock lock;
    sockets.remove(socket);
  }
}
50 | |
// Channel that publishes the list of connected computer IDs (taken from cI.set()).
sclass ComputerIDsChannel extends Channel<O> {
  // last list that was sent out
  L<S> value;

  // Writes the current list to a single socket.
  void sendTo(SocketChannel socket) {
    sendLineToSocket(socket, "ComputerIDs = " + struct(value));
  }

  // New subscribers immediately receive the current list.
  void initialMessage(SocketChannel socket) {
    lock lock;
    sendTo(socket);
  }

  // Re-reads the ID set; subscribers are only notified when it changed.
  void update() {
    bool changed = set_trueIfChanged(this, value := asList(cI.set()));
    if (!changed) ret;
    for (SocketChannel s : sockets)
      sendTo(s);
  }
}
68 | |
// Channel that publishes cI.value() as "ComputerCount = <n>".
sclass ComputerCountChannel extends Channel<O> {
  // last count that was sent out
  int value;

  // Writes the current count to a single socket.
  void sendTo(SocketChannel socket) {
    sendLineToSocket(socket, "ComputerCount = " + value);
  }

  // New subscribers immediately receive the current count.
  void initialMessage(SocketChannel socket) {
    lock lock;
    sendTo(socket);
  }

  // Re-reads the count; subscribers are only notified when it changed.
  void update() {
    bool changed = set_trueIfChanged(this, value := cI.value());
    if (!changed) ret;
    for (SocketChannel s : sockets)
      sendTo(s);
  }
}
86 | |
// Chat channel: every message posted by a subscriber is relayed to all subscribers.
sclass PublicChatChannel extends Channel<O> {
  // messages waiting to be broadcast by the next update()
  L<S> msgs = synchroLinkedList();

  // Queues a message for broadcast.
  void postMessage(S msg) {
    msgs.add(msg);
  }

  // Incoming lines carry the message in quoted form.
  void handleLine(S line, SocketChannel socket) {
    postMessage(unquote(line));
  }

  // Drains the queue, sending each message as chat:<quoted message> to every subscriber.
  void update() {
    while (true) {
      S msg = syncPopFirst(msgs);
      if (msg == null) break;

      S quoted = quote(msg);
      print("Chat send: " + shorten(quoted, 20));
      S wireLine = "chat:" + quoted;
      for (SocketChannel s : sockets)
        sendLineToSocket(s, wireLine);
    }
  }
}
109 | |
// Channel for direct messages between computers.
// A subscriber sends "<target computer ID> <text>"; the text is forwarded to
// one socket registered under that computer ID.
sclass PrivateChatChannel extends Channel<O> {
  // prepended to every forwarded line ("" unless set by makeChannel)
  S prefix = "";
  new MultiMap<S, SocketChannel> clients; // key: computer ID

  // Registers the new subscriber under its computer ID.
  void initialMessage(SocketChannel socket) {
    clients.put(computerID(socket), socket);
  }

  // Unsubscribes the socket and drops its entry in clients.
  void removeSocket(SocketChannel socket) {
    super.removeSocket(socket);
    clients.remove(computerID(socket), socket);
  }

  // Forwards one private message. A target socket that fails is closed and
  // unregistered, then the next socket for the same computer ID is tried.
  void handleLine(S line, SocketChannel socket) {
    pcall {
      int space = indexOf(line, ' ');
      S targetID = takeFirst(line, space);
      while licensed {
        SocketChannel target = clients.getFirst(targetID);
        if (target == null) break; // nobody (left) to deliver to
        try {
          S sender = computerID(socket);
          // only unusually long lines get logged
          if (l(line) >= 10000)
            print("Forwarding line from " + sender + " to " + targetID + ": " + l(line));
          S text = substring(line, space+1);
          sendLineToSocket(target, prefix + "privately from " + sender + ": " + text);
          break;
        } catch e {
          closeSocket(target);
          clients.remove(targetID, target);
        }
      }
    }
  }
}
144 | |
// Channel that relays lines appended to changes.log (of program #1019175) to all subscribers.
sclass SnippetUpdatesChannel extends Channel<O> {
  ManualTailFile tail;

  // Sets up the tail on the log file; each line coming out of the unquoting
  // line buffer is handed to sendLine().
  *() {
    tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"),
      vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
  }

  // Polls the log file for new content.
  void update() {
    tail.check();
  }

  // Broadcasts one log line as snippetUpdates:<line>.
  void sendLine(S line) {
    print("SnippetUpdatesChannel send: " + line);
    line = assertNoLineBreaks(line);
    print("Sockets: " + l(sockets));
    S wireLine = "snippetUpdates:" + line;
    for (SocketChannel s : sockets)
      sendLineToSocket(s, wireLine);
  }
}
166 | |
// Looks up the connection state for a socket; null if the socket is not (or no longer) tracked.
static State state(SocketChannel socket) {
  State st = dataMapper.get(socket);
  ret st;
}
170 | |
// Computer ID announced on this socket, or null if the socket is untracked
// or has not announced one.
sS computerID(SocketChannel socket) {
  State st = state(socket);
  if (st == null) null;
  ret st.computerID;
}
175 | |
// Program entry: starts instance tracking, opens the non-blocking server
// socket, launches the accept loop and the periodic maintenance timer.
p {
  // connected-instances tracker; rst (-> update()) runs on every change
  cI = new ConnectedInstances;
  cI.setAfterglowInterval(1000);
  cI.connected2.onChange(rst);
  cI.start();

  // server socket in non-blocking mode, registered for accept events
  selector = Selector.open();
  serverChannel = ServerSocketChannel.open();
  serverChannel.configureBlocking(false);
  serverChannel.socket().bind(new InetSocketAddress(port));
  serverChannel.register(selector, SelectionKey.OP_ACCEPT);

  thread "Server Accept Loop" { acceptLoop(); }

  mainTimer = doEvery(updateInterval, r {
    // close connections that have been silent for longer than timeout
    for (SocketChannel socket : keysList(dataMapper)) {
      State st = state(socket);
      if (st == null) continue;
      if (sysTime() < st.lastMessage+timeout) continue;
      pcall { closeSocket(socket); }
    }

    // let every channel publish pending changes, each under its own lock
    for (Channel channel : values(channels)) pcall {
      lock channel.lock;
      channel.update();
    }
  });
}
205 | |
// Shutdown hook: releases every network resource this program opened.
// The server channel is closed first; a failure there still propagates to
// the caller (as before), but no longer prevents the remaining cleanup.
// Besides the client sockets, the selector and the main timer are now
// released too - previously both were left open/running.
svoid cleanMeUp() ctex {
  try {
    if (serverChannel != null)
      serverChannel.close();
  } finally {
    // best effort: one failing socket must not keep the others open
    for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall {
      socket.close();
    }
    // closing the selector also wakes an accept loop blocked in select()
    if (selector != null) pcall { selector.close(); }
    if (mainTimer != null) pcall { mainTimer.cancel(); }
  }
}
213 | |
// Selector loop (run on the "Server Accept Loop" thread): blocks until
// channels are ready, then dispatches accept and read events. A failure
// while handling one key is contained by pcall and does not end the loop.
svoid acceptLoop ctex {
  while licensed {
    // wait for events
    selector.select();

    for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext(); ) {
      SelectionKey key = it.next();
      it.remove(); // selected keys have to be removed by hand

      if (key.isValid()) pcall {
        if (key.isAcceptable())
          acceptOnSocket(key);
        else if (key.isReadable())
          readFromSocket(key);
      }
    }
  }
}
234 | |
// Accepts a connection made to the server channel behind the given key,
// creates its State and registers it with the selector for reading.
//
// Fixes over the previous version:
// - accept() on a non-blocking server channel may return null when no
//   connection is pending; that case is now skipped instead of causing a
//   NullPointerException.
// - if setting up the new channel fails, it is closed (and untracked again)
//   instead of being left open.
//
// Throws IOException when accepting or setting up the connection fails.
static void acceptOnSocket(SelectionKey key) throws IOException {
  ServerSocketChannel server = cast key.channel();
  SocketChannel client = server.accept();
  if (client == null) ret; // nothing to accept after all

  try {
    client.configureBlocking(false);
    SocketAddress remoteAddr = client.socket().getRemoteSocketAddress();
    print("Connected to: " + remoteAddr);

    // register channel with selector for further IO
    dataMapper.put(client, new State);
    client.register(selector, SelectionKey.OP_READ);
  } catch (IOException e) {
    closeSocket(client);
    throw e;
  }
}
248 | |
// Bookkeeping after a socket has been closed: reports the lost connection
// to cI, unsubscribes the socket from all channels and forgets its State.
//
// The State is removed from dataMapper only AFTER the channels were
// cleaned up: PrivateChatChannel.removeSocket() looks up computerID(socket),
// which needs the State. With the previous order that lookup returned null
// and the socket stayed behind in the channel's client map.
svoid onSocketClose(SocketChannel socket) {
  SocketAddress remoteAddr = socket.socket().getRemoteSocketAddress();
  print("Connection closed by client: " + remoteAddr);

  State state = state(socket);
  if (state != null && state.computerID != null)
    cI.lostConnection(state.computerID);

  try {
    removeFromAllChannels(socket);
  } finally {
    // always forget the socket, even if channel cleanup failed
    dataMapper.remove(socket);
  }
}
259 | |
// Closes a client socket and then runs the close bookkeeping.
// Each step is guarded on its own, so the bookkeeping runs even when
// close() itself fails.
svoid closeSocket(SocketChannel socket) {
  pcall {
    socket.close();
  }
  pcall {
    onSocketClose(socket);
  }
}
264 | |
// Reads the available data (up to 1024 bytes) from a readable client socket,
// feeds it into the connection's line buffer and handles every completed line.
//
// Fixes over the previous version:
// - incoming bytes are decoded as UTF-8, matching the toUtf8() encoding used
//   by sendLineToSocket(), instead of the platform default charset.
//   NOTE: a multi-byte character split across two reads is still decoded
//   incorrectly; fixing that needs a stateful decoder per connection.
// - if read() fails, the socket is closed and its key cancelled before the
//   exception is passed on, so a broken connection does not stay registered
//   with the selector.
//
// Throws IOException when reading from the socket fails.
static void readFromSocket(SelectionKey key) throws IOException {
  final SocketChannel socket = cast key.channel();
  ByteBuffer buffer = ByteBuffer.allocate(1024);

  int numRead;
  try {
    numRead = socket.read(buffer);
  } catch (IOException e) {
    closeSocket(socket);
    key.cancel();
    throw e;
  }

  // -1 = end of stream, i.e. the client closed the connection
  if (numRead == -1) {
    closeSocket(socket);
    key.cancel();
    ret;
  }

  S s = new String(buffer.array(), 0, numRead, java.nio.charset.StandardCharsets.UTF_8);
  //print("Got data from client: " + s);
  final State state = state(socket);
  if (state != null) {
    state.lastMessage = sysNow(); // any incoming data counts as activity
    state.lineBuffer.append(s, voidfunc(S line) {
      //if (nempty(line)) print("Got line: " + line);
      pcall { handleLine(line, socket, state); }
    });
    // too much buffered data => drop the client
    if (state.lineBuffer.size() > maxLineLength) {
      print("Bad client (line too long");
      closeSocket(socket);
    }
  }
}
290 | |
// Interprets one line received from a client:
//   (empty line)        idle ping, answered with an empty line
//   computerID=<id>     announces the client's computer ID (accepted once)
//   sub <channel>       subscribes the socket to a channel
//   <channel>: <text>   passes <text> to a channel the client is subscribed to
// Anything else is ignored.
svoid handleLine(S line, SocketChannel socket, State state) {
  new Matches m;

  // idle
  if (empty(line)) {
    sendLineToSocket(socket, "");
    ret;
  }

  if (jmatch("computerID=*", line, m)) {
    // the ID can only be set once and has to look like a computer ID
    if (state.computerID == null && possibleComputerID($1)) {
      print("Got computer ID: " + $1);
      state.computerID = $1;
      cI.gotConnection(state.computerID);
    }
    ret;
  }

  if (jmatch("sub *", line, m)) {
    S channelName = $1;
    bool found = getChannelForSocket(socket, channelName) != null;
    sendLineToSocket(socket, found
      ? "Subbed " + channelName
      : "Channel " + channelName + " not found");
    ret;
  }

  // "<channel>: <payload>" - second code token is the colon
  L<S> tok = javaTok(line);
  if (!eq(get(tok, 3), ":")) ret; // ignore unknown line

  Channel channel = state.subbedChannels.get(get(tok, 1));
  if (channel != null)
    channel.handleLine(joinSubList(tok, 5), socket);
}
321 | |
// JavaX html block: the returned page only says which port to connect to.
html {
  ret "Use port " + port;
}
323 | |
// Sends s plus a line break to the socket, encoded as UTF-8, using a single
// write() call. If the socket accepts fewer bytes than the line has, the
// rest is NOT sent; this is only logged (see TODO message).
svoid sendLineToSocket(SocketChannel socket, S s) ctex {
  ByteBuffer buf = ByteBuffer.wrap(toUtf8(s + "\n"));
  int total = buf.remaining();
  int written = socket.write(buf);
  if (written < total)
    print("Socket write failed! TODO");
}
332 | |
// Unsubscribes the socket from every channel and deletes channels that are
// left without subscribers. Works on a copy of the channel map; a failure in
// one channel does not stop the others from being processed.
svoid removeFromAllChannels(SocketChannel socket) {
  lock channelsLock;
  for (S name, Channel c : cloneMap(channels)) {
    pcall {
      c.removeSocket(socket);
      if (nempty(c.sockets)) continue; // still in use

      print("Deleting channel: " + name);
      channels.remove(name);
    }
  }
}
343 | |
// Subscribes the socket to the channel with the given name, creating the
// channel first if it does not exist yet.
// Returns the channel, or null if the name is not a known channel type.
//
// Fix: the result of makeChannel() is checked for null BEFORE it is used.
// Previously c.name was assigned first, so an unknown channel name ended in
// a NullPointerException and the caller never got the null it needs to
// answer "Channel ... not found".
static Channel getChannelForSocket(SocketChannel socket, S name) {
  lock channelsLock;
  Channel c = channels.get(name);
  if (c == null) {
    c = makeChannel(name);
    if (c == null) ret null; // unknown channel type (already logged by makeChannel)
    c.name = name;
    print("New channel: " + name);
    channels.put(name, c);
    // initialize the channel's data before the first subscriber is added
    lock c.lock;
    c.update();
  }
  c.addSocket(socket);
  state(socket).subbedChannels.put(name, c);
  ret c;
}
360 | |
// Factory: creates a fresh channel object for a known channel name.
// Unknown names are logged and yield null.
static Channel makeChannel(S name) {
  // status channels
  if (eq(name, "computerIDs"))
    ret new ComputerIDsChannel;
  if (eq(name, "computerCount"))
    ret new ComputerCountChannel;

  // chat channels
  if (eq(name, "chat"))
    ret new PublicChatChannel;
  if (eq(name, "privateChat"))
    ret new PrivateChatChannel;
  if (eq(name, "privateMachineChat"))
    ret nu(PrivateChatChannel, prefix := name + ":");

  if (eq(name, "snippetUpdates"))
    ret new SnippetUpdatesChannel;

  print("Unknown channel: " + name);
  ret null;
}
371 | |
// Target of rst: refreshes theNumber from cI.value().
// Does nothing beyond the first log line while no client is connected.
svoid update {
  print("OS Instances update");
  if (empty(dataMapper)) ret;

  theNumber = cI.value();
  print("OS Instances update done");
}
Began life as a copy of #1016572
download show line numbers debug dex old transpilations
Travelled to 7 computer(s): bhatertpkbcr, cfunsshuasjs, mqqgnosmbjvj, pyentgdyhuwx, pzhvpgtvlbxg, tvejysmllsmz, vouqrxazstgt
No comments. add comment
Snippet ID: | #1022169 |
Snippet name: | OS Instances Connector [NIO Socket, backup without buffers] |
Eternal ID of this version: | #1022169/1 |
Text MD5: | 3ed4072983e6ef4b6085ae6906e62e6e |
Author: | stefan |
Category: | javax / a.i. / web |
Type: | JavaX module |
Public (visible to everyone): | Yes |
Archived (hidden from active list): | No |
Created/modified: | 2019-03-10 11:23:02 |
Source code size: | 10473 bytes / 380 lines |
Pitched / IR pitched: | No / No |
Views / Downloads: | 286 / 456 |
Referenced in: | [show references] |