123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- #include "xrcu.hpp"
- #include "xatomic.hpp"
- #include "version.hpp"
- #include <thread>
- #include <mutex>
- #include <atomic>
- #include <chrono>
- #include <cstdint>
- #include <ctime>
- #include <functional>
- namespace xrcu
- {
/* Node of an intrusive, circular, doubly-linked list.
 * A node whose pointers refer to itself acts as an empty list head;
 * a node whose NEXT pointer is null is considered not linked at all. */
struct td_link
{
  td_link *next;
  td_link *prev;

  // Make this node an empty list head (both pointers refer to itself).
  void init_head ()
    {
      this->prev = this;
      this->next = this;
    }

  // Insert this node immediately after HEADP.
  void add (td_link *headp)
    {
      td_link *const first = headp->next;

      this->prev = headp;
      this->next = first;
      first->prev = this;
      headp->next = this;
    }

  /* Remove this node from the list it belongs to. The node's own
   * pointers are left untouched; only its neighbours are patched. */
  void del ()
    {
      td_link *const before = this->prev;
      td_link *const after = this->next;

      after->prev = before;
      before->next = after;
    }

  // True when this list head has no elements chained to it.
  bool empty_p () const
    {
      return (this->next == this);
    }

  // True when the NEXT pointer is non-null.
  bool linked_p () const
    {
      return (nullptr != this->next);
    }

  /* Move every element of this list to the front of DST.
   * This head itself is not reinitialized afterwards. */
  void splice (td_link *dst)
    {
      if (!this->empty_p ())
        {
          td_link *const first = this->next;
          td_link *const last = this->prev;
          td_link *const dst_first = dst->next;

          first->prev = dst;
          last->next = dst_first;
          dst_first->prev = last;
          dst->next = first;
        }
    }
};
/* Layout of a reader counter word: the most significant bit is the
 * phase bit, and every bit below it belongs to the nesting mask. */
static constexpr uintptr_t GP_PHASE_BIT =
  static_cast<uintptr_t> (1) << (8 * sizeof (uintptr_t) - 1);

// All bits except the phase bit.
static constexpr uintptr_t GP_NEST_MASK = ~GP_PHASE_BIT;
// Classification of a reader thread, as computed by tl_data::state.
enum
{
  rd_active = 0,     // Nesting bits set, phase bit equal to the global one.
  rd_inactive = 1,   // Nesting bits are all zero.
  rd_old = 2         // Nesting bits set, phase bit differs from the global one.
};
- struct registry
- {
- std::atomic_uintptr_t counter;
- td_link root;
- std::mutex td_mtx;
- std::mutex gp_mtx;
- static const unsigned int QS_ATTEMPTS = 1000;
- registry () : counter (1)
- {
- this->root.init_head ();
- }
- void add_tdata (td_link *lp)
- {
- this->td_mtx.lock ();
- lp->add (&this->root);
- this->td_mtx.unlock ();
- }
- uintptr_t get_ctr () const
- {
- return (this->counter.load (std::memory_order_relaxed));
- }
- void poll_readers (td_link *, td_link *, td_link *);
- void sync ();
- };
- static registry global_reg;
/* Number of queued finalizers at which a thread attempts a flush.
 * May be overridden at build time by defining XRCU_MAX_FINS. */
#if !defined (XRCU_MAX_FINS)
#  define XRCU_MAX_FINS   1000
#endif

static constexpr unsigned int MAX_FINS = XRCU_MAX_FINS;
- struct tl_data : public td_link
- {
- bool must_flush;
- unsigned int n_fins;
- std::atomic_uintptr_t counter;
- size_t xrand_val;
- finalizable *fin_objs;
- uintptr_t get_ctr () const
- {
- return (this->counter.load (std::memory_order_relaxed));
- }
- int state () const
- {
- auto val = this->counter.load (std::memory_order_acquire);
- if (!(val & GP_NEST_MASK))
- return (rd_inactive);
- else if (!((val ^ global_reg.get_ctr ()) & GP_PHASE_BIT))
- return (rd_active);
- else
- return (rd_old);
- }
- bool in_cs () const
- {
- return ((this->get_ctr () & GP_NEST_MASK) != 0);
- }
- bool flush_all ()
- {
- if (!sync ())
- return (false);
- for (auto f = this->fin_objs; f != nullptr; )
- {
- auto next = f->_Fin_next;
- f->safe_destroy ();
- f = next;
- }
- this->fin_objs = nullptr;
- this->n_fins = 0;
- this->must_flush = false;
- return (true);
- }
- void finalize (finalizable *finp)
- {
- finp->_Fin_next = this->fin_objs;
- this->fin_objs = finp;
- if (++this->n_fins < MAX_FINS)
- ;
- else if (!this->flush_all ())
- /* Couldn't reclaim memory since we are in a critical section.
- * Set the flag to do it ASAP. */
- this->must_flush = true;
- }
- ~tl_data ()
- {
- if (!this->linked_p ())
- return;
- this->counter.store (0, std::memory_order_release);
- if (this->n_fins > 0)
- this->flush_all ();
- global_reg.td_mtx.lock ();
- this->del ();
- global_reg.td_mtx.unlock ();
- }
- };
- static thread_local tl_data tldata {};
- static inline tl_data*
- local_data ()
- {
- auto self = &tldata;
- if (!self->linked_p ())
- global_reg.add_tdata (self);
- return (self);
- }
- void enter_cs ()
- {
- auto self = local_data ();
- auto val = self->get_ctr ();
- val = (val & GP_NEST_MASK) == 0 ? global_reg.get_ctr () : val + 1;
- self->counter.store (val, std::memory_order_release);
- }
- void exit_cs ()
- {
- auto self = local_data ();
- auto val = self->get_ctr ();
- self->counter.store (val - 1, std::memory_order_release);
- if (self->must_flush && !self->in_cs ())
- self->flush_all ();
- }
- bool in_cs ()
- {
- auto self = &tldata;
- return (self->linked_p () && self->in_cs ());
- }
- void registry::poll_readers (td_link *readers, td_link *outp, td_link *qsp)
- {
- for (unsigned int loops = 0 ; ; ++loops)
- {
- td_link *next, *runp = readers->next;
- for (; runp != readers; runp = next)
- {
- next = runp->next;
- switch (((tl_data *)runp)->state ())
- {
- case rd_active:
- if (outp != nullptr)
- {
- runp->del ();
- runp->add (outp);
- break;
- }
- // Fallthrough.
- case rd_inactive:
- runp->del ();
- runp->add (qsp);
- break;
- default:
- break;
- }
- }
- if (readers->empty_p ())
- break;
- this->td_mtx.unlock ();
- if (loops < QS_ATTEMPTS)
- xatomic_spin_nop ();
- else
- std::this_thread::sleep_for (std::chrono::milliseconds (1));
- this->td_mtx.lock ();
- }
- }
- void registry::sync ()
- {
- this->gp_mtx.lock ();
- this->td_mtx.lock ();
- if (this->root.empty_p ())
- {
- this->td_mtx.unlock ();
- this->gp_mtx.unlock ();
- return;
- }
- td_link out, qs;
- qs.init_head ();
- out.init_head ();
- std::atomic_thread_fence (std::memory_order_acq_rel);
- poll_readers (&this->root, &out, &qs);
- this->counter.store (this->get_ctr () ^ GP_PHASE_BIT,
- std::memory_order_relaxed);
- poll_readers (&out, nullptr, &qs);
- qs.splice (&this->root);
- this->td_mtx.unlock ();
- this->gp_mtx.unlock ();
- }
- bool sync ()
- {
- if (in_cs ())
- return (false);
- global_reg.sync ();
- return (true);
- }
- void finalize (finalizable *finp)
- {
- if (finp)
- local_data()->finalize (finp);
- }
- bool flush_finalizers ()
- {
- auto tld = local_data ();
- bool ret = tld->flush_all ();
- if (!ret)
- tld->must_flush = true;
- return (ret);
- }
- unsigned int xrand ()
- {
- auto self = &tldata; // Avoid local_data ()
- if (!self->xrand_val)
- self->xrand_val = (unsigned int)(time (nullptr) ^
- std::hash<std::thread::id>() (std::this_thread::get_id ()));
- self->xrand_val = self->xrand_val * 1103515245 + 12345;
- return (self->xrand_val >> 16);
- }
- static void
- atfork_prepare ()
- {
- global_reg.td_mtx.lock ();
- }
- static void
- atfork_parent ()
- {
- global_reg.td_mtx.unlock ();
- }
- static void
- atfork_child ()
- {
- atfork_parent ();
- // Reset the registry
- global_reg.root.init_head ();
- auto self = &tldata;
- if (!self->linked_p ())
- return;
- // Manually add ourselves to the registry without locking.
- self->add (&global_reg.root);
- }
- atfork atfork_data ()
- {
- atfork ret;
- ret.prepare = atfork_prepare;
- ret.parent = atfork_parent;
- ret.child = atfork_child;
- return (ret);
- }
- void library_version (int& major, int& minor)
- {
- major = MAJOR, minor = MINOR;
- }
- } // namespace xrcu
|