coro.nim 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Rokas Kupstys
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## Nim coroutines implementation, supports several context switching methods:
  10. ## ======== ============
  11. ## ucontext available on unix and alike (default)
  12. ## setjmp available on unix and alike (x86/64 only)
  13. ## fibers available and required on windows.
  14. ## ======== ============
  15. ##
  16. ## -d:nimCoroutines Required to build this module.
  17. ## -d:nimCoroutinesUcontext Use ucontext backend.
  18. ## -d:nimCoroutinesSetjmp Use setjmp backend.
  19. ## -d:nimCoroutinesSetjmpBundled Use bundled setjmp implementation.
  20. ##
  21. ## Unstable API.
  22. import system/coro_detection
  23. when not nimCoroutines and not defined(nimdoc):
  24. when defined(noNimCoroutines):
  25. {.error: "Coroutines can not be used with -d:noNimCoroutines".}
  26. else:
  27. {.error: "Coroutines require -d:nimCoroutines".}
  28. import os
  29. import lists
  30. include system/timers
  31. when defined(nimPreviewSlimSystem):
  32. import std/assertions
  33. const defaultStackSize = 512 * 1024
  34. const useOrcArc = defined(gcArc) or defined(gcOrc)
  35. when useOrcArc:
  36. proc nimGC_setStackBottom*(theStackBottom: pointer) = discard
  37. proc GC_addStack(bottom: pointer) {.cdecl, importc.}
  38. proc GC_removeStack(bottom: pointer) {.cdecl, importc.}
  39. proc GC_setActiveStack(bottom: pointer) {.cdecl, importc.}
  40. proc GC_getActiveStack() : pointer {.cdecl, importc.}
  41. const
  42. CORO_BACKEND_UCONTEXT = 0
  43. CORO_BACKEND_SETJMP = 1
  44. CORO_BACKEND_FIBERS = 2
  45. when defined(windows):
  46. const coroBackend = CORO_BACKEND_FIBERS
  47. when defined(nimCoroutinesUcontext):
  48. {.warning: "ucontext coroutine backend is not available on windows, defaulting to fibers.".}
  49. when defined(nimCoroutinesSetjmp):
  50. {.warning: "setjmp coroutine backend is not available on windows, defaulting to fibers.".}
  51. elif defined(haiku) or defined(openbsd):
  52. const coroBackend = CORO_BACKEND_SETJMP
  53. when defined(nimCoroutinesUcontext):
  54. {.warning: "ucontext coroutine backend is not available on haiku, defaulting to setjmp".}
  55. elif defined(nimCoroutinesSetjmp) or defined(nimCoroutinesSetjmpBundled):
  56. const coroBackend = CORO_BACKEND_SETJMP
  57. else:
  58. const coroBackend = CORO_BACKEND_UCONTEXT
  59. when coroBackend == CORO_BACKEND_FIBERS:
  60. import windows/winlean
  61. type
  62. Context = pointer
  63. elif coroBackend == CORO_BACKEND_UCONTEXT:
  64. type
  65. stack_t {.importc, header: "<ucontext.h>".} = object
  66. ss_sp: pointer
  67. ss_flags: int
  68. ss_size: int
  69. ucontext_t {.importc, header: "<ucontext.h>".} = object
  70. uc_link: ptr ucontext_t
  71. uc_stack: stack_t
  72. Context = ucontext_t
  73. proc getcontext(context: var ucontext_t): int32 {.importc,
  74. header: "<ucontext.h>".}
  75. proc setcontext(context: var ucontext_t): int32 {.importc,
  76. header: "<ucontext.h>".}
  77. proc swapcontext(fromCtx, toCtx: var ucontext_t): int32 {.importc,
  78. header: "<ucontext.h>".}
  79. proc makecontext(context: var ucontext_t, fn: pointer, argc: int32) {.importc,
  80. header: "<ucontext.h>", varargs.}
  81. elif coroBackend == CORO_BACKEND_SETJMP:
  82. proc coroExecWithStack*(fn: pointer, stack: pointer) {.noreturn,
  83. importc: "narch_$1", fastcall.}
  84. when defined(amd64):
  85. {.compile: "../arch/x86/amd64.S".}
  86. elif defined(i386):
  87. {.compile: "../arch/x86/i386.S".}
  88. else:
  89. # coroExecWithStack is defined in assembly. To support other platforms
  90. # please provide implementation of this procedure.
  91. {.error: "Unsupported architecture.".}
  92. when defined(nimCoroutinesSetjmpBundled):
  93. # Use setjmp/longjmp implementation shipped with compiler.
  94. when defined(amd64):
  95. type
  96. JmpBuf = array[0x50 + 0x10, uint8]
  97. elif defined(i386):
  98. type
  99. JmpBuf = array[0x1C, uint8]
  100. else:
  101. # Bundled setjmp/longjmp are defined in assembly. To support other
  102. # platforms please provide implementations of these procedures.
  103. {.error: "Unsupported architecture.".}
  104. proc setjmp(ctx: var JmpBuf): int {.importc: "narch_$1".}
  105. proc longjmp(ctx: JmpBuf, ret = 1) {.importc: "narch_$1".}
  106. else:
  107. # Use setjmp/longjmp implementation provided by the system.
  108. type
  109. JmpBuf {.importc: "jmp_buf", header: "<setjmp.h>".} = object
  110. proc setjmp(ctx: var JmpBuf): int {.importc, header: "<setjmp.h>".}
  111. proc longjmp(ctx: JmpBuf, ret = 1) {.importc, header: "<setjmp.h>".}
  112. type
  113. Context = JmpBuf
  114. when defined(unix):
  115. # GLibc fails with "*** longjmp causes uninitialized stack frame ***" because
  116. # our custom stacks are not initialized to a magic value.
  117. when defined(osx):
  118. # workaround: error: The deprecated ucontext routines require _XOPEN_SOURCE to be defined
  119. const extra = " -D_XOPEN_SOURCE"
  120. else:
  121. const extra = ""
  122. {.passc: "-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=0" & extra.}
  123. const
  124. CORO_CREATED = 0
  125. CORO_EXECUTING = 1
  126. CORO_FINISHED = 2
  127. type
  128. Stack {.pure.} = object
  129. top: pointer # Top of the stack. Pointer used for deallocating stack if we own it.
  130. bottom: pointer # Very bottom of the stack, acts as unique stack identifier.
  131. size: int
  132. Coroutine {.pure.} = object
  133. execContext: Context
  134. fn: proc()
  135. state: int
  136. lastRun: Ticks
  137. sleepTime: float
  138. stack: Stack
  139. reference: CoroutineRef
  140. CoroutinePtr = ptr Coroutine
  141. CoroutineRef* = ref object
  142. ## CoroutineRef holds a pointer to actual coroutine object. Public API always returns
  143. ## CoroutineRef instead of CoroutinePtr in order to allow holding a reference to coroutine
  144. ## object while it can be safely deallocated by coroutine scheduler loop. In this case
  145. ## Coroutine.reference.coro is set to nil. Public API checks for it being nil and
  146. ## gracefully fails if it is nil.
  147. coro: CoroutinePtr
  148. CoroutineLoopContext = ref object
  149. coroutines: DoublyLinkedList[CoroutinePtr]
  150. current: DoublyLinkedNode[CoroutinePtr]
  151. loop: Coroutine
  152. ncbottom: pointer # non coroutine stack botttom
  153. var ctx {.threadvar.}: CoroutineLoopContext
  154. proc getCurrent(): CoroutinePtr =
  155. ## Returns current executing coroutine object.
  156. var node = ctx.current
  157. if node != nil:
  158. return node.value
  159. return nil
  160. proc initialize() =
  161. ## Initializes coroutine state of current thread.
  162. if ctx == nil:
  163. ctx = CoroutineLoopContext()
  164. ctx.coroutines = initDoublyLinkedList[CoroutinePtr]()
  165. ctx.loop = Coroutine()
  166. ctx.loop.state = CORO_EXECUTING
  167. when not useOrcArc:
  168. ctx.ncbottom = GC_getActiveStack()
  169. when coroBackend == CORO_BACKEND_FIBERS:
  170. ctx.loop.execContext = ConvertThreadToFiberEx(nil, FIBER_FLAG_FLOAT_SWITCH)
  171. proc runCurrentTask()
  172. proc switchTo(current, to: CoroutinePtr) =
  173. ## Switches execution from `current` into `to` context.
  174. to.lastRun = getTicks()
  175. # Update position of current stack so gc invoked from another stack knows how much to scan.
  176. when not useOrcArc:
  177. GC_setActiveStack(current.stack.bottom)
  178. nimGC_setStackBottom(current.stack.bottom)
  179. var frame = getFrameState()
  180. block:
  181. # Execution will switch to another fiber now. We do not need to update current stack
  182. when coroBackend == CORO_BACKEND_FIBERS:
  183. SwitchToFiber(to.execContext)
  184. elif coroBackend == CORO_BACKEND_UCONTEXT:
  185. discard swapcontext(current.execContext, to.execContext)
  186. elif coroBackend == CORO_BACKEND_SETJMP:
  187. var res = setjmp(current.execContext)
  188. if res == 0:
  189. if to.state == CORO_EXECUTING:
  190. # Coroutine is resumed.
  191. longjmp(to.execContext, 1)
  192. elif to.state == CORO_CREATED:
  193. # Coroutine is started.
  194. coroExecWithStack(runCurrentTask, to.stack.bottom)
  195. #doAssert false
  196. else:
  197. {.error: "Invalid coroutine backend set.".}
  198. # Execution was just resumed. Restore frame information and set active stack.
  199. setFrameState(frame)
  200. when not useOrcArc:
  201. GC_setActiveStack(current.stack.bottom)
  202. nimGC_setStackBottom(ctx.ncbottom)
  203. proc suspend*(sleepTime: float = 0) =
  204. ## Stops coroutine execution and resumes no sooner than after `sleeptime` seconds.
  205. ## Until then other coroutines are executed.
  206. var current = getCurrent()
  207. current.sleepTime = sleepTime
  208. nimGC_setStackBottom(ctx.ncbottom)
  209. switchTo(current, addr(ctx.loop))
  210. proc runCurrentTask() =
  211. ## Starts execution of current coroutine and updates it's state through coroutine's life.
  212. var sp {.volatile.}: pointer
  213. sp = addr(sp)
  214. block:
  215. var current = getCurrent()
  216. current.stack.bottom = sp
  217. nimGC_setStackBottom(current.stack.bottom)
  218. # Execution of new fiber just started. Since it was entered not through `switchTo` we
  219. # have to set active stack here as well. GC_removeStack() has to be called in main loop
  220. # because we still need stack available in final suspend(0) call from which we will not
  221. # return.
  222. when not useOrcArc:
  223. GC_addStack(sp)
  224. # Activate current stack because we are executing in a new coroutine.
  225. GC_setActiveStack(sp)
  226. current.state = CORO_EXECUTING
  227. try:
  228. current.fn() # Start coroutine execution
  229. except:
  230. echo "Unhandled exception in coroutine."
  231. writeStackTrace()
  232. current.state = CORO_FINISHED
  233. nimGC_setStackBottom(ctx.ncbottom)
  234. suspend(0) # Exit coroutine without returning from coroExecWithStack()
  235. doAssert false
  236. proc start*(c: proc(), stacksize: int = defaultStackSize): CoroutineRef {.discardable.} =
  237. ## Schedule coroutine for execution. It does not run immediately.
  238. if ctx == nil:
  239. initialize()
  240. var coro: CoroutinePtr
  241. when coroBackend == CORO_BACKEND_FIBERS:
  242. coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine)))
  243. coro.execContext = CreateFiberEx(stacksize, stacksize,
  244. FIBER_FLAG_FLOAT_SWITCH,
  245. (proc(p: pointer) {.stdcall.} = runCurrentTask()), nil)
  246. else:
  247. coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine) + stacksize))
  248. coro.stack.top = cast[pointer](cast[ByteAddress](coro) + sizeof(Coroutine))
  249. coro.stack.bottom = cast[pointer](cast[ByteAddress](coro.stack.top) + stacksize)
  250. when coroBackend == CORO_BACKEND_UCONTEXT:
  251. discard getcontext(coro.execContext)
  252. coro.execContext.uc_stack.ss_sp = coro.stack.top
  253. coro.execContext.uc_stack.ss_size = stacksize
  254. coro.execContext.uc_link = addr(ctx.loop.execContext)
  255. makecontext(coro.execContext, runCurrentTask, 0)
  256. coro.fn = c
  257. coro.stack.size = stacksize
  258. coro.state = CORO_CREATED
  259. coro.reference = CoroutineRef(coro: coro)
  260. ctx.coroutines.append(coro)
  261. return coro.reference
  262. proc run*() =
  263. ## Starts main coroutine scheduler loop which exits when all coroutines exit.
  264. ## Calling this proc starts execution of first coroutine.
  265. initialize()
  266. ctx.current = ctx.coroutines.head
  267. var minDelay: float = 0
  268. while ctx.current != nil:
  269. var current = getCurrent()
  270. var remaining = current.sleepTime - (float(getTicks() - current.lastRun) / 1_000_000_000)
  271. if remaining <= 0:
  272. # Save main loop context. Suspending coroutine will resume after this statement with
  273. switchTo(addr(ctx.loop), current)
  274. else:
  275. if minDelay > 0 and remaining > 0:
  276. minDelay = min(remaining, minDelay)
  277. else:
  278. minDelay = remaining
  279. if current.state == CORO_FINISHED:
  280. var next = ctx.current.prev
  281. if next == nil:
  282. # If first coroutine ends then `prev` is nil even if more coroutines
  283. # are to be scheduled.
  284. next = ctx.current.next
  285. current.reference.coro = nil
  286. ctx.coroutines.remove(ctx.current)
  287. when not useOrcArc:
  288. GC_removeStack(current.stack.bottom)
  289. when coroBackend == CORO_BACKEND_FIBERS:
  290. DeleteFiber(current.execContext)
  291. else:
  292. dealloc(current.stack.top)
  293. dealloc(current)
  294. ctx.current = next
  295. elif ctx.current == nil or ctx.current.next == nil:
  296. ctx.current = ctx.coroutines.head
  297. os.sleep(int(minDelay * 1000))
  298. else:
  299. ctx.current = ctx.current.next
  300. proc alive*(c: CoroutineRef): bool = c.coro != nil and c.coro.state != CORO_FINISHED
  301. ## Returns `true` if coroutine has not returned, `false` otherwise.
  302. proc wait*(c: CoroutineRef, interval = 0.01) =
  303. ## Returns only after coroutine `c` has returned. `interval` is time in seconds how often.
  304. while alive(c):
  305. suspend(interval)