Dr. Dobb's is part of the Informa Tech Division of Informa PLC


Maximizing Performance of Real-time RISC Applications


JAN94: Maximizing Performance of Real-time RISC Applications

Guidelines for writing RISC-based, real-time application software

Mitchell Bunnell

Mitch is a cofounder of Lynx Real-Time Systems and can be contacted at 16780 Lark Ave., Los Gatos, CA 95030.


Although RISC processors were designed for fast computational performance rather than fast real-time performance, embedded real-time system designers can still benefit from RISC technology. One advantage of RISC processors compared to older-generation CPUs is very fast throughput--from 20 to 60 or more MIPS; floating-point computation is faster, too. A typical CISC processor, for example, runs at less than 1 megaflop; RISC processors execute at between 3 and 14 megaflops. Even though the instruction set is reduced, it's more than made up for by having instructions execute in a single clock cycle (and at incredibly fast clock rates). Also, some RISC processors now use the extra on-chip real estate to provide special functions specifically for embedded designers, such as graphics functions for X Window terminal manufacturers. This can dramatically reduce hardware chip count.

To achieve high throughput, RISC processors support features such as pipelining, memory caching (including delayed write), and large numbers of registers or register windows. Pipelining allows the processor to perform several operations simultaneously. This is how the one cycle per instruction is achieved. Because of a RISC processor's high execution speed, a fast memory cache is used, allowing most memory operations to be executed with 0 wait states. Main memory access (with 5 or more wait states) is only done when cache misses occur. Delayed writes further lessen the chance the processor will have to wait for memory access.

Having a large number of registers allows local data to be kept in the processor without resorting to memory access. The SPARC processor, for instance, has 128 registers, partitioned into eight register windows. Register windows allow subroutine and function calls to be performed without taking the time to save register contents to memory. Register windows and the large number of registers are what make RISC processors so fast when executing a single program: A 66-MHz PowerPC gets 50 Specmarks and 80 floating-point Specmarks; a 50-MHz SuperSparc gets 65 Specmarks and 83 floating-point Specmarks; and a 99-MHz PA-RISC processor gets 80 Specmarks and 150 floating-point Specmarks. By comparison, a 50-MHz 80486 gets 30 Specmarks and 14 floating-point Specmarks. Consequently, CISC designers are now adopting those RISC features, such as pipelines and write-back caching, that make RISC so fast.

RISC and Real Time

Real-time applications involve multiple simultaneous tasks running asynchronously. The CPU must switch execution from one task to another so that tasks begin execution after the beginning of their period (or in response to an external event) and finish before their deadlines. External events and periodic timing normally interrupt the processor, so interrupt handling must also be performed.

Analysis of a real-time system must account for the worst-case execution time of each task. The overhead of context switching from one task to another, or of having a task or interrupt preempt the current task, must also be accounted for. Context-switch time and preemption time must be added to the task's execution time for real-time analysis.

Many of the things that make RISC processors fast can hinder real-time performance. Saving and restoring a large number of processor registers or register windows can take more time than saving and restoring a few. This is part of the context switch and interrupt-preemption time. The overhead of software trap handling and pipeline saving/restoring also makes for a longer preemption time; Figure 1(b) compares the number of instructions to deal with interrupts between a CISC (68030) and a RISC (SPARC) processor. Code implementations for CISC vs. RISC interrupt dispatch are available electronically; see "Availability," page 3. Because there are typically more context switches and interrupts in a real-time system, there are more translation look-aside buffer (TLB) reload misses. Some RISC processors reload the TLB in software. The additional overhead of loading the TLB in software vs. hardware can take its toll. In the worst case, a short yet critical section of code could take over ten times as long to execute as the best case because of TLB reloads. RISC processors are at their best when executing a single task to completion. Being constantly interrupted to run a different task can slow them down.

Predictability is another concern. It may not matter that a task usually meets its deadline easily. What matters is whether or not it always meets its deadline. Whether or not there's a memory cache miss or hit can make a big difference in RISC-instruction execution time; see Figure 1(a). The contents of the instruction pipeline can also make a difference. Program-execution time can vary in the turbulent environment of a real-time system where I/O constantly interrupts the CPU. To have more-predictable execution times, some designers turn off caching when running their application. Unfortunately, this makes most RISC processors run five to ten times slower.

Despite these difficulties, RISC can be effectively used for real-time work, but both the system-software designer and application designer must be careful. Figure 2 lists the guidelines I follow when writing real-time software for RISC processors. To illustrate how you can apply these application-design guidelines, I'll develop a packet-switch example with eight serial inputs and four serial outputs. Packets of up to 80 data bytes are sent in. These packets contain a byte describing the requested output port; they are sent out the requested output port with a checksum; see Listing One (page 90).

The real-time specifications for the application example are:

  • The baud rate of the incoming lines is 20,000 baud (2000 bytes per second).
  • The baud rate of the outgoing lines is 40,000 baud.
  • Total incoming packet rate to any outgoing channel is known never to exceed 4000 bytes per second. An incoming packet should generate an outgoing packet in 60 milliseconds or less for output channel 0. An incoming packet should generate an outgoing packet in 250 milliseconds or less for an output channel other than channel 0.
  • No packets should be lost.
  • There is hardware flow control on the output data lines other than channel 0. Flow on these channels could be stopped for, at most, 15 milliseconds every 100 milliseconds.
Real-time requirements should always be kept in mind because almost every aspect of application-software design affects real-time performance. For many real-time applications, a data rate, a response, or both are specified. Sometimes the response is implied. In this example, the data rate is 4000 bytes per second for each output channel; data can come in on any input channel at up to 2000 bytes per second. The response is 60 milliseconds for channel 0. Once a packet is received, the system has no more than 60 milliseconds to completely transmit a corresponding output packet. For the other three output channels, the corresponding output packet should appear in 250 milliseconds or less from the time an input packet is received.

This is a "hard" real-time system because it isn't allowed to lose or transmit packets late. I've designed the example program to run on the RS/6000 RISC processor to meet the real-time requirements. The application program was designed to make it easy to show that it meets the real-time requirements. This was done by making the application multitasking, running it on an operating system with priority preemptive scheduling, and using rate-monotonic (RM) analysis.

Designing the Tasks

Real-time applications should be designed with the proper number of tasks--no fewer, no more. Too many tasks can waste time in extra scheduling and context-switch overhead. In RISC-based systems, it's particularly important to avoid needless context switches.

In the example, there are eight serial-input lines. While I could have used eight separate tasks to read from these channels, I'll instead rely on interrupt routines to buffer the input data. I reduced the number of context switches by having a single input task read from all input channels. This task could have been designed to wait for data on any input channel. Instead, it uses an interval timer to poll each channel every ten milliseconds. This further cuts down on context switches. The input task does as much work as possible before giving up the CPU because its period is just short enough to meet the response specification.

The input task puts each received packet in a single packet-list FIFO queue. Next, it performs a checksum on each packet and puts each packet into an output double buffer; see Figure 3. Each output buffer is emptied by a channel-output task. When the sending buffer is empty, the output task waits for the double buffer to be swapped. The input task swaps the double buffer at the configured rate for that output channel--20 milliseconds for output channel 0, 100 milliseconds for output channels 1 through 3.

Each output task writes the data from a double buffer to its associated output device. Writing the data to the device means it requests the device driver to send the data to the device. The device driver write-service routine buffers the data, and an interrupt routine writes each byte to the output serial device. I could have relied on this buffering by the driver and not had any output tasks, but there's always the chance that the buffer may be too small. By using output tasks, buffering is completely under application-software control. The output tasks are threads and share the same address space as the input task.

The eight serial input ports and the four serial output ports generate CPU interrupts. The serial ports have a FIFO--they interrupt each time four characters are received or sent. Since there's a hardware timeout, if fewer than four but at least one character is received in a four-character time period, an interrupt is generated. There's an interrupt from the clock every ten milliseconds. The system also has an Ethernet card for networking. We can log into the system remotely to start executing the sample program and retrieve any error messages. The network is only used by non-real-time tasks in this example.
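The interrupt periods that appear later in Table 1 follow from the line rates and the four-character FIFO threshold. As a rough check, assuming the outgoing lines use the same ratio of ten baud per byte that the specification implies for the incoming lines (20,000 baud for 2000 bytes per second), so that 40,000 baud carries 4000 bytes per second; the symbols below are introduced here only for illustration:

```latex
\begin{align*}
T_{\text{in}}  &= \frac{4\ \text{bytes}}{2000\ \text{bytes/s}} = 2\ \text{ms} \\
T_{\text{out}} &= \frac{4\ \text{bytes}}{4000\ \text{bytes/s}} = 1\ \text{ms}
\end{align*}
```

These agree with the 2-millisecond and 1-millisecond periods listed for the serial-input and serial-output interrupt handlers in Table 1.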

Real-time Analysis

The packet-switch program was designed to make real-time analysis easy. First, assume that all tasks (including interrupt handlers) will complete execution for each of their cycles before the beginning of the next cycle. An input packet will be processed and put in its output double buffer within 20 milliseconds of being received. Because the period of the input task is ten milliseconds, even a packet that just missed a poll will be handled during the next period; see Figure 4. For channel 0, the corresponding output packet will be written to the device within 20 milliseconds of being written to the double buffer. The packet will be transmitted in another 20 milliseconds, making the packet-response time 60 milliseconds in the worst case for channel 0. For the other output channels, the output task runs every 100 milliseconds. The packet response for these channels is 220 milliseconds--within the original specification of 60-millisecond response for channel 0 and 250-millisecond response for the other output channels.
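The worst-case response figures can be tallied stage by stage. The channel 0 terms come directly from the paragraph above. For channels 1 through 3 the article gives only the 220-millisecond total, so the split into two 100-millisecond stages is an assumption made by analogy with channel 0. The symbols R_0 and R_{1-3} are introduced here only for illustration:

```latex
\begin{align*}
R_0     &= \underbrace{20}_{\text{input and processing}}
         + \underbrace{20}_{\text{buffer swap and write}}
         + \underbrace{20}_{\text{transmission}} = 60\ \text{ms} \\
R_{1-3} &= 20 + 100 + 100 = 220\ \text{ms} \le 250\ \text{ms}
\end{align*}
```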

The throughput requirements will be met as long as input and output buffers are large enough and all tasks meet their timing deadlines. The example program sets the buffer sizes in terms of throughput requirements and task periods. To determine that the program meets the throughput and response requirements, only an analysis showing whether all tasks meet their timing deadlines needs to be performed.
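Sizing a buffer from a data rate and a task period is simple arithmetic, and Listing One does it with the IBUFF_MAX and PACKETS_MAX macros in conf.h. The sketch below shows the same idea as functions. The names bytes_per_period and packets_needed are hypothetical, and rounding up rather than truncating is a choice made for this illustration, not something taken from the listing.

```c
#include <stddef.h>

/*
 * Bytes that can arrive on one channel during one task period.
 * Rounds up so that a fractional byte never shrinks the buffer
 * (an illustrative choice; the listing's macro truncates).
 */
size_t bytes_per_period(unsigned long bytes_per_sec, unsigned long period_ms)
{
    return (size_t)((bytes_per_sec * period_ms + 999UL) / 1000UL);
}

/*
 * Number of packet structures needed if every one of nchan input
 * buffers is filled with packets of the minimum size.
 */
size_t packets_needed(size_t buf_bytes, size_t nchan, size_t min_packet_bytes)
{
    return (buf_bytes * nchan + min_packet_bytes - 1) / min_packet_bytes;
}
```

With the article's figures (2000 bytes per second, a 10-millisecond input period, eight input channels, and a 4-byte minimum packet), bytes_per_period(2000, 10) gives 20 bytes and packets_needed(20, 8, 4) gives 40 packets.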

Rate-monotonic Analysis

Rate-monotonic analysis is a technique for determining if a set of tasks will always meet their timing deadlines. RM analysis is easy to apply on a set of periodic tasks where the deadline for each task is the end of its period. Each task should be scheduled according to priority preemptive scheduling; the tasks with a shorter period have higher priority. Tasks with the same period are still given different priorities. Priority preemptive scheduling means the CPU runs the highest-priority task that isn't waiting for something. A higher-priority task that finishes waiting will preempt a lower-priority task to execute, then resume the lower-priority task when it waits again.
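The priority rule described above (shorter period, higher priority, with no two tasks sharing a priority) can be sketched as a sort followed by a numbering pass. The structure and function names here are hypothetical. The sketch assumes that a larger number means a more urgent task, which is how Listing One appears to use priorities. Tasks with equal periods still receive distinct priorities, in whatever order the sort leaves them.

```c
#include <stdlib.h>

struct periodic_task {
    const char *name;
    unsigned long period_us;  /* task period in microseconds */
    int priority;             /* filled in below; larger means more urgent */
};

/* qsort comparison: order tasks by increasing period. */
static int by_period(const void *a, const void *b)
{
    const struct periodic_task *x = a;
    const struct periodic_task *y = b;
    return (x->period_us > y->period_us) - (x->period_us < y->period_us);
}

/* Sort by period and hand out distinct priorities, shortest period first. */
void assign_rm_priorities(struct periodic_task *tasks, size_t n,
                          int top_priority)
{
    size_t i;

    qsort(tasks, n, sizeof tasks[0], by_period);
    for (i = 0; i < n; i++)
        tasks[i].priority = top_priority - (int)i;
}
```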

To apply the basic RM formula, the set of tasks being analyzed must have known periods and known, bounded, worst-case execution times. Under RM theory, a set of n periodic tasks will always meet their deadlines if the conditions in Figure 5 are true. In Figure 5, Ci is the task-execution time and Ti is the task period for task i. Bi represents the time during which task i can be blocked by lower-priority tasks. This can be because they hold semaphores that either task i, or some task with higher priority than task i, needs. For a realizable system, Ci must include a context-switch time and a preemption time. Bi must include worst-case preemption disable time and interrupt-execution overhead (if the interrupt is generated by a device used only by lower-priority tasks). These execution times must be worst-case times.
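The check described above can be written as a small function. This is a minimal sketch, assuming the test takes the form used in the Table 2 calculations: for task i (counting from the highest priority), the sum of Cj/Tj over tasks 1 through i, plus Bi/Ti, must not exceed i(2^(1/i) - 1). The names rm_task, rm_task_ok, and rm_all_ok are hypothetical. To avoid needing the math library, the bound is tested in the equivalent form (1 + U/i)^i <= 2, which is a rearrangement chosen for this sketch rather than anything in the article.

```c
#include <stddef.h>

struct rm_task {
    double c;   /* worst-case execution time */
    double t;   /* period, in the same units as c */
    double b;   /* worst-case blocking time */
};

/*
 * Returns 1 if task i passes the rate-monotonic test, else 0.
 * tasks[] must be ordered from highest priority (index 0) downward.
 */
int rm_task_ok(const struct rm_task *tasks, size_t i)
{
    double u = tasks[i].b / tasks[i].t;   /* blocking term for task i */
    double n = (double)(i + 1);
    double growth = 1.0;
    size_t j;

    for (j = 0; j <= i; j++)              /* utilization of tasks 0..i */
        u += tasks[j].c / tasks[j].t;

    /* u <= n(2^(1/n) - 1) is equivalent to (1 + u/n)^n <= 2 for u >= 0 */
    for (j = 0; j <= i; j++)
        growth *= 1.0 + u / n;
    return growth <= 2.0;
}

/* Returns 1 only if every task in the set passes the test. */
int rm_all_ok(const struct rm_task *tasks, size_t n)
{
    size_t i;

    for (i = 0; i < n; i++)
        if (!rm_task_ok(tasks, i))
            return 0;
    return 1;
}
```

Note that this is a sufficient test only: a task set that fails it may still be schedulable, but one that passes it will meet its deadlines under the stated assumptions.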

To apply RM analysis to the packet-switch example, we have a set of periodic tasks with known periods, but we need worst-case execution times. To determine the worst-case execution times, I timed each code segment starting with a flushed cache, TLB, and pipeline using the RS/6000's high-resolution timer. This gave an accurate picture of how long it takes to execute the code, although it isn't the worst case because even a single cycle of execution of a task can benefit from the memory cache, the instruction and data pipelines, and MMU-translation caching (the TLB). Since the application tasks are preemptive, the cache, pipeline, and TLB will be disturbed while a task is executing its cycle. This can cause longer execution times than those measured, so some application designers actually turn off the cache when running their real-time applications. We don't have to be that drastic.

The worst-case execution time (or at least a time longer than the worst-case execution time) can be accurately determined without turning off the cache. If each task restored the state of the cache and pipeline after it executed, then the measured execution time (one-shot timing starting with an empty cache and pipeline) would be the correct worst-case execution time. Each task doesn't do this; the important part of the state is restored when the lower-priority task runs after it resumes execution. The time it would take to restore the cache and pipeline couldn't be longer than the execution time of the higher-priority task. So you could double the measured execution time for the higher-priority task to use in the analysis and pretend it had restored the CPU state. Therefore, to determine the worst-case execution times, one-shot measured times are doubled. This may seem like an extreme solution, but it's usually better than disabling the cache and risking an increase in execution times by a factor of five or more.
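The measure-once-then-double procedure can be sketched in portable C. This is only an illustration under several assumptions: it uses the standard clock() function, which reports CPU time at coarse resolution, in place of the RS/6000's high-resolution timer; it does not flush the cache, TLB, or pipeline before the run, which the article's measurements did; and sample_checksum, one_shot_seconds, and wcet_for_analysis are hypothetical names.

```c
#include <stddef.h>
#include <time.h>

/* Hypothetical workload: an additive checksum over a 4-Kbyte buffer. */
static unsigned char sample_buf[4096];

unsigned sample_checksum(void)
{
    unsigned sum = 0;
    size_t i;

    for (i = 0; i < sizeof sample_buf; i++)
        sum += sample_buf[i];
    return sum;
}

/*
 * Time a single run of fn, in seconds of CPU time as reported by clock().
 * Nothing here flushes the cache first; a real measurement would have to.
 */
double one_shot_seconds(unsigned (*fn)(void))
{
    volatile unsigned sink;          /* keeps the call from being dropped */
    clock_t start = clock();

    sink = fn();
    (void)sink;
    return (double)(clock() - start) / CLOCKS_PER_SEC;
}

/* The rule of thumb from the text: double the one-shot time. */
double wcet_for_analysis(double one_shot)
{
    return 2.0 * one_shot;
}
```

A caller would pass the result of one_shot_seconds(sample_checksum) to wcet_for_analysis and use the doubled value as the execution time in the rate-monotonic test.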

Table 1 lists the worst-case execution and blocking times for the application tasks (running on an RS/6000 under LynxOS). The measured worst-case times have been doubled. Table 2 lists the calculations necessary to apply the RM formula for some of the tasks. According to this analysis, all tasks will complete executing all their cycles before the beginning of their next cycle; all deadlines will be met. Therefore, since the data throughput and packet response times would meet specification if all task deadlines were met, the requirements for the real-time application example are met.

Conclusion

With the proper operating system and careful coding and analyzing of application software, RISC processors can be used effectively in real-time applications. To achieve this, tasks should be designed to execute as much code as possible before relinquishing the CPU, and as few tasks as possible should be created. Task-execution times should be measured, and a real-time analysis should be performed.

For more information on rate-monotonic analysis, I recommend A Practitioner's Handbook for Real-time Analysis: A Guide to Rate Monotonic Analysis for Real-time Systems. Boston, MA: Kluwer Academic, 1993.

Figure 1: (a) Code execution time for cache miss and cache hit. Processor: RS/6000 50 MHz; cache: 0 wait state; main memory: 4 wait state. Cache-miss case measured by a one-shot run of code using the nanosecond-resolution timer built into the RS/6000; (b) typical number of instructions for an interrupt dispatcher--how many instructions it takes to dispatch an interrupt to a C handler; (c) time to save CPU context for one window and all windows. Processor: 40-MHz SPARC; machine: SPARCstation 2. Time to store a single register window vs. saving all eight register windows. Times were measured in one-shot runs using the 1-microsecond-resolution timer built onto the SPARCstation motherboard.

(a)
     Cache miss 4-Kbyte checksum          3275 microseconds
     Cache hit 4-Kbyte checksum            195 microseconds
     Cache miss getpid() system call        52 microseconds
     Cache hit getpid() system call         34 microseconds
(b)
     CISC (68040)      9 instructions
     RISC (SPARC)      114 instructions
(c)
     Single register window     4 microseconds
     Eight register windows     30 microseconds

Figure 2: (a) RISC real-time system software guidelines; (b) RISC real-time application software guidelines.

(a)

  • Lock MMU TLB entries for interrupt handlers and high-priority tasks.
  • Optimize code for software trap handling.
  • Save as few registers as possible on system calls, interrupts, and context switches.
  • Publish performance numbers that can be used in a real-time analysis.
(b)
  • Understand your real-time requirements.
  • Design your application to use the fewest number of tasks possible.
  • Program each task to do as much work as possible before relinquishing the CPU.
  • Use threads instead of full processes for tasks when you can.
  • Perform a real-time analysis on your application using worst-case execution times.
  • Choose an operating system that follows RISC real-time system-software guidelines.

Figure 3: Packet-switch example design.

Figure 4: Packet-response timeline.

Figure 5: Under rate-monotonic theory, a set of n periodic tasks will meet their deadlines if these conditions are true.
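The figure itself is not reproduced in this text. Judging from the definitions of Ci, Ti, and Bi in the article and from the calculations in Table 2, the conditions presumably take the following form, with tasks numbered from highest priority to lowest. This is a reconstruction, not the original figure:

```latex
\[
\frac{C_1}{T_1} + \frac{C_2}{T_2} + \cdots + \frac{C_i}{T_i} + \frac{B_i}{T_i}
\;\le\; i\left(2^{1/i} - 1\right),
\qquad 1 \le i \le n
\]
```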

Table 1: Worst-case execution and blocking times for application tasks. Preemption disable time is for LynxOS on an RS/6000. The treatment of the Ethernet interrupt as a blocking time (instead of a task) is allowed because of the LynxOS interrupt system.

Serial-output interrupt handler
number of tasks:          4
event:                    UART FIFO half empty (4 character times)
period:                   1 millisecond
priority:                 hardware (highest)
execution time:           0.040 milliseconds
blocking time:            0.065 milliseconds (OS worst-case interrupt disable)

Serial-input interrupt handler
number of tasks:          8
event:                    UART FIFO full (4 character times)
period:                   2 milliseconds
priority:                 hardware (2nd highest)
execution time:           0.045 milliseconds
blocking time:            0.065 milliseconds (OS worst-case interrupt disable)

Periodic-timer (clock) interrupt handler
number of tasks:          1
event:                    hardware clock
period:                   10 milliseconds
priority:                 hardware (3rd highest)
execution time:           0.035 milliseconds
blocking time:            0.065 milliseconds (OS worst-case interrupt disable)

Input/process task
number of tasks:          1
event:                    clock timer
period:                   10 milliseconds
priority:                 very high
execution time:           1.6 milliseconds
blocking time:            0.15 milliseconds (OS worst-case preemption disable)
                          + 0.035 milliseconds (Ethernet interrupt blocking)

Output task 0
number of tasks:          1
event:                    output buffer switch
period:                   20 milliseconds
priority:                 high
execution time:           0.11 milliseconds
blocking time:            0.15 milliseconds (OS worst-case preemption disable)
                          + 0.035 milliseconds (Ethernet interrupt blocking)

Output tasks 1--3
number of tasks:          3
event:                    output buffer switch
period:                   100 milliseconds
priority:                 medium
execution time:           0.55 milliseconds
blocking time:            15 milliseconds (H.W. flow control)
                          + 0.15 milliseconds (OS worst-case preemption disable)
                          + 0.035 milliseconds (Ethernet interrupt blocking)

Table 2: Calculations for applying rate-monotonic formula.

Output-interrupt 0
$0.04\ \text{ms}/1\ \text{ms} + 0.065\ \text{ms}/1\ \text{ms} \le 1(2^{1/1} - 1)$
$4\% + 6.5\% \le 100\%$
$10.5\% \le 100\%$
Output-interrupt 0 will meet all its deadlines.

Output-interrupt 3
$3 \times 0.04\ \text{ms}/1\ \text{ms} + 0.04\ \text{ms}/1\ \text{ms} + 0.065\ \text{ms}/1\ \text{ms} \le 4(2^{1/4} - 1)$
$12\% + 4\% + 6.5\% \le 75.7\%$
$22.5\% \le 75.7\%$
Output-interrupt 3 will meet all its deadlines.

Input-interrupt 7
$4 \times 0.04\ \text{ms}/1\ \text{ms} + 8 \times 0.045\ \text{ms}/2\ \text{ms} + 0.065\ \text{ms}/2\ \text{ms} \le 12(2^{1/12} - 1)$
$16\% + 18\% + 3.25\% \le 71.35\%$
$37.25\% \le 71.35\%$
Input-interrupt 7 will meet all its deadlines.

Input/process task
$34\%\ \text{(I/O int.)} + 0.035\ \text{ms}/10\ \text{ms} + 1.6\ \text{ms}/10\ \text{ms} + 0.185\ \text{ms}/10\ \text{ms} \le 13(2^{1/13} - 1)$
$34\% + 18.2\% \le 71.2\%$
$52.2\% \le 71.2\%$
Input/process task will meet all its deadlines.

Note: Because the rate of the clock interrupt and the input/process task is the same, the clock interrupt handler and input/process task are treated as one task on the right-hand side of the equation.

Output-task 0
$34\%\ \text{(I/O int.)} + 16.4\%\ \text{(Input)} + 0.11\ \text{ms}/20\ \text{ms} + 0.185\ \text{ms}/20\ \text{ms} \le 14(2^{1/14} - 1)$
$34\% + 16.4\% + 0.55\% + 0.925\% \le 71\%$
$51.9\% \le 71\%$
Output-task 0 will meet all its deadlines.

Output-task 3
$50.4\% + 0.55\% + 3 \times 0.55\ \text{ms}/100\ \text{ms} + 15.185\ \text{ms}/100\ \text{ms} \le 17(2^{1/17} - 1)$
$50.4\% + 0.55\% + 1.65\% + 15.19\% \le 70.7\%$
$67.79\% \le 70.7\%$
Output-task 3 will meet all its deadlines.

[LISTING ONE]


/* conf.h -- tuning parameters for packet switch example */
#define OCHAN_MAX 8                 /* number of serial output channels */
#define ICHAN_MAX 8                 /* number of serial input channels */
#define IRATE_MIL_SEC 10            /* input/process task rate */
#define ORATE0_MIL_SEC 20           /* output task rate for output channel 0 */
#define ORATE_MIL_SEC 100           /* output rate for other output channels */
#define IBYTES_PER_SEC 2000         /* bytes per second on input channel */
#define MIN_PACKET_SIZE 4           /* minimum input packet size (bytes) */
#define PACK_BYTES_MAX 80           /* maximum number of data bytes/packet */
/* calculated buffer sizes based on throughput and task rates */
#define IBUFF_MAX ((IBYTES_PER_SEC*IRATE_MIL_SEC)/1000)
#define OBUFF_MAX ((IBUFF_MAX*ORATE_MIL_SEC)/IRATE_MIL_SEC)
#define PACKETS_MAX ((IBUFF_MAX*ICHAN_MAX)/MIN_PACKET_SIZE)
extern int num_ichans, num_ochans;
extern int packet_process_prio;
/* packet.h -- data structure definitions for packet switch example */
#define PACKET_START_BYTE 0xFE
/* internal structure for incoming/outgoing packets */
struct packet {
    struct packet *next;
    char checksum;
    char curr_bytes;
    char ochan;
    char nbytes;
    char data[PACK_BYTES_MAX];
};
/* structures for double buffers used for output */
struct buff {
    int nbytes;
    char data[OBUFF_MAX];
};
struct outbuff {
    int chan;
    int fd;
    int thread_id;
    int thread_priority;
    struct buff *ready_buff;
    struct buff *sending_buff;
    int sending_buff_sent;
    int new_send_sem;
    struct buff buffs[2];
};
extern struct outbuff ochans[];
extern struct packet *get_new_packet();
extern struct packet *deq_packet();
/* input.c -- code for input task of packet switch example */
#include <stdio.h>
#include <fcntl.h>
#include <time.h>
#include <signal.h>
#include "conf.h"
#include "packet.h"
/* input channel structures */

struct packet *partial_packet[ICHAN_MAX];
int input_state[ICHAN_MAX];
int infds[ICHAN_MAX];
unsigned char ibuff[IBUFF_MAX];
/* input states */
#define STATE1_NEED_STARTBYTE   0
#define STATE2_NEED_OCHAN       1
#define STATE3_NEED_BYTECOUNT   2
#define STATE4_NEED_DATA        3
/* input_packets -- read some data from incoming channel and store
 in packet structure. Enqueue these packets on the FIFO packet list. */
input_packets(chan)
int chan;
{
    int out;
    register int i;
    register struct packet *p;
    int state;
    out = read(infds[chan], ibuff, IBUFF_MAX);
    if (out <= 0)
        return out;
    p = partial_packet[chan];
    state = input_state[chan];
    for (i=0; i < out; i++) {
        switch (state) {
            case STATE1_NEED_STARTBYTE:
                if (ibuff[i] == PACKET_START_BYTE)
                    state = STATE2_NEED_OCHAN;
                break;
            case STATE2_NEED_OCHAN:
                p = get_new_packet();
                p->curr_bytes = 0;
                p->ochan = ibuff[i];
                state = STATE3_NEED_BYTECOUNT;
                break;
            case STATE3_NEED_BYTECOUNT:
                p->nbytes = ibuff[i];
                if (p->nbytes > PACK_BYTES_MAX)
                    p->nbytes = PACK_BYTES_MAX;
                state = STATE4_NEED_DATA;
                break;
            case STATE4_NEED_DATA:
                p->data[p->curr_bytes++]  = ibuff[i];
                if (p->curr_bytes >= p->nbytes) {
                    enq_packet(p);
                    state = STATE1_NEED_STARTBYTE;

                }
        }
    }
    partial_packet[chan] = p;
    input_state[chan] = state;
    return 0;
}
void handler()
{
}
/* input_task -- input thread main loop. Read incoming data from all channels
and separate into packets. Perform checksum on these packets and put packets
in output double buffer for the output task to take care of. */
input_task()
{
    int chan;
    static struct itimerval v;
    int output_rc, output0_rc;
            /* set periodic timer for input rate */
    signal(SIGALRM, handler);
    sigblock(sigmask(SIGALRM));
    v.it_value.tv_sec = v.it_interval.tv_sec = 0;
    v.it_value.tv_usec = v.it_interval.tv_usec = IRATE_MIL_SEC*1000;
    setitimer(ITIMER_REAL, &v, (struct itimerval *)0);
    output_rc = output0_rc = 0;
    for (;;) {
        sigpause(0);
        for (chan=0; chan < num_ichans; chan++) {
            input_packets(chan);
        }
        process_packets();
        if (output0_rc++ >= (ORATE0_MIL_SEC/IRATE_MIL_SEC)) {
            output0_rc = 0;     /* restart the channel 0 swap interval */
            switch_obuffer(0);
        }
        if (output_rc++ >= (ORATE_MIL_SEC/IRATE_MIL_SEC)) {
            output_rc = 0;
            for (chan=1; chan < num_ochans; chan++) {
                switch_obuffer(chan);
            }
        }
    }
}
/* open_input_channels -- open serial input channels */

open_input_channels()
{
    int i;
    char name[80];
    for (i=0; i < num_ichans; i++) {
        sprintf(name, "/dev/ichan%-d", i);
        infds[i] = open(name, O_RDONLY|O_NONBLOCK);
        if (infds[i] < 0) {
            fprintf(stderr, "couldn't open %s\n", name);
            exit(1);
        }
    }
}
/* main.c -- start of packet switch example */
#include <stdio.h>
#include <time.h>
#include <resource.h>
#include "conf.h"
int num_ichans, num_ochans;
int packet_process_prio;
main(argc, argv)
int argc;
char **argv;
{
    if (argc != 3) {
        fprintf(stderr, "usage: %s <num in channels> <num out channels>\n", argv[0]);
        exit(1);
    }
    num_ichans = atoi(argv[1]);
    num_ochans = atoi(argv[2]);
    if (num_ichans <= 0 || num_ichans > ICHAN_MAX) {
        fprintf(stderr, "bad number of input channels (%d). ", num_ichans);
        fprintf(stderr, "minimum is 1 maximum is %d\n", ICHAN_MAX);
        exit(1);
    }
    if (num_ochans <= 0 || num_ochans > OCHAN_MAX) {
        fprintf(stderr, "bad number of output channels (%d). ", num_ochans);
        fprintf(stderr, "minimum is 1 maximum is %d\n", OCHAN_MAX);
        exit(1);
    }
    packet_process_prio = getpriority(PRIO_PROCESS, 0);
    open_input_channels();
    open_output_channels();
    init_output_tasks();
    init_packets();
    input_task();
}
/* output.c -- code for output double buffers and output threads */
#include <stdio.h>
#include <fcntl.h>
#include <pthread.h>
#include "conf.h"
#include "packet.h"
struct outbuff ochans[OCHAN_MAX];
/* switch_obuffer -- switch output double buffer */
switch_obuffer(chan)
int chan;
{
    struct outbuff *buffp;
    struct buff *tmp;

    buffp = &ochans[chan];
    if (!buffp->sending_buff_sent) {
        fprintf(stderr, "out of real-time. channel %d\n", chan);
        return;
    }
        /* switch buffers. double buffering */
    buffp->sending_buff_sent = 0;
    tmp = buffp->ready_buff;
    buffp->ready_buff = buffp->sending_buff;
    buffp->sending_buff = tmp;
    sem_signal(buffp->new_send_sem);
    buffp->ready_buff->nbytes = 0;
}
/* put_packet_in_buff: copy a packet (output format) to the output buffer */
put_packet_in_buff(chan, pkt)
int chan;
struct packet *pkt;
{
    struct outbuff *buffp;
    struct buff *rbuff;
    int i;

    if (chan == -1) {       /* broadcast */
        for (i=0; i < num_ochans; i++) {
            put_packet_in_buff(i, pkt);
        }
        return;
    }
    if (chan >= num_ochans) {
        return;             /* bad channel number */
    }
    buffp = &ochans[chan];
    rbuff = buffp->ready_buff;

    if (3+pkt->nbytes + rbuff->nbytes > OBUFF_MAX) {
        fprintf(stderr, "output buffer full %d\n", chan);
        return;             /* output buffer full */
    }
    rbuff->data[rbuff->nbytes++] = PACKET_START_BYTE;
    rbuff->data[rbuff->nbytes++] = pkt->nbytes;
    rbuff->data[rbuff->nbytes++] = pkt->checksum;
    bcopy(pkt->data, &rbuff->data[rbuff->nbytes], pkt->nbytes);
    rbuff->nbytes += pkt->nbytes;
    return;
}
/* output_task -- main routine for each output thread. */
output_task(buffp)
struct outbuff *buffp;
{
    int out;
    for (;;) {
        sem_wait(buffp->new_send_sem);
        out = write(buffp->fd,buffp->sending_buff->data,
                buffp->sending_buff->nbytes);
        buffp->sending_buff_sent = 1;
    }
}
/* init_dbuff -- initialize a double buffer (one per output channel). */
init_dbuff(chan, p)
int chan;
struct outbuff *p;
{
    char name[80];
    p->chan = chan;
    p->ready_buff = &p->buffs[0];
    p->sending_buff = &p->buffs[1];
    p->sending_buff_sent = 1;
    sprintf(name, "ochan%-d", chan);
    sem_delete(name);
    p->new_send_sem = sem_get(name, 0);
    if (p->new_send_sem < 0) {
        fprintf(stderr, "could not get sem %s\n", name);
        exit(1);
    }
}
/* init_output_tasks: initialize output double buffers, create output threads*/
init_output_tasks()
{
    int i;
    int res;
    pthread_attr_t thread_attr;
    pthread_t tid;

    for (i=0; i < num_ochans; i++) {
        init_dbuff(i, &ochans[i]);
        pthread_attr_create(&thread_attr);
        thread_attr.pthread_attr_prio = packet_process_prio - 1 - i;
        res = pthread_create(&tid, thread_attr, output_task, &ochans[i]);
        if (res < 0) {
            fprintf(stderr, "could not create output thread %d\n", i);
            exit(1);
        }
    }
}
/* open_output_channels -- open serial output channels */
open_output_channels()
{
    int i;
    int fd;
    static char name[80];
    for (i=0; i < num_ochans; i++) {
        sprintf(name, "/dev/ochan%-d", i);
        ochans[i].fd = fd = open(name, O_WRONLY|O_TRUNC);
        if (fd < 0) {
            fprintf(stderr, "couldn't open %s\n", name);
            exit(1);
        }
    }
}
/* packet.c -- routines and data structures to deal with packets */
#include "conf.h"
#include "packet.h"
/* packet lists */
struct packet *packet_head, *packet_tail;
struct packet *packet_free;
struct packet packet_table[PACKETS_MAX];
/* free_packet -- put packet on free list */
free_packet(p)
struct packet *p;
{
    p->next = packet_free;
    packet_free = p;
}
/* get_new_packet -- get a free packet; returns 0 if the free list is empty */
struct packet *get_new_packet()
{
    struct packet *p;
    p = packet_free;
    if (p)
        packet_free = p->next;
    return p;
}
/* enq_packet -- put packet on FIFO packet list */
enq_packet(p)
register struct packet *p;
{
    p->next = 0;
    if (packet_head)
        packet_tail->next = p;
    else
        packet_head = p;
    packet_tail = p;
}
/* deq_packet -- get packet from FIFO packet list */
struct packet *deq_packet()
{
    register struct packet *p;
    if ((p = packet_head) != 0)
        packet_head = p->next;
    return p;
}
/* init_packets -- initialize free packet list */
init_packets()
{
    register int i;
    for (i=0; i < PACKETS_MAX; i++)
        free_packet(&packet_table[i]);
}
/* process.c -- perform processing needed on packets */
#include "conf.h"
#include "packet.h"
/* do_checksum -- compute a packet's checksum: its byte count plus the sum of its data bytes */
do_checksum(pkt)
struct packet *pkt;
{
    register int sum;
    register char *p;
    register int count;
    count = sum = pkt->nbytes;
    p = pkt->data;
    while (count--) {
        sum += *p++;
    }
    pkt->checksum = sum;
}
/* process_packets -- put each packet of FIFO packet queue in output buffer */
process_packets()
{
    register struct packet *p;
    do {
        p = deq_packet();
        if (p) {
            do_checksum(p);
            put_packet_in_buff(p->ochan, p);
            free_packet(p);
        }
    } while (p);
}

End Listing
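
The framing that put_packet_in_buff and do_checksum perform together can be restated as a small stand-alone sketch in ANSI C. It is an illustration under stated assumptions, not the article's code: the names frame_checksum and frame_append are hypothetical, the value given to PACKET_START_BYTE is a placeholder (the real definition is in the listing's headers), and unsigned char is assumed for the buffer so that the stored checksum is simply the low 8 bits of the sum.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define PACKET_START_BYTE 0x7E  /* placeholder value, assumed for this sketch */
#define FRAME_HEADER_LEN  3     /* start byte + length byte + checksum byte */

/* Checksum in the style of do_checksum: seed the sum with the payload
 * length, add every payload byte, and keep the low 8 bits. */
unsigned char frame_checksum(const unsigned char *data, size_t nbytes)
{
    unsigned int sum = (unsigned int)nbytes;
    size_t i;

    for (i = 0; i < nbytes; i++)
        sum += data[i];
    return (unsigned char)(sum & 0xFF);
}

/* Append one framed packet to out[], which holds *used bytes out of cap.
 * Returns the number of bytes appended, or 0 if the frame does not fit,
 * in which case the buffer and *used are left untouched. */
size_t frame_append(unsigned char *out, size_t *used, size_t cap,
                    const unsigned char *data, size_t nbytes)
{
    assert(nbytes <= 255);      /* the length must fit in a single byte */

    if (*used > cap || FRAME_HEADER_LEN + nbytes > cap - *used)
        return 0;               /* output buffer full */

    out[(*used)++] = PACKET_START_BYTE;
    out[(*used)++] = (unsigned char)nbytes;
    out[(*used)++] = frame_checksum(data, nbytes);
    memcpy(out + *used, data, nbytes);
    *used += nbytes;
    return FRAME_HEADER_LEN + nbytes;
}
```

As in the listing, the space check runs before anything is written, so a packet that does not fit is dropped whole rather than leaving a partial frame in the output buffer.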


Copyright © 1994, Dr. Dobb's Journal

