Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

433
LINES

< > BotCompany Repo | #1035030 // OS Instances Connector [backup]

JavaX module (desktop)

Download Jar.

1  
!7
2  
3  
static int updateInterval = 10;
4  
static long timeout = 30000+10000; // idle time + latency
5  
static int port = 6000;
6  
static int maxLineLength = 10*1024*1024; // change #1020500 too if you change this
7  
8  
import java.nio.*;
9  
import java.nio.channels.*;
10  
11  
static ConnectedInstances cI;
12  
static ReliableSingleThread rst = new(r update);
13  
static int theNumber;
14  
static Map<SocketChannel, State> dataMapper = synchroMap();
15  
static ServerSocketChannel serverChannel;
16  
static Selector selector;
17  
static Map<S, Channel> channels = synchroMap();
18  
static Lock channelsLock = lock();
19  
static java.util.Timer mainTimer;
20  
sbool debug;
21  
22  
sclass State {
23  
  S socketID = aGlobalID(), computerID;
24  
  int countSeen = -1;
25  
  new LineBuffer lineBuffer;
26  
  long lastMessage = sysNow();
27  
  Map<S, Channel> subbedChannels = synchroLinkedHashMap();
28  
  Lock lock = lock();
29  
  
30  
  // TODO: clear eventually to avoid stalling attacks
31  
  L<byte[]> toSend = synchroLinkedList();
32  
}
33  
34  
sclass Channel<A> {
35  
  S name;
36  
  Set<SocketChannel> sockets = synchroSet();
37  
  Map<SocketChannel, A> dataMapper = synchroMap();
38  
  Lock lock = lock();
39  
  
40  
  void addSocket(SocketChannel socket) {
41  
    lock lock;
42  
    sockets.add(socket);
43  
    initialMessage(socket);
44  
  }
45  
  
46  
  void removeSocket(SocketChannel socket) {
47  
    lock lock;
48  
    sockets.remove(socket);
49  
  }
50  
  
51  
  void update() {}
52  
  void initialMessage(SocketChannel socket) {}
53  
  void handleLine(S line, SocketChannel socket) {}
54  
}
55  
56  
sclass ComputerIDsChannel extends Channel<O> {
57  
  L<S> value;
58  
  
59  
  void initialMessage(SocketChannel socket) {
60  
    lock lock;
61  
    sendTo(socket);
62  
  }
63  
  
64  
  void update() {
65  
    if (set_trueIfChanged(this, value := asList(cI.set())))
66  
      for (SocketChannel s : sockets) sendTo(s);
67  
  }
68  
  
69  
  void sendTo(SocketChannel socket) {
70  
    sendLineToSocket(socket, "ComputerIDs = " + struct(value));
71  
  }
72  
}
73  
74  
sclass ComputerCountChannel extends Channel<O> {
75  
  int value;
76  
  
77  
  void initialMessage(SocketChannel socket) {
78  
    lock lock;
79  
    sendTo(socket);
80  
  }
81  
  
82  
  void update() {
83  
    if (set_trueIfChanged(this, value := cI.value()))
84  
      for (SocketChannel s : sockets) sendTo(s);
85  
  }
86  
  
87  
  void sendTo(SocketChannel socket) {
88  
    sendLineToSocket(socket, "ComputerCount = " + value);
89  
  }
90  
}
91  
92  
sclass PublicChatChannel extends Channel<O> {
93  
  L<S> msgs = synchroLinkedList();
94  
  
95  
  void handleLine(S line, SocketChannel socket) {
96  
    postMessage(unquote(line));
97  
  }
98  
  
99  
  void postMessage(S msg) {
100  
    msgs.add(msg);
101  
  }
102  
  
103  
  void update {
104  
    S msg;
105  
    while ((msg = syncPopFirst(msgs)) != null) {
106  
      S quoted = quote(msg);
107  
      //print("Chat send: " + shorten(quoted, 20));
108  
      quoted = name + ":" + quoted;
109  
      for (SocketChannel s : sockets)
110  
        sendLineToSocket(s, quoted);
111  
    }
112  
  }
113  
}
114  
115  
sclass PrivateChatChannel extends Channel<O> {
116  
  S prefix = "";
117  
  new MultiMap<S, SocketChannel> clients; // key: computer ID
118  
119  
  void initialMessage(SocketChannel socket) {
120  
    clients.put(computerID(socket), socket);
121  
  }
122  
  
123  
  void removeSocket(SocketChannel socket) {
124  
    super.removeSocket(socket);
125  
    clients.remove(computerID(socket), socket);
126  
  }
127  
  
128  
  void handleLine(S line, SocketChannel socket) {
129  
    pcall {
130  
      int i = indexOf(line, ' ');
131  
      S computerID = takeFirst(line, i);
132  
      while licensed {
133  
        SocketChannel socket2 = clients.getFirst(computerID);
134  
        if (socket2 == null) break;
135  
        try {
136  
          S sender = computerID(socket);
137  
          if (l(line) >= 10000)
138  
            print("Forwarding line from " + sender + " to " + computerID + ": " + l(line));
139  
          sendLineToSocket(socket2, prefix + "privately from " + sender + ": " + substring(line, i+1));
140  
          break;
141  
        } catch e {
142  
          closeSocket(socket2);
143  
          clients.remove(computerID, socket2);
144  
        }
145  
      }
146  
    }
147  
  }
148  
}
149  
150  
sclass SnippetUpdatesChannel extends Channel<O> {
151  
  ManualTailFile tail;
152  
  
153  
  *() {
154  
    tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"),
155  
      vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
156  
  }
157  
  
158  
  void update {
159  
    tail.check();
160  
  }
161  
  
162  
  void sendLine(S line) {
163  
    print("SnippetUpdatesChannel send: " + line);
164  
    line = assertNoLineBreaks(line);
165  
    print("Sockets: " + l(sockets));
166  
    //S quoted = quote(line);
167  
    for (SocketChannel s : sockets)
168  
      sendLineToSocket(s, "snippetUpdates:" + line);
169  
  }
170  
}
171  
172  
static State state(SocketChannel socket) {
173  
  ret dataMapper.get(socket);
174  
}
175  
176  
// Computer ID stored in the socket's State, or null when the socket has no State.
sS computerID(SocketChannel socket) {
  State st = state(socket);
  if (st == null) ret null;
  ret st.computerID;
}
180  
181  
// Program entry point: starts the connected-instances tracker, then the server socket.
p {
  // tracker setup; changes of cI.connected2 trigger rst (which runs update)
  cI = new ConnectedInstances;
  cI.setAfterglowInterval(1000);
  cI.connected2.onChange(rst);
  cI.start();

  openServerSocket();

  // Disabled: restart socket every hour (and once now for testing - XX)
  /*doEvery(10.0, 60*60.0, r {
    pcall { cleanMeUp_socket(); }
    openServerSocket();
  });*/
}
195  
196  
// Opens the selector and the non-blocking server socket on `port`, starts the
// accept loop thread and installs the maintenance timer.
svoid openServerSocket ctex {
  print("Opening server socket");
  selector = Selector.open();
  serverChannel = ServerSocketChannel.open();
  serverChannel.configureBlocking(false);

  serverChannel.socket().bind(new InetSocketAddress(port));
  print("Server socket bound");
  serverChannel.register(selector, SelectionKey.OP_ACCEPT);

  thread "Server Accept Loop" { acceptLoop(); }

  mainTimer = doEvery(updateInterval, r {
    // 1. close sockets that have been silent for longer than `timeout`
    for (SocketChannel socket : keysList(dataMapper)) {
      State st = state(socket);
      if (st == null) continue;
      if (sysTime() < st.lastMessage+timeout) continue;
      pcall {
        closeSocket(socket);
      }
    }
    // 2. give every channel a chance to publish, each under its own lock
    for (Channel ch : values(channels)) pcall {
      lock ch.lock;
      ch.update();
    }
  });
}
222  
223  
// Closes the server channel (if any) and every client socket; dataMapper is emptied in the process.
svoid cleanMeUp_socket() ctex {
  if (serverChannel != null) {
    serverChannel.close();
  }
  for (SocketChannel client : keys(getAndClearMap(dataMapper))) pcall {
    client.close();
  }
}
230  
231  
svoid acceptLoop ctex {
232  
  while licensed {
233  
    // wait for events
234  
    selector.select();
235  
236  
    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
237  
    while (keys.hasNext()) {
238  
      SelectionKey key = keys.next();
239  
      keys.remove();
240  
      debug "have key";
241  
      
242  
      if (!key.isValid())
243  
        continue with debug "key not valid";
244  
        
245  
      pcall {
246  
        if (key.isAcceptable()) {
247  
          debug "Key is acceptable";
248  
          acceptOnSocket(key);
249  
        } else if (key.isReadable()) {
250  
          debug "Key is readable";
251  
          readFromSocket(key);
252  
        } else if (key.isWritable()) {
253  
          debug "Key is writable";
254  
          writeToSocket(key);
255  
        } else
256  
          debug "Unknown key type";
257  
      }
258  
    }
259  
  }
260  
}
261  
262  
//accept a connection made to this channel's socket
263  
static void acceptOnSocket(SelectionKey key) throws IOException {
264  
  ServerSocketChannel serverChannel = cast key.channel();
265  
  SocketChannel channel = serverChannel.accept();
266  
  channel.configureBlocking(false);
267  
  Socket actualSocket = channel.socket();
268  
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
269  
  print("Connected to: " + remoteAddr);
270  
  
271  
  // register channel with selector for further IO
272  
  dataMapper.put(channel, new State);
273  
  channel.register(selector, SelectionKey.OP_READ);
274  
}
275  
276  
svoid onSocketClose(SocketChannel socket) {
277  
  Socket actualSocket = socket.socket();
278  
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
279  
  print("Connection closed by client: " + remoteAddr);
280  
  State state = state(socket);
281  
  if (state != null && state.computerID != null)
282  
    cI.lostConnection(state.computerID);
283  
  dataMapper.remove(socket);
284  
  removeFromAllChannels(socket);
285  
}
286  
287  
// Closes the socket, then runs the close bookkeeping. Both steps are wrapped
// in pcall separately, so the bookkeeping is attempted even if close() fails.
svoid closeSocket(SocketChannel socket) {
  pcall {
    socket.close();
  }
  pcall {
    onSocketClose(socket);
  }
}
291  
292  
// Reads the bytes currently available on a readable socket, appends them to
// the socket's line buffer and passes every completed line to handleLine().
// End of stream (read() returning -1) closes the socket; so does a pending
// line that has grown beyond maxLineLength.
static void readFromSocket(SelectionKey key) throws IOException {
  final SocketChannel socket = cast key.channel();
  ByteBuffer buffer = ByteBuffer.allocate(1024);
  int numRead = socket.read(buffer);
  if (numRead == -1) {
    closeSocket(socket);
    key.cancel();
    ret;
  }

  // Decode explicitly as UTF-8: outgoing lines are encoded with toUtf8(), so
  // the incoming side must not depend on the platform default charset.
  // NOTE(review): a multi-byte character split across two reads is still
  // decoded piecewise here; fixing that needs a stateful decoder per socket.
  S s = new String(buffer.array(), 0, numRead, java.nio.charset.StandardCharsets.UTF_8);
  //print("Got data from client: " + s);
  final State state = state(socket);
  if (state != null) {
    state.lastMessage = sysNow();
    state.lineBuffer.append(s, voidfunc(S line) {
      //if (nempty(line)) print("Got line: " + line);
      pcall { handleLine(line, socket, state); }
    });
    if (state.lineBuffer.size() > maxLineLength) {
      print("Bad client (line too long");
      closeSocket(socket);
    }
  }
}
317  
318  
// Flushes the chunks queued in State.toSend once the selector reports the
// socket as writable.
//
// Chunks are written directly and strictly in queue order. (Sending them
// through sendBytesToSocket would re-append a chunk to the END of the queue
// whenever more chunks are waiting, which reorders the stream and leaves the
// queue stuck with no write interest registered.)
// Write interest is only dropped when the queue is completely drained.
static void writeToSocket(SelectionKey key) throws IOException {
  final SocketChannel socket = cast key.channel();
  State state = state(socket);
  if (state == null) {
    // socket has already been closed and forgotten - nothing left to flush
    key.cancel();
    ret;
  }
  lock state.lock;
  while (nempty(state.toSend)) {
    byte[] data = popFirst(state.toSend);
    print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID);
    int n = socket.write(ByteBuffer.wrap(data));
    if (n < l(data)) {
      // Socket buffer is full again: put the unsent tail back at the FRONT
      // and keep waiting for the next writable event.
      state.toSend.add(0, subByteArray(data, n));
      socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
      ret;
    }
  }
  // queue drained - back to read-only interest
  socket.register(selector, SelectionKey.OP_READ);
}
328  
329  
// Writes bytes to a socket under the socket's State lock.
// If chunks are already queued, the bytes are appended to the queue instead.
// If the socket accepts only part of the data, the rest is queued and write
// interest is registered so writeToSocket gets called later.
svoid sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException {
  int total = l(bytes);
  State st = state(socket);
  lock st.lock;

  if (nempty(st.toSend)) {
    print("Postponing send to " + st.computerID + " because data already queued");
    st.toSend.add(bytes);
    ret;
  }

  int written = socket.write(ByteBuffer.wrap(bytes));
  if (written >= total) ret; // everything went out

  byte[] rest = subByteArray(bytes, written);
  st.toSend.add(rest);
  print("Postponing sending " + n2(l(rest), "byte") + " to " + st.computerID);
  socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
346  
347  
svoid handleLine(S line, SocketChannel socket, State state) {
348  
  new Matches m;
349  
  if (empty(line)) ret with sendLineToSocket(socket, ""); // idle
350  
  
351  
  if (jmatch("computerID=*", line, m)) {
352  
    if (state.computerID != null || !possibleComputerID($1)) ret;
353  
    print("Got computer ID: " + $1);
354  
    state.computerID = $1;
355  
    cI.gotConnection(state.computerID);
356  
    ret;
357  
  }
358  
  
359  
  if (jmatch("sub *", line, m)) {
360  
    Channel channel = getChannelForSocket(socket, $1);
361  
    if (channel == null)
362  
      sendLineToSocket(socket, "Channel " + $1 + " not found");
363  
    else
364  
      sendLineToSocket(socket, "Subbed " + $1);
365  
    ret;
366  
  }
367  
  
368  
  L<S> tok = javaTok(line);
369  
  if (eq(get(tok, 3), ":")) {
370  
    Channel channel = state.subbedChannels.get(get(tok, 1));
371  
    if (channel != null)
372  
      channel.handleLine(joinSubList(tok, 5), socket);
373  
  }
374  
  
375  
  // ignore unknown line
376  
}
377  
378  
// Web view of the module: just tells the visitor which port to use.
html {
  ret "Use port " + port;
}
379  
380  
// Sends s followed by a newline, UTF-8 encoded, through sendBytesToSocket.
svoid sendLineToSocket(SocketChannel socket, S s) ctex {
  sendBytesToSocket(socket, toUtf8(s + "\n"));
}
384  
385  
// Unsubscribes a socket from every channel (under channelsLock) and deletes
// channels that are left without subscribers. Works on a copy of the channel
// map, so entries can be removed while looping.
svoid removeFromAllChannels(SocketChannel socket) {
  lock channelsLock;
  for (S channelName, Channel channel : cloneMap(channels)) {
    pcall {
      channel.removeSocket(socket);
      if (empty(channel.sockets)) {
        print("Deleting channel: " + channelName);
        channels.remove(channelName);
      }
    }
  }
}
395  
396  
// Subscribes a socket to the channel with the given name, creating the
// channel first if it does not exist yet (all under channelsLock).
// Returns the channel, or null if makeChannel does not know the name.
static Channel getChannelForSocket(SocketChannel socket, S name) {
  lock channelsLock;
  Channel c = channels.get(name);
  if (c == null) {
    c = makeChannel(name);
    // Must be checked BEFORE touching c: makeChannel returns null for unknown
    // names, and callers rely on the null result to report "not found".
    if (c == null) ret null;
    c.name = name;
    print("New channel: " + name);
    channels.put(name, c);
    lock c.lock;
    c.update(); // let the fresh channel compute its initial value
  }
  c.addSocket(socket);
  state(socket).subbedChannels.put(name, c);
  ret c;
}
412  
413  
// Creates a fresh channel object for a known channel name.
// Unknown names are logged and yield null.
static Channel makeChannel(S name) {
  // status channels
  if (eq(name, "computerIDs")) ret new ComputerIDsChannel;
  if (eq(name, "computerCount")) ret new ComputerCountChannel;

  // broadcast chats
  if (eq(name, "chat") || eq(name, "machineChat")) ret new PublicChatChannel;

  // point-to-point chats; the machine variant prefixes forwarded lines with the channel name
  if (eq(name, "privateChat")) ret new PrivateChatChannel;
  if (eq(name, "privateMachineChat")) ret nu(PrivateChatChannel, prefix := name + ":");

  if (eq(name, "snippetUpdates")) ret new SnippetUpdatesChannel;

  print("Unknown channel: " + name);
  ret null;
}
424  
425  
// Runs via rst: records the current cI.value() in theNumber.
// Does nothing beyond logging while no socket is connected.
svoid update {
  print("OS Instances update");
  if (empty(dataMapper)) ret;

  // (a former "skip if unchanged" check is disabled; the value is always stored)
  theNumber = cI.value();
  print("OS Instances update done");
}

Author comment

Began life as a copy of #1016572

download  show line numbers  debug dex  old transpilations   

Travelled to 2 computer(s): bhatertpkbcr, mqqgnosmbjvj

No comments. add comment

Snippet ID: #1035030
Snippet name: OS Instances Connector [backup]
Eternal ID of this version: #1035030/1
Text MD5: e401d37e10f5a35b068846141bd97b54
Author: stefan
Category: javax / a.i. / web
Type: JavaX module (desktop)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2022-03-24 13:24:56
Source code size: 12300 bytes / 433 lines
Pitched / IR pitched: No / No
Views / Downloads: 77 / 172
Referenced in: [show references]