Waiting for One-Off Events with Futures


Associating a Task with a Future

std::packaged_task<> ties a future to the result of a function call. When the function completes, the future is ready, and the associated data is the value returned by the function. This is ideal for operations that can be subdivided into a set of self-contained tasks. These tasks can be written as functions, packaged so each task is associated with a future using std::packaged_task, and then executed on separate threads. The driving function can then wait for the futures before processing the results of the subtasks. This lets you implement a facility similar to std::async, but you get to control the threads explicitly. Maybe you want all the tasks to run sequentially on the same background thread, or you want to implement a thread pool.
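As a rough illustration of that pattern, the sketch below splits a sum into two self-contained subtasks, packages each one, and runs them on explicitly created threads. The names sum_range and parallel_sum are hypothetical, chosen only for this example, and the two-way split is an arbitrary assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <numeric>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: sums one self-contained slice [first, last) of the data.
long sum_range(const std::vector<int>& data, std::size_t first, std::size_t last)
{
  assert(first <= last && last <= data.size());
  return std::accumulate(data.begin() + first, data.begin() + last, 0L);
}

// Hypothetical driver: two subtasks, each on its own explicitly managed thread.
long parallel_sum(const std::vector<int>& data)
{
  const std::size_t mid = data.size() / 2;

  std::packaged_task<long()> lower([&data, mid] { return sum_range(data, 0, mid); });
  std::packaged_task<long()> upper([&data, mid] { return sum_range(data, mid, data.size()); });

  // Retrieve the futures before the tasks are moved onto their threads.
  std::future<long> lower_result = lower.get_future();
  std::future<long> upper_result = upper.get_future();

  std::thread t1(std::move(lower));
  std::thread t2(std::move(upper));

  // get() blocks until the corresponding task has stored its result.
  const long total = lower_result.get() + upper_result.get();

  t1.join();
  t2.join();
  return total;
}
```

The driving function decides how many threads exist and when they are joined, which is the control that std::async does not give you.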

The template parameter for the std::packaged_task<> class template is a function signature, like void() for a function taking no parameters with no return value or int(std::string&,double*) for a function that takes a non-const reference to a std::string and a pointer to a double and returns an int. When you construct an instance of std::packaged_task, you must pass in a function or callable object that can accept the specified parameters and that returns a type convertible to the specified return type. The types don't have to match exactly; you can construct a std::packaged_task<double(double)> from a function that takes an int and returns a float because the types are implicitly convertible.
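The conversion rule can be seen in a small sketch. The function halve and the wrapper run_converted_task are hypothetical names introduced here; the task is invoked directly on the calling thread to keep the example short.

```cpp
#include <cassert>
#include <future>

// Takes an int and returns a float: neither type matches double exactly.
float halve(int value)
{
  return value / 2.0f;
}

double run_converted_task(double input)
{
  // The double argument is converted to int for the call,
  // and the float result is converted to double for the future.
  std::packaged_task<double(double)> task(halve);
  std::future<double> result = task.get_future();
  assert(result.valid());
  task(input);          // invoked directly on the calling thread
  return result.get();
}
```

Note that the conversion to int truncates, so passing 7.9 calls halve(7) and the future holds 3.5.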

The return type of the function signature identifies the type of the std::future<> returned from the get_future() member function, and the argument list of the function signature specifies the signature of the packaged task's function call operator. For example, a partial class definition for std::packaged_task<std::string(std::vector<char>*,int)> might look like this:

template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:
  template<typename Callable>
  explicit packaged_task(Callable&& f);
  std::future<std::string> get_future();
  void operator()(std::vector<char>*,int);
};

The std::packaged_task object is a callable object, so it can be passed to a std::thread as the thread function, passed to another function that requires a callable object, or even invoked directly. Because std::packaged_task is move-only, it can be stored in a std::function only indirectly (for example, through std::ref or a shared pointer), since std::function requires a copyable target. When the std::packaged_task is invoked as a function object, the arguments supplied to the function call operator are passed to the contained function and the return value is stored as the asynchronous result in the std::future obtained from get_future(). This means you can wrap a task in a std::packaged_task and retrieve the future before passing the std::packaged_task object elsewhere to be invoked in due course. When you need the result, you simply wait for the future to become ready.
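A minimal sketch of that sequence, using the same signature as the partial class definition above: the future is retrieved first, then the task is moved to a thread together with its arguments. The functions take_prefix and prefix_on_other_thread are hypothetical and exist only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical task body: builds a string from the first count characters.
std::string take_prefix(std::vector<char>* buffer, int count)
{
  assert(buffer && count >= 0 && static_cast<std::size_t>(count) <= buffer->size());
  return std::string(buffer->begin(), buffer->begin() + count);
}

std::string prefix_on_other_thread(std::vector<char>& buffer, int count)
{
  std::packaged_task<std::string(std::vector<char>*, int)> task(take_prefix);
  // Obtain the future first; the task itself is about to be moved away.
  std::future<std::string> result = task.get_future();
  // The extra arguments are forwarded to the task's function call operator.
  std::thread worker(std::move(task), &buffer, count);
  std::string value = result.get();   // waits until the future is ready
  worker.join();
  return value;
}
```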

Passing Tasks Between Threads

Many GUI frameworks require that updates to the GUI be done from specific threads, so if another thread needs to update the GUI, it must send a message to the right thread to do so. std::packaged_task provides a way of doing this without requiring a custom message for each and every GUI-related activity:

#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()> > tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread()
{
  while(!gui_shutdown_message_received())
  {
    get_and_process_gui_message();
    std::packaged_task<void()> task;
    {
      std::lock_guard<std::mutex> lk(m);
      if(tasks.empty())
        continue;
      task=std::move(tasks.front());
      tasks.pop_front();
    }
    task();
  }
}

std::thread gui_bg_thread(gui_thread);

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
  std::packaged_task<void()> task(f);
  std::future<void> res=task.get_future();
  std::lock_guard<std::mutex> lk(m);
  tasks.push_back(std::move(task));
  return res;
}

This code is very simple: The GUI thread (line 12) loops until a message has been received telling the GUI to shut down (line 14), repeatedly polling for messages to handle and tasks on the task queue (line 16). If there are no tasks on the queue (line 20), then it loops again; otherwise, it extracts the task from the queue (line 22), releases the lock on the queue, and runs the task (line 25). The future associated with the task will be ready when the task completes.

Posting a task on the queue is equally simple. A new packaged task is created from the supplied function (line 34), the future is obtained from that task (line 35) by calling the get_future() member function, and the task is put on the list (line 37) before the future is returned to the caller (line 38). The code that posted the message to the GUI thread can then wait for the future if it needs to know that the task has been completed, or it can discard the future if it doesn't need to know.

This example uses std::packaged_task<void()>, which wraps a function or other callable object that takes no parameters and returns void. (If it returns anything else, then the return value is discarded.) This is the simplest possible task. But of course, std::packaged_task can also be used in more complex situations. By specifying a different function signature as the template parameter, you can change the return type (and thus the type of data stored in the future's associated state) and also the argument types of the function call operator. This example could easily be extended to allow for tasks on the GUI thread to accept arguments and return values (rather than just a completion indicator) in the std::future.
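One possible way to make that extension, sketched under stated assumptions: the queue still holds std::packaged_task<void()>, and each entry forwards to an inner task that owns the typed result. The names post_task_with_result, run_one_pending_task, queue_mutex, and pending are hypothetical, and run_one_pending_task merely stands in for one pass of the GUI loop so the sketch can run without a real GUI.

```cpp
#include <cassert>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <utility>

std::mutex queue_mutex;
std::deque<std::packaged_task<void()>> pending;

// Hypothetical variant of post_task_for_gui_thread that reports the callable's result.
template<typename Func>
auto post_task_with_result(Func f) -> std::future<decltype(f())>
{
  typedef decltype(f()) result_type;
  // The inner task owns the typed result; shared_ptr keeps the wrapper copyable.
  auto inner = std::make_shared<std::packaged_task<result_type()>>(std::move(f));
  std::future<result_type> res = inner->get_future();
  assert(res.valid());
  std::lock_guard<std::mutex> lk(queue_mutex);
  pending.push_back(std::packaged_task<void()>([inner] { (*inner)(); }));
  return res;
}

// Stand-in for the body of the GUI loop: runs one queued task if there is one.
bool run_one_pending_task()
{
  std::packaged_task<void()> task;
  {
    std::lock_guard<std::mutex> lk(queue_mutex);
    if (pending.empty())
      return false;
    task = std::move(pending.front());
    pending.pop_front();
  }
  task();   // runs outside the lock, as in gui_thread()
  return true;
}
```

Arguments can be bound at the call site by capturing them in the lambda passed to post_task_with_result, so the queue's element type does not have to change.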

What about tasks that can't be expressed as a simple function call or tasks where the result may come from more than one place? These cases are dealt with by the third way of creating a future: using a std::promise to set the value explicitly.

Making (std::)promises

When you have an application that needs to handle a lot of network connections, it's tempting to handle each connection on a separate thread. This can make network communication easier to think about and easier to program. It works well for low numbers of connections. Unfortunately, as the number of connections rises, this strategy becomes less suitable. As the number of threads grows, operating-system resources are consumed in large quantity. Context-switching becomes a concern. In the extreme case, the operating system may run out of resources for running new threads. In applications with very large numbers of network connections, it's therefore common to have a small number of threads (possibly only one) handling the connections, each thread dealing with multiple connections.

Consider one of these threads. Data packets will come in from the various connections being handled in essentially random order, and data packets will be queued to be sent in random order. In many cases, other parts of the application will be waiting for data to be sent or for a new batch of data to be received via a specific network connection.

std::promise<T> provides a means of setting a value (of type T), which can later be read through an associated std::future<T> object. A std::promise/std::future pair represents one possible mechanism for this facility; the waiting thread could block on the future, while the thread providing the data could use the promise half of the pairing to set the associated value and make the future ready.
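A minimal sketch of the pairing, assuming a single provider thread and a std::string value; deliver and wait_for_delivery are hypothetical names used only here.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <thread>
#include <utility>

// Hypothetical provider: fulfils the promise with data that "arrived" on its thread.
void deliver(std::promise<std::string> incoming)
{
  incoming.set_value("payload");   // makes the associated future ready
}

std::string wait_for_delivery()
{
  std::promise<std::string> incoming;
  std::future<std::string> received = incoming.get_future();
  assert(received.valid());
  // The promise half moves to the providing thread; the future stays with the waiter.
  std::thread provider(deliver, std::move(incoming));
  std::string data = received.get();   // blocks until set_value() has been called
  provider.join();
  return data;
}
```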

You can obtain the std::future object associated with a given std::promise by calling the get_future() member function, just like with std::packaged_task. When the value of the promise is set, the future becomes ready and can be used to retrieve the stored value. If you destroy the std::promise without setting a value, then an exception is stored instead (a std::future_error with the error code std::future_errc::broken_promise). Later, I'll show you how these exceptions are transferred across threads.
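The abandoned-promise case can be observed with a short sketch. The function name abandoned_promise_is_reported is hypothetical; the sketch destroys a promise without setting a value and checks the exception that the future rethrows.

```cpp
#include <cassert>
#include <future>

// Returns true if the future reports that its promise was abandoned.
bool abandoned_promise_is_reported()
{
  std::future<int> result;
  {
    std::promise<int> abandoned;
    result = abandoned.get_future();
  } // promise destroyed here without set_value()

  assert(result.valid());
  try
  {
    result.get();   // rethrows the exception stored in place of a value
  }
  catch (const std::future_error& e)
  {
    return e.code() == std::future_errc::broken_promise;
  }
  return false;
}
```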

Here is some connection-processing code to handle the situation just described. In this example, I use a std::promise<bool>/std::future<bool> pair to identify the successful transmission of a block of outgoing data. The value associated with the future is a simple success/failure flag. For incoming packets, the data associated with the future is the payload of the data packet.

#include <future>
void process_connections(connection_set& connections)
{
  while(!done(connections))
  {
    for(connection_iterator
      connection=connections.begin(), end=connections.end();
      connection!=end;
      ++connection)
    {
      if(connection->has_incoming_data())
      {
        data_packet data=connection->incoming();
        std::promise<payload_type>&p=connection->get_promise(data.id);
        p.set_value(data.payload);
      }
      if(connection->has_outgoing_data())
      {
        outgoing_packet data=connection->top_of_outgoing_queue();
        connection->send(data.payload);
        data.promise.set_value(true);
      }
    }
  }
}

The function process_connections() loops until done() returns true (line 4). Every time through the loop, it checks each connection in turn (line 6), retrieving incoming data if there is any (line 11), and sending queued outgoing data (line 17). This assumes that an incoming packet has an ID and a payload. The ID is mapped to a std::promise, perhaps by a lookup in an associative container (line 14), and the value is set to the packet's payload. For outgoing data, the packet is retrieved from the queue and sent through the connection. Once the packet has been sent, the promise associated with the outgoing data is set to true, indicating successful transmission (line 21). Whether this maps nicely to the actual network protocol depends on the protocol.
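The lookup behind get_promise() is not shown in the listing. One way it might work is sketched below, assuming integer packet IDs and a std::string payload; the class promise_registry and its member expect() are hypothetical. The sketch is single-threaded, so a real connection class would need to protect the map with a mutex if waiters register on other threads.

```cpp
#include <cassert>
#include <future>
#include <map>
#include <string>

typedef std::string payload_type;   // assumed payload representation

// Hypothetical registry mapping packet IDs to the promises awaiting them.
class promise_registry
{
public:
  // Called by the code that wants the packet with this ID.
  std::future<payload_type> expect(int id)
  {
    return promises_[id].get_future();
  }

  // Called by the connection-handling thread when the packet arrives.
  std::promise<payload_type>& get_promise(int id)
  {
    std::map<int, std::promise<payload_type> >::iterator it = promises_.find(id);
    assert(it != promises_.end());   // the ID must have been registered first
    return it->second;
  }

private:
  std::map<int, std::promise<payload_type> > promises_;
};
```

A waiter calls expect() and keeps the future, and the connection thread later calls get_promise(id).set_value(payload) exactly as in process_connections().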

All the code up to now has completely disregarded exceptions. Although it might be nice to imagine a world in which everything works all the time, we don't live in that world. Sometimes disks fill up, sometimes what we're looking for isn't there, sometimes the network fails, and sometimes the database goes down. The C++ standard library provides a clean way to deal with exceptions in these cases and allows them to be saved as part of the associated result.

