Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

273
LINES

< > BotCompany Repo | #1032715 // StandaloneHttpProxy [OK]

JavaX fragment (include) [tags: use-pretranspiled]

Libraryless. Click here for Pure Java version (11966L/67K).

sclass StandaloneHttpProxy implements AutoCloseable {
  // When true, proxyOut logs upload progress every 10 bytes instead of every 100000.
  bool debug;
  // When true, connections for which socketIsLocalhost() is false are dropped.
  bool serveLocalhostOnly; // IMPORTANT if you need it!
  // When true, an "X-Proxying: yes" header is added to forwarded requests.
  // (Name is misspelled - "Proying" - but it is part of the class's interface.)
  bool sendXProying = true;
  // Port used by the default makeServerSocket().
  int port = 8443;
  // When true, the client's Host line is not forwarded; a new one is built
  // from the parsed host and the forward port.
  bool rewriteHostHeader = true; // Send a modified Host header
  // Buffer size (bytes) for copying client -> server.
  int upBufSize = 1; // otherwise POST seems to break...
  // Buffer size (bytes) for copying server -> client.
  int downBufSize = 1024;
  
  // Client addresses in this set are rejected in Request.run().
  Set<S> blockedIPs = syncSet();
  
  // set or override this!
  // Decides where a request is forwarded to. The request's parsed headers
  // (incomingHeaders, host) are available when this is called.
  // Returning null, an empty host or port 0 makes the request fail with
  // "Can't forward request". The default returns null.
  swappable HostAndPort forwardServerAndPort(Request req) { null; }
  
  // can set or override this for https
  // Creates the listening socket used by start().
  // The default is a plain ServerSocket bound to `port`.
  swappable ServerSocket makeServerSocket() {
    ctex { ret new ServerSocket(port); }
  }
  
  // internal
  // Thread running acceptLoop(); set by start().
  Thread acceptThread;
  // Listening socket; created by start(), disposed by close().
  ServerSocket serverSocket;

  // Opens the listening socket (via makeServerSocket) and launches the
  // accept thread. Returns right after the thread has been started.
  void start() throws IOException {
    serverSocket = makeServerSocket();

    S acceptThreadName = "HTTP Proxy Accept Port " + port;
    acceptThread = startThread(acceptThreadName, r acceptLoop);

    S startedMessage = "HTTP proxy on port " + port + " started.";
    print(startedMessage);
  }
  
  void acceptLoop() throws IOException {
    while (true) {
      try {
        Socket s = serverSocket.accept();
        new Request(s).run();
      } catch (SocketTimeoutException e) {
      }
    }
  }

  // Hook for rewriting a non-empty incoming header line before it is forwarded.
  // Not applied to X-Forwarded-For lines, nor to the Host line when
  // rewriteHostHeader is set. If the result is empty, the line is not forwarded.
  // The default returns the line unchanged.
  swappable S rewriteHeaderLine(S line) { ret line; }
  
  // Logs the header, then writes it to `out` as US-ASCII followed by CRLF.
  // Passing "" writes the bare CRLF that terminates the header section.
  // Does not flush.
  svoid headerSend(OutputStream out, S header) ctex {
    print("Sending header: " + header);
    S wireLine = header + "\r\n";
    byte[] encoded = wireLine.getBytes("US-ASCII");
    out.write(encoded);
  }
  
  // AutoCloseable implementation: disposes the listening socket.
  // Requests that are already running are not touched here.
  close { dispose serverSocket; }
  
  class Request is AutoCloseable {
    Socket s; // client socket
    S client; // textual client address, set in run()
    bool blocked; // true if client was found in blockedIPs
    Socket outSocket; // connection to the forward server
    
    // Byte counters: transmitted = server -> client, outTransmitted = client -> server.
    // TODO: Values may be inaccurate when there is stream rewriting
    //   by overriding outProxied and/or outOutProxied.
    new AtomicLong transmitted;
    new AtomicLong outTransmitted;
    
    S threadName = "StandaloneHttpProxy"; // name for the handler thread
    Thread thread, inThread, outThread; // handler, server->client copier, client->server copier
    OutputStream outDirect, outOutDirect; // streams as passed to setOut / setOutOut
    OutputStream outProxied, outOutProxied; // streams actually written to (to client / to server)
    InputStream in, outIn; // from client / from server
    MultiMap<S> incomingHeaders = ciMultiMap(); // parsed request headers
    bool forwardedFor; // true once an X-Forwarded-For header is present (none is added then)
    S host; // value of the client's Host header
    new ByteArrayOutputStream headersOut; // header bytes to forward, without the final blank line
    bool headersRead; // true = skip header parsing, headersOut was pre-filled
    new CloseablesHolder resources; // disposed in close()
    
    // Request on a freshly accepted socket; streams are taken from the socket later.
    *(Socket *s) {}
    // Request on a socket whose streams the caller already opened (and possibly
    // read from). The output stream is installed through setOut().
    *(Socket *s, InputStream *in, OutputStream out) { setOut(out); }
    
    // Installs the stream towards the client. By default the same stream is
    // used both as the direct and as the proxied (written-to) stream.
    swappable void setOut(OutputStream out) {
      outProxied = out;
      outDirect = out;
    }
    
    // Installs the stream towards the forward server. By default the same
    // stream is used both as the direct and as the proxied (written-to) stream.
    swappable void setOutOut(OutputStream outOut) {
      outOutProxied = outOut;
      outOutDirect = outOut;
    }
    
    // Entry point for a freshly accepted connection: determines the client
    // address, applies the localhost-only and blocked-IP filters and, if the
    // client passes, starts the handler thread via run2().
    // On any failure the request is closed.
    run {
      try {
        // getHostAddress() yields the bare textual IP. InetAddress.toString() is
        // "hostname/ip", so stripping a leading "/" only gave the IP when no host
        // name was attached - which broke the blockedIPs lookup otherwise.
        client = s.getInetAddress().getHostAddress();
        
        if (serveLocalhostOnly && !socketIsLocalhost(s)) {
          print("Dropping non-localhost connection from " + client); 
          ret with close();
        }
        
        threadName = "PROXY: Handling client " + client;
          
        // Remember the verdict in `blocked` so it can be inspected later.
        if (blocked = blockedIPs.contains(client)) {
          print("Blocked IP!! " + client);
          ret with close();
        }
  
        run2();
      } on fail e {
        print("Closing because of: " + e);
        close();
      }
    } // Request.run
    
    // Starts the daemon thread that handles this request (handleIt_impl).
    // Called from run() after the client passed the filters, and directly
    // from takeOverIncomingSocket(). Closes the request if the thread
    // cannot be started.
    void run2 {
      try {
        thread = startDaemonThread(threadName, r handleIt_impl);
      } on fail e {
        print("Closing because of: " + e);
        close();
      }
    }
    
    // Handles one proxied request on the handler thread:
    //  1. reads and (optionally) rewrites the client's header lines, unless
    //     headersRead says they were supplied up front in headersOut,
    //  2. asks forwardServerAndPort() for the target and refuses requests
    //     that already carry "X-Proxying: yes",
    //  3. connects to the target, sends the headers plus the proxy's own
    //     headers, then copies data in both directions until both sides finish.
    // The request is always closed at the end. Exceptions are logged, not thrown.
    void handleIt_impl() throws IOException {
      try {
        print("HTTP Proxy Incoming!");
        
        if (outDirect == null) setOut(s.getOutputStream());
        if (in == null) in = s.getInputStream();
        
        // collect headers
        
        if (!headersRead) while (true) {
          S line = readHttpHeaderLine(in);
          if (line == null) ret;
          print("Header line read: " + quote(line));
          
          int i = indexOf(line, ':');
          if (i >= 0)
            incomingHeaders.put(trim(takeFirst(i, line)), trim(dropFirst(i+1, line)));
          
          bool isHostLine = swic(line, "Host:");
          if (isHostLine)
            host = dropPrefixIgnoreCase("Host:", line).trim();
            
          // The Host line is held back when it is going to be rewritten below.
          if (!isHostLine || !rewriteHostHeader) {
            S rewritten;
            if (startsWithIgnoreCase(line, "X-Forwarded-For:")) {
              forwardedFor = true;
              rewritten = line + ", " + client;
            } else
              rewritten = empty(line) ? line : rewriteHeaderLine(line);
            if (!eq(rewritten, line))
              print("Rewritten as: " + quote(rewritten));
            // The terminating blank line is not stored; it is sent after
            // the proxy's own headers.
            if (nempty(rewritten)) {
              byte[] bytes = (rewritten + "\r\n").getBytes("US-ASCII");
              headersOut.write(bytes);
            }
          }
          
          if (l(line) == 0)
            break;
        }
        
        HostAndPort hap = forwardServerAndPort(this);
        if (hap == null || empty(hap.host) || hap.port == 0)
          ret with print("Can't forward request");
          
        S forwardServer = hap.host;  
        int port = hap.port;
        
        // Loop protection: check before connecting so that no upstream
        // connection is opened for a request we are going to refuse.
        byte[] headerBytes = headersOut.toByteArray();
        if (containsLine(fromUtf8(headerBytes), "X-Proxying: yes"))
          ret with print("Won't double-proxy");
        
        outSocket = new Socket(forwardServer, port);
        print("Connected to " + forwardServer + ":" + port);
        // Go through the swappable hook, mirroring setOut() on the client side.
        setOutOut(outSocket.getOutputStream());
        outIn = outSocket.getInputStream();
          
        outOutProxied.write(headerBytes);
        
        if (!forwardedFor)
          headerSend(outOutProxied, "X-Forwarded-For: " + client);
          
        // Without a known host (client sent no Host line) there is nothing to rewrite.
        if (rewriteHostHeader && host != null)
          headerSend(outOutProxied, "Host: " + addPortToHost_drop80(host, port));
        
        if (sendXProying)
          headerSend(outOutProxied, "X-Proxying: yes");
          
        headerSend(outOutProxied, "X-OriginalPort: " + s.getLocalPort());
        
        headerSend(outOutProxied, ""); // end of headers
          
        outOutProxied.flush();
        
        // forward content in both directions
        
        inThread = startThread("Proxy In", r proxyIn_impl);
        outThread = startThread("Proxy Out", r proxyOut_impl);
        
        joinThreads(inThread, outThread);
      } catch (Exception e) {
        if (isIOException(e))
          print("[internal] " + e);
        else
          print("stack exit: " + renderStackTrace(e));
      } finally {
        print("Request done");
        close();
      }
    } // end of handleIt_impl
    
    // Copies the forward server's response to the client until the server
    // side reaches end of stream, counting bytes in `transmitted`.
    // Afterwards the client socket's output is shut down (best effort).
    void proxyIn_impl() throws IOException {
      try {
        byte[] buf = new[downBufSize];
        while (true) {
          int n = outIn.read(buf);
          if (n <= 0) ret with print("proxyIn: n=" + n);
          outProxied.write(buf, 0, n);
          long t = transmitted.addAndGet(n);
          // Log whenever the total crosses a 100000-byte boundary. Testing
          // t % 100000 == 0 would only fire if a read happened to end exactly
          // on a multiple, which is rare with variable read sizes.
          if (t / 100000 != (t - n) / 100000) print("Transmitted to client: " + t);
        }
      } finally {
        print("Server closed stream, shutting down client output.");
        try {
          s.shutdownOutput();
        } catch (Exception e) {
          // Socket may already be closed; don't let this mask the original failure.
          print("Could not shut down client output: " + e);
        }
      }
    }

    // Copies the client's request body to the forward server until the
    // client side reaches end of stream, counting bytes in `outTransmitted`.
    // Afterwards the server socket's output is shut down (best effort).
    void proxyOut_impl() throws IOException {
      try {
        print("Proxy Out started.");
        byte[] buf = new[upBufSize];
        // Progress is logged every `step` bytes (much more often in debug mode).
        long step = debug ? 10 : 100000;
        while (true) {
          int n = in.read(buf);
          if (n <= 0) ret with print("proxyOut: n=" + n);
          outOutProxied.write(buf, 0, n);
          if (outTransmitted.get() == 0)
            print("Proxy Out first byte!");
          long t = outTransmitted.addAndGet(n);
          // Boundary-crossing test; equals the old "t % step == 0" for
          // 1-byte reads but keeps working with a larger upBufSize.
          if (t / step != (t - n) / step) print("Transmitted to server: " + t);
        }
      } finally {
        print("Client closed stream, shutting down server output.");
        try {
          outSocket.shutdownOutput();
        } catch (Exception e) {
          // Socket may already be closed; don't let this mask the original failure.
          print("Could not shut down server output: " + e);
        }
      }
    }
    
    // Ends the request: logs the byte counters, then disposes the client
    // socket, the forward socket and the resources holder.
    close {
      // Nothing left to do when neither socket is set.
      // NOTE(review): presumably `dispose` also nulls the field, which would
      // make repeated close() calls no-ops - confirm.
      if (s == null && outSocket == null) ret;
      
      print("Proxy request done, got " + transmitted! + ", sent " + outTransmitted! + " + headers");
      dispose s;
      dispose outSocket;
      dispose resources;
    }
  } // end of Request
  
  // Convenience overload of takeOverIncomingSocket without a customize callback.
  // Delegates to the 6-argument version. (Previously it passed `null` in
  // place of `headers`, which resolved back to this very overload and
  // recursed forever.)
  Request takeOverIncomingSocket(S host, Socket socket, InputStream in, OutputStream out, byte[] headers) throws IOException {
    ret takeOverIncomingSocket(host, socket, in, out, headers, null);
  }

  // rewriteHostHeader should be false when you use this method  
  Request takeOverIncomingSocket(S host, Socket socket, InputStream in, OutputStream out, byte[] headers, IVF1<Request> customize) throws IOException {
    Request r = new(socket, in, out);
    r.host = host;
    r.headersOut.write(headers);
    
    // various flags to read no more headers
    // & send just the headers we received
    r.headersRead = true; 
    r.forwardedFor = true;
    
    callF(customize, r);
    
    r.run2();
    ret r;
  }
}

Author comment

Began life as a copy of #1002822

download  show line numbers  debug dex  old transpilations   

Travelled to 3 computer(s): bhatertpkbcr, mowyntqkapby, mqqgnosmbjvj

No comments. add comment

Snippet ID: #1032715
Snippet name: StandaloneHttpProxy [OK]
Eternal ID of this version: #1032715/52
Text MD5: ea202c43c33db8e9a78171af87897323
Transpilation MD5: 49cd695426a9957a10a8574a7dad8702
Author: stefan
Category: javax / http
Type: JavaX fragment (include)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2023-04-17 19:37:13
Source code size: 8788 bytes / 273 lines
Pitched / IR pitched: No / No
Views / Downloads: 241 / 526
Version history: 51 change(s)
Referenced in: [show references]