Node Monitor v0.6.5

Show:

File: lib/probes/StreamProbe.js

// StreamProbe.js (c) 2010-2014 Loren West and other contributors
// May be freely distributed under the MIT license.
// For further details and documentation:
// http://lorenwest.github.com/node-monitor
(function(root){

  // Module loading
  var Monitor = root.Monitor || require('../Monitor'),
      Probe = Monitor.Probe,
      _ = Monitor._;

  // Constants
  var DEFAULT_BUNDLE_INTERVAL = 1000;

  /**
  * Base class for probes that stream data
  *
  * Offering real time data streaming can result in degraded performance due
  * to the I/O overhead of sending individual stream elements to remote monitors.
  *
  * This class eases that overhead by bundling stream elements, and sending those
  * bundles in scheduled intervals.  The monitor gets to decide the interval based
  * on the stream volume, and their needs.
  *
  * Derived classes output their stream data as elements of the ```bundle```
  * attribute.
  *
  * A ```sequence``` attribute is incremented sequentially to assure change
  * events are fired, and to allow clients to insure stream ordering and
  * completeness.
  *
  * @class StreamProbe
  * @extends Probe
  * @constructor
  * @param [initParams] {Object} Probe initialization parameters
  *     @param [initParams.interval=1000] {Numeric} Number of milliseconds
  *         to wait between bundles.
  */
  var StreamProbe = Monitor.StreamProbe = Probe.extend({


    defaults: _.extend({}, Probe.prototype.defaults, {
      bundle: [],
      interval: DEFAULT_BUNDLE_INTERVAL,
      sequence: 0
    }),

    initialize: function(){
      var t = this;

      // Initialize parent
      Probe.prototype.initialize.apply(t, arguments);

      // Moving the interval into an instance variable for performance
      t.interval = t.get('interval');

      // Set up for the first bundle
      t.queue = [];
      t.timer = null;
      t.lastSendTime = 0;
    },

    /**
    * Queue an item in the stream
    *
    * This method places the item into the stream and outputs it to the
    * monitor, or queues it up for the next bundle.
    *
    * @method queueItem
    * @param item {Any} Item to place into the queue
    */
    queueItem: function(item) {
      var t = this,
          now = Date.now(),
          msSinceLastSend = now - t.lastSendTime;

      // Queue the item
      t.queue.push(item);

      // Send the bundle?
      if (msSinceLastSend > t.interval) {
        // It's been a while since the last send.  Send it now.
        t._send();
      }
      else {
        // Start the timer if it's not already running
        if (!t.timer) {
          t.timer = setTimeout(function(){
            t._send();
          }, t.interval - msSinceLastSend);
        }
      }
    },

    /**
    * Send the bundle to the montitor
    *
    * @private
    * @method _send
    */
    _send: function() {
      var t = this,
          now = Date.now();

      // This kicks off the send
      t.lastSendTime = now;
      t.set({
        bundle: t.queue,
        sequence: t.get('sequence') + 1
      });

      // Reset
      t.queue = [];
      if (t.timer) {
        clearTimeout(t.timer);
        t.timer = null;
      }
    }

  });

}(this));