素人使用 Node.js 进行大规模数据的逐步处理
我开始觉得Node.js的回调地狱很有趣了。
我想做的事情
将文本文件按换行符分隔,并插入到MongoDB中。
很简单的。使用 readline 来进行行分割。
var mongo = require('mongodb'),
fs = require('fs'),
readline = require('readline');
rs = fs.ReadStream('test.txt');
rl = readline.createInterface({'input': rs, 'output': {}});
rl.on( 'line', function (line) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
db.close();
});
});
});
});
rl.resume();
挺容易的,不是吗?
使用fs从文件中读取
使用readline从流中输入
由于每行都会调用readline::on方法,因此将获取的数据转换为json
连接到MongoDB
获取集合
插入数据
直到所有行都结束之前,重复执行。
是的,非常顺利!只有当数据量较少时!
当数据量大时会发生错误。
根据日志显示,当流通了十万个数据时,似乎没有执行数据库连接处理,而是一次处理了十万行记录,并将其排入任务队列。
当然会被吓坏啊!!
如果不以异步方式进行行读取,
而是逐步处理直至将数据插入到数据库中,
虽然会消耗掉 Node 的优势,但仍然可以实现预期的处理。
逐步处理该怎么做?
根据读取线索来看,readline似乎是异步固定的,所以我放弃使用它,而是选择将文件全部读取并按换行符进行拆分,然后再通过foreach循环遍历。
var mongo = require('mongodb'),
fs = require('fs');
fs.readFileSync('test.txt').toString().split('\n').forEach(function (line) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
db.close();
});
});
});
});
可是不行啊!!
非常感谢,`readFileSync` 是同步读取文件的,但是之后的 `forEach` 是异步的。所以和之前一样,而且由于没有使用流式传输,内存开销也很大,是不可取的。
在中文中,有一个叫做”async”的东西。
根据名字来看,我原以为这是一个异步处理的库
但它似乎是一个同时支持异步和同步的库!太厉害了!
因此,我会使用它的forEachSeries
从名字上看,非常完美!
var mongo = require('mongodb'),
fs = require('fs');
async.forEachSeries( fs.readFileSync('test.txt').toString().split('\n'), function (line, cb) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
db.close();
cb();
});
});
});
});
对于async::forEachSeries函数,由于它会阻塞直到调用回调函数的第二个参数(在这里是cb),因此我们可以在完成一次插入并关闭数据库后调用cb(),以实现逐步处理,避免任务积压。
虽然持久了一段时间,但还是不行…
为什么?
我的假设是,执行db.close()并调用cb(),然后进行下一个数据处理,但是db.close()是异步的?垃圾收集器????虽然我不太清楚原因,但是在db.close()和调用回调之间插入了一个类似于睡眠的操作,问题就解决了。(我还想进一步调查这个问题,如果有人知道,请告诉我!!)
最终来源 (zuì
var mongo = require('mongodb'),
fs = require('fs');
function sleep(milliSeconds) {
var startTime = new Date().getTime();
while (new Date().getTime() < startTime + milliSeconds);
}
async.forEachSeries( fs.readFileSync('test.txt').toString().split('\n'), function (line, cb) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
db.close();
sleep(50);
cb();
});
});
})
});
ファイナルアンサー?
shuheiさんの指摘より、DBへのconnect closeは毎回必要なのか。
そう、nodeの場合、forEachは非同期で走るため、毎回別のセッションなので
connectと closeは必要なのですが
最終形態は、forEachSeriesを使い 同期処理になってます
这意味着只需要连接一次,因此,我的最后答案是:没有必要再连接关闭一次。
而且,如果在插入时发生错误,似乎可以通过回调函数将错误传递给err并在那时中断。
太方便了!
在使用forEachSeries时,可以通过其第三个参数来处理循环结束时需要执行的操作。
太棒了!Node.js越来越实用了!非常感谢大家!
var mongo = require('mongodb'),
fs = require('fs');
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
async.forEachSeries( fs.readFileSync('test.txt').toString().split('\n'), function (line, cb) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
cb(err);
});
},
function(){db.close();
});
});
});
睡觉不必要了,只要建立一次连接就可以了。源代码也变得简单,插入操作原本需要30分钟的时间,现在只需要大约10秒就可以完成了。
但愿 readline 有一个同步方法(可能有的话我再查一下)真的非常感谢你。