// Router.js (c) 2010-2014 Loren West and other contributors
// May be freely distributed under the MIT license.
// For further details and documentation:
// http://lorenwest.github.com/node-monitor
(function(root){
// Module loading
var Monitor = root.Monitor || require('./Monitor'),
log = Monitor.getLogger('Router'),
stat = Monitor.getStatLogger('Router'),
Cron = Monitor.Cron, _ = Monitor._, Backbone = Monitor.Backbone,
Config = Monitor.Config, Probe = Monitor.Probe,
Connection = Monitor.Connection, Server = Monitor.Server,
SocketIO = root.io || require('socket.io'),
// Set if server-side
hostName = Monitor.commonJS ? require('os').hostname() : null;
// Constants
var PROBE_TIMEOUT_MS = 10000;
/**
* Probe location and message routing
*
* The router is a class used internally to locate probes and connect
* events so messages are correctly routed between probes and their monitors.
*
* It is a *singleton* class, designed to run one instance within
* a monitor process, and accessed via the (protected) `getRouter()`
* method of the <a href="Monitor.html">Monitor</a> object.
*
* It manages all outbound requests to probes, either internally or externally
* via the <a href="Connection.html">Connection</a> to the remote process.
*
* @class Router
* @extends Backbone.Model
* @constructor
*/
/**
* A new connection has been established
*
* @event
* connection:add
* @param connection {Connection} The added connection
*/
/**
* A connection has been terminated
*
* @event
* connection:remove
* @param connection {Connection} The removed connection
*/
var Router = Monitor.Router = Backbone.Model.extend({
initialize: function() {
var t = this;
t.defaultGateway = null;
t.firewall = false;
t.connections = new Connection.List();
t.runningProbesByKey = {}; // key=probeKey, data=probeImpl
t.runningProbesById = {}; // key=probeId, data=probeImpl
t.addHostCallbacks = {}; // key=hostName, data=[callbacks]
log.info('init', 'Router initialized');
},
/**
* Firewall new connections from inbound probe requests
*
* When two monitor processes connect, they become peers. By default each
* process can request probe connections with the other.
*
* If you want to connect with a remote probe, but don't want those servers
* to connect with probes in this process, call this method to firewall
* those inbound probe requests.
*
* Setting this will change the firewall value for all *new* connections.
* Any existing connections will still accept incoming probe requests.
*
* @static
* @method setFirewall
* @param firewall {Boolean} - Firewall new connections?
*/
setFirewall: function(firewall) {
var t = Monitor.getRouter(); // This is a static method
t.firewall = firewall;
log.info('setFirewall', firewall);
},
/**
* Set the default gateway server
*
* The gateway server is used if a monitor cannot connect directly with the
* server hosting the probe.
*
* When a monitor is requested to connect with a probe on a specific server,
* a direct connection is attempted. If that direct connection fails, usually
* due to a firewall or browser restriction, the monitor will attempt the
* connection to the probe through the gateway server.
*
* The server specified in this method must have been started as a gateway
* like this:
*
* // Start a monitor server and act as a gateway
* var server = new Monitor.Server({gateway:true});
*
* @method setGateway
* @param options {Object} - Connection parameters
* @param options.hostName {String} - Name of the gateway host
* @param options.hostPort {Integer} - Port number to connect with
* @param options.url {String} - The URL used to connect (created, or used if supplied)
* @param options.socket {io.socket} - Pre-connected socket.io socket to the gateway server.
* @return connection {Connection} - The connection with the gateway server
*/
setGateway: function(options) {
var t = this;
options.gateway = false; // New connection can't be an inbound gateway
options.firewall = true; // Gateways are for outbound requests only
return t.defaultGateway = t.addConnection(options);
},
/**
* Return a stable host name.
*
* @method getHostName
* @protected
* @return hostName {String} - The platform's host name, or an otherwise stable ID
*/
getHostName: function() {
var localStorage = root.localStorage;
if (!hostName) {
if (localStorage) {hostName = localStorage.hostName;}
hostName = hostName || Monitor.generateUniqueId();
if (localStorage) {localStorage.hostName = hostName;}
}
return hostName;
},
/**
* Set the current host name.
*
* This sets the host name that this router publishes to new connections.
* It's only useful if the os hostname isn't the name you want to publish.
*
* @protected
* @method setHostName
* @param name {String} - The host name to publish to new connections
*/
setHostName: function(name) {
hostName = name;
log.info('setHostName', name);
},
/**
* Add a connection to a remote Monitor process
*
* @method addConnection
* @protected
* @param options {Object} - Connection parameters
* @param options.hostName {String} - Name of the host to connect with
* @param options.hostPort {Integer} - Port number to connect with
* @param options.url {String} - The URL used to connect (created, or used if supplied)
* @param options.socket {io.socket} - Pre-connected socket.io socket to a Monitor server.
* @param options.gateway {Boolean} - Allow this connection to use me as a gateway (default false)
* @param options.firewall {Boolean} Firewall inbound probe requests on this connection?
* @return connection {Connection} - The added connection
*/
addConnection: function(options) {
var t = this,
startTime = Date.now();
// Default the firewall value
if (_.isUndefined(options.firewall)) {
options = _.extend({},options, {firewall: t.firewall});
}
// Generate a unique ID for the connection
options.id = Monitor.generateUniqueCollectionId(t.connections);
var connStr = 'Conn_' + options.id;
if (options.hostName) {
connStr += ' - ' + options.hostName + ':' + options.hostPort;
}
log.info('addConnection', connStr);
// Instantiate and add the connection for use, once connected
var connection = new Connection(options);
// Add a connect and disconnect function
var onConnect = function(){
t.trigger('connection:add', connection);
log.info('connected', connStr, (Date.now() - startTime) + 'ms');
};
var onDisconnect = function(){
t.removeConnection(connection);
connection.off('connect', onConnect);
connection.off('disconnect', onConnect);
log.info('disconnected', connStr, (Date.now() - startTime) + 'ms');
};
connection.on('connect', onConnect);
connection.on('disconnect', onDisconnect);
// Add to the connections
t.connections.add(connection);
return connection;
},
/**
* Remove a connection from the router.
*
* This is called to remove the connection and associated routes from the router.
*
* @method removeConnection
* @protected
* @param connection {Connection} - The connection to remove
*/
removeConnection: function(connection) {
var t = this;
log.info('removeConnection', 'Conn_' + connection.id);
connection.disconnect('connection_removed');
t.connections.remove(connection);
t.trigger('connection:remove', connection);
},
/**
* Connect a Monitor object to a remote Probe
*
* This accepts an instance of a Monitor and figures out how to connect it
* to a running Probe.
*
* Upon callback the probe data is set into the monitor (unless an error
* occurred)
*
* @method connectMonitor
* @protected
* @param monitor {Monitor} - The monitor requesting to connect with the probe
* @param callback {Function(error)} - (optional) Called when connected
*/
connectMonitor: function(monitor, callback) {
callback = callback || function(){};
var t = this,
monitorJSON = monitor.toMonitorJSON(),
probeJSON = null,
probeClass = monitorJSON.probeClass,
startTime = Date.now(),
monitorStr = probeClass + '.' + monitor.toServerString().replace(/:/g, '.');
// Class name must be set
if (!probeClass) {
var errStr = 'probeClass must be set';
log.error('connectMonitor', errStr);
return callback(errStr);
}
// Determine the connection (or internal), and listen for change events
t.determineConnection(monitorJSON, true, function(err, connection) {
if (err) {return callback(err);}
// Function to run on connection (internal or external)
var onConnect = function(error, probe) {
if (error) {return callback(error);}
probeJSON = probe.toJSON();
probeJSON.probeId = probeJSON.id; delete probeJSON.id;
monitor.probe = probe;
// Perform the initial set silently. This assures the initial
// probe contents are available on the connect event,
// but doesn't fire a change event before connect.
monitor.set(probeJSON, {silent:true});
// Watch the probe for changes.
monitor.probeChange = function(){
monitor.set(probe.changedAttributes());
log.info('probeChange', {probeId: probeJSON.probeId, changed: probe.changedAttributes()});
};
probe.on('change', monitor.probeChange);
// Call the callback. This calls the original caller, issues
// the connect event, then fires the initial change event.
callback(null);
};
// Connect internally or externally
if (connection) {
t.connectExternal(monitorJSON, connection, onConnect);
} else {
t.connectInternal(monitorJSON, onConnect);
}
});
},
/**
* Disconnect a monitor
*
* This accepts an instance of a connected monitor, and disconnects it from
* the remote probe.
*
* The probe implementation will be released if this is the only monitor
* object watching it.
*
* @method disconnectMonitor
* @protected
* @param monitor {Monitor} - The connected monitor
* @param reason {String} - Reason for the disconnect
* @param callback {Function(error, reason)} - (optional) Called when disconnected
*/
disconnectMonitor: function(monitor, reason, callback) {
callback = callback || function(){};
var t = this, probe = monitor.probe, probeId = monitor.get('probeId');
// The monitor must be connected
if (!probe) {return callback('Monitor must be connected');}
// Called upon disconnect (internal or external)
var onDisconnect = function(error) {
if (error) {
return callback(error);
}
probe.off('change', monitor.probeChange);
monitor.probe = monitor.probeChange = null;
monitor.set({probeId:null});
return callback(null, reason);
};
// Disconnect from an internal or external probe
if (probe.connection) {
t.disconnectExternal(probe.connection, probeId, onDisconnect);
} else {
t.disconnectInternal(probeId, onDisconnect);
}
},
/**
* Build a probe key from the probe data
*
* @method buildProbeKey
* @protected
* @param probeJSON {Object} - An object containing:
* @param probeJSON.probeClass {String} - The probe class name (required)
* @param probeJSON.initParams {Object} - Probe initialization parameters (if any)
* @return probeKey {String} - A string identifying the probe
*/
buildProbeKey: function(probeJSON) {
var probeKey = probeJSON.probeClass, initParams = probeJSON.initParams;
if (initParams) {
_.keys(initParams).sort().forEach(function(key){
probeKey += ':' + key + '=' + initParams[key];
});
}
return probeKey;
},
/**
* Determine the connection to use for a probe
*
* This uses the connection parameters of the specified monitor to determine
* (or create) the connection to use for the probe.
*
* If the probe can be instantiated internally, a null is returned as the
* connection.
*
* This attempts to use an existing connection if available. If not, a
* connection attempt will be made with the host. If the host cannot be
* reached directly, it returns a connection with the gateway.
*
* @method determineConnection
* @protected
* @param monitorJSON {Object} - The connection attributes of the monitor
* @param makeNewConnections {Boolean} - Establish a new connection if one doesn't exist?
* @param callback {Function(error, connection)} - Called when the connection is known
* <ul>
* <li>error - Set if any errors</li>
* <li>connection - The connection object, or null to run in this process</li>
* </ul>
*/
determineConnection: function(monitorJSON, makeNewConnections, callback) {
// Resolution order, as implemented below:
//   1. Target is this process: run internally (null connection) if the
//      probe class is registered, otherwise fall back to the default gateway.
//   2. An existing connection matching host/app/instance.
//   3. The default gateway, if one is set.
//   4. Scan the host for new connections (only if makeNewConnections).
//   5. Otherwise report an error.
// Note that monitorJSON is modified (see connectedCheck) when a direct
// (non-gateway) connection is chosen.
var t = this, connection = null, probeClass = monitorJSON.probeClass,
errStr = '',
hostName = monitorJSON.hostName,
appName = monitorJSON.appName,
appInstance = monitorJSON.appInstance,
thisHostName = t.getHostName().toLowerCase(),
thisAppName = Config.Monitor.appName || 'unknown',
// The '1' default applies only when `process` is not defined. When `process`
// exists, this is whatever NODE_APP_INSTANCE holds (possibly undefined).
thisAppInstance = typeof process !== 'undefined' ? process.env.NODE_APP_INSTANCE : '1';
// Return a found connection immediately if it's connected.
// If connecting, wait for connection to complete.
// If not connected (and not connecting) re-try the connection.
// Operates on the `connection` variable of the enclosing function, which
// must be assigned before this is called.
var connectedCheck = function(isGateway) {
// Remove the host/app/instance params if connecting directly.
// (They are left in place for a gateway connection.)
if (!isGateway) {
delete monitorJSON.hostName;
delete monitorJSON.appName;
delete monitorJSON.appInstance;
}
// Define the connect/error listeners
// Each listener detaches both listeners before calling back.
var onConnect = function() {
removeListeners();
callback(null, connection);
};
var onError = function(err) {
removeListeners();
log.error('connect.error', err);
callback({msg: 'connection error', err:err});
};
// Assigned after the listeners above are defined, but before either can run
var removeListeners = function() {
connection.off('connect', onConnect);
connection.off('error', onError);
};
// Wait if the connection is still awaiting connect
if (connection && connection.connecting) {
connection.on('connect', onConnect);
connection.on('error', onError);
return;
}
// Re-try if disconnected
// NOTE(review): callback is handed to connect() AND is invoked by the
// onConnect/onError listeners registered here. If Connection.connect()
// also invokes its argument, the caller is called back twice - confirm
// against the Connection implementation.
if (connection && !connection.connected) {
connection.on('connect', onConnect);
connection.on('error', onError);
return connection.connect(callback);
}
// Verified connection
return callback(null, connection);
};
// Connect with this process (internally)?
// An unspecified host, app, or instance matches this process.
hostName = hostName ? hostName.toLowerCase() : null;
var thisHost = (!hostName || hostName === thisHostName);
var thisApp = (!appName || appName === thisAppName);
var thisInstance = (!appInstance || appInstance === thisAppInstance);
if (thisHost && thisApp && thisInstance) {
// Connect internally if the probe is available
if (Probe.classes[probeClass] != null) {
return callback(null, null);
}
// No probe with that name in this process.
// Fallback to the default gateway.
if (!t.defaultGateway) {
errStr = 'Probe class "' + probeClass + '" not available in this process';
log.error('connect.internal', errStr);
return callback({err:errStr});
}
connection = t.defaultGateway;
return connectedCheck(true);
}
// Return if connection is known
connection = t.findConnection(hostName, appName, appInstance);
if (connection) {
return connectedCheck();
}
// Prefer the gateway if it exists
// (this is checked before attempting any new direct connection)
if (t.defaultGateway) {
connection = t.defaultGateway;
return connectedCheck(true);
}
// See if we can establish new connections with the host
if (hostName && makeNewConnections) {
t.addHostConnections(hostName, function(err) {
if (err) {
log.error('connect.toHost', err);
return callback(err);
}
// Try finding now that new connections have been made
connection = t.findConnection(hostName, appName, appInstance);
if (!connection) {
errStr = 'No route to host: ' + Monitor.toServerString(monitorJSON);
log.error('connect.toHost', errStr);
return callback({err:errStr});
}
return connectedCheck();
});
// Wait for addHostConnections to complete
return;
}
// We tried...
if (!hostName) {
// App name was specified, it wasn't this process, and no hostname
errStr = 'No host specified for app: ' + appName;
log.error('connect', errStr);
return callback({msg:errStr},null);
} else {
// Not allowed to try remote hosts
// (reached when a host was given but makeNewConnections is falsy)
errStr = 'Not a gateway to remote monitors';
log.error('connect', errStr);
return callback({msg:errStr});
}
},
/**
* Find an existing connection to use
*
* This method looks into the existing known connections to find one
* that matches the specified parameters.
*
* Firewalled connections are not returned.
*
* @method findConnection
* @protected
* @param hostName {String} - Host name to find a connection for (null = any host)
* @param appName {String} - App name to find a connection with (null = any app)
* @param appInstance {Any} - Application instance running on this host (null = any instance)
* @return connection {Connection} - A Connection object if found, otherwise null
*/
findConnection: function(hostName, appName, appInstance) {
var t = this, thisInstance = 0;
return t.connections.find(function(conn) {
// Host or app matches if not specified or if specified and equal
var matchesHost = !hostName || conn.isThisHost(hostName);
var matchesApp = !appName || appName === conn.get('remoteAppName');
var matchesInstance = !appInstance || appInstance === conn.get('remoteAppInstance');
var remoteFirewall = conn.get('remoteFirewall');
// This is a match if host + app + instance matches, and it's not firewalled
return (!remoteFirewall && matchesHost && matchesApp && matchesInstance);
});
},
/**
* Find all connections matching the selection criteria
*
* This method looks into the existing known connections to find all
* that match the specified parameters.
*
* Firewalled connections are not returned.
*
* @method findConnections
* @protected
* @param hostName {String} - Host name to search for (null = any host)
* @param appName {String} - App name to search for (null = any app)
* @return connections {Array of Connection} - An array of Connection objects matching the criteria
*/
findConnections: function(hostName, appName) {
var t = this;
return t.connections.filter(function(conn) {
// Host or app matches if not specified or if specified and equal
var matchesHost = !hostName || conn.isThisHost(hostName);
var matchesApp = !appName || appName === conn.get('remoteAppName');
var remoteFirewall = conn.get('remoteFirewall');
// This is a match if host + app matches, and it's not firewalled
return (!remoteFirewall && matchesHost && matchesApp);
});
},
/**
* Add connections for the specified host
*
* This performs a scan of monitor ports on the server, and adds connections
* for newly discovered servers.
*
* It can take a while to complete, and if called for the same host before
* completion, it will save the callback and call all callbacks when the
* original task is complete.
*
* @method addHostConnections
* @protected
* @param hostName {String} - The host to add connections with
* @param callback {Function(error)} - Called when complete
*/
addHostConnections: function(hostName, callback) {
var t = this,
errStr = '',
connectedPorts = [],
portStart = Config.Monitor.serviceBasePort,
portEnd = Config.Monitor.serviceBasePort + Config.Monitor.portsToScan - 1;
// Create an array to hold callbacks for this host
if (!t.addHostCallbacks[hostName]) {
t.addHostCallbacks[hostName] = [];
}
// Remember this callback and return if we're already adding connections for this host
if (t.addHostCallbacks[hostName].push(callback) > 1) {
return;
}
// Called when done
var doneAdding = function(error) {
t.addHostCallbacks[hostName].forEach(function(cb) {
cb(error);
});
delete t.addHostCallbacks[hostName];
};
// Build the list of ports already connected
t.connections.each(function(connection){
var host = connection.get('hostName').toLowerCase();
var port = connection.get('hostPort');
if (host === hostName && port >= portStart && port <= portEnd) {
connectedPorts.push(port);
}
});
// Scan non-connected ports
var portsToScan = Config.Monitor.portsToScan - connectedPorts.length;
if (portsToScan === 0) {
errStr = 'All monitor ports in use. Increase the Config.Monitor.portsToScan configuration';
log.error('addHostConnections', errStr);
return doneAdding(errStr);
}
var doneScanning = function() {
var conn = this; // called in the context of the connection
conn.off('connect disconnect error', doneScanning);
if (--portsToScan === 0) {
return doneAdding();
}
};
for (var i = portStart; i <= portEnd; i++) {
if (connectedPorts.indexOf(i) < 0) {
var connection = t.addConnection({hostName:hostName, hostPort:i});
connection.on('connect disconnect error', doneScanning, connection);
}
}
},
/**
* Connect to an internal probe implementation
*
* This connects with a probe running in this process. It will instantiate
* the probe if it isn't currently running.
*
* @method connectInternal
* @protected
* @param monitorJSON {Object} - The monitor toJSON data. Containing:
* @param monitorJSON.probeClass {String} - The probe class name to connect with (required)
* @param monitorJSON.initParams {Object} - Probe initialization parameters.
* @param callback {Function(error, probeImpl)} - Called when connected
*/
connectInternal: function(monitorJSON, callback) {
// Build a key for this probe from the probeClass and initParams
var t = this,
probeKey = t.buildProbeKey(monitorJSON),
probeClass = monitorJSON.probeClass,
initParams = monitorJSON.initParams,
probeImpl = null;
var whenDone = function(error) {
// Wait one tick before firing the callback. This simulates a remote
// connection, making the client callback order consistent, regardless
// of a local or remote connection.
setTimeout(function() {
// Dont connect the probe on error
if (error) {
if (probeImpl) {
delete t.runningProbesByKey[probeKey];
delete t.runningProbesById[probeImpl.id];
try {
// This may fail depending on how many resources were created
// by the probe before failure. Ignore errors.
probeImpl.release();
} catch (e){}
}
return callback(error);
}
// Probes are released based on reference count
probeImpl.refCount++;
log.info('connectInternal', {probeKey: probeKey, probeId: probeImpl.id});
callback(null, probeImpl);
}, 0);
};
// Get the probe instance
probeImpl = t.runningProbesByKey[probeKey];
if (!probeImpl) {
// Instantiate the probe
var ProbeClass = Probe.classes[probeClass];
if (!ProbeClass) {
return whenDone({msg:'Probe not available: ' + probeClass});
}
var initOptions = {asyncInit: false, callback: whenDone};
try {
// Deep copy the init params, because Backbone mutates them. This
// is bad if the init params came in from defaults of another object,
// because those defaults will get mutated.
var paramCopy = Monitor.deepCopy(initParams);
// Instantiate a new probe
probeImpl = new ProbeClass(paramCopy, initOptions);
probeImpl.set({id: Monitor.generateUniqueId()});
probeImpl.refCount = 0;
probeImpl.probeKey = probeKey;
t.runningProbesByKey[probeKey] = probeImpl;
t.runningProbesById[probeImpl.id] = probeImpl;
} catch (e) {
var error = {msg: 'Error instantiating probe ' + probeClass, error: e.message};
return whenDone(error);
}
// Return early if the probe constructor transferred responsibility
// for calling the callback.
if (initOptions.asyncInit) {
return;
}
}
// The probe impl is found, and instantiated if necessary
whenDone();
},
/**
* Disconnect with an internal probe implementation.
*
* @method disconnectInternal
* @protected
* @param probeId {String} - The probe implementation ID to disconnect
* @param callback {Function(error, probeImpl)} - Called when disconnected
*/
disconnectInternal: function(probeId, callback) {
var t = this, probeImpl = t.runningProbesById[probeId];
if (!probeImpl) {return callback('Probe not running');}
if (--probeImpl.refCount === 0) {
// Release probe resources & internal references if still no references after a while
setTimeout(function() {
if (probeImpl.refCount === 0) {
try {
probeImpl.release();
} catch (e){}
delete t.runningProbesByKey[probeImpl.probeKey];
delete t.runningProbesById[probeId];
}
}, PROBE_TIMEOUT_MS);
}
callback(null, probeImpl);
},
/**
* Connect to an external probe implementation.
*
* This connects with a probe running in another process. It will
* coordinate the remote instantiation of the probe if it's not running.
*
* @method connectExternal
* @protected
* @param monitorJSON {Object} - An object containing:
* @param monitorJSON.probeClass {String} - The probe class name (required)
* @param monitorJSON.initParams {Object} - Probe initialization parameters (if any)
* @param connection {Connection} - The connection to use
* @param callback {Function(error, probeProxy)} - Called when connected
*/
connectExternal: function(monitorJSON, connection, callback) {
// Build a key for this probe from the probeClass and initParams
var t = this,
errStr = '',
probeKey = t.buildProbeKey(monitorJSON);
// Get the probe proxy
var probeId = connection.remoteProbeIdsByKey[probeKey];
var probeProxy = connection.remoteProbesById[probeId];
if (!probeProxy) {
// Connect with the remote probe
connection.emit('probe:connect', monitorJSON, function(error, probeJSON){
if (error) {
errStr = "probe:connect returned an error for probeClass '" + monitorJSON.probeClass +
"' on " + Monitor.toServerString(monitorJSON);
return callback({err: error, msg: errStr});
}
probeId = probeJSON.id;
// See if the proxy was created while waiting for return
probeProxy = connection.remoteProbesById[probeId];
if (probeProxy) {
probeProxy.refCount++;
log.info('connectExternal.connected.existingProxy', {probeId: probeId, refCount: probeProxy.refCount, whileWaiting: true});
return callback(null, probeProxy);
}
// Create the probe proxy
probeProxy = new Probe(probeJSON);
probeProxy.refCount = 1;
probeProxy.connection = connection;
connection.remoteProbeIdsByKey[probeKey] = probeId;
connection.remoteProbesById[probeId] = probeProxy;
connection.addEvent('probe:change:' + probeId, function(attrs){probeProxy.set(attrs);});
log.info('connectExternal.connected.newProxy', {probeId: probeId});
return callback(null, probeProxy);
});
return;
}
// Probes are released based on reference count
probeProxy.refCount++;
log.info('connectExternal.connected.existingProxy', {probeId: probeId, refCount: probeProxy.refCount});
return callback(null, probeProxy);
},
/**
* Disconnect with an external probe implementation.
*
* @method disconnectExternal
* @protected
* @param connection {Connection} - The connection to use
* @param probeId {String} - Probe ID
* @param callback {Function(error)} - Called when disconnected
*/
disconnectExternal: function(connection, probeId, callback) {
var t = this, proxy = connection.remoteProbesById[probeId];
if (!proxy) {return callback('Probe not running');}
if (--proxy.refCount === 0) {
// Release probe resources
proxy.release();
proxy.connection = null;
delete connection.remoteProbesById[probeId];
delete connection.remoteProbeIdsByKey[proxy.probeKey];
connection.removeEvent('probe:change:' + probeId);
return connection.emit('probe:disconnect', {probeId:probeId}, function(error){
return callback(error);
});
}
callback(null);
}
});
}(this));