123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- const _ = require('../utils/underscore.js');
- const Q = require('bluebird');
- const EventEmitter = require('events').EventEmitter;
- const log = require('../utils/logger').create('Sockets');
- const CONNECT_INTERVAL_MS = 1000;
- const CONNECT_TIMEOUT_MS = 3000;
- /**
- * Socket connecting to Ethereum Node.
- */
- class Socket extends EventEmitter {
- constructor(socketMgr, id) {
- super();
- this._mgr = socketMgr;
- this._id = id;
- this._log = log.create(this._id);
- this._state = null;
- }
- get id() {
- return this._id;
- }
- get isConnected() {
- return STATE.CONNECTED === this._state;
- }
- /**
- * Connect to host.
- * @param {Object} connectConfig
- * @param {Object} [options]
- * @param {Number} [options.timeout] Milliseconds to wait before timeout (default is 5000).
- * @return {Promise}
- */
- connect(connectConfig, options) {
- this._log.info(`Connect to ${JSON.stringify(connectConfig)}`);
- options = _.extend(
- {
- timeout: CONNECT_TIMEOUT_MS
- },
- options
- );
- return this._resetSocket().then(() => {
- let connectTimerId = null;
- let timeoutTimerId = null;
- this._log.debug('Connecting...');
- this._log.debug(
- `Will wait ${options.timeout}ms for connection to happen.`
- );
- this._state = STATE.CONNECTING;
- return new Q((resolve, reject) => {
- this._socket.once('connect', () => {
- if (STATE.CONNECTING === this._state) {
- this._log.info('Connected!');
- this._state = STATE.CONNECTED;
- clearTimeout(connectTimerId);
- clearTimeout(timeoutTimerId);
- this.emit('connect');
- resolve();
- }
- });
- this._socket.on('error', err => {
- if (STATE.CONNECTING === this._state) {
- this._log.warn(
- `Connection failed, retrying after ${CONNECT_INTERVAL_MS}ms...`
- );
- connectTimerId = setTimeout(() => {
- this._socket.connect(connectConfig);
- }, CONNECT_INTERVAL_MS);
- }
- });
- timeoutTimerId = setTimeout(() => {
- if (STATE.CONNECTING === this._state) {
- this._log.error(`Connection failed (${options.timeout}ms elapsed)`);
- this._state = STATE.CONNECTION_TIMEOUT;
- clearTimeout(connectTimerId);
- return reject(new Error('Unable to connect to socket: timeout'));
- }
- }, options.timeout);
- // initial kick-off
- this._socket.connect(connectConfig);
- });
- });
- }
- resume() {
- this._socket.resume.apply(this, arguments);
- }
- pause() {
- this._socket.pause.apply(this, arguments);
- }
- pipe() {
- this._socket.pipe.apply(this, arguments);
- }
- /**
- * Disconnect from socket.
- * @return {Promise}
- */
- disconnect(options) {
- if (!this._disconnectPromise) {
- this._disconnectPromise = new Q((resolve, reject) => {
- this._log.info('Disconnecting...');
- this._state = STATE.DISCONNECTING;
- // remove all existing listeners
- this._socket.removeAllListeners();
- const timer = setTimeout(() => {
- log.warn('Disconnection timed out, closing socket anyway...');
- this._state = STATE.DISCONNECTION_TIMEOUT;
- resolve();
- }, 5000 /* wait 5 seconds for disconnection */);
- this._socket.once('close', () => {
- // if we manually killed it then all good
- if (STATE.DISCONNECTING === this._state) {
- this._log.debug('Disconnected as expected');
- } else {
- this._log.warn('Unexpectedly disconnected');
- }
- this._state = STATE.DISCONNECTED;
- clearTimeout(timer);
- resolve();
- });
- this._socket.destroy();
- }).finally(() => {
- this._disconnectPromise = null;
- });
- }
- return this._disconnectPromise;
- }
- /**
- * An alias to `disconnect()`.
- * @return {Promise}
- */
- destroy() {
- this.removeAllListeners();
- return this.disconnect();
- }
- /**
- * Write data to socket.
- * @param {String} data
- */
- write(data) {
- if (STATE.CONNECTED !== this._state) {
- throw new Error('Socket not connected');
- }
- this._log.trace('Write data', data);
- this._socket.write(data);
- }
- /**
- * Reset socket.
- *
- * Upon completion `this._socket` will be set to a valid socket object, but
- * not yet connected.
- *
- * To be implemented by subclasses.
- */
- _resetSocket() {
- return Q.reject(new Error('Not yet implemented'));
- }
- }
- exports.Socket = Socket;
- const STATE = (exports.STATE = Socket.STATE = {
- CREATED: 0,
- CONNECTING: 1,
- CONNECTED: 2,
- DISCONNECTING: 3,
- DISCONNECTED: 4,
- ERROR: -1,
- DISCONNECTION_TIMEOUT: -2,
- CONNECTION_TIMEOUT: -3
- });
|