sockrw.c 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. // SPDX-License-Identifier: GPL-2.0 or GPL-3.0
  2. // Copyright © 2018-2019 Ariadne Devos
  3. /* sHT -- IO on stream sockets */
  4. #include "fd.h"
  5. #include <sHT/bugs.h>
  6. #include <sHT/compiler.h>
  7. #include <sHT/nospec.h>
  8. #include <sHT/stream.h>
  9. #include <sHT/test.h>
  10. #include "worker.h"
  11. #include <errno.h>
  12. #include <stddef.h>
  13. #include <stdint.h>
  14. #include <sys/epoll.h>
  15. #include <sys/socket.h>
  16. /* These are sorted in order of expected prevalence */
  17. enum sHT_send_err_type {
  18. sHT_SEND_BLOCKING,
  19. sHT_SEND_GRACEFUL_RESET,
  20. sHT_SEND_BLUNT_RESET,
  21. sHT_SEND_GRACEFUL_CLOSE,
  22. /* like in EINTR */
  23. sHT_SEND_INTERRUPTED,
  24. /* e.g. timeout, connection reset */
  25. sHT_SEND_TRANSIENT,
  26. sHT_SEND_KERNEL_OOM,
  27. /* Anything we didn't expect.
  28. (We expect malicious clients,
  29. but no sHT bugs.)
  30. This should be logged as a warning. */
  31. sHT_SEND_OTHER,
  32. };
  33. /* TODO: due to Spectre mitigations interfering with optimisation,
  34. inline into @var{sHT_socker_sendrecv_errno}. */
  35. __attribute__((const))
  36. static enum sHT_send_err_type
  37. sHT_classify_sentrecv_tcp(int err)
  38. {
  39. switch (err) {
  40. #if EWOULDBLOCK != EAGAIN
  41. case EWOULDBLOCK: /* fallthrough */
  42. #endif
  43. case EAGAIN:
  44. return sHT_SEND_BLOCKING;
  45. case EINTR:
  46. return sHT_SEND_INTERRUPTED;
  47. case ECONNRESET:
  48. return sHT_SEND_GRACEFUL_RESET;
  49. case EPIPE:
  50. return sHT_SEND_GRACEFUL_CLOSE;
  51. case ETIMEDOUT: /* fallthrough */
  52. case EHOSTUNREACH:
  53. return sHT_SEND_BLUNT_RESET;
  54. case ENOBUFS:
  55. /* no busy loops? */
  56. return sHT_SEND_TRANSIENT;
  57. case ENOMEM:
  58. return sHT_SEND_KERNEL_OOM;
  59. default:
  60. return sHT_SEND_OTHER;
  61. }
  62. }
  63. /* True if it should be retried directly, false otherwise. */
  64. __attribute__((nonnull (1, 2)))
  65. static _Bool
  66. sHT_socket_sendrecv_errno(struct sHT_worker *worker, struct sHT_task_stream *task, int err, uint32_t epollflags)
  67. {
  68. /* XXX use err, not errno */
  69. switch (sHT_classify_sentrecv_tcp(errno)) {
  70. case sHT_SEND_BLOCKING:
  71. task->task.epollflags &= ~epollflags;
  72. return 0;
  73. case sHT_SEND_GRACEFUL_CLOSE:
  74. task->stream.flags |= sHT_STREAM_WRITE_EOF;
  75. task->task.epollflags &= ~epollflags;
  76. return 0;
  77. case sHT_SEND_GRACEFUL_RESET:
  78. task->stream.flags |= sHT_STREAM_WRITE_EOF | sHT_STREAM_READ_EOF | sHT_STREAM_RESET_GRACEFUL;
  79. task->task.epollflags &= ~(EPOLLIN | EPOLLOUT);
  80. return 0;
  81. case sHT_SEND_BLUNT_RESET:
  82. task->stream.flags |= sHT_STREAM_WRITE_EOF | sHT_STREAM_READ_EOF | sHT_STREAM_RESET_BLUNT;
  83. task->task.epollflags &= ~(EPOLLIN | EPOLLOUT);
  84. return 0;
  85. case sHT_SEND_INTERRUPTED:
  86. return 1;
  87. case sHT_SEND_TRANSIENT:
  88. /* TODO: may be a good idea to log these too,
  89. as an informational message */
  90. task->task.flags |= sHT_TASK_SCHEDULE;
  91. return 0;
  92. case sHT_SEND_KERNEL_OOM:
  93. /* No, I don't like overcommiting.
  94. Killing is better than hanging, though. */
  95. worker->flags |= sHT_WORKER_OOM;
  96. task->task.flags |= sHT_TASK_SCHEDULE;
  97. return 0;
  98. case sHT_SEND_OTHER:
  99. sHT_todo("didn't recognise TCP error");
  100. default:
  101. sHT_assert(0);
  102. }
  103. }
  104. void
  105. sHT_socket_sendsome_tcp(struct sHT_worker *worker, struct sHT_task_stream *task)
  106. {
  107. const unsigned char *buf = task->stream.to_write.first;
  108. size_t start = task->stream.to_write.offset;
  109. size_t end = (task->stream.to_write.offset + task->stream.to_write.length) % sHT_PAPER_SIZE;
  110. /* TODO: do this branchless (feasible on x86, <sHT/minmax.h>) */
  111. if (sHT_gt(start, end))
  112. end = sHT_PAPER_SIZE;
  113. end = sHT_index_nospec(end, sHT_PAPER_SIZE - start);
  114. do {
  115. /* XXX: speculatively negative sizes? */
  116. ssize_t sent = send(task->stream.fd, buf + start, end - start, MSG_DONTWAIT | MSG_NOSIGNAL);
  117. if (sHT_lt0(sent))
  118. continue;
  119. /* some data is on the kernel queue, or the NIC ... */
  120. sHT_assert(sent <= task->stream.to_write.length);
  121. task->stream.to_write.offset = (task->stream.to_write.offset + sent) % sHT_PAPER_SIZE;
  122. task->stream.to_write.length -= sent;
  123. return;
  124. /* TODO intrusive Spectre mitigations*/
  125. } while (sHT_unlikely(sHT_socket_sendrecv_errno(worker, task, errno, EPOLLOUT)));
  126. }
  127. void
  128. sHT_socket_readsome_tcp(struct sHT_worker *worker, struct sHT_task_stream *task)
  129. {
  130. unsigned char *buf = task->stream.has_read.first;
  131. size_t start = (task->stream.has_read.offset + task->stream.has_read.length) % sHT_PAPER_SIZE;
  132. size_t end = task->stream.has_read.offset;
  133. if (sHT_gt(start, end))
  134. end = sHT_PAPER_SIZE;
  135. /* XXX: this doesn't seem correct */
  136. end %= sHT_PAPER_SIZE - start;
  137. /* XXX: speculatively negative sizes? */
  138. do {
  139. ssize_t received;
  140. received = recv(task->stream.fd, buf + start, end - start, MSG_DONTWAIT);
  141. if (sHT_lt0(received))
  142. continue;
  143. sHT_assert(received <= sHT_PAPER_SIZE - task->stream.has_read.length);
  144. task->stream.has_read.length += received;
  145. return;
  146. /* TODO intrusive Spectre mitigations*/
  147. } while (sHT_unlikely(sHT_socket_sendrecv_errno(worker, task, errno, EPOLLOUT)));
  148. }