Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

380
LINES

< > BotCompany Repo | #1022169 // OS Instances Connector [NIO Socket, backup without buffers]

JavaX module

Download Jar.

1  
!7
2  
3  
static int updateInterval = 10;
4  
static long timeout = 30000+10000; // idle time + latency
5  
static int port = 6000;
6  
static int maxLineLength = 10*1024*1024; // change #1020500 too if you change this
7  
8  
import java.nio.*;
9  
import java.nio.channels.*;
10  
11  
static ConnectedInstances cI;
12  
static ReliableSingleThread rst = new(r update);
13  
static int theNumber;
14  
static Map<SocketChannel, State> dataMapper = synchroMap();
15  
static ServerSocketChannel serverChannel;
16  
static Selector selector;
17  
static Map<S, Channel> channels = synchroMap();
18  
static Lock channelsLock = lock();
19  
static java.util.Timer mainTimer;
20  
21  
sclass State {
22  
  S socketID = aGlobalID(), computerID;
23  
  int countSeen = -1;
24  
  new LineBuffer lineBuffer;
25  
  long lastMessage = sysNow();
26  
  Map<S, Channel> subbedChannels = synchroLinkedHashMap();
27  
}
28  
29  
sclass Channel<A> {
30  
  S name;
31  
  Set<SocketChannel> sockets = synchroSet();
32  
  Map<SocketChannel, A> dataMapper = synchroMap();
33  
  Lock lock = lock();
34  
  
35  
  void addSocket(SocketChannel socket) {
36  
    lock lock;
37  
    sockets.add(socket);
38  
    initialMessage(socket);
39  
  }
40  
  
41  
  void removeSocket(SocketChannel socket) {
42  
    lock lock;
43  
    sockets.remove(socket);
44  
  }
45  
  
46  
  void update() {}
47  
  void initialMessage(SocketChannel socket) {}
48  
  void handleLine(S line, SocketChannel socket) {}
49  
}
50  
51  
sclass ComputerIDsChannel extends Channel<O> {
52  
  L<S> value;
53  
  
54  
  void initialMessage(SocketChannel socket) {
55  
    lock lock;
56  
    sendTo(socket);
57  
  }
58  
  
59  
  void update() {
60  
    if (set_trueIfChanged(this, value := asList(cI.set())))
61  
      for (SocketChannel s : sockets) sendTo(s);
62  
  }
63  
  
64  
  void sendTo(SocketChannel socket) {
65  
    sendLineToSocket(socket, "ComputerIDs = " + struct(value));
66  
  }
67  
}
68  
69  
sclass ComputerCountChannel extends Channel<O> {
70  
  int value;
71  
  
72  
  void initialMessage(SocketChannel socket) {
73  
    lock lock;
74  
    sendTo(socket);
75  
  }
76  
  
77  
  void update() {
78  
    if (set_trueIfChanged(this, value := cI.value()))
79  
      for (SocketChannel s : sockets) sendTo(s);
80  
  }
81  
  
82  
  void sendTo(SocketChannel socket) {
83  
    sendLineToSocket(socket, "ComputerCount = " + value);
84  
  }
85  
}
86  
87  
sclass PublicChatChannel extends Channel<O> {
88  
  L<S> msgs = synchroLinkedList();
89  
  
90  
  void handleLine(S line, SocketChannel socket) {
91  
    postMessage(unquote(line));
92  
  }
93  
  
94  
  void postMessage(S msg) {
95  
    msgs.add(msg);
96  
  }
97  
  
98  
  void update {
99  
    S msg;
100  
    while ((msg = syncPopFirst(msgs)) != null) {
101  
      S quoted = quote(msg);
102  
      print("Chat send: " + shorten(quoted, 20));
103  
      quoted = "chat:" + quoted;
104  
      for (SocketChannel s : sockets)
105  
        sendLineToSocket(s, quoted);
106  
    }
107  
  }
108  
}
109  
110  
sclass PrivateChatChannel extends Channel<O> {
111  
  S prefix = "";
112  
  new MultiMap<S, SocketChannel> clients; // key: computer ID
113  
114  
  void initialMessage(SocketChannel socket) {
115  
    clients.put(computerID(socket), socket);
116  
  }
117  
  
118  
  void removeSocket(SocketChannel socket) {
119  
    super.removeSocket(socket);
120  
    clients.remove(computerID(socket), socket);
121  
  }
122  
  
123  
  void handleLine(S line, SocketChannel socket) {
124  
    pcall {
125  
      int i = indexOf(line, ' ');
126  
      S computerID = takeFirst(line, i);
127  
      while licensed {
128  
        SocketChannel socket2 = clients.getFirst(computerID);
129  
        if (socket2 == null) break;
130  
        try {
131  
          S sender = computerID(socket);
132  
          if (l(line) >= 10000)
133  
            print("Forwarding line from " + sender + " to " + computerID + ": " + l(line));
134  
          sendLineToSocket(socket2, prefix + "privately from " + sender + ": " + substring(line, i+1));
135  
          break;
136  
        } catch e {
137  
          closeSocket(socket2);
138  
          clients.remove(computerID, socket2);
139  
        }
140  
      }
141  
    }
142  
  }
143  
}
144  
145  
sclass SnippetUpdatesChannel extends Channel<O> {
146  
  ManualTailFile tail;
147  
  
148  
  *() {
149  
    tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"),
150  
      vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
151  
  }
152  
  
153  
  void update {
154  
    tail.check();
155  
  }
156  
  
157  
  void sendLine(S line) {
158  
    print("SnippetUpdatesChannel send: " + line);
159  
    line = assertNoLineBreaks(line);
160  
    print("Sockets: " + l(sockets));
161  
    //S quoted = quote(line);
162  
    for (SocketChannel s : sockets)
163  
      sendLineToSocket(s, "snippetUpdates:" + line);
164  
  }
165  
}
166  
167  
static State state(SocketChannel socket) {
168  
  ret dataMapper.get(socket);
169  
}
170  
171  
sS computerID(SocketChannel socket) {
172  
  State state = state(socket);
173  
  ret state == null ? null : state.computerID;
174  
}
175  
176  
p {
177  
  cI = new ConnectedInstances;
178  
  cI.setAfterglowInterval(1000);
179  
  cI.connected2.onChange(rst);
180  
  cI.start();
181  
  
182  
  selector = Selector.open();
183  
	serverChannel = ServerSocketChannel.open();
184  
	serverChannel.configureBlocking(false);
185  
	
186  
  InetSocketAddress sockAddr = new InetSocketAddress(port);
187  
	serverChannel.socket().bind(sockAddr);
188  
	serverChannel.register(selector, SelectionKey.OP_ACCEPT);
189  
	
190  
	thread "Server Accept Loop" { acceptLoop(); }
191  
	
192  
	mainTimer = doEvery(updateInterval, r {
193  
	  for (SocketChannel socket : keysList(dataMapper)) {
194  
	    State state = state(socket);
195  
	    if (state != null && sysTime() >= state.lastMessage+timeout) pcall {
196  
	      closeSocket(socket);
197  
	    }
198  
	  }
199  
	  for (Channel channel : values(channels)) pcall {
200  
	    lock channel.lock;
201  
      channel.update();
202  
    }
203  
	});
204  
}
205  
206  
svoid cleanMeUp() ctex {
207  
  if (serverChannel != null)
208  
    serverChannel.close();
209  
  for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall {
210  
    socket.close();
211  
  }
212  
}
213  
214  
svoid acceptLoop ctex {
215  
  while licensed {
216  
    // wait for events
217  
    selector.select();
218  
219  
	  Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
220  
	  while (keys.hasNext()) {
221  
      SelectionKey key = keys.next();
222  
	    keys.remove();
223  
	    
224  
	    if (!key.isValid()) continue;
225  
	    pcall {
226  
        if (key.isAcceptable())
227  
          acceptOnSocket(key);
228  
	      else if (key.isReadable())
229  
          readFromSocket(key);
230  
      }
231  
	  }
232  
	}
233  
}
234  
235  
//accept a connection made to this channel's socket
236  
static void acceptOnSocket(SelectionKey key) throws IOException {
237  
  ServerSocketChannel serverChannel = cast key.channel();
238  
  SocketChannel channel = serverChannel.accept();
239  
  channel.configureBlocking(false);
240  
  Socket actualSocket = channel.socket();
241  
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
242  
  print("Connected to: " + remoteAddr);
243  
  
244  
  // register channel with selector for further IO
245  
	dataMapper.put(channel, new State);
246  
	channel.register(selector, SelectionKey.OP_READ);
247  
}
248  
249  
svoid onSocketClose(SocketChannel socket) {
250  
  Socket actualSocket = socket.socket();
251  
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
252  
  print("Connection closed by client: " + remoteAddr);
253  
  State state = state(socket);
254  
  if (state != null && state.computerID != null)
255  
    cI.lostConnection(state.computerID);
256  
  dataMapper.remove(socket);
257  
  removeFromAllChannels(socket);
258  
}
259  
260  
svoid closeSocket(SocketChannel socket) {
261  
  pcall { socket.close(); }
262  
  pcall { onSocketClose(socket); }
263  
}
264  
265  
static void readFromSocket(SelectionKey key) throws IOException {
266  
	final SocketChannel socket = cast key.channel();
267  
  ByteBuffer buffer = ByteBuffer.allocate(1024);
268  
  int numRead = socket.read(buffer);
269  
  if (numRead == -1) {
270  
    closeSocket(socket);
271  
    key.cancel();
272  
    ret;
273  
  }
274  
275  
  S s = new String(buffer.array(), 0, numRead);
276  
  //print("Got data from client: " + s);
277  
  final State state = state(socket);
278  
  if (state != null) {
279  
    state.lastMessage = sysNow();
280  
    state.lineBuffer.append(s, voidfunc(S line) {
281  
      //if (nempty(line)) print("Got line: " + line);
282  
      pcall { handleLine(line, socket, state); }
283  
    });
284  
    if (state.lineBuffer.size() > maxLineLength) {
285  
      print("Bad client (line too long");
286  
      closeSocket(socket);
287  
    }
288  
  }
289  
}
290  
291  
svoid handleLine(S line, SocketChannel socket, State state) {
292  
  new Matches m;
293  
  if (empty(line)) ret with sendLineToSocket(socket, ""); // idle
294  
  
295  
  if (jmatch("computerID=*", line, m)) {
296  
    if (state.computerID != null || !possibleComputerID($1)) ret;
297  
    print("Got computer ID: " + $1);
298  
    state.computerID = $1;
299  
    cI.gotConnection(state.computerID);
300  
    ret;
301  
  }
302  
  
303  
  if (jmatch("sub *", line, m)) {
304  
    Channel channel = getChannelForSocket(socket, $1);
305  
    if (channel == null)
306  
      sendLineToSocket(socket, "Channel " + $1 + " not found");
307  
    else
308  
      sendLineToSocket(socket, "Subbed " + $1);
309  
    ret;
310  
  }
311  
  
312  
  L<S> tok = javaTok(line);
313  
  if (eq(get(tok, 3), ":")) {
314  
    Channel channel = state.subbedChannels.get(get(tok, 1));
315  
    if (channel != null)
316  
      channel.handleLine(joinSubList(tok, 5), socket);
317  
  }
318  
  
319  
  // ignore unknown line
320  
}
321  
322  
html { ret "Use port " + port; }
323  
324  
svoid sendLineToSocket(SocketChannel socket, S s) ctex {
325  
  byte[] bytes = toUtf8(s + "\n");
326  
  int length = l(bytes);
327  
  ByteBuffer buf = ByteBuffer.wrap(bytes);
328  
  int n = socket.write(buf);
329  
  if (n < length)
330  
    print("Socket write failed! TODO");
331  
}
332  
333  
svoid removeFromAllChannels(SocketChannel socket) {
334  
  lock channelsLock;
335  
  for (S name, Channel c : cloneMap(channels)) { pcall {
336  
    c.removeSocket(socket);
337  
    if (empty(c.sockets)) {
338  
      print("Deleting channel: " + name);
339  
      channels.remove(name);
340  
    }
341  
  }}
342  
}
343  
344  
static Channel getChannelForSocket(SocketChannel socket, S name) {
345  
  lock channelsLock;
346  
  Channel c = channels.get(name);
347  
  if (c == null) {
348  
    c = makeChannel(name);
349  
    c.name = name;
350  
    if (c == null) null;
351  
    print("New channel: " + name);
352  
    channels.put(name, c);
353  
    lock c.lock;
354  
    c.update();
355  
  }
356  
  c.addSocket(socket);
357  
  state(socket).subbedChannels.put(name, c);
358  
  ret c;
359  
}
360  
361  
static Channel makeChannel(S name) {
362  
  if (eq(name, "computerIDs")) ret new ComputerIDsChannel;
363  
  if (eq(name, "computerCount")) ret new ComputerCountChannel;
364  
  if (eq(name, "chat")) ret new PublicChatChannel;
365  
  if (eq(name, "privateChat")) ret new PrivateChatChannel;
366  
  if (eq(name, "privateMachineChat")) ret nu(PrivateChatChannel, prefix := name + ":");
367  
  if (eq(name, "snippetUpdates")) ret new SnippetUpdatesChannel;
368  
    
369  
  ret null with print("Unknown channel: " + name);
370  
}
371  
372  
svoid update {
373  
  print("OS Instances update");
374  
  if (empty(dataMapper)) ret;
375  
376  
  int value = cI.value();
377  
  //if (value == theNumber) ret;
378  
  theNumber = value;
379  
  print("OS Instances update done");
380  
}

Author comment

Began life as a copy of #1016572

download  show line numbers  debug dex  old transpilations   

Travelled to 7 computer(s): bhatertpkbcr, cfunsshuasjs, mqqgnosmbjvj, pyentgdyhuwx, pzhvpgtvlbxg, tvejysmllsmz, vouqrxazstgt

No comments. add comment

Snippet ID: #1022169
Snippet name: OS Instances Connector [NIO Socket, backup without buffers]
Eternal ID of this version: #1022169/1
Text MD5: 3ed4072983e6ef4b6085ae6906e62e6e
Author: stefan
Category: javax / a.i. / web
Type: JavaX module
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2019-03-10 11:23:02
Source code size: 10473 bytes / 380 lines
Pitched / IR pitched: No / No
Views / Downloads: 286 / 456
Referenced in: [show references]