Parallel LINQ

PLINQ lets you tap into extra power but with little extra work


January 05, 2009
URL:http://www.drdobbs.com/parallel/parallel-linq/212700663

Multicore processors are a standard part of computing these days. For instance, the laptop I'm writing this article on has a 64-bit Intel Core 2 Duo processor and 3 GB of RAM. That's a lot of computing power for single-threaded code. Luckily, Parallel LINQ (PLINQ), which is part of the Parallel FX extensions for .NET, lets me use the basic LINQ keywords to tap into that extra power with little extra work on my part. The Parallel FX Library is a managed concurrency library that includes PLINQ and the Task Parallel Library (TPL).

The basic use of PLINQ is to add a reference to the downloaded System.Threading.dll (installed by default at C:\Program Files\Microsoft Parallel Extensions Jun08 CTP) and call the IParallelEnumerable.AsParallel extension method on your collection. IParallelEnumerable<T> inherits from IEnumerable<T>, and the AsParallel call generally appears in a LINQ query at the end of the from range in collection clause. For instance, if the collection were an array of integers named numbers, then you would substitute collection with numbers.AsParallel().
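As a minimal sketch of that substitution, the following assumes .NET Framework 4 or later, where PLINQ ships in System.Linq and no separate CTP reference is needed (there, AsParallel returns ParallelQuery<T> rather than the CTP's IParallelEnumerable<T>). The class and method names are hypothetical and exist only for illustration.

```csharp
using System;
using System.Linq;

public static class AsParallelSketch
{
    // Returns the squares of the even values in numbers, computed with PLINQ.
    public static int[] SquaresOfEvens(int[] numbers)
    {
        var query = from n in numbers.AsParallel()   // the only PLINQ-specific call
                    where n % 2 == 0
                    select n * n;

        // Result order is not guaranteed without AsOrdered, so sort explicitly.
        return query.OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        int[] numbers = Enumerable.Range(1, 10).ToArray();
        foreach (int square in SquaresOfEvens(numbers))
            Console.WriteLine(square);
    }
}
```

Everything except the AsParallel call is ordinary LINQ, which is the point: the query text stays the same and only the source changes.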

Listing One uses the Sieve of Eratosthenes to determine if a number is a prime. A list of primes is built using the sieve technique, and a LINQ query runs through a bunch of numbers testing for primality. Listing One(a) is the sequential query and One(b) the parallel version.

Listing One


(a)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;

namespace Primes
{
  class Program
  {
    static void Main(string[] args)
    {
      const long upper = 1000000;
      SieveOfEratosthenes.BuildPrimes(upper);
      int[] candidates = new int[upper-2];
      for (int i = 2; i < upper; i++)
        candidates[i-2] = i;
      Stopwatch watch = Stopwatch.StartNew();
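      var primes = from p in candidates
                   where SieveOfEratosthenes.IsPrime(p)
                   select p;
      // LINQ queries are deferred; Count() forces the query to run inside the timed region
      int sequentialCount = primes.Count();
      Console.WriteLine("Sequential Elapsed: {0} ({1} primes)", watch.Elapsed, sequentialCount);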

(b)
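      watch = Stopwatch.StartNew();
      primes = from p in candidates.AsParallel().AsOrdered()
               where SieveOfEratosthenes.IsPrime(p)
               select p;
      // Enumerate here as well so the timing covers the actual parallel work
      int parallelCount = primes.Count();

      Console.WriteLine("Parallel Elapsed: {0} ({1} primes)", watch.Elapsed, parallelCount);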
      Array.ForEach(primes.Take(20).ToArray(), x => Console.WriteLine(x));
      Console.ReadLine();
    }
  }
  public class SieveOfEratosthenes
  {
    private static List<long> primes = new List<long>();
    public static List<long> Primes
    {
      get { return primes; }
      set { primes = value; }
    }
    public static void BuildPrimes(long max)
    {
      if (max < 3) return;
      for (long i = 2; i <= max; i++)
      {
        if (IsPrime(i))
          Primes.Add(i);
      }
    }
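    /// <summary>
    /// Primality test used while building the sieve list: a composite
    /// number always has a prime factor no greater than its square root,
    /// so only the stored primes up to that point need to be tried.
    /// </summary>
    /// <param name="v">The number to test; Primes must already hold every prime up to the square root of v.</param>
    /// <returns>true if none of the stored primes tried divides v; otherwise false.</returns>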
    public static bool IsPrime(long v)
    {
      if (v == 2) return true;
      for (int i = 0; i < Primes.Count; i++)
      {
        if (v % Primes[i] == 0) return false;
        if (Primes[i] >= Math.Sqrt(v)) return true;
      }
      return true;
    }
  }
}

If this is the first time you've used LINQ queries, then the queries are on the lines starting with var and followed by = from p in candidates...where...select.... The LINQ queries look somewhat like inverted SQL queries.

Again, the first LINQ query in Listing One is sequential and the second is parallel. The AsParallel extension method kicks off the background threads. At minimum, calling AsParallel is all you need to do to use PLINQ and multiple threads for your LINQ queries.

By default, the original sequence order is not maintained when you invoke a PLINQ query. If you want the order of the sequence maintained, then call AsOrdered. However, maintaining order incurs some performance penalties. On my Dell Precision 470 workstation, the parallel version of this ran consistently slower than the sequential version. Let's explore some reasons performance may actually degrade for parallel operations.
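The ordering trade-off can be seen in a small sketch. It again assumes the .NET Framework 4 or later version of PLINQ in System.Linq; the class and method names are hypothetical. Both methods produce the same values, but only the AsOrdered version promises that they come back in source order.

```csharp
using System;
using System.Linq;

public static class OrderingSketch
{
    // Doubles each value in parallel and keeps the source order in the output.
    public static int[] DoubleInOrder(int[] source)
    {
        return source.AsParallel()
                     .AsOrdered()            // ask PLINQ to preserve source order
                     .Select(n => n * 2)
                     .ToArray();
    }

    // Same work without AsOrdered: the values are correct, their order is unspecified.
    public static int[] DoubleUnordered(int[] source)
    {
        return source.AsParallel()
                     .Select(n => n * 2)
                     .ToArray();
    }

    public static void Main()
    {
        int[] source = Enumerable.Range(1, 20).ToArray();
        Console.WriteLine("Ordered:   " + string.Join(" ", DoubleInOrder(source)));
        Console.WriteLine("Unordered: " + string.Join(" ", DoubleUnordered(source)));
    }
}
```

On a small input the unordered output may well happen to match the source order; the difference is only that nothing guarantees it.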

Understanding Exclusion Scenarios for PLINQ

PLINQ works with LINQ to Objects and LINQ to XML. PLINQ isn't intended for use with LINQ to SQL or LINQ to Entities. Why? Because the IQueryProvider implementations for LINQ to SQL and LINQ to Entities basically translate LINQ queries into SQL queries, which are processed by the SQL engine instead of in memory.

CPU-bound queries that process large amounts of data, perform intensive computations, or both will yield better results under PLINQ than I/O-bound queries, such as queries against the filesystem or SQL Server.

You might reasonably wonder why a parallel capability couldn't just automatically be added to LINQ behind the scenes. The answer is that programmer involvement is needed to handle data impurity, concurrency exceptions, thread affinity, ordering expectations, and poorer-than-expected performance in some scenarios. Adverse conditions for parallelism are referred to as 'parallelism blockers' and include:

Finally, optimal parallelism is affected by Amdahl's law, which says roughly that performance speedup is limited by the amount of code that must be processed sequentially. That is, even after the code and data are partitioned, some of the work must still run sequentially, and that sequential work, along with the synchronizing, partitioning, cross-thread communicating, and results-merging, detracts from the speedup. The net effect is that having two or four processors does not necessarily mean that your code will run two or four times faster, respectively.
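To put rough numbers on Amdahl's law, the usual formulation is speedup = 1 / ((1 - p) + p / n), where p is the fraction of the run time that can be parallelized and n is the number of processors. The sketch below simply evaluates that formula; the 80 percent figure is an assumption chosen for illustration, not a measurement of any listing in this article, and the formula ignores the partitioning and merging overhead described above.

```csharp
using System;

public static class AmdahlSketch
{
    // parallelFraction: share of the run time that can be parallelized (0 to 1).
    // processors: number of processors available (1 or more).
    public static double Speedup(double parallelFraction, int processors)
    {
        if (parallelFraction < 0.0 || parallelFraction > 1.0)
            throw new ArgumentOutOfRangeException("parallelFraction");
        if (processors < 1)
            throw new ArgumentOutOfRangeException("processors");

        double sequentialFraction = 1.0 - parallelFraction;
        return 1.0 / (sequentialFraction + parallelFraction / processors);
    }

    public static void Main()
    {
        // Assumed workload: 80 percent of the run time is parallelizable.
        foreach (int n in new[] { 1, 2, 4, 8 })
            Console.WriteLine("{0} processor(s): {1:F2}x", n, Speedup(0.8, n));
    }
}
```

With that assumed 80 percent, two processors give a theoretical speedup of about 1.67 and four give 2.5, well short of two times and four times, and real overhead only lowers those figures.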

Handling Concurrency Exceptions

Parallel extensions always throw a single AggregateException object. AggregateException contains a collection property named InnerExceptions that holds the underlying exceptions. If individual exceptions were thrown, then it would be easier for programmers to miss one or another of the possible catch blocks. However, using a single exception class makes debugging a little harder because the break happens at the location of the AggregateException catch block rather than the location of the underlying exception.

The net result is that debugging exceptions requires you to do some extra detective work by examining the AggregateException.InnerExceptions collection. Sometimes you can navigate the stack trace to figure out where things went wrong, but the Visual Studio call stack is not 100-percent navigable.
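A minimal sketch of that detective work follows. It assumes .NET Framework 4 or later, where AggregateException lives in the System namespace; the Check method and the class name are hypothetical stand-ins for real query work. How many inner exceptions are collected before PLINQ stops the query can vary from run to run, so the code makes no assumption beyond "at least one" when a failure occurs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AggregateExceptionSketch
{
    // Hypothetical work item: rejects negative input.
    private static int Check(int n)
    {
        if (n < 0) throw new ArgumentException("negative value: " + n);
        return n;
    }

    // Runs a PLINQ query and returns one message per underlying exception.
    public static List<string> RunAndCollectErrors(int[] values)
    {
        var messages = new List<string>();
        try
        {
            // ToArray forces execution, so any failure surfaces here.
            values.AsParallel().Select(n => Check(n)).ToArray();
        }
        catch (AggregateException ae)
        {
            // Flatten unwraps nested AggregateExceptions into a single level.
            foreach (Exception inner in ae.Flatten().InnerExceptions)
                messages.Add(inner.GetType().Name + ": " + inner.Message);
        }
        return messages;
    }

    public static void Main()
    {
        foreach (string message in RunAndCollectErrors(new[] { 1, -2, 3, -4 }))
            Console.WriteLine(message);
    }
}
```

Each inner exception keeps its own stack trace, so logging inner.ToString() rather than just the message is often the quickest way back to the line that actually failed.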

Tips for Enhancing Performance

I just finished re-reading Dan Brown's Digital Fortress, which is about the NSA, cryptography, spies, and secrets. The book includes some examples of simple word ciphers and references to the World War II Enigma cipher machine. This led me to Listing Two, which reads the text of a book from Gutenberg.org and encrypts it with a simple substitution cipher. The sample text is Lewis Carroll's Alice's Adventures in Wonderland. At 29,000 words, the document is not huge, but if you run the sample code, you see the parallel encryption loop is significantly faster than the sequential loop.

Listing Two uses the WebClient class to download the text of Alice in Wonderland (see the Cypher.Demo method). The text of the story is split into lines and encrypted (splitting it into characters actually resulted in poorer performance than the sequential version). The text is encrypted three times sequentially and three times in parallel; see the for loop and the Cypher.ScrambleSequentially and Cypher.ScrambleParallel methods. Performance improves significantly after ScrambleParallel is called the first time; this is probably due to how long it takes to queue worker threads the first time. A Stopwatch is used to display the elapsed time of each pass.

Listing Two


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using System.Diagnostics;
using System.Threading;
using System.Collections;

namespace SimpleCypher
{
  // convert to simple cypher demo - encrypting the message
  class Program
  {
    static void Main(string[] args)
    {
      Cypher.Test();
      Cypher.Demo();
    }
  }
  public class Cypher
  {
    public static void Demo()
    {
      WriteHeader("Scramble Sequential and Parallel Times");
      
      string text =
          new WebClient().DownloadString(
          "http://www.gutenberg.org/files/11/11.txt");
      // Alice's Adventures in Wonderland
      string[] lines = text.Split(new char[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);

      CreateNewCypher();

      for (int i = 0; i < 3; i++)
      {
        Stopwatch watch = Stopwatch.StartNew();
        var encrypted1 = Cypher.ScrambleSequentially(lines);

        watch.Stop();
        Console.WriteLine("Elapsed sequential: {0}", watch.Elapsed);

        // encrypt parallel
        watch = Stopwatch.StartNew();
        var encrypted2 = Cypher.ScrambleParallel(lines);

        watch.Stop();
        Console.WriteLine("Elapsed parallel: {0}", watch.Elapsed);
        Console.WriteLine();
      }

      Console.ReadLine();
    }

    private static void WriteHeader(string header)
    {
      Console.WriteLine('+' + new string('-', 40));
      Console.WriteLine('|' + header);
      Console.WriteLine('+' + new string('-', 40));
      Console.WriteLine();
    }

    public static void Test()
    {
      
      WriteHeader("Scramble/Unscramble Test");
      string text =
          new WebClient().DownloadString(
          "http://www.gutenberg.org/files/11/11.txt");
      // Alice's Adventures in Wonderland
      string[] lines = text.Split(new char[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);

      CreateNewCypher();

      var encrypted = Cypher.ScrambleSequentially(lines);
      Array.ForEach(encrypted.Take(5).ToArray(), x => Console.WriteLine(x));
      Console.WriteLine();
      var decrypted = Cypher.Unscramble(encrypted);
      Array.ForEach(decrypted.Take(5).ToArray(), x=>Console.WriteLine(x));
      Console.WriteLine();
    }

    private static List<Character> cypher = new List<Character>();

    private class Character : IComparable
    { 
      public char cypher; 
      public char ch;


      #region IComparable Members

      public int CompareTo(object obj)
      {
        return this.cypher.CompareTo(((Character)obj).cypher);
      }

      #endregion
    }
    public static void CreateNewCypher()
    {
      string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";

      cypher.Clear();
      Random random = new Random(DateTime.Now.Millisecond);

      for (char c = 'A'; c <= 'Z'; c++)
      {
        // Random.Next's upper bound is exclusive, so pass Length to let every remaining letter be picked
        char rnd = chars[random.Next(chars.Length)];
        chars = chars.Remove(chars.IndexOf(rnd), 1);
        cypher.Add(new Character() { ch = c, cypher = rnd });
      }

      cypher.Sort();
      Array.ForEach(cypher.ToArray(), x => Debug.WriteLine(
        string.Format("{0} = {1}", x.cypher, x.ch)));

    }

    public static string[] ScrambleSequentially(string[] lines)
    {
      return (from line in lines
              select Cypher.EncryptLine(line)).ToArray();
    }

    public static string[] ScrambleParallel(string[] lines)
    {
      return (from line in lines.AsParallel().AsOrdered()
              select Cypher.EncryptLine(line)).ToArray();
    }

    public static string[] Unscramble(string[] lines)
    {
      return (from line in lines
              select Cypher.DecryptLine(line)).ToArray();
    }

    public static string EncryptLine(string line)
    {
      StringBuilder builder = new StringBuilder();
      foreach (char ch in line.ToCharArray())
      {
        Character character = cypher.Find(test => test.ch == Char.ToUpper(ch));
        if (character == null)
          builder.Append(ch);
        else if (Char.IsLower(ch))
          builder.Append(Char.ToLower(character.cypher));
        else
          builder.Append(character.cypher);
      }
      return builder.ToString();
    }

    public static string DecryptLine(string line)
    {
      StringBuilder builder = new StringBuilder();
      foreach (char ch in line.ToCharArray())
      {
        Character character = cypher.Find(test => test.cypher == Char.ToUpper(ch));
        if (character == null)
          builder.Append(ch);
        else if (Char.IsLower(ch))
          builder.Append(Char.ToLower(character.ch));
        else
          builder.Append(character.ch);
      }
      return builder.ToString();
    }
  }
}

The EncryptLine and DecryptLine methods work by using the simple substitution cypher generated in CreateNewCypher. CreateNewCypher uses all of the uppercase letters of the alphabet and assigns one of 26 possible substitute values until all letters are exhausted. Thus, A might be randomly substituted for F, B for Z, and so on. The easiest way to decrypt is to reverse the process using the same cypher values. (Or use trial-and-error, manually, which any first-year cryptographer could do. You could probably use something like this to perplex casual snoops, but I wouldn't break any laws and encode details in an e-mail using cypher substitution -- if you know what I mean.)
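For comparison, here is a compact sketch of the same substitution idea. It is not the listing's implementation: it swaps in a Fisher-Yates shuffle to build the key and a Dictionary lookup in place of List.Find, and all of the names are hypothetical. The key is built once and then only read, and a Dictionary supports multiple concurrent readers as long as it is not modified, which fits a PLINQ query that encrypts lines in parallel.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class SubstitutionSketch
{
    // Builds a random one-to-one mapping of 'A'..'Z' using a Fisher-Yates shuffle.
    public static Dictionary<char, char> CreateKey(Random random)
    {
        char[] shuffled = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".ToCharArray();
        for (int i = shuffled.Length - 1; i > 0; i--)
        {
            int j = random.Next(i + 1);      // 0 <= j <= i
            char tmp = shuffled[i];
            shuffled[i] = shuffled[j];
            shuffled[j] = tmp;
        }

        var key = new Dictionary<char, char>();
        for (int i = 0; i < shuffled.Length; i++)
            key[(char)('A' + i)] = shuffled[i];
        return key;
    }

    // Swaps keys and values so that Apply can also decrypt.
    public static Dictionary<char, char> Invert(Dictionary<char, char> key)
    {
        return key.ToDictionary(pair => pair.Value, pair => pair.Key);
    }

    // Substitutes letters and preserves case; other characters pass through unchanged.
    public static string Apply(Dictionary<char, char> key, string line)
    {
        var builder = new StringBuilder(line.Length);
        foreach (char ch in line)
        {
            char mapped;
            if (!key.TryGetValue(char.ToUpperInvariant(ch), out mapped))
                builder.Append(ch);
            else
                builder.Append(char.IsLower(ch) ? char.ToLowerInvariant(mapped) : mapped);
        }
        return builder.ToString();
    }

    public static void Main()
    {
        Dictionary<char, char> key = CreateKey(new Random());
        string encrypted = Apply(key, "Alice was beginning to get very tired.");
        Console.WriteLine(encrypted);
        Console.WriteLine(Apply(Invert(key), encrypted));
    }
}
```

Decryption here is just Apply with the inverted key, which mirrors the "reverse the process using the same cypher values" approach described above.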

Of course, not all problems are amenable to parallel execution. For instance, the cypher example didn't work so well at the character level, probably because the limited amount of work (substituting one character at a time) costs less than partitioning the data and spinning up threads. You can enhance the results of parallel performance by following a few basic recommendations:

It is worth noting that PLINQ is part of Parallel FX, so some of these tips apply to parallelism in general.

Conclusion

There are several samples with the Parallel FX Library, including a ray tracer and a C++ example that renders Mandelbrot Fractals. When I ran the ray tracer algorithm, the image rendered in 47 seconds sequentially and in 32 seconds in parallel.


Paul is an applications architect for EDS and author of LINQ Unleashed for C#. He can be contacted at [email protected].
