CIS 720 – Fall 2012
Job Objects and Thread
Pool
Notes from Mizuno paper
1. Each job is implemented in an
execute() method in a job class
2. The job class implements a job
interface called GenericJob
3. Each request is an instance of a job object
that is placed in the thread pool
4. When a thread in the thread pool is
free, it is assigned a job object and executes its execute()
method.
5. At execute()
termination, the job is deleted and the thread is returned to the threadpool.
6. Threadpool maintains a queue of ready jobs of
type GenericJob, called a readyQueue.
Threadpool methods:
Void addJob(GenericJob, job): adds job in ReadyQueue
and wakes up a job thread if there is any sleeping one
public synchronized void addJob(GenericJobjob){
readyQueue.add(job); notify();
}
GenericJob getJob(void):
returns one ready job found in readyQueue . If there
is no ready job in the queue, it blocks the calling thread.
private synchronized GenericJob getJob() {
while(readyQueue.isEmpty()) try {wait();}
catch (Interrupted Exception e) {}
return(GenericJob)
readyQueue.removeFirst();
}
Body of threads managed by the thread
pool is:
while(true) {(threadPool.getJob()).execute();}
Synchronization method and job object
· waitQueue is a queue of GenericJob
objects
· if the synchronization condition is
satisfied, the synchronization method returns true.
· If not true, the job is placed into
the waitQueue and returns false
· When jobs are runnable (at execute()), they are moved from the waitQueue
to the readyQueue.
· Execution is in phases; phase is
incremented by each time it passes the synchronization method.
· The body of execute();
Switch(phase){
Case 0: Sequential code for phase
0;
If(! synchronizationMethod(this))
{phase =1; return;} //job is blocked
Case 1:
…….
}
}
Note: return statement is to the body of the thread while(true) {(threadPool.get()).execute();}
Thread per request synchronization with barrier
Synchronization
Thread per (job) Request
arrived[id] = true; Boolean go=true;
for(i=0; i<PR;i++)
if (!arrived[i]) {go = false;
break;}
if(go) {for(i=0; i<PR; i++)
arrived[i]= false; notifyAll();}
else {try{wait();} catch(interruptedException e){}}
Thread Pool Implementation
· notifyAll() replaced by moving jobs from waitQueue to readyQueue.
· wait() replaced
by placing the job object in waitQueue and returning
false (when wait() is not called, it returns true).
arrived[i] =true; Boolean go = true;
for (i=0; i<PR;i++) if (!arrived[i]) {go = false; break;}
if(go){
for(i=0; i<PR; i++)
arrived[i]=false;
while(!waitQueue.isEmpty())
threadPool.addJob((GenericJob)waitQueue.removeFirst());
return true;
}else {waitQueue.add(job);
return false; }