123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475 |
- /* Definitions for the threading interface.
- This file is part of khipu.
- khipu is free software: you can redistribute it and/or modify
- it under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <https://www.gnu.org/licenses/>. */
- #include <new>
- #include <climits>
- #include "khipu.hpp"
- #ifdef KP_PLATFORM_LINUX
- #include "sysdeps/futex-linux.hpp"
- #else
- #include "sysdeps/futex-generic.hpp"
- # if defined (KP_PLATFORM_WINDOWS)
- #include <windows.h>
- # endif
- #endif
- KP_DECLS_BEGIN
- // Mutexes.
- void lock::init (bool recursive)
- {
- if (recursive)
- this->vo_full |= lock::recursive_flag;
- this->word = 0;
- this->owner = UNBOUND;
- this->lock_cnt = 0;
- }
- result<object> alloc_lock (interpreter *interp, bool recursive_p)
- {
- lock *ret = KP_TRY (lock::alloc (interp, "lock", 4));
- ret->init (recursive_p);
- interp->alval = ret->as_obj ();
- gc_register (interp, ret);
- return (interp->alval);
- }
- struct futex_wait_point : public wait_point
- {
- atomic_t *ptr;
- futex_wait_point (interpreter *ip, atomic_t *p) :
- wait_point (ip), ptr (p)
- {
- }
- futex_wait_point () : futex_wait_point (interpreter::self (), nullptr)
- {
- }
- static result<futex_wait_point>
- make (interpreter *interp, atomic_t *p)
- {
- futex_wait_point ret { interp, p };
- KP_VTRY (ret.begin ());
- return (ret);
- }
- ~futex_wait_point ()
- {
- atomic_mfence_acq ();
- if (*this->ptr == 0)
- futex_wake (this->ptr, true);
- }
- };
- static result<bool>
- futex_wait (interpreter *interp, atomic_t *addr,
- int val, double *tp, bool tst_intr)
- {
- auto w = KP_TRY (futex_wait_point::make (interp, addr));
- int rv = futex_wait_impl (interp, addr, val, tp);
- if (tst_intr && rv == THR_INTR)
- KP_VTRY (interp->handle_evs ());
- return (rv != THR_TIMEOUT);
- }
- static result<bool>
- lwlock_grab_impl (interpreter *interp, atomic_t *lockp,
- double *tp, bool tst_intr = true)
- {
- if (lwlock_trygrab (lockp))
- return (true);
- while (true)
- {
- const int NSPINS = 100;
- for (int i = 0; i < NSPINS; ++i)
- if (*lockp != 0)
- break;
- else
- atomic_spin_nop ();
- if (atomic_swap (lockp, 2) == 0)
- return (true);
- bool rv = KP_TRY (futex_wait (interp, lockp, 2, tp, tst_intr));
- if (!rv)
- return (rv);
- }
- }
- result<void> lwlock_grab (interpreter *interp, atomic_t *lockp)
- {
- KP_VTRY (lwlock_grab_impl (interp, lockp, nullptr));
- return (0);
- }
- result<bool> lwlock_grab (interpreter *interp, atomic_t *lockp, double timeout)
- {
- bool rv = KP_TRY (lwlock_grab_impl (interp, lockp, &timeout));
- return (rv);
- }
- void lwlock_grab_nointr (interpreter *interp, atomic_t *lockp)
- {
- (void)lwlock_grab_impl (interp, lockp, nullptr, false);
- }
- bool lwlock_grab_nointr (interpreter *interp, atomic_t *lockp, double timeout)
- {
- return (deref (lwlock_grab_impl (interp, lockp, &timeout, false)));
- }
- void lwlock_drop (atomic_t *lockp)
- {
- if (atomic_swap (lockp, 0) == 2)
- futex_wake (lockp, false);
- }
- static result<int>
- lock_grab (interpreter *interp, lock& self, double *tp)
- {
- if (self.owner == interp->thread)
- {
- if (!self.flagged_p (lock::recursive_flag))
- return (THR_DEADLK);
- ++self.lock_cnt;
- return (0);
- }
- bool rv = KP_TRY (lwlock_grab_impl (interp, &self.word, tp));
- if (!rv)
- return (THR_TIMEOUT);
- self.owner = interp->thread;
- ++self.lock_cnt;
- new (self.ref.ptr ()) valref (interp, self.as_obj ());
- return (0);
- }
- result<int> lock::grab (interpreter *interp)
- {
- return (lock_grab (interp, *this, nullptr));
- }
- static inline result<double>
- validate_time (interpreter *interp, object timeout, bool absolute)
- {
- double tm;
- int iv;
- if (as<int> (timeout, iv))
- tm = iv;
- else if (!as<double> (timeout, tm))
- return (interp->raise ("type-error",
- "timeout must be an integer or float"));
- else if (finf_p (tm) || fnan_p (tm))
- return (interp->raise ("arg-error",
- "timeout must not ne NaN or infinite"));
- return (absolute ? tm : tm + real_time ());
- }
- result<int> lock::grab (interpreter *interp, object timeout, bool absolute)
- {
- double tp = KP_TRY (validate_time (interp, timeout, absolute));
- auto ret = KP_TRY (lock_grab (interp, *this, &tp));
- return (ret);
- }
- int lock::trygrab (interpreter *interp)
- {
- if (this->owner == interp->thread)
- {
- if (!this->flagged_p (lock::recursive_flag))
- return (THR_DEADLK);
- ++this->lock_cnt;
- return (0);
- }
- if (lwlock_trygrab (&this->word))
- {
- this->owner = interp->thread;
- ++this->lock_cnt;
- new (this->ref.ptr ()) valref (interp, this->as_obj ());
- return (0);
- }
- return (THR_ERROR);
- }
- int lock::drop (interpreter *interp)
- {
- if (this->owner != interp->thread)
- return (THR_PERM);
- else if (--this->lock_cnt == 0)
- {
- destroy (this->ref.ptr ());
- this->owner = UNBOUND;
- lwlock_drop (&this->word);
- }
- return (0);
- }
- void lock::force_release (interpreter *interp)
- {
- // XXX: Issue a warning and set it inconsistent (i.e: EOWNERDEAD).
- this->lock_cnt = 1;
- this->drop (interp);
- }
- // Condition variables.
- struct cv_wait_point : public wait_point
- {
- raw_condvar *cvp;
- int seq;
- cv_wait_point (interpreter *ip, raw_condvar *p, int s) :
- wait_point (ip), cvp (p), seq (s)
- {
- }
- ~cv_wait_point ()
- {
- if (!this->interp || this->seq == (int)this->cvp->seq)
- return;
- lwlock_grab_nointr (this->interp, this->cvp->lockp);
- this->cvp->wake (true);
- lwlock_drop (this->cvp->lockp);
- }
- };
- template <bool Tst>
- result<int> raw_condvar_wait (interpreter *interp, raw_condvar& self,
- atomic_t *lockp, double *tp)
- {
- if (self.lockp != lockp &&
- !atomic_cas_bool ((atomic_t *)&self.lockp, 0, (atomic_t)lockp))
- return (THR_ERROR);
- int seq = self.seq;
- cv_wait_point w { interp, &self, seq };
- if (Tst)
- KP_VTRY (w.begin ());
- else
- w.interp = nullptr;
- lwlock_drop (lockp);
- int rv = futex_wait_impl (interp, &self.seq, seq, tp);
- if (Tst && rv == THR_INTR)
- KP_VTRY (interp->handle_evs ());
- while (atomic_swap (lockp, 2) != 0)
- if (Tst && futex_wait_impl (interp, lockp, 2, nullptr) == THR_INTR)
- KP_VTRY (interp->handle_evs ());
- return (rv != THR_TIMEOUT ? 0 : rv);
- }
- result<int> raw_condvar::wait (interpreter *interp, atomic_t *lockp, double *tp)
- {
- return (raw_condvar_wait<true> (interp, *this, lockp, tp));
- }
- int raw_condvar::wait_nointr (interpreter *interp, atomic_t *lockp, double *tp)
- {
- return (deref (raw_condvar_wait<false> (interp, *this, lockp, tp)));
- }
- void raw_condvar::wake (bool wake_all)
- {
- atomic_t *lp = this->lockp;
- if (!lp)
- return;
- #ifdef KP_ARCH_WIDE
- while (true)
- {
- atomic_t val = this->seq;
- if (atomic_cas_bool (&this->seq, val, (val + 1) & 0x7fffffff))
- break;
- atomic_spin_nop ();
- }
- #else
- atomic_add (&this->seq, 1);
- #endif
- #ifdef FUTEX_MOVE
- if (wake_all)
- futex_move (&this->seq, lp, true);
- else
- #endif
- futex_wake (&this->seq, wake_all);
- }
- result<object> alloc_condvar (interpreter *interp)
- {
- condvar *ret = KP_TRY (condvar::alloc (interp, "condvar", 7));
- ret->base.init ();
- interp->alval = ret->as_obj ();
- gc_register (interp, ret);
- return (interp->alval);
- }
- static result<lock*>
- validate_lock (interpreter *interp, object lk)
- {
- lock *lp = as<lock> (lk);
- if (!lp)
- return (interp->raise ("type-error", "first argument must be a lock"));
- else if (lp->owner != interp->thread)
- return (interp->raise ("arg-error",
- "lock must be owned by the calling thread"));
- return (lp);
- }
- static result<int>
- cv_wait (interpreter *interp, condvar& cv, lock& lk, double *tp)
- {
- uint32_t cnt = lk.lock_cnt;
- lk.lock_cnt = 0;
- lk.owner = UNBOUND;
- destroy (lk.ref.ptr ());
- int ret = KP_TRY (cv.base.wait (interp, &lk.word, tp));
- if (ret == 0)
- { // Re-acquire the lock.
- new (lk.ref.ptr ()) valref (interp, lk.as_obj ());
- lk.lock_cnt = cnt;
- lk.owner = interp->thread;
- }
- return (ret);
- }
- result<int> condvar::wait (interpreter *interp, object lk)
- {
- lock *lp = KP_TRY (validate_lock (interp, lk));
- auto ret = KP_TRY (cv_wait (interp, *this, *lp, nullptr));
- return (ret);
- }
- result<int> condvar::wait (interpreter *interp, object lk,
- object timeout, bool absolute)
- {
- lock *lp = KP_TRY (validate_lock (interp, lk));
- double tp = KP_TRY (validate_time (interp, timeout, absolute));
- auto rv = KP_TRY (cv_wait (interp, *this, *lp, &tp));
- return (rv);
- }
- // External definitions.
- dlist all_threads;
- atomic_t all_threads_lock;
- uint32_t num_threads;
- static inline system_thread_t xthr_self ()
- {
- #ifdef KP_PLATFORM_UNIX
- return (pthread_self ());
- #elif defined (KP_PLATFORM_WINDOWS)
- void *ph = GetCurrentProcess (), *th = GetCurrentThread (), *tp = nullptr;
- DuplicateHandle (ph, th, ph, &tp, 0, FALSE, DUPLICATE_SAME_ACCESS);
- return (tp);
- #else
- # error "implement me"
- #endif
- }
- void sync_event::wait (interpreter *interp)
- {
- lwlock_grab_nointr (interp, &this->lock);
- if (++this->n_waiters == this->limit)
- { // If the suspender thread is waiting, awaken it.
- this->limit = 0;
- this->cv.wake (false);
- }
- while (!this->waiters_sig)
- deref (this->cv.wait (interp, &this->lock));
- lwlock_drop (&this->lock);
- }
- void sync_event::wake_all (interpreter *interp)
- {
- lwlock_grab_nointr (interp, &this->lock);
- this->waiters_sig = 1;
- lwlock_drop (&this->lock);
- this->cv.wake (true);
- }
- static int
- do_init_threads (interpreter *interp)
- {
- static thread main_thread;
- thread *tp = ensure_mask (&main_thread);
- tp->handle = xthr_self ();
- #ifdef KP_PLATFORM_WINDOWS
- if (!tp->handle)
- return (init_op::fail ("could not duplicate thread handle"));
- #endif
- #ifdef FUTEX_NEED_SLEEPQ
- if (!(tp->sleep_q = alloc_sleepq ()))
- return (init_op::fail ("could not allocate sleep queue"));
- #endif
- if (!futex_init (interp))
- return (init_op::fail ("could not initialize futex subsystem"));
- tp->join_ev = 0;
- all_threads.init_head ();
- all_threads.add (&tp->thr_link);
- tp->retval = UNBOUND;
- tp->vo_type = typecode::THREAD;
- lwlock_init (&tp->ilock);
- num_threads = 1;
- // Bind the thread and interpreter together.
- tp->interp = interp;
- interp->thread = tp->as_obj ();
- // Set TLS pointer.
- interpreter::set_self (interp);
- return (init_op::result_ok);
- }
- init_op init_threads (do_init_threads, "threads");
- KP_DECLS_END
|