// queue.hpp - Lock-free queue for xrcu.

#ifndef __XRCU_QUEUE_HPP__
#define __XRCU_QUEUE_HPP__   1

#include "xrcu.hpp"
#include "optional.hpp"
#include "xatomic.hpp"
#include "utils.hpp"

#include <cstddef>
#include <cstdint>
#include <atomic>
#include <utility>
#include <type_traits>
#include <initializer_list>

namespace std
{
  struct forward_iterator_tag;
}
namespace xrcu
{

namespace detail
{

/* Round X up to the next power of two (e.g. 5 -> 8, 8 -> 16).
 * Used to size the backing array of a queue. */
static inline uint32_t upsize (uint32_t x)
{
  x |= x >> 1;
  x |= x >> 2;
  x |= x >> 4;
  x |= x >> 8;
  x |= x >> 16;
  return (x + 1);
}

static inline uint64_t upsize (uint64_t x)
{
  x |= x >> 1;
  x |= x >> 2;
  x |= x >> 4;
  x |= x >> 8;
  x |= x >> 16;
  x |= x >> 32;
  return (x + 1);
}
struct alignas (uintptr_t) q_data : public finalizable
{
  uintptr_t *ptrs;
  size_t cap;
  std::atomic<size_t> wr_idx;
  std::atomic<size_t> rd_idx;

  static q_data* make (size_t cnt, uintptr_t empty);

  size_t _Wridx () const
    {
      return (this->wr_idx.load (std::memory_order_relaxed));
    }

  size_t _Rdidx () const
    {
      return (this->rd_idx.load (std::memory_order_relaxed));
    }

  bool push (uintptr_t val, uintptr_t xbit, uintptr_t empty)
    {
      while (true)
        {
          size_t curr = this->_Wridx ();
          if (curr >= this->cap)
            return (false);

          uintptr_t xv = this->ptrs[curr];
          if ((xv & xbit) != 0)
            return (false);
          else if (xv == empty &&
              xatomic_cas_bool (&this->ptrs[curr], xv, val))
            {
              this->wr_idx.fetch_add (1, std::memory_order_acq_rel);
              return (true);
            }

          xatomic_spin_nop ();
        }
    }

  uintptr_t pop (uintptr_t xbit, uintptr_t dfl)
    {
      while (true)
        {
          size_t curr = this->_Rdidx ();
          if (curr >= this->_Wridx ())
            return (dfl);

          uintptr_t rv = this->ptrs[curr];
          if ((rv & xbit) != 0 || rv == dfl)
            return (xbit);
          else if (xatomic_cas_bool (&this->ptrs[curr], rv, dfl))
            {
              this->rd_idx.fetch_add (1, std::memory_order_relaxed);
              return (rv);
            }

          xatomic_spin_nop ();
        }
    }

  uintptr_t front () const
    {
      return (this->ptrs[this->_Rdidx ()]);
    }

  uintptr_t back () const
    {
      size_t idx = this->_Wridx ();
      if (idx == 0)
        return (this->ptrs[this->cap]);

      return (this->ptrs[idx - 1]);
    }

  size_t size () const
    {
      return (this->_Wridx () - this->_Rdidx ());
    }

  void safe_destroy ();
};
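
/* Callbacks for queue<T>::_Call_cb below.  Once the caller has locked every
 * slot of the old array, q_replace_cb publishes a new q_data and defers
 * reclamation of the old one, while q_clear_cb resets the old array in place
 * so it can be reused empty. */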
inline void
q_replace_cb (std::atomic<q_data *>& ptr, q_data *old, q_data *nq, uintptr_t)
{
  finalize (old);
  ptr.store (nq, std::memory_order_relaxed);
}

inline void
q_clear_cb (std::atomic<q_data *>&, q_data *old, q_data *, uintptr_t empty)
{
  old->wr_idx.store (old->cap, std::memory_order_relaxed);
  old->rd_idx.store (old->cap, std::memory_order_relaxed);

  for (size_t i = 0; i < old->cap; ++i)
    old->ptrs[i] = empty;

  old->rd_idx.store (0, std::memory_order_release);
  old->wr_idx.store (0, std::memory_order_release);
}

} // namespace detail
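
/* RCU-protected, lock-free queue.  Elements are packed into uintptr_t slots
 * via wrapped_traits (see utils.hpp); readers operate inside cs_guard
 * critical sections, while writers grow or replace the backing q_data array
 * and retire the old one with finalize (). */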
template <class T>
struct queue
{
  typedef detail::wrapped_traits<(sizeof (T) < sizeof (uintptr_t) &&
      std::is_integral<T>::value) || (std::is_pointer<T>::value &&
      alignof (T) >= 8), T> val_traits;

  std::atomic<detail::q_data *> impl;

  typedef T value_type;
  typedef T& reference;
  typedef const T& const_reference;
  typedef T* pointer;
  typedef const T* const_pointer;
  typedef ptrdiff_t difference_type;
  typedef size_t size_type;

  void _Init (size_t size)
    {
      this->impl = detail::q_data::make (size, val_traits::FREE);
    }

  detail::q_data* _Data ()
    {
      return (this->impl.load (std::memory_order_relaxed));
    }

  const detail::q_data* _Data () const
    {
      return (this->impl.load (std::memory_order_relaxed));
    }

  void _Set_data (detail::q_data *qdp)
    {
      this->impl.store (qdp, std::memory_order_relaxed);
    }
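
  /* Forward iterator over a snapshot of the queue.  It derives from
   * cs_guard, so the q_data array it walks stays valid for the iterator's
   * lifetime; slots holding the FREE or DELT sentinels are skipped. */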
  struct iterator : public cs_guard
    {
      const detail::q_data *qdp;
      size_t idx;
      uintptr_t c_val;

      typedef std::forward_iterator_tag iterator_category;

      iterator (const detail::q_data *q, size_t s) : qdp (q), idx (s)
        {
          if (this->qdp)
            this->_Adv ();
        }

      iterator (const iterator& it) :
          qdp (it.qdp), idx (it.idx), c_val (it.c_val)
        {
        }

      iterator (iterator&& it) : qdp (it.qdp), idx (it.idx), c_val (it.c_val)
        {
          it.qdp = nullptr;
        }

      void _Adv ()
        {
          for (; this->idx < this->qdp->cap; ++this->idx)
            {
              this->c_val = this->qdp->ptrs[this->idx] & ~val_traits::XBIT;
              if (this->c_val != val_traits::DELT &&
                  this->c_val != val_traits::FREE)
                return;
            }

          this->qdp = nullptr;
          this->idx = 0;
        }

      T operator* ()
        {
          return (val_traits::get (this->c_val));
        }

      iterator& operator++ ()
        {
          ++this->idx;
          this->_Adv ();
          return (*this);
        }

      iterator operator++ (int)
        {
          iterator rv { *this };
          ++*this;
          return (rv);
        }

      bool operator== (const iterator& it) const
        {
          return (this->qdp == it.qdp && this->idx == it.idx);
        }

      bool operator!= (const iterator& it) const
        {
          return (!(*this == it));
        }
    };

  typedef iterator const_iterator;
  queue ()
    {
      this->_Init (8);
    }

  template <class T1, class T2>
  void _Init (T1 n, const T2& val, std::true_type)
    {
      size_t ns = (size_t)n;
      auto qdp = detail::q_data::make (detail::upsize (ns), val_traits::FREE);

      try
        {
          for (size_t i = 0; i < ns; )
            {
              qdp->ptrs[i] = val_traits::make (val);
              qdp->wr_idx.store (++i, std::memory_order_relaxed);
            }
        }
      catch (...)
        {
          _Destroy (qdp);
          throw;
        }

      this->_Set_data (qdp);
    }

  template <class It>
  void _Init (It first, It last, std::false_type)
    {
      auto qdp = detail::q_data::make (8, val_traits::FREE);

      try
        {
          for (size_t i = 0; first != last; ++first)
            {
              qdp->ptrs[i] = val_traits::make (*first);
              qdp->wr_idx.store (++i, std::memory_order_relaxed);

              if (i == qdp->cap)
                { // Ran out of room - Double the array and keep going.
                  auto q2 = detail::q_data::make (i * 2, val_traits::FREE);
                  for (size_t j = 0; j < i; ++j)
                    q2->ptrs[j] = qdp->ptrs[j];

                  // Preserve the element count in case this was the last item.
                  q2->wr_idx.store (i, std::memory_order_relaxed);
                  qdp->safe_destroy ();
                  qdp = q2;
                }
            }
        }
      catch (...)
        {
          _Destroy (qdp);
          throw;
        }

      this->_Set_data (qdp);
    }

  template <class T1, class T2>
  queue (T1 first, T2 last)
    {
      this->_Init (first, last, typename std::is_integral<T1>::type ());
    }

  queue (const queue<T>& right) : queue (right.begin (), right.end ())
    {
    }

  queue (queue<T>&& right)
    {
      this->_Set_data (right._Data ());
      right._Set_data (nullptr);
    }

  queue (std::initializer_list<T> lst) : queue (lst.begin (), lst.end ())
    {
    }
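
  /* Grow the queue when a push finds the current array full or locked.
   * The front slot is tagged with XBIT to act as a lock, every remaining
   * entry is migrated into an array twice as large, ELEM is appended, and
   * the new array is published while the old one is retired with
   * finalize ().  Returns false if another thread finished the job first. */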
  bool _Rearm (uintptr_t elem, detail::q_data *qdp)
    {
      size_t ix;
      uintptr_t prev;

      while (true)
        {
          ix = qdp->_Rdidx ();
          prev = xatomic_or (&qdp->ptrs[ix], val_traits::XBIT);

          if (prev == val_traits::DELT)
            { // Another thread deleted this entry - Retry.
              xatomic_spin_nop ();
              continue;
            }
          else if ((prev & val_traits::XBIT) == 0)
            break;

          while (true)
            {
              if (qdp != this->_Data ())
                // Impl pointer has been installed - Return.
                return (false);
              else if ((qdp->ptrs[ix] & val_traits::XBIT) == 0)
                /* The thread rearming the queue raised an exception.
                 * Recurse and see if we can pick up the slack. */
                return (this->_Rearm (elem, qdp));

              xatomic_spin_nop ();
            }
        }

      detail::q_data *nq = nullptr;

      try
        {
          nq = detail::q_data::make (qdp->cap * 2, val_traits::FREE);
        }
      catch (...)
        {
          xatomic_and (&qdp->ptrs[ix], ~val_traits::XBIT);
          val_traits::free (elem);
          throw;
        }

      uintptr_t *outp = nq->ptrs;
      for (*outp++ = prev; ++ix < qdp->cap; )
        *outp++ = xatomic_or (&qdp->ptrs[ix], val_traits::XBIT);

      *outp++ = elem;
      nq->wr_idx.store (outp - nq->ptrs, std::memory_order_relaxed);

      finalize (qdp);
      this->_Set_data (nq);
      return (true);
    }
  void _Push (uintptr_t val)
    {
      while (true)
        {
          auto qdp = this->_Data ();
          if (qdp->push (val, val_traits::XBIT, val_traits::FREE) ||
              this->_Rearm (val, qdp))
            break;
        }
    }

  void push (const T& elem)
    {
      cs_guard g;
      this->_Push (val_traits::make (elem));
    }

  template <class ...Args>
  void emplace (Args&& ...args)
    {
      cs_guard g;
      this->_Push (val_traits::make (std::forward<Args>(args)...));
    }

  optional<T> pop ()
    {
      cs_guard g;
      while (true)
        {
          auto qdp = this->_Data ();
          uintptr_t val = qdp->pop (val_traits::XBIT, val_traits::DELT);

          if (val == val_traits::DELT)
            // Queue is empty.
            return (optional<T> ());
          else if (val != val_traits::XBIT)
            {
              val_traits::destroy (val);
              optional<T> rv (val_traits::get (val));
              return (rv);
            }

          // The queue is being rearmed - Wait for the new array.
          while (qdp == this->_Data ())
            xatomic_spin_nop ();
        }
    }

  optional<T> front () const
    {
      cs_guard g;
      uintptr_t rv = this->_Data()->front () & ~val_traits::XBIT;
      if (rv == val_traits::FREE)
        return (optional<T> ());

      return (optional<T> (val_traits::get (rv)));
    }

  optional<T> back () const
    {
      cs_guard g;
      uintptr_t rv = this->_Data()->back () & ~val_traits::XBIT;
      if (rv == val_traits::FREE)
        return (optional<T> ());

      return (optional<T> (val_traits::get (rv)));
    }

  size_t size () const
    {
      cs_guard g;
      return (this->_Data()->size ());
    }

  bool empty () const
    {
      return (this->size () == 0);
    }

  size_t max_size () const
    {
      return (~(size_t)0);
    }

  iterator begin () const
    {
      cs_guard g;
      auto qdp = this->_Data ();
      return (iterator (qdp, qdp->_Rdidx ()));
    }

  iterator end () const
    {
      return (iterator (nullptr, 0));
    }

  const_iterator cbegin () const
    {
      return (this->begin ());
    }

  const_iterator cend () const
    {
      return (this->end ());
    }
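
  /* Lock every slot of the current array (tagging it with XBIT and
   * destroying any live values), then hand control to F, which either
   * installs NQ as the new array (assign) or resets the old one in place
   * (clear).  XV is forwarded to F as the fill value. */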
  void _Call_cb (detail::q_data *nq, uintptr_t xv,
                 void (*f) (std::atomic<detail::q_data *>&,
                            detail::q_data *, detail::q_data *, uintptr_t))
    {
      while (true)
        {
          auto qdp = this->_Data ();
          size_t ix = qdp->_Rdidx ();
          uintptr_t prev = xatomic_or (&qdp->ptrs[ix], val_traits::XBIT);

          if (prev == val_traits::DELT)
            continue;
          else if ((prev & val_traits::XBIT) == 0)
            {
              if (prev != val_traits::FREE)
                val_traits::destroy (prev);

              while (++ix < qdp->cap)
                {
                  prev = xatomic_or (&qdp->ptrs[ix], val_traits::XBIT);
                  if (prev != val_traits::FREE && prev != val_traits::DELT)
                    val_traits::destroy (prev);
                }

              f (this->impl, qdp, nq, xv);
              return;
            }

          while (true)
            {
              if (qdp != this->_Data () ||
                  (qdp->ptrs[ix] & val_traits::XBIT) == 0)
                break;

              xatomic_spin_nop ();
            }
        }
    }
  void _Assign (detail::q_data *nq)
    {
      this->_Call_cb (nq, 0, detail::q_replace_cb);
    }

  template <class T1, class T2>
  void assign (T1 first, T2 last)
    {
      cs_guard g;
      auto tmp = queue<T> (first, last);
      this->_Assign (tmp._Data ());
      tmp._Set_data (nullptr);
    }

  void assign (std::initializer_list<T> lst)
    {
      this->assign (lst.begin (), lst.end ());
    }

  queue<T>& operator= (const queue<T>& right)
    {
      if (this == &right)
        return (*this);

      this->assign (right.begin (), right.end ());
      return (*this);
    }

  bool operator== (const queue<T>& right) const
    {
      auto x1 = this->cbegin (), x2 = this->cend ();
      auto y1 = right.cbegin (), y2 = right.cend ();

      for (; x1 != x2 && y1 != y2; ++x1, ++y1)
        if (*x1 != *y1)
          return (false);

      return (x1 == x2 && y1 == y2);
    }

  bool operator!= (const queue<T>& right) const
    {
      return (!(*this == right));
    }

  bool operator< (const queue<T>& right) const
    {
      auto x1 = this->cbegin (), x2 = this->cend ();
      auto y1 = right.cbegin (), y2 = right.cend ();

      for (; x1 != x2; ++x1, ++y1)
        {
          if (y1 == y2 || *y1 < *x1)
            return (false);
          else if (*x1 < *y1)
            return (true);
        }

      return (y1 != y2);
    }

  bool operator> (const queue<T>& right) const
    {
      return (right < *this);
    }

  bool operator<= (const queue<T>& right) const
    {
      return (!(right < *this));
    }

  bool operator>= (const queue<T>& right) const
    {
      return (!(*this < right));
    }

  queue<T>& operator= (queue<T>&& right)
    {
      auto prev = this->impl.exchange (right._Data (),
                                       std::memory_order_acq_rel);
      finalize (prev);
      right._Set_data (nullptr);
      return (*this);
    }

  static void _Destroy (detail::q_data *qdp)
    {
      for (size_t i = qdp->_Rdidx (); i < qdp->cap; ++i)
        {
          uintptr_t val = qdp->ptrs[i] & ~val_traits::XBIT;
          if (val != val_traits::FREE && val != val_traits::DELT)
            val_traits::free (val);
        }

      qdp->safe_destroy ();
    }

  void clear ()
    {
      cs_guard g;
      this->_Call_cb (nullptr, val_traits::FREE, detail::q_clear_cb);
    }
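
  /* Freeze the queue for swap (): tag the front slot with XBIT so pushes
   * and pops stall, park wr_idx at the capacity, and return the previous
   * write index so it can be restored on the other queue. */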
  size_t _Lock ()
    {
      while (true)
        {
          auto qdp = this->_Data ();
          size_t ix = qdp->_Rdidx ();
          uintptr_t prev = xatomic_or (&qdp->ptrs[ix], val_traits::XBIT);

          if ((prev & val_traits::XBIT) == 0)
            return (qdp->wr_idx.exchange (qdp->cap,
                                          std::memory_order_acq_rel));

          while (qdp == this->_Data () &&
              (qdp->ptrs[ix] & val_traits::XBIT) != 0)
            xatomic_spin_nop ();
        }
    }
  void swap (queue<T>& right)
    {
      if (this == &right)
        return;

      size_t s1 = this->_Lock ();
      size_t s2 = right._Lock ();

      auto tmp = this->_Data ();
      this->_Set_data (right._Data ());
      right._Set_data (tmp);

      auto d1 = this->_Data ();
      auto d2 = right._Data ();

      d1->wr_idx.store (s2, std::memory_order_relaxed);
      d2->wr_idx.store (s1, std::memory_order_relaxed);

      // Drop the XBIT locks taken by _Lock () on both front slots.
      d1->ptrs[d1->_Rdidx ()] &= ~val_traits::XBIT;
      d2->ptrs[d2->_Rdidx ()] &= ~val_traits::XBIT;
    }

  ~queue ()
    {
      auto qdp = this->_Data ();
      if (!qdp)
        return;

      _Destroy (qdp);
      this->_Set_data (nullptr);
    }
};
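
/* A minimal usage sketch.  It assumes the accompanying optional.hpp exposes
 * the usual has_value ()/operator* interface, and consume () stands for
 * whatever the caller does with a popped value:
 *
 *   xrcu::queue<int> q { 1, 2, 3 };
 *   q.push (4);
 *
 *   while (true)
 *     {
 *       auto v = q.pop ();   // Empty optional once the queue is drained.
 *       if (!v.has_value ())
 *         break;
 *       consume (*v);
 *     }
 *
 * Multiple threads may call push () and pop () concurrently; iteration with
 * begin ()/end () observes a best-effort snapshot of the live slots. */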

} // namespace xrcu

#endif