Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

478
LINES

< > BotCompany Repo | #1016572 // OS Instances Connector [NIO Socket, Web Bot, LIVE]

JavaX module (desktop) [tags: butter use-pretranspiled] - homepage

Download Jar. Libraryless. Click here for Pure Java version (13750L/82K).

!7

// TODO: we are iterating over all channels and all sockets
// 100 times per second - this seems excessive
// Interval passed to doEvery() for the housekeeping timer
// (presumably milliseconds, matching the "100 times per second" above)
static int updateInterval = 10;
// A connection is closed once nothing has been received from it for this long
static long timeout = 30000+10000; // idle time + latency
// TCP port the server socket is bound to
static int port = 6000;
// Limit compared against lineBuffer.size(); a client exceeding it is disconnected
static int maxLineLength = 10*1024*1024; // change #1020500 too if you change this

import java.nio.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;

// Tracker that is told about computer IDs connecting/disconnecting
// (gotConnection/lostConnection) and queried via value()/set()
static ConnectedInstances cI;
// Runs update(); registered as change listener on cI.connected2 in the main program
static ReliableSingleThread rst = new(r update);
// Last cI.value() stored by update()
static int theNumber;
// Connection state for every accepted client socket
static Map<SocketChannel, State> dataMapper = synchroMap();
// Listening socket and the selector all sockets are registered with (set in openServerSocket)
static ServerSocketChannel serverChannel;
static Selector selector;
// Currently existing channels, keyed by channel name
static Map<S, Channel> channels = synchroMap();
// Held while channels are created for or removed from a socket
static Lock channelsLock = lock();
// Housekeeping timer created in openServerSocket
static java.util.Timer mainTimer;
// NOTE(review): presumably switches the `debug "..."` statements on - confirm
sbool debug;

// Bookkeeping for one connected client socket
srecord noeq State(SocketChannel socket) {
  // ID generated for this connection when the state object is created
  S socketID = aGlobalID();
  // ID the client announced with "computerID=..."; null until then
  S computerID;
  int countSeen = -1;
  // collects incoming text until complete lines are available
  new LineBuffer lineBuffer;
  // when data was last received from this client (checked against the idle timeout)
  long lastMessage = sysNow();
  // channels this client subscribed to, keyed by channel name
  Map<S, Channel> subbedChannels = synchroLinkedHashMap();
  // taken by the send/write routines while they use toSend and the socket
  Lock lock = lock();

  // outgoing data that could not be written to the socket right away
  // TODO: clear eventually to avoid stalling attacks
  L<byte[]> toSend = synchroLinkedList();
}

// A virtual "channel" that clients can "subscribe" to
// (what that means differs between channels).
// Each channel is identified by a unique name.
// A is a custom data type for each subscriber.
sclass Channel<A> {
  S name;
  Set<SocketChannel> sockets = synchroSet();
  Map<SocketChannel, A> dataMapper = synchroMap();
  Lock lock = lock();
  
  // fired when the set of subscribers actually changes
  event subCountChanged;
  
  // Add a subscriber.
  // The event is only fired if the socket was not subscribed before;
  // the initial message is sent on every call (also on a repeated "sub").
  void addSocket(SocketChannel socket) {
    lock lock;
    if (sockets.add(socket))
      subCountChanged();
    initialMessage(socket);
  }
  
  // Remove a subscriber.
  // This is invoked for every channel whenever any socket closes, so the
  // event must only fire if the socket really was subscribed here -
  // otherwise listeners would broadcast an unchanged subscriber count.
  void removeSocket(SocketChannel socket) {
    lock lock;
    if (sockets.remove(socket))
      subCountChanged();
  }
  
  // current number of subscribers
  int subCount() { ret l(sockets); }

  // Send a line to every subscriber (iterating over a snapshot of the set).
  // Each send is protected individually so that one broken connection
  // does not keep the remaining subscribers from receiving the line.
  void sendToSubscribers(S line) {  
    for (SocketChannel socket : cloneList(sockets)) pcall {
      sendLineToSocket(socket, line);
    }
  }
  
  // Hooks for subclasses:
  // update() is called periodically and once when the channel is created,
  // initialMessage() when a socket subscribes,
  // handleLine() when a subscriber addresses a line to this channel.
  void update() {}
  void initialMessage(SocketChannel socket) {}
  void handleLine(S line, SocketChannel socket) {}
}

// Channel publishing the list of currently logged in computer IDs
// TODO: optimize the protocol to send diffs only
sclass ComputerIDsChannel extends Channel<O> {
  // the list as of the last update()
  L<S> value;
  
  // the line sent to subscribers for the current list
  S stateMsg() {
    ret "ComputerIDs = " + struct(value);
  }
  
  // a new subscriber gets the current list straight away
  void initialMessage(SocketChannel socket) {
    S msg = stateMsg();
    sendLineToSocket(socket, msg);
  }
  
  // refresh the list from cI and tell the subscribers if set_trueIfChanged reports a change
  void update() {
    boolean changed = set_trueIfChanged(this, value := asList(cI.set()));
    if (!changed) ret;
    sendToSubscribers(stateMsg());
  }
}

// Channel publishing the current count of logged in computers
sclass ComputerCountChannel extends Channel<O> {
  // the count as of the last update()
  int value;
  
  // the line sent to subscribers for the current count
  S stateMsg() {
    ret "ComputerCount = " + value;
  }
  
  // a new subscriber gets the current count straight away
  void initialMessage(SocketChannel socket) {
    S msg = stateMsg();
    sendLineToSocket(socket, msg);
  }
  
  // refresh the count from cI and tell the subscribers if set_trueIfChanged reports a change
  void update() {
    boolean changed = set_trueIfChanged(this, value := cI.value());
    if (!changed) ret;
    sendToSubscribers(stateMsg());
  }
}

// A public chat for broadcast messages between subscribers.
// Lines addressed to the channel are queued by handleLine() and
// delivered to all subscribers on the next update().
sclass PublicChatChannel extends Channel<O> {
  // messages waiting to be broadcast
  L<S> msgs = synchroLinkedList();
  
  // a subscriber sent a (quoted) chat message
  void handleLine(S line, SocketChannel socket) {
    postMessage(unquote(line));
  }
  
  // queue a message for broadcast
  void postMessage(S msg) {
    msgs.add(msg);
  }
  
  // Broadcast all queued messages as "<channel name>:<quoted message>".
  // Uses the shared sendToSubscribers helper (like the other channels)
  // instead of iterating over the live socket set by hand.
  void update {
    S msg;
    while ((msg = syncPopFirst(msgs)) != null)
      sendToSubscribers(name + ":" + quote(msg));
  }
}

// A chat for DMs (messages from one computer to another).
// (Is it private though? Probably not, as we don't check whether
// the computer IDs sent to the server are correct.)
sclass PrivateChatChannel extends Channel<O> {
  // put in front of every forwarded message
  S prefix = "";
  new MultiMap<S, SocketChannel> clients; // key: computer ID

  // on subscription, file the socket under the computer ID it has at that moment
  void initialMessage(SocketChannel socket) {
    S id = computerID(socket);
    clients.put(id, socket);
  }
  
  void removeSocket(SocketChannel socket) {
    super.removeSocket(socket);
    S id = computerID(socket);
    clients.remove(id, socket);
  }
  
  // Line format: "<recipient computer ID> <message>".
  // The message is forwarded to the first socket filed under the recipient;
  // if forwarding fails, that socket is closed and dropped and the next one is tried.
  void handleLine(S line, SocketChannel socket) {
    pcall {
      int sep = indexOf(line, ' ');
      S recipient = takeFirst(line, sep);
      while licensed {
        SocketChannel target = clients.getFirst(recipient);
        if (target == null) break; // nobody (left) to deliver to
        try {
          S sender = computerID(socket);
          if (l(line) >= 10000)
            print("Forwarding line from " + sender + " to " + recipient + ": " + l(line));
          S payload = substring(line, sep+1);
          sendLineToSocket(target, prefix + "privately from " + sender + ": " + payload);
          break;
        } catch e {
          closeSocket(target);
          clients.remove(recipient, target);
        }
      }
    }
  }
}

// Channel distributing snippet updates (edits, compiles etc).
// New lines appearing in the changes.log of program #1019175 are
// forwarded to all subscribers.
sclass SnippetUpdatesChannel extends Channel<O> {
  ManualTailFile tail;
  
  *() {
    tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"),
      vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
  }
  
  // poll the log file; new lines end up in sendLine()
  void update {
    tail.check();
  }
  
  // Broadcast one log line as "snippetUpdates:<line>".
  // Uses the shared sendToSubscribers helper (like the other channels)
  // instead of iterating over the live socket set by hand.
  void sendLine(S line) {
    print("SnippetUpdatesChannel send: " + line);
    line = assertNoLineBreaks(line);
    print("Sockets: " + l(sockets));
    sendToSubscribers("snippetUpdates:" + line);
  }
}

// Channel that broadcasts its own subscriber count
// each time the subCountChanged event fires
sclass PresenceCountingChannel extends Channel<Void> {
  *() {
    onSubCountChanged(-> sendToSubscribers(stateMsg()));
  }
  
  // "<channel name>:<number of subscribers>"
  S stateMsg() {
    int count = subCount();
    ret name + ":" + count;
  }
  
  // a new subscriber is told the current count straight away
  void initialMessage(SocketChannel socket) {
    S msg = stateMsg();
    sendLineToSocket(socket, msg);
  }
}

static State state(SocketChannel socket) {
  ret dataMapper.get(socket);
}

// Computer ID stored for the given socket;
// null if the socket has no state or has not announced an ID
sS computerID(SocketChannel socket) {
  State st = state(socket);
  if (st == null) ret null;
  ret st.computerID;
}

// Main program: set up the connected-instances tracker, then open the server socket
p {
  cI = new ConnectedInstances;
  cI.setAfterglowInterval(1000);
  // have rst (which runs update()) notified when connected2 changes
  cI.connected2.onChange(rst);
  cI.start();
  
  openServerSocket();
  
  // Currently disabled:
  // restart socket every hour (and once now for testing - XX)
  /*doEvery(10.0, 60*60.0, r {
    pcall { cleanMeUp_socket(); }
    openServerSocket();
  });*/
}

// Open the non-blocking server socket on `port`, start the thread that
// runs acceptLoop() and start the periodic housekeeping timer
svoid openServerSocket ctex {
  print("Opening server socket");
  selector = Selector.open();
  serverChannel = ServerSocketChannel.open();
  serverChannel.configureBlocking(false);
  
  serverChannel.socket().bind(new InetSocketAddress(port));
  print("Server socket bound");
  serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  
  thread "Server Accept Loop" { acceptLoop(); }
  
  mainTimer = doEvery(updateInterval, r {
    // close connections we have not heard from for longer than `timeout`
    for (SocketChannel socket : keysList(dataMapper)) {
      State state = state(socket);
      if (state == null) continue;
      if (sysTime() < state.lastMessage+timeout) continue;
      pcall {
        closeSocket(socket);
      }
    }
    // give every channel its periodic update, holding the channel's lock
    for (Channel channel : values(channels)) pcall {
      lock channel.lock;
      channel.update();
    }
  });
}

// Release everything openServerSocket() and the accepted connections hold:
// the housekeeping timer, the listening socket, all client sockets and the
// selector. Every step is protected individually so that a failure in one
// of them does not leave the remaining resources open.
svoid cleanMeUp_socket() ctex {
  // stop housekeeping first so it does not work on sockets that are being closed
  if (mainTimer != null) pcall {
    mainTimer.cancel();
  }
  if (serverChannel != null) pcall {
    serverChannel.close();
  }
  for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall {
    socket.close();
  }
  // closing the selector also wakes up a thread blocked in selector.select()
  if (selector != null) pcall {
    selector.close();
  }
}

// Selector loop (run in the "Server Accept Loop" thread): blocks until
// registered channels have events, then dispatches each selected key to
// acceptOnSocket, readFromSocket or writeToSocket.
svoid acceptLoop ctex {
  while licensed {
    // wait for events
    selector.select();

    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
    while (keys.hasNext()) {
      SelectionKey key = keys.next();
      // take the key out of the selected set so it is not handled again on the next pass
      keys.remove();
      debug "have key";
      
      if (!key.isValid())
        continue with debug "key not valid";
        
      // An exception while handling one key must not end the loop.
      // Only one kind of event is handled per key and pass (else-if chain).
      pcall {
        if (key.isAcceptable()) {
          debug "Key is acceptable";
          acceptOnSocket(key);
        } else if (key.isReadable()) {
          debug "Key is readable";
          readFromSocket(key);
        } else if (key.isWritable()) {
          debug "Key is writable";
          writeToSocket(key);
        } else
          debug "Unknown key type";
      }
    }
  }
}

//accept a connection made to this channel's socket
static void acceptOnSocket(SelectionKey key) throws IOException {
  ServerSocketChannel serverChannel = cast key.channel();
  SocketChannel channel = serverChannel.accept();
  channel.configureBlocking(false);
  Socket actualSocket = channel.socket();
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
  print("Connected to: " + remoteAddr);
  
  // register channel with selector for further IO
  dataMapper.put(channel, new State(channel));
  channel.register(selector, SelectionKey.OP_READ);
}

// Bookkeeping after a client connection has ended: report the lost
// connection to cI, unsubscribe the socket from all channels and
// forget its state.
svoid onSocketClose(SocketChannel socket) {
  Socket actualSocket = socket.socket();
  SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
  print("Connection closed by client: " + remoteAddr);
  State state = state(socket);
  if (state != null && state.computerID != null)
    cI.lostConnection(state.computerID);
  try {
    // Unsubscribe BEFORE dropping the state: channels look up
    // computerID(socket) while removing a subscriber, which only
    // works as long as the state is still registered.
    removeFromAllChannels(socket);
  } finally {
    // always forget the state, otherwise the idle check would keep closing this socket
    dataMapper.remove(socket);
  }
}

// Close a client socket and run the disconnect bookkeeping.
// The two steps are protected separately, so the bookkeeping is
// attempted even if closing the socket fails.
svoid closeSocket(SocketChannel socket) {
  pcall {
    socket.close();
  }
  pcall {
    onSocketClose(socket);
  }
}

// Read whatever is available on a readable socket (at most 1024 bytes per
// call), feed it to the connection's line buffer and handle every line
// completed by it.
// key: selection key of the SocketChannel that reported OP_READ.
// Throws IOException if reading from the socket fails.
static void readFromSocket(SelectionKey key) throws IOException {
  final SocketChannel socket = cast key.channel();
  ByteBuffer buffer = ByteBuffer.allocate(1024);
  int numRead = socket.read(buffer);
  if (numRead == -1) {
    // end of stream: the client has closed the connection
    closeSocket(socket);
    key.cancel();
    ret;
  }

  // Decode explicitly as UTF-8 - outgoing lines are encoded with toUtf8,
  // so incoming text must not depend on the platform default charset.
  // NOTE(review): a multi-byte character split across two reads is still
  // decoded per chunk; fixing that needs a per-connection decoder.
  S s = new String(buffer.array(), 0, numRead, java.nio.charset.StandardCharsets.UTF_8);
  //print("Got data from client: " + s);
  final State state = state(socket);
  if (state != null) {
    state.lastMessage = sysNow();
    state.lineBuffer.append(s, voidfunc(S line) {
      //if (nempty(line)) print("Got line: " + line);
      pcall { handleLine(line, socket, state); }
    });
    // whatever is left in the buffer is an unterminated line; cap its size
    if (state.lineBuffer.size() > maxLineLength) {
      print("Bad client (line too long");
      closeSocket(socket);
    }
  }
}

// Called when a socket with queued outgoing data becomes writable again:
// writes as much of the queue as the socket accepts, in queue order.
// key: selection key of the SocketChannel that reported OP_WRITE.
// Throws IOException if writing to the socket fails.
//
// The queue is written directly here. Handing a popped chunk to
// sendBytesToSocket would append it to the END of a still non-empty queue
// (reordering the data) without asking for OP_WRITE again, so the
// connection would never be flushed.
static void writeToSocket(SelectionKey key) throws IOException {
  final SocketChannel socket = cast key.channel();
  // go back to read-only interest; write interest is re-armed below if data remains
  socket.register(selector, SelectionKey.OP_READ);
  State state = state(socket);
  if (state == null) ret; // connection is no longer tracked
  lock state.lock;
  while (nempty(state.toSend)) {
    byte[] data = popFirst(state.toSend);
    print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID);
    int n = socket.write(ByteBuffer.wrap(data));
    if (n < l(data)) {
      // Socket buffer is full again: put the unsent rest back at the FRONT
      // of the queue and wait for the next writable event.
      state.toSend.add(0, subByteArray(data, n));
      socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
      ret;
    }
  }
}

// Send bytes to a client without blocking.
// If data is already queued for the client, the bytes are appended to the
// queue (keeping the order). Otherwise they are written directly; whatever
// the socket does not accept is queued and the socket is registered for
// OP_WRITE so the rest goes out once the socket is writable again.
// Throws IOException if the socket has no connection state (e.g. it was
// already closed) or if writing fails - callers rely on a failure here to
// detect dead connections.
svoid sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException {
  State state = state(socket);
  if (state == null)
    throw new IOException("Socket is not registered (already closed?)");
  lock state.lock;
  if (nempty(state.toSend)) {
    print("Postponing send to " + state.computerID + " because data already queued");
    ret with state.toSend.add(bytes);
  }
  int length = l(bytes);
  int n = socket.write(ByteBuffer.wrap(bytes));
  if (n < length) {
    // partial write: keep the rest and ask to be told when the socket is writable
    byte[] remaining = subByteArray(bytes, n);
    state.toSend.add(remaining);
    print("Postponing sending " + n2(l(remaining), "byte") + " to " + state.computerID);
    socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
  }
}

// Handle one line received from a client. Recognized forms:
//   (empty line)       - idle message, answered with an empty line
//   computerID=<id>    - announces the client's computer ID (accepted once)
//   sub <channel>      - subscribes the client to a channel
//   <channel>:<text>   - passes <text> to a channel the client is subscribed to
// Anything else is ignored.
svoid handleLine(S line, SocketChannel socket, State state) {
  new Matches m;
  
  if (empty(line)) { // idle
    sendLineToSocket(socket, "");
    ret;
  }
  
  if (jmatch("computerID=*", line, m)) {
    // only the first, plausible ID is accepted
    if (state.computerID == null && possibleComputerID($1)) {
      print("Got computer ID: " + $1);
      state.computerID = $1;
      cI.gotConnection(state.computerID);
    }
    ret;
  }
  
  if (jmatch("sub *", line, m)) {
    Channel channel = getChannelForSocket(socket, $1);
    S reply = channel == null
      ? "Channel " + $1 + " not found"
      : "Subbed " + $1;
    sendLineToSocket(socket, reply);
    ret;
  }
  
  L<S> tok = javaTok(line);
  if (!eq(get(tok, 3), ":")) ret; // ignore unknown line
  Channel channel = state.subbedChannels.get(get(tok, 1));
  if (channel == null) ret; // not subscribed to that channel
  channel.handleLine(joinSubList(tok, 5), socket);
}

// Web page of this bot: only points visitors to the socket port
html {
  ret "Use port " + port;
}

// Send one line of text to a client: a newline is appended and
// the result is converted with toUtf8 before sending
svoid sendLineToSocket(SocketChannel socket, S s) ctex {
  sendBytesToSocket(socket, toUtf8(s + "\n"));
}

// Unsubscribe a socket from every channel and delete channels
// that are left without subscribers
svoid removeFromAllChannels(SocketChannel socket) {
  lock channelsLock;
  // work on a copy of the map because channels are removed along the way
  for (S channelName, Channel channel : cloneMap(channels)) {
    pcall {
      channel.removeSocket(socket);
      boolean abandoned = empty(channel.sockets);
      if (abandoned) {
        print("Deleting channel: " + channelName);
        channels.remove(channelName);
      }
    }
  }
}

// Subscribe a socket to the channel with the given name, creating the
// channel first if it does not exist yet.
// Returns the channel, or null if no channel of that name can be created
// (unknown name) - the caller reports that to the client.
static Channel getChannelForSocket(SocketChannel socket, S name) {
  lock channelsLock;
  Channel c = channels.get(name);
  if (c == null) {
    c = makeChannel(name);
    // makeChannel returns null for unknown names; check before touching c
    if (c == null) ret null;
    c.name = name;
    print("New channel: " + name);
    channels.put(name, c);
    // bring the new channel up to date before its first subscriber is added
    lock c.lock;
    c.update();
  }
  c.addSocket(socket);
  state(socket).subbedChannels.put(name, c);
  ret c;
}

// Create a new channel object for a channel name.
// Returns null (after printing a message) if the name is not known.
static Channel makeChannel(S name) {
  // broadcast chats
  if (eq(name, "chat") || eq(name, "machineChat") || eq(name, "gazelleChat"))
    ret new PublicChatChannel;

  // direct messages; the machine variant prefixes messages with the channel name
  if (eq(name, "privateChat")) ret new PrivateChatChannel;
  if (eq(name, "privateMachineChat")) ret nu(PrivateChatChannel, prefix := name + ":");

  // status channels
  if (eq(name, "computerIDs")) ret new ComputerIDsChannel;
  if (eq(name, "computerCount")) ret new ComputerCountChannel;
  if (eq(name, "snippetUpdates")) ret new SnippetUpdatesChannel;
  if (eq(name, "gazellePresence")) ret new PresenceCountingChannel;

  print("Unknown channel: " + name);
  ret null;
}

// Run through rst: remembers the current cI.value() in theNumber.
// Does nothing beyond logging while no client is connected.
svoid update {
  print("OS Instances update");
  if (empty(dataMapper)) ret;

  theNumber = cI.value();
  print("OS Instances update done");
}

Author comment

Began life as a copy of #1016571

download  show line numbers  debug dex  old transpilations   

Travelled to 14 computer(s): aoiabmzegqzx, bhatertpkbcr, cbybwowwnfue, cfunsshuasjs, gwrvuhgaqvyk, irmadwmeruwu, ishqpsrjomds, lpdgvwnxivlt, mqqgnosmbjvj, pyentgdyhuwx, pzhvpgtvlbxg, tslmcundralx, tvejysmllsmz, vouqrxazstgt

No comments. add comment

Snippet ID: #1016572
Snippet name: OS Instances Connector [NIO Socket, Web Bot, LIVE]
Eternal ID of this version: #1016572/116
Text MD5: d29f1d064be1c6dcad2f16d4622285fa
Transpilation MD5: 75b0b7ca6f89db2a0143e25a6af62f38
Author: stefan
Category: javax / a.i. / web
Type: JavaX module (desktop)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2022-04-08 18:47:05
Source code size: 13857 bytes / 478 lines
Pitched / IR pitched: No / No
Views / Downloads: 842 / 2995
Version history: 115 change(s)
Referenced in: #1020500 - dm_maximumDecentBytesToReturnFromRemoteEval
#1022169 - OS Instances Connector [NIO Socket, backup without buffers]
#1022731 - Debug OS Instances Connector
#1022732 - OS Instances Connector [NIO Socket, Web Bot, backup]
#1027046 - OSInstancesConnector as class [NIO Socket]
#1035030 - OS Instances Connector [backup]