Using MapReduce with PLINQ
MapReduce is a special pattern of reduction used on large computer systems, where data collections can be distributed across a grid of computers. MapReduce allows you to perform an action on this highly dispersed collection of data. You probably do not own a large computer system with hundreds of computer nodes. Fortunately, the MapReduce pattern is also applicable and useful on smaller systems and multicore personal computers. MapReduce first transforms a collection into an intermediate collection of key-value pairs, which is then reduced (typically by grouping on the key) to a derivative collection.
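The two phases can be illustrated without any parallelism. The following is a minimal sketch in sequential LINQ, assuming a word count as the task: the map phase emits an intermediate (word, 1) key-value pair for every word, and the reduce phase groups the pairs by key and sums their values. The class and method names (MapReduceConcept, Map, Reduce) and the input lines are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapReduceConcept
{
    // Map phase: emit an intermediate (word, 1) key-value pair for each word in a line.
    public static IEnumerable<KeyValuePair<string, int>> Map(string line)
    {
        foreach (string word in line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries))
        {
            yield return new KeyValuePair<string, int>(word, 1);
        }
    }

    // Reduce phase: group the intermediate pairs by key and sum the values of each group.
    public static Dictionary<string, int> Reduce(IEnumerable<KeyValuePair<string, int>> pairs)
    {
        return pairs.GroupBy(pair => pair.Key)
                    .ToDictionary(group => group.Key, group => group.Sum(pair => pair.Value));
    }

    static void Main()
    {
        // Hypothetical input; on a cluster, each line could live on a different node.
        string[] lines = { "to be or not to be", "to see" };
        Dictionary<string, int> counts = Reduce(lines.SelectMany(line => Map(line)));
        foreach (KeyValuePair<string, int> pair in counts)
        {
            Console.WriteLine(pair.Key + " " + pair.Value);
        }
    }
}
```

For the sample input, the reduce phase yields to 3, be 2, and 1 for each of or, not, and see.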
Here are the basic steps of the MapReduce pattern in PLINQ:
- Create or define the source, which must be a ParallelQuery type.
- In the map operation, map the input source to an intermediate collection.
- Finally, reduce the intermediate collection, typically grouped by key, to the output collection.
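The three steps can be sketched as separate PLINQ statements, one per step. This is a minimal illustration under stated assumptions: the task (counting words by their length), the class and method names, and the input strings are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapReduceSteps
{
    public static Dictionary<int, int> CountWordsByLength(string[] sentences)
    {
        // Step 1: the source must be a ParallelQuery<T>; AsParallel provides one.
        ParallelQuery<string> source = sentences.AsParallel();

        // Step 2 (map): project each sentence into its words, an intermediate collection.
        ParallelQuery<string> mapped = source.SelectMany(
            s => s.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries));

        // Step 3 (reduce): group the words by a key (their length) and count each group.
        return mapped.GroupBy(word => word.Length)
                     .ToDictionary(group => group.Key, group => group.Count());
    }

    static void Main()
    {
        var result = CountWordsByLength(new[] { "shall I compare thee", "to a summer's day" });
        foreach (var pair in result.OrderBy(p => p.Key))
        {
            Console.WriteLine("length " + pair.Key + ": " + pair.Value + " word(s)");
        }
    }
}
```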
PLINQ does not offer a direct implementation of the MapReduce pattern. Fortunately, Stephen Toub's "Parallel Programming with Microsoft .NET: Design Patterns for Decomposition and Coordination on Multicore Architectures" recommends a possible implementation of the MapReduce pattern in PLINQ. Here is the suggested implementation:
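public static ParallelQuery<TResult>
    MapReduce<TSource, TMapped, TKey, TResult>(
        this ParallelQuery<TSource> source,
        Func<TSource, IEnumerable<TMapped>> map,
        Func<TMapped, TKey> keySelector,
        Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
{
    // Map: flatten each source element into zero or more intermediate values.
    // Group: collect the intermediate values that share a key.
    // Reduce: turn each group into zero or more results.
    return source.SelectMany(map)
                 .GroupBy(keySelector)
                 .SelectMany(reduce);
}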
The method has four parameters:
- The source parameter is the source collection. Because it is declared with this, MapReduce is called as an extension method on any ParallelQuery.
- The map parameter is a delegate for the operation that maps the input collection to an intermediate collection.
- The keySelector parameter is a delegate for the operation that identifies the key. The intermediate collection is grouped on this key.
- The reduce parameter is also a delegate. It performs the reduction, typically over the range of values associated with each key.
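To show how the four parameters fit together on data other than text, here is a sketch that totals item quantities across orders. The order format ("item:quantity" entries separated by commas), the OrderTotals class, and the input values are hypothetical; the MapReduce method is repeated so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PLINQ
{
    // Same shape as the MapReduce extension method described above.
    public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
        this ParallelQuery<TSource> source,
        Func<TSource, IEnumerable<TMapped>> map,
        Func<TMapped, TKey> keySelector,
        Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
    {
        return source.SelectMany(map).GroupBy(keySelector).SelectMany(reduce);
    }
}

public static class OrderTotals
{
    public static Dictionary<string, int> Totals(string[] orders)
    {
        return orders.AsParallel().MapReduce(
            // map: one order string -> several (item, quantity) pairs
            order => order.Split(',').Select(entry =>
            {
                string[] parts = entry.Split(':');
                return new KeyValuePair<string, int>(parts[0].Trim(), int.Parse(parts[1]));
            }),
            // keySelector: group the pairs by item name
            pair => pair.Key,
            // reduce: one total per item
            group => new[] { new KeyValuePair<string, int>(group.Key, group.Sum(p => p.Value)) })
            .ToDictionary(p => p.Key, p => p.Value);
    }

    static void Main()
    {
        string[] orders = { "apple:3,pear:2", "pear:5", "apple:1,plum:4" };
        foreach (var total in Totals(orders).OrderBy(t => t.Key))
        {
            Console.WriteLine(total.Key + " " + total.Value);
        }
    }
}
```

Here map produces several intermediate pairs per order, keySelector groups them by item name, and reduce emits one total per item, so the program prints apple 4, pear 7, and plum 4.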
Here is code that reads Shakespearean sonnets and displays the word distribution, implemented with PLINQ. The source collection consists of four sonnets.
char[] delimiters = { ' ', ',', ';', '.' };
string[] files = {
    @"C:\shakespeare\Sonnet 1.txt",
    @"C:\shakespeare\Sonnet 2.txt",
    @"C:\shakespeare\Sonnet 3.txt",
    @"C:\shakespeare\Sonnet 4.txt"
};
Define a PLINQ query with the collection of sonnets as the source, and call the MapReduce extension method to create a word count and distribution from these files. The first argument, map, is a lambda expression that reads the lines of text in each file and splits them along the delimiters into words. The second argument, keySelector, uses each word as its own key. The third argument, reduce, returns a one-element array containing a KeyValuePair of the word (key) and its count. The result is a ParallelQuery of these KeyValuePairs.
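var counts = files.AsParallel().MapReduce(
    // map: read each file and split its lines into words; RemoveEmptyEntries
    // drops the empty strings that adjacent delimiters (such as ", ") produce
    path => File.ReadLines(path).SelectMany(
        line => line.Split(delimiters, StringSplitOptions.RemoveEmptyEntries)),
    // keySelector: each word is its own key
    word => word,
    // reduce: one (word, count) pair per group
    group => new[] {
        new KeyValuePair<string, int>(group.Key, group.Count())
    }
);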
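To try the query without the sonnet files, here is a sketch of the same MapReduce call over in-memory lines. It repeats the MapReduce extension method so that the block compiles on its own; the InMemoryWordCount class and the two input lines are hypothetical. It passes StringSplitOptions.RemoveEmptyEntries to Split: without it, a comma followed by a space yields an empty string, which would then be counted as a word.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PLINQ
{
    // Same shape as the MapReduce extension method described earlier.
    public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
        this ParallelQuery<TSource> source,
        Func<TSource, IEnumerable<TMapped>> map,
        Func<TMapped, TKey> keySelector,
        Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
    {
        return source.SelectMany(map).GroupBy(keySelector).SelectMany(reduce);
    }
}

public static class InMemoryWordCount
{
    static readonly char[] delimiters = { ' ', ',', ';', '.' };

    // Same keySelector and reduce lambdas as the file-based query, but the source
    // is an array of lines instead of an array of file paths.
    public static Dictionary<string, int> Count(string[] lines)
    {
        return lines.AsParallel().MapReduce(
            line => line.Split(delimiters, StringSplitOptions.RemoveEmptyEntries),
            word => word,
            group => new[] { new KeyValuePair<string, int>(group.Key, group.Count()) })
            .ToDictionary(p => p.Key, p => p.Value);
    }

    static void Main()
    {
        string[] lines = { "love, and love again.", "and again; love" };
        foreach (var pair in Count(lines).OrderByDescending(p => p.Value))
        {
            Console.WriteLine(pair.Key + " " + pair.Value);
        }
    }
}
```

With the two sample lines, it reports love 3, and 2, and again 2, and no empty-string key appears.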
You can now display the results.
foreach (var word in counts)
{
    Console.WriteLine(word.Key + " " + word.Value);
}
Here is the entire application. It expects the four sonnet text files in the C:\shakespeare folder. Compile and run it.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;

namespace MapReduce
{
    static class PLINQ
    {
        // Generic MapReduce: map each source element to intermediate values,
        // group them by key, then reduce each group to zero or more results.
        public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
            this ParallelQuery<TSource> source,
            Func<TSource, IEnumerable<TMapped>> map,
            Func<TMapped, TKey> keySelector,
            Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
        {
            return source.SelectMany(map)
                         .GroupBy(keySelector)
                         .SelectMany(reduce);
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            char[] delimiters = { ' ', ',', ';', '.' };
            string[] files = {
                @"C:\shakespeare\Sonnet 1.txt",
                @"C:\shakespeare\Sonnet 2.txt",
                @"C:\shakespeare\Sonnet 3.txt",
                @"C:\shakespeare\Sonnet 4.txt"
            };

            // Word count: map each file to its words, key on the word itself,
            // and reduce each group to a (word, count) pair.
            var counts = files.AsParallel().MapReduce(
                path => File.ReadLines(path).SelectMany(
                    line => line.Split(delimiters, StringSplitOptions.RemoveEmptyEntries)),
                word => word,
                group => new[] {
                    new KeyValuePair<string, int>(group.Key, group.Count())
                }
            );

            foreach (var word in counts)
            {
                Console.WriteLine(word.Key + " " + word.Value);
            }

            Console.WriteLine("Press enter to exit");
            Console.ReadLine();
        }
    }
}
Figure 9 shows the partial results from the application.
Figure 9: Partial output for the Shakespearean sonnets word-frequency application.
Wrapping Up
As an extension of LINQ, PLINQ implements data parallelism and supports parallel queries across disparate data domains. PLINQ shares many of the same clauses and methods as LINQ. For this reason, if you are familiar with LINQ, you already have the underlying knowledge of PLINQ. In most circumstances, calling the AsParallel method on the source of a LINQ query is enough to transform it into a PLINQ query.
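A minimal sketch of that transformation, assuming a simple query over an integer array; the class and method names are hypothetical. The two queries differ only in the AsParallel call on the source.

```csharp
using System;
using System.Linq;

public static class AsParallelDemo
{
    // Sequential LINQ query: squares of the even numbers.
    public static int[] SequentialSquares(int[] data)
    {
        return (from n in data
                where n % 2 == 0
                select n * n).ToArray();
    }

    // The same query with AsParallel added to the source. Without AsOrdered,
    // PLINQ does not guarantee the order of the results.
    public static int[] ParallelSquares(int[] data)
    {
        return (from n in data.AsParallel()
                where n % 2 == 0
                select n * n).ToArray();
    }

    static void Main()
    {
        int[] data = Enumerable.Range(1, 1000).ToArray();
        // Compare the results as sorted sequences, since the parallel order may differ.
        bool same = SequentialSquares(data).OrderBy(n => n)
                        .SequenceEqual(ParallelSquares(data).OrderBy(n => n));
        Console.WriteLine("Same results: " + same);
    }
}
```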
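When iterating PLINQ results, use the ParallelEnumerable.ForAll method instead of a for or foreach loop. A foreach loop merges the results back onto the consuming thread and processes them one at a time; this serialized access can negate some of the benefit of parallel processing. ForAll instead runs the action on the worker threads, so the action must be thread-safe and the results are not processed in any particular order.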
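A minimal sketch of ForAll, assuming a simple filter over integers; the class and method names are hypothetical. Because the ForAll action runs on several threads at once, any shared state it touches must be thread-safe, which is why the results go into a ConcurrentBag rather than a List<int>.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class ForAllDemo
{
    // Collects the multiples of three with ForAll; the action runs concurrently
    // on the PLINQ worker threads, so it writes to a thread-safe ConcurrentBag.
    public static ConcurrentBag<int> MultiplesOfThree(int[] data)
    {
        var results = new ConcurrentBag<int>();
        data.AsParallel()
            .Where(n => n % 3 == 0)
            .ForAll(n => results.Add(n));
        return results;
    }

    static void Main()
    {
        // ForAll processes each element where it is produced, with no merge step;
        // Console.WriteLine is thread-safe, but the output order is unpredictable.
        Enumerable.Range(1, 20).AsParallel()
            .Where(n => n % 3 == 0)
            .ForAll(n => Console.WriteLine(n));

        Console.WriteLine("Count: " + MultiplesOfThree(Enumerable.Range(1, 20).ToArray()).Count);
    }
}
```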
There are several important clauses in PLINQ:
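- WithExecutionMode, with ParallelExecutionMode.ForceParallelism, explicitly sets the PLINQ query to execute in parallel, even when PLINQ would otherwise choose to run it sequentially.
- WithMergeOptions sets the buffering mode (a ParallelMergeOptions value) that controls how the results of a PLINQ query are passed back to the consuming thread.
- AsSequential mandates that subsequent LINQ clauses execute sequentially.
- AsOrdered preserves the order of the source sequence in the results (but the query itself is still executed in parallel).
- WithDegreeOfParallelism sets the maximum number of concurrent tasks used in the PLINQ query.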
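The following sketch chains these methods in one query. It is illustrative rather than a tuning recommendation: the degree of parallelism of 2 and the FullyBuffered merge option are arbitrary choices, and the class and method names are hypothetical.

```csharp
using System;
using System.Linq;

public static class QueryOptionsDemo
{
    // Returns the first `count` squares of even numbers, in source order.
    public static int[] FirstEvenSquares(int[] data, int count)
    {
        return data.AsParallel()
            // Preserve the source order in the results.
            .AsOrdered()
            // Run in parallel even if PLINQ would otherwise pick sequential execution.
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            // Use at most two concurrently executing tasks.
            .WithDegreeOfParallelism(2)
            // Buffer all results before handing them to the consuming thread.
            .WithMergeOptions(ParallelMergeOptions.FullyBuffered)
            .Where(n => n % 2 == 0)
            .Select(n => n * n)
            // Operators after this point (Take, ToArray) run sequentially.
            .AsSequential()
            .Take(count)
            .ToArray();
    }

    static void Main()
    {
        foreach (int square in FirstEvenSquares(Enumerable.Range(1, 10).ToArray(), 3))
        {
            Console.WriteLine(square);
        }
    }
}
```

Because AsOrdered preserves the source order, Take(3) returns 4, 16, and 36 for the input 1 through 10.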
Handle AggregateException to catch exceptions raised in a PLINQ query. You can enumerate the AggregateException.InnerExceptions property to inspect the underlying exceptions raised in one or more of the parallel tasks assigned to the PLINQ query.
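A minimal sketch of this pattern, assuming a query that divides by each element so that a zero in the input raises a DivideByZeroException inside one of the parallel tasks; the class and method names are hypothetical. PLINQ wraps the exception in an AggregateException, even when only one task failed.

```csharp
using System;
using System.Linq;

public static class AggregateExceptionDemo
{
    // Runs a query that divides 100 by each element and returns the type names
    // of the exceptions PLINQ reported, or an empty array if the query succeeded.
    public static string[] DivideAll(int[] data)
    {
        try
        {
            data.AsParallel().Select(n => 100 / n).ToArray();
            return new string[0];
        }
        catch (AggregateException ae)
        {
            // Each inner exception was thrown by one of the tasks running the query.
            return ae.InnerExceptions.Select(e => e.GetType().Name).ToArray();
        }
    }

    static void Main()
    {
        foreach (string name in DivideAll(new[] { 5, 10, 0, 20 }))
        {
            Console.WriteLine("Caught: " + name);
        }
    }
}
```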
You can cancel a PLINQ query by using the cancellation model. Create an instance of CancellationTokenSource and pass its cancellation token to the PLINQ query by using the WithCancellation method. The query can then be canceled with the CancellationTokenSource.Cancel method; the canceled query throws an OperationCanceledException.
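A minimal sketch of the cancellation model, assuming a hypothetical DoubleAll query. To keep the example deterministic, Cancel is called before the query runs; in a real application another thread, such as a UI event handler, would usually call it while the query is executing.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class CancellationDemo
{
    // A long-running query that observes the supplied cancellation token.
    public static int[] DoubleAll(int count, CancellationToken token)
    {
        return ParallelEnumerable.Range(1, count)
            .WithCancellation(token)
            .Select(n => n * 2)
            .ToArray();
    }

    static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Cancel is a method of CancellationTokenSource, not of CancellationToken.
            cts.Cancel();
            try
            {
                DoubleAll(1000000, cts.Token);
                Console.WriteLine("Query completed");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Query was canceled");
            }
        }
    }
}
```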
Reduction reduces a collection to a single value. PLINQ directly supports the most common reductions, such as Sum, Average, Max, Min, and Count. For a custom reduction operation, use the ParallelEnumerable.Aggregate method.
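A minimal sketch of both kinds of reduction, assuming a sum of squares as the custom operation; the class and method names are hypothetical. The Aggregate overload used here takes a seed, a function that folds an element into a partition's accumulator, a function that combines the accumulators of different partitions, and a result selector. Because the seed is used once per partition, it should be an identity value for the combine function (0 for addition).

```csharp
using System;
using System.Linq;

public static class ReductionDemo
{
    // Built-in reduction: PLINQ's Sum.
    public static int Total(int[] data)
    {
        return data.AsParallel().Sum();
    }

    // Custom reduction: sum of squares with the four-delegate Aggregate overload.
    public static long SumOfSquares(int[] data)
    {
        return data.AsParallel().Aggregate(
            0L,                                      // seed: starting value for each partition
            (subtotal, n) => subtotal + (long)n * n, // fold one element into a partition's subtotal
            (left, right) => left + right,           // combine the subtotals of two partitions
            total => total);                         // result selector (identity here)
    }

    static void Main()
    {
        int[] data = Enumerable.Range(1, 10).ToArray();
        Console.WriteLine("Sum: " + Total(data));                   // Sum: 55
        Console.WriteLine("Sum of squares: " + SumOfSquares(data)); // Sum of squares: 385
    }
}
```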
PLINQ has no built-in MapReduce operator, but you can implement the pattern yourself by combining SelectMany and GroupBy.
For more on PLINQ, see "Parallel LINQ".
This article was adapted from information in Donis Marshall's Parallel Programming with Microsoft Visual Studio 2010 Step by Step, published by Microsoft Press.



