123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- 'use strict'
- var net = require('net')
- var EventEmitter = require('events').EventEmitter
- const { parse, serialize } = require('pg-protocol')
// Flush, sync and end messages are serialized once when the module loads; the same buffers are reused for every write.
- const flushBuffer = serialize.flush()
- const syncBuffer = serialize.sync()
- const endBuffer = serialize.end()
- // TODO(bmc) support binary mode at some point
/**
 * Low-level connection that speaks the PostgreSQL wire protocol over a stream.
 *
 * Outgoing messages are built with `serialize` and written to `this.stream`;
 * incoming messages are decoded by `parse` and re-emitted as events.
 *
 * Events emitted by this class:
 *  - 'connect'      the underlying stream connected
 *  - 'sslconnect'   the stream was upgraded to TLS
 *  - 'error'        stream error or SSL negotiation failure
 *  - 'end'          the stream ended or closed (emitted at most once)
 *  - 'message'      every parsed message, only once someone has subscribed to it
 *  - 'errorMessage' a parsed message whose name is 'error'
 *  - <msg.name>     every other parsed message, under its own name
 */
class Connection extends EventEmitter {
  /**
   * @param {object} [config]
   * @param {object} [config.stream] socket-like stream to use; defaults to a new `net.Socket`
   * @param {boolean} [config.keepAlive] enable TCP keep-alive once connected
   * @param {number} [config.keepAliveInitialDelayMillis] initial delay passed to `setKeepAlive`
   * @param {boolean|object} [config.ssl] `true`, or an options object merged into the `tls.connect` options
   */
  constructor(config) {
    super()
    config = config || {}

    this.stream = config.stream || new net.Socket()
    this._keepAlive = config.keepAlive
    this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
    // Not read anywhere in this class; presumably kept for external consumers.
    this.lastBuffer = false
    this.parsedStatements = {}
    this.ssl = config.ssl || false
    this._ending = false
    // Set by connect(); end() reads it, so give it an explicit initial value.
    this._connecting = false
    // Guards against emitting 'end' more than once (stream 'end' + stream 'close').
    this._endEmitted = false
    this._emitMessage = false

    // Only pay for the generic 'message' event once somebody listens for it.
    this.on('newListener', (eventName) => {
      if (eventName === 'message') {
        this._emitMessage = true
      }
    })
  }

  /**
   * Emit 'end' the first time it is called and do nothing afterwards.
   * Both the stream's 'end' and 'close' events funnel through here.
   */
  _emitEnd() {
    if (this._endEmitted) {
      return
    }
    this._endEmitted = true
    this.emit('end')
  }

  /**
   * Connect the underlying stream and wire up its events.
   *
   * Without SSL the message parser is attached immediately. With SSL the
   * first chunk received is treated as the server's answer to the SSL
   * request: 'S' upgrades the stream with `tls.connect`, anything else ends
   * the stream and emits 'error'.
   *
   * @param {number} port
   * @param {string} host
   */
  connect(port, host) {
    this._connecting = true
    this.stream.setNoDelay(true)
    this.stream.connect(port, host)

    this.stream.once('connect', () => {
      if (this._keepAlive) {
        this.stream.setKeepAlive(true, this._keepAliveInitialDelayMillis)
      }
      this.emit('connect')
    })

    const reportStreamError = (error) => {
      // errors about disconnections should be ignored during disconnect
      if (this._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
        return
      }
      this.emit('error', error)
    }
    this.stream.on('error', reportStreamError)

    this.stream.on('close', () => {
      this._emitEnd()
    })

    if (!this.ssl) {
      return this.attachListeners(this.stream)
    }

    this.stream.once('data', (buffer) => {
      const responseCode = buffer.toString('utf8')
      switch (responseCode) {
        case 'S': // Server supports SSL connections, continue with a secure connection
          break
        case 'N': // Server does not support SSL connections
          this.stream.end()
          return this.emit('error', new Error('The server does not support SSL connections'))
        default:
          // Any other response byte, including 'E' (ErrorResponse) indicating a server error
          this.stream.end()
          return this.emit('error', new Error('There was an error establishing an SSL connection'))
      }

      // Loaded lazily so plain-text connections never require the tls module.
      const tls = require('tls')
      const options = {
        socket: this.stream,
      }

      if (this.ssl !== true) {
        Object.assign(options, this.ssl)
        // Object.assign copies only own enumerable properties; copy `key`
        // explicitly so an inherited or non-enumerable key is not lost.
        if ('key' in this.ssl) {
          options.key = this.ssl.key
        }
      }

      // net.isIP returns 0 when host is not an IP literal; only then is it
      // passed as the TLS server name.
      if (net.isIP(host) === 0) {
        options.servername = host
      }

      try {
        this.stream = tls.connect(options)
      } catch (err) {
        return this.emit('error', err)
      }

      // From here on `this.stream` is the TLS stream wrapping the raw socket.
      this.attachListeners(this.stream)
      this.stream.on('error', reportStreamError)
      this.emit('sslconnect')
    })
  }

  /**
   * Attach the 'end' handler and the protocol parser to `stream`.
   * Each parsed message is emitted under its own name, except that a message
   * named 'error' is emitted as 'errorMessage' (presumably to keep it apart
   * from the emitter's own 'error' event).
   *
   * @param {object} stream readable stream to parse messages from
   */
  attachListeners(stream) {
    stream.on('end', () => {
      this._emitEnd()
    })
    parse(stream, (msg) => {
      const eventName = msg.name === 'error' ? 'errorMessage' : msg.name
      if (this._emitMessage) {
        this.emit('message', msg)
      }
      this.emit(eventName, msg)
    })
  }

  /** Write the SSL request message directly to the stream (no writable check). */
  requestSsl() {
    this.stream.write(serialize.requestSsl())
  }

  /**
   * Write the startup message directly to the stream (no writable check).
   * @param {object} config passed through to `serialize.startup`
   */
  startup(config) {
    this.stream.write(serialize.startup(config))
  }

  /** Send a cancel request for the given backend process id and secret key. */
  cancel(processID, secretKey) {
    this._send(serialize.cancel(processID, secretKey))
  }

  /** Send a password message. */
  password(password) {
    this._send(serialize.password(password))
  }

  /** Send the SASL initial response for `mechanism`. */
  sendSASLInitialResponseMessage(mechanism, initialResponse) {
    this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
  }

  /** Send the SCRAM client-final message. */
  sendSCRAMClientFinalMessage(additionalData) {
    this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
  }

  /**
   * Write `buffer` to the stream if it is still writable.
   * @param {Buffer} buffer
   * @returns {boolean} `false` when the stream is not writable, otherwise the
   *   result of `stream.write`
   */
  _send(buffer) {
    if (!this.stream.writable) {
      return false
    }
    return this.stream.write(buffer)
  }

  /** Send a simple query message. */
  query(text) {
    this._send(serialize.query(text))
  }

  // send parse message
  parse(query) {
    this._send(serialize.parse(query))
  }

  // send bind message
  bind(config) {
    this._send(serialize.bind(config))
  }

  // send execute message
  execute(config) {
    this._send(serialize.execute(config))
  }

  /** Send the pre-serialized flush message if the stream is writable. */
  flush() {
    this._send(flushBuffer)
  }

  /**
   * Send the flush message followed by the sync message.
   *
   * NOTE(review): this also sets `_ending`, which makes the stream error
   * handler drop ECONNRESET/EPIPE after any sync, not only after end().
   * Behaviour kept as-is; confirm whether that is intended.
   */
  sync() {
    this._ending = true
    this._send(flushBuffer)
    this._send(syncBuffer)
  }

  /** Forward to `stream.ref()`. */
  ref() {
    this.stream.ref()
  }

  /** Forward to `stream.unref()`. */
  unref() {
    this.stream.unref()
  }

  /**
   * Close the connection.
   *
   * If connect() was never called or the stream is no longer writable the
   * stream is simply ended. Otherwise the pre-serialized end message is
   * written first and the stream is ended from the write callback.
   *
   * @returns {boolean|undefined} the result of `stream.write` when the end
   *   message was written, otherwise `undefined`
   */
  end() {
    this._ending = true
    if (!this._connecting || !this.stream.writable) {
      this.stream.end()
      return
    }
    return this.stream.write(endBuffer, () => {
      this.stream.end()
    })
  }

  /** Send a close message; `msg` is passed through to `serialize.close`. */
  close(msg) {
    this._send(serialize.close(msg))
  }

  /** Send a describe message; `msg` is passed through to `serialize.describe`. */
  describe(msg) {
    this._send(serialize.describe(msg))
  }

  /** Send one chunk of copy data. */
  sendCopyFromChunk(chunk) {
    this._send(serialize.copyData(chunk))
  }

  /** Send the copy-done message. */
  endCopyFrom() {
    this._send(serialize.copyDone())
  }

  /** Send a copy-fail message carrying `msg`. */
  sendCopyFail(msg) {
    this._send(serialize.copyFail(msg))
  }
}
- module.exports = Connection
|