Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

418
LINES

< > BotCompany Repo | #1022732 // OS Instances Connector [NIO Socket, Web Bot, backup]

JavaX module

Download Jar.

1  
!7
2  
3  
// Period handed to doEvery() for the maintenance timer in the main program
// (unit is whatever doEvery expects - presumably milliseconds; TODO confirm)
static int updateInterval = 10;
4  
// A connection is closed once sysTime() >= its lastMessage + timeout
static long timeout = 30000+10000; // idle time + latency
5  
// Port the server socket channel is bound to
static int port = 6000;
6  
// A connection whose line buffer grows beyond this size is closed as a bad client
static int maxLineLength = 10*1024*1024; // change #1020500 too if you change this
7  
8  
import java.nio.*;
9  
import java.nio.channels.*;
10  
11  
// Connection tracker; created and started in the main program, queried via cI.set() / cI.value()
static ConnectedInstances cI;
12  
// Runs update(); registered as change listener on cI.connected2 in the main program
static ReliableSingleThread rst = new(r update);
13  
// Last cI.value() stored by update()
static int theNumber;
14  
// Per-connection State for every accepted socket (removed again when the socket is closed)
static Map<SocketChannel, State> dataMapper = synchroMap();
15  
// Listening channel, opened and bound in the main program
static ServerSocketChannel serverChannel;
16  
// Selector shared by the server channel and all client channels
static Selector selector;
17  
// Live channels by name; modified under channelsLock
static Map<S, Channel> channels = synchroMap();
18  
// Taken by getChannelForSocket and removeFromAllChannels
static Lock channelsLock = lock();
19  
// Timer returned by doEvery() in the main program (idle check + channel updates)
static java.util.Timer mainTimer;
20  
// NOTE(review): presumably switches the `debug "..."` statements in acceptLoop on - confirm
sbool debug;
21  
22  
sclass State {
23  
  S socketID = aGlobalID(), computerID;
24  
  int countSeen = -1;
25  
  new LineBuffer lineBuffer;
26  
  long lastMessage = sysNow();
27  
  Map<S, Channel> subbedChannels = synchroLinkedHashMap();
28  
  Lock lock = lock();
29  
  
30  
  // TODO: clear eventually to avoid stalling attacks
31  
  L<byte[]> toSend = synchroLinkedList();
32  
}
33  
34  
sclass Channel<A> {
35  
  S name;
36  
  Set<SocketChannel> sockets = synchroSet();
37  
  Map<SocketChannel, A> dataMapper = synchroMap();
38  
  Lock lock = lock();
39  
  
40  
  void addSocket(SocketChannel socket) {
41  
    lock lock;
42  
    sockets.add(socket);
43  
    initialMessage(socket);
44  
  }
45  
  
46  
  void removeSocket(SocketChannel socket) {
47  
    lock lock;
48  
    sockets.remove(socket);
49  
  }
50  
  
51  
  void update() {}
52  
  void initialMessage(SocketChannel socket) {}
53  
  void handleLine(S line, SocketChannel socket) {}
54  
}
55  
56  
sclass ComputerIDsChannel extends Channel<O> {
57  
  L<S> value;
58  
  
59  
  void initialMessage(SocketChannel socket) {
60  
    lock lock;
61  
    sendTo(socket);
62  
  }
63  
  
64  
  void update() {
65  
    if (set_trueIfChanged(this, value := asList(cI.set())))
66  
      for (SocketChannel s : sockets) sendTo(s);
67  
  }
68  
  
69  
  void sendTo(SocketChannel socket) {
70  
    sendLineToSocket(socket, "ComputerIDs = " + struct(value));
71  
  }
72  
}
73  
74  
sclass ComputerCountChannel extends Channel<O> {
75  
  int value;
76  
  
77  
  void initialMessage(SocketChannel socket) {
78  
    lock lock;
79  
    sendTo(socket);
80  
  }
81  
  
82  
  void update() {
83  
    if (set_trueIfChanged(this, value := cI.value()))
84  
      for (SocketChannel s : sockets) sendTo(s);
85  
  }
86  
  
87  
  void sendTo(SocketChannel socket) {
88  
    sendLineToSocket(socket, "ComputerCount = " + value);
89  
  }
90  
}
91  
92  
sclass PublicChatChannel extends Channel<O> {
93  
  L<S> msgs = synchroLinkedList();
94  
  
95  
  void handleLine(S line, SocketChannel socket) {
96  
    postMessage(unquote(line));
97  
  }
98  
  
99  
  void postMessage(S msg) {
100  
    msgs.add(msg);
101  
  }
102  
  
103  
  void update {
104  
    S msg;
105  
    while ((msg = syncPopFirst(msgs)) != null) {
106  
      S quoted = quote(msg);
107  
      print("Chat send: " + shorten(quoted, 20));
108  
      quoted = "chat:" + quoted;
109  
      for (SocketChannel s : sockets)
110  
        sendLineToSocket(s, quoted);
111  
    }
112  
  }
113  
}
114  
115  
sclass PrivateChatChannel extends Channel<O> {
116  
  S prefix = "";
117  
  new MultiMap<S, SocketChannel> clients; // key: computer ID
118  
119  
  void initialMessage(SocketChannel socket) {
120  
    clients.put(computerID(socket), socket);
121  
  }
122  
  
123  
  void removeSocket(SocketChannel socket) {
124  
    super.removeSocket(socket);
125  
    clients.remove(computerID(socket), socket);
126  
  }
127  
  
128  
  void handleLine(S line, SocketChannel socket) {
129  
    pcall {
130  
      int i = indexOf(line, ' ');
131  
      S computerID = takeFirst(line, i);
132  
      while licensed {
133  
        SocketChannel socket2 = clients.getFirst(computerID);
134  
        if (socket2 == null) break;
135  
        try {
136  
          S sender = computerID(socket);
137  
          if (l(line) >= 10000)
138  
            print("Forwarding line from " + sender + " to " + computerID + ": " + l(line));
139  
          sendLineToSocket(socket2, prefix + "privately from " + sender + ": " + substring(line, i+1));
140  
          break;
141  
        } catch e {
142  
          closeSocket(socket2);
143  
          clients.remove(computerID, socket2);
144  
        }
145  
      }
146  
    }
147  
  }
148  
}
149  
150  
sclass SnippetUpdatesChannel extends Channel<O> {
151  
  ManualTailFile tail;
152  
  
153  
  *() {
154  
    tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"),
155  
      vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
156  
  }
157  
  
158  
  void update {
159  
    tail.check();
160  
  }
161  
  
162  
  void sendLine(S line) {
163  
    print("SnippetUpdatesChannel send: " + line);
164  
    line = assertNoLineBreaks(line);
165  
    print("Sockets: " + l(sockets));
166  
    //S quoted = quote(line);
167  
    for (SocketChannel s : sockets)
168  
      sendLineToSocket(s, "snippetUpdates:" + line);
169  
  }
170  
}
171  
172  
static State state(SocketChannel socket) {
173  
  ret dataMapper.get(socket);
174  
}
175  
176  
// Computer ID stored in the socket's State; null if the socket has no State.
sS computerID(SocketChannel socket) {
  State st = state(socket);
  if (st == null) ret null;
  ret st.computerID;
}
180  
181  
// Main program: starts the connection tracker, opens the server socket,
// launches the accept loop and schedules the periodic maintenance timer.
p {
  // Connection tracker; rst is registered as its change listener
  cI = new ConnectedInstances;
  cI.setAfterglowInterval(1000);
  cI.connected2.onChange(rst);
  cI.start();

  // Non-blocking server channel, registered for accept events
  selector = Selector.open();
  serverChannel = ServerSocketChannel.open();
  serverChannel.configureBlocking(false);
  serverChannel.socket().bind(new InetSocketAddress(port));
  serverChannel.register(selector, SelectionKey.OP_ACCEPT);

  thread "Server Accept Loop" { acceptLoop(); }

  mainTimer = doEvery(updateInterval, r {
    // 1. close connections that have been idle for too long
    for (SocketChannel socket : keysList(dataMapper)) {
      State st = state(socket);
      if (st == null || sysTime() < st.lastMessage+timeout) continue;
      pcall {
        closeSocket(socket);
      }
    }
    // 2. give every channel the chance to broadcast
    for (Channel channel : values(channels)) pcall {
      lock channel.lock;
      channel.update();
    }
  });
}
210  
211  
// Shutdown hook: releases every resource the main program created.
// Each step is isolated so a failure in one does not skip the others.
svoid cleanMeUp() ctex {
  // stop the maintenance timer so it no longer touches sockets being closed
  if (mainTimer != null) pcall { mainTimer.cancel(); }

  if (serverChannel != null) pcall { serverChannel.close(); }

  for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall {
    socket.close();
  }

  // Closing the selector releases its resources and wakes up the accept
  // loop thread, which would otherwise stay blocked in select().
  if (selector != null) pcall { selector.close(); }
}
218  
219  
// Selector loop: waits for ready keys and dispatches each one to the accept,
// read or write handler. Handler exceptions are caught per key.
svoid acceptLoop ctex {
  while licensed {
    // wait for events
    selector.select();

    for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext(); ) {
      SelectionKey key = it.next();
      it.remove(); // mark this key as processed
      debug "have key";

      if (!key.isValid())
        continue with debug "key not valid";

      pcall {
        // only the first matching kind is handled per pass
        if (key.isAcceptable()) {
          debug "Key is acceptable";
          acceptOnSocket(key);
        } else if (key.isReadable()) {
          debug "Key is readable";
          readFromSocket(key);
        } else if (key.isWritable()) {
          debug "Key is writable";
          writeToSocket(key);
        } else
          debug "Unknown key type";
      }
    }
  }
}
249  
250  
//accept a connection made to this channel's socket
251  
static void acceptOnSocket(SelectionKey key) throws IOException {
252  
  ServerSocketChannel serverChannel = cast key.channel();
253  
  SocketChannel channel = serverChannel.accept();
254  
  channel.configureBlocking(false);
255  
  Socket actualSocket = channel.socket();
256  
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
257  
  print("Connected to: " + remoteAddr);
258  
  
259  
  // register channel with selector for further IO
260  
	dataMapper.put(channel, new State);
261  
	channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
262  
}
263  
264  
svoid onSocketClose(SocketChannel socket) {
265  
  Socket actualSocket = socket.socket();
266  
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
267  
  print("Connection closed by client: " + remoteAddr);
268  
  State state = state(socket);
269  
  if (state != null && state.computerID != null)
270  
    cI.lostConnection(state.computerID);
271  
  dataMapper.remove(socket);
272  
  removeFromAllChannels(socket);
273  
}
274  
275  
// Closes a client socket and then runs the close bookkeeping.
// Both steps are attempted independently of each other.
svoid closeSocket(SocketChannel socket) {
  pcall {
    socket.close();
  }
  pcall {
    onSocketClose(socket);
  }
}
279  
280  
// Reads the available bytes (at most 1024 per call) from the socket behind
// `key`, feeds them to the connection's line buffer and handles every
// complete line. Closes the socket on end-of-stream or an over-long line.
static void readFromSocket(SelectionKey key) throws IOException {
  final SocketChannel socket = cast key.channel();
  ByteBuffer buffer = ByteBuffer.allocate(1024);
  int numRead = socket.read(buffer);
  if (numRead == -1) {
    // end of stream
    closeSocket(socket);
    key.cancel();
    ret;
  }

  // Decode as UTF-8 to match sendLineToSocket (toUtf8) instead of relying on
  // the platform default charset.
  // NOTE(review): a multi-byte character split across two reads is still
  // decoded incorrectly; that would need a per-connection decoder.
  S s = new String(buffer.array(), 0, numRead, "UTF-8");
  //print("Got data from client: " + s);
  final State state = state(socket);
  if (state != null) {
    state.lastMessage = sysNow();
    state.lineBuffer.append(s, voidfunc(S line) {
      //if (nempty(line)) print("Got line: " + line);
      pcall { handleLine(line, socket, state); }
    });
    if (state.lineBuffer.size() > maxLineLength) {
      print("Bad client (line too long");
      closeSocket(socket);
    }
  }
}
305  
306  
// Called when the socket behind `key` is writable: flushes data that an
// earlier send had to postpone, in the order it was queued.
//
// The chunks are written directly to the channel here. Routing them through
// sendBytesToSocket would put a popped chunk back at the END of the queue
// whenever another chunk is still waiting, so nothing would ever be sent.
static void writeToSocket(SelectionKey key) throws IOException {
  final SocketChannel socket = cast key.channel();
  State state = state(socket);
  if (state == null) ret; // socket is no longer registered
  lock state.lock;
  while (nempty(state.toSend)) {
    byte[] data = popFirst(state.toSend);
    print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID);
    int n = socket.write(ByteBuffer.wrap(data));
    if (n < l(data)) {
      // Partial write: keep the unsent tail at the HEAD of the queue and
      // wait for the next writable event.
      state.toSend.add(0, subByteArray(data, n));
      break;
    }
  }
}
315  
316  
// Sends bytes to a socket. If data is already queued for that socket the
// bytes are queued behind it; otherwise they are written immediately and
// whatever the socket did not take is queued.
svoid sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException {
  int total = l(bytes);
  State st = state(socket);
  lock st.lock;

  // something is waiting already: get in line
  if (nempty(st.toSend)) {
    print("Postponing send to " + st.computerID + " because data already queued");
    st.toSend.add(bytes);
    ret;
  }

  int written = socket.write(ByteBuffer.wrap(bytes));
  if (written >= total) ret; // everything went out

  byte[] rest = subByteArray(bytes, written);
  st.toSend.add(rest);
  print("Postponing sending " + n2(l(rest), "byte") + " to " + st.computerID);
}
332  
333  
svoid handleLine(S line, SocketChannel socket, State state) {
334  
  new Matches m;
335  
  if (empty(line)) ret with sendLineToSocket(socket, ""); // idle
336  
  
337  
  if (jmatch("computerID=*", line, m)) {
338  
    if (state.computerID != null || !possibleComputerID($1)) ret;
339  
    print("Got computer ID: " + $1);
340  
    state.computerID = $1;
341  
    cI.gotConnection(state.computerID);
342  
    ret;
343  
  }
344  
  
345  
  if (jmatch("sub *", line, m)) {
346  
    Channel channel = getChannelForSocket(socket, $1);
347  
    if (channel == null)
348  
      sendLineToSocket(socket, "Channel " + $1 + " not found");
349  
    else
350  
      sendLineToSocket(socket, "Subbed " + $1);
351  
    ret;
352  
  }
353  
  
354  
  L<S> tok = javaTok(line);
355  
  if (eq(get(tok, 3), ":")) {
356  
    Channel channel = state.subbedChannels.get(get(tok, 1));
357  
    if (channel != null)
358  
      channel.handleLine(joinSubList(tok, 5), socket);
359  
  }
360  
  
361  
  // ignore unknown line
362  
}
363  
364  
// Web view of this module: only points the visitor to the socket port.
html {
  ret "Use port " + port;
}
365  
366  
// Sends a string plus a terminating newline, UTF-8 encoded, to a socket.
svoid sendLineToSocket(SocketChannel socket, S s) ctex {
  sendBytesToSocket(socket, toUtf8(s + "\n"));
}
370  
371  
// Unsubscribes a socket from every channel and deletes channels that are
// left without subscribers. Works on a copy of the channel map so entries
// can be removed while iterating.
svoid removeFromAllChannels(SocketChannel socket) {
  lock channelsLock;
  for (Map.Entry<S, Channel> entry : cloneMap(channels).entrySet()) {
    pcall {
      S name = entry.getKey();
      Channel c = entry.getValue();
      c.removeSocket(socket);
      if (empty(c.sockets)) {
        print("Deleting channel: " + name);
        channels.remove(name);
      }
    }
  }
}
381  
382  
// Subscribes a socket to the channel with the given name, creating the
// channel on first use. Returns the channel, or null if the name is not a
// known channel (the socket is then not subscribed to anything).
static Channel getChannelForSocket(SocketChannel socket, S name) {
  lock channelsLock;
  Channel c = channels.get(name);
  if (c == null) {
    c = makeChannel(name);
    // Check BEFORE touching the channel: makeChannel returns null for
    // unknown names, and the caller reports that to the client.
    if (c == null) ret null;
    c.name = name;
    print("New channel: " + name);
    channels.put(name, c);
    // bring the new channel up to date before the first subscriber joins
    lock c.lock;
    c.update();
  }
  c.addSocket(socket);
  state(socket).subbedChannels.put(name, c);
  ret c;
}
398  
399  
static Channel makeChannel(S name) {
400  
  if (eq(name, "computerIDs")) ret new ComputerIDsChannel;
401  
  if (eq(name, "computerCount")) ret new ComputerCountChannel;
402  
  if (eq(name, "chat")) ret new PublicChatChannel;
403  
  if (eq(name, "privateChat")) ret new PrivateChatChannel;
404  
  if (eq(name, "privateMachineChat")) ret nu(PrivateChatChannel, prefix := name + ":");
405  
  if (eq(name, "snippetUpdates")) ret new SnippetUpdatesChannel;
406  
    
407  
  ret null with print("Unknown channel: " + name);
408  
}
409  
410  
// Run through rst: records the current cI.value() in theNumber.
// Does nothing beyond logging while no socket is connected.
svoid update {
  print("OS Instances update");
  if (empty(dataMapper)) ret;

  //if (value == theNumber) ret;
  theNumber = cI.value();
  print("OS Instances update done");
}

Author comment

Began life as a copy of #1016572

download  show line numbers  debug dex  old transpilations   

Travelled to 7 computer(s): bhatertpkbcr, cfunsshuasjs, mqqgnosmbjvj, pyentgdyhuwx, pzhvpgtvlbxg, tvejysmllsmz, vouqrxazstgt

No comments. add comment

Snippet ID: #1022732
Snippet name: OS Instances Connector [NIO Socket, Web Bot, backup]
Eternal ID of this version: #1022732/1
Text MD5: e4d0736aa0d77f37beb88f25294fd0cd
Author: stefan
Category: javax / a.i. / web
Type: JavaX module
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2019-04-03 16:08:05
Source code size: 11789 bytes / 418 lines
Pitched / IR pitched: No / No
Views / Downloads: 183 / 287
Referenced in: [show references]