726 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			726 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| /**
 | |
|  * Support for concurrent task management and synchronization in web
 | |
|  * applications.
 | |
|  *
 | |
|  * @author Dave Longley
 | |
|  * @author David I. Lehn <dlehn@digitalbazaar.com>
 | |
|  *
 | |
|  * Copyright (c) 2009-2013 Digital Bazaar, Inc.
 | |
|  */
 | |
| var forge = require('./forge');
 | |
| require('./debug');
 | |
| require('./log');
 | |
| require('./util');
 | |
| 
 | |
| // logging category
 | |
| var cat = 'forge.task';
 | |
| 
 | |
| // verbose level
 | |
| // 0: off, 1: a little, 2: a whole lot
 | |
| // Verbose debug logging is surrounded by a level check to avoid the
 | |
| // performance issues with even calling the logging code regardless if it
 | |
| // is actually logged.  For performance reasons this should not be set to 2
 | |
| // for production use.
 | |
| // ex: if(sVL >= 2) forge.log.verbose(....)
 | |
| var sVL = 0;
 | |
| 
 | |
| // track tasks for debugging
 | |
| var sTasks = {};
 | |
| var sNextTaskId = 0;
 | |
| // debug access
 | |
| forge.debug.set(cat, 'tasks', sTasks);
 | |
| 
 | |
| // a map of task type to task queue
 | |
| var sTaskQueues = {};
 | |
| // debug access
 | |
| forge.debug.set(cat, 'queues', sTaskQueues);
 | |
| 
 | |
| // name for unnamed tasks
 | |
| var sNoTaskName = '?';
 | |
| 
 | |
| // maximum number of doNext() recursions before a context swap occurs
 | |
| // FIXME: might need to tweak this based on the browser
 | |
| var sMaxRecursions = 30;
 | |
| 
 | |
| // time slice for doing tasks before a context swap occurs
 | |
| // FIXME: might need to tweak this based on the browser
 | |
| var sTimeSlice = 20;
 | |
| 
 | |
| /**
 | |
|  * Task states.
 | |
|  *
 | |
|  * READY: ready to start processing
 | |
|  * RUNNING: task or a subtask is running
 | |
|  * BLOCKED: task is waiting to acquire N permits to continue
 | |
|  * SLEEPING: task is sleeping for a period of time
 | |
|  * DONE: task is done
 | |
|  * ERROR: task has an error
 | |
|  */
 | |
| var READY = 'ready';
 | |
| var RUNNING = 'running';
 | |
| var BLOCKED = 'blocked';
 | |
| var SLEEPING = 'sleeping';
 | |
| var DONE = 'done';
 | |
| var ERROR = 'error';
 | |
| 
 | |
| /**
 | |
|  * Task actions.  Used to control state transitions.
 | |
|  *
 | |
|  * STOP: stop processing
 | |
|  * START: start processing tasks
 | |
|  * BLOCK: block task from continuing until 1 or more permits are released
 | |
|  * UNBLOCK: release one or more permits
 | |
|  * SLEEP: sleep for a period of time
 | |
|  * WAKEUP: wakeup early from SLEEPING state
 | |
|  * CANCEL: cancel further tasks
 | |
|  * FAIL: a failure occured
 | |
|  */
 | |
| var STOP = 'stop';
 | |
| var START = 'start';
 | |
| var BLOCK = 'block';
 | |
| var UNBLOCK = 'unblock';
 | |
| var SLEEP = 'sleep';
 | |
| var WAKEUP = 'wakeup';
 | |
| var CANCEL = 'cancel';
 | |
| var FAIL = 'fail';
 | |
| 
 | |
| /**
 | |
|  * State transition table.
 | |
|  *
 | |
|  * nextState = sStateTable[currentState][action]
 | |
|  */
 | |
| var sStateTable = {};
 | |
| 
 | |
| sStateTable[READY] = {};
 | |
| sStateTable[READY][STOP] = READY;
 | |
| sStateTable[READY][START] = RUNNING;
 | |
| sStateTable[READY][CANCEL] = DONE;
 | |
| sStateTable[READY][FAIL] = ERROR;
 | |
| 
 | |
| sStateTable[RUNNING] = {};
 | |
| sStateTable[RUNNING][STOP] = READY;
 | |
| sStateTable[RUNNING][START] = RUNNING;
 | |
| sStateTable[RUNNING][BLOCK] = BLOCKED;
 | |
| sStateTable[RUNNING][UNBLOCK] = RUNNING;
 | |
| sStateTable[RUNNING][SLEEP] = SLEEPING;
 | |
| sStateTable[RUNNING][WAKEUP] = RUNNING;
 | |
| sStateTable[RUNNING][CANCEL] = DONE;
 | |
| sStateTable[RUNNING][FAIL] = ERROR;
 | |
| 
 | |
| sStateTable[BLOCKED] = {};
 | |
| sStateTable[BLOCKED][STOP] = BLOCKED;
 | |
| sStateTable[BLOCKED][START] = BLOCKED;
 | |
| sStateTable[BLOCKED][BLOCK] = BLOCKED;
 | |
| sStateTable[BLOCKED][UNBLOCK] = BLOCKED;
 | |
| sStateTable[BLOCKED][SLEEP] = BLOCKED;
 | |
| sStateTable[BLOCKED][WAKEUP] = BLOCKED;
 | |
| sStateTable[BLOCKED][CANCEL] = DONE;
 | |
| sStateTable[BLOCKED][FAIL] = ERROR;
 | |
| 
 | |
| sStateTable[SLEEPING] = {};
 | |
| sStateTable[SLEEPING][STOP] = SLEEPING;
 | |
| sStateTable[SLEEPING][START] = SLEEPING;
 | |
| sStateTable[SLEEPING][BLOCK] = SLEEPING;
 | |
| sStateTable[SLEEPING][UNBLOCK] = SLEEPING;
 | |
| sStateTable[SLEEPING][SLEEP] = SLEEPING;
 | |
| sStateTable[SLEEPING][WAKEUP] = SLEEPING;
 | |
| sStateTable[SLEEPING][CANCEL] = DONE;
 | |
| sStateTable[SLEEPING][FAIL] = ERROR;
 | |
| 
 | |
| sStateTable[DONE] = {};
 | |
| sStateTable[DONE][STOP] = DONE;
 | |
| sStateTable[DONE][START] = DONE;
 | |
| sStateTable[DONE][BLOCK] = DONE;
 | |
| sStateTable[DONE][UNBLOCK] = DONE;
 | |
| sStateTable[DONE][SLEEP] = DONE;
 | |
| sStateTable[DONE][WAKEUP] = DONE;
 | |
| sStateTable[DONE][CANCEL] = DONE;
 | |
| sStateTable[DONE][FAIL] = ERROR;
 | |
| 
 | |
| sStateTable[ERROR] = {};
 | |
| sStateTable[ERROR][STOP] = ERROR;
 | |
| sStateTable[ERROR][START] = ERROR;
 | |
| sStateTable[ERROR][BLOCK] = ERROR;
 | |
| sStateTable[ERROR][UNBLOCK] = ERROR;
 | |
| sStateTable[ERROR][SLEEP] = ERROR;
 | |
| sStateTable[ERROR][WAKEUP] = ERROR;
 | |
| sStateTable[ERROR][CANCEL] = ERROR;
 | |
| sStateTable[ERROR][FAIL] = ERROR;
 | |
| 
 | |
| /**
 | |
|  * Creates a new task.
 | |
|  *
 | |
|  * @param options options for this task
 | |
|  *   run: the run function for the task (required)
 | |
|  *   name: the run function for the task (optional)
 | |
|  *   parent: parent of this task (optional)
 | |
|  *
 | |
|  * @return the empty task.
 | |
|  */
 | |
| var Task = function(options) {
 | |
|   // task id
 | |
|   this.id = -1;
 | |
| 
 | |
|   // task name
 | |
|   this.name = options.name || sNoTaskName;
 | |
| 
 | |
|   // task has no parent
 | |
|   this.parent = options.parent || null;
 | |
| 
 | |
|   // save run function
 | |
|   this.run = options.run;
 | |
| 
 | |
|   // create a queue of subtasks to run
 | |
|   this.subtasks = [];
 | |
| 
 | |
|   // error flag
 | |
|   this.error = false;
 | |
| 
 | |
|   // state of the task
 | |
|   this.state = READY;
 | |
| 
 | |
|   // number of times the task has been blocked (also the number
 | |
|   // of permits needed to be released to continue running)
 | |
|   this.blocks = 0;
 | |
| 
 | |
|   // timeout id when sleeping
 | |
|   this.timeoutId = null;
 | |
| 
 | |
|   // no swap time yet
 | |
|   this.swapTime = null;
 | |
| 
 | |
|   // no user data
 | |
|   this.userData = null;
 | |
| 
 | |
|   // initialize task
 | |
|   // FIXME: deal with overflow
 | |
|   this.id = sNextTaskId++;
 | |
|   sTasks[this.id] = this;
 | |
|   if(sVL >= 1) {
 | |
|     forge.log.verbose(cat, '[%s][%s] init', this.id, this.name, this);
 | |
|   }
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Logs debug information on this task and the system state.
 | |
|  */
 | |
| Task.prototype.debug = function(msg) {
 | |
|   msg = msg || '';
 | |
|   forge.log.debug(cat, msg,
 | |
|     '[%s][%s] task:', this.id, this.name, this,
 | |
|     'subtasks:', this.subtasks.length,
 | |
|     'queue:', sTaskQueues);
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Adds a subtask to run after task.doNext() or task.fail() is called.
 | |
|  *
 | |
|  * @param name human readable name for this task (optional).
 | |
|  * @param subrun a function to run that takes the current task as
 | |
|  *          its first parameter.
 | |
|  *
 | |
|  * @return the current task (useful for chaining next() calls).
 | |
|  */
 | |
| Task.prototype.next = function(name, subrun) {
 | |
|   // juggle parameters if it looks like no name is given
 | |
|   if(typeof(name) === 'function') {
 | |
|     subrun = name;
 | |
| 
 | |
|     // inherit parent's name
 | |
|     name = this.name;
 | |
|   }
 | |
|   // create subtask, set parent to this task, propagate callbacks
 | |
|   var subtask = new Task({
 | |
|     run: subrun,
 | |
|     name: name,
 | |
|     parent: this
 | |
|   });
 | |
|   // start subtasks running
 | |
|   subtask.state = RUNNING;
 | |
|   subtask.type = this.type;
 | |
|   subtask.successCallback = this.successCallback || null;
 | |
|   subtask.failureCallback = this.failureCallback || null;
 | |
| 
 | |
|   // queue a new subtask
 | |
|   this.subtasks.push(subtask);
 | |
| 
 | |
|   return this;
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Adds subtasks to run in parallel after task.doNext() or task.fail()
 | |
|  * is called.
 | |
|  *
 | |
|  * @param name human readable name for this task (optional).
 | |
|  * @param subrun functions to run that take the current task as
 | |
|  *          their first parameter.
 | |
|  *
 | |
|  * @return the current task (useful for chaining next() calls).
 | |
|  */
 | |
| Task.prototype.parallel = function(name, subrun) {
 | |
|   // juggle parameters if it looks like no name is given
 | |
|   if(forge.util.isArray(name)) {
 | |
|     subrun = name;
 | |
| 
 | |
|     // inherit parent's name
 | |
|     name = this.name;
 | |
|   }
 | |
|   // Wrap parallel tasks in a regular task so they are started at the
 | |
|   // proper time.
 | |
|   return this.next(name, function(task) {
 | |
|     // block waiting for subtasks
 | |
|     var ptask = task;
 | |
|     ptask.block(subrun.length);
 | |
| 
 | |
|     // we pass the iterator from the loop below as a parameter
 | |
|     // to a function because it is otherwise included in the
 | |
|     // closure and changes as the loop changes -- causing i
 | |
|     // to always be set to its highest value
 | |
|     var startParallelTask = function(pname, pi) {
 | |
|       forge.task.start({
 | |
|         type: pname,
 | |
|         run: function(task) {
 | |
|            subrun[pi](task);
 | |
|         },
 | |
|         success: function(task) {
 | |
|            ptask.unblock();
 | |
|         },
 | |
|         failure: function(task) {
 | |
|            ptask.unblock();
 | |
|         }
 | |
|       });
 | |
|     };
 | |
| 
 | |
|     for(var i = 0; i < subrun.length; i++) {
 | |
|       // Type must be unique so task starts in parallel:
 | |
|       //    name + private string + task id + sub-task index
 | |
|       // start tasks in parallel and unblock when the finish
 | |
|       var pname = name + '__parallel-' + task.id + '-' + i;
 | |
|       var pi = i;
 | |
|       startParallelTask(pname, pi);
 | |
|     }
 | |
|   });
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Stops a running task.
 | |
|  */
 | |
| Task.prototype.stop = function() {
 | |
|   this.state = sStateTable[this.state][STOP];
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Starts running a task.
 | |
|  */
 | |
| Task.prototype.start = function() {
 | |
|   this.error = false;
 | |
|   this.state = sStateTable[this.state][START];
 | |
| 
 | |
|   // try to restart
 | |
|   if(this.state === RUNNING) {
 | |
|     this.start = new Date();
 | |
|     this.run(this);
 | |
|     runNext(this, 0);
 | |
|   }
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Blocks a task until it one or more permits have been released. The
 | |
|  * task will not resume until the requested number of permits have
 | |
|  * been released with call(s) to unblock().
 | |
|  *
 | |
|  * @param n number of permits to wait for(default: 1).
 | |
|  */
 | |
| Task.prototype.block = function(n) {
 | |
|   n = typeof(n) === 'undefined' ? 1 : n;
 | |
|   this.blocks += n;
 | |
|   if(this.blocks > 0) {
 | |
|     this.state = sStateTable[this.state][BLOCK];
 | |
|   }
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Releases a permit to unblock a task. If a task was blocked by
 | |
|  * requesting N permits via block(), then it will only continue
 | |
|  * running once enough permits have been released via unblock() calls.
 | |
|  *
 | |
|  * If multiple processes need to synchronize with a single task then
 | |
|  * use a condition variable (see forge.task.createCondition). It is
 | |
|  * an error to unblock a task more times than it has been blocked.
 | |
|  *
 | |
|  * @param n number of permits to release (default: 1).
 | |
|  *
 | |
|  * @return the current block count (task is unblocked when count is 0)
 | |
|  */
 | |
| Task.prototype.unblock = function(n) {
 | |
|   n = typeof(n) === 'undefined' ? 1 : n;
 | |
|   this.blocks -= n;
 | |
|   if(this.blocks === 0 && this.state !== DONE) {
 | |
|     this.state = RUNNING;
 | |
|     runNext(this, 0);
 | |
|   }
 | |
|   return this.blocks;
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Sleep for a period of time before resuming tasks.
 | |
|  *
 | |
|  * @param n number of milliseconds to sleep (default: 0).
 | |
|  */
 | |
| Task.prototype.sleep = function(n) {
 | |
|   n = typeof(n) === 'undefined' ? 0 : n;
 | |
|   this.state = sStateTable[this.state][SLEEP];
 | |
|   var self = this;
 | |
|   this.timeoutId = setTimeout(function() {
 | |
|     self.timeoutId = null;
 | |
|     self.state = RUNNING;
 | |
|     runNext(self, 0);
 | |
|   }, n);
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Waits on a condition variable until notified. The next task will
 | |
|  * not be scheduled until notification. A condition variable can be
 | |
|  * created with forge.task.createCondition().
 | |
|  *
 | |
|  * Once cond.notify() is called, the task will continue.
 | |
|  *
 | |
|  * @param cond the condition variable to wait on.
 | |
|  */
 | |
| Task.prototype.wait = function(cond) {
 | |
|   cond.wait(this);
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * If sleeping, wakeup and continue running tasks.
 | |
|  */
 | |
| Task.prototype.wakeup = function() {
 | |
|   if(this.state === SLEEPING) {
 | |
|     cancelTimeout(this.timeoutId);
 | |
|     this.timeoutId = null;
 | |
|     this.state = RUNNING;
 | |
|     runNext(this, 0);
 | |
|   }
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Cancel all remaining subtasks of this task.
 | |
|  */
 | |
| Task.prototype.cancel = function() {
 | |
|   this.state = sStateTable[this.state][CANCEL];
 | |
|   // remove permits needed
 | |
|   this.permitsNeeded = 0;
 | |
|   // cancel timeouts
 | |
|   if(this.timeoutId !== null) {
 | |
|     cancelTimeout(this.timeoutId);
 | |
|     this.timeoutId = null;
 | |
|   }
 | |
|   // remove subtasks
 | |
|   this.subtasks = [];
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Finishes this task with failure and sets error flag. The entire
 | |
|  * task will be aborted unless the next task that should execute
 | |
|  * is passed as a parameter. This allows levels of subtasks to be
 | |
|  * skipped. For instance, to abort only this tasks's subtasks, then
 | |
|  * call fail(task.parent). To abort this task's subtasks and its
 | |
|  * parent's subtasks, call fail(task.parent.parent). To abort
 | |
|  * all tasks and simply call the task callback, call fail() or
 | |
|  * fail(null).
 | |
|  *
 | |
|  * The task callback (success or failure) will always, eventually, be
 | |
|  * called.
 | |
|  *
 | |
|  * @param next the task to continue at, or null to abort entirely.
 | |
|  */
 | |
| Task.prototype.fail = function(next) {
 | |
|   // set error flag
 | |
|   this.error = true;
 | |
| 
 | |
|   // finish task
 | |
|   finish(this, true);
 | |
| 
 | |
|   if(next) {
 | |
|     // propagate task info
 | |
|     next.error = this.error;
 | |
|     next.swapTime = this.swapTime;
 | |
|     next.userData = this.userData;
 | |
| 
 | |
|     // do next task as specified
 | |
|     runNext(next, 0);
 | |
|   } else {
 | |
|     if(this.parent !== null) {
 | |
|       // finish root task (ensures it is removed from task queue)
 | |
|       var parent = this.parent;
 | |
|       while(parent.parent !== null) {
 | |
|         // propagate task info
 | |
|         parent.error = this.error;
 | |
|         parent.swapTime = this.swapTime;
 | |
|         parent.userData = this.userData;
 | |
|         parent = parent.parent;
 | |
|       }
 | |
|       finish(parent, true);
 | |
|     }
 | |
| 
 | |
|     // call failure callback if one exists
 | |
|     if(this.failureCallback) {
 | |
|       this.failureCallback(this);
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Asynchronously start a task.
 | |
|  *
 | |
|  * @param task the task to start.
 | |
|  */
 | |
| var start = function(task) {
 | |
|   task.error = false;
 | |
|   task.state = sStateTable[task.state][START];
 | |
|   setTimeout(function() {
 | |
|     if(task.state === RUNNING) {
 | |
|       task.swapTime = +new Date();
 | |
|       task.run(task);
 | |
|       runNext(task, 0);
 | |
|     }
 | |
|   }, 0);
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Run the next subtask or finish this task.
 | |
|  *
 | |
|  * @param task the task to process.
 | |
|  * @param recurse the recursion count.
 | |
|  */
 | |
| var runNext = function(task, recurse) {
 | |
|   // get time since last context swap (ms), if enough time has passed set
 | |
|   // swap to true to indicate that doNext was performed asynchronously
 | |
|   // also, if recurse is too high do asynchronously
 | |
|   var swap =
 | |
|     (recurse > sMaxRecursions) ||
 | |
|     (+new Date() - task.swapTime) > sTimeSlice;
 | |
| 
 | |
|   var doNext = function(recurse) {
 | |
|     recurse++;
 | |
|     if(task.state === RUNNING) {
 | |
|       if(swap) {
 | |
|         // update swap time
 | |
|         task.swapTime = +new Date();
 | |
|       }
 | |
| 
 | |
|       if(task.subtasks.length > 0) {
 | |
|         // run next subtask
 | |
|         var subtask = task.subtasks.shift();
 | |
|         subtask.error = task.error;
 | |
|         subtask.swapTime = task.swapTime;
 | |
|         subtask.userData = task.userData;
 | |
|         subtask.run(subtask);
 | |
|         if(!subtask.error) {
 | |
|            runNext(subtask, recurse);
 | |
|         }
 | |
|       } else {
 | |
|         finish(task);
 | |
| 
 | |
|         if(!task.error) {
 | |
|           // chain back up and run parent
 | |
|           if(task.parent !== null) {
 | |
|             // propagate task info
 | |
|             task.parent.error = task.error;
 | |
|             task.parent.swapTime = task.swapTime;
 | |
|             task.parent.userData = task.userData;
 | |
| 
 | |
|             // no subtasks left, call run next subtask on parent
 | |
|             runNext(task.parent, recurse);
 | |
|           }
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   };
 | |
| 
 | |
|   if(swap) {
 | |
|     // we're swapping, so run asynchronously
 | |
|     setTimeout(doNext, 0);
 | |
|   } else {
 | |
|     // not swapping, so run synchronously
 | |
|     doNext(recurse);
 | |
|   }
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Finishes a task and looks for the next task in the queue to start.
 | |
|  *
 | |
|  * @param task the task to finish.
 | |
|  * @param suppressCallbacks true to suppress callbacks.
 | |
|  */
 | |
| var finish = function(task, suppressCallbacks) {
 | |
|   // subtask is now done
 | |
|   task.state = DONE;
 | |
| 
 | |
|   delete sTasks[task.id];
 | |
|   if(sVL >= 1) {
 | |
|     forge.log.verbose(cat, '[%s][%s] finish',
 | |
|       task.id, task.name, task);
 | |
|   }
 | |
| 
 | |
|   // only do queue processing for root tasks
 | |
|   if(task.parent === null) {
 | |
|     // report error if queue is missing
 | |
|     if(!(task.type in sTaskQueues)) {
 | |
|       forge.log.error(cat,
 | |
|         '[%s][%s] task queue missing [%s]',
 | |
|         task.id, task.name, task.type);
 | |
|     } else if(sTaskQueues[task.type].length === 0) {
 | |
|       // report error if queue is empty
 | |
|       forge.log.error(cat,
 | |
|         '[%s][%s] task queue empty [%s]',
 | |
|         task.id, task.name, task.type);
 | |
|     } else if(sTaskQueues[task.type][0] !== task) {
 | |
|       // report error if this task isn't the first in the queue
 | |
|       forge.log.error(cat,
 | |
|         '[%s][%s] task not first in queue [%s]',
 | |
|         task.id, task.name, task.type);
 | |
|     } else {
 | |
|       // remove ourselves from the queue
 | |
|       sTaskQueues[task.type].shift();
 | |
|       // clean up queue if it is empty
 | |
|       if(sTaskQueues[task.type].length === 0) {
 | |
|         if(sVL >= 1) {
 | |
|           forge.log.verbose(cat, '[%s][%s] delete queue [%s]',
 | |
|             task.id, task.name, task.type);
 | |
|         }
 | |
|         /* Note: Only a task can delete a queue of its own type. This
 | |
|          is used as a way to synchronize tasks. If a queue for a certain
 | |
|          task type exists, then a task of that type is running.
 | |
|          */
 | |
|         delete sTaskQueues[task.type];
 | |
|       } else {
 | |
|         // dequeue the next task and start it
 | |
|         if(sVL >= 1) {
 | |
|           forge.log.verbose(cat,
 | |
|             '[%s][%s] queue start next [%s] remain:%s',
 | |
|             task.id, task.name, task.type,
 | |
|             sTaskQueues[task.type].length);
 | |
|         }
 | |
|         sTaskQueues[task.type][0].start();
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if(!suppressCallbacks) {
 | |
|       // call final callback if one exists
 | |
|       if(task.error && task.failureCallback) {
 | |
|         task.failureCallback(task);
 | |
|       } else if(!task.error && task.successCallback) {
 | |
|         task.successCallback(task);
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| /* Tasks API */
 | |
| module.exports = forge.task = forge.task || {};
 | |
| 
 | |
| /**
 | |
|  * Starts a new task that will run the passed function asynchronously.
 | |
|  *
 | |
|  * In order to finish the task, either task.doNext() or task.fail()
 | |
|  * *must* be called.
 | |
|  *
 | |
|  * The task must have a type (a string identifier) that can be used to
 | |
|  * synchronize it with other tasks of the same type. That type can also
 | |
|  * be used to cancel tasks that haven't started yet.
 | |
|  *
 | |
|  * To start a task, the following object must be provided as a parameter
 | |
|  * (each function takes a task object as its first parameter):
 | |
|  *
 | |
|  * {
 | |
|  *   type: the type of task.
 | |
|  *   run: the function to run to execute the task.
 | |
|  *   success: a callback to call when the task succeeds (optional).
 | |
|  *   failure: a callback to call when the task fails (optional).
 | |
|  * }
 | |
|  *
 | |
|  * @param options the object as described above.
 | |
|  */
 | |
| forge.task.start = function(options) {
 | |
|   // create a new task
 | |
|   var task = new Task({
 | |
|     run: options.run,
 | |
|     name: options.name || sNoTaskName
 | |
|   });
 | |
|   task.type = options.type;
 | |
|   task.successCallback = options.success || null;
 | |
|   task.failureCallback = options.failure || null;
 | |
| 
 | |
|   // append the task onto the appropriate queue
 | |
|   if(!(task.type in sTaskQueues)) {
 | |
|     if(sVL >= 1) {
 | |
|       forge.log.verbose(cat, '[%s][%s] create queue [%s]',
 | |
|         task.id, task.name, task.type);
 | |
|     }
 | |
|     // create the queue with the new task
 | |
|     sTaskQueues[task.type] = [task];
 | |
|     start(task);
 | |
|   } else {
 | |
|     // push the task onto the queue, it will be run after a task
 | |
|     // with the same type completes
 | |
|     sTaskQueues[options.type].push(task);
 | |
|   }
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Cancels all tasks of the given type that haven't started yet.
 | |
|  *
 | |
|  * @param type the type of task to cancel.
 | |
|  */
 | |
| forge.task.cancel = function(type) {
 | |
|   // find the task queue
 | |
|   if(type in sTaskQueues) {
 | |
|     // empty all but the current task from the queue
 | |
|     sTaskQueues[type] = [sTaskQueues[type][0]];
 | |
|   }
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * Creates a condition variable to synchronize tasks. To make a task wait
 | |
|  * on the condition variable, call task.wait(condition). To notify all
 | |
|  * tasks that are waiting, call condition.notify().
 | |
|  *
 | |
|  * @return the condition variable.
 | |
|  */
 | |
| forge.task.createCondition = function() {
 | |
|   var cond = {
 | |
|     // all tasks that are blocked
 | |
|     tasks: {}
 | |
|   };
 | |
| 
 | |
|   /**
 | |
|    * Causes the given task to block until notify is called. If the task
 | |
|    * is already waiting on this condition then this is a no-op.
 | |
|    *
 | |
|    * @param task the task to cause to wait.
 | |
|    */
 | |
|   cond.wait = function(task) {
 | |
|     // only block once
 | |
|     if(!(task.id in cond.tasks)) {
 | |
|        task.block();
 | |
|        cond.tasks[task.id] = task;
 | |
|     }
 | |
|   };
 | |
| 
 | |
|   /**
 | |
|    * Notifies all waiting tasks to wake up.
 | |
|    */
 | |
|   cond.notify = function() {
 | |
|     // since unblock() will run the next task from here, make sure to
 | |
|     // clear the condition's blocked task list before unblocking
 | |
|     var tmp = cond.tasks;
 | |
|     cond.tasks = {};
 | |
|     for(var id in tmp) {
 | |
|       tmp[id].unblock();
 | |
|     }
 | |
|   };
 | |
| 
 | |
|   return cond;
 | |
| };
 |