Статьи

Введение в новые потоки Node

В последнее время в твиттере и в # node.js было много споров о новом API-интерфейсе streams2 . Официальные потоковые документы оставляют желать лучшего, что приводит к общей путанице. Это очень плохо, потому что использование новых потоков может действительно упростить ваш код, как только вы поймете, как они работают. Позволь мне отвезти тебя туда …

Если вы никогда ранее не использовали потоки узлов, я настоятельно рекомендую прочитать введение Макса Огдена . Для ленивых: поток передает данные из «источника» в «приемник», не считывая весь набор данных в память одновременно. (Подумайте, unix pipe, но для узла.)

Зачем нам нужна новая версия потоков?

В предыдущих версиях узла единственная помощь, которую вам оказывали, была базовый Streamпрототип. Это не дало вам намного больше, чем .pipeфункция, которая соединяла ваш вход и выход. Потоки было трудно реализовать. Ну, это не совсем так. Их было сложно правильно реализовать .

Раньше существовали всевозможные неявные правила, которым должны следовать надлежащие потоки для записи данных. Если поток больше не может содержать данные, запись должна вернуться false. Но вряд ли какие-либо потоки должным образом учитывают «противодавление» в сложных многотрубных операциях.

Что еще хуже, потоки начали бы отправлять данные волей-неволей (как правило, на nextTick). И если вы хотите объявить обработчики событий асинхронно, вам следует помолиться , чтобы автор потока реализовал этот .pauseметод!

Авторы потоков должны были отслеживать слишком много состояний и определять слишком много методов.

В узле 0.10.x вы получаете намного больше сразу.

Вместо одного Streamпрототипа вы получаете четыре новых базовых прототипа. Каждый из прототипов предоставляет вам множество функциональных возможностей: они будут обрабатывать противодавление, надлежащую обработку событий и даже оптимизируют буферизацию данных. Вам нужно только переопределить один или два метода, и у вас будет свой собственный поток!

Я покажу вам пример из реальной жизни.

На Segment.io мы регистрируем все наши данные в сервисе Amazon S3, где мы храним их в формате GSON, разделенном строкой JSON. Существуют миллионы этих файлов журналов (конечно, больше, чем у меня есть место на жестком диске), поэтому, когда мы хотим манипулировать ими, имеет смысл передавать их прямо из Amazon.

Я хотел создать поток, который позволил бы нам экспортировать эти данные и отправлять их туда, где нам нужно. Может быть, это локальный файл, пользовательский скрипт или даже внешний API.

Самая крутая часть? Все это может быть реализовано правильно (и по-прежнему взаимодействовать с основными функциями) всего с сотней строк кода!

Давайте начнем.

Новые прототипы Stream.

Как я упоминал ранее, Streamв 0.10 появилось четыре новых прототипа, встроенных в модуль потока . Я собираюсь охватить три, которые имеют отношение к нашему примеру.

Если вам нужна совместимость с 0.8, вам нужно будет использовать модуль readable-stream . Это всего лишь модуль ядра, превращенный в версию, которую вы можете установить через npm. Тогда всякий раз, когда вы хотите отказаться от поддержки 0.8, вы просто меняете свои requireзаявления!

stream.Readable

Любые прочитанные вами данные всегда будут начинаться с объекта stream.Readable . Может быть получение данных из ответа HTTP, чтение из файла или запрос к базе данных. Читаемые потоки — это то, что вы используете для чтения данных из внешнего источника.

После того, как вы добавили 'data'обработчик событий в ваш читаемый поток, он будет выдавать новые данные каждый раз, когда будет что-то новое для чтения. Вы можете направить читаемый поток в доступный для записи поток или решить прочитать его самостоятельно:

var fs = require('fs');

var readable = fs.createReadStream('my-file.txt');

// this is the classic api
readable
  .on('data',  function (data) { console.log('Data!', data);  })
  .on('error', function (err)  { console.error('Error', err); })
  .on('end',   function ()     { console.log('All done!');    });

Я не буду говорить о новом readableAPI, так как считаю его более запутанным и менее полезным для разработчиков потоков. .on('data')Интерфейс будет работать для любого типа потоков, так это то, что я буду использовать здесь. Если вы пипец, это вообще не имеет значения.

stream.Writable

Записываемые потоки как раз наоборот. Они являются «раковиной» потока; где будут записаны все ваши данные. Вы также можете сразу передать данные в поток для записи.

var fs = require('fs');

var readable = fs.createReadStream('old-file.txt')
  , writable = fs.createWriteStream('copy.txt');

readable.pipe(writable)
  .on('finish', function () { writable.write('an extra line'); });

Поток, доступный для записи, также будет выводить 'finish'событие после его завершения, что позволит вам корректно выйти из программы после записи всего вывода.

stream.Transform

Потоки преобразования доступны как для чтения, так и для записи, где входные данные связаны с выходными данными. Любой вид карты или шага фильтра хорошо моделируется как поток преобразования.

Обычно вы реализуете поток преобразования, чтобы преодолеть разрыв между читаемыми и записываемыми потоками, как правило, как канал между ними. Если вы использовали сквозной модуль dominictarr , потоки преобразования обеспечивают очень похожую функциональность.


Основываясь на этих трех прототипах, мы можем написать довольно сложную функциональность в несколько строк кода.

Давайте посмотрим на все крошечные потоки, которые идут в нашем примере S3 …

1. Перечислите файлы из S3.

Помните, ваш источник потока всегда будет включать в себя читаемый поток. В нашем случае первый шаг — это перечисление всех файлов в нашей корзине S3. Поскольку в нашем ведении может быть тысячи или миллионы файлов, мы хотим передать результаты.

Реализация читаемого потока состоит из двух частей. Сначала мы хотим унаследовать от читаемого прототипа потока и поместить наш поток в objectMode.

Что это objectMode? Этот флаг используется для указания потоку, что он должен иметь дело с объектами, а не со строками или буферами. Потоки, которые имеют дело только с текстом (что часто происходит в ядре), могут выполнять оптимизацию и более точное обратное давление, связанное с буферизацией.

Как правило, всякий раз, когда вы хотите читать или записывать объекты JavaScript, которые не являются строками или буферами, вам нужно поместить свой поток в objectMode. Это действительно важно, так как любой поток не objectModeбудет отказываться от записи, передачи или чтения объекта.

Как и в стороне, objectModeчувствует себя очень недоделанным в новых потоках. Я могу понять причину, но моя интуиция в том, что потоки должны просто работать с «данными». Проведение различия между объектами и текстом кажется неестественным.

Наш конструктор также устанавливает несколько переменных для сохранения состояния в нашем потоке. Здесь s3переменная является клиентом knox, используемым для доступа к корзинам s3.

function S3Lister (s3, options) {
  options || (options = {});
  stream.Readable.call(this, { objectMode : true });

  this.s3 = s3; // a knox-like client.
  this.marker = options.start;
  this.connecting = false;
  this.ended  = false;
}

util.inherits(S3Lister, stream.Readable);

Мы устанавливаем поток для использования, objectModeпоскольку хотим возвращать не только имена файлов, но и некоторые метаданные о каждом из них.

Далее мы должны переопределить ._readфункцию. Всякий раз, когда программа готова читать данные из вашего читаемого потока, вызывается внутренняя реализация ._read. Вот где ваш код вступает во владение.

Нокс s3-клиент имеет list()функцию , но это основной вызов API Amazon. Они ограничивают нас в получении метаданных для 1000 файлов одновременно . Чтобы составить список всего содержимого корзины s3, вам нужно s3.list()каждый раз повторно вызывать с другим стартовым маркером.

Каждый раз, когда клиент получает ответ, он возвращает флаг, есть ли еще данные для чтения. Затем мы можем обновить маркер начала ключом для последнего полученного нами файла.

Вот наша ._readфункция вместе с помощником для составления списка файлов.

S3Lister.prototype._read = function () {
  // Limit _read to call only once per response
  if (this.connecting || this.ended) return;

  var options = {
    prefix     : this.options.prefix,
    marker     : this.marker,
    delimiter  : this.options.delimiter,
    'max-keys' : this.options.maxKeys
  };

  this._list(options);
};

S3Lister.prototype._list = function (options) {
  var self = this;
  this.connecting = true; // ensure that we have only a single connection

  this.s3.list(options, function (err, data) {
    self.connecting = false;
    if (err) return self.emit('error', err);

    var files = data.Contents;

    // if there's still more data, set the start as the last file
    if (data.IsTruncated) self.marker = files[files.length - 1].Key;
    else self.ended = true;

    files.forEach(function (file) { self.push(file); });
    if (self.ended) self.push(null);
  });
};

Чтобы отправить данные читателю, вам нужно звонить .pushс каждым фрагментом данных. В нашем случае это метаданные файла, которые мы получили.

Когда считыватель полностью завершит работу, вы захотите .push(null)сообщить об окончании потока.

Одна важная ошибка заключается в том, что ._readона будет вызвана сразу после того, .pushкак была вызвана один раз. Если вы пишете несколько фрагментов для одного ._read, вам нужно позаботиться о том, чтобы у вас был какой-то флаг, который ограничивает ._readпродолжение. Я использовал connectingфлаг, чтобы гарантировать, что повторяющиеся запросы не передаются на s3, и я обязательно установлю маркер, прежде чем нажимать какие-либо куски.

2. Прочитайте файлы с S3.

Итак, теперь у нас есть читаемый поток, который будет генерировать метаданные для каждого из наших файлов журнала. Теперь мы на самом деле хотим передать содержимое этих файлов.

Для этого шага мы будем использовать поток преобразования. Помните, что потоки преобразования фильтруют или отображают входные данные. Вы захотите использовать его всякий раз, когда входные данные связаны с выводом.

В нашем случае каждый кусок файла отображается на его содержимое, поэтому поток преобразования будет работать хорошо.

Еще раз, мы настроили наш поток на наследование от базового преобразования и установили objectMode в доступной для записи части потока.

function S3Cat(s3, options) {
  stream.Transform.call(this);
  this._writableState.objectMode = true;
  this.s3 = s3;
  this.options = options;
}

util.inherits(S3Cat, stream.Transform);

Мы должны сделать хакерскую вещь, чтобы установить только вход, objectModeтак как поток будет получать объекты javascript, но выходные буферы. Я действительно надеюсь, что это будет исправлено в следующей версии потоков для использования публичного API. Есть существующие проблемы для этого, но, кажется, было небольшое движение по этому вопросу .

Next, we implement the ._transform function. Transform streams will call ._transform once per chunk of the input stream. Since our earlier stream outputs chunks of single-file metadata, each of our input chunks will be a file object.

For each chunk, we’ll use knox to retrieve the contents of the file, and then push those into our output buffer. Once we’re done, we call the callback function.

S3Cat.prototype._transform = function (file, encoding, callback) {
  var self   = this
    , path   = file.Key;

  this.s3.getFile(path, function (err, res) {
    if (err) return callback(err);

    if (self.options.gzip) res = res.pipe(zlib.createGunzip());

    res.on('data',  function (data) { self.push(data); })
       .on('error', function (err)  { callback(err);   })
       .on('end',   function ()     { callback();      });
  });
};

Since our files are gzipped, we can first pipe our response through a Gunzip stream to make sure that we are sending uncompressd text to our output.

You might notice that this stream is technically not doing everything it could to respect the backpressure of the output stream! A complete example would pause the response stream whenever .push returned false.

Thankfully, we do get some benefit from built-in backpressure. Our stream won’t call ._transform until it detects that the output buffer has returned below its highWaterMark.

If you want to specifically tune the memory of any of your buffered streams, you can specifically pass a highWaterMark option to the constructor. For streams in objectMode, this will be the number of objects instead of the buffer size.

3. Split the files into individual lines.

At this point we’re able to stream the entire contents of an s3 bucket with only a few lines of code. However, we still want to convert those lines of JSON into actual javascript objects.

To do this, we can use another set of transform streams. The first should take in chunks of text and output individual lines. A toy implementation looks something like this:

LineSplitter.prototype._transform = function (chunk, encoding, callback) {
  chunk = this.current + chunk.toString();
  var split = chunk.split('\n');
  this.current = split.pop();

  var self = this;
  split.forEach(function(line) { self.push(line); });
  callback();
};

LineSplitter.prototype._flush = function (callback) {
  this.push(this.current);
  callback();
}

What’s going on here? Each input chunk isn’t necessarily guaranteed to be at a line boundary. We have to do a little bookkeeping to keep track of the current chunk and then add new chunks to it.

Once we’ve added our current string to the chunk, we split the complete buffer by our delimiter. We’ll pop off the last chunk of the split and set it as our current half-complete line. We can then push all of our complete lines into the output buffer right away.

Transform streams also contain a special ._flush method which is called after the input has finished. In our case, we’ll want to push the final line of our file into the output once we are sure that there are no more delimiters to be found.

4. Parse the JSON strings.

We’re almost there, we’ve got files streaming from s3 and split into nice, manageable lines of JSON.

Using another transform stream, the JSON parser is trivial to write. This time, you’ll want to be sure set the readableState into objectMode so that the strings can be properly converted to objects.

JSONParser.prototype._transform = function (chunk, encoding, callback) {
  try {
    var parsed = JSON.parse(chunk.toString());
    this.push(parsed);
    callback();
  } catch (err) {
    callback(err);
  }
}

5. Write the output.

Finally, we’ll want to use a writable stream to actually make use of our parsed log data! We’ll inherit from the Writable stream, and override the ._write method. Once again, we’ll enable objectMode since we’re receiving the results of parsed JSON.

function Uploader () {
  stream.Writable.call(this, { objectMode : true });
}

util.inherits(Uploader, stream.Writable);

Uploader.prototype._write = function (data, encoding, callback) {
  api.upload({
    event     : data.event,
    timestamp : data.timestamp
  }, callback);
};

That’s it! We can then pipe to our uploader and start replaying our logged data to whatever source we’d like!

Now let’s put it all together!

The best part of this is that we can now pipe these streams together to provide a single ‘Readable’ interface to our program! Combined with node’s module pattern, this is a really powerful way to hide interface complexity.

Here’s what our replay module can look like:

module.exports = function (s3) {
  return new S3Lister(s3)                 // list all the files in our bucket
    .pipe(new S3Cat(s3, { gzip : true })) // stream the file contents
    .pipe(new LineSplitter()              // split by lines
    .pipe(new JSONParser());              // parse each line
}

It returns a readable stream, which we can use wherever we’d like! The possibilities are endless.

var replay = require('replay')
  , knox   = require('knox');

var s3 = knox.createClient({ ... });

// We can pipe to our custom uploader!
replay(s3).pipe(new Uploader());

// Or maybe print all of our data to the console!
replay(s3).on('data', function (data) { console.log(data); });

Since each stream implements only a single function, we can compose them however we’d like. Streaming all of our logs to our hard drive is just as easy:

var stream = new S3Lister(s3)
              .pipe(new S3Cat(s3))
              .pipe(fs.createWriteStream('./my-file.log');

stream.on('finish', function () { console.log('Logs finished writing!'); });

Look at the magic of that .pipe! Now, you’re thinking with streams!

Don’t stop there…

So now that we’ve cut our teeth on new streams, where do we go from here?

Your first stop should be Dominic Tarr’s pull-stream for a different take on streams in node.

While core streams rely on the source pushing data, pull-streams take the opposite approach. The sink drives the interaction by ‘pulling’ data from the source. That means no backpressure problems, and (by design) no differentiation between object streams and text streams.

This is particularly useful in the case where you want to add concurrent processing to your streams. For instance, perhaps I don’t care about the exact ordering of my logs, but would like to process 10 lines at a time. Instead of building an awkward transform stream to batch multiple items at a time, I can use a parallel pull stream.

For general streaming goodness, it’s also worth looking at Substack’s stream-handbook and new stream-adventure. There’s some seriously cool streaming applications to be found there.

Personally, I hope to see streams get rid of objectMode. There are also quite a few gotchas related to how streams are implemented internally which tripped me up. I could only solve them once I started reading through the stream source itself. Having better stream documentation would go a long way to help solve this problem.

I do like the fact that new streams are compact and simple to implement correctly (once you’ve got the hang of them). For that, I really applaud Isaacs’ work in trying to tackle the problem of making streams which handle edge cases easy to implement.

I don’t think streams even necessarily have to live in the node core, though I’d like to see a few common implementations which are well-written and play nicely together.

Overall, I’ve found the simplicity and composability of streams to be one of node’s nicest features. I’m very excited to see the community actually start trying out the new streams and improving them.