!7 static int updateInterval = 10; static long timeout = 30000+10000; // idle time + latency static int port = 6000; static int maxLineLength = 10*1024*1024; // change #1020500 too if you change this import java.nio.*; import java.nio.channels.*; static ConnectedInstances cI; static ReliableSingleThread rst = new(r update); static int theNumber; static Map<SocketChannel, State> dataMapper = synchroMap(); static ServerSocketChannel serverChannel; static Selector selector; static Map<S, Channel> channels = synchroMap(); static Lock channelsLock = lock(); static java.util.Timer mainTimer; sbool debug; sclass State { S socketID = aGlobalID(), computerID; int countSeen = -1; new LineBuffer lineBuffer; long lastMessage = sysNow(); Map<S, Channel> subbedChannels = synchroLinkedHashMap(); Lock lock = lock(); // TODO: clear eventually to avoid stalling attacks L<byte[]> toSend = synchroLinkedList(); } sclass Channel<A> { S name; Set<SocketChannel> sockets = synchroSet(); Map<SocketChannel, A> dataMapper = synchroMap(); Lock lock = lock(); void addSocket(SocketChannel socket) { lock lock; sockets.add(socket); initialMessage(socket); } void removeSocket(SocketChannel socket) { lock lock; sockets.remove(socket); } void update() {} void initialMessage(SocketChannel socket) {} void handleLine(S line, SocketChannel socket) {} } sclass ComputerIDsChannel extends Channel<O> { L<S> value; void initialMessage(SocketChannel socket) { lock lock; sendTo(socket); } void update() { if (set_trueIfChanged(this, value := asList(cI.set()))) for (SocketChannel s : sockets) sendTo(s); } void sendTo(SocketChannel socket) { sendLineToSocket(socket, "ComputerIDs = " + struct(value)); } } sclass ComputerCountChannel extends Channel<O> { int value; void initialMessage(SocketChannel socket) { lock lock; sendTo(socket); } void update() { if (set_trueIfChanged(this, value := cI.value())) for (SocketChannel s : sockets) sendTo(s); } void sendTo(SocketChannel socket) { sendLineToSocket(socket, "ComputerCount = " + value); } } sclass PublicChatChannel extends Channel<O> { L<S> msgs = synchroLinkedList(); void handleLine(S line, SocketChannel socket) { postMessage(unquote(line)); } void postMessage(S msg) { msgs.add(msg); } void update { S msg; while ((msg = syncPopFirst(msgs)) != null) { S quoted = quote(msg); //print("Chat send: " + shorten(quoted, 20)); quoted = name + ":" + quoted; for (SocketChannel s : sockets) sendLineToSocket(s, quoted); } } } sclass PrivateChatChannel extends Channel<O> { S prefix = ""; new MultiMap<S, SocketChannel> clients; // key: computer ID void initialMessage(SocketChannel socket) { clients.put(computerID(socket), socket); } void removeSocket(SocketChannel socket) { super.removeSocket(socket); clients.remove(computerID(socket), socket); } void handleLine(S line, SocketChannel socket) { pcall { int i = indexOf(line, ' '); S computerID = takeFirst(line, i); while licensed { SocketChannel socket2 = clients.getFirst(computerID); if (socket2 == null) break; try { S sender = computerID(socket); if (l(line) >= 10000) print("Forwarding line from " + sender + " to " + computerID + ": " + l(line)); sendLineToSocket(socket2, prefix + "privately from " + sender + ": " + substring(line, i+1)); break; } catch e { closeSocket(socket2); clients.remove(computerID, socket2); } } } } } sclass SnippetUpdatesChannel extends Channel<O> { ManualTailFile tail; *() { tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"), vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) }))); } void update { tail.check(); } void sendLine(S line) { print("SnippetUpdatesChannel send: " + line); line = assertNoLineBreaks(line); print("Sockets: " + l(sockets)); //S quoted = quote(line); for (SocketChannel s : sockets) sendLineToSocket(s, "snippetUpdates:" + line); } } static State state(SocketChannel socket) { ret dataMapper.get(socket); } sS computerID(SocketChannel socket) { State state = state(socket); ret state == null ? null : state.computerID; } p { cI = new ConnectedInstances; cI.setAfterglowInterval(1000); cI.connected2.onChange(rst); cI.start(); openServerSocket(); // restart socket every hour (and once now for testing - XX) /*doEvery(10.0, 60*60.0, r { pcall { cleanMeUp_socket(); } openServerSocket(); });*/ } svoid openServerSocket ctex { print("Opening server socket"); selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); InetSocketAddress sockAddr = new InetSocketAddress(port); serverChannel.socket().bind(sockAddr); print("Server socket bound"); serverChannel.register(selector, SelectionKey.OP_ACCEPT); thread "Server Accept Loop" { acceptLoop(); } mainTimer = doEvery(updateInterval, r { for (SocketChannel socket : keysList(dataMapper)) { State state = state(socket); if (state != null && sysTime() >= state.lastMessage+timeout) pcall { closeSocket(socket); } } for (Channel channel : values(channels)) pcall { lock channel.lock; channel.update(); } }); } svoid cleanMeUp_socket() ctex { if (serverChannel != null) serverChannel.close(); for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall { socket.close(); } } svoid acceptLoop ctex { while licensed { // wait for events selector.select(); Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); keys.remove(); debug "have key"; if (!key.isValid()) continue with debug "key not valid"; pcall { if (key.isAcceptable()) { debug "Key is acceptable"; acceptOnSocket(key); } else if (key.isReadable()) { debug "Key is readable"; readFromSocket(key); } else if (key.isWritable()) { debug "Key is writable"; writeToSocket(key); } else debug "Unknown key type"; } } } } //accept a connection made to this channel's socket static void acceptOnSocket(SelectionKey key) throws IOException { ServerSocketChannel serverChannel = cast key.channel(); SocketChannel channel = serverChannel.accept(); channel.configureBlocking(false); Socket actualSocket = channel.socket(); SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress(); print("Connected to: " + remoteAddr); // register channel with selector for further IO dataMapper.put(channel, new State); channel.register(selector, SelectionKey.OP_READ); } svoid onSocketClose(SocketChannel socket) { Socket actualSocket = socket.socket(); SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress(); print("Connection closed by client: " + remoteAddr); State state = state(socket); if (state != null && state.computerID != null) cI.lostConnection(state.computerID); dataMapper.remove(socket); removeFromAllChannels(socket); } svoid closeSocket(SocketChannel socket) { pcall { socket.close(); } pcall { onSocketClose(socket); } } static void readFromSocket(SelectionKey key) throws IOException { final SocketChannel socket = cast key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int numRead = socket.read(buffer); if (numRead == -1) { closeSocket(socket); key.cancel(); ret; } S s = new String(buffer.array(), 0, numRead); //print("Got data from client: " + s); final State state = state(socket); if (state != null) { state.lastMessage = sysNow(); state.lineBuffer.append(s, voidfunc(S line) { //if (nempty(line)) print("Got line: " + line); pcall { handleLine(line, socket, state); } }); if (state.lineBuffer.size() > maxLineLength) { print("Bad client (line too long"); closeSocket(socket); } } } static void writeToSocket(SelectionKey key) throws IOException { final SocketChannel socket = cast key.channel(); socket.register(selector, SelectionKey.OP_READ); State state = state(socket); lock state.lock; if (empty(state.toSend)) ret; byte[] data = popFirst(state.toSend); print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID); sendBytesToSocket(socket, data); } svoid sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException { int length = l(bytes); State state = state(socket); lock state.lock; if (nempty(state.toSend)) { print("Postponing send to " + state.computerID + " because data already queued"); ret with state.toSend.add(bytes); } ByteBuffer buf = ByteBuffer.wrap(bytes); int n = socket.write(buf); if (n < length) { byte[] remaining = subByteArray(bytes, n); state.toSend.add(remaining); print("Postponing sending " + n2(l(remaining), "byte") + " to " + state.computerID); socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } svoid handleLine(S line, SocketChannel socket, State state) { new Matches m; if (empty(line)) ret with sendLineToSocket(socket, ""); // idle if (jmatch("computerID=*", line, m)) { if (state.computerID != null || !possibleComputerID($1)) ret; print("Got computer ID: " + $1); state.computerID = $1; cI.gotConnection(state.computerID); ret; } if (jmatch("sub *", line, m)) { Channel channel = getChannelForSocket(socket, $1); if (channel == null) sendLineToSocket(socket, "Channel " + $1 + " not found"); else sendLineToSocket(socket, "Subbed " + $1); ret; } L<S> tok = javaTok(line); if (eq(get(tok, 3), ":")) { Channel channel = state.subbedChannels.get(get(tok, 1)); if (channel != null) channel.handleLine(joinSubList(tok, 5), socket); } // ignore unknown line } html { ret "Use port " + port; } svoid sendLineToSocket(SocketChannel socket, S s) ctex { byte[] bytes = toUtf8(s + "\n"); sendBytesToSocket(socket, bytes); } svoid removeFromAllChannels(SocketChannel socket) { lock channelsLock; for (S name, Channel c : cloneMap(channels)) { pcall { c.removeSocket(socket); if (empty(c.sockets)) { print("Deleting channel: " + name); channels.remove(name); } }} } static Channel getChannelForSocket(SocketChannel socket, S name) { lock channelsLock; Channel c = channels.get(name); if (c == null) { c = makeChannel(name); c.name = name; if (c == null) null; print("New channel: " + name); channels.put(name, c); lock c.lock; c.update(); } c.addSocket(socket); state(socket).subbedChannels.put(name, c); ret c; } static Channel makeChannel(S name) { if (eq(name, "computerIDs")) ret new ComputerIDsChannel; if (eq(name, "computerCount")) ret new ComputerCountChannel; if (eq(name, "chat")) ret new PublicChatChannel; if (eq(name, "machineChat")) ret new PublicChatChannel; if (eq(name, "privateChat")) ret new PrivateChatChannel; if (eq(name, "privateMachineChat")) ret nu(PrivateChatChannel, prefix := name + ":"); if (eq(name, "snippetUpdates")) ret new SnippetUpdatesChannel; ret null with print("Unknown channel: " + name); } svoid update { print("OS Instances update"); if (empty(dataMapper)) ret; int value = cI.value(); //if (value == theNumber) ret; theNumber = value; print("OS Instances update done"); }
Began life as a copy of #1016572
download show line numbers debug dex old transpilations
Travelled to 2 computer(s): bhatertpkbcr, mqqgnosmbjvj
No comments. add comment
Snippet ID: | #1035030 |
Snippet name: | OS Instances Connector [backup] |
Eternal ID of this version: | #1035030/1 |
Text MD5: | e401d37e10f5a35b068846141bd97b54 |
Author: | stefan |
Category: | javax / a.i. / web |
Type: | JavaX module (desktop) |
Public (visible to everyone): | Yes |
Archived (hidden from active list): | No |
Created/modified: | 2022-03-24 13:24:56 |
Source code size: | 12300 bytes / 433 lines |
Pitched / IR pitched: | No / No |
Views / Downloads: | 183 / 329 |
Referenced in: | -