PLINQ: Parallel Queries in .NET


Using MapReduce with PLINQ

MapReduce is a reduction pattern designed for large computer systems in which data collections are distributed across a grid of computers. It lets you perform an operation on such a highly dispersed collection of data. You probably do not own a large computer system with hundreds of nodes. Fortunately, the MapReduce pattern is also applicable and useful on smaller systems and multicore personal computers. MapReduce first maps a collection to an intermediate collection of key-value pairs, which is then grouped by key and reduced to a derivative collection.

Here are the basic steps of the MapReduce pattern in PLINQ:

  1. Create or define the source, which must be a ParallelQuery type.
  2. In the map operation, map the input source to an intermediate collection, which is then grouped by key.
  3. Finally, reduce the intermediate collection to the output collection.
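
The three steps can be sketched on in-memory data before any helper method is introduced. This is only an illustration: the class name MapReduceSteps, the helper CountWords, and the sample strings are assumptions made for the sketch, and ToDictionary stands in for the reduce step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class MapReduceSteps
{
    // Hypothetical helper: counts words in in-memory lines of text.
    public static Dictionary<string, int> CountWords(IEnumerable<string> lines)
    {
        return lines
            .AsParallel()                               // 1. the source becomes a ParallelQuery<string>
            .SelectMany(line => line.Split(' '))        // 2. map: each line to its words
            .GroupBy(word => word)                      //    group the intermediate collection by key
            .ToDictionary(g => g.Key, g => g.Count());  // 3. reduce: each group to a word count
    }

    static void Main()
    {
        var counts = CountWords(new[] { "to be or not", "to be" });
        Console.WriteLine(counts["to"]);   // prints 2
        Console.WriteLine(counts["not"]);  // prints 1
    }
}
```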

PLINQ does not offer a direct implementation of the MapReduce pattern. Fortunately, Stephen Toub's "Parallel Programming with Microsoft .NET: Design Patterns for Decomposition and Coordination on Multicore Architectures" recommends a possible implementation of the MapReduce pattern in PLINQ. Here is the suggested implementation:

public static ParallelQuery<TResult>
  MapReduce<TSource, TMapped, TKey, TResult>(
  this ParallelQuery<TSource> source,
  Func<TSource, IEnumerable<TMapped>> map,
  Func<TMapped, TKey> keySelector,
  Func<IGrouping<TKey, TMapped>,
  IEnumerable<TResult>> reduce)
  {
    // Map, group the mapped items by key, then reduce each group.
    return source.SelectMany(map)
    .GroupBy(keySelector)
    .SelectMany(reduce);
  }

There are several parameters:

  • The source parameter is the source collection.
  • The map parameter is a delegate for the operation that maps the input collection to an intermediate collection.
  • The keySelector parameter is a delegate for the operation that identifies the key. The intermediate collection is grouped on this key.
  • The reduce parameter is also a delegate. It performs the reduction, typically on the group of values associated with each key.

Here is code that reads Shakespearean sonnets and displays the word distribution, implemented with PLINQ. The source collection consists of four sonnets.

string [] files={
@"C:\shakespeare\Sonnet 1.txt",
@"C:\shakespeare\Sonnet 2.txt",
@"C:\shakespeare\Sonnet 3.txt",
@"C:\shakespeare\Sonnet 4.txt"
};

Define a PLINQ query with the collection of sonnets as the source. Call the PLINQ implementation of MapReduce to create a word count and distribution from these files. The first argument, the map delegate, is implemented as a lambda expression that reads the lines of text in each file and splits them along delimiters into words. The next argument identifies each word as its own key. The last argument, the reduce delegate, turns each group of identical words into a KeyValuePair containing the word (key) and the word count. The result is a ParallelQuery of these KeyValuePairs.

var counts = files.AsParallel().MapReduce(
  path => File.ReadLines(path).SelectMany(
  line => line.Split(delimiters)),
  word => word,
  group => new[] {
    new KeyValuePair<string, int>(group.Key, group.Count())
  }
);

You can now display the results.

foreach (var word in counts)
{
  Console.WriteLine(word.Key + " " + word.Value);
}

Compile and run the program. Here is the entire application.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;

namespace MapReduce
{
  static class PLINQ
  {
    public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
      this ParallelQuery<TSource> source,
      Func<TSource, IEnumerable<TMapped>> map,
      Func<TMapped, TKey> keySelector,
      Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
    {
      return source.SelectMany(map)
      .GroupBy(keySelector)
      .SelectMany(reduce);
    }
  }

  class Program
  {
    static void Main(string[] args)
    {
      char [] delimiters={' ', ',', ';', '.'};
      string [] files={ 
        @"C:\shakespeare\Sonnet 1.txt", 
        @"C:\shakespeare\Sonnet 2.txt", 
        @"C:\shakespeare\Sonnet 3.txt", 
        @"C:\shakespeare\Sonnet 4.txt"
      };
      var counts = files.AsParallel().MapReduce(
        path => File.ReadLines(path).SelectMany(
        line => line.Split(delimiters)),
        word => word,
        group => new[] {
          new KeyValuePair<string, int>(group.Key, group.Count())
        }
      );
      foreach (var word in counts)
      {
        Console.WriteLine(word.Key + " " + word.Value);
      }
      //Console.WriteLine("Press enter to exit");
      Console.ReadLine();
    }
  }
}

Figure 9 shows the partial results from the application.


Figure 9: Partial output for the Shakespearean sonnets word-frequency application.

Wrapping Up

As an extension of LINQ, PLINQ implements data parallelism while supporting parallel queries across disparate data domains. PLINQ shares most of its clauses and methods with LINQ, so if you are familiar with LINQ, you already have the underlying knowledge for PLINQ. In most circumstances, you just call the AsParallel method on the source of a LINQ query to transform it into a PLINQ query.

When iterating over PLINQ results, use the ParallelEnumerable.ForAll method instead of a for or foreach loop. A loop merges the results back onto a single thread, and serializing access to the results can negate some of the benefit of parallel processing.
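
As a minimal sketch of the ForAll approach: the delegate passed to ForAll runs concurrently on the worker threads, so anything it writes to must be thread-safe. The class name ForAllSketch, the helper SquareAll, and the choice of ConcurrentBag as the result container are assumptions made for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

static class ForAllSketch
{
    // Hypothetical helper: squares each number and collects the results in parallel.
    public static ConcurrentBag<int> SquareAll(int[] numbers)
    {
        var results = new ConcurrentBag<int>();  // thread-safe, unordered collection
        numbers.AsParallel()
               .Select(n => n * n)
               .ForAll(square => results.Add(square));  // runs on the worker threads, no merge step
        return results;
    }

    static void Main()
    {
        var squares = SquareAll(new[] { 1, 2, 3, 4, 5 });
        Console.WriteLine(squares.Count);  // prints 5
        Console.WriteLine(squares.Sum());  // prints 55
    }
}
```

Because ForAll skips the merge, the order in which the results arrive is not defined.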

There are several important operators in PLINQ:

  • WithExecutionMode, called with ParallelExecutionMode.ForceParallelism, explicitly sets the PLINQ query to execute in parallel.
  • WithMergeOptions sets the buffer mode for rendering the results of a PLINQ query.
  • AsSequential mandates that subsequent LINQ clauses execute sequentially.
  • AsOrdered preserves the order of the source in the results (but the query itself is still executed in parallel).
  • WithDegreeOfParallelism sets the maximum number of concurrent tasks used in the PLINQ query.
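
The following sketch chains these operators in one query. The class name OperatorSketch, the helper EvenSquaresInOrder, and the particular settings (two tasks, unbuffered merge) are arbitrary choices for illustration, not recommendations.

```csharp
using System;
using System.Linq;

static class OperatorSketch
{
    // Hypothetical helper: squares the even numbers in parallel and
    // returns the first three in source order.
    public static int[] EvenSquaresInOrder(int[] numbers)
    {
        return numbers
            .AsParallel()
            .AsOrdered()                                                // keep source order in the results
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)  // do not fall back to sequential
            .WithDegreeOfParallelism(2)                                 // at most two concurrent tasks
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)         // yield results as they are produced
            .Where(n => n % 2 == 0)
            .Select(n => n * n)
            .AsSequential()                                             // operators below run as plain LINQ
            .Take(3)
            .ToArray();
    }

    static void Main()
    {
        var result = EvenSquaresInOrder(Enumerable.Range(1, 10).ToArray());
        Console.WriteLine(string.Join(", ", result));  // prints 4, 16, 36
    }
}
```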

Handle AggregateException to catch exceptions raised in a PLINQ query. You can enumerate the AggregateException.InnerExceptions property to inspect the underlying exceptions raised in one or more parallel tasks assigned to the PLINQ query.
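
A minimal sketch of this handling follows. The class name ExceptionSketch, the helper CountFailures, and the rule that negative values throw are assumptions made for the example. How many inner exceptions are reported can vary from run to run, because PLINQ stops the remaining tasks once a failure is observed.

```csharp
using System;
using System.Linq;

static class ExceptionSketch
{
    // Hypothetical helper: returns the number of inner exceptions observed, or 0 on success.
    public static int CountFailures(int[] numbers)
    {
        try
        {
            numbers.AsParallel()
                   .Select(n =>
                   {
                       if (n < 0) throw new ArgumentException("negative value: " + n);
                       return n;
                   })
                   .ToArray();  // forces execution of the query
            return 0;
        }
        catch (AggregateException ae)
        {
            // Each inner exception was raised by one of the parallel tasks.
            foreach (var inner in ae.InnerExceptions)
            {
                Console.WriteLine(inner.GetType().Name + ": " + inner.Message);
            }
            return ae.InnerExceptions.Count;
        }
    }

    static void Main()
    {
        Console.WriteLine(CountFailures(new[] { 1, 2, 3 }));   // prints 0
        Console.WriteLine(CountFailures(new[] { 1, -2, 3 }) > 0);
    }
}
```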

You can cancel a PLINQ query by using the cancellation model. Create an instance of CancellationTokenSource and pass its token to the PLINQ query with the WithCancellation method. The PLINQ query can then be canceled with the CancellationTokenSource.Cancel method, after which the query throws an OperationCanceledException.
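
Here is a small sketch of the cancellation flow. To keep it deterministic, the token is canceled before the query runs; in a real program Cancel would normally be called from another thread while the query is executing. The class name CancellationSketch and the helper RunCanceledQuery are hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading;

static class CancellationSketch
{
    // Hypothetical helper: returns true if the query observed the cancellation.
    public static bool RunCanceledQuery()
    {
        using (var cts = new CancellationTokenSource())
        {
            cts.Cancel();  // canceled up front so the outcome does not depend on timing
            try
            {
                Enumerable.Range(0, 1000000)
                    .AsParallel()
                    .WithCancellation(cts.Token)  // the query monitors this token
                    .Select(n => n * 2)
                    .ToArray();
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }

    static void Main()
    {
        Console.WriteLine(RunCanceledQuery());  // prints True
    }
}
```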

Reduction reduces a collection to a value. PLINQ directly supports the most common reductions, such as Sum, Average, Max, Min, and Count. For a custom reduction operation, use the ParallelEnumerable.Aggregate method.
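
As an illustration of a custom reduction, this sketch computes a sum of squares with the Aggregate overload that takes a seed, a per-partition accumulator, a combiner, and a result selector. The class name AggregateSketch and the helper SumOfSquares are made up for the example.

```csharp
using System;
using System.Linq;

static class AggregateSketch
{
    // Hypothetical helper: sum of squares as a custom parallel reduction.
    public static long SumOfSquares(int[] numbers)
    {
        return numbers.AsParallel().Aggregate(
            0L,                                       // seed, used to start each partition
            (subtotal, n) => subtotal + (long)n * n,  // accumulate within one partition
            (left, right) => left + right,            // combine the partition subtotals
            total => total);                          // project the final result
    }

    static void Main()
    {
        Console.WriteLine(SumOfSquares(new[] { 1, 2, 3, 4 }));  // prints 30
    }
}
```

The seed is applied once per partition, so it should be the identity value for the operation (0 for addition here).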

PLINQ has no built-in MapReduce operator, but you can implement the pattern yourself as an extension method that combines SelectMany and GroupBy.

For more on PLINQ, see "Parallel LINQ".


This article was adapted from information in Donis Marshall's Parallel Programming with Microsoft Visual Studio 2010 Step by Step, published by Microsoft Press.

