thread.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. /* Definitions for the threading interface.
  2. This file is part of khipu.
  3. khipu is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Lesser General Public License as published by
  5. the Free Software Foundation; either version 3 of the License, or
  6. (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Lesser General Public License for more details.
  11. You should have received a copy of the GNU Lesser General Public License
  12. along with this program. If not, see <https://www.gnu.org/licenses/>. */
  13. #include <new>
  14. #include <climits>
  15. #include "khipu.hpp"
  16. #ifdef KP_PLATFORM_LINUX
  17. #include "sysdeps/futex-linux.hpp"
  18. #else
  19. #include "sysdeps/futex-generic.hpp"
  20. # if defined (KP_PLATFORM_WINDOWS)
  21. #include <windows.h>
  22. # endif
  23. #endif
  24. KP_DECLS_BEGIN
  25. // Mutexes.
  26. void lock::init (bool recursive)
  27. {
  28. if (recursive)
  29. this->vo_full |= lock::recursive_flag;
  30. this->word = 0;
  31. this->owner = UNBOUND;
  32. this->lock_cnt = 0;
  33. }
  34. result<object> alloc_lock (interpreter *interp, bool recursive_p)
  35. {
  36. lock *ret = KP_TRY (lock::alloc (interp, "lock", 4));
  37. ret->init (recursive_p);
  38. interp->alval = ret->as_obj ();
  39. gc_register (interp, ret);
  40. return (interp->alval);
  41. }
  42. struct futex_wait_point : public wait_point
  43. {
  44. atomic_t *ptr;
  45. futex_wait_point (interpreter *ip, atomic_t *p) :
  46. wait_point (ip), ptr (p)
  47. {
  48. }
  49. futex_wait_point () : futex_wait_point (interpreter::self (), nullptr)
  50. {
  51. }
  52. static result<futex_wait_point>
  53. make (interpreter *interp, atomic_t *p)
  54. {
  55. futex_wait_point ret { interp, p };
  56. KP_VTRY (ret.begin ());
  57. return (ret);
  58. }
  59. ~futex_wait_point ()
  60. {
  61. atomic_mfence_acq ();
  62. if (*this->ptr == 0)
  63. futex_wake (this->ptr, true);
  64. }
  65. };
  66. static result<bool>
  67. futex_wait (interpreter *interp, atomic_t *addr,
  68. int val, double *tp, bool tst_intr)
  69. {
  70. auto w = KP_TRY (futex_wait_point::make (interp, addr));
  71. int rv = futex_wait_impl (interp, addr, val, tp);
  72. if (tst_intr && rv == THR_INTR)
  73. KP_VTRY (interp->handle_evs ());
  74. return (rv != THR_TIMEOUT);
  75. }
  76. static result<bool>
  77. lwlock_grab_impl (interpreter *interp, atomic_t *lockp,
  78. double *tp, bool tst_intr = true)
  79. {
  80. if (lwlock_trygrab (lockp))
  81. return (true);
  82. while (true)
  83. {
  84. const int NSPINS = 100;
  85. for (int i = 0; i < NSPINS; ++i)
  86. if (*lockp != 0)
  87. break;
  88. else
  89. atomic_spin_nop ();
  90. if (atomic_swap (lockp, 2) == 0)
  91. return (true);
  92. bool rv = KP_TRY (futex_wait (interp, lockp, 2, tp, tst_intr));
  93. if (!rv)
  94. return (rv);
  95. }
  96. }
  97. result<void> lwlock_grab (interpreter *interp, atomic_t *lockp)
  98. {
  99. KP_VTRY (lwlock_grab_impl (interp, lockp, nullptr));
  100. return (0);
  101. }
  102. result<bool> lwlock_grab (interpreter *interp, atomic_t *lockp, double timeout)
  103. {
  104. bool rv = KP_TRY (lwlock_grab_impl (interp, lockp, &timeout));
  105. return (rv);
  106. }
  107. void lwlock_grab_nointr (interpreter *interp, atomic_t *lockp)
  108. {
  109. (void)lwlock_grab_impl (interp, lockp, nullptr, false);
  110. }
  111. bool lwlock_grab_nointr (interpreter *interp, atomic_t *lockp, double timeout)
  112. {
  113. return (deref (lwlock_grab_impl (interp, lockp, &timeout, false)));
  114. }
  115. void lwlock_drop (atomic_t *lockp)
  116. {
  117. if (atomic_swap (lockp, 0) == 2)
  118. futex_wake (lockp, false);
  119. }
  120. static result<int>
  121. lock_grab (interpreter *interp, lock& self, double *tp)
  122. {
  123. if (self.owner == interp->thread)
  124. {
  125. if (!self.flagged_p (lock::recursive_flag))
  126. return (THR_DEADLK);
  127. ++self.lock_cnt;
  128. return (0);
  129. }
  130. bool rv = KP_TRY (lwlock_grab_impl (interp, &self.word, tp));
  131. if (!rv)
  132. return (THR_TIMEOUT);
  133. self.owner = interp->thread;
  134. ++self.lock_cnt;
  135. new (self.ref.ptr ()) valref (interp, self.as_obj ());
  136. return (0);
  137. }
  138. result<int> lock::grab (interpreter *interp)
  139. {
  140. return (lock_grab (interp, *this, nullptr));
  141. }
  142. static inline result<double>
  143. validate_time (interpreter *interp, object timeout, bool absolute)
  144. {
  145. double tm;
  146. int iv;
  147. if (as<int> (timeout, iv))
  148. tm = iv;
  149. else if (!as<double> (timeout, tm))
  150. return (interp->raise ("type-error",
  151. "timeout must be an integer or float"));
  152. else if (finf_p (tm) || fnan_p (tm))
  153. return (interp->raise ("arg-error",
  154. "timeout must not ne NaN or infinite"));
  155. return (absolute ? tm : tm + real_time ());
  156. }
  157. result<int> lock::grab (interpreter *interp, object timeout, bool absolute)
  158. {
  159. double tp = KP_TRY (validate_time (interp, timeout, absolute));
  160. auto ret = KP_TRY (lock_grab (interp, *this, &tp));
  161. return (ret);
  162. }
  163. int lock::trygrab (interpreter *interp)
  164. {
  165. if (this->owner == interp->thread)
  166. {
  167. if (!this->flagged_p (lock::recursive_flag))
  168. return (THR_DEADLK);
  169. ++this->lock_cnt;
  170. return (0);
  171. }
  172. if (lwlock_trygrab (&this->word))
  173. {
  174. this->owner = interp->thread;
  175. ++this->lock_cnt;
  176. new (this->ref.ptr ()) valref (interp, this->as_obj ());
  177. return (0);
  178. }
  179. return (THR_ERROR);
  180. }
  181. int lock::drop (interpreter *interp)
  182. {
  183. if (this->owner != interp->thread)
  184. return (THR_PERM);
  185. else if (--this->lock_cnt == 0)
  186. {
  187. destroy (this->ref.ptr ());
  188. this->owner = UNBOUND;
  189. lwlock_drop (&this->word);
  190. }
  191. return (0);
  192. }
  193. void lock::force_release (interpreter *interp)
  194. {
  195. // XXX: Issue a warning and set it inconsistent (i.e: EOWNERDEAD).
  196. this->lock_cnt = 1;
  197. this->drop (interp);
  198. }
  199. // Condition variables.
  200. struct cv_wait_point : public wait_point
  201. {
  202. raw_condvar *cvp;
  203. int seq;
  204. cv_wait_point (interpreter *ip, raw_condvar *p, int s) :
  205. wait_point (ip), cvp (p), seq (s)
  206. {
  207. }
  208. ~cv_wait_point ()
  209. {
  210. if (!this->interp || this->seq == (int)this->cvp->seq)
  211. return;
  212. lwlock_grab_nointr (this->interp, this->cvp->lockp);
  213. this->cvp->wake (true);
  214. lwlock_drop (this->cvp->lockp);
  215. }
  216. };
  217. template <bool Tst>
  218. result<int> raw_condvar_wait (interpreter *interp, raw_condvar& self,
  219. atomic_t *lockp, double *tp)
  220. {
  221. if (self.lockp != lockp &&
  222. !atomic_cas_bool ((atomic_t *)&self.lockp, 0, (atomic_t)lockp))
  223. return (THR_ERROR);
  224. int seq = self.seq;
  225. cv_wait_point w { interp, &self, seq };
  226. if (Tst)
  227. KP_VTRY (w.begin ());
  228. else
  229. w.interp = nullptr;
  230. lwlock_drop (lockp);
  231. int rv = futex_wait_impl (interp, &self.seq, seq, tp);
  232. if (Tst && rv == THR_INTR)
  233. KP_VTRY (interp->handle_evs ());
  234. while (atomic_swap (lockp, 2) != 0)
  235. if (Tst && futex_wait_impl (interp, lockp, 2, nullptr) == THR_INTR)
  236. KP_VTRY (interp->handle_evs ());
  237. return (rv != THR_TIMEOUT ? 0 : rv);
  238. }
  239. result<int> raw_condvar::wait (interpreter *interp, atomic_t *lockp, double *tp)
  240. {
  241. return (raw_condvar_wait<true> (interp, *this, lockp, tp));
  242. }
  243. int raw_condvar::wait_nointr (interpreter *interp, atomic_t *lockp, double *tp)
  244. {
  245. return (deref (raw_condvar_wait<false> (interp, *this, lockp, tp)));
  246. }
  247. void raw_condvar::wake (bool wake_all)
  248. {
  249. atomic_t *lp = this->lockp;
  250. if (!lp)
  251. return;
  252. #ifdef KP_ARCH_WIDE
  253. while (true)
  254. {
  255. atomic_t val = this->seq;
  256. if (atomic_cas_bool (&this->seq, val, (val + 1) & 0x7fffffff))
  257. break;
  258. atomic_spin_nop ();
  259. }
  260. #else
  261. atomic_add (&this->seq, 1);
  262. #endif
  263. #ifdef FUTEX_MOVE
  264. if (wake_all)
  265. futex_move (&this->seq, lp, true);
  266. else
  267. #endif
  268. futex_wake (&this->seq, wake_all);
  269. }
  270. result<object> alloc_condvar (interpreter *interp)
  271. {
  272. condvar *ret = KP_TRY (condvar::alloc (interp, "condvar", 7));
  273. ret->base.init ();
  274. interp->alval = ret->as_obj ();
  275. gc_register (interp, ret);
  276. return (interp->alval);
  277. }
  278. static result<lock*>
  279. validate_lock (interpreter *interp, object lk)
  280. {
  281. lock *lp = as<lock> (lk);
  282. if (!lp)
  283. return (interp->raise ("type-error", "first argument must be a lock"));
  284. else if (lp->owner != interp->thread)
  285. return (interp->raise ("arg-error",
  286. "lock must be owned by the calling thread"));
  287. return (lp);
  288. }
  289. static result<int>
  290. cv_wait (interpreter *interp, condvar& cv, lock& lk, double *tp)
  291. {
  292. uint32_t cnt = lk.lock_cnt;
  293. lk.lock_cnt = 0;
  294. lk.owner = UNBOUND;
  295. destroy (lk.ref.ptr ());
  296. int ret = KP_TRY (cv.base.wait (interp, &lk.word, tp));
  297. if (ret == 0)
  298. { // Re-acquire the lock.
  299. new (lk.ref.ptr ()) valref (interp, lk.as_obj ());
  300. lk.lock_cnt = cnt;
  301. lk.owner = interp->thread;
  302. }
  303. return (ret);
  304. }
  305. result<int> condvar::wait (interpreter *interp, object lk)
  306. {
  307. lock *lp = KP_TRY (validate_lock (interp, lk));
  308. auto ret = KP_TRY (cv_wait (interp, *this, *lp, nullptr));
  309. return (ret);
  310. }
  311. result<int> condvar::wait (interpreter *interp, object lk,
  312. object timeout, bool absolute)
  313. {
  314. lock *lp = KP_TRY (validate_lock (interp, lk));
  315. double tp = KP_TRY (validate_time (interp, timeout, absolute));
  316. auto rv = KP_TRY (cv_wait (interp, *this, *lp, &tp));
  317. return (rv);
  318. }
  319. // External definitions.
  320. dlist all_threads;
  321. atomic_t all_threads_lock;
  322. uint32_t num_threads;
  323. static inline system_thread_t xthr_self ()
  324. {
  325. #ifdef KP_PLATFORM_UNIX
  326. return (pthread_self ());
  327. #elif defined (KP_PLATFORM_WINDOWS)
  328. void *ph = GetCurrentProcess (), *th = GetCurrentThread (), *tp = nullptr;
  329. DuplicateHandle (ph, th, ph, &tp, 0, FALSE, DUPLICATE_SAME_ACCESS);
  330. return (tp);
  331. #else
  332. # error "implement me"
  333. #endif
  334. }
  335. void sync_event::wait (interpreter *interp)
  336. {
  337. lwlock_grab_nointr (interp, &this->lock);
  338. if (++this->n_waiters == this->limit)
  339. { // If the suspender thread is waiting, awaken it.
  340. this->limit = 0;
  341. this->cv.wake (false);
  342. }
  343. while (!this->waiters_sig)
  344. deref (this->cv.wait (interp, &this->lock));
  345. lwlock_drop (&this->lock);
  346. }
  347. void sync_event::wake_all (interpreter *interp)
  348. {
  349. lwlock_grab_nointr (interp, &this->lock);
  350. this->waiters_sig = 1;
  351. lwlock_drop (&this->lock);
  352. this->cv.wake (true);
  353. }
  354. static int
  355. do_init_threads (interpreter *interp)
  356. {
  357. static thread main_thread;
  358. thread *tp = ensure_mask (&main_thread);
  359. tp->handle = xthr_self ();
  360. #ifdef KP_PLATFORM_WINDOWS
  361. if (!tp->handle)
  362. return (init_op::fail ("could not duplicate thread handle"));
  363. #endif
  364. #ifdef FUTEX_NEED_SLEEPQ
  365. if (!(tp->sleep_q = alloc_sleepq ()))
  366. return (init_op::fail ("could not allocate sleep queue"));
  367. #endif
  368. if (!futex_init (interp))
  369. return (init_op::fail ("could not initialize futex subsystem"));
  370. tp->join_ev = 0;
  371. all_threads.init_head ();
  372. all_threads.add (&tp->thr_link);
  373. tp->retval = UNBOUND;
  374. tp->vo_type = typecode::THREAD;
  375. lwlock_init (&tp->ilock);
  376. num_threads = 1;
  377. // Bind the thread and interpreter together.
  378. tp->interp = interp;
  379. interp->thread = tp->as_obj ();
  380. // Set TLS pointer.
  381. interpreter::set_self (interp);
  382. return (init_op::result_ok);
  383. }
  384. init_op init_threads (do_init_threads, "threads");
  385. KP_DECLS_END