Parallel Adaptive Quadrature Algorithm
//Administrator process: seeds the workpool with the whole interval [l,r]
//and accumulates the partial areas reported back by the workers.
Admin:: var l,r,fl,fr,a,b,area,total: real;
//and other variables to record finished intervals
fl := f(l); fr := f(r);
//trapezoid over [l,r]: mean height times the interval WIDTH (r-l)
area := (fl + fr) * (r-l)/2;
//the running sum must start from zero
total := 0;
//put the initial interval and its area estimate into the workpool
send workpool(l,r,fl,fr,area);
do entire area not yet completed ->
  {receive result(a,b,area);
   total := total + area;
   //record that the area between a and b has been calculated
  }
od
//Worker process (n copies): repeatedly takes an interval from the workpool,
//bisects it, and either reports its area or returns the two halves to the pool.
worker[1:n]:: var a,b,m,fa,fb,fm: real;
var larea, rarea, tarea, diff: real;
do true ->
  {//get an interval [a,b] and its trapezoid estimate tarea from the workpool
   receive workpool(a,b,fa,fb,tarea);
   m := (a+b)/2; fm := f(m);
   //trapezoid areas of the left half [a,m] and the right half [m,b]
   larea := (fa + fm) * (m-a)/2;
   rarea := (fm + fb) * (b-m)/2;
   //compute difference between the two subintervals
   //and the total area using trapezoids
   diff := tarea - (larea + rarea);
   //compare the magnitude: diff may be negative
   if |diff| small then send result(a,b,tarea);
   //if difference not small enough, put both left and right
   //intervals back into pool
   else {send workpool(a,m,fa,fm,larea);
         send workpool(m,b,fm,fb,rarea);}
   fi
  }
od
Multiple Channel Workpool
Getwork(me, item)
Compute my channel number in the workpool;
Decrement counter for the channel;
if counter = -worker_group_size then
begin (*my worker group is now idle*)
increment mastercount;
if mastercount = number_of_worker_groups then
{send a termination flag to each worker
process}
for i:= 1 to number_of_worker_groups do
for j:= 1 to worker_group_size do
{put a termination flag
into channel of worker group i}
end;
read a work item from my channel into "item";
Putwork(me, item);
move my pointer to the next channel in workpool;
increment counter for the target channel;
if counter = - worker_group_size +1 then
decrement mastercount; (*idle worker group now active*)
write "item" into the target channel;
Workpool in Java
// Workpool classclass Workpool {
public Workpool(int a, int b, int[] c,
int[] d, int[] e, Channel[] f)
{worker_group_size = a; num_worker_groups = b;
nextchan = c; count = d;
mastercount = e; channel = f;
}
// get an item from the workpool
public int getwork(int me) {
int workcount, emptycount, mychan;
int value;
mychan = me/worker_group_size;
//decrease the # of workers in this group
synchronized(count) {
workcount = count[mychan]-1;
count[mychan] = workcount;
}
if(workcount == -worker_group_size) {//all workers in this group are
idle
synchronized(mastercount) {
emptycount = mastercount + 1;
mastercount = emptycount;
}
// if all workers are done and the channels are empty, send termination
message
if(emptycount == num_worker_groups)
for(int w=0; w<num_worker_groups; w++)
for(int j=0; j<worker_group_size; j++)
channel[w].put(-1);
}
value = channel[mychan].get();
return(value);
}
// put an item in the workpool
public void putwork(int me, int item) {
int workcount, emptycount, next;
next = nextchan[me];
//increase the # of workers in this pool
synchronized(count) {
workcount = count[next]+1;
count[next] = workcount;
}
if(workcount == -worker_group_size + 1) {//make pool active
again
synchronized(mastercount) {
emptycount = mastercount - 1;
mastercount = emptycount;
}
}
channel[next].put(item);
// increment my "next" pointer to point to the next channel in "round
robin" order
nextchan[me] = (next + 1) % num_worker_groups;
}
Example of Multichannel Workpool
Shortest Path Algorithm
// This is a Java program which implements the shortest path algorithm using a
// replicated workers paradigm and multiple (shared memory) channels to reduce
// contention. The goal is to establish the shortest path from a start node
// to every other node in a directed graph. The graph is represented as an
// adjacency matrix whose cell contents are the distance to each neighbor.
// Initially, the start node of the graph is placed in a workpool. Each thread
// (worker) gets a work item from the workpool which represents a node x.
// It then recomputes the distance to each of x's neighbors, using the procedure:
//   for each neighbor w of x do
//     mindist[w] = min(mindist[w], mindist[x] + weight[x][w]).
// Since this is a shared memory implementation, all threads have access to
// shared variables, including an array "mindist", where mindist[j] represents
// the current minimum distance from the start node to node j, the array
// "weight" which records the distance between neighboring nodes (this is the
// adjacency matrix), and an integer "mastercount" which keeps track of the
// number of currently idle worker groups. In this program, the workpool is
// implemented using multiple channels to reduce contention. Multiple workers
// (worker_group_size) are assigned to a channel.
// number of worker groups; main creates one channel per group
static final int num_worker_groups = 2;
// number of workers in each group
static final int worker_group_size = 2;
// total number of worker threads
static final int numworkers = num_worker_groups * worker_group_size;
// number of vertices: the size of the weight matrix and of mindist
static final int N = 5;
// initial value for every weight and distance; stands for "no edge" / "not reached yet"
static int Infinity = 32000;
// vertex whose distance is set to 0 and which is put into the workpool first
static int startvertex = 0;
public static void main(String[] args) {
int mastercount = 0;
int weight[][] = new int[N][N];
int mindist[] = new int[N];
boolean inflag[] = new boolean[N];
int count[] = new int[num_worker_groups];
int nextchan[] = new int[numworkers];
Channel[] channel = new Channel[num_worker_groups];
Worker[] worker_ = new Worker[numworkers];
Workpool workpool = new Workpool(worker_group_size,num_worker_groups,nextchan,count,mastercount,channel);
// set initial values
for(int j=0; j<N; j++) {
mindist[j] = infinity;
inflag[j] = false;
for(int k=0; k<N; k++)
weight[j][k] = Infinity;
}
//create a channel per worker_group
for(int j=0; j<num_worker_groups; j++)
channel[j] = new Channel();
// put (initial) vertex number 0 in the queue
mindist[startvertex] = 0;
inflag[startvertex] = true;
channel[0].put(startvertex);
count[0]=1;
for(int j=1; j<num_worker_groups; j++)
count[j] = 0;
mastercount = 0;
// initial weight values
weight[0][1] = 4;
weight[0][2] = 8;
weight[1][2] = 3;
weight[1][3] = 1;
weight[2][4] = 5;
weight[3][2] = 2;
weight[3][4] = 10;
// Creating thread (workers)
for (int i=0; i < worker_.length; i++) {
worker_[i] =
new Worker(i,weight,mindist,inflag,Infinity,nextchan,worker_group_size,N,workpool);
worker_[i].start();
System.out.println(i+" thread properly started");
}
// wait for the thread end
try {
for (int i=0; i < integrator.length; i++) {
worker_[i].join();
System.out.println(i+" thread properly
terminated");
}
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
// print the result
for(int j=0; j<N; j++)
System.out.println(mindist[j]);
}
// thread to compute the shortest path
class Worker extends Thread {
int vertex, newdist,i,infinity,n,worker_group_size;
int weight[][];
int mindist[];
int nextchan[];
boolean inflag[], change_flag=false;
Workpool workpool;
public Worker(int a,int[][] b,int[] c,boolean[]
d,int e,int[] g,int l,int o,Workpool w) {
i = a;
weight = b;
mindist = c;
inflag = d;
infinity = e;
nextchan = g;
worker_group_size = l;
n = 0;
workpool = w;
}
// shortest path algorithm worker thread
public void run ( ) {
//multiple workers taking from 1 channel
nextchan[i] = i / worker_group_size;
// get a new vertex number to examine
vertex = workpool.getwork(i);
while ( vertex != -1) {
// vertex is removed from Work Pool
inflag[vertex] = false;
// consider all outgoing edges of "vertex"
for(int w=0;w<n;w++) {
if(weight[vertex][w]<infinity) {
//is this a neighbor?
// see if this is a shorter path to w
newdist = mindist[vertex]
+ weight[vertex][w];
synchronized(mindist) {
// mindist is shared and must be updated atomically
if(newdist <
mindist[w]) {
mindist[w] = newdist;
change_flag = true;
}
}
if(change_flag) {
//if the value mindist of "w" changed, put it
back in the pool
change_flag = false;
if(!inflag[w]) {
inflag[w] = true;
workpool.putwork(i,w);
}
}
}
} //End for all neighbors
vertex = workpool.getwork(i);
} //End of main while statement
} //End of run
} // End of Worker thread
Replicated Workers in a Distributed Environment
- all acknowledgements handled by the workpool methods
- Putwork Method
*Putwork(vertex, outputdistance)
Increment ackcount;
Build work item consisting of (me, outputdistance);
Write work item into workpool[vertex];
*Getwork Method
If (worker in active state, and work channel empty
and all acknowledgments received) then
Begin
Acknowledge current parent;
Go into Idle State;
End
Repeat
Read new item from Workpool;
If (new item is acknowledgement) then decrement ackcount;
Until work item is received;
If (worker in active state) then
Send acknowledgment to source of work item
Else begin
Go into active state
Set new parent from source of this work item;
End;
Return "distance" part of work item to calling worker;
Implementing the Worker
mindistance:= infinity;
Getwork(trialdistance)
while (not done)
begin
if trialdistance < mindistance then
mindistance := trialdistance;
for all my outgoing neighbors
Putwork (neighbor vertex, mindistance + distance to neighbor)
Getwork(trialdistance);
end
answer := mindistance