123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
'use strict'

// Node core: Query instances emit 'row', 'error' and 'end' events.
const { EventEmitter } = require('events')

// Project-local helpers: result accumulation and query-config/value utilities.
const Result = require('./result')
const utils = require('./utils')
/**
 * Looks up the query text recorded for a named statement on a connection.
 *
 * Only own properties of `connection.parsedStatements` are consulted, so
 * statement names that collide with inherited object members (for example
 * "constructor" or "toString") are not mistaken for parsed statements.
 * Unnamed queries never match anything.
 *
 * @param {object} connection - object exposing a `parsedStatements` map
 * @param {string} [name] - statement name; falsy for unnamed queries
 * @returns {*} the recorded value, or undefined when nothing is recorded
 */
function lookupParsedStatement(connection, name) {
  if (!name) {
    return undefined
  }
  const statements = connection.parsedStatements
  return Object.prototype.hasOwnProperty.call(statements, name) ? statements[name] : undefined
}

/**
 * A single query submitted over a connection.
 *
 * The instance is driven from outside through its `handle*` methods, one per
 * incoming message type. Results are delivered through the optional callback
 * and through the 'row', 'error' and 'end' events.
 */
class Query extends EventEmitter {
  /**
   * @param {string|object} config - query text or config object; normalized
   *   together with `values` and `callback` by `utils.normalizeQueryConfig`
   * @param {Array|Function} [values] - forwarded to the normalizer
   * @param {Function} [callback] - forwarded to the normalizer
   */
  constructor(config, values, callback) {
    super()

    config = utils.normalizeQueryConfig(config, values, callback)

    this.text = config.text
    this.values = config.values
    this.rows = config.rows
    this.types = config.types
    this.name = config.name
    this.binary = config.binary
    // the unnamed portal ('') is used unless the caller supplies one
    this.portal = config.portal || ''
    this.callback = config.callback
    this._rowMode = config.rowMode
    // keep the callback inside the active domain, when one exists
    if (process.domain && config.callback) {
      this.callback = process.domain.bind(config.callback)
    }
    this._result = new Result(this._rowMode, this.types)

    // potential for multiple results: starts out as the single result and
    // is turned into an array by _checkForMultirow when a second one begins
    this._results = this._result
    this.isPreparedStatement = false
    // false, or the error thrown while parsing a row
    this._canceledDueToError = false
    this._promise = null
  }

  /**
   * Decides whether the query goes through parse/bind/execute (`prepare`)
   * instead of being sent as plain text.
   *
   * @returns {boolean} true when the query has a name, a row limit, or a
   *   non-empty values array
   */
  requiresPreparation() {
    // named queries must always be prepared
    if (this.name) {
      return true
    }
    // always prepare if there are max number of rows expected per
    // portal execution
    if (this.rows) {
      return true
    }
    // don't prepare empty text queries
    if (!this.text) {
      return false
    }
    // prepare if there are values
    if (!this.values) {
      return false
    }
    return this.values.length > 0
  }

  /**
   * Starts a fresh Result when the current one already finished a command.
   */
  _checkForMultirow() {
    // if we already have a result with a command property
    // then we've already executed one query in a multi-statement simple query
    // turn our results into an array of results
    if (this._result.command) {
      if (!Array.isArray(this._results)) {
        this._results = [this._result]
      }
      this._result = new Result(this._rowMode, this.types)
      this._results.push(this._result)
    }
  }

  /**
   * Associates the row metadata from the supplied message with the current
   * result; the metadata is used when parsing row results.
   *
   * @param {object} msg - message carrying a `fields` description
   */
  handleRowDescription(msg) {
    this._checkForMultirow()
    this._result.addFields(msg.fields)
    // rows are buffered when a callback will receive them, or when nobody
    // is listening for individual 'row' events
    this._accumulateRows = this.callback || !this.listeners('row').length
  }

  /**
   * Parses one data row, emits it as a 'row' event and optionally buffers it.
   * A parse failure is remembered and reported later; every following row
   * is then ignored.
   *
   * @param {object} msg - message carrying the raw `fields` of one row
   */
  handleDataRow(msg) {
    let row

    if (this._canceledDueToError) {
      return
    }

    try {
      row = this._result.parseRow(msg.fields)
    } catch (err) {
      this._canceledDueToError = err
      return
    }

    this.emit('row', row, this._result)
    if (this._accumulateRows) {
      this._result.addRow(row)
    }
  }

  /**
   * Records command completion on the current result.
   *
   * @param {object} msg - command-complete message
   * @param {object} connection - connection used to send the follow-up sync
   */
  handleCommandComplete(msg, connection) {
    this._checkForMultirow()
    this._result.addCommandComplete(msg)
    // need to sync after each command complete of a prepared statement
    // if we were using a row count which results in multiple calls to _getRows
    if (this.rows) {
      connection.sync()
    }
  }

  /**
   * Handles the empty-query message.
   *
   * If a named prepared statement is created with empty query text the
   * backend will send an emptyQuery message but *not* a command complete
   * message. Since we pipeline sync immediately after execute we don't need
   * to do anything here, unless we have rows specified, in which case we did
   * not pipeline the initial sync call.
   *
   * @param {object} connection
   */
  handleEmptyQuery(connection) {
    if (this.rows) {
      connection.sync()
    }
  }

  /**
   * Reports an error to the callback, or emits it when there is no callback.
   * A pending row-parse error takes precedence over `err`.
   *
   * @param {Error} err
   * @param {object} connection - unused here; no sync is issued by this method
   * @returns {*} whatever the callback returns, otherwise undefined
   */
  handleError(err, connection) {
    if (this._canceledDueToError) {
      err = this._canceledDueToError
      this._canceledDueToError = false
    }
    // if callback supplied do not emit error event as uncaught error
    // events will bubble up to node process
    if (this.callback) {
      return this.callback(err)
    }
    this.emit('error', err)
  }

  /**
   * Finishes the query: reports a pending row-parse error, or hands the
   * results to the callback and emits 'end'.
   *
   * @param {object} con - connection, forwarded to handleError
   */
  handleReadyForQuery(con) {
    if (this._canceledDueToError) {
      return this.handleError(this._canceledDueToError, con)
    }
    if (this.callback) {
      this.callback(null, this._results)
    }
    this.emit('end', this._results)
  }

  /**
   * Validates the query and sends it over the connection.
   *
   * @param {object} connection
   * @returns {Error|null} a validation error (returned, not thrown), or null
   *   once the query has been sent
   */
  submit(connection) {
    if (typeof this.text !== 'string' && typeof this.name !== 'string') {
      return new Error('A query must have either text or a name. Supplying neither is unsupported.')
    }
    // own-property lookup: unnamed queries and names such as "constructor"
    // must not match inherited or unrelated entries
    const previous = lookupParsedStatement(connection, this.name)
    if (this.text && previous && this.text !== previous) {
      return new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`)
    }
    if (this.values && !Array.isArray(this.values)) {
      return new Error('Query values must be an array')
    }
    if (this.requiresPreparation()) {
      this.prepare(connection)
    } else {
      connection.query(this.text)
    }
    return null
  }

  /**
   * Tells whether the connection already has this named statement recorded.
   *
   * @param {object} connection
   * @returns {*} truthy (the recorded entry) when recorded; otherwise a falsy
   *   value (the falsy name itself for unnamed queries, or undefined)
   */
  hasBeenParsed(connection) {
    return this.name && lookupParsedStatement(connection, this.name)
  }

  /**
   * Requests the next page of rows after the portal was suspended.
   *
   * @param {object} connection
   */
  handlePortalSuspended(connection) {
    this._getRows(connection, this.rows)
  }

  /**
   * Executes the portal, then either syncs or flushes.
   *
   * @param {object} connection
   * @param {number} [rows] - maximum rows per execution; falsy for no limit
   */
  _getRows(connection, rows) {
    connection.execute({
      portal: this.portal,
      rows: rows,
    })
    // if we're not reading pages of rows send the sync command
    // to indicate the pipeline is finished
    if (!rows) {
      connection.sync()
    } else {
      // otherwise flush the call out to read more rows
      connection.flush()
    }
  }

  /**
   * Sends the query as parse (when needed), bind, describe and execute.
   *
   * http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
   *
   * @param {object} connection
   */
  prepare(connection) {
    // prepared statements need sync to be called after each command
    // complete or when an error is encountered
    this.isPreparedStatement = true

    // TODO refactor this poor encapsulation
    if (!this.hasBeenParsed(connection)) {
      connection.parse({
        text: this.text,
        name: this.name,
        types: this.types,
      })
    }

    // because we're mapping user supplied values to
    // postgres wire protocol compatible values it could
    // throw an exception, so try/catch this section
    try {
      connection.bind({
        portal: this.portal,
        statement: this.name,
        values: this.values,
        binary: this.binary,
        valueMapper: utils.prepareValue,
      })
    } catch (err) {
      this.handleError(err, connection)
      return
    }

    connection.describe({
      type: 'P',
      name: this.portal || '',
    })

    this._getRows(connection, this.rows)
  }

  /**
   * A plain query has no source stream, so a copy-in request is failed.
   *
   * @param {object} connection
   */
  handleCopyInResponse(connection) {
    connection.sendCopyFail('No source stream defined')
  }

  /**
   * Copy data is ignored by plain queries.
   */
  // eslint-disable-next-line no-unused-vars
  handleCopyData(msg, connection) {
    // noop
  }
}
// Expose the Query class as this module's only export.
module.exports = Query
|