Not logged in.  Login/Logout/Register | List snippets | | Create snippet | Upload image | Upload data

201
LINES

< > BotCompany Repo | #1033401 // ThreadPool [seems to work]

JavaX fragment (include) [tags: use-pretranspiled]

Libraryless. Click here for Pure Java version (6781L/38K).

1  
// The idea is to leave max as the actual number of cores the system
2  
// has (numberOfCores()), and in case of being fully booked, raise an
3  
// alert (customerMustWaitAlert) which can be handled by a strategy
4  
// object (different reactions are possible).
5  
// If nothing is done in such an event, clients are processed serially
6  
// (no guarantees of order), split up among the available threads.
7  
8  
/* SYNChronisation order:
9  
      1. PooledThread
10  
      2. ThreadPool */
11  
12  
sclass ThreadPool is AutoCloseable {
13  
  int max = numberOfCores();
14  
  new L<PooledThread> all;
15  
  new Set<PooledThread> used;
16  
  new Set<PooledThread> free;
17  
  bool verbose, retired;
18  
  
19  
  // our own ping source so we can start threads & keep them running
20  
  class InternalPingSource extends PingSource {}
21  
  new InternalPingSource internalPingSource;
22  
  
23  
  new MultiSleeper sleeper;
24  
  
25  
  *() {}
26  
  *(int *max) {}
27  
  
28  
  synchronized int maxSize() { ret max; }
29  
  synchronized int total() { ret l(used)+l(free); }
30  
  
31  
  event customerMustWaitAlert;
32  
  
33  
  void fireCustomerMustWaitAlert {
34  
    vmBus_send customerMustWaitAlert(this, currentThread());
35  
    customerMustWaitAlert();
36  
  }
37  
38  
  // DOESN'T WAIT. adds action to a thread's queue if nothing is
39  
  // available immediately.
40  
  PooledThread acquireThreadOrQueue(Runnable action) {
41  
    if (action == null) null;
42  
    PooledThread t;
43  
    synchronized {
44  
      if (_hasFreeAfterCreating()) {
45  
        t = _firstFreeThread();
46  
        markUsed(t);
47  
      } else
48  
        t = _anyThread();
49  
    }
50  
    
51  
    t.addWork(action); // will move it from free to used
52  
    ret t;
53  
  }
54  
  
55  
  // run in synchronized block
56  
  bool _hasFreeAfterCreating() {
57  
    checkNotRetired();
58  
    if (nempty(free)) true;
59  
    if (total() < max) {
60  
      PooledThread t = newThread();
61  
      all.add(t);
62  
      free.add(t);
63  
      true;
64  
    }
65  
    false;
66  
  }
67  
  
68  
  // WAITS until thread is available
69  
  PooledThread acquireThreadOrWait(Runnable action) ctex {
70  
    if (action == null) null;
71  
    PooledThread t;
72  
    while true {
73  
      synchronized {
74  
        if (_hasFreeAfterCreating()) {
75  
          t = _firstFreeThread();
76  
          break;
77  
        } else
78  
          _waitWaitWait();
79  
      }
80  
    }
81  
    t.addWork(action);
82  
    ret t;
83  
  }
84  
  
85  
  PooledThread _firstFreeThread() {
86  
    ret first(free);
87  
  }
88  
  
89  
  PooledThread _anyThread() {
90  
    ret random(used);
91  
  }
92  
  
93  
  // A worker thread owned by the pool. It runs the Runnables in its queue
  // one after another and sleeps on its own monitor while the queue is empty.
  class PooledThread extends Thread {
    *(S name) { super(name); }
    
    // work waiting to be run by this thread
    AppendableChain<Runnable> q;
    
    // Takes the next Runnable off the queue and returns it.
    // If the queue is empty: marks this thread as free, sleeps until
    // notified and returns null (the caller loops and tries again).
    synchronized Runnable _grabWorkOrSleep() ctex {
      Runnable r = first(q);
      if (r == null) {
        markFree(this);
        // The pool may have been retired after run() last checked.
        // retire() only notifies threads that were already in "free",
        // so going to sleep now could mean never waking up again.
        if (retired()) null;
        if (verbose) print("Thread sleeps");
        synchronized { wait(); }
        if (verbose) print("Thread woke up");
        null;
      }
      q = popFirst(q);
      ret r;
    }
    
    // Main loop: run queued work until the pool is retired.
    run {
      pingSource_tl().set(internalPingSource);
      while ping (!retired()) {
        Runnable r = _grabWorkOrSleep();
        if (verbose) print(this + " work: " + r);
        if (r != null)
          try {
            if (verbose) print(this + " running: " + r);
            r.run();
            // restore our own ping source after every work item
            // (presumably the work item may have installed a different one)
            pingSource_tl().set(internalPingSource);
            if (verbose) print(this + " done");
          } catch e {
            pingSource_tl().set(internalPingSource);
            if (verbose) print(this + " error");
            // a failing work item must not kill the pooled thread
            printStackTrace(e);
          } finally {
            pingSource_tl().set(internalPingSource);          
            if (verbose) print("ThreadPool finally");
          }
      }
    }
    
    synchronized bool isEmpty() { ret empty(q); }
    
    // append to q (do later) and wake the thread in case it is sleeping
    void addWork(Runnable r) {
      if (verbose) print("Added work to " + this + ": " + r);
      synchronized {
        q = chainPlus(q, r);
        notifyAll();
      }
    }  
  }
144  
  
145  
  PooledThread newThread() {
146  
    PooledThread t = new("Thread Pool Inhabitant " + n2(total()+1));
147  
    t.start();
148  
    ret t;
149  
  }
150  
  
151  
  synchronized void markFree(PooledThread t) {
152  
    used.remove(t);
153  
    free.add(t);
154  
    notifyAll();
155  
  }
156  
  
157  
  synchronized void markUsed(PooledThread t) {
158  
    free.remove(t);
159  
    used.add(t);
160  
  }
161  
  
162  
  // Short status text: "Retired ThreadPool" once retired, otherwise the
  // used/total thread counts plus a "could grow to" note while the pool
  // is still below max.
  synchronized toString {
    ret retired()
      ? "Retired ThreadPool"
      : "ThreadPool " + roundBracket(commaCombine(
        n2(used) + " used out of " + n2(total()),
        max <= total() ? null : "could grow to " + n2(max)));
  }
169  
  
170  
  synchronized bool retired() { ret retired; }
171  
  synchronized void retire() {
172  
    if (verbose) print("ThreadPool Retiring");
173  
    set retired;
174  
    for (thread : free) syncNotifyAll(thread); // wake it up so it exits
175  
  }
176  
  void checkNotRetired {   
177  
    if (retired()) fail("retired");
178  
  }
179  
 
180  
  // We could do a soft-close here (stop the idle threads, let running threads finish, then end those too, stop accepting new orders)
181  
  // or a hard close (interrupt all threads, stop accepting new orders)
182  
  synchronized close {
183  
    retire();
184  
  }
185  
  
186  
  // run in synchronized block
187  
  void _waitWaitWait ctex {
188  
    do {
189  
      fireCustomerMustWaitAlert();
190  
      wait();
191  
      checkNotRetired();
192  
    } while (empty(free));
193  
  }
194  
  
195  
  void do(S text, Runnable r) {
196  
    if (r == null) ret;
197  
    new PingSource(this, text).do(r);
198  
  }
199  
  
200  
  ISleeper_v2 sleeper() { ret sleeper; }
201  
}

download  show line numbers  debug dex  old transpilations   

Travelled to 4 computer(s): bhatertpkbcr, ekrmjmnbrukm, mowyntqkapby, mqqgnosmbjvj

No comments. add comment

Snippet ID: #1033401
Snippet name: ThreadPool [seems to work]
Eternal ID of this version: #1033401/70
Text MD5: 7cc4fc4e1a67180c764962065ad26535
Transpilation MD5: 768813cb471ab25d49ef62edf67787c1
Author: stefan
Category: javax / code execution
Type: JavaX fragment (include)
Public (visible to everyone): Yes
Archived (hidden from active list): No
Created/modified: 2021-11-03 17:07:18
Source code size: 5365 bytes / 201 lines
Pitched / IR pitched: No / No
Views / Downloads: 248 / 528
Version history: 69 change(s)
Referenced in: [show references]