// Query.js
  1. var Sequence = require('./Sequence');
  2. var Util = require('util');
  3. var Packets = require('../packets');
  4. var ResultSet = require('../ResultSet');
  5. var ServerStatus = require('../constants/server_status');
  6. var fs = require('fs');
  7. var Readable = require('readable-stream');
  8. module.exports = Query;
  9. Util.inherits(Query, Sequence);
  /**
   * Sequence representing one SQL query and the result state gathered for it.
   *
   * @param {Object}   options             Query settings.
   * @param {*}        options.sql         Stored as `this.sql`.
   * @param {*}        [options.values]    Stored as `this.values`.
   * @param {*}        [options.typeCast]  Stored as-is; defaults to `true` only
   *                                       when the property is `undefined`.
   * @param {*}        [options.nestTables] Stored as-is; any falsy value becomes `false`.
   * @param {Function} [callback]          Forwarded to the `Sequence` constructor.
   */
  function Query(options, callback) {
    Sequence.call(this, options, callback);

    var castSetting = options.typeCast;
    if (castSetting === undefined) {
      castSetting = true;
    }

    this.sql        = options.sql;
    this.values     = options.values;
    this.typeCast   = castSetting;
    this.nestTables = options.nestTables || false;

    // Per-query result bookkeeping, filled in by the packet handlers.
    this._resultSet = null;
    this._results   = [];
    this._fields    = [];
    this._index     = 0;
    this._loadError = null;
  }
  /**
   * Begins the sequence by emitting a 'packet' event that carries a
   * ComQueryPacket built from this query's SQL.
   */
  Query.prototype.start = function() {
    var comQuery = new Packets.ComQueryPacket(this.sql);
    this.emit('packet', comQuery);
  };
  /**
   * Selects the packet class to use for the next incoming packet, based on
   * the supplied byte and on how far the current result set has progressed.
   *
   * @param {number} byte   Marker byte supplied by the caller (presumably the
   *                        packet's leading byte - confirm against the parser).
   * @param {Object} parser Only `parser.packetLength()` is consulted, and only
   *                        when `byte` is 0xfe while rows are being read.
   * @returns {Function} One of the `Packets.*` constructors.
   */
  Query.prototype.determinePacket = function determinePacket(byte, parser) {
    var current = this._resultSet;

    // No result set in progress: this is the first packet of a response.
    if (!current) {
      if (byte === 0x00) {
        return Packets.OkPacket;
      }
      if (byte === 0xff) {
        return Packets.ErrorPacket;
      }
      return Packets.ResultSetHeaderPacket;
    }

    // Before the first EOF has been recorded we are still in the field list.
    if (current.eofPackets.length === 0) {
      var expectedFields = current.resultSetHeaderPacket.fieldCount;
      if (current.fieldPackets.length < expectedFields) {
        return Packets.FieldPacket;
      }
      return Packets.EofPacket;
    }

    // Row phase.
    if (byte === 0xff) {
      return Packets.ErrorPacket;
    }

    // 0xfe only counts as EOF when the packet is shorter than 9; otherwise
    // the packet is handled as row data.
    if (byte === 0xfe && parser.packetLength() < 9) {
      return Packets.EofPacket;
    }

    return Packets.RowDataPacket;
  };
  /**
   * Handles an OK packet, which completes one result.
   *
   * With a callback registered the packet is buffered (with `undefined` as its
   * fields entry); otherwise it is emitted as a 'result' event.
   *
   * @param {Object} packet The parsed OK packet.
   */
  Query.prototype.OkPacket = function(packet) {
    // The finally block keeps the bookkeeping consistent even if a 'result'
    // listener throws.
    try {
      if (this._callback) {
        this._results.push(packet);
        this._fields.push(undefined);
      } else {
        this.emit('result', packet, this._index);
      }
    } finally {
      this._resultSet = null;
      this._index++;
      this._handleFinalResultPacket(packet);
    }
  };
  /**
   * Handles an error packet by ending the sequence with an error.
   *
   * The error is annotated with the index of the failing result and the SQL
   * text. Results and fields buffered so far are passed along; an empty buffer
   * is reported as `undefined`.
   *
   * @param {Object} packet The parsed error packet.
   */
  Query.prototype.ErrorPacket = function(packet) {
    var err = this._packetToError(packet);

    var partialResults;
    if (this._results.length > 0) {
      partialResults = this._results;
    }

    var partialFields;
    if (this._fields.length > 0) {
      partialFields = this._fields;
    }

    err.index = this._index;
    err.sql   = this.sql;

    this.end(err, partialResults, partialFields);
  };
  /**
   * Handles a result set header packet.
   *
   * A non-null `fieldCount` opens a new ResultSet. A `null` field count starts
   * a local file transfer, using `packet.extra` as the file path.
   *
   * @param {Object} packet The parsed header packet.
   */
  Query.prototype.ResultSetHeaderPacket = function(packet) {
    if (packet.fieldCount !== null) {
      this._resultSet = new ResultSet(packet);
      return;
    }

    this._sendLocalDataFile(packet.extra);
  };
  /**
   * Records a field (column definition) packet on the current result set.
   *
   * @param {Object} packet The parsed field packet.
   */
  Query.prototype.FieldPacket = function(packet) {
    var collected = this._resultSet.fieldPackets;
    collected.push(packet);
  };
  /**
   * Handles an EOF packet for the current result set.
   *
   * The first EOF announces the fields via a 'fields' event when no callback is
   * registered. The second EOF finishes the result set: rows and fields are
   * buffered for the callback (if any), the result index advances, and the
   * packet is handed to `_handleFinalResultPacket`.
   *
   * @param {Object} packet The parsed EOF packet.
   */
  Query.prototype.EofPacket = function(packet) {
    var eofPackets = this._resultSet.eofPackets;
    eofPackets.push(packet);

    if (!this._callback && eofPackets.length === 1) {
      this.emit('fields', this._resultSet.fieldPackets, this._index);
    }

    var resultSetFinished = (this._resultSet.eofPackets.length === 2);

    if (resultSetFinished) {
      if (this._callback) {
        this._results.push(this._resultSet.rows);
        this._fields.push(this._resultSet.fieldPackets);
      }

      this._index++;
      this._resultSet = null;
      this._handleFinalResultPacket(packet);
    }
  };
  /**
   * Ends the sequence once the packet's status no longer has the
   * SERVER_MORE_RESULTS_EXISTS bit set.
   *
   * When more than one result was buffered the full arrays are delivered;
   * otherwise only the first entry is (which is `undefined` if nothing was
   * buffered). Any error recorded in `_loadError` is passed as the error.
   *
   * @param {Object} packet The packet that closed the latest result.
   */
  Query.prototype._handleFinalResultPacket = function(packet) {
    var moreResultsPending = packet.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
    if (moreResultsPending) {
      return;
    }

    var results = this._results[0];
    if (this._results.length > 1) {
      results = this._results;
    }

    var fields = this._fields[0];
    if (this._fields.length > 1) {
      fields = this._fields;
    }

    this.end(this._loadError, results, fields);
  };
  /**
   * Handles one row packet: asks the packet to parse itself against the
   * current field list, then buffers it for the callback or emits it as a
   * 'result' event.
   *
   * @param {Object} packet     Row packet; must expose `parse()`.
   * @param {Object} parser     Passed through to `packet.parse`.
   * @param {Object} connection Passed through to `packet.parse`.
   */
  Query.prototype.RowDataPacket = function(packet, parser, connection) {
    var fieldPackets = this._resultSet.fieldPackets;
    packet.parse(parser, fieldPackets, this.typeCast, this.nestTables, connection);

    if (!this._callback) {
      this.emit('result', packet, this._index);
      return;
    }

    this._resultSet.rows.push(packet);
  };
  /**
   * Streams a local file as a series of LocalDataFilePacket 'packet' events,
   * followed by a single EmptyPacket once the file ends or fails to read.
   *
   * A read error is stored in `_loadError` instead of being thrown; it is
   * reported later, when the sequence ends.
   *
   * @param {string} path Path of the local file to read.
   */
  Query.prototype._sendLocalDataFile = function(path) {
    var self = this;

    // `flags` (plural) is the option name fs.createReadStream recognises; the
    // previous `flag` key was not one, so the read mode was never passed.
    var localStream = fs.createReadStream(path, {
      flags     : 'r',
      encoding  : null,
      autoClose : true
    });

    // Mirror pause/resume events on this sequence onto the file stream.
    this.on('pause', function () {
      localStream.pause();
    });

    this.on('resume', function () {
      localStream.resume();
    });

    localStream.on('data', function (data) {
      self.emit('packet', new Packets.LocalDataFilePacket(data));
    });

    localStream.on('error', function (err) {
      self._loadError = err;
      // Re-use the 'end' handler below so the EmptyPacket is still sent.
      localStream.emit('end');
    });

    localStream.on('end', function () {
      self.emit('packet', new Packets.EmptyPacket());
    });
  };
  /**
   * Exposes this query's rows as an object-mode Readable stream.
   *
   * Each 'result' row is pushed into the stream. 'result', 'fields' and
   * 'error' events are also re-emitted on the stream, and the stream is ended
   * when the query emits 'end'. When a push reports that the stream's buffer
   * is full the connection is paused; it is resumed when the stream asks for
   * more data.
   *
   * @param {Object} [options] Options for the Readable constructor. The object
   *                           is copied, not modified; `objectMode` is always
   *                           forced to `true` on the copy.
   * @returns {Readable} The stream of rows.
   */
  Query.prototype.stream = function(options) {
    var self = this;
    var streamOptions = {};
    var key;

    // Work on a copy so the caller's options object is left untouched.
    // for...in over null/undefined simply does nothing.
    for (key in options) {
      streamOptions[key] = options[key];
    }
    streamOptions.objectMode = true;

    var stream = new Readable(streamOptions);

    stream._read = function() {
      self._connection && self._connection.resume();
    };

    stream.once('end', function() {
      process.nextTick(function () {
        stream.emit('close');
      });
    });

    this.on('result', function(row, i) {
      // Same guard as _read: there may be no connection attached.
      if (!stream.push(row) && self._connection) {
        self._connection.pause();
      }
      stream.emit('result', row, i); // replicate old emitter
    });

    this.on('error', function(err) {
      stream.emit('error', err); // Pass on any errors
    });

    this.on('end', function() {
      stream.push(null); // pushing null, indicating EOF
    });

    this.on('fields', function(fields, i) {
      stream.emit('fields', fields, i); // replicate old emitter
    });

    return stream;
  };