Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

445
LINES

< > BotCompany Repo | #1027046 - OSInstancesConnector as class [NIO Socket]

JavaX fragment (include) [tags: use-pretranspiled]

Libraryless. Click here for Pure Java version (9434L/64K).

sclass OSInstancesConnector {
  // Period handed to doEvery() for the maintenance timer in openServerSocket (unit as defined by doEvery - presumably ms; TODO confirm).
  int updateInterval = 10;
  // A connection is closed once no data has been read from it for this long.
  long timeout = 30000+10000; // idle time + latency
  // TCP port the server channel binds to; can be overridden through the int constructor.
  int port = 6000;
  // Upper bound for buffered, unterminated input per connection (compared against lineBuffer.size() in readFromSocket).
  int maxLineLength = 10*1024*1024; // change #1020500 too if you change this
  
  import java.nio.*;
  import java.nio.channels.*;
  
  ConnectedInstances cI; // created and started in start(); told about gained/lost connections per computer ID
  ReliableSingleThread rst = new(r update); // runs update(); registered as change listener on cI.connected2 in start()
  int theNumber; // last cI.value() stored by update()
  Map<SocketChannel, State> dataMapper = synchroMap(); // per-connection State, keyed by the client's SocketChannel
  ServerSocketChannel serverChannel; // listening channel, opened in openServerSocket()
  Selector selector; // readiness multiplexer shared by the listening channel and all client sockets
  Map<S, Channel> channels = synchroMap(); // currently existing channels by name
  Lock channelsLock = lock(); // held while channels are looked up, created or removed
  java.util.Timer mainTimer; // maintenance timer created in openServerSocket()
  bool debug; // presumably switches the `debug "..."` statements in acceptLoop on - TODO confirm
  L onLineReceived = syncList(); // L<vf(line, socket, state)>
  
  class State {
    S socketID = aGlobalID(), computerID;
    int countSeen = -1;
    new LineBuffer lineBuffer;
    long lastMessage = sysNow();
    Map<S, Channel> subbedChannels = synchroLinkedHashMap();
    Lock lock = lock();
    
    // TODO: clear eventually to avoid stalling attacks
    L<byte[]> toSend = synchroLinkedList();
  }
  
  // Base class of all named channels. Keeps the set of subscribed sockets and offers
  // three no-op hooks (update, initialMessage, handleLine) for subclasses to override.
  class Channel<A> {
    S name;
    Set<SocketChannel> sockets = synchroSet();
    Map<SocketChannel, A> dataMapper = synchroMap();
    Lock lock = lock();
    
    // --- hooks; all do nothing by default ---
    
    // invoked periodically by the maintenance timer and once right after channel creation
    void update() {}
    
    // invoked from addSocket for every newly subscribed socket
    void initialMessage(SocketChannel socket) {}
    
    // invoked for a line a subscriber addressed to this channel
    void handleLine(S line, SocketChannel socket) {}
    
    // --- subscription management; both run under the channel lock ---
    
    void addSocket(SocketChannel socket) {
      lock lock;
      sockets.add(socket);
      initialMessage(socket);
    }
    
    void removeSocket(SocketChannel socket) {
      lock lock;
      sockets.remove(socket);
    }
  }
  
  class ComputerIDsChannel extends Channel<O> {
    L<S> value;
    
    void initialMessage(SocketChannel socket) {
      lock lock;
      sendTo(socket);
    }
    
    void update() {
      if (set_trueIfChanged(this, value := asList(cI.set())))
        for (SocketChannel s : sockets) sendTo(s);
    }
    
    void sendTo(SocketChannel socket) {
      sendLineToSocket(socket, "ComputerIDs = " + struct(value));
    }
  }
  
  class ComputerCountChannel extends Channel<O> {
    int value;
    
    void initialMessage(SocketChannel socket) {
      lock lock;
      sendTo(socket);
    }
    
    void update() {
      if (set_trueIfChanged(this, value := cI.value()))
        for (SocketChannel s : sockets) sendTo(s);
    }
    
    void sendTo(SocketChannel socket) {
      sendLineToSocket(socket, "ComputerCount = " + value);
    }
  }
  
  // Broadcast channel: every line posted by a subscriber is queued and, on the next
  // update(), relayed to all subscribers as <channel name>:<quoted message>.
  class PublicChatChannel extends Channel<O> {
    // messages waiting for the next update()
    L<S> msgs = synchroLinkedList();
    
    // queues a message for broadcasting
    void postMessage(S msg) {
      msgs.add(msg);
    }
    
    // a subscriber's line arrives quoted; store it unquoted
    void handleLine(S line, SocketChannel socket) {
      postMessage(unquote(line));
    }
    
    // drains the queue in order, sending each message to every subscriber
    void update {
      while (true) {
        S msg = syncPopFirst(msgs);
        if (msg == null) break;
        S outgoing = name + ":" + quote(msg);
        for (SocketChannel s : sockets) {
          sendLineToSocket(s, outgoing);
        }
      }
    }
  }
  
  // Channel that forwards a line to one recipient addressed by computer ID.
  // An incoming line is split at its first space: the part before it is the target
  // computer ID, the remainder is the text that gets forwarded.
  class PrivateChatChannel extends Channel<O> {
    S prefix = ""; // prepended to every forwarded line
    new MultiMap<S, SocketChannel> clients; // key: computer ID
  
    // Called from addSocket: files the subscribing socket under its current computer ID.
    // NOTE(review): computerID(socket) is null while the client has not sent "computerID=..." yet,
    // in which case the socket is filed under the null key - confirm clients always identify first.
    void initialMessage(SocketChannel socket) {
      clients.put(computerID(socket), socket);
    }
    
    // Unsubscribes the socket and drops it from the recipient index.
    // NOTE(review): the index entry is located through computerID(socket), which yields null when the
    // socket has no State registered - verify this is called while the State is still present.
    void removeSocket(SocketChannel socket) {
      super.removeSocket(socket);
      clients.remove(computerID(socket), socket);
    }
    
    // Forwards "<targetID> <text>" from `socket` to the first socket registered for targetID.
    // If sending to that socket throws, it is closed and removed from the index and the next
    // socket registered for the same ID is tried; when none is left the line is dropped silently.
    void handleLine(S line, SocketChannel socket) {
      pcall {
        int i = indexOf(line, ' ');
        S computerID = takeFirst(line, i);
        while licensed {
          SocketChannel socket2 = clients.getFirst(computerID);
          if (socket2 == null) break; // nobody (left) to deliver to
          try {
            S sender = computerID(socket);
            // only unusually long lines are logged
            if (l(line) >= 10000)
              print("Forwarding line from " + sender + " to " + computerID + ": " + l(line));
            sendLineToSocket(socket2, prefix + "privately from " + sender + ": " + substring(line, i+1));
            break; // delivered
          } catch e {
            // recipient socket is unusable: discard it and retry with the next one
            closeSocket(socket2);
            clients.remove(computerID, socket2);
          }
        }
      }
    }
  }
  
  // Channel relaying lines appended to the "changes.log" file of program #1019175.
  // Each new log line is unquoted and sent to all subscribers as "snippetUpdates:<line>".
  class SnippetUpdatesChannel extends Channel<O> {
    // watches the log file; polled from update()
    ManualTailFile tail;
    
    // wires the file tail to sendLine through an unquoting line buffer
    *() {
      tail = manualTailFile_newOnly(programFile(#1019175, "changes.log"),
        vfAppendToLineBuffer(unquotingLineBuffer(voidfunc(S line) { sendLine(line) })));
    }
    
    // polls the log file; new lines end up in sendLine
    void update {
      tail.check();
    }
    
    // broadcasts one log line (sent as-is, not re-quoted) to every subscriber
    void sendLine(S line) {
      print("SnippetUpdatesChannel send: " + line);
      line = assertNoLineBreaks(line);
      print("Sockets: " + l(sockets));
      S outgoing = "snippetUpdates:" + line;
      for (SocketChannel s : sockets) {
        sendLineToSocket(s, outgoing);
      }
    }
  }
  
  State state(SocketChannel socket) {
    ret dataMapper.get(socket);
  }
  
  // Returns the computer ID announced on the given connection; null if the connection
  // has no State or has not announced an ID yet.
  S computerID(SocketChannel socket) {
    State registered = state(socket);
    if (registered == null) ret null;
    ret registered.computerID;
  }
  
  // Uses the default port.
  *() {}
  
  // Listens on the given port instead of the default.
  *(int port) {
    this.port = port;
  }
  
  // Entry point: sets up the ConnectedInstances tracker, hooks update() to its changes
  // and opens the server socket.
  void start {
    cI = new ConnectedInstances;
    cI.setAfterglowInterval(1000);
    // every change of the connected set triggers rst, which runs update()
    cI.connected2.onChange(rst);
    cI.start();
    
    openServerSocket();
    
    // Disabled: periodic socket restart.
    // restart socket every hour (and once now for testing - XX)
    /*doEvery(10.0, 60*60.0, r {
      pcall { cleanMeUp_socket(); }
      openServerSocket();
    });*/
  }
  
  // Opens the selector and the non-blocking listening channel on `port`, starts the
  // thread running acceptLoop() and schedules the periodic maintenance timer.
  void openServerSocket ctex {
    print("Opening server socket");
    selector = Selector.open();
    serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);
    
    InetSocketAddress sockAddr = new InetSocketAddress(port);
    serverChannel.socket().bind(sockAddr);
    print("Server socket bound");
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    
    thread "Server Accept Loop" { acceptLoop(); }
    
    // Maintenance tick: first drop connections that have been silent for longer than
    // `timeout`, then let every channel push its pending updates under its own lock.
    mainTimer = doEvery(updateInterval, r {
      for (SocketChannel socket : keysList(dataMapper)) {
        State state = state(socket);
        // NOTE(review): lastMessage is stamped with sysNow() but compared against sysTime() here -
        // confirm both helpers read the same clock.
        if (state != null && sysTime() >= state.lastMessage+timeout) pcall {
          closeSocket(socket);
        }
      }
      for (Channel channel : values(channels)) pcall {
        lock channel.lock;
        channel.update();
      }
    });
  }
  
  // Shuts the network side down: stops the maintenance timer, closes the listening
  // channel, all client sockets and finally the selector.
  // Every step is guarded individually so that one failing close() cannot prevent the
  // remaining resources from being released.
  void cleanMeUp_socket() ctex {
    // stop the periodic work first so it does not touch sockets while they are being closed
    if (mainTimer != null) pcall {
      mainTimer.cancel();
    }
    mainTimer = null;
    
    if (serverChannel != null) pcall {
      serverChannel.close();
    }
    
    for (SocketChannel socket : keys(getAndClearMap(dataMapper))) pcall {
      socket.close();
    }
    
    // Closing the selector ends the select() call the accept thread is blocked in, so that
    // thread terminates instead of lingering (or later picking up a newly opened selector).
    if (selector != null) pcall {
      selector.close();
    }
  }
  
  // Selector event loop (run on the "Server Accept Loop" thread): blocks until at least one
  // registered channel is ready and dispatches each ready key to the accept, read or write handler.
  // Only one of the three handlers runs per key and iteration; accept takes precedence over read,
  // read over write.
  void acceptLoop ctex {
    while licensed {
      // wait for events
      selector.select();
  
      Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
      while (keys.hasNext()) {
        SelectionKey key = keys.next();
        // the selected-key set is not cleared by the selector; remove each key as it is taken
        keys.remove();
        debug "have key";
        
        if (!key.isValid())
          continue with debug "key not valid";
          
        // a failure while handling one key must not end the loop for all other connections
        pcall {
          if (key.isAcceptable()) {
            debug "Key is acceptable";
            acceptOnSocket(key);
          } else if (key.isReadable()) {
            debug "Key is readable";
            readFromSocket(key);
          } else if (key.isWritable()) {
            debug "Key is writable";
            writeToSocket(key);
          } else
            debug "Unknown key type";
        }
      }
    }
  }
  
  //accept a connection made to this channel's socket
  void acceptOnSocket(SelectionKey key) throws IOException {
    ServerSocketChannel serverChannel = cast key.channel();
    SocketChannel channel = serverChannel.accept();
    channel.configureBlocking(false);
    Socket actualSocket = channel.socket();
    SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
    print("Connected to: " + remoteAddr);
    
    // register channel with selector for further IO
    dataMapper.put(channel, new State);
    channel.register(selector, SelectionKey.OP_READ);
  }
  
  // Bookkeeping after a connection has gone away: reports the lost connection to cI
  // (if the client had announced a computer ID), unsubscribes the socket from all
  // channels and forgets its State.
  void onSocketClose(SocketChannel socket) {
    Socket actualSocket = socket.socket();
    SocketAddress remoteAddr = actualSocket.getRemoteSocketAddress();
    print("Connection closed by client: " + remoteAddr);
    State state = state(socket);
    if (state != null && state.computerID != null)
      cI.lostConnection(state.computerID);
    // Unsubscribe BEFORE dropping the State: PrivateChatChannel.removeSocket looks the
    // socket's computer ID up through state(socket); with the State already gone it would
    // get null and leave the dead socket in its recipient index.
    removeFromAllChannels(socket);
    dataMapper.remove(socket);
  }
  
  // Closes the connection and runs the close bookkeeping. The two steps are guarded
  // separately, so onSocketClose still runs when socket.close() fails.
  void closeSocket(SocketChannel socket) {
    pcall { socket.close(); }
    pcall { onSocketClose(socket); }
  }
  
  // Reads what is currently available on the connection behind `key` (at most 1024 bytes
  // per call), feeds it to the connection's line buffer and dispatches every completed
  // line to handleLine. End of stream closes the connection; so does unterminated
  // input exceeding maxLineLength.
  // Throws IOException if reading from the socket fails.
  void readFromSocket(SelectionKey key) throws IOException {
    final SocketChannel socket = cast key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int numRead = socket.read(buffer);
    if (numRead == -1) {
      // peer closed the connection
      closeSocket(socket);
      key.cancel();
      ret;
    }
  
    // Decode explicitly as UTF-8: outgoing lines are encoded with toUtf8(), so relying on
    // the platform default charset here would garble non-ASCII text on non-UTF-8 systems.
    // NOTE(review): a multi-byte character split across two reads is still decoded
    // incorrectly; fixing that needs a stateful decoder per connection.
    S s = new String(buffer.array(), 0, numRead, "UTF-8");
    //print("Got data from client: " + s);
    final State state = state(socket);
    if (state != null) {
      state.lastMessage = sysNow();
      state.lineBuffer.append(s, voidfunc(S line) {
        //if (nempty(line)) print("Got line: " + line);
        // one bad line must not abort processing of the lines following it
        pcall { handleLine(line, socket, state); }
      });
      // whatever is left in the buffer is an unterminated line; cap its size
      if (state.lineBuffer.size() > maxLineLength) {
        print("Bad client (line too long");
        closeSocket(socket);
      }
    }
  }
  
  // Called when the connection behind `key` is writable again: flushes the queued
  // outgoing chunks in FIFO order. Write interest stays enabled for as long as data
  // remains queued and is dropped (back to read-only interest) once the queue is empty.
  // Throws IOException if writing to or re-registering the socket fails.
  void writeToSocket(SelectionKey key) throws IOException {
    final SocketChannel socket = cast key.channel();
    State state = state(socket);
    if (state == null) {
      // connection is no longer tracked; stop receiving events for it
      key.cancel();
      ret;
    }
    
    lock state.lock;
    while (nempty(state.toSend)) {
      // write straight from the head of the queue so chunks can never overtake each other
      byte[] data = state.toSend.get(0);
      print("Sending postponed data (" + n2(l(data), "byte") + ") to " + state.computerID);
      int n = socket.write(ByteBuffer.wrap(data));
      if (n < l(data)) {
        // socket buffer is full again: keep the unsent tail at the head of the queue and
        // wait for the next writable event (write interest is still registered)
        if (n > 0) state.toSend.set(0, subByteArray(data, n));
        ret;
      }
      popFirst(state.toSend);
    }
    
    // everything flushed - only reads are of interest from here on
    socket.register(selector, SelectionKey.OP_READ);
  }
  
  // Sends bytes to a connection without blocking. If earlier data is still queued the
  // bytes are appended to the queue to preserve ordering; otherwise they are written
  // directly and whatever the socket did not take is queued, with write interest
  // enabled so the selector loop flushes it later.
  // Throws IOException if the connection is not tracked (any more) or the write fails.
  void sendBytesToSocket(SocketChannel socket, byte[] bytes) throws IOException {
    int length = l(bytes);
    State state = state(socket);
    // Fail explicitly rather than with a NullPointerException; callers such as
    // PrivateChatChannel.handleLine rely on an exception to detect dead recipients.
    if (state == null)
      throw new IOException("Socket is not registered (already closed?)");
    
    lock state.lock;
    if (nempty(state.toSend)) {
      print("Postponing send to " + state.computerID + " because data already queued");
      ret with state.toSend.add(bytes);
    }
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    int n = socket.write(buf);
    if (n < length) {
      byte[] remaining = subByteArray(bytes, n);
      state.toSend.add(remaining);
      print("Postponing sending " + n2(l(remaining), "byte") + " to " + state.computerID);
      socket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
      // This method is also called from threads other than the selector loop; a select()
      // already in progress does not see the changed interest set, so wake it up.
      selector.wakeup();
    }
  }
  
  // Processes one complete line received from a client. In order:
  //  1. the line is published on the VM bus and to all onLineReceived listeners
  //  2. an empty line is an idle ping and is answered with an empty line
  //  3. "computerID=<id>" announces the client's computer ID (accepted once per connection)
  //  4. "sub <channel>" subscribes the connection to a channel
  //  5. "<channel>:<rest>" hands <rest> to that channel if the connection is subscribed to it
  // Anything else is ignored.
  void handleLine(S line, SocketChannel socket, State state) {
    new Matches m;
    
    vmBus_send oic_gotLine(this, line, socket, state);
    pcallFAll(onLineReceived, line, socket, state);
    
    if (empty(line)) ret with sendLineToSocket(socket, ""); // idle
 
    if (jmatch("computerID=*", line, m)) {
      // ignore repeated announcements and IDs that fail possibleComputerID
      if (state.computerID != null || !possibleComputerID($1)) ret;
      print("Got computer ID: " + $1);
      state.computerID = $1;
      cI.gotConnection(state.computerID);
      ret;
    }
    
    if (jmatch("sub *", line, m)) {
      Channel channel = getChannelForSocket(socket, $1);
      // the client is told either way whether the subscription worked
      if (channel == null)
        sendLineToSocket(socket, "Channel " + $1 + " not found");
      else
        sendLineToSocket(socket, "Subbed " + $1);
      ret;
    }
    
    // tokenized line: tok[1] is the first token, tok[3] the second; a ":" there marks
    // a channel-addressed line whose payload starts at tok[5]
    L<S> tok = javaTok(line);
    if (eq(get(tok, 3), ":")) {
      Channel channel = state.subbedChannels.get(get(tok, 1));
      if (channel != null)
        channel.handleLine(joinSubList(tok, 5), socket);
    }
    
    // ignore unknown line
  }
  
  // Sends one text line to a connection: appends a line feed, encodes as UTF-8 and
  // passes the bytes on to sendBytesToSocket.
  void sendLineToSocket(SocketChannel socket, S s) ctex {
    sendBytesToSocket(socket, toUtf8(s + "\n"));
  }
  
  // Unsubscribes a connection from every channel and deletes channels left without
  // subscribers. Iterates over a copy of the channel map so entries can be removed on the way;
  // a failure in one channel does not stop the others from being processed.
  void removeFromAllChannels(SocketChannel socket) {
    lock channelsLock;
    for (S channelName, Channel channel : cloneMap(channels)) {
      pcall {
        channel.removeSocket(socket);
        if (channel.sockets.isEmpty()) {
          print("Deleting channel: " + channelName);
          channels.remove(channelName);
        }
      }
    }
  }
  
  // Subscribes a connection to the named channel, creating the channel on demand.
  // Returns the channel, or null if the name is unknown or the connection is no longer
  // tracked (in both cases nothing is subscribed).
  Channel getChannelForSocket(SocketChannel socket, S name) {
    lock channelsLock;
    Channel c = getChannel(name);
    // unknown channel name: let the caller report it instead of failing on a null channel
    if (c == null) ret null;
    State state = state(socket);
    // connection closed meanwhile: don't leave a dead socket behind in the channel
    if (state == null) ret null;
    c.addSocket(socket);
    state.subbedChannels.put(name, c);
    ret c;
  }
  
  // Returns the channel registered under `name`, creating and registering it first if
  // necessary. A freshly created channel gets one update() call so it starts out with
  // current data. Returns null if makeChannel does not know the name.
  Channel getChannel(S name) {
    lock channelsLock;
    Channel c = channels.get(name);
    if (c == null) {
      c = makeChannel(name);
      // must be checked before the channel is touched
      if (c == null) ret null;
      c.name = name;
      print("New channel: " + name);
      channels.put(name, c);
      lock c.lock;
      c.update();
    }
    ret c;
  }
  // Factory: maps a channel name to a new, not yet registered channel instance.
  // Prints a notice and returns null for names it does not know.
  Channel makeChannel(S name) {
    // plain broadcast channels
    if (eq(name, "chat") || eq(name, "machineChat") || eq(name, "voiceOutput"))
      ret new PublicChatChannel;
    
    // addressed (one-to-one) channels; the machine variant tags forwarded lines with its name
    if (eq(name, "privateChat"))
      ret new PrivateChatChannel;
    if (eq(name, "privateMachineChat"))
      ret setAll(new PrivateChatChannel, prefix := name + ":");
    
    // server-generated feeds
    if (eq(name, "computerIDs"))
      ret new ComputerIDsChannel;
    if (eq(name, "computerCount"))
      ret new ComputerCountChannel;
    if (eq(name, "snippetUpdates"))
      ret new SnippetUpdatesChannel;
    
    print("Unknown channel: " + name);
    ret null;
  }
  
  // Run through rst whenever the set of connected instances changes: records the
  // current cI.value() in theNumber. Does nothing beyond logging while no client is connected.
  void update {
    print("OS Instances update");
    if (empty(dataMapper)) ret;
  
    theNumber = cI.value();
    print("OS Instances update done");
  }
}

Author comment

Began life as a copy of #1016572

download  show line numbers  debug dex   

Travelled to 2 computer(s): mqqgnosmbjvj, xrpafgyirdlv

No comments. add comment

Snippet ID: #1027046
Snippet name: OSInstancesConnector as class [NIO Socket]
Eternal ID of this version: #1027046/13
Text MD5: 63d8c2ab4c15ac0d6f39c7edec70e203
Transpilation MD5: 4d18d9ac7057f7dd3cc980e5188e9cba
Author: stefan
Category: javax / networking
Type: JavaX fragment (include)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2020-02-15 15:19:00
Source code size: 13398 bytes / 445 lines
Pitched / IR pitched: No / No
Views / Downloads: 28 / 76
Version history: 12 change(s)
Referenced in: [show references]

Formerly at http://tinybrain.de/1027046 & http://1027046.tinybrain.de