tfuturestream.nim 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
discard """
output: '''
0
1
2
3
4
5
Done
Finished
'''
"""
  13. import asyncdispatch
  # Module-level stream of ints shared by `alpha` (which writes to it and
  # completes it) and `beta` (which reads from it).
  var fs = newFutureStream[int]()
  proc alpha() {.async.} =
    ## Producer: writes the integers 0 through 5 to the module-level stream
    ## `fs`, then prints "Done" and completes the stream.
    # NOTE(review): the listing this was recovered from had lost its
    # indentation. "Done" is printed once, after 5, in the expected output,
    # so `echo("Done")` and `fs.complete()` are taken to follow the loop.
    # Keeping `sleepAsync` inside the loop is an assumption -- TODO confirm.
    for i in 0 .. 5:
      await fs.write(i)
      await sleepAsync(100)

    echo("Done")
    fs.complete()
  proc beta() {.async.} =
    ## Consumer: reads from the module-level stream `fs` until it reports
    ## finished, echoing every value received, then prints "Finished".
    while not fs.finished:
      let (hasValue, value) = await fs.read()
      # A read can come back without a value; only echo real items.
      if hasValue:
        echo(value)

    echo("Finished")
  # Start the producer without waiting on it, then drive the dispatcher
  # until the consumer has finished.
  asyncCheck alpha()
  waitFor beta()
  template ensureCallbacksAreScheduled =
    ## Calls `getGlobalDispatcher()` and discards the result.
    # callbacks are called directly if the dispatcher is not running
    discard getGlobalDispatcher()
  proc testCompletion() {.async.} =
    ## Writes the strings "1" through "5" to a fresh stream, starts
    ## `readAll` on it, completes the stream, and asserts that five items
    ## were collected.
    ensureCallbacksAreScheduled

    # Neither binding below is ever reassigned, so both are `let`.
    let stream = newFutureStream[string]()

    for i in 1..5:
      await stream.write($i)

    let readFuture = stream.readAll()
    stream.complete()
    # Suspend until `readFuture` has finished; its value is fetched
    # explicitly with `read()` on the next line.
    yield readFuture
    let data = readFuture.read()
    doAssert(data.len == 5, "actual data len = " & $data.len)
  # Drive the dispatcher until `testCompletion` has finished.
  waitFor testCompletion()
  # TODO: Something like this should work eventually.
  # proc delta(): FutureStream[string] {.async.} =
  #   for i in 0 .. 5:
  #     await sleepAsync(1000)
  #     result.put($i)
  #
  #   return ""
  #
  # proc omega() {.async.} =
  #   let fut = delta()
  #   while not fut.finished():
  #     echo(await fs.takeAsync())
  #
  #   echo("Finished")
  #
  # waitFor omega()