引子
在使用Node/JS编程的时候,经常会遇到这样的问题:有一连串的异步方法,需要按顺序执行,前后结果之间有依赖关系,形如(片断1):
1 2 3 4 5 6 7 8 9 10 11 12 13 |
asyncTask(initial, function (err, result) {//step 1 if (err) throw err; asyncTask(result, function (err, result2) {//step 2 if (err) throw err; asyncTask(result2, function (err, result3) {//final if (err) throw err; console.log(result3); }); }); }); |
之前也介绍过,这就是著名的回调地狱(Pyramid of Doom)。
Promise
解决回调嵌套及串行状态传送问题,是有规范可循的,如 CommonJS的Promise规范 。
Promise是对异步编程的一种抽象。它是一个代理对象,代表一个必须进行异步处理的函数返回的值或抛出的异常。
以实现较多的 Promise/A(thenable)来说:
- Promise作为一个对象,代表了一次任务的执行,它有三种状态:成功/失败/执行中
- Promise对象有一个 then接口(原型函数),形如:
then(fulfilledHandler, errorHandler, progressHandler)
。这三个参数在Promise对象完成任务时被执行,它们对应状态为成功/失败/执行中——我们可以把then方法等同于Promise对象的构造器,fulfilledHandler
、errorHandler
及progressHandler
可以对应到Promise.resolve(val)
、Promise.reject(reason)
及Promise.notify(update)
- Promise对象还有一个catch接口(原型函数),形如:
catch(onRejected)。
onRejected为错误处理的回调,接收从Promise.reject(reason)
传递过来的错误信息 - Promise对象暴露给调用方的then接口,是一个永远可以返回Promise对象自身的函数,所以then可以继续链式调用then,直到任务完成——简单说,Promise执行的结果可以传递给下一个Promise对象,这在异步任务的串行化中非常有用(并且我们不用担心这些Promise在执行任务时产生副作用,根据规范,每一个Promise与被调函数间都是相互独立的
- Promise对象的内部,维护一个队列,在构造执行链完成时,将待执行函数存入需要依次执行的任务——什么时候算完成呢?一般的Promise库实现,都需要在构造链的尾部调用一个done之类的函数,以明示构造链的终结。
Promise做为类和对象的细节,MDN上的描述更为详尽。回到实现层面,先来看一个开源的Promise框架,Q:
Q
Q是一个对Promise/A规范实现较为完备的开源框架。
针对前述的代码片断1 场景,Q提供了这样的可能性:Q.promise能将异步逻辑包装成一个thenable函数,从而注入它实现的回调函数,源码形如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Q.promise = promise; function promise(resolver) { if (typeof resolver !== "function") { throw new TypeError("resolver must be a function."); } var deferred = defer(); try { resolver(deferred.resolve, deferred.reject, deferred.notify); } catch (reason) { deferred.reject(reason); } return deferred.promise; } |
这个resolver就是我们的异步逻辑封装函数, 我们可以选择性的接收resolve和reject作为参数,并在异步方法完成/出错时进行回调,让Q获得流程控制权。
Q的异步串行例子
假设有两个文本文件:1.txt 和 2.txt,内容分别为:I'm the first text.\n
和I'm the second text.\n
。我们需要顺序且异步的读取它们,在全部读取完成/出错时,显示相应信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
var Q = require('q'); var path = require('path'); var fs = require('fs'); function readFile(previous, fileName) { return Q.promise(function (resolve, reject) { fs.readFile(path.join(process.cwd(), fileName), function (error, text) { if (error) { reject(new Error(error)); } else { resolve(previous + text.toString()); } }); }); } |
fs.readFile 读取文件,成功调用Q.defer.resolve,出错调用Q.defer.reject。readFile做为用于串联Promise对象的方法,提供了一个previous状态参数,用于累加上次执行的结果。有了这个方法,基于then的串行逻辑就能这样实现:
1 2 3 4 5 6 7 8 9 10 11 |
readFile('', '1.txt') .then(function (previous) { return readFile(previous, '2.txt'); }) .then(function (finalText) { console.log(finalText); }) .catch(function (error) { console.log(error); }) .done(); |
可以看出,thenable函数的链式调用总是能将上一个Promise.resolve的结果做为参数传入。
Async
Async 严格说起来不是一个Promise的实现,而是一个异步工具集(utilities),通过源码我们能看得很清楚,它导出了非常多的方法,集合/流程控制 都有涉及。
针对前述的代码片断1 场景,Async提供了若干种方法,挑两个有代表性的:
Async.waterfall
waterfall形如waterfall(tasks, [callback])
,tasks是一个function数组,[callback]参数是最终结果的回调。tasks数组里的函数按顺序执行,当前任务可以接收上一个任务执行的结果,看个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
var async = require('async'); var path = require('path'); var fs = require('fs'); function readFile4WaterFall(previous, fileName, callback) { fs.readFile(path.join(process.cwd(), fileName), function (error, text) { callback(error, previous + text.toString()); }); } async.waterfall( [ function (callback) { readFile4WaterFall('', '1.txt', callback) }, function (previous, callback) { readFile4WaterFall(previous, '2.txt', callback); } ], function (err, result) { console.log(result); } ); |
可以看出,不管是何种形式的异步流程控制,都需要注入实现的回调(这里是function(callback)),以获取流程控制权。运行结果:
1 2 |
I'm the first text. I'm the second text. |
Async.series
series形如series(tasks, [callback])
,和waterfall不同,tasks数组里的函数按顺序执行,每个任务只接受一个callback注入,并不能传递上一次任务执行的结果。每一个函数执行的结果,都被push到了一个result数组里,也就是[callback(error, result)]的第二个参数。例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
var async = require('async'); var path = require('path'); var fs = require('fs'); function readFile4Series(fileName, callback) { fs.readFile(path.join(process.cwd(), fileName), function (error, text) { callback(error, text.toString()); }); } async.series( [ function (callback) { readFile4Series('1.txt', callback) }, function (callback) { readFile4Series('2.txt', callback); } ], function (err, result) { console.log(result); } ); |
运行结果:
1 |
[ 'I\'m the first text.\n', 'I\'m the second text.\n' ] |
动态的异步串行
老实说,上面的示例仅仅展示了异步框架的威力,却并不实用:在实践中,我们遇到的情况并不是事先构造好要执行的函数链,而是代码动态决定要执行哪些函数,即 .then 是动态拼接出来的。
以前示Q的片断2 为例,要读取的文件名存在数组里,我们需要针对文件名构造执行链:
1 2 3 4 |
//要读取的文件数组 var files = ['1.txt', '2.txt', '3.txt']; //要构造的Promise链 var tasks = []; |
readFile高阶函数需要稍加改造,因为不是显式的构造链,原 .then 传递上次执行的函数需要嵌入到高阶函数中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
function readFileDynamic(fileName) { return function(previous) { //.then callback return Q.promise(function (resolve, reject) { fs.readFile(path.join(process.cwd(), fileName), function (error, text) { if (error) { reject(new Error(error)); } else { resolve(previous + text.toString()); } }); }); } } |
构造任务链和执行链:
1 2 3 4 5 6 7 8 |
files.forEach(function (fileName) { tasks.push(readFileDynamic(fileName)); }); var result = Q(''); tasks.forEach(function (f) { result = result.then(f); }); |
调用:
1 2 3 4 5 6 7 8 |
result .then(function (finalText) { console.log(finalText); }) .catch(function (error) { console.log(error); }) .done(); |
如此,借助Q就可实现动态的异步串行链,本质和静态构造执行链无二致,只是Promise构造形式进行了转换。至于Async就更简单了,前述的 waterfall/series的 tasks本就是个数组,天然动态。