typedthreads.nim 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2012 Andreas Rumpf
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ##[
  10. Thread support for Nim. Threads allow multiple functions to execute concurrently.
  11. In Nim, threads are a low-level construct and using a library like `malebolgia`, `taskpools` or `weave` is recommended.
  12. When creating a thread, you can pass arguments to it. As Nim's garbage collector does not use atomic references, sharing
  13. `ref` and other variables managed by the garbage collector between threads is not supported.
  14. Use global variables to do so, or pointers.
  15. Memory allocated using [`allocShared`](./system.html#allocShared.t%2CNatural) can be used and shared between threads.
  16. To communicate between threads, consider using [channels](./system.html#Channel)
  17. Examples
  18. ========
  19. ```Nim
  20. import std/locks
  21. var
  22. thr: array[0..4, Thread[tuple[a,b: int]]]
  23. L: Lock
  24. proc threadFunc(interval: tuple[a,b: int]) {.thread.} =
  25. for i in interval.a..interval.b:
  26. acquire(L) # lock stdout
  27. echo i
  28. release(L)
  29. initLock(L)
  30. for i in 0..high(thr):
  31. createThread(thr[i], threadFunc, (i*10, i*10+5))
  32. joinThreads(thr)
  33. deinitLock(L)
  34. ```
  35. When using a memory management strategy that supports shared heaps like `arc` or `boehm`,
  36. you can pass pointers to threads and share memory between them, but the memory must outlive the thread.
  37. The default memory management strategy, `orc`, supports this.
  38. The example below is **not valid** for memory management strategies that use local heaps like `refc`!
  39. ```Nim
  40. import std/locks
  41. var l: Lock
  42. proc threadFunc(obj: ptr seq[int]) {.thread.} =
  43. withLock l:
  44. for i in 0..<100:
  45. obj[].add(obj[].len * obj[].len)
  46. proc threadHandler() =
  47. var thr: array[0..4, Thread[ptr seq[int]]]
  48. var s = newSeq[int]()
  49. for i in 0..high(thr):
  50. createThread(thr[i], threadFunc, s.addr)
  51. joinThreads(thr)
  52. echo s
  53. initLock(l)
  54. threadHandler()
  55. deinitLock(l)
  56. ```
  57. ]##
  58. import std/private/[threadtypes]
  59. export Thread
  60. import system/ansi_c
  61. when defined(nimPreviewSlimSystem):
  62. import std/assertions
  63. when defined(genode):
  64. import genode/env
  when hostOS == "any":
    # There is no thread implementation for the generic "any" OS target,
    # so compilation is stopped with an explanatory message.
    {.error: "Threads not implemented for os:any. Please compile with --threads:off.".}

  when hasAllocStack or defined(zephyr) or defined(freertos) or
       defined(nuttx) or defined(cpu16) or defined(cpu8):
    # On these targets both the total stack size and the guard size are
    # `intdefine` constants, i.e. they can be overridden at compile time
    # with `-d:nimThreadStackSize=N` / `-d:nimThreadStackGuard=N`.
    const
      nimThreadStackSize {.intdefine.} = 8192
      nimThreadStackGuard {.intdefine.} = 128

      StackGuardSize = nimThreadStackGuard
      # Usable stack is whatever remains after the guard is taken off.
      ThreadStackSize = nimThreadStackSize - nimThreadStackGuard
  else:
    const
      StackGuardSize = 4096
      # The mask is "size in bytes minus one"; Genode uses a smaller size.
      ThreadStackMask =
        (when defined(genode): 1024*64 else: 1024*256) * sizeof(int) - 1
      # Usable stack is (mask + 1) minus the guard.
      ThreadStackSize = ThreadStackMask + 1 - StackGuardSize
  when defined(gcDestructors):
    proc allocThreadStorage(size: int): pointer =
      ## Returns `size` bytes obtained from `c_malloc`, zero-filled before
      ## being handed back to the caller.
      let mem = c_malloc(csize_t(size))
      zeroMem(mem, size)
      mem
  else:
    template allocThreadStorage(size: untyped): untyped =
      ## Without `gcDestructors` the storage comes from `allocShared0`.
      allocShared0(size)
  89. #const globalsSlot = ThreadVarSlot(0)
  90. #sysAssert checkSlot.int == globalsSlot.int
  91. # Zephyr does not pull in <pthread.h> on its own, so include it explicitly.
  92. when defined(zephyr):
  93. {.emit: """/*INCLUDESECTION*/
  94. #include <pthread.h>
  95. """.}
  96. # We jump through some hoops here to ensure that Nim thread procs can have
  97. # the Nim calling convention. This is needed because thread procs are
  98. # ``stdcall`` on Windows and ``noconv`` on UNIX. Alternative would be to just
  99. # use ``stdcall`` since it is mapped to ``noconv`` on UNIX anyway.
  {.push stack_trace: off.}
  when defined(windows):
    proc threadProcWrapper[TArg](closure: pointer): int32 {.stdcall.} =
      ## Entry point handed to the Windows thread API. `closure` is the
      ## pointer that `createThread` passes along (`addr(t)`); the actual
      ## work is delegated to `nimThreadProcWrapperBody`.
      result = 0'i32
      nimThreadProcWrapperBody(closure)
      # the exit code stays 0
  elif defined(genode):
    proc threadProcWrapper[TArg](closure: pointer) {.noconv.} =
      ## Entry point handed to the Genode thread runtime; delegates to
      ## `nimThreadProcWrapperBody` and returns nothing.
      nimThreadProcWrapperBody(closure)
  else:
    proc threadProcWrapper[TArg](closure: pointer): pointer {.noconv.} =
      ## Entry point handed to `pthread_create`; delegates to
      ## `nimThreadProcWrapperBody` and always yields `nil`.
      result = nil
      nimThreadProcWrapperBody(closure)
  {.pop.}
  proc running*[TArg](t: Thread[TArg]): bool {.inline.} =
    ## Reports whether `t` is running, i.e. whether its `dataFn` entry
    ## point is currently set.
    not t.dataFn.isNil
  proc handle*[TArg](t: Thread[TArg]): SysThread {.inline.} =
    ## Gives access to the underlying system thread handle stored in `t`.
    t.sys
  when hostOS == "windows":
    const MAXIMUM_WAIT_OBJECTS = 64

    proc joinThread*[TArg](t: Thread[TArg]) {.inline.} =
      ## Blocks the caller until thread `t` has finished.
      discard waitForSingleObject(t.sys, -1'i32)

    proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
      ## Blocks the caller until every thread in `t` has finished.
      ##
      ## The handles are waited on in batches of at most
      ## `MAXIMUM_WAIT_OBJECTS` per `waitForMultipleObjects` call.
      var handles = default(array[MAXIMUM_WAIT_OBJECTS, SysThread])
      var done = 0
      while done < t.len:
        let batch = min(t.len - done, MAXIMUM_WAIT_OBJECTS)
        for j in 0 ..< batch:
          handles[j] = t[done + j].sys
        discard waitForMultipleObjects(int32(batch),
                                       cast[ptr SysThread](addr(handles)),
                                       1, -1)
        inc(done, MAXIMUM_WAIT_OBJECTS)

  elif defined(genode):
    proc joinThread*[TArg](t: Thread[TArg]) {.importcpp.}
      ## Blocks the caller until thread `t` has finished.

    proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
      ## Blocks the caller until every thread in `t` has finished,
      ## joining them one after another in argument order.
      for idx in 0 ..< t.len:
        joinThread(t[idx])

  else:
    proc joinThread*[TArg](t: Thread[TArg]) {.inline.} =
      ## Blocks the caller until thread `t` has finished.
      discard pthread_join(t.sys, nil)

    proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
      ## Blocks the caller until every thread in `t` has finished,
      ## joining them one after another in argument order.
      for idx in 0 ..< t.len:
        joinThread(t[idx])
  when false:
    # XXX a thread should really release its heap here somehow:
    proc destroyThread*[TArg](t: var Thread[TArg]) =
      ## Forcibly terminates the thread `t`. This is potentially dangerous
      ## unless you have full control over `t` and the resources it holds.
      ##
      ## This proc sits in a `when false` section and is not compiled.
      when hostOS == "windows":
        discard TerminateThread(t.sys, 1'i32)
      else:
        discard pthread_cancel(t.sys)
      when declared(registerThread):
        unregisterThread(addr(t))
      t.dataFn = nil
      # If thread `t` already exited, `t.core` will be nil.
      if t.core != nil:
        deallocThreadStorage(t.core)
        t.core = nil
  when hostOS == "windows":
    proc createThread*[TArg](t: var Thread[TArg],
                             tp: proc (arg: TArg) {.thread, nimcall.},
                             param: TArg) =
      ## Creates a new thread `t` and starts its execution.
      ##
      ## Entry point is the proc `tp`.
      ## `param` is passed to `tp`. `TArg` can be `void` if you
      ## don't need to pass any data to the thread.
      ##
      ## Raises `ResourceExhaustedError` if the thread cannot be created.
      t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))

      when TArg isnot void: t.data = param
      t.dataFn = tp
      when hasSharedHeap: t.core.stackSize = ThreadStackSize
      var dummyThreadId: int32 = 0'i32
      # `addr(t)` is what `threadProcWrapper` later receives as `closure`.
      t.sys = createThread(nil, ThreadStackSize, threadProcWrapper[TArg],
                           addr(t), 0'i32, dummyThreadId)
      if t.sys <= 0:
        raise newException(ResourceExhaustedError, "cannot create thread")

    proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
      ## Pins a thread to a `CPU`:idx:.
      ##
      ## In other words sets a thread's `affinity`:idx:.
      ## If you don't know what this means, you shouldn't use this proc.
      setThreadAffinityMask(t.sys, uint(1 shl cpu))

  elif defined(genode):
    var affinityOffset: cuint = 1
      ## CPU affinity offset for next thread, safe to roll-over.

    proc createThread*[TArg](t: var Thread[TArg],
                             tp: proc (arg: TArg) {.thread, nimcall.},
                             param: TArg) =
      ## Creates a new thread `t` and starts its execution.
      ##
      ## Entry point is the proc `tp`. `param` is passed to `tp`.
      ## `TArg` can be `void` if you
      ## don't need to pass any data to the thread.
      ##
      ## Each created thread is initialised with the current
      ## `affinityOffset`, which is then incremented.
      t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))

      when TArg isnot void: t.data = param
      t.dataFn = tp
      # Record the stack size on the per-thread core record, exactly as the
      # Windows and POSIX implementations do.
      when hasSharedHeap: t.core.stackSize = ThreadStackSize
      t.sys.initThread(
        runtimeEnv,
        ThreadStackSize.culonglong,
        threadProcWrapper[TArg], addr(t), affinityOffset)
      inc affinityOffset

    proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
      ## No-op on Genode: the affinity is fixed when the thread is
      ## initialised, so a compile-time hint is emitted instead.
      {.hint: "cannot change Genode thread CPU affinity after initialization".}
      discard

  else:
    proc createThread*[TArg](t: var Thread[TArg],
                             tp: proc (arg: TArg) {.thread, nimcall.},
                             param: TArg) =
      ## Creates a new thread `t` and starts its execution.
      ##
      ## Entry point is the proc `tp`. `param` is passed to `tp`.
      ## `TArg` can be `void` if you
      ## don't need to pass any data to the thread.
      ##
      ## Raises `ResourceExhaustedError` if `pthread_create` fails.
      t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))

      when TArg isnot void: t.data = param
      t.dataFn = tp
      when hasSharedHeap: t.core.stackSize = ThreadStackSize
      var a {.noinit.}: Pthread_attr
      doAssert pthread_attr_init(a) == 0
      when hasAllocStack:
        # The stack is allocated by us; the usable region handed to pthreads
        # starts `StackGuardSize` bytes past the raw allocation.
        var
          rawstk = allocThreadStorage(ThreadStackSize + StackGuardSize)
          stk = cast[pointer](cast[uint](rawstk) + StackGuardSize)
        let setstacksizeResult = pthread_attr_setstack(addr a, stk, ThreadStackSize)
        t.rawStack = rawstk
      else:
        let setstacksizeResult = pthread_attr_setstacksize(a, ThreadStackSize)

      when not defined(ios):
        # This fails on iOS
        doAssert(setstacksizeResult == 0)
      let createResult = pthread_create(t.sys, a, threadProcWrapper[TArg],
                                        addr(t))
      # The attribute object is no longer needed once `pthread_create` has
      # returned; destroy it on the failure path as well so it is not leaked
      # when the exception below is raised.
      let destroyResult = pthread_attr_destroy(a)
      if createResult != 0:
        # NOTE(review): `t.core` (and `t.rawStack` with `hasAllocStack`) is
        # not released here; confirm whether callers are expected to cope.
        raise newException(ResourceExhaustedError, "cannot create thread")
      doAssert destroyResult == 0

    proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
      ## Pins a thread to a `CPU`:idx:.
      ##
      ## In other words sets a thread's `affinity`:idx:.
      ## If you don't know what this means, you shouldn't use this proc.
      ##
      ## On macOS this proc does nothing.
      when not defined(macosx):
        var s {.noinit.}: CpuSet
        cpusetZero(s)
        cpusetIncl(cpu.cint, s)
        setAffinity(t.sys, csize_t(sizeof(s)), s)
  proc createThread*(t: var Thread[void], tp: proc () {.thread, nimcall.}) =
    ## Creates a new thread `t` whose entry point `tp` takes no argument
    ## and starts its execution.
    ##
    ## Simply forwards to the generic `createThread` instantiated with
    ## `void` as the argument type.
    createThread[void](t, tp)
  246. when not defined(gcOrc):
  247. include system/threadids