threadpool.nim 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Andreas Rumpf
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_.
  10. ##
  11. ## Unstable API.
  12. ##
  13. ## See also
  14. ## ========
  15. ## * `threads module <threads.html>`_ for basic thread support
  16. ## * `channels module <channels_builtin.html>`_ for message passing support
  17. ## * `locks module <locks.html>`_ for locks and condition variables
  18. ## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO
  19. when not compileOption("threads"):
  20. {.error: "Threadpool requires --threads:on option.".}
  21. import cpuinfo, cpuload, locks, os
  22. {.push stackTrace:off.}
  23. type
  24. Semaphore = object
  25. c: Cond
  26. L: Lock
  27. counter: int
  28. proc initSemaphore(cv: var Semaphore) =
  29. initCond(cv.c)
  30. initLock(cv.L)
  31. proc destroySemaphore(cv: var Semaphore) {.inline.} =
  32. deinitCond(cv.c)
  33. deinitLock(cv.L)
  34. proc blockUntil(cv: var Semaphore) =
  35. acquire(cv.L)
  36. while cv.counter <= 0:
  37. wait(cv.c, cv.L)
  38. dec cv.counter
  39. release(cv.L)
  40. proc signal(cv: var Semaphore) =
  41. acquire(cv.L)
  42. inc cv.counter
  43. release(cv.L)
  44. signal(cv.c)
  45. const CacheLineSize = 64 # true for most archs
  46. type
  47. Barrier {.compilerproc.} = object
  48. entered: int
  49. cv: Semaphore # Semaphore takes 3 words at least
  50. left {.align(CacheLineSize).}: int
  51. interest {.align(CacheLineSize).} : bool # whether the master is interested in the "all done" event
  52. proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} =
  53. # due to the signaling between threads, it is ensured we are the only
  54. # one with access to 'entered' so we don't need 'atomicInc' here:
  55. inc b.entered
  56. # also we need no 'fence' instructions here as soon 'nimArgsPassingDone'
  57. # will be called which already will perform a fence for us.
  58. proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} =
  59. atomicInc b.left
  60. when not defined(x86): fence()
  61. # We may not have seen the final value of b.entered yet,
  62. # so we need to check for >= instead of ==.
  63. if b.interest and b.left >= b.entered: signal(b.cv)
  64. proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} =
  65. b.entered = 0
  66. b.left = 0
  67. b.interest = false
  68. proc closeBarrier(b: ptr Barrier) {.compilerproc.} =
  69. fence()
  70. if b.left != b.entered:
  71. b.cv.initSemaphore()
  72. fence()
  73. b.interest = true
  74. fence()
  75. while b.left != b.entered: blockUntil(b.cv)
  76. destroySemaphore(b.cv)
  77. {.pop.}
  78. # ----------------------------------------------------------------------------
  79. type
  80. AwaitInfo = object
  81. cv: Semaphore
  82. idx: int
  83. FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_.
  84. FlowVarBaseObj = object of RootObj
  85. ready, usesSemaphore, awaited: bool
  86. cv: Semaphore # for 'blockUntilAny' support
  87. ai: ptr AwaitInfo
  88. idx: int
  89. data: pointer # we incRef and unref it to keep it alive; note this MUST NOT
  90. # be RootRef here otherwise the wrong GC keeps track of it!
  91. owner: pointer # ptr Worker
  92. FlowVarObj[T] = object of FlowVarBaseObj
  93. blob: T
  94. FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable.
  95. ToFreeQueue = object
  96. len: int
  97. lock: Lock
  98. empty: Semaphore
  99. data: array[128, pointer]
  100. WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
  101. Worker = object
  102. taskArrived: Semaphore
  103. taskStarted: Semaphore #\
  104. # task data:
  105. f: WorkerProc
  106. data: pointer
  107. ready: bool # put it here for correct alignment!
  108. initialized: bool # whether it has even been initialized
  109. shutdown: bool # the pool requests to shut down this worker thread
  110. q: ToFreeQueue
  111. readyForTask: Semaphore
  112. const threadpoolWaitMs {.intdefine.}: int = 100
  113. proc blockUntil*(fv: var FlowVarBaseObj) =
  114. ## Waits until the value for `fv` arrives.
  115. ##
  116. ## Usually it is not necessary to call this explicitly.
  117. if fv.usesSemaphore and not fv.awaited:
  118. fv.awaited = true
  119. blockUntil(fv.cv)
  120. destroySemaphore(fv.cv)
  121. proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
  122. if cas(addr w.ready, true, false):
  123. w.data = data
  124. w.f = fn
  125. signal(w.taskArrived)
  126. blockUntil(w.taskStarted)
  127. result = true
  128. proc cleanFlowVars(w: ptr Worker) =
  129. let q = addr(w.q)
  130. acquire(q.lock)
  131. for i in 0 ..< q.len:
  132. GC_unref(cast[RootRef](q.data[i]))
  133. #echo "GC_unref"
  134. q.len = 0
  135. release(q.lock)
  136. proc wakeupWorkerToProcessQueue(w: ptr Worker) =
  137. # we have to ensure it's us who wakes up the owning thread.
  138. # This is quite horrible code, but it runs so rarely that it doesn't matter:
  139. while not cas(addr w.ready, true, false):
  140. cpuRelax()
  141. discard
  142. w.data = nil
  143. w.f = proc (w, a: pointer) {.nimcall.} =
  144. let w = cast[ptr Worker](w)
  145. cleanFlowVars(w)
  146. signal(w.q.empty)
  147. signal(w.taskArrived)
  148. proc attach(fv: FlowVarBase; i: int): bool =
  149. acquire(fv.cv.L)
  150. if fv.cv.counter <= 0:
  151. fv.idx = i
  152. result = true
  153. else:
  154. result = false
  155. release(fv.cv.L)
  156. proc finished(fv: var FlowVarBaseObj) =
  157. doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
  158. # we have to protect against the rare cases where the owner of the flowVar
  159. # simply disregards the flowVar and yet the "flowVar" has not yet written
  160. # anything to it:
  161. blockUntil(fv)
  162. if fv.data.isNil: return
  163. let owner = cast[ptr Worker](fv.owner)
  164. let q = addr(owner.q)
  165. acquire(q.lock)
  166. while not (q.len < q.data.len):
  167. #echo "EXHAUSTED!"
  168. release(q.lock)
  169. wakeupWorkerToProcessQueue(owner)
  170. blockUntil(q.empty)
  171. acquire(q.lock)
  172. q.data[q.len] = cast[pointer](fv.data)
  173. inc q.len
  174. release(q.lock)
  175. fv.data = nil
  176. # the worker thread waits for "data" to be set to nil before shutting down
  177. owner.data = nil
  178. proc `=destroy`[T](fv: var FlowVarObj[T]) =
  179. finished(fv)
  180. `=destroy`(fv.blob)
  181. proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} =
  182. new(result)
  183. proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} =
  184. fv.cv.initSemaphore()
  185. fv.usesSemaphore = true
  186. proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} =
  187. if fv.ai != nil:
  188. acquire(fv.ai.cv.L)
  189. fv.ai.idx = fv.idx
  190. inc fv.ai.cv.counter
  191. release(fv.ai.cv.L)
  192. signal(fv.ai.cv.c)
  193. if fv.usesSemaphore:
  194. signal(fv.cv)
  195. proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
  196. ## Blocks until `fv` is available and then passes its value
  197. ## to `action`.
  198. ##
  199. ## Note that due to Nim's parameter passing semantics, this
  200. ## means that `T` doesn't need to be copied, so `awaitAndThen` can
  201. ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_.
  202. blockUntil(fv[])
  203. when defined(nimV2):
  204. action(fv.blob)
  205. elif T is string or T is seq:
  206. action(cast[T](fv.data))
  207. elif T is ref:
  208. {.error: "'awaitAndThen' not available for FlowVar[ref]".}
  209. else:
  210. action(fv.blob)
  211. finished(fv[])
  212. proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
  213. ## Blocks until the value is available and then returns this value.
  214. blockUntil(fv[])
  215. when defined(nimV2):
  216. result = cast[ptr T](fv.blob)
  217. else:
  218. result = cast[ptr T](fv.data)
  219. finished(fv[])
  220. proc `^`*[T](fv: FlowVar[T]): T =
  221. ## Blocks until the value is available and then returns this value.
  222. blockUntil(fv[])
  223. when not defined(nimV2) and (T is string or T is seq or T is ref):
  224. deepCopy result, cast[T](fv.data)
  225. else:
  226. result = fv.blob
  227. finished(fv[])
  228. proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
  229. ## Awaits any of the given `flowVars`. Returns the index of one `flowVar`
  230. ## for which a value arrived.
  231. ##
  232. ## A `flowVar` only supports one call to `blockUntilAny` at the same time.
  233. ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])`
  234. ## the second call will only block until `c`. If there is no `flowVar` left
  235. ## to be able to wait on, -1 is returned.
  236. ##
  237. ## **Note:** This results in non-deterministic behaviour and should be avoided.
  238. var ai: AwaitInfo
  239. ai.cv.initSemaphore()
  240. var conflicts = 0
  241. result = -1
  242. for i in 0 .. flowVars.high:
  243. if cas(addr flowVars[i].ai, nil, addr ai):
  244. if not attach(flowVars[i], i):
  245. result = i
  246. break
  247. else:
  248. inc conflicts
  249. if conflicts < flowVars.len:
  250. if result < 0:
  251. blockUntil(ai.cv)
  252. result = ai.idx
  253. for i in 0 .. flowVars.high:
  254. discard cas(addr flowVars[i].ai, addr ai, nil)
  255. destroySemaphore(ai.cv)
  256. proc isReady*(fv: FlowVarBase): bool =
  257. ## Determines whether the specified `FlowVarBase`'s value is available.
  258. ##
  259. ## If `true`, awaiting `fv` will not block.
  260. if fv.usesSemaphore and not fv.awaited:
  261. acquire(fv.cv.L)
  262. result = fv.cv.counter > 0
  263. release(fv.cv.L)
  264. else:
  265. result = true
  266. proc nimArgsPassingDone(p: pointer) {.compilerproc.} =
  267. let w = cast[ptr Worker](p)
  268. signal(w.taskStarted)
  269. const
  270. MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads
  271. ## should be good enough for anybody ;-)
  272. MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads.
  273. type
  274. ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier.
  275. var
  276. currentPoolSize: int
  277. maxPoolSize = MaxThreadPoolSize
  278. minPoolSize = 4
  279. gSomeReady: Semaphore
  280. readyWorker: ptr Worker
  281. # A workaround for recursion deadlock issue
  282. # https://github.com/nim-lang/Nim/issues/4597
  283. var
  284. numSlavesLock: Lock
  285. numSlavesRunning {.guard: numSlavesLock.}: int
  286. numSlavesWaiting {.guard: numSlavesLock.}: int
  287. isSlave {.threadvar.}: bool
  288. numSlavesLock.initLock
  289. gSomeReady.initSemaphore()
  290. proc slave(w: ptr Worker) {.thread.} =
  291. isSlave = true
  292. while true:
  293. if w.shutdown:
  294. w.shutdown = false
  295. atomicDec currentPoolSize
  296. while true:
  297. if w.data != nil:
  298. sleep(threadpoolWaitMs)
  299. else:
  300. # The flowvar finalizer ("finished()") set w.data to nil, so we can
  301. # safely terminate the thread.
  302. #
  303. # TODO: look for scenarios in which the flowvar is never finalized, so
  304. # a shut down thread gets stuck in this loop until the main thread exits.
  305. break
  306. break
  307. when declared(atomicStoreN):
  308. atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
  309. else:
  310. w.ready = true
  311. readyWorker = w
  312. signal(gSomeReady)
  313. blockUntil(w.taskArrived)
  314. # XXX Somebody needs to look into this (why does this assertion fail
  315. # in Visual Studio?)
  316. when not defined(vcc) and not defined(tcc): assert(not w.ready)
  317. withLock numSlavesLock:
  318. inc numSlavesRunning
  319. w.f(w, w.data)
  320. withLock numSlavesLock:
  321. dec numSlavesRunning
  322. if w.q.len != 0: w.cleanFlowVars
  323. proc distinguishedSlave(w: ptr Worker) {.thread.} =
  324. while true:
  325. when declared(atomicStoreN):
  326. atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
  327. else:
  328. w.ready = true
  329. signal(w.readyForTask)
  330. blockUntil(w.taskArrived)
  331. assert(not w.ready)
  332. w.f(w, w.data)
  333. if w.q.len != 0: w.cleanFlowVars
  334. var
  335. workers: array[MaxThreadPoolSize, Thread[ptr Worker]]
  336. workersData: array[MaxThreadPoolSize, Worker]
  337. distinguished: array[MaxDistinguishedThread, Thread[ptr Worker]]
  338. distinguishedData: array[MaxDistinguishedThread, Worker]
  339. when defined(nimPinToCpu):
  340. var gCpus: Natural
  341. proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
  342. ## Sets the minimum thread pool size. The default value of this is 4.
  343. minPoolSize = size
  344. proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
  345. ## Sets the maximum thread pool size. The default value of this
  346. ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_.
  347. maxPoolSize = size
  348. if currentPoolSize > maxPoolSize:
  349. for i in maxPoolSize..currentPoolSize-1:
  350. let w = addr(workersData[i])
  351. w.shutdown = true
  352. when defined(nimRecursiveSpawn):
  353. var localThreadId {.threadvar.}: int
  354. proc activateWorkerThread(i: int) {.noinline.} =
  355. workersData[i].taskArrived.initSemaphore()
  356. workersData[i].taskStarted.initSemaphore()
  357. workersData[i].initialized = true
  358. workersData[i].q.empty.initSemaphore()
  359. initLock(workersData[i].q.lock)
  360. createThread(workers[i], slave, addr(workersData[i]))
  361. when defined(nimRecursiveSpawn):
  362. localThreadId = i+1
  363. when defined(nimPinToCpu):
  364. if gCpus > 0: pinToCpu(workers[i], i mod gCpus)
  365. proc activateDistinguishedThread(i: int) {.noinline.} =
  366. distinguishedData[i].taskArrived.initSemaphore()
  367. distinguishedData[i].taskStarted.initSemaphore()
  368. distinguishedData[i].initialized = true
  369. distinguishedData[i].q.empty.initSemaphore()
  370. initLock(distinguishedData[i].q.lock)
  371. distinguishedData[i].readyForTask.initSemaphore()
  372. createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
  373. proc setup() =
  374. let p = countProcessors()
  375. when defined(nimPinToCpu):
  376. gCpus = p
  377. currentPoolSize = min(p, MaxThreadPoolSize)
  378. readyWorker = addr(workersData[0])
  379. for i in 0..<currentPoolSize: activateWorkerThread(i)
  380. proc preferSpawn*(): bool =
  381. ## Use this proc to determine quickly if a `spawn` or a direct call is
  382. ## preferable.
  383. ##
  384. ## If it returns `true`, a `spawn` may make sense. In general
  385. ## it is not necessary to call this directly; use the `spawnX template
  386. ## <#spawnX.t>`_ instead.
  387. result = gSomeReady.counter > 0
  388. proc spawn*(call: sink typed) {.magic: "Spawn".}
  389. ## Always spawns a new task, so that the `call` is never executed on
  390. ## the calling thread.
  391. ##
  392. ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
  393. ## return type that is either `void` or compatible with `FlowVar[T]`.
  394. proc pinnedSpawn*(id: ThreadId; call: sink typed) {.magic: "Spawn".}
  395. ## Always spawns a new task on the worker thread with `id`, so that
  396. ## the `call` is **always** executed on the thread.
  397. ##
  398. ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
  399. ## return type that is either `void` or compatible with `FlowVar[T]`.
  400. template spawnX*(call) =
  401. ## Spawns a new task if a CPU core is ready, otherwise executes the
  402. ## call in the calling thread.
  403. ##
  404. ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_
  405. ## in order to not block the producer for an unknown amount of time.
  406. ##
  407. ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
  408. ## return type that is either 'void' or compatible with `FlowVar[T]`.
  409. (if preferSpawn(): spawn call else: call)
  410. proc parallel*(body: untyped) {.magic: "Parallel".}
  411. ## A parallel section can be used to execute a block in parallel.
  412. ##
  413. ## `body` has to be in a DSL that is a particular subset of the language.
  414. ##
  415. ## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_
  416. ## for further information.
  417. var
  418. state: ThreadPoolState
  419. stateLock: Lock
  420. initLock stateLock
proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} =
  # implementation of 'spawn' that is used by the code generator.
  #
  # Hands `fn(data)` to an idle pool worker. It may grow or shrink the pool
  # according to `advice(state)`; otherwise it blocks on `gSomeReady` until
  # some worker becomes idle, then retries. It returns only once a worker
  # has accepted the task (or, with nimRecursiveSpawn, the task has run
  # inline).
  while true:
    # Fast path: try the most recently idle worker, then scan the pool.
    if selectWorker(readyWorker, fn, data): return
    for i in 0..<currentPoolSize:
      if selectWorker(addr(workersData[i]), fn, data): return
    # determine what to do, but keep in mind this is expensive too:
    # state.calls < maxPoolSize: warmup phase
    # (state.calls and 127) == 0: periodic check
    if state.calls < maxPoolSize or (state.calls and 127) == 0:
      # ensure the call to 'advice' is atomic:
      if tryAcquire(stateLock):
        # NOTE(review): if activateWorkerThread raises below (e.g. thread
        # creation fails), stateLock is never released and every later
        # tryAcquire fails. Confirm whether a try/finally is needed.
        if currentPoolSize < minPoolSize:
          # Below the configured minimum: start (if needed) and use the
          # next pool slot.
          if not workersData[currentPoolSize].initialized:
            activateWorkerThread(currentPoolSize)
          let w = addr(workersData[currentPoolSize])
          atomicInc currentPoolSize
          if selectWorker(w, fn, data):
            release(stateLock)
            return
        case advice(state)
        of doNothing: discard
        of doCreateThread:
          if currentPoolSize < maxPoolSize:
            if not workersData[currentPoolSize].initialized:
              activateWorkerThread(currentPoolSize)
            let w = addr(workersData[currentPoolSize])
            atomicInc currentPoolSize
            if selectWorker(w, fn, data):
              release(stateLock)
              return
            # else we didn't succeed but some other thread did, so do nothing.
        of doShutdownThread:
          if currentPoolSize > minPoolSize:
            # Ask the highest-numbered worker to retire; `slave` checks
            # this flag at the top of its loop.
            let w = addr(workersData[currentPoolSize-1])
            w.shutdown = true
          # we don't free anything here. Too dangerous.
        release(stateLock)
      # else the acquire failed, but this means some
      # other thread succeeded, so we don't need to do anything here.
    when defined(nimRecursiveSpawn):
      if localThreadId > 0:
        # we are a worker thread, so instead of waiting for something which
        # might as well never happen (see tparallel_quicksort), we run the task
        # on the current thread instead.
        var self = addr(workersData[localThreadId-1])
        fn(self, data)
        # The task posts taskStarted on `self` (via nimArgsPassingDone);
        # consume that signal here.
        blockUntil(self.taskStarted)
        return
    if isSlave:
      # Run under lock until `numSlavesWaiting` is incremented to avoid a
      # race (otherwise the last two threads might start waiting together).
      withLock numSlavesLock:
        if numSlavesRunning <= numSlavesWaiting + 1:
          # All the other slaves are waiting.
          # If we wait now, we're deadlocked until
          # an external spawn happens!
          if currentPoolSize < maxPoolSize:
            if not workersData[currentPoolSize].initialized:
              activateWorkerThread(currentPoolSize)
            let w = addr(workersData[currentPoolSize])
            atomicInc currentPoolSize
            if selectWorker(w, fn, data):
              return
          else:
            # There is no place in the pool. We're deadlocked.
            # echo "Deadlock!"
            discard
        inc numSlavesWaiting
    # Sleep until some worker announces that it is idle, then retry.
    blockUntil(gSomeReady)
    if isSlave:
      withLock numSlavesLock:
        dec numSlavesWaiting
  494. var
  495. distinguishedLock: Lock
  496. initLock distinguishedLock
  497. proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} =
  498. acquire(distinguishedLock)
  499. if not distinguishedData[id].initialized:
  500. activateDistinguishedThread(id)
  501. release(distinguishedLock)
  502. while true:
  503. if selectWorker(addr(distinguishedData[id]), fn, data): break
  504. blockUntil(distinguishedData[id].readyForTask)
  505. proc sync*() =
  506. ## A simple barrier to wait for all `spawn`ed tasks.
  507. ##
  508. ## If you need more elaborate waiting, you have to use an explicit barrier.
  509. while true:
  510. var allReady = true
  511. for i in 0 ..< currentPoolSize:
  512. if not allReady: break
  513. allReady = allReady and workersData[i].ready
  514. if allReady: break
  515. sleep(threadpoolWaitMs)
  516. # We cannot "blockUntil(gSomeReady)" because workers may be shut down between
  517. # the time we establish that some are not "ready" and the time we wait for a
  518. # "signal(gSomeReady)" from inside "slave()" that can never come.
  519. setup()