// Connection.js (c) 2010-2014 Loren West and other contributors
// May be freely distributed under the MIT license.
// For further details and documentation:
// http://lorenwest.github.com/node-monitor
(function(root){
// Module loading
var Monitor = root.Monitor || require('./Monitor'),
Cron = Monitor.Cron, _ = Monitor._, Backbone = Monitor.Backbone,
log = Monitor.getLogger('Connection'),
stat = Monitor.getStatLogger('Connection'),
Config = Monitor.Config, SocketIO = root.io || require('socket.io-client'),
Probe = Monitor.Probe,
nextConnectionNum = 1;
/**
* Core monitor classes
*
* Classes in this module represent baseline monitor functionality. They can
* be loaded and run in a node.js container as well as within a browser.
*
* @module Monitor
*/
/**
* Connection with a remote process
*
* Instances of this class represent a connection with a remote monitor
* process. The remote process is a peer of this process - it may produce
* and/or consume probe information.
*
* This is an internal class created when a connection to a server is
* requested from a monitor, or when an external connection is made from
* a <a href="Server.html">Server</a> instance.
*
* @class Connection
* @extends Backbone.Model
* @constructor
* @param model - Initial data model. Can be a JS object or another Model.
* @param [model.hostName] {String} The host name to connect with. Used if url isn't present.
* @param [model.hostPort] {Number} The host port to connect using. Used if url isn't present.
* @param [model.url] {String} The URL used to connect. Built if hostName is supplied.
* @param [model.socket] {io.socket} Use this pre-connected socket instead of creating a new one.
* @param [model.gateway=false] {Boolean} Allow this connection to use me as a gateway? See <code><a href="Router.html#method_setGateway">Router.setGateway()</a></code>
* @param [model.firewall=false] {Boolean} Firewall inbound probe requests on this connection?
* @param [model.remoteHostName] {String READONLY} Host name given by the remote server.
* @param [model.remoteAppName] {String READONLY} App name given by the remote server.
* @param [model.remoteAppInstance] {Integer READONLY} The remote application instance ID running on the host.
* @param [model.remotePID] {String READONLY} Remote process ID.
* @param [model.remoteProbeClasses] {Array of String READONLY} Array of probe classes available to the remote server.
* @param [model.remoteGateway] {Boolean READONLY} Can the remote process act as a gateway?
* @param [model.remoteFirewall] {Boolean READONLY} Is the remote side firewalled from inbound probe requests?
*/
/**
* Connected to remote monitor process
*
* This event is emitted after the two sides of the connection have exchanged
* information about themselves.
*
* @event connect
*/
var Connection = Monitor.Connection = Backbone.Model.extend({
defaults: {
hostName: '',
hostPort: null,
url: null,
socket: null,
gateway: false,
firewall: false,
remoteHostName: null,
remoteAppName: null,
remoteAppInstance: 0,
remotePID: 0,
remoteProbeClasses: [],
remoteGateway: false,
remoteFirewall: false
},
initialize: function(params) {
var t = this;
t.connecting = true; // Currently connecting?
t.connected = false; // Currently connected?
t.socketEvents = null; // Key = event name, data = handler function
t.remoteProbeIdsByKey = {}; // Key = probeKey, data = probeId
t.remoteProbesById = {}; // Key = probeId, data = {Probe proxy}
t.incomingMonitorsById = {}; // Key = probeId, data = {Monitor proxy}
// Create a connection ID for logging
t.logId = (nextConnectionNum++) + '.';
// Either connect to an URL or with an existing socket
if (params.socket) {
t.bindConnectionEvents();
log.info(t.logId + 'connect', {socketId:params.socket.id});
}
else if (params.url || (params.hostName && params.hostPort)) {
t.connect();
log.info(t.logId + 'connect', {url:t.get('url')});
}
else {
log.error('init', 'Connection must supply a socket, url, or host name/port');
}
},
// Initiate a connection with a remote server
connect: function() {
var t = this, hostName = t.get('hostName'), hostPort = t.get('hostPort'),
url = t.get('url');
// Build the URL if not specified
if (!url) {
url = t.attributes.url = 'http://' + hostName + ':' + hostPort;
t.set('url', url);
}
// Connect with this url
var opts = {
// 'transports': ['websocket', 'xhr-polling', 'jsonp-polling'],
'force new connection': true, // Don't re-use existing connections
'reconnect': false // Don't let socket.io reconnect.
// Reconnects are performed by the Router.
};
var socket = SocketIO.connect(url, opts);
t.set({socket:socket}).bindConnectionEvents();
},
/**
* Ping a remote connection
*
* @method ping
* @param callback {Function(error)} Callback when response is returned
*/
ping: function(callback) {
var t = this;
callback = callback || function(){};
var onPong = function() {
t.off('pong', onPong);
callback();
};
t.on('pong', onPong);
t.emit('connection:ping');
},
/**
* Disconnect from the remote process
*
* This can be called from the underlying transport if it detects a disconnect,
* or it can be manually called to force a disconnect.
*
* @method disconnect
* @param reason {String} Reason for the disconnect
*/
/**
* <strong>Disconnected from a remote monitor process</strong>
*
* This event is emitted after the remote connection is disconnected and
* resources released.
*
* @event disconnect
* @param reason {String} Reason for the disconnect
*/
disconnect: function(reason) {
var t = this, socket = t.get('socket');
t.connecting = false;
t.connected = false;
// Only disconnect once.
// This method can be called many times during a disconnect (manually,
// by socketIO disconnect, and/or by the underlying socket disconnect).
if (t.socketEvents) {
t.removeAllEvents();
socket.disconnect();
t.trigger('disconnect', reason);
log.info(t.logId + 'disconnect', reason);
}
},
/**
* Is this connection with the specified host?
*
* @method isThisHost
* @protected
* @param hostName {String} The host name to check
* @return withHost {Boolean} True if the connection is with this host
*/
isThisHost: function(hostName) {
var t = this, testHost = hostName.toLowerCase(),
myHostName = t.get('hostName'), remoteHostName = t.get('remoteHostName');
myHostName = myHostName && myHostName.toLowerCase();
remoteHostName = remoteHostName && remoteHostName.toLowerCase();
return (testHost === myHostName || testHost === remoteHostName);
},
/**
* Emit the specified message to the socket.
*
* The other side of the connection can handle and respond to the message
* using the 'on' method.
*
* @method emit
* @protected
* @param name {String} The message name to send
* @param args... {Mixed} Variable number of arguments to send with the message
* @param callback {Function} Called when remote sends a reply
*/
emit: function() {
var t = this, socket = t.get('socket');
log.info(t.logId + 'emit', Monitor.deepCopy(arguments, 5));
socket.emit.apply(socket, arguments);
},
/**
* Bind the specified handler to the remote socket message.
*
* Only a single handler (per message name) can be bound using this method.
*
* @method addEvent
* @protected
* @param eventName {String} The event name to handle
* @param handler {Function (args..., callback)} Called when the message is received.
* <ul>
* <li>args... {Mixed} Arguments sent in by the remote client</li>
* <li>callback {Function} Final arg if the client specified a callback</li>
* </ul>
*/
addEvent: function(eventName, handler) {
var t = this, socket = t.get('socket');
t.socketEvents = t.socketEvents || {};
if (t.socketEvents[eventName]) {
throw new Error('Event already connected: ' + eventName);
}
socket.on(eventName, handler);
t.socketEvents[eventName] = handler;
return t;
},
// Remove the specified event from the socket
removeEvent: function(eventName) {
var t = this, socket = t.get('socket');
if (t.socketEvents && t.socketEvents[eventName]) {
socket.removeListener(eventName, t.socketEvents[eventName]);
delete t.socketEvents[eventName];
}
return t;
},
// Remove all events bound to the socket
removeAllEvents: function() {
var t = this, socket = t.get('socket');
for (var event in t.socketEvents) {
socket.removeListener(event, t.socketEvents[event]);
}
t.socketEvents = null;
return t;
},
/**
* An error has occurred on the connection
*
* This event is triggered when an error occurs on the connection. Errors
* may occur when network is unstable, and can be an indication of impending
* disconnection.
*
* @event error
* @param err {Object} Reason for the error (from underlying transport)
*/
bindConnectionEvents: function() {
var t = this, socket = t.get('socket');
if (t.socketEvents) {throw new Error('Already connected');}
t.socketEvents = {}; // key = event name, data = handler
// Failure events
t.addEvent('connect_failed', function(){
t.trigger('error', 'connect failed');
t.disconnect('connect failed');
});
t.addEvent('disconnect', function(){t.disconnect('remote_disconnect');});
t.addEvent('error', function(reason){
t.trigger('error', reason);
t.disconnect('connect error');
});
// Inbound probe events
t.addEvent('probe:connect', t.probeConnect.bind(t));
t.addEvent('probe:disconnect', t.probeDisconnect.bind(t));
t.addEvent('probe:control', t.probeControl.bind(t));
// Connection events
t.addEvent('connection:ping', function(){socket.emit('connection:pong');});
t.addEvent('connection:pong', function(){t.trigger('pong');});
// Connected once remote info is known
t.addEvent('connection:info', function (info) {
t.set({
remoteHostName: info.hostName,
remoteAppName: info.appName,
remoteAppInstance: info.appInstance,
remotePID: info.pid,
remoteProbeClasses: info.probeClasses,
remoteGateway: info.gateway,
remoteFirewall: info.firewall
});
t.connecting = false;
t.connected = true;
t.trigger('connect');
});
// Determine the process id
var pid = typeof process === 'undefined' ? 1 : process.pid;
// Determine the app instance
var appInstance = '' + (typeof process === 'undefined' ? pid : process.env.NODE_APP_INSTANCE || pid);
// Exchange connection information
socket.emit('connection:info', {
hostName:Monitor.getRouter().getHostName(),
appName:Config.Monitor.appName,
appInstance: appInstance,
pid: pid,
probeClasses: _.keys(Probe.classes),
gateway:t.get('gateway'),
firewall:t.get('firewall')
});
},
/**
* Process an inbound request to connect with a probe
*
* This will fail if this connection was created as a firewall.
*
* @method probeConnect
* @protected
* @param monitorJSON {Object} Probe connection parameters, including:
* @param monitorJSON.probeClass {String} The probe class
* @param monitorJSON.initParams {Object} Probe initialization parameters
* @param monitorJSON.hostName {String} Connect with this host (if called as a gateway)
* @param monitorJSON.appName {String} Connect with this app (if called as a gateway)
* @param callback {Function(error, probeJSON)} Callback function
*/
probeConnect: function(monitorJSON, callback) {
callback = callback || function(){};
var t = this,
errorText = '',
router = Monitor.getRouter(),
gateway = t.get('gateway'),
startTime = Date.now(),
firewall = t.get('firewall'),
logCtxt = _.extend({}, monitorJSON);
// Don't allow inbound requests if this connection is firewalled
if (firewall) {
errorText = 'firewalled';
log.error('probeConnect', errorText, logCtxt);
return callback(errorText);
}
// Determine the connection to use (or internal)
router.determineConnection(monitorJSON, gateway, function(err, connection) {
if (err) {return callback(err);}
if (connection && !gateway) {return callback('Not a gateway');}
// Function to run upon connection (internal or external)
var onConnect = function(error, probe) {
if (error) {
log.error(t.logId + 'probeConnect', error, logCtxt);
return callback(error);
}
// Get probe info
var probeId = probe.get('id');
logCtxt.id = probeId;
// Check for a duplicate proxy for this probeId. This happens when
// two connect requests are made before the first one completes.
var monitorProxy = t.incomingMonitorsById[probeId];
if (monitorProxy != null) {
probe.refCount--;
logCtxt.dupDetected = true;
logCtxt.refCount = probe.refCount;
log.info(t.logId + 'probeConnected', logCtxt);
return callback(null, monitorProxy.probe.toJSON());
}
// Connect the montior proxy
monitorProxy = new Monitor(monitorJSON);
monitorProxy.set('probeId', probeId);
t.incomingMonitorsById[probeId] = monitorProxy;
monitorProxy.probe = probe;
monitorProxy.probeChange = function(){
try {
t.emit('probe:change:' + probeId, probe.changedAttributes());
}
catch (e) {
log.error('probeChange', e, probe, logCtxt);
}
};
probe.connectTime = Date.now();
var duration = probe.connectTime - startTime;
logCtxt.duration = duration;
logCtxt.refCount = probe.refCount;
log.info(t.logId + 'probeConnected', logCtxt);
stat.time(t.logId + 'probeConnected', duration);
callback(null, probe.toJSON());
probe.on('change', monitorProxy.probeChange);
// Disconnect the probe on connection disconnect
t.on('disconnect', function() {
t.probeDisconnect({probeId:probeId});
});
};
// Connect internally or externally
if (connection) {
router.connectExternal(monitorJSON, connection, onConnect);
} else {
router.connectInternal(monitorJSON, onConnect);
}
});
},
/**
* Process an inbound request to disconnect with a probe
*
* @method probeDisconnect
* @protected
* @param params {Object} Disconnect parameters, including:
* probeId {String} The unique probe id
* @param callback {Function(error)} Callback function
*/
probeDisconnect: function(params, callback) {
callback = callback || function(){};
var t = this,
errorText = '',
router = Monitor.getRouter(),
probeId = params.probeId,
monitorProxy = t.incomingMonitorsById[probeId],
firewall = t.get('firewall'),
logCtxt = null,
probe = null;
// Already disconnected
if (!monitorProxy || !monitorProxy.probe) {
return callback(null);
}
// Get a good logging context
probe = monitorProxy.probe;
logCtxt = {
probeClass: monitorProxy.get('probeClass'),
initParams: monitorProxy.get('initParams'),
probeId: probeId
};
// Called upon disconnect (internal or external)
var onDisconnect = function(error) {
if (error) {
log.error(t.logId + 'probeDisconnect', error);
return callback(error);
}
var duration = logCtxt.duration = Date.now() - probe.connectTime;
probe.off('change', monitorProxy.probeChange);
monitorProxy.probe = monitorProxy.probeChange = null;
delete t.incomingMonitorsById[probeId];
log.info(t.logId + 'probeDisconnected', logCtxt);
stat.time(t.logId + 'probeDisconnected', duration);
return callback(null);
};
// Disconnect from an internal or external probe
if (probe && probe.connection) {
router.disconnectExternal(probe.connection, probeId, onDisconnect);
} else {
router.disconnectInternal(probeId, onDisconnect);
}
},
/**
* Process an inbound control request to a probe
*
* @method probeControl
* @protected
* @param params {Object} Control parameters, including:
* probeId {String} The unique probe id
* name {String} The control message name
* params {Object} Any control message parameters
* @param callback {Function(error, returnParams)} Callback function
*/
probeControl: function(params, callback) {
callback = callback || function(){};
var t = this,
errorText = '',
logId = t.logId + 'probeControl',
startTime = Date.now(),
router = Monitor.getRouter(),
firewall = t.get('firewall');
// Don't allow inbound requests if this connection is firewalled
if (firewall) {
errorText = 'firewalled';
log.error(logId, errorText);
return callback(errorText);
}
// Called upon return
var onReturn = function(error) {
if (error) {
log.error(logId, error);
return callback(error);
}
else {
var duration = Date.now() - startTime;
log.info(logId + '.return', {duration:duration, returnArgs: arguments});
stat.time(logId, duration);
return callback.apply(null, arguments);
}
};
// Is this an internal probe?
var probe = router.runningProbesById[params.probeId];
if (!probe) {
// Is this a remote (proxied) probe?
var monitorProxy = t.incomingMonitorsById[params.probeId];
if (!monitorProxy) {
errorText = 'Probe id not found: ' + params.probeId;
log.error(errorText);
return callback(errorText);
}
// Proxying requires this form vs. callback as last arg.
return monitorProxy.control(params.name, params.params, function(err, returnParams) {
onReturn(err, returnParams);
});
}
logId = logId + '.' + probe.probeClass + '.' + params.name;
log.info(logId + '.request', {params:params.params, probeId:params.probeId});
return probe.onControl(params.name, params.params, onReturn);
}
});
/**
* Constructor for a list of Connection objects
*
* var myList = new Connection.List(initialElements);
*
* @static
* @method List
* @param [items] {Array} Initial list items. These can be raw JS objects or Connection data model objects.
* @return {Backbone.Collection} Collection of Connection data model objects
*/
Connection.List = Backbone.Collection.extend({model: Connection});
}(this));