base.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. const _ = require('../utils/underscore.js');
  2. const Q = require('bluebird');
  3. const EventEmitter = require('events').EventEmitter;
  4. const log = require('../utils/logger').create('Sockets');
  // Delay, in milliseconds, before Socket#connect() retries after the
  // underlying socket emits `error` while still connecting.
  5. const CONNECT_INTERVAL_MS = 1000;
  // Default overall time budget, in milliseconds, for one Socket#connect()
  // call; used when the caller does not pass `options.timeout`.
  6. const CONNECT_TIMEOUT_MS = 3000;
  /**
   * Socket connecting to Ethereum Node.
   *
   * Thin wrapper around an underlying socket object (`this._socket`), which
   * subclasses create in `_resetSocket()`. The connection lifecycle is tracked
   * in `this._state` using the `STATE` values defined in this module.
   *
   * Emits `connect` once a connection attempt succeeds.
   */
  class Socket extends EventEmitter {
    /**
     * @param {Object} socketMgr Socket manager reference, kept as `this._mgr`.
     * @param {String} id Identifier of this socket; also used to label its logger.
     */
    constructor(socketMgr, id) {
      super();

      this._mgr = socketMgr;
      this._id = id;
      this._log = log.create(this._id);
      this._state = null;
    }

    /**
     * @return {String} The identifier passed to the constructor.
     */
    get id() {
      return this._id;
    }

    /**
     * @return {Boolean} `true` only while the state is `STATE.CONNECTED`.
     */
    get isConnected() {
      return STATE.CONNECTED === this._state;
    }

    /**
     * Connect to host.
     *
     * Resets the socket via `_resetSocket()`, then keeps retrying the
     * connection every `CONNECT_INTERVAL_MS` milliseconds after each socket
     * error until it succeeds or the timeout elapses.
     *
     * @param {Object} connectConfig Passed as-is to the underlying socket's `connect()`.
     * @param {Object} [options]
     * @param {Number} [options.timeout] Milliseconds to wait before timeout
     *   (default is `CONNECT_TIMEOUT_MS`, i.e. 3000).
     * @return {Promise} Resolves once connected. Rejects with an `Error` when
     *   the timeout elapses first, or with whatever `_resetSocket()` rejects with.
     */
    connect(connectConfig, options) {
      this._log.info(`Connect to ${JSON.stringify(connectConfig)}`);

      const settings = _.extend(
        {
          timeout: CONNECT_TIMEOUT_MS
        },
        options
      );

      return this._resetSocket().then(() => {
        let connectTimerId = null;
        let timeoutTimerId = null;

        this._log.debug('Connecting...');
        this._log.debug(
          `Will wait ${settings.timeout}ms for connection to happen.`
        );

        this._state = STATE.CONNECTING;

        return new Q((resolve, reject) => {
          this._socket.once('connect', () => {
            // Ignore late connects that arrive after a timeout or disconnect.
            if (STATE.CONNECTING === this._state) {
              this._log.info('Connected!');

              this._state = STATE.CONNECTED;

              clearTimeout(connectTimerId);
              clearTimeout(timeoutTimerId);

              this.emit('connect');

              resolve();
            }
          });

          this._socket.on('error', () => {
            // Errors only trigger a retry while we are still trying to connect.
            if (STATE.CONNECTING === this._state) {
              this._log.warn(
                `Connection failed, retrying after ${CONNECT_INTERVAL_MS}ms...`
              );

              // Keep at most one retry pending, even if several errors arrive
              // before the previous retry timer has fired.
              clearTimeout(connectTimerId);

              connectTimerId = setTimeout(() => {
                this._socket.connect(connectConfig);
              }, CONNECT_INTERVAL_MS);
            }
          });

          timeoutTimerId = setTimeout(() => {
            if (STATE.CONNECTING === this._state) {
              this._log.error(
                `Connection failed (${settings.timeout}ms elapsed)`
              );

              this._state = STATE.CONNECTION_TIMEOUT;

              clearTimeout(connectTimerId);

              reject(new Error('Unable to connect to socket: timeout'));
            }
          }, settings.timeout);

          // initial kick-off
          this._socket.connect(connectConfig);
        });
      });
    }

    /**
     * Forward `resume()` to the underlying socket.
     * @return {*} Whatever the underlying socket's `resume()` returns.
     */
    resume(...args) {
      // Call on the underlying socket so that it is the receiver, not this wrapper.
      return this._socket.resume(...args);
    }

    /**
     * Forward `pause()` to the underlying socket.
     * @return {*} Whatever the underlying socket's `pause()` returns.
     */
    pause(...args) {
      return this._socket.pause(...args);
    }

    /**
     * Forward `pipe()` to the underlying socket.
     * @return {*} Whatever the underlying socket's `pipe()` returns.
     */
    pipe(...args) {
      return this._socket.pipe(...args);
    }

    /**
     * Disconnect from socket.
     *
     * Concurrent calls share one in-flight promise. If the underlying socket
     * does not emit `close` within 5 seconds the promise resolves anyway and
     * the state becomes `STATE.DISCONNECTION_TIMEOUT`.
     *
     * @param {Object} [options] Currently unused.
     * @return {Promise} Resolves once the socket is closed (or the wait timed out).
     */
    disconnect(options) {
      if (!this._disconnectPromise) {
        this._disconnectPromise = new Q(resolve => {
          this._log.info('Disconnecting...');

          this._state = STATE.DISCONNECTING;

          // No underlying socket was ever created (connect() never ran), so
          // there is nothing to close.
          if (!this._socket) {
            this._state = STATE.DISCONNECTED;
            resolve();
            return;
          }

          // remove all existing listeners
          this._socket.removeAllListeners();

          const timer = setTimeout(() => {
            this._log.warn('Disconnection timed out, closing socket anyway...');

            this._state = STATE.DISCONNECTION_TIMEOUT;

            resolve();
          }, 5000 /* wait 5 seconds for disconnection */);

          this._socket.once('close', () => {
            // if we manually killed it then all good
            if (STATE.DISCONNECTING === this._state) {
              this._log.debug('Disconnected as expected');
            } else {
              this._log.warn('Unexpectedly disconnected');
            }

            this._state = STATE.DISCONNECTED;

            clearTimeout(timer);

            resolve();
          });

          this._socket.destroy();
        }).finally(() => {
          // Allow a later disconnect() call to start a fresh attempt.
          this._disconnectPromise = null;
        });
      }

      return this._disconnectPromise;
    }

    /**
     * Remove every listener registered on this wrapper, then `disconnect()`.
     * @return {Promise} The promise returned by `disconnect()`.
     */
    destroy() {
      this.removeAllListeners();

      return this.disconnect();
    }

    /**
     * Write data to socket.
     * @param {String} data
     * @throws {Error} If the socket is not in the `STATE.CONNECTED` state.
     */
    write(data) {
      if (STATE.CONNECTED !== this._state) {
        throw new Error('Socket not connected');
      }

      this._log.trace('Write data', data);

      this._socket.write(data);
    }

    /**
     * Reset socket.
     *
     * Upon completion `this._socket` will be set to a valid socket object, but
     * not yet connected.
     *
     * To be implemented by subclasses. This base implementation always rejects.
     *
     * @return {Promise}
     */
    _resetSocket() {
      return Q.reject(new Error('Not yet implemented'));
    }
  }
  155. exports.Socket = Socket;
  /**
   * Lifecycle states stored in `Socket#_state`.
   *
   * Exposed both as `exports.STATE` and as the static `Socket.STATE`; both
   * refer to this same object. The error and timeout states use negative
   * values.
   */
  const STATE = {
    CREATED: 0,
    CONNECTING: 1,
    CONNECTED: 2,
    DISCONNECTING: 3,
    DISCONNECTED: 4,

    ERROR: -1,
    DISCONNECTION_TIMEOUT: -2,
    CONNECTION_TIMEOUT: -3
  };

  Socket.STATE = STATE;
  exports.STATE = STATE;