/*
 *	Definitions for the 'struct ptr_ring' datastructure.
 *
 *	Author:
 *		Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This program is free software; you can redistribute it and/or modify it
 *	under the terms of the GNU General Public License as published by the
 *	Free Software Foundation; either version 2 of the License, or (at your
 *	option) any later version.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another consuming entries from a FIFO.
 *
 *	This implementation tries to minimize cache-contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};
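
/*
 * Illustrative note (added commentary, not from the original header): a NULL
 * slot means "free" and a non-NULL slot means "pending entry", so NULL must
 * never be produced as a payload.  This single-word invariant is what lets
 * the producer test fullness and the consumer test emptiness with one read
 * of their own index, without a shared count.  Conceptually:
 *
 *	full  = r->queue[r->producer] != NULL;
 *	empty = r->queue[r->consumer_head] == NULL;
 */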

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
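
/*
 * Example (illustrative sketch, not part of the original header): busy-wait
 * for free space while holding producer_lock, using cpu_relax() as the
 * compiler barrier requested by the comment above __ptr_ring_full.
 *
 *	spin_lock(&r->producer_lock);
 *	while (__ptr_ring_full(r))
 *		cpu_relax();
 *	__ptr_ring_produce(r, ptr);
 *	spin_unlock(&r->producer_lock);
 *
 * Spinning like this only makes sense if a consumer is guaranteed to make
 * progress concurrently; most callers simply drop or back off on -ENOSPC.
 */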

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the READ_ONCE in __ptr_ring_peek / __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
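
/*
 * Example (illustrative sketch, not part of the original header): a
 * hypothetical driver that produces from process context while a softirq
 * consumes would use the _bh producer variant so the lock is not taken with
 * BH enabled underneath the consumer:
 *
 *	int queue_pkt(struct ptr_ring *r, struct sk_buff *skb)
 *	{
 *		if (ptr_ring_produce_bh(r, skb)) {
 *			kfree_skb(skb);
 *			return -ENOSPC;
 *		}
 *		return 0;
 *	}
 *
 * queue_pkt is a made-up name; the sk_buff payload is just an example of a
 * non-NULL pointer being queued.
 */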

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}
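
/*
 * Example (illustrative sketch, not part of the original header): a
 * NAPI-style consumer that drains until empty, re-enables notifications and
 * then re-tests __ptr_ring_empty, which is one of the usage patterns the
 * comment above describes for keeping the lockless test safe.
 *
 *	for (;;) {
 *		while ((ptr = __ptr_ring_consume(r)))
 *			process(ptr);
 *		enable_notifications();
 *		if (__ptr_ring_empty(r))
 *			break;
 *		disable_notifications();
 *	}
 *
 * process(), enable_notifications() and disable_notifications() are
 * hypothetical helpers; consumer_lock (or an equivalent single-consumer
 * guarantee) is still required around __ptr_ring_consume.
 */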

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *       consumer = r->consumer;
	 *       r->queue[consumer++] = NULL;
	 *       if (unlikely(consumer >= r->size))
	 *               consumer = 0;
	 *       r->consumer = consumer;
	 * but that is suboptimal when the ring is full, as the producer is
	 * writing out new entries in the same cache line.  Defer these
	 * updates until a batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for
	 * __ptr_ring_empty to work correctly.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space in the
	 * ring.  We also do this when we reach the end of the ring - not
	 * mandatory, but it helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way we touch the
		 * cache line that the producer might currently be reading
		 * last; the producer won't make progress and touch other
		 * cache lines besides the first one until we write out all
		 * entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}
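
/*
 * Worked example (added commentary, not from the original header): with a
 * 64-byte cache line and 8-byte pointers, __ptr_ring_set_size below picks
 * batch = 64 * 2 / 8 = 16 (for rings of at least 32 entries).  Consuming
 * entries 0..14 only advances consumer_head; consuming entry 15 makes
 * consumer_head - consumer_tail reach 16, so slots 15..0 are NULLed in
 * reverse order and consumer_tail jumps to 16.  The producer therefore sees
 * freed slots appear 16 at a time instead of one write per consume landing
 * in its hot cache line.
 */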

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
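
/*
 * Example (illustrative sketch, not part of the original header): drain up
 * to 64 entries at a time with the BH-safe variant, e.g. from a softirq
 * that hands the pointers to a hypothetical process_one() helper.  Batching
 * amortizes one lock round-trip over up to 64 consumes.
 *
 *	void *objs[64];
 *	int i, n;
 *
 *	n = ptr_ring_consume_batched_bh(r, objs, ARRAY_SIZE(objs));
 *	for (i = 0; i < n; i++)
 *		process_one(objs[i]);
 */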

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
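
/*
 * Example (illustrative sketch, not part of the original header): peek at
 * the head entry and compute its length without dequeueing it.  peek_len()
 * is a hypothetical helper; it must tolerate a NULL argument because
 * __ptr_ring_peek returns NULL on an empty ring, and f(NULL) is also used
 * above to deduce the macro's return type.  "ring" is a struct ptr_ring *.
 *
 *	static int peek_len(void *ptr)
 *	{
 *		return ptr ? ((struct sk_buff *)ptr)->len : 0;
 *	}
 *
 *	len = PTR_RING_PEEK_CALL_BH(ring, peek_len);
 */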

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch at least to 1 to make logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because ring is small) would cause a lot of
	 * burstiness. Needs tuning, for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}
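
/*
 * Worked example (added commentary, not from the original header): with
 * SMP_CACHE_BYTES == 64 and 8-byte pointers, the line above yields
 * 64 * 2 / 8 = 16.  A 256-entry ring keeps batch = 16 (16 <= 256 / 2), while
 * a 16-entry ring would have batch > size / 2, so batch is clamped to 1 and
 * every consume invalidates its slot immediately.
 */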

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
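
/*
 * Example (illustrative sketch, not part of the original header): typical
 * lifecycle of a ring embedded in a hypothetical device structure.  Note
 * that the size does not have to be a power of two.
 *
 *	struct my_dev {
 *		struct ptr_ring ring;
 *	};
 *
 *	int my_dev_setup(struct my_dev *dev)
 *	{
 *		return ptr_ring_init(&dev->ring, 256, GFP_KERNEL);
 *	}
 *
 *	void my_dev_teardown(struct my_dev *dev)
 *	{
 *		ptr_ring_cleanup(&dev->ring, my_obj_free);
 *	}
 *
 * my_dev, my_dev_setup, my_dev_teardown and my_obj_free are made-up names;
 * ptr_ring_cleanup is defined at the end of this header.
 */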

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way following code
	 * can test entries for NULL and if not assume they are valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}
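
/*
 * Example (illustrative sketch, not part of the original header): a consumer
 * pulled a batch but could only hand the first "done" entries downstream;
 * the remainder is pushed back, still in FIFO order, so a later consume sees
 * the same entries again, and anything that no longer fits is freed via the
 * destroy callback.  submit_some() and my_obj_free are hypothetical.
 *
 *	n = ptr_ring_consume_batched_any(r, objs, ARRAY_SIZE(objs));
 *	done = submit_some(objs, n);
 *	ptr_ring_unconsume(r, objs + done, n - done, my_obj_free);
 *
 * The array passed back must hold the entries in the order they were
 * consumed, which objs + done preserves here.
 */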

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
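
/*
 * Example (illustrative sketch, not part of the original header): grow a
 * ring to 1024 entries; anything that cannot be carried over (only possible
 * when shrinking) is handed to the destroy callback.  my_obj_free is a
 * hypothetical callback.
 *
 *	err = ptr_ring_resize(r, 1024, GFP_KERNEL, my_obj_free);
 *	if (err)
 *		return err;
 */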

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */