Статьи

Асинхронный ввод-вывод и тупик ThreadPool (часть 1)

Я упоминал в прошлом посте, что он был задуман при чтении исходного кода для System.Diagnostics.Processкласса. Этот пост о причине, которая подтолкнула меня к чтению исходного кода в попытке решить проблему. Оказалось, что это был еще один случай Leaky Abstraction , который представляет особый интерес для меня.

Как оказалось, этот пост оказался слишком длинным (даже для меня). Я не люблю рассрочки, но я чувствовал, что это то, что стоит попробовать, так как размер был слишком большим для потребления в один пост. Таким образом, я разделил его на 5 частей, так что каждая часть была бы около 1000 слов или меньше. Я отправлю одну часть в день.

Чтобы дать вам представление о масштабах и тематике предстоящих событий, вот краткий обзор. В части 1 я выложу проблему. Мы пытаемся порождать процессы, читать их результаты и убивать, если они занимают слишком много времени. Наша первая попытка — использовать простой синхронный ввод / вывод для чтения вывода и обнаружения тупика. Мы решаем тупик, используя асинхронный ввод-вывод. Во второй части мы распараллеливаем код и обнаруживаем снижение производительности и еще один тупик. Мы создаем испытательный стенд и приступаем к углубленному исследованию проблемы. В третьей части мы выясним причину и обсудим механизм (как и почему), с которым мы столкнулись с такой проблемой. В четвертой части мы обсудим решения проблемы и разработаем общие решения (с кодом) для решения проблемы. Наконец, в части 5 мы видим, может ли универсальное решение работать, прежде чем подвести итоги и сделать выводы.

Давайте начнем с самого начала. Предположим, что вы хотите выполнить какую-то программу (назовите ее дочерней), получить весь ее вывод (и ошибку) и, если она не завершится в течение какого-то времени, убить ее. Обратите внимание, что нет взаимодействия и нет ввода. Вот как тесты выполняются в Phalanger  с использованием тестового бегуна.

Синхронный ввод / вывод

ProcessКласс удобно подвергается основные трубы для дочернего процесса с использованием экземпляров потока StandardOutput и StandardError. И, как и многие, у нас тоже может возникнуть соблазн просто позвонить StandardOutput.ReadToEnd()и StandardError.ReadToEnd(). Хотя это будет работать, пока не сработает. Как отметил Раймонд Чен , он будет работать до тех пор, пока данные помещаются во внутренний конвейерный буфер. Проблема этого подхода заключается в том, что мы просим прочитать до тех пор, пока мы не достигнем конца данных, что наверняка произойдет только тогда, когда завершится дочерний процесс, который мы породили. Однако, когда буфер канала, в который дочерний элемент записывает свои выходные данные, заполнен, дочерний элемент должен ждать, пока в буфере не останется свободного места для записи. Но вы говорите, что если мы всегда читаем и очищаем буфер?

Хорошая идея, за исключением того , что нам нужно сделать , что для обоих StandardOutput и StandardError одновременно . В StandardOutput.ReadToEnd()вызове мы читаем каждый байт, поступающий в буфер, пока не завершится дочерний процесс. Хотя мы опустошили StandardOutputбуфер (так что дочерний процесс не может быть заблокирован на этом,), если он заполнит StandardErrorбуфер, который мы еще не читаем, мы будем в тупике. Дочерний процесс не завершится до тех пор, пока не выполнит полную запись в StandardErrorбуфер (который заполнен, потому что никто его не читает), в то время как мы ожидаем завершения процесса, поэтому мы можем быть уверены, что прочитаем до конца, StandardOutputпрежде чем мы вернемся (и начать читатьStandardError). Та же проблема существует для StandardOutput, если мы сначала прочитаем StandardError, следовательно, необходимо истощить оба конвейерных буфера по мере их подачи, а не один за другим.

Асинхронное чтение

Очевидное (и единственное практическое) решение — читать оба канала одновременно, используя отдельные потоки. Для этого есть в основном два подхода. Подход до 4.0 (асинхронные события) и 4.5 и выше (задачи).

Асинхронное чтение с событиями

Код достаточно прост, поскольку использует события .Net. У нас есть два события ручного сброса и два делегата, которые вызываются асинхронно, когда мы читаем строку из каждого канала. Мы получаем нулевые данные, когда достигаем конца файла (т.е. когда процесс завершается) для каждого из двух каналов.

public static string ExecWithAsyncEvents(string path, string args, int timeoutMs)
{
    using (var outputWaitHandle = new ManualResetEvent(false))
    {
        using (var errorWaitHandle = new ManualResetEvent(false))
        {
            using (var process = new Process())
            {
                process.StartInfo = new ProcessStartInfo(path);
                process.StartInfo.Arguments = args;
                process.StartInfo.UseShellExecute = false;
                process.StartInfo.RedirectStandardOutput = true;
                process.StartInfo.RedirectStandardError = true;
                process.StartInfo.ErrorDialog = false;
                process.StartInfo.CreateNoWindow = true;
 
                var sb = new StringBuilder(1024);
                process.OutputDataReceived += (sender, e) =>
                {
                    sb.AppendLine(e.Data);
                    if (e.Data == null)
                    {
                        outputWaitHandle.Set();
                    }
                };
                process.ErrorDataReceived += (sender, e) =>
                {
                    sb.AppendLine(e.Data);
                    if (e.Data == null)
                    {
                        errorWaitHandle.Set();
                    }
                };
 
                process.Start();
                process.BeginOutputReadLine();
                process.BeginErrorReadLine();
 
                process.WaitForExit(timeoutMs);
                outputWaitHandle.WaitOne(timeoutMs);
                errorWaitHandle.WaitOne(timeoutMs);
 
                process.CancelErrorRead();
                process.CancelOutputRead();
 
                return sb.ToString();
            }
        }
    }
}

Мы, конечно, можем улучшить приведенный выше код (например, мы должны сделать общий предел ожидания <= timeoutMs), но вы получите точку с этим примером. Кроме того, нет обработки ошибок или уничтожения дочернего процесса, когда он истекает и не завершается.

Асинхронное чтение с задачами

Гораздо более упрощенный и дезинфицированный подход состоит в том, чтобы использовать новое System.Threading.Tasksпространство имен / структуру, чтобы сделать все тяжелое для нас. Как вы можете видеть, код был сокращен вдвое, и он стал гораздо более читабельным, но нам нужна Framework 4.5 и новее, чтобы это работало (хотя моя цель — 4.0, но для целей сравнения я сделал это немного быстрее). Результаты одинаковы.

public static string ExecWithAsyncTasks(string path, string args, int timeout)
{
    using (var process = new Process())
    {
        process.StartInfo = new ProcessStartInfo(path);
        process.StartInfo.Arguments = args;
        process.StartInfo.UseShellExecute = false;
        process.StartInfo.RedirectStandardOutput = true;
        process.StartInfo.RedirectStandardError = true;
        process.StartInfo.ErrorDialog = false;
        process.StartInfo.CreateNoWindow = true;
 
        var sb = new StringBuilder(1024);
 
        process.Start();
        var stdOutTask = process.StandardOutput.ReadToEndAsync();
        var stdErrTask = process.StandardError.ReadToEndAsync();
 
        process.WaitForExit(timeout);
        stdOutTask.Wait(timeout);
        stdErrTask.Wait(timeout);
 
        return sb.ToString();
    }
}

Опять же, нормальная доза обработки ошибок в порядке, но в целях иллюстрации опущены. Следует отметить, что мы не можем предположить, что мы читаем потоки к моменту выхода ребенка. Существует условие гонки, и нам все еще нужно дождаться завершения операций ввода-вывода, прежде чем мы сможем прочитать результаты.

В следующей части мы распараллелим выполнение в попытке максимизировать эффективность и параллелизм.