Discrete Event Simulation in Concurrent C

The researchers who developed Concurrent C present a program that models a multistage, multiserver queuing network, in which events in the simulated system happen at discrete times.


January 01, 1989

Figure 1: A single-stage queuing system (source, queue, and server)

Figure 2: A two-stage queuing network

Figure 3: A feedback queuing network


Simulating discrete events with concurrent processing

N.H. Gehani and W.D. Roome

Narain and William are the architects of Concurrent C and authors of The Concurrent C Programming Language (Silicon Press) from which this article is adapted. They can be reached at AT&T Bell Labs, 600 Mountain Ave., Murray Hill, NJ 07974.


In discrete event simulation, events in the simulated system happen at discrete times, and these are the only times at which the system changes state. For example, when simulating a queue of customers waiting for service at a bank, the events include a new customer arriving, a customer reaching the head of the line and being served by a teller, a customer leaving, and so on. The key points are the times at which these events happen; as far as the simulation is concerned, nothing happens between the events. Discrete event simulation differs from "continuous time" simulation, which is used to simulate continuous systems such as water flowing over a dam.

Concurrent programming can simplify the task of writing a discrete event simulation program; the resulting program is easy to understand and modify. In particular:

We will illustrate these points by showing how to write a Concurrent C program that models a multi-stage, multiserver queuing network. Each queue and each server is modeled by a Concurrent C process. This is a natural way to simulate a queuing network: Each process runs independently, as do the queue and servers in the real network, and they interact when necessary, for example, when a server takes a job from its input queue. We'll present several general processes that can be used in other simulation programs; these include an event scheduler process and a queue manager process.

The Process-Interaction Model

Consider the queuing system in Figure 1 where the simulation uses three processes: One for the source, one for the queue, and one for the server. The source process generates items ("jobs"). The inter-arrival time -- the time between item arrivals -- is a random variable. The body of the source process consists of a simple loop that calculates the next inter-arrival time, waits for that many time units, generates a new item, and then places the new item on the queue as in Example 1.

Example 1: The source process

  while (1) {
    delay for random inter-arrival time;
    generate item;
    call queue process to put item in queue;
  }

Similarly, the server process repeatedly takes the next item from the queue, processes it for the appropriate service time, and finally discards the item. The queue accepts items from the source and gives them to the server.
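In the same pseudo-code style, the server's loop can be sketched as follows (this is not one of the article's numbered examples):

  while (more items are available) {
    take next item from the queue;
    delay for the item's service time;
    discard item;
  }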

General Method

The first step in the process-interaction model is to determine the sequential, independent entities in the system being simulated. The entities in a queuing network include sources, queues, and servers. Each entity performs a well-defined series of operations. Some of these operations may require interaction with another entity; one example is a server taking an item from a queue. Other than that, each entity is independent of the other entities.

The next step is to identify the types of interactions between the entities. Each such interaction becomes a transaction call. It is convenient to divide entities into two categories: Active entities, such as servers, and passive entities, such as queues. In general, passive entities wait for requests, and usually represent "resources" that are used by active entities. The process that implements a passive entity has one transaction for each type of request. For example, a queue process will have a put transaction to put an item into the queue, and a take transaction to remove the next item from the queue. The process for an active entity, such as the server, is not called by the other processes, and thus does not have transactions. This is not an absolute rule. For example, while handling a request, a passive process might actively request service from another passive process. But it is often a useful paradigm for structuring the processes in a simulation program.

For each distinct type of entity, we then write the specification and body for the Concurrent C process that simulates it. If the simulation needs several entities of the same type, we can create an instance of the corresponding process type for each entity. The process for simulating an active entity is a simple sequential program that performs the entity's operations. The process for simulating a passive entity consists of a loop that repeatedly executes a select statement with alternatives for all of its transactions. In general, each process keeps statistics, such as the mean time in system, and prints them out at the end of the simulation.
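As a rough skeleton (the process and transaction names here are placeholders, not from the article), a passive-entity body follows this pattern:

  process body passiveEntity(...)
  {
     initialize state and statistics;
     while (the entity still has clients) {
        select {
           accept request1(...)  { handle the request; }
        or accept request2(...)  { handle the request; }
        }
     }
     print statistics;
  }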

The final step is to write a main process that creates all the entity processes and connects these processes appropriately.

Scheduler Process

Delays in simulated time -- such as the service time delay -- are handled by a scheduler process. (Simulated time is different from actual time, so we cannot use the Concurrent C delay statement to simulate service or arrival delays.) This scheduler process maintains the current simulated time and advances it appropriately. You can think of the scheduler as maintaining a clock that gives the current simulated time. For each delay request from a process, the scheduler determines the simulated time at which the process is to be reactivated, and saves this request in an activation request list. When all processes are waiting for delays to expire, the scheduler searches this list for the entry with the lowest activation time. The scheduler then advances the simulated clock to this time, removes this entity from the list, and reactivates the selected process. If several processes are waiting to be reactivated at the same simulated time, the scheduler awakens all of them simultaneously. Any computation done by a process takes place in zero simulated time.

One complication is that a process can be waiting for an event other than an explicit delay request. For example, suppose that a server process tries to take an item from an empty queue. The server process waits for the queue process, which is waiting for a source process to put an item into the queue. The source process is waiting for a delay to expire, at which point it will place a new item in the queue. Thus, the scheduler advances the simulated time when every process is either waiting for a delay request to expire or is waiting for some event that will be generated by a process that is waiting for a delay request to expire.

Before showing how the scheduler process can determine when this has happened, we need to introduce some formalism. At any given time, each entity process is in one of these three states:

waiting: Waiting for an explicit delay request from the scheduler

active:  Computing in zero simulated time

passive: Waiting for something other than a delay request

As an example, consider a source process that waits for a random inter-arrival delay and then places a job on the queue. The source process starts in the active state. It enters the waiting state when it delays for the inter-arrival time; it becomes active again when the scheduler reactivates it. If the queue is not full, the source process stays active while it places the job in the queue. However, if the queue is full, the source process enters the passive state, and remains passive until some server process creates space by removing a job from the queue.

Thus the scheduler accepts delay requests until there are no active entity processes, at which time the scheduler advances the simulated clock and reactivates the appropriate process(es). The scheduler knows the number of processes waiting for the delay requests, so all it needs to know is the number of passive processes. To give the scheduler this information, we will call a scheduler transaction whenever a process changes state from active to passive or vice versa.

A Two-Stage Queuing Network

We will develop a simulation program for the queuing network in Figure 2. In this network, jobs enter queue 1 from a Poisson source (exponential inter-arrival times). The arrows indicate the direction in which jobs flow. The first stage has two servers (servers 1.1 and 1.2), each of which takes the next available job from a common queue. The second stage has one server; it takes jobs from the second queue, and discards them when done. The queues are FIFO (first-in-first-out), and can hold at most 100 jobs. If a queue is full, the server or source putting an item into this queue is blocked until space is available. Similarly, if a queue is empty, the server taking an item from the queue is blocked until an item is available (if more items are expected) or an end-of-file is indicated to the server. The service times are exponentially distributed, and are determined when a job enters the system. Servers 1.1 and 1.2 run at half the speed of server 2; that is, if server 2 takes x time units to process a job, server 1.1 takes 2x units.

Structure of the Simulation Program

In this section, we present the interfaces to the processes in the simulation program for the queuing network described in the previous section; the next section describes how these processes are implemented. This simulation program uses four types of processes:

sched:  Simulated-time scheduler
queue:  Finite-capacity FIFO queue
source: Source of arriving jobs
server: Single-input, single-output server

The main process creates these processes and connects them appropriately.

The simulation program terminates after simulating the specified number of jobs. The source process terminates after generating these jobs. Before terminating, the source process informs its output queue that no more jobs will arrive. When the queue is empty (and no more jobs are expected), the queue process informs the servers that no more jobs will arrive. Each server then prints its statistics, tells its output queue that it is done, and terminates. In this way the queues are drained automatically and all processes eventually terminate.

Strictly speaking, this causes our statistics to be distorted by the "edge effects" of filling up the queues at the beginning of the simulation and draining them at the end. However, we will simulate enough jobs to mitigate this distortion.

Scheduler Process Interface

The scheduler process manages simulated time. The time units are arbitrary. We will refer to the processes that call the scheduler's transactions as the "clients" of the scheduler. The specification of the scheduler process is in Listing One, page 71.

A delay request requires calling the transactions reqDelay and wait. The client process first calls reqDelay, giving the number of time units to delay, and then calls wait, giving the value returned by reqDelay. The wait transaction returns the simulated time at the end of the delay. Thus, if s is the scheduler process, the following statement delays the calling process for ten time units, and saves the time at the end of the delay in ts:

  ts = s.wait(s.reqDelay(10));

The scheduler accepts reqDelay calls until it has received a request from every active client process. Then the scheduler accepts the wait call from the process with the smallest delay request.

For this to work, the scheduler must know the number of its client processes, and must know how many of them are in the passive state. Each client process calls transaction addUser when it starts, and calls transaction dropUser before it terminates, so that the scheduler can maintain a client count. Whenever an active client process becomes passive, the scheduler is informed by calling transaction passive. This call is made by the process that forces the client to wait (for example, a queue process whose client must block). Whenever a passive process becomes active, the process that makes it active calls the scheduler's active transaction. Thus the scheduler can determine the number of passive processes.

Source Process Interface

The source process has the specifications shown in Listing Two, page 71. Arrivals are Poisson, and service times are exponentially distributed. The process parameters define the output queue, the mean values for the distributions, and so on.

The name parameter is a symbolic name, such as "source1," which the source process uses to identify itself when printing statistics (a simulation program could have several different source processes). The type name_t is a structure containing a character string:

  typedef struct {char str[20];} name_t;

This structure is passed by value to the source process. A simpler alternative would be to declare the parameter as a character pointer. However, if we do that, our simulation program will work only when run on an implementation that provides shared memory. Passing a structure by value, on the other hand, will work on any Concurrent C implementation.

Server Process Interface

The server process has the specifications shown in Listing Three, page 71. If the output queue parameter is c_nullpid, then the server discards each job after processing it. The speed parameter is the relative speed of this server; this server takes x/speed time units to process a job whose service time is x.

Queue Process Interface

Each job in a queue is represented by a structure of type qItem, as shown in Listing Four, page 71. We save the job arrival time in this structure so that we can calculate statistics for the time spent by jobs in the system.

The queue process simulates a FIFO queue, as shown in Listing Five (page 71). A queue process can have several clients. Clients are either consumers ("takers") or producers ("putters"). A source process is a producer; a server process is a consumer for its input queue and a producer for its output queue. Each producer process calls transaction addProd when it starts, and calls transaction dropProd before it terminates. Similarly, each consumer process calls transactions addCons and dropCons. This allows the queue process to keep track of the number of producer and consumer clients. When the queue is empty and the last producer has terminated, all subsequent take requests return an end-of-file indication. The queue process terminates when it has no more producers or consumers.

Transactions putReq and putWait put an item onto the queue, and transactions takeReq and takeWait take the next item from the queue. However, client processes do not call these transactions directly. Instead, these processes call functions qPut and qTake, which perform the put and take operations. Both functions wait until the operation can be performed. If an operation cannot be completed immediately, the queue process informs the scheduler process of the client's change of state (active to passive or passive to active). Function qTake returns 1 if it is able to take (get) an item, or 0 on end-of-file. qPut and qTake are the interface functions for the queue process; they hide a complicated transaction interface.

Example 2 takes jobs from the queue qFrom and places them on the queue qTo, continuing until qFrom is empty and all its producers have terminated. The implementations of the qPut and qTake functions, and the body of the queue process, will be described later.

Example 2: Moving jobs between queues

  qItem item;
  process queue qFrom, qTo;

  while (qTake(qFrom, &item))
    qPut(qTo, item);

Statistical Functions

We will use a simple statistical package that calculates the mean and standard deviation of a set of values, as shown in Listing Six, page 71. The statistics are kept in a structure of type stats. Each of the statistics functions takes a pointer to such a structure as its first argument. Function stInit initializes the structure. Function stVal updates the statistics to reflect a new value. Functions stMean and stSdev return the mean and standard deviation of these values. The random number generator erand(m) returns an exponentially distributed integer whose mean is the integer value m. (Strictly speaking, erand cannot be exponential because it returns integer values instead of floating-point values. However, we will use large mean values so that the round-off effect will be insignificant.)

Example 3 shows a code fragment that generates 10,000 random numbers and prints their average.

Example 3: Generating random numbers

  stats mystats;
  int   i;

  stInit(&mystats);
  for (i = 1; i <= 10000; i++)
    stVal(&mystats, erand(100));
  printf("Mean is %lf\n", stMean(&mystats));
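Listing Six gives only the declarations; a minimal sketch of how these functions might be implemented on top of the stats structure (our assumption; the article does not show the bodies, and sqrt requires math.h):

  void stInit(sp) stats *sp;
  {  sp->nv = 0;  sp->maxv = 0;
     sp->sumv = sp->sumsq = 0.0;
  }

  void stVal(sp, v) stats *sp; long v;
  {  sp->nv++;
     if (v > sp->maxv) sp->maxv = v;
     sp->sumv  += v;
     sp->sumsq += (double)v * v;
  }

  double stMean(sp) stats *sp;
  {  return sp->nv > 0 ? sp->sumv / sp->nv : 0.0;
  }

  double stSdev(sp) stats *sp;   /* population standard deviation */
  {  double m = stMean(sp);
     return sp->nv > 0 ? sqrt(sp->sumsq / sp->nv - m*m) : 0.0;
  }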

Process Implementations

The main process, shown in Listing Seven, page 71, creates the processes that simulate the entities and connects them together. Function makeName creates and returns a structure of type name_t that contains the string passed as an argument.

The structure of the queuing network -- the number of stages, the number of servers per stage, and so on -- is determined by main and can easily be changed. For example, we can add a third server to the first stage just by replicating the line that creates server 1.2, but with a different symbolic name.
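Concretely, one more create call in main would do it (the symbolic name "Serv1.3" is ours):

  create server(s, q1, q2, 0.5, makeName("Serv1.3"));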

Notice that main sleeps for a few seconds before terminating. This delay allows other processes to call the transaction addUser of the scheduler and to make their initial delay request (if any). The scheduler will not allow any other process to proceed until main calls transaction dropUser. Delaying main is necessary to allow other processes enough time to register with the scheduler. Otherwise, the scheduler will terminate immediately after main calls transaction dropUser.

We have assumed that all processes will be dispatched and will perform their initialization within two seconds. To avoid this assumption, we could modify main to call an initialization transaction belonging to each of the processes that it creates; this initialization transaction would call transaction addUser of the scheduler.
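A rough sketch of this alternative for the source process (the init transaction name and the src variable are ours, not from the listings):

  /* Added to the source specification (sim-source.h): */
  trans void init();   /* register with scheduler and output queue */

  /* At the start of the source body (sim-source.cc): */
  accept init() { s.addUser(); outQ.addProd(); }

  /* In main, instead of the two-second delay: */
  process source src;
  src = create source(s, q1, iat, servt, nGen, makeName("Src"));
  src.init();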

Source and Server Processes

Listing Eight, page 71, shows the body of the source process. First, the source process initializes the statistics counters, and tells the scheduler and queue processes that they have one more client. Then the source process repeatedly calculates an inter-arrival time, delays for this amount, generates a service time, and places a job in the output queue. Finally, the source process prints its statistics and tells the scheduler and the output queue that they have one less client. This pattern -- initialization, main processing, and termination -- is common in our processes.

Listing Nine, page 71, shows the body of the server process. It starts by telling the scheduler, the input queue, and the output queue to increment their client, consumer, and producer counts, respectively. The server process then repeatedly takes a job from its input queue, delays for the job's service time, and places the job on its output queue. Finally, the server process prints statistics on the time that jobs spent in the system. The server process also tells its input and output queues and the scheduler to decrement their client counts.

Queue Process

Listing Ten, page 71, shows the source for the interface functions for the queue process. A put or take request that can be done immediately requires one transaction call; otherwise two calls are required. As mentioned earlier, functions qPut and qTake, which are called by the clients of a queue process, implement the necessary protocol for interacting with the queue process. To put an item, qPut first calls putReq. If the queue is not full, putReq puts the item and returns -1. If the queue is full, putReq returns a non-negative "ticket" value. In this case, qPut calls the queue's putWait transaction, giving the ticket returned by putReq. When space becomes available in the queue, the queue process accepts transaction putWait and puts the item on the queue.

Function qTake is similar; the only difference is that the takeReq transaction returns a structure that contains either the item taken or the ticket value.

The body of the queue process uses several auxiliary functions to manipulate the data structures that maintain the queue's state. All the information necessary to define a queue is collected in a structure of type qInfo. The queue process passes a pointer to this structure to the auxiliary functions shown in Listing Eleven, page 72.

The items member of the qInfo structure points to a circular buffer of qItem structures. The integer head is the index of the next item that can be taken from the queue, and tail is the index of the next slot into which an item can be put. We keep statistics on "time in queue" and "number in queue." The latter is sampled when a job is placed in a queue.

Ticket numbers returned by putReq and takeReq are assigned circularly from 0 to 9999. Put and take tickets are assigned independently. Structure tInfo contains information about pending tickets. We need one instance of this structure for pending puts, and another for pending takes. Listing Twelve, page 72, shows the body of the queue process.

The queue process starts by initializing the qInfo structure, and then waits for the first producer's addProd request. The queue process then accepts requests as long as it has any clients. Before terminating, the queue process prints the statistics that it has recorded.

We will now analyze handling of the "take" requests by the queue process in detail. There are two alternatives for takeReq with mutually exclusive guards. The first of these alternatives accepts requests whenever there are items in the queue and when there are no pending takeWait requests (that is, no pending take tickets). This alternative calls takeItem, which removes an item from the buffer, updates the statistics, and sets the ticket and gotItem fields to indicate that an item was taken immediately. Also, if another client is blocked on a put request, takeItem tells the scheduler that the process is now active. This ensures that the scheduler will not honor another delay request until the process that is blocked on the put request is able to execute its pending putWait transaction. This takeReq alternative returns the item selected by takeItem to the client process.

The second takeReq alternative accepts requests whenever the queue is empty. It assigns a ticket, updates the ticket data, tells the scheduler that an active client has just become passive, and returns the ticket to the client.

There are two alternatives for takeWait. When there are items in the queue, the first alternative accepts the request with the oldest pending ticket. This alternative takes an item from the buffer and returns it to the client. When end-of-file has occurred (when there are no producers and the queue is empty) the second takeWait alternative accepts any remaining takeWait calls. This alternative returns an end-of-file indicator to the client.

The put requests are handled similarly. Transactions addCons, dropCons, addProd, and dropProd just update the consumer and producer counts. Listing Thirteen, page 72, shows the auxiliary functions called from the body of the queue process.

Scheduler Process

The scheduler keeps a list of pending delay requests, ordered by the time at which the client is to be reactivated. List entries are pairs (t, n) where t is the simulated time and n is the number of processes to be awakened at that time. Listing Fourteen, page 72, is an outline of the body of the scheduler process sched.

After initialization, the scheduler process repeatedly accepts requests until all clients become passive. For each delay request, the scheduler calculates the absolute time at which the requesting process should be reactivated, and adds an appropriate entry to the list of pending delay requests. This list is ordered by increasing reactivation times. List entries are allocated from an array; the reqDelay transaction uses the array index as a ticket value. When all client processes are passive, the scheduler takes the next request from the list, advances the simulated time, and accepts wait requests from all the clients waiting for the new simulated time.
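A minimal sketch of the pending-request entries implied by this description (the struct and field names are ours; the actual listing leaves these details as pseudo-code):

  #define MAXPEND 100        /* assumed maximum number of pending requests */

  typedef struct {
     long time;    /* simulated reactivation time */
     int  nProc;   /* number of processes to awaken at that time */
     int  next;    /* array index of the next entry, or -1 */
  } pendReq;

  pendReq pend[MAXPEND];     /* reqDelay returns an index into this
                                array as its ticket value */
  int pendHead = -1;         /* entry with the earliest reactivation time */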

The suchthat clause performs a linear search through the pending requests; this will be inefficient if there are hundreds or thousands of pending delay requests. For small simulations, however, the simplicity of the scheduler makes up for the inefficiency that may result from this linear search. If this search proves to be expensive in large simulations, we can eliminate the suchthat clause by making the client process supply a transaction pointer with which the scheduler will call the client when it is ready to accept the client's delay request.

A Feedback Queuing Network

Consider the feedback queuing network in Figure 3. This network differs from the previous network in two ways:

To simulate this network, we can reuse the server, queue, and scheduler processes. We will need two new process types: A source process that puts the job in the smaller of two queues, and another server process that discards a job with some probability.

The new source process will be similar to the original source process, except that it has two output queue parameters, outQ1 and outQ2. After generating a job, the new source process puts the job on the smaller of the two output queues, which it can do with the code fragment shown in Example 4.

Example 4: Choosing queues

  if (outQ1.itemCnt() < outQ2.itemCnt())
    qPut(outQ1, item);
  else
    qPut(outQ2, item);

The new server process is similar to the original server process, except that it has a new parameter probDone, which is the probability of discarding a job after processing it. Because this new server is a feedback server, it does not tell the output queue that it is a consumer process. (If it did this, the simulation would never terminate. That is, the server will not terminate until its input queue indicates end-of-file. This will not happen until all of the input queue's producers have terminated. If the server process were one of these producers, it would never terminate.)
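Its processing loop might look like this variant of Listing Nine (the random draw using rand() and RAND_MAX is our assumption; the article does not show this code):

  while (qTake(inQ, &item)) {
     ts = s.wait(s.reqDelay(item.servt/speed));
     stVal(&sysTime, ts - item.arrive);
     /* Discard the job with probability probDone;
        otherwise feed it back to the output queue. */
     if ((double)rand() / RAND_MAX >= probDone)
        qPut(outQ, item);
  }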

Extensions and Modifications

We will now discuss some ways of extending our simulation program to model other queuing systems.

Other Queue Disciplines -- Our queue process can be easily modified to use a last-in-first-out (LIFO) discipline, or a more complicated queue discipline, such as shortest-service-time-first. This would require simple changes to the putItem and takeItem functions, but the basic structure of the queue process would remain unchanged.
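For example, a LIFO version of takeItem could remove the most recently added item from the tail of the circular buffer instead of the head; a sketch of the changed lines (our modification, not in the listings):

  /* LIFO: step the tail back and take the newest item. */
  qp->tail = (qp->tail + qp->max - 1) % qp->max;
  item = qp->items[qp->tail];
  qp->nElem--;
  /* (head and the remaining bookkeeping are unchanged) */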

Servers with Multiple Output Queues -- Modeling a more general queuing network in which servers output to multiple queues is straightforward. For example, the server could be given an array of output queue process identifiers, and the server could randomly select the output queue for each job. Alternatively, the server could send each job to all of its output queues.
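With an array of output queue process ids (outQs and nOut are illustrative names, not from the listings), the random selection could be as simple as:

  qPut(outQs[rand() % nOut], item);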

More General Networks -- The structure of the queuing network is determined by how the main process creates the source, server, and queue processes, and how it connects them together. This is easy to extend; we could even write a main process that reads a network description at run time.

Statistics -- In general, we want to measure the steady-state performance of a queuing system. Our processes do not. Instead, they measure the entire simulation run, including the initialization phase (starting with an empty queue) and the termination phase (draining the queues after the source stops). We compensate for this by simulating many jobs, so that the initialization and termination phases are small compared to the steady-state period.

A better technique is to monitor the statistics during the simulation and terminate the simulation when the statistics have been stable for a long time. We can accomplish this by adding a giveStats transaction to each entity process. This transaction will return the current statistics of the process to which it belongs. After starting all processes, main would repeatedly delay for a while, collect statistics from each entity process, and check whether they had stabilized, repeating this cycle until it decided that the simulation had reached a steady state. Thus, main would look something like Example 5. Transaction giveStats is easy to add to the queue process; it is just another alternative in the central select statement. For the source and server processes, the central loop would be modified to conditionally accept a giveStats transaction (that is, accept a call if one is available, but continue if no call is pending).

Example 5: An example of main( )

  main()
  {
     create all processes
     while (1) {
        s.wait(s.reqDelay(10000));
        for each entity process p
           entity-stats = p.giveStats();
        if (statistics have been stable for long enough)
           break;
     }
     print final statistics
     for all processes p
        c_abort(p);
  }
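For the queue process, the new alternative might look like this (assuming giveStats is declared to return the time-in-queue stats structure; the exact statistics returned are a design choice):

  /* Added to the queue specification: */
  trans stats giveStats();

  /* One more alternative in the select statement of Listing Twelve: */
  or accept giveStats()  { treturn q.qTime; }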

_DISCRETE EVENT SIMULATION IN CONCURRENT C_ by Narain Gehani and W.D. Roome

[LISTING ONE]



File: sim-sched.h

process spec sched() {
  trans long now();  /* return simulated time */
  trans long reqDelay(long del);
                     /* request delay */
  trans long wait(long x); /* wait for delay */
  trans void addUser();    /* add client */
  trans void dropUser();   /* drop client */
  trans void passive();    /* client is passive */
  trans void active();     /* client is active */
};






[LISTING TWO]


File: sim-source.h

process spec source(
  process sched s,    /* scheduler */
  process queue outQ, /* output queue */
  long meanIat,   /* mean inter-arrival time */
  long meanServt, /* mean service time */
  long nGen,      /* number to generate */
  name_t name);   /* symbolic name */






[LISTING THREE]


File: sim-server.h

process spec server (
  process sched s,    /* scheduler */
  process queue inQ,  /* input queue */
  process queue outQ, /* output queue */
  double speed,       /* speed of server */
  name_t name);       /* symbolic name */






[LISTING FOUR]


File: sim-qItem.h

typedef struct {
       /* Public: */
  long servt;   /* service time for job */
  long arrive;  /* time job arrived */

       /* Private to queue process: */
  long qEnter;  /* time entered queue */
  int  ticket;  /* ticket from takeReq */
  int gotItem;  /* !=0 if item was taken */
} qItem;






[LISTING FIVE]


File: sim-queue.h

process spec queue(
    process sched s,       /* scheduler */
    int maxSize,           /* max queue size */
    name_t name)           /* symbolic name */
{
  trans int  itemCnt();    /* return queue size */
  trans void addProd();    /* add producer */
  trans void dropProd();   /* drop producer */
  trans void addCons();    /* add consumer */
  trans void dropCons();   /* drop consumer */

          /* start and finish put request:*/
  trans int putReq(qItem);
  trans void putWait(int, qItem);

          /* start and finish take request:*/
  trans qItem takeReq();
  trans qItem takeWait(int);
};






[LISTING SIX]


File: sim-stats.h

typedef struct {
  long    nv;    /* number of values */
  long    maxv;  /* max value */
  double  sumv;  /* sum of values */
  double  sumsq; /* sum of squares */
} stats;

void   stInit(); /* initialize structure */
void   stVal();  /* add new value */
double stMean(); /* return mean value */
double stSdev(); /* return standard deviation */
long   erand();  /* exponential random number */







[LISTING SEVEN]


File: sim-main.cc

name_t makeName(name) char *name;
{  name_t ret;
   strcpy(ret.str, name);
   return ret;
}

main()
{  process sched s;
   process queue q1, q2;
   long nGen=100000;  /* number of jobs */
   long servt=500;    /* mean service time */
   long iat=1000;     /* mean inter-arrival */

/* Create virtual time scheduler. */
  s = create sched(); s.addUser();

/* Create queues and servers. */
  q1 = create queue(s, 100, makeName("Q1"));
  q2 = create queue(s, 100, makeName("Q2"));
  create source(s, q1, iat, servt, nGen,
                makeName("Src"));
  create server(s, q1, q2, 0.5,
                makeName("Serv1.1"));
  create server(s, q1, q2, 0.5,
                makeName("Serv1.2"));
  create server(s, q2, c_nullpid, 1.0,
                makeName("Serv2"));

/* Wait for all processes to start. */
  delay 2.0;  s.dropUser();
}







[LISTING EIGHT]


File: sim-source.cc

process body source(s, outQ, meanIat,
                    meanServt, nGen, name)
{  stats  iat, servt;
   qItem  item;
   long   i, t;

/* Initialization phase */
   s.addUser();
   outQ.addProd();
   stInit(&iat);  stInit(&servt);

/* Main processing phase: generate jobs. */
    for (i=1; i <= nGen; i++) {
      t = erand(meanIat);
      stVal(&iat, t);
      item.arrive = s.wait(s.reqDelay(t));
      item.servt = erand(meanServt);
      stVal(&servt, item.servt);
      qPut(outQ, item);
    }
/* Termination phase: print stats, etc. */
    print statistics;
    outQ.dropProd();  s.dropUser();
}







[LISTING NINE]


File: sim-server.cc

process body server(s, inQ, outQ, speed, name)
{  stats sysTime; /* time-in-system */
   qItem item;
   long ts;

   s.addUser();  stInit(&sysTime);
   inQ.addCons();
   if (outQ != c_nullpid)
     outQ.addProd();

   while (qTake(inQ, &item)) {
     ts = s.wait(s.reqDelay(item.servt/speed));
     stVal(&sysTime, ts - item.arrive);
     if (outQ != c_nullpid)
       qPut(outQ, item);
   }

   print statistics;
   if (outQ != c_nullpid)
     outQ.dropProd();
   inQ.dropCons();  s.dropUser();
}







[LISTING TEN]


File: sim-qPut.cc

/* Put item onto queue; wait if full. */
void qPut(q, item)
   process queue q;  qItem item;
{  int ticket = q.putReq(item);
   if (ticket >= 0)
      q.putWait(ticket, item);
}

/* Set *itemp to next item; wait if empty. */
/* Return 1 if item was taken, 0 on EOF */
int qTake(q, itemp)
   process queue q;  qItem *itemp;
{
   *itemp = q.takeReq();
   if (itemp->ticket >= 0)
      *itemp = q.takeWait(itemp->ticket);
   return itemp->gotItem;
}






[LISTING ELEVEN]


File: sim-qInfo.h

/* tInfo: Describe outstanding tickets. */
typedef struct {
   int acc;    /* next ticket to accept */
   int give;   /* next ticket to give out */
   int nPass;  /* pending passive clients */
} tInfo;

/* qInfo: Describe one queue. */
typedef struct {
   process sched s;  /* scheduler process */
   int     max;      /* max queue size */
   int     nProd;    /* number of producers */
   int     nCons;    /* number of consumers */
   name_t  name;     /* name of queue */
   stats   qTime;    /* time-in-queue stats */
   stats   qSize;    /* for queue size stats */
   int     nElem;    /* items in queue */
   int     head;     /* index head of queue */
   int     tail;     /* index tail of queue */
   qItem   *items;   /* alloc'd array of items */

/* Describe pending put, take requests: */
   tInfo   pPut, pTake;
} qInfo;






[LISTING TWELVE]


File: sim-queue.cc

process body queue(s, maxSize, name)
{  qInfo q;  qItem x;
   initialize qInfo structure;
   accept addProd()  { q.nProd++; }

   while (q.nProd+q.nCons >0) {
      select {
      (q.nElem<q.max && q.pPut.acc==q.pPut.give):
         accept putReq(item)
            { putItem(&q, &item);  treturn -1; }
      or (q.nElem==q.max):
         accept putReq(item)
            { s.passive();  q.pPut.nPass++;
              treturn incTick(&q.pPut.give); }
      or (q.nElem<q.max):
         accept putWait(qt, item)
            suchthat (qt == q.pPut.acc)
         {  putItem(&q, &item);
            incTick(&q.pPut.acc); }
      or (q.nElem>0 && q.pTake.acc==q.pTake.give):
         accept takeReq()
            {  treturn takeItem(&q); }
      or (q.nElem==0):
         accept takeReq()
            {  x.ticket = incTick(&q.pTake.give);
               s.passive();  q.pTake.nPass++;
               treturn x; }
      or (q.nElem>0):
         accept takeWait(qt)
               suchthat (qt == q.pTake.acc)
            {  incTick (&q.pTake.acc);
               treturn takeItem(&q); }
      or (q.nProd==0 && q.nElem==0):
         accept takeWait(qt)
            {  x.gotItem = 0;  treturn x; }

      or accept itemCnt()  { treturn q.nElem; }
      or accept addCons()  { q.nCons++; }

      or accept addProd()  { q.nProd++; }
      or accept dropCons() { q.nCons--; }
      or accept dropProd() { q.nProd--; }
      }
   /* On EOF, make pending takers active. */
      if (q.nProd==0 && q.nElem==0)
          for (; q.pTake.nPass > 0; q.pTake.nPass--)
             s.active();
   }
   print statistics;
}






[LISTING THIRTEEN]


File: sim-qAux.cc

/* Increment ticket, return prev value. */
int incTick(tp)
   int *tp;
{  int t = *tp;
   *tp = (t+1)%10000;
   return t;
}

/* Remove and return the next item in the queue. */
qItem takeItem(qp)
   qInfo *qp;
{
   qItem item;
   item = qp->items[qp->head];
   item.ticket = -1;
   item.gotItem = 1;
   stVal(&qp->qTime, qp->s.now() - item.qEnter);
   qp->nElem--;
   qp->head = (qp->head+1) % qp->max;
   if (qp->pPut.nPass > 0)
      { qp->s.active(); qp->pPut.nPass--; }
   return item;
}

/* Add item *itemp to queue. */
void putItem(qp, itemp)
   qInfo *qp; qItem *itemp;
{
   qp->items[qp->tail] = *itemp;
   qp->items[qp->tail].qEnter = qp->s.now();
   stVal(&qp->qSize, qp->nElem);
   qp->nElem++;
   qp->tail = (qp->tail+1) % qp->max;
   if (qp->pTake.nPass > 0)
      {  qp->s.active(); qp->pTake.nPass--; }
}






[LISTING FOURTEEN]


File: sim-sched.cc

process body sched()
{  int  nUser, nAct, i;
   long curTime = 0;  /* Current simulated time */
   ordered list of pending delay requests;

   initialize pending delay list data structures;
   accept addUser() { nUser = nAct = 1; }
   while (nUser >0) {
      select {
         accept addUser()  { ++nUser; ++nAct; }
      or accept dropUser() { --nUser; --nAct; }
      or accept active()   { ++nAct; }
      or accept passive()  { --nAct; }
      or accept now()      { treturn curTime; }
      or accept reqDelay(x)
         { add request for curTime+x to pending delay list;
           nAct--;  treturn request-index; }
      }
      if (nAct == 0 && pending delay list is not empty) {
         curTime = time of request at head of list;
         nAct = number of processes waiting for that time;
         for (i = 1; i <= nAct; i++)
            accept wait(x)
               suchthat (x == index-of-head-request)
                  { treturn curTime; }
         discard request at head of pending delay list;
      }
   }
}

Copyright © 1989, Dr. Dobb's Journal
