Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

445
LINES

< > BotCompany Repo | #1027046 // OSInstancesConnector as class [NIO Socket]

JavaX fragment (include) [tags: use-pretranspiled]

Libraryless. Click here for Pure Java version (9434L/64K).

1  
sclass OSInstancesConnector {
2  
  int updateInterval = 10;
3  
  long timeout = 30000+10000; // idle time + latency
4  
  int port = 6000;
5  
  int maxLineLength = 10*1024*1024; // change #1020500 too if you change this
6  
  
7  
  import java.nio.*;
8  
  import java.nio.channels.*;
9  
  
10  
  ConnectedInstances cI;
11  
  ReliableSingleThread rst = new(r update);
12  
  int theNumber;
13  
  Map<SocketChannel, State> dataMapper = synchroMap();
14  
  ServerSocketChannel serverChannel;
15  
  Selector selector;
16  
  Map<S, Channel> channels = synchroMap();
17  
  Lock channelsLock = lock();
18  
  java.util.Timer mainTimer;
19  
  bool debug;
20  
  L onLineReceived = syncList(); // L<vf(line, socket, state)>
21  
  
22  
  class State {
23  
    S socketID = aGlobalID(), computerID;
24  
    int countSeen = -1;
25  
    new LineBuffer lineBuffer;
26  
    long lastMessage = sysNow();
27  
    Map<S, Channel> subbedChannels = synchroLinkedHashMap();
28  
    Lock lock = lock();
29  
    
30  
    // TODO: clear eventually to avoid stalling attacks
31  
    L<byte[]> toSend = synchroLinkedList();
32  
  }
33  
  
34  
  class Channel<A> {
35  
    S name;
36  
    Set<SocketChannel> sockets = synchroSet();
37  
    Map<SocketChannel, A> dataMapper = synchroMap();
38  
    Lock lock = lock();
39  
    
40  
    void addSocket(SocketChannel socket) {
41  
      lock lock;
42  
      sockets.add(socket);
43  
      initialMessage(socket);
44  
    }
45  
    
46  
    void removeSocket(SocketChannel socket) {
47  
      lock lock;
48  
      sockets.remove(socket);
49  
    }
50  
    
51  
    void update() {}
52  
    void initialMessage(SocketChannel socket) {}
53  
    void handleLine(S line, SocketChannel socket) {}
54  
  }
55  
  
56  
  class ComputerIDsChannel extends Channel<O> {
57  
    L<S> value;
58  
    
59  
    void initialMessage(SocketChannel socket) {
60  
      lock lock;
61  
      sendTo(socket);
62  
    }
63  
    
64  
    void update() {
65  
      if (set_trueIfChanged(this, value := asList(cI.set())))
66  
        for (SocketChannel s : sockets) sendTo(s);
67  
    }
68  
    
69  
    void sendTo(SocketChannel socket) {
70  
      sendLineToSocket(socket, "ComputerIDs = " + struct(value));
71  
    }
72  
  }
73  
  
74  
  class ComputerCountChannel extends Channel<O> {
75  
    int value;
76  
    
77  
    void initialMessage(SocketChannel socket) {
78  
      lock lock;
79  
      sendTo(socket);
80  
    }
81  
    
82  
    void update() {
83  
      if (set_trueIfChanged(this, value := cI.value()))
84  
        for (SocketChannel s : sockets) sendTo(s);
85  
    }
86  
    
87  
    void sendTo(SocketChannel socket) {
88  
      sendLineToSocket(socket, "ComputerCount = " + value);
89  
    }
90  
  }
91  
  
92  
  class PublicChatChannel extends Channel<O> {
93  
    L<S> msgs = synchroLinkedList();
94  
    
95  
    void handleLine(S line, SocketChannel socket) {
96  
      postMessage(unquote(line));
97  
    }
98  
    
99  
    void postMessage(S msg) {
100  
      msgs.add(msg);
101  
    }
102  
    
103  
    void update {
104  
      S msg;
105  
      while ((msg = syncPopFirst(msgs)) != null) {
106  
        S quoted = quote(msg);
107  
        //print("Chat send: " + shorten(quoted, 20));
108  
        quoted = name + ":" + quoted;
109  
        for (SocketChannel s : sockets)
110  
          sendLineToSocket(s, quoted);
111  
      }
112  
    }
113  
  }
114  
  
115  
  class PrivateChatChannel extends Channel<O> {
116  
    S prefix = "";
117  
    new MultiMap<S, SocketChannel> clients; // key: computer ID
118  
  
119  
    void initialMessage(SocketChannel socket) {
120  
      clients.put(computerID(socket), socket);
121  
    }
122  
    
123  
    void removeSocket(SocketChannel socket) {
124  
      super.removeSocket(socket);
125  
      clients.remove(computerID(socket), socket);
126  
    }
127  
    
128  
    void handleLine(S line, SocketChannel socket) {
129  
      pcall {
130  
        int i = indexOf(line, ' ');
131  
        S computerID = takeFirst(line, i);
132  
        while licensed {
133  
          SocketChannel socket2 = clients.getFirst(computerID);
134  
          if (socket2 == null) break;
135  
          try {
136  
            S sender = computerID(socket);
137  
            if (l(line) >= 10000)
138  
              print("Forwarding line from " + sender + " to " + computerID + ": " + l(line));
139  
            sendLineToSocket(socket2, prefix + "privately from " + sender + ": " + substring(line, i+1));
140  
            break;
141  
          } catch e {
142  
            closeSocket(socket2);
143  
            clients.remove(computerID, socket2);
144  
          }
145  
        }
146  
      }
147  
    }
148  
  }
149  
  
150  
  class SnippetUpdatesChannel extends Channel<O> {
151  
    ManualTailFile tail;
152  
    
153  
    *() {
154  
      tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"),
155  
        vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
156  
    }
157  
    
158  
    void update {
159  
      tail.check();
160  
    }
161  
    
162  
    void sendLine(S line) {
163  
      print("SnippetUpdatesChannel send: " + line);
164  
      line = assertNoLineBreaks(line);
165  
      print("Sockets: " + l(sockets));
166  
      //S quoted = quote(line);
167  
      for (SocketChannel s : sockets)
168  
        sendLineToSocket(s, "snippetUpdates:" + line);
169  
    }
170  
  }
171  
  
172  
  State state(SocketChannel socket) {
173  
    ret dataMapper.get(socket);
174  
  }
175  
  
176  
  S computerID(SocketChannel socket) {
177  
    State state = state(socket);
178  
    ret state == null ? null : state.computerID;
179  
  }
180  
  
181  
  *() {}
182  
  *(int *port) {}
183  
  
184  
  void start {
185  
    cI = new ConnectedInstances;
186  
    cI.setAfterglowInterval(1000);
187  
    cI.connected2.onChange(rst);
188  
    cI.start();
189  
    
190  
    openServerSocket();
191  
    
192  
    // restart socket every hour (and once now for testing - XX)
193  
    /*doEvery(10.0, 60*60.0, r {
194  
      pcall { cleanMeUp_socket(); }
195  
      openServerSocket();
196  
    });*/
197  
  }
198  
  
199  
  void openServerSocket ctex {
200  
    print("Opening server socket");
201  
    selector = Selector.open();
202  
    serverChannel = ServerSocketChannel.open();
203  
    serverChannel.configureBlocking(false);
204  
    
205  
    InetSocketAddress sockAddr = new InetSocketAddress(port);
206  
    serverChannel.socket().bind(sockAddr);
207  
    print("Server socket bound");
208  
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
209  
    
210  
    thread "Server Accept Loop" { acceptLoop(); }
211  
    
212  
    mainTimer = doEvery(updateInterval, r {
213  
      for (SocketChannel socket : keysList(dataMapper)) {
214  
        State state = state(socket);
215  
        if (state != null && sysTime() >= state.lastMessage+timeout) pcall {
216  
          closeSocket(socket);
217  
        }
218  
      }
219  
      for (Channel channel : values(channels)) pcall {
220  
        lock channel.lock;
221  
        channel.update();
222  
      }
223  
    });
224  
  }
225  
  
226  
  void cleanMeUp_socket() ctex {
227  
    if (serverChannel != null)
228  
      serverChannel.close();
229  
    for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall {
230  
      socket.close();
231  
    }
232  
  }
233  
  
234  
  void acceptLoop ctex {
235  
    while licensed {
236  
      // wait for events
237  
      selector.select();
238  
  
239  
      Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
240  
      while (keys.hasNext()) {
241  
        SelectionKey key = keys.next();
242  
        keys.remove();
243  
        debug "have key";
244  
        
245  
        if (!key.isValid())
246  
          continue with debug "key not valid";
247  
          
248  
        pcall {
249  
          if (key.isAcceptable()) {
250  
            debug "Key is acceptable";
251  
            acceptOnSocket(key);
252  
          } else if (key.isReadable()) {
253  
            debug "Key is readable";
254  
            readFromSocket(key);
255  
          } else if (key.isWritable()) {
256  
            debug "Key is writable";
257  
            writeToSocket(key);
258  
          } else
259  
            debug "Unknown key type";
260  
        }
261  
      }
262  
    }
263  
  }
264  
  
265  
  //accept a connection made to this channel's socket
266  
  void acceptOnSocket(SelectionKey key) throws IOException {
267  
    ServerSocketChannel serverChannel = cast key.channel();
268  
    SocketChannel channel = serverChannel.accept();
269  
    channel.configureBlocking(false);
270  
    Socket actualSocket = channel.socket();
271  
    SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
272  
    print("Connected to: " + remoteAddr);
273  
    
274  
    // register channel with selector for further IO
275  
    dataMapper.put(channel, new State);
276  
    channel.register(selector, SelectionKey.OP_READ);
277  
  }
278  
  
279  
  void onSocketClose(SocketChannel socket) {
280  
    Socket actualSocket = socket.socket();
281  
    SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
282  
    print("Connection closed by client: " + remoteAddr);
283  
    State state = state(socket);
284  
    if (state != null && state.computerID != null)
285  
      cI.lostConnection(state.computerID);
286  
    dataMapper.remove(socket);
287  
    removeFromAllChannels(socket);
288  
  }
289  
  
290  
  void closeSocket(SocketChannel socket) {
291  
    pcall { socket.close(); }
292  
    pcall { onSocketClose(socket); }
293  
  }
294  
  
295  
  void readFromSocket(SelectionKey key) throws IOException {
296  
    final SocketChannel socket = cast key.channel();
297  
    ByteBuffer buffer = ByteBuffer.allocate(1024);
298  
    int numRead = socket.read(buffer);
299  
    if (numRead == -1) {
300  
      closeSocket(socket);
301  
      key.cancel();
302  
      ret;
303  
    }
304  
  
305  
    S s = new String(buffer.array(), 0, numRead);
306  
    //print("Got data from client: " + s);
307  
    final State state = state(socket);
308  
    if (state != null) {
309  
      state.lastMessage = sysNow();
310  
      state.lineBuffer.append(s, voidfunc(S line) {
311  
        //if (nempty(line)) print("Got line: " + line);
312  
        pcall { handleLine(line, socket, state); }
313  
      });
314  
      if (state.lineBuffer.size() > maxLineLength) {
315  
        print("Bad client (line too long");
316  
        closeSocket(socket);
317  
      }
318  
    }
319  
  }
320  
  
321  
  void writeToSocket(SelectionKey key) throws IOException {
322  
    final SocketChannel socket = cast key.channel();
323  
    socket.register(selector, SelectionKey.OP_READ);
324  
    State state = state(socket);
325  
    lock state.lock;
326  
    if (empty(state.toSend)) ret;
327  
    byte[] data = popFirst(state.toSend);
328  
    print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID);
329  
    sendBytesToSocket(socket, data);
330  
  }
331  
  
332  
  void sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException {
333  
    int length = l(bytes);
334  
    State state = state(socket);
335  
    lock state.lock;
336  
    if (nempty(state.toSend)) {
337  
      print("Postponing send to " + state.computerID + " because data already queued");
338  
      ret with state.toSend.add(bytes);
339  
    }
340  
    ByteBuffer buf = ByteBuffer.wrap(bytes);
341  
    int n = socket.write(buf);
342  
    if (n < length) {
343  
      byte[] remaining = subByteArray(bytes, n);
344  
      state.toSend.add(remaining);
345  
      print("Postponing sending " + n2(l(remaining), "byte") + " to " + state.computerID);
346  
      socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
347  
    }
348  
  }
349  
  
350  
  void handleLine(S line, SocketChannel socket, State state) {
351  
    new Matches m;
352  
    
353  
    vmBus_send oic_gotLine(this, line, socket, state);
354  
    pcallFAll(onLineReceived, line, socket, state);
355  
    
356  
    if (empty(line)) ret with sendLineToSocket(socket, ""); // idle
357  
358  
    if (jmatch("computerID=*", line, m)) {
359  
      if (state.computerID != null || !possibleComputerID($1)) ret;
360  
      print("Got computer ID: " + $1);
361  
      state.computerID = $1;
362  
      cI.gotConnection(state.computerID);
363  
      ret;
364  
    }
365  
    
366  
    if (jmatch("sub *", line, m)) {
367  
      Channel channel = getChannelForSocket(socket, $1);
368  
      if (channel == null)
369  
        sendLineToSocket(socket, "Channel " + $1 + " not found");
370  
      else
371  
        sendLineToSocket(socket, "Subbed " + $1);
372  
      ret;
373  
    }
374  
    
375  
    L<S> tok = javaTok(line);
376  
    if (eq(get(tok, 3), ":")) {
377  
      Channel channel = state.subbedChannels.get(get(tok, 1));
378  
      if (channel != null)
379  
        channel.handleLine(joinSubList(tok, 5), socket);
380  
    }
381  
    
382  
    // ignore unknown line
383  
  }
384  
  
385  
  void sendLineToSocket(SocketChannel socket, S s) ctex {
386  
    byte[] bytes = toUtf8(s + "\n");
387  
    sendBytesToSocket(socket, bytes);
388  
  }
389  
  
390  
  void removeFromAllChannels(SocketChannel socket) {
391  
    lock channelsLock;
392  
    for (S name, Channel c : cloneMap(channels)) { pcall {
393  
      c.removeSocket(socket);
394  
      if (empty(c.sockets)) {
395  
        print("Deleting channel: " + name);
396  
        channels.remove(name);
397  
      }
398  
    }}
399  
  }
400  
  
401  
  Channel getChannelForSocket(SocketChannel socket, S name) {
402  
    lock channelsLock;
403  
    Channel c = getChannel(name);
404  
    c.addSocket(socket);
405  
    state(socket).subbedChannels.put(name, c);
406  
    ret c;
407  
  }
408  
  
409  
  Channel getChannel(S name) {
410  
    lock channelsLock;
411  
    Channel c = channels.get(name);
412  
    if (c == null) {
413  
      c = makeChannel(name);
414  
      c.name = name;
415  
      if (c == null) null;
416  
      print("New channel: " + name);
417  
      channels.put(name, c);
418  
      lock c.lock;
419  
      c.update();
420  
    }
421  
    ret c;
422  
  }
423  
  Channel makeChannel(S name) {
424  
    if (eq(name, "computerIDs")) ret new ComputerIDsChannel;
425  
    if (eq(name, "computerCount")) ret new ComputerCountChannel;
426  
    if (eq(name, "chat")) ret new PublicChatChannel;
427  
    if (eq(name, "machineChat")) ret new PublicChatChannel;
428  
    if (eq(name, "privateChat")) ret new PrivateChatChannel;
429  
    if (eq(name, "privateMachineChat")) ret setAll(new PrivateChatChannel, prefix := name + ":");
430  
    if (eq(name, "snippetUpdates")) ret new SnippetUpdatesChannel;
431  
    if (eq(name, "voiceOutput")) ret new PublicChatChannel;
432  
      
433  
    ret null with print("Unknown channel: " + name);
434  
  }
435  
  
436  
  void update {
437  
    print("OS Instances update");
438  
    if (empty(dataMapper)) ret;
439  
  
440  
    int value = cI.value();
441  
    //if (value == theNumber) ret;
442  
    theNumber = value;
443  
    print("OS Instances update done");
444  
  }
445  
}

Author comment

Began life as a copy of #1016572

download  show line numbers  debug dex  old transpilations   

Travelled to 7 computer(s): bhatertpkbcr, mqqgnosmbjvj, pyentgdyhuwx, pzhvpgtvlbxg, tvejysmllsmz, vouqrxazstgt, xrpafgyirdlv

No comments. add comment

Snippet ID: #1027046
Snippet name: OSInstancesConnector as class [NIO Socket]
Eternal ID of this version: #1027046/13
Text MD5: 63d8c2ab4c15ac0d6f39c7edec70e203
Transpilation MD5: 4d18d9ac7057f7dd3cc980e5188e9cba
Author: stefan
Category: javax / networking
Type: JavaX fragment (include)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2020-02-15 15:19:00
Source code size: 13398 bytes / 445 lines
Pitched / IR pitched: No / No
Views / Downloads: 248 / 579
Version history: 12 change(s)
Referenced in: [show references]