Paradigms (Patterns) for Process Interaction in Distributed Programs
Andrews.
ACM Computing Surveys, Vol. 23, No. 1, March, 1991
Heartbeat Algorithms: Dispersing and Gathering State
After R rounds the following predicate must hold:
ROUND: (forall q: 1<=q<=N: (dist(p,q) <= R implies top[q,*] is filled in))
Loop
    Send messages to neighbors;
    Receive messages from neighbors;
Until termination condition.
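// Heartbeat algorithm for network topology (figure 10)
class Heartbeat {
    int N = 10;
    int p;                                  // this thread's number (assumed to be set in the constructor)
    boolean[] links = new boolean[N];       // links[q] is true if q is a neighbor (assumed declaration)
    boolean[] active = new boolean[N];      // neighbors still sending; assumed to start as a copy of links
    boolean[][] top = new boolean[N][N];    // local view of the adjacency matrix (assumed declaration)
    Channel topology[];     // array of channels to exchange "top" with neighbors
                            // If sender is done, channel will return a "true" value.
    int R = 0;
    boolean done = false;
    int[] sender = new int[1];
    boolean qdone;
    boolean[][] newtop = new boolean[N][N];
    boolean value = false, value1 = true;

    // Constructor
    Heartbeat()
    {
        //initial
    }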
    public void run()
    {
        while (!done)
        {
            for (int q = 0; q < N; q++)
                // if links[q] is true, then q is a neighbor of this thread
                if (links[q])
                    // send your thread number, a false to indicate this is not
                    // your last message, and the adjacency matrix to neighbor "q"
                    topology[q].put(p, false, top);
            for (int q = 0; q < N; q++)
                if (links[q])
                {
                    // read "top" matrix from neighbors;
                    // if "qdone", this is your neighbor's last message
                    qdone = topology[p].get(sender, newtop);
                    // update your adjacency matrix
                    for (int y = 0; y < N; y++)
                        for (int z = 0; z < N; z++)
                            top[y][z] = top[y][z] || newtop[y][z];
                    if (qdone)
                        active[sender[0]] = false;
                }
            // if every row of "top" has a "true" entry, then we have heard from all
            // nodes up to a distance of D from this thread; then we are done
            value1 = true;
            for (int y = 0; y < N; y++) {
                value = false;
                for (int z = 0; z < N; z++)
                    // does row y have at least one "true" entry?
                    value = value || top[y][z];
                value1 = value1 && value;
            }
            if (value1)
                done = true;
            R++;
        }
        // send last "top" (and a "true" to indicate this is your last message) to
        // neighbors and receive one more message from neighbors to clear out channels
        for (int q = 0; q < N; q++)
            if (active[q])
                topology[q].put(p, true, top);
        for (int q = 0; q < N; q++)
            if (active[q])
                qdone = topology[p].get(sender, newtop);
    }
}
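The ROUND predicate can be explored without threads or channels. The following is a minimal sketch, not the class above: it replaces the Channel exchanges with a synchronous, single-threaded loop in which every node ORs in the matrices of its neighbors once per round. The names HeartbeatSim, Result, simulate and everyRowFilled are hypothetical, and the graph is assumed to be symmetric, connected, and to have at least two nodes.

```java
import java.util.Arrays;

/** Sequential simulation of heartbeat rounds (hypothetical helper, assumptions in the lead-in). */
final class HeartbeatSim {
    /** Result holder: rounds executed and each node's final view of the topology. */
    static final class Result {
        final int rounds;
        final boolean[][][] views;
        Result(int rounds, boolean[][][] views) { this.rounds = rounds; this.views = views; }
    }

    /** links[p][q] is true when p and q are neighbors. */
    static Result simulate(boolean[][] links) {
        int n = links.length;
        boolean[][][] top = new boolean[n][n][n];
        for (int p = 0; p < n; p++) top[p][p] = links[p].clone(); // p starts knowing only its own row
        int rounds = 0;
        while (!everyRowFilled(top)) {
            // Send phase: snapshot all views so each node sends its state from the start of the round.
            boolean[][][] sent = new boolean[n][n][];
            for (int p = 0; p < n; p++)
                for (int y = 0; y < n; y++) sent[p][y] = top[p][y].clone();
            // Receive phase: each node ORs in the matrix sent by each of its neighbors.
            for (int p = 0; p < n; p++)
                for (int q = 0; q < n; q++)
                    if (links[p][q])
                        for (int y = 0; y < n; y++)
                            for (int z = 0; z < n; z++)
                                top[p][y][z] = top[p][y][z] || sent[q][y][z];
            rounds++;
        }
        return new Result(rounds, top);
    }

    /** True when every node's matrix has at least one true entry in every row. */
    static boolean everyRowFilled(boolean[][][] top) {
        for (boolean[][] view : top)
            for (boolean[] row : view) {
                boolean any = false;
                for (boolean b : row) any = any || b;
                if (!any) return false;
            }
        return true;
    }

    public static void main(String[] args) {
        // Path 0 - 1 - 2 - 3: the two end nodes are three links apart.
        boolean[][] path = new boolean[4][4];
        for (int i = 0; i < 3; i++) { path[i][i + 1] = true; path[i + 1][i] = true; }
        Result r = simulate(path);
        System.out.println("rounds = " + r.rounds);
        System.out.println("node 0 sees full topology: " + Arrays.deepEquals(r.views[0], path));
    }
}
```

Unlike the threaded version, this sketch keeps all nodes running until the slowest one is done, so the round count it reports is the maximum over all nodes.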
Probe/echo Algorithms
- send message to all neighbors; when a node receives the message for the first time, it passes the message to all neighbors, including the one from which it was received; after this first message, all others are ignored; each node therefore sends one message over each of its links, which causes 2(n-1) messages for a tree and n(n-1) for a complete graph.
- A node gathers the topology in two phases:
* probes are sent to neighbors
* echoes containing topology are returned to initiator
- initiator sends probe to neighbors
- when a node receives a probe, it sends a probe to all neighbors, except the one which sent it the probe, and waits for echoes from all of them
* since there are cycles, nodes can receive more than one probe, arriving over different routes; when this happens, the node immediately sends an echo
* when the echoes have all been reflected to the initiator, the topology is known
* deadlock is avoided since every probe is echoed
* this costs two messages across each link
- This technique, sometimes called diffusing computations, can also be used to determine stable global states such as deadlock and program termination.
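The message count of the simple broadcast in the first bullet can be checked with a small simulation. This is a sketch under assumptions: the graph is undirected and connected, message delivery is modelled by one in-memory FIFO queue, and the names FloodingBroadcast and countMessages are hypothetical. Because every node forwards exactly once to each of its neighbors, the total equals twice the number of links.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Counts the messages of a flooding broadcast (hypothetical helper). */
final class FloodingBroadcast {
    /** links[p][q] is true when p and q are neighbors; source starts the broadcast. */
    static int countMessages(boolean[][] links, int source) {
        int n = links.length;
        boolean[] seen = new boolean[n];               // has the node already received the message?
        Deque<Integer> inTransit = new ArrayDeque<>(); // destination of each message in flight
        seen[source] = true;
        int messages = sendToAllNeighbors(links, source, inTransit);
        while (!inTransit.isEmpty()) {
            int node = inTransit.poll();
            if (seen[node]) continue;                  // later copies are ignored
            seen[node] = true;
            // First copy: pass it on to all neighbors, including the one it came from.
            messages += sendToAllNeighbors(links, node, inTransit);
        }
        return messages;
    }

    private static int sendToAllNeighbors(boolean[][] links, int from, Deque<Integer> inTransit) {
        int sent = 0;
        for (int q = 0; q < links.length; q++)
            if (links[from][q]) { inTransit.add(q); sent++; }
        return sent;
    }

    public static void main(String[] args) {
        // Star-shaped tree with center 0 and four leaves.
        boolean[][] star = new boolean[5][5];
        for (int i = 1; i < 5; i++) { star[0][i] = true; star[i][0] = true; }
        System.out.println("messages on the star: " + countMessages(star, 0));
    }
}
```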
// Probe/echo algorithm for topology of a tree
class Probe {
    int source = 1;
    int N = 10;
    int p;                  // this thread's number (assumed to be set in the constructor)
    boolean[] links = new boolean[N];
    boolean[][] localtop = new boolean[N][N];
    boolean[][] newtop = new boolean[N][N];
    int parent;
    Channel probe[];        // channels to send probes
    Channel echo[];         // channels to receive echoes
    Channel finalecho;      // one last echo to initiator

    // Constructor
    Probe()
    {
        //initialization
    }
    public void run() {
        parent = probe[p].get();            // remember your parent's name
        for (int q = 0; q < N; q++)
            if (links[q] && (q != parent))
                probe[q].put(p);            // send probe to all other neighbors
        for (int q = 0; q < N; q++)
            if (links[q] && (q != parent))
            {
                // wait for echoes to return from neighbors; they arrive on this
                // thread's own channel, since each child puts on echo[parent]
                echo[p].get(newtop);
                // merge knowledge of topology
                for (int y = 0; y < N; y++)
                    for (int z = 0; z < N; z++)
                        localtop[y][z] = localtop[y][z] || newtop[y][z];
            }
        if (p == source)
            finalecho.put(localtop);        // echo to initiator
        else
            echo[parent].put(localtop);     // echo to parent
    }
}
class Initiator {
    int source = 1;
    int N = 10;             // assumed; same value as in Probe
    boolean[][] top = new boolean[N][N];
    Channel probe[];
    Channel finalecho;

    // Constructor
    Initiator()
    {
        //initial
    }

    public void run() {
        probe[source].put(source);
        finalecho.get(top);
    }
}
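// Probe/echo algorithm for topology of a network
class Probe {
    static final int PROBE = 0, ECHO = 1;   // message types (values assumed)
    int source = 1;
    int N = 10;
    int p;                                  // this thread's number (assumed to be set in the constructor)
    int k;                                  // type of the last message received (assumed declaration)
    boolean[] links = new boolean[N];
    boolean[][] localtop = new boolean[N][N];
    boolean[][] newtop = new boolean[N][N];
    boolean[][] empty = new boolean[N][N];  // empty topology sent with probes (assumed declaration)
    int[] first = new int[1];
    int[] sender = new int[1];
    int need_echo;
    // need two types of channels - one with parms (type of message, sender, top)
    // and one with parms (sender, top) for final echo to initiator
    Channel1 probe_echo[];
    Channel2 finalecho;

    // Constructor
    Probe() {
        //initial
    }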
    public void run() {
        // the first message is a probe; first[0] is the neighbor it came from
        k = probe_echo[p].get(first, newtop);
        need_echo = 0;
        for (int q = 0; q < N; q++)
            if (links[q] && (q != first[0])) {
                probe_echo[q].put(k, p, empty);     // pass the probe on (k is PROBE here)
                need_echo++;                        // one echo is expected for each probe sent
            }
        while (need_echo > 0) {
            k = probe_echo[p].get(sender, newtop);
            if (k == PROBE)
                // send echo immediately - this is a redundant probe
                probe_echo[sender[0]].put(ECHO, p, empty);
            else {
                // merge local topology with received topology
                for (int y = 0; y < N; y++)
                    for (int z = 0; z < N; z++)
                        localtop[y][z] = localtop[y][z] || newtop[y][z];
                need_echo--;    // until all echoes have been returned
            }
        }
        if (p == source)
            finalecho.put(localtop);
        else
            probe_echo[first[0]].put(ECHO, p, localtop);
    }
}
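class Initiator {
    int source = 1;         // assumed; same value as in Probe
    int N = 10;             // assumed; same value as in Probe
    boolean[][] top = new boolean[N][N];
    Channel1 probe_echo[];
    Channel2 finalecho;

    // Constructor
    Initiator() {
        //initial
    }

    public void run() {
        // PROBE and empty are the message type and empty topology used by Probe
        probe_echo[source].put(PROBE, source, empty);
        finalecho.get(top);
    }
}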
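To see the network version gather a topology end to end, the exchange can be simulated in a single thread. The sketch below is an illustration under assumptions, not the classes above: Channel1 and Channel2 are replaced by one in-memory FIFO queue, the final echo is modelled as the return value, and the names ProbeEchoSim, Msg and gather are hypothetical. The graph is assumed to be symmetric and connected.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Single-threaded simulation of probe/echo on a graph with cycles (hypothetical helper). */
final class ProbeEchoSim {
    static final int PROBE = 0, ECHO = 1;

    /** One message in flight: kind, sender, destination and a topology payload. */
    static final class Msg {
        final int kind, from, to;
        final boolean[][] top;
        Msg(int kind, int from, int to, boolean[][] top) {
            this.kind = kind; this.from = from; this.to = to; this.top = top;
        }
    }

    /** Returns the topology collected at source. */
    static boolean[][] gather(boolean[][] links, int source) {
        int n = links.length;
        boolean[][] empty = new boolean[n][n];
        boolean[][][] localtop = new boolean[n][n][n];
        int[] first = new int[n];        // neighbor from which the first probe arrived
        int[] needEcho = new int[n];     // echoes still outstanding per node
        boolean[] probed = new boolean[n];
        Deque<Msg> network = new ArrayDeque<>();
        network.add(new Msg(PROBE, source, source, empty)); // the initiator's probe

        while (!network.isEmpty()) {
            Msg m = network.poll();
            int p = m.to;
            if (!probed[p]) {
                // First probe: remember the sender and probe all other neighbors.
                probed[p] = true;
                first[p] = m.from;
                localtop[p][p] = links[p].clone();
                for (int q = 0; q < n; q++)
                    if (links[p][q] && q != first[p]) {
                        network.add(new Msg(PROBE, p, q, empty));
                        needEcho[p]++;
                    }
            } else if (m.kind == PROBE) {
                // Redundant probe: echo it immediately with an empty topology.
                network.add(new Msg(ECHO, p, m.from, empty));
                continue;
            } else {
                // Echo: merge the received topology into the local one.
                for (int y = 0; y < n; y++)
                    for (int z = 0; z < n; z++)
                        localtop[p][y][z] = localtop[p][y][z] || m.top[y][z];
                needEcho[p]--;
            }
            if (needEcho[p] == 0) {
                if (p == source) return localtop[p];    // stands in for the final echo
                network.add(new Msg(ECHO, p, first[p], localtop[p]));
            }
        }
        throw new IllegalStateException("source never collected all echoes");
    }
}
```

Calling `ProbeEchoSim.gather(links, s)` on a connected graph should return a matrix equal to `links`, since every node contributes its own row on the way back to the source.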
Broadcast Algorithms
Code Sketch for Distributed Semaphores Algorithm
type kind = enum(V,P,ACK)
chan sem[1:n] (sender: int, kind, timestamp: int)
chan go[1:n](timestamp: int)
User[i: 1..n]:: var lc: int := 0;   # logical clock
                var ts: int;        # timestamp in go message
    # execute a V operation
    broadcast sem(i,V,lc);
    lc := lc+1;     # time has passed
    ...
    # execute a P operation
    broadcast sem(i,P,lc);
    lc := lc+1;     # time has passed
    receive go[i](ts); lc := max(lc,ts+1); lc := lc+1;
    # set to max of local clock or timestamp + 1
    ...
Helper[i: 1..n]:: var mq: queue of (int,kind,int)       # ordered by timestamp
                  var lc: int := 0;                     # logical clock
                  var nV: int := 0; nP: int := 0;       # semaphore counters
                  var sender: int; k: kind; ts: int;    # values in messages
    do true -> {loop invariant DSEM}
        receive sem[i](sender,k,ts); lc := max(lc,ts+1); lc := lc+1;
        if k=P or k=V ->
            insert (sender,k,ts) at appropriate place in mq
            broadcast sem(i,ACK,lc); lc := lc+1;
        [] k=ACK ->
            record that another ACK has been seen
            fa fully acknowledged V messages ->
                remove the message from mq; nV := nV+1;
            af
            fa fully acknowledged P messages such that nV > nP ->
                remove the message from mq; nP := nP+1;
                if sender = i -> send go[i](lc); lc := lc+1 fi
            af
        fi
    od
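Both User and Helper update their logical clocks in the same two ways: the clock is incremented after every send, and on every receive it is first raised to at least the received timestamp plus one and then incremented. A minimal Java sketch of just that rule follows; the class name LogicalClock and its method names are hypothetical and not part of the code sketch above.

```java
/** Logical clock following the update rules used in the sketch above (hypothetical helper). */
final class LogicalClock {
    private int lc = 0;

    /** Returns the timestamp for an outgoing message, then advances the clock. */
    int send() {
        int ts = lc;
        lc = lc + 1;                 // time has passed
        return ts;
    }

    /** Receive rule: lc := max(lc, ts+1); lc := lc+1. */
    void receive(int ts) {
        lc = Math.max(lc, ts + 1);
        lc = lc + 1;
    }

    int value() {
        return lc;
    }

    public static void main(String[] args) {
        LogicalClock a = new LogicalClock();
        LogicalClock b = new LogicalClock();
        int ts = a.send();           // a stamps a message and moves on
        b.receive(ts);               // b jumps past the timestamp it received
        System.out.println("a = " + a.value() + ", b = " + b.value());
    }
}
```

The receive rule guarantees that the receiver's clock ends up strictly greater than the timestamp in the message, which is what lets the helpers order the messages in mq consistently.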
Token-passing Algorithms
{RING: p[1] blue => (p[1]...p[1+token] blue and ch[2]...ch[(token mod n)+1] empty)}
actions of p[1] when it first becomes idle:
- color[1] := blue; token := 0; send ch[2](token)
actions of p[i:1..n] upon receiving a regular message:
- color[i] := red
actions of p[i:2..n] upon receiving the token:
- color[i] := blue; token := token + 1; send ch[(i mod n)+1](token)
actions of p[1] upon receiving the token:
- if color[1] = blue then announce termination and halt
- else {color[1] := blue; token := 0; send ch[2](token)}
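The ring actions above can be written down as plain methods on a shared state object. This is only a sketch under assumptions: message passing is replaced by return values (each method returns the next destination and token value), processes are numbered 1..n as in the notes, n is at least 2, and the names RingTermination, startProbe, onRegularMessage and onToken are hypothetical.

```java
import java.util.Arrays;

/** Sequential model of the token-ring termination actions (hypothetical helper). */
final class RingTermination {
    enum Color { RED, BLUE }

    final int n;
    final Color[] color;            // color[1..n]; index 0 is unused
    boolean terminated = false;

    RingTermination(int n) {
        this.n = n;
        this.color = new Color[n + 1];
        Arrays.fill(color, Color.RED);
    }

    /** p[1] first becomes idle: returns {destination, token}. */
    int[] startProbe() {
        color[1] = Color.BLUE;
        return new int[] { 2, 0 };
    }

    /** Any p[i] receives a regular message. */
    void onRegularMessage(int i) {
        color[i] = Color.RED;
    }

    /** p[i] receives the token; returns {destination, token}, or null once termination is announced. */
    int[] onToken(int i, int token) {
        if (i == 1) {
            if (color[1] == Color.BLUE) {
                terminated = true;  // announce termination and halt
                return null;
            }
            color[1] = Color.BLUE;  // p[1] was reactivated: start a new circuit
            return new int[] { 2, 0 };
        }
        color[i] = Color.BLUE;
        return new int[] { (i % n) + 1, token + 1 };
    }

    public static void main(String[] args) {
        RingTermination ring = new RingTermination(4);
        int[] hop = ring.startProbe();
        int hops = 0;
        while (hop != null) {       // forward the token until termination is announced
            hop = ring.onToken(hop[0], hop[1]);
            hops++;
        }
        System.out.println("terminated after " + hops + " token hops");
    }
}
```

If p[1] receives a regular message while the token is circulating, the first circuit ends with p[1] red, so the sketch starts a second circuit before announcing termination.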
- assume a complete graph where every process is connected to every other process via a channel
- token must traverse every link (and therefore idle process) in the graph before we can determine that a program is terminated
- even though a process is blocked, waiting for a regular message, it will handle the token messages
- a process can receive either regular messages or tokens
- if a process sends the token around the cycle and it returns with a value of "nc" (the length of the cycle), then the program has terminated
- must find a cycle that covers every link in the graph (cycle "c" below) before termination can be detected
- must maintain the GRAPH invariant:
{token has value T => (the last T channels the token was received from were empty and all p[i] that passed it on were blue when they passed it) }
- actions of p[i:1..n] upon receiving a regular message
* color[i] := red
- actions of p[i:1..n] upon receiving the token
* if token = nc -> {announce termination and halt}
* if color[i] = red -> color[i] := blue; token:=0
* or color[i] = blue -> token := token + 1
* set j to the channel corresponding to the next edge in cycle "c"
* send ch[j](token)
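The token actions for the complete graph reduce to one small decision per hop. The sketch below is an illustration under assumptions: processes are numbered from 0 rather than 1, red[i] stands for color[i] = red, the cycle "c" is given as an array of process numbers in visiting order, and the names GraphToken, onToken and hopsUntilTermination are hypothetical. No regular messages are delivered while the token circulates.

```java
/** Sequential model of the token actions on a cycle through the graph (hypothetical helper). */
final class GraphToken {
    /** Token action of process i; returns the token value to forward, or -1 when termination is announced. */
    static int onToken(boolean[] red, int i, int token, int nc) {
        if (token == nc) return -1;     // announce termination and halt
        if (red[i]) {                   // process was active since the last visit
            red[i] = false;             // color[i] := blue
            return 0;                   // token := 0
        }
        return token + 1;               // process was blue: count one more quiet hop
    }

    /** Passes the token along the cycle until termination is announced; returns the number of hops. */
    static int hopsUntilTermination(boolean[] red, int[] cycle) {
        int nc = cycle.length;
        int token = 0;
        int pos = 0;
        int hops = 0;
        while (true) {
            hops++;
            token = onToken(red, cycle[pos], token, nc);
            if (token < 0) return hops;
            pos = (pos + 1) % nc;       // next edge in cycle "c"
        }
    }

    public static void main(String[] args) {
        // Three processes, all connected; the cycle 0 -> 1 -> 2 -> 0 covers every link.
        boolean[] red = { false, true, false };
        System.out.println("hops: " + hopsUntilTermination(red, new int[] { 0, 1, 2 }));
    }
}
```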