BlackWaspTM

This web site uses cookies. By using the site you accept the cookie policy.This message is for compliance with the UK ICO law.

Parallel and Asynchronous
.NET 4.0+

Cancelling Parallel LINQ Queries

Parallel Language-Integrated Query (PLINQ) provides the performance benefits associated with parallel tasks, with the declarative approach of LINQ. This powerful combination also supports cancellation of queries, using standard cancellation tokens.

Cancelling a Simple Query

When you wish to enable cancellation for a simple query, where each operation is short-lived, you only need to provide the query with a cancellation token. If you decide to cancel the query, the remaining operations will end quickly and the query will terminate. To provide the cancellation token, you use the WithCancellation method, executed against the parallelised sequence. The cancellation token is provided to its single parameter.

Consider the following code, which executes a long but simple query and provides the option to cancel it:

static void Main()
{
    IEnumerable<int> range = Enumerable.Range(1, 10000000);

    CancellationTokenSource tokenSource = new CancellationTokenSource();

    new Thread(() =>
    {
        Console.WriteLine("Started");

        try
        {
            // Long running query
            List<double> sqrts = range
            .AsParallel()
            .WithCancellation(tokenSource.Token)
            .Select(n => Math.Sqrt(n))
            .ToList();

            Console.WriteLine("Finished processing {0} square roots", sqrts.Count);
        }
        catch (OperationCanceledException)
        {
            // Operation cancelled
            Console.WriteLine("Cancelled");
        }
        catch (AggregateException aex)
        {
            // At least one exception caught
            foreach (Exception ex in aex.WithoutCancellations().InnerExceptions)
            {
                Console.WriteLine(ex.Message);
            }
       }
    }).Start();

    Console.WriteLine("Press Enter to stop");
    Console.ReadLine();
    tokenSource.Cancel();
}

In the above code, we use the Enumerable.Range method to create a very large sequence of integers. This is the source sequence for the parallel query that we will execute. We also initialise the cancellation token source that will be used to generate the token that will signal the cancellation request to the parallel LINQ query.

The next element of the code creates and starts the new thread that executes the query. The query and the code that signals cancellation are in separate threads, as the PLINQ query blocks its thread whilst executing. We could cancel from the same thread, but only from within the code of the query itself. For user-driven cancellation, this is not a likely scenario. Note that the query is within a try / catch block. The first catch statement captures cancellation exceptions and shows a message. The second captures aggregated exceptions, ignores any cancellations and displays the details of any other, unexpected exceptions. If the query is allowed to run to completion, a success message is shown.

Once the thread is started, the code shows a message and waits for the user to press Enter. When Enter is pressed, the cancellation is requested. If the query has already completed, the call to Cancel has no effect and the output is as follows:

Press Enter to stop
Started
Finished processing 10000000 square roots

If you press Enter whilst the query is still running, the cancellation request prevents any further iteration. Once the already scheduled calculations are complete, an OperationCanceledException is thrown and the "Cancelled" message is outputted:

Started
Press Enter to stop

Cancelled

Cancelling Complex Queries

When you create more complex queries, where each iteration is slow, relying on the above approach is not ideal. As any started operation within the query is guaranteed to run to completion, if those operations take a long time to finish the cancellation can be slow. When cancellation is requested by the user, this can lead to a user interface that feels unresponsive.

The solution is to check the cancellation token periodically within the code for the query and terminate the long process early if the process has been cancelled. As with parallel tasks, you can check the token's IsCancellationRequested property to determine when the query is being cancelled. When set to true, you should perform any operations that are required to leave the data in a consistent state before exiting. If there is no clean-up required, you can simply call the token's ThrowIfCancellationRequested method. This does nothing when cancellation has not been requested. If the query is terminating, the method throws a cancellation exception.

The following code shows a modified version of the previous example. In this case the number of items in the source sequence has been lowered but the processing applied to each element takes much longer. You can see that the query's processing is performed using the SlowProcess method, which is passed the cancellation token. This method simulates a slow process by pausing for one second, using a series of one hundred, ten-millisecond delays. After each of the smaller pauses the token is checked, ensuring that cancellation for any iteration takes only a fraction of a second. In a real situation you should aim to check for cancellation at least this often and more frequently if possible.

static void Main(string[] args)
{
    IEnumerable<int> range = Enumerable.Range(1, 100);

    CancellationTokenSource tokenSource = new CancellationTokenSource();

    new Thread(() =>
    {
        Console.WriteLine("Started");

        try
        {
            List<int> results = range
            .AsParallel()
            .WithCancellation(tokenSource.Token)
            .Select(n => SlowProcess(n, tokenSource.Token))
            .ToList();

            Console.WriteLine("Finished processing {0} items", results.Count);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Cancelled");
        }
        catch (AggregateException aex)
        {
            foreach (Exception ex in aex.WithoutCancellations().InnerExceptions)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }).Start();

    Console.WriteLine("Press Enter to stop");
    Console.ReadLine();
    tokenSource.Cancel();
}

private static int SlowProcess(int value, CancellationToken token)
{
    // One second pause
    for (int i = 0; i < 100; i++)
    {
        // Detect cancellation
        token.ThrowIfCancellationRequested();
        Thread.Sleep(10);
    }
    return value;
}
7 October 2012