Simulating Hypercubes in Unix Part I

In this two-part article, our authors describe how you can simulate the execution of a hypercube program on a standard UNIX system. This month, they focus on partitions, the basic building blocks of a hypercube system. Next month, they present the source code for the simulator and discuss how to use the system.


December 01, 1992
URL:http://www.drdobbs.com/architecture-and-design/simulating-hypercubes-in-unix-part-i/184408898

Figure 1


Copyright © 1992, Dr. Dobb's Journal


Parallel processing for UNIX systems

Jeffrey W. Hamilton and Eileen M. Ormsby

Jeff was lead programmer for IBM's W4 Multiprocessing Adapter. He can be contacted at [email protected]. Eileen is a staff programmer for IBM's FSD, working with W4 application development. She can be reached at [email protected].


Parallel computers are rapidly coming online, and a good number of these are hypercubes. Unfortunately, few of us can afford to have our own personal hypercube, so in this two-part article, we'll describe how you can simulate the execution of a hypercube program on a standard UNIX system. We'll also show you how to use a network of systems to execute your programs in parallel.

Our simulator, SIMCUBE, is designed to simulate an Intel iPSC/2 hypercube. While other hypercubes have different application interfaces, the basic ideas remain the same. A little time with your system's reference manual and a good understanding of the SIMCUBE code should allow you to re-create any hypercube environment. This month, we'll focus on partitions, the basic building blocks of a hypercube system. Next month, we'll present the source code for the simulator and discuss how to use the system.

SIMCUBE was created so we could move one of several hypercube applications onto another system that ran UNIX. We wanted a reasonable simulation of the hypercube's environment with little or no modification of the application. If we could move one application with no changes, then moving the other 125 applications would be a snap. Another important goal was reasonable simulation speed. What is the use of simulating another computing environment if the simulation runs 100 times slower? Finally, we restricted ourselves to standard UNIX system calls. This permits the simulator and application to be moved to any UNIX environment. It is possible to combine different UNIX platforms to create one simulated hypercube.

What is a Hypercube Anyway?

A hypercube is a collection of computing elements (or nodes) joined to work cooperatively. Each node consists of a processor and memory. Some of the nodes may also have connections to I/O devices. Each node is joined to a certain number of nearby nodes. The number of connections is referred to as the order of the hypercube. Most hypercubes are order-4 machines. How the elements are joined defines the topology of the hypercube. The topology varies from manufacturer to manufacturer. The combination of order and topology affects the distance messages must travel between nodes. Ideally, you would like every node to be immediately connected to every other node. However, the number of wires between the nodes would rapidly get out of hand as the number of nodes in a machine increased. Therefore, compromises must be made. Some applications spend a fair amount of time organizing their work so that the communication path between any two subtasks is as short as possible, but for most applications it is easier to imagine each node as a computer system on a local area network.

Figure 1 shows a possible topology for an eight-node hypercube of order 4. Each computing element communicates with its neighbors by passing messages. Node 0 can send messages directly to node 1. To send a message to node 2, node 0 must first send the message to an intermediate node, which will forward the message. In the example given, each node can reach half the nodes in one step and the remaining nodes in two steps. In most hypercubes, the message routing is handled entirely by hardware, so the user does not have to be concerned about how messages flow through a hypercube. However, if you want to get the most performance from a hypercube, you must minimize the number of steps a message takes to reach its destination node.

Node 0 has a special role: All communications to the outside world flow through this node. On Intel hypercubes, node 0 is connected to the system-resource manager (SRM)--a PC that controls the communications with the hypercube and allocates portions of the hypercube to applications.

Programming for a Hypercube

Programs are loaded into the hypercube from the command line of the SRM or by invoking a "host" program, which in turn loads the nodes of the hypercube. Most hypercube systems have a barebones executive running on each node to work with an application program. Each node can only execute one program at a time. There are no complex operating-system functions such as scheduling or virtual-memory support on the hypercube. It is like having a personal computer running DOS on each node.

The SRM can load each node with a program directly or load node 0 with a program which in turn loads the remaining nodes. You do not have to load each node with the same program, but most programmers do--it makes life much simpler.

In SIMCUBE, we only consider the case of a host program loading the same program on all the nodes. Extending the program to handle other cases will be left as an exercise for the reader.

Hypercube Partitions

Few applications need all the nodes in a hypercube, and it is expensive to dedicate a whole machine to running one job at a time. Therefore, the hypercube is typically broken down into partitions--virtual hypercubes. The typical number of nodes in a partition is a power of two (1, 2, 4, 8, and so on).

SIMCUBE can simulate partitions of any size, including odd values like 3 or 7. In a hypercube, a "partition" refers to all the nodes within a virtual hypercube. In SIMCUBE, a "partition" will refer to the number of nodes simulated on one computer system. A collection of SIMCUBE partitions will simulate a single hypercube partition.

In a hypercube, a "node" is a semi-independent computer. In SIMCUBE, a "node" is a process. Each SIMCUBE partition will have NUMBER_IN_PART nodes, defined in the file cube.h (Listing One, page 108). You can set it to any value that makes sense for your particular system. Since an application requests a certain number of nodes, the value of NUMBER_IN_PART determines the number of SIMCUBE partitions needed.
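
The relationship between the requested node count and the number of SIMCUBE partitions can be sketched as follows. The rounding-up division mirrors the one pm.c performs in Listing Two; the helper names parts_needed and nodes_in_partition are illustrative only and do not appear in SIMCUBE.

```c
#include <assert.h>

#define NUMBER_IN_PART 4   /* same value as cube.h in Listing One */

/* Number of SIMCUBE partitions needed for num_nodes simulated nodes
   (hypothetical helper; rounds up, as pm.c does). */
int parts_needed(int num_nodes)
{
    assert(num_nodes > 0);
    return (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;
}

/* Number of nodes that land in partition `part` (hypothetical helper).
   Every partition is full except possibly the last one. */
int nodes_in_partition(int num_nodes, int part)
{
    int remaining = num_nodes - part * NUMBER_IN_PART;

    assert(part >= 0 && remaining > 0);
    return remaining < NUMBER_IN_PART ? remaining : NUMBER_IN_PART;
}
```

With NUMBER_IN_PART set to 4, a request for 10 nodes needs three partitions holding 4, 4, and 2 nodes.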

Partition-manager Overview

The partition manager (PM), see Listing Two (page 108), is responsible for handling the communications between partitions. PM is also responsible for starting and terminating the node processes. Since it is responsible for starting the processes, PM can ensure that the communication paths are in place before the application process begins execution. For an application that does not need multiple partitions, the PM still executes and starts the application processes.

The file .pmrc contains a list of the computer systems (host names) which can execute a SIMCUBE partition. The host names are read sequentially from the file until the number of partitions needed has been allocated to host systems. The first name in the .pmrc file must be the local-system name. The remaining names are the remote systems on the network. How the PMs are started is discussed in the description of the load command.
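
A minimal sketch of this sequential read is shown below. It assumes the host names in .pmrc are separated by whitespace (for example, one per line); the article does not spell out the file layout, and the helper read_hosts and the MAX_HOST_LEN limit are hypothetical.

```c
#include <assert.h>
#include <stdio.h>

#define MAX_HOST_LEN 64    /* assumed limit, not taken from SIMCUBE */

/* Read up to `needed` whitespace-separated host names from an open
   .pmrc-style stream.  Returns how many names were stored; a result
   smaller than `needed` means the file listed too few hosts. */
int read_hosts(FILE *fp, char hosts[][MAX_HOST_LEN], int needed)
{
    int count = 0;

    assert(fp != NULL && needed >= 0);
    /* %63s leaves room for the terminating NUL in a 64-byte slot */
    while (count < needed && fscanf(fp, "%63s", hosts[count]) == 1)
        count++;
    return count;
}
```

Reading stops as soon as enough hosts have been collected, so any extra names in the file are simply ignored.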

Partition Functions

Managing a partition is accomplished with the cubeinfo, getcube, setpid, load, relcube, and killcube commands. The cubeinfo function returns information about how the total system is subdivided and which users own which partitions. Since we will be simulating one partition in a hypercube, and it will always be available to use, this function is just a stub for future improvements.

The getcube function makes the request to the SRM for a partition. Only one of the parameters, cubetype, is interesting to our simulator. The cubetype parameter is a string that describes the desired size of the partition and the type of computing element needed to run the application. The getcube function only looks for the size information. The size can be expressed as either the total number of nodes or the dimension of the cube. Dimensions begin with the letter d.
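
The two size notations can be illustrated with a small parser. This is a sketch under assumptions: it looks only at a leading size such as "8" or "d3" and ignores any other attributes a real cubetype string may carry, and the name cubetype_nodes is hypothetical.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical helper: extract the node count from a cubetype string.
   "d3" means dimension 3 (2^3 = 8 nodes); "8" means 8 nodes.
   Returns -1 if no usable size is found. */
int cubetype_nodes(const char *cubetype)
{
    char *end;
    long value;
    int is_dim;

    assert(cubetype != NULL);
    is_dim = (cubetype[0] == 'd');
    if (is_dim)
        cubetype++;                  /* skip the 'd' prefix */
    value = strtol(cubetype, &end, 10);
    if (end == cubetype || value < 0)
        return -1;                   /* no digits, or negative */
    if (is_dim)                      /* arbitrary cap keeps the shift safe */
        return value < 16 ? (1 << (int)value) : -1;
    return (value > 0 && value < 65536) ? (int)value : -1;
}
```

Because SIMCUBE accepts partitions of any size, a plain count such as "3" is passed through unchanged rather than rounded to a power of two.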

The size of the requested hypercube partition determines how many node-processes are created by SIMCUBE. Since we are only simulating a single hypercube partition, we assume that the getcube command is called once per program execution.

In a hypercube, the host program running on the SRM can control and communicate with multiple partitions. To distinguish which group of processors is being addressed, the program uses setpid to assign a partition identifier to the partition immediately after obtaining the partition with the getcube call.

For SIMCUBE, the partition identifier will be referred to internally as the "group number," since PID already has special meaning in UNIX applications. (PID is the process identifier and is used to track specific instances of programs running in the system.)

The load function is used to load a program into one or more nodes within a partition. There are actually two forms of this command: One loads a specific node with a program; and the other loads all nodes within the partition with a copy of the same program. For our simulator, we have only implemented the latter case. The rest of the discussion of load refers to our simulated version.

The load function is the core of SIMCUBE for the host application. Before loading the application, the simulator is initialized. Space is allocated in memory for the various control structures of the simulator. The load function reads the .pmrc file for a list of available systems. It determines how many partitions to start and which systems to use. Once the argument list for PM has been set, the load function starts the local PM with an exec call and the remaining systems are started indirectly by execing the rsh command. PM is responsible for starting the application processes.
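
As an illustration of the argument list that load prepares, the sketch below fills an argv array in the order that main() in Listing Two reads it. The helper build_pm_argv, the MAX_PM_ARGS limit, and the executable name "pm" are assumptions made for this example; the actual load source is not part of this installment.

```c
#include <assert.h>
#include <stdio.h>

#define MAX_PM_ARGS 32     /* assumed limit for this sketch */

/* Fill argv[] with the arguments pm.c's main() reads, in the same order:
   program, partition number, key, group, node count, then host names.
   numbuf provides storage for the four numeric strings.
   Returns the argument count (argv[count] is NULL). */
int build_pm_argv(char *argv[MAX_PM_ARGS], char numbuf[4][16],
                  char *app, int part, int key, int group,
                  int num_nodes, char *hosts[], int nhosts)
{
    int n = 0, i;

    assert(nhosts >= 0 && nhosts + 7 <= MAX_PM_ARGS);
    sprintf(numbuf[0], "%d", part);
    sprintf(numbuf[1], "%d", key);
    sprintf(numbuf[2], "%d", group);
    sprintf(numbuf[3], "%d", num_nodes);

    argv[n++] = "pm";                /* assumed name of the PM executable */
    argv[n++] = app;
    for (i = 0; i < 4; i++)
        argv[n++] = numbuf[i];
    for (i = 0; i < nhosts; i++)
        argv[n++] = hosts[i];
    argv[n] = NULL;                  /* exec-style lists end with NULL */
    return n;
}
```

A list built this way could be handed to execvp after a fork to start the local PM; for a remote system, the same arguments would follow "rsh" and the host name.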

Interrupt handlers are set up so that if we or the program we load dies unexpectedly, there will be a chance to clean up. This is important under UNIX, because we are asking the system to set aside global resources for our exclusive use. If we do not notify the system that we are done with the resources, we could run out of resources on a subsequent execution of SIMCUBE.

The relcube function is called at the end of program execution to release the partition gained with the getcube function. For SIMCUBE, this command is just a placeholder.

The killcube function is called to force the termination of a program that will not quit. The killcube function can be used to terminate a single node within a partition or to terminate the entire partition. For SIMCUBE, we only needed to implement the latter function.

We simulate the behavior of killcube by sending a SIGTERM signal to each PM, which in turn sends the same signal to each application-program node that it manages. Cleanup is done by all parties to release the shared system resources.
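
The forwarding step, in which a PM passes SIGTERM on to the node processes it manages, might look roughly like this. The sketch assumes the PM keeps an array of child process IDs, as pm.c does with its children array; the helpers terminate_children and spawn_idle_children are hypothetical, and the idle children merely stand in for application nodes.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: send SIGTERM to each child and wait for it.
   Returns how many children were actually ended by that signal. */
int terminate_children(const pid_t *children, int count)
{
    int i, status, killed = 0;

    assert(children != NULL && count >= 0);
    for (i = 0; i < count; i++)
        kill(children[i], SIGTERM);
    for (i = 0; i < count; i++) {
        if (waitpid(children[i], &status, 0) == children[i] &&
            WIFSIGNALED(status) && WTERMSIG(status) == SIGTERM)
            killed++;
    }
    return killed;
}

/* Hypothetical helper: start `count` idle children standing in for node
   processes.  Returns the number started. */
int spawn_idle_children(pid_t *children, int count)
{
    int i;

    for (i = 0; i < count; i++) {
        pid_t pid = fork();
        if (pid < 0)
            break;
        if (pid == 0) {
            for (;;)
                pause();             /* child: sleep until signalled */
        }
        children[i] = pid;
    }
    return i;
}
```

Waiting for each child after signalling it keeps zombie processes from accumulating.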

Partition-manager Functions

The PM sets up communications for the nodes, starts the nodes (forks the application processes), and performs any communications for the nodes between the partitions, as required. Each PM performs these functions for the nodes in its partition.

The PM is started by the load function. This function does a significant amount of preparation to start each PM.

When the PM process is started, it is passed the following: the name of the application program to execute; its partition number; the access key, so the first PM can communicate with the host process; the partition (group) number assigned by setpid; the number of nodes being simulated; and a list of all systems running the simulator.

Once PM starts, it determines which partition it is, how many nodes are running on it, and how many other partitions exist. For SIMCUBE, sockets are used to communicate between the partitions, and shared memory is used for nodes within a partition. PM sets up a socket to communicate with each remote partition.
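
The port assignment for these sockets can be sketched from the expression used in setup_server_sockets() in Listing Two. The function name server_port is hypothetical; PM_PORT comes from cube.h.

```c
#include <assert.h>

#define PM_PORT 6000       /* from cube.h in Listing One */

/* Port on which partition `my_part` listens for partition `remote_part`,
   following the expression used in setup_server_sockets(). */
int server_port(int my_part, int remote_part)
{
    assert(my_part >= 0 && remote_part >= 0 && remote_part < 16);
    return PM_PORT + (my_part << 4) + remote_part;
}
```

Shifting the partition number left by four bits reserves a block of 16 ports per partition, so no two partition pairs share a port as long as there are at most 16 partitions.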

As in the load function, the interrupt handling is set up so that appropriate cleanup can be performed upon application termination.

PM is responsible for forking the appropriate number of application processes (nodes). Each application process must know its node number and the total number of nodes working, in addition to other simulator-specific information. PM must pass this information to each application process.

In line with our design goal of minimizing alterations to the hypercube application, we wanted to find the least obtrusive way to pass this information to the application. Our choices were to pass it via either the argument list, a file, or an environment variable. Passing the information in the application program's argument list would cause too many alterations to the application program, while a file would be too slow and would hinder executing multiple SIMCUBE partitions on one system. We therefore chose the environment variable. Each application process needs the following information: the access key, so that a node can communicate with other nodes; the node number assigned to this particular node (process); the partition number assigned by the setpid function; and the total number of nodes being simulated. PM forks each application node, sets the environment variable appropriately, and then execs the application.
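
On the receiving side, a node process would need to unpack that variable. The sketch below parses a value in the format PM writes in Listing Two ("SIM_INFO=%d,%d,%d,%d,%s"). The struct sim_info and the function parse_sim_info are hypothetical and are not the simulator's own initialization code.

```c
#include <assert.h>
#include <stdio.h>

/* Fields PM packs into SIM_INFO; the struct itself is hypothetical. */
struct sim_info {
    int  key;              /* access key (base) for shared resources */
    int  node;             /* this process's node number */
    int  group;            /* partition identifier set by setpid */
    int  num_nodes;        /* total nodes being simulated */
    char hosts[256];       /* comma-delimited partition host names */
};

/* Parse the value of SIM_INFO (the text after "SIM_INFO=").
   Returns 0 on success, -1 if the string is malformed. */
int parse_sim_info(const char *value, struct sim_info *out)
{
    assert(out != NULL);
    if (value == NULL)
        return -1;
    out->hosts[0] = '\0';
    /* the four integers are required; the host list may be empty */
    if (sscanf(value, "%d,%d,%d,%d,%255s", &out->key, &out->node,
               &out->group, &out->num_nodes, out->hosts) < 4)
        return -1;
    return 0;
}
```

A node could call parse_sim_info with the result of getenv("SIM_INFO") at startup and treat a failure as a sign that it was not launched by PM.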

PM then waits for the application program to communicate with other nodes on remote partitions. PM is actually running as two processes. The server PM is always waiting to receive data on the socket coming from remote partitions. The client PM is always waiting for a node to send data to a remote partition. If a node needs to communicate with a node local to its partition, shared memory is used instead.

Next Month

In next month's installment, we'll discuss SIMCUBE's application environment and present the source code to the simulate.c program.



_SIMULATING HYPERCUBES IN UNIX_
by Jeffrey W. Hamilton and Eileen M. Ormsby


[LISTING ONE]


/***** cube.h *****/

/* Hypercube Simulation definitions */
#define NUMBER_IN_PART 4            /* number of nodes in partition */
#define PM_PORT        6000

/* Maximum message sent between nodes */
#define MAX_MESSAGE_SIZE (1024 * 16)

typedef struct {
  char *name;             /* network name of the computer hosting partition */
  int  socket;            /* file descriptor for the socket */
  int  errfdp;            /* file descriptor for sending "kill" values */
  struct sockaddr_in addr;
} subpart;

typedef struct {
  int type;      /* message type sent with the message -1 or greater */
  int spid;      /* sender's group number (pid) */

  int snode;     /* sender's node number */
  int dnode;     /* node this message is destined for */
  int t_length;  /* total length of message */
  int length;    /* length of the message */
  char valid[NUMBER_IN_PART+2];     /* 0= no message */
  char msg[MAX_MESSAGE_SIZE];     /* Actual message contents */
} message;





[LISTING TWO]


/***** pm.c *****/
/* PARTITION MANAGER -- This program will run on all partitions used for an
** application. It is started via a remote execution call from "load".
** The main program gets the input arguments, sets a few variables and calls
** the Partition Manager subroutine which performs the following functions:
**  determines local partition information; allocates necessary partition
**  structures; sets up interrupt handling to free system resources when the
**  application is terminated; sets up server portion of socket communications;
**  forks a client PM that sets up client portion of the socket communications,
**  waits for a node to request data to be sent to a partition, and sends data
**  over the sockets; forks and execs application children; performs server PM
**  functions that waits to receive data from the sockets and notifies
**  appropriate nodes when data has arrived.
** The PM server only receives data for its nodes, and the PM client sends
** data to a remote partition.
**    BASIC SOFTWARE ARCHITECTURE: The load module in the simcube library will:
**    1) Read the .pmrc file; 2) Determine how many partitions will be used
**    for this application; 3) Fork and exec a local PM and the appropriate
**    number of remote PMs. PM is passed the name of the application program,
**    its partition number, the key value for PM to communicate to the host
**    process with the group number, the total number of application nodes,
**    the names of the other partitions running this application.
**   The initialization portion (init_simulator) which is called by the
**   application processes (nodes) will set up interrupt handling and create
**   the shared memory and semaphores necessary for communications
**   between local nodes and the Partition Manager.
*/

/* These functions allow a UNIX system to simulate a hypercube environment. */
#include <sys/types.h>
#include <errno.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/ipc.h>
#include <sys/wait.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <string.h>
#include <sys/time.h>
#include <sys/utsname.h>
#include <sys/param.h>
#include "cube.h"


#define  NUM_TRIES 60
#define  min(x,y) (((x) < (y)) ? (x) : (y))

/* Function Prototypes */
#include <stdlib.h>             /* malloc, atoi, exit, putenv */
#include <unistd.h>             /* fork, close, sleep, getpid, execlp */

int pm (char *filename, char *pmsites[]);
static void setup_server_sockets(void);
static void pm_server(void);
void setup_client_sockets(void);
void pm_getmsg(void);
void abort_prog(void);
void sig_terminate(int sig, int code, struct sigcontext *scp);
void unexpected_death(int sig, int code, struct sigcontext *scp);
int _killcube(int node, int pid);
int init_shared_mem(void **pointer, int size, int key);
int init_semaphore(int *semid, int size, int value, int key);
int semcall_all(int semid, int size, int operation);
int semcall_one(int semid, int num, int operation);
int numnodes(void);
int numparts(void);
int mypart(void);
int partof(int node);
int pm_partof(int node);
int numbuffers(void);
int mybuffer(void);
int bufferof(int node);
int mynode(void);
int myhost(void);
void pm_client(void);

/* Local, Private Information */
fd_set node_part_set, temp_set;
             /* node_part_set is the variable that FD_XXX commands are */
             /* applied to. Definitions of fd_set structure, and FD_ZERO, */
             /* FD_SET, FD_CLR, and FD_ISSET macros are in <sys/types.h> */
             /* node_part_set will have socket file descriptors.*/
static int num_parts;         /* number of partitions */
static int my_part;           /* partition this process is in */
static int nodes_in_part;     /* number of nodes in this partition */
static subpart *partition;    /* list of partition information */
static int base;              /* base key value for allocating shared data */
static int my_node;           /* node number for this process */
static int my_group;          /* group id for this process */

                              /* There are two groups, host communications */
                              /* and inter-node communications */
static int num_nodes;         /* total number of nodes in all partitions */

static int msgavail = -1;     /* semaphores indicating message is available */
static int msgfree = -1;      /* semaphores indicating buffer is free */
static int next_message = -1; /* which message is to be received next */
static int shmid_m = -1;      /* id of shared area for messages */
static message *buffer = NULL;/* communication areas */
static int *children = NULL;  /* process ids of all child processes */
static int child_index = 0;   /* number of children created */
static int pmserver_pid = 0;  /* pid of pmserver */

/* Main:  reads arguments from command line, places them in local variables
** and calls pm. (Local variables are not necessary, but enhances readability)
** NOTE:  AT MOST 16 PM SITES ARE READ FROM THE COMMAND LINE */
int main(int argc, char *argv[])
{
   char *filename;
   char *pmsites[16];
   int  i;
   if (argc < 7 ) {
      fprintf (stderr, "PM main: error not enough arguments\n");
      fflush(stderr);
      exit(-1);
   }
   filename = argv[1];
   my_part = atoi(argv[2]);
   base = atoi(argv[3]);
   my_group = atoi(argv[4]);
   num_nodes = atoi(argv[5]);
   for (i = 0; i < argc - 6 && i < 16; i++) {   /* stay within pmsites[] */
     pmsites[i] = argv[i + 6];
   }
   return pm (filename, pmsites);
}

/* pm -- Determines partition information, sets up signal handling, sets up
** server sockets, forks client pm, forks application children. PM splits the
** application into NUMBER_IN_PART processes. The partition number is passed
** as an input parameter. The starting node number is the partition number
** times NUMBER_IN_PART and remaining processes will be numbered consecutively.
** Shared memory will be allocated to serve as a communications vehicle within
** a partition. Sockets used between partitions to allow multiple UNIX systems
** to be combined to create a larger set of CPUs to be applied to a problem. */
int pm (char *filename, char *pmsites[])
{
   register int i, pid;
   char temp[128];              /* used to set up environment variables */
   char part_names[64];
   int  start_node;
   int  dest_node;
   /* Determine how many other partitions exist */
   num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;
   /* Determine which node is the first for this partition */
   start_node = mypart() * NUMBER_IN_PART;
   /* Determine how many nodes are in this partition (1-4) */
   nodes_in_part = numnodes() - (mypart() * NUMBER_IN_PART);
   nodes_in_part = min(NUMBER_IN_PART, nodes_in_part);
   /* Set PM's node to be the last node on this partition */
   /* (The children will be start_node through start_node + nodes_in_part-1) */
   my_node = nodes_in_part;
   /* Create the structure to hold the partition names and socket fds */
   if ((partition = malloc(num_parts * sizeof(subpart))) == NULL) {
      fprintf(stderr,"PM %d SERVER: insufficient memory\n", mypart());
      fflush(stderr);
      return -1;
   }
   memset(partition, 0, num_parts * sizeof(subpart));
   /* Catch these signals so PM can notify children to clean up */
   signal(SIGINT,sig_terminate);
   signal(SIGTERM,sig_terminate);
   signal(SIGQUIT,sig_terminate);
   /* Watch for unexpected deaths */
   signal(SIGCHLD, unexpected_death);
   /* Create, bind, and listen on sockets */
   setup_server_sockets();
   if (mypart() != 0) {
      /* Only change the base on partitions that are not the one that includes
      ** host. That partition requires same base that host session is using. */

      base = getpid();
   }
   /* Allocate shared memory */
   shmid_m = init_shared_mem((void **)&buffer, sizeof(message) * numbuffers(), base);
   if (mypart() != 0) {
      memset(buffer, 0, sizeof(message) * numbuffers());
   }
   /* Allocate communications semaphores */
   init_semaphore(&msgavail, numbuffers(), 0, base+10000);
   init_semaphore(&msgfree, numbuffers(), 0, base+20000);
   /* Flush stdout and stderr before the fork so the child doesn't inherit unflushed output */
   fflush(stdout);
   fflush(stderr);
   /* Fork PM CLIENT here */
   if ((pmserver_pid = fork()) < 0) {
      /* Can't create the PM CLIENT */
      _killcube(0, 0);
      fprintf(stderr, "PM %d SERVER: unable to create PM CLIENT process\n",
              mypart());
      fflush(stderr);
      return -1;
   } else if (pmserver_pid == 0) {
      /* Fill in the names of the other sites in the partition structure and
      ** close the socket file descriptors that this process just inherited. */
      for (i = 0; i < num_parts; i++) {
         if (mypart() != i) {
            partition[i].name = pmsites[i];
            close(partition[i].socket);
         }
      }
      /* CALL CLIENT SUBROUTINES */
      setup_client_sockets();
      pm_client();
   } else {
      /* SERVER: forks application children then calls pm_server subroutine */
      /* Read from pmsites array, create a comma delimited string for env */
      part_names[0] = '\0';
      for (i = 0; i < num_parts; i ++) {
         strcat(part_names, pmsites[i]);
         strcat(part_names, ",");
      }
      /* Allocate space for child pids */
      if ((children = malloc(nodes_in_part * sizeof(int))) == NULL) {
         fprintf(stderr,"PM %d SERVER: insufficient memory\n", mypart());
         fflush(stderr);
         return -1;
      }
      /* Load all nodes within this partition */
      for (i = start_node; (i < start_node + nodes_in_part); i++) {
         if ((pid = fork()) < 0) {
            /* Can't create all the children! */
            _killcube(0, 0);
            fprintf(stderr, "PM %d SERVER: unable to create node process %d\n",
                    mypart(), i);
            fflush(stderr);
            return -1;
         } else if (pid == 0) {
            /* I'm the child process */
            /* Start the node program */
            my_node = i;
            sprintf(temp, "SIM_INFO=%d,%d,%d,%d,%s",
                    base,my_node,my_group,num_nodes,part_names);
            if (putenv(temp) != 0) {
               fprintf(stderr,
                       "PM %d SERVER: Insufficient room to add env variable\n",
                       my_node);
               fflush(stderr);
               return -1;
            }
            execlp(filename,filename,NULL);
            /* If we get here, we had a problem */
            perror("execlp");
            fprintf(stderr,"PM %d SERVER: error execing node=%d file=%s "
                    "errno=%d\n", mypart(), my_node, filename, errno);
            fflush(stderr);
            return -1;
         } else {
            /* I'm the parent process */
            children[child_index++] = pid;
         }
      }
      /* CALL SERVER SUBROUTINE */
      pm_server();
   } /* end if PM SERVER */
}
/* setup_server_sockets -- SERVER SOCKETS- for all partitions except ourself:
**  Create a socket. Bind the socket to a unique PORT id. (If the socket was
** in use in a prior iteration, it may not have been reset yet - therefore we
** loop a fixed number of times retrying.) Put a listen on socket. Put new
** socket file descriptor into our set */
static void setup_server_sockets(void)

{
   int i, j;
   struct sockaddr_in part_sock, tempaddr;
   /* Zero out the set of partition sockets */
   FD_ZERO(&node_part_set);
   FD_ZERO(&temp_set);
   for (i = 0; i < num_parts; i++) {
      /* Skip ourself */
      if (i == mypart () )
        continue;
      for (j = 0; j < NUM_TRIES; j++) {
         /* Create a SERVER socket to receive data */
         if ((partition[i].socket = socket(AF_INET, SOCK_STREAM, 0))
                    < 0) {
            fprintf(stderr, "PM %d SERVER: can't open stream socket, errno=%d\n",
                    mypart(), errno);
            fflush(stderr);
            exit (100);
         }
         /* Bind SERVER socket to local addr so partitions can send to it */
         bzero((char*)&part_sock, sizeof(part_sock));
         part_sock.sin_family = AF_INET;
         part_sock.sin_addr.s_addr = htonl (INADDR_ANY);
         /* Create unique SERVER socket port address, up to 16 per computer */
         part_sock.sin_port = htons (PM_PORT + (mypart() << 4) + i);
         /* If socket is still in use from prev iter, keep trying to bind */
         if ((bind(partition[i].socket, (struct sockaddr *)&part_sock,
                     sizeof(part_sock))) < 0) {
            if ((errno == EADDRINUSE) || (errno == EINTR)) {
               /* Previous load hasn't shutdown yet, or we were interrupted. */
               close(partition[i].socket);
               sleep(2);
            } else {
               fprintf(stderr,"PM %d SERVER: can't bind local addr, "
                       "errno=%d\n", mypart(), errno);
               fflush(stderr);
               exit(100);
            }
         } else {
            /* It worked, exit the loop */
            break;
         }
      }
      if (j == NUM_TRIES) {
         /* Exceeded retry limit */
         fprintf(stderr,"PM %d SERVER: can't bind local addr, errno=%d\n",
                 mypart(), errno);
         fflush(stderr);
         exit(100);
      }

      /* Issue a listen for the server sockets */
      if (listen(partition[i].socket, 1) < 0) {
         fprintf(stderr,"PM %d SERVER: can't listen on %d, errno = %d\n",
                 mypart(),  partition[i].socket, errno);
         fflush(stderr);
         exit(100);
      }
      /* Set the bit for the socket file descriptor */
      FD_SET(partition[i].socket, &node_part_set);
   } /* end for setting up SERVER sockets */
}
/* pm_server -- SERVER- go into a receiving loop: Copy file descriptors to a
** temporary set. Determine how many sockets are ready to be accepted. For
** each file descriptor that is ready: Find file descriptor that is ready.
** If it is found in a partition's array of fd's then it is a base socket and
** it is "accept"ed and added to the fd set. Else it is an fd that has data to
** be received. Receive the size of the message. Loop until entire message is
** received. Clear the valid indicator bits. Inform nodes that a message has
** arrived. If a broadcast message, set everyone's valid bit, and wait until
** everyone receives it. Else verify that message belongs to a node on this
** part and set that node's valid bit, wait until it is recvd. */
static void pm_server(void)
{
   int    i, j;
   int    accept_rdy;
   int    newsockfd, templen;
   int    size, count, partial;
   char   *target;
   struct sockaddr_in tempaddr;
   /* forever, accept sockets and receive data */
   for ( ; ; ) {
      temp_set = node_part_set;
      /* Determine how many sockets are ready to be accepted */
      /* FD_SETSIZE is defined in <sys/types.h> to be 200 */
      if ((accept_rdy = select( FD_SETSIZE, &temp_set, 0, 0, 0)) == -1) {
         if (errno != EINTR) {
            fprintf(stderr, "PM %d SERVER: error in select, errno = %d\n",
                   mypart(), errno);
            perror( "pm select" ) ;
            fflush(stderr);

            _killcube(0,0);
            exit(-1);
         } else {
            /* We were interrupted, try again */
            continue;
         }
      }
      for (i = 1; (accept_rdy != 0) && (i < FD_SETSIZE) ; i++) {
         /* Find the file descriptor that needs servicing */
         if ( FD_ISSET( i, &temp_set)) {
            /* temporary modification */
            /* accept_rdy--; */
            accept_rdy = 0;
            /* Examine each partition's array of fd's to find ready one */
            for (j = 0; j < num_parts; j++) {
               /* Skip examining our own partition */
               if (j == mypart() )
                 continue;
               /* Since this matches our "base" socket, accept the socket */
               if (i == partition[j].socket) {
                  templen = sizeof(tempaddr);   /* accept needs the buffer size on entry */
                  newsockfd = accept(partition[j].socket,
                                 (struct sockaddr *)&tempaddr, &templen);
                  FD_SET (newsockfd, &node_part_set);
                  /* Found "base" socket, break out of for each part loop */
                  break;
               } /* end if base socket */
            } /* end for check file descriptors in partition's array */
            /* If it wasn't a base socket, then need to receive data */
            if (j != num_parts) {
               continue;
            } else /* receive the data from the socket */ {
               /* First receive the size of the message */
               while (recv(i, &size, sizeof(size), 0) < 0) {
                  if (errno != 22) {
                     fprintf(stderr, "PM %d SERVER: recv size err, "
                             "errno=%d, fd=%d\n", mypart(),errno, i);
                     fflush(stderr);
                     _killcube(0,0);
                     exit(-1);
                  } else {
                     fprintf(stderr, "PM %d SERVER: recv size err, errno=%d,
                            fd=%d\n", mypart(), errno, i);
                     fflush(stderr);
                  }
               } /* end while recv msg */

               target = (char *) &buffer[nodes_in_part];
               count = 0;
               /* Now receive the message, it could come in pieces */
               while (count < size) {
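                  /* recv returns 0 when the peer closes the connection; treat
                  ** that as an error so this loop cannot spin forever */
                  if ((partial = recv(i, target, size - count, 0)) <= 0) {
                     fprintf(stderr,
                             "PM %d SERVER: Error receiving msg; errno=%d\n",
                             mypart(), errno);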
                     fflush(stderr);
                     exit(-1);
                  }
                  count += partial;
                  target += partial;
               }
               /* Make sure all valid bits are cleared */
               memset(buffer[nodes_in_part].valid,0,
                      sizeof(buffer[nodes_in_part].valid));
               /* Tell the node(s) the message is there */
               if (buffer[nodes_in_part].dnode == -1) {
                  /* Broadcast the message to nodes in this partition */
                  for (j=0; j < nodes_in_part; j++) {
                     buffer[nodes_in_part].valid[j] = 1;
                  }
                  semcall_all(msgavail,nodes_in_part, 1);
                  /* Wait until everyone receives the message */
                  semcall_one(msgfree, nodes_in_part, -nodes_in_part);
               } else {
                  if (mypart() != partof(buffer[nodes_in_part].dnode)) {
                     fprintf(stderr,
                             "PM %d SERVER: Recvd msg for node %d "
                             "not this partition\n",
                             mypart(), buffer[nodes_in_part].dnode);
                     fflush(stderr);
                  } else {
                     /* Point to point to another node in same partition */
                     j = bufferof(buffer[nodes_in_part].dnode);
                     buffer[nodes_in_part].valid[j] = 1;
                     semcall_one(msgavail, j, 1);
                     /* Wait until it is received */
                     semcall_one(msgfree, nodes_in_part, -1);
                  }
               } /* endif broadcast message */
            } /* endif receiving data from this socket */
         } /* endif this socket */
      } /* endfor */
   } /* end forever receive messages on sockets */
}
/* setup_client_sockets -- Setting up CLIENT sockets- for all partitions
** except ourself: Create a socket to send data. Look up address of host,
** place in the sockaddr_in structure. Determine appropriate PORT id (needs to
** match with SERVER). (If socket was in use in a prior iteration, it may not
** have been reset yet - therefore we loop a fixed number of times retrying.).
** Issue a connect for the socket */
static void setup_client_sockets(void)
{
   int i, j;
   struct hostent *hent;
   /* Establish socket communications with other partitions */
   for (i = 0; i < num_parts; i++) {
      /* Skip ourself */
      if (i == mypart () )
        continue;
      for (j = 0; j < NUM_TRIES; j++) {
         /* Create a CLIENT socket to send data */
         partition[i].socket = socket(AF_INET, SOCK_STREAM, 0);
         /* Lookup host address and place in the socket address structure */
         memset(&partition[i].addr, 0, sizeof(struct sockaddr_in));
         partition[i].addr.sin_family = AF_INET;
         if ((hent = gethostbyname(partition[i].name))
                               == NULL) {
            fprintf(stderr,"PM %d CLIENT: No entry for %s in /etc/hosts\n",
                    mypart(), partition[i].name);
            fflush(stderr);
            exit(100);
         }
         memcpy(&partition[i].addr.sin_addr, hent->h_addr,
                                hent->h_length);
         partition[i].addr.sin_port = htons(PM_PORT + (i << 4) +
                                 mypart());
         /* Connect to the socket */
         if (connect(partition[i].socket,
                     (struct sockaddr *)&partition[i].addr,
                     sizeof(struct sockaddr_in)) < 0) {
            if (errno == ECONNREFUSED) {
               /* unsuccessful connect: release socket, sleep and try again */
               close(partition[i].socket);
               sleep(3);
            } else {
               /* another error occurred, quit trying to connect */
               j = NUM_TRIES;
               break;
            }
         } else {
            /* successful connect, break out of loop */
            break;
         } /* endif connect */
      } /* endfor NUM_TRIES */
      if (j == NUM_TRIES) {

         fprintf(stderr,
                 "PM %d CLIENT: Unable to connect sock to %s, errno %d\n",
                 mypart(), partition[i].name, errno);
         fflush(stderr);
         exit(100);
      }
   } /* end for setting up CLIENT sockets */
}
/* pm_client -- The PM CLIENT process sends data to partitions. Set up client
** sockets. Send messages over the sockets: Get message. Send message (if it
** is a broadcast message send it to all partitions, if not send it to
** appropriate partition). Acknowledge sending of message. Release buffer.
** Reset next message indicator. */
static void pm_client(void)
{
   int  i, size;
   /*  CLIENT- GO INTO INFINITE SENDING LOOP */
   /* Initial setting to indicate the next message has not been selected */
   next_message = -1;
   /* Forever, wait for messages to send over socket */
   for ( ; ; ) {
      /* Get the message */
      pm_getmsg();
      /* Determine where to send the message */
      if (buffer[next_message].dnode == -1) {
         /* BROADCAST MESSAGE, SEND TO ALL PARTITIONS */
         for (i = 0; i < numparts(); i++) {
            /* Don't send broadcast to self */
            if (i == mypart () )
               continue;
            /* First send the size of the message */
            size = buffer[next_message].length;
            if (send(partition[i].socket, &size, sizeof(size),0)
                        < 0) {
               fprintf(stderr,
                       "PM %d CLIENT: send to PM %d failed, errno=%d\n",

                       mypart(), i, errno);
               fflush(stderr);
               return;   /* pm_client is declared void */
            }
            /* Then send the actual message */
            if (send(partition[i].socket,
                         &buffer[next_message], size, 0) < 0) {
               fprintf(stderr,
                       "PM %d CLIENT: send to PM %d failed, errno=%d\n",
                       mypart(), i, errno);
               fflush(stderr);
               return;   /* pm_client is declared void */
            }
         } /* endfor SEND BROADCAST TO ALL PARTITIONS */
      } else {
         /* SEND TO A SPECIFIC PARTITION */
         /* First send the size of the message */
         size = buffer[next_message].length;
         i = partof(buffer[next_message].dnode);
         if (send(partition[i].socket, &size, sizeof(size),0)
                          < 0) {
            fprintf(stderr,
                    "PM %d CLIENT: send to PM %d failed, errno=%d\n",
                    mypart(), i, errno);
            fflush(stderr);
            return;   /* pm_client is declared void */
         }
         /* Then send the actual message */
         if (send(partition[i].socket,
                         &buffer[next_message], size, 0) < 0) {
            fprintf(stderr,
                    "PM %d CLIENT: send to PM %d failed, errno=%d\n",
                    mypart(), i, errno);
            fflush(stderr);
            return;   /* pm_client is declared void */
         }
      }
      /* FOR BOTH BROADCAST AND REGULAR MESSAGES */
      /* acknowledge the sending of the message */
      buffer[next_message].valid[mybuffer()] = 0;
      /* release (free) the buffer */
      semcall_one(msgfree, next_message, 1);
      /* reset next_message so the next getmsg will work */
      next_message = -1;
   } /* end forever CLIENT PROCESS sending messages over socket */
}
/***** Initialization and Termination routines *****/
/* abort_prog -- Clean up in the case of an error */
static void abort_prog(void)
{
   int i;
   /* Remove the sets of semaphores */
   if (pmserver_pid != 0) {
      if (msgavail != -1) {
         semctl(msgavail, 0, IPC_RMID, 0);
         msgavail = -1;
      }
      if (msgfree != -1) {
         semctl(msgfree, 0, IPC_RMID, 0);
         msgfree = -1;
      }
   }
   /* Remove the shared memory */
   if (buffer != NULL) {
      shmdt(buffer);
      buffer = NULL;
   }
   /* Only PM SERVER process should execute this code */
   if (pmserver_pid != 0) {
      if (shmid_m != -1) {
         shmctl(shmid_m, IPC_RMID, 0);
         shmid_m = -1;
      }
   }
   /* Close the sockets */
   for (i = 0; i < num_parts; i++) {
      if (i != mypart() ) {
         close (partition[i].socket);
         partition[i].socket = 0;
      }
   }
   /* Make sure all pending output gets out */
   fflush(stdout);
   fflush(stderr);
}
/* Handle termination signals */
void sig_terminate(int sig, int code, struct sigcontext *scp)
{
   int i;
   /* Send termination signal to each of PM SERVER's children */

   if (pmserver_pid != 0) {
      for (i = 0; i < child_index; i++) {
         kill(children[i], SIGTERM);
      }
      child_index = 0;
      kill(pmserver_pid, SIGTERM);
   }
   /* Clean up the use of semaphores and shared memory */
   abort_prog();
   exit(100);
}
/* Handle unexpected termination signals */
void unexpected_death(int sig, int code, struct sigcontext *scp)
{
  int statval;
  int waitpid;
  /* Only PM SERVER process should execute this code */
  if (pmserver_pid != 0) {
     waitpid = wait(&statval);
     if (waitpid < 0) {
        printf("Error determining who died unexpectedly. Errno=%d\n", errno);
     } else {
        if (WIFSIGNALED(statval) != 0) {
           printf("Process %d did not catch signal %d.\n",
                  waitpid, WTERMSIG(statval));
        } else if (WIFSTOPPED(statval) != 0) {
           printf("Process %d stopped due to signal %d.\n",
                  waitpid, WSTOPSIG(statval));
        } else if (WIFEXITED(statval) && WEXITSTATUS(statval) == 0) {
           /* Normal termination */
        } else {
           /* Terminated with a nonzero exit code */
        }
     }
  }
  fflush(stdout);
}
/* killcube -- On abort, kill off all children on the hypercube partition */
int _killcube(int node, int pid)
{
  int i;
  int statval;
  int waitpid;

  /* Only PM SERVER process should execute this code */
  if (pmserver_pid != 0) {
     for (i = 0; i < child_index; i++) {
        kill(children[i], SIGTERM);
     }
     kill(pmserver_pid, SIGTERM);
     for (i = 0; i <= child_index; i++) {
        waitpid = wait(&statval);
        if (waitpid < 0) {
           /* No more children left */
           break;
        } else {
           if (WIFSIGNALED(statval) != 0) {
              printf("Process %d did not catch signal %d.\n",
                     waitpid, WTERMSIG(statval));
           } else if (WIFSTOPPED(statval) != 0) {
              printf("Process %d stopped due to signal %d.\n",
                     waitpid, WSTOPSIG(statval));
           } else if (WIFEXITED(statval) && WEXITSTATUS(statval) == 0) {
              /* Normal termination */
           } else {
              /* Terminated with a nonzero exit code */
           }
        }
     }
  }
  /* Clean up after ourself */
  abort_prog();
  child_index = 0;
  return 0;
}
/* init_shared_mem -- Allocates a shared memory region. Sets pointer to region
** in this process's memory space and returns the shared memory identifier. */
static int init_shared_mem(void **pointer, int size, int key)
{
  int shmid;
  if ((shmid = shmget(key, size, 0666 | IPC_CREAT)) < 0) {
    printf("init_shm: allocation of shared memory failed. Errno=%d\n",errno);
    printf("      mynode=%d key=%d size=%d\n",my_node,key,size);
    _killcube(0,0);
    exit(-1);
  }
  *pointer = shmat(shmid, NULL, 0);
  return shmid;
}

/* init_semaphore -- Allocates a set of semaphores and initializes them */
static int init_semaphore(int *semid, int size, int value, int key)
{
   register int i;
   if ((*semid = semget(key, size, 0666 | IPC_CREAT)) < 0) {
     printf("init_sem: allocation of semaphores failed. Errno=%d\n",errno);
     printf("      mynode=%d key=%d size=%d\n",my_node,key,size);
     _killcube(0,0);
     exit(-1);
   }
   for (i = 0; i < size; i++) {
      if (semctl(*semid, i, SETVAL, value) < 0) {
        printf("init_sem: init of semaphores failed. Errno=%d\n",errno);
        printf("      mynode=%d offset=%d value=%d\n",my_node,i,value);
        _killcube(0,0);
        exit(-1);
      }
   }
   return *semid;
}
/* semcall_all -- Perform same operation on all elements of a semaphore. */
static int semcall_all(int semid, int size, int operation)
{
   struct sembuf sbuf[NUMBER_IN_PART+1];
   register int i;
   for (i = 0; i < size; i++) {
      sbuf[i].sem_num = i;
      sbuf[i].sem_op = operation;
      sbuf[i].sem_flg = 0;
   }
   while (semop(semid, sbuf, size) < 0) {
      /* repeat operation if interrupted */
      if (errno != EINTR) {
        printf("PM %d: Semaphore broadcast failed. Errno = %d\n",
               mypart(), errno);
        fflush(stdout);
        return -1;
      }
   }
   return 0;
}
/* semcall_one -- Perform an operation on an element of a semaphore. */
static int semcall_one(int semid, int num, int operation)
{
   struct sembuf sbuf;
   sbuf.sem_num = num;
   sbuf.sem_op = operation;
   sbuf.sem_flg = 0;
   while (semop(semid, &sbuf, 1) < 0) {
      /* repeat operation if interrupted */
      if (errno != EINTR) {
        printf("PM %d: Semaphore failed. Errno = %d\n", mypart(), errno);
        fflush(stdout);
        return -1;
      }
   }
   return 0;
}
/***** Environment Information (External and Internal) *****/
/* numnodes -- Returns the number of simulated nodes */
int numnodes(void)
{
  return num_nodes;
}
/* numparts -- number of partitions */
static int numparts(void)
{
  return num_parts;
}
/* mypart -- Partition this process is in */
static int mypart(void)
{
  return my_part;
}

/* partof -- Determines which partition a given node is a member of */
static int partof(int n)
{
  if (n == myhost()) {
    return 0;
  } else {
    return n / NUMBER_IN_PART;
  }
}
/* pm_partof -- Determines which subpartition a given node is a member of.
** If n is -1 (a broadcast destination), -1 is returned. */
static int pm_partof(int n)
{
  if (n == myhost()) {
    return 0;
  } else if (n == -1) {
    return -1;
  } else {
    return n / NUMBER_IN_PART;
  }
}
/* numbuffers -- Number of buffers in this partition */
static int numbuffers(void)
{

  if (mypart() == 0) {
    return (nodes_in_part + 2);
  } else {
    return (nodes_in_part + 1);
  }
}
/* mybuffer -- returns the index for this process's buffer */
static int mybuffer(void)
{
  return (nodes_in_part);
}

/* bufferof -- Returns the buffer offset of the given node. The host is always
** the last buffer in partition 0. The PM is always second to last buffer */
static int bufferof(int n)
{
   if (mypart() != partof(n)) {
      return nodes_in_part;             /* Return the buffer of PM */
   } else if (n == myhost()) {
      return nodes_in_part + 1;         /* This partition, buffer of host */
   } else {
      return n % NUMBER_IN_PART;        /* This partition, buffer of node */
   }
}
/* mynode -- Returns the node number for this process */
int mynode(void)
{
  return my_node;
}
/* myhost -- Returns the node number of the host */
int myhost(void)
{
  return numnodes();
}
/***** Communications *****/
/* pm_getmsg -- Wait until a message is available. This routine differs from
** getmsg, in that it checks to ensure that destination node is not in this
** partition. (Getmsg checks that current node equals destination node.)
** OUTPUT: next_message - set to the message found of the proper type */
static void pm_getmsg(void)

{
   int i;
   /* Only wait if a message is not already selected */
   if (next_message != -1) return;
   /* Wait for a message for me */
   semcall_one(msgavail, mybuffer(), -1);
   /* Search for those messages that are for me */
   for (i = 0; i < numbuffers(); i++) {
      if (buffer[i].valid[mybuffer()] != 0) {
         next_message = i;
         return;
      }
   }
}
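
The PM client and server above frame every message as a size followed by
the message body, and the server loops because the body can arrive in pieces.
The following is a minimal sketch of that framing pattern on its own, not part
of SIMCUBE. The helper names send_all, recv_all, send_frame, and recv_frame
are hypothetical, and the size is sent in host byte order as in the listing,
which assumes both ends share the same integer layout.

```c
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>

/* send_all (hypothetical) -- Call send() until len bytes have been written.
** Returns 0 on success, -1 on error. */
static int send_all(int fd, const void *data, size_t len)
{
   const char *p = data;
   while (len > 0) {
      ssize_t n = send(fd, p, len, 0);
      if (n < 0) {
         if (errno == EINTR) continue;   /* interrupted, try again */
         return -1;
      }
      p += n;
      len -= (size_t)n;
   }
   return 0;
}

/* recv_all (hypothetical) -- Call recv() until len bytes have arrived.
** Returns 0 on success, -1 on error or if the peer closed early. */
static int recv_all(int fd, void *data, size_t len)
{
   char *p = data;
   while (len > 0) {
      ssize_t n = recv(fd, p, len, 0);
      if (n < 0) {
         if (errno == EINTR) continue;   /* interrupted, try again */
         return -1;
      }
      if (n == 0) return -1;             /* connection closed */
      p += n;
      len -= (size_t)n;
   }
   return 0;
}

/* send_frame (hypothetical) -- Send the size first, then the message. */
static int send_frame(int fd, const void *msg, int size)
{
   if (send_all(fd, &size, sizeof(size)) < 0) return -1;
   return send_all(fd, msg, (size_t)size);
}

/* recv_frame (hypothetical) -- Receive one size-prefixed message into buf.
** Returns the message size, or -1 on error or if it exceeds capacity. */
static int recv_frame(int fd, void *buf, int capacity)
{
   int size;
   if (recv_all(fd, &size, sizeof(size)) < 0) return -1;
   if (size < 0 || size > capacity) return -1;
   if (recv_all(fd, buf, (size_t)size) < 0) return -1;
   return size;
}
```

One way to try the helpers without a network is to create a connected pair
with socketpair(AF_UNIX, SOCK_STREAM, 0, sv), call send_frame on sv[0], and
call recv_frame on sv[1]. Checking the received size against the buffer
capacity is an addition in this sketch; the listing trusts the sender.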






[LISTING THREE]



/*****   simulate.c  *****/
/* These functions allow a UNIX system to simulate a hypercube environment. */
#include <stdio.h>
#include <ctype.h>
#include <sys/types.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/signal.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include "cube.h"

#include <stdlib.h>     /* getenv, malloc, exit */
#include <string.h>     /* strtok, strcpy, strlen, memset */
#include <unistd.h>     /* fork, execvp, getpid, sleep */

/* Prototypes */
#define min(x,y) (((x) < (y)) ? (x) : (y))
int csend(int type, void *msg, int length, int target_node, int group);
int crecv(int type, void *buf, int len);
int killcube(int, int);
int numnodes(void);
int myhost(void);
int mynode(void);
static int numparts(void);
static int numbuffers(void);
static int mybuffer(void);
static int bufferof(int node);
static int mypart(void);
static int partof(int node);

/* Local, Private Information */
static int num_parts;         /* number of partitions */
static int my_part;           /* partition this process is in */
static int nodes_in_part;     /* number of nodes in this partition */
static subpart *partition = NULL; /* list of partition information */
static int base;              /* base key value for allocating shared data */
static int my_node;           /* node number for this process */
static int my_group;          /* group id for this process */
                              /* There are two groups, host communications */
                              /* and inter-node communications */
static int num_nodes;         /* total number of nodes in all partitions */
static int msgavail = -1;     /* semaphores indicating message is available */
static int msgfree = -1;      /* semaphores indicating buffer is free */
static int next_message;      /* which message is to be received next */
static int shmid_m = -1;      /* id of shared area for messages */
static message *buffer = NULL;/* communication areas */
static int *children = NULL;  /* process ids of all child processes */
static int child_index = 0;   /* number of children created */

/***** Initialization and Termination routines *****/
/* abort_prog -- Clean up when the program terminates */
void abort_prog(void)
{
   /* Remove the sets of semaphores */
   if (mynode() == myhost()) {
      if (msgavail != -1) {
         semctl(msgavail, 0, IPC_RMID, 0);
         msgavail = -1;
      }
      if (msgfree != -1) {
         semctl(msgfree, 0, IPC_RMID, 0);
         msgfree = -1;
      }
    }
   /* Remove the shared memory */
   if (buffer != NULL) {
      shmdt(buffer);
      buffer = NULL;
   }
   if (mynode() == myhost()) {
      if (shmid_m != -1) {
         shmctl(shmid_m, IPC_RMID, 0);
         shmid_m = -1;
      }
   }
   /* Make sure all pending output gets out */
   fflush(stdout);
   fflush(stderr);
}
/* Handle termination signals */
void sig_terminate(int sig, int code, struct sigcontext *scp)
{
  if (mynode() == myhost()) {
    /* Pass on the termination signal to the node processes */
    killcube(0,0);
  } else {
    /* This is executed by the node processes */
    /* Clean up the use of semaphores and shared memory */
    abort_prog();
  }
  exit(100);
}
/* Handle unexpected termination signals. Used by the host process. */
void unexpected_death(int sig, int code, struct sigcontext *scp)
{
  int statval;
  int waitpid;
  waitpid = wait(&statval);
  if (waitpid < 0) {
    printf("Error determining who died unexpectedly. Errno=%d\n", errno);
  } else {
    if (WIFSIGNALED(statval) != 0) {
      printf("Process %d did not catch signal %d.\n",
             waitpid, WTERMSIG(statval));
    } else if (WIFSTOPPED(statval) != 0) {
      printf("Process %d stopped due to signal %d.\n",
             waitpid, WSTOPSIG(statval));
    } else if (WIFEXITED(statval) && WEXITSTATUS(statval) == 0) {
      /* Normal termination */
    } else {
      /* Terminated with a nonzero exit code */
    }
  }
  fflush(stdout);
}
/* handler -- handles hypercube specific errors that do not map to UNIX. */
void handler(int type, void (*proc)())
{
  /* ignore this */
}
/* getcube -- Called by host process to gain possession of a partition in a
** hypercube. Note: Assuming getcube is only called once per host process. */
void getcube(char *cubename, char *cubetype, char *srmname, int keep,
              char *account)
{
  char size[8];
  int is_dimension = 0;
  int i;
  char *ptr;
  char *target;
  /* Pull out the requested number of nodes */
  ptr = cubetype;
  if (*ptr == 'd') {
    ptr++;
    is_dimension = 1;
  }
  target = size;
  i = 4;
  while (isdigit(*ptr) && (i-- != 0)) {
    *target++ = *ptr++;
  }

  *target = '\0';
  /* The rest of the parameters don't matter */
  /* Determine the total number of nodes */
  num_nodes = NUMBER_IN_PART; /* default size */
  sscanf(size,"%d",&num_nodes);
  if (is_dimension) {
    num_nodes = 1 << num_nodes;
  }
}
/* cubeinfo -- Passes back information about the partitions on a hypercube.
** Input: global selects what is reported:
**   0 = the currently attached cube
**   1 = all cubes you own and allocated by the current host
**   2 = all cubes on the system from which the command was executed
**   3 = how cubes are allocated on all SRMs
**   4 = one additional parameter (srmname); returns info for that SRM */
int cubeinfo(struct cubetable *ct, int numslots, int global, ...)
{
  /* returns the number of cubes for which information is available */
  /* Ignore this for now */
  return 0;
}
/* relcube -- release cube gained by the getcube call. */
void relcube(char *cubename)
{
   /* Ignore this for now */
}
/* killcube -- On abort, kill off all processes in the hypercube partition */
int killcube(int node, int pid)
{
  int i;
  int statval;
  int waitpid;
  /* Force everyone to terminate */
  for (i = 0; i < child_index; i++) {
     kill(children[i], SIGTERM);
  }

  /* Give the children a chance to terminate */
  if (child_index > 0) sleep(1);
  /* Wait for everyone to exit, check status in case */
  for (i = 0; i < child_index; i++) {
    waitpid = wait(&statval);
    if (waitpid < 0) {
      /* No more children left */
      break;
    } else {
      if (WIFSIGNALED(statval) != 0) {
        printf("Process %d did not catch signal %d.\n",
               waitpid, WTERMSIG(statval));
      } else if (WIFSTOPPED(statval) != 0) {
        printf("Process %d stopped due to signal %d.\n",
               waitpid, WSTOPSIG(statval));
      } else if (WIFEXITED(statval) && WEXITSTATUS(statval) == 0) {
        /* Normal termination */
      } else {
        /* Terminated with a nonzero exit code */
      }
    }
  }
  /* Clean up after ourself */
  abort_prog();
  child_index = 0;
  return 0;
}
/* init_shared_mem -- Allocates a shared memory region. Sets pointer to region
** in this process's memory space and returns the shared memory identifier. */
static int init_shared_mem(void **pointer, int size, int key)
{
  int shmid;
  if ((shmid = shmget(key, size, 0666 | IPC_CREAT)) < 0) {
    printf("init: allocation of shared memory failed. Errno=%d\n",errno);
    printf("      mynode=%d key=%d size=%d\n",my_node,key,size);
    fflush(stdout);
    sig_terminate(0,0,NULL);
  }
  *pointer = shmat(shmid, NULL, 0);
  return shmid;
}
/* init_semaphore -- Allocates a set of semaphores and initializes them */
static int init_semaphore(int *semid, int size, int value, int key)
{
   register int i;
   if ((*semid = semget(key, size, 0666 | IPC_CREAT)) < 0) {
     printf("init: allocation of semaphores failed. Errno=%d\n",errno);
     printf("      mynode=%d key=%d size=%d\n",my_node,key,size);
     fflush(stdout);
     sig_terminate(0,0,NULL);
   }
   for (i = 0; i < size; i++) {
      if (semctl(*semid, i, SETVAL, value) < 0) {
        printf("init: initialization of semaphores failed. Errno=%d\n",errno);
        printf("      mynode=%d offset=%d value=%d\n",my_node,i,value);
        fflush(stdout);
        sig_terminate(0,0,NULL);
      }
   }
   return *semid;
}
/* semcall_all -- Perform same operation on all elements of a semaphore. */
static int semcall_all(int semid, int size, int operation)
{
   struct sembuf sbuf[NUMBER_IN_PART+1];
   register int i;
   for (i = 0; i < size; i++) {
      sbuf[i].sem_num = i;
      sbuf[i].sem_op = operation;
      sbuf[i].sem_flg = 0;
   }
   while (semop(semid, sbuf, size) < 0) {
      /* repeat operation if interrupted */
      if (errno != EINTR) {
        printf("%d: Semaphore broadcast failed. Errno = %d\n",mynode(),errno);
        abort_prog();
        exit(-1);
      }
   }
   return 0;
}
/* semcall_one -- Perform an operation on an element of a semaphore. */
static int semcall_one(int semid, int num, int operation)
{
   struct sembuf sbuf;
   sbuf.sem_num = num;
   sbuf.sem_op = operation;
   sbuf.sem_flg = 0;
   while (semop(semid, &sbuf, 1) < 0) {
      /* repeat operation if interrupted */
      if (errno != EINTR) {
        printf("%d: Semaphore failed. Errno = %d\n",mynode(), errno);
        abort_prog();
        exit(-1);
      }
   }
   return 0;
}
/* setpid -- Assigns a partition identifier to the simulated partition. */
int setpid(int id)
{
   my_group = id;
   return 0;
}
/* init_simulator -- Should be called near the beginning of an application
** before any hypercube-related functions are called. */
void init_simulator(void)
{
   register int i, pid;
   char filename[20];
   char *temp;
   static char env[256];        /* must be static */
   struct hostent *hent;
   /* parent cm will send child cm SIGINT when a CTRL-BREAK is pressed */
   signal(SIGINT,sig_terminate);
   signal(SIGTERM,sig_terminate);
   signal(SIGQUIT,sig_terminate);
   /* Pick up the base key value from the environment */
   if ((temp = getenv("SIM_INFO")) == NULL) {
     fprintf(stderr,"init_sim: Missing environment variable\n");
     fflush(stderr);
     exit(-1);
   }

   strcpy(env,temp);
   if ((temp = strtok(env,",")) == NULL) {
    fprintf(stderr, "init_sim: Missing information in environment variable\n");
    fflush(stderr);
    exit(-1);
   }
   sscanf(temp,"%d",&base);
   if ((temp = strtok(NULL,",")) == NULL) {
     fprintf(stderr,"init_sim: Missing node info in environment variable\n");
     fflush(stderr);
     exit(-1);
   }
   sscanf(temp,"%d",&my_node);
   if ((temp = strtok(NULL,",")) == NULL) {
     fprintf(stderr,"init_sim: Missing pid info in environment variable\n");
     fflush(stderr);
     exit(-1);
   }
   sscanf(temp,"%d",&my_group);
   if ((temp = strtok(NULL,",")) == NULL) {
     fprintf(stderr,
             "init_sim: Missing number of node info in environment variable\n");
     fflush(stderr);
     exit(-1);
   }
   sscanf(temp,"%d",&num_nodes);
   num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;
   my_part = my_node / NUMBER_IN_PART;
   /* Calculate the number of nodes in this and remaining partitions */
   i = numnodes() - (mypart() * NUMBER_IN_PART);
   nodes_in_part = min(NUMBER_IN_PART, i);
   /* Allocate shared memory */
   shmid_m = init_shared_mem((void **)&buffer,
                             sizeof(message) * numbuffers(), base);
   /* Allocate communications semaphores */
   init_semaphore(&msgavail, numbuffers(), 0, base+10000);

   init_semaphore(&msgfree, numbuffers(), 0, base+20000);
}
/* load -- Should be called near the beginning of a host application before any
** hypercube-related functions are called, except for getcube. It will start
** the appropriate number of PMs on the appropriate systems (as read from the
** .pmrc file.) Parent process will be node 0, which has special roles on a
** hypercube. Remaining processes will be numbered consecutively. */
int load(char *filename, int which_node, int group_id)
{
   register int i, j, pid, size;
   char *argv[20];

   char base_string[20];
   char partition_number[20];
   char group_string[20];
   char number_of_nodes[20];
   char temp[256];
   char *ptr;
   struct servent *sp;
   FILE *fd;
   /* Allocate space for child pids */
   if (children == NULL) {
      if ((children = malloc(numnodes() * sizeof(int))) == NULL) {
         fprintf(stderr,"load: insufficient memory\n");
         fflush(stderr);
         return -1;
      }
   }
   /* parent will send us SIGINT when CTRL-BREAK is pressed */
   signal(SIGINT,sig_terminate);
   signal(SIGTERM,sig_terminate);
   signal(SIGQUIT,sig_terminate);
   signal(SIGCHLD, unexpected_death);
   base = getpid();
   num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;
   if (partition == NULL) {
      if ((partition = malloc(num_parts * sizeof(subpart))) == NULL) {
         fprintf(stderr,"load: insufficient memory\n");
         fflush(stderr);
         return -1;
      }
      memset(partition, 0, num_parts * sizeof(subpart));
      if ((fd = fopen(".pmrc","r")) == NULL) {
         fprintf(stderr,"load: Missing configuration file \".pmrc\"\n");
         fflush(stderr);
         return -1;
      }
      for (i = 0; i < num_parts; i++) {
         temp[0] = '\0';
         /* Limit the field width so a long line cannot overflow temp[256] */
         fscanf(fd," %255[^ \n] \n",temp);
         size = strlen(temp);
         if ((ptr = malloc(size+1)) == NULL) {
            fprintf(stderr,"load: insufficient memory\n");
            fflush(stderr);
            return -1;
         }
         strcpy(ptr,temp);
         partition[i].name = ptr;
      }
      fclose(fd);
   }

   /* Host program's node number is the same as the number of nodes */
   my_node = numnodes();
   my_part = 0;
   /* Calculate the number of nodes in this and remaining partitions */
   i = numnodes() - (mypart() * NUMBER_IN_PART);
   nodes_in_part = min(NUMBER_IN_PART, i);
   /* Allocate shared memory */
   if (shmid_m == -1) {
      shmid_m = init_shared_mem((void **)&buffer,
                                sizeof(message) * numbuffers(), base);
   }
   memset(buffer,0,sizeof(message) * numbuffers());
   /* Allocate communications semaphores */
   if (msgavail == -1) {
      init_semaphore(&msgavail, numbuffers(), 0, base+10000);
   }
   if (msgfree == -1) {
      init_semaphore(&msgfree, numbuffers(), 0, base+20000);
   }
   /* Split into node processes */
   fflush(stdout);
   fflush(stderr);
   /* Start the local and remote Partition Managers */
   for (i = 0; i < num_parts; i++) {
     if ((pid = fork()) < 0) {
       /* Can't create all the children! */
       killcube(0,0);
       fprintf(stderr, "LOAD: unable to create Partition Managers\n");
       return -1;
     } else if (pid == 0) {
        /* I'm the child process */
        my_node = -1;
        /* Start the Partition Managers */
        if (i == 0) {
           argv[0] = "pm";
           argv[1] = filename;
           sprintf(partition_number, "%d", i);
           argv[2] = partition_number;
           sprintf(base_string,"%d", base);
           argv[3] = base_string;
           sprintf(group_string, "%d", group_id);
           argv[4] = group_string;
           sprintf(number_of_nodes, "%d", numnodes());
           argv[5] = number_of_nodes;
           for (i = 0; i < num_parts; i++) {
              argv[i+6] = partition[i].name;
           }
           argv[i+6] = NULL;
           execvp("pm",argv);
           /* If we get here, we had a problem */
           printf("execvp of PM 0 failed. errno=%d\n",errno);
           fflush(stdout);
           exit(-1);
        } else {
           argv[0] = "rsh";
           argv[1] = partition[i].name;
           argv[2] = "pm";
           argv[3] = filename;
           sprintf(partition_number, "%d", i);
           argv[4] = partition_number;
           sprintf(base_string,"%d", base);
           argv[5] = base_string;
           sprintf(group_string, "%d", group_id);
           argv[6] = group_string;
           sprintf(number_of_nodes, "%d", numnodes());
           argv[7] = number_of_nodes;
           for (i = 0; i < num_parts; i++) {
              argv[i+8] = partition[i].name;
           }
           argv[i+8] = NULL;
           execvp("rsh",argv);
           /* If we get here, we had a problem */
           printf("execvp of rsh for PM %s failed. errno=%d\n",partition_number,errno);
           fflush(stdout);
           exit(-1);
        }
     } else {
       /* I'm the parent process */
       children[child_index++] = pid;
     }
   }
}
/** Environment Information (External and Internal) **/
/* availmem -- returns amount of memory available */
int availmem(void)
{
  return 0;
}
/* nodedim -- Returns the dimension of the simulated hypercube */
int nodedim(void)
{
  unsigned int i, temp;
  temp = num_nodes;
  i = 0;
  /* Count the halvings needed to reach a single node: log2(num_nodes) */
  while (temp > 1) {
    temp >>= 1;
    i++;
  }
  return i;
}
/* numnodes -- Returns the number of simulated nodes */
int numnodes(void)
{
  return num_nodes;
}
/* numparts -- number of simulator partitions */
static int numparts(void)
{
  return num_parts;
}
/* mypart --  Simulator partition this process is in */
static int mypart(void)
{
  return my_part;
}
/* partof -- Determines which simulator partition a given node is member of */
static int partof(int n)
{
  if (n == myhost()) {
    return 0;
  } else {
    return n / NUMBER_IN_PART;
  }
}
/* numbuffers -- Number of buffers in this simulator partition */
static int numbuffers(void)
{
  if (mypart() == 0) {
    return nodes_in_part + 2;
  } else {
    return nodes_in_part + 1;
  }
}
/* mybuffer -- returns the index for this process's buffer */
static int mybuffer(void)
{
  if (mynode() == myhost()) {
    return nodes_in_part+1;
  } else {
    return mynode() % NUMBER_IN_PART;
  }
}
/* bufferof -- Returns the buffer offset of the given node. The host is always
** the last buffer in partition 0. The PM is always second to last buffer */
static int bufferof(int n)
{
   if (mypart() != partof(n)) {
      return nodes_in_part;             /* Return the buffer of PM */
   } else if (n == myhost()) {
      return nodes_in_part + 1;         /* This partition, buffer of host */
   } else {
      return n % NUMBER_IN_PART;        /* This partition, buffer of node */
   }
}
/* mynode -- Returns the node number for this process */

int mynode(void)
{
  return my_node;
}
/* mypid -- Returns the group number */
int mypid(void)
{
  return my_group;
}
/* myhost -- Returns the node number of the host */
int myhost(void)
{
  return numnodes();
}
/** Communications **/
/* cread -- Special read for files on hypercube's high-speed disk system. We
just issue a standard read instead. */
int cread(int fd, void *buffer, int size)
{
  return read(fd, buffer, size);
}
/* gdsum -- Sum individual elements of an array on all processes */
void gdsum(double x[], long elements,  double work[])
{
  register int i,j;
  double temp;
  if ((mybuffer()) == 0) {
     /* The first node in each partition sums the local data */
     if (nodes_in_part > 1) {
        /* Only sum when we aren't the only ones in the partition */
        for (i = 1; i < nodes_in_part; i++) {

           /* Get the next set of numbers to sum */
           crecv(-2, work, elements * sizeof(double));
           for (j = 0; j < elements; j++) {
              x[j] += work[j];
           }
        }
     }
     /* Node 0 sums for all partitions */
     if (mynode() == 0) {
        /* Only sum if there is more than one partition */
        if (numparts() > 1) {
           for (i = 1; i < numparts(); i++) {
              /* Get the next set of numbers to sum */
              crecv(-3, work, elements * sizeof(double));

              for (j = 0; j < elements; j++) {
                 x[j] += work[j];
              }
           }
        }
        /* Only broadcast if there is more than one node in the cube; the
        ** other partitions wait for the result even when this one has a
        ** single node */
        if (numnodes() > 1) {
           /* Broadcast the results */
           csend(-4,x,elements * sizeof(double),-1,mypid());
        }
     } else {
        /* Each partition needs to send the partial sum to node 0 */
        csend(-3,x,elements * sizeof(double),0,mypid());
        /* Wait for the answer */
        crecv(-4,x,elements * sizeof(double));
     }
  } else {
    /* Send the data to the first node of this partition for summation */
    csend(-2,x,elements * sizeof(double),mypart()*NUMBER_IN_PART,mypid());
    /* Wait for the answer */
    crecv(-4,x,elements * sizeof(double));
  }
}
/* getmsg -- Wait until a message of any type is available. OUTPUT:
** next_message, set to the buffer index of the message found */
static void getmsg(void)
{
   int i;
   /* Only wait if a message is not already selected */
   if (next_message != -1) return;

   /* Wait for a message for me */
   semcall_one(msgavail, mybuffer(), -1);
   /* Search for those messages that are for me */
   for (i = 0; i < numbuffers(); i++) {
     if (buffer[i].valid[mybuffer()] == 1) {
       if ((buffer[i].dnode == mynode()) || (buffer[i].dnode == -1)) {
         next_message = i;
         return;
       }
     }
   }
}
/* cprobe -- Wait until a message of a specific type is available. OUTPUT:
** next_message, set to the message found of the proper type */
void cprobe(int type)
{
   int i,j;
   /* Make sure all pending writes in application have occurred */
   fflush(stdout);
   fflush(stderr);
   /* See if a specific type was requested */
   if (type == -1) {
      getmsg();
      return;
   } else if ((next_message != -1) && (type == buffer[next_message].type)) {
      /* message was already located */
      return;
   } else {
      while (1) {
         /* Wait for a message for me */
         semcall_one(msgavail, mybuffer(), -1);
         /* Search for messages that are for me and are of the type I need */
         for (i = 0; i < numbuffers(); i++) {
           if (buffer[i].valid[mybuffer()] == 1) {
             if ((buffer[i].dnode == mynode()) || (buffer[i].dnode == -1)) {
               if (buffer[i].type == type) {
                 next_message = i;
                 /* Put all skipped messages back */
                 for (j = 0; j < numbuffers(); j++) {
                    if (buffer[j].valid[mybuffer()] == 2) {
                       buffer[j].valid[mybuffer()] = 1;
                       semcall_one(msgavail, mybuffer(), 1);
                    }

                 }
                 return;
               } else {
                  /* Mark the message so that we don't look at it again */
                  buffer[i].valid[mybuffer()] = 2;
               }
             }
           }
         }
      }
   }
}
/* infocount -- Return the length of the message that will be received. */
int infocount(void)
{
   getmsg();
   return buffer[next_message].t_length;
}
/* infonode -- Returns the node that sent the message */
int infonode(void)
{
  getmsg();
  return buffer[next_message].snode;
}
/* infopid -- Returns the group (pid) of the node that sent the message */
int infopid(void)
{
  getmsg();
  return buffer[next_message].spid;
}
/* csend -- Synchronous message sending between two nodes. If the target node
** number is -1, then the message is broadcast to all nodes. Limitations:
** Assumes that the message buffer is free to use. In other words, if
** an asynchronous send was previously done, we assume that a msgwait
** was done to ensure the previous message reached its destination. */
int csend(int type, void *msg, int length, int target_node, int group)
{
   int i,j, sent_length = 0;
   char *source;
   i = mybuffer();
   /* Fill in the message */
   source = msg;
   buffer[i].type = type;
   buffer[i].dnode = target_node;
   buffer[i].spid = mypid();
   buffer[i].snode = mynode();
   buffer[i].t_length = length;
   while (length > 0) {
      /* Divide the message into smaller chunks */
      buffer[i].length = min(MAX_MESSAGE_SIZE, length);
      memcpy(buffer[i].msg, source, buffer[i].length);
      source += buffer[i].length;
      sent_length += buffer[i].length;
      length -= buffer[i].length;
      /* Tell the node(s) the message is there */
      if (target_node == -1) {
         /* Broadcast the message to nodes in this partition */
         /* and to the process manager */
         for (j=0; j < nodes_in_part + 1; j++) {
            buffer[i].valid[j] = 1;
         }
         semcall_all(msgavail,nodes_in_part+1, 1);
         /* Of course, we already have the message */
         semcall_one(msgavail,i, -1);
         /* Wait until everyone receives the message */
         semcall_one(msgfree, i, -nodes_in_part);
      } else {
         /* Point to point to another node */
         j = bufferof(target_node);
         buffer[i].valid[j] = 1;
         semcall_one(msgavail, j, 1);
         /* Wait until it is received */
         semcall_one(msgfree, i, -1);
      }
   }
   return sent_length;
}
/* crecv -- Synchronous message reception between two nodes. */
int crecv(int type, void *buf, int len)
{
   int recv_len = 0, copy_len = 0, temp_len, total_len;
   int recv_node;
   int i;          /* index for the buffer scan at the end */
   char *target;
   /* Get a message of this type */
   cprobe(type);
   target = buf;
   total_len = buffer[next_message].t_length;
   recv_node = buffer[next_message].snode;
   do {
      if (recv_node != buffer[next_message].snode) {
         /* Message is from another node, put off receiving */
         buffer[next_message].valid[mybuffer()] = 3;
      } else {
         /* Message is from the same node, add it to the previous messages */
         recv_len += buffer[next_message].length;
         temp_len = min(len - copy_len, buffer[next_message].length);
         if (temp_len > 0) {
            memcpy(target, buffer[next_message].msg, temp_len);
            target += temp_len;
         }
         copy_len += buffer[next_message].length;
         /* Acknowledge the receipt of the message */
         buffer[next_message].valid[mybuffer()] = 0;
         semcall_one(msgfree, next_message, 1);
      }
      /* Indicate that no message has been selected */
      next_message = -1;
      if (recv_len < total_len) {
         cprobe(type);
      }
   } while (recv_len < total_len);
   /* Scan buffers to restore any skipped messages */
   for (i = 0; i < numbuffers(); i++) {
      if (buffer[i].valid[mybuffer()] == 3) {
         buffer[i].valid[mybuffer()] = 1;
         semcall_one(msgavail, mybuffer(), 1);
      }
   }
   return total_len;
}
/* isend -- Asynchronous message sending between two nodes. If the target node
** number is -1, then the message is broadcast to all nodes. Limitations:
** Assumes that the message buffer is free to use. In other words, if
** an asynchronous send was previously done, we assume that a msgwait
** was done to ensure the previous message reached its destination. */
int isend(int type, void *msg, int length, int target_node, int group)
{
   int i,j, sent_length = 0;
   char *source;
   i = mybuffer();
   /* Fill in the message */
   source = msg;
   buffer[i].type = type;
   buffer[i].dnode = target_node;
   buffer[i].spid = mypid();
   buffer[i].snode = mynode();
   buffer[i].t_length = length;
   while (length > 0) {
      /* Divide the message into smaller chunks */
      buffer[i].length = min(MAX_MESSAGE_SIZE, length);
      memcpy(buffer[i].msg, source, buffer[i].length);
      source += buffer[i].length;
      sent_length += buffer[i].length;
      length -= buffer[i].length;
      /* Tell the node(s) the message is there */
      if (target_node == -1) {
         /* Broadcast the message to nodes in this partition */
         /* and to the process manager */
         for (j=0; j < nodes_in_part+1; j++) {
            buffer[i].valid[j] = 1;
         }
         semcall_all(msgavail,nodes_in_part+1, 1);
         /* Of course, we already have the message */
         semcall_one(msgavail,i, -1);
         /* Wait for acknowledge on all but the last part */
         if (length > 0) {
            /* Wait until everyone receives the message */
            semcall_one(msgfree, i, -nodes_in_part);
         }
      } else {
         /* Point to point to another node */
         j = bufferof(target_node);
         buffer[i].valid[j] = 1;
         semcall_one(msgavail, j, 1);
         /* Wait for acknowledge on all but the last part */
         if (length > 0) {
            /* Wait until it is received */
            semcall_one(msgfree, i, -1);
         }
      }

   }
   /* Return which buffer needs to be waited on */
   return i;
}
/* irecv -- Asynchronous message reception between two nodes. Returns message
** identifier for acknowledging the message. */
int irecv(int type, void *buf, int len)
{
   int mid;
   int recv_len = 0, copy_len = 0, temp_len, total_len;
   char *target;
   /* Get a message of this type */
   cprobe(type);
   mid = next_message;
   target = buf;
   total_len = buffer[next_message].t_length;
   do {
      recv_len += buffer[next_message].length;
      temp_len = min(len - copy_len, buffer[next_message].length);
      if (temp_len > 0) {
         memcpy(target, buffer[next_message].msg, temp_len);
         target += temp_len;
      }
      copy_len += buffer[next_message].length;
      /* Acknowledge all but last partial message */
      if (recv_len < total_len) {
         /* Acknowledge the receipt of the message */
         buffer[next_message].valid[mybuffer()] = 0;
         semcall_one(msgfree, next_message, 1);
      }
      /* Indicate that no message has been selected */
      next_message = -1;
      if (recv_len < total_len) {
         cprobe(type);
      }
   } while (recv_len < total_len);
   return mid;
}
/* msgwait -- Wait for a message to be received by the target node(s) */
void msgwait(int mid)
{
  if (mid == mybuffer()) {
    /* Then it was a send to another node */
    if (buffer[mid].dnode == -1) {
      /* Wait for everyone to receive the message */
      semcall_one(msgfree, mid, -nodes_in_part);
    } else {
      semcall_one(msgfree, mid, -1);
    }
  } else {
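    /* It was a receive from another node; clear our valid flag and
    ** acknowledge the last partial message, as crecv does */
    buffer[mid].valid[mybuffer()] = 0;
    semcall_one(msgfree, mid, 1);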
  }
}
/* flushmsg -- Forces the removal of pending messages to a node */
void flushmsg(int type, int target_node, int group)
{
  /* Do nothing for now */
  fflush(stdout);
  fflush(stderr);
}
/* mclock -- Return time in milliseconds. */
unsigned long mclock(void)
{
  time_t current_time;
  /* time() has one-second resolution, so the result is a multiple of 1000 */
  time(&current_time);
  return (unsigned long)current_time * 1000;
}
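
The buffer layout that partof() and bufferof() rely on can be easier to follow outside the simulator. The following is a minimal sketch, not part of SIMCUBE: the names sketch_partof and sketch_bufferof are hypothetical, the simulator's globals are passed as parameters, and the value 4 for NUMBER_IN_PART is an assumption (the listing does not show its definition). Within a partition, slots 0 through nodes_in_part-1 belong to the nodes, the next slot belongs to the Partition Manager, and partition 0 has one more slot for the host.

```c
#include <assert.h>

/* ASSUMPTION: the listing does not show the real value of NUMBER_IN_PART. */
#define NUMBER_IN_PART 4

/* Partition that owns node n. The host is numbered num_nodes and
   always lives in partition 0. */
int sketch_partof(int n, int num_nodes)
{
    assert(n >= 0 && n <= num_nodes);
    return (n == num_nodes) ? 0 : n / NUMBER_IN_PART;
}

/* Buffer slot that a process in partition my_part uses to reach node n. */
int sketch_bufferof(int n, int my_part, int num_nodes, int nodes_in_part)
{
    if (sketch_partof(n, num_nodes) != my_part)
        return nodes_in_part;       /* other partition: hand it to the PM */
    if (n == num_nodes)
        return nodes_in_part + 1;   /* the host's slot, partition 0 only */
    return n % NUMBER_IN_PART;      /* a node in this partition */
}
```

With eight nodes split into two partitions of four, a process in partition 0 reaches node 5 through the PM slot (index 4), and the host occupies index 5 of partition 0. A process in partition 1 reaches the host through its own PM slot.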














