123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681 |
/**
The IPC provider backend filters and tunnels all incoming requests to the ethereum node.
@module ipcProviderBackend
*/
- const _ = require('../utils/underscore.js');
- const Q = require('bluebird');
- const { ipcMain: ipc } = require('electron');
- const fs = require('fs');
- const path = require('path');
- const log = require('../utils/logger').create('ipcProviderBackend');
- const Sockets = require('../socketManager');
- const Settings = require('../settings');
- const ethereumNode = require('../ethereumNode');
- const ethereumNodeRemote = require('../ethereumNodeRemote');
/**
 * JSON-RPC style error descriptors (`{ code, message }`) used by the IPC
 * provider backend.
 *
 * Messages containing the literal `__method__` are templates: the
 * placeholder is meant to be substituted with the offending method name
 * before the error is sent (see the SOCKET_NOT_CONNECTED handling in
 * `_sendRequest`).
 *
 * In the JSON-RPC 2.0 specification -32600 is "Invalid Request", -32601 is
 * "Method not found" and -32603 is "Internal error"; -32604 is not defined
 * by the specification and is specific to this module.
 */
const ERRORS = {
  // The payload (or one of its properties) failed validation.
  INVALID_PAYLOAD: {
    code: -32600,
    message:
      "Payload, or some of its content properties are invalid. Please check if they are valid HEX with '0x' prefix."
  },
  // The requested method is not allowed.
  METHOD_DENIED: {
    code: -32601,
    message: 'Method __method__ not allowed.'
  },
  // The request did not complete in time.
  METHOD_TIMEOUT: {
    code: -32603,
    message: 'Request timed out for method __method__.'
  },
  // A single transaction was denied.
  TX_DENIED: {
    code: -32603,
    message: 'Transaction denied'
  },
  // sendTransaction inside a batch request.
  BATCH_TX_DENIED: {
    code: -32603,
    message:
      'Transactions denied, sendTransaction is not allowed in batch requests.'
  },
  // compileSolidity inside a batch request.
  BATCH_COMPILE_DENIED: {
    code: -32603,
    message:
      'Compilation denied, compileSolidity is not allowed in batch requests.'
  },
  // The socket to the node was not connected when a request was sent.
  SOCKET_NOT_CONNECTED: {
    code: -32604,
    message: 'Socket not connected when trying to send method __method__.'
  }
};
/**
 * IPC provider backend.
 *
 * Listens for `ipcProvider-*` IPC messages, keeps one socket connection per
 * message sender, runs every request through a per-method processor (falling
 * back to the `base` processor) and sends the JSON-RPC responses back to the
 * sender. It also mirrors `eth_subscribe` subscriptions onto the remote node
 * (`ethereumNodeRemote`) and forwards remote results while the remote node is
 * the active one.
 *
 * NOTE(review): `store` is referenced as a free variable and is not imported
 * in the visible part of this file - presumably a global; confirm.
 */
class IpcProviderBackend {
  /**
   * Sets up internal state, registers the node-state and IPC listeners and
   * instantiates one processor per file found in the `methods` directory
   * next to this module.
   */
  constructor() {
    // > key: connection ownerId
    // > value: { id, owner, socket }
    this._connections = {};

    // In-flight socket connection attempts.
    // > key: connection ownerId
    // > value: promise for the pending connect
    this._connectionPromise = {};

    // Store remote subscriptions.
    // > key: local subscription id
    // > value: promise resolving to the remote subscription id (or null)
    this._remoteSubscriptions = {};

    // Store subscription owners for sending remote results
    // > key: local subscription id
    // > value: connection ownerId
    this._subscriptionOwners = {};

    this.ERRORS = ERRORS;

    ethereumNode.on('state', _.bind(this._onNodeStateChanged, this));

    // Nobody consumes the promise for this event, so handle rejections here
    // instead of leaving them unhandled.
    ipc.on('ipcProvider-create', event => {
      this._getOrCreateConnection(event).catch(err => {
        log.error('Failed to create socket connection', err);
      });
    });
    ipc.on('ipcProvider-destroy', _.bind(this._destroyConnection, this));
    ipc.on('ipcProvider-write', _.bind(this._sendRequest, this, false));
    ipc.on('ipcProvider-writeSync', _.bind(this._sendRequest, this, true));

    // dynamically load in method processors
    const processorFiles = fs.readdirSync(path.join(__dirname, 'methods'));

    // > key: file name without `.js`, value: processor instance
    this._processors = {};
    processorFiles.forEach(file => {
      const name = path.basename(file, '.js');
      const PClass = require(path.join(__dirname, 'methods', file));
      this._processors[name] = new PClass(name, this);
    });

    log.trace('Loaded processors', _.keys(this._processors));

    store.dispatch({ type: '[MAIN]:IPC_PROVIDER_BACKEND:INIT' });
  }

  /**
   * Look up the processor for a method name.
   *
   * Only own properties of the processor map are considered, so method names
   * such as `toString` or `constructor` cannot resolve to
   * `Object.prototype` members.
   *
   * @param {String} method JSON-RPC method name (may be undefined).
   * @return {Object} The dedicated processor, or the `base` processor.
   */
  _getProcessor(method) {
    const hasDedicated = Object.prototype.hasOwnProperty.call(
      this._processors,
      method
    );

    return hasDedicated ? this._processors[method] : this._processors.base;
  }

  /**
   * Get/create new connection to node.
   *
   * Reuses the connection stored for the event sender or creates one, makes
   * sure its socket is connected (waiting for the ethereum node to reach the
   * CONNECTED state first) and then tells the sender it is writable.
   *
   * @param {Object} event IPC event; `event.sender` is the connection owner.
   * @return {Promise} Resolves to the connection `{ id, owner, socket }`
   *   (undefined if the connection was removed in the meantime); rejects if
   *   connecting the socket fails.
   */
  _getOrCreateConnection(event) {
    const owner = event.sender;
    const ownerId = owner.id;
    let socket;

    return Q.try(() => {
      const existing = this._connections[ownerId];

      // already got?
      if (existing) {
        socket = existing.socket;
        return;
      }

      log.debug(`Create new socket connection, id=${ownerId}`);
      socket = Sockets.get(ownerId, Settings.rpcMode);

      // save to collection
      this._connections[ownerId] = {
        id: ownerId,
        owner,
        socket
      };

      this._attachSocketHandlers(ownerId, owner, socket);
    })
      .then(() => {
        if (socket.isConnected) {
          return undefined;
        }

        // since we may enter this function multiple times for the same
        // event source's IPC we don't want to repeat the connection
        // process each time - so let's track things in a promise
        if (!this._connectionPromise[ownerId]) {
          this._connectionPromise[ownerId] = this._connectSocket(
            ownerId,
            socket
          );
        }

        return this._connectionPromise[ownerId];
      })
      .then(() => {
        if (!owner.isDestroyed()) {
          owner.send('ipcProvider-setWritable', true);
        }

        return this._connections[ownerId];
      });
  }

  /**
   * Wire a freshly created socket to its owner.
   *
   * - `error` / `timeout` / `end`: destroy the socket, drop the connection
   *   and notify the owner.
   * - `connect`: notify the owner.
   * - `data-notification`: convert to a response payload and forward it.
   *
   * @param {*} ownerId Id of the connection owner.
   * @param {Object} owner The IPC sender owning the connection.
   * @param {Object} socket The socket to listen on.
   */
  _attachSocketHandlers(ownerId, owner, socket) {
    // if something goes wrong destroy the socket
    ['error', 'timeout', 'end'].forEach(ev => {
      socket.on(ev, data => {
        log.debug(
          `Destroy socket connection due to event: ${ev}, id=${ownerId}`
        );

        socket.destroy().finally(() => {
          if (!owner.isDestroyed()) {
            owner.send(`ipcProvider-${ev}`, JSON.stringify(data));
          }
        });

        delete this._connections[ownerId];
        Sockets.remove(ownerId);
      });
    });

    socket.on('connect', data => {
      if (!owner.isDestroyed()) {
        owner.send('ipcProvider-connect', JSON.stringify(data));
      }
    });

    // pass notifications back up the chain
    socket.on('data-notification', data => {
      log.trace('Notification received', ownerId, data);

      const response = data.error
        ? this._makeErrorResponsePayload(data, data)
        : this._makeResponsePayload(data, data);

      // A null response means the notification was deliberately suppressed
      // (local subscription result while the remote node is active), so
      // nothing must be sent.
      if (response && !owner.isDestroyed()) {
        owner.send('ipcProvider-data', JSON.stringify(response));
      }
    });
  }

  /**
   * Connect a socket once the ethereum node is connected.
   *
   * Removes the tracked entry from `_connectionPromise` when the attempt
   * settles, whether it succeeded or failed.
   *
   * @param {*} ownerId Id of the connection owner.
   * @param {Object} socket The socket to connect.
   * @return {Promise} Settles when the connection attempt has finished.
   */
  _connectSocket(ownerId, socket) {
    return Q.try(() => {
      log.debug(`Connecting socket ${ownerId}`);

      // wait for node to connect first.
      return this._waitForNodeConnected(ownerId);
    })
      .then(() =>
        socket.connect(
          Settings.rpcConnectConfig,
          {
            timeout: 5000 // presumably milliseconds - confirm against the socket API
          }
        )
      )
      .then(() => {
        log.debug(`Socket connected, id=${ownerId}`);
      })
      .finally(() => {
        delete this._connectionPromise[ownerId];
      });
  }

  /**
   * Wait until the ethereum node reports the CONNECTED state.
   *
   * @param {*} ownerId Id of the connection owner (used for logging only).
   * @return {Promise} Resolves immediately if the node is already connected.
   */
  _waitForNodeConnected(ownerId) {
    if (ethereumNode.state === ethereumNode.STATES.CONNECTED) {
      return Q.resolve();
    }

    return new Q(resolve => {
      const onStateChange = newState => {
        if (ethereumNode.STATES.CONNECTED !== newState) {
          return;
        }

        ethereumNode.removeListener('state', onStateChange);
        log.debug(
          `Ethereum node connected, resume connecting socket ${ownerId}`
        );
        resolve();
      };

      ethereumNode.on('state', onStateChange);
    });
  }

  /**
   * Handle IPC call to destroy a connection.
   *
   * @param {Object} event IPC event; `event.sender.id` selects the connection.
   */
  _destroyConnection(event) {
    const ownerId = event.sender.id;
    const conn = this._connections[ownerId];

    if (!conn) {
      return;
    }

    log.debug('Destroy socket connection', ownerId);

    // The owner may already be gone; sending to a destroyed owner must not
    // prevent the socket cleanup below.
    if (!conn.owner.isDestroyed()) {
      conn.owner.send('ipcProvider-setWritable', false);
    }

    conn.socket.destroy();
    delete this._connections[ownerId];
    Sockets.remove(ownerId);
  }

  /**
   * Handler for when Ethereum node state changes.
   *
   * When the node is about to stop: unsubscribe all remote subscriptions,
   * forget subscription bookkeeping, disconnect every connected socket and
   * tell each owner that its socket is no longer writable. Other states are
   * ignored.
   *
   * @param {String} state The new state.
   */
  _onNodeStateChanged(state) {
    if (state !== ethereumNode.STATES.STOPPING) {
      return;
    }

    log.info('Ethereum node stopping, disconnecting sockets');

    // Unsubscribe remote subscriptions
    _.each(this._remoteSubscriptions, remoteSubscription => {
      this._unsubscribeRemote(remoteSubscription);
    });
    this._remoteSubscriptions = {};
    this._subscriptionOwners = {};

    Q.all(
      _.map(this._connections, item => {
        if (!item.socket.isConnected) {
          return Q.resolve();
        }

        return item.socket.disconnect().then(() => {
          if (!item.owner.isDestroyed()) {
            log.debug(
              `Tell owner ${item.id} that socket is not currently writeable`
            );
            item.owner.send('ipcProvider-setWritable', false);
          }
        });
      })
    ).catch(err => {
      log.error('Error disconnecting sockets', err);
    });
  }

  /**
   * Handle IPC call to send a request.
   *
   * The response (or an error response) is always delivered: via
   * `event.returnValue` for sync requests, via an `ipcProvider-data` message
   * otherwise. If the socket is not connected an error is returned and, on
   * the first attempt only, the request is retried after 2 seconds.
   *
   * @param {Boolean} isSync whether request is sync.
   * @param {Object} event IPC event.
   * @param {String} payload request payload (JSON string, single or batch).
   * @param {Boolean} retry whether trying request again due to error
   * @return {Promise} Resolves once the response has been handed back.
   */
  _sendRequest(isSync, event, payload, retry = false) {
    const ownerId = event.sender.id;

    log.trace('sendRequest', isSync ? 'sync' : 'async', ownerId, payload);

    const originalPayloadStr = payload;
    // Untouched parsed copy of the request, used to build the response.
    let requestPayload;

    return Q.try(() => {
      requestPayload = JSON.parse(originalPayloadStr);

      return this._getOrCreateConnection(event);
    })
      .then(conn => {
        // reparse original string (so that we don't modify input payload)
        const finalPayload = JSON.parse(originalPayloadStr);

        // is batch?
        const isBatch = _.isArray(finalPayload);
        const finalPayloadList = isBatch ? finalPayload : [finalPayload];

        if (!conn.socket.isConnected) {
          const error = Object.assign({}, this.ERRORS.SOCKET_NOT_CONNECTED, {
            message: this.ERRORS.SOCKET_NOT_CONNECTED.message.replace(
              '__method__',
              finalPayloadList.map(p => p.method).join(', ')
            )
          });

          // Try again if not already a retry
          if (!retry) {
            error.message += ' Will retry...';
            setTimeout(() => {
              this._sendRequest(isSync, event, originalPayloadStr, true);
            }, 2000);
          }

          log.debug(error);
          throw error;
        }

        // sanitize each and every request payload
        _.each(finalPayloadList, p => {
          this._getProcessor(p.method).sanitizeRequestPayload(
            conn,
            p,
            isBatch
          );
        });

        // if a single payload and has an error then throw it
        if (!isBatch && finalPayload.error) {
          throw finalPayload.error;
        }

        // get non-error payloads
        const nonErrorPayloads = _.filter(finalPayloadList, p => !p.error);

        // execute non-error payloads
        return Q.try(() => {
          if (!nonErrorPayloads.length) {
            return [];
          }

          // if single payload check if we have special processor for it
          // if not then use base generic processor (batches have no
          // `method` of their own and therefore always use base)
          return this._getProcessor(finalPayload.method).exec(
            conn,
            isBatch ? nonErrorPayloads : nonErrorPayloads[0]
          );
        }).then(ret => {
          log.trace('Got result', ret);

          // collate results: errored payloads are passed through, the others
          // are merged with their result (batch results are consumed in order)
          const collated = finalPayloadList.map(p => {
            if (p.error) {
              return p;
            }

            const merged = _.extend({}, p, isBatch ? ret.shift() : ret);

            // sanitize response payload
            this._getProcessor(merged.method).sanitizeResponsePayload(
              conn,
              merged,
              isBatch
            );

            // if subscription, save connection ownerId for sending responses later
            if (merged.method === 'eth_subscribe' && merged.result) {
              this._subscriptionOwners[merged.result] = ownerId;
            }

            return merged;
          });

          if (isBatch) {
            return collated;
          }

          // extract single payload result
          const single = collated.pop();

          // check if it's an error
          if (single.error) {
            throw single.error;
          }

          return single;
        });
      })
      .then(result => {
        log.trace('Got result', result);

        return this._makeResponsePayload(requestPayload, result);
      })
      .catch(err => {
        log.error('Send request failed', err);

        // `err` may be a string, an error descriptor or (defensively) empty;
        // never throw here, otherwise a sync caller would get no returnValue.
        return this._makeErrorResponsePayload(requestPayload || {}, {
          message: typeof err === 'string' ? err : err && err.message,
          code: err ? err.code : undefined
        });
      })
      .then(returnValue => {
        const serialized = JSON.stringify(returnValue);

        log.trace('Return', ownerId, serialized);

        if (isSync) {
          event.returnValue = serialized;
        } else if (!event.sender.isDestroyed()) {
          event.sender.send('ipcProvider-data', serialized);
        }
      });
  }

  /**
  Sanitize a single or batch request payload.

  This will modify the passed-in payload.

  NOTE(review): nothing in this class calls this method - `_sendRequest` uses
  each processor's `sanitizeRequestPayload` instead - and it relies on a
  `sanitizePayload` method of the base processor; confirm it is still needed.

  @param {Object} conn The connection.
  @param {Object|Array} payload The request payload.
  */
  _sanitizeRequestPayload(conn, payload) {
    if (_.isArray(payload)) {
      _.each(payload, p => {
        if (p.method === 'eth_sendTransaction') {
          p.error = ERRORS.BATCH_TX_DENIED;
        } else {
          this._processors.base.sanitizePayload(conn, p);
        }
      });
    } else {
      this._processors.base.sanitizePayload(conn, payload);
    }
  }

  /**
  Make an error response payload

  Produces one `{ jsonrpc, id, error: { code, message } }` object per original
  payload item. A quoted token in the message (e.g. `'foo'`) is replaced by
  the quoted method name of the item when the item has one. The `error`
  argument is never modified.

  @param {Object|Array} originalPayload Original payload
  @param {Object} error Error result
  @return {Object|Array} Array for a batch payload, single object otherwise.
  */
  _makeErrorResponsePayload(originalPayload, error) {
    const responses = [].concat(originalPayload).map(item => {
      const response = _.extend(
        {
          jsonrpc: '2.0'
        },
        error
      );

      if (response.message) {
        // use the last entry of an array message without mutating the
        // (shared) error object
        const rawMessage = _.isArray(response.message)
          ? response.message[response.message.length - 1]
          : response.message;
        const text =
          rawMessage === undefined || rawMessage === null
            ? ''
            : String(rawMessage);
        const hasMethod = Boolean(item) && typeof item.method === 'string';

        response.error = {
          code: response.code,
          // replacer function: the method name is inserted literally
          message: hasMethod
            ? text.replace(/'[a-z_]*'/i, () => `'${item.method}'`)
            : text
        };
        delete response.code;
        delete response.message;
      }

      // delete stuff leftover from request
      delete response.params;
      delete response.method;
      response.id = item ? item.id : undefined;

      return response;
    });

    return _.isArray(originalPayload) ? responses : responses[0];
  }

  /**
  Make a response payload.

  Items that carry a request id get `params` and `method` stripped;
  notifications (no id) keep them. An item is `null` when
  `_handleSubscriptions` suppressed it.

  @param {Object|Array} originalPayload Original payload
  @param {Object|Array} value Response results.
  @return {Object|Array} Array for a batch payload, single value otherwise.
  */
  _makeResponsePayload(originalPayload, value) {
    const isBatch = _.isArray(originalPayload);
    const values = isBatch ? value : [value];

    const responses = [].concat(originalPayload).map((item, idx) => {
      const itemValue = values[idx];

      // handle error result
      let response = itemValue.error
        ? this._makeErrorResponsePayload(item, itemValue.error)
        : _.extend({}, item, {
            result: itemValue.result
          });

      response = this._handleSubscriptions(response);

      if (response) {
        // explicit check so that a request id of 0 counts as an id
        if (item.id !== undefined && item.id !== null) {
          delete response.params;
          delete response.method;
        }
        response.jsonrpc = '2.0';
      }

      return response;
    });

    return isBatch ? responses : responses[0];
  }

  /**
  Handles eth_subscribe|eth_subscription|eth_unsubscribe to:
  1. Create a remote subscription when created on local
  2. Send remote subscription result if on remote (with remote subscription id swapped for local id)
  3. Ignore local subscription result if on remote (syncing results are always passed through)
  4. Unsubscribe remote subscription when unsubscribing local subscription

  @param {Object} result
  @return {Object} result, or null when the result must not be propagated
  */
  _handleSubscriptions(result) {
    if (result.method === 'eth_subscribe') {
      const localSubscriptionId = result.result;

      // If subscription is created in local, also create the subscription in
      // remote. The stored value is a promise for the remote subscription id.
      if (localSubscriptionId) {
        this._remoteSubscriptions[localSubscriptionId] = this._subscribeRemote(
          localSubscriptionId,
          result.params
        );
      }
    }

    if (result.method === 'eth_subscription') {
      const params = result.params || {};

      // Skip if syncing result
      if (params.result && params.result.syncing) {
        return result;
      }

      // If remote node is active, cancel propagating response
      // since we'll return the remote response instead
      if (store.getState().nodes.active === 'remote') {
        log.trace(
          `Ignoring local subscription result (remote node is active). subscription id: ${
            params.subscription
          }`
        );
        return null;
      }

      log.trace(
        `Sending local subscription result (local node is active). subscription id: ${
          params.subscription
        }`
      );
      return result;
    }

    if (result.method === 'eth_unsubscribe') {
      const localSubscriptionId = result.params[0];
      const remoteSubscription = this._remoteSubscriptions[localSubscriptionId];

      if (remoteSubscription) {
        this._unsubscribeRemote(remoteSubscription);
        delete this._remoteSubscriptions[localSubscriptionId];
        delete this._subscriptionOwners[localSubscriptionId];
      }
    }

    return result;
  }

  /**
  Unsubscribes a remote subscription.

  @param {Promise|String} remoteSubscription Remote subscription id, or a
    promise for it as stored in `_remoteSubscriptions`.
  @return {Promise} Settles when the unsubscribe request was handed to the
    remote node; never rejects (failures are logged).
  */
  _unsubscribeRemote(remoteSubscription) {
    return Q.resolve(remoteSubscription)
      .then(remoteSubscriptionId => {
        // null/undefined means the remote subscription was never established
        if (!remoteSubscriptionId) {
          return undefined;
        }

        return ethereumNodeRemote.send('eth_unsubscribe', [
          remoteSubscriptionId
        ]);
      })
      .catch(err => {
        log.error('Error unsubscribing remote subscription', err);
      });
  }

  /**
  Creates a subscription in remote node and
  sends results down the pipe if remote node is active

  NOTE(review): the `message` listener registered here is never removed.

  @param {String} localSubscriptionId - Id of the local subscription
  @param {Object} params - Subscription params
  @param {Boolean} retry - Is this request a retry (currently unused)
  @return {Promise} Resolves to the remote subscription id, or to null when
    the subscribe request could not be sent. Never rejects.
  */
  _subscribeRemote(localSubscriptionId, params, retry = false) {
    log.trace(
      `Creating remote subscription: ${params} (local subscription id: ${localSubscriptionId})`
    );

    return Promise.resolve()
      .then(() => ethereumNodeRemote.send('eth_subscribe', params))
      .then(requestId => {
        if (!requestId) {
          log.error('No return id for request');
          return null;
        }

        return new Promise(resolve => {
          let remoteSubscriptionId;

          const onMessage = raw => {
            if (!raw) {
              return;
            }

            let data = raw;
            try {
              data = JSON.parse(raw);
            } catch (error) {
              // keep the raw value: it is still inspected below in case it
              // is already an object
              log.trace('Error parsing data: ', raw);
            }

            if (!data || typeof data !== 'object') {
              return;
            }

            // response to our subscribe request: carries the remote id
            if (data.id === requestId && data.result) {
              remoteSubscriptionId = data.result;
              resolve(remoteSubscriptionId);
            }

            // notification for this subscription
            if (
              data.params &&
              data.params.subscription &&
              data.params.subscription === remoteSubscriptionId
            ) {
              this._sendRemoteResult(localSubscriptionId, data.params.result);
            }
          };

          ethereumNodeRemote.ws.on('message', onMessage);
        });
      })
      .catch(err => {
        log.error('Error creating remote subscription', err);
        return null;
      });
  }

  /**
  Sends a remote subscription result to the owner of the local subscription,
  using the local subscription id. Does nothing unless the remote node is the
  active one.

  @param {String} localSubscriptionId - Id of the local subscription
  @param {*} remoteResult - Result received from the remote node
  */
  _sendRemoteResult(localSubscriptionId, remoteResult) {
    if (store.getState().nodes.active !== 'remote') {
      return;
    }

    // Set up object to send
    const res = {
      jsonrpc: '2.0',
      method: 'eth_subscription',
      params: {
        result: remoteResult,
        subscription: localSubscriptionId
      }
    };

    const ownerId = this._subscriptionOwners[localSubscriptionId];
    const conn = ownerId && this._connections[ownerId];
    const owner = conn ? conn.owner : null;

    if (!owner) {
      log.trace('No owner to send result', res);
      return;
    }

    if (owner.isDestroyed()) {
      log.trace('Owner to send result already destroyed', res);
      return;
    }

    log.trace(
      `Sending remote subscription result (remote node is active)`,
      res
    );
    owner.send('ipcProvider-data', JSON.stringify(res));
  }
}
- exports.init = () => {
- return new IpcProviderBackend();
- };
|