Статьи

Асинхронный ввод-вывод и тупик ThreadPool (часть 2)

В первой части мы обнаружили тупик в синхронном подходе к чтению вывода Process и решили его с помощью асинхронного чтения. Сегодня мы распараллелим выполнение в попытке максимизировать эффективность и параллелизм.

Параллельное выполнение

А теперь давайте усложним нашу жизнь параллелизмом?

Если мы хотим запустить много процессов, мы могли бы (и должны) использовать все имеющиеся в нашем распоряжении ядра. Благодаря Parallel.Forи Parallel.ForEachэта задача сделана намного проще, чем иначе.

public static void ExecAll(List<KeyValuePair<string, string>> pathArgs, int timeout)
{
    Parallel.ForEach(pathArgs, arg => ExecWithAsyncTasks(arg.Key, arg.Value, timeout));
}

Все не может быть проще! Мы передаем список исполняемых путей и их аргументов как KeyValuePairи время ожидания в миллисекундах. За исключением того, что это не сработает … по крайней мере, не всегда.

Сначала давайте обсудим, как это не будет работать, затем давайте поймем причину, прежде чем пытаться это исправить.

Когда абстракция имеет неприятные последствия

Приведенный выше код работает как брелок во многих случаях. Когда этого не происходит, число ожидания ожидания истекает. Это неприемлемо, так как мы бы не знали, получили ли мы весь вывод или его часть, если только мы не получим чистый выход без таймаутов. Впервые я заметил эту проблему совершенно по-другому. Я смотрел на

диспетчер задач
Process Explorer (если он не используется, начните сейчас, и я обещаю никому ничего не рассказывать), чтобы увидеть, насколько удивительно быстрее обстоят дела с этой единственной ForEachстрокой. Я ожидал увидеть дюжину или около того (на 12-ядерном компьютере) дочерних процессов, появляющихся и исчезающих в быстрой последовательности. Вместо этого, к моему сожалению, большую часть времени я видел только одного ребенка! Один !

И после многих испытаний, царапин на голове и чтения стало ясно, что время ожидания истекло, хотя дети явно закончили и вышли. Действительно, поскольку обычно процесс выполнялся за гораздо меньшее время, чем время ожидания, теперь он был медленнее с параллельным кодом, чем с последовательной версией. Поначалу это было неочевидно, и я разумно подозревал, что некоторые дети брали слишком много времени, или у них было слишком много записи в выходные каналы, которые могли быть взаимоблокирующими (что было мне незнакомо).

Testbed

Чтобы устранить неполадки, такие сложные, как этот, нужно начать с чистого тестового случая с минимальным количеством переменных. Для этого нужен фиктивный ребенок, который будет делать именно так, как я сказал, чтобы я мог имитировать разные сценарии. Одним из таких сценариев было бы вообще не порождать детей, а просто протестировать их Parallel.ForEachс какой-то внутрипроцессной задачей (т. Е. Просто с локальной функцией, которая выполняет работу, аналогичную той, которую выполняет ребенок).

using System;
using System.Threading;
 
namespace Child
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length < 2 || args.Length % 2 != 0)
            {
                Console.WriteLine("Usage: [echo|fill|sleep|return] ");
                return;
            }
 
            DoJob(args);
        }
 
        private static void DoJob(string[] args)
        {
            for (int argIdx = 0; argIdx < args.Length; argIdx += 2)
            {
                switch (args[argIdx].ToLowerInvariant())
                {
                    case "echo":
                        Console.WriteLine(args[argIdx + 1]);
                        break;
 
                    case "fill":
                        var rd = new Random();
                        int bytes = int.Parse(args[argIdx + 1]);
                        while (bytes-- > 0)
                        {
                            // Generate a random string as long as the .
                            Console.Write(rd.Next('a', 'z'));
                        }
                        break;
 
                    case "sleep":
                        Thread.Sleep(int.Parse(args[argIdx + 1]));
                        break;
 
                    case "return":
                        Environment.ExitCode = int.Parse(args[argIdx + 1]);
                        break;
 
                    default:
                        Console.WriteLine("Unknown command [" + args[argIdx] + "]. Skipping.");
                        break;
                }
            }
        }
    }
}

Теперь мы можем дать дочернему процессу команды для изменения его поведения, от выгрузки данных до вывода в спящий режим до немедленного возврата.

Как только проблема воспроизведена, мы можем сузить ее, чтобы точно определить источник. Выполнение точно такой же команды в одном и том же процессе (то есть без запуска другого процесса) не вызывает никаких проблем. Вызов DoJob500 раз напрямую в Parallel.ForEachфинишах до 500 мс (часто до 450 мс). Таким образом, мы можем быть уверены, что Parallel.ForEach работает нормально.

public static void ExecAll(List<KeyValuePair<string, string>> pathArgs, int timeout)
{
    Parallel.ForEach(pathArgs, arg => Task.Factory.StartNew(() => DoJob(arg.Value.Split(' '))).Wait() );
}

Даже выполнение новой задачи (в пределах  Parallel.ForEach) не приводит к заметным изменениям во времени. Причиной такой хорошей производительности при выполнении заданий в новых задачах, вероятно, является то, что ThreadPoolпланировщик выбирает задачу для немедленного выполнения при вызове Wait()и выполнении. То есть, поскольку как Task.Factory.StartNew()вызов, так и DoJob()вызов выполняются в конечном итоге на ThreadPoolи потому, что Task предназначен специально для его использования, когда мы вызываем Wait()задачу, он знает, что должен запланировать следующую работу в очереди, которая в этот случай — работа задачи, над которой мы выполнили Wait! Так как вызывающий объект Wait()работает на ThreadPool, он просто выполняет его вместо планирования в другом потоке и блокирования. ДемпингThread.CurrentThread.ManagedThreadIdдо Task.Factory.StartNew()вызова и изнутри DoJobпоказывает, что действительно оба выполняются в одном потоке. Затраты на создание и планирование Задачи незначительны, поэтому мы не видим значительных изменений во времени за 500 выполнений.

Все это прекрасно и утешительно, но все же не помогает нам решить проблему: почему наши процессы не создаются и не выполняются с максимально возможной эффективностью? И почему они истекают?

В следующей части мы углубимся в проблему и узнаем, что происходит.