Paradigms (Patterns) for Process Interaction in
Distributed Programs
Andrews.
ACM Computing Surveys, Vol. 23, No. 1, March, 1991
Programming Notation
Filters: A Sorting Network
// Constructor for Sort
Thread
Sort()
{
//Three channels must be passed to this thread
constructor
//"in1" and "in2" are input
channels
//"out" is output channel
}
public void run()
{
v1 = in1.get();
v2 = in2.get();
while( (v1!=EOS) &&
(v2!=EOS) )
{//find the smallest input
value
if(v1<=v2)
{//output smallest value (from "in1") to output channel
out.put(v1);
v1 =
in1.get();
}
else
{//output smallest value(from "in2") to output channel
out.put(v2);
v2 =
in2.get();
}
}
while( (v1!=EOS) && (v2==EOS) )
{//channel "in2" is
empty; keep processing items in "in1"
out.put(v1);
v1 = in1.get();
}
while( (v1==EOS) && (v2!=EOS) )
{//channel "in1" is
empty; keep processing items in "in2"
out.put(v2);
v2 = in2.get();
}
out.put(EOS);
}
}
Clients
and Servers
// Resource allocation
Monitor
class ResAllocMon{
int
avail; int units;
public ResAllocMon()
{
//if
resources available, allocate them, else "wait"
synchronized void acquire(int id) {
while(avail==0) {
try {
wait();
} catch (InterruptedException e) {}
}
avail--;
id = remove(units);
}
//Just release the resources and put them back into the pool
synchronized void release(int id) {
insert(id, units);
if(avail==0)
notify();
avail++;
}
}
Resource
Allocation Using a Central Process
Correspondence between monitor-based programs and message-based programs:
monitor-based programs | message-based programs
permanent variables | local server variables
procedure identifiers | request channel and operation kinds
procedure call | send request; receive reply
monitor entry | receive request
procedure return | send reply
wait statement | save pending request
signal statement | retrieve and process pending request
procedure bodies | arms of case statement on operation kind
// Resource allocator and
clients
class Allocator extends Thread{
int
N = 10;
Channel request; Channel
reply[];
//These two channels are used
to accept requests from clients
//and reply with
"acquired resources"
int
avail; int units; int[]
pending; int[] index; int[]
unitid;
// Constructor of Allocator
thread
Allocator()
{
//initialize variables
}
public void run()
{//This thread will run
forever, or until system crashes or it is "stop"ed
while(true) {
kind = request.get(index, unitid);
if(kind==ACQUIRE) {
//This is an ACQUIRE request from the client
if(avail>0) {//request can be satisfied
avail--;
unitid[0] = remove(units);
reply[index[0]].put(unitid[0]);
}
else {//can't satisfy request, so put it in pending queue
insert(pending, index[0]);
}
}
//This is a RELEASE request from the client
else {//are there any requests for resources in the pending queue
if(empty(pending)) {
avail++;
insert(units, unitid[0]);
}
else {//a thread is waiting for this RELEASEd
resource
index[0] = remove(pending);
reply[index[0]].put(unitid[0]);
}
}
}
}
}
class Client{
int
N = 10;
int
unitid;
Channel request;
Channel reply[];
// Constructor
Client(){
//initial
}
public void run(){
request.put(i, ACQUIRE, 0);
//wait for resource to be allocated
unitid = reply[i].get();
- - - use the
resource
request.put(i,
RELEASE, unitid);
}
}
Disk
Scheduling and Disk Access
- "read(disk address)" and "write(disk address)"
- disk address is cylinder, track, and offset number
- objective in moving head disk is to minimize disk head movement
- Shortest Seek Time first is a typical scheduling algorithm
* queue incoming requests by their distance from current head position (some are lower and some are higher)
* seek to the closest request
// Self_scheduling
disk driver
class Schedule{
int
N = 10;
//channels from which to
receive requests from clients in the request channel
//and reply using a unique reply
channel per client
Channel request; Channel
reply[];
//requests are queued in
"lower" and "higher", respectively if their
//disk addresses are lower or
higher than the current head position
int
lower[] = new int[100];
int
higher[] = new int[100];
int
headpos = 1, nsaved = 0;
int
index[] = new int[1];
int
cyl;
// Constructor
Schedule(){
//initial values
}
public void run()
{
while(true)
{//any requests in channel or some pending
while(!empty(request)||(nsaved==0)){
cyl = request.get(index, args);
if(cyl<=headpos)
insert(lower,index[0],cyl,args);
else
insert(higher,index[0],cyl,args);
nsaved++;
}
//if lower pending queue is empty schedule smallest one in "higher"
//if higher pending queue is empty schedule largest one in "lower"
if(size(lower)==0)
remove(higher);
else if(size(higher)==0)
remove(lower);
//take the closest request to the current head position by scanning both
//"lower" and "higher" pending queues
else if( (headpos-get(lower)) > (get(higher)-headpos) )
remove(higher);
else remove(lower);
//move head to next cylinder and access disk
headpos = cyl;
nsaved--;
// get result and send to client
reply[index[0]].put(results);
}
}
}
File
Servers: Conversational Continuity
// File servers and clients
class File{//channel
objects for communication with clients
Channel open;
Channel access[];
Channel open_reply[];
Channel access_reply[];
int
clientid;
//
Constructor
File(){
//initial
}
public void run(){
while(true)
{//listen for request from client on "open" channel
clientid = open.get(fname);
// open data file; if successful then reply on the client-unique channel
open_reply[clientid].put(i);
file_open = true;
while(file_open) {
//only accept "reads", "writes", and "closes" if
the file is open
k = access[i].get(args);
if(k==READ)
// process read request
else if(k==WRITE)
// process write request
else if(k==CLOSE){
// close file;
file_open = false;
}
//unique channel between server and client
access_reply[clientid].put(results);
}
}
}
}
class Client{
Channel open;
Channel access[];
Channel open_reply[];
Channel access_reply[];
// Constructor
Client(){
//initial
}
public
void run(){
open.put("foo", j);
serverid = open_reply[j].get();
//use
the file and eventually close the file by executing
access[serverid].put(arguments);
results = access_reply[j].get();
}
}
Heartbeat
Algorithms: Dispersing and Gathering State
After R rounds the following predicate must hold:
ROUND: (forall q: 1 <= q <= N:
(dist(p,q) <= R implies top[q,*] is filled in))
Loop
Send messages to neighbors;
Receive message from
neighbors;
Until
termination condition.
// Heartbeat algorithm for
network topology(figure 10)
class Heartbeat{
int
N = 10;
Channel topology[]; //array
of channels to exchange "top" with neighbors
//If sender is done channel
will return a "true" value.
int
R = 0;
boolean
done = false;
int[]
sender = new int[1];
boolean
qdone;
boolean[][]
newtop = new boolean[[N][N];
boolean
value=false, value1=true;
// Constructor
Heartbeat()
{
//initial
}
public void run()
{while(!done)
{
for(int
q=0; q<N; q++)
//if links[q] is true, then q
is a neighbor of this thread
if(links[q])
//send your thread number, a
false to indicate this is not your last message,
// and the adjacency matrix
to neighbor "q"
topology[q].put(p, false, top);
for(int q=0; q<N; q++)
if(links[q])
{
//read "top" matrix from neighbors
// if "qdone", this is your neighbor's last
message
qdone = topology[p].get(sender, newtop);
for(int y=0; y<N; y++)
for(int z=0; z<N; z++)
top[y][z]
= top[y][z] || newtop[y][z];
if(qdone)
active[sender[0]]
= false;
}
// if
every row of "top" has a "true" entry, then we have heard
from all
//nodes up to a distance of D from this thread; then we are done
value1 = true;
for(int y=0; y<N; y++) {
value = false;
for(int z=0; z<N; z++)
//update your adjacency matrix
value = value || top[y][z];
value1 = value1 && value;
}
if(value1)
done = true;
R++;
}
//send last "top" (and a "true" to indicate this is your
last message) to
//neighbors and receive one more message from neighbors
//to
clear out channels
for(int q=0; q<N; q++)
if(active[q])
topology[q].put(p,true,top);
for(int q=0; q<N; q++)
if(active[q])
qdone = topology[p].get(sender, newtop);
}
}
Probe/echo
Algorithms
- send message to all neighbors; when a node receives the message for the first time, it passes the message to all neighbors, including the one from which it was received; after this first message, all others are ignored; since two messages cross each link, this will cause 2(n-1) messages for a tree and n(n-1) for a complete graph.
- A node gathers the topology in two phases:
*probes are sent to neighbors
*echoes containing topology are returned to initiator
- initiator sends probe to neighbors
- when a node receives a probe, it sends a probe to all neighbors, except the one which sent it the probe; it waits for echoes from all neighbors; since there are cycles nodes can receive more than one probe from different routes; when this happens, it immediately sends an echo; when echoes are all reflected to the initiator, the topology is known; deadlock is avoided since every probe is echoed; this costs two messages across each link.
- This technique, sometimes called diffusing computations, can also be used to determine stable global states such as deadlock and program termination.
// Probe/echo algorithm for topology of a
tree
class Probe{
int
source = 1;
int
N = 10;
boolean[]
links = new boolean[N];
boolean[][]
localtop = new boolean[N][N];
boolean[][]
newtop = new boolean[N][N];
int
parent;
Channel probe[]; //channels
to send probes
Channel echo[];//channels to
receive echoes
Channel finalecho;
// one last echo to initiator
// Constructor
Probe()
{
//initialization
}
public void run(){
parent = probe[p].get();//remember your parent's name
for(int q=0; q<N; q++)
if(links[q] && (q!=parent) )
probe[q].put(p); //send probe to all neighbors
for(int q=0; q<N; q++)
if(links[q] && (q!=parent) )
echo[q].get(newtop); //wait for echoes to return from neighbors
for(int y=0; y<N; y++)
for(int z=0; z<N; z++) {
localtop[y][z] = localtop[y][z]
|| newtop[y][z];
//merge knowledge of topology
}
if(p==source)
finalecho.put(localtop);//echo
to initiator
else echo[parent].put(localtop);//echo to parent
}
}
class Initiator{
    int source = 1;
    // Declared here because "top" below is sized with N; value matches the
    // N = 10 used by the other classes in these notes.
    int N = 10;
    boolean[][] top = new boolean[N][N];
    Channel probe[];
    Channel finalecho;

    // Constructor
    Initiator()
    {
        //initial
    }

    /**
     * Starts the computation by probing the source node, then waits for the
     * final echo carrying the topology.
     */
    public void run(){
        probe[source].put(source);
        finalecho.get(top);
    }
}
// Probe/echo algorithm for
topology of a network
class Probe{
int
source = 1;
int
N = 10;
boolean[]
links = new boolean[N];
boolean[][]
localtop = new boolean[N][N];
boolean[][]
newtop = new boolean[N][N];
int[]
first = new int[1];
int[]
sender = new int[1];
int
need_echo;
//need two types of
channels - one with parms (type of message,sender,top)
//and one with parms (sender,top) for final echo
to initiator
Channel1 probe_echo[];
Channel2 finalecho;
// Constructor
Probe(){
//initial
}
public void run(){
k = probe_echo[p].get(first,newtop);
for(int q=0; q<N; q++) {
if(links[q] && (q!=parent) ) need_echo++;
probe_echo[q].put(k,p,empty);
}
while(need_echo>0) {
k = probe_echo[p].get(sender,newtop);
if(k==PROBE)
//send echo immediately - this is redundant probe
probe_echo[sender[0]].put(ECHO,p,empty);
else {
for(int y=0; y<N; y++)
for(int z=0; z<N; z++)
//merge local topology with received topology
localtop[y][z] = localtop[y][z]
|| newtop[y][z];
need_echo--; //until all echoes have been returned
}
}
if(p==source)
finalecho.put(localtop);
else probe_echo[first[0]].put(ECHO,p,localtop);
}
}
class Initiator{
    // Declared here because run() and the "top" initializer use them; the
    // values match the sibling Probe class (source = 1, N = 10).
    int source = 1;
    int N = 10;
    boolean[][] top = new boolean[N][N];
    Channel1 probe_echo[];
    Channel2 finalecho;

    // Constructor
    Initiator(){
        //initial
    }

    /**
     * Starts the computation by sending a PROBE to the source node, then
     * waits for the final echo carrying the topology.
     */
    public void run(){
        // The only channel array this class declares is probe_echo[].
        probe_echo[source].put(PROBE,source,empty);
        finalecho.get(top);
    }
}
Broadcast
Algorithms
Code Sketch for Distributed Semaphores Algorithm
type kind = enum(V,P,ACK)
chan sem[1:n] (sender: int, kind, timestamp: int)
chan go[1:n](timestamp: int)
User[i: 1..n]:: var lc: int:=0; #logical clock
var ts: int; #timestamp in go message
# execute a V operation
broadcast sem(i,V,lc);
lc := lc+1; #time has passed
...
#execute a P operation
broadcast sem(i,P,lc);
lc := lc+1; #time has passed
#receive into ts (the declared timestamp variable), then
#set lc to max of local clock or timestamp +1
receive go[i](ts); lc := max(lc,ts+1); lc := lc+1;
...
Helper[i:1..n]:: var mq: queue of (int,kind,int) #ordered by timestamp
var lc: int:=0; #logical clock
var nV: int:= 0; nP:int :=0; #semaphore counters
var sender: int; k: kind; ts: int; # values in messages
do true -> {loop invariant DSEM}
  receive sem[i](sender,k,ts); lc := max(lc,ts+1); lc:=lc+1;
  if k=P or k=V ->
    insert(sender,k,ts) at appropriate place in mq
    broadcast sem(i,ACK,lc); lc:=lc+1;
  || k=ACK ->
    record that another ACK has been seen
    fa fully acknowledged V messages ->
      remove the message from mq; nV:= nV+1;
    af
    fa fully acknowledged P messages such that nV > nP ->
      remove the message from mq; nP:= nP+1;
      if sender =i -> send go[i](lc); lc :=lc+1 fi
    af
  fi
od
Token-passing Algorithms
{RING: p[1] blue => (p[1] ... p[1+token] blue and ch[2] ... ch[1 + token mod n] empty)}
actions of p[1] when it first becomes idle
- color[1]:=blue; token:=0; send
ch[2](token)
actions of p[i:1..n] upon receiving a regular message:
- color[i]:= red
actions of p[i:2..n] upon receiving the token
- color[i] := blue;
token:=token + 1; send ch[i mod n+1](token)
actions of p[1] upon receiving the token:
- if color[1] = blue then
announce termination and halt
- else {color[1] :=
blue; token:= 0; send ch[2](token)}
- assume a complete graph where every process is connected to every other process via a channel
- token must traverse every link (and therefore idle process) in the graph before we can determine that a program is terminated
- even though a process is blocked, waiting for a regular message, it will handle the token messages
- a process can receive either regular messages or tokens
- if a process sends a token around a cycle and it returns with a value of "nc" (the length of the cycle), then the program can terminate
- must find all cycles in the graph for the program to be terminated
- must maintain the GRAPH invariant:
{token has value T => (the last T channels the token was received from were empty and all p[i] that passed it on were blue when they passed it) }
- actions of p[i: 1..n] upon receiving a regular message
* color[i] := red
- actions of p[i:1..n] upon receiving the token
* if token = nc -> {announce termination and halt}
* if color[i] = red -> color[i] := blue; token:=0
* or color[i] = blue -> token := token + 1
* set j to the channel corresponding to the next edge in cycle "c"
* send ch[j](token)