C++/CLI Threading: Part II

Rex continues his discussion of the C++/CLI threading implementation, focusing on shared variables during concurrent operations, thread-local storage, and interlocked operations.


November 01, 2005
URL:http://www.drdobbs.com/cpp/ccli-threading-part-ii/184402029

C++/CLI supports the ability to create multiple threads of execution within a single program. Last month, we saw how threads are created and synchronized. This month, we'll see how shared variables can be guarded against compromise during concurrent operations, learn about thread-local storage, and look at interlocked operations.

Other Forms of Synchronization

We can control synchronization of threads directly by using a number of functions in classes Monitor and Thread. Listing One contains an example.

Listing One

using namespace System;
using namespace System::Threading;

// Note: if this listing is compiled as a single source file, the three
// ref classes shown after main must be defined before main, which uses them.

int main()
{
/*1*/   MessageBuffer^ m = gcnew MessageBuffer;

/*2a*/  ProcessMessages^ pm = gcnew ProcessMessages(m);
/*2b*/  Thread^ pmt = gcnew Thread(gcnew ThreadStart(pm,
            &ProcessMessages::ProcessMessagesEntryPoint));
/*2c*/  pmt->Start();

/*3a*/  CreateMessages^ cm = gcnew CreateMessages(m);
/*3b*/  Thread^ cmt = gcnew Thread(gcnew ThreadStart(cm, 
            &CreateMessages::CreateMessagesEntryPoint));
/*3c*/  cmt->Start();

/*4*/   cmt->Join();
/*5*/   pmt->Interrupt();
/*6*/   pmt->Join();

    Console::WriteLine("Primary thread terminating");
}

public ref class MessageBuffer 
{
    String^ messageText;
public:
    void SetMessage(String^ s)
    {
/*7*/       Monitor::Enter(this);
            messageText = s;
/*8*/       Monitor::Pulse(this);
            Console::WriteLine("Set new message {0}", messageText);
            Monitor::Exit(this);
    }

    void ProcessMessages()
    {
/*9*/       Monitor::Enter(this);
            try
            {
                while (true)
                {
                    try
                    {
/*10*/                  Monitor::Wait(this);
                    }
                    catch (ThreadInterruptedException^)
                    {
                        Console::WriteLine("ProcessMessage interrupted");
                        return;
                    }

                    Console::WriteLine("Processed new message {0}", messageText);
                }
            }
            finally
            {
                // Release the lock on every exit path, including the
                // return taken when the thread is interrupted.
                Monitor::Exit(this);
            }
    }
};

public ref class CreateMessages
{
    MessageBuffer^ msg;
public:
    CreateMessages(MessageBuffer^ m)
    {
        msg = m;
    }

    void CreateMessagesEntryPoint()
    {
        for (int i = 1; i <= 5; ++i)
        {
            msg->SetMessage(String::Concat("M-", i.ToString()));
            Thread::Sleep(2000);
        }
        Console::WriteLine("CreateMessages thread terminating");
    }
};

public ref class ProcessMessages
{
    MessageBuffer^ msg;
public:
    ProcessMessages(MessageBuffer^ m)
    {
        msg = m;
    }

    void ProcessMessagesEntryPoint()
    {
        msg->ProcessMessages();
        Console::WriteLine("ProcessMessages thread terminating");
    }
};

In case 1, a shared buffer of type MessageBuffer is created. In cases 2a, 2b, and 2c, a thread is created and started such that it processes each message placed in that buffer. Cases 3a, 3b, and 3c create and start a thread that causes a series of five messages to be put into the shared buffer for processing. The two threads are synchronized such that the processor can't process the buffer until the creator has put something there. The creator, however, is not blocked until the previous message has been processed; in this listing, it simply sleeps for two seconds between messages, which gives the processor ample time. In case 4, we wait until the creator thread has completed its work.

By the time case 5 executes, the processor thread should have processed all of the messages the creator put there, so we tell it to stop work by interrupting it using Thread::Interrupt. We then wait on that thread in case 6 by calling Thread::Join, which allows the calling thread to block itself until some other thread terminates. (Instead of waiting indefinitely, a thread can specify a maximum time that it will wait.)

The CreateMessages thread is quite straightforward. It writes five messages to the shared message buffer, waiting two seconds between each one. To suspend a thread for a given amount of time (in milliseconds), we call Thread::Sleep. A sleeping thread is resumed by the runtime environment rather than by another thread.

The ProcessMessages thread is even simpler because it has the MessageBuffer class do all its work. Class MessageBuffer's functions are synchronized because only one of them at a time can have access to the shared buffer.

The main program starts the processor thread first. As such, that thread starts executing ProcessMessages, which obtains the parent object's synchronization lock. It then immediately runs into the call to Wait in case 10, which causes it to wait until it is told to continue. While waiting, it gives up its hold on the synchronization lock, allowing the creator thread to obtain that lock and execute SetMessage. Once that function has put the new message in the shared buffer, it calls Pulse in case 8, which allows any one thread waiting on that lock to wake up and resume operation. However, this cannot happen until SetMessage releases the lock, which it does by calling Monitor::Exit just before it returns. Once that happens, the processor thread regains the lock, the wait is satisfied, and execution resumes beyond case 10. A thread can wait indefinitely or until a specified amount of time has elapsed. For completeness, the output is shown in Figure 1.

Figure 1: Output of Listing One.

Set new message M-1
Processed new message M-1
Set new message M-2
Processed new message M-2
Set new message M-3
Processed new message M-3
Set new message M-4
Processed new message M-4
Set new message M-5
Processed new message M-5
CreateMessages thread terminating
ProcessMessage interrupted
ProcessMessages thread terminating
Primary thread terminating

Note carefully that the processor thread was started before the creator thread. If they were started in the opposite order, the first message would be added, yet no processor thread would be waiting, so no processor thread is woken up. By the time the processor thread gets to its first call to Wait, it will have missed the first message and will only be woken up when the second one has been stored.
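The missed message arises because a pulse is not remembered when no thread is waiting. A common remedy is to pair the notification with a state flag that is tested under the lock. The following is a minimal sketch of that idea in ISO C++11 rather than C++/CLI, with std::mutex and std::condition_variable standing in for Monitor's Enter, Wait, and Pulse. The names Mailbox, put, take, and run_producer_first are hypothetical and do not appear in Listing One.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical one-slot buffer. The flag full_ records whether a message is
// pending, so a notification sent before the consumer waits is not lost.
class Mailbox {
    std::mutex m_;
    std::condition_variable cv_;
    std::string text_;
    bool full_ = false;
public:
    void put(const std::string& s) {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !full_; });  // block until the slot is free
        text_ = s;
        full_ = true;
        cv_.notify_all();
    }
    std::string take() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return full_; });   // returns at once if already full
        full_ = false;
        std::string s = text_;
        cv_.notify_all();
        return s;
    }
};

// Starts the producer before the consumer (the problematic order described
// in the text) and returns the messages the consumer received.
std::vector<std::string> run_producer_first(int count) {
    Mailbox box;
    std::vector<std::string> received;
    std::thread producer([&] {
        for (int i = 1; i <= count; ++i) box.put("M-" + std::to_string(i));
    });
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) received.push_back(box.take());
    });
    producer.join();
    consumer.join();
    return received;
}
```

Because both sides test the flag before waiting, the start order of the two threads no longer matters, and the producer cannot overwrite a message that has not yet been taken. The same flag-plus-wait-loop structure can be written with Monitor::Wait and Monitor::Pulse.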

Managing Threads

By default, a thread is a foreground thread that executes until its entry-point function terminates, regardless of the life span of the thread that created it. A background thread, on the other hand, does not keep the program alive; it is terminated automatically once all foreground threads have ended. We configure a thread as a background thread by setting Thread's property IsBackground to true. A background thread can be made a foreground thread by setting the same property to false.

Once a thread has been started, it is alive, and it remains so until it terminates. We can test for this by inspecting Thread's property IsAlive. A thread can give up the rest of its CPU time slice by calling Sleep with a time of zero milliseconds. A thread can get at its own Thread object via the property Thread::CurrentThread.

Each thread has a priority level associated with it and this is used by the runtime environment to schedule the execution of threads. A thread's priority can be set or tested via the property Thread::Priority. Priorities range from ThreadPriority::Lowest to ThreadPriority::Highest. By default, a thread has priority ThreadPriority::Normal. Because thread scheduling varies from one implementation to another, we should not rely too heavily on priority levels as a means of controlling threads.

Volatile Fields

The type qualifier volatile tells the compiler that no single thread controls all aspects of the object to which it is applied; specifically, one or more other threads might be reading from and/or writing to this variable asynchronously. Essentially, this qualifier forces the compiler to be less aggressive when performing optimization.

Consider the code fragment in Listing Two. In the absence of volatile, case 1 could safely be ignored because we immediately overwrite the value of i in case 2; however, given the volatile qualifier, the compiler must perform both store operations.

Listing Two


    volatile int i = 0;
/*1*/   i = 10;
/*2*/   i = 20;
/*3*/   if (i < 5 || i > 10) {
        // ...
    }

    int copy = i;
/*4*/   if (copy < 5 || copy > 10) {
        // ...
    }

In case 3, the compiler must generate code to fetch the value of i twice; however, its value might change between fetches. To make sure we are testing the same value, we have to write something like case 4 instead. By storing a snapshot of i in the nonvolatile variable copy, we can safely use the value of copy multiple times, knowing that its value cannot be changing "behind the scenes." By using volatile, we can avoid explicit synchronization for certain kinds of variable access.
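For comparison, here is a minimal sketch of the same snapshot idiom in ISO C++11. In ISO C++, volatile by itself provides no atomicity or inter-thread ordering guarantees, so std::atomic is the usual tool for a variable shared between threads. The names shared_level and out_of_range are hypothetical.

```cpp
#include <atomic>

// Hypothetical variable that other threads may modify at any time.
std::atomic<int> shared_level{20};

// Reads the shared variable exactly once, then tests that single snapshot,
// so both comparisons are guaranteed to see the same value.
bool out_of_range() {
    const int copy = shared_level.load();
    return copy < 5 || copy > 10;
}
```

As with case 4 of Listing Two, the point is that the test is made against one value, regardless of what other threads store in the meantime.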

Thread-Local Storage

When writing a multithreaded application, it can be useful to have variables that are specific to a particular thread. For example, consider the program in Listing Three.

Listing Three

using namespace System;
using namespace System::Threading;

public ref class ThreadX
{
/*1*/   int m1;
/*2*/   static int m2 = 20;
/*3*/   [ThreadStatic] static int m3 = 30;

public:
    ThreadX()
    {
        m1 = 10;
    }
    
    void TMain()
    {
        String^ threadName = Thread::CurrentThread->Name;
        
/*4*/       Monitor::Enter(ThreadX::typeid);
        for (int i = 1; i <= 5; ++i)
        {
            ++m1;
            ++m2;
            ++m3;
        }
        Console::WriteLine("Thread {0}: m1 = {1}, m2 = {2}, m3 = {3}",
            threadName, m1, m2, m3);
        Monitor::Exit(ThreadX::typeid);
    }
};

int main()
{
/*5*/   Thread::CurrentThread->Name = "t0";

    ThreadX^ o1 = gcnew ThreadX;
    Thread^ t1 = gcnew Thread(gcnew ThreadStart(o1, &ThreadX::TMain));
    t1->Name = "t1";

    ThreadX^ o2 = gcnew ThreadX;
    Thread^ t2 = gcnew Thread(gcnew ThreadStart(o2, &ThreadX::TMain));
    t2->Name = "t2";

    t1->Start();
/*6*/   (gcnew ThreadX)->TMain();
    t2->Start();
    t1->Join();
    t2->Join();
}

m1 is an instance field, so each instance of type ThreadX has its own copy, and that exists for the life of its parent object. On the other hand, m2 is a class field, so there is only one occurrence of it for the class, regardless of the number of instances of that class. In theory, this field exists until the application terminates. Neither of these fields is specific to a thread. With the appropriate constructs, both kinds of fields can be accessed by multiple threads.

Simply stated, thread-local storage is memory that is owned by a particular thread, and that memory is allocated when a new thread is created, and deallocated when that thread terminates. It combines the privacy of local variables with the persistence of static variables. A field is marked as being thread-local by attaching to it the attribute ThreadStatic, as shown in case 3 of Listing Three. Being a static field, m3 can have an initializer.

Function TMain is the entry point for new threads. This function simply increments the three fields m1, m2, and m3, five times each, and prints their current values. The calls to Monitor::Enter (case 4) and Monitor::Exit, which lock on the type object ThreadX::typeid, make sure that no other thread can concurrently access these fields while their values are being incremented or printed.

The primary thread sets its own name to t0 in case 5, and then creates and starts two threads. It also calls TMain directly, as a regular function rather than as part of thread creation and startup. One example of the output that can result is shown in Figure 2. (The only difference between the possible outputs is the order in which the threads do their incrementing and printing.)

Figure 2: One possible output of Listing Three.

Thread t0: m1 = 15, m2 = 25, m3 = 35
Thread t1: m1 = 15, m2 = 30, m3 = 5
Thread t2: m1 = 15, m2 = 35, m3 = 5

Each of the three threads has its own instance of m1, which is initialized to 10, so it is no surprise that each has the value 15 after being incremented five times. In the case of m2, all three threads share the same variable, so that one variable is incremented 15 times.

The threads t1 and t2 go through the thread-creation process, each getting its own version of m3. However, these thread-local variables take on their default value zero, rather than the initializer 30 shown in the source code. Beware! After being incremented five times, each has the value 5. Thread t0 exhibits different behavior. As we can see, this thread was not created by the same machinery as the other two threads. As a result, its m3 does take on the explicit initial value, 30. (The initializer of a ThreadStatic field is applied only once, when the class's static initialization runs, so it affects only the one thread on which that happens.) Also note that in case 6, TMain is being called as a regular function, not as part of the creation of a new thread.
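Readers who also work in ISO C++ may find a contrast useful. There, the thread_local storage class (C++11) applies the initializer separately for every thread, so the surprise described above does not arise. The following minimal sketch illustrates this; it is not C++/CLI, and the names counter, bump_five_times, and bump_on_new_thread are hypothetical.

```cpp
#include <thread>

// Every thread gets its own copy of counter, each initialized to 30.
thread_local int counter = 30;

// Increments the calling thread's copy five times and returns the result.
int bump_five_times() {
    for (int i = 0; i < 5; ++i) ++counter;
    return counter;
}

// Runs bump_five_times on a newly created thread and returns what that
// thread observed.
int bump_on_new_thread() {
    int result = 0;
    std::thread t([&result] { result = bump_five_times(); });
    t.join();
    return result;
}
```

Each new thread starts from 30 and ends at 35, as does the calling thread the first time it calls bump_five_times directly. In C++/CLI, a similar effect requires assigning the thread-local field explicitly at the start of each thread's entry-point function rather than relying on the initializer.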

Atomicity and Interlocked Operations

Consider the following scenario: An application has multiple threads executing in parallel, with each thread having write access to some shared integer variable. Each thread simply increments that variable by 1, using ++value. That looks harmless enough; after all, this looks like an atomic operation, and on many systems, it is — at least from the point of view of a machine instruction. However, C++/CLI's execution environment does not universally guarantee this for all integer types.

To demonstrate this, the program in Listing Four has three threads, each concurrently incrementing a shared 64-bit integer variable 10 million times. It then displays that variable's final value, which, in theory, should be 30 million. The resulting application can be run in one of two modes: the default mode is unsynchronized and uses the ++ operator; the alternate mode, indicated by using a command-line argument of Y or y, uses a synchronized library increment function instead.

Listing Four

using namespace System;
using namespace System::Threading;

static bool interlocked = false;
const int maxCount = 10000000;
/*1*/   static long long value = 0;

void TMain()
{
    if (interlocked)
    {
        for (int i = 1; i <= maxCount; ++i)
        {
/*2*/           Interlocked::Increment(value);
        }
    }
    else
    {
        for (int i = 1; i <= maxCount; ++i)
        {
/*3*/           ++value;
        }
    }
}

int main(array<String^>^ argv)
{
    if (argv->Length == 1)
    {
        if (argv[0]->Equals("Y") || argv[0]->Equals("y"))
        {
            interlocked = true;
        }
    }

/*4*/   Thread^ t1 = gcnew Thread(gcnew ThreadStart(&TMain));
    Thread^ t2 = gcnew Thread(gcnew ThreadStart(&TMain));
    Thread^ t3 = gcnew Thread(gcnew ThreadStart(&TMain));

    t1->Start();
    t2->Start();
    t3->Start();
    t1->Join();
    t2->Join();
    t3->Join();

    Console::WriteLine("After {0} operations, value = {1}", 3 * maxCount, value);
}

When the standard ++ operator was used, five consecutive executions of the application resulted in the output shown in Figure 3. As we can see, the reported total falls far short of the correct answer. Simply stated, between roughly 15 and 52 percent of the increments went unreported. When the same program was run in synchronized mode (that is, using Interlocked's Increment instead), all 30 million increments were done and reported correctly.

Figure 3: Output of Listing Four.

Output using the ++ operator

After 30000000 operations, value = 14323443
After 30000000 operations, value = 24521969
After 30000000 operations, value = 20000000
After 30000000 operations, value = 24245882
After 30000000 operations, value = 25404963


Output using Interlocked's Increment

After 30000000 operations, value = 30000000

Class Interlocked also has a Decrement function.
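The same experiment can be sketched in ISO C++11, where std::atomic plays the role of class Interlocked. This is an analogue under that assumption, not the C++/CLI API, and the function name count_atomically is hypothetical.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Each of `threads` threads adds 1 to a shared 64-bit counter `per_thread`
// times; returns the final value of the counter.
long long count_atomically(int threads, int per_thread) {
    std::atomic<long long> value{0};
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&value, per_thread] {
            for (int i = 0; i < per_thread; ++i)
                value.fetch_add(1);  // atomic read-modify-write, like Increment
        });
    }
    for (auto& th : pool) th.join();
    return value.load();
}
```

Calling count_atomically(3, 10000000) corresponds to the synchronized mode of Listing Four. The member function fetch_sub is the counterpart of Decrement.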

Exercises

To reinforce the material we've covered, perform the following activities:

  1. In Listing Four, change the type of the shared variable, value, and run the application with and without synchronization.
  2. In your implementation's documentation, carefully read the description of the Increment and Decrement functions in class Interlocked. Note that there are two sets, one for int and one for long long. Note also that there is no support for arguments of type unsigned int or unsigned long long.
  3. Carefully read the description of the other functions in Interlocked, especially Add, Exchange, and CompareExchange.
  4. The class Queue in Listing Five (available online, see link at the beginning of this article) implements a queue of strings. Modify this class so that it is thread safe; that is, provide support for multiple threads adding and/or removing nodes from the same queue at the same time. The class has two public functions:

         void AddNode(String^ s);
         String^ RemoveNode();

     RemoveNode must not return to its caller until it has something to return; that is, it must wait indefinitely, if necessary, for a node to be added.
  5. In some other assembly, write a Main that creates three threads that each adds some fixed number of nodes, and one thread that removes nodes, all running asynchronously. Once all the adder threads have finished, have the main thread add one last string with the value "END" and wait for the remover thread to shut itself down, which it does when it sees this node. Hint: You will need to use the Wait, Pulse, and Join functions, and you might find it useful to use Sleep as well, to stagger the adder threads' actions.

Rex Jaeschke is an independent consultant, author, and seminar leader. He serves as editor of the Standards for C++/CLI, CLI, and C#. Rex can be reached at http://www.RexJaeschke.com/.
