Paradigms (Patterns) for Process Interaction in Distributed Programs

Andrews. ACM Computing Surveys, Vol. 23, No. 1, March, 1991

 

Programming Notation


 

Filters: A Sorting Network

 


 

Description: Description: W:\CIS720Fall2012\ppdigms.htg\img.gif
 
 
 
 
 
 
 
 
 
 
 
 
 

// Constructor for Sort Thread
Sort()
{
//Three channels must be passed to this thread constructor
//"in1" and "in2" are input channels
//"out" is output channel
}

public void run()
{
    v1 = in1.get();
    v2 = in2.get();
    while( (v1!=EOS) && (v2!=EOS) )
    {//find the smallest input value
    if(v1<=v2)
        {//output smallest value (from "in1") to output channel
        out.put(v1);
        v1 = in1.get();
        }
    else
        {//output smallest value(from "in2") to output channel
        out.put(v2);
        v2 = in2.get();
        }
    }

while( (v1!=EOS) && (v2==EOS) )
    {//channel "in2" is empty; keep processing items in "in1"
    out.put(v1);
    v1 = in1.get();
    }

while( (v1==EOS) && (v2!=EOS) )
    {//channel "in1" is empty; keep processing items in "in2"
    out.put(v2);
    v2 = in2.get();
    }
out.put(EOS);
}
}
 
 

Clients and Servers

 

// Resource allocation Monitor
class ResAllocMon{
    int avail; int units;
    public ResAllocMon() {
        //if resources available, allocate them, else "wait"
        synchronized void acquire(int id) {
            while(avail==0) {
                try {
                    wait();
                } catch (InterruptedException e) {}
            }
            avail--;
            id = remove(units);
        }
        //Just release the resources and put them back into the pool
        synchronized void release(int id) {
            insert(id, units);
            if(avail==0)
                notify();
            avail++;
        }
    }

Resource Allocation Using a Central Process

 

monitor-based programs vs. message-based programs

permanent variables             local server variables

procedure identifiers             request channel and operation kinds

procedure call                     send request; receive reply

monitor entry                     receive request

procedure return                 send reply

wait statement                   save pending request

signal statement                 retrieve and process pending request

procedure bodies               arms of case statement on operation kind

// Resource allocator and clients
 

class Allocator extends Thread{
    int N = 10;
    Channel request; Channel reply[];
    //These two channels are used to accept requests from clients
    //and reply with "acquired resources"
    int avail; int units; int[] pending; int[] index; int[] unitid;
    // Constructor of Allocator thread
    Allocator()
    {
    //initialize variables
    }

    public void run()
    {//This thread will run forever, or until system crashes or it is "stop"ed
        while(true)       {
            kind = request.get(index, unitid);
            if(kind==ACQUIRE) {
            //This is an ACQUIRE request from the client
                if(avail>0) {//request can be satisfied
                    avail--;
                    unitid[0] = remove(units);
                    reply[index[0]].put(unitid[0]);
                }
                else {//can't satisfy request, so put it in pending queue
                    insert(pending, index[0]);
                }
            }
            //This is a RELEASE request from the client
            else {//are there any requests for resources in the pending queue
                if(empty(pending)) {
                    avail++;
                    insert(units, unitid[0]);
                }
                else {//a thread is waiting for this RELEASEd resource
                    index[0] = remove(pending);
                    reply[index[0]].put(unitid[0]);
                }
            }
        }
    }
}
 

class Client{
    int N = 10;
    int unitid;
    Channel request;
    Channel reply[];

    // Constructor
    Client(){
    //initial
    }
    public void run(){
        request.put(i, ACQUIRE, 0);
        //wait for resource to be allocated
        unitid = reply[i].get();

- - - use the resource
        request.put(i, RELEASE, unitid);
    }
}

Disk Scheduling and Disk Access

 

- "read(disk address)" and "write(disk address)"

- disk address is cylinder, track, and offset number

- objective in moving head disk is to minimize disk head movement

- Shortest Seek Time first is a typical scheduling algorithm

* queue incoming requests by their distance from current head position (some are lower and some are higher)

* seek to the closest request
 
 
 


 

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 


 
 
 
 
 
 
 
 

 


 

Description: Description: W:\CIS720Fall2012\ppdigms.htg\img2.gif
 
 
 
 
 
 
 
 
 
 
 
 
 


 

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Description: Description: W:\CIS720Fall2012\ppdigms.htg\img3.gif
 
 
 
 
 
 
 
 
 
 
 

 

 


 


 

// Self_scheduling disk driver
class Schedule{
    int N = 10;
    //channels from which to receive requests from clients in the request channel
    //and reply using a unique reply channel per   client
    Channel request; Channel reply[];
    //requests are queued in "lower" and "higher", respectively if their
    //disk addresses are lower or higher than the current head position
    int lower[] = new int[100];
    int higher[] = new int[100];
    int headpos = 1, nsaved = 0;
    int index[] = new int[1];
    int cyl;

    // Constructor
    Schedule(){
    //initial values
    }

    public void run()
    {
        while(true)
        {//any requests in channel or some pending
            while(!empty(request)||(nsaved==0)){
                cyl = request.get(index, args);
                if(cyl<=headpos)
                    insert(lower,index[0],cyl,args);
                else
                    insert(higher,index[0],cyl,args);
                nsaved++;
            }
            //if lower pending queue is empty schedule smallest one in "higher"
            //if higher pending queue is empty schedule largest one in "lower"
            if(size(lower)==0)
                remove(higher);
                else if(size(higher)==0)
                    remove(lower);
                    //take the closest request to the current head position by scanning both
                    //"lower" and "higher" pending queues
                        else if( (headpos-get(lower)) > (get(higher)-headpos) )
                            remove(higher);
                            else remove(lower);
                    //move head to next cylinder and access disk
                    headpos = cyl;
                    nsaved--;
                    // get result and send to client
                    reply[index[0]].put(results);
        }
    }
}
 
 

File Servers: Conversational Continuity

 


 

Description: Description: W:\CIS720Fall2012\ppdigms.htg\img4.gif
 
 
 
 
 
 
 
 
 
 
 


 

// File servers and clients
 

class File{//channel objects for communication with clients
    Channel open;
    Channel access[];
    Channel open_reply[];
    Channel access_reply[];
    int clientid;
        // Constructor
    File(){
    //initial
    }

    public void run(){
        while(true)
            {//listen for request from client on "open" channel
                clientid = open.get(fname);
                // open data file; if successful then reply on the client-unique channel
                open_reply[clientid].put(i);
                file_open = true;
                while(file_open) {
                //only accept "reads", "writes", and "closes" if the file is open
                    k = access[i].get(args);
                        if(k==READ)
                    // process read request
                            else if(k==WRITE)
                    // process write request
                                else if(k==CLOSE){
                                // close file;
                                file_open = false;
                                }
                        //unique channel between server and client
                        access_reply[clientid].put(results);
                }
            }
        }
    }
 

class Client{
    Channel open;
    Channel access[];
    Channel open_reply[];
    Channel access_reply[];
    // Constructor
    Client(){
    //initial
    }

    public void run(){
        open.put("foo", j);
        serverid = open_reply[j].get();
        //use the file and eventually close the file by executing
        access[serverid].put(arguments);
        results = access_reply[j].get();
        }
    }
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Heartbeat Algorithms: Dispersing and Gathering State


 
 

Description: Description: W:\CIS720Fall2012\ppdigms_files\img5.gif
 
 
 
 

After R rounds the following predicate must hold:

ROUND: (forall: 1<=q<=N:
    (dist(p,q) <= R implies top[q,*] is filled in)
 
 

Loop
    Send messages to neighbors;
    Receive message from neighbors;
    Until termination condition.
    // Heartbeat algorithm for network topology(figure 10)
 

class Heartbeat{
    int N = 10;
    Channel topology[]; //array of channels to exchange "top" with neighbors
    //If sender is done channel will return a "true" value.
    int R = 0;
    boolean done = false;
    int[] sender = new int[1];
    boolean qdone;
    boolean[][] newtop = new boolean[[N][N];
    boolean value=false, value1=true;

    // Constructor

    Heartbeat()
    {
    //initial
    }

    public void run()
    {while(!done)
    {
    for(int q=0; q<N; q++)
    //if links[q] is true, then q is a neighbor of this thread
    if(links[q])
    //send your thread number, a false to indicate this is not your last message,
    // and the adjacency matrix to neighbor "q"
        topology[q].put(p, false, top);
        for(int q=0; q<N; q++)
            if(links[q])
            {
            //read "top" matrix from neighbors
            // if "qdone", this is your neighbor's last message
            qdone = topology[p].get(sender, newtop);
          for(int y=0; y<N; y++)
              for(int z=0; z<N; z++)
                  top[y][z] = top[y][z] || newtop[y][z];
                  if(qdone)
                      active[sender[0]] = false;
            }
        // if every row of "top" has a "true" entry, then we have heard from all
        //nodes up to a distance of D from this thread; then we are done
            value1 = true;
            for(int y=0; y<N; y++) {
                value = false;
                for(int z=0; z<N; z++)
                //update your adjacency matrix
                value = value || top[y][z];
                value1 = value1 && value;
            }
            if(value1)
                done = true;
            R++;
            }

        //send last "top" (and a "true" to indicate this is your last message) to
        //neighbors and receive one more message from neighbors
        //to clear out channels
        for(int q=0; q<N; q++)
            if(active[q])
                topology[q].put(p,true,top);
                    for(int q=0; q<N; q++)
                        if(active[q])
                            qdone = topology[p].get(sender, newtop);
        }
}
 
 



Probe/echo Algorithms

 


 
 
 
 
 
 
 


 
 
 

Description: Description: W:\CIS720Fall2012\ppdigms.htg\img6.gif
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

 
 
 
 

- send message to all neighbors; when a node receives the message for the first time, it passes the message to all neighbors, including the one from which it was received; after this first message, all others are ignored; this will cause 2(n-1) messages for a tree and 2n(n-1) for a complete graph.

- A node gathers the topology in two phases:

*probes are sent to neighbors

*echoes containing topology are returned to initiator

- initiator sends probe to neighbors

- when a node receives a probe, it sends a probe to all neighbors, except the one which sent it the probe; it waits for echoes from all neighbors; since there are cycles nodes can receive more than one probe from different routes; when this happens, it immediately sends an echo; when echoes are all reflected to the initiator, the topology is known; deadlock is avoided since every probe is echoed; this costs two messages across each link.

- This technique, sometimes called diffusing computations, can also be used to determine stable global states such as deadlock and program termination.


 

// Probe/echo algorithm for topology of a tree
 

class Probe{
    int source = 1;
    int N = 10;
    boolean[] links = new boolean[N];
    boolean[][] localtop = new boolean[N][N];
    boolean[][] newtop = new boolean[N][N];
    int parent;
    Channel probe[]; //channels to send probes
    Channel echo[];//channels to receive echoes
    Channel finalecho; // one last echo to initiator
 

// Constructor
    Probe()
    {
    //initialization
    }
 

    public void run(){
        parent = probe[p].get();//remember your parent's name
        for(int q=0; q<N; q++)
            if(links[q] && (q!=parent) )
                probe[q].put(p); //send probe to all neighbors
        for(int q=0; q<N; q++)
            if(links[q] && (q!=parent) )  
                 echo[q].get(newtop); //wait for echoes to return from neighbors
                    for(int y=0; y<N; y++)
                        for(int z=0; z<N; z++) {
                            localtop[y][z] = localtop[y][z] || newtop[y][z];

                            //merge knowledge of topology
                    }
                    if(p==source)
                        finalecho.put(localtop);//echo to initiator
                        else echo[parent].put(localtop);//echo to parent
       }
}
 

class Initiator{
    int source = 1;
    boolean[][] top = new boolean[N][N];
    Channel probe[];
    Channel finalecho;

// Constructor
    Initiator()
    {
    //initial
    }

    public void run(){
        probe[source].put(source);
        finalecho.get(top);
        }
    }
 


 
 

// Probe/echo algorithm for topology of a network
 

class Probe{
    int source = 1;
    int N = 10;
    boolean[] links = new boolean[N];
    boolean[][] localtop = new boolean[N][N];
    boolean[][] newtop = new boolean[N][N];
    int[] first = new int[1];
    int[] sender = new int[1];
    int need_echo;

    //need two types of channels - one with parms (type of message,sender,top)
    //and one with parms (sender,top) for final echo to initiator
    Channel1 probe_echo[];
    Channel2 finalecho;

    // Constructor
    Probe(){
    //initial
    }

    public void run(){
        k = probe_echo[p].get(first,newtop);
        for(int q=0; q<N; q++) {
        if(links[q] && (q!=parent) ) need_echo++;
            probe_echo[q].put(k,p,empty); }
            while(need_echo>0) {
                k = probe_echo[p].get(sender,newtop);
                if(k==PROBE)
                    //send echo immediately - this is redundant probe
                    probe_echo[sender[0]].put(ECHO,p,empty);
                else {
                    for(int y=0; y<N; y++)
                        for(int z=0; z<N; z++)
                        //merge local topology with received topology
                        localtop[y][z] = localtop[y][z] || newtop[y][z];
                        need_echo--; //until all echoes have been returned
                }
        }
        if(p==source)
            finalecho.put(localtop);
        else probe_echo[first[0]].put(ECHO,p,localtop);
    }

}
 

class Initiator{
    boolean[][] top = new boolean[N][N];
    Channel1 probe_echo[];
    Channel2 finalecho;

    // Constructor
    Initiator(){
    //initial
    }

    public void run(){
        probe[source].put(PROBE,source,empty);
        finalecho.get(top);
    }
}
 
 

Broadcast Algorithms

 


Code Sketch for Distributed Semaphores Algorithm

type kind = enum(V,P,ACK)
chan sem[1:n] (sender: int, kind, timestamp: int)
chan go[1:n](timestamp: int)
User[i: 1..n]:: var lc: int:=0;  #logical clock
    var ts: int;                      #timestamp in go message
    # execute a V operation
    broadcast sem(i,V,lc); lc := lc+1;  #time has passed
    ...
    #execute a P operation
    broadcast sem(i,P,lc); lc := lc+1; #time has passed
    receive go[i](tx); lc := max(lc,ts+1); lc := lc+1; #set to max of local clock or timestamp +1
    ...

Helper[i:1..n]:: var mq: queue of (int,kind,int)  #ordered by timestamp
    var lc: int:=0;               #logical clock
    var nV: int:= 0; nP:int :=0;    #semaphore counters
    var sender: int; k: kind; ts: int; # values in messages
    do true -> {loop invariant DSEM}
      receive sem[i](sender,k,ts); lc := max(lc,ts+1); lc:=lc+1;
      if k=P or k=V ->
        insert(sender,k,ts) at appropriate place in mq
        broadcast sem(i,ACK,lc); lc:=lc+1;
      || k=ACK ->
          record that another ACK has been see
            fa fully acknowledge V messages ->
              remove the message from mq; nV:= nV+1;
            af
            fa fully acknowledged P messages such that nV > nP ->
              remove the message from mq; nP:= nP+1;
              if sender =i -> send go[i](lc); lc :=lc+1 fi
            af
      fi
   od
 
 









Token-passing Algorithms

 

{RING: p[1]blue => (p[1]...p[1+token]blue and
                ch[2]...ch[1+token mod n] empty)}

actions of p[1] when it first becomes idle
    - color[1]:=blue; token:=0; send ch[2](token)

actions of p[i:1..n] upon receiving a regular message:
    - color[i]:= red

actions of p[i:2..n] upon receiving the token
    - color[i] := blue; token:=token + 1; send ch[i mod n+1](token)

actions of p[1] upon receiving the token:
    - if color[1] = blue then announce termination and halt
    - else {color[1] := blue; token:= 0; send ch[2](token)}
 
 
 

- assume a complete graph where every process is connected to every other process via a channel

- token must traverse every link (and therefore idle process) in the graph before we can determine that a program is terminated

- even though a process is blocked, waiting for a regular message, it will handle the token messages

- a process can receive either regular messages or tokens

- if a process sends a token around a cycle and it returns with a value of "nc" (the length of the cycle), then the program can terminate

- must find all cycles in the graph for the program to be terminated

- must maintain the GRAPH invariant:

{token has value T => (the last T channels token was received from were emtpy and all p[i] that passed it on were blue when they passed it) }

-actions of p[i: 1..n] upon receive a regular message

    * color[i] := red

- actions of p[i:1..n] upon receiving the token

    * if token = nc -> {announce termination and halt}

    * if color[i] = red -> color[i] := blue; token:=0

    * or color[i] = blue -> token := token + 1

    * set j to the channel corresponding to the next edge in cycle "c"

    * send ch[j](token)