// Default inactivity timeout copied into each new MyWebSocket (see MyWebSocket.timeout).
// Compared against sysNow() differences - presumably milliseconds; confirm against sysNow().
static long webSocketTimeOut = 60000; // can be changed per WebSocket

// Optional callbacks:
//   eleu_webSocket_onMessage - called with (MyWebSocket, S) for text messages on
//                              sockets that have no botID
//   eleu_webSocket_new       - called with (MyWebSocket) for new sockets whose URI
//                              was not dispatched to a bot
static O eleu_webSocket_onMessage, eleu_webSocket_new;

// Lazily created handler function, see eleu_webSocket_handler().
static O eleu_webSocket_handler;

// All sockets created by the handler that have not been removed yet.
// Typed so that cleanWebSockets() can iterate it as MyWebSocket.
static Set<MyWebSocket> webSockets = synchroHashSet();

// Not modified anywhere in this section.
static new AtomicLong webSocketPings;

// Incremented whenever a text message equal to "ping" is received.
// NOTE(review): the counter is named "pongs" but counts incoming "ping" messages -
// confirm that this is intended.
static new AtomicLong webSocketPongs;

// Starts the HTTP/WebSocket server on webServerPorts (if any are configured)
// and schedules cleanWebSockets to run periodically.
svoid eleu_webSocket_init {
  if (nempty(webServerPorts)) pcall {
    //serveHttp(webServerPort);
    //serveHttpWithWebsockets(webServerPort, eleu_webSocket_handler());
    //serveHttpWithWebsockets_server.printServes = false;
    serveHttpWithWebsockets_multiplePorts(eleu_webSocket_handler(), webServerPorts);
  }
  // interval is presumably in milliseconds - confirm against doEvery_daemon
  doEvery_daemon(5000, r cleanWebSockets);
}

// A WebSocket that is either bound to a bot (botID != null) or handled through
// the global eleu_webSocket_* callbacks.
sclass MyWebSocket extends WebSocket {
  // Text payloads received while no bot is attached.
  L msgs = synchroList();

  // sysNow() timestamp of the last frame sent or message received.
  volatile long lastMessage = sysNow();

  // ID of the bot this socket was dispatched to; null if not dispatched.
  S botID;
  WeakReference bot;

  // Inactivity timeout for this socket; values <= 0 disable the timeout in clean().
  long timeout = webSocketTimeOut;

  // True once the close notification has been handled, so that the bot is
  // notified at most once even though _removeMe() runs several times.
  bool closeNotified;

  *(NanoHTTPD.IHTTPSession handshake) {
    super(handshake);
  }

  // Writes the frame and counts it as activity for the timeout.
  public void sendFrame(WebSocketFrame frame) throws IOException {
    frame.write(out);
    // assume write worked?
    lastMessage = sysNow();
  }

  protected void onPong(WebSocketFrame pongFrame) {
    // don't use WebSocket ping/pong - it doesn't work
  }

  // Records activity, then forwards the text payload to the attached bot or,
  // if there is none, stores it in msgs and calls eleu_webSocket_onMessage.
  protected void onMessage(WebSocketFrame messageFrame) {
    //print("websocket msg: " + messageFrame.getTextPayload());
    lastMessage = sysNow();
    S s = messageFrame.getTextPayload();
    if (eq(s, "ping")) incAtomicLong(webSocketPongs);
    pcall {
      if (botID != null)
        call(getBot(botID), 'onWebSocketMessage, this, s);
      else {
        msgs.add(s);
        callF(eleu_webSocket_onMessage, this, s);
      }
    }
  }

  protected void onClose(WebSocketFrame.CloseCode code, String reason, boolean initiatedByRemote) {
    //print("websocket close");
    _removeMe();
  }

  protected void onException(IOException e) {
    printStackTrace(e);
  }

  // Unregisters this socket and tells the attached bot (if any) that it closed.
  // Safe to call repeatedly: closeMe() and clean() call it before close(), and
  // onClose() calls it again, but the bot is only notified on the first call.
  void _removeMe() {
    webSockets.remove(this);
    synchronized(this) {
      if (closeNotified) return;
      closeNotified = true;
    }
    if (botID != null)
      pcall(getBot(botID), 'onWebSocketClosed, this);
  }

  // Unregisters the socket and closes it with NormalClosure and an empty reason.
  void closeMe() ctex {
    _removeMe();
    close(WebSocketFrame.CloseCode.NormalClosure, "");
  }

  // Closes the socket with reason "timeout" if the timeout is enabled and
  // no activity was recorded within it.
  void clean() {
    if (timeout > 0 && sysNow() >= lastMessage + timeout) {
      print("Timing out web socket for bot " + botID);
      _removeMe();
      pcall {
        close(WebSocketFrame.CloseCode.NormalClosure, "timeout");
      }
    }
  }
}

// Returns the (lazily created) function that turns a handshake into a MyWebSocket.
//
// The function registers the new socket in webSockets. If the first URI path
// segment is an integer, it is treated as a bot ID: the bot is looked up,
// its onNewWebSocket is called with (socket, rest of the URI or "/"), and the
// socket is bound to that bot. Otherwise eleu_webSocket_new is called with the socket.
// If anything in the dispatch throws, the exception is printed, the socket is
// closed and the function returns null.
static O eleu_webSocket_handler() {
  if (eleu_webSocket_handler == null)
    eleu_webSocket_handler = func(final NanoHTTPD.IHTTPSession handshake) {
      MyWebSocket ws = new(handshake);
      S uri = handshake.getUri();
      print("WebSocket URI: " + uri);
      webSockets.add(ws);
      bool dispatched = false;

      try {
        uri = dropPrefix("/", uri);
        // presumably yields the string length when there is no '/' - confirm against smartIndexOf
        int i = smartIndexOf(uri, '/');
        S botID = takeFirst(i, uri);
        if (isInteger(botID)) {
          botID = fsI(botID);
          dispatched = true;
          O bot = getBot(botID);
          if (bot == null)
            fail("Bot not found: " + botID);
          ws.bot = new WeakReference(bot);
          call(bot, 'onNewWebSocket, ws, or2(substring(uri, i), "/"));
          print("WebSocket dispatched to " + botID);
          // botID is only set after onNewWebSocket succeeded, so a failed
          // dispatch does not send onWebSocketClosed to the bot
          ws.botID = botID;
        }
      } catch e {
        printException(e);
        ws.closeMe();
        ret null;
      }

      if (!dispatched)
        pcallF(eleu_webSocket_new, ws);
      ret ws;
    };
  ret eleu_webSocket_handler;
}

// Runs the timeout check on a snapshot of all registered sockets
// (a copy, because clean() may remove sockets from webSockets).
static void cleanWebSockets() {
  for (MyWebSocket ws : cloneList(webSockets))
    ws.clean();
}