ipcProviderBackend.js 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. /**
  2. The IPC provider backend filters and tunnels all incoming requests to the Ethereum node.
  3. @module ipcProviderBackend
  4. */
  5. const _ = require('../utils/underscore.js');
  6. const Q = require('bluebird');
  7. const { ipcMain: ipc } = require('electron');
  8. const fs = require('fs');
  9. const path = require('path');
  10. const log = require('../utils/logger').create('ipcProviderBackend');
  11. const Sockets = require('../socketManager');
  12. const Settings = require('../settings');
  13. const ethereumNode = require('../ethereumNode');
  14. const ethereumNodeRemote = require('../ethereumNodeRemote');
  15. const ERRORS = {
  16. INVALID_PAYLOAD: {
  17. code: -32600,
  18. message:
  19. "Payload, or some of its content properties are invalid. Please check if they are valid HEX with '0x' prefix."
  20. },
  21. METHOD_DENIED: { code: -32601, message: 'Method __method__ not allowed.' },
  22. METHOD_TIMEOUT: {
  23. code: -32603,
  24. message: 'Request timed out for method __method__.'
  25. },
  26. TX_DENIED: { code: -32603, message: 'Transaction denied' },
  27. BATCH_TX_DENIED: {
  28. code: -32603,
  29. message:
  30. 'Transactions denied, sendTransaction is not allowed in batch requests.'
  31. },
  32. BATCH_COMPILE_DENIED: {
  33. code: -32603,
  34. message:
  35. 'Compilation denied, compileSolidity is not allowed in batch requests.'
  36. },
  37. SOCKET_NOT_CONNECTED: {
  38. code: -32604,
  39. message: 'Socket not connected when trying to send method __method__.'
  40. }
  41. };
  42. /**
  43. * IPC provider backend.
  44. */
  45. class IpcProviderBackend {
  46. constructor() {
  47. this._connections = {};
  48. this.ERRORS = ERRORS;
  49. ethereumNode.on('state', _.bind(this._onNodeStateChanged, this));
  50. ipc.on('ipcProvider-create', _.bind(this._getOrCreateConnection, this));
  51. ipc.on('ipcProvider-destroy', _.bind(this._destroyConnection, this));
  52. ipc.on('ipcProvider-write', _.bind(this._sendRequest, this, false));
  53. ipc.on('ipcProvider-writeSync', _.bind(this._sendRequest, this, true));
  54. this._connectionPromise = {};
  55. // dynamically load in method processors
  56. const processors = fs.readdirSync(path.join(__dirname, 'methods'));
  57. // get response processors
  58. this._processors = {};
  59. processors.forEach(p => {
  60. const name = path.basename(p, '.js');
  61. const PClass = require(path.join(__dirname, 'methods', p));
  62. this._processors[name] = new PClass(name, this);
  63. });
  64. log.trace('Loaded processors', _.keys(this._processors));
  65. store.dispatch({ type: '[MAIN]:IPC_PROVIDER_BACKEND:INIT' });
  66. // Store remote subscriptions.
  67. // > key: local subscription id
  68. // > value: remote subscription id
  69. this._remoteSubscriptions = {};
  70. // Store subscription owners for sending remote results
  71. // > key: local subscription id
  72. // > value: connection ownerId
  73. this._subscriptionOwners = {};
  74. }
  75. /**
  76. * Get/create new connection to node.
  77. * @return {Promise}
  78. */
  79. _getOrCreateConnection(event) {
  80. const owner = event.sender;
  81. const ownerId = owner.id;
  82. let socket;
  83. return Q.try(() => {
  84. // already got?
  85. if (this._connections[ownerId]) {
  86. socket = this._connections[ownerId].socket;
  87. } else {
  88. log.debug(`Create new socket connection, id=${ownerId}`);
  89. socket = Sockets.get(ownerId, Settings.rpcMode);
  90. }
  91. })
  92. .then(() => {
  93. if (!this._connections[ownerId]) {
  94. // save to collection
  95. this._connections[ownerId] = {
  96. id: ownerId,
  97. owner,
  98. socket
  99. };
  100. // if something goes wrong destroy the socket
  101. ['error', 'timeout', 'end'].forEach(ev => {
  102. socket.on(ev, data => {
  103. log.debug(
  104. `Destroy socket connection due to event: ${ev}, id=${ownerId}`
  105. );
  106. socket.destroy().finally(() => {
  107. if (!owner.isDestroyed()) {
  108. owner.send(`ipcProvider-${ev}`, JSON.stringify(data));
  109. }
  110. });
  111. delete this._connections[ownerId];
  112. Sockets.remove(ownerId);
  113. });
  114. });
  115. socket.on('connect', data => {
  116. if (!owner.isDestroyed()) {
  117. owner.send('ipcProvider-connect', JSON.stringify(data));
  118. }
  119. });
  120. // pass notifications back up the chain
  121. socket.on('data-notification', data => {
  122. log.trace('Notification received', ownerId, data);
  123. if (data.error) {
  124. data = this._makeErrorResponsePayload(data, data);
  125. } else {
  126. data = this._makeResponsePayload(data, data);
  127. }
  128. if (!owner.isDestroyed()) {
  129. owner.send('ipcProvider-data', JSON.stringify(data));
  130. }
  131. });
  132. }
  133. })
  134. .then(() => {
  135. if (!socket.isConnected) {
  136. // since we may enter this function multiple times for the same
  137. // event source's IPC we don't want to repeat the connection
  138. // process each time - so let's track things in a promise
  139. if (!this._connectionPromise[ownerId]) {
  140. this._connectionPromise[ownerId] = Q.try(() => {
  141. log.debug(`Connecting socket ${ownerId}`);
  142. // wait for node to connect first.
  143. if (ethereumNode.state !== ethereumNode.STATES.CONNECTED) {
  144. return new Q((resolve, reject) => {
  145. const onStateChange = newState => {
  146. if (ethereumNode.STATES.CONNECTED === newState) {
  147. ethereumNode.removeListener('state', onStateChange);
  148. log.debug(
  149. `Ethereum node connected, resume connecting socket ${ownerId}`
  150. );
  151. resolve();
  152. }
  153. };
  154. ethereumNode.on('state', onStateChange);
  155. });
  156. }
  157. })
  158. .then(() => {
  159. return socket.connect(
  160. Settings.rpcConnectConfig,
  161. {
  162. timeout: 5000
  163. }
  164. );
  165. })
  166. .then(() => {
  167. log.debug(`Socket connected, id=${ownerId}`);
  168. })
  169. .finally(() => {
  170. delete this._connectionPromise[ownerId];
  171. });
  172. }
  173. return this._connectionPromise[ownerId];
  174. }
  175. })
  176. .then(() => {
  177. if (!owner.isDestroyed()) {
  178. owner.send('ipcProvider-setWritable', true);
  179. }
  180. return this._connections[ownerId];
  181. });
  182. }
  183. /**
  184. * Handle IPC call to destroy a connection.
  185. */
  186. _destroyConnection(event) {
  187. const ownerId = event.sender.id;
  188. if (this._connections[ownerId]) {
  189. log.debug('Destroy socket connection', ownerId);
  190. this._connections[ownerId].owner.send('ipcProvider-setWritable', false);
  191. this._connections[ownerId].socket.destroy();
  192. delete this._connections[ownerId];
  193. Sockets.remove(ownerId);
  194. }
  195. }
  196. /**
  197. * Handler for when Ethereum node state changes.
  198. *
  199. * Auto-reconnect sockets when ethereum node state changes
  200. *
  201. * @param {String} state The new state.
  202. */
  203. _onNodeStateChanged(state) {
  204. switch (
  205. state // eslint-disable-line default-case
  206. ) {
  207. // stop syncing when node about to be stopped
  208. case ethereumNode.STATES.STOPPING:
  209. log.info('Ethereum node stopping, disconnecting sockets');
  210. // Unsubscribe remote subscriptions
  211. _.each(this._remoteSubscriptions, remoteSubscriptionId => {
  212. ethereumNodeRemote.send('eth_unsubscribe', [remoteSubscriptionId]);
  213. });
  214. this._remoteSubscriptions = {};
  215. this._subscriptionOwners = {};
  216. Q.all(
  217. _.map(this._connections, item => {
  218. if (item.socket.isConnected) {
  219. return item.socket.disconnect().then(() => {
  220. log.debug(
  221. `Tell owner ${item.id} that socket is not currently writeable`
  222. );
  223. item.owner.send('ipcProvider-setWritable', false);
  224. });
  225. }
  226. return Q.resolve();
  227. })
  228. ).catch(err => {
  229. log.error('Error disconnecting sockets', err);
  230. });
  231. break;
  232. }
  233. }
  234. /**
  235. * Handle IPC call to send a request.
  236. * @param {Boolean} isSync whether request is sync.
  237. * @param {Object} event IPC event.
  238. * @param {String} payload request payload.
  239. * @param {Boolean} retry whether trying request again due to error
  240. */
  241. _sendRequest(isSync, event, payload, retry = false) {
  242. const ownerId = event.sender.id;
  243. log.trace('sendRequest', isSync ? 'sync' : 'async', ownerId, payload);
  244. const originalPayloadStr = payload;
  245. return Q.try(() => {
  246. // overwrite playload var with parsed version
  247. payload = JSON.parse(originalPayloadStr);
  248. return this._getOrCreateConnection(event);
  249. })
  250. .then(conn => {
  251. // reparse original string (so that we don't modify input payload)
  252. const finalPayload = JSON.parse(originalPayloadStr);
  253. // is batch?
  254. const isBatch = _.isArray(finalPayload);
  255. const finalPayloadList = isBatch ? finalPayload : [finalPayload];
  256. if (!conn.socket.isConnected) {
  257. const error = Object.assign({}, this.ERRORS.SOCKET_NOT_CONNECTED, {
  258. message: this.ERRORS.SOCKET_NOT_CONNECTED.message.replace(
  259. '__method__',
  260. finalPayloadList.map(p => p.method).join(', ')
  261. )
  262. });
  263. // Try again if not already a retry
  264. if (!retry) {
  265. error.message += ' Will retry...';
  266. setTimeout(() => {
  267. this._sendRequest(isSync, event, originalPayloadStr, true);
  268. }, 2000);
  269. }
  270. log.debug(error);
  271. throw error;
  272. }
  273. // sanitize each and every request payload
  274. _.each(finalPayloadList, p => {
  275. const processor = this._processors[p.method]
  276. ? this._processors[p.method]
  277. : this._processors.base;
  278. processor.sanitizeRequestPayload(conn, p, isBatch);
  279. });
  280. // if a single payload and has an error then throw it
  281. if (!isBatch && finalPayload.error) {
  282. throw finalPayload.error;
  283. }
  284. // get non-error payloads
  285. const nonErrorPayloads = _.filter(finalPayloadList, p => !p.error);
  286. // execute non-error payloads
  287. return Q.try(() => {
  288. if (nonErrorPayloads.length) {
  289. // if single payload check if we have special processor for it
  290. // if not then use base generic processor
  291. const processor = this._processors[finalPayload.method]
  292. ? this._processors[finalPayload.method]
  293. : this._processors.base;
  294. return processor.exec(
  295. conn,
  296. isBatch ? nonErrorPayloads : nonErrorPayloads[0]
  297. );
  298. } else {
  299. return [];
  300. }
  301. }).then(ret => {
  302. log.trace('Got result', ret);
  303. let finalResult = [];
  304. // collate results
  305. _.each(finalPayloadList, p => {
  306. if (p.error) {
  307. finalResult.push(p);
  308. } else {
  309. p = _.extend({}, p, isBatch ? ret.shift() : ret);
  310. const processor = this._processors[p.method]
  311. ? this._processors[p.method]
  312. : this._processors.base;
  313. // sanitize response payload
  314. processor.sanitizeResponsePayload(conn, p, isBatch);
  315. // if subscription, save connection ownerId for sending responses later
  316. if (p.method === 'eth_subscribe') {
  317. const subscriptionId = p.result;
  318. if (subscriptionId) {
  319. this._subscriptionOwners[subscriptionId] = ownerId;
  320. }
  321. }
  322. finalResult.push(p);
  323. }
  324. });
  325. // extract single payload result
  326. if (!isBatch) {
  327. finalResult = finalResult.pop();
  328. // check if it's an error
  329. if (finalResult.error) {
  330. throw finalResult.error;
  331. }
  332. }
  333. return finalResult;
  334. });
  335. })
  336. .then(result => {
  337. log.trace('Got result', result);
  338. return this._makeResponsePayload(payload, result);
  339. })
  340. .catch(err => {
  341. log.error('Send request failed', err);
  342. err = this._makeErrorResponsePayload(payload || {}, {
  343. message: typeof err === 'string' ? err : err.message,
  344. code: err.code
  345. });
  346. return err;
  347. })
  348. .then(returnValue => {
  349. returnValue = JSON.stringify(returnValue);
  350. log.trace('Return', ownerId, returnValue);
  351. if (isSync) {
  352. event.returnValue = returnValue;
  353. } else if (!event.sender.isDestroyed()) {
  354. event.sender.send('ipcProvider-data', returnValue);
  355. }
  356. });
  357. }
  358. /**
  359. Sanitize a single or batch request payload.
  360. This will modify the passed-in payload.
  361. @param {Object} conn The connection.
  362. @param {Object|Array} payload The request payload.
  363. */
  364. _sanitizeRequestPayload(conn, payload) {
  365. if (_.isArray(payload)) {
  366. _.each(payload, p => {
  367. if (p.method === 'eth_sendTransaction') {
  368. p.error = ERRORS.BATCH_TX_DENIED;
  369. } else {
  370. this._processors.base.sanitizePayload(conn, p);
  371. }
  372. });
  373. } else {
  374. this._processors.base.sanitizePayload(conn, payload);
  375. }
  376. }
  377. /**
  378. Make an error response payload
  379. @param {Object|Array} originalPayload Original payload
  380. @param {Object} error Error result
  381. */
  382. _makeErrorResponsePayload(originalPayload, error) {
  383. const e = [].concat(originalPayload).map(item => {
  384. const e = _.extend(
  385. {
  386. jsonrpc: '2.0'
  387. },
  388. error
  389. );
  390. if (e.message) {
  391. if (_.isArray(e.message)) {
  392. e.message = e.message.pop();
  393. }
  394. e.error = {
  395. code: e.code,
  396. message: e.message.replace(/'[a-z_]*'/i, `'${item.method}'`)
  397. };
  398. delete e.code;
  399. delete e.message;
  400. }
  401. // delete stuff leftover from request
  402. delete e.params;
  403. delete e.method;
  404. e.id = item.id;
  405. return e;
  406. });
  407. return _.isArray(originalPayload) ? e : e[0];
  408. }
  409. /**
  410. Make a response payload.
  411. @param {Object|Array} originalPayload Original payload
  412. @param {Object|Array} value Response results.
  413. @method makeReturnValue
  414. */
  415. _makeResponsePayload(originalPayload, value) {
  416. const finalValue = _.isArray(originalPayload) ? value : [value];
  417. const allResults = [].concat(originalPayload).map((item, idx) => {
  418. const finalResult = finalValue[idx];
  419. let ret;
  420. // handle error result
  421. if (finalResult.error) {
  422. ret = this._makeErrorResponsePayload(item, finalResult.error);
  423. } else {
  424. ret = _.extend({}, item, {
  425. result: finalResult.result
  426. });
  427. }
  428. ret = this._handleSubscriptions(ret);
  429. if (ret) {
  430. if (item.id) {
  431. delete ret.params;
  432. delete ret.method;
  433. }
  434. ret.jsonrpc = '2.0';
  435. }
  436. return ret;
  437. });
  438. return _.isArray(originalPayload) ? allResults : allResults[0];
  439. }
  440. /**
  441. Handles eth_subscribe|eth_subscription|eth_unsubscribe to:
  442. 1. Create a remote subscription when created on local (except syncing subscriptions)
  443. 2. Send remote subscription result if on remote (with remote subscription id swapped for local id)
  444. 3. Ignore local subscription result if on remote
  445. 4. Unsubscribe remote subscription when unsubscribing local subscription
  446. @param {Object} result
  447. @return {Object} result
  448. */
  449. _handleSubscriptions(result) {
  450. if (result.method === 'eth_subscribe') {
  451. // If subscription is created in local, also create the subscription in remote
  452. const subscriptionType = result.params[0];
  453. const subscriptionId = result.result;
  454. // Create subscription in remote node
  455. this._remoteSubscriptions[subscriptionId] = this._subscribeRemote(
  456. subscriptionId,
  457. result.params
  458. );
  459. }
  460. if (result.method === 'eth_subscription') {
  461. // Skip if syncing result
  462. if (result.params.result.syncing) {
  463. return result;
  464. }
  465. // If remote node is active, cancel propogating response
  466. // since we'll return the remote response instead
  467. if (store.getState().nodes.active === 'remote') {
  468. log.trace(
  469. `Ignoring local subscription result (remote node is active). subscription id: ${
  470. result.params.subscription
  471. }`
  472. );
  473. return null;
  474. } else {
  475. log.trace(
  476. `Sending local subscription result (local node is active). subscription id: ${
  477. result.params.subscription
  478. }`
  479. );
  480. return result;
  481. }
  482. }
  483. if (result.method === 'eth_unsubscribe') {
  484. const subscriptionId = result.params[0];
  485. const localSubscriptionId = this._remoteSubscriptions[subscriptionId];
  486. if (localSubscriptionId) {
  487. ethereumNodeRemote.send('eth_unsubscribe', [localSubscriptionId]);
  488. delete this._remoteSubscriptions[subscriptionId];
  489. delete this._subscriptionOwners[subscriptionId];
  490. }
  491. }
  492. return result;
  493. }
  494. /**
  495. Creates a subscription in remote node and
  496. sends results down the pipe if remote node is active
  497. @param {Object} params - Subscription params
  498. @param {Boolean} retry - Is this request a retry
  499. */
  500. _subscribeRemote(localSubscriptionId, params, retry = false) {
  501. return new Promise(async (resolve, reject) => {
  502. log.trace(
  503. `Creating remote subscription: ${params} (local subscription id: ${localSubscriptionId})`
  504. );
  505. var remoteSubscriptionId;
  506. const requestId = await ethereumNodeRemote.send('eth_subscribe', params);
  507. if (!requestId) {
  508. log.error('No return id for request');
  509. return;
  510. }
  511. const callback = data => {
  512. if (!data) {
  513. return;
  514. }
  515. try {
  516. data = JSON.parse(data);
  517. } catch (error) {
  518. log.trace('Error parsing data: ', data);
  519. }
  520. if (data.id === requestId && data.result) {
  521. if (data.result) {
  522. remoteSubscriptionId = data.result;
  523. resolve(remoteSubscriptionId);
  524. }
  525. }
  526. if (
  527. data.params &&
  528. data.params.subscription &&
  529. data.params.subscription === remoteSubscriptionId
  530. ) {
  531. this._sendRemoteResult(localSubscriptionId, data.params.result);
  532. }
  533. };
  534. ethereumNodeRemote.ws.on('message', callback);
  535. });
  536. }
  537. _sendRemoteResult(localSubscriptionId, remoteResult) {
  538. if (store.getState().nodes.active === 'remote') {
  539. // Set up object to send
  540. const res = {
  541. jsonrpc: '2.0',
  542. method: 'eth_subscription',
  543. params: {
  544. result: remoteResult,
  545. subscription: localSubscriptionId
  546. }
  547. };
  548. const owner =
  549. this._subscriptionOwners[localSubscriptionId] &&
  550. this._connections[this._subscriptionOwners[localSubscriptionId]]
  551. ? this._connections[this._subscriptionOwners[localSubscriptionId]]
  552. .owner
  553. : null;
  554. if (!owner) {
  555. log.trace('No owner to send result', res);
  556. } else if (owner.isDestroyed()) {
  557. log.trace('Owner to send result already destroyed', res);
  558. } else {
  559. log.trace(
  560. `Sending remote subscription result (remote node is active)`,
  561. res
  562. );
  563. owner.send('ipcProvider-data', JSON.stringify(res));
  564. }
  565. }
  566. }
  567. }
  568. exports.init = () => {
  569. return new IpcProviderBackend();
  570. };