CIS 720 – Fall 2012

Job Objects and Thread Pool

Notes from Mizuno paper

1.    Each job is implemented in an execute() method in a job class

2.    The job class implements a job interface called GenericJob

3.    Each request is an instance of a job object that is placed in the thread pool

4.    When a thread in the thread pool is free, it is assigned a job object and executes its execute() method.

5.    When execute() terminates, the job is deleted and the thread is returned to the thread pool.

6.    The thread pool maintains a queue of ready jobs of type GenericJob, called readyQueue.
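
As a minimal sketch of points 1 and 2, the job interface and one job class might look as follows. The void return type of execute() and the AddJob class are assumptions made for illustration; the notes only give the names GenericJob and execute().

```java
import java.util.concurrent.atomic.AtomicInteger;

// Job interface named in the notes; the void return type is an assumption.
interface GenericJob {
    void execute();
}

// Hypothetical job class: one request that adds its value to a shared total.
class AddJob implements GenericJob {
    private final AtomicInteger total;
    private final int value;

    AddJob(AtomicInteger total, int value) {
        this.total = total;
        this.value = value;
    }

    @Override
    public void execute() {
        total.addAndGet(value);   // the work of this request
    }
}
```

A request is then created with new AddJob(total, 5) and handed to the thread pool instead of being given a thread of its own.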

Thread pool methods:

void addJob(GenericJob job): adds job to readyQueue and wakes up a sleeping job thread, if there is one.

public synchronized void addJob(GenericJob job) {
  readyQueue.add(job);   // enqueue the new job
  notify();              // wake one thread blocked in getJob(), if any
}

GenericJob getJob(): returns one ready job from readyQueue. If there is no ready job in the queue, it blocks the calling thread until one is added.

private synchronized GenericJob getJob() {
  while (readyQueue.isEmpty()) {
    try { wait(); }                      // sleep until addJob() calls notify()
    catch (InterruptedException e) {}
  }
  return (GenericJob) readyQueue.removeFirst();
}

 

The body of each thread managed by the thread pool is:

 

while(true) {(threadPool.getJob()).execute();}
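
Putting addJob(), getJob(), and the thread body together gives a sketch like the one below. The constructor that starts a fixed number of daemon threads, the typed LinkedList, and the ThreadPoolDemo class are assumptions; the notes do not say how the threads are created.

```java
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;

interface GenericJob {
    void execute();
}

class ThreadPool {
    private final LinkedList<GenericJob> readyQueue = new LinkedList<>();

    // Assumption: the pool starts a fixed number of daemon threads up front.
    ThreadPool(int workers) {
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                while (true) { getJob().execute(); }   // thread body from the notes
            });
            t.setDaemon(true);   // lets the JVM exit when main() returns
            t.start();
        }
    }

    public synchronized void addJob(GenericJob job) {
        readyQueue.add(job);
        notify();                // wake one thread waiting in getJob()
    }

    private synchronized GenericJob getJob() {
        while (readyQueue.isEmpty()) {
            try { wait(); } catch (InterruptedException e) { /* keep waiting */ }
        }
        return readyQueue.removeFirst();
    }
}

class ThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPool pool = new ThreadPool(2);
        CountDownLatch done = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) pool.addJob(done::countDown);  // five trivial jobs
        done.await();            // block until every job has run
        System.out.println("all jobs executed");
    }
}
```

Five jobs are served by two threads here, which is the point of the design: the number of threads no longer grows with the number of requests.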

 

Synchronization method and job object

·       waitQueue is a queue of GenericJob objects.

·       If the synchronization condition is satisfied, the synchronization method returns true.

·       Otherwise, the synchronization method places the job in the waitQueue and returns false.

·       When blocked jobs become runnable again, they are moved from the waitQueue to the readyQueue, and a pool thread resumes them by calling execute().

·       Execution proceeds in phases; the phase is incremented each time the job passes a synchronization method.

·       The body of execute():

public void execute() {
  switch (phase) {
    case 0:
      // sequential code for phase 0
      if (!synchronizationMethod(this)) { phase = 1; return; } // job is blocked
    case 1:
      …….
  }
}

Note: the return statement returns control to the body of the thread, while(true) {(threadPool.getJob()).execute();}, so the thread is free to pick up another job.
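
The phase mechanism can be tried in isolation with a sketch such as the following. Gate, PhasedJob, and PhasedJobDemo are hypothetical names; Gate stands in for any synchronization method that either lets a job pass or parks it in a waitQueue. As a small departure from the notes, the sketch sets phase before calling the synchronization method, so that a job released by another thread can never resume in the old phase.

```java
import java.util.ArrayList;
import java.util.List;

interface GenericJob {
    void execute();
}

// Hypothetical one-shot gate standing in for a synchronization method.
class Gate {
    private boolean open = false;
    private final List<GenericJob> waitQueue = new ArrayList<>();

    // Returns true if the job may continue; otherwise parks it and returns false.
    synchronized boolean pass(GenericJob job) {
        if (open) return true;
        waitQueue.add(job);
        return false;
    }

    // Opens the gate and hands back the parked jobs so they can be rescheduled.
    synchronized List<GenericJob> open() {
        open = true;
        List<GenericJob> released = new ArrayList<>(waitQueue);
        waitQueue.clear();
        return released;
    }
}

class PhasedJob implements GenericJob {
    private final Gate gate;
    private final List<String> log;
    private int phase = 0;

    PhasedJob(Gate gate, List<String> log) {
        this.gate = gate;
        this.log = log;
    }

    @Override
    public void execute() {
        switch (phase) {
            case 0:
                log.add("phase 0");
                phase = 1;                      // where to resume after blocking
                if (!gate.pass(this)) return;   // blocked: give the thread back
                // not blocked: fall through into phase 1
            case 1:
                log.add("phase 1");
        }
    }
}

class PhasedJobDemo {
    public static void main(String[] args) {
        Gate gate = new Gate();
        List<String> log = new ArrayList<>();
        PhasedJob job = new PhasedJob(gate, log);
        job.execute();                                  // runs phase 0, then blocks
        for (GenericJob j : gate.open()) j.execute();   // resumes at phase 1
        System.out.println(log);                        // prints [phase 0, phase 1]
    }
}
```

In a real pool the released jobs would go to threadPool.addJob() rather than being executed directly; the direct call only keeps the demo single-threaded.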

 

Thread per request synchronization with barrier

 

[Figure omitted: diagram with a "Time" axis]

Synchronization

Thread per (job) Request

 

arrived[id] = true; boolean go = true;

for (i = 0; i < PR; i++) if (!arrived[i]) {go = false; break;}

if (go) {for (i = 0; i < PR; i++) arrived[i] = false; notifyAll();}

else {try {wait();} catch (InterruptedException e) {}}
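
For comparison with the thread pool version, the fragment above can be wrapped in a small class and run with one thread per request. The Barrier and BarrierDemo classes and the method name sync are assumptions. The generation counter is also an addition that is not in the notes: it puts wait() inside a loop so that a spurious wakeup cannot let a thread through early.

```java
import java.util.concurrent.atomic.AtomicInteger;

class Barrier {
    private final int PR;              // number of participating requests
    private final boolean[] arrived;
    private int generation = 0;        // addition: counts completed barrier rounds

    Barrier(int pr) {
        PR = pr;
        arrived = new boolean[pr];
    }

    synchronized void sync(int id) {
        arrived[id] = true;
        boolean go = true;
        for (int i = 0; i < PR; i++) if (!arrived[i]) { go = false; break; }
        if (go) {                      // last arrival: reset and release everyone
            for (int i = 0; i < PR; i++) arrived[i] = false;
            generation++;
            notifyAll();
        } else {                       // earlier arrival: the thread itself blocks
            int myGeneration = generation;
            while (myGeneration == generation) {
                try { wait(); }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

class BarrierDemo {
    public static void main(String[] args) throws InterruptedException {
        final int PR = 3;
        Barrier barrier = new Barrier(PR);
        AtomicInteger phase0Done = new AtomicInteger();
        Thread[] threads = new Thread[PR];
        for (int i = 0; i < PR; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                phase0Done.incrementAndGet();   // work before the barrier
                barrier.sync(id);
                System.out.println("thread " + id + " sees " + phase0Done.get() + " arrivals");
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
    }
}
```

Every thread reports 3 arrivals, in some order. Note that PR threads must exist at the same time, because each waiting request holds on to its thread.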

 

Thread Pool Implementation

·       notifyAll() is replaced by moving the jobs from waitQueue to readyQueue.

·       wait() is replaced by placing the job object in waitQueue and returning false; when the job does not have to wait, the method returns true.

 

arrived[id] = true; boolean go = true;

for (i = 0; i < PR; i++) if (!arrived[i]) {go = false; break;}

if (go) {

  for (i = 0; i < PR; i++) arrived[i] = false;

  // release every blocked job back to the thread pool
  while (!waitQueue.isEmpty()) threadPool.addJob((GenericJob) waitQueue.removeFirst());

  return true;

} else {waitQueue.add(job); return false;}
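
A complete run of the job-based barrier might look like the sketch below. JobBarrier, BarrierJob, JobBarrierDemo, the method name sync, and the way the pool creates its threads are all assumptions; only the body of sync follows the fragment above. Each job sets phase before calling sync, so a job released by another thread always resumes at phase 1.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

interface GenericJob { void execute(); }

class ThreadPool {
    private final LinkedList<GenericJob> readyQueue = new LinkedList<>();

    ThreadPool(int workers) {          // assumption: fixed set of daemon threads
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> { while (true) getJob().execute(); });
            t.setDaemon(true);
            t.start();
        }
    }

    public synchronized void addJob(GenericJob job) { readyQueue.add(job); notify(); }

    private synchronized GenericJob getJob() {
        while (readyQueue.isEmpty()) {
            try { wait(); } catch (InterruptedException e) { /* keep waiting */ }
        }
        return readyQueue.removeFirst();
    }
}

class JobBarrier {
    private final int PR;
    private final boolean[] arrived;
    private final LinkedList<GenericJob> waitQueue = new LinkedList<>();
    private final ThreadPool threadPool;

    JobBarrier(int pr, ThreadPool pool) { PR = pr; arrived = new boolean[pr]; threadPool = pool; }

    // Returns true if the job may continue; false if it was parked in waitQueue.
    synchronized boolean sync(int id, GenericJob job) {
        arrived[id] = true;
        boolean go = true;
        for (int i = 0; i < PR; i++) if (!arrived[i]) { go = false; break; }
        if (go) {
            for (int i = 0; i < PR; i++) arrived[i] = false;
            while (!waitQueue.isEmpty()) threadPool.addJob(waitQueue.removeFirst());
            return true;
        } else { waitQueue.add(job); return false; }
    }
}

class BarrierJob implements GenericJob {
    private final int id;
    private final JobBarrier barrier;
    private final AtomicInteger phase0Done;
    private final int[] seen;
    private final CountDownLatch finished;
    private int phase = 0;

    BarrierJob(int id, JobBarrier barrier, AtomicInteger phase0Done, int[] seen, CountDownLatch finished) {
        this.id = id; this.barrier = barrier; this.phase0Done = phase0Done;
        this.seen = seen; this.finished = finished;
    }

    @Override
    public void execute() {
        switch (phase) {
            case 0:
                phase0Done.incrementAndGet();            // work before the barrier
                phase = 1;                               // resume point if parked
                if (!barrier.sync(id, this)) return;     // blocked: free the thread
            case 1:
                seen[id] = phase0Done.get();             // arrivals visible after the barrier
                finished.countDown();
        }
    }
}

class JobBarrierDemo {
    public static void main(String[] args) throws InterruptedException {
        final int PR = 4;
        ThreadPool pool = new ThreadPool(2);             // fewer threads than jobs
        JobBarrier barrier = new JobBarrier(PR, pool);
        AtomicInteger phase0Done = new AtomicInteger();
        int[] seen = new int[PR];
        CountDownLatch finished = new CountDownLatch(PR);
        for (int id = 0; id < PR; id++) pool.addJob(new BarrierJob(id, barrier, phase0Done, seen, finished));
        finished.await();
        System.out.println(Arrays.toString(seen));       // prints [4, 4, 4, 4]
    }
}
```

Four jobs cross the barrier using only two threads. With the thread-per-request barrier, two threads could never satisfy a barrier of four, since both would block in wait().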