smc_rx.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage RMBE
 * copy new RMBE data into user space
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/sched/signal.h>

#include <net/sock.h>

#include "smc.h"
#include "smc_core.h"
#include "smc_cdc.h"
#include "smc_tx.h" /* smc_tx_consumer_update() */
#include "smc_rx.h"

/* callback implementation to wakeup consumers blocked with smc_rx_wait().
 * indirectly called by smc_cdc_msg_recv_action().
 */
static void smc_rx_wake_up(struct sock *sk)
{
	struct socket_wq *wq;

	/* derived from sock_def_readable() */
	/* called already in smc_listen_work() */
	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (skwq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
						EPOLLRDNORM | EPOLLRDBAND);
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
	    (sk->sk_state == SMC_CLOSED))
		sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP);
	rcu_read_unlock();
}
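
/* Illustrative call chain (a sketch, not a definitive trace): when the peer
 * writes into our RMBE and posts a CDC message, the receive tasklet runs
 * smc_cdc_msg_recv_action(), which reaches smc_rx_wake_up() via
 * sk->sk_data_ready (set in smc_rx_init() below), so a process sleeping in
 * smc_rx_wait() or in poll()/epoll_wait() on the socket is woken:
 *
 *	peer RDMA write + CDC msg
 *	  -> smc_cdc_msg_recv_action()		(tasklet context)
 *	    -> sk->sk_data_ready == smc_rx_wake_up
 *	      -> wake_up_interruptible_sync_poll() / sk_wake_async()
 */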
/* Update consumer cursor
 *   @conn   connection to update
 *   @cons   consumer cursor
 *   @len    number of Bytes consumed
 *   Returns:
 *   1 if we should end our receive, 0 otherwise
 */
static int smc_rx_update_consumer(struct smc_sock *smc,
				  union smc_host_cursor cons, size_t len)
{
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool force = false;
	int diff, rc = 0;

	smc_curs_add(conn->rmb_desc->len, &cons, len);

	/* did we process urgent data? */
	if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
		diff = smc_curs_comp(conn->rmb_desc->len, &cons,
				     &conn->urg_curs);
		if (sock_flag(sk, SOCK_URGINLINE)) {
			if (diff == 0) {
				force = true;
				rc = 1;
				conn->urg_state = SMC_URG_READ;
			}
		} else {
			if (diff == 1) {
				/* skip urgent byte */
				force = true;
				smc_curs_add(conn->rmb_desc->len, &cons, 1);
				conn->urg_rx_skip_pend = false;
			} else if (diff < -1)
				/* we read past urgent byte */
				conn->urg_state = SMC_URG_READ;
		}
	}

	smc_curs_copy(&conn->local_tx_ctrl.cons, &cons, conn);

	/* send consumer cursor update if required */
	/* similar to advertising new TCP rcv_wnd if required */
	smc_tx_consumer_update(conn, force);

	return rc;
}
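
/* Worked example of the cursor arithmetic above (hypothetical sizes):
 * with rmb_desc->len == 16 and cons.count == 14, smc_curs_add(16, &cons, 4)
 * wraps the cursor to count 2, i.e. (14 + 4) % 16. The RMBE is consumed as
 * a ring buffer, so all cursor math is modulo the buffer length.
 */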
static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
{
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor cons;

	smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
	smc_rx_update_consumer(smc, cons, len);
}

struct smc_spd_priv {
	struct smc_sock *smc;
	size_t		 len;
};

static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
				    struct pipe_buffer *buf)
{
	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
	struct smc_sock *smc = priv->smc;
	struct smc_connection *conn;
	struct sock *sk = &smc->sk;

	if (sk->sk_state == SMC_CLOSED ||
	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
	    sk->sk_state == SMC_APPFINCLOSEWAIT)
		goto out;
	conn = &smc->conn;
	lock_sock(sk);
	smc_rx_update_cons(smc, priv->len);
	release_sock(sk);
	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
		smc_rx_wake_up(sk);
out:
	kfree(priv);
	put_page(buf->page);
	sock_put(sk);
}

static int smc_rx_pipe_buf_nosteal(struct pipe_inode_info *pipe,
				   struct pipe_buffer *buf)
{
	return 1;
}

static const struct pipe_buf_operations smc_pipe_ops = {
	.can_merge = 0,
	.confirm = generic_pipe_buf_confirm,
	.release = smc_rx_pipe_buf_release,
	.steal = smc_rx_pipe_buf_nosteal,
	.get = generic_pipe_buf_get
};

static void smc_rx_spd_release(struct splice_pipe_desc *spd,
			       unsigned int i)
{
	put_page(spd->pages[i]);
}

static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
			 struct smc_sock *smc)
{
	struct splice_pipe_desc spd;
	struct partial_page partial;
	struct smc_spd_priv *priv;
	int bytes;

	priv = kzalloc(sizeof(*priv), GFP_KERNEL);
	if (!priv)
		return -ENOMEM;
	priv->len = len;
	priv->smc = smc;
	partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
	partial.len = len;
	partial.private = (unsigned long)priv;

	spd.nr_pages_max = 1;
	spd.nr_pages = 1;
	spd.pages = &smc->conn.rmb_desc->pages;
	spd.partial = &partial;
	spd.ops = &smc_pipe_ops;
	spd.spd_release = smc_rx_spd_release;

	bytes = splice_to_pipe(pipe, &spd);
	if (bytes > 0) {
		sock_hold(&smc->sk);
		get_page(smc->conn.rmb_desc->pages);
		atomic_add(bytes, &smc->conn.splice_pending);
	}

	return bytes;
}
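
/* Sketch of the zero-copy splice lifecycle implemented above (illustrative,
 * not a definitive trace):
 *
 *	smc_rx_splice()
 *	  splice_to_pipe()		map the RMBE page into the pipe
 *	  sock_hold() / get_page()	pin socket and page
 *	  atomic_add(&splice_pending)	defer the cursor update
 *	...pipe reader consumes the buffer...
 *	smc_rx_pipe_buf_release()
 *	  smc_rx_update_cons()		RMBE space may now be reused
 *	  put_page() / sock_put()	drop the pins taken above
 *
 * The consumer cursor is only advanced in the release callback, because the
 * peer must not overwrite RMBE bytes that the pipe still references.
 */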
static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
{
	return atomic_read(&conn->bytes_to_rcv) &&
	       !atomic_read(&conn->splice_pending);
}

/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
 *   @smc    smc socket
 *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
 *   @fcrit  add'l criterion to evaluate as function pointer
 * Returns:
 * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
 * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
 */
int smc_rx_wait(struct smc_sock *smc, long *timeo,
		int (*fcrit)(struct smc_connection *conn))
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	int rc;

	if (fcrit(conn))
		return 1;
	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	add_wait_queue(sk_sleep(sk), &wait);
	rc = sk_wait_event(sk, timeo,
			   sk->sk_err ||
			   sk->sk_shutdown & RCV_SHUTDOWN ||
			   fcrit(conn),
			   &wait);
	remove_wait_queue(sk_sleep(sk), &wait);
	sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	return rc;
}
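
/* Example caller pattern (a sketch; smc_rx_recvmsg() below does exactly
 * this):
 *
 *	long timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 *
 *	if (!smc_rx_data_available(conn))
 *		smc_rx_wait(smc, &timeo, smc_rx_data_available);
 *
 * @fcrit lets a caller wait on a stricter condition, e.g.
 * smc_rx_data_available_and_no_splice_pend() while spliced bytes are still
 * outstanding in a pipe.
 */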
static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
			   int flags)
{
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor cons;
	struct sock *sk = &smc->sk;
	int rc = 0;

	if (sock_flag(sk, SOCK_URGINLINE) ||
	    !(conn->urg_state == SMC_URG_VALID) ||
	    conn->urg_state == SMC_URG_READ)
		return -EINVAL;

	if (conn->urg_state == SMC_URG_VALID) {
		if (!(flags & MSG_PEEK))
			smc->conn.urg_state = SMC_URG_READ;
		msg->msg_flags |= MSG_OOB;
		if (len > 0) {
			if (!(flags & MSG_TRUNC))
				rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
			len = 1;
			smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
			if (smc_curs_diff(conn->rmb_desc->len, &cons,
					  &conn->urg_curs) > 1)
				conn->urg_rx_skip_pend = true;
			/* Urgent Byte was already accounted for, but trigger
			 * skipping the urgent byte in non-inline case
			 */
			if (!(flags & MSG_PEEK))
				smc_rx_update_consumer(smc, cons, 0);
		} else {
			msg->msg_flags |= MSG_TRUNC;
		}

		return rc ? -EFAULT : len;
	}

	if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
		return 0;

	return -EAGAIN;
}
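
/* Illustrative user-space view of the MSG_OOB path above (assuming standard
 * socket semantics, nothing SMC-specific):
 *
 *	char b;
 *	int n = recv(fd, &b, 1, MSG_OOB);
 *
 * recv() returns 1 with the urgent byte while urg_state == SMC_URG_VALID;
 * it fails with EINVAL once the byte was already read or if SO_OOBINLINE is
 * set, returns 0 after shutdown/close, and fails with EAGAIN if no urgent
 * byte has arrived.
 */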
static bool smc_rx_recvmsg_data_available(struct smc_sock *smc)
{
	struct smc_connection *conn = &smc->conn;

	if (smc_rx_data_available(conn))
		return true;
	else if (conn->urg_state == SMC_URG_VALID)
		/* we received a single urgent Byte - skip */
		smc_rx_update_cons(smc, 0);
	return false;
}

/* smc_rx_recvmsg - receive data from RMBE
 * @msg:	copy data to receive buffer
 * @pipe:	copy data to pipe if set - indicates splice() call
 *
 * rcvbuf consumer: main API called by socket layer.
 * Called under sk lock.
 */
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
		   struct pipe_inode_info *pipe, size_t len, int flags)
{
	size_t copylen, read_done = 0, read_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	int (*func)(struct smc_connection *conn);
	union smc_host_cursor cons;
	int readable, chunk;
	char *rcvbuf_base;
	struct sock *sk;
	int splbytes;
	long timeo;
	int target;		/* Read at least these many bytes */
	int rc;

	if (unlikely(flags & MSG_ERRQUEUE))
		return -EINVAL; /* future work for sk.sk_family == AF_SMC */

	sk = &smc->sk;
	if (sk->sk_state == SMC_LISTEN)
		return -ENOTCONN;
	if (flags & MSG_OOB)
		return smc_rx_recv_urg(smc, msg, len, flags);
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
	rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;

	do { /* while (read_remaining) */
		if (read_done >= target || (pipe && read_done))
			break;

		if (smc_rx_recvmsg_data_available(smc))
			goto copy;

		if (sk->sk_shutdown & RCV_SHUTDOWN ||
		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort) {
			/* smc_cdc_msg_recv_action() could have run after
			 * above smc_rx_recvmsg_data_available()
			 */
			if (smc_rx_recvmsg_data_available(smc))
				goto copy;
			break;
		}

		if (read_done) {
			if (sk->sk_err ||
			    sk->sk_state == SMC_CLOSED ||
			    !timeo ||
			    signal_pending(current))
				break;
		} else {
			if (sk->sk_err) {
				read_done = sock_error(sk);
				break;
			}
			if (sk->sk_state == SMC_CLOSED) {
				if (!sock_flag(sk, SOCK_DONE)) {
					/* This occurs when user tries to read
					 * from never connected socket.
					 */
					read_done = -ENOTCONN;
					break;
				}
				break;
			}
			if (signal_pending(current)) {
				read_done = sock_intr_errno(timeo);
				break;
			}
			if (!timeo)
				return -EAGAIN;
		}

		if (!smc_rx_data_available(conn)) {
			smc_rx_wait(smc, &timeo, smc_rx_data_available);
			continue;
		}

copy:
		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after waiting on data above */
		readable = atomic_read(&conn->bytes_to_rcv);
		splbytes = atomic_read(&conn->splice_pending);
		if (!readable || (msg && splbytes)) {
			if (splbytes)
				func = smc_rx_data_available_and_no_splice_pend;
			else
				func = smc_rx_data_available;
			smc_rx_wait(smc, &timeo, func);
			continue;
		}
		smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
		/* subsequent splice() calls pick up where previous left off */
		if (splbytes)
			smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
		if (conn->urg_state == SMC_URG_VALID &&
		    sock_flag(&smc->sk, SOCK_URGINLINE) &&
		    readable > 1)
			readable--;	/* always stop at urgent Byte */
		/* not more than what user space asked for */
		copylen = min_t(size_t, read_remaining, readable);
		/* determine chunks where to read from rcvbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
				  cons.count);
		chunk_len_sum = chunk_len;
		chunk_off = cons.count;
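		/* Worked example (hypothetical sizes): rmb_desc->len == 16,
		 * cons.count == 12, copylen == 10. 1st chunk: chunk_off == 12,
		 * chunk_len == 4 (up to the buffer end); 2nd chunk after the
		 * wrap: chunk_off == 0, chunk_len == 10 - 4 == 6.
		 */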
		smc_rmb_sync_sg_for_cpu(conn);
		for (chunk = 0; chunk < 2; chunk++) {
			if (!(flags & MSG_TRUNC)) {
				if (msg) {
					rc = memcpy_to_msg(msg, rcvbuf_base +
							   chunk_off,
							   chunk_len);
				} else {
					rc = smc_rx_splice(pipe, rcvbuf_base +
							   chunk_off, chunk_len,
							   smc);
				}
				if (rc < 0) {
					if (!read_done)
						read_done = -EFAULT;
					smc_rmb_sync_sg_for_device(conn);
					goto out;
				}
			}
			read_remaining -= chunk_len;
			read_done += chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in recv ring buffer */
		}
		smc_rmb_sync_sg_for_device(conn);

		/* update cursors */
		if (!(flags & MSG_PEEK)) {
			/* increased in recv tasklet smc_cdc_msg_rcv() */
			smp_mb__before_atomic();
			atomic_sub(copylen, &conn->bytes_to_rcv);
			/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
			smp_mb__after_atomic();
			if (msg && smc_rx_update_consumer(smc, cons, copylen))
				goto out;
		}
	} while (read_remaining);
out:
	return read_done;
}
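
/* Both of the following user-space calls end up in smc_rx_recvmsg()
 * (illustrative): recv() passes a @msg and a NULL @pipe; splice() from the
 * socket passes a @pipe and a NULL @msg, taking the zero-copy
 * smc_rx_splice() path above:
 *
 *	n = recv(fd, buf, sizeof(buf), 0);
 *	n = splice(fd, NULL, pipefd[1], NULL, 4096, SPLICE_F_MOVE);
 */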
/* Initialize receive properties on connection establishment. NB: not __init! */
void smc_rx_init(struct smc_sock *smc)
{
	smc->sk.sk_data_ready = smc_rx_wake_up;
	atomic_set(&smc->conn.splice_pending, 0);
	smc->conn.urg_state = SMC_URG_READ;
}