Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

478
LINES

< > BotCompany Repo | #1016572 // OS Instances Connector [NIO Socket, Web Bot, LIVE]

JavaX module (desktop) [tags: butter use-pretranspiled] - homepage

Download Jar. Libraryless. Click here for Pure Java version (13750L/82K).

1  
!7
2  
3  
// TODO: we are iterating over all channels and all sockets
4  
// 100 times per second - this seems excessive
5  
// Period passed to doEvery() in openServerSocket for the timeout sweep and channel updates
// (presumably milliseconds, matching the "100 times per second" note - confirm).
static int updateInterval = 10;
6  
// A connection is closed by the periodic sweep once State.lastMessage is older than this.
static long timeout = 30000+10000; // idle time + latency
7  
// Port the server socket is bound to.
static int port = 6000;
8  
// Limit checked against a connection's line buffer size; exceeding it disconnects the client.
static int maxLineLength = 10*1024*1024; // change #1020500 too if you change this
9  
10  
import java.nio.*;
11  
import java.nio.ByteBuffer;
12  
import java.nio.channels.*;
13  
14  
// Tracker of connected computer IDs; created and started in the main program.
static ConnectedInstances cI;
15  
// Runs update(); registered as change listener on cI.connected2 in the main program.
static ReliableSingleThread rst = new(r update);
16  
// Last value of cI.value() recorded by update().
static int theNumber;
17  
// Per-connection state, keyed by client socket. Entries are added on accept and removed on close.
static Map<SocketChannel, State> dataMapper = synchroMap();
18  
// Listening socket, opened in openServerSocket.
static ServerSocketChannel serverChannel;
19  
// Selector shared by the server socket and all client sockets.
static Selector selector;
20  
// Live channels by name. Channels are created on first subscription and deleted when empty.
static Map<S, Channel> channels = synchroMap();
21  
// Held while channels are created (getChannelForSocket) or pruned (removeFromAllChannels).
static Lock channelsLock = lock();
22  
// Timer returned by doEvery() in openServerSocket.
static java.util.Timer mainTimer;
23  
// Flag for the `debug "..."` statements (presumably enables their output - confirm).
sbool debug;
24  
25  
// state of a client connection
26  
srecord noeq State(SocketChannel socket) {
27  
  S socketID = aGlobalID(), computerID;
28  
  int countSeen = -1;
29  
  new LineBuffer lineBuffer;
30  
  long lastMessage = sysNow();
31  
  Map<S, Channel> subbedChannels = synchroLinkedHashMap();
32  
  Lock lock = lock();
33  
  
34  
  // TODO: clear eventually to avoid stalling attacks
35  
  L<byte[]> toSend = synchroLinkedList();
36  
}
37  
38  
// a virtual "channel" that clients can "subscribe" to
39  
// (what that means differs between channels)
40  
// Each channel is identified by a unique name
41  
// A is a custom data type for each subscriber
42  
sclass Channel<A> {
43  
  S name;
44  
  Set<SocketChannel> sockets = synchroSet();
45  
  Map<SocketChannel, A> dataMapper = synchroMap();
46  
  Lock lock = lock();
47  
  
48  
  event subCountChanged;
49  
  
50  
  // add a subscriber
51  
  void addSocket(SocketChannel socket) {
52  
    lock lock;
53  
    sockets.add(socket);
54  
    subCountChanged();
55  
    initialMessage(socket);
56  
  }
57  
  
58  
  // remove a subscriber
59  
  void removeSocket(SocketChannel socket) {
60  
    lock lock;
61  
    sockets.remove(socket);
62  
    subCountChanged();
63  
  }
64  
  
65  
  int subCount() { ret l(sockets); }
66  
67  
  void sendToSubscribers(S line) {  
68  
    for (SocketChannel socket : cloneList(sockets))
69  
      sendLineToSocket(socket, line);
70  
  }
71  
  
72  
  void update() {}
73  
  void initialMessage(SocketChannel socket) {}
74  
  void handleLine(S line, SocketChannel socket) {}
75  
}
76  
77  
// channel listing the currently logged in computer IDs
78  
// TODO: optimize the protocol to send diffs only
79  
sclass ComputerIDsChannel extends Channel<O> {
80  
  L<S> value;
81  
  
82  
  void initialMessage(SocketChannel socket) {
83  
    sendLineToSocket(socket, stateMsg());
84  
  }
85  
  
86  
  void update() {
87  
    if (set_trueIfChanged(this, value := asList(cI.set())))
88  
      sendToSubscribers(stateMsg());
89  
  }
90  
  
91  
  S stateMsg() {
92  
    ret "ComputerIDs = " + struct(value);
93  
  }
94  
}
95  
96  
// channel listing the current count of logged in computers
97  
sclass ComputerCountChannel extends Channel<O> {
98  
  int value;
99  
  
100  
  void initialMessage(SocketChannel socket) {
101  
    sendLineToSocket(socket, stateMsg());
102  
  }
103  
  
104  
  void update() {
105  
    if (set_trueIfChanged(this, value := cI.value()))
106  
      sendToSubscribers(stateMsg());
107  
  }
108  
  
109  
  S stateMsg() {
110  
    ret "ComputerCount = " + value;
111  
  }
112  
}
113  
114  
// a public chat for broadcast messages between subscribers
115  
sclass PublicChatChannel extends Channel<O> {
116  
  L<S> msgs = synchroLinkedList();
117  
  
118  
  void handleLine(S line, SocketChannel socket) {
119  
    postMessage(unquote(line));
120  
  }
121  
  
122  
  void postMessage(S msg) {
123  
    msgs.add(msg);
124  
  }
125  
  
126  
  void update {
127  
    S msg;
128  
    while ((msg = syncPopFirst(msgs)) != null) {
129  
      S quoted = quote(msg);
130  
      //print("Chat send: " + shorten(quoted, 20));
131  
      quoted = name + ":" + quoted;
132  
      for (SocketChannel s : sockets)
133  
        sendLineToSocket(s, quoted);
134  
    }
135  
  }
136  
}
137  
138  
// a chat for DMs (messages from one computer to another)
139  
// (is it private though? probably not as we don't check if
140  
// the computer IDs sent to the server are correct)
141  
sclass PrivateChatChannel extends Channel<O> {
142  
  S prefix = "";
143  
  new MultiMap<S, SocketChannel> clients; // key: computer ID
144  
145  
  void initialMessage(SocketChannel socket) {
146  
    clients.put(computerID(socket), socket);
147  
  }
148  
  
149  
  void removeSocket(SocketChannel socket) {
150  
    super.removeSocket(socket);
151  
    clients.remove(computerID(socket), socket);
152  
  }
153  
  
154  
  void handleLine(S line, SocketChannel socket) {
155  
    pcall {
156  
      int i = indexOf(line, ' ');
157  
      S computerID = takeFirst(line, i);
158  
      while licensed {
159  
        SocketChannel socket2 = clients.getFirst(computerID);
160  
        if (socket2 == null) break;
161  
        try {
162  
          S sender = computerID(socket);
163  
          if (l(line) >= 10000)
164  
            print("Forwarding line from " + sender + " to " + computerID + ": " + l(line));
165  
          sendLineToSocket(socket2, prefix + "privately from " + sender + ": " + substring(line, i+1));
166  
          break;
167  
        } catch e {
168  
          closeSocket(socket2);
169  
          clients.remove(computerID, socket2);
170  
        }
171  
      }
172  
    }
173  
  }
174  
}
175  
176  
// channel distributing snippet updates (edits, compiles etc)
177  
sclass SnippetUpdatesChannel extends Channel<O> {
178  
  ManualTailFile tail;
179  
  
180  
  *() {
181  
    tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"),
182  
      vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
183  
  }
184  
  
185  
  void update {
186  
    tail.check();
187  
  }
188  
  
189  
  void sendLine(S line) {
190  
    print("SnippetUpdatesChannel send: " + line);
191  
    line = assertNoLineBreaks(line);
192  
    print("Sockets: " + l(sockets));
193  
    //S quoted = quote(line);
194  
    for (SocketChannel s : sockets)
195  
      sendLineToSocket(s, "snippetUpdates:" + line);
196  
  }
197  
}
198  
199  
// channel broadcast its current subscriber count
200  
sclass PresenceCountingChannel extends Channel<Void> {
  // Every change of the subscriber count is pushed to all subscribers.
  *() {
    onSubCountChanged(-> sendToSubscribers(stateMsg()));
  }

  // A new subscriber immediately receives the current count.
  void initialMessage(SocketChannel socket) {
    S msg = stateMsg();
    sendLineToSocket(socket, msg);
  }

  // Wire format: <channel name>:<subscriber count>
  S stateMsg() {
    int subscribers = subCount();
    return name + ":" + subscribers;
  }
}
213  
214  
static State state(SocketChannel socket) {
215  
  ret dataMapper.get(socket);
216  
}
217  
218  
sS computerID(SocketChannel socket) {
219  
  State state = state(socket);
220  
  ret state == null ? null : state.computerID;
221  
}
222  
223  
// Program entry point: starts the connected-instances tracker
// (whose changes trigger rst, i.e. update()) and opens the server socket.
p {
  cI = new ConnectedInstances;
  cI.setAfterglowInterval(1000);
  cI.connected2.onChange(rst);
  cI.start();

  openServerSocket();

  // Disabled: restart socket every hour (and once now for testing - XX)
  /*doEvery(10.0, 60*60.0, r {
    pcall { cleanMeUp_socket(); }
    openServerSocket();
  });*/
}
237  
238  
// Opens the non-blocking server socket on `port`, starts the accept loop
// thread and starts the periodic sweep (idle timeouts + channel updates).
svoid openServerSocket ctex {
  print("Opening server socket");
  selector = Selector.open();
  serverChannel = ServerSocketChannel.open();
  serverChannel.configureBlocking(false);

  serverChannel.socket().bind(new InetSocketAddress(port));
  print("Server socket bound");
  serverChannel.register(selector, SelectionKey.OP_ACCEPT);

  thread "Server Accept Loop" { acceptLoop(); }

  mainTimer = doEvery(updateInterval, r {
    // 1. close connections that have been silent for too long
    for (SocketChannel socket : keysList(dataMapper)) {
      State state = state(socket);
      if (state == null) continue;
      if (sysTime() < state.lastMessage+timeout) continue;
      pcall {
        closeSocket(socket);
      }
    }
    // 2. let every channel do its periodic work, under the channel's lock
    for (Channel channel : values(channels)) pcall {
      lock channel.lock;
      channel.update();
    }
  });
}
264  
265  
// Releases everything openServerSocket created: the periodic timer, the
// server socket, all client sockets and the selector.
// Previously only the sockets were closed, so the timer kept firing and the
// accept-loop thread stayed blocked in select() after cleanup (and a later
// openServerSocket call would have stacked a second timer and thread on top).
svoid cleanMeUp_socket() ctex {
  // stop the sweep first so it no longer touches sockets that are being closed
  if (mainTimer != null) {
    mainTimer.cancel();
    mainTimer = null;
  }
  if (serverChannel != null)
    serverChannel.close();
  for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall {
    socket.close();
  }
  // Closing the selector wakes a thread blocked in select(); the accept loop
  // then fails on the closed selector and its thread ends.
  if (selector != null) pcall {
    selector.close();
  }
}
272  
273  
// Selector loop: blocks until some registered channel is ready, then
// dispatches each selected key to the accept/read/write handler.
// A failure in one handler is logged (pcall) and does not stop the loop.
svoid acceptLoop ctex {
  while licensed {
    // wait for events
    selector.select();

    for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext(); ) {
      SelectionKey key = it.next();
      it.remove(); // mark as handled
      debug "have key";

      if (!key.isValid()) {
        debug "key not valid";
        continue;
      }

      pcall {
        // at most one kind of readiness is handled per key and round
        if (key.isAcceptable()) {
          debug "Key is acceptable";
          acceptOnSocket(key);
        } else if (key.isReadable()) {
          debug "Key is readable";
          readFromSocket(key);
        } else if (key.isWritable()) {
          debug "Key is writable";
          writeToSocket(key);
        } else
          debug "Unknown key type";
      }
    }
  }
}
303  
304  
//accept a connection made to this channel's socket
305  
// Accepts a pending connection on the server socket, creates its State
// and registers the new client socket with the selector for reading.
// Throws IOException if accepting or configuring the client socket fails.
static void acceptOnSocket(SelectionKey key) throws IOException {
  ServerSocketChannel serverChannel = cast key.channel();
  SocketChannel channel = serverChannel.accept();
  // In non-blocking mode accept() returns null when no connection is pending
  // (e.g. the client gave up in the meantime); previously this caused an NPE.
  if (channel == null) ret;
  channel.configureBlocking(false);
  Socket actualSocket = channel.socket();
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
  print("Connected to: " + remoteAddr);

  // register channel with selector for further IO
  dataMapper.put(channel, new State(channel));
  channel.register(selector, SelectionKey.OP_READ);
}
317  
318  
svoid onSocketClose(SocketChannel socket) {
319  
  Socket actualSocket = socket.socket();
320  
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
321  
  print("Connection closed by client: " + remoteAddr);
322  
  State state = state(socket);
323  
  if (state != null && state.computerID != null)
324  
    cI.lostConnection(state.computerID);
325  
  dataMapper.remove(socket);
326  
  removeFromAllChannels(socket);
327  
}
328  
329  
// Closes a client socket and runs the close bookkeeping.
// The two steps are guarded separately, so onSocketClose still runs
// when close() itself fails.
svoid closeSocket(SocketChannel socket) {
  pcall {
    socket.close();
  }
  pcall {
    onSocketClose(socket);
  }
}
333  
334  
// Reads available bytes from a client socket, feeds them to the connection's
// line buffer and handles every completed line.
// End of stream closes the connection; so does a buffer that grows beyond
// maxLineLength. Throws IOException if the read fails.
static void readFromSocket(SelectionKey key) throws IOException {
  final SocketChannel socket = cast key.channel();
  ByteBuffer buffer = ByteBuffer.allocate(1024);
  int numRead = socket.read(buffer);
  if (numRead == -1) {
    // client closed the connection
    closeSocket(socket);
    key.cancel();
    ret;
  }

  // Decode explicitly as UTF-8: the send path (sendLineToSocket) encodes with
  // toUtf8, whereas the old code used the platform default charset here.
  // NOTE(review): a multi-byte character split across two reads is still
  // decoded incorrectly; fixing that needs a per-connection decoder.
  S s = new String(buffer.array(), 0, numRead, "UTF-8");
  //print("Got data from client: " + s);
  final State state = state(socket);
  if (state != null) {
    state.lastMessage = sysNow(); // resets the idle timeout
    state.lineBuffer.append(s, voidfunc(S line) {
      //if (nempty(line)) print("Got line: " + line);
      pcall { handleLine(line, socket, state); }
    });
    if (state.lineBuffer.size() > maxLineLength) {
      print("Bad client (line too long");
      closeSocket(socket);
    }
  }
}
359  
360  
// Called when a socket with queued outgoing data became writable.
// Flushes State.toSend in order. Write interest is dropped only once the
// queue is empty; after a partial write the unsent tail stays at the head
// of the queue and the socket remains registered for OP_WRITE.
//
// The previous version popped the head chunk and passed it to
// sendBytesToSocket, which re-queued it at the END whenever more chunks were
// waiting (reordering the byte stream). Write interest had already been
// removed by then, so the remaining queue was never flushed.
static void writeToSocket(SelectionKey key) throws IOException {
  final SocketChannel socket = cast key.channel();
  State state = state(socket);
  if (state == null) {
    // connection already cleaned up: nothing to flush (was an NPE before)
    key.cancel();
    ret;
  }
  lock state.lock;
  while (nempty(state.toSend)) {
    byte[] data = state.toSend.get(0);
    print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID);
    int n = socket.write(ByteBuffer.wrap(data));
    if (n < l(data)) {
      // socket buffer is full again: keep only what is still unsent
      if (n > 0) state.toSend.set(0, subByteArray(data, n));
      ret;
    }
    popFirst(state.toSend);
  }
  // everything flushed: go back to listening for reads only
  socket.register(selector, SelectionKey.OP_READ);
}
370  
371  
// Writes bytes to a client socket without blocking.
// If data is already queued for the socket, the bytes are appended to the
// queue to preserve ordering. If the socket accepts only part of the data,
// the rest is queued and the socket is registered for OP_WRITE so that
// writeToSocket can continue later.
// Throws ClosedChannelException if the socket has no registered State
// (i.e. it was already closed); this used to surface as an NPE.
svoid sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException {
  int length = l(bytes);
  State state = state(socket);
  if (state == null) throw new ClosedChannelException();
  lock state.lock;
  if (nempty(state.toSend)) {
    print("Postponing send to " + state.computerID + " because data already queued");
    state.toSend.add(bytes);
    ret;
  }
  ByteBuffer buf = ByteBuffer.wrap(bytes);
  int n = socket.write(buf);
  if (n < length) {
    byte[] remaining = subByteArray(bytes, n);
    state.toSend.add(remaining);
    print("Postponing sending " + n2(l(remaining), "byte") + " to " + state.computerID);
    socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    // This method also runs on threads other than the selector thread (e.g.
    // the main timer). A select() already in progress does not pick up the
    // new interest set, so wake the selector to make it take effect now.
    selector.wakeup();
  }
}
388  
389  
svoid handleLine(S line, SocketChannel socket, State state) {
390  
  new Matches m;
391  
  if (empty(line)) ret with sendLineToSocket(socket, ""); // idle
392  
  
393  
  if (jmatch("computerID=*", line, m)) {
394  
    if (state.computerID != null || !possibleComputerID($1)) ret;
395  
    print("Got computer ID: " + $1);
396  
    state.computerID = $1;
397  
    cI.gotConnection(state.computerID);
398  
    ret;
399  
  }
400  
  
401  
  if (jmatch("sub *", line, m)) {
402  
    Channel channel = getChannelForSocket(socket, $1);
403  
    if (channel == null)
404  
      sendLineToSocket(socket, "Channel " + $1 + " not found");
405  
    else
406  
      sendLineToSocket(socket, "Subbed " + $1);
407  
    ret;
408  
  }
409  
  
410  
  L<S> tok = javaTok(line);
411  
  if (eq(get(tok, 3), ":")) {
412  
    Channel channel = state.subbedChannels.get(get(tok, 1));
413  
    if (channel != null)
414  
      channel.handleLine(joinSubList(tok, 5), socket);
415  
  }
416  
  
417  
  // ignore unknown line
418  
}
419  
420  
// Returns a one-line hint naming the TCP port (presumably served as this bot's web page - confirm).
html { ret "Use port " + port; }
421  
422  
// Sends one text line (UTF-8, terminated by "\n") to a client socket.
svoid sendLineToSocket(SocketChannel socket, S s) ctex {
  sendBytesToSocket(socket, toUtf8(s + "\n"));
}
426  
427  
// Unsubscribes a socket from every channel and deletes channels that are
// left without subscribers. Works on a copy of the channel map; a failure
// in one channel is logged and does not affect the others.
svoid removeFromAllChannels(SocketChannel socket) {
  lock channelsLock;
  for (S channelName, Channel ch : cloneMap(channels)) { pcall {
    ch.removeSocket(socket);
    if (!empty(ch.sockets)) continue;
    print("Deleting channel: " + channelName);
    channels.remove(channelName);
  }}
}
437  
438  
// Subscribes a socket to the named channel, creating the channel on first use.
// Returns the channel, or null if the name is not a known channel type
// (the caller then reports "Channel ... not found" to the client).
static Channel getChannelForSocket(SocketChannel socket, S name) {
  lock channelsLock;
  Channel c = channels.get(name);
  if (c == null) {
    c = makeChannel(name);
    // The null check must precede any use of c. It used to come after
    // `c.name = name`, so unknown channel names threw an NPE here.
    if (c == null) ret null;
    c.name = name;
    print("New channel: " + name);
    channels.put(name, c);
    // bring the new channel up to date before the first subscriber joins
    lock c.lock;
    c.update();
  }
  c.addSocket(socket);
  state(socket).subbedChannels.put(name, c);
  ret c;
}
454  
455  
// Creates a channel object for a known channel name; returns null
// (after logging) for any other name. The caller sets the channel's name.
static Channel makeChannel(S name) {
  // plain broadcast chats
  if (eq(name, "chat") || eq(name, "machineChat") || eq(name, "gazelleChat"))
    ret new PublicChatChannel;

  // direct messages; the machine variant prefixes forwarded lines with its name
  if (eq(name, "privateChat"))
    ret new PrivateChatChannel;
  if (eq(name, "privateMachineChat"))
    ret nu(PrivateChatChannel, prefix := name + ":");

  // status channels
  if (eq(name, "computerIDs"))
    ret new ComputerIDsChannel;
  if (eq(name, "computerCount"))
    ret new ComputerCountChannel;
  if (eq(name, "snippetUpdates"))
    ret new SnippetUpdatesChannel;
  if (eq(name, "gazellePresence"))
    ret new PresenceCountingChannel;

  print("Unknown channel: " + name);
  ret null;
}
469  
470  
// Run through rst whenever cI.connected2 changes: records the current
// value of cI in theNumber. Does nothing while no client is connected.
svoid update {
  print("OS Instances update");
  if (empty(dataMapper)) ret;

  theNumber = cI.value();
  print("OS Instances update done");
}

Author comment

Began life as a copy of #1016571

download  show line numbers  debug dex  old transpilations   

Travelled to 14 computer(s): aoiabmzegqzx, bhatertpkbcr, cbybwowwnfue, cfunsshuasjs, gwrvuhgaqvyk, irmadwmeruwu, ishqpsrjomds, lpdgvwnxivlt, mqqgnosmbjvj, pyentgdyhuwx, pzhvpgtvlbxg, tslmcundralx, tvejysmllsmz, vouqrxazstgt

No comments. add comment

Snippet ID: #1016572
Snippet name: OS Instances Connector [NIO Socket, Web Bot, LIVE]
Eternal ID of this version: #1016572/116
Text MD5: d29f1d064be1c6dcad2f16d4622285fa
Transpilation MD5: 75b0b7ca6f89db2a0143e25a6af62f38
Author: stefan
Category: javax / a.i. / web
Type: JavaX module (desktop)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2022-04-08 18:47:05
Source code size: 13857 bytes / 478 lines
Pitched / IR pitched: No / No
Views / Downloads: 841 / 2993
Version history: 115 change(s)
Referenced in: [show references]