BlackWaspTM


Parallel and Asynchronous
.NET 4.0+

Thread-Safe Queues with ConcurrentQueue<T>

The standard .NET queue classes are not thread-safe, so they require additional locking to avoid errors when used in multithreaded or parallel scenarios. This additional complexity is not required with the generic ConcurrentQueue&lt;T&gt; class.

Queues and Parallelism

When you are processing items from a collection in a first-in, first-out (FIFO) manner, the obvious data structure to use is a queue. Items can be added to a queue as required, generally without considering the collection's current content. As you wish to process elements from the collection you can dequeue them, knowing that their order will be preserved. The .NET framework includes generic and non-generic versions of a Queue class for this purpose.
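As a reminder of the basic behaviour, the short sketch below uses the generic Queue&lt;T&gt; class in a single-threaded context to show that items are dequeued in the same order in which they were enqueued:

```csharp
using System;
using System.Collections.Generic;

class QueueBasics
{
    static void Main()
    {
        // Items leave the queue in the order they arrived (FIFO).
        var queue = new Queue<string>();
        queue.Enqueue("first");
        queue.Enqueue("second");
        queue.Enqueue("third");

        Console.WriteLine(queue.Dequeue()); // first
        Console.WriteLine(queue.Dequeue()); // second
        Console.WriteLine(queue.Count);     // 1
    }
}
```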

When you wish to process a queue of items as quickly as possible you may decide to parallelise your code. However, the standard Queue classes are not thread-safe, which means that unexpected errors can be introduced if they are used in multithreaded or parallel operations without appropriate locking. For example, one thread may check whether a queue is empty before dequeuing the next item. The queue may contain items at the time of the check but be empty by the time of the dequeuing, because another thread exhausted the queue between the two operations. It is also possible that multiple threads working on the same queue could encounter a race condition in which items are skipped or duplicated.

To demonstrate one such problem, create a new console application project. We'll be using the Interlocked class from the System.Threading namespace, so add the following using directive to the code file:

using System.Threading;

The code for the Program class is shown below. The example creates a queue containing all of the integers between one and one million. Two parallel tasks are started, each executing the static method, ProcessQueue. This method loops until the queue is exhausted, at which point an exception is thrown and swallowed. Each iteration of the loop dequeues a value and adds it to a total.

At the end of the program the total is output to the console. You should see a total of 500,000,500,000. However, if you are using a computer with more than one processor core you are likely to see a different result.

class Program
{
    static long _total;
    static Queue<int> _queued;

    static void Main()
    {
        IEnumerable<int> numbers = Enumerable.Range(1, 1000000);
        _queued = new Queue<int>(numbers);
        _total = 0;

        Task task1 = Task.Run(() => ProcessQueue());
        Task task2 = Task.Run(() => ProcessQueue());

        Task.WaitAll(task1, task2);

        Console.WriteLine("Total: {0}", _total);
    }

    static void ProcessQueue()
    {
        try
        {
            while (true)
            {
                Interlocked.Add(ref _total, _queued.Dequeue());
            }
        }
        catch (InvalidOperationException) { } // Thrown when the queue is exhausted
    }
}

NB: The above code could be fixed with the correct use of the lock statement. However, this is not the purpose of this article.
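For completeness, a minimal sketch of such a lock-based fix is shown below. Note that the emptiness check and the dequeue must happen inside the same lock; the `_sync` field name is illustrative, not part of the original example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class LockedQueueDemo
{
    static long _total;
    static Queue<int> _queued;
    static readonly object _sync = new object();

    static void Main()
    {
        _queued = new Queue<int>(Enumerable.Range(1, 1000000));

        Task task1 = Task.Run(() => ProcessQueue());
        Task task2 = Task.Run(() => ProcessQueue());
        Task.WaitAll(task1, task2);

        Console.WriteLine("Total: {0}", _total); // Total: 500000500000
    }

    static void ProcessQueue()
    {
        while (true)
        {
            int value;
            // Check and dequeue under one lock; otherwise another thread
            // could empty the queue between the two operations.
            lock (_sync)
            {
                if (_queued.Count == 0) return;
                value = _queued.Dequeue();
            }
            Interlocked.Add(ref _total, value);
        }
    }
}
```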

ConcurrentQueue<T>

The ConcurrentQueue&lt;T&gt; class was introduced in version 4.0 of the .NET framework, along with several other new collection types in the System.Collections.Concurrent namespace. These collections are designed for use with parallel or multithreaded code without requiring any additional thread synchronisation. To use them, add the following using directive:

using System.Collections.Concurrent;

ConcurrentQueue<T> provides a thread-safe queue with similar functionality to the Queue class. Queues are empty on instantiation if you use the default constructor. Alternatively, an IEnumerable<T> can be passed to an overloaded constructor's single parameter to initialise the queue with a sequence of items. As with the standard queues, you can add single items to the end of a queue with the Enqueue method.
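The construction options and the Enqueue method can be sketched as follows; the variable names are purely illustrative:

```csharp
using System;
using System.Collections.Concurrent;

class EnqueueDemo
{
    static void Main()
    {
        // An empty queue, via the default constructor.
        var letters = new ConcurrentQueue<string>();
        letters.Enqueue("A");
        letters.Enqueue("B");

        // A queue initialised from an existing sequence.
        var numbers = new ConcurrentQueue<int>(new[] { 1, 2, 3 });

        Console.WriteLine(letters.Count); // 2
        Console.WriteLine(numbers.Count); // 3
    }
}
```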

Reading elements from a concurrent queue is slightly different from using the non-thread-safe alternatives. To read the next value in the queue without removing it you call TryPeek. To dequeue an item completely, you use TryDequeue. Both methods require an output parameter to receive the value from the front of the queue. Both methods return true if they executed successfully and set the output parameter's value. If the queue is exhausted, the methods return false. This approach is necessary because there is no thread-safe way to check whether the queue contains items before peeking or dequeuing. It is also quite possible that a call to TryPeek followed by an immediate call to TryDequeue will yield different values if another thread extracts a value between the two calls.
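The two methods can be demonstrated in a short, single-threaded sketch:

```csharp
using System;
using System.Collections.Concurrent;

class TryDequeueDemo
{
    static void Main()
    {
        var queue = new ConcurrentQueue<int>(new[] { 10, 20 });

        int value;

        // Read the front of the queue without removing it.
        if (queue.TryPeek(out value))
            Console.WriteLine("Peeked: {0}", value);    // Peeked: 10

        // Remove items until TryDequeue reports an empty queue.
        while (queue.TryDequeue(out value))
            Console.WriteLine("Dequeued: {0}", value);  // 10, then 20

        // The queue is now exhausted, so TryDequeue returns false.
        Console.WriteLine(queue.TryDequeue(out value)); // False
    }
}
```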

Let's rewrite the earlier example to use a concurrent queue. This requires only a few modifications. Firstly we need to change the type of the queue object. We then need to use TryDequeue instead of Dequeue and add the value from the output parameter to the total. We no longer need to wait for an exception to be thrown to know that we have exhausted the queue. We can use the return value from TryDequeue instead, halting the loop when the method returns false.

The updated code is shown below. When you execute it you should see that the correct total is generated every time.

class Program
{
    static long _total;
    static ConcurrentQueue<int> _queued;

    static void Main(string[] args)
    {
        IEnumerable<int> numbers = Enumerable.Range(1, 1000000);
        _queued = new ConcurrentQueue<int>(numbers);
        _total = 0;

        Task task1 = Task.Run(() => ProcessQueue());
        Task task2 = Task.Run(() => ProcessQueue());

        Task.WaitAll(task1, task2);

        Console.WriteLine("Total: {0}", _total);
    }

    static void ProcessQueue()
    {
        int value;

        while (_queued.TryDequeue(out value))
        {
            Interlocked.Add(ref _total, value);
        }
    }
}

/* OUTPUT

Total: 500000500000

*/
21 May 2013