Статьи

Использование BlockingCollection для связи между потоками


Рассмотрим эти (несколько) общие проблемы программирования:

  • Я использую стороннюю библиотеку, которая не является поточно-ориентированной, но я хочу, чтобы мое приложение делило работу между несколькими потоками. Как мне организовать вызовы между моим многопоточным кодом и однопоточной библиотекой?
  • У меня есть один источник событий в одном потоке, но я хочу разделить работу между пулом из нескольких потоков?
  • У меня есть несколько потоков, генерирующих события, но я хочу использовать их в одном потоке?

Один из способов сделать это — иметь некоторое общее состояние, поле или свойство в статическом классе и обернуть его замками, чтобы несколько потоков могли безопасно обращаться к нему. Это довольно распространенный способ попытаться снять шкуру с этой особой кошки, но в нее попадают ловушки для неосторожных. Кроме того, это может снизить производительность, поскольку доступ к общему ресурсу сериализуется, даже если доступ к нему выполняется параллельно.

Лучшим способом является использование BlockingCollection и взаимодействие ваших потоков через классы сообщений.

BlockingCollection — это класс в новом пространстве имен System.Collections.Concurrent, поставляемом с .NET 4.0. Он содержит ConcurrentQueue, хотя вы можете поменять его на ConcurrentStack или ConcurrentBag, если хотите. Вы вставляете объекты в один конец и сидите в цикле, поглощая их от другого. (Несколько) производитель (и) и (несколько) потребитель (ы) могут работать в разных потоках без каких-либо блокировок. Это нормально, потому что классы коллекции пространства имен Concurrent гарантированно безопасны для потоков. «Блокирующая» часть имени присутствует потому, что конечные блоки потребляют до тех пор, пока объект не станет доступен. Джастин Этередж имеет отличный пост, который более подробно рассматривает BlockingCollection здесь .

Для примера давайте реализуем параллельный конвейер. Аппарат ИВЛ создает задачи для параллельной обработки, группа рабочих обрабатывает задачи в отдельных потоках, а приемник снова собирает результаты. Он показывает связь между потоками «один ко многим» и «многие к одному». Я украл идею и диаграмму из отличного руководства ZeroMQ :

zguide_parallel_workers

Сначала нам понадобится класс, представляющий часть работы, в этом примере мы будем очень просты:

public class WorkItem
{
    public string Text { get; set; }
}

Нам понадобятся две BlockingCollections, одна для передачи задач от аппарата ИВЛ к рабочим, а другая для доставки готовой работы от рабочих к раковине:

var ventilatorQueue = new BlockingCollection<WorkItem>();
var sinkQueue = new BlockingCollection<WorkItem>();

Теперь давайте напишем наш вентилятор:

public static void StartVentilator(BlockingCollection<WorkItem> ventilatorQueue)
{
    Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            ventilatorQueue.Add(new WorkItem { Text = string.Format("Item {0}", i) });
        }
    }, TaskCreationOptions.LongRunning);
}

Он просто повторяет 100 раз, создавая рабочие элементы и помещая их в ventilatorQueue.

Вот рабочий:

public static void StartWorker(int workerNumber,
    BlockingCollection<WorkItem> ventilatorQueue,
    BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in ventilatorQueue.GetConsumingEnumerable())
        {
            // pretend to take some time to process
            Thread.Sleep(30);
            workItem.Text = workItem.Text + " processed by worker " + workerNumber;
            sinkQueue.Add(workItem);
        }
    }, TaskCreationOptions.LongRunning);
}

BlockingCollection предоставляет метод GetConsumingEnumerable, который возвращает каждый элемент по очереди. Блокируется, если в очереди нет элементов. Обратите внимание, что я не беспокоюсь о шаблонах выключения в этом коде. В рабочем коде вам нужно беспокоиться о том, как закрыть ваши рабочие потоки.

Далее давайте напишем наш приемник:

public static void StartSink(BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in sinkQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Processed Messsage: {0}", workItem.Text);
        }
    }, TaskCreationOptions.LongRunning);
}

Еще раз, это находится в бесконечном цикле foreach, потребляющем элементы из sinkQueue.

Наконец нам нужно соединить кусочки и запустить их:

StartSink(sinkQueue);

StartWorker(0, ventilatorQueue, sinkQueue);
StartWorker(1, ventilatorQueue, sinkQueue);
StartWorker(2, ventilatorQueue, sinkQueue);

StartVentilator(ventilatorQueue);

Сначала я начал слив, потом рабочие и, наконец, продюсер. Не имеет большого значения, в каком порядке они запускаются, поскольку в очередях будут храниться любые задачи, которые ИВЛ создает до запуска рабочих и приемника.

Запустив код, я получаю вывод примерно так:

Processed Messsage: Item 1 processed by worker 1
Processed Messsage: Item 2 processed by worker 0
Processed Messsage: Item 0 processed by worker 2
Processed Messsage: Item 5 processed by worker 2
Processed Messsage: Item 3 processed by worker 1

....

Processed Messsage: Item 95 processed by worker 0
Processed Messsage: Item 98 processed by worker 0
Processed Messsage: Item 97 processed by worker 2
Processed Messsage: Item 96 processed by worker 1
Processed Messsage: Item 99 processed by worker 0

Этот шаблон — отличный способ развязать связь между источником и потребителем, или производителем, и потребителем. Он также позволяет иметь несколько источников и несколько приемников, но в первую очередь это безопасный способ взаимодействия нескольких потоков.

Полный пример здесь на GitHub.