When I started out using node.js and async
I didn't find any good/thorough resources on how to really use the
async module. That's why I decided to make a little cookbook about it.
What is async solving? An antipattern
Async and other similar Node.js control flow modules seek to simplify code such as this:
//DON'T DO THIS AT HOME!
//Antipattern: two independent asynchronous db calls are coordinated by hand
//with a shared counter (callbackCounter) and a shared error flag (gotError).
app.get('/user/:userId', function(req, res, next) {
var locals = {};
var userId = req.params.userId;
//Number of db callbacks that have completed successfully so far
var callbackCounter = 0;
//Set to true by whichever db callback fails first, so the other one does nothing
var gotError = false;
db.get('users', userId, function(err, user) {
//The other call has already failed and passed its error to next()
if (gotError) {
return;
}
if (err) {
gotError = true;
return next(err);
}
//Copy only the fields the template gets to see
locals.user = {
name: user.name,
email: user.email,
bio: user.bio
};
callbackCounter++;
//Render only when both callbacks have completed (2 = number of db calls)
if (callbackCounter == 2) {
res.render('user-profile', locals);
}
});
db.query('posts', {userId: userId}, function(err, posts) {
//Same bookkeeping as above, duplicated for the second call
if (gotError) {
return;
}
if (err) {
gotError = true;
return next(err);
}
locals.posts = posts;
callbackCounter++;
//Must be kept in sync with the number of db calls in this handler
if (callbackCounter == 2) {
res.render('user-profile', locals);
}
});
});
Since the two db calls are asynchronous we don't know
which one of them is going to finish first. So we have to use
callbackCounter to keep track of how many tasks have finished. If an
error occurs we also have to handle this in a special way for each task.
And we have code duplication.
And what happens when we need to add another asynchronous task? Then we need to change if (callbackCounter == 2) to if (callbackCounter == 3), which won't be fun to maintain in the long run.
This is where async comes to our aid and makes the code sane to look at and
easy to maintain. In this post I'll give you some pointers on how to
use async in real life.
Important note about callbacks and errors
One thing that wasn't obvious to me when I first looked at async, was the way callbacks are used.
Generally
all the async functions take a set of tasks to perform as argument.
These tasks can for example be an array of functions, or a collection to
iterate over. Each task is given a callback function, let's call this
the task callback. This callback must be called when the task is completed, e.g. after an asynchronous call to the database has completed.
Besides the set of tasks the async functions also take a callback function as argument, let's call this the final callback. The final callback is called when all tasks have completed, i.e. called their respective task callback functions.
Example:
//Every function in the array is a task; async hands each task its own "task callback".
async.parallel([
  function saveA(callback) { //First task; callback is its task callback
    db.save('xxx', 'a', function onSaved(err) {
      //The save has returned, so tell async that this task is done
      callback();
    });
  },
  function saveB(callback) { //Second task; callback is its task callback
    //Nothing else happens after the save, so the task callback doubles as db.save()'s callback
    db.save('xxx', 'b', callback);
  }
], function onAllSaved(err) { //Final callback
  console.log('Both a and b are saved now');
});
If a task encounters an error, the best thing is to call the task callback with the error object as the first argument.
When a task calls back with an error, the final callback will be called
immediately with the error object, and no more outstanding tasks will be
initiated.
Example:
//Same two save tasks as before, now reporting failures through the task callbacks.
async.parallel([
function(callback) {
db.save('xxx', 'a', function(err) {
//Hand a failed save to async as the first argument of the task callback
if (err) {
callback(err);
return; //It's important to return so that the task callback isn't called twice
}
//No error: signal completion without arguments
callback();
});
},
function(callback) {
db.save('xxx', 'b', callback); //If we just pass in the task callback, it will automatically be called with an error, if the db.save() call fails
}
], function(err) { //This is the final callback
if (err) {
throw err; //Or pass it on to an outer callback, log it or whatever suits your needs
}
console.log('Both a and b are saved now');
});
The 4 lines of error handling get pretty tedious, so I prefer to put them on one line, as in:
//Guard clause: pass the error to the task callback and stop, all in one statement
if (err) return callback(err);
Note about modules used implicitly in this post
In this post's examples I'm using some node modules implicitly:
- async (of course). In your own script you should use var async = require('async'); to include it.
- Express.js is used as an http server.
- db is a fictitious database module. It has db.get(bucket, key, callback) and db.query(bucket, properties, callback) methods that are supposed to work like any normal NoSQL database.
- Underscore.js is used as _.
Now let's get on to the interesting stuff!
I need to run multiple tasks that don't depend on each other and when they all finish do something else
Then you should use async.parallel.
An example could be to load a forum user's profile with his details and a list of all his posts.
As input we get the user's ID, so we can easily get both user details and posts independently of each other.
//Route handler: fetch the user and the user's posts at the same time, then render the profile.
app.get('/user/:userId', function(req, res, next) {
  var userId = req.params.userId;
  var locals = {}; //Filled in by the two tasks, handed to the template at the end

  //Task 1: load the user and keep only the fields the template needs
  function loadUser(callback) {
    db.get('users', userId, function(err, user) {
      if (err) {
        return callback(err);
      }
      locals.user = {name: user.name, email: user.email, bio: user.bio};
      callback();
    });
  }

  //Task 2: load the user's posts
  function loadPosts(callback) {
    db.query('posts', {userId: userId}, function(err, posts) {
      if (err) {
        return callback(err);
      }
      locals.posts = posts;
      callback();
    });
  }

  //The tasks array: to run more tasks, add them here
  async.parallel([loadUser, loadPosts], function(err) { //Final callback, called after both tasks have called their "task callbacks"
    if (err) {
      return next(err); //If an error occurred, we let express/connect handle it by calling the "next" function
    }
    //Here locals is populated with 'user' and 'posts'
    res.render('user-profile', locals);
  });
});
If you have more than two tasks to run, you just add to the tasks array.
I need to run multiple tasks that depend on each other and when they all finish do something else
Then you should use async.series.
Again we will use the forum user example.
This
time we get the user's name as input, but our data model is the same as
before. This means that we need to find the user's id based on name
before we can load the posts.
//Route handler: look the user up by name first, then load the posts with the userId that was found.
app.get('/user/:name', function(req, res, next) {
  var locals = {};
  var name = req.params.name;
  var userId; //Define userId out here, so both tasks can access the variable
  async.series([
    //Load user to get userId first
    function(callback) {
      db.query('users', {name: name}, function(err, users) {
        if (err) return callback(err);
        //Check that a user was found
        if (users.length === 0) {
          return callback(new Error('No user with name '+name+' found.'));
        }
        var user = users[0];
        userId = user.id; //Set the userId here, so the next task can access it
        //Copy only the fields the template gets to see
        locals.user = {
          name: user.name,
          email: user.email,
          bio: user.bio
        };
        callback();
      });
    },
    //Load posts (won't be called before task 1's "task callback" has been called)
    function(callback) {
      db.query('posts', {userId: userId}, function(err, posts) {
        if (err) return callback(err);
        locals.posts = posts;
        callback();
      });
    }
  ], function(err) { //This function gets called after the two tasks have called their "task callbacks"
    if (err) return next(err);
    //Here locals will be populated with 'user' and 'posts'
    res.render('user-profile', locals);
  });
});
In this example you don't gain that much from using
async.series, since you only have two tasks to run. The above example
could be somewhat simplified by nesting the callbacks instead:
//Route handler: the same two dependent queries, written as plain nested callbacks.
//There is no task callback here, so every error goes straight to express via next().
app.get('/user/:name', function(req, res, next) {
  var name = req.params.name;
  db.query('users', {name: name}, function(err, users) {
    if (err) return next(err);
    //Check that a user was found
    if (users.length === 0) {
      return next(new Error('No user with name '+name+' found.'));
    }
    var user = users[0];
    //The second query needs user.id, so it is started inside the first query's callback
    db.query('posts', {userId: user.id}, function(err, posts) {
      if (err) return next(err);
      res.render('user-profile', {
        user: {
          name: user.name,
          email: user.email,
          bio: user.bio
        },
        posts: posts
      });
    });
  });
});
But what happens when you suddenly need to run 3
different tasks? Or even more? Then you end up cooking callback
spaghetti (bad).
IMO when you have two levels you can use either solution. If you have more, always go with async.series.
I need to iterate over a collection, perform an asynchronous task for each item, and when they're all done do something else
Then you use async.forEach.
An example could be to have a webservice where you support deleting
multiple messages in one request. You get the message IDs as a comma
separated string in the URL. Each deletion requires a call to the
database. When all deletions have completed you want to reply to the user
with a response.
//Route handler: delete every message whose id is listed (comma separated) in the URL.
app.delete('/messages/:messageIds', function(req, res, next) {
  var messageIds = req.params.messageIds.split(',');

  //Iterator: runs once per messageId; callback is the "task callback" for that specific messageId.
  //db.delete calls it when the item has been deleted, which is how async knows which items have finished.
  function deleteMessage(messageId, callback) {
    db.delete('messages', messageId, callback);
  }

  //Final callback: runs once, after all deletions have finished or one of them has failed
  function respond(err) {
    if (err) {
      return next(err);
    }
    //Tell the user about the great success
    res.json({
      success: true,
      message: messageIds.length+' message(s) was deleted.'
    });
  }

  async.forEach(messageIds, deleteMessage, respond);
});
Bonus tip: If you ever need to iterate over an object, there is an easy way too. You just use Object.keys(o)
(or _.keys(o)
if you prefer Underscore.js) on the object, and iterate over the keys. Example:
//Map from traffic light color to the action a driver should take
var trafficLightActions = {
  red: 'Stop',
  yellow: 'Wait',
  green: 'Go'
};
//Iterate over the object's keys; the values are looked up inside the iterator
async.forEach(Object.keys(trafficLightActions), function(color, callback) { //The second argument (callback) is the "task callback" for a specific color
  var action = trafficLightActions[color];
  //Play around with the color and action
  //Always call the task callback when the work for this color is done,
  //otherwise the final callback below is never called
  callback();
}, function(err) {
  //When done
});
I need to iterate over a collection, perform an
asynchronous task for each item, but only let x tasks run at the same
time, and when they're all done do something else
But what if
your database only allows a limited number of connections at a time, and
your user might delete thousands of messages in a single request? Then
you use async.forEach's brother async.forEachLimit.
async.forEachLimit
takes four arguments: a collection, a concurrency value, the
iterator function, and the final callback. The concurrency value is an integer that tells async
how many tasks should be running at a time. Let's say that our
database only allows 5 connections at a time, then we simply change our
code to:
//Route handler: delete the listed messages, with at most 5 deletions in flight at a time.
app.delete('/messages/:messageIds', function(req, res, next) {
  var messageIds = req.params.messageIds.split(',');

  async.forEachLimit(
    messageIds,
    5, //Concurrency value
    function deleteMessage(messageId, callback) {
      //db.delete calls the task callback itself when the item is gone (or the delete failed)
      db.delete('messages', messageId, callback);
    },
    function respond(err) {
      if (err) {
        return next(err);
      }
      var body = {
        success: true,
        message: messageIds.length+' message(s) was deleted.'
      };
      res.json(body);
    }
  );
});
If you are working with large collections it's
normally a good idea to use async.forEachLimit over async.forEach to
throttle i/o resources.
I need to iterate over a collection,
perform an asynchronous task for one item at a time, and when they're
all done do something else
The third async.forEach brother is async.forEachSeries, which does the same as async.forEachLimit with a concurrency of 1.
You
can use this if it's important that the task of one item finishes
before the task of the next one is started. I can't really think of any
use case where this would be a requirement, except for throttling i/o
resources.
I need to perform an arbitrary set of asynchronous tasks
Then you should use async.queue.
The
syntax of async.queue is a little different than the other functions.
async.queue takes two arguments: A task function and a concurrency
value.
The task function itself should take two arguments. The
first is the task to be performed. This can be anything that the
function can use to perform its task. Second argument is a callback,
which will be the task callback if we use the same terminology as earlier, that should be called when the task is done.
The
concurrency value is just like the one from async.forEachLimit, i.e. it
limits how many tasks that can be under execution at a time.
async.queue returns an object that you can push tasks to, using queue.push(task).
Read about the other properties of the object on the Github page. The most useful property is drain.
If you set this to a function it will be called every time the queue's
last task has been processed, which is very useful for performing an
action when queue processing is done. Look at it as async.queue's final callback.
A
good example of using a queue is when your input is streamed from
another source, which makes it difficult to use async.forEach. An
example could be to copy all objects from one AWS S3
bucket to another. Since AWS only lets you list 1000 objects at a time,
you can't get a single array with all object names from the source
bucket at once. You have to list 1000 objects at a time, and give the
last object name from the previous response as the marker in
the next request (just like pagination). You could choose to load all
object names into a single array first, but then you'd have to list all
objects, and not until they've all been listed you can start copying -
but that would be a terrible waste of valuable time.
A smarter way
is to set up an async.queue, and add object names to the queue as we
get them from the list. As I said, a queue task can be anything. In this
case an S3 object name is a task.
Let's get some code on the table. In this example I'm using the API of Apps Attic's awssum module for AWS services (awesome name by the way).
//Prepare S3 access and bucket names
var awssum = require('awssum');
//Fetch the constructor first and apply `new` to it afterwards.
//`new awssum.load('amazon/s3').S3({...})` would apply `new` to awssum.load and then
//call S3 as a plain method, because `new X(args)` binds tighter than the following `.S3(...)`.
var S3 = awssum.load('amazon/s3').S3;
var s3 = new S3({
  accessKeyId: '...',
  secretAccessKey: '..'
});
var sourceBucket = 'old-bucket';
var destinationBucket = 'new-bucket';
//Becomes true once the last page of object names has been listed
var listObjectsDone = false;
//Set up our queue. Every task pushed to it is an object name, and the
//task function below copies that object from the source- to the destination bucket.
var queue = async.queue(function copyObject(objectName, callback) {
  s3.CopyObject({
    BucketName: destinationBucket,
    ObjectName: objectName,
    SourceBucket: sourceBucket,
    SourceObject: objectName
  }, function onCopied(err) {
    if (err) {
      throw err;
    }
    //Tell async that this queue item has been processed
    callback();
  });
}, 20); //Only allow 20 copy requests at a time

//Whenever the queue is emptied we want to check if we're done
queue.drain = function onDrain() {
  checkDone();
};
//Lists up to 1000 objects from the source bucket, starting after `marker`, and pushes
//every object name to the queue. Calls itself again with the last object name seen while
//the result is truncated; otherwise sets listObjectsDone and calls checkDone().
//marker: last object name of the previous page, or null for the first request.
//Throws the error if the ListObjects request fails.
function listObjects(marker) {
  var options = {
    BucketName: sourceBucket,
    Marker: marker,
    MaxKeys: 1000
  };
  s3.ListObjects(options, function(err, data) {
    if (err) throw err;
    var result = data.Body.ListBucketResult;
    //AWS sends an array if multiple, and a single object if there was only one result.
    //With no results at all Contents is missing; treat that as an empty page
    //instead of wrapping undefined in an array and failing on item.Key.
    var contents;
    if (_.isArray(result.Contents)) {
      contents = result.Contents;
    } else if (result.Contents) {
      contents = [result.Contents];
    } else {
      contents = [];
    }
    //Keep the incoming marker if this page turned out to be empty
    var nextMarker = marker;
    _.each(contents, function(item) {
      var objectName = item.Key;
      nextMarker = objectName; //Save the marker
      queue.push(objectName); //Push the object to our queue
    });
    if (result.IsTruncated === 'true') {
      //The result is truncated, i.e. we have to list once more from the new marker
      listObjects(nextMarker);
    } else {
      listObjectsDone = true; //Tell our routine that we don't need to wait for more objects from S3
      checkDone();
    }
  });
}
//This function gets called when a) list didn't return a truncated result (because we were at the end), and b) when the last task of the queue is finished.
//We are done only when listing has finished AND the queue has no work left.
//queue.length() counts only the tasks still waiting, so queue.running() is checked as well:
//otherwise success could be reported while the last copy requests are still in flight.
function checkDone() {
  var queueIsIdle = queue.length() === 0 && queue.running() === 0;
  if (queueIsIdle && listObjectsDone) {
    console.log('Tada! All objects have been copied :)');
  }
}
//Start the routine by calling listObjects with null as the marker
//(null = there is no previous page to continue from)
listObjects(null);
Note that
the queue can be drained multiple times, and thereby call queue.drain()
multiple times. This would for example happen if our copy requests
finished much faster than each list operation. That's why we have the listObjectsDone
boolean. Even if the queue is empty we're not done until this variable gets set to true.
One missing feature of async.queue that you should be aware of is that the
task callbacks do not support being passed an error as their first
argument. Check out this example:
//Demo: a queue with a concurrency of 1 whose second task reports an error.
var counter = 0; //Number of tasks started so far
var queue = async.queue(function worker(shouldFail, callback) {
  counter++;
  console.log(counter);
  if (!shouldFail) {
    callback();
    return;
  }
  callback(new Error('An error just for fun.')); //Nobody will handle this error
}, 1);
//Three tasks; only the middle one fails
queue.push(false);
queue.push(true);
queue.push(false);
I would expect this to print out 1 and 2, and then I would see the error somewhere. But it will print out 1, 2 and 3.
If I get some free time I will see if I'm able to contribute to async to get support for this use case in some way.
Combination:
I need to perform some parallel tasks, some serial tasks and iterate
over a collection performing an asynchronous task for each item
Then you use a combination of async.parallel, async.series, and async.forEach.
An
example could be to load a forum user by name, his posts, and his
photos. If we have the same data model as before we need to look up the
user's id based on name before we can load his posts and photos (which
are both stored using the userId) in parallel. Moreover we also have to
check that each of the photos exists on the disk.
//Route handler: load the user by name (series step 1), then load posts and photos in
//parallel (series step 2), keeping only the photos whose file exists on disk.
//Uses Node's fs module in addition to async and db.
app.get('/user/:name', function(req, res, next) {
  var locals = {};
  var name = req.params.name;
  var userId;
  async.series([
    //Load user to get userId first
    function(callback) {
      db.query('users', {name: name}, function(err, users) {
        if (err) return callback(err);
        //Check that a user was found
        if (users.length === 0) {
          return callback(new Error('No user with name '+name+' found.'));
        }
        var user = users[0];
        userId = user.id; //Set the userId here, so the next tasks can access it
        locals.user = {
          name: user.name,
          email: user.email,
          bio: user.bio
        };
        callback();
      });
    },
    //Load posts and photos in parallel (won't be called before task 1's "task callback" has been called)
    function(callback) {
      async.parallel([
        //Load posts
        function(callback) {
          db.query('posts', {userId: userId}, function(err, posts) {
            if (err) return callback(err);
            locals.posts = posts;
            callback();
          });
        },
        //Load photos
        function(callback) {
          db.query('photos', {userId: userId}, function(err, photos) {
            if (err) return callback(err);
            locals.photos = [];
            //Iterate over each photo
            async.forEach(photos, function(photo, callback) {
              //fs.exists calls back with a single boolean, so there is no error to forward here
              fs.exists(photo.path, function(exists) {
                //Only add the photo to locals.photos if it exists on disk
                if (exists) {
                  locals.photos.push(photo);
                }
                callback();
              });
            }, callback); //The photos task's "task callback" doubles as the forEach's "final callback"
          });
        }
      ], callback); //Remember to put in the second series task's "task callback" as the "final callback" for the async.parallel operation
    }
  ], function(err) { //This function gets called after the two series tasks have called their "task callbacks"
    if (err) return next(err);
    //Here locals will be populated with 'user', 'posts' and 'photos'
    res.render('user-profile', locals);
  });
});
You can nest and combine async.parallel and
async.series as crazy as you want. A good trick when you find yourself
nesting too deep is to divide the code into multiple functions. The
above example could be changed to this:
//Route handler: same flow as the nested version, with each load moved into its own function.
app.get('/user/:name', function(req, res, next) {
  var locals = {};
  var name = req.params.name;
  var userId;
  async.series([
    //Load user
    function(callback) {
      loadUserByName(name, function(err, user) {
        if (err) return callback(err);
        userId = user.id; //The next tasks query by id, not by the whole user object
        //Copy only the fields the template gets to see
        locals.user = {
          name: user.name,
          email: user.email,
          bio: user.bio
        };
        callback(); //Without this call async.series would never start the next task
      });
    },
    //Load posts and photos in parallel
    function(callback) {
      async.parallel([
        //Load posts
        function(callback) {
          loadPostsByUserId(userId, function(err, posts) {
            if (err) return callback(err);
            locals.posts = posts;
            callback();
          });
        },
        //Load photos
        function(callback) {
          loadPhotosByUserId(userId, function(err, photos) {
            if (err) return callback(err);
            locals.photos = photos;
            callback();
          });
        }
      ], callback);
    }
  ], function(err) {
    if (err) return next(err);
    res.render('user-profile', locals);
  });
});
//Looks up a user by name.
//name: the user name to search for.
//callback(err, user): err is the db error, or an Error when no user has that name;
//user is the first matching record as returned by db.query.
//This function does not touch the route handler's locals (they are not in scope here);
//the caller decides which fields to expose.
function loadUserByName(name, callback) {
  db.query('users', {name: name}, function(err, users) {
    if (err) return callback(err);
    //Check that a user was found
    if (users.length === 0) {
      return callback(new Error('No user with name '+name+' found.'));
    }
    callback(null, users[0]);
  });
}
//Loads all posts written by the given user.
//userId: id of the user whose posts are wanted.
//callback(err, posts): called with the db error alone on failure, otherwise with null and the posts.
function loadPostsByUserId(userId, callback) {
  var criteria = {userId: userId};
  db.query('posts', criteria, function onPosts(queryError, posts) {
    if (queryError) {
      callback(queryError);
      return;
    }
    callback(null, posts);
  });
}
//Loads the given user's photos and keeps only those whose file exists on disk.
//userId: id of the user whose photos are wanted.
//callback(err, photos): called with the error alone on failure, otherwise with null and the existing photos.
//The photos are added in the order the existence checks complete.
function loadPhotosByUserId(userId, callback) {
  db.query('photos', {userId: userId}, function(err, photos) {
    if (err) return callback(err);
    //Collect into a separate array: reusing the name `photos` here would shadow
    //the query result and leave nothing to iterate over
    var existingPhotos = [];
    async.forEach(photos, function(photo, callback) {
      //fs.exists calls back with a single boolean, so there is no error to forward here
      fs.exists(photo.path, function(exists) {
        if (exists) {
          existingPhotos.push(photo);
        }
        callback();
      });
    }, function(err) {
      if (err) return callback(err);
      callback(null, existingPhotos);
    });
  });
}
This makes your code look much more "flat" and less nested. The main logic in the app.get('/user/:name'...
part looks much more readable, since each of the functions nicely describes what it's supposed to do.
Your coworkers will like you better if you write your code like this.
That's all I had to say about that. Big thanks to Caolan McMahon for making this fantastic node.js module.
If you have any comments, or feel like I left something out, I'd love to hear from you.