Paradigms (Patterns) for Process Interaction in Distributed Programs

Andrews. ACM Computing Surveys, Vol. 23, No. 1, March, 1991

Heartbeat Algorithms: Dispersing and Gathering State


 
 


After R rounds, the following predicate must hold at every node p, where dist(p,q) is the length of the shortest path from p to q:

ROUND: (forall q: 1<=q<=N:
    (dist(p,q) <= R implies top[q,*] is filled in))
 
 

Loop
    Send messages to neighbors;
    Receive messages from neighbors;
Until termination condition.

// Heartbeat algorithm for network topology (figure 10)
 

class Heartbeat {
    int N = 10;                            // number of nodes
    int p;                                 // this thread's node number
    boolean[] links = new boolean[N];      // links[q] is true if q is a neighbor of this thread
    boolean[] active = new boolean[N];     // neighbors still sending; initially a copy of links
    boolean[][] top = new boolean[N][N];   // known topology; initially only row p (= links) is filled in
    Channel topology[];   // array of channels to exchange "top" with neighbors;
                          // get() returns "true" if this is the sender's last message
    int R = 0;                             // number of rounds completed
    boolean done = false;
    int[] sender = new int[1];
    boolean qdone;
    boolean[][] newtop = new boolean[N][N];
    boolean value = false, value1 = true;

    // Constructor
    Heartbeat()
    {
        // initialize p, links, active, top, and topology
    }

    public void run()
    {
        while (!done)
        {
            // send your thread number, a false to indicate this is not your
            // last message, and the adjacency matrix to each neighbor "q"
            for (int q = 0; q < N; q++)
                if (links[q])
                    topology[q].put(p, false, top);

            // read one "top" matrix from each neighbor
            for (int q = 0; q < N; q++)
                if (links[q])
                {
                    // if "qdone", this is your neighbor's last message
                    qdone = topology[p].get(sender, newtop);
                    // update your adjacency matrix
                    for (int y = 0; y < N; y++)
                        for (int z = 0; z < N; z++)
                            top[y][z] = top[y][z] || newtop[y][z];
                    if (qdone)
                        active[sender[0]] = false;
                }

            // if every row of "top" has a "true" entry, then we have heard from
            // all nodes up to a distance of D (the network diameter) from this
            // thread; then we are done
            value1 = true;
            for (int y = 0; y < N; y++) {
                value = false;
                for (int z = 0; z < N; z++)
                    value = value || top[y][z];
                value1 = value1 && value;
            }
            if (value1)
                done = true;
            R++;
        }

        // send last "top" (and a "true" to indicate this is your last message)
        // to neighbors and receive one more message from neighbors
        // to clear out channels
        for (int q = 0; q < N; q++)
            if (active[q])
                topology[q].put(p, true, top);
        for (int q = 0; q < N; q++)
            if (active[q])
                qdone = topology[p].get(sender, newtop);
    }
}
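The class above depends on a Channel type that these notes do not define, so it cannot be run as it stands. The following is a minimal sketch, not the algorithm from the paper: it replaces the threads and channels with a single loop that performs the rounds in lock step, under the assumption that all nodes send and receive at the same time. The names HeartbeatSim, graph, roundsToLearn and allKnow are made up for this illustration.

```java
import java.util.Arrays;

final class HeartbeatSim {
    /** Builds a symmetric adjacency matrix for n nodes from undirected edges. */
    static boolean[][] graph(int n, int[][] edges) {
        boolean[][] links = new boolean[n][n];
        for (int[] e : edges) {
            links[e[0]][e[1]] = true;
            links[e[1]][e[0]] = true;
        }
        return links;
    }

    /** Returns the number of rounds until every node knows the whole topology. */
    static int roundsToLearn(boolean[][] links) {
        int n = links.length;
        // top[p] is node p's view of the adjacency matrix; it starts with row p only.
        boolean[][][] top = new boolean[n][n][n];
        for (int p = 0; p < n; p++) top[p][p] = links[p].clone();
        int rounds = 0;
        while (!allKnow(top, links)) {
            if (rounds == n) throw new IllegalArgumentException("graph is not connected");
            // Copy the views so that every node sends what it knew at the start of the round.
            boolean[][][] next = new boolean[n][n][n];
            for (int p = 0; p < n; p++)
                for (int y = 0; y < n; y++) next[p][y] = top[p][y].clone();
            // Each node p sends its view to every neighbor q, which merges it with OR.
            for (int p = 0; p < n; p++)
                for (int q = 0; q < n; q++)
                    if (links[p][q])
                        for (int y = 0; y < n; y++)
                            for (int z = 0; z < n; z++)
                                next[q][y][z] |= top[p][y][z];
            top = next;
            rounds++;
        }
        return rounds;
    }

    /** True when every node's view equals the real adjacency matrix. */
    static boolean allKnow(boolean[][][] top, boolean[][] links) {
        for (boolean[][] view : top)
            if (!Arrays.deepEquals(view, links)) return false;
        return true;
    }

    public static void main(String[] args) {
        boolean[][] path = graph(4, new int[][] {{0, 1}, {1, 2}, {2, 3}});
        System.out.println(roundsToLearn(path));   // prints 3
    }
}
```

On the four-node path 0-1-2-3 the sketch needs three rounds, which is the distance between the two end nodes; on a triangle one round is enough.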
 
 



Probe/echo Algorithms

 


 
 
 
 
 
 
 


 
 
 

- Broadcast by flooding: the initiator sends the message to all of its neighbors. When a node receives the message for the first time, it passes the message to all of its neighbors, including the one from which it was received; all later copies are ignored. Every node therefore sends one message to each of its neighbors, which costs 2(n-1) messages for a tree and n(n-1) for a complete graph.

- A node gathers the topology in two phases:

*probes are sent to neighbors

*echoes containing topology are returned to initiator

- initiator sends probe to neighbors

- when a node receives its first probe, it sends a probe to all neighbors except the one that sent it the probe, then waits for echoes from those neighbors. Because the network may contain cycles, a node can receive further probes over other routes; it answers each of these immediately with an echo. When all echoes have been reflected back to the initiator, the topology is known. Deadlock is avoided since every probe is echoed. Each link on the tree of first probes carries two messages (a probe and an echo); every other link carries four (two probes and two echoes).

- This technique, sometimes called diffusing computations, can also be used to determine stable global states such as deadlock and program termination.
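The message cost of the flooding broadcast in the first item of the list above can be checked by counting. The sketch below is an illustration under simple assumptions: one shared queue stands in for the channels, and the names FloodCount, path, complete and messages are hypothetical.

```java
import java.util.ArrayDeque;

final class FloodCount {
    /** Adjacency matrix of a path 0-1-...-(n-1), the simplest tree. */
    static boolean[][] path(int n) {
        boolean[][] links = new boolean[n][n];
        for (int i = 0; i + 1 < n; i++) {
            links[i][i + 1] = true;
            links[i + 1][i] = true;
        }
        return links;
    }

    /** Adjacency matrix of a complete graph on n nodes. */
    static boolean[][] complete(int n) {
        boolean[][] links = new boolean[n][n];
        for (int i = 0; i < n; i++)
            for (int j = 0; j < n; j++)
                links[i][j] = (i != j);
        return links;
    }

    /** Counts the messages sent when the initiator floods one message. */
    static int messages(boolean[][] links, int initiator) {
        int n = links.length;
        boolean[] seen = new boolean[n];
        ArrayDeque<Integer> inTransit = new ArrayDeque<>();   // destination of each message in flight
        int sent = 0;
        seen[initiator] = true;
        for (int q = 0; q < n; q++)
            if (links[initiator][q]) { inTransit.add(q); sent++; }
        while (!inTransit.isEmpty()) {
            int p = inTransit.remove();
            if (seen[p]) continue;        // later copies are ignored
            seen[p] = true;
            // first copy: pass it to all neighbors, including the sender
            for (int q = 0; q < n; q++)
                if (links[p][q]) { inTransit.add(q); sent++; }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(messages(path(5), 0));       // prints 8
        System.out.println(messages(complete(5), 0));   // prints 20
    }
}
```

For n = 5 the sketch counts 8 messages on the path and 20 on the complete graph, because every node sends exactly once to each of its neighbors.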


 

// Probe/echo algorithm for topology of a tree
 

class Probe {
    int source = 1;
    int N = 10;
    int p;                                      // this node's number
    boolean[] links = new boolean[N];           // links[q] is true if q is a neighbor of p
    boolean[][] localtop = new boolean[N][N];   // initially only row p (= links) is filled in
    boolean[][] newtop = new boolean[N][N];
    int parent;
    Channel probe[];     // probe[q]: channel on which node q receives probes
    Channel echo[];      // echo[q]: channel on which node q receives echoes
    Channel finalecho;   // one last echo to initiator

    // Constructor
    Probe()
    {
        // initialization
    }

    public void run() {
        parent = probe[p].get();   // remember your parent's name

        // send probe to all other neighbors (your children)
        for (int q = 0; q < N; q++)
            if (links[q] && (q != parent))
                probe[q].put(p);

        // wait for one echo to return from each child
        for (int q = 0; q < N; q++)
            if (links[q] && (q != parent))
            {
                echo[p].get(newtop);
                // merge knowledge of topology
                for (int y = 0; y < N; y++)
                    for (int z = 0; z < N; z++)
                        localtop[y][z] = localtop[y][z] || newtop[y][z];
            }

        if (p == source)
            finalecho.put(localtop);      // echo to initiator
        else
            echo[parent].put(localtop);   // echo to parent
    }
}
 

class Initiator {
    int source = 1;
    int N = 10;
    boolean[][] top = new boolean[N][N];
    Channel probe[];
    Channel finalecho;

    // Constructor
    Initiator()
    {
        // initial
    }

    public void run() {
        probe[source].put(source);   // start the probe at the source node
        finalecho.get(top);          // wait for the gathered topology
    }
}
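On a tree, the probe going down and the echo coming back behave like a recursive call and its return value. The sketch below uses that analogy in place of processes and channels; it is an illustration only, and the names TreeProbeEcho and gather are hypothetical. As in the notes, the source is given itself as its parent.

```java
import java.util.Arrays;

final class TreeProbeEcho {
    /** Probes the subtree below p and returns the topology echoed back by p. */
    static boolean[][] gather(boolean[][] links, int p, int parent) {
        int n = links.length;
        boolean[][] localtop = new boolean[n][n];
        localtop[p] = links[p].clone();   // a node starts out knowing only its own links
        for (int q = 0; q < n; q++) {
            if (links[p][q] && q != parent) {
                // "probe" child q and wait for its "echo"
                boolean[][] newtop = gather(links, q, p);
                // merge knowledge of topology
                for (int y = 0; y < n; y++)
                    for (int z = 0; z < n; z++)
                        localtop[y][z] = localtop[y][z] || newtop[y][z];
            }
        }
        return localtop;   // echo to parent, or final echo if p is the source
    }

    public static void main(String[] args) {
        // Tree with edges 0-1, 0-2, 2-3.
        boolean[][] links = new boolean[4][4];
        int[][] edges = {{0, 1}, {0, 2}, {2, 3}};
        for (int[] e : edges) {
            links[e[0]][e[1]] = true;
            links[e[1]][e[0]] = true;
        }
        boolean[][] top = gather(links, 0, 0);
        System.out.println(Arrays.deepEquals(top, links));   // prints true
    }
}
```

The recursion terminates only because a tree has no cycles; on a general network a node could be probed twice, which is the case the network version handles.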
 


 
 

// Probe/echo algorithm for topology of a network
 

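class Probe {
    static final int PROBE = 0, ECHO = 1;       // message kinds
    int source = 1;
    int N = 10;
    int p;                                      // this node's number
    boolean[] links = new boolean[N];           // links[q] is true if q is a neighbor of p
    boolean[][] localtop = new boolean[N][N];   // initially only row p (= links) is filled in
    boolean[][] newtop = new boolean[N][N];
    boolean[][] empty = new boolean[N][N];      // null topology sent with probes and redundant echoes
    int[] first = new int[1];                   // node from which the first probe arrived
    int[] sender = new int[1];
    int k;                                      // kind of the message just received
    int need_echo = 0;                          // echoes still outstanding

    //need two types of channels - one with parms (type of message,sender,top)
    //and one with parms (sender,top) for final echo to initiator
    Channel1 probe_echo[];
    Channel2 finalecho;

    // Constructor
    Probe() {
        //initial
    }

    public void run() {
        k = probe_echo[p].get(first, newtop);   // wait for the first probe

        // pass the probe on to all other neighbors
        for (int q = 0; q < N; q++)
            if (links[q] && (q != first[0])) {
                probe_echo[q].put(PROBE, p, empty);
                need_echo++;
            }

        while (need_echo > 0) {
            k = probe_echo[p].get(sender, newtop);
            if (k == PROBE)
                //send echo immediately - this is a redundant probe
                probe_echo[sender[0]].put(ECHO, p, empty);
            else {
                //merge local topology with received topology
                for (int y = 0; y < N; y++)
                    for (int z = 0; z < N; z++)
                        localtop[y][z] = localtop[y][z] || newtop[y][z];
                need_echo--;   //until all echoes have been returned
            }
        }

        if (p == source)
            finalecho.put(localtop);
        else
            probe_echo[first[0]].put(ECHO, p, localtop);
    }
}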
 

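class Initiator {
    static final int PROBE = 0;              // same message kind as in Probe
    int source = 1;
    int N = 10;
    boolean[][] top = new boolean[N][N];
    boolean[][] empty = new boolean[N][N];   // null topology sent with the probe
    Channel1 probe_echo[];
    Channel2 finalecho;

    // Constructor
    Initiator() {
        //initial
    }

    public void run() {
        probe_echo[source].put(PROBE, source, empty);
        finalecho.get(top);
    }
}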
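The network version can be tried out without threads by keeping all messages in one FIFO queue and handing them to the nodes one at a time. The sketch below does this under that single-queue assumption; the names NetProbeEcho, Msg, Result and run are hypothetical, and the initiator's own probe and the final echo are not counted as node-to-node messages.

```java
import java.util.ArrayDeque;

final class NetProbeEcho {
    static final int PROBE = 0, ECHO = 1;

    /** One message in transit; top is null for probes and for echoes of redundant probes. */
    static final class Msg {
        final int kind, from, to;
        final boolean[][] top;
        Msg(int kind, int from, int to, boolean[][] top) {
            this.kind = kind; this.from = from; this.to = to; this.top = top;
        }
    }

    /** Topology gathered at the source plus the number of node-to-node messages. */
    static final class Result {
        final boolean[][] top;
        final int messages;
        Result(boolean[][] top, int messages) { this.top = top; this.messages = messages; }
    }

    static Result run(boolean[][] links, int source) {
        int n = links.length;
        int[] first = new int[n];            // sender of each node's first probe
        int[] needEcho = new int[n];         // echoes each node still waits for
        boolean[] started = new boolean[n];
        boolean[][][] localtop = new boolean[n][n][n];
        ArrayDeque<Msg> queue = new ArrayDeque<>();
        int sent = 0;
        queue.add(new Msg(PROBE, source, source, null));   // the initiator's probe
        while (!queue.isEmpty()) {
            Msg m = queue.remove();
            int p = m.to;
            if (m.kind == PROBE && started[p]) {
                queue.add(new Msg(ECHO, p, m.from, null));  // redundant probe: echo at once
                sent++;
                continue;
            }
            if (m.kind == PROBE) {
                started[p] = true;
                first[p] = m.from;
                localtop[p][p] = links[p].clone();
                for (int q = 0; q < n; q++)
                    if (links[p][q] && q != first[p]) {
                        queue.add(new Msg(PROBE, p, q, null));
                        sent++;
                        needEcho[p]++;
                    }
            } else {
                if (m.top != null)   // merge local topology with received topology
                    for (int y = 0; y < n; y++)
                        for (int z = 0; z < n; z++)
                            localtop[p][y][z] |= m.top[y][z];
                needEcho[p]--;
            }
            if (needEcho[p] == 0) {
                if (p == source) return new Result(localtop[p], sent);
                queue.add(new Msg(ECHO, p, first[p], localtop[p]));
                sent++;
            }
        }
        throw new IllegalStateException("source never received all echoes");
    }
}
```

On a three-node path started at one end the sketch counts 4 messages, two per link. On a triangle it counts 8: the two links used by first probes carry two messages each, and the remaining link carries two probes and two echoes.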
 
 

Broadcast Algorithms

 


Code Sketch for Distributed Semaphores Algorithm

type kind = enum(V,P,ACK)
chan sem[1:n] (sender: int, kind, timestamp: int)
chan go[1:n](timestamp: int)
User[i: 1..n]:: var lc: int:=0;  #logical clock
    var ts: int;                      #timestamp in go message
    # execute a V operation
    broadcast sem(i,V,lc); lc := lc+1;  #time has passed
    ...
    #execute a P operation
    broadcast sem(i,P,lc); lc := lc+1; #time has passed
    receive go[i](ts); lc := max(lc,ts+1); lc := lc+1; #set to max of local clock or timestamp +1
    ...

Helper[i:1..n]:: var mq: queue of (int,kind,int)  #ordered by timestamp
    var lc: int:=0;               #logical clock
    var nV: int:= 0; nP:int :=0;    #semaphore counters
    var sender: int; k: kind; ts: int; # values in messages
    do true -> {loop invariant DSEM}
      receive sem[i](sender,k,ts); lc := max(lc,ts+1); lc:=lc+1;
      if k=P or k=V ->
        insert(sender,k,ts) at appropriate place in mq
        broadcast sem(i,ACK,lc); lc:=lc+1;
      [] k=ACK ->
          record that another ACK has been seen
          fa fully acknowledged V messages ->
              remove the message from mq; nV:= nV+1;
            af
            fa fully acknowledged P messages such that nV > nP ->
              remove the message from mq; nP:= nP+1;
              if sender =i -> send go[i](lc); lc :=lc+1 fi
            af
      fi
   od
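Both User and Helper in the sketch above update their logical clock "lc" in the same way on every send and receive. The small class below isolates just that rule as an illustration; the class name LogicalClock and its method names are hypothetical, and it does not implement the semaphore itself.

```java
final class LogicalClock {
    private int lc = 0;   // logical clock, starts at 0 as in the sketch

    /** Returns the timestamp for an outgoing message, then lets time pass. */
    int send() {
        int ts = lc;
        lc = lc + 1;
        return ts;
    }

    /** Receive rule: lc := max(lc, ts+1); lc := lc+1. */
    void receive(int ts) {
        lc = Math.max(lc, ts + 1);
        lc = lc + 1;
    }

    int value() {
        return lc;
    }

    public static void main(String[] args) {
        LogicalClock c = new LogicalClock();
        c.send();        // message carries timestamp 0, clock becomes 1
        c.receive(5);    // max(1, 6) = 6, then 7
        System.out.println(c.value());   // prints 7
    }
}
```

Because a receiver always moves its clock past the timestamp it just saw, a message is always received at a later logical time than it was sent, which is what lets the helpers order the P and V messages by timestamp.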
 
 









Token-passing Algorithms

 

{RING: p[1] is blue => (p[1]...p[token+1] are blue and
                ch[2]...ch[(token mod n)+1] are empty)}

actions of p[1] when it first becomes idle
    - color[1]:=blue; token:=0; send ch[2](token)

actions of p[i:1..n] upon receiving a regular message:
    - color[i]:= red

actions of p[i:2..n] upon receiving the token
    - color[i] := blue; token:=token + 1; send ch[(i mod n)+1](token)

actions of p[1] upon receiving the token:
    - if color[1] = blue then announce termination and halt
    - else {color[1] := blue; token:= 0; send ch[2](token)}
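The ring actions above can be traced with a short sketch. It is an illustration under strong assumptions: all processes are idle, and the only disturbance is a single regular message that reaches p[1] at a chosen token hop, after which p[1] is idle again before the token returns. The names RingTermination, tokenHops and disturbAtHop are hypothetical.

```java
final class RingTermination {
    /**
     * Returns the number of token deliveries until p[1] announces termination.
     * disturbAtHop is the delivery count at which p[1] receives a regular
     * message and turns red; use 0 for no regular message at all.
     */
    static int tokenHops(int n, int disturbAtHop) {
        if (n < 2) throw new IllegalArgumentException("ring needs at least 2 processes");
        boolean[] blue = new boolean[n + 1];   // 1-based; false means red
        int hops = 0;

        // actions of p[1] when it first becomes idle
        blue[1] = true;
        int token = 0;
        int at = 2;                            // token is in transit on ch[2]

        while (true) {
            hops++;                            // token is delivered to p[at]
            if (hops == disturbAtHop) blue[1] = false;   // regular message reaches p[1]
            if (at == 1) {
                if (blue[1]) return hops;      // announce termination and halt
                blue[1] = true;                // otherwise start another circuit
                token = 0;
            } else {
                blue[at] = true;
                token = token + 1;
            }
            at = (at % n) + 1;                 // send ch[(at mod n)+1](token)
        }
    }

    public static void main(String[] args) {
        System.out.println(tokenHops(4, 0));   // prints 4
        System.out.println(tokenHops(4, 2));   // prints 8
    }
}
```

With four processes and no regular messages one circuit (4 deliveries) is enough; if p[1] turns red during the first circuit, a second circuit is needed and the count rises to 8.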
 
 
 

- assume a complete graph where every process is connected to every other process via a channel

- the token must traverse every link (and therefore visit every process while it is idle) before we can determine that the program has terminated

- even while a process is blocked waiting for a regular message, it still handles token messages

- a process can receive either regular messages or tokens

- the token is sent around a cycle "c" that includes every link of the graph; if it arrives with a value of "nc" (the length of that cycle), then the program has terminated

- a cycle through only some of the links is not enough: every link must be covered before termination can be concluded

- must maintain the GRAPH invariant:

{token has value T => (the last T channels the token was received from were empty, and all p[i] that passed it on were blue when they passed it)}

- actions of p[i:1..n] upon receiving a regular message

    * color[i] := red

- actions of p[i:1..n] upon receiving the token

    * if token = nc -> {announce termination and halt}

    * if color[i] = red -> color[i] := blue; token:=0

    * or color[i] = blue -> token := token + 1

    * set j to the channel corresponding to the next edge in cycle "c"

    * send ch[j](token)
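The token actions for the complete graph can be traced in the same way as for the ring. The sketch below is an illustration that assumes no regular messages arrive while the token circulates, so a red process only reflects a message received earlier. The cycle is given as the list of processes that receive the token, in order; the names GraphTermination and deliveries are hypothetical.

```java
final class GraphTermination {
    /**
     * cycle[k] is the process that receives the token on the k-th edge of
     * cycle "c"; red[i] is true if process i is red when the walk starts.
     * Returns the number of token deliveries until termination is announced.
     */
    static int deliveries(int[] cycle, boolean[] red) {
        int nc = cycle.length;            // length of cycle "c"
        boolean[] isRed = red.clone();
        int token = 0;
        int count = 0;
        for (int k = 0; ; k = (k + 1) % nc) {
            int i = cycle[k];             // process receiving the token
            count++;
            if (token == nc) return count;   // announce termination and halt
            if (isRed[i]) {
                isRed[i] = false;         // color[i] := blue
                token = 0;
            } else {
                token = token + 1;
            }
            // the loop step moves the token along the next edge of the cycle
        }
    }

    public static void main(String[] args) {
        // Complete graph on processes 0, 1, 2 with its 6 directed channels,
        // covered by the circuit 0->1->2->0->2->1->0.
        int[] cycle = {1, 2, 0, 2, 1, 0};
        System.out.println(deliveries(cycle, new boolean[] {false, false, false}));   // prints 7
        System.out.println(deliveries(cycle, new boolean[] {false, false, true}));    // prints 9
    }
}
```

With every process blue the token needs nc = 6 deliveries to reach the value 6 and one more for a process to see it. A single red process resets the count once, so the walk takes two deliveries longer in the second call.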