// web3Base.js
  1. const _ = require('../utils/underscore.js');
  2. const Q = require('bluebird');
  3. const oboe = require('oboe');
  4. const SocketBase = require('./base');
  5. const Socket = SocketBase.Socket;
  6. const STATE = SocketBase.STATE;
  7. module.exports = class Web3Socket extends Socket {
  8. constructor(socketMgr, id) {
  9. super(socketMgr, id);
  10. this._sendRequests = {};
  11. this._handleSocketResponse();
  12. }
  13. /**
  14. * Send an RPC call.
  15. * @param {Array|Object} single or array of payloads.
  16. * @param {Object} options Additional options.
  17. * @param {Boolean} [options.fullResult] If set then will return full result
  18. * JSON, not just result value.
  19. * @return {Promise}
  20. */
  21. send(payload, options) {
  22. return Q.try(() => {
  23. if (!this.isConnected) {
  24. throw new Error('Not connected');
  25. }
  26. const isBatch = _.isArray(payload);
  27. const finalPayload = isBatch
  28. ? _.map(payload, p => this._finalizeSinglePayload(p))
  29. : this._finalizeSinglePayload(payload);
  30. /*
  31. * For batch requests we use the id of the first request as the
  32. * id to refer to the batch as one. We can do this because the
  33. * response will also come back as a batch, in the same order as the
  34. * the requests within the batch were sent.
  35. */
  36. const id = isBatch ? finalPayload[0].id : finalPayload.id;
  37. this._log.trace(isBatch ? 'Batch request' : 'Request', id, finalPayload);
  38. this._sendRequests[id] = {
  39. options,
  40. /* Preserve the original id of the request so that we can
  41. update the response with it */
  42. origId: isBatch ? _.map(payload, p => p.id) : payload.id
  43. };
  44. this.write(JSON.stringify(finalPayload));
  45. return new Q((resolve, reject) => {
  46. _.extend(this._sendRequests[id], {
  47. resolve,
  48. reject
  49. });
  50. });
  51. });
  52. }
  53. /**
  54. * Construct a payload object.
  55. * @param {Object} payload Payload to send.
  56. * @param {String} payload.method Method name.
  57. * @param {Object} [payload.params] Method arguments.
  58. * @return {Object} final payload object
  59. */
  60. _finalizeSinglePayload(payload) {
  61. if (!payload.method) {
  62. throw new Error('Method required');
  63. }
  64. return {
  65. jsonrpc: '2.0',
  66. id: _.uuid(),
  67. method: payload.method,
  68. params: payload.params || []
  69. };
  70. }
  71. /**
  72. * Handle responses from Geth.
  73. * Responses are false, a single object, or an array of objects
  74. */
  75. _handleSocketResponse() {
  76. oboe(this)
  77. .done(result => {
  78. this._log.trace('JSON response', result);
  79. try {
  80. const isBatch = _.isArray(result);
  81. const firstItem = isBatch ? result[0] : result;
  82. const req = firstItem.id ? this._sendRequests[firstItem.id] : null;
  83. if (req) {
  84. this._log.trace(
  85. isBatch ? 'Batch response' : 'Response',
  86. firstItem.id,
  87. result
  88. );
  89. // if we don't want full JSON result, send just the result
  90. if (!_.get(req, 'options.fullResult')) {
  91. if (isBatch) {
  92. result = _.map(result, r => r.result);
  93. } else {
  94. result = result.result;
  95. }
  96. } else {
  97. // restore original ids
  98. if (isBatch) {
  99. req.origId.forEach((id, idx) => {
  100. if (result[idx]) {
  101. result[idx].id = id;
  102. }
  103. });
  104. } else {
  105. result.id = req.origId;
  106. }
  107. }
  108. req.resolve({
  109. isBatch,
  110. result
  111. });
  112. } else {
  113. // not a response to a request so pass it on as a notification
  114. this.emit('data-notification', result);
  115. }
  116. } catch (err) {
  117. this._log.error('Error handling socket response', err);
  118. }
  119. })
  120. .fail(err => {
  121. this._log.error('Socket response error', err);
  122. _.each(this._sendRequests, req => {
  123. if (req.reject) {
  124. req.reject(err);
  125. }
  126. });
  127. this._sendRequests = {};
  128. });
  129. }
  130. };