connection.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. 'use strict'
  2. var net = require('net')
  3. var EventEmitter = require('events').EventEmitter
  4. const { parse, serialize } = require('pg-protocol')
  5. const flushBuffer = serialize.flush()
  6. const syncBuffer = serialize.sync()
  7. const endBuffer = serialize.end()
  8. // TODO(bmc) support binary mode at some point
  9. class Connection extends EventEmitter {
  10. constructor(config) {
  11. super()
  12. config = config || {}
  13. this.stream = config.stream || new net.Socket()
  14. this._keepAlive = config.keepAlive
  15. this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
  16. this.lastBuffer = false
  17. this.parsedStatements = {}
  18. this.ssl = config.ssl || false
  19. this._ending = false
  20. this._emitMessage = false
  21. var self = this
  22. this.on('newListener', function (eventName) {
  23. if (eventName === 'message') {
  24. self._emitMessage = true
  25. }
  26. })
  27. }
  28. connect(port, host) {
  29. var self = this
  30. this._connecting = true
  31. this.stream.setNoDelay(true)
  32. this.stream.connect(port, host)
  33. this.stream.once('connect', function () {
  34. if (self._keepAlive) {
  35. self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
  36. }
  37. self.emit('connect')
  38. })
  39. const reportStreamError = function (error) {
  40. // errors about disconnections should be ignored during disconnect
  41. if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
  42. return
  43. }
  44. self.emit('error', error)
  45. }
  46. this.stream.on('error', reportStreamError)
  47. this.stream.on('close', function () {
  48. self.emit('end')
  49. })
  50. if (!this.ssl) {
  51. return this.attachListeners(this.stream)
  52. }
  53. this.stream.once('data', function (buffer) {
  54. var responseCode = buffer.toString('utf8')
  55. switch (responseCode) {
  56. case 'S': // Server supports SSL connections, continue with a secure connection
  57. break
  58. case 'N': // Server does not support SSL connections
  59. self.stream.end()
  60. return self.emit('error', new Error('The server does not support SSL connections'))
  61. default:
  62. // Any other response byte, including 'E' (ErrorResponse) indicating a server error
  63. self.stream.end()
  64. return self.emit('error', new Error('There was an error establishing an SSL connection'))
  65. }
  66. var tls = require('tls')
  67. const options = {
  68. socket: self.stream,
  69. }
  70. if (self.ssl !== true) {
  71. Object.assign(options, self.ssl)
  72. if ('key' in self.ssl) {
  73. options.key = self.ssl.key
  74. }
  75. }
  76. if (net.isIP(host) === 0) {
  77. options.servername = host
  78. }
  79. try {
  80. self.stream = tls.connect(options)
  81. } catch (err) {
  82. return self.emit('error', err)
  83. }
  84. self.attachListeners(self.stream)
  85. self.stream.on('error', reportStreamError)
  86. self.emit('sslconnect')
  87. })
  88. }
  89. attachListeners(stream) {
  90. stream.on('end', () => {
  91. this.emit('end')
  92. })
  93. parse(stream, (msg) => {
  94. var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
  95. if (this._emitMessage) {
  96. this.emit('message', msg)
  97. }
  98. this.emit(eventName, msg)
  99. })
  100. }
  101. requestSsl() {
  102. this.stream.write(serialize.requestSsl())
  103. }
  104. startup(config) {
  105. this.stream.write(serialize.startup(config))
  106. }
  107. cancel(processID, secretKey) {
  108. this._send(serialize.cancel(processID, secretKey))
  109. }
  110. password(password) {
  111. this._send(serialize.password(password))
  112. }
  113. sendSASLInitialResponseMessage(mechanism, initialResponse) {
  114. this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
  115. }
  116. sendSCRAMClientFinalMessage(additionalData) {
  117. this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
  118. }
  119. _send(buffer) {
  120. if (!this.stream.writable) {
  121. return false
  122. }
  123. return this.stream.write(buffer)
  124. }
  125. query(text) {
  126. this._send(serialize.query(text))
  127. }
  128. // send parse message
  129. parse(query) {
  130. this._send(serialize.parse(query))
  131. }
  132. // send bind message
  133. bind(config) {
  134. this._send(serialize.bind(config))
  135. }
  136. // send execute message
  137. execute(config) {
  138. this._send(serialize.execute(config))
  139. }
  140. flush() {
  141. if (this.stream.writable) {
  142. this.stream.write(flushBuffer)
  143. }
  144. }
  145. sync() {
  146. this._ending = true
  147. this._send(flushBuffer)
  148. this._send(syncBuffer)
  149. }
  150. ref() {
  151. this.stream.ref()
  152. }
  153. unref() {
  154. this.stream.unref()
  155. }
  156. end() {
  157. // 0x58 = 'X'
  158. this._ending = true
  159. if (!this._connecting || !this.stream.writable) {
  160. this.stream.end()
  161. return
  162. }
  163. return this.stream.write(endBuffer, () => {
  164. this.stream.end()
  165. })
  166. }
  167. close(msg) {
  168. this._send(serialize.close(msg))
  169. }
  170. describe(msg) {
  171. this._send(serialize.describe(msg))
  172. }
  173. sendCopyFromChunk(chunk) {
  174. this._send(serialize.copyData(chunk))
  175. }
  176. endCopyFrom() {
  177. this._send(serialize.copyDone())
  178. }
  179. sendCopyFail(msg) {
  180. this._send(serialize.copyFail(msg))
  181. }
  182. }
  183. module.exports = Connection