Replicated Servers
Lester. Parallel Programming

Parallel Adaptive Quadrature Algorithm

chan workpool(a,b,fa,fb,area:real)
chan result(a,b,area:real)

Admin:: var l,r,fl,fr,a,b,area,total: real
//and other variables to record finished intervals

  fl := f(l); fr := f(r);
  area := (fl + fr) * (r - l)/2;   //trapezoid area over the whole interval [l,r]
  total := 0;

//put the initial interval [l,r] into the workpool

  send workpool(l,r,fl,fr,area);
  do //entire area not yet completed
    {receive result(a,b,area);
    total := total + area;
    //record that the area between a and b has been calculated
    }
  od

worker[1:n] :: var a,b,m,fa,fb,fm: real;
var larea, rarea, tarea, diff: real;

//get an interval from the workpool
do true -> {receive workpool(a,b,fa,fb,tarea);
  m := (a+b)/2; fm := f(m);

//compute the areas of the left and right subintervals using trapezoids
  larea := (fa + fm) * (m - a)/2;
  rarea := (fm + fb) * (b - m)/2;

//compute the difference between the total area and the sum of the two subinterval areas
  diff := tarea - (larea + rarea);
  if |diff| small then send result(a,b,tarea);
//if the difference is not small enough, put both the left and right
//intervals back into the pool
  else {send workpool(a,m,fa,fm,larea);
    send workpool(m,b,fm,fb,rarea);}
  fi
  }
od
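
The split-or-accept test each worker applies can be tried out without any message passing. The following is a minimal single-threaded Java sketch, assuming a plain deque stands in for the workpool channel and that accepted areas are added to the total directly instead of being sent to Admin. The names QuadratureSketch, Task, integrate, and the tolerance parameter eps are illustrative and do not come from the source.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.DoubleUnaryOperator;

class QuadratureSketch {
    // One work item: interval [a,b], the function values at its ends,
    // and the trapezoid area over the whole interval.
    static final class Task {
        final double a, b, fa, fb, area;
        Task(double a, double b, double fa, double fb, double area) {
            this.a = a; this.b = b; this.fa = fa; this.fb = fb; this.area = area;
        }
    }

    static double integrate(DoubleUnaryOperator f, double l, double r, double eps) {
        Deque<Task> workpool = new ArrayDeque<>();
        double fl = f.applyAsDouble(l), fr = f.applyAsDouble(r);
        workpool.push(new Task(l, r, fl, fr, (fl + fr) * (r - l) / 2));
        double total = 0;
        while (!workpool.isEmpty()) {
            Task t = workpool.pop();
            double m = (t.a + t.b) / 2;
            double fm = f.applyAsDouble(m);
            double larea = (t.fa + fm) * (m - t.a) / 2;
            double rarea = (fm + t.fb) * (t.b - m) / 2;
            double diff = Math.abs(t.area - (larea + rarea));
            if (diff < eps) {
                // difference is small: accept the area of this interval
                total += t.area;
            } else {
                // difference is large: put both halves back into the pool
                workpool.push(new Task(t.a, m, t.fa, fm, larea));
                workpool.push(new Task(m, t.b, fm, t.fb, rarea));
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Integral of x^2 over [0,1]; the exact value is 1/3.
        System.out.println(integrate(x -> x * x, 0.0, 1.0, 1e-9));
    }
}
```

In the parallel version, the loop body is what each worker executes, and the accepted areas travel to Admin over the result channel.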
 
 

Multiple Channel Workpool

Getwork(me, item)
  Compute my channel number in the workpool;
  Decrement counter for the channel;
  if counter = -worker_group_size then
    begin (*my worker group is now idle*)
      increment mastercount;
      if mastercount = number_of_worker_groups then
        (*all groups are idle: send a termination flag to each worker process*)
        for i:= 1 to number_of_worker_groups do
          for j:= 1 to worker_group_size do
            put a termination flag into channel of worker group i;
    end;
  read a work item from my channel into "item";

Putwork(me, item)
  move my pointer to the next channel in workpool;
  increment counter for the target channel;
  if counter = -worker_group_size + 1 then
    decrement mastercount; (*idle worker group now active*)
  write "item" into the target channel;
 
 

Workpool in Java

// Workpool class
class Workpool {
  int worker_group_size, num_worker_groups;
  int nextchan[]; int count[];
  int mastercount[]; // one-element shared array: number of idle worker groups
  Channel[] channel;

  public Workpool(int a, int b, int[] c, int[] d, int[] e, Channel[] f) {
    worker_group_size = a; num_worker_groups = b;
    nextchan = c; count = d;
    mastercount = e; channel = f;
  }

  // get an item from the workpool
  public int getwork(int me) {
    int workcount, emptycount, mychan;
    int value;
    mychan = me / worker_group_size;
    // decrement the counter for this group's channel
    synchronized (count) {
      workcount = count[mychan] - 1;
      count[mychan] = workcount;
    }
    if (workcount == -worker_group_size) { // all workers in this group are idle
      synchronized (mastercount) {
        emptycount = mastercount[0] + 1;
        mastercount[0] = emptycount;
      }
      // if all worker groups are idle and the channels are empty, send a termination message to every worker
      if (emptycount == num_worker_groups)
        for (int w = 0; w < num_worker_groups; w++)
          for (int j = 0; j < worker_group_size; j++)
            channel[w].put(-1);
    }
    value = channel[mychan].get();
    return value;
  }

  // put an item in the workpool
  public void putwork(int me, int item) {
    int workcount, next;
    next = nextchan[me];
    // increment the counter for the target channel
    synchronized (count) {
      workcount = count[next] + 1;
      count[next] = workcount;
    }
    if (workcount == -worker_group_size + 1) { // idle worker group becomes active again
      synchronized (mastercount) {
        mastercount[0] = mastercount[0] - 1;
      }
    }
    channel[next].put(item);
    // advance my "next" pointer to the next channel in "round robin" order
    nextchan[me] = (next + 1) % num_worker_groups;
  }
}
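
The Workpool class calls put and get on a Channel type that these notes do not show. The following is a minimal sketch of what such a class might look like, assuming an unbounded FIFO of int values whose get blocks while the channel is empty. The interface is inferred only from the calls channel[w].put(-1) and channel[mychan].get() above; the actual Channel class used with the original program may differ.

```java
import java.util.ArrayDeque;

// Assumed shape of the Channel used by Workpool: an unbounded blocking FIFO of ints.
class Channel {
    private final ArrayDeque<Integer> queue = new ArrayDeque<>();

    // Append an item and wake up any thread blocked in get().
    public synchronized void put(int item) {
        queue.addLast(item);
        notifyAll();
    }

    // Remove and return the oldest item, waiting while the channel is empty.
    public synchronized int get() {
        while (queue.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a work item", e);
            }
        }
        return queue.removeFirst();
    }
}
```

Because get blocks, an idle worker simply sleeps inside getwork until either a new work item or the termination flag (-1) arrives on its group's channel.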
 
 


Example of Multichannel Workpool

Shortest Path Algorithm

// This Java program implements the shortest path algorithm using the
// replicated workers paradigm and multiple (shared memory) channels to reduce
// contention. The goal is to find the shortest path from a start node
// to every other node in a directed graph. The graph is represented as an adjacency
// matrix whose cell contents are the distances to each neighbor. Initially, the start
// node of the graph is placed in the workpool. Each thread (worker) gets a work item
// from the workpool, which represents a node x. It then updates the distance to each
// of x's neighbors w, using the procedure:
// for each neighbor w of x do mindist[w] = min(mindist[w], mindist[x] + weight[x][w]).
// Any neighbor whose distance decreases is put back into the workpool.
// Since this is a shared memory implementation, all threads have access to shared
// variables, including the array "mindist", where mindist[j] is the current
// minimum distance from the start node to node j, the array "weight", which records the
// distance between neighboring nodes (this is the adjacency matrix), and the counter
// "mastercount", which keeps track of the number of currently idle worker groups. In this
// program, the workpool is implemented using multiple channels to reduce
// contention. Multiple workers (worker_group_size) are assigned to each channel.
public class ShortestPath { // class name assumed; the original listing omits the enclosing class
static final int num_worker_groups = 2;
static final int worker_group_size = 2;
static final int numworkers = num_worker_groups * worker_group_size;
static final int N = 5;
static final int Infinity = 32000;
static final int startvertex = 0;

public static void main(String[] args) {
  int mastercount[] = new int[1]; // shared counter of idle worker groups
  int weight[][] = new int[N][N];
  int mindist[] = new int[N];
  boolean inflag[] = new boolean[N];
  int count[] = new int[num_worker_groups];
  int nextchan[] = new int[numworkers];
  Channel[] channel = new Channel[num_worker_groups];
  Worker[] worker_ = new Worker[numworkers];
  Workpool workpool = new Workpool(worker_group_size,num_worker_groups,nextchan,count,mastercount,channel);
// set initial values
  for(int j=0; j<N; j++) {
    mindist[j] = Infinity;
    inflag[j] = false;
    for(int k=0; k<N; k++)
      weight[j][k] = Infinity;
  }
// create a channel per worker group
  for(int j=0; j<num_worker_groups; j++)
    channel[j] = new Channel();
// put the start vertex (number 0) in the queue
  mindist[startvertex] = 0;
  inflag[startvertex] = true;
  channel[0].put(startvertex);
  count[0] = 1;
  for(int j=1; j<num_worker_groups; j++)
    count[j] = 0;
  mastercount[0] = 0;
// initial weight values
  weight[0][1] = 4;
  weight[0][2] = 8;
  weight[1][2] = 3;
  weight[1][3] = 1;
  weight[2][4] = 5;
  weight[3][2] = 2;
  weight[3][4] = 10;
// create the threads (workers)
  for (int i=0; i < worker_.length; i++) {
    worker_[i] =
      new Worker(i,weight,mindist,inflag,Infinity,nextchan,worker_group_size,N,workpool);
    worker_[i].start();
    System.out.println(i+" thread properly started");
  }
// wait for the threads to end
  try {
    for (int i=0; i < worker_.length; i++) {
      worker_[i].join();
      System.out.println(i+" thread properly terminated");
    }
  } catch (InterruptedException e) {
    System.out.println("Interrupted");
  }
// print the result
  for(int j=0; j<N; j++)
    System.out.println(mindist[j]);
}
}

// thread to compute the shortest path

class Worker extends Thread {
  int vertex, newdist, i, infinity, n, worker_group_size;
  int weight[][];
  int mindist[];
  int nextchan[];
  boolean inflag[], change_flag = false;
  Workpool workpool;

  public Worker(int a,int[][] b,int[] c,boolean[] d,int e,int[] g,int l,int o,Workpool w) {
    i = a;                 // my worker number
    weight = b;
    mindist = c;
    inflag = d;
    infinity = e;
    nextchan = g;
    worker_group_size = l;
    n = o;                 // number of vertices (was set to 0, so no edge was ever examined)
    workpool = w;
  }

  // shortest path algorithm worker thread
  public void run() {
    // multiple workers take from one channel; start my "next" pointer at my own group's channel
    nextchan[i] = i / worker_group_size;
    // get a new vertex number to examine
    vertex = workpool.getwork(i);
    while (vertex != -1) {
      // vertex is removed from the workpool
      inflag[vertex] = false;
      // consider all outgoing edges of "vertex"
      for (int w = 0; w < n; w++) {
        if (weight[vertex][w] < infinity) { // is this a neighbor?
          synchronized (mindist) {
            // mindist is shared, so it is read and updated atomically
            // see if this is a shorter path to w
            newdist = mindist[vertex] + weight[vertex][w];
            if (newdist < mindist[w]) {
              mindist[w] = newdist;
              change_flag = true;
            }
          }
          if (change_flag) {
            // if the mindist value of "w" changed, put it back in the pool
            change_flag = false;
            if (!inflag[w]) {
              inflag[w] = true;
              workpool.putwork(i, w);
            }
          }
        }
      } // End for all neighbors
      vertex = workpool.getwork(i);
    } // End of main while statement
  } // End of run
} // End of Worker thread
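
When checking the threaded program, it helps to have a sequential reference for the same graph. The following is a minimal single-threaded sketch of the same label-correcting procedure, assuming an ordinary queue in place of the multichannel workpool. The names SequentialShortestPath, shortest, and exampleGraph are illustrative; the edge weights are the ones set in main above.

```java
import java.util.ArrayDeque;
import java.util.Arrays;

class SequentialShortestPath {
    static final int INF = 32000; // same "no edge" value as Infinity in the threaded program

    static int[] shortest(int[][] weight, int start) {
        int n = weight.length;
        int[] mindist = new int[n];
        boolean[] inflag = new boolean[n]; // true while a vertex is waiting in the pool
        Arrays.fill(mindist, INF);
        ArrayDeque<Integer> pool = new ArrayDeque<>();
        mindist[start] = 0;
        inflag[start] = true;
        pool.add(start);
        while (!pool.isEmpty()) {
            int vertex = pool.remove();
            inflag[vertex] = false;
            for (int w = 0; w < n; w++) {
                if (weight[vertex][w] < INF) { // is this a neighbor?
                    int newdist = mindist[vertex] + weight[vertex][w];
                    if (newdist < mindist[w]) {
                        mindist[w] = newdist;
                        if (!inflag[w]) { // put w back in the pool once
                            inflag[w] = true;
                            pool.add(w);
                        }
                    }
                }
            }
        }
        return mindist;
    }

    // The 5-vertex example graph from the threaded program.
    static int[][] exampleGraph() {
        int[][] weight = new int[5][5];
        for (int[] row : weight) Arrays.fill(row, INF);
        weight[0][1] = 4;  weight[0][2] = 8;
        weight[1][2] = 3;  weight[1][3] = 1;
        weight[2][4] = 5;
        weight[3][2] = 2;  weight[3][4] = 10;
        return weight;
    }

    public static void main(String[] args) {
        // prints [0, 4, 7, 5, 12]
        System.out.println(Arrays.toString(shortest(exampleGraph(), 0)));
    }
}
```

The threaded version should print the same five distances, one per line, whatever order the workers happen to process the vertices in.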
 
 










Replicated Workers in a Distributed Environment

Implementing Getwork and Putwork

- all acknowledgments are handled by the workpool methods

- Putwork Method

Putwork(vertex, outputdistance)

  Increment ackcount;
  Build work item consisting of (me, outputdistance);
  Write work item into workpool[vertex];

- Getwork Method

  If (worker in active state, and work channel empty,
      and all acknowledgments received) then
    Begin
      Acknowledge current parent;
      Go into idle state;
    End;
  Repeat
    Read new item from workpool;
    If (new item is acknowledgment) then decrement ackcount;
  Until work item is received;

  If (worker in active state) then
    Send acknowledgment to source of work item
  Else
    Begin
      Go into active state;
      Set new parent from source of this work item;
    End;
  Return "distance" part of work item to calling worker;
 
 
 
 
 

Implementing the Worker

mindistance := infinity;

Getwork(trialdistance);
while (not done)
  begin
    if trialdistance < mindistance then
      begin
        mindistance := trialdistance;
        for all my outgoing neighbors
          Putwork(neighbor vertex, mindistance + distance to neighbor);
      end;
    Getwork(trialdistance);
  end
answer := mindistance
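
The interplay of ackcount, parent, and the active/idle state in the Getwork, Putwork, and worker pseudocode above is easier to follow in a deterministic setting. The following is a minimal single-threaded sketch, assuming one worker per vertex, one queue per worker in place of workpool[vertex], and a round-robin loop in place of real processes. The names AckTerminationSketch, Item, run, and the ENV id for whatever injects the first work item are illustrative and do not come from the source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AckTerminationSketch {
    static final int INF = 32000; // "no edge" marker
    static final int ENV = -1;    // hypothetical id of the environment that sends the first item

    // A message is either a work item (source, distance) or an acknowledgment from source.
    static final class Item {
        final int source, distance;
        final boolean ack;
        Item(int source, int distance, boolean ack) {
            this.source = source; this.distance = distance; this.ack = ack;
        }
    }

    static int[] run(int[][] weight, int start) {
        int n = weight.length;
        List<ArrayDeque<Item>> workpool = new ArrayList<>();
        for (int v = 0; v < n; v++) workpool.add(new ArrayDeque<>());
        int[] mindistance = new int[n];
        int[] ackcount = new int[n];   // acknowledgments each worker still expects
        int[] parent = new int[n];     // who activated each worker
        boolean[] active = new boolean[n];
        Arrays.fill(mindistance, INF);

        workpool.get(start).add(new Item(ENV, 0, false));
        boolean done = false;
        while (!done) {
            for (int me = 0; me < n && !done; me++) {
                Item item = workpool.get(me).poll();
                if (item == null) {
                    // active, channel empty, all acknowledgments received:
                    // acknowledge the parent and go idle
                    if (active[me] && ackcount[me] == 0) {
                        active[me] = false;
                        if (parent[me] == ENV) done = true; // the start worker is finished
                        else workpool.get(parent[me]).add(new Item(me, 0, true));
                    }
                } else if (item.ack) {
                    ackcount[me]--;
                } else {
                    if (active[me]) {
                        // already active: acknowledge the sender right away
                        workpool.get(item.source).add(new Item(me, 0, true));
                    } else {
                        active[me] = true;
                        parent[me] = item.source;
                    }
                    // worker body: keep the smaller distance and tell the neighbors
                    if (item.distance < mindistance[me]) {
                        mindistance[me] = item.distance;
                        for (int w = 0; w < n; w++) {
                            if (weight[me][w] < INF) {
                                ackcount[me]++; // Putwork: one more acknowledgment owed to me
                                workpool.get(w).add(
                                    new Item(me, mindistance[me] + weight[me][w], false));
                            }
                        }
                    }
                }
            }
        }
        return mindistance;
    }
}
```

In this sketch, termination is detected when the start worker, whose parent is the environment, goes idle: at that point every work item it caused, directly or indirectly, has been acknowledged.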