Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

201
LINES

< > BotCompany Repo | #1033401 // ThreadPool [seems to work]

JavaX fragment (include) [tags: use-pretranspiled]

Libraryless. Click here for Pure Java version (6781L/38K).

// The idea is to leave max as the actual number of cores the system
// has (numberOfCores()), and in case of being fully booked, raise an
// alert (customerMustWaitAlert) which can be handled by a strategy
// object (different reactions are possible).
// If nothing is done in such an event, clients are processed serially
// (no guarantees of order), split up among the available threads.

/* SYNChronisation order:
      1. PooledThread
      2. ThreadPool */

// A pool of at most `max` worker threads. Work is handed to a free thread
// when there is one (creating threads lazily up to `max`); otherwise the
// caller either queues onto a busy thread (acquireThreadOrQueue) or blocks
// until a thread frees up (acquireThreadOrWait).
//
// Lock order used throughout this class: a PooledThread's monitor is
// always taken BEFORE the ThreadPool's monitor, never the other way round.
sclass ThreadPool is AutoCloseable {
  // Upper bound on the number of pooled threads.
  int max = numberOfCores();
  new L<PooledThread> all;     // every thread this pool has created
  new Set<PooledThread> used;  // threads that have been handed work
  new Set<PooledThread> free;  // threads that found their queue empty
  bool verbose, retired;
  
  // our own ping source so we can start threads & keep them running
  class InternalPingSource extends PingSource {}
  new InternalPingSource internalPingSource;
  
  new MultiSleeper sleeper;
  
  *() {}
  *(int *max) {}
  
  synchronized int maxSize() { ret max; }
  synchronized int total() { ret l(used)+l(free); }
  
  // Fired whenever a customer has to block because no thread is free.
  event customerMustWaitAlert;
  
  // Announces the "customer must wait" condition both on the VM bus and
  // to listeners of the local event.
  void fireCustomerMustWaitAlert {
    vmBus_send customerMustWaitAlert(this, currentThread());
    customerMustWaitAlert();
  }

  // DOESN'T WAIT. adds action to a thread's queue if nothing is
  // available immediately.
  // Returns the thread the action was given to, or null for a null action.
  PooledThread acquireThreadOrQueue(Runnable action) {
    if (action == null) null;
    PooledThread t;
    synchronized {
      if (_hasFreeAfterCreating()) {
        t = _firstFreeThread();
        markUsed(t); // reserve it so no other caller sees it as free
      } else
        t = _anyThread();
    }
    
    // Called outside the pool lock because addWork locks the thread
    // (lock order: PooledThread before ThreadPool).
    t.addWork(action); // will move it from free to used
    ret t;
  }
  
  // run in synchronized block
  // Returns true if a free thread exists, creating one when the pool is
  // still below max. Fails if the pool is retired.
  bool _hasFreeAfterCreating() {
    checkNotRetired();
    if (nempty(free)) true;
    if (total() < max) {
      PooledThread t = newThread();
      all.add(t);
      free.add(t);
      true;
    }
    false;
  }
  
  // WAITS until thread is available
  // Returns the thread the action was given to, or null for a null action.
  // Fails with "retired" if the pool is retired before or while waiting.
  PooledThread acquireThreadOrWait(Runnable action) ctex {
    if (action == null) null;
    PooledThread t;
    while true {
      synchronized {
        if (_hasFreeAfterCreating()) {
          t = _firstFreeThread();
          // Reserve the thread, exactly like acquireThreadOrQueue does.
          // Without this it stays in `free` and every later caller is
          // handed the same thread instead of growing the pool or waiting.
          markUsed(t);
          break;
        } else
          _waitWaitWait();
      }
    }
    t.addWork(action);
    ret t;
  }
  
  // run in synchronized block
  PooledThread _firstFreeThread() {
    ret first(free);
  }
  
  // run in synchronized block; picks a random busy thread to queue onto
  PooledThread _anyThread() {
    ret random(used);
  }
  
  // A worker thread with its own queue of pending Runnables.
  class PooledThread extends Thread {
    *(S name) { super(name); }
    
    // pending work, guarded by this thread's monitor
    AppendableChain<Runnable> q;
    
    // Returns the next queued Runnable, or null after having slept
    // (or when the pool is retired) so the caller can re-check its
    // loop condition.
    synchronized Runnable _grabWorkOrSleep() ctex {
      Runnable r = first(q);
      if (r == null) {
        markFree(this);
        // Re-check retirement AFTER markFree and while still holding our
        // own monitor: either retire() already ran (we see the flag here),
        // or it runs later, finds us in `free` and notifies us once we
        // are inside wait(). This closes the lost-wake-up window between
        // the check in run() and the wait() below.
        if (retired()) null;
        if (verbose) print("Thread sleeps");
        wait(); // method is synchronized, so we own the monitor here
        if (verbose) print("Thread woke up");
        null;
      }
      q = popFirst(q);
      ret r;
    }
    
    // Worker loop: runs queued work until the pool is retired.
    run {
      pingSource_tl().set(internalPingSource);
      while ping (!retired()) {
        Runnable r = _grabWorkOrSleep();
        if (verbose) print(this + " work: " + r);
        if (r != null)
          try {
            if (verbose) print(this + " running: " + r);
            r.run();
            // restore our ping source after running the work item
            pingSource_tl().set(internalPingSource);
            if (verbose) print(this + " done");
          } catch e {
            pingSource_tl().set(internalPingSource);
            if (verbose) print(this + " error");
            printStackTrace(e); // a failing work item must not kill the worker
          } finally {
            pingSource_tl().set(internalPingSource);          
            if (verbose) print("ThreadPool finally");
          }
      }
    }
    
    synchronized bool isEmpty() { ret empty(q); }
    
    // append to q (do later)
    void addWork(Runnable r) {
      if (verbose) print("Added work to " + this + ": " + r);
      synchronized {
        q = chainPlus(q, r);
        // Done under our own monitor so it cannot interleave with
        // _grabWorkOrSleep: a thread that marked itself free just before
        // this call is moved back to `used` now that it has work again.
        // (Lock order respected: PooledThread, then ThreadPool.)
        markUsed(this);
        notifyAll();
      }
    }  
  }
  
  // Creates and starts a new worker; the caller registers it in all/free.
  PooledThread newThread() {
    PooledThread t = new("Thread Pool Inhabitant " + n2(total()+1));
    t.start();
    ret t;
  }
  
  // Moves t to the free set and wakes customers blocked in _waitWaitWait.
  synchronized void markFree(PooledThread t) {
    used.remove(t);
    free.add(t);
    notifyAll();
  }
  
  // Moves t to the used set.
  synchronized void markUsed(PooledThread t) {
    free.remove(t);
    used.add(t);
  }
  
  synchronized toString {
    ret retired()
      ? "Retired ThreadPool"
      : "ThreadPool " + roundBracket(commaCombine(
        n2(used) + " used out of " + n2(total()),
        max <= total() ? null : "could grow to " + n2(max)));
  }
  
  synchronized bool retired() { ret retired; }
  
  // Stops the pool: no new work is accepted, idle threads are woken so
  // they exit, and blocked customers are woken so they fail with "retired".
  void retire() {
    L<PooledThread> idle;
    synchronized {
      if (verbose) print("ThreadPool Retiring");
      set retired;
      idle = new ArrayList<PooledThread>(free);
      notifyAll(); // wake customers blocked in _waitWaitWait
    }
    // Notify the idle threads only after releasing the pool lock.
    // Taking a thread's monitor while holding the pool's would invert the
    // lock order and can deadlock against _grabWorkOrSleep -> markFree.
    // NOTE(review): assumes syncNotifyAll locks the given object to call
    // notifyAll on it - confirm.
    for (PooledThread thread : idle) syncNotifyAll(thread); // wake it up so it exits
  }
  
  void checkNotRetired {   
    if (retired()) fail("retired");
  }
 
  // We could do a soft-close here (stop the idle threads, let running threads finish, then end those too, stop accepting new orders)
  // or a hard close (interrupt all threads, stop accepting new orders)
  // Deliberately not synchronized: retire() must not be entered with the
  // pool lock held (see the lock-order note in retire()).
  public void close() {
    retire();
  }
  
  // run in synchronized block
  // Blocks the calling customer until a thread is marked free, firing the
  // must-wait alert before each wait. Fails if the pool gets retired.
  void _waitWaitWait ctex {
    do {
      fireCustomerMustWaitAlert();
      wait();
      checkNotRetired();
    } while (empty(free));
  }
  
  // Runs r through a new PingSource created for this pool with the given text.
  void do(S text, Runnable r) {
    if (r == null) ret;
    new PingSource(this, text).do(r);
  }
  
  ISleeper_v2 sleeper() { ret sleeper; }
}

download  show line numbers  debug dex  old transpilations   

Travelled to 4 computer(s): bhatertpkbcr, ekrmjmnbrukm, mowyntqkapby, mqqgnosmbjvj

No comments. add comment

Snippet ID: #1033401
Snippet name: ThreadPool [seems to work]
Eternal ID of this version: #1033401/70
Text MD5: 7cc4fc4e1a67180c764962065ad26535
Transpilation MD5: 768813cb471ab25d49ef62edf67787c1
Author: stefan
Category: javax / code execution
Type: JavaX fragment (include)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2021-11-03 17:07:18
Source code size: 5365 bytes / 201 lines
Pitched / IR pitched: No / No
Views / Downloads: 317 / 619
Version history: 69 change(s)
Referenced in: #1033405 - QPool [dev.]
#1034167 - Standard Classes + Interfaces (LIVE, continuation of #1003674)