readable.asynct.js 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. var es = require('../')
  2. , it = require('it-is').style('colour')
  3. , u = require('ubelt')
  4. exports ['read an array'] = function (test) {
  5. console.log('readable')
  6. return test.end()
  7. var readThis = u.map(3, 6, 100, u.id) //array of multiples of 3 < 100
  8. console.log('readable')
  9. var reader =
  10. es.readable(function (i, callback) {
  11. if(i >= readThis.length)
  12. return this.emit('end')
  13. console.log('readable')
  14. callback(null, readThis[i])
  15. })
  16. var writer = es.writeArray(function (err, array){
  17. if(err) throw err
  18. it(array).deepEqual(readThis)
  19. test.done()
  20. })
  21. reader.pipe(writer)
  22. }
  23. exports ['read an array - async'] = function (test) {
  24. var readThis = u.map(3, 6, 100, u.id) //array of multiples of 3 < 100
  25. var reader =
  26. es.readable(function (i, callback) {
  27. if(i >= readThis.length)
  28. return this.emit('end')
  29. u.delay(callback)(null, readThis[i])
  30. })
  31. var writer = es.writeArray(function (err, array){
  32. if(err) throw err
  33. it(array).deepEqual(readThis)
  34. test.done()
  35. })
  36. reader.pipe(writer)
  37. }
  38. exports ['emit data then call next() also works'] = function (test) {
  39. var readThis = u.map(3, 6, 100, u.id) //array of multiples of 3 < 100
  40. var reader =
  41. es.readable(function (i, next) {
  42. if(i >= readThis.length)
  43. return this.emit('end')
  44. this.emit('data', readThis[i])
  45. next()
  46. })
  47. var writer = es.writeArray(function (err, array){
  48. if(err) throw err
  49. it(array).deepEqual(readThis)
  50. test.done()
  51. })
  52. reader.pipe(writer)
  53. }
  54. exports ['callback emits error, then stops'] = function (test) {
  55. var err = new Error('INTENSIONAL ERROR')
  56. , called = 0
  57. var reader =
  58. es.readable(function (i, callback) {
  59. if(called++)
  60. return
  61. callback(err)
  62. })
  63. reader.on('error', function (_err){
  64. it(_err).deepEqual(err)
  65. u.delay(function() {
  66. it(called).equal(1)
  67. test.done()
  68. }, 50)()
  69. })
  70. }
  71. exports['readable does not call read concurrently'] = function (test) {
  72. var current = 0
  73. var source = es.readable(function(count, cb){
  74. current ++
  75. if(count > 100)
  76. return this.emit('end')
  77. u.delay(function(){
  78. current --
  79. it(current).equal(0)
  80. cb(null, {ok: true, n: count});
  81. })();
  82. });
  83. var destination = es.map(function(data, cb){
  84. //console.info(data);
  85. cb();
  86. });
  87. var all = es.connect(source, destination);
  88. destination.on('end', test.done)
  89. }
  90. exports ['does not raise a warning: Recursive process.nextTick detected'] = function (test) {
  91. var readThisDelayed;
  92. u.delay(function () {
  93. readThisDelayed = [1, 3, 5];
  94. })();
  95. es.readable(function (count, callback) {
  96. if (readThisDelayed) {
  97. var that = this;
  98. readThisDelayed.forEach(function (item) {
  99. that.emit('data', item);
  100. });
  101. this.emit('end');
  102. test.done();
  103. }
  104. callback();
  105. });
  106. };
  107. //
  108. // emitting multiple errors is not supported by stream.
  109. //
  110. // I do not think that this is a good idea, at least, there should be an option to pipe to
  111. // continue on error. it makes alot ef sense, if you are using Stream like I am, to be able to emit multiple errors.
  112. // an error might not necessarily mean the end of the stream. it depends on the error, at least.
  113. //
  114. // I will start a thread on the mailing list. I'd rather that than use a custom `pipe` implementation.
  115. //
  116. // basically, I want to be able use pipe to transform objects, and if one object is invalid,
  117. // the next might still be good, so I should get to choose if it's gonna stop.
  118. // re-enstate this test when this issue progresses.
  119. //
  120. // hmm. I could add this to es.connect by deregistering the error listener,
  121. // but I would rather it be an option in core.
  122. /*
  123. exports ['emit multiple errors, with 2nd parameter (continueOnError)'] = function (test) {
  124. var readThis = d.map(1, 100, d.id)
  125. , errors = 0
  126. var reader =
  127. es.readable(function (i, callback) {
  128. console.log(i, readThis.length)
  129. if(i >= readThis.length)
  130. return this.emit('end')
  131. if(!(readThis[i] % 7))
  132. return callback(readThis[i])
  133. callback(null, readThis[i])
  134. }, true)
  135. var writer = es.writeArray(function (err, array) {
  136. if(err) throw err
  137. it(array).every(function (u){
  138. it(u % 7).notEqual(0)
  139. }).property('length', 80)
  140. it(errors).equal(14)
  141. test.done()
  142. })
  143. reader.on('error', function (u) {
  144. errors ++
  145. console.log(u)
  146. if('number' !== typeof u)
  147. throw u
  148. it(u % 7).equal(0)
  149. })
  150. reader.pipe(writer)
  151. }
  152. */
  153. require('./helper')(module)