123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- 'use strict'
- const EventEmitter = require('events').EventEmitter
// Shared do-nothing function. It is handed to callbacks as the third
// argument when they are invoked with an error and there is no client.
const NOOP = function () {
  /* intentionally empty */
}
/**
 * Remove the first element of `list` that satisfies `predicate`.
 * The array is modified in place.
 *
 * @param {Array} list - array to search and mutate
 * @param {Function} predicate - test handed to Array.prototype.findIndex
 * @returns {*} the removed element, or undefined when nothing matched
 */
const removeWhere = (list, predicate) => {
  const position = list.findIndex(predicate)
  if (position === -1) {
    return undefined
  }
  const removedItems = list.splice(position, 1)
  return removedItems[0]
}
/**
 * Plain record that groups a client with the error listener and the
 * timer handle given to the constructor.
 */
class IdleItem {
  /**
   * @param {object} client - the client this record refers to
   * @param {Function} idleListener - 'error' listener associated with the client
   * @param {*} timeoutId - timer handle (may be undefined)
   */
  constructor(client, idleListener, timeoutId) {
    Object.assign(this, { client, idleListener, timeoutId })
  }
}
/**
 * Plain record wrapping the callback of one queued request.
 */
class PendingItem {
  /**
   * @param {Function} callback - function stored on the item as `callback`
   */
  constructor(callback) {
    Object.assign(this, { callback })
  }
}
/**
 * Always throws: signals that a client's release function was invoked
 * a second time.
 *
 * @throws {Error} unconditionally
 */
function throwOnDoubleRelease() {
  const message = 'Release called on client which has already been released to the pool.'
  throw new Error(message)
}
/**
 * Build a `{ callback, result }` pair for an API that supports both
 * callbacks and promises.
 *
 * When `callback` is given it is returned untouched and `result` is
 * undefined. Otherwise a new promise is created with the supplied
 * constructor, and the returned callback settles it: rejecting with the
 * first argument when it is truthy, resolving with the second otherwise.
 *
 * @param {Function} Promise - promise constructor to instantiate
 * @param {Function} [callback] - caller-supplied node-style callback
 * @returns {{callback: Function, result: (Promise|undefined)}}
 */
function promisify(Promise, callback) {
  if (callback) {
    return { callback, result: undefined }
  }

  let settle
  const result = new Promise((resolve, reject) => {
    settle = { resolve, reject }
  })

  const cb = (err, client) => {
    if (err) {
      settle.reject(err)
    } else {
      settle.resolve(client)
    }
  }

  return { callback: cb, result }
}
/**
 * Create the 'error' listener used for a client the pool is holding.
 *
 * When invoked, the listener tags the error with the client, detaches
 * itself, attaches a replacement listener that only logs (it logs the
 * first error again, not the later one), removes the client from the
 * pool and finally re-emits the error on the pool.
 *
 * @param {object} pool - object providing log(), _remove() and emit()
 * @param {object} client - emitter the listener is bound to
 * @returns {Function} listener taking the error as its only argument
 */
function makeIdleListener(pool, client) {
  function idleListener(err) {
    err.client = client

    // swap this listener for a logging-only one so that later 'error'
    // events on the client still have a handler
    client.removeListener('error', idleListener)
    const logFurtherErrors = () => {
      pool.log('additional client error after disconnection due to error', err)
    }
    client.on('error', logFurtherErrors)

    pool._remove(client)
    // TODO - document that once the pool emits an error
    // the client has already been closed & purged and is unusable
    pool.emit('error', err, client)
  }

  return idleListener
}
/**
 * Pool of reusable clients.
 *
 * Clients are created lazily up to `options.max`, handed out through
 * `connect()` / `query()`, and kept in an idle list between uses.
 * The pool is an EventEmitter and emits:
 *   - 'connect' (client): a newly created client finished connecting
 *   - 'acquire' (client): a client is being checked out
 *   - 'remove'  (client): a client was closed and dropped from the pool
 *   - 'error'   (err, client): a client held by the pool emitted 'error'
 */
class Pool extends EventEmitter {
  /**
   * @param {object} [options] - pool settings; the remaining keys are passed
   *   to the Client constructor unchanged. Keys read by the pool:
   *   `max` (falls back to `poolSize`, then 10), `maxUses` (Infinity),
   *   `allowExitOnIdle` (false), `maxLifetimeSeconds` (0 = no limit),
   *   `idleTimeoutMillis` (10000 when undefined, falsy = no idle timeout),
   *   `connectionTimeoutMillis`, `verify`, `log`, `Client`, `Promise`.
   * @param {Function} [Client] - client constructor used when
   *   `options.Client` is not set; falls back to `require('pg').Client`.
   */
  constructor(options, Client) {
    super()

    // shallow copy: nested objects such as `ssl` are still shared with the caller
    this.options = Object.assign({}, options)

    if (options != null && 'password' in options) {
      // "hiding" the password so it doesn't show up in stack traces
      // or if the client is console.logged
      Object.defineProperty(this.options, 'password', {
        configurable: true,
        enumerable: false,
        writable: true,
        value: options.password,
      })
    }
    if (options != null && options.ssl && options.ssl.key) {
      // "hiding" the ssl->key so it doesn't show up in stack traces
      // or if the client is console.logged
      Object.defineProperty(this.options.ssl, 'key', {
        enumerable: false,
      })
    }

    this.options.max = this.options.max || this.options.poolSize || 10
    this.options.maxUses = this.options.maxUses || Infinity
    this.options.allowExitOnIdle = this.options.allowExitOnIdle || false
    this.options.maxLifetimeSeconds = this.options.maxLifetimeSeconds || 0
    this.log = this.options.log || function () {}
    this.Client = this.options.Client || Client || require('pg').Client
    this.Promise = this.options.Promise || global.Promise

    if (typeof this.options.idleTimeoutMillis === 'undefined') {
      this.options.idleTimeoutMillis = 10000
    }

    // every client created by the pool and not yet removed (idle or checked out)
    this._clients = []
    // IdleItem records for clients currently waiting to be reused
    this._idle = []
    // clients whose maxLifetimeSeconds elapsed; they are removed on next release
    this._expired = new WeakSet()
    // PendingItem records for connect() calls waiting for a client
    this._pendingQueue = []
    this._endCallback = undefined
    this.ending = false
    this.ended = false
  }

  /**
   * @returns {boolean} true when the pool already holds `options.max` clients
   */
  _isFull() {
    return this._clients.length >= this.options.max
  }

  /**
   * Advance the pool's state: finish shutting down when `end()` was
   * called, otherwise serve the oldest queued request with an idle client
   * or a new one if there is room.
   */
  _pulseQueue() {
    this.log('pulse queue')
    if (this.ended) {
      this.log('pulse queue ended')
      return
    }
    if (this.ending) {
      this.log('pulse queue on ending')
      if (this._idle.length) {
        // iterate over a copy because _remove() splices entries out of this._idle
        this._idle.slice().forEach((item) => {
          this._remove(item.client)
        })
      }
      // checked-out clients are removed when they are released; only then
      // does the pool count as ended
      if (!this._clients.length) {
        this.ended = true
        this._endCallback()
      }
      return
    }

    // if we don't have any waiting, do nothing
    if (!this._pendingQueue.length) {
      this.log('no queued requests')
      return
    }
    // if we don't have any idle clients and we have no more room do nothing
    if (!this._idle.length && this._isFull()) {
      return
    }
    const pendingItem = this._pendingQueue.shift()
    if (this._idle.length) {
      // reuse the most recently released client
      const idleItem = this._idle.pop()
      clearTimeout(idleItem.timeoutId)
      const client = idleItem.client
      client.ref && client.ref()
      const idleListener = idleItem.idleListener

      return this._acquireClient(client, pendingItem, idleListener, false)
    }
    if (!this._isFull()) {
      return this.newClient(pendingItem)
    }
    throw new Error('unexpected condition')
  }

  /**
   * Drop a client from the pool: forget its idle entry (cancelling the
   * idle timer), remove it from the client list, call `client.end()` and
   * emit 'remove'.
   *
   * @param {object} client
   */
  _remove(client) {
    const removed = removeWhere(this._idle, (item) => item.client === client)

    if (removed !== undefined) {
      clearTimeout(removed.timeoutId)
    }

    this._clients = this._clients.filter((c) => c !== client)
    client.end()
    this.emit('remove', client)
  }

  /**
   * Check out a client.
   *
   * @param {Function} [cb] - called as (err, client, release). When omitted
   *   a promise for the client is returned instead.
   * @returns {Promise|undefined} promise for the client when no callback
   *   was given. After `end()` was called the callback receives an error,
   *   or a rejected promise is returned.
   */
  connect(cb) {
    if (this.ending) {
      const err = new Error('Cannot use a pool after calling end on the pool')
      return cb ? cb(err) : this.Promise.reject(err)
    }

    const response = promisify(this.Promise, cb)
    const result = response.result

    // if we don't have to connect a new client, don't do so
    if (this._isFull() || this._idle.length) {
      // if we have idle clients schedule a pulse immediately
      if (this._idle.length) {
        process.nextTick(() => this._pulseQueue())
      }

      if (!this.options.connectionTimeoutMillis) {
        this._pendingQueue.push(new PendingItem(response.callback))
        return result
      }

      // `tid` is assigned below; this callback only runs after that
      const queueCallback = (err, res, done) => {
        clearTimeout(tid)
        response.callback(err, res, done)
      }

      const pendingItem = new PendingItem(queueCallback)

      // set connection timeout on checking out an existing client
      const tid = setTimeout(() => {
        // remove the callback from pending waiters because
        // we're going to call it with a timeout error
        removeWhere(this._pendingQueue, (i) => i.callback === queueCallback)
        pendingItem.timedOut = true
        response.callback(new Error('timeout exceeded when trying to connect'))
      }, this.options.connectionTimeoutMillis)

      this._pendingQueue.push(pendingItem)
      return result
    }

    this.newClient(new PendingItem(response.callback))

    return result
  }

  /**
   * Create and connect a new client, then hand it to `pendingItem`.
   * On connection failure the client is dropped and the item's callback
   * receives the error (unless the item already timed out).
   *
   * @param {PendingItem} pendingItem - request the new client is created for
   */
  newClient(pendingItem) {
    const client = new this.Client(this.options)
    this._clients.push(client)
    const idleListener = makeIdleListener(this, client)

    this.log('checking client timeout')

    // connection timeout logic
    let tid
    let timeoutHit = false
    if (this.options.connectionTimeoutMillis) {
      tid = setTimeout(() => {
        this.log('ending client due to timeout')
        timeoutHit = true
        // force kill the node driver, and let libpq do its teardown
        client.connection ? client.connection.stream.destroy() : client.end()
      }, this.options.connectionTimeoutMillis)
    }

    this.log('connecting new client')
    client.connect((err) => {
      if (tid) {
        clearTimeout(tid)
      }
      client.on('error', idleListener)
      if (err) {
        this.log('client failed to connect', err)
        // remove the dead client from our list of clients
        this._clients = this._clients.filter((c) => c !== client)
        if (timeoutHit) {
          err.message = 'Connection terminated due to connection timeout'
        }

        // this client won’t be released, so move on immediately
        this._pulseQueue()

        if (!pendingItem.timedOut) {
          pendingItem.callback(err, undefined, NOOP)
        }
      } else {
        this.log('new client connected')

        if (this.options.maxLifetimeSeconds !== 0) {
          const maxLifetimeTimeout = setTimeout(() => {
            this.log('ending client due to expired lifetime')
            this._expired.add(client)
            const idleIndex = this._idle.findIndex((idleItem) => idleItem.client === client)
            if (idleIndex !== -1) {
              // the client is idle: check it out and release it right away so
              // that _release() sees the expired mark and removes it
              this._acquireClient(
                client,
                new PendingItem((err, client, clientRelease) => clientRelease()),
                idleListener,
                false
              )
            }
          }, this.options.maxLifetimeSeconds * 1000)

          // The lifetime timer must neither keep the process alive on its own
          // nor outlive the client: drop it as soon as the client has ended.
          maxLifetimeTimeout.unref()
          client.once('end', () => clearTimeout(maxLifetimeTimeout))
        }

        return this._acquireClient(client, pendingItem, idleListener, true)
      }
    })
  }

  /**
   * Acquire a client for a pending work item.
   *
   * Emits 'connect' (new clients only) and 'acquire', installs a fresh
   * one-shot `client.release`, and detaches the pool's idle error listener.
   * New clients are passed through `options.verify` first when it is set.
   * If the item already timed out the client is released again instead of
   * being delivered.
   *
   * @param {object} client
   * @param {PendingItem} pendingItem
   * @param {Function} idleListener - the pool's 'error' listener for this client
   * @param {boolean} isNew - true when the client was just connected
   */
  _acquireClient(client, pendingItem, idleListener, isNew) {
    if (isNew) {
      this.emit('connect', client)
    }

    this.emit('acquire', client)

    client.release = this._releaseOnce(client, idleListener)

    client.removeListener('error', idleListener)

    if (!pendingItem.timedOut) {
      if (isNew && this.options.verify) {
        this.options.verify(client, (err) => {
          if (err) {
            client.release(err)
            return pendingItem.callback(err, undefined, NOOP)
          }

          pendingItem.callback(undefined, client, client.release)
        })
      } else {
        pendingItem.callback(undefined, client, client.release)
      }
    } else {
      if (isNew && this.options.verify) {
        this.options.verify(client, client.release)
      } else {
        client.release()
      }
    }
  }

  /**
   * Returns a function that wraps _release and throws if called more than once.
   *
   * @param {object} client
   * @param {Function} idleListener
   * @returns {Function} release function taking an optional error
   */
  _releaseOnce(client, idleListener) {
    let released = false

    return (err) => {
      if (released) {
        throwOnDoubleRelease()
      }

      released = true
      this._release(client, idleListener, err)
    }
  }

  /**
   * Release a client back to the pool; include an error to remove it
   * from the pool instead.
   *
   * The client is also removed when the pool is ending, when the client
   * reports itself as not queryable or ending, when it reached
   * `options.maxUses`, or when its lifetime expired. Otherwise it is put
   * on the idle list, with an idle timer if `idleTimeoutMillis` is set.
   *
   * @param {object} client
   * @param {Function} idleListener
   * @param {*} [err] - truthy value forces removal of the client
   */
  _release(client, idleListener, err) {
    client.on('error', idleListener)

    client._poolUseCount = (client._poolUseCount || 0) + 1

    // TODO(bmc): expose a proper, public interface _queryable and _ending
    if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
      if (client._poolUseCount >= this.options.maxUses) {
        this.log('remove expended client')
      }
      this._remove(client)
      this._pulseQueue()
      return
    }

    const isExpired = this._expired.has(client)
    if (isExpired) {
      this.log('remove expired client')
      this._expired.delete(client)
      this._remove(client)
      this._pulseQueue()
      return
    }

    // idle timeout
    let tid
    if (this.options.idleTimeoutMillis) {
      tid = setTimeout(() => {
        this.log('remove idle client')
        this._remove(client)
      }, this.options.idleTimeoutMillis)

      if (this.options.allowExitOnIdle) {
        // allow Node to exit if this is all that's left
        tid.unref()
      }
    }

    if (this.options.allowExitOnIdle) {
      client.unref()
    }

    this._idle.push(new IdleItem(client, idleListener, tid))
    this._pulseQueue()
  }

  /**
   * Check out a client, run a single query on it and release it again.
   *
   * @param {*} text - first argument forwarded to `client.query`
   * @param {*} [values] - second argument forwarded to `client.query`;
   *   may be the callback when no values are needed
   * @param {Function} [cb] - called as (err, result)
   * @returns {Promise|undefined} promise for the result when no callback
   *   was given
   */
  query(text, values, cb) {
    // guard clause against passing a function as the first parameter
    if (typeof text === 'function') {
      const response = promisify(this.Promise, text)
      setImmediate(function () {
        return response.callback(new Error('Passing a function as the first parameter to pool.query is not supported'))
      })
      return response.result
    }

    // allow plain text query without values
    if (typeof values === 'function') {
      cb = values
      values = undefined
    }
    const response = promisify(this.Promise, cb)
    cb = response.callback

    this.connect((err, client) => {
      if (err) {
        return cb(err)
      }

      // guards against releasing twice when both an 'error' event and the
      // query callback fire
      let clientReleased = false
      const onError = (err) => {
        if (clientReleased) {
          return
        }
        clientReleased = true
        client.release(err)
        cb(err)
      }

      client.once('error', onError)
      this.log('dispatching query')
      try {
        client.query(text, values, (err, res) => {
          this.log('query dispatched')
          client.removeListener('error', onError)
          if (clientReleased) {
            return
          }
          clientReleased = true
          client.release(err)
          if (err) {
            return cb(err)
          } else {
            return cb(undefined, res)
          }
        })
      } catch (thrown) {
        // client.query threw synchronously: without this the client would
        // stay checked out forever and the caller would never hear back
        client.removeListener('error', onError)
        if (clientReleased) {
          // already released and reported; the throw came from later code,
          // so let it propagate as before
          throw thrown
        }
        clientReleased = true
        client.release(thrown)
        return cb(thrown)
      }
    })
    return response.result
  }

  /**
   * Shut the pool down: idle clients are removed immediately, checked-out
   * clients when they are released. Completion is signalled once no
   * clients are left.
   *
   * @param {Function} [cb] - called when the pool has ended, or with an
   *   error if `end()` was already called
   * @returns {Promise|undefined} promise for completion when no callback
   *   was given
   */
  end(cb) {
    this.log('ending')
    if (this.ending) {
      const err = new Error('Called end on pool more than once')
      return cb ? cb(err) : this.Promise.reject(err)
    }
    this.ending = true
    const promised = promisify(this.Promise, cb)
    this._endCallback = promised.callback
    this._pulseQueue()
    return promised.result
  }

  /** Number of queued connect() requests waiting for a client. */
  get waitingCount() {
    return this._pendingQueue.length
  }

  /** Number of clients currently sitting in the idle list. */
  get idleCount() {
    return this._idle.length
  }

  /** Number of pooled clients that are marked as expired. */
  get expiredCount() {
    return this._clients.reduce((acc, client) => acc + (this._expired.has(client) ? 1 : 0), 0)
  }

  /** Number of clients held by the pool, idle or checked out. */
  get totalCount() {
    return this._clients.length
  }
}
- module.exports = Pool
|