diff --git a/README.md b/README.md index c1871c75..7637fb52 100755 --- a/README.md +++ b/README.md @@ -30,6 +30,24 @@ module.exports.connections = { password : 'password', database : 'MySQL Database Name' + // OR (replication / clusterPool) + replication: { + canRetry: true, + defaultSelector: 'RR', + + read: [ + {host: 'localhost'} + ], + + write: [ + {host: 'localhost', user: 'username', password: 'password'} + ], + + readwrite: [ + {host: 'localhost', user: 'username', password: 'password'} + ] + }, + // OR (explicit sets take precedence) module : 'sails-mysql', url : 'mysql2://USER:PASSWORD@HOST:PORT/DATABASENAME' diff --git a/lib/adapter.js b/lib/adapter.js index e8a3ed1e..150ce97d 100644 --- a/lib/adapter.js +++ b/lib/adapter.js @@ -84,7 +84,7 @@ module.exports = (function() { // Direct access to query - query: function(connectionName, collectionName, query, data, cb, connection) { + query: function(connectionName, collectionName, query, data, cb, connection, queryType) { if (_.isFunction(data)) { cb = data; @@ -92,7 +92,7 @@ module.exports = (function() { } if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __QUERY__, cb); + return spawnConnection(queryType || 'WRITE', connectionName, __QUERY__, cb); } else { __QUERY__(connection, cb); } @@ -100,7 +100,7 @@ module.exports = (function() { function __QUERY__(connection, cb) { // Run query - log('MySQL.query: ', query); + log('MySQL.query: ', connection._clusterId || '', query); if (data) connection.query(query, data, cb); else connection.query(query, cb); @@ -114,7 +114,7 @@ module.exports = (function() { describe: function(connectionName, collectionName, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __DESCRIBE__, cb); + return spawnConnection('READ', connectionName, __DESCRIBE__, cb); } else { __DESCRIBE__(connection, cb); } @@ -132,8 +132,8 @@ module.exports = (function() { var pkQuery = 'SHOW INDEX FROM ' + tableName; // Run query - log('MySQL.describe: ', query); - log('MySQL.describe(pk): ', pkQuery); + log('MySQL.describe: ', connection._clusterId || '', query); + log('MySQL.describe(pk): ', connection._clusterId || '', pkQuery); connection.query(query, function __DESCRIBE__(err, schema) { if (err) { @@ -191,7 +191,7 @@ module.exports = (function() { var self = this; if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __DEFINE__, cb); + return spawnConnection('WRITE', connectionName, __DEFINE__, cb); } else { __DEFINE__(connection, cb); } @@ -222,7 +222,7 @@ module.exports = (function() { // Run query - log('MYSQL.define: ', query); + log('MYSQL.define: ', connection._clusterId || '', query); connection.query(query, function __DEFINE__(err, result) { if (err) return cb(err); @@ -250,7 +250,7 @@ module.exports = (function() { } if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __DROP__, cb); + return spawnConnection('WRITE', connectionName, __DROP__, cb); } else { __DROP__(connection, cb); } @@ -270,7 +270,7 @@ module.exports = (function() { var query = 'DROP TABLE ' + tableName; // Run query - log('MYSQL.drop: ', query); + log('MYSQL.drop: ', connection._clusterId || '', query); connection.query(query, function __DROP__(err, result) { if (err) { @@ -294,7 +294,7 @@ module.exports = (function() { addAttribute: function (connectionName, collectionName, attrName, attrDef, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __ADD_ATTRIBUTE__, cb); + return spawnConnection('WRITE', connectionName, __ADD_ATTRIBUTE__, cb); } else { __ADD_ATTRIBUTE__(connection, cb); } @@ -308,7 +308,7 @@ module.exports = (function() { var query = sql.addColumn(tableName, attrName, attrDef); // Run query - log('MYSQL.addAttribute: ', query); + log('MYSQL.addAttribute: ', connection._clusterId || '', query); connection.query(query, function(err, result) { if (err) return cb(err); @@ -324,7 +324,7 @@ module.exports = (function() { removeAttribute: function (connectionName, collectionName, attrName, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __REMOVE_ATTRIBUTE__, cb); + return spawnConnection('WRITE', connectionName, __REMOVE_ATTRIBUTE__, cb); } else { __REMOVE_ATTRIBUTE__(connection, cb); } @@ -338,7 +338,7 @@ module.exports = (function() { var query = sql.removeColumn(tableName, attrName); // Run query - log('MYSQL.removeAttribute: ', query); + log('MYSQL.removeAttribute: ', connection._clusterId || '', query); connection.query(query, function(err, result) { if (err) return cb(err); @@ -358,7 +358,7 @@ module.exports = (function() { create: function(connectionName, collectionName, data, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __CREATE__, cb); + return spawnConnection('WRITE', connectionName, __CREATE__, cb); } else { __CREATE__(connection, cb); } @@ -389,7 +389,7 @@ module.exports = (function() { } // Run query - log('MySQL.create: ', _query.query); + log('MySQL.create: ', connection._clusterId || '', _query.query); connection.query(_query.query, function(err, result) { if (err) return cb( handleQueryError(err) ); @@ -419,7 +419,7 @@ module.exports = (function() { createEach: function (connectionName, collectionName, valuesList, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __CREATE_EACH__, cb); + return spawnConnection('WRITE', connectionName, __CREATE_EACH__, cb); } else { __CREATE_EACH__(connection, cb); } @@ -453,7 +453,7 @@ module.exports = (function() { } // Run query - log('MySQL.createEach: ', _query.query); + log('MySQL.createEach: ', connection._clusterId || '', _query.query); connection.query(_query.query, function(err, results) { if (err) return cb( handleQueryError(err) ); @@ -480,7 +480,7 @@ module.exports = (function() { var query = 'SELECT * FROM ' + mysql.escapeId(tableName) + ' WHERE ' + mysql.escapeId(pk) + ' IN (' + records + ');'; // Run Query returing results - log('MYSQL.createEach: ', query); + log('MYSQL.createEach: ', connection._clusterId || '', query); connection.query(query, function(err, results) { if(err) return cb(err); @@ -502,7 +502,7 @@ module.exports = (function() { join: function (connectionName, collectionName, options, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __JOIN__, cb); + return spawnConnection('READ', connectionName, __JOIN__, cb); } else { __JOIN__(connection, cb); } @@ -798,7 +798,7 @@ module.exports = (function() { find: function(connectionName, collectionName, options, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __FIND__, cb); + return spawnConnection('READ', connectionName, __FIND__, cb); } else { __FIND__(connection, cb); } @@ -829,7 +829,7 @@ module.exports = (function() { } // Run query - log('MYSQL.find: ', _query.query[0]); + log('MYSQL.find: ', connection._clusterId || '', _query.query[0]); connection.query(_query.query[0], function(err, result) { if(err) return cb(err); @@ -845,7 +845,7 @@ module.exports = (function() { count: function(connectionName, collectionName, options, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __COUNT__, cb); + return spawnConnection('READ', connectionName, __COUNT__, cb); } else { __COUNT__(connection, cb); } @@ -876,7 +876,7 @@ module.exports = (function() { } // Run query - log('MYSQL.count: ', _query.query[0]); + log('MYSQL.count: ', connection._clusterId || '', _query.query[0]); connection.query(_query.query[0], function(err, result) { if(err) return cb(err); @@ -892,7 +892,7 @@ module.exports = (function() { stream: function(connectionName, collectionName, options, stream, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __STREAM__); + return spawnConnection('READ', connectionName, __STREAM__); } else { __STREAM__(connection); } @@ -904,21 +904,10 @@ module.exports = (function() { var tableName = collectionName; // Build find query - var schema = connectionObject.schema; - var _query; - - var sequel = new Sequel(schema, sqlOptions); - - // Build a query for the specific query strategy - try { - _query = sequel.find(collectionName, options); - } catch(e) { - return cb(e); - } - var query = _query.query[0]; + var query = sql.selectQuery(tableName, options); // Run query - log('MySQL.stream: ', query); + log('MySQL.stream: ', connection._clusterId || '', query); var dbStream = connection.query(query); @@ -951,7 +940,7 @@ module.exports = (function() { update: function(connectionName, collectionName, options, values, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __UPDATE__, cb); + return spawnConnection('WRITE', connectionName, __UPDATE__, cb); } else { __UPDATE__(connection, cb); } @@ -974,7 +963,7 @@ module.exports = (function() { return cb(e); } - log('MySQL.update(before): ', _query.query[0]); + log('MySQL.update(before): ', connection._clusterId || '', _query.query[0]); connection.query(_query.query[0], function(err, results) { if(err) return cb(err); @@ -1009,7 +998,7 @@ module.exports = (function() { } // Run query - log('MySQL.update: ', _query.query); + log('MySQL.update: ', connection._clusterId || '', _query.query); connection.query(_query.query, function(err, result) { if (err) return cb( handleQueryError(err) ); @@ -1031,7 +1020,7 @@ module.exports = (function() { } // Run query - log('MySQL.update(after): ', _query.query[0]); + log('MySQL.update(after): ', connection._clusterId || '', _query.query[0]); connection.query(_query.query[0], function(err, result) { if(err) return cb(err); @@ -1047,7 +1036,7 @@ module.exports = (function() { destroy: function(connectionName, collectionName, options, cb, connection) { if(_.isUndefined(connection)) { - return spawnConnection(connectionName, __DESTROY__, cb); + return spawnConnection('WRITE', connectionName, __DESTROY__, cb); } else { __DESTROY__(connection, cb); } @@ -1079,7 +1068,7 @@ module.exports = (function() { }, destroyRecords: ['findRecords', function(next) { - log('MySQL.destroy: ', _query.query); + log('MySQL.destroy: ', connection._clusterId || '', _query.query); connection.query(_query.query, next); }] @@ -1117,8 +1106,9 @@ module.exports = (function() { * @param {Function} fn * @param {[type]} cb */ - function spawnConnection(connectionName, fn, cb) { + function spawnConnection(queryType, connectionName, fn, cb) { _spawnConnection( + queryType, getConnectionObject(connectionName), fn, wrapCallback(cb) diff --git a/lib/connections/register.js b/lib/connections/register.js index ad45de54..2b5a5bc9 100644 --- a/lib/connections/register.js +++ b/lib/connections/register.js @@ -11,6 +11,13 @@ var utils = require('../utils'); module.exports = {}; +function inheritConfigProperties (source, dest, properties) { + properties.forEach(function (name) { + if (_.isUndefined(dest[name])) { + dest[name] = source[name]; + } + }); +} module.exports.configure = function ( connections ) { @@ -70,9 +77,35 @@ module.exports.configure = function ( connections ) { var activeConnection = connections[connection.identity]; + // Create a connection pool cluster if configured to do so. + // (and set up the necessary `releaseConnection` functionality to drain it.) + if (activeConnection.config.replication) { + activeConnection.connection.poolCluster = mysql.createPoolCluster({ + canRetry: activeConnection.config.replication.canRetry || true, + defaultSelector: activeConnection.config.replication.defaultSelector || 'RR' + }); + + activeConnection.config.replication.read = activeConnection.config.replication.read || []; + activeConnection.config.replication.write = activeConnection.config.replication.write || []; + activeConnection.config.replication.readwrite = activeConnection.config.replication.readwrite || []; + + activeConnection.config.replication.read = activeConnection.config.replication.read.concat(activeConnection.config.replication.readwrite); + activeConnection.config.replication.write = activeConnection.config.replication.write.concat(activeConnection.config.replication.readwrite); + + activeConnection.config.replication.read.forEach(function (config, index) { + inheritConfigProperties(activeConnection.config, config, ['user', 'password', 'database']); + activeConnection.connection.poolCluster.add('READ' + index, config); + }); + + activeConnection.config.replication.write.forEach(function (config, index) { + inheritConfigProperties(activeConnection.config, config, ['user', 'password', 'database']); + activeConnection.connection.poolCluster.add('WRITE' + index, config); + }); + + activeConnection.connection.releaseConnection = _releaseConnection.poolfully; // Create a connection pool if configured to do so. // (and set up the necessary `releaseConnection` functionality to drain it.) - if (activeConnection.config.pool) { + } else if (activeConnection.config.pool) { activeConnection.connection.pool = mysql.createPool(activeConnection.config); activeConnection.connection.releaseConnection = _releaseConnection.poolfully; } diff --git a/lib/connections/spawn.js b/lib/connections/spawn.js index 2632e434..200ddf99 100644 --- a/lib/connections/spawn.js +++ b/lib/connections/spawn.js @@ -20,8 +20,7 @@ var STRINGFILE = { * @param {[type]} cb__spawnConnection */ -module.exports = function spawnConnection (connectionObject, fn, cb__spawnConnection) { - +module.exports = function spawnConnection (queryType, connectionObject, fn, cb__spawnConnection) { // // TODO: // @@ -35,7 +34,17 @@ module.exports = function spawnConnection (connectionObject, fn, cb__spawnConnec // If pooling is used, grab a connection from the pool and run the // logic for the query. - if (connectionObject.connection.pool) { + if (connectionObject.connection.poolCluster) { + connectionObject.connection.poolCluster.getConnection(queryType + '*', function (err, conn) { + if (err && err.code === 'POOL_NOEXIST') { + err = new Error(queryType + ' pool does not exist.'); + err.code = 'POOL_NOEXIST'; + } + + afterwards(err, conn); + }); + return; + } else if (connectionObject.connection.pool) { connectionObject.connection.pool.getConnection(function (err, conn) { afterwards(err, conn); });