Algorithm Alley

Group sorting algorithms are well suited for applications where recursion is either not allowed or inefficient. In this month's column, Mingfu presents an N*logN adaptive group sort algorithm.


March 01, 2000
URL:http://www.drdobbs.com/database/algorithm-alley/184404045

Mar00: Algorithm Alley

Group sorting algorithms are well suited to applications where recursion is either not allowed or inefficient. In group sorting algorithms (such as the Shell Sort; see "A High-speed Sorting Procedure," by Donald L. Shell, Communications of the ACM, July, 1959), the individual groups are sorted for each of a given series of group numbers. As the group number decreases to 1, the array becomes fully sorted. Two issues can affect the efficiency of group sorting routines:

To improve the efficiency of group sorting, the Adaptive Group Sort (AGS) algorithm (see Listing One) uses an "approaching sorting" methodology. AGS is an O(N*logN) sorting algorithm with O(1) auxiliary space requirements. In theory, AGS's flat structure makes it easier to optimize than, say, QuickSort, and makes it ideal for many embedded-system applications. When run against the IBM AIX library routine qsort(), for instance, AGS is about twice as fast as qsort() when compiled with optimization turned on (and about 20 percent slower than qsort() when compiled without optimization). Listing One implements the AGS algorithm for an array[] of size N with a group number decreasing rate (GNDR)=N1/N2 (N2>N1). In this case, for each given numGroup in gsort1(), all groups go through a bidirectional bubble sort once. After this bidirectional processing, each group approaches being a sorted group, but may not be fully sorted.
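The single bidirectional pass described above can be sketched as follows. This is a minimal illustration rather than the author's code: the name bidirectionalPass() and the use of std::vector are assumptions made for the example, and gap plays the role of numGroup in Listing One.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// One bidirectional bubble pass over all groups for a given gap
// (numGroup in Listing One). Returns the number of swaps performed.
// Hypothetical helper; does nothing unless 1 <= gap < a.size().
long bidirectionalPass(std::vector<long>& a, std::size_t gap) {
    long swaps = 0;
    const std::size_t n = a.size();
    if (gap == 0 || gap >= n) return 0;
    for (std::size_t i = 0; i + gap < n; ++i) {
        // Forward compare-swap: pushes larger values toward the end.
        if (a[i] > a[i + gap]) {
            std::swap(a[i], a[i + gap]);
            ++swaps;
        }
        // Backward compare-swap: pushes smaller values toward the front.
        const std::size_t hi = n - 1 - i;
        if (a[hi - gap] > a[hi]) {
            std::swap(a[hi - gap], a[hi]);
            ++swaps;
        }
    }
    return swaps;
}
```

For instance, one pass with gap 1 over {4, 3, 2, 1} performs five swaps and leaves {1, 3, 2, 4}: closer to sorted, but not yet sorted.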

Because each group may not be fully sorted as the group number decreases to 1, the array itself may not be fully sorted. Consequently, you may need to repeatedly loop through the group sort until array[] is fully sorted. In gsort1(), isSortedArray() is called after each group sort loop to make sure that gsort1() produces a fully sorted array.
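The repeat-until-sorted control flow can be sketched in a self-contained form. This is an illustration under stated assumptions, not a replacement for Listing One: gsort1Sketch() is a hypothetical name, std::is_sorted stands in for isSortedArray(), and the guards on n1, n2, and gap are additions that keep the sketch terminating for any input.

```cpp
#include <algorithm>
#include <utility>
#include <vector>

// Sketch of the gsort1() control flow. Returns the number of AGS loops.
// Assumption: GNDR = n1/n2 should satisfy 1/2 < n1/n2 < 1; invalid
// ratios fall back to 2/3 (a guard added for this sketch).
long gsort1Sketch(std::vector<long>& a, long n1, long n2) {
    const long n = static_cast<long>(a.size());
    long loops = 0;
    if (n < 2) return loops;
    if (n1 <= 0 || n1 >= n2) { n1 = 2; n2 = 3; }
    do {                               // AGS loop
        long gap = n;
        while (gap > 1) {              // group-sort loop
            gap = gap * n1 / n2;       // shrink the gap by GNDR
            if (gap < 1) gap = 1;      // always finish with a gap-1 pass
            for (long i = 0; i + gap < n; ++i) {
                if (a[i] > a[i + gap]) std::swap(a[i], a[i + gap]);
                if (a[n - 1 - i - gap] > a[n - 1 - i])
                    std::swap(a[n - 1 - i - gap], a[n - 1 - i]);
            }
        }
        ++loops;
    } while (!std::is_sorted(a.begin(), a.end()));
    return loops;
}
```

Every swap removes at least one inversion, and each AGS loop ends with a gap-1 pass that swaps at least once if the array is still unsorted, so the outer loop cannot run forever.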

Clearly, the running time of gsort1() with returned value gLoop is less than optimal. For instance, in Example 1, where gLoop is the number of gsort loops, the source of inefficiency is the overhead that isSortedArray() adds to the group sort running time. As long as GNDR is in a reasonable range, gLoop remains bounded.
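The group-sort term of the bound in Example 1 can be read off Listing One. The following is a sketch of that reasoning, ignoring integer truncation of numGroup; it is not the author's derivation:

```latex
\begin{align*}
\text{passes per AGS loop} &\approx \log_{N_2/N_1} N = \frac{\log N}{\log(N_2/N_1)}, \\
\text{comparisons per pass} &= 2\,(N - \text{numGroup}) < 2N, \\
\text{group-sort comparisons} &\lesssim \text{gLoop}\cdot\frac{2\,N\log N}{\log(N_2/N_1)}.
\end{align*}
```

The first line holds because numGroup shrinks from N to 1 by a factor of N1/N2 per pass; the second because each iteration of the inner for loop calls cmprSwap2() twice. The remaining gLoop*N/2 term in Example 1 is the isSortedArray() overhead.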

After the first AGS loop in gsort1(), array[] is closer to being fully sorted. Consequently, it is not necessary for numGroup to start from N after the first loop. In gsort2() (see Listing Two), therefore, maxNumGroup is set to log(N) after the first AGS loop. Also, the variables divider and oldDivider are used to calculate numGroup so that gsort2() can work for any given N.
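To see what the divider arithmetic produces, the sequence of numGroup values for one AGS loop can be generated on its own. This sketch copies the integer arithmetic of Listing Two; the function name gapSequence() is an assumption made for the example.

```cpp
#include <vector>

// Sequence of gaps (numGroup values) visited in one AGS loop, using the
// divider/oldDivider arithmetic from Listing Two.
// Assumes 0 < n1 < n2; returns an empty sequence if maxNumGroup < 2.
std::vector<long> gapSequence(long maxNumGroup, long n1, long n2) {
    std::vector<long> gaps;
    long divider = 1;
    long oldDivider = 1;
    long gap = maxNumGroup;
    while (gap > 1) {
        divider = divider * n2 / n1;
        // Integer truncation can leave divider unchanged; force progress.
        if (divider == oldDivider) ++divider;
        oldDivider = divider;
        gap = maxNumGroup / divider;
        if (gap == 0) gap = 1;
        gaps.push_back(gap);
    }
    return gaps;
}
```

With maxNumGroup = 10 and GNDR = 2/3, this yields 5, 3, 2, 1. Dividing maxNumGroup by a growing divider, rather than repeatedly multiplying numGroup by N1/N2, means the sequence always ends with a gap of 1.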

A preprocessing for loop reduces the number of swaps in gsort2() at a cost of O(N). Furthermore, isSortedArray(array,N) is called before the AGS loop, which gives O(N) running time for arrays that are already sorted once preprocessing is done. Example 2 gives the gsort2() running time in terms of the returned value gLoop.
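The preprocessing step on its own can be sketched as follows. The name mirrorPreprocess() and the use of std::vector are assumptions made for the example; the loop itself mirrors the one at the top of gsort2().

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Compare-swap each element with its mirror image about the middle of
// the array, as in the pre-process loop of gsort2().
void mirrorPreprocess(std::vector<long>& a) {
    const std::size_t n = a.size();
    for (std::size_t i = 0; i < n / 2; ++i) {
        if (a[i] > a[n - 1 - i]) std::swap(a[i], a[n - 1 - i]);
    }
}
```

A strictly descending array such as {5, 4, 3, 2, 1} comes out of this step fully sorted, so the isSortedArray() check that follows ends the sort without entering the AGS loop at all.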

In the worst-case bidirectional bubble-sort loop, an element may swap only once in the correct direction. This gives GNDR a lower bound for AGS in gsort1() and gsort2(). GNDR=1/2 is excluded from Example 3 because of regrouping. The upper bound of N1/N2 in Example 3 is set to 2/3 because the running time in both Examples 1 and 2 is proportional to 1/log(N2/N1), although N1/N2 can be any value between 1/2 and 1 for gsort2().

Table 1 lists the statistical test results of gsort2() for different GNDRs and array sizes. Listing Five is the program that performed this analysis. Each N and GNDR were tested with 100 random data series. As Table 1 shows, gsort2() gives consistent N*logN comparison and swap costs. However, AGS has the minimum swapping cost for GNDR=2/3. As GNDR decreases, the comparison cost K decreases and the swapping cost S increases. Based on the cost of swapping your data, you can optimize GNDR to get the minimum sorting time.
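The K and S figures are raw counts divided by N*log2(N), as computed near the end of main() in Listing Five. A minimal sketch of that normalization, with normalizedCost() as a hypothetical helper name:

```cpp
#include <cmath>

// Normalize a comparison or swap count by N * log2(N), giving the K or S
// value reported in the tables. Assumes n >= 2 so the divisor is nonzero.
double normalizedCost(long count, long n) {
    const double nlogn =
        static_cast<double>(n) * std::log2(static_cast<double>(n));
    return static_cast<double>(count) / nlogn;
}
```

For example, 20,480 comparisons on an array of 1024 elements gives K = 20480 / (1024 * 10) = 2.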

gsort2() efficiency can be improved by resetting GNDR to 2/3 when step < log(N) at the first AGS loop; see gsort3() in Listing Three. Table 2 shows gsort3() test results. Comparing Table 1 with Table 2, you can see a consistent improvement of swapping costs for all GNDR<2/3. Comparing cost K depends on gLoop improvement.
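The effect of the reset on the gap sequence can be seen in isolation. The sketch below follows the arithmetic of gsort3() in Listing Five, which resets when step <= 2*logN with logN truncated to an integer; the function name gapSequenceWithReset() is an assumption made for the example.

```cpp
#include <cmath>
#include <vector>

// Gap sequence for one AGS loop with the GNDR reset used by gsort3() in
// Listing Five: once step <= 2*logN, GNDR switches to 2/3.
// Assumes 0 < n1 < n2 and arraySize >= 2.
std::vector<long> gapSequenceWithReset(long maxStep, long arraySize,
                                       long n1, long n2) {
    std::vector<long> gaps;
    const long logN =
        static_cast<long>(std::log(static_cast<double>(arraySize)));
    long divider = 1;
    long oldDivider = 1;
    long step = maxStep;
    while (step > 1) {
        divider = divider * n2 / n1;
        if (divider == oldDivider) ++divider;  // force progress
        oldDivider = divider;
        step = maxStep / divider;
        if (step <= 2 * logN) {                // reset GNDR to 2/3
            n1 = 2;
            n2 = 3;
        }
        if (step == 0) step = 1;
        gaps.push_back(step);
    }
    return gaps;
}
```

For N = 100 and an initial GNDR of 5/9, the sequence is 50, 33, 20, 11, 6, 4, 2, 1; without the reset, the tail would be 6, 3, 2, 1, so the reset makes the final gaps shrink more slowly.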

In short, the GNDR parameter gives choices for optimizing AGS for certain data constraints. GNDR=2/3 is suitable for data with high swapping costs, while GNDR=5/9 is good for general use. It's hard to give a straight answer about the worst-case running time for AGS. The best estimate is that it lies between O(N*logN) and O(N*logN*logN). Listing Four is a template version of gsort3(), which is more general but could run much slower than a similar C/C++ version.

Acknowledgment

Special thanks to Dr. Gary Laison of Saint Joseph's University for the discussions on sorting algorithms.

DDJ

Listing One

// adaptive group sort - 1: running time <= gLoop*N/2 + 
//                                     2*gLoop*N*log(N)/log(N2/N1).
long gsort1(long *array, long N, long N1, long N2) {
       long numGroup; // number of groups
       long gLoop = 0; // number of AGS loops
       long i;
       do { // AGS loop
         numGroup = N;
         while (numGroup > 1) { // group-sort loop
           // update numGroup
           numGroup = numGroup * N1 / N2;
           for (i = 0; i < N-numGroup; i++) { //bidirectional bubble-sort loop
              cmprSwap2(array[i], array[i+numGroup]);
              cmprSwap2(array[N-1-i-numGroup], array[N-1-i]);
           }
        }
        gLoop++;
      } while (!isSortedArray(array, N));
      return gLoop;
}


Listing Two

//adaptive group sort - 2:  running time <= (gLoop+2)*N/2 + 
//              2*N*log(N)/log(N2/N1) + 2*(gLoop-1)*N*log(log(N))/log(N2/N1).
long gsort2(long *array, long N, long N1, long N2) {
       long maxNumGroup; // maximum group number
       long numGroup; // number of groups
       long divider;  // to calculate numGroup
       long oldDivider; // to calculate numGroup
       long gLoop = 0; // number of AGS loops
       long i;
       // pre-process to reduce number of swaps;
       maxNumGroup = N / 2;
       for ( i = 0; i < maxNumGroup; i++ ) {
           cmprSwap2(array[i], array[N-1-i]);
       }
       maxNumGroup = N;
       while (!isSortedArray(array, N)) { // AGS loop
            gLoop++;
            numGroup = maxNumGroup;
            oldDivider = divider = 1;
            while (numGroup > 1) { // group-sort loop
                // update numGroup
                divider = divider * N2 / N1;
                if (divider == oldDivider) divider++;
                oldDivider = divider;
                numGroup = maxNumGroup / divider;
                if (0 == numGroup) numGroup = 1;
                for (i = 0; i < N - numGroup; i++) { // bidirectional 
                                                     //    bubble-sort loop
                    cmprSwap2(array[i], array[i+numGroup]);
                    cmprSwap2(array[N-1-i-numGroup], array[N-1-i]);
                }
           }
           maxNumGroup = (long) log((double) N); //reset maxNumGroup;
           if (maxNumGroup < 2) maxNumGroup = 2; // keep a gap-1 pass (as in Listing Five)
      }
      return gLoop;
}


Listing Three

//adaptive group sort - 3
long gsort3(long *array, long N, long N1, long N2) {
       long maxNumGroup; // maximum group number
       long numGroup; // number of groups
       long divider;  // to calculate numGroup
       long oldDivider; // to calculate numGroup
       long gLoop = 0; // number of AGS loops
       long i;
       // pre-process to reduce number of swaps;
       maxNumGroup = N / 2;
       for ( i = 0; i < maxNumGroup; i++ ) {
           cmprSwap2(array[i], array[N-1-i]);
       }
       maxNumGroup = N;
       while (!isSortedArray(array, N)) { // AGS loop
            gLoop++;
            numGroup = maxNumGroup;
            oldDivider = divider = 1;
            while (numGroup > 1) { // group-sort loop
                // update numGroup
                divider = divider * N2 / N1;
                if (divider == oldDivider) divider++;
                oldDivider = divider;
                numGroup = maxNumGroup / divider;
                if (0 == numGroup) numGroup = 1;
                if (numGroup < 2*log(N)) { //reset GNDR
                      N1 = 2; N2 = 3;
                }
                for (i = 0; i < N - numGroup; i++) { // bidirectional 
                                                     //    bubble-sort loop
                    cmprSwap2(array[i], array[i+numGroup]);
                    cmprSwap2(array[N-1-i-numGroup], array[N-1-i]);
                }
           }
           maxNumGroup = (long) log((double) N); //reset maxNumGroup;
           if (maxNumGroup < 2) maxNumGroup = 2; // keep a gap-1 pass (as in Listing Five)
      }
      return gLoop;
}


Listing Four

// Interface similar to qsort(). Depending on the compiler, the template is
//                         normally slower than similar C/C++ functions.
template <class T> void gsort(T *array, int arraySize, 
                            int (*ComparisonPointer)(const T*, const T*))
{
    T tmp;
    int maxStep;         // maximum group number
    int step;            // number of groups
    int divider;         // to calculate step
    int oldDivider;      // to calculate step
    int logN;
    int i, j;
    int N1, N2;
    boolean firstSet = TRUE;

    if (NULL == array || 1 >= arraySize ) return;
    if (sizeof(T) >= 8) { //set GNDR by swapping cost.
        N1 = 2; N2 = 3;
    } else {
        N1 = 5; N2 = 9;
    }
    // pre-processing;
    maxStep = arraySize / 2;
    for (i = 0, j = arraySize-1; i < maxStep; i++, j--) {
        if (ComparisonPointer(&array[i], &array[j]) > 0) {
          tmp = array[i];
          array[i] = array[j];
          array[j] = tmp;
        }
    }
    if (2 == arraySize)  return;

    logN = (int) (log((double)arraySize));
    if (logN < 2) logN = 2;

    maxStep = arraySize;
    while (TRUE) { //AGS loop;
        for (i = arraySize - 2; i >= 0; i--) { //isSortedArray();
          if (ComparisonPointer(&array[i], &array[i+1]) > 0) break;
        }
        if (0 > i) break; // loop ran to completion, so the array is sorted;

        step = maxStep;
        oldDivider = divider = 1;
        while (step > 1) { // group-sort loop;
          divider = (divider * N2) / N1;
          if (divider == oldDivider) divider++;
          oldDivider = divider;
          step = maxStep / divider;
          if (firstSet && step <= 2*logN) { // reset GNDR = 2/3
            firstSet = FALSE;
            N1 = 2; N2 = 3;
          }
          if (0 == step) step = 1;
          for (i = 0, j = arraySize-1; j >= step; i++, j--) {
                                // bidirectional bubble-sort for each group;
            if (ComparisonPointer(&array[i], &array[i+step]) > 0) {
              tmp = array[i];
              array[i] = array[i+step];
              array[i+step] = tmp;
            }
            if (ComparisonPointer(&array[j-step], &array[j]) > 0) {
              tmp = array[j-step];
              array[j-step] = array[j];
              array[j] = tmp;
            }
          }
        }
        maxStep = logN;
    }
}


Listing Five

//################################################################
// Test program for Adaptive Group Sort (AGS) algorithm statistic analysis.
// 05/04/98 M. Gong - initial code.
//################################################################
#include <math.h>   // log()
#include <stdio.h>  // printf()
#include <stdlib.h> // atoi(), rand(), srand()
#ifndef TRUE
#define TRUE 1
#define FALSE 0
#endif
long gsort2(long *array, long arraySize, long n1, long n2);
long gsort3(long *array, long arraySize, long n1, long n2);
//
static long numCmpr, numSwap;
//
static short cmprSwap2(long &smaller, long &larger);
static short isSortedArray(long *array, long arraySize);
static void biDirectionGroupSort(long *array, 
                               long arraySize, long step, long loops);
//for test
static void initArray(long *array, long N);
static void swap(long &l1, long &l2);
int main(int argc, char **argv);
//
int main(int argc, char **argv)
{
    long minloop, maxloop, avrloop;
    long mincmpr, maxcmpr;
    long minswap, maxswap;
    double avrswap, avrcmpr;
    long gsortLoops;
    double avrf, minf, maxf;
    double loopf;
    double nlogn;
    double logN;
    long n1, n2;
    long seed1, seed2, arraySize, n;
    long *array = NULL;

    if (argc <6) {
            printf("usage: %s arraySize seedStart seedEnd n1 n2\n", argv[0]);
            return FALSE;
    }
    arraySize = atoi(argv[1]);
    seed1 = atoi(argv[2]);
    seed2 = atoi(argv[3]);
    n1 = atoi(argv[4]);
    n2 = atoi(argv[5]);
    logN = log((double)arraySize) / log(2.0);

    if (seed1 > seed2) swap(seed1, seed2);
    printf("****** adaptive group sort: numCmpr = K * N * log2(N); "
           "numSwap = S * N * log2(N);\n");
    printf(" seed1 = %ld; seed2 = %ld; N = %ld; n1 = %ld; n2 = %ld; "
           "log2(N) = %.3f;\n", seed1, seed2, arraySize, n1, n2, logN);
    minloop = 0x7FFFFFFF; maxloop = 0; avrloop = 0;
    mincmpr = 0x7FFFFFFF; maxcmpr = 0, avrcmpr = 0;
    minswap = 0x7FFFFFFF; maxswap = 0; avrswap = 0;
    array = new long[arraySize];
    if (NULL == array) return FALSE;

    for(n = seed1; n <= seed2; n++) {
            numCmpr = 0; numSwap = 0;
            srand(n);
            initArray(array,arraySize);
            gsortLoops = gsort2((long*)array, arraySize, n1, n2);

            if (minloop > gsortLoops) minloop = gsortLoops;
            if (maxloop < gsortLoops) maxloop = gsortLoops;
            avrloop += gsortLoops;
            if (mincmpr > numCmpr) mincmpr = numCmpr;
            if (maxcmpr < numCmpr) maxcmpr = numCmpr;
            avrcmpr += ((float) numCmpr) / (float)(seed2 - seed1 +1);
            if (minswap > numSwap) minswap = numSwap;
            if (maxswap < numSwap) maxswap = numSwap;
            avrswap += ((float) numSwap) / (float)(seed2 - seed1 +1);
    }
    loopf = (double) (seed2 - seed1 +1);
    nlogn = logN * ((double)arraySize);

    avrf= ((double)avrloop) / loopf;
    printf("\tmin : max : average loop = %ld : %ld : %.3f\n",
           minloop, maxloop, avrf);
    avrf = avrcmpr / nlogn;
    minf = ((double)mincmpr) / nlogn;
    maxf = ((double)maxcmpr) / nlogn;
    printf("\tmin : max : average K = %.3f : %.3f : %.3f\n",
           minf, maxf, avrf);
    avrf = avrswap / nlogn;
    minf = ((double)minswap) / nlogn;
    maxf = ((double)maxswap) / nlogn;
    printf("\tmin : max : average S = %.3f : %.3f : %.3f\n",
           minf, maxf, avrf);
    delete [] array;
    return TRUE;
}
//
static void initArray(long *array, long N)
{
    long i;
    for (i = 0; i < N; i++) {
            array[i] = (long) rand();
    }
}
//
static void swap(long &l1, long &l2)
{
    long tmp = l1;
    l1 = l2; l2 = tmp;
}
//reset maxStep = logN after the first AGS loop;
long gsort2(long *array, long arraySize, long N1, long N2)
{
    long maxStep;    // maximum group number
    long step;       // number of groups
    long divider;    // to calculate step
    long oldDivider; // to calculate step
    long i;
    long gsortloops = 0;
    long logN;

    if (1 >= arraySize) return 0;
    if (2 == arraySize) {
            cmprSwap2(array[0], array[1]);
            return 0;
    }
    if (N1 > N2) swap(N1, N2);
    else if (N1 == N2) {
            N1 = 2; N2 = 3;
    }
    maxStep = arraySize / 2;
    for (i = 0; i < maxStep; i++) { // pre-processing
            cmprSwap2(array[i], array[arraySize-1-i]);
    }
    logN = (long) (log((double)arraySize));

    maxStep = arraySize;
    while (!isSortedArray(array, arraySize)) { // AGS Loop
        step = maxStep;
        oldDivider = divider = 1;
        while (step > 1) { // group-sort loop
                divider = (divider * N2) / N1;
                if (divider == oldDivider) divider++;
                oldDivider = divider;
                step = maxStep / divider;
                if (0 == step) step = 1;
                biDirectionGroupSort(array, arraySize, step, 0);
        }
        gsortloops++;
        maxStep = logN;
        if (maxStep < 2) maxStep = 2;
    }
    return gsortloops;
}
//reset maxStep = logN after the first AGS loop;
//reset GNDR to 2/3 when step < logN;
long gsort3(long *array, long arraySize, long N1, long N2)
{
    long maxStep; // maximum group number
    long step; // number of groups
    long divider; // to calculate step
    long oldDivider; // to calculate step
    long i;
    long gsortloops = 0;
    long logN;

    if (1 >= arraySize) return 0;
    if (2 == arraySize) {
            cmprSwap2(array[0], array[1]);
            return 0;
    }
    if (N1 > N2) swap(N1, N2);
    else if (N1 == N2) {
            N1 = 2; N2 = 3;
    }
    maxStep = arraySize / 2;
    for (i = 0; i < maxStep; i++) { // pre-processing
            cmprSwap2(array[i], array[arraySize-1-i]);
    }
    logN = (long) (log((double)arraySize));

    maxStep = arraySize;
    while (!isSortedArray(array, arraySize)) { // AGS Loop
            step = maxStep;
            oldDivider = divider = 1;
            while (step > 1) { // group-sort loop
                    divider = (divider * N2) / N1;
                    if (divider == oldDivider) divider++;
                    oldDivider = divider;
                    step = maxStep / divider;
                    if (step <= 2*logN) { // reset GNDR = 2/3
                            N1 = 2; N2 = 3;
                    }
                    if (0 == step) step = 1;
                    biDirectionGroupSort(array, arraySize, step, 0);
            }
            gsortloops++;
            maxStep = logN;
            if (maxStep < 2) maxStep = 2;
    }
    return gsortloops;
}
//bidirection bubble-sort for each group
static void biDirectionGroupSort(long *array, long arraySize, long step,
long loops)
{
    long i;
    long maxIndex, lastIndex;
    lastIndex = arraySize - 1;
    maxIndex = arraySize - step - loops;
    for (i = loops; i < maxIndex; i++) { // bi-directional group sort
            cmprSwap2(array[i], array[i+step]);
            cmprSwap2(array[lastIndex-i-step], array[lastIndex-i]);
    }
}
//return TRUE if swapped
static short cmprSwap2(long &smaller, long &larger)
{
    numCmpr++;
    if (smaller > larger) {
            long tmp = smaller;
            smaller = larger;
            larger = tmp;
            numSwap++;
            return 1;
    }
    return 0;
}
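//return TRUE if array is sorted; because it uses cmprSwap2(), it also
//swaps the first out-of-order pair it finds and counts each comparison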
static short isSortedArray(long *array, long arraySize)
{
    arraySize--;
    for (long i = 0; i < arraySize; i++) {
            if (cmprSwap2(array[i], array[i+1]) > 0) return 0;
    }
    return 1;
}




gsort1() running time <= gLoop*N/2 + 2*gLoop*N*log(N)/log(N2/N1)

Example 1: The source of inefficiency is the overhead caused by isSortedArray().



gsort2() running time <= (gLoop+2)*N/2 + 2*N*log(N)/log(N2/N1) +
2*(gLoop-1)*N*log(log(N))/log(N2/N1).

Example 2: gsort2() running time with the returned value of gLoop.



1/2 < GNDR <= 2/3

Example 3: GNDR=1/2 is excluded from this example because of regrouping.


Table 1: gsort2() test results (numCompare=K*N*log2(N); numSwap=S*N*log2(N)).


Table 2: gsort3() test results (numCompare=K*N*log2(N); numSwap=S*N*log2(N)).
