index.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. var _ = require('lodash');
  2. module.exports = dbPool;
  3. var dbPool = function(opts) {
  4. var defaults = {
  5. globalMax: 0,
  6. schemaMax: 0,
  7. schemaMin: 1,
  8. prefer: 'new', // or 'convert' to change schema of idle connections
  9. lazy: true, // don't create until needed, don't close until forced
  10. connectTimeout: 15000,
  11. // user supplied values
  12. dbCreds: {},
  13. db: null,
  14. };
  15. this.opts = _.extend({}, defaults, opts);
  16. this.connections = Object.create(null);
  17. this.freeConnections = Object.create(null);
  18. this.queue = [];
  19. this.ready = false;
  20. this.totalConnections = 0;
  21. };
  22. // needed:
  23. // fetchAll
  24. // _.groupBy type functionality
  25. // fancy transactions
  26. // cursors
  27. // create/find/get info on tables
  28. // multi-host support
  29. // maybe streaming support
  30. // this doesn't check the limits. it does what it's told.
  31. dbPool.prototype.newConnection = function(schema, cb) {
  32. var that = this;
  33. var pool = this.connections[schema];
  34. if(!pool) pool = this.connections[schema] = [];
  35. var dbConn = new this.db(this.opts.dbCreds);
  36. dbConn.connect(schema, withConnection); // this wrapper needs to set the schema
  37. function withConnection(err, dbConn) {
  38. if(err) return process.nextTick(function() { cb(err) });
  39. var conn = {
  40. schema: schema,
  41. conn: dbConn,
  42. inUse: false,
  43. error: null,
  44. };
  45. // listen for strange events.
  46. dbConn.onError(function(err) {
  47. conn.ready = err;
  48. conn.dbConn.release();
  49. });
  50. dbConn.onRelease(function(err) {
  51. that.removeConnection(conn);
  52. });
  53. pool.push(conn);
  54. that.totalConnections++;
  55. process.nextTick(function() { cb(null, conn); }
  56. };
  57. };
  58. // purges a connection object from the pools
  59. dbPool.prototype.removeConnection = function(conn) {
  60. this.connections[conn.schema] = _.filter(
  61. this.connections[conn.schema],
  62. function(q) { return q != conn }
  63. );
  64. this.totalConnections--;
  65. };
  66. // prolly need some fancy variadic argument support for escaping values
  67. dbPool.prototype.query = function(schema, query, cb) {
  68. var that = this;
  69. // run or queue query
  70. getConnection(function(err, conn) {
  71. if(err) return cb(err);
  72. if(!conn) return that.queueQuery(schema, query, cb)
  73. // we have a connection, actually run it
  74. that.runQuery(conn, query, cb);
  75. });
  76. };
  77. dbPool.prototype.setSchema = function(conn, schema, cb) {
  78. var that = this;
  79. conn.conn.setSchema(schema, function(err) {
  80. if(err) return process.nextTick(function() { cb(err) });
  81. // now make sure to move the connection to the new pool
  82. that.connections[conn.schema] = _.filter(that.connections[conn.schema], function(q) { return q != conn });
  83. var pool = that.connections[schema];
  84. if(!pool) pool = that.connections[schema] = [];
  85. conn.schema = schema;
  86. pool.push(conn);
  87. process.nextTick(null, conn);
  88. });
  89. };
  90. // fetches a connection from the pool and marks it in use
  91. dbPool.prototype.getConnection = function(schema, cb) {
  92. // try to use an idle connection
  93. if(this.connections[schema]) {
  94. var conn = _.first(this.connections[schema], function(q) {
  95. return q.connected && !q.inUse;
  96. });
  97. // we got one
  98. if(conn) {
  99. conn.inUse = true;
  100. return cb(null, conn);
  101. }
  102. }
  103. // try to make a new connection
  104. // check max connections
  105. if(this.totalConnections >= this.opts.globalMax)
  106. return cb(null, null);
  107. // messy code
  108. var pool = this.connections[schema];
  109. if(!pool) return this.newConnection(schema, cb);
  110. if(pool.length >= this.opts.schemaMax)
  111. return cb(null, null);
  112. this.newConnection(schema, cb);
  113. };
  114. // underlying raw function. not generally to be called externally
  115. dbPool.prototype.runQuery = function(conn, query, cb) {
  116. var sql = this.processQuery(query, conn);
  117. if(sql == null) return cb('invalid query');
  118. conn.conn.query(sql, function(err, rows) {
  119. conn.inUse = false;
  120. cb(err, rows);
  121. });
  122. };
  123. // replaces named parameters
  124. dbPool.prototype.processQuery = function(query, conn) {
  125. if(typeof query == 'string') return query;
  126. if(typeof query == 'function') return query();
  127. // something wrong has happened...
  128. if(typeof query != 'object') return null;
  129. var raw = query instanceof Array ? query[0] : query.sql;
  130. var args = query instanceof Array ? query[1] : query.args;
  131. // exctract all named parameters
  132. //var matches = raw.match(/\?(:?\w+)\b/g);
  133. // slow crappy way
  134. for(var i = 0; i < args.length; i++) {
  135. var key = args[i];
  136. raw.replace("/\\?" + key + "\\b/g", conn.escape(args[key]));
  137. raw.replace("/\\?:" + key + "\\b/g", conn.escapeID(args[key]));
  138. raw.replace("/\\?!" + key + "\\b/g", args[key]);
  139. }
  140. return raw;
  141. }
  142. // checks the queue and rties to run pending items
  143. dbPool.prototype.processQueue = function(cb) {
  144. while(getConnection) {
  145. ....
  146. }
  147. }
  148. dbPool.prototype.waitForConnection = function(schema, cb) {
  149. };