Google предоставляет довольно интересные данные о гриппе в формате CSV , и я хотел отобразить их на графике в Dash . Однако необработанные данные не совсем подходят для моих нужд:
- Он содержит несколько вводных / заголовочных текстов (авторские права, описание данных и т. Д.), А Dash нужны только необработанные данные.
- Он показывает десятки штатов / регионов / городов, и я просто хочу показать общие данные США и мой родной штат.
К счастью, Dash может читать данные с любой общедоступной конечной точки, поэтому я решил собрать быстрое
приложение Node.js , чтобы преобразовать данные в то, что мне нужно. Наиболее простым решением было, вероятно, загрузить весь файл, прочитать его построчно, создать массив данных, а затем записать его. А поскольку объем данных в настоящее время составляет чуть менее 400 КБ, возможно, это было бы хорошо. Но лучшая модель (и более увлекательная, IMO) — это использование потоков узлов. Пока мы используем потоки на протяжении всего процесса, мы можем быть уверены, что в любой момент времени в памяти сохраняется только небольшой буфер.
Если вы просто хотите увидеть полное приложение,
оно на GitHub . В противном случае, читайте дальше, чтобы увидеть мой мыслительный процесс.
Отфильтровать вступление / заголовок текста
Сначала мы напишем поток, который отфильтровывает информацию об авторском праве / обзоре и передает остальные:
var stream = require('stream')
, util = require('util')
function CleanIntro(options) {
stream.Transform.call(this, options)
}
util.inherits(CleanIntro, stream.Transform)
CleanIntro.prototype._transform = function (chunk, enc, cb) {
if (this.readingData) {
this.push(chunk, enc)
} else {
// Ignore all text until we find a line beginning with 'Date,''
var start = chunk.toString().search(/^Date,/m)
if (start !== -1) {
this.readingData = true
this.push(chunk.slice(start), enc)
}
}
cb()
}
Поток Transform просто берет данные, которые были переданы из другого потока, делает с ним все, что хочет, а затем выталкивает обратно все, что хочет. В нашем случае мы просто игнорируем что-либо до начала фактических данных, а затем отбрасываем оставшиеся данные обратно. Легко.
Разобрать данные CSV
Теперь, когда у нас есть фильтр для получения только необработанных данных CSV, мы можем начать его анализ. Есть много библиотек разбора CSV; Мне нравится
CSV-поток, потому что, ну, это поток. Итак, наш основной процесс — сделать HTTP-запрос, направить его в наш фильтр очистки заголовка, затем направить его в csv-stream и начать работать с данными:
var request = require('request')
, csv = require('csv-stream')
, util = require('util')
, _ = require('lodash')
, moment = require('moment')
, OutStream = require('./out-stream')
, CleanIntroFilter = require('./clean-intro-filter')
// Returns a Stream that emits CSV records from Google Flu Trends.
// options:
// - regions: an array of regions for which data should be generated.
// See http://www.google.org/flutrends/us/data.txt for possible values
module.exports = function (options) {
options = _.extend({
regions: ['United States']
}, options)
var earliest = moment().subtract('years', 1)
request('http://www.google.org/flutrends/us/data.txt')
.pipe(new CleanIntroFilter())
.pipe(csv.createStream({}))
.on('error',function(err){
// Oops, got an error
})
.on('data',function(data) {
var date = moment(data.Date)
// Only return data from the past year
if (date.isAfter(earliest) || date.isSame(earliest)) {
// Let's build the output String...
console.log(data.Date + ',' + _.map(options.regions, function (region) {
return data[region]
}).join())
}
})
.on('end', function () {
// Okay we're done, now what?
})
}
Хорошо, теперь мы приближаемся. Мы создали выход CSV, но что нам с ним делать? Поместить все это в массив и вернуть это?
НЕТ! Помните, что мы потеряем преимущества тонкой памяти потоков, если не будем использовать их до конца.
Выпиши в другой поток
Вместо этого давайте просто сделаем наш собственный поток для записи:
var stream = require('stream')
var OutStream = function() {
stream.Transform.call(this,{objectMode: false})
}
OutStream.prototype = Object.create(
stream.Transform.prototype, {constructor: {value: OutStream}} )
OutStream.prototype._transform = function(chunk, encoding, callback) {
this.push(chunk, encoding)
callback && callback()
}
OutStream.prototype.write = function () {
this._transform.apply(this, arguments)
}
OutStream.prototype.end = function () {
this._transform.apply(this, arguments)
this.emit('end')
}
И теперь наша функция синтаксического анализа может вернуть этот поток и записать в него:
module.exports = function (options) {
options = _.extend({
regions: ['United States']
}, options)
var out = new OutStream()
out.write('Date,' + options.regions.join())
var earliest = moment().subtract('years', 1)
request('http://www.google.org/flutrends/us/data.txt')
.pipe(new CleanIntroFilter())
.pipe(csv.createStream({}))
.on('error',function(err){
out.emit('error', err)
})
.on('data',function(data) {
var date = moment(data.Date)
// Only return data from the past year
if (date.isAfter(earliest) || date.isSame(earliest)) {
out.write(data.Date + ',' + _.map(options.regions, function (region) {
return data[region]
}).join())
}
})
.on('end', function () {
out.end()
})
return out
}
Подайте это
Наконец, мы будем использовать
Express для предоставления наших данных в качестве веб-конечной точки:
var express = require('express')
, data = require('./lib/data')
, _ = require('lodash')
var app = express()
app.get('/', function(req, res){
var options = {}
if (req.query.region) {
options.regions = _.isArray(req.query.region) ? req.query.region : [req.query.region]
}
res.setHeader('Content-Type', 'text/csv')
data(options)
.on('data', function (data) {
res.write(data)
res.write('\n')
})
.on('end', function (data) {
res.end()
})
.on('error', function (err) {
console.log('error: ', error)
})
})
var port = process.env.PORT || 5000
app.listen(port)
console.log('Listening on port ' + port)
Еще раз отметим, что как только мы получаем данные из нашего потока, мы манипулируем и записываем их в другой поток (в данном случае HTTP-ответ). Это удерживает нас от ненужного хранения большого количества данных в памяти.
Теперь, если мы запустим сервер, мы можем использовать curl, чтобы увидеть его в действии:
$ curl 'http://localhost:5000' Date,United States 2012-11-04,2492 2012-11-11,2913 2012-11-18,3040 2012-11-25,3641 2012-12-02,4427 [and so on] $ curl 'http://localhost:5000?region=United%20States®ion=Pennsylvania' Date,United States,Pennsylvania 2012-11-04,2492,2579 2012-11-11,2913,2889 2012-11-18,3040,2785 2012-11-25,3641,3248 2012-12-02,4427,3679 [and so on]
Пока сервер работает в каком-то месте, доступном для общественности, мы можем перейти к
Dash и подключить его к виджету Custom Chart, что даст нам что-то вроде этого:
Эй, похоже, декабрь и январь — большие месяцы для гриппа в США. Кто знал?
Хотите попробовать это сами? Полный исходный код этого приложения
находится на GitHub вместе с пошаговыми инструкциями по запуску проекта и созданию виджета в Dash. Наслаждайтесь!
