Статьи

Небольшая ошибка с Node.js Async и Streams

Я наткнулся на небольшую ошибку, используя async с потоками Node.js : вы можете легко испортить свой вывод, если не будете осторожны.


Node.js Streams — это абстракция каналов Unix;
они позволяют вам выталкивать или извлекать данные постепенно, никогда не сохраняя в памяти больше, чем нужно. async — это библиотека, используемая для организации всех асинхронных обратных вызовов, используемых в приложениях узлов, без получения глубокого вложения «рождественской елки» обратных вызовов, которые могут возникнуть слишком легко.

Я работаю над небольшим количеством кода, чтобы извлечь файл изображения, сохраненный в MongoDB GridFS , масштабировать изображение с помощью ImageMagick , а затем передать результат в браузер.

Мой первый проход при этом не использовал ImageMagick или потоки, и работал отлично … но как только я добавил использование async (даже до добавления в ImageMagick), я начал получать испорченные изображения в браузере, что означает, что мой потоки были испорчены.

Перед добавлением async мой код был разумным:

  app.get "/images/review-thumbnail/:id", (req, res) ->

    id = req.params.id

    store = new GridStore mongoose.connection.db, new ObjectID(id), null, "r"

    # file is just an alias for store but has some semantic value
    store.open (err, file) ->

      if err
        return res.send "Unable to open file", 500

      if file.eof()
        return res.send "Image #{id} not found.", 404

      res.contentType file.contentType

      file.stream(true).pipe res

Однако я знал, что собираюсь добавить несколько новых шагов, чтобы передать содержимое файла через ImageMagick; именно тогда я решил проверить модуль асинхронности.

Логика для обработки этого запроса — водопад ; каждый шаг запускает некоторую работу, а затем передает данные на следующий шаг через асинхронный обратный вызов. Асинхронная библиотека называет шаги «задачами»; Вы передаете массив этих задач в async.waterfall () вместе с обратным вызовом конца водопада. Этот специальный обратный вызов может быть передан с ошибкой, предоставленной любой задачей, или конечным результатом последней задачи.

С помощью waterfall () каждой задаче передается специальная функция обратного вызова. Если функции обратного вызова передается ненулевая ошибка в качестве первого параметра, то остальные задачи пропускаются, и обработчик окончательного результата вызывается немедленно для обработки ошибки.

В противном случае вы передаете значение NULL в качестве первого параметра плюс любые дополнительные значения результата. Следующей задаче передаются значения результата плюс следующий обратный вызов. Это все очень умно.

Моим первым проходом было дублирование поведения моего исходного кода, но в асинхронной модели. Это означает множество мелких функций; Я также представил дополнительный шаг между получением открытого файла и передачей его содержимого в браузер. Дополнительный шаг предназначен для дальнейшего использования ImageMagick.

Код, несмотря на дополнительный шаг, был вполне читабелен:

  app.get "/images/review-thumbnail/:id", (req, res) ->

    id = req.params.id

    openFile = (callback) ->
      store = new GridStore mongoose.connection.db, new ObjectID(id), null, "r"

      store.open callback

    readFileContents = (file, callback) ->

      if file.eof()
        return callback(new ErrorResponse 404, "Image #{id} not found")

      res.contentType file.contentType

      callback null, file.stream(true)

    streamToClient = (stream, callback) ->

      stream.pipe res

      callback null

    errorCallback = (err) ->
      if err
        sendErrorResponse res, err

    async.waterfall [ openFile, readFileContents, streamToClient], errorCallback

Мой стиль — создавать локальные переменные с каждой функцией; так что openFile запускает процесс; как только файл будет извлечен из MongoDB, будет вызвана задача readFileContents … если только нет ошибки, в этом случае errorCallback вызывается немедленно.

Внутри readFileContents мы конвертируем файл в поток с помощью file.stream (true) (значение true означает автоматическое закрытие потока после того, как все содержимое файла будет считано из GridFS).

Далее следует streamToClient, он берет этот поток и передает его в браузер через объект res (response).

Таким образом, хотя теперь он разбит на более мелкие функции, логика та же, что и в последней строке: откройте файл, прочитайте его содержимое в виде потока, передайте данные клиенту.

Однако когда я начал тестировать это, прежде чем перейти к шагу масштабирования изображения, он больше не работал. Данные изображения были повреждены. Я немного потренировался: добавлял сообщения в журналы, просматривал исходники библиотек, угадывал и экспериментировал (и я сделал Pine для настоящего отладчика!).

В конце концов я понял, что все сводится к следующему коду из модуля async:

    async.waterfall = function (tasks, callback) {
        callback = callback || function () {};
        if (!tasks.length) {
            return callback();
        }
        var wrapIterator = function (iterator) {
            return function (err) {
                if (err) {
                    callback(err);
                    callback = function () {};
                }
                else {
                    var args = Array.prototype.slice.call(arguments, 1);
                    var next = iterator.next();
                    if (next) {
                        args.push(wrapIterator(next));
                    }
                    else {
                        args.push(callback);
                    }
                    async.nextTick(function () {
                        iterator.apply(null, args);
                    });
                }
            };
        };
        wrapIterator(async.iterator(tasks))();
    };

Код в строке 7 — это функция обратного вызова, передаваемая каждой задаче; обратите внимание, что как только он решает, что делать, в строке 21 он откладывает выполнение до «следующего такта».

Корень проблемы был просто в том, что «следующий тик» был слишком поздним. К тому времени, когда появился следующий тик и активизировался streamToClient, первый кусок данных уже был прочитан из MongoDB … но поскольку вызов функции pipe () еще не был выполнен, он просто был отброшен. Конечным результатом было то, что в потоке клиенту в начале отсутствовал фрагмент или даже он был полностью пустым.

Так что это наша Утечка Абстракция на сегодня; то, что выглядело как немедленный обратный вызов, было отложено настолько, чтобы изменить общее поведение. И это, в Node, все, что может быть отложено, будет отложено, так как это делает общее приложение намного быстрее.