Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

418
LINES

< > BotCompany Repo | #1022732 // OS Instances Connector [NIO Socket, Web Bot, backup]

JavaX module

Download Jar.

!7

// Period handed to doEvery() for the maintenance timer started in the main program
// (presumably milliseconds — confirm against doEvery)
static int updateInterval = 10;
// A connection is closed by the maintenance timer once sysTime() reaches lastMessage + timeout
static long timeout = 30000+10000; // idle time + latency
// Port the server socket is bound to
static int port = 6000;
// Limit for State.lineBuffer.size(); readFromSocket closes connections that exceed it
static int maxLineLength = 10*1024*1024; // change #1020500 too if you change this

import java.nio.*;
import java.nio.channels.*;

// Registry of connected computer IDs (fed through gotConnection/lostConnection); created in the main program
static ConnectedInstances cI;
// Built from update(); registered as change listener on cI.connected2 in the main program
static ReliableSingleThread rst = new(r update);
// Last cI.value() recorded by update()
static int theNumber;
// Per-connection State, keyed by the connection's channel; filled on accept, cleared in onSocketClose
static Map<SocketChannel, State> dataMapper = synchroMap();
// Listening socket and the selector all connections are registered with
static ServerSocketChannel serverChannel;
static Selector selector;
// Live subscription channels by name (created in getChannelForSocket, deleted in removeFromAllChannels)
static Map<S, Channel> channels = synchroMap();
// Held while channels are created or deleted
static Lock channelsLock = lock();
// Timer returned by doEvery() in the main program
static java.util.Timer mainTimer;
// Presumably switches on the `debug "..."` trace statements in acceptLoop — confirm against the JavaX translator
sbool debug;

// Bookkeeping for one client connection; instances live in the global dataMapper.
sclass State {
  // ID generated for this connection
  S socketID = aGlobalID();
  // Set once by a "computerID=..." line (see handleLine); null until then
  S computerID;
  int countSeen = -1;
  // Collects received text and splits it into lines
  LineBuffer lineBuffer = new LineBuffer;
  // Time of the last received data, used for the idle timeout
  long lastMessage = sysNow();
  // Channels this connection has subscribed to, by channel name
  Map<S, Channel> subbedChannels = synchroLinkedHashMap();
  // Held while sending to / queueing for this connection
  Lock lock = lock();
  
  // Outgoing data that could not be written immediately
  // TODO: clear eventually to avoid stalling attacks
  L<byte[]> toSend = synchroLinkedList();
}

// Base class for a named subscription channel. Subclasses override the
// three hook methods; the defaults do nothing.
sclass Channel<A> {
  S name;
  // Connections currently subscribed to this channel
  Set<SocketChannel> sockets = synchroSet();
  // Optional per-connection data of type A (not used by the base class)
  Map<SocketChannel, A> dataMapper = synchroMap();
  Lock lock = lock();
  
  // --- hooks ---
  
  // Called periodically by the maintenance timer and once right after creation
  void update() {}
  
  // Called from addSocket for every newly subscribed connection
  void initialMessage(SocketChannel socket) {}
  
  // Called with the payload of a "<channel>: ..." line sent by a subscriber
  void handleLine(S line, SocketChannel socket) {}
  
  // --- subscription management ---
  
  // Subscribes a connection and lets the subclass greet it
  void addSocket(SocketChannel socket) {
    lock lock;
    sockets.add(socket);
    initialMessage(socket);
  }
  
  // Unsubscribes a connection
  void removeSocket(SocketChannel socket) {
    lock lock;
    sockets.remove(socket);
  }
}

// Channel that publishes the list of connected computer IDs (cI.set())
// as a "ComputerIDs = ..." line.
sclass ComputerIDsChannel extends Channel<O> {
  // Last published list
  L<S> value;
  
  // Sends the current list to one connection
  void sendTo(SocketChannel socket) {
    sendLineToSocket(socket, "ComputerIDs = " + struct(value));
  }
  
  // New subscribers get the current list right away
  void initialMessage(SocketChannel socket) {
    lock lock;
    sendTo(socket);
  }
  
  // Re-reads the list and broadcasts it only when it changed
  void update() {
    if (!set_trueIfChanged(this, value := asList(cI.set()))) ret;
    for (SocketChannel subscriber : sockets) {
      sendTo(subscriber);
    }
  }
}

// Channel that publishes cI.value() as a "ComputerCount = ..." line.
sclass ComputerCountChannel extends Channel<O> {
  // Last published count
  int value;
  
  // Sends the current count to one connection
  void sendTo(SocketChannel socket) {
    sendLineToSocket(socket, "ComputerCount = " + value);
  }
  
  // New subscribers get the current count right away
  void initialMessage(SocketChannel socket) {
    lock lock;
    sendTo(socket);
  }
  
  // Re-reads the count and broadcasts it only when it changed
  void update() {
    if (!set_trueIfChanged(this, value := cI.value())) ret;
    for (SocketChannel subscriber : sockets) {
      sendTo(subscriber);
    }
  }
}

// Broadcast chat: every line posted by a subscriber is queued and later
// sent to all subscribers as chat:<quoted message>.
sclass PublicChatChannel extends Channel<O> {
  // Messages waiting to be broadcast by the next update()
  L<S> msgs = synchroLinkedList();
  
  // Queues a message for broadcast
  void postMessage(S msg) {
    msgs.add(msg);
  }
  
  // Incoming payload is a quoted string; unquote it and queue it
  void handleLine(S line, SocketChannel socket) {
    S text = unquote(line);
    postMessage(text);
  }
  
  // Drains the queue, sending each message to every subscribed socket
  void update() {
    for (S msg = syncPopFirst(msgs); msg != null; msg = syncPopFirst(msgs)) {
      S quoted = quote(msg);
      print("Chat send: " + shorten(quoted, 20));
      S outgoing = "chat:" + quoted;
      for (SocketChannel s : sockets)
        sendLineToSocket(s, outgoing);
    }
  }
}

// Point-to-point chat: a subscriber sends "<computerID> <text>" and the text
// is forwarded to a connection registered under that computer ID.
sclass PrivateChatChannel extends Channel<O> {
  // Prepended to every forwarded line (makeChannel sets it for "privateMachineChat")
  S prefix = "";
  new MultiMap<S, SocketChannel> clients; // key: computer ID

  // Registers the new subscriber under its computer ID.
  // NOTE(review): computerID(socket) is null if the client subscribes before
  // sending "computerID=..."; the socket is then stored under a null key.
  void initialMessage(SocketChannel socket) {
    clients.put(computerID(socket), socket);
  }
  
  // Unsubscribes the socket and drops its entry in clients.
  // The lookup relies on the connection's State still being present in the
  // global dataMapper; otherwise computerID(socket) is null and the entry is not found.
  void removeSocket(SocketChannel socket) {
    super.removeSocket(socket);
    clients.remove(computerID(socket), socket);
  }
  
  // Forwards a "<computerID> <text>" payload to the first connection
  // registered for that computer ID. If sending throws, that connection is
  // closed and removed, and the next one registered for the same ID is tried.
  void handleLine(S line, SocketChannel socket) {
    pcall {
      // everything before the first space is the target computer ID
      int i = indexOf(line, ' ');
      S computerID = takeFirst(line, i);
      while licensed {
        SocketChannel socket2 = clients.getFirst(computerID);
        if (socket2 == null) break; // no (more) connections for that ID
        try {
          S sender = computerID(socket);
          // only long lines are logged
          if (l(line) >= 10000)
            print("Forwarding line from " + sender + " to " + computerID + ": " + l(line));
          sendLineToSocket(socket2, prefix + "privately from " + sender + ": " + substring(line, i+1));
          break;
        } catch e {
          // target is unusable: discard it and retry with the next candidate
          closeSocket(socket2);
          clients.remove(computerID, socket2);
        }
      }
    }
  }
}

// Channel that follows changes.log of program #1019175 and relays each new
// (unquoted) line to all subscribers as snippetUpdates:<line>.
sclass SnippetUpdatesChannel extends Channel<O> {
  ManualTailFile tail;
  
  // Sets up the tail so that every complete line ends up in sendLine()
  *() {
    File log = programFile(#1019175, "changes.log");
    tail = manualTailFile_newOnly(log,
      vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
  }
  
  // Polls the file for new content
  void update() {
    tail.check();
  }
  
  // Broadcasts one log line; the line must not contain line breaks
  void sendLine(S line) {
    print("SnippetUpdatesChannel send: " + line);
    S payload = "snippetUpdates:" + assertNoLineBreaks(line);
    print("Sockets: " + l(sockets));
    for (SocketChannel s : sockets)
      sendLineToSocket(s, payload);
  }
}

// Returns the State registered for a connection, or null if there is none
// (e.g. after the connection was closed).
static State state(SocketChannel socket) {
  return dataMapper.get(socket);
}

// Returns the computer ID announced on a connection; null if the connection
// has no State or has not announced an ID.
sS computerID(SocketChannel socket) {
  State st = state(socket);
  if (st == null) ret null;
  ret st.computerID;
}

// Main program: starts the connected-instances tracker, opens the
// non-blocking server socket, launches the selector thread and the
// periodic maintenance timer.
p {
  cI = new ConnectedInstances;
  cI.setAfterglowInterval(1000);
  cI.connected2.onChange(rst);
  cI.start();
  
  // server socket in non-blocking mode, registered for accept events
  selector = Selector.open();
  serverChannel = ServerSocketChannel.open();
  serverChannel.configureBlocking(false);
  serverChannel.socket().bind(new InetSocketAddress(port));
  serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  
  thread "Server Accept Loop" { acceptLoop(); }
  
  mainTimer = doEvery(updateInterval, r {
    // 1. close connections that have been silent for longer than the timeout
    for (SocketChannel conn : keysList(dataMapper)) {
      State st = state(conn);
      if (st == null || sysTime() < st.lastMessage+timeout) continue;
      pcall {
        closeSocket(conn);
      }
    }
    // 2. let every channel publish pending changes (under the channel's lock)
    for (Channel channel : values(channels)) pcall {
      lock channel.lock;
      channel.update();
    }
  });
}

// Shutdown hook: closes the listening socket and every client connection.
// The connection table is emptied first; each close is attempted
// independently so one failure does not stop the others.
// NOTE(review): selector and mainTimer are not released here — confirm
// whether the framework takes care of them.
svoid cleanMeUp() ctex {
  if (serverChannel != null) {
    serverChannel.close();
  }
  for (SocketChannel conn : keys(getAndClearMap(dataMapper))) {
    pcall { conn.close(); }
  }
}

// Selector loop, run on the "Server Accept Loop" thread: waits for ready
// keys and dispatches each one to accept/read/write handling.
// Exceptions from a handler are contained by pcall so one bad connection
// does not end the loop.
// NOTE(review): connections are registered with OP_WRITE permanently (see
// acceptOnSocket), so select() presumably returns almost immediately while
// any client is connected — confirm whether this busy-spins.
svoid acceptLoop ctex {
  while licensed {
    // wait for events
    selector.select();

	  Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
	  while (keys.hasNext()) {
      SelectionKey key = keys.next();
      // take the key out of the selected set so it is not processed again
	    keys.remove();
      debug "have key";
	    
	    if (!key.isValid())
	      continue with debug "key not valid";
	      
	    pcall {
        // only one kind of readiness is handled per key per pass:
        // accept first, then read, then write
        if (key.isAcceptable()) {
          debug "Key is acceptable";
          acceptOnSocket(key);
        } else if (key.isReadable()) {
          debug "Key is readable";
          readFromSocket(key);
        } else if (key.isWritable()) {
          debug "Key is writable";
          writeToSocket(key);
        } else
          debug "Unknown key type";
      }
	  }
	}
}

// Accepts the pending connection on the server socket behind `key`,
// switches it to non-blocking mode, creates its State and registers it
// with the selector for both read and write readiness.
static void acceptOnSocket(SelectionKey key) throws IOException {
  ServerSocketChannel listener = (ServerSocketChannel) key.channel();
  SocketChannel client = listener.accept();
  client.configureBlocking(false);
  print("Connected to: " + client.socket().getRemoteSocketAddress());
  
  // State must exist before the selector can deliver events for the connection
  dataMapper.put(client, new State);
  client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}

// Bookkeeping after a connection has gone away: reports the lost computer ID
// to cI, unsubscribes the connection from all channels and forgets its State.
svoid onSocketClose(SocketChannel socket) {
  Socket actualSocket = socket.socket();
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
  print("Connection closed by client: " + remoteAddr);
  State state = state(socket);
  if (state != null && state.computerID != null)
    cI.lostConnection(state.computerID);
  try {
    // Unsubscribe while the State is still registered: channels such as
    // PrivateChatChannel look up computerID(socket) in removeSocket and
    // would find null (leaving a stale entry) if the State were gone already.
    removeFromAllChannels(socket);
  } finally {
    // always forget the State, even if unsubscribing failed
    dataMapper.remove(socket);
  }
}

// Closes a connection and runs the close bookkeeping. Both steps are
// attempted independently: a failure while closing does not prevent
// onSocketClose from running.
svoid closeSocket(SocketChannel socket) {
  pcall {
    socket.close();
  }
  pcall {
    onSocketClose(socket);
  }
}

static void readFromSocket(SelectionKey key) throws IOException {
	final SocketChannel socket = cast key.channel();
  ByteBuffer buffer = ByteBuffer.allocate(1024);
  int numRead = socket.read(buffer);
  if (numRead == -1) {
    closeSocket(socket);
    key.cancel();
    ret;
  }

  S s = new String(buffer.array(), 0, numRead);
  //print("Got data from client: " + s);
  final State state = state(socket);
  if (state != null) {
    state.lastMessage = sysNow();
    state.lineBuffer.append(s, voidfunc(S line) {
      //if (nempty(line)) print("Got line: " + line);
      pcall { handleLine(line, socket, state); }
    });
    if (state.lineBuffer.size() > maxLineLength) {
      print("Bad client (line too long");
      closeSocket(socket);
    }
  }
}

static void writeToSocket(SelectionKey key) throws IOException {
	final SocketChannel socket = cast key.channel();
	State state = state(socket);
	lock state.lock;
	if (empty(state.toSend)) ret;
	byte[] data = popFirst(state.toSend);
	print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID);
	sendBytesToSocket(socket, data);
}

// Sends bytes to a connection without blocking. If earlier data is still
// queued, the bytes are appended to the queue to keep ordering; otherwise
// they are written immediately and any unwritten tail is queued.
// Throws if the connection has no State (callers rely on an exception to
// detect dead connections).
svoid sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException {
  State state = state(socket);
  lock state.lock;
  if (nempty(state.toSend)) {
    print("Postponing send to " + state.computerID + " because data already queued");
    state.toSend.add(bytes);
    ret;
  }
  int written = socket.write(ByteBuffer.wrap(bytes));
  if (written >= l(bytes)) ret; // everything went out
  byte[] rest = subByteArray(bytes, written);
  state.toSend.add(rest);
  print("Postponing sending " + n2(l(rest), "byte") + " to " + state.computerID);
}

// Interprets one line received from a client:
//   (empty)            idle ping, answered with an empty line
//   computerID=<id>    announces the client's computer ID (accepted once)
//   sub <channel>      subscribes to a channel
//   <channel>: <rest>  passes <rest> to a channel the client is subscribed to
// Anything else is ignored.
svoid handleLine(S line, SocketChannel socket, State state) {
  new Matches m;
  if (empty(line)) {
    sendLineToSocket(socket, ""); // idle
    ret;
  }
  
  if (jmatch("computerID=*", line, m)) {
    // only the first plausible ID per connection is accepted
    if (state.computerID == null && possibleComputerID($1)) {
      print("Got computer ID: " + $1);
      state.computerID = $1;
      cI.gotConnection(state.computerID);
    }
    ret;
  }
  
  if (jmatch("sub *", line, m)) {
    Channel channel = getChannelForSocket(socket, $1);
    S reply = channel == null
      ? "Channel " + $1 + " not found"
      : "Subbed " + $1;
    sendLineToSocket(socket, reply);
    ret;
  }
  
  // "<channel>: <rest>" — token 1 is the channel name, token 3 the colon
  L<S> tok = javaTok(line);
  if (!eq(get(tok, 3), ":")) ret; // ignore unknown line
  Channel channel = state.subbedChannels.get(get(tok, 1));
  if (channel != null)
    channel.handleLine(joinSubList(tok, 5), socket);
}

// Web-facing text for this module: only tells visitors which port the socket server uses.
html { ret "Use port " + port; }

// Sends one line (UTF-8 encoded, terminated with "\n") to a connection.
// Checked exceptions from the send are rethrown unchecked (ctex).
svoid sendLineToSocket(SocketChannel socket, S s) ctex {
  sendBytesToSocket(socket, toUtf8(s + "\n"));
}

// Unsubscribes a connection from every channel and deletes channels that
// are left without subscribers. Runs under channelsLock; each channel is
// handled in its own pcall so a failure in one does not skip the others.
svoid removeFromAllChannels(SocketChannel socket) {
  lock channelsLock;
  // iterate over a copy because entries may be removed from channels in the loop
  for (S name, Channel c : cloneMap(channels)) { pcall {
    c.removeSocket(socket);
    if (empty(c.sockets)) {
      print("Deleting channel: " + name);
      channels.remove(name);
    }
  }}
}

// Subscribes a connection to the channel called `name`, creating the channel
// on first use. Returns the channel, or null if the name is not a known
// channel type (the caller then reports "Channel ... not found").
static Channel getChannelForSocket(SocketChannel socket, S name) {
  lock channelsLock;
  Channel c = channels.get(name);
  if (c == null) {
    c = makeChannel(name);
    // makeChannel returns null for unknown names; this check must come
    // before any access to c, otherwise an unknown name causes an NPE
    if (c == null) ret null;
    c.name = name;
    print("New channel: " + name);
    channels.put(name, c);
    // give the new channel its initial state before the first subscriber joins
    lock c.lock;
    c.update();
  }
  c.addSocket(socket);
  state(socket).subbedChannels.put(name, c);
  ret c;
}

// Factory for channel objects by well-known name. Returns null (after
// logging) for names that do not correspond to a channel type.
static Channel makeChannel(S name) {
  if (eq(name, "computerIDs"))
    ret new ComputerIDsChannel;
  if (eq(name, "computerCount"))
    ret new ComputerCountChannel;
  if (eq(name, "chat"))
    ret new PublicChatChannel;
  if (eq(name, "privateChat"))
    ret new PrivateChatChannel;
  // same implementation as privateChat, but forwarded lines carry the channel name as prefix
  if (eq(name, "privateMachineChat"))
    ret nu(PrivateChatChannel, prefix := name + ":");
  if (eq(name, "snippetUpdates"))
    ret new SnippetUpdatesChannel;
  
  print("Unknown channel: " + name);
  ret null;
}

// Change handler wired to cI.connected2 through rst: records the current
// cI.value() in theNumber. Does nothing beyond logging while no client is connected.
svoid update() {
  print("OS Instances update");
  if (!empty(dataMapper)) {
    theNumber = cI.value();
    print("OS Instances update done");
  }
}

Author comment

Began life as a copy of #1016572

download  show line numbers  debug dex  old transpilations   

Travelled to 7 computer(s): bhatertpkbcr, cfunsshuasjs, mqqgnosmbjvj, pyentgdyhuwx, pzhvpgtvlbxg, tvejysmllsmz, vouqrxazstgt

No comments. add comment

Snippet ID: #1022732
Snippet name: OS Instances Connector [NIO Socket, Web Bot, backup]
Eternal ID of this version: #1022732/1
Text MD5: e4d0736aa0d77f37beb88f25294fd0cd
Author: stefan
Category: javax / a.i. / web
Type: JavaX module
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2019-04-03 16:08:05
Source code size: 11789 bytes / 418 lines
Pitched / IR pitched: No / No
Views / Downloads: 308 / 463
Referenced in: [show references]