bufferToggle.js 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. import { __values } from "tslib";
  2. import { Subscription } from '../Subscription';
  3. import { operate } from '../util/lift';
  4. import { innerFrom } from '../observable/innerFrom';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  6. import { noop } from '../util/noop';
  7. import { arrRemove } from '../util/arrRemove';
  /**
   * Builds an operator (via `operate`) that collects source values into
   * buffers which are opened by `openings` and closed by the notifier that
   * `closingSelector` returns for each opening value.
   *
   * Several buffers may be open at once; every source value is appended to
   * all of them. A buffer is emitted as soon as its closing notifier emits.
   * When the source completes, any buffers still open are emitted in the
   * order they were opened, and then the result completes.
   *
   * @param openings Input converted with `innerFrom`; each value it emits
   * opens a new buffer.
   * @param closingSelector Called with the opening value; its result is
   * converted with `innerFrom` and its first emission closes that buffer.
   * @returns Whatever `operate` returns for the setup function below.
   */
  export function bufferToggle(openings, closingSelector) {
    return operate(function (source, subscriber) {
      // Every buffer that has been opened and not yet emitted.
      var openBuffers = [];

      // Opens a new buffer and wires up the notifier that will close it.
      var handleOpening = function (openValue) {
        var current = [];
        openBuffers.push(current);

        // Dedicated subscription so the closing notifier can be torn down
        // as soon as it has fired once.
        var closingSubscription = new Subscription();

        var flushCurrent = function () {
          arrRemove(openBuffers, current);
          subscriber.next(current);
          closingSubscription.unsubscribe();
        };

        var closingNotifier = innerFrom(closingSelector(openValue));
        // `noop` in the third slot: nothing extra happens here when the
        // closing notifier finishes without emitting (the source
        // subscription below uses that slot for its completion handler).
        var closingSubscriber = createOperatorSubscriber(subscriber, flushCurrent, noop);
        closingSubscription.add(closingNotifier.subscribe(closingSubscriber));
      };

      // Appends a source value to every buffer that is currently open.
      var handleSourceValue = function (value) {
        for (var i = 0; i < openBuffers.length; i++) {
          openBuffers[i].push(value);
        }
      };

      // Emits whatever is still buffered, oldest first, then completes.
      var handleSourceComplete = function () {
        while (openBuffers.length !== 0) {
          var pending = openBuffers.shift();
          subscriber.next(pending);
        }
        subscriber.complete();
      };

      // The openings notifier is subscribed before the source, so a buffer
      // opened synchronously already exists when source values arrive.
      innerFrom(openings).subscribe(createOperatorSubscriber(subscriber, handleOpening, noop));
      source.subscribe(createOperatorSubscriber(subscriber, handleSourceValue, handleSourceComplete));
    });
  }
  45. //# sourceMappingURL=bufferToggle.js.map