Back

/**
 * Static holder for the connectors that link the pipeline threads:
 * c1 feeds Stage1, c2 feeds Stage2, c3 feeds Stage3 and c4 feeds Listener.
 */
class Heap {
  static Connector c1;
  static Connector c2;
  static Connector c3;
  static Connector c4;
}


/**
 * Program entry point: creates the connectors, starts the three stage
 * threads and the listener thread, then feeds the integers 1 through 99
 * into the first connector and finally stops it.
 */
class PipeInt {
  /**
   * @observable
   *   LOCATION[c1StopCallLoc] c1StopCall;
   */
  public static void main (String[] args) {
    // Every connector is assigned before the thread that uses it is started.
    Heap.c1 = new Connector();
    Heap.c2 = new Connector();
    Thread firstStage = new Stage1();
    firstStage.start();

    Heap.c3 = new Connector();
    Thread secondStage = new Stage2();
    secondStage.start();

    Heap.c4 = new Connector();
    Thread thirdStage = new Stage3();
    thirdStage.start();

    Thread listener = new Listener();
    listener.start();

    // Push the values 1..99 into the head of the pipeline.
    int next = 1;
    while (next < 100) {
      Heap.c1.add(next);
      next++;
    }

    // The label below is referenced by the specification comment on this method.
    c1StopCallLoc:
    Heap.c1.stop();
  }
}

/**
 * Single-slot hand-off buffer for ints, guarded by this object's monitor.
 *
 * A negative {@code queue} means the slot is empty; any value {@code >= 0}
 * is a pending item. {@link #stop()} publishes 0. Callers should only
 * {@link #add(int)} positive values: a negative argument is indistinguishable
 * from an empty slot and will never be returned by {@link #take()}.
 *
 * Producers ({@code add}/{@code stop}) block while the slot is occupied and
 * the consumer ({@code take}) blocks while it is empty, so no pending value
 * is ever overwritten before it has been taken.
 */
final class Connector {
  int queue = -1;

  /**
   * Blocks until a value is pending, removes it from the slot and returns it.
   * If the thread is interrupted while waiting, waiting continues and the
   * interrupt status is restored before returning.
   *
   * @return the pending value (0 if it was published by {@link #stop()})
   */
  public final synchronized int take() {
    awaitOccupied(true);
    int value = queue;
    queue = -1;
    // Wake any producer that is waiting for the slot to become empty.
    notifyAll();
    return value;
  }

  /**
   * Blocks until the slot is empty, then stores {@code o} in it and wakes
   * waiting threads.
   *
   * @param o the value to hand off
   */
  public final synchronized void add(int o) {
    // Do not clobber a value that has not been taken yet.
    awaitOccupied(false);
    queue = o;
    notifyAll();
  }

  // Waits for the slot to drain, then publishes 0 so that the previously
  // added value is not lost. The postcondition below still holds on return
  // because the monitor is held for the whole method.
  /**
    * @observable
    *   INVOKE inv1: (this == Heap.c1);
    * @assert
    *   POST post1: (queue == 0);
    */
  public final synchronized void stop() {
    awaitOccupied(false);
    queue = 0;
    notifyAll();
  }

  /**
   * Waits (monitor must already be held) until the slot's occupied state
   * equals {@code occupied}. Interrupts do not abort the wait; the
   * interrupt flag is re-asserted only after the loop, because restoring it
   * earlier would make the next {@code wait()} throw immediately.
   */
  private void awaitOccupied(boolean occupied) {
    boolean interrupted = false;
    while ((queue >= 0) != occupied) {
      try {
        wait();
      } catch (InterruptedException ex) {
        interrupted = true;
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}



/**
 * First pipeline stage: takes values from Heap.c1 and passes each one,
 * incremented by one, to Heap.c2 until a 0 is taken; then stops Heap.c2.
 */
final class Stage1 extends Thread {
  /**
   * @observable
   *   LOCATION[stage1ShutdownLoc] stage1Shutdown;
   */
  public void run() {
    System.out.println("Stage1 startup");
    // A taken value of 0 ends the loop; it is not forwarded.
    for (int in = Heap.c1.take(); in != 0; in = Heap.c1.take()) {
      Heap.c2.add(in + 1);
    }
    Heap.c2.stop();

    // The label below is referenced by the specification comment on this method.
    stage1ShutdownLoc:
    System.out.println("Stage1 shutdown");
  }
}

/**
 * Second pipeline stage: takes values from Heap.c2 and passes each one,
 * incremented by one, to Heap.c3 until a 0 is taken; then stops Heap.c3.
 */
final class Stage2 extends Thread {
  /**
   * @observable
   *   RETURN stage2run;
   */
  public void run() {
    System.out.println("Stage2 startup");
    // A taken value of 0 ends the loop; it is not forwarded.
    for (int in = Heap.c2.take(); in != 0; in = Heap.c2.take()) {
      Heap.c3.add(in + 1);
    }
    Heap.c3.stop();

    System.out.println("Stage2 shutdown");
  }
}

/**
 * Third pipeline stage: takes values from Heap.c3 and passes each one,
 * incremented by one, to Heap.c4 until a 0 is taken; then stops Heap.c4.
 */
final class Stage3 extends Thread {
  /**
   * @observable
   *   RETURN stage3run;
   */
  public void run() {
    System.out.println("Stage3 startup");
    // A taken value of 0 ends the loop; it is not forwarded.
    for (int in = Heap.c3.take(); in != 0; in = Heap.c3.take()) {
      Heap.c4.add(in + 1);
    }
    Heap.c4.stop();

    System.out.println("Stage3 shutdown");
  }
}

/**
 * Pipeline sink: takes values from Heap.c4 and prints each one until a 0
 * is taken.
 */
final class Listener extends Thread {
  /**
   * @observable
   *   LOCATION[listenerShutdownLoc] listenerShutdown;
   */
  public void run() {
    System.out.println("Listener startup");
    // A taken value of 0 ends the loop; it is not printed.
    for (int result = Heap.c4.take(); result != 0; result = Heap.c4.take()) {
      System.out.println("output is " + result);
    }

    // The label below is referenced by the specification comment on this method.
    listenerShutdownLoc:
    System.out.println("Listener shutdown");
  }
}

Back